ニューラルネットワークで株価予測をしてみます。
まずは必要なモジュールをインポートします。
1 2 3 4 5 6 7
| import numpy as np import matplotlib.pyplot as plt
import chainer.optimizers as Opt import chainer.functions as F import chainer.links as L from chainer import Variable, Chain, config
|
データ振り分け処理、グラフ表示、回帰分析用の関数を定義します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
def data_divide(Dtrain, D, xdata, tdata, shuffle='on'): if shuffle == 'on': index = np.random.permutation(range(D)) elif shuffle == 'off': index = np.arange(D) else: print('error') xtrain = xdata[index[0:Dtrain],:] ttrain = tdata[index[0:Dtrain]] xtest = xdata[index[Dtrain:D],:] ttest = tdata[index[Dtrain:D]] return xtrain, xtest, ttrain, ttest
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0): Tall = len(result1) plt.figure(figsize=(12, 8)) plt.plot(range(Tall), result1, label='train') plt.plot(range(Tall), result2, label='test') plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.xlim([0, Tall]) plt.ylim(ymin, ymax) plt.legend() plt.show()
def learning_regression(model, optNN, data, T=10): train_loss = [] test_loss = [] for time in range(T): config.train = True optNN.target.cleargrads() ytrain = model(data[0]) loss_train = F.mean_squared_error(ytrain, data[2]) loss_train.backward() optNN.update() config.train = False ytest = model(data[1]) loss_test = F.mean_squared_error(ytest, data[3]) train_loss.append(loss_train.data) test_loss.append(loss_test.data) return train_loss, test_loss
|
株価予測の前に適当な関数で変換したデータを回帰分析してみます。
データを作成します。
1 2 3 4
| M = 100 time_data = np.linspace(0.0, 10.0, M) value_data = np.sin(time_data) + 2.0 * np.sin(2.0 * time_data)
|
直前2回分データを入力データとして、入力データとラベルデータを作成します。
1 2 3 4 5 6 7 8 9 10 11 12
| N = 2 xdata = [] tdata = [] for k in range(N, M): xdata.append(value_data[k - N:k]) tdata.append(value_data[k]) xdata = np.array(xdata).astype(np.float32) tdata = np.array(tdata).reshape(M - N, 1).astype(np.float32)
D, N = xdata.shape
|
4層のニューラルネットワークを作成し、関数化します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| C = 1 H1 = 5 H2 = 5 H3 = 5 layers = {} layers['l1'] = L.Linear(N, H1) layers['l2'] = L.Linear(H1, H2) layers['l3'] = L.Linear(H2, H3) layers['l4'] = L.Linear(H3, C) layers['bnorm1'] = L.BatchNormalization(H1) layers['bnorm2'] = L.BatchNormalization(H2) layers['bnorm3'] = L.BatchNormalization(H3) NN = Chain(**layers)
def model(x): h = NN.l1(x) h = F.relu(h) h = NN.bnorm1(h)
h = NN.l2(h) h = F.relu(h) h = NN.bnorm2(h)
h = NN.l3(h) h = F.relu(h) h = NN.bnorm3(h)
y = NN.l4(h) return y
|
最適化手法を設定します。
1 2 3
| optNN = Opt.MomentumSGD() optNN.setup(NN)
|
入力データとラベルデータをそれぞれ訓練データとテストデータに振り分けます。
ここでは先頭3分の1のデータを訓練データとします。
1 2 3 4 5 6 7 8
| Dtrain = D // 3
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata, 'off')
data = [xtrain, xtest, ttrain, ttest]
|
回帰分析を行います。200回学習します。
1
| train_loss, test_loss = learning_regression(model, optNN, data, 200)
|
学習結果と予測結果をグラフ表示します。
1 2 3 4 5 6 7
| ytrain = model(xtrain).data ytest = model(xtest).data plt.figure(figsize=(12, 8)) plt.plot(time_data[0:Dtrain], ytrain, marker='x', linestyle='None') plt.plot(time_data[Dtrain:D], ytest, marker='o', linestyle='None') plt.plot(time_data[0:D - N], value_data[N:D]) plt.show()
|
予測がうまくいってない箇所もありますが、十分な結果がでているようです。
(実行するごとに微妙に結果が変わります。ニューラルネットワークの個性ということでしょうか。)
次に予測用の株価データを準備します。
(株価の読み込み期間を変えるニューラルネットワークの学習処理でエラーになることがあります。
異常値エラーとのことですが、理由がよくわからないので今回は何回か試して問題のなかった[2005/01/01-2007/12/31]を分析期間としました。)
1 2 3 4 5 6 7
| import pandas_datareader.data as web import datetime as dt
start = dt.date(2005, 1, 1) end = dt.date(2007, 12, 31) web_data = web.DataReader('AMZN', 'yahoo', start, end)
|
読み込んだ株価データをグラフ化します。
1 2 3
| plt.figure(figsize=(12, 8)) plt.plot(web_data['Close']) plt.show()
|
株価データを入力データとラベルデータに振り分けます。
今回は直前5回分のデータを入力データとします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| value_data = web_data['Close'] Total = len(value_data)
N = 5 xdata = [] tdata = [] for k in range(N, Total): xdata.append(value_data[k - N:k]) tdata.append(value_data[k])
xdata = np.array(xdata).astype(np.float32) tdata = np.array(tdata).reshape(len(tdata), 1).astype(np.float32)
D, N = xdata.shape
|
4層ニューラルネットワークを作成し、関数化します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| C = 1 H1 = 5 H2 = 5 H3 = 5 layers = {} layers['l1'] = L.Linear(N, H1) layers['l2'] = L.Linear(H1, H2) layers['l3'] = L.Linear(H2, H3) layers['l4'] = L.Linear(H3, C) layers['bnorm1'] = L.BatchNormalization(H1) layers['bnorm2'] = L.BatchNormalization(H2) layers['bnorm3'] = L.BatchNormalization(H3) NN = Chain(**layers)
def model(x): h = NN.l1(x) h = F.relu(h) h = NN.bnorm1(h)
h = NN.l2(h) h = F.relu(h) h = NN.bnorm2(h)
h = NN.l3(h) h = F.relu(h) h = NN.bnorm3(h)
y = NN.l4(h) return y
|
最適化手法を設定し、訓練データとテストデータに振り分けます。
訓練データとテストデータは半分ずつに分けます。
1 2 3 4 5 6 7 8 9 10
| optNN = Opt.MomentumSGD() optNN.setup(NN)
Dtrain = D // 2
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata, 'off')
data = [xtrain, xtest, ttrain, ttest]
|
回帰分析を行います。今回は学習回数を1000回に設定しました。
1
| train_loss, test_loss = learning_regression(model, optNN, data, 1000)
|
誤差と予測結果を表示します。
1 2 3 4 5 6 7 8 9 10 11
| show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', ymin=0.0, ymax=10.0)
ytrain = model(xtrain).data ytest = model(xtest).data time_data = np.arange(Total - N) plt.figure(figsize=(12, 8)) plt.plot(time_data[0:Dtrain], ytrain, marker='x', linestyle='None') plt.plot(time_data[Dtrain:Total], ytest, marker='o', linestyle='None') plt.plot(time_data[0:Total - N], value_data[N:Total]) plt.show()
|
いまいちな結果です。やはり株価を予測するのは無理なのでしょうか。
(Google Colaboratoryで動作確認しています。)