強化学習3 (シミュレータ)

強化学習では、戦略を重視するか(Policyベース)、価値を重視するか(Valueベース)が重要なポイントとなります。
この2つをシミュレーションするサンプルがありましたので実行してみました。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習2 (マルコフ決定過程)

前回作成した環境(環境構築に関する記事)を使ってマルコフ決定過程に従う環境を作成します。

  • 遷移先の状態は直前の状態とそこでの行動だけに依存する。
  • 報酬は直前の状態と遷移先に依存する。

構成要素は次の4つとなります。

  1. 状態(State)
  2. 行動(Action)
  3. 状態遷移の確率(遷移関数/Transition function)
  4. 即時報酬(報酬関数/Reward function)

コードにコメントをいっぱい書いてみましたので参考にしてください

environment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from enum import Enum
import numpy as np

# エージェントのいる位置を表すクラス
class State():

def __init__(self, row=-1, column=-1): # コンストラクタ
self.row = row
self.column = column

def __repr__(self): # printした時に表示される
return "<State: [{}, {}]>".format(self.row, self.column)

def clone(self):
return State(self.row, self.column)

def __hash__(self):
return hash((self.row, self.column))

def __eq__(self, other):
return self.row == other.row and self.column == other.column

# 上下左右の行動を表すクラス(対になる行動は -1を掛けた値となる)
class Action(Enum):
UP = 1
DOWN = -1
LEFT = 2
RIGHT = -2

# 環境クラス(遷移関数と報酬関数あり)・・・一番長くそして核となるクラス
class Environment():

def __init__(self, grid, move_prob=0.8):
# 2次元のグリッド。
# [種類]
# 0: 通常(エージェント移動可能)
# -1: ダメージ (ゲーム終了)
# 1: 報酬 (ゲーム終了)
# 9: 壁 (エージェント移動不可)
self.grid = grid
self.agent_state = State() # エージェントの位置

# デフォルトの報酬はマイナスとする。 (毒沼のような感じ)
# エージェントが早くゴール向かうようにするため。
self.default_reward = -0.04

# エージェントは遷移確率(%)で指定された方向に進む。
# 遷移確率から1を引いた確率(%)で違う方向に進む。
self.move_prob = move_prob
self.reset()

@property
def row_length(self):
return len(self.grid)

@property
def column_length(self):
return len(self.grid[0])

@property
def actions(self):
return [Action.UP, Action.DOWN,
Action.LEFT, Action.RIGHT]

@property
def states(self):
states = []
for row in range(self.row_length):
for column in range(self.column_length):
# 壁(9)はステータスに含まない。
if self.grid[row][column] != 9:
states.append(State(row, column))
return states

# 遷移関数
def transit_func(self, state, action):
transition_probs = {} # 遷移確率を初期化
if not self.can_action_at(state):
# すでに終端にいる
return transition_probs
# 反対方向
opposite_direction = Action(action.value * -1)

for a in self.actions:
prob = 0
if a == action:
prob = self.move_prob # 遷移確率(0.8=80%)
elif a != opposite_direction:
prob = (1 - self.move_prob) / 2 # 遷移確率(0.1=10%)

# 移動位置ごとに遷移確率を設定する
next_state = self._move(state, a)
if next_state not in transition_probs:
transition_probs[next_state] = prob
else:
transition_probs[next_state] += prob

return transition_probs

# 移動可能な場所かどうか
def can_action_at(self, state):
if self.grid[state.row][state.column] == 0:
return True
else:
return False

# 移動する
def _move(self, state, action):
if not self.can_action_at(state):
raise Exception("Can't move from here!")
# 移動先用にグリッドをもう1つ用意
next_state = state.clone()
# アクションに応じて移動する
if action == Action.UP:
next_state.row -= 1
elif action == Action.DOWN:
next_state.row += 1
elif action == Action.LEFT:
next_state.column -= 1
elif action == Action.RIGHT:
next_state.column += 1

# グリッドの外かどうかをチェックする。(外だったらもとの位置に戻す)
if not (0 <= next_state.row < self.row_length):
next_state = state
# グリッドの外かどうかをチェックする。(外だったらもとの位置に戻す)
if not (0 <= next_state.column < self.column_length):
next_state = state
# 壁にぶつかったかどうかをチェックする。(ぶつかったらもとの位置に戻す)
if self.grid[next_state.row][next_state.column] == 9:
next_state = state
# 移動先のグリッド位置を返す
return next_state

# 報酬関数(ゲーム終了判定と報酬を返す)
def reward_func(self, state):
reward = self.default_reward
done = False

# 次の状態の属性をチェックする
attribute = self.grid[state.row][state.column]
if attribute == 1:
# 報酬を得てゲーム終了
reward = 1
done = True
elif attribute == -1:
# ダメージを受けてゲーム終了.
reward = -1
done = True

return reward, done

def reset(self): # エージェントを左上に戻す
self.agent_state = State(self.row_length - 1, 0)
return self.agent_state

# エージェントから行動を受け取り、遷移関数/報酬関数を用いて、
# 次の遷移先と即時報酬を計算する
def step(self, action):
next_state, reward, done = self.transit(self.agent_state, action)
if next_state is not None:
self.agent_state = next_state

return next_state, reward, done

def transit(self, state, action):
# 遷移関数で遷移確率を取得する。
transition_probs = self.transit_func(state, action)
if len(transition_probs) == 0:
return None, None, True

next_states = []
probs = []
for s in transition_probs:
next_states.append(s)
probs.append(transition_probs[s])

# 遷移関数の出力した確率に沿って遷移先を得る(ここで実際の行動が決まる!)
next_state = np.random.choice(next_states, p=probs)
# 報酬関数で報酬とゲーム終了判定を取得する
reward, done = self.reward_func(next_state)
return next_state, reward, done

上記で実装した環境を実行するためのコードは下記の通りです。

environment_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random
from environment import Environment

# エージェントを表すクラス(戦略を定義する)
class Agent():

def __init__(self, env):
self.actions = env.actions

# 戦略(最初なのでただのランダム選択)
def policy(self, state):
return random.choice(self.actions)

# 実行関数
def main():
# グリッド環境を作成
grid = [
[0, 0, 0, 1],
[0, 9, 0, -1],
[0, 0, 0, 0]
]
env = Environment(grid) # グリッドから環境を作成
agent = Agent(env) # 環境をエージェントに設定

# ゲームを10回実行する
for i in range(10):
state = env.reset() # エージェントの位置リセット
total_reward = 0 # トータル報酬初期化
done = False # ゲーム終了状態初期化

while not done: # ゲーム終了まで続ける
# 戦略に沿ったアクション(移動方向)を取得。(現状はランダム選択)
action = agent.policy(state)
# アクションによる移動位置、報酬、ゲーム終了状態を取得。
next_state, reward, done = env.step(action)
# 今回移動分の報酬をトータル報酬に加算・減算する
total_reward += reward
# エージェントの位置を更新する
state = next_state

print("エピソード {}: エージェント取得報酬 {}.".format(i, total_reward))

if __name__ == "__main__":
main()

environment_demo.pyを実行すると下記のような結果となりました。
(乱数を使ってるので実行するたびに結果は異なります。)
結果

最初よくわからなったのですがエージェントの戦略で決めた行動が必ず実行されるわけではなく、その行動をもとに遷移確率を算出し、その確率に応じて実際の動作(Action)が決まるということです。
(Policyで決めた行動と違う方向に進んでいて「なぜなんだ?!」と悩んでました。。。)

今回の例では、まず戦略で決めた行動がUPだとしたらそれが実行される確率が80%で、残り20%の半分10%がRIGHTかLEFTが実行される確率に割り当てられるということになります。(反対行動のDOWNは0%)

強化学習1 (環境構築編)

機械学習の中で一番興味があった強化学習ですが、難しいのと時間がかかるので敬遠していたのですが少しづつ記事に書いていこうかと思います。

今回は環境構築編になります。
前準備としてWindows10環境にAnaconda3-2019.07-Windows-x86_64.exeとgitをインストールしておきます。

それが終わったら強化学習用の実行環境を構築します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
conda create -n env3.6 python=3.6

conda activate env3.6
pip install gym==0.10.9
pip install jupyter==1.0.0
pip install numpy==1.15.4
pip install pandas==0.23.4
pip install scipy==1.1.0
pip install scikit-learn==0.20.0
pip install matplotlib==3.0.1
pip install tensorflow==1.12.0
pip install -e git+https://github.com/ntasfi/PyGame-Learning-Environment.git#egg=ple
pip install -e git+https://github.com/lusob/gym-ple.git#egg=gym-ple
pip install h5py==2.8.0
pip install pygame==1.9.4
pip install tqdm==4.28.1

全て問題なくインストールできたら下記のpyファイルを作成し実行します。

welcome.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import numpy as np
from tensorflow.python import keras as K
import gym
import gym_ple
import time

def welcome():

env = gym.make("Catcher-v0")
num_action = env.action_space.n
episode_count = 10

s = env.reset()
brain = K.Sequential()
brain.add(K.layers.Dense(num_action, input_shape=[np.prod(s.shape)], activation="softmax"))

def policy(s):
evaluation = brain.predict(np.array([s.flatten()]))
return np.argmax(evaluation)

for e in range(episode_count):
time.sleep(3)
s = env.reset()
done = False
while not done:
env.render()
a = policy(s)
n_state, reward, done, info = env.step(a)
s = n_state

if __name__ == "__main__":
welcome()

ウィンドウが現れて、次のようなボールキャッチゲームが動いたら環境構築は完了です。

強化学習用の実行環境を終了する場合は次のコマンドを実行します。

1
conda deactivate

強化学習(環境構築編)は以上になります。お疲れ様でした。

深層学習でCIFAR10分類(一般物体認識)

CIFAR10とういうデータを使って一般物の認識をしてみます。

まずは必要なモジュールをインポートします。

1
2
3
4
5
6
7
8
9
10
import numpy as np
import matplotlib.pyplot as plt

import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
import chainer
import chainer.datasets as ds
import chainer.dataset.convert as con
from chainer import Variable, Chain, config, cuda

共通関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -------------- #
# 共通関数の定義 #
# -------------- #
# データ振り分け関数
def data_divide(Dtrain, D, xdata, tdata, shuffle='on'):
if shuffle == 'on':
index = np.random.permutation(range(D))
elif shuffle == 'off':
index = np.arange(D)
else:
print('error')
xtrain = xdata[index[0:Dtrain],:]
ttrain = tdata[index[0:Dtrain]]
xtest = xdata[index[Dtrain:D],:]
ttest = tdata[index[Dtrain:D]]
return xtrain, xtest, ttrain, ttest

# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(12, 8))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

# 学習用関数
def learning_classification(model,optNN,data,result,T=10):
for time in range(T):
optNN.target.cleargrads()
ytrain = model(data[0])
loss_train = F.softmax_cross_entropy(ytrain,data[2])
acc_train = F.accuracy(ytrain,data[2])
loss_train.backward()
optNN.update()

with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
ytest = model(data[1])
loss_test = F.softmax_cross_entropy(ytest,data[3])
acc_test = F.accuracy(ytest,data[3])
result[0].append(cuda.to_cpu(loss_train.data))
result[1].append(cuda.to_cpu(loss_test.data))
result[2].append(cuda.to_cpu(acc_train.data))
result[3].append(cuda.to_cpu(acc_test.data))

一般物体認識のデータセットを読み込みます。

1
2
3
4
5
6
7
# 一般物体認識のデータセットの読み込み
train, test = ds.get_cifar10(ndim=3)
xtrain, ttrain = con.concat_examples(train)
xtest, ttest = con.concat_examples(test)

# データサイズの確認
Dtrain, ch, Ny, Nx = xtrain.shape

どんなデータがあるかいくつか画像を出力します。

1
2
3
4
5
6
7
8
9
# 試しに画像を出力
plt.imshow(xtrain[0,:,:,:].transpose(1, 2, 0))
plt.show()

plt.imshow(xtrain[500,:,:,:].transpose(1, 2, 0))
plt.show()

plt.imshow(xtrain[1200,:,:,:].transpose(1, 2, 0))
plt.show()

結果

画像識別の問題に有効な畳み込みニューラルネットワークを設定します。

1
2
3
4
5
6
7
8
9
# 2層畳み込みニューラルネットワークの設定
C = ttrain.max() + 1
H1 = 10

layers = {}
layers['conv1'] = L.Convolution2D(ch, H1, ksize=3, stride=1, pad=1)
layers['bnorm1'] = L.BatchRenormalization(H1)
layers['l1'] = L.Linear(None, C)
NN = Chain(**layers)

プーリング層を導入した関数を定義します。

1
2
3
4
5
6
7
8
# プーリング層の導入
def model(x):
h = NN.conv1(x)
h = F.relu(h)
h = NN.bnorm1(h)
h = F.max_pooling_2d(h, ksize=3, stride=2, pad=1)
y = NN.l1(h)
return y

GPUを設定します。

1
2
3
4
# GPUの設定
gpu_device = 0
cuda.get_device(gpu_device).use()
NN.to_gpu(gpu_device)

最適化手法を設定します。

1
2
3
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

学習の経過を保存する変数を宣言します。

1
2
3
4
5
6
7
# 学習データ保存エリア
train_loss = []
train_acc = []
test_loss = []
test_acc = []

result = [train_loss, test_loss, train_acc, test_acc]

一度に全部実行するとメモリオーバーになってしまうのでデータセットを分割して実行します。
CIFARのデータは50,000個の訓練データがあるので5,000個ずつを10回に分けて実行します。

1
2
3
4
5
6
# シリアルイテレータの呼び出し
from chainer.iterators import SerialIterator as siter

# データセット振り分ける
batch_size = 5000
train_iter = siter(train, batch_size)

いつくかのバッチと呼ばれる小さな塊に分けて学習を進める方法をバッチ学習といいます。
特にランダムにデータを選んでニューラルネットワークの最適化を進める方法を確率勾配法といいます。
確率勾配法による最適化を行います。

1
2
3
4
5
6
7
# 確率勾配法による最適化
nepoch = 10
while train_iter.epoch < nepoch:
batch = train_iter.next()
xtrain, ttrain = con.concat_examples(batch)
data = cuda.to_gpu([xtrain, xtest, ttrain, ttest])
learning_classification(model, optNN, data, result, 1)

結果を表示します。

1
2
3
show_graph(result[0], result[1], 'loss function', 'step', 'loss function', 0.0, 4.0)

show_graph(result[2], result[3], 'accuracy', 'step', 'accuracy')

誤差

正解率

正解率は6割に届かない程度となりました。
ここから正解率を上げるために4層のニューラルネットワークにしていきます。

畳み込みニューラルネットワークのクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 畳み込みニューラルネットワークのクラス作成
class CNN(Chain):
def __init__(self, ch_in,ch_out):
layers = {}
layers['conv1'] = L.Convolution2D(ch_in,ch_out,ksize=5,stride=2,pad=2)
layers['bnorm1'] = L.BatchNormalization(ch_out)
super().__init__(**layers)

def __call__(self,x):
h = self.conv1(x)
h = self.bnorm1(h)
h = F.relu(h)

return h

多段階の畳み込みニューラルネットワークと関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 多段階の畳み込みニューラルネットワーク
C = ttrain.max() + 1
H1 = 16
H2 = 32
H3 = 64

NN = Chain(cnn1 = L.Convolution2D(ch,H1,ksize=1,stride=1,pad=0),
cnn2 = L.Convolution2D(H1,H1,ksize=3,stride=1,pad=0),
cnn3 = L.Convolution2D(H1,H1,ksize=5,stride=1,pad=0),
cnn4 = L.Convolution2D(H1,H2,ksize=1,stride=1,pad=0),
cnn5 = L.Convolution2D(H2,H2,ksize=3,stride=1,pad=0),
cnn6 = L.Convolution2D(H2,H2,ksize=5,stride=1,pad=0),
cnn7 = L.Convolution2D(H2,H3,ksize=1,stride=1,pad=0),
cnn8 = L.Convolution2D(H3,H3,ksize=3,stride=1,pad=0),
cnn9 = L.Convolution2D(H3,C,ksize=5,stride=1,pad=3),
bnorm1 = L.BatchNormalization(H1),
bnorm2 = L.BatchNormalization(H1),
bnorm3 = L.BatchNormalization(H1),
bnorm4 = L.BatchNormalization(H2),
bnorm5 = L.BatchNormalization(H2),
bnorm6 = L.BatchNormalization(H2),
bnorm7 = L.BatchNormalization(H3),
bnorm8 = L.BatchNormalization(H3))

def model(x):
h = NN.cnn1(x)
h = F.relu(h)
h = NN.bnorm1(h)
h = NN.cnn2(h)
h = F.relu(h)
h = NN.bnorm2(h)
h = NN.cnn3(h)
h = F.relu(h)
h = NN.bnorm3(h)
h = F.max_pooling_2d(h, 2)
h = NN.cnn4(h)
h = F.relu(h)
h = NN.bnorm4(h)
h = NN.cnn5(h)
h = F.relu(h)
h = NN.bnorm5(h)
h = NN.cnn6(h)
h = F.relu(h)
h = NN.bnorm6(h)
h = F.max_pooling_2d(h, 2)
h = NN.cnn7(h)
h = F.relu(h)
h = NN.bnorm7(h)
h = NN.cnn8(h)
h = F.relu(h)
h = NN.bnorm8(h)
h = NN.cnn9(h)
y = F.mean(h,axis=(2,3))

return y

GPUの設定を行います。

1
2
3
4
# GPUの設定
gpu_device = 0
cuda.get_device(gpu_device).use()
NN.to_gpu()

結果を一旦初期化します。

1
2
3
4
5
train_loss = []
train_acc = []
test_loss = []
test_acc = []
result = [train_loss,train_acc,test_loss,test_acc]

最適化手法にAdamを設定します。

1
2
3
# 最適化手法の設定
optNN = Opt.Adam()
optNN.setup(NN)

関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def shift_labeled(labeled_data):
data, label = labeled_data

ch, Ny, Nx = data.shape
z_h = Ny * (2.0 * np.random.rand(1) - 1.0) * 0.3
z_v = Nx * (2.0 * np.random.rand(1) - 1.0) * 0.3
data = np.roll(data, int(z_h), axis=1)
data = np.roll(data, int(z_v), axis=2)

return data, label

def flip_labeled(labeled_data):
data, label = labeled_data
z = np.random.randint(2)
if z == 1:
data = data[:,::-1,:]
z = np.random.randint(2)
if z == 1:
data = data[:,:,::-1]

z = np.random.randint(2)
if z == 1:
data = data.transpose(0, 2, 1)
return data, label

def check_network(x, link):
print('input:', x.shape)
h = link(x)
print('output:', h.shape)

確率勾配法で最適化し、結果をグラフ化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from tqdm import tqdm

nepoch = 100
batch_size = 128
train_iter = siter(train, batch_size)
with tqdm(total = nepoch) as pbar:
while train_iter.epoch < nepoch:
pbar.update(train_iter.is_new_epoch)
batch = train_iter.next()
batch = ds.TransformDataset(batch, flip_labeled)
xtrain,ttrain = con.concat_examples(batch)
data = cuda.to_gpu([xtrain,xtest,ttrain,ttest])
learning_classification(model,optNN,data,result,1)
if train_iter.is_new_epoch == 1:
show_graph(result[0],result[1],'loss function in training','step','loss function',0.0,4.0)
show_graph(result[2],result[3],'Accuracy in training and test','step','accuracy')
print(result[3][len(result[3])-1])

2、3時間実行しているのですが、進捗が48%でまだ時間がかかるので一旦ここまでの結果を確認しておきます。
誤差
正解率

なんとか6割の正解率を超えてきているでしょうか。ただここまで時間がかかるのと正解率が微妙にしか上昇していないのでなんらかの改善の余地はあると思います。

5時間20分かかってようやく終了しました。
誤差
正解率

最終的な正解率は73%ですか。。。まー実用性があるといってもいいような気がします。

(Google Colaboratoryで動作確認しています。)

ニューラルネットワークで株価予測

ニューラルネットワークで株価予測をしてみます。
まずは必要なモジュールをインポートします。

1
2
3
4
5
6
7
import numpy as np
import matplotlib.pyplot as plt

import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
from chainer import Variable, Chain, config

データ振り分け処理、グラフ表示、回帰分析用の関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# -------------- #
# 共通関数の定義 #
# -------------- #
# データ振り分け関数
def data_divide(Dtrain, D, xdata, tdata, shuffle='on'):
if shuffle == 'on':
index = np.random.permutation(range(D))
elif shuffle == 'off':
index = np.arange(D)
else:
print('error')
xtrain = xdata[index[0:Dtrain],:]
ttrain = tdata[index[0:Dtrain]]
xtest = xdata[index[Dtrain:D],:]
ttest = tdata[index[Dtrain:D]]
return xtrain, xtest, ttrain, ttest

# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(12, 8))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

# 回帰関数の定義
def learning_regression(model, optNN, data, T=10):
train_loss = []
test_loss = []
for time in range(T):
config.train = True
optNN.target.cleargrads() # zerogradsより記憶容量の確保にいい
ytrain = model(data[0])
loss_train = F.mean_squared_error(ytrain, data[2])
loss_train.backward()
optNN.update()

config.train = False
ytest = model(data[1])
loss_test = F.mean_squared_error(ytest, data[3])

train_loss.append(loss_train.data)
test_loss.append(loss_test.data)
return train_loss, test_loss

株価予測の前に適当な関数で変換したデータを回帰分析してみます。
データを作成します。

1
2
3
4
# 時系列データ作成
M = 100
time_data = np.linspace(0.0, 10.0, M)
value_data = np.sin(time_data) + 2.0 * np.sin(2.0 * time_data)

直前2回分データを入力データとして、入力データとラベルデータを作成します。

1
2
3
4
5
6
7
8
9
10
11
12
# 直前2回分のデータを入力にする
N = 2
xdata = []
tdata = []
for k in range(N, M):
xdata.append(value_data[k - N:k])
tdata.append(value_data[k])
xdata = np.array(xdata).astype(np.float32)
tdata = np.array(tdata).reshape(M - N, 1).astype(np.float32)

# データの形を確認
D, N = xdata.shape

4層のニューラルネットワークを作成し、関数化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 4層ニューラルネットワークを作成し、関数化
C = 1
H1 = 5
H2 = 5
H3 = 5
layers = {}
layers['l1'] = L.Linear(N, H1)
layers['l2'] = L.Linear(H1, H2)
layers['l3'] = L.Linear(H2, H3)
layers['l4'] = L.Linear(H3, C)
layers['bnorm1'] = L.BatchNormalization(H1)
layers['bnorm2'] = L.BatchNormalization(H2)
layers['bnorm3'] = L.BatchNormalization(H3)
NN = Chain(**layers)

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)

h = NN.l2(h)
h = F.relu(h)
h = NN.bnorm2(h)

h = NN.l3(h)
h = F.relu(h)
h = NN.bnorm3(h)

y = NN.l4(h)
return y

最適化手法を設定します。

1
2
3
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

入力データとラベルデータをそれぞれ訓練データとテストデータに振り分けます。
ここでは先頭3分の1のデータを訓練データとします。

1
2
3
4
5
6
7
8
# データ分割(3分の1を訓練データ、3分の2をテストデータとする)
Dtrain = D // 3

xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata, 'off')

# print(xtrain.shape, xtest.shape, ttrain.shape, ttest.shape)
data = [xtrain, xtest, ttrain, ttest]
# result = [train_loss, test_loss]

回帰分析を行います。200回学習します。

1
train_loss, test_loss = learning_regression(model, optNN, data, 200)

学習結果と予測結果をグラフ表示します。

1
2
3
4
5
6
7
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.figure(figsize=(12, 8))
plt.plot(time_data[0:Dtrain], ytrain, marker='x', linestyle='None') # 学習結果
plt.plot(time_data[Dtrain:D], ytest, marker='o', linestyle='None') # 予測結果
plt.plot(time_data[0:D - N], value_data[N:D])
plt.show()

予測結果
予測がうまくいってない箇所もありますが、十分な結果がでているようです。
(実行するごとに微妙に結果が変わります。ニューラルネットワークの個性ということでしょうか。)

次に予測用の株価データを準備します。
(株価の読み込み期間を変えるニューラルネットワークの学習処理でエラーになることがあります。
異常値エラーとのことですが、理由がよくわからないので今回は何回か試して問題のなかった[2005/01/01-2007/12/31]を分析期間としました。)

1
2
3
4
5
6
7
# 株価読み込み用モジュール
import pandas_datareader.data as web
import datetime as dt

start = dt.date(2005, 1, 1)
end = dt.date(2007, 12, 31)
web_data = web.DataReader('AMZN', 'yahoo', start, end)

読み込んだ株価データをグラフ化します。

1
2
3
plt.figure(figsize=(12, 8))
plt.plot(web_data['Close'])
plt.show()

株価データ

株価データを入力データとラベルデータに振り分けます。
今回は直前5回分のデータを入力データとします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
value_data = web_data['Close']
Total = len(value_data)

N = 5 # 直前何回分を入力データとするか
xdata = [] # 入力データ
tdata = [] # ラベルデータ
for k in range(N, Total):
xdata.append(value_data[k - N:k])
tdata.append(value_data[k])

xdata = np.array(xdata).astype(np.float32)
tdata = np.array(tdata).reshape(len(tdata), 1).astype(np.float32)

# 入力データの形を確認
D, N = xdata.shape

4層ニューラルネットワークを作成し、関数化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 4層ニューラルネットワークを作成し、関数化
C = 1
H1 = 5
H2 = 5
H3 = 5
layers = {}
layers['l1'] = L.Linear(N, H1)
layers['l2'] = L.Linear(H1, H2)
layers['l3'] = L.Linear(H2, H3)
layers['l4'] = L.Linear(H3, C)
layers['bnorm1'] = L.BatchNormalization(H1)
layers['bnorm2'] = L.BatchNormalization(H2)
layers['bnorm3'] = L.BatchNormalization(H3)
NN = Chain(**layers)

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)

h = NN.l2(h)
h = F.relu(h)
h = NN.bnorm2(h)

h = NN.l3(h)
h = F.relu(h)
h = NN.bnorm3(h)

y = NN.l4(h)
return y

最適化手法を設定し、訓練データとテストデータに振り分けます。
訓練データとテストデータは半分ずつに分けます。

1
2
3
4
5
6
7
8
9
10
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# データ分割
Dtrain = D // 2

xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata, 'off')

data = [xtrain, xtest, ttrain, ttest]

回帰分析を行います。今回は学習回数を1000回に設定しました。

1
train_loss, test_loss = learning_regression(model, optNN, data, 1000)

誤差と予測結果を表示します。

1
2
3
4
5
6
7
8
9
10
11
# 誤差の表示
show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', ymin=0.0, ymax=10.0)

ytrain = model(xtrain).data
ytest = model(xtest).data
time_data = np.arange(Total - N)
plt.figure(figsize=(12, 8))
plt.plot(time_data[0:Dtrain], ytrain, marker='x', linestyle='None') # 学習結果
plt.plot(time_data[Dtrain:Total], ytest, marker='o', linestyle='None') # 予測結果
plt.plot(time_data[0:Total - N], value_data[N:Total])
plt.show()

誤差
株価予測結果
いまいちな結果です。やはり株価を予測するのは無理なのでしょうか。

(Google Colaboratoryで動作確認しています。)

非線形関数一覧

ニューラルネットワークを構築する際に、どの非線形関数を選択するかは大事なポイントとなります。
そこでどんな非線形関数があるかそれぞれグラフ化して確認します。

まずは必要なモジュールのインストールとx軸用に等間隔の数値を用意します。

1
2
3
4
5
6
7
8
# モジュールのインポート
import chainer.functions as F
import numpy as np
import matplotlib.pyplot as plt

# 等間隔の数値を用意する(x軸用)
D = 100
ndata = np.linspace(-5.0, 5.0, D)

非線形関数をグラフ化するコードは下記の通りです。
1行目のF.absoluteの箇所をそれぞれの非線形関数に置き換えて実行していきます。

1
2
3
ydata = F.absolute(ndata).data   # 非線形関数を変えながら実行する。
plt.plot(ndata, ydata)
plt.show()
非線形関数一覧
F.absolute F.add F.arccos
F.arcsin F.arctan F.broadcast
F.ceil F.clipped_relu F.cos
F.cosh F.cumprod F.cumsum
F.digamma F.dropout F.elu
F.erf F.erfc F.erfcinv
F.erfcx F.erfinv F.exp
F.expm1 F.fix F.flatten
F.flipud F.floor F.hard_sigmoid
F.identity F.leaky_relu F.lgamma
F.log F.log10 F.log1p
F.log2 F.log_ndtr F.ndtr
F.ndtri F.relu F.rrelu
F.rsqrt F.selu F.sigmoid
F.sign F.sin F.sinh
F.softplus F.sqrt F.square
F.squeeze F.tan F.tanh
F.transpose

(Google Colaboratoryで動作確認しています。)

ニューラルネットワークで回帰分析

ニューラルネットワークで回帰分析をします。
まずは必要なモジュールのインポートと共通関数の定義をします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# モジュールのインポート
import numpy as np
import matplotlib.pyplot as plt

import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
from chainer import Variable, Chain, config, cuda

# -------------- #
# 共通関数の定義 #
# -------------- #
# データ振り分け関数
def data_divide(Dtrain, D, xdata, tdata):
index = np.random.permutation(range(D))
xtrain = xdata[index[0:Dtrain],:]
ttrain = tdata[index[0:Dtrain]]
xtest = xdata[index[Dtrain:D],:]
ttest = tdata[index[Dtrain:D]]
return xtrain, xtest, ttrain, ttest

# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(8, 6))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

# 回帰関数の定義
def learning_regression(model, optNN, data, T=10):
train_loss = []
test_loss = []
for time in range(T):
config.train = True
optNN.target.cleargrads() # zerogradsより記憶容量の確保にいい
ytrain = model(data[0])
loss_train = F.mean_squared_error(ytrain, data[2])
loss_train.backward()
optNN.update()

config.train = False
ytest = model(data[1])
loss_test = F.mean_squared_error(ytest, data[3])

train_loss.append(loss_train.data)
test_loss.append(loss_test.data)
return train_loss, test_loss

データを準備します。
x軸を-5から5までの等間隔100個分準備し、その後適当な関数でy軸に変換します。

1
2
3
4
5
6
7
8
# 等間隔の数値を用意する(x軸用)
D = 100
ndata = np.linspace(-5.0, 5.0, D)

# 関数でデータ作成
N = 1
xdata = ndata.reshape(D, N).astype(np.float32) # 2次元配列に変換(100行1列)
tdata = (np.sin(ndata) + np.sin(2.0 * ndata)).reshape(D, N).astype(np.float32)

作成したデータを視覚化します。

1
2
3
# 作成したデータをグラフ表示
plt.plot(xdata, tdata)
plt.show()

結果

ニューラルネットワークを作成し関数化します。
非線形関数はreluを使用します。

1
2
3
4
5
6
7
8
9
10
11
# 回帰のニューラルネットワークを作成し、関数化
C = 1 # 出力数
H = 20 # 中間層の数
NN = Chain(l1=L.Linear(N, H), l2=L.Linear(H, C), bnorm1=L.BatchNormalization(H))

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)
y = NN.l2(h)
return y

参考までに学習前のニューラルネットワークの状態を確認します。
まずは重さです。

1
2
# ニューラルネットワークの状態調査
print(NN.l1.W.data) # 最初はでたらめな重さ

結果
初期値としてでたらめな重さが設定されています。

次に勾配を表示してみます。

1
2
# ニューラルネットワークの勾配表示
print(NN.l1.W.grad) # 全て未計算(Not a Number)

結果
こちらはnan(Not a number)が設定されています。

最適化手法を設定し学習を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata) # データ分割
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

学習結果の誤差は下記のようになります。
結果

次第に減少してはいますが0.2から0.3付近で改善しなくなっています。

回帰の結果を比較します。

1
2
3
4
5
6
7
8
# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果
結果はぜんぜん合っていないようです。なんらかの改善策をとる必要がありそうです。

非線形関数調整してみます。
今回使ったreluの形は下記のようになっており、今回学習するデータには合わないようです。

1
2
3
4
# F.reluの形を調べる
ydata = F.relu(xdata).data
plt.plot(xdata, ydata)
plt.show()

結果

別の非線形関数sigmoidの形を調べると、reluよりは今回学習するデータに近いようにみえます。

1
2
3
4
# F.sigmoidの形を調べる
ydata = F.sigmoid(xdata).data
plt.plot(xdata, ydata)
plt.show()

結果

非線形関数をreluからsigmoidに変更し、同じように学習・結果表示を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 回帰のニューラルネットワークを作成し、関数化
C = 1 # 出力数
H = 20 # 中間層の数
NN = Chain(l1=L.Linear(N, H), l2=L.Linear(H, C), bnorm1=L.BatchNormalization(H))

def model(x):
h = NN.l1(x)
h = F.sigmoid(h) # シグモイドに変更
h = NN.bnorm1(h)
h = NN.l2(h)
return h

# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata)
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

結果
誤差に関しては2つの非線形関数でそれほど結果が変わりませんでした。

回帰の結果を確認してみます。

1
2
3
4
5
6
7
8
# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果
reluよりはsigmoidを使った方がだいぶましになったように思われます。

ただまだまだ納得いく結果ではありません。
ほかの非線形関数を調べたところsin関数というのがありました。

1
2
3
4
# F.sinの形を調べる
ydata = F.sin(xdata).data
plt.plot(xdata, ydata)
plt.show()

結果
この関数はかなり今回回帰分析を行いたいグラフと形が似ているようです。

非線形関数をsinに変更してまたまた回帰分析を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 回帰のニューラルネットワークを作成し、関数化
C = 1 # 出力数
H = 20 # 中間層の数
NN = Chain(l1=L.Linear(N, H), l2=L.Linear(H, C), bnorm1=L.BatchNormalization(H))

def model(x):
h = NN.l1(x)
h = F.sin(h) # サインに変更
h = NN.bnorm1(h)
h = NN.l2(h)
return h

# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata)
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

結果
誤差は十分に下がることが確認できました。

回帰の結果も確認してみます。

1
2
3
4
5
6
7
8
# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果
完璧な結果がでました。
ただ今回はグラフの見た目でそれに近い非線形関数を使うという少しずるい手順を踏んでしましました。

relu関数のまま結果を改善することはできないのでしょうか。
試しにニューラルネットワークの階層を増やしてみました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 4層のニューラルネットワークを作成し関数化
C = 1 # 出力の種類
H1 = 5
H2 = 5
H3 = 5
layers = {}
layers['l1'] = L.Linear(N, H1)
layers['l2'] = L.Linear(H1, H2)
layers['l3'] = L.Linear(H2, H3)
layers['l4'] = L.Linear(H3, C)
layers['bnorm1'] = L.BatchNormalization(H1)
layers['bnorm2'] = L.BatchNormalization(H2)
layers['bnorm3'] = L.BatchNormalization(H3)
NN = Chain(**layers)

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)
h = NN.l2(h)
h = F.relu(h)
h = NN.bnorm2(h)
h = NN.l3(h)
h = F.relu(h)
h = NN.bnorm3(h)
y = NN.l4(h)
return y

# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata)
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果は以下の通りです。
結果
誤差は十分に下がっています。

結果
回帰の結果もなかなか良いものがでました。回帰分析の結果を改善するためには非線形関数を変更する方法とは別に、ニューラルネットワークの階層を深くするのも有効な手段だと言えそうです。

(Google Colaboratoryで動作確認しています。)

GPUを使ってFashion-MNIST

Google ColaboratoryではGPUを簡単に使うことができます。
メニューから[ランタイム]->[ランタイムのタイプを変更]で表示されるダイアログから[ハードウェアアクセラレータ]でGPUを選択するだけです。

またソースもGPU用にいくつか変更しなくてはなりません。
コメントに # GPU対応対応 を追加しておきましたので参考にしてください。

まずは必要なモジュールをインポートし、共通関数(グラフ表示、学習)を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# モジュールのインポート
import numpy as np
import matplotlib.pyplot as plt

import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
from chainer import Variable, Chain, config, cuda

# -------------- #
# 共通関数の定義 #
# -------------- #
# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(8, 6))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

# 学習用関数
def learning(model, optNN, data, T=10):
# 学習記録用エリア(返値)
train_loss = []
train_acc = []
test_loss = []
test_acc = []

data = cuda.to_gpu(data, gpu_device) # GPU対応
for time in range(T):
# 学習
config.train = True
optNN.target.cleargrads() # zerogradsより記憶容量の確保にいい
ytrain = model(data[0])
loss_train = F.softmax_cross_entropy(ytrain, data[2])
acc_train = F.accuracy(ytrain, data[2])
loss_train.backward()
optNN.update()

# テスト(検証)
config.train = False
ytest = model(data[1])
loss_test = F.softmax_cross_entropy(ytest, data[3])
acc_test = F.accuracy(ytest, data[3])

# 結果の記録
train_loss.append(cuda.to_cpu(loss_train.data)) # GPU対応
train_acc.append(cuda.to_cpu(acc_train.data)) # GPU対応
test_loss.append(cuda.to_cpu(loss_test.data)) # GPU対応
test_acc.append(cuda.to_cpu(acc_test.data)) # GPU対応
return train_loss, test_loss, train_acc, test_acc

GPU利用の設定を行い、ニューラルネットワークの作成と関数化を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# GPUの利用
gpu_device = 0 # GPU対応
cuda.get_device(gpu_device).use() # GPU対応
NN.to_gpu() # GPU対応

# ニューラルネットワークの作成
C = ttrain.max() + 1
NN = Chain(l1=L.Linear(N, 400).to_gpu() , l2=L.Linear(400, C).to_gpu(), # GPU対応
bnorm1=L.BatchRenormalization(400).to_gpu()) # GPU対応

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h) # バッチの規格化(途中結果の整理)
h = NN.l2(h)
return h

ファッションMNISTのデータセットを読み込み訓練データとテストデータに振り分けます。

1
2
3
4
# fashion_mnistのデータセット読込んでデータ分割
train, test = ds.get_fashion_mnist()
xtrain, ttrain = con.concat_examples(train) # 訓練データ
xtest, ttest = con.concat_examples(test) # テスト(検証)データ

読み込んだデータを2つほど視覚化してみます。

1
2
3
4
5
6
7
# データの視覚化
Dtrain, N = xtrain.shape
print(Dtrain, N)
plt.imshow(xtrain[0,:].reshape(28, 28))
plt.show()
plt.imshow(xtrain[10,:].reshape(28, 28))
plt.show()

結果

最適化手法を設定し、学習させてから、誤差と正解率のグラフを表示します。

1
2
3
4
5
6
7
8
9
10
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習を実行
train_loss, test_loss, train_acc, test_acc = learning(model, optNN, [xtrain, xtest, ttrain, ttest], 100)

# 学習結果をグラフ表示
show_graph(train_loss, test_loss, 'loss function', 'step', 'lossfunction', 0.0, 4.0)
show_graph(train_acc, test_acc, 'accuracy', 'step', 'accuracy')

誤差
正解率

正解率は8割程度となりました。悪くない数字かもしれませんがまだまだ改善の余地はありそうです。

(Google Colaboratoryで動作確認しています。)

バッチ規格化とバッチ再規格化の比較

ニューラルネットワークが複雑になり学習がうまくいかない場合にはバッチ規格化を行うといいです。
バッチ規格化を使うと途中の結果を整理しながら学習をすることができます。
またバッチ再規格化というものがあり、バッチ規格よりもよい性能を示すとのことなのでこの2つを比較してみます。

まずchainerよりMNISTデータを読み込みます。

1
2
3
4
import chainer.datasets as ds

# MNISTデータの読み込み
train, test = ds.get_mnist()

結果

次に読み込んだデータを訓練データとテストデータに分けます。
chainerにはconverくらすにconcat_examplesというメソッドがあり容易にデータの振り分けを行うことができます。

1
2
3
4
5
6
7
8
# 入力データとラベルデータの設定
import chainer.dataset.convert as con
# 訓練データとテストデータ分ける
xtrain, ttrain = con.concat_examples(train)
xtest, ttest = con.concat_examples(test)

print(xtrain.shape, ttrain.shape)
print(xtest.shape, ttest.shape)

結果

振り分けたデータの1つをmatplotlibを使って表示してみます。

1
2
3
4
5
6
7
8
import matplotlib.pyplot as plt

# データの形とMNIST originalの1列を表示する
Dtrain, N = xtrain.shape
plt.imshow(xtrain[0,:].reshape(28, 28))
plt.show()

print('ラベル(答え)=>', ttrain[0])

結果

本題のバッチ規格化を設定します。
まずニューラルネットワークを作成する際にbnorm1=L.BatchNormalization(400)(9行目)を引数に渡し、またニューラルネットワークの関数でNN.bnorm1関数(15行目)をはさみます。
バッチ再規格をする場合は、BatchNormalizationの代わりにBatchRenormalization(10行目)を指定します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
from chainer import Variable, Chain, config

# バッチ規格化(途中の結果を整理整頓しながら学習する)
C = ttrain.max() + 1
NN = Chain(l1=L.Linear(N, 400), l2=L.Linear(400, C),
bnorm1=L.BatchNormalization(400))
# bnorm1=L.BatchRenormalization(400)) #バッチの再規格化(Chainer version4以降)

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h) # バッチの規格化(途中結果の整理)
h = NN.l2(h)
return h

次に学習用関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 学習用関数
def learning(model, optNN, data, T=50):
# 学習記録用エリア(返値)
train_loss = []
train_acc = []
test_loss = []
test_acc = []

for time in range(T):
# 学習
config.train = True
optNN.target.zerograds()
ytrain = model(data[0])
loss_train = F.softmax_cross_entropy(ytrain, data[2])
acc_train = F.accuracy(ytrain, data[2])
loss_train.backward()
optNN.update()

# テスト(検証)
config.train = False
ytest = model(data[1])
loss_test = F.softmax_cross_entropy(ytest, data[3])
acc_test = F.accuracy(ytest, data[3])

# 結果の記録
train_loss.append(loss_train.data)
train_acc.append(acc_train.data)
test_loss.append(loss_test.data)
test_acc.append(acc_test.data)
return train_loss, test_loss, train_acc, test_acc

グラフ表示用関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(8, 6))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

最適化手法を設定し、学習を実行したのち結果をグラフ表示します。

1
2
3
4
5
6
7
8
9
10
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習を実行
train_loss, test_loss, train_acc, test_acc = learning(model, optNN, [xtrain, xtest, ttrain, ttest])

# 学習結果をグラフ表示
show_graph(train_loss, test_loss, 'loss function', 'step', 'lossfunction', 0.0, 4.0)
show_graph(train_acc, test_acc, 'accuracy', 'step', 'accuracy')

バッチ規格化とバッチ再規格化をそれぞれ実行した結果は下記の通りです。

バッチ規格化 バッチ再規格化
誤差 誤差
正解率 正解率

ほぼ同じ結果で明確な違いがないような気がします。。。

(Google Colaboratoryで動作確認しています。)

ニューラルネットワークで画像認識

ニューラルネットワークで手書き文字の認識をします。有名なMNIST(エムニスト)です。
まず必要なモジュールをインポートします。

1
2
3
4
# モジュール読み込み
import numpy as np
import matplotlib.pyplot as plt
import sklearn.datasets as ds

MNISTデータを読み込み、確認のために画像として表示してみます。

1
2
3
4
5
6
7
8
9
10
11
# MNISTデータの読み込み
MNIST = ds.load_digits()
xdata = MNIST.data.astype(np.float32)
tdata = MNIST.target.astype(np.int32)

# 配列の形を確認
D, N = xdata.shape

# 画像データの表示
plt.imshow(xdata[0,:].reshape(8, 8)) # 1列に並んだデータを8行8列に変換
plt.show()

結果

データ分割関数を定義し、実行します。今回、訓練データと学習データはちょうど半分ずつにしています。

1
2
3
4
5
6
7
8
9
10
11
# データ分割関数
def data_divide(Dtrain, D, xdata, tdata):
index = np.random.permutation(range(D))
xtrain = xdata[index[0:Dtrain],:]
ttrain = tdata[index[0:Dtrain]]
xtest = xdata[index[Dtrain:D],:]
ttest = tdata[index[Dtrain:D]]
return xtrain, xtest, ttrain, ttest

Dtrain = D // 2
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata)

chainerの宣言をします。

1
2
3
4
5
# chainerの宣言
import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
from chainer import Variable, Chain, config

ニューラルネットワークを作成し、ニューラルネットワークの関数を定義します。
また誤差と正解率の遷移を記録する変数を用意します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 2層のニューラルネットワークを作成
C = tdata.max() + 1
NN = Chain(l1=L.Linear(N, 20), l2=L.Linear(20, C))

# 2層ニューラルネットワークの関数化
def model(x):
h = NN.l1(x)
h = F.relu(h)
y = NN.l2(h)
return y

# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習記録用エリア
train_loss = []
train_acc = []
test_loss = []
test_acc = []

最適化を行います。今回は200回学習を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 最適化
T = 200
for time in range(T):
# 学習
config.train = True
optNN.target.zerograds()
ytrain = model(xtrain)
loss_train = F.softmax_cross_entropy(ytrain, ttrain)
acc_train = F.accuracy(ytrain, ttrain)
loss_train.backward()
optNN.update()

# テスト(検証)
config.train = False
ytest = model(xtest)
loss_test = F.softmax_cross_entropy(ytest, ttest)
acc_test = F.accuracy(ytest, ttest)

# 結果の記録
train_loss.append(loss_train.data)
test_loss.append(loss_test.data)
train_acc.append(acc_train.data)
test_acc.append(acc_test.data)

グラフ表示用の関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(8, 6))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

誤差と正解率の遷移をグラフ表示します。

1
2
3
# 誤差と正解率をグラフ表示
show_graph(train_loss, test_loss, 'loss function', 'step', 'loss_function', 0.0, 4.0)
show_graph(train_acc, test_acc, 'accuracy', 'step', 'accuracy')

結果

順調に誤差が減少し、正解率が上昇していることが見てとれます。
ただ正解率が9割そこそこなのが少々不満です。
非線形関数や最適化手法を変えて改善する余地はありそうです。

(Google Colaboratoryで動作確認しています。)