強化学習 x ニューラルネットワーク 3 (ボストン市の住宅価格予測)

ボストン市の住宅価格をニューラルネットワークで予想してみます。
住宅価格のデータセットは13の特徴量(入力)と住宅価格(出力)のセットとなっています。

ニューラルネットワークは13の変数 x から1つの値 y を出力することになります。
学習は予測した価格と実際の住宅価格の差異が小さくなるようにパラメータを調整することで行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_boston
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.python import keras as K

# ボストン市の住宅価格
dataset = load_boston()

# 入力と出力に分ける
y = dataset.target
X = dataset.data

# 訓練データとテストデータに分ける
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)

model = K.Sequential([
# データの正規化(入力は13の特徴量)
K.layers.BatchNormalization(input_shape=(13,)),
# 1層目のニューラルネットワーク
# 活性化関数はsoftplus
# kernel_regularizer正則化=>重みに制限をかける=>過学習防止
K.layers.Dense(units=13, activation="softplus", kernel_regularizer="l1"),
# 2層目のニューラルネットワーク
K.layers.Dense(units=1)
])
# loss=最小二乗法 optimizer=最適化に確率的勾配降下法
model.compile(loss="mean_squared_error", optimizer="sgd")

# 学習を行う(学習回数 epochs は8回)
model.fit(X_train, y_train, epochs=8)

# 予測を行う
predicts = model.predict(X_test)

result = pd.DataFrame({
"predict": np.reshape(predicts, (-1,)), # 2次元データを1次元データに変換
"actual": y_test
})
limit = np.max(y_test) # 最大値の取得

# 結果をグラフ表示する。
result.plot.scatter(x="actual", y="predict", xlim=(0, limit), ylim=(0, limit))
plt.show()

結果(コンソール)
8回の学習で誤差が 165.8451 から 18.0005 まで減っていることがわかります。

次に予測結果をグラフで確認します。
結果(グラフ)
横軸 x が実際の住宅価格で、縦軸 y が予測した住宅価格となります。
完全に一致していれば対角線上にプロットされることになります。今回の予測はだいたいあっているようです。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習 x ニューラルネットワーク 2 (2層のニューラルネットワーク)

TensorFlowで2層のニューラルネットワークを実装してみます。

np.random.rand(3, 2)で3件の座標データbatchを作成しています。

また1層目から2層目にデータを送るときには、活性化関数(シグモイド)を適用しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import numpy as np
from tensorflow.python import keras as K

# 2層のニューラルネットワーク
model = K.Sequential([
# 1層目 出力サイズ4、入力1行2列、活性化関数はシグモイド
K.layers.Dense(units=4, input_shape=((2, )), activation="sigmoid"),
# 2層目 出力サイズ4
K.layers.Dense(units=4),
])

print('-------------------------')
# 3件の座標をまとめたバッチ (2次元).
batch = np.random.rand(3, 2)
print('batchの形状', batch.shape)
print('batch', batch)
print('-------------------------')
y = model.predict(batch)
print('出力yの形状', y.shape)
print('出力y', y)
print('-------------------------')

結果

入力データbatchが3行2列(3件の座標データ)となり、出力データが3行4列(3件の4次元データ)となっていることがわかります。
このようにしてみると1行目に1件目の入力データと1件目の出力データが表示され、2行目に次の入力とその出力がされていて対応がわかりやすくなってます。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習 x ニューラルネットワーク 1 (1層のニューラルネットワーク)

これまではQ[s][a]というテーブル内の値を更新することで学習していましたが、今回からは関数のパラメータを調整することで学習していきます。

まずはTensorFlowで1層のニューラルネットワークを実装してみます。

入力(x)は2行1列の座標、出力(y)は行動価値4行1列を想定しています。
対応する重み(weight)は4行2列でバイアス(bias)は4行1列となります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import numpy as np
from tensorflow.python import keras as K

model = K.Sequential([ # 複数の層をまとめるためのモジュール
# K.layers.Dense => ニューラルネットワークを表す(重みとバイアスを持つ層)
# units=4 => 出力サイズ
# input_shape => 入力サイズ
K.layers.Dense(units=4, input_shape=((2, ))),
])

weight, bias = model.layers[0].get_weights() # 第1層の重みとバイアスを取得
print('---------------------')
print('重みの形状 {}.'.format(weight.shape))
print('重み', weight)
print('---------------------')
print('バイアスの形状 {}.'.format(bias.shape))
print('バイアス', bias)
x = np.random.rand(1, 2)
y = model.predict(x)
print('---------------------')
print('x(入力)の形状', x.shape)
print('x(入力)', x)
print('---------------------')
print('y(出力・結果)の形状', y.shape)
print('y(出力)', y)
print('---------------------')

結果

想定した行列とは全て逆の結果となりました。

これは座標を1行2列で入力したためです。=> np.random.rand(1, 2)
このため重み、バイアス、出力の全てが行列が反対になってしまっていますが、本質的な結果は変わりません。

多くの深層学習フレームワークでは行をデータ数(バッチサイズ)を表すのに使うため、このような仕様となっていますので慣れてしまいましょう。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習9 (モンテカルロ法 / TD法 / SARSA / Actor Critic法 結果一覧)

これまで試してきた4手法(モンテカルロ法 / TD法 / SARSA / Actor Critic法)の結果一覧をまとめました。

まずおさらいとして検証モデルとして使用したのはOpenAI GymのFrozenLakeです。
4 x 4 マスの迷路でところどころに穴があいていて穴に落ちるとゲーム終了となります。
穴に落ちずにゴールに到着すると報酬が得られます。
FrozenLake

次に各手法ごとの「各行動の評価」と「獲得報酬平均」の結果一覧です。

各行動の評価 獲得報酬平均
モンテカルロ法 モンテカルロ法
TD法(Q-learning) TD法(Q-learning)
SARSA SARSA
Actor Critic法 Actor Critic法

簡単に各手法を説明します。

  • モンテカルロ法
     エピソードが終了してから評価を行う。Valueベース。

  • TD法(Q-learning)
     1ステップごとに評価を行う。Valueベース。

  • SARSA
     戦略に戻づいて行動を決定する。Policyベース。

  • Actor Critic法
     戦略と価値評価を相互に更新して学習する。ValueベースかつPolicyベース。

それぞれ特徴がありますが、学習までに時間がかかるものの最終的には一番獲得報酬が安定しているActor Critic法が個人的には好みです。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習8 (Actor Critic法)

Actor Critic法による学習を試してみます。

Actor Critic法は、戦略担当(Actor)と価値評価担当(Critic)を相互に更新して学習する手法です。

まずはエージェントのベースになるクラスを実装します。(強化学習5・6 (モンテカルロ法・TD法)と同様です。)

el_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import numpy as np
import matplotlib.pyplot as plt

class ELAgent():

def __init__(self, epsilon):
self.Q = {}
self.epsilon = epsilon
self.reward_log = []

def policy(self, s, actions):
if np.random.random() < self.epsilon:
# ランダムな行動(探索)
return np.random.randint(len(actions))
else:
# self.Q => 状態における行動の価値
# self.Q[s] => 状態sで行動aをとる場合の価値
if s in self.Q and sum(self.Q[s]) != 0:
# 価値評価に基づき行動(活用)
return np.argmax(self.Q[s])
else:
# ランダムな行動(探索)
return np.random.randint(len(actions))

# 報酬の記録を初期化
def init_log(self):
self.reward_log = []

# 報酬の記録
def log(self, reward):
self.reward_log.append(reward)

def show_reward_log(self, interval=50, episode=-1):
# そのepsilonの報酬をグラフ表示
if episode > 0:
rewards = self.reward_log[-interval:]
mean = np.round(np.mean(rewards), 3)
std = np.round(np.std(rewards), 3)
print("At Episode {} average reward is {} (+/-{}).".format(
episode, mean, std))
# 今までに獲得した報酬をグラフ表示
else:
indices = list(range(0, len(self.reward_log), interval))
means = []
stds = []
for i in indices:
rewards = self.reward_log[i:(i + interval)]
means.append(np.mean(rewards))
stds.append(np.std(rewards))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("Reward History")
plt.grid()
plt.fill_between(indices, means - stds, means + stds,
alpha=0.1, color="g")
plt.plot(indices, means, "o-", color="g",
label="Rewards for each {} episode".format(interval))
plt.legend(loc="best")
plt.show()

次に環境を扱うためのクラスを実装します。
(強化学習5・6 (モンテカルロ法・TD法)と同様です。)

frozen_lake_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import gym
from gym.envs.registration import register
# スリップする設定(is_slippery)をオフ設定(学習に時間がかかるため)
register(id="FrozenLakeEasy-v0", entry_point="gym.envs.toy_text:FrozenLakeEnv", kwargs={"is_slippery": False})

def show_q_value(Q):
"""
FrozenLake-v0環境での価値は下記の通り。
各行動での評価を表しています。
+----+------+----+
| | 上 | |
| 左 | 平均 | 右 |
| | 下 | |
+-----+------+----+
"""
env = gym.make("FrozenLake-v0")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))

for r in range(nrow):
for c in range(ncol):
s = r * nrow + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True

if state_exist:
# At the display map, the vertical index is reversed.
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # 左 = 0
reward_map[_r - 1][_c] = Q[s][1] # 下 = 1
reward_map[_r][_c + 1] = Q[s][2] # 右 = 2
reward_map[_r + 1][_c] = Q[s][3] # 上 = 3
reward_map[_r][_c] = np.mean(Q[s]) # 中央

# 各状態・行動の評価を表示
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
# 報酬推移を表示
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()

Actor Critic法での学習を実行します。

53行目で行動評価(Q値)の更新を行い、54行目で状態価値の更新を行っています。
ValueベースとPolicyベース両方の特性を持っていることになります。

actor_critic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import numpy as np
import gym
from el_agent import ELAgent
from frozen_lake_util import show_q_value

class Actor(ELAgent):

def __init__(self, env):
super().__init__(epsilon=-1)
nrow = env.observation_space.n
ncol = env.action_space.n
self.actions = list(range(env.action_space.n))
self.Q = np.random.uniform(0, 1, nrow * ncol).reshape((nrow, ncol))

def softmax(self, x):
return np.exp(x) / np.sum(np.exp(x), axis=0)

def policy(self, s):
a = np.random.choice(self.actions, 1,
p=self.softmax(self.Q[s]))
return a[0]

class Critic():

def __init__(self, env):
states = env.observation_space.n
self.V = np.zeros(states)

class ActorCritic():

def __init__(self, actor_class, critic_class):
self.actor_class = actor_class
self.critic_class = critic_class

def train(self, env, episode_count=1000, gamma=0.9,
learning_rate=0.1, render=False, report_interval=50):
actor = self.actor_class(env)
critic = self.critic_class(env)

actor.init_log()
for e in range(episode_count):
s = env.reset()
done = False
while not done:
if render:
env.render()
a = actor.policy(s)
n_state, reward, done, info = env.step(a)

gain = reward + gamma * critic.V[n_state]
estimated = critic.V[s]
td = gain - estimated
actor.Q[s][a] += learning_rate * td # 行動評価(Q値)の更新
critic.V[s] += learning_rate * td # 状態価値の更新
s = n_state

else:
actor.log(reward)

if e != 0 and e % report_interval == 0:
actor.show_reward_log(episode=e)

return actor, critic

def train():
trainer = ActorCritic(Actor, Critic)
env = gym.make("FrozenLakeEasy-v0")
actor, critic = trainer.train(env, episode_count=3000)
show_q_value(actor.Q)
actor.show_reward_log()

if __name__ == "__main__":
train()
FrozenLake 各行動の評価

エピソード実行回数と獲得報酬平均の推移は次のようになります。
エピソード数と獲得報酬平均の推移

今まで試してきた手法より学習にかかるエピソード数は長くなっていますが、安定した報酬が得られるようになっています。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習7 (SARSA)

モンテカルロ法・TD法に続いてSARSAによる学習を試してみます。

Q-learning(TD法)は価値が最大となる状態に遷移する行動をとることを前提としますが、SARSAでは次の行動はself.Qに基づく戦略(self.Q)で決められることを前提とします。

まずはエージェントのベースになるクラスを実装します。(強化学習5・6 (モンテカルロ法・TD法)と同様です。)

el_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import numpy as np
import matplotlib.pyplot as plt

class ELAgent():

def __init__(self, epsilon):
self.Q = {}
self.epsilon = epsilon
self.reward_log = []

def policy(self, s, actions):
if np.random.random() < self.epsilon:
# ランダムな行動(探索)
return np.random.randint(len(actions))
else:
# self.Q => 状態における行動の価値
# self.Q[s] => 状態sで行動aをとる場合の価値
if s in self.Q and sum(self.Q[s]) != 0:
# 価値評価に基づき行動(活用)
return np.argmax(self.Q[s])
else:
# ランダムな行動(探索)
return np.random.randint(len(actions))

# 報酬の記録を初期化
def init_log(self):
self.reward_log = []

# 報酬の記録
def log(self, reward):
self.reward_log.append(reward)

def show_reward_log(self, interval=50, episode=-1):
# そのepsilonの報酬をグラフ表示
if episode > 0:
rewards = self.reward_log[-interval:]
mean = np.round(np.mean(rewards), 3)
std = np.round(np.std(rewards), 3)
print("At Episode {} average reward is {} (+/-{}).".format(
episode, mean, std))
# 今までに獲得した報酬をグラフ表示
else:
indices = list(range(0, len(self.reward_log), interval))
means = []
stds = []
for i in indices:
rewards = self.reward_log[i:(i + interval)]
means.append(np.mean(rewards))
stds.append(np.std(rewards))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("Reward History")
plt.grid()
plt.fill_between(indices, means - stds, means + stds,
alpha=0.1, color="g")
plt.plot(indices, means, "o-", color="g",
label="Rewards for each {} episode".format(interval))
plt.legend(loc="best")
plt.show()

次に環境を扱うためのクラスを実装します。
(強化学習5・6 (モンテカルロ法・TD法)と同様です。)

frozen_lake_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import gym
from gym.envs.registration import register
# スリップする設定(is_slippery)をオフ設定(学習に時間がかかるため)
register(id="FrozenLakeEasy-v0", entry_point="gym.envs.toy_text:FrozenLakeEnv", kwargs={"is_slippery": False})

def show_q_value(Q):
"""
FrozenLake-v0環境での価値は下記の通り。
各行動での評価を表しています。
+----+------+----+
| | 上 | |
| 左 | 平均 | 右 |
| | 下 | |
+-----+------+----+
"""
env = gym.make("FrozenLake-v0")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))

for r in range(nrow):
for c in range(ncol):
s = r * nrow + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True

if state_exist:
# At the display map, the vertical index is reversed.
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # 左 = 0
reward_map[_r - 1][_c] = Q[s][1] # 下 = 1
reward_map[_r][_c + 1] = Q[s][2] # 右 = 2
reward_map[_r + 1][_c] = Q[s][3] # 上 = 3
reward_map[_r][_c] = np.mean(Q[s]) # 中央

# 各状態・行動の評価を表示
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
# 報酬推移を表示
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()

SARSAでの学習を実行します。
TD法とほとんど同じですが、gainを算出する箇所のみが違います。コメントをご参照ください。

sarsa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from collections import defaultdict
import gym
from el_agent import ELAgent
from frozen_lake_util import show_q_value

class SARSAAgent(ELAgent):

def __init__(self, epsilon=0.1):
super().__init__(epsilon)

def learn(self, env, episode_count=1000, gamma=0.9,
learning_rate=0.1, render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n))
self.Q = defaultdict(lambda: [0] * len(actions))
for e in range(episode_count):
s = env.reset()
done = False
a = self.policy(s, actions)
while not done:
if render:
env.render()
n_state, reward, done, info = env.step(a)

# Q-learning(価値が最大となる状態に遷移する行動をとる)
# gain = reward + gamma * max(self.Q[n_state])

# SARSA(self.Qに基づく戦略に従って決定)
n_action = self.policy(n_state, actions) # On-policy
gain = reward + gamma * self.Q[n_state][n_action]

estimated = self.Q[s][a]
self.Q[s][a] += learning_rate * (gain - estimated)
s = n_state
a = n_action
else:
self.log(reward)

if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)

def train():
agent = SARSAAgent()
env = gym.make("FrozenLakeEasy-v0")
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()

if __name__ == "__main__":
train()
FrozenLake 各行動の評価

モンテカルロ法、TD法とは違う経路が高く評価されていますが、全体的にゴール向かう行動が高く評価されています。

エピソード実行回数と獲得報酬平均の推移は次のようになります。
エピソード数と獲得報酬平均の推移

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習6 (TD法)

前回は1エピソードをプレイした実績に基づく更新を行うモンテカルロ法による学習を試してみました。
今回は1回の行動ごとに更新を行うTD法と試してみます。

モンテカルロ法に比べて行動修正のスピードが早いため学習が効率的です。
ただ更新は見積りベースになるためモンテカルロ法より適切な行動をとれるかどうか不確実になります。

まずはエージェントのベースになるクラスを実装します。(強化学習5 (モンテカルロ法)と同様です。)

el_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import numpy as np
import matplotlib.pyplot as plt

class ELAgent():

def __init__(self, epsilon):
self.Q = {}
self.epsilon = epsilon
self.reward_log = []

def policy(self, s, actions):
if np.random.random() < self.epsilon:
# ランダムな行動(探索)
return np.random.randint(len(actions))
else:
# self.Q => 状態における行動の価値
# self.Q[s] => 状態sで行動aをとる場合の価値
if s in self.Q and sum(self.Q[s]) != 0:
# 価値評価に基づき行動(活用)
return np.argmax(self.Q[s])
else:
# ランダムな行動(探索)
return np.random.randint(len(actions))

# 報酬の記録を初期化
def init_log(self):
self.reward_log = []

# 報酬の記録
def log(self, reward):
self.reward_log.append(reward)

def show_reward_log(self, interval=50, episode=-1):
# そのepsilonの報酬をグラフ表示
if episode > 0:
rewards = self.reward_log[-interval:]
mean = np.round(np.mean(rewards), 3)
std = np.round(np.std(rewards), 3)
print("At Episode {} average reward is {} (+/-{}).".format(
episode, mean, std))
# 今までに獲得した報酬をグラフ表示
else:
indices = list(range(0, len(self.reward_log), interval))
means = []
stds = []
for i in indices:
rewards = self.reward_log[i:(i + interval)]
means.append(np.mean(rewards))
stds.append(np.std(rewards))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("Reward History")
plt.grid()
plt.fill_between(indices, means - stds, means + stds,
alpha=0.1, color="g")
plt.plot(indices, means, "o-", color="g",
label="Rewards for each {} episode".format(interval))
plt.legend(loc="best")
plt.show()

次に環境を扱うためのクラスを実装します。

FrozenLakeEasy-v0は、強化学習を行うための環境を提供するライブラリOpenAI Gymの環境の1つです。
4 x 4 マスの迷路でところどころに穴があいていて穴に落ちるとゲーム終了となります。
穴に落ちずにゴールに到着すると報酬が得られます。(強化学習5 (モンテカルロ法)と同様です。)

frozen_lake_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import gym
from gym.envs.registration import register
# スリップする設定(is_slippery)をオフ設定(学習に時間がかかるため)
register(id="FrozenLakeEasy-v0", entry_point="gym.envs.toy_text:FrozenLakeEnv", kwargs={"is_slippery": False})

def show_q_value(Q):
"""
FrozenLake-v0環境での価値は下記の通り。
各行動での評価を表しています。
+----+------+----+
| | 上 | |
| 左 | 平均 | 右 |
| | 下 | |
+-----+------+----+
"""
env = gym.make("FrozenLake-v0")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))

for r in range(nrow):
for c in range(ncol):
s = r * nrow + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True

if state_exist:
# At the display map, the vertical index is reversed.
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # 左 = 0
reward_map[_r - 1][_c] = Q[s][1] # 下 = 1
reward_map[_r][_c + 1] = Q[s][2] # 右 = 2
reward_map[_r + 1][_c] = Q[s][3] # 上 = 3
reward_map[_r][_c] = np.mean(Q[s]) # 中央

# 各状態・行動の評価を表示
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
# 報酬推移を表示
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()

TD法での学習を実行します。
(このファイルだけ強化学習5 (モンテカルロ法)と異なります。)

q_learning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from collections import defaultdict
import gym
from el_agent import ELAgent
from frozen_lake_util import show_q_value

class QLearningAgent(ELAgent):

def __init__(self, epsilon=0.1):
super().__init__(epsilon)

def learn(self, env, episode_count=1000, gamma=0.9,
learning_rate=0.1, render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n)) # actions = [0, 1, 2, 3]
# self.Q Qテーブル(ある状態sにおける行動aの報酬を管理)
self.Q = defaultdict(lambda: [0] * len(actions))
for e in range(episode_count):
s = env.reset()
done = False
while not done:
if render:
env.render()
a = self.policy(s, actions)
n_state, reward, done, info = env.step(a)

# reward => 報酬
# gamma => 割引率
# max(self.Q[n_state]) => 価値が最大になるような行動をとる
gain = reward + gamma * max(self.Q[n_state])
estimated = self.Q[s][a]
# Qテーブルの更新(状態sの行動aにおける報酬を管理)
# learning_rate => 学習率
# estimated => 既存の見積り
self.Q[s][a] += learning_rate * (gain - estimated)
s = n_state
else:
self.log(reward)

if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)

# 学習を行う
def train():
agent = QLearningAgent()
env = gym.make("FrozenLakeEasy-v0")
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()

if __name__ == "__main__":
train()
FrozenLake 各行動の評価

全体的にゴール向かう行動が高く評価されていることがわかります。

エピソード実行回数と獲得報酬平均の推移は次のようになります。
エピソード数と獲得報酬平均の推移

モンテカルロ法と同様にうまく学習できています。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習5 (モンテカルロ法)

モンテカルロ法は行動の修正を実績に基づき行う方法です。
エピソードが終了した後で、獲得できた報酬の総和をもとに修正を行うシンプルな方法です。

まずはエージェントのベースになるクラスを実装します。

el_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import numpy as np
import matplotlib.pyplot as plt

class ELAgent():

def __init__(self, epsilon):
self.Q = {}
self.epsilon = epsilon
self.reward_log = []

def policy(self, s, actions):
if np.random.random() < self.epsilon:
# ランダムな行動(探索)
return np.random.randint(len(actions))
else:
# self.Q => 状態における行動の価値
# self.Q[s] => 状態sで行動aをとる場合の価値
if s in self.Q and sum(self.Q[s]) != 0:
# 価値評価に基づき行動(活用)
return np.argmax(self.Q[s])
else:
# ランダムな行動(探索)
return np.random.randint(len(actions))

# 報酬の記録を初期化
def init_log(self):
self.reward_log = []

# 報酬の記録
def log(self, reward):
self.reward_log.append(reward)

def show_reward_log(self, interval=50, episode=-1):
# そのepsilonの報酬をグラフ表示
if episode > 0:
rewards = self.reward_log[-interval:]
mean = np.round(np.mean(rewards), 3)
std = np.round(np.std(rewards), 3)
print("At Episode {} average reward is {} (+/-{}).".format(
episode, mean, std))
# 今までに獲得した報酬をグラフ表示
else:
indices = list(range(0, len(self.reward_log), interval))
means = []
stds = []
for i in indices:
rewards = self.reward_log[i:(i + interval)]
means.append(np.mean(rewards))
stds.append(np.std(rewards))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("Reward History")
plt.grid()
plt.fill_between(indices, means - stds, means + stds,
alpha=0.1, color="g")
plt.plot(indices, means, "o-", color="g",
label="Rewards for each {} episode".format(interval))
plt.legend(loc="best")
plt.show()

次に環境を扱うためのクラスを実装します。

FrozenLakeEasy-v0は、強化学習を行うための環境を提供するライブラリOpenAI Gymの環境の1つです。
4 x 4 マスの迷路でところどころに穴があいていて穴に落ちるとゲーム終了となります。
穴に落ちずにゴールに到着すると報酬が得られます。

frozen_lake_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import gym
from gym.envs.registration import register
# スリップする設定(is_slippery)をオフ設定(学習に時間がかかるため)
register(id="FrozenLakeEasy-v0", entry_point="gym.envs.toy_text:FrozenLakeEnv", kwargs={"is_slippery": False})

def show_q_value(Q):
"""
FrozenLake-v0環境での価値は下記の通り。
各行動での評価を表しています。
+----+------+----+
| | 上 | |
| 左 | 平均 | 右 |
| | 下 | |
+-----+------+----+
"""
env = gym.make("FrozenLake-v0")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))

for r in range(nrow):
for c in range(ncol):
s = r * nrow + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True

if state_exist:
# At the display map, the vertical index is reversed.
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # 左 = 0
reward_map[_r - 1][_c] = Q[s][1] # 下 = 1
reward_map[_r][_c + 1] = Q[s][2] # 右 = 2
reward_map[_r + 1][_c] = Q[s][3] # 上 = 3
reward_map[_r][_c] = np.mean(Q[s]) # 中央

# 各状態・行動の評価を表示
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
# 報酬推移を表示
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()

モンテカルロ法での学習を実行します。

monte_carlo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import math
from collections import defaultdict
import gym
from el_agent import ELAgent
from frozen_lake_util import show_q_value

class MonteCarloAgent(ELAgent):

def __init__(self, epsilon=0.1):
super().__init__(epsilon)

def learn(self, env, episode_count=1000, gamma=0.9,
render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n))
self.Q = defaultdict(lambda: [0] * len(actions))
N = defaultdict(lambda: [0] * len(actions))

for e in range(episode_count):
s = env.reset()
done = False
# エピソードの終了まで実行する
experience = []
while not done:
if render:
env.render()
a = self.policy(s, actions)
n_state, reward, done, info = env.step(a)
experience.append({"state": s, "action": a, "reward": reward})
s = n_state
else:
self.log(reward)

# 各状態・各行動を評価する。
for i, x in enumerate(experience):
s, a = x["state"], x["action"]

# Calculate discounted future reward of s.
# 状態s
G, t = 0, 0
for j in range(i, len(experience)):
G += math.pow(gamma, t) * experience[j]["reward"]
t += 1

N[s][a] += 1 # count of s, a pair
alpha = 1 / N[s][a]
self.Q[s][a] += alpha * (G - self.Q[s][a])

if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)

def train():
agent = MonteCarloAgent(epsilon=0.1)
env = gym.make("FrozenLakeEasy-v0")
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()

if __name__ == "__main__":
train()

FrozenLakeの設定と各行動の評価結果は下記のようになります。
緑色が濃いほど評価が高いことを意味します。

FrozenLake 各行動の評価

全体的にゴール向かう行動が高く評価されていることがわかります。

エピソード実行回数と獲得報酬平均の推移は次のようになります。
エピソード数と獲得報酬平均の推移
エピソード実行が50回付近で報酬が1近くに達していて学習がだいたい完了していることがわかります。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

強化学習4 (経験から計画を立てる)

遷移関数や報酬関数が分からない場合は、行動してみて状態の遷移や得られる報酬を調べていくことになります。
今回は、Epsilon-Greedy法で探索(経験を蓄積する)と活用(行動する)の割合によって報酬がどのように変わっていくかを調査します。

何枚かのコインから1枚を選んで、投げた時に表がでれば報酬が得られるゲームで実装しています。
以下がコードになります。(コメントにて処理を確認して頂ければ幸いです)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import random
import numpy as np


class CoinToss():

# head_probsは各コインの表が出る確率
def __init__(self, head_probs, max_episode_steps=30):
self.head_probs = head_probs
self.max_episode_steps = max_episode_steps
self.toss_count = 0

def __len__(self):
return len(self.head_probs)

def reset(self):
self.toss_count = 0

# actionはコインのindex
def step(self, action):
# max_episode_stepsはデフォルト30回
final = self.max_episode_steps - 1
if self.toss_count > final:
raise Exception("The step count exceeded maximum. Please reset env.")
else:
# 終了確認
done = True if self.toss_count == final else False

if action >= len(self.head_probs):
raise Exception("The No.{} coin doesn't exist.".format(action))
else:
head_prob = self.head_probs[action]
if random.random() < head_prob:
reward = 1.0
else:
reward = 0.0
self.toss_count += 1
return reward, done


class EpsilonGreedyAgent():

def __init__(self, epsilon):
self.epsilon = epsilon
self.V = []

def policy(self):
coins = range(len(self.V))
# random関数は0.0から1.0の値を返す。
if random.random() < self.epsilon:
# 活用優先(行動する)
return random.choice(coins) # コインのindexをランダムに返す
else:
# 探索優先(経験を蓄積)
return np.argmax(self.V) # 配列中の最大値のindexを返す

def play(self, env):
# 初期化
# len(env)はコインごとの表がでる確率の配列
N = [0] * len(env) # [0, 0, 0, 0, ・・・]という配列ができる
self.V = [0] * len(env)

env.reset()
done = False
rewards = []
while not done:
# ランダムに選ばれたコインか、平均報酬が一番高いコインが返る
selected_coin = self.policy()
reward, done = env.step(selected_coin)
rewards.append(reward)

n = N[selected_coin]
coin_average = self.V[selected_coin]
new_average = (coin_average * n + reward) / (n + 1)
N[selected_coin] += 1
# そのコインの平均報酬
self.V[selected_coin] = new_average
# 全ての報酬を配列で返す
return rewards

if __name__ == "__main__":
import pandas as pd
import matplotlib.pyplot as plt

def main():
env = CoinToss([0.1, 0.5, 0.1, 0.9, 0.1]) # 各コインの表がでる確率
# 探索優先(0.0に近いほど)か活用優先(1.0に近いほど)かの割合
epsilons = [0.0, 0.1, 0.2, 0.5, 0.8]
game_steps = list(range(10, 1010, 10)) # 10回から1000回まで10回おきに実行する
result = {}
for e in epsilons:
agent = EpsilonGreedyAgent(epsilon=e)
means = []
for s in game_steps:
env.max_episode_steps = s
rewards = agent.play(env)
means.append(np.mean(rewards))
result["epsilon={}".format(e)] = means
result["count of coin toss"] = game_steps
result = pd.DataFrame(result)
result.set_index("count of coin toss", drop=True, inplace=True)
result.plot.line(figsize=(10, 5))
plt.show()

main()

結果
epsilonが0.0の場合はコイントスの回数を増やしても報酬がぜんぜん増えていません。
epsilonが0.1か0.2くらいだといい結果がでています。
一般的にepsilonは0.1に設定することが多いようですが、今回の結果はそれを裏付けることとなりました。