Predicting stock prices has always been an attractive topic for both investors and researchers. Investors always want to know whether the price of a stock will rise or not. Because many financial indicators are complicated enough that only investors and people with good finance knowledge can understand them, the trend of the stock market looks inconsistent and random to ordinary people.

Machine learning is a great opportunity for non-experts to make data-driven predictions, and it may help experts find the most informative indicators and make better predictions.

The purpose of this tutorial is to build a neural network in TensorFlow 2 and Keras that predicts stock market prices. More specifically, we will build a Recurrent Neural Network with LSTM cells, as LSTMs are a widely used choice for time series forecasting.

Alright, let's get started. First, you need to install TensorFlow 2 and the other libraries:

`pip3 install tensorflow pandas numpy matplotlib yahoo_fin scikit-learn`

*More information on how you can install Tensorflow 2 here.*

Once you have everything set up, open up a new Python file (or a notebook) and import the following libraries:

```
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from yahoo_fin import stock_info as si
from collections import deque
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import os
import random
```

We are using the `yahoo_fin` module. It is essentially a Python scraper that extracts finance data from the Yahoo Finance platform, so it isn't a reliable API. Feel free to use other data sources such as Alpha Vantage.

Also, we need to make sure we get stable results after running our training/testing. Setting a seed will help:

```
# set seed, so we can get the same results after rerunning several times
np.random.seed(314)
tf.random.set_seed(314)
random.seed(314)
```

As a first step, we need to write a function that downloads the dataset from the Internet and preprocesses it:

```
def load_data(ticker, n_steps=50, scale=True, shuffle=True, lookup_step=1,
              test_size=0.2, feature_columns=['adjclose', 'volume', 'open', 'high', 'low']):
    """
    Loads data from Yahoo Finance source, as well as scaling, shuffling, normalizing and splitting.
    Params:
        ticker (str/pd.DataFrame): the ticker you want to load, examples include AAPL, TSLA, etc.
        n_steps (int): the historical sequence length (i.e window size) used to predict, default is 50
        scale (bool): whether to scale prices from 0 to 1, default is True
        shuffle (bool): whether to shuffle the data, default is True
        lookup_step (int): the future lookup step to predict, default is 1 (e.g next day)
        test_size (float): ratio for test data, default is 0.2 (20% testing data)
        feature_columns (list): the list of features to use to feed into the model, default is everything grabbed from yahoo_fin
    """
    # see if ticker is already a loaded stock from yahoo finance
    if isinstance(ticker, str):
        # load it from yahoo_fin library
        df = si.get_data(ticker)
    elif isinstance(ticker, pd.DataFrame):
        # already loaded, use it directly
        df = ticker
    else:
        raise TypeError("ticker can be either a str or a `pd.DataFrame` instances")
    # this will contain all the elements we want to return from this function
    result = {}
    # we will also return the original dataframe itself
    result['df'] = df.copy()
    # make sure that the passed feature_columns exist in the dataframe
    for col in feature_columns:
        assert col in df.columns, f"'{col}' does not exist in the dataframe."
    if scale:
        column_scaler = {}
        # scale the data (prices) from 0 to 1
        for column in feature_columns:
            scaler = preprocessing.MinMaxScaler()
            df[column] = scaler.fit_transform(np.expand_dims(df[column].values, axis=1))
            column_scaler[column] = scaler
        # add the MinMaxScaler instances to the result returned
        result["column_scaler"] = column_scaler
    # add the target column (label) by shifting by `lookup_step`
    df['future'] = df['adjclose'].shift(-lookup_step)
    # the last `lookup_step` rows contain NaN in the future column
    # get them before dropping NaNs
    last_sequence = np.array(df[feature_columns].tail(lookup_step))
    # drop NaNs
    df.dropna(inplace=True)
    sequence_data = []
    sequences = deque(maxlen=n_steps)
    for entry, target in zip(df[feature_columns].values, df['future'].values):
        sequences.append(entry)
        if len(sequences) == n_steps:
            sequence_data.append([np.array(sequences), target])
    # get the last sequence by appending the last `n_step` sequence with `lookup_step` sequence
    # for instance, if n_steps=50 and lookup_step=10, last_sequence should be of 60 (that is 50+10) length
    # this last_sequence will be used to predict future stock prices not available in the dataset
    last_sequence = list(sequences) + list(last_sequence)
    last_sequence = np.array(last_sequence)
    # add to result
    result['last_sequence'] = last_sequence
    # construct the X's and y's
    X, y = [], []
    for seq, target in sequence_data:
        X.append(seq)
        y.append(target)
    # convert to numpy arrays
    X = np.array(X)
    y = np.array(y)
    # reshape X to fit the neural network
    X = X.reshape((X.shape[0], X.shape[2], X.shape[1]))
    # split the dataset
    result["X_train"], result["X_test"], result["y_train"], result["y_test"] = train_test_split(X, y,
                                                                                test_size=test_size, shuffle=shuffle)
    # return the result
    return result
```

This function is long but handy. It accepts several arguments to be as flexible as possible.

The `ticker` argument is the ticker we want to load. For instance, you can use TSLA for Tesla, AAPL for Apple, and so on.

The `n_steps` integer indicates the historical sequence length we want to use; some people call it the window size. Recall that we are going to use a recurrent neural network, so we need to feed sequence data into the network. Choosing 50 means that we will use 50 days of stock prices to predict the next day.

`scale` is a boolean variable that indicates whether to scale prices from 0 to 1. We will set this to True, as scaling large values into the 0 to 1 range helps the neural network learn much faster and more effectively.
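To make the scaling step concrete, here is a minimal sketch of the min-max formula in plain Python. The prices are made-up values, and the helper names `min_max_scale` and `min_max_unscale` are only for illustration; in the tutorial code, sklearn's `MinMaxScaler` does this work.

```python
def min_max_scale(values):
    """Map values linearly so the smallest becomes 0 and the largest becomes 1."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]


def min_max_unscale(scaled, lo, hi):
    """Invert the scaling to get back to the original units."""
    return [s * (hi - lo) + lo for s in scaled]


prices = [100.0, 150.0, 125.0, 200.0]  # hypothetical closing prices
scaled = min_max_scale(prices)
print(scaled)  # [0.0, 0.5, 0.25, 1.0]

restored = min_max_unscale(scaled, min(prices), max(prices))
print(restored)  # [100.0, 150.0, 125.0, 200.0]
```

Keeping the minimum and maximum around is what lets us turn a scaled prediction back into a price later, which is why `load_data()` returns the fitted scalers.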

`lookup_step` is the future lookup step to predict. The default is 1 (i.e. the next day).
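The way `n_steps` and `lookup_step` work together can be sketched with a toy series. This is a simplified illustration, not the tutorial's function: `make_windows` is a hypothetical helper, and the series is a list of made-up numbers instead of a dataframe of features.

```python
from collections import deque


def make_windows(series, n_steps, lookup_step):
    """Pair each window of n_steps values with the value lookup_step after its last element."""
    window = deque(maxlen=n_steps)
    samples = []
    # the last lookup_step values have no future target, so stop before them
    for i in range(len(series) - lookup_step):
        window.append(series[i])
        if len(window) == n_steps:
            samples.append((list(window), series[i + lookup_step]))
    return samples


series = [1, 2, 3, 4, 5, 6]  # hypothetical daily prices
samples = make_windows(series, n_steps=3, lookup_step=1)
for features, target in samples:
    print(features, "->", target)
# [1, 2, 3] -> 4
# [2, 3, 4] -> 5
# [3, 4, 5] -> 6
```

With `lookup_step=2`, the same windows would be paired with targets two days ahead, and one fewer sample would be available.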

We will be using all the features available in this dataset, which are the open, high, low, volume and adjusted close. Please check this tutorial to learn more about what these indicators are.

The above function does the following:

- First, it loads the dataset using the `stock_info.get_data()` function in the `yahoo_fin` module.
- If the `scale` argument is passed as True, it will scale all the prices from 0 to 1 (including the volume) using sklearn's `MinMaxScaler` class. Note that each column has its own scaler.
- It then adds the `future` column, which indicates the target values (the labels to predict, or the y's), by shifting the adjusted close column by `lookup_step`.
- After that, it builds the input sequences of length `n_steps`, shuffles and splits the data, and returns the result.

To understand the code even better, I highly suggest you manually print the output variable (`result`) and see how the features and labels are made.
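One detail worth checking when you print `result`: `load_data()` uses `X.reshape(...)` to swap the last two dimensions. In NumPy, `reshape` keeps the elements in their flat order and only regroups them, while `transpose` actually swaps the axes. The sketch below uses a tiny made-up array to show the difference, so you can judge which layout you want your model to receive. It is an observation about NumPy, not a change the original code makes.

```python
import numpy as np

# one sample with 3 time steps and 2 features per step (made-up numbers)
X = np.array([[[1, 10],
               [2, 20],
               [3, 30]]])

reshaped = X.reshape((X.shape[0], X.shape[2], X.shape[1]))
transposed = X.transpose((0, 2, 1))

print(reshaped[0])
# [[ 1 10  2]
#  [20  3 30]]
print(transposed[0])
# [[ 1  2  3]
#  [10 20 30]]
```

Both results have shape `(1, 2, 3)`, but only the transposed array keeps each feature's values together in one row.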

Now that we have a proper function to load and prepare the dataset, we need another core function to build our model:

```
def create_model(sequence_length, units=256, cell=LSTM, n_layers=2, dropout=0.3,
                 loss="mean_absolute_error", optimizer="rmsprop", bidirectional=False):
    model = Sequential()
    for i in range(n_layers):
        if i == 0:
            # first layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True), input_shape=(None, sequence_length)))
            else:
                model.add(cell(units, return_sequences=True, input_shape=(None, sequence_length)))
        elif i == n_layers - 1:
            # last layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=False)))
            else:
                model.add(cell(units, return_sequences=False))
        else:
            # hidden layers
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True)))
            else:
                model.add(cell(units, return_sequences=True))
        # add dropout after each layer
        model.add(Dropout(dropout))
    model.add(Dense(1, activation="linear"))
    model.compile(loss=loss, metrics=["mean_absolute_error"], optimizer=optimizer)
    return model
```

Again, this function is flexible too: you can change the number of layers, the dropout rate, the RNN cell, the loss and the optimizer used to compile the model.

The above function constructs an RNN that has a dense layer with 1 neuron as its output layer. This model requires a sequence of features of `sequence_length` consecutive time steps (which are days in this dataset; we set it through `N_STEPS`, 70 in this tutorial) and outputs a single value which indicates the price of the next time step.

You can tweak the default parameters as you wish: `n_layers` is the number of RNN layers you want to stack, `dropout` is the dropout rate after each RNN layer, `units` is the number of RNN `cell` units (whether it's LSTM, SimpleRNN or GRU), and `bidirectional` is a boolean that indicates whether to use bidirectional RNNs. Experiment with those!
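To get a feel for how `units` and stacking affect model size, here is a rough sketch that counts the trainable parameters of a standard LSTM layer: four gates, each with input weights, recurrent weights and a bias. The helper `lstm_param_count` is hypothetical, not part of Keras, and it assumes a plain (non-bidirectional) LSTM. You can compare its output with what `model.summary()` reports.

```python
def lstm_param_count(input_dim, units):
    """Trainable parameters of one standard LSTM layer (4 gates, with bias)."""
    return 4 * ((input_dim + units) * units + units)


# first layer: each time step carries `input_dim` values (the last entry of input_shape)
first = lstm_param_count(input_dim=50, units=256)
# stacked layers receive the previous layer's 256 outputs at every step
stacked = lstm_param_count(input_dim=256, units=256)

print(first, stacked)  # 314368 525312
```

Doubling `units` roughly quadruples the size of a stacked layer, which is worth keeping in mind before increasing it.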

Now that we have all the core functions ready, let's train our model. Before we do that, let's initialize all our parameters (so you can edit them later to fit your needs):

```
# Window size or the sequence length
N_STEPS = 70
# Lookup step, 1 is the next day
LOOKUP_STEP = 1
# test ratio size, 0.2 is 20%
TEST_SIZE = 0.2
# features to use
FEATURE_COLUMNS = ["adjclose", "volume", "open", "high", "low"]
# date now
date_now = time.strftime("%Y-%m-%d")
### model parameters
N_LAYERS = 3
# LSTM cell
CELL = LSTM
# 256 LSTM neurons
UNITS = 256
# 40% dropout
DROPOUT = 0.4
# whether to use bidirectional RNNs
BIDIRECTIONAL = False
### training parameters
# mean absolute error loss
# LOSS = "mae"
# huber loss
LOSS = "huber_loss"
OPTIMIZER = "adam"
BATCH_SIZE = 64
EPOCHS = 400
# Tesla stock market
ticker = "TSLA"
ticker_data_filename = os.path.join("data", f"{ticker}_{date_now}.csv")
# model name to save, making it as unique as possible based on parameters
model_name = f"{date_now}_{ticker}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-seq-{N_STEPS}-step-{LOOKUP_STEP}-layers-{N_LAYERS}-units-{UNITS}"
if BIDIRECTIONAL:
    model_name += "-b"
```

The above code defines all the hyperparameters we are going to use. We explained some of them already; here are the others:

- `TEST_SIZE`: The testing sample rate. For instance, 0.2 means 20% of the total dataset.
- `FEATURE_COLUMNS`: The features we are going to use to predict the next price value.
- `N_LAYERS`: Number of RNN layers to use.
- `CELL`: RNN cell to use, default is LSTM.
- `UNITS`: Number of `cell` units.
- `DROPOUT`: The dropout rate is the probability of not training a given node in a layer, where 0.0 means no dropout at all. This type of regularization can help the model avoid overfitting on our training data.
- `BIDIRECTIONAL`: Whether to use bidirectional recurrent neural networks.
- `LOSS`: Loss function to use for this regression problem. We're using Huber loss; you can use mean absolute error (mae) or mean squared error (mse) as well.
- `OPTIMIZER`: Optimization algorithm to use, defaulting to Adam.
- `BATCH_SIZE`: The number of data samples to use on each training iteration.
- `EPOCHS`: The number of times that the learning algorithm will pass through the entire training dataset. We used 400 here, but feel free to increase it further.

Feel free to experiment with these values to get better results than mine.
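Since the `LOSS` choice may be less familiar than the others, here is a small sketch of how Huber loss treats a single error compared with squared and absolute error. It assumes a `delta` of 1.0, and the `huber` function is a plain-Python illustration, not the Keras implementation.

```python
def huber(error, delta=1.0):
    """Huber loss for one error: quadratic near zero, linear for large errors."""
    abs_error = abs(error)
    if abs_error <= delta:
        return 0.5 * error ** 2
    return delta * (abs_error - 0.5 * delta)


for e in (0.5, 3.0):
    print(e, "mse:", e ** 2, "mae:", abs(e), "huber:", huber(e))
# 0.5 mse: 0.25 mae: 0.5 huber: 0.125
# 3.0 mse: 9.0 mae: 3.0 huber: 2.5
```

For the large error, Huber grows linearly like the absolute error instead of quadratically, so occasional big price jumps weigh less on training than they would with mse.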

Alright, let's make sure the results, logs and data folders exist before we train:

```
# create these folders if they do not exist
if not os.path.isdir("results"):
    os.mkdir("results")
if not os.path.isdir("logs"):
    os.mkdir("logs")
if not os.path.isdir("data"):
    os.mkdir("data")
```

Finally, let's train the model:

```
# load the data
data = load_data(ticker, N_STEPS, lookup_step=LOOKUP_STEP, test_size=TEST_SIZE, feature_columns=FEATURE_COLUMNS)
# save the dataframe
data["df"].to_csv(ticker_data_filename)
# construct the model
model = create_model(N_STEPS, loss=LOSS, units=UNITS, cell=CELL, n_layers=N_LAYERS,
                     dropout=DROPOUT, optimizer=OPTIMIZER, bidirectional=BIDIRECTIONAL)
# some tensorflow callbacks
checkpointer = ModelCheckpoint(os.path.join("results", model_name + ".h5"), save_weights_only=True, save_best_only=True, verbose=1)
tensorboard = TensorBoard(log_dir=os.path.join("logs", model_name))
history = model.fit(data["X_train"], data["y_train"],
                    batch_size=BATCH_SIZE,
                    epochs=EPOCHS,
                    validation_data=(data["X_test"], data["y_test"]),
                    callbacks=[checkpointer, tensorboard],
                    verbose=1)
# note: this writes to the same path as the checkpoint above,
# so the best weights are replaced by the final-epoch model
model.save(os.path.join("results", model_name) + ".h5")
```

We used ModelCheckpoint, which saves the model weights during training whenever the validation loss improves (because of `save_best_only=True`). We also used TensorBoard to visualize the model performance in the training process.

Running the above block of code trains the model for `EPOCHS` epochs, so it will take some time. Here are the first output lines (this log comes from a run configured with 300 epochs):

```
Epoch 1/300
3510/3510 [==============================] - 21s 6ms/sample - loss: 0.0117 - mean_absolute_error: 0.0515 - val_loss: 0.0065 - val_mean_absolute_error: 0.0487
Epoch 2/300
3264/3510 [==========================>...] - ETA: 0s - loss: 0.0049 - mean_absolute_error: 0.0352
Epoch 00002: val_loss did not improve from 0.00650
3510/3510 [==============================] - 1s 309us/sample - loss: 0.0051 - mean_absolute_error: 0.0357 - val_loss: 0.0082 - val_mean_absolute_error: 0.0494
Epoch 3/300
3456/3510 [============================>.] - ETA: 0s - loss: 0.0039 - mean_absolute_error: 0.0329
Epoch 00003: val_loss improved from 0.00650 to 0.00095, saving model to results\2020-01-08_NFLX-mse-LSTM-seq-50-step-1-layers-3-units-256
3510/3510 [==============================] - 14s 4ms/sample - loss: 0.0039 - mean_absolute_error: 0.0328 - val_loss: 9.5337e-04 - val_mean_absolute_error: 0.0150
Epoch 4/300
3264/3510 [==========================>...] - ETA: 0s - loss: 0.0034 - mean_absolute_error: 0.0304
Epoch 00004: val_loss did not improve from 0.00095
3510/3510 [==============================] - 1s 222us/sample - loss: 0.0037 - mean_absolute_error: 0.0316 - val_loss: 0.0034 - val_mean_absolute_error: 0.0300
```
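The "improved" and "did not improve" messages in the log above follow a simple rule. Here is a minimal sketch of that rule in plain Python, using the four `val_loss` values from the log; it mimics the idea behind `save_best_only=True` and is not the Keras implementation.

```python
# val_loss values taken from the four epochs in the log above
val_losses = [0.0065, 0.0082, 9.5337e-04, 0.0034]

best = float("inf")
saved_epochs = []
for epoch, val_loss in enumerate(val_losses, start=1):
    if val_loss < best:
        # an improvement: this is when the checkpoint file gets written
        best = val_loss
        saved_epochs.append(epoch)

print(saved_epochs)  # [1, 3]
print(best)          # 0.00095337
```

Epochs 2 and 4 are skipped because their validation loss is higher than the best value seen so far, which matches the messages in the log.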

After the training ends (or during the training), try to run tensorboard using this command:

`tensorboard --logdir="logs"`

This will start a local HTTP server at "localhost:6006". After opening it in the browser, you'll see something similar to this:

The loss is the Huber loss as specified in the `LOSS` parameter (you can always change it to mean absolute error or mean squared error). The orange curve is the training loss, whereas the blue curve is what we care about the most, the validation loss. As you can see, it is decreasing significantly over time, so this is working!

Before we test our model, we need to reload the data with no shuffling, as we are going to plot the stock price curve in the correct order:

```
data = load_data(ticker, N_STEPS, lookup_step=LOOKUP_STEP, test_size=TEST_SIZE,
feature_columns=FEATURE_COLUMNS, shuffle=False)
# construct the model
model = create_model(N_STEPS, loss=LOSS, units=UNITS, cell=CELL, n_layers=N_LAYERS,
dropout=DROPOUT, optimizer=OPTIMIZER, bidirectional=BIDIRECTIONAL)
model_path = os.path.join("results", model_name) + ".h5"
model.load_weights(model_path)
```

If you're following along with a notebook, the trained model is still in memory, so you don't need to reconstruct it and load the weights; comment those lines out. However, if you're using another Python file for testing, then you should keep them.

Now let's test our model:

```
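# evaluate the model; returns the loss (Huber here) and the mean absolute error metric
loss, mae = model.evaluate(data["X_test"], data["y_test"], verbose=0)
# calculate the mean absolute error (inverse scaling)
# note: inverse_transform also adds the column minimum back, not only the price range
mean_absolute_error = data["column_scaler"]["adjclose"].inverse_transform([[mae]])[0][0]
print("Mean Absolute Error:", mean_absolute_error)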
```

Remember that the output will be a value between 0 and 1, so we need to get it back to a real price value. Here is the output:

`Mean Absolute Error: 6.516846878481972`

Not bad: on average, the predicted price is only about 6.52$ away from the real price.
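A caveat when converting an error back to dollars: a min-max `inverse_transform` computes `scaled * (max - min) + min`, which is right for a price but adds the column minimum to an error. The sketch below uses hypothetical `price_min`, `price_max` and `scaled_mae` values (not the real TSLA numbers) to show the two conversions side by side. With a fitted sklearn `MinMaxScaler`, the `data_range_` attribute holds `max - min` per column.

```python
price_min, price_max = 5.0, 505.0  # hypothetical adjusted close range
scaled_mae = 0.01                  # hypothetical error in scaled units

# what inverse_transform does: rescale, then shift by the minimum
as_price = scaled_mae * (price_max - price_min) + price_min
# an error is a difference of two prices, so only the range applies
as_error = scaled_mae * (price_max - price_min)

print(round(as_price, 2))  # 10.0
print(round(as_error, 2))  # 5.0
```

The gap between the two numbers is exactly the column minimum, so the difference matters little for a stock whose historical minimum is close to zero and more for one that never traded cheaply.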

Alright, let's try to predict the future price of the Tesla stock:

```
def predict(model, data):
    # retrieve the last sequence from data
    last_sequence = data["last_sequence"][-N_STEPS:]
    # retrieve the column scalers
    column_scaler = data["column_scaler"]
    # reshape the last sequence
    last_sequence = last_sequence.reshape((last_sequence.shape[1], last_sequence.shape[0]))
    # expand dimension
    last_sequence = np.expand_dims(last_sequence, axis=0)
    # get the prediction (scaled from 0 to 1)
    prediction = model.predict(last_sequence)
    # get the price (by inverting the scaling)
    predicted_price = column_scaler["adjclose"].inverse_transform(prediction)[0][0]
    return predicted_price
```

This function uses the `last_sequence` variable we saved in the `load_data()` function, which is basically the last sequence of prices. We use it to predict the next price. Let's call it:

```
# predict the future price
future_price = predict(model, data)
print(f"Future price after {LOOKUP_STEP} days is {future_price:.2f}$")
```

Output:

`Future price after 1 days is 404.78$`

Sounds interesting! Two days before, the price was 447.37$, and yesterday it was 416.43$. The model is saying that the next day, it will be 404.78$: a decreasing trend. The model just used 70 days of features to get that value. Let's plot the prices and see:

```
def plot_graph(model, data):
    y_test = data["y_test"]
    X_test = data["X_test"]
    y_pred = model.predict(X_test)
    y_test = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(np.expand_dims(y_test, axis=0)))
    y_pred = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(y_pred))
    # last 200 days, feel free to edit that
    plt.plot(y_test[-200:], c='b')
    plt.plot(y_pred[-200:], c='r')
    plt.xlabel("Days")
    plt.ylabel("Price")
    plt.legend(["Actual Price", "Predicted Price"])
    plt.show()
```

This function plots the last 200 days of the test set (you can edit it as you wish) as well as the predicted prices. Let's call it and see what it looks like:

`plot_graph(model, data)`

Result:

Great, as you can see, the blue curve is the actual test set, and the red curve is the predicted prices! Notice that the stock price has recently been decreasing, as we predicted.

This will still work if you have a higher `LOOKUP_STEP`, but it will use older data (older by `LOOKUP_STEP` days) in order to draw the red line.

Until now, we have only predicted the next day. I have tried to build other models that use different values of `lookup_step`; here is an interesting result in TensorBoard:

Interestingly enough, the blue curve is the model we used in the tutorial, which uses the next timestep stock price as the label, whereas the green and orange curves used 10 and 30 lookup steps respectively. For instance, in this example, the orange model predicts the stock price after 30 days, which suits longer-term investments (which is usually the case).

Now you may wonder: what if we just want to predict whether the price is going to rise or fall, not the actual price value as we did here? You can do it in one of two ways: either compare the predicted price with the current price and make the decision, or build an entirely new model and change the last output's activation function to sigmoid, as well as the loss and the metrics.

The function below calculates the accuracy score by converting the predicted price to 0 or 1 (0 indicates that the price went down, and 1 indicates that it went up):

```
def get_accuracy(model, data):
    y_test = data["y_test"]
    X_test = data["X_test"]
    y_pred = model.predict(X_test)
    y_test = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(np.expand_dims(y_test, axis=0)))
    y_pred = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(y_pred))
    y_pred = list(map(lambda current, future: int(float(future) > float(current)), y_test[:-LOOKUP_STEP], y_pred[LOOKUP_STEP:]))
    y_test = list(map(lambda current, future: int(float(future) > float(current)), y_test[:-LOOKUP_STEP], y_test[LOOKUP_STEP:]))
    return accuracy_score(y_test, y_pred)
```
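To see what this conversion does on a small scale, here is a plain-Python sketch of the same idea with made-up prices. The `direction_labels` helper and the two price lists are hypothetical; the real function works on the inverse-scaled test set and uses sklearn's `accuracy_score`.

```python
def direction_labels(current, future):
    """1 if the future price is above the current one, else 0."""
    return [int(f > c) for c, f in zip(current, future)]


actual = [10.0, 11.0, 10.5, 12.0, 11.5]     # hypothetical real prices
predicted = [10.2, 10.8, 10.9, 11.7, 12.3]  # hypothetical model outputs
step = 1

# both label lists use the actual price as the "current" reference
true_dir = direction_labels(actual[:-step], actual[step:])
pred_dir = direction_labels(actual[:-step], predicted[step:])
accuracy = sum(t == p for t, p in zip(true_dir, pred_dir)) / len(true_dir)

print(true_dir)  # [1, 0, 1, 0]
print(pred_dir)  # [1, 0, 1, 1]
print(accuracy)  # 0.75
```

Only the last prediction points the wrong way here, so three out of four directions are correct.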

Now let's call the function:

`print(str(LOOKUP_STEP) + ":", "Accuracy Score:", get_accuracy(model, data))`

Here is the result when I trained 3 models for different `LOOKUP_STEP` values:

```
1: Accuracy Score: 0.5642570281124498
10: Accuracy Score: 0.7192622950819673
30: Accuracy Score: 0.8318965517241379
```

As you may notice, the model predicts long-term prices more accurately: it reaches about 71.9% when we train the model to predict the price 10 days ahead, and about 83.2% accuracy when using 30 lookup steps.

Alright, that's it for this tutorial. You can tweak the parameters and see how you can improve the model performance: try to train for more epochs, say 500 or even more, increase or decrease the `BATCH_SIZE` and see if it changes for the better, or play around with `N_STEPS` and `LOOKUP_STEP` and see which combination works best.

You can also change the model parameters such as increasing the number of layers or the number of LSTM units, or even try the GRU cell instead of LSTM.

Note that there are other features and indicators you can use. To improve the prediction, it is common to use other information as features, such as technical indicators, the company's product innovation, interest rates, exchange rates, public policy, web and financial news, and even the number of employees!

I encourage you to change the model architecture: try to use CNNs or Seq2Seq models, or even add bidirectional LSTMs to this existing model, and see if you can improve it!

Also, try different stocks: check the Yahoo Finance page and see which one you actually want!

If you're not using a notebook or an interactive shell, I have split the code into different Python files, each one for its purpose. Check it here.

Happy Training ♥
