強化学習6 (TD法)

前回は1エピソードをプレイした実績に基づく更新を行うモンテカルロ法による学習を試してみました。
今回は1回の行動ごとに更新を行うTD法と試してみます。

モンテカルロ法に比べて行動修正のスピードが早いため学習が効率的です。
ただ更新は見積りベースになるためモンテカルロ法より適切な行動をとれるかどうか不確実になります。

まずはエージェントのベースになるクラスを実装します。(強化学習5 (モンテカルロ法)と同様です。)

el_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import numpy as np
import matplotlib.pyplot as plt

class ELAgent():

def __init__(self, epsilon):
self.Q = {}
self.epsilon = epsilon
self.reward_log = []

def policy(self, s, actions):
if np.random.random() < self.epsilon:
# ランダムな行動(探索)
return np.random.randint(len(actions))
else:
# self.Q => 状態における行動の価値
# self.Q[s] => 状態sで行動aをとる場合の価値
if s in self.Q and sum(self.Q[s]) != 0:
# 価値評価に基づき行動(活用)
return np.argmax(self.Q[s])
else:
# ランダムな行動(探索)
return np.random.randint(len(actions))

# 報酬の記録を初期化
def init_log(self):
self.reward_log = []

# 報酬の記録
def log(self, reward):
self.reward_log.append(reward)

def show_reward_log(self, interval=50, episode=-1):
# そのepsilonの報酬をグラフ表示
if episode > 0:
rewards = self.reward_log[-interval:]
mean = np.round(np.mean(rewards), 3)
std = np.round(np.std(rewards), 3)
print("At Episode {} average reward is {} (+/-{}).".format(
episode, mean, std))
# 今までに獲得した報酬をグラフ表示
else:
indices = list(range(0, len(self.reward_log), interval))
means = []
stds = []
for i in indices:
rewards = self.reward_log[i:(i + interval)]
means.append(np.mean(rewards))
stds.append(np.std(rewards))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("Reward History")
plt.grid()
plt.fill_between(indices, means - stds, means + stds,
alpha=0.1, color="g")
plt.plot(indices, means, "o-", color="g",
label="Rewards for each {} episode".format(interval))
plt.legend(loc="best")
plt.show()

次に環境を扱うためのクラスを実装します。

FrozenLakeEasy-v0は、強化学習を行うための環境を提供するライブラリOpenAI Gymの環境の1つです。
4 x 4 マスの迷路でところどころに穴があいていて穴に落ちるとゲーム終了となります。
穴に落ちずにゴールに到着すると報酬が得られます。(強化学習5 (モンテカルロ法)と同様です。)

frozen_lake_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import gym
from gym.envs.registration import register
# スリップする設定(is_slippery)をオフ設定(学習に時間がかかるため)
register(id="FrozenLakeEasy-v0", entry_point="gym.envs.toy_text:FrozenLakeEnv", kwargs={"is_slippery": False})

def show_q_value(Q):
"""
FrozenLake-v0環境での価値は下記の通り。
各行動での評価を表しています。
+----+------+----+
| | 上 | |
| 左 | 平均 | 右 |
| | 下 | |
+-----+------+----+
"""
env = gym.make("FrozenLake-v0")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))

for r in range(nrow):
for c in range(ncol):
s = r * nrow + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True

if state_exist:
# At the display map, the vertical index is reversed.
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # 左 = 0
reward_map[_r - 1][_c] = Q[s][1] # 下 = 1
reward_map[_r][_c + 1] = Q[s][2] # 右 = 2
reward_map[_r + 1][_c] = Q[s][3] # 上 = 3
reward_map[_r][_c] = np.mean(Q[s]) # 中央

# 各状態・行動の評価を表示
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
# 報酬推移を表示
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()

TD法での学習を実行します。
(このファイルだけ強化学習5 (モンテカルロ法)と異なります。)

q_learning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from collections import defaultdict
import gym
from el_agent import ELAgent
from frozen_lake_util import show_q_value

class QLearningAgent(ELAgent):

def __init__(self, epsilon=0.1):
super().__init__(epsilon)

def learn(self, env, episode_count=1000, gamma=0.9,
learning_rate=0.1, render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n)) # actions = [0, 1, 2, 3]
# self.Q Qテーブル(ある状態sにおける行動aの報酬を管理)
self.Q = defaultdict(lambda: [0] * len(actions))
for e in range(episode_count):
s = env.reset()
done = False
while not done:
if render:
env.render()
a = self.policy(s, actions)
n_state, reward, done, info = env.step(a)

# reward => 報酬
# gamma => 割引率
# max(self.Q[n_state]) => 価値が最大になるような行動をとる
gain = reward + gamma * max(self.Q[n_state])
estimated = self.Q[s][a]
# Qテーブルの更新(状態sの行動aにおける報酬を管理)
# learning_rate => 学習率
# estimated => 既存の見積り
self.Q[s][a] += learning_rate * (gain - estimated)
s = n_state
else:
self.log(reward)

if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)

# 学習を行う
def train():
agent = QLearningAgent()
env = gym.make("FrozenLakeEasy-v0")
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()

if __name__ == "__main__":
train()
FrozenLake 各行動の評価

全体的にゴール向かう行動が高く評価されていることがわかります。

エピソード実行回数と獲得報酬平均の推移は次のようになります。
エピソード数と獲得報酬平均の推移

モンテカルロ法と同様にうまく学習できています。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習5 (モンテカルロ法)

モンテカルロ法は行動の修正を実績に基づき行う方法です。
エピソードが終了した後で、獲得できた報酬の総和をもとに修正を行うシンプルな方法です。

まずはエージェントのベースになるクラスを実装します。

el_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import numpy as np
import matplotlib.pyplot as plt

class ELAgent():

def __init__(self, epsilon):
self.Q = {}
self.epsilon = epsilon
self.reward_log = []

def policy(self, s, actions):
if np.random.random() < self.epsilon:
# ランダムな行動(探索)
return np.random.randint(len(actions))
else:
# self.Q => 状態における行動の価値
# self.Q[s] => 状態sで行動aをとる場合の価値
if s in self.Q and sum(self.Q[s]) != 0:
# 価値評価に基づき行動(活用)
return np.argmax(self.Q[s])
else:
# ランダムな行動(探索)
return np.random.randint(len(actions))

# 報酬の記録を初期化
def init_log(self):
self.reward_log = []

# 報酬の記録
def log(self, reward):
self.reward_log.append(reward)

def show_reward_log(self, interval=50, episode=-1):
# そのepsilonの報酬をグラフ表示
if episode > 0:
rewards = self.reward_log[-interval:]
mean = np.round(np.mean(rewards), 3)
std = np.round(np.std(rewards), 3)
print("At Episode {} average reward is {} (+/-{}).".format(
episode, mean, std))
# 今までに獲得した報酬をグラフ表示
else:
indices = list(range(0, len(self.reward_log), interval))
means = []
stds = []
for i in indices:
rewards = self.reward_log[i:(i + interval)]
means.append(np.mean(rewards))
stds.append(np.std(rewards))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("Reward History")
plt.grid()
plt.fill_between(indices, means - stds, means + stds,
alpha=0.1, color="g")
plt.plot(indices, means, "o-", color="g",
label="Rewards for each {} episode".format(interval))
plt.legend(loc="best")
plt.show()

次に環境を扱うためのクラスを実装します。

FrozenLakeEasy-v0は、強化学習を行うための環境を提供するライブラリOpenAI Gymの環境の1つです。
4 x 4 マスの迷路でところどころに穴があいていて穴に落ちるとゲーム終了となります。
穴に落ちずにゴールに到着すると報酬が得られます。

frozen_lake_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import gym
from gym.envs.registration import register
# スリップする設定(is_slippery)をオフ設定(学習に時間がかかるため)
register(id="FrozenLakeEasy-v0", entry_point="gym.envs.toy_text:FrozenLakeEnv", kwargs={"is_slippery": False})

def show_q_value(Q):
"""
FrozenLake-v0環境での価値は下記の通り。
各行動での評価を表しています。
+----+------+----+
| | 上 | |
| 左 | 平均 | 右 |
| | 下 | |
+-----+------+----+
"""
env = gym.make("FrozenLake-v0")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))

for r in range(nrow):
for c in range(ncol):
s = r * nrow + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True

if state_exist:
# At the display map, the vertical index is reversed.
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # 左 = 0
reward_map[_r - 1][_c] = Q[s][1] # 下 = 1
reward_map[_r][_c + 1] = Q[s][2] # 右 = 2
reward_map[_r + 1][_c] = Q[s][3] # 上 = 3
reward_map[_r][_c] = np.mean(Q[s]) # 中央

# 各状態・行動の評価を表示
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
# 報酬推移を表示
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()

モンテカルロ法での学習を実行します。

monte_carlo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import math
from collections import defaultdict
import gym
from el_agent import ELAgent
from frozen_lake_util import show_q_value

class MonteCarloAgent(ELAgent):

def __init__(self, epsilon=0.1):
super().__init__(epsilon)

def learn(self, env, episode_count=1000, gamma=0.9,
render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n))
self.Q = defaultdict(lambda: [0] * len(actions))
N = defaultdict(lambda: [0] * len(actions))

for e in range(episode_count):
s = env.reset()
done = False
# エピソードの終了まで実行する
experience = []
while not done:
if render:
env.render()
a = self.policy(s, actions)
n_state, reward, done, info = env.step(a)
experience.append({"state": s, "action": a, "reward": reward})
s = n_state
else:
self.log(reward)

# 各状態・各行動を評価する。
for i, x in enumerate(experience):
s, a = x["state"], x["action"]

# Calculate discounted future reward of s.
# 状態s
G, t = 0, 0
for j in range(i, len(experience)):
G += math.pow(gamma, t) * experience[j]["reward"]
t += 1

N[s][a] += 1 # count of s, a pair
alpha = 1 / N[s][a]
self.Q[s][a] += alpha * (G - self.Q[s][a])

if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)

def train():
agent = MonteCarloAgent(epsilon=0.1)
env = gym.make("FrozenLakeEasy-v0")
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()

if __name__ == "__main__":
train()

FrozenLakeの設定と各行動の評価結果は下記のようになります。
緑色が濃いほど評価が高いことを意味します。

FrozenLake 各行動の評価

全体的にゴール向かう行動が高く評価されていることがわかります。

エピソード実行回数と獲得報酬平均の推移は次のようになります。
エピソード数と獲得報酬平均の推移
エピソード実行が50回付近で報酬が1近くに達していて学習がだいたい完了していることがわかります。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習4 (経験から計画を立てる)

遷移関数や報酬関数が分からない場合は、行動してみて状態の遷移や得られる報酬を調べていくことになります。
今回は、Epsilon-Greedy法で探索(経験を蓄積する)と活用(行動する)の割合によって報酬がどのように変わっていくかを調査します。

何枚かのコインから1枚を選んで、投げた時に表がでれば報酬が得られるゲームで実装しています。
以下がコードになります。(コメントにて処理を確認して頂ければ幸いです)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import random
import numpy as np


class CoinToss():

# head_probsは各コインの表が出る確率
def __init__(self, head_probs, max_episode_steps=30):
self.head_probs = head_probs
self.max_episode_steps = max_episode_steps
self.toss_count = 0

def __len__(self):
return len(self.head_probs)

def reset(self):
self.toss_count = 0

# actionはコインのindex
def step(self, action):
# max_episode_stepsはデフォルト30回
final = self.max_episode_steps - 1
if self.toss_count > final:
raise Exception("The step count exceeded maximum. Please reset env.")
else:
# 終了確認
done = True if self.toss_count == final else False

if action >= len(self.head_probs):
raise Exception("The No.{} coin doesn't exist.".format(action))
else:
head_prob = self.head_probs[action]
if random.random() < head_prob:
reward = 1.0
else:
reward = 0.0
self.toss_count += 1
return reward, done


class EpsilonGreedyAgent():

def __init__(self, epsilon):
self.epsilon = epsilon
self.V = []

def policy(self):
coins = range(len(self.V))
# random関数は0.0から1.0の値を返す。
if random.random() < self.epsilon:
# 活用優先(行動する)
return random.choice(coins) # コインのindexをランダムに返す
else:
# 探索優先(経験を蓄積)
return np.argmax(self.V) # 配列中の最大値のindexを返す

def play(self, env):
# 初期化
# len(env)はコインごとの表がでる確率の配列
N = [0] * len(env) # [0, 0, 0, 0, ・・・]という配列ができる
self.V = [0] * len(env)

env.reset()
done = False
rewards = []
while not done:
# ランダムに選ばれたコインか、平均報酬が一番高いコインが返る
selected_coin = self.policy()
reward, done = env.step(selected_coin)
rewards.append(reward)

n = N[selected_coin]
coin_average = self.V[selected_coin]
new_average = (coin_average * n + reward) / (n + 1)
N[selected_coin] += 1
# そのコインの平均報酬
self.V[selected_coin] = new_average
# 全ての報酬を配列で返す
return rewards

if __name__ == "__main__":
import pandas as pd
import matplotlib.pyplot as plt

def main():
env = CoinToss([0.1, 0.5, 0.1, 0.9, 0.1]) # 各コインの表がでる確率
# 探索優先(0.0に近いほど)か活用優先(1.0に近いほど)かの割合
epsilons = [0.0, 0.1, 0.2, 0.5, 0.8]
game_steps = list(range(10, 1010, 10)) # 10回から1000回まで10回おきに実行する
result = {}
for e in epsilons:
agent = EpsilonGreedyAgent(epsilon=e)
means = []
for s in game_steps:
env.max_episode_steps = s
rewards = agent.play(env)
means.append(np.mean(rewards))
result["epsilon={}".format(e)] = means
result["count of coin toss"] = game_steps
result = pd.DataFrame(result)
result.set_index("count of coin toss", drop=True, inplace=True)
result.plot.line(figsize=(10, 5))
plt.show()

main()

結果
epsilonが0.0の場合はコイントスの回数を増やしても報酬がぜんぜん増えていません。
epsilonが0.1か0.2くらいだといい結果がでています。
一般的にepsilonは0.1に設定することが多いようですが、今回の結果はそれを裏付けることとなりました。

強化学習3 (シミュレータ)

強化学習では、戦略を重視するか(Policyベース)、価値を重視するか(Valueベース)が重要なポイントとなります。
この2つをシミュレーションするサンプルがありましたので実行してみました。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習2 (マルコフ決定過程)

前回作成した環境(環境構築に関する記事)を使ってマルコフ決定過程に従う環境を作成します。

  • 遷移先の状態は直前の状態とそこでの行動だけに依存する。
  • 報酬は直前の状態と遷移先に依存する。

構成要素は次の4つとなります。

  1. 状態(State)
  2. 行動(Action)
  3. 状態遷移の確率(遷移関数/Transition function)
  4. 即時報酬(報酬関数/Reward function)

コードにコメントをいっぱい書いてみましたので参考にしてください

environment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from enum import Enum
import numpy as np

# エージェントのいる位置を表すクラス
class State():

def __init__(self, row=-1, column=-1): # コンストラクタ
self.row = row
self.column = column

def __repr__(self): # printした時に表示される
return "<State: [{}, {}]>".format(self.row, self.column)

def clone(self):
return State(self.row, self.column)

def __hash__(self):
return hash((self.row, self.column))

def __eq__(self, other):
return self.row == other.row and self.column == other.column

# 上下左右の行動を表すクラス(対になる行動は -1を掛けた値となる)
class Action(Enum):
UP = 1
DOWN = -1
LEFT = 2
RIGHT = -2

# 環境クラス(遷移関数と報酬関数あり)・・・一番長くそして核となるクラス
class Environment():

def __init__(self, grid, move_prob=0.8):
# 2次元のグリッド。
# [種類]
# 0: 通常(エージェント移動可能)
# -1: ダメージ (ゲーム終了)
# 1: 報酬 (ゲーム終了)
# 9: 壁 (エージェント移動不可)
self.grid = grid
self.agent_state = State() # エージェントの位置

# デフォルトの報酬はマイナスとする。 (毒沼のような感じ)
# エージェントが早くゴール向かうようにするため。
self.default_reward = -0.04

# エージェントは遷移確率(%)で指定された方向に進む。
# 遷移確率から1を引いた確率(%)で違う方向に進む。
self.move_prob = move_prob
self.reset()

@property
def row_length(self):
return len(self.grid)

@property
def column_length(self):
return len(self.grid[0])

@property
def actions(self):
return [Action.UP, Action.DOWN,
Action.LEFT, Action.RIGHT]

@property
def states(self):
states = []
for row in range(self.row_length):
for column in range(self.column_length):
# 壁(9)はステータスに含まない。
if self.grid[row][column] != 9:
states.append(State(row, column))
return states

# 遷移関数
def transit_func(self, state, action):
transition_probs = {} # 遷移確率を初期化
if not self.can_action_at(state):
# すでに終端にいる
return transition_probs
# 反対方向
opposite_direction = Action(action.value * -1)

for a in self.actions:
prob = 0
if a == action:
prob = self.move_prob # 遷移確率(0.8=80%)
elif a != opposite_direction:
prob = (1 - self.move_prob) / 2 # 遷移確率(0.1=10%)

# 移動位置ごとに遷移確率を設定する
next_state = self._move(state, a)
if next_state not in transition_probs:
transition_probs[next_state] = prob
else:
transition_probs[next_state] += prob

return transition_probs

# 移動可能な場所かどうか
def can_action_at(self, state):
if self.grid[state.row][state.column] == 0:
return True
else:
return False

# 移動する
def _move(self, state, action):
if not self.can_action_at(state):
raise Exception("Can't move from here!")
# 移動先用にグリッドをもう1つ用意
next_state = state.clone()
# アクションに応じて移動する
if action == Action.UP:
next_state.row -= 1
elif action == Action.DOWN:
next_state.row += 1
elif action == Action.LEFT:
next_state.column -= 1
elif action == Action.RIGHT:
next_state.column += 1

# グリッドの外かどうかをチェックする。(外だったらもとの位置に戻す)
if not (0 <= next_state.row < self.row_length):
next_state = state
# グリッドの外かどうかをチェックする。(外だったらもとの位置に戻す)
if not (0 <= next_state.column < self.column_length):
next_state = state
# 壁にぶつかったかどうかをチェックする。(ぶつかったらもとの位置に戻す)
if self.grid[next_state.row][next_state.column] == 9:
next_state = state
# 移動先のグリッド位置を返す
return next_state

# 報酬関数(ゲーム終了判定と報酬を返す)
def reward_func(self, state):
reward = self.default_reward
done = False

# 次の状態の属性をチェックする
attribute = self.grid[state.row][state.column]
if attribute == 1:
# 報酬を得てゲーム終了
reward = 1
done = True
elif attribute == -1:
# ダメージを受けてゲーム終了.
reward = -1
done = True

return reward, done

def reset(self): # エージェントを左上に戻す
self.agent_state = State(self.row_length - 1, 0)
return self.agent_state

# エージェントから行動を受け取り、遷移関数/報酬関数を用いて、
# 次の遷移先と即時報酬を計算する
def step(self, action):
next_state, reward, done = self.transit(self.agent_state, action)
if next_state is not None:
self.agent_state = next_state

return next_state, reward, done

def transit(self, state, action):
# 遷移関数で遷移確率を取得する。
transition_probs = self.transit_func(state, action)
if len(transition_probs) == 0:
return None, None, True

next_states = []
probs = []
for s in transition_probs:
next_states.append(s)
probs.append(transition_probs[s])

# 遷移関数の出力した確率に沿って遷移先を得る(ここで実際の行動が決まる!)
next_state = np.random.choice(next_states, p=probs)
# 報酬関数で報酬とゲーム終了判定を取得する
reward, done = self.reward_func(next_state)
return next_state, reward, done

上記で実装した環境を実行するためのコードは下記の通りです。

environment_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random
from environment import Environment

# エージェントを表すクラス(戦略を定義する)
class Agent():

def __init__(self, env):
self.actions = env.actions

# 戦略(最初なのでただのランダム選択)
def policy(self, state):
return random.choice(self.actions)

# 実行関数
def main():
# グリッド環境を作成
grid = [
[0, 0, 0, 1],
[0, 9, 0, -1],
[0, 0, 0, 0]
]
env = Environment(grid) # グリッドから環境を作成
agent = Agent(env) # 環境をエージェントに設定

# ゲームを10回実行する
for i in range(10):
state = env.reset() # エージェントの位置リセット
total_reward = 0 # トータル報酬初期化
done = False # ゲーム終了状態初期化

while not done: # ゲーム終了まで続ける
# 戦略に沿ったアクション(移動方向)を取得。(現状はランダム選択)
action = agent.policy(state)
# アクションによる移動位置、報酬、ゲーム終了状態を取得。
next_state, reward, done = env.step(action)
# 今回移動分の報酬をトータル報酬に加算・減算する
total_reward += reward
# エージェントの位置を更新する
state = next_state

print("エピソード {}: エージェント取得報酬 {}.".format(i, total_reward))

if __name__ == "__main__":
main()

environment_demo.pyを実行すると下記のような結果となりました。
(乱数を使ってるので実行するたびに結果は異なります。)
結果

最初よくわからなったのですがエージェントの戦略で決めた行動が必ず実行されるわけではなく、その行動をもとに遷移確率を算出し、その確率に応じて実際の動作(Action)が決まるということです。
(Policyで決めた行動と違う方向に進んでいて「なぜなんだ?!」と悩んでました。。。)

今回の例では、まず戦略で決めた行動がUPだとしたらそれが実行される確率が80%で、残り20%の半分10%がRIGHTかLEFTが実行される確率に割り当てられるということになります。(反対行動のDOWNは0%)

強化学習1 (環境構築編)

機械学習の中で一番興味があった強化学習ですが、難しいのと時間がかかるので敬遠していたのですが少しづつ記事に書いていこうかと思います。

今回は環境構築編になります。
前準備としてWindows10環境にAnaconda3-2019.07-Windows-x86_64.exeとgitをインストールしておきます。

それが終わったら強化学習用の実行環境を構築します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
conda create -n env3.6 python=3.6

conda activate env3.6
pip install gym==0.10.9
pip install jupyter==1.0.0
pip install numpy==1.15.4
pip install pandas==0.23.4
pip install scipy==1.1.0
pip install scikit-learn==0.20.0
pip install matplotlib==3.0.1
pip install tensorflow==1.12.0
pip install -e git+https://github.com/ntasfi/PyGame-Learning-Environment.git#egg=ple
pip install -e git+https://github.com/lusob/gym-ple.git#egg=gym-ple
pip install h5py==2.8.0
pip install pygame==1.9.4
pip install tqdm==4.28.1

全て問題なくインストールできたら下記のpyファイルを作成し実行します。

welcome.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import numpy as np
from tensorflow.python import keras as K
import gym
import gym_ple
import time

def welcome():

env = gym.make("Catcher-v0")
num_action = env.action_space.n
episode_count = 10

s = env.reset()
brain = K.Sequential()
brain.add(K.layers.Dense(num_action, input_shape=[np.prod(s.shape)], activation="softmax"))

def policy(s):
evaluation = brain.predict(np.array([s.flatten()]))
return np.argmax(evaluation)

for e in range(episode_count):
time.sleep(3)
s = env.reset()
done = False
while not done:
env.render()
a = policy(s)
n_state, reward, done, info = env.step(a)
s = n_state

if __name__ == "__main__":
welcome()

ウィンドウが現れて、次のようなボールキャッチゲームが動いたら環境構築は完了です。

強化学習用の実行環境を終了する場合は次のコマンドを実行します。

1
conda deactivate

強化学習(環境構築編)は以上になります。お疲れ様でした。

深層学習でCIFAR10分類(一般物体認識)

CIFAR10とういうデータを使って一般物の認識をしてみます。

まずは必要なモジュールをインポートします。

1
2
3
4
5
6
7
8
9
10
import numpy as np
import matplotlib.pyplot as plt

import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
import chainer
import chainer.datasets as ds
import chainer.dataset.convert as con
from chainer import Variable, Chain, config, cuda

共通関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -------------- #
# 共通関数の定義 #
# -------------- #
# データ振り分け関数
def data_divide(Dtrain, D, xdata, tdata, shuffle='on'):
if shuffle == 'on':
index = np.random.permutation(range(D))
elif shuffle == 'off':
index = np.arange(D)
else:
print('error')
xtrain = xdata[index[0:Dtrain],:]
ttrain = tdata[index[0:Dtrain]]
xtest = xdata[index[Dtrain:D],:]
ttest = tdata[index[Dtrain:D]]
return xtrain, xtest, ttrain, ttest

# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(12, 8))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

# 学習用関数
def learning_classification(model,optNN,data,result,T=10):
for time in range(T):
optNN.target.cleargrads()
ytrain = model(data[0])
loss_train = F.softmax_cross_entropy(ytrain,data[2])
acc_train = F.accuracy(ytrain,data[2])
loss_train.backward()
optNN.update()

with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
ytest = model(data[1])
loss_test = F.softmax_cross_entropy(ytest,data[3])
acc_test = F.accuracy(ytest,data[3])
result[0].append(cuda.to_cpu(loss_train.data))
result[1].append(cuda.to_cpu(loss_test.data))
result[2].append(cuda.to_cpu(acc_train.data))
result[3].append(cuda.to_cpu(acc_test.data))

一般物体認識のデータセットを読み込みます。

1
2
3
4
5
6
7
# 一般物体認識のデータセットの読み込み
train, test = ds.get_cifar10(ndim=3)
xtrain, ttrain = con.concat_examples(train)
xtest, ttest = con.concat_examples(test)

# データサイズの確認
Dtrain, ch, Ny, Nx = xtrain.shape

どんなデータがあるかいくつか画像を出力します。

1
2
3
4
5
6
7
8
9
# 試しに画像を出力
plt.imshow(xtrain[0,:,:,:].transpose(1, 2, 0))
plt.show()

plt.imshow(xtrain[500,:,:,:].transpose(1, 2, 0))
plt.show()

plt.imshow(xtrain[1200,:,:,:].transpose(1, 2, 0))
plt.show()

結果

画像識別の問題に有効な畳み込みニューラルネットワークを設定します。

1
2
3
4
5
6
7
8
9
# 2層畳み込みニューラルネットワークの設定
C = ttrain.max() + 1
H1 = 10

layers = {}
layers['conv1'] = L.Convolution2D(ch, H1, ksize=3, stride=1, pad=1)
layers['bnorm1'] = L.BatchRenormalization(H1)
layers['l1'] = L.Linear(None, C)
NN = Chain(**layers)

プーリング層を導入した関数を定義します。

1
2
3
4
5
6
7
8
# プーリング層の導入
def model(x):
h = NN.conv1(x)
h = F.relu(h)
h = NN.bnorm1(h)
h = F.max_pooling_2d(h, ksize=3, stride=2, pad=1)
y = NN.l1(h)
return y

GPUを設定します。

1
2
3
4
# GPUの設定
gpu_device = 0
cuda.get_device(gpu_device).use()
NN.to_gpu(gpu_device)

最適化手法を設定します。

1
2
3
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

学習の経過を保存する変数を宣言します。

1
2
3
4
5
6
7
# 学習データ保存エリア
train_loss = []
train_acc = []
test_loss = []
test_acc = []

result = [train_loss, test_loss, train_acc, test_acc]

一度に全部実行するとメモリオーバーになってしまうのでデータセットを分割して実行します。
CIFARのデータは50,000個の訓練データがあるので5,000個ずつを10回に分けて実行します。

1
2
3
4
5
6
# シリアルイテレータの呼び出し
from chainer.iterators import SerialIterator as siter

# データセット振り分ける
batch_size = 5000
train_iter = siter(train, batch_size)

いつくかのバッチと呼ばれる小さな塊に分けて学習を進める方法をバッチ学習といいます。
特にランダムにデータを選んでニューラルネットワークの最適化を進める方法を確率勾配法といいます。
確率勾配法による最適化を行います。

1
2
3
4
5
6
7
# 確率勾配法による最適化
nepoch = 10
while train_iter.epoch < nepoch:
batch = train_iter.next()
xtrain, ttrain = con.concat_examples(batch)
data = cuda.to_gpu([xtrain, xtest, ttrain, ttest])
learning_classification(model, optNN, data, result, 1)

結果を表示します。

1
2
3
show_graph(result[0], result[1], 'loss function', 'step', 'loss function', 0.0, 4.0)

show_graph(result[2], result[3], 'accuracy', 'step', 'accuracy')

誤差

正解率

正解率は6割に届かない程度となりました。
ここから正解率を上げるために4層のニューラルネットワークにしていきます。

畳み込みニューラルネットワークのクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 畳み込みニューラルネットワークのクラス作成
class CNN(Chain):
def __init__(self, ch_in,ch_out):
layers = {}
layers['conv1'] = L.Convolution2D(ch_in,ch_out,ksize=5,stride=2,pad=2)
layers['bnorm1'] = L.BatchNormalization(ch_out)
super().__init__(**layers)

def __call__(self,x):
h = self.conv1(x)
h = self.bnorm1(h)
h = F.relu(h)

return h

多段階の畳み込みニューラルネットワークと関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 多段階の畳み込みニューラルネットワーク
C = ttrain.max() + 1
H1 = 16
H2 = 32
H3 = 64

NN = Chain(cnn1 = L.Convolution2D(ch,H1,ksize=1,stride=1,pad=0),
cnn2 = L.Convolution2D(H1,H1,ksize=3,stride=1,pad=0),
cnn3 = L.Convolution2D(H1,H1,ksize=5,stride=1,pad=0),
cnn4 = L.Convolution2D(H1,H2,ksize=1,stride=1,pad=0),
cnn5 = L.Convolution2D(H2,H2,ksize=3,stride=1,pad=0),
cnn6 = L.Convolution2D(H2,H2,ksize=5,stride=1,pad=0),
cnn7 = L.Convolution2D(H2,H3,ksize=1,stride=1,pad=0),
cnn8 = L.Convolution2D(H3,H3,ksize=3,stride=1,pad=0),
cnn9 = L.Convolution2D(H3,C,ksize=5,stride=1,pad=3),
bnorm1 = L.BatchNormalization(H1),
bnorm2 = L.BatchNormalization(H1),
bnorm3 = L.BatchNormalization(H1),
bnorm4 = L.BatchNormalization(H2),
bnorm5 = L.BatchNormalization(H2),
bnorm6 = L.BatchNormalization(H2),
bnorm7 = L.BatchNormalization(H3),
bnorm8 = L.BatchNormalization(H3))

def model(x):
h = NN.cnn1(x)
h = F.relu(h)
h = NN.bnorm1(h)
h = NN.cnn2(h)
h = F.relu(h)
h = NN.bnorm2(h)
h = NN.cnn3(h)
h = F.relu(h)
h = NN.bnorm3(h)
h = F.max_pooling_2d(h, 2)
h = NN.cnn4(h)
h = F.relu(h)
h = NN.bnorm4(h)
h = NN.cnn5(h)
h = F.relu(h)
h = NN.bnorm5(h)
h = NN.cnn6(h)
h = F.relu(h)
h = NN.bnorm6(h)
h = F.max_pooling_2d(h, 2)
h = NN.cnn7(h)
h = F.relu(h)
h = NN.bnorm7(h)
h = NN.cnn8(h)
h = F.relu(h)
h = NN.bnorm8(h)
h = NN.cnn9(h)
y = F.mean(h,axis=(2,3))

return y

GPUの設定を行います。

1
2
3
4
# GPUの設定
gpu_device = 0
cuda.get_device(gpu_device).use()
NN.to_gpu()

結果を一旦初期化します。

1
2
3
4
5
train_loss = []
train_acc = []
test_loss = []
test_acc = []
result = [train_loss,train_acc,test_loss,test_acc]

最適化手法にAdamを設定します。

1
2
3
# 最適化手法の設定
optNN = Opt.Adam()
optNN.setup(NN)

関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def shift_labeled(labeled_data):
data, label = labeled_data

ch, Ny, Nx = data.shape
z_h = Ny * (2.0 * np.random.rand(1) - 1.0) * 0.3
z_v = Nx * (2.0 * np.random.rand(1) - 1.0) * 0.3
data = np.roll(data, int(z_h), axis=1)
data = np.roll(data, int(z_v), axis=2)

return data, label

def flip_labeled(labeled_data):
data, label = labeled_data
z = np.random.randint(2)
if z == 1:
data = data[:,::-1,:]
z = np.random.randint(2)
if z == 1:
data = data[:,:,::-1]

z = np.random.randint(2)
if z == 1:
data = data.transpose(0, 2, 1)
return data, label

def check_network(x, link):
print('input:', x.shape)
h = link(x)
print('output:', h.shape)

確率勾配法で最適化し、結果をグラフ化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from tqdm import tqdm

nepoch = 100
batch_size = 128
train_iter = siter(train, batch_size)
with tqdm(total = nepoch) as pbar:
while train_iter.epoch < nepoch:
pbar.update(train_iter.is_new_epoch)
batch = train_iter.next()
batch = ds.TransformDataset(batch, flip_labeled)
xtrain,ttrain = con.concat_examples(batch)
data = cuda.to_gpu([xtrain,xtest,ttrain,ttest])
learning_classification(model,optNN,data,result,1)
if train_iter.is_new_epoch == 1:
show_graph(result[0],result[1],'loss function in training','step','loss function',0.0,4.0)
show_graph(result[2],result[3],'Accuracy in training and test','step','accuracy')
print(result[3][len(result[3])-1])

2、3時間実行しているのですが、進捗が48%でまだ時間がかかるので一旦ここまでの結果を確認しておきます。
誤差
正解率

なんとか6割の正解率を超えてきているでしょうか。ただここまで時間がかかるのと正解率が微妙にしか上昇していないのでなんらかの改善の余地はあると思います。

5時間20分かかってようやく終了しました。
誤差
正解率

最終的な正解率は73%ですか。。。まー実用性があるといってもいいような気がします。

(Google Colaboratoryで動作確認しています。)

ニューラルネットワークで株価予測

ニューラルネットワークで株価予測をしてみます。
まずは必要なモジュールをインポートします。

1
2
3
4
5
6
7
import numpy as np
import matplotlib.pyplot as plt

import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
from chainer import Variable, Chain, config

データ振り分け処理、グラフ表示、回帰分析用の関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# -------------- #
# 共通関数の定義 #
# -------------- #
# データ振り分け関数
def data_divide(Dtrain, D, xdata, tdata, shuffle='on'):
if shuffle == 'on':
index = np.random.permutation(range(D))
elif shuffle == 'off':
index = np.arange(D)
else:
print('error')
xtrain = xdata[index[0:Dtrain],:]
ttrain = tdata[index[0:Dtrain]]
xtest = xdata[index[Dtrain:D],:]
ttest = tdata[index[Dtrain:D]]
return xtrain, xtest, ttrain, ttest

# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(12, 8))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

# 回帰関数の定義
def learning_regression(model, optNN, data, T=10):
train_loss = []
test_loss = []
for time in range(T):
config.train = True
optNN.target.cleargrads() # zerogradsより記憶容量の確保にいい
ytrain = model(data[0])
loss_train = F.mean_squared_error(ytrain, data[2])
loss_train.backward()
optNN.update()

config.train = False
ytest = model(data[1])
loss_test = F.mean_squared_error(ytest, data[3])

train_loss.append(loss_train.data)
test_loss.append(loss_test.data)
return train_loss, test_loss

株価予測の前に適当な関数で変換したデータを回帰分析してみます。
データを作成します。

1
2
3
4
# 時系列データ作成
M = 100
time_data = np.linspace(0.0, 10.0, M)
value_data = np.sin(time_data) + 2.0 * np.sin(2.0 * time_data)

直前2回分データを入力データとして、入力データとラベルデータを作成します。

1
2
3
4
5
6
7
8
9
10
11
12
# 直前2回分のデータを入力にする
N = 2
xdata = []
tdata = []
for k in range(N, M):
xdata.append(value_data[k - N:k])
tdata.append(value_data[k])
xdata = np.array(xdata).astype(np.float32)
tdata = np.array(tdata).reshape(M - N, 1).astype(np.float32)

# データの形を確認
D, N = xdata.shape

4層のニューラルネットワークを作成し、関数化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 4層ニューラルネットワークを作成し、関数化
C = 1
H1 = 5
H2 = 5
H3 = 5
layers = {}
layers['l1'] = L.Linear(N, H1)
layers['l2'] = L.Linear(H1, H2)
layers['l3'] = L.Linear(H2, H3)
layers['l4'] = L.Linear(H3, C)
layers['bnorm1'] = L.BatchNormalization(H1)
layers['bnorm2'] = L.BatchNormalization(H2)
layers['bnorm3'] = L.BatchNormalization(H3)
NN = Chain(**layers)

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)

h = NN.l2(h)
h = F.relu(h)
h = NN.bnorm2(h)

h = NN.l3(h)
h = F.relu(h)
h = NN.bnorm3(h)

y = NN.l4(h)
return y

最適化手法を設定します。

1
2
3
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

入力データとラベルデータをそれぞれ訓練データとテストデータに振り分けます。
ここでは先頭3分の1のデータを訓練データとします。

1
2
3
4
5
6
7
8
# データ分割(3分の1を訓練データ、3分の2をテストデータとする)
Dtrain = D // 3

xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata, 'off')

# print(xtrain.shape, xtest.shape, ttrain.shape, ttest.shape)
data = [xtrain, xtest, ttrain, ttest]
# result = [train_loss, test_loss]

回帰分析を行います。200回学習します。

1
train_loss, test_loss = learning_regression(model, optNN, data, 200)

学習結果と予測結果をグラフ表示します。

1
2
3
4
5
6
7
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.figure(figsize=(12, 8))
plt.plot(time_data[0:Dtrain], ytrain, marker='x', linestyle='None') # 学習結果
plt.plot(time_data[Dtrain:D], ytest, marker='o', linestyle='None') # 予測結果
plt.plot(time_data[0:D - N], value_data[N:D])
plt.show()

予測結果
予測がうまくいってない箇所もありますが、十分な結果がでているようです。
(実行するごとに微妙に結果が変わります。ニューラルネットワークの個性ということでしょうか。)

次に予測用の株価データを準備します。
(株価の読み込み期間を変えるニューラルネットワークの学習処理でエラーになることがあります。
異常値エラーとのことですが、理由がよくわからないので今回は何回か試して問題のなかった[2005/01/01-2007/12/31]を分析期間としました。)

1
2
3
4
5
6
7
# 株価読み込み用モジュール
import pandas_datareader.data as web
import datetime as dt

start = dt.date(2005, 1, 1)
end = dt.date(2007, 12, 31)
web_data = web.DataReader('AMZN', 'yahoo', start, end)

読み込んだ株価データをグラフ化します。

1
2
3
plt.figure(figsize=(12, 8))
plt.plot(web_data['Close'])
plt.show()

株価データ

株価データを入力データとラベルデータに振り分けます。
今回は直前5回分のデータを入力データとします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
value_data = web_data['Close']
Total = len(value_data)

N = 5 # 直前何回分を入力データとするか
xdata = [] # 入力データ
tdata = [] # ラベルデータ
for k in range(N, Total):
xdata.append(value_data[k - N:k])
tdata.append(value_data[k])

xdata = np.array(xdata).astype(np.float32)
tdata = np.array(tdata).reshape(len(tdata), 1).astype(np.float32)

# 入力データの形を確認
D, N = xdata.shape

4層ニューラルネットワークを作成し、関数化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 4層ニューラルネットワークを作成し、関数化
C = 1
H1 = 5
H2 = 5
H3 = 5
layers = {}
layers['l1'] = L.Linear(N, H1)
layers['l2'] = L.Linear(H1, H2)
layers['l3'] = L.Linear(H2, H3)
layers['l4'] = L.Linear(H3, C)
layers['bnorm1'] = L.BatchNormalization(H1)
layers['bnorm2'] = L.BatchNormalization(H2)
layers['bnorm3'] = L.BatchNormalization(H3)
NN = Chain(**layers)

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)

h = NN.l2(h)
h = F.relu(h)
h = NN.bnorm2(h)

h = NN.l3(h)
h = F.relu(h)
h = NN.bnorm3(h)

y = NN.l4(h)
return y

最適化手法を設定し、訓練データとテストデータに振り分けます。
訓練データとテストデータは半分ずつに分けます。

1
2
3
4
5
6
7
8
9
10
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# データ分割
Dtrain = D // 2

xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata, 'off')

data = [xtrain, xtest, ttrain, ttest]

回帰分析を行います。今回は学習回数を1000回に設定しました。

1
train_loss, test_loss = learning_regression(model, optNN, data, 1000)

誤差と予測結果を表示します。

1
2
3
4
5
6
7
8
9
10
11
# 誤差の表示
show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', ymin=0.0, ymax=10.0)

ytrain = model(xtrain).data
ytest = model(xtest).data
time_data = np.arange(Total - N)
plt.figure(figsize=(12, 8))
plt.plot(time_data[0:Dtrain], ytrain, marker='x', linestyle='None') # 学習結果
plt.plot(time_data[Dtrain:Total], ytest, marker='o', linestyle='None') # 予測結果
plt.plot(time_data[0:Total - N], value_data[N:Total])
plt.show()

誤差
株価予測結果
いまいちな結果です。やはり株価を予測するのは無理なのでしょうか。

(Google Colaboratoryで動作確認しています。)

非線形関数一覧

ニューラルネットワークを構築する際に、どの非線形関数を選択するかは大事なポイントとなります。
そこでどんな非線形関数があるかそれぞれグラフ化して確認します。

まずは必要なモジュールのインストールとx軸用に等間隔の数値を用意します。

1
2
3
4
5
6
7
8
# モジュールのインポート
import chainer.functions as F
import numpy as np
import matplotlib.pyplot as plt

# 等間隔の数値を用意する(x軸用)
D = 100
ndata = np.linspace(-5.0, 5.0, D)

非線形関数をグラフ化するコードは下記の通りです。
1行目のF.absoluteの箇所をそれぞれの非線形関数に置き換えて実行していきます。

1
2
3
ydata = F.absolute(ndata).data   # 非線形関数を変えながら実行する。
plt.plot(ndata, ydata)
plt.show()
非線形関数一覧
F.absolute F.add F.arccos
F.arcsin F.arctan F.broadcast
F.ceil F.clipped_relu F.cos
F.cosh F.cumprod F.cumsum
F.digamma F.dropout F.elu
F.erf F.erfc F.erfcinv
F.erfcx F.erfinv F.exp
F.expm1 F.fix F.flatten
F.flipud F.floor F.hard_sigmoid
F.identity F.leaky_relu F.lgamma
F.log F.log10 F.log1p
F.log2 F.log_ndtr F.ndtr
F.ndtri F.relu F.rrelu
F.rsqrt F.selu F.sigmoid
F.sign F.sin F.sinh
F.softplus F.sqrt F.square
F.squeeze F.tan F.tanh
F.transpose

(Google Colaboratoryで動作確認しています。)

ニューラルネットワークで回帰分析

ニューラルネットワークで回帰分析をします。
まずは必要なモジュールのインポートと共通関数の定義をします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# モジュールのインポート
import numpy as np
import matplotlib.pyplot as plt

import chainer.optimizers as Opt
import chainer.functions as F
import chainer.links as L
from chainer import Variable, Chain, config, cuda

# -------------- #
# 共通関数の定義 #
# -------------- #
# データ振り分け関数
def data_divide(Dtrain, D, xdata, tdata):
index = np.random.permutation(range(D))
xtrain = xdata[index[0:Dtrain],:]
ttrain = tdata[index[0:Dtrain]]
xtest = xdata[index[Dtrain:D],:]
ttest = tdata[index[Dtrain:D]]
return xtrain, xtest, ttrain, ttest

# グラフ表示関数
def show_graph(result1, result2, title, xlabel, ylabel, ymin=0.0, ymax=1.0):
# 学習記録の表示(誤差)
Tall = len(result1)
plt.figure(figsize=(8, 6))
plt.plot(range(Tall), result1, label='train')
plt.plot(range(Tall), result2, label='test')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.xlim([0, Tall])
plt.ylim(ymin, ymax)
plt.legend()
plt.show()

# 回帰関数の定義
def learning_regression(model, optNN, data, T=10):
train_loss = []
test_loss = []
for time in range(T):
config.train = True
optNN.target.cleargrads() # zerogradsより記憶容量の確保にいい
ytrain = model(data[0])
loss_train = F.mean_squared_error(ytrain, data[2])
loss_train.backward()
optNN.update()

config.train = False
ytest = model(data[1])
loss_test = F.mean_squared_error(ytest, data[3])

train_loss.append(loss_train.data)
test_loss.append(loss_test.data)
return train_loss, test_loss

データを準備します。
x軸を-5から5までの等間隔100個分準備し、その後適当な関数でy軸に変換します。

1
2
3
4
5
6
7
8
# 等間隔の数値を用意する(x軸用)
D = 100
ndata = np.linspace(-5.0, 5.0, D)

# 関数でデータ作成
N = 1
xdata = ndata.reshape(D, N).astype(np.float32) # 2次元配列に変換(100行1列)
tdata = (np.sin(ndata) + np.sin(2.0 * ndata)).reshape(D, N).astype(np.float32)

作成したデータを視覚化します。

1
2
3
# 作成したデータをグラフ表示
plt.plot(xdata, tdata)
plt.show()

結果

ニューラルネットワークを作成し関数化します。
非線形関数はreluを使用します。

1
2
3
4
5
6
7
8
9
10
11
# 回帰のニューラルネットワークを作成し、関数化
C = 1 # 出力数
H = 20 # 中間層の数
NN = Chain(l1=L.Linear(N, H), l2=L.Linear(H, C), bnorm1=L.BatchNormalization(H))

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)
y = NN.l2(h)
return y

参考までに学習前のニューラルネットワークの状態を確認します。
まずは重さです。

1
2
# ニューラルネットワークの状態調査
print(NN.l1.W.data) # 最初はでたらめな重さ

結果
初期値としてでたらめな重さが設定されています。

次に勾配を表示してみます。

1
2
# ニューラルネットワークの勾配表示
print(NN.l1.W.grad) # 全て未計算(Not a Number)

結果
こちらはnan(Not a number)が設定されています。

最適化手法を設定し学習を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata) # データ分割
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

学習結果の誤差は下記のようになります。
結果

次第に減少してはいますが0.2から0.3付近で改善しなくなっています。

回帰の結果を比較します。

1
2
3
4
5
6
7
8
# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果
結果はぜんぜん合っていないようです。なんらかの改善策をとる必要がありそうです。

非線形関数調整してみます。
今回使ったreluの形は下記のようになっており、今回学習するデータには合わないようです。

1
2
3
4
# F.reluの形を調べる
ydata = F.relu(xdata).data
plt.plot(xdata, ydata)
plt.show()

結果

別の非線形関数sigmoidの形を調べると、reluよりは今回学習するデータに近いようにみえます。

1
2
3
4
# F.sigmoidの形を調べる
ydata = F.sigmoid(xdata).data
plt.plot(xdata, ydata)
plt.show()

結果

非線形関数をreluからsigmoidに変更し、同じように学習・結果表示を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 回帰のニューラルネットワークを作成し、関数化
C = 1 # 出力数
H = 20 # 中間層の数
NN = Chain(l1=L.Linear(N, H), l2=L.Linear(H, C), bnorm1=L.BatchNormalization(H))

def model(x):
h = NN.l1(x)
h = F.sigmoid(h) # シグモイドに変更
h = NN.bnorm1(h)
h = NN.l2(h)
return h

# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata)
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

結果
誤差に関しては2つの非線形関数でそれほど結果が変わりませんでした。

回帰の結果を確認してみます。

1
2
3
4
5
6
7
8
# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果
reluよりはsigmoidを使った方がだいぶましになったように思われます。

ただまだまだ納得いく結果ではありません。
ほかの非線形関数を調べたところsin関数というのがありました。

1
2
3
4
# F.sinの形を調べる
ydata = F.sin(xdata).data
plt.plot(xdata, ydata)
plt.show()

結果
この関数はかなり今回回帰分析を行いたいグラフと形が似ているようです。

非線形関数をsinに変更してまたまた回帰分析を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 回帰のニューラルネットワークを作成し、関数化
C = 1 # 出力数
H = 20 # 中間層の数
NN = Chain(l1=L.Linear(N, H), l2=L.Linear(H, C), bnorm1=L.BatchNormalization(H))

def model(x):
h = NN.l1(x)
h = F.sin(h) # サインに変更
h = NN.bnorm1(h)
h = NN.l2(h)
return h

# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata)
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

結果
誤差は十分に下がることが確認できました。

回帰の結果も確認してみます。

1
2
3
4
5
6
7
8
# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果
完璧な結果がでました。
ただ今回はグラフの見た目でそれに近い非線形関数を使うという少しずるい手順を踏んでしましました。

relu関数のまま結果を改善することはできないのでしょうか。
試しにニューラルネットワークの階層を増やしてみました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 4層のニューラルネットワークを作成し関数化
C = 1 # 出力の種類
H1 = 5
H2 = 5
H3 = 5
layers = {}
layers['l1'] = L.Linear(N, H1)
layers['l2'] = L.Linear(H1, H2)
layers['l3'] = L.Linear(H2, H3)
layers['l4'] = L.Linear(H3, C)
layers['bnorm1'] = L.BatchNormalization(H1)
layers['bnorm2'] = L.BatchNormalization(H2)
layers['bnorm3'] = L.BatchNormalization(H3)
NN = Chain(**layers)

def model(x):
h = NN.l1(x)
h = F.relu(h)
h = NN.bnorm1(h)
h = NN.l2(h)
h = F.relu(h)
h = NN.bnorm2(h)
h = NN.l3(h)
h = F.relu(h)
h = NN.bnorm3(h)
y = NN.l4(h)
return y

# 最適化手法の設定
optNN = Opt.MomentumSGD()
optNN.setup(NN)

# 学習結果記録用
train_loss = []
test_loss = []

Dtrain = D // 2 # 訓練データ数
xtrain, xtest, ttrain, ttest = data_divide(Dtrain, D, xdata, tdata)
data = [xtrain, xtest, ttrain, ttest]
result = [train_loss, test_loss]

train_loss, test_loss = learning_regression(model, optNN, data, 1000)

show_graph(train_loss, test_loss, 'loss function', 'step', 'loss function', 0.0, 3.0)

# 回帰の結果を比較する
config.train = False
ytrain = model(xtrain).data
ytest = model(xtest).data
plt.plot(xtrain, ytrain, marker='x', linestyle='None')
plt.plot(xtest, ytest, marker='o', linestyle='None')
plt.plot(xdata, tdata)
plt.show()

結果は以下の通りです。
結果
誤差は十分に下がっています。

結果
回帰の結果もなかなか良いものがでました。回帰分析の結果を改善するためには非線形関数を変更する方法とは別に、ニューラルネットワークの階層を深くするのも有効な手段だと言えそうです。

(Google Colaboratoryで動作確認しています。)