深層強化学習 Dueling Network

Dueling Networkという深層強化学習を実装します。

Dueling NetworkではQ関数を2つに分けて学習します。

  • 状態sだけで決まるV(s)
  • 行動しだいできまるAdvantage関数 A(s,a)

最後にこの2関数を足してQ(s,a)と決めます。

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('6_3movie_cartpole_dueling_network.mp4') # 動画のファイル名と保存です

経験(Transition)を表すnamedtupleを生成します。
状態 state、行動 action、次の状態 next_state、報酬 rewardに容易にアクセスできます。

1
2
3
# namedtupleを生成
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

定数を宣言します。

1
2
3
4
5
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

経験を保存するメモリクラスを定義します。
経験を保存する関数 push、ランダムに経験を取り出す関数 sampleがあります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 経験を保存するメモリクラスを定義します
class ReplayMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward)をメモリに保存する'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

# namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
self.memory[self.index] = Transition(state, action, state_next, reward)

self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def sample(self, batch_size):
'''batch_size分だけ、ランダムに保存内容を取り出す'''
return random.sample(self.memory, batch_size)

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

Dueling Network型のディープ・ニューラルネットワークを構築します。

Advantage側の層 fc3_advと状態価値の層 fc3_vを作成します。
最後に出力する行動価値 output は上記2つを足したものとなります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Dueling Network型のディープ・ニューラルネットワークの構築
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

def __init__(self, n_in, n_mid, n_out):
super(Net, self).__init__()
self.fc1 = nn.Linear(n_in, n_mid)
self.fc2 = nn.Linear(n_mid, n_mid)
# Dueling Network
self.fc3_adv = nn.Linear(n_mid, n_out) # Advantage側
self.fc3_v = nn.Linear(n_mid, 1) # 価値V側

def forward(self, x):
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))

adv = self.fc3_adv(h2) # この出力はReLUしない
val = self.fc3_v(h2).expand(-1, adv.size(1)) # この出力はReLUしない
# valはadvと足し算するために、サイズを[minibatchx1]から[minibatchx2]にexpandする
# adv.size(1)は出力する行動の種類数の2

output = val + adv - adv.mean(1, keepdim=True).expand(-1, adv.size(1))
# val+advからadvの平均値を引き算する
# adv.mean(1, keepdim=True) で列方向(行動の種類方向)に平均し、サイズが[minibatch×1]
# expandで展開して、サイズ[minibatchx2]

return output

Brainクラスを実装します。
このクラスがニューラルネットワークを保持します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# エージェントが持つ脳となるクラスです、Dueling Networkを実行します
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# 経験を記憶するメモリオブジェクトを生成
self.memory = ReplayMemory(CAPACITY)

# ニューラルネットワークを構築
n_in, n_mid, n_out = num_states, 32, num_actions
self.main_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
self.target_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
print(self.main_q_network) # ネットワークの形を出力

# 最適化手法の設定
self.optimizer = optim.Adam(
self.main_q_network.parameters(), lr=0.0001)

def replay(self):
'''Experience Replayでネットワークの結合パラメータを学習'''

# 1. メモリサイズの確認
if len(self.memory) < BATCH_SIZE:
return

# 2. ミニバッチの作成
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()

# 3. 教師信号となるQ(s_t, a_t)値を求める
self.expected_state_action_values = self.get_expected_state_action_values()

# 4. 結合パラメータの更新
self.update_main_q_network()

def decide_action(self, state, episode):
'''現在の状態に応じて、行動を決定する'''
# ε-greedy法で徐々に最適行動のみを採用する
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # ネットワークを推論モードに切り替える
with torch.no_grad():
action = self.main_q_network(state).max(1)[1].view(1, 1)
# ネットワークの出力の最大値のindexを取り出します = max(1)[1]
# .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

else:
# 0,1の行動をランダムに返す
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 0,1の行動をランダムに返す
# actionは[torch.LongTensor of size 1x1]の形になります

return action

def make_minibatch(self):
'''2. ミニバッチの作成'''

# 2.1 メモリからミニバッチ分のデータを取り出す
transitions = self.memory.sample(BATCH_SIZE)

# 2.2 各変数をミニバッチに対応する形に変形
# transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
# つまり、(state, action, state_next, reward)×BATCH_SIZE
# これをミニバッチにしたい。つまり
# (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
batch = Transition(*zip(*transitions))

# 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
# 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
# それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
# 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
# catはConcatenates(結合)のことです。
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

return batch, state_batch, action_batch, reward_batch, non_final_next_states

def get_expected_state_action_values(self):
'''3. 教師信号となるQ(s_t, a_t)値を求める'''

# 3.1 ネットワークを推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()

# 3.2 ネットワークが出力したQ(s_t, a_t)を求める
# self.model(state_batch)は、右左の両方のQ値を出力しており
# [torch.FloatTensor of size BATCH_SIZEx2]になっている。
# ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
# それに対応するQ値をgatherでひっぱり出す。
self.state_action_values = self.main_q_network(
self.state_batch).gather(1, self.action_batch)

# 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
self.batch.next_state)))
# まずは全部0にしておく
next_state_values = torch.zeros(BATCH_SIZE)

a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

# 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
# 最後の[1]で行動に対応したindexが返る
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states).detach().max(1)[1]

# 次の状態があるものだけにフィルターし、size 32を32×1へ
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

# 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
# detach()で取り出す
# squeeze()でsize[minibatch×1]を[minibatch]に。
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

# 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
expected_state_action_values = self.reward_batch + GAMMA * next_state_values

return expected_state_action_values

def update_main_q_network(self):
'''4. 結合パラメータの更新'''

# 4.1 ネットワークを訓練モードに切り替える
self.main_q_network.train()

# 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
# expected_state_action_valuesは
# sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
loss = F.smooth_l1_loss(self.state_action_values,
self.expected_state_action_values.unsqueeze(1))

# 4.3 結合パラメータを更新する
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # バックプロパゲーションを計算
self.optimizer.step() # 結合パラメータを更新

def update_target_q_network(self): # DDQNで追加
'''Target Q-NetworkをMainと同じにする'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())

棒付き台車を表すAgentクラスを実装します。
関数 memorizeでメモリオブジェクトに経験したデータ transitionを格納します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
def __init__(self, num_states, num_actions):
'''課題の状態と行動の数を設定する'''
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_q_function(self):
'''Q関数を更新する'''
self.brain.replay()

def get_action(self, state, episode):
'''行動を決定する'''
action = self.brain.decide_action(state, episode)
return action

def memorize(self, state, action, state_next, reward):
'''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
self.brain.memory.push(state, action, state_next, reward)

def update_target_q_function(self):
'''Target Q-NetworkをMain Q-Networkと同じに更新'''
self.brain.update_target_q_network()

CartPoleを実行する環境クラスを定義します。
表形式表現のように離散化は行わず、観測結果 observationをそのままstateとして使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# CartPoleを実行する環境のクラスです
class Environment:

def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態と行動の数を設定
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
# 環境内で行動するAgentを生成
self.agent = Agent(num_states, num_actions)

def run(self):
'''実行'''
episode_10_list = np.zeros(10) # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
episode_final = False # 最後の試行フラグ
frames = [] # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

state = observation # 観測をそのまま状態sとして使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state = torch.unsqueeze(state, 0) # size 4をsize 1x4に変換

for step in range(MAX_STEPS): # 1エピソードのループ

# 動画描画をコメントアウトしています
if episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 行動を求める

# 行動a_tの実行により、s_{t+1}とdoneフラグを求める
# actionから.item()を指定して、中身を取り出す
observation_next, _, done, _ = self.env.step(
action.item()) # rewardとinfoは使わないので_にする

# 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
state_next = None # 次の状態はないので、Noneを格納

# 直近10episodeの立てたstep数リストに追加
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))

if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 連続成功記録をリセット
else:
reward = torch.FloatTensor([1.0]) # 立ったまま終了時は報酬1を与える
complete_episodes = complete_episodes + 1 # 連続記録を更新
else:
reward = torch.FloatTensor([0.0]) # 普段は報酬0
state_next = observation_next # 観測をそのまま状態とする
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state_next = torch.unsqueeze(state_next, 0) # size 4をsize 1x4に変換

# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)

# Experience ReplayでQ関数を更新する
self.agent.update_q_function()

# 観測の更新
state = state_next

# 終了時の処理
if done:
print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))

# DDQNで追加、2試行に1度、Target Q-NetworkをMainと同じにコピーする
if(episode % 2 == 0):
self.agent.update_target_q_function()
break

if episode_final is True:
# 動画描画をコメントアウトしています
# 動画を保存と描画
display_frames_as_gif(frames)
break

# 10連続で200step経ち続けたら成功
if complete_episodes >= 10:
print('10回連続成功')
episode_final = True # 次の試行を描画を行う最終試行とする

学習を実行します。

1
2
3
# main クラス
cartpole_env = Environment()
cartpole_env.run()

実行結果1 エピソード0~10

実行結果2 エピソード150~165(終了)

164エピソードで学習が完了しました。
出力された動画ファイル’6_3movie_cartpole_dueling_network.mp4’は下記のようになります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 DDQN (Double-DQN)

DDQN (Double-DQN) では2つのネットワークを使用して Main Q-Networkの更新量を求めます。

  • Main Q-Network
    次の状態でのQ値が最大となる行動を求めるネットワーク
  • Target Q-Network
    Main Q-Networkで求めた行動のQ値を求めるネットワーク

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
print(len(frames))
print(frames[0].shape)
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)

anim.save('6_2movie_cartpole_DDQN.mp4') # 動画のファイル名と保存です

経験(Transition)を表すnamedtupleを生成します。
状態 state、行動 action、次の状態 next_state、報酬 rewardに容易にアクセスできます。

1
2
3
# namedtupleを生成
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

定数を宣言します。

1
2
3
4
5
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

経験データを保存しておくメモリクラス ReplayMemoryを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 経験を保存するメモリクラスを定義します
class ReplayMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward)をメモリに保存する'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

# namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
self.memory[self.index] = Transition(state, action, state_next, reward)

self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def sample(self, batch_size):
'''batch_size分だけ、ランダムに保存内容を取り出す'''
return random.sample(self.memory, batch_size)

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

ディープ・ニューラルネットワークの構築を行うクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ディープ・ニューラルネットワークの構築
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

def __init__(self, n_in, n_mid, n_out):
super(Net, self).__init__()
self.fc1 = nn.Linear(n_in, n_mid)
self.fc2 = nn.Linear(n_mid, n_mid)
self.fc3 = nn.Linear(n_mid, n_out)

def forward(self, x):
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))
output = self.fc3(h2)
return output

エージェントが持つ脳となるクラスを定義します。

replay関数では下記の4処理を行います。

  1. メモリサイズの確認
  2. ミニバッチの作成 → make_minibatch()関数
  3. 教師信号となるQ値を求める → get_expected_state_action_values()関数
  4. 結合パラメータの更新 → update_main_q_network()関数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# エージェントが持つ脳となるクラスです、DDQNを実行します
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# 経験を記憶するメモリオブジェクトを生成
self.memory = ReplayMemory(CAPACITY)

# ニューラルネットワークを構築
n_in, n_mid, n_out = num_states, 32, num_actions
self.main_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
self.target_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
print(self.main_q_network) # ネットワークの形を出力

# 最適化手法の設定
self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)

def replay(self):
'''Experience Replayでネットワークの結合パラメータを学習'''
# 1. メモリサイズの確認
if len(self.memory) < BATCH_SIZE:
return

# 2. ミニバッチの作成
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()

# 3. 教師信号となるQ(s_t, a_t)値を求める
self.expected_state_action_values = self.get_expected_state_action_values()

# 4. 結合パラメータの更新
self.update_main_q_network()

def decide_action(self, state, episode):
'''現在の状態に応じて、行動を決定する'''
# ε-greedy法で徐々に最適行動のみを採用する
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # ネットワークを推論モードに切り替える
with torch.no_grad():
action = self.main_q_network(state).max(1)[1].view(1, 1)
# ネットワークの出力の最大値のindexを取り出します = max(1)[1]
# .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

else:
# 0,1の行動をランダムに返す
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 0,1の行動をランダムに返す
# actionは[torch.LongTensor of size 1x1]の形になります

return action

def make_minibatch(self):
'''2. ミニバッチの作成'''

# 2.1 メモリからミニバッチ分のデータを取り出す
transitions = self.memory.sample(BATCH_SIZE)

# 2.2 各変数をミニバッチに対応する形に変形
# transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
# つまり、(state, action, state_next, reward)×BATCH_SIZE
# これをミニバッチにしたい。つまり
# (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
batch = Transition(*zip(*transitions))

# 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
# 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
# それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
# 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
# catはConcatenates(結合)のことです。
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

return batch, state_batch, action_batch, reward_batch, non_final_next_states

def get_expected_state_action_values(self):
'''3. 教師信号となるQ(s_t, a_t)値を求める'''

# 3.1 ネットワークを推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()

# 3.2 ネットワークが出力したQ(s_t, a_t)を求める
# self.model(state_batch)は、右左の両方のQ値を出力しており
# [torch.FloatTensor of size BATCH_SIZEx2]になっている。
# ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
# それに対応するQ値をgatherでひっぱり出す。
self.state_action_values = self.main_q_network(
self.state_batch).gather(1, self.action_batch)

# 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
self.batch.next_state)))
# まずは全部0にしておく
next_state_values = torch.zeros(BATCH_SIZE)

a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

# 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
# 最後の[1]で行動に対応したindexが返る
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states).detach().max(1)[1]

# 次の状態があるものだけにフィルターし、size 32を32×1へ
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

# 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
# detach()で取り出す
# squeeze()でsize[minibatch×1]を[minibatch]に。
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

# 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
expected_state_action_values = self.reward_batch + GAMMA * next_state_values

return expected_state_action_values

def update_main_q_network(self):
'''4. 結合パラメータの更新'''

# 4.1 ネットワークを訓練モードに切り替える
self.main_q_network.train()

# 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
# expected_state_action_valuesは
# sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
loss = F.smooth_l1_loss(self.state_action_values,
self.expected_state_action_values.unsqueeze(1))

# 4.3 結合パラメータを更新する
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # バックプロパゲーションを計算
self.optimizer.step() # 結合パラメータを更新

def update_target_q_network(self): # DDQNで追加
'''Target Q-NetworkをMainと同じにする'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())

CartPoleで動くAgentクラスを実装します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
def __init__(self, num_states, num_actions):
'''課題の状態と行動の数を設定する'''
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_q_function(self):
'''Q関数を更新する'''
self.brain.replay()

def get_action(self, state, episode):
'''行動を決定する'''
action = self.brain.decide_action(state, episode)
return action

def memorize(self, state, action, state_next, reward):
'''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
self.brain.memory.push(state, action, state_next, reward)

def update_target_q_function(self):
'''Target Q-NetworkをMain Q-Networkと同じに更新'''
self.brain.update_target_q_network()

CartPoleを実行する環境クラスを実装します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# CartPoleを実行する環境のクラスです
class Environment:

def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態と行動の数を設定
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
# 環境内で行動するAgentを生成
self.agent = Agent(num_states, num_actions)

def run(self):
'''実行'''
episode_10_list = np.zeros(10) # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
episode_final = False # 最後の試行フラグ
frames = [] # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

state = observation # 観測をそのまま状態sとして使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state = torch.unsqueeze(state, 0) # size 4をsize 1x4に変換

for step in range(MAX_STEPS): # 1エピソードのループ

# 動画描画をコメントアウトしています
if episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 行動を求める

# 行動a_tの実行により、s_{t+1}とdoneフラグを求める
# actionから.item()を指定して、中身を取り出す
observation_next, _, done, _ = self.env.step(
action.item()) # rewardとinfoは使わないので_にする

# 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
state_next = None # 次の状態はないので、Noneを格納

# 直近10episodeの立てたstep数リストに追加
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))

if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 連続成功記録をリセット
else:
reward = torch.FloatTensor([1.0]) # 立ったまま終了時は報酬1を与える
complete_episodes = complete_episodes + 1 # 連続記録を更新
else:
reward = torch.FloatTensor([0.0]) # 普段は報酬0
state_next = observation_next # 観測をそのまま状態とする
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state_next = torch.unsqueeze(state_next, 0) # size 4をsize 1x4に変換

# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)

# Experience ReplayでQ関数を更新する
self.agent.update_q_function()

# 観測の更新
state = state_next

# 終了時の処理
if done:
print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))

# DDQNで追加、2試行に1度、Target Q-NetworkをMainと同じにコピーする
if(episode % 2 == 0):
self.agent.update_target_q_function()
break

if episode_final is True:
# 動画描画をコメントアウトしています
# 動画を保存と描画
display_frames_as_gif(frames)
break

# 10連続で200step経ち続けたら成功
if complete_episodes >= 10:
print('10回連続成功')
episode_final = True # 次の試行を描画を行う最終試行とする

学習を実行します。

1
2
3
# main クラス
cartpole_env = Environment()
cartpole_env.run()

実行結果1 エピソード0~20

実行結果2 エピソード20~47(終了)

46エピソードで学習が完了しました。
出力された動画ファイル’6_2movie_cartpole_DDQN.mp4’は下記のようになります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 DQN (Deep Q-Network)

深層強化学習でOpenAI GymのCartPoleを攻略します。
深層強化学習は強化学習にディープラーニングを使用した方法です。

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('5_4movie_cartpole_DQN.mp4') # 動画ファイルを保存します。

namedtupleを使うことで、値をフィールド名とペアで格納でき、値に対してフィールド名でアクセスできて便利です。
namedtupleの動作を確認するためのサンプルを実装してみます。

1
2
3
4
5
6
7
from collections import namedtuple

Tr = namedtuple('tr', ('name_a', 'value_b'))
Tr_object = Tr('名前Aです', 100)

print(Tr_object)
print(Tr_object.value_b)

実行結果1

経験(Transition)を表すnamedtupleを生成します。
状態 state、行動 action、次の状態 next_state、報酬 rewardに容易にアクセスできます。

1
2
3
# namedtupleを生成
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

定数を宣言します。

1
2
3
4
5
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

ミニバッチ学習を実現するために、経験データを保存しておくメモリクラス ReplayMemoryを定義します。
経験を保存する関数 push、ランダムに経験を取り出す関数 sampleがあります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 経験を保存するメモリクラスを定義します
class ReplayMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward)をメモリに保存する'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

# namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
self.memory[self.index] = Transition(state, action, state_next, reward)

self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def sample(self, batch_size):
'''batch_size分だけ、ランダムに保存内容を取り出す'''
return random.sample(self.memory, batch_size)

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

Brainクラスを実装します。
このクラスがニューラルネットワークを保持します。

関数 replayはメモリクラスからミニバッチを取り出し、ニューラルネットワークの結合パラメータを学習し、Q関数を更新します。
この関数の流れは下記のようになります。

  1. メモリサイズの確認

  2. メモリサイズがミニバッチより小さい間は何もしない。

  3. ミニバッチの作成

  4. メモリからミニバッチ分のデータを取り出す。

  5. 各変数をミニバッチに対応する形に変形する。

  6. 各変数の要素をミニバッチに対応する形に変形する。

  7. 教師信号となるQ値を算出

  8. ネットワークを推論モードに切り替える。

  9. ネットワークが出力したQ値を求める。

  10. maxQ値を求める。

  11. 教師となるQ値をQ学習の式から求める。

  12. 結合パラメータの更新

  13. ネットワークを訓練モードに切り替える。

  14. 損失関数の値を計算する。

  15. 結合パラメータを更新する。

関数 decide_actionはε-greedy法によりランダムな行動 または 現在の状態に対してQ値が最大となる行動のindexを返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# エージェントが持つ脳となるクラスです、DQNを実行します
# Q関数をディープラーニングのネットワークをクラスとして定義
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# 経験を記憶するメモリオブジェクトを生成
self.memory = ReplayMemory(CAPACITY)

# ニューラルネットワークを構築
self.model = nn.Sequential()
self.model.add_module('fc1', nn.Linear(num_states, 32))
self.model.add_module('relu1', nn.ReLU())
self.model.add_module('fc2', nn.Linear(32, 32))
self.model.add_module('relu2', nn.ReLU())
self.model.add_module('fc3', nn.Linear(32, num_actions))

print(self.model) # ネットワークの形を出力

# 最適化手法の設定
self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)

def replay(self):
'''Experience Replayでネットワークの結合パラメータを学習'''
# -----------------------------------------
# 1. メモリサイズの確認
# -----------------------------------------
# 1.1 メモリサイズがミニバッチより小さい間は何もしない
if len(self.memory) < BATCH_SIZE:
return

# -----------------------------------------
# 2. ミニバッチの作成
# -----------------------------------------
# 2.1 メモリからミニバッチ分のデータを取り出す
transitions = self.memory.sample(BATCH_SIZE)

# 2.2 各変数をミニバッチに対応する形に変形
# transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
# つまり、(state, action, state_next, reward)×BATCH_SIZE
# これをミニバッチにしたい。つまり
# (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
batch = Transition(*zip(*transitions))

# 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
# 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
# それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
# 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
# catはConcatenates(結合)のことです。
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

# -----------------------------------------
# 3. 教師信号となるQ(s_t, a_t)値を求める
# -----------------------------------------
# 3.1 ネットワークを推論モードに切り替える
self.model.eval()

# 3.2 ネットワークが出力したQ(s_t, a_t)を求める
# self.model(state_batch)は、右左の両方のQ値を出力しており
# [torch.FloatTensor of size BATCH_SIZEx2]になっている。
# ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
# それに対応するQ値をgatherでひっぱり出す。
state_action_values = self.model(state_batch).gather(1, action_batch)

# 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
batch.next_state)))
# まずは全部0にしておく
next_state_values = torch.zeros(BATCH_SIZE)

# 次の状態があるindexの最大Q値を求める
# 出力にアクセスし、max(1)で列方向の最大値の[値、index]を求めます
# そしてそのQ値(index=0)を出力します
# detachでその値を取り出します
next_state_values[non_final_mask] = self.model(
non_final_next_states).max(1)[0].detach()

# 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
expected_state_action_values = reward_batch + GAMMA * next_state_values

# -----------------------------------------
# 4. 結合パラメータの更新
# -----------------------------------------
# 4.1 ネットワークを訓練モードに切り替える
self.model.train()

# 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
# expected_state_action_valuesは
# sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
loss = F.smooth_l1_loss(state_action_values,
expected_state_action_values.unsqueeze(1))

# 4.3 結合パラメータを更新する
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # バックプロパゲーションを計算
self.optimizer.step() # 結合パラメータを更新

def decide_action(self, state, episode):
'''現在の状態に応じて、行動を決定する'''
# ε-greedy法で徐々に最適行動のみを採用する
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
self.model.eval() # ネットワークを推論モードに切り替える
with torch.no_grad():
action = self.model(state).max(1)[1].view(1, 1)
# ネットワークの出力の最大値のindexを取り出します = max(1)[1]
# .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

else:
# 0,1の行動をランダムに返す
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 0,1の行動をランダムに返す
# actionは[torch.LongTensor of size 1x1]の形になります

return action

棒付き台車を表すAgentクラスを実装します。
関数 memorizeでメモリオブジェクトに経験したデータ transitionを格納します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
def __init__(self, num_states, num_actions):
'''課題の状態と行動の数を設定する'''
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_q_function(self):
'''Q関数を更新する'''
self.brain.replay()

def get_action(self, state, episode):
'''行動を決定する'''
action = self.brain.decide_action(state, episode)
return action

def memorize(self, state, action, state_next, reward):
'''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
self.brain.memory.push(state, action, state_next, reward)

CartPoleを実行する環境クラスを定義します。
表形式表現のように離散化は行わず、観測結果 observationをそのままstateとして使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# CartPoleを実行する環境のクラスです
class Environment:

def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態数4を取得
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
self.agent = Agent(num_states, num_actions) # 環境内で行動するAgentを生成

def run(self):
'''実行'''
episode_10_list = np.zeros(10) # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
episode_final = False # 最後の試行フラグ
frames = [] # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES): # 最大試行数分繰り返す
observation = self.env.reset() # 環境の初期化

state = observation # 観測をそのまま状態sとして使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # NumPy変数をPyTorchのテンソルに変換
state = torch.unsqueeze(state, 0) # size 4をsize 1x4に変換

for step in range(MAX_STEPS): # 1エピソードのループ

if episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 行動を求める

# 行動a_tの実行により、s_{t+1}とdoneフラグを求める
# actionから.item()を指定して、中身を取り出す
observation_next, _, done, _ = self.env.step(
action.item()) # rewardとinfoは使わないので_にする

# 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
state_next = None # 次の状態はないので、Noneを格納

# 直近10episodeの立てたstep数リストに追加
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))

if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 連続成功記録をリセット
else:
reward = torch.FloatTensor([1.0]) # 立ったまま終了時は報酬1を与える
complete_episodes = complete_episodes + 1 # 連続記録を更新
else:
reward = torch.FloatTensor([0.0]) # 普段は報酬0
state_next = observation_next # 観測をそのまま状態とする
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state_next = torch.unsqueeze(state_next, 0) # size 4をsize 1x4に変換

# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)

# Experience ReplayでQ関数を更新する
self.agent.update_q_function()

# 観測の更新
state = state_next

# 終了時の処理
if done:
print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))
break

if episode_final is True:
# 動画を保存と描画
display_frames_as_gif(frames)
break

# 10連続で200step経ち続けたら成功
if complete_episodes >= 10:
print('10回連続成功')
episode_final = True # 次の試行を描画を行う最終試行とする

学習を実行します。

1
2
3
# main クラス
cartpole_env = Environment()
cartpole_env.run()

実行結果2 エピソード0~10

実行結果3 エピソード70~86(終了)

85エピソードで学習が完了しました。
出力された動画ファイル’5_4movie_cartpole_DQN.mp4’は下記のようになります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

PyTorchで手書き数字画像の分類課題MNIST

PyTorchはディープラーニング用パッケージです。
PyTorchを使用して手書き数字の画像データ(MNIST)を分類するディープラーニングを実装します。

まずは手書き数字の画像データMNISTをダウンロードします。
変数mnistにデータが格納されます。

1
2
3
# 手書き数字の画像データMNISTをダウンロード
from sklearn.datasets import fetch_openml
mnist = fetch_openml('mnist_784', version=1, data_home=".") # data_homeは保存先を指定します

PyTorchによるディープラーニングは下記の手順で行います。

  1. データの前処理
  2. DataLoaderの作成
  3. ネットワークの構築
  4. 誤差関数と最適化手法の設定
  5. 学習と推論の設定
  6. 学習と推論の実行

データの前処理では、データをニューラルネットワークに投入できるように加工します。

1
2
3
4
5
6
7
# 1. データの前処理(画像データとラベルに分割し、正規化)
X = mnist.data / 255 # 0-255を0-1に正規化
y = mnist.target

import numpy as np
y = np.array(y)
y = y.astype(np.int32)

MNISTのデータの1つ目を可視化してみます。

1
2
3
4
5
6
# MNISTのデータの1つ目を可視化する
import matplotlib.pyplot as plt
%matplotlib inline

plt.imshow(X[0].reshape(28, 28), cmap='gray')
print("この画像データのラベルは{:.0f}です".format(y[0]))

実行結果1

正規化したMNISTデータをPyTorchで扱えるようにDataLoaderという変数に変換します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 2. DataLoderの作成
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

# 2.1 データを訓練とテストに分割(6:1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=1/7, random_state=0)

# 2.2 データをPyTorchのTensorに変換
X_train = torch.Tensor(X_train)
X_test = torch.Tensor(X_test)
y_train = torch.LongTensor(y_train)
y_test = torch.LongTensor(y_test)

# 2.3 データとラベルをセットにしたDatasetを作成
ds_train = TensorDataset(X_train, y_train)
ds_test = TensorDataset(X_test, y_test)

# 2.4 データセットのミニバッチサイズを指定した、Dataloaderを作成
# Chainerのiterators.SerialIteratorと似ている
loader_train = DataLoader(ds_train, batch_size=64, shuffle=True)
loader_test = DataLoader(ds_test, batch_size=64, shuffle=False)

ニューラルネットワークを構築します。
‘fc’は全結合(Fully Connecteed)層を意味し、’relu’は活性化関数にReLU関数を使用することを意味します。

1
2
3
4
5
6
7
8
9
10
11
# 3. ネットワークの構築
from torch import nn

model = nn.Sequential()
model.add_module('fc1', nn.Linear(28*28*1, 100))
model.add_module('relu1', nn.ReLU())
model.add_module('fc2', nn.Linear(100, 100))
model.add_module('relu2', nn.ReLU())
model.add_module('fc3', nn.Linear(100, 10))

print(model)

実行結果2

ネットワークの誤差関数と最適化手法の設定を行います。

分類問題では誤差関数にクロスエントロピー誤差関数を使用します。
最適化手法にはAdamというアルゴリズムを使います。

1
2
3
4
5
6
7
8
# 4. 誤差関数と最適化手法の設定
from torch import optim

# 誤差関数の設定
loss_fn = nn.CrossEntropyLoss() # 変数名にはcriterionが使われることも多い

# 重みを学習する際の最適化手法の選択
optimizer = optim.Adam(model.parameters(), lr=0.01)

学習と推論での動作を設定します。

学習では訓練データを入力して出力を求め、出力と正解との誤差を計算し、誤差をバックプロパゲーションして結合パラメータを更新・学習させます。
引数のepochとはデータを一通り使用する1試行のことを意味します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 5. 学習と推論の設定
# 5-1. 学習1回でやることを定義します
def train(epoch):
model.train() # ネットワークを学習モードに切り替える

# データローダーから1ミニバッチずつ取り出して計算する
for data, targets in loader_train:

optimizer.zero_grad() # 一度計算された勾配結果を0にリセット
outputs = model(data) # 入力dataをinputし、出力を求める
loss = loss_fn(outputs, targets) # 出力と訓練データの正解との誤差を求める
loss.backward() # 誤差のバックプロパゲーションを求める
optimizer.step() # バックプロパゲーションの値で重みを更新する

print("epoch{}:終了\n".format(epoch))

推論ではテストデータを入力して出力を求め、正解と一致した割合を計算します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 5. 学習と推論の設定
# 5-2. 推論1回でやることを定義します
def test():
model.eval() # ネットワークを推論モードに切り替える
correct = 0

# データローダーから1ミニバッチずつ取り出して計算する
with torch.no_grad(): # 微分は推論では必要ない
for data, targets in loader_test:

outputs = model(data) # 入力dataをinputし、出力を求める

# 推論する
_, predicted = torch.max(outputs.data, 1) # 確率が最大のラベルを求める
correct += predicted.eq(targets.data.view_as(predicted)).sum() # 正解と一緒だったらカウントアップ

# 正解率を出力
data_num = len(loader_test.dataset) # データの総数
print('\nテストデータの正解率: {}/{} ({:.0f}%)\n'.format(correct, data_num, 100. * correct / data_num))

試しに学習をせずにテストデータで推論してみます。

1
2
# 学習なしにテストデータで推論
test()

実行結果3
正解率は8%となりました。

次にニューラルネットワークの結合パラメータを学習させてから推論を行います。
6万件の訓練データに対して3epoch学習させます。

1
2
3
4
5
# 6. 学習と推論の実行
for epoch in range(3):
train(epoch)

test()

実行結果4
学習後には正解率が95%となり、手書き数字をほぼ正しく認識できるようになりました

試しに2020番目の画像データ推論し、予測結果と画像データ、正解を表示してみます。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 2020番目の画像データを推論してみる
index = 2020

model.eval() # ネットワークを推論モードに切り替える
data = X_test[index]
output = model(data) # 入力dataをinputし、出力を求める
_, predicted = torch.max(output.data, 0) # 確率が最大のラベルを求める

print("予測結果は{}".format(predicted))

X_test_show = (X_test[index]).numpy()
plt.imshow(X_test_show.reshape(28, 28), cmap='gray')
print("この画像データの正解ラベルは{:.0f}です".format(y_test[index]))

実行結果5

なかなか癖のある数字ですが、正しく判定できていることが分かります。

(Google Colaboratoryで動作確認しています。)


参考 > つくりながら学ぶ!深層強化学習 サポートページ

CartPoleをQ学習で制御

CartPoleをQ学習で制御していきます。

実装するクラスは下記の3つです。

  • Agentクラス
    カートを表します。
    Q関数を更新する関数と次の行動を決定する関数があります。
    Brainクラスのオブジェクトをメンバーに持ちます。
  • Brainクラス
    Agentの頭脳となるクラスです。
    Q学習を実装します。
    状態を離散化する関数とQテーブルを更新する関数とQテーブルから行動を決定する関数があります。
  • Environmentクラス
    OpenAI Gymの実行環境です。

まず必要なパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画を描画するための関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
interval=50)

anim.save('3_4movie_cartpole.mp4') # 動画ファイルを作成します。

各定数を定義します。
CartPole-v0は200ステップ 棒が立ち続ければゲーム攻略となります。(MAX_STEPS=200)

1
2
3
4
5
6
7
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
NUM_DIZITIZED = 6 # 各状態の離散値への分割数
GAMMA = 0.99 # 時間割引率
ETA = 0.5 # 学習係数
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 1000 # 最大試行回数

Agentクラスを実装します。

初期関数initでCartPoleの状態数と行動数を受け取り、自分の頭脳となるBrainクラスを生成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Agent:
'''CartPoleのエージェントクラスです、棒付き台車そのものになります'''
def __init__(self, num_states, num_actions):
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_Q_function(self, observation, action, reward, observation_next):
'''Q関数の更新'''
self.brain.update_Q_table(
observation, action, reward, observation_next)

def get_action(self, observation, step):
'''行動の決定'''
action = self.brain.decide_action(observation, step)
return action

Brainクラスを実装します。
Qテーブルの更新と行動の決定を行います。

試行回数が少ないときは探索行動が多くなるようなε-greedy法としています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Brain:
'''エージェントが持つ脳となるクラスです、Q学習を実行します'''
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# Qテーブルを作成。行数は状態を分割数^(4変数)にデジタル変換した値、列数は行動数を示す
self.q_table = np.random.uniform(low=0, high=1, size=(
NUM_DIZITIZED**num_states, num_actions))

def bins(self, clip_min, clip_max, num):
'''観測した状態(連続値)を離散値にデジタル変換する閾値を求める'''
return np.linspace(clip_min, clip_max, num + 1)[1:-1]

def digitize_state(self, observation):
'''観測したobservation状態を、離散値に変換する'''
cart_pos, cart_v, pole_angle, pole_v = observation
digitized = [
np.digitize(cart_pos, bins=self.bins(-2.4, 2.4, NUM_DIZITIZED)),
np.digitize(cart_v, bins=self.bins(-3.0, 3.0, NUM_DIZITIZED)),
np.digitize(pole_angle, bins=self.bins(-0.5, 0.5, NUM_DIZITIZED)),
np.digitize(pole_v, bins=self.bins(-2.0, 2.0, NUM_DIZITIZED))
]
return sum([x * (NUM_DIZITIZED**i) for i, x in enumerate(digitized)])

def update_Q_table(self, observation, action, reward, observation_next):
'''QテーブルをQ学習により更新'''
state = self.digitize_state(observation) # 状態を離散化
state_next = self.digitize_state(observation_next) # 次の状態を離散化
Max_Q_next = max(self.q_table[state_next][:])
self.q_table[state, action] = self.q_table[state, action] + \
ETA * (reward + GAMMA * Max_Q_next - self.q_table[state, action])

def decide_action(self, observation, episode):
'''ε-greedy法で徐々に最適行動のみを採用する'''
state = self.digitize_state(observation)
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
action = np.argmax(self.q_table[state][:])
else:
action = np.random.choice(self.num_actions) # 0,1の行動をランダムに返す
return action

Environmentクラスを定義します。
10回連続で195ステップ以上棒が立ち続ければ学習成功としています。

成功後に動画保存のため1回再実行しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Environment:
'''CartPoleを実行する環境のクラスです'''
def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態の数4を取得
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
self.agent = Agent(num_states, num_actions) # 環境内で行動するAgentを生成

def run(self):
'''実行'''
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
is_episode_final = False # 最終試行フラグ
frames = [] # 動画用に画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

for step in range(MAX_STEPS): # 1エピソードのループ

if is_episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

# 行動を求める
action = self.agent.get_action(observation, episode)

# 行動a_tの実行により、s_{t+1}, r_{t+1}を求める
observation_next, _, done, _ = self.env.step(
action) # rewardとinfoは使わないので_にする

# 報酬を与える
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
if step < 195:
reward = -1 # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 195step以上連続で立ち続けた試行数をリセット
else:
reward = 1 # 立ったまま終了時は報酬1を与える
complete_episodes += 1 # 連続記録を更新
else:
reward = 0 # 途中の報酬は0

# step+1の状態observation_nextを用いて,Q関数を更新する
self.agent.update_Q_function(
observation, action, reward, observation_next)

# 観測の更新
observation = observation_next

# 終了時の処理
if done:
print('{0} Episode: Finished after {1} time steps'.format(
episode, step + 1))
break

if is_episode_final is True: # 最終試行では動画を保存と描画
display_frames_as_gif(frames)
break

if complete_episodes >= 10: # 10連続成功なら
print('10回連続成功')
is_episode_final = True # 次の試行を描画を行う最終試行とする

Environmentクラスのオブジェクトを生成し、関数runを実行します。

1
2
3
# main
cartpole_env = Environment()
cartpole_env.run()

実行結果 エピソード0~18

実行結果 エピソード261~276(終了)
275エピソードで学習が完了しています。(195ステップ以上を10エピソード連続で成功)

強化学習後のプレイ動画は’3_4movie_cartpole.mp4’ファイルで保存されています。

棒が倒れずにうまくバランスがとれていることが分かります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

多変数・連続値で示される情報の表形式表現「CartPole」

CartPoleの状態を1つの変数に落とし込みます。

まずCartPoleの状態は次の4つの情報で表現されます。

変数 説明
カート位置 -2.4~2.4
カート速度 -Inf~Inf
棒の角度 -41.8°~41.8°
棒の角速度 -Inf~Inf

これら4つの情報は連続値となります。
連続値のままですとQ関数を用いて表形式で表現できません。

表形式で表現するために、連続値を離散化してデジタル化します。

離散化とは、連続した値を不連続な値に分割することです
例えばカート位置を6つの値(0~5)で離散化する場合には、-2.4~-1.6の場合は0、-1.6~-0.8の場合は1・・・というように変換します。
また-2.4を超えてマイナスになる可能性もあるので-Inf~-1.6を0とし、1.6~-Infを5とします。
変換後の0~5の値は離散変数と呼ばれます。

ほかの3つの情報も6つの値で離散化しますと変数が4種類ありますので6の4乗、つまり1296種類のデジタル値で表現できることになります。

CartPoleでとれる行動はカートを右に押すか、カートを左に押すの2通りですのでCartPoleのQ関数は1296行×2列の表形式で表現できます。
Q関数は各状態で各行動をとったときにその後得られる予定の割引報酬和を表します。

では状態の離散化を実装していきます。
ますは使用するパッケージをインポートします。

1
2
3
4
5
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

定数を定義します。

1
2
3
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
NUM_DIZITIZED = 6 # 各状態の離散値への分割数

CartPoleを実行し、環境の初期状態を取得します。

1
2
3
# CartPoleを実行してみる
env = gym.make(ENV) # 実行する課題を設定
observation = env.reset() # 環境の初期化

離散化の閾値を計算する関数を定義します。

1
2
3
4
# 離散化の閾値を求める
def bins(clip_min, clip_max, num):
'''観測した状態(連続値)を離散値にデジタル変換する閾値を求める'''
return np.linspace(clip_min, clip_max, num + 1)[1:-1]

np.linspace()関数は等間隔の数列を生成する命令です。
動作確認のための下記のコードを実行します。

1
np.linspace(-2.4, 2.4, 6 + 1)

実行結果1

閾値としましては上記で得られる配列の最初と最後の要素は不要なのでスライスします。

1
np.linspace(-2.4, 2.4, 6 + 1)[1:-1]

実行結果2

bins()関数で求めた閾値に応じて、連続変数を離散化する関数を作成します。4変数を一度に変換します。
np.digitizeは状態変数のリストをbinsに応じてデジタル値に変換します。

最後に4つの離散変数を6進数で計算しreturnしています。

1
2
3
4
5
6
7
8
9
10
def digitize_state(observation):
'''観測したobservation状態を、離散値に変換する'''
cart_pos, cart_v, pole_angle, pole_v = observation
digitized = [
np.digitize(cart_pos, bins=bins(-2.4, 2.4, NUM_DIZITIZED)),
np.digitize(cart_v, bins=bins(-3.0, 3.0, NUM_DIZITIZED)),
np.digitize(pole_angle, bins=bins(-0.5, 0.5, NUM_DIZITIZED)),
np.digitize(pole_v, bins=bins(-2.0, 2.0, NUM_DIZITIZED))]
print('digitized', digitized)
return sum([x * (NUM_DIZITIZED**i) for i, x in enumerate(digitized)])

動作確認のためdigitize_state()関数をコールして結果を確認します。

1
print('digitize_state', digitize_state(observation))

実行結果3

CartPoleの状態を離散化し表形式表現できましたので、次回はQ学習を実装しCartPoleを制御していきます。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

倒立振子課題「CartPole」

今回は強化学習アルゴリズムを実装したり、性能を比較するための実行環境であるOpenAI Gymを使います。

OpenAI Gymでは課題となる実行環境がいくつか用意されていますが、定番の倒立振子課題 CartPole を試します。
倒立振子課題は台車の上に回転軸を固定した棒を立て、その棒が倒れないように台車を右・左と細かく動かす制御課題です。

ますは必要なライブラリをインストールします。(Pythonバージョンは3.6です)

1
2
3
4
5
6
pip install gym
pip install matplotlib
pip install JSAnimation
pip uninstall pyglet -y
pip install pyglet==1.2.4
conda install -c conda-forge ffmpeg

Jupyter Notebookを開きコーディングを始めます。

使用するパッケージは次の通りです。

1
2
3
4
5
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画を描画するための関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('3_2movie_cartpole.mp4') # 動画を保存

CartPoleを実行するところを実装します。
今回はきちんとした制御をせず、ただランダムに左右動かします。

gym.make()はOpenAIの実行環境を立ち上げる関数です。
環境を実行するとき一番最初にenv.reset()をコールしておきます。
env.step()はゲーム環境を1step進める関数で、引数に0を渡すと左に、1を渡すと右にカートを移動します。

返値は次の4つを返します。

変数 説明
observation カートと棒の状態
reward 即時報酬
カートの位置が±2.4以内 かつ 棒が20.9度以上傾いてない場合 -> 1
カートの位置が±2.4を超える または 棒が20.9度以上傾いている場合 -> 0
done 終了状態になるとTrue
info デバッグなどの必要情報
1
2
3
4
5
6
7
8
9
# CartPoleをランダムに動かす
frames = []
env = gym.make('CartPole-v0')
observation = env.reset() # 最初に環境のresetが必要

for step in range(0, 200):
frames.append(env.render(mode='rgb_array')) # framesに各時刻の画像を追加していく
action = np.random.choice(2) # 0(カートを左に押す), 1(カートを右に押す)をランダムに返す
observation, reward, done, info = env.step(action) # actionを実行する

変数framesに格納された画像を「3_2movie_cartpole.mp4」に保存します。

1
2
# 動画を保存
display_frames_as_gif(frames)

保存した動画は下記のようになります。

ランダムなので棒はすぐに倒れてしまいます。次回は棒が倒れないように強化学習していきます。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

迷路をQ学習で解く

価値反復法の1つQ学習で迷路を攻略します。
Q学習では1エピソードごとではなく1ステップごとに更新を行います。

まず使用するパッケージをインポートします。

1
2
3
4
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

次に迷路の初期状態を描画します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 初期位置での迷路の様子

# 図を描く大きさと、図の変数名を宣言
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# 赤い壁を描く
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# 状態を示す文字S0~S8を描く
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# 描画範囲の設定と目盛りを消す設定
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom='off', top='off',
labelbottom='off', right='off', left='off', labelleft='off')

# 現在地S0に緑丸を描画する
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

実行結果1

初期の方策を決定するパラメータtheta_0を設定します。

行は状態0~7を表し、列は上、右、下、左へ行動できるかどうかを表します。
状態8はゴールなので方策の定義は不要です。

1
2
3
4
5
6
7
8
9
10
11
# 初期の方策を決定するパラメータtheta_0を設定
# 行は状態0~7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan], # s0
[np.nan, 1, np.nan, 1], # s1
[np.nan, np.nan, 1, 1], # s2
[1, 1, 1, np.nan], # s3
[np.nan, np.nan, 1, 1], # s4
[1, np.nan, np.nan, np.nan], # s5
[1, np.nan, np.nan, np.nan], # s6
[1, 1, np.nan, np.nan], # s7、※s8はゴールなので、方策はなし
])

方策パラメータθ0をランダム方策piに変換する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 方策パラメータtheta_0をランダム方策piに変換する関数の定義
def simple_convert_into_pi_from_theta(theta):
'''単純に割合を計算する'''

[m, n] = theta.shape # thetaの行列サイズを取得
pi = np.zeros((m, n))
for i in range(0, m):
pi[i, :] = theta[i, :] / np.nansum(theta[i, :]) # 割合の計算

pi = np.nan_to_num(pi) # nanを0に変換

return pi

# ランダム行動方策pi_0を求める
pi_0 = simple_convert_into_pi_from_theta(theta_0)

初期の行動価値関数Qを設定します。

行動価値関数は、行が状態sを表し、列が行動aを表します。
最初は正しい行動価値がわからないのでランダムな値を設定します。

1
2
3
4
5
# 初期の行動価値関数Qを設定

[a, b] = theta_0.shape # 行と列の数をa, bに格納
Q = np.random.rand(a, b) * theta_0 * 0.1
# *theta0をすることで要素ごとに掛け算をし、Qの壁方向の値がnanになる

ε-greedy法を実装します。
一定確率εでランダムに行動し、残りの1-εの確率で行動価値Qが最大になる行動をとります。

get_action()が行動を求める関数で、get_s_next()が行動を引数に次の状態を求める関数になります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# ε-greedy法を実装
def get_action(s, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]

# 行動を決める
if np.random.rand() < epsilon:
# εの確率でランダムに動く
next_direction = np.random.choice(direction, p=pi_0[s, :])
else:
# Qの最大値の行動を採用する
next_direction = direction[np.nanargmax(Q[s, :])]

# 行動をindexに
if next_direction == "up":
action = 0
elif next_direction == "right":
action = 1
elif next_direction == "down":
action = 2
elif next_direction == "left":
action = 3

return action

def get_s_next(s, a, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]
next_direction = direction[a] # 行動aの方向

# 行動から次の状態を決める
if next_direction == "up":
s_next = s - 3 # 上に移動するときは状態の数字が3小さくなる
elif next_direction == "right":
s_next = s + 1 # 右に移動するときは状態の数字が1大きくなる
elif next_direction == "down":
s_next = s + 3 # 下に移動するときは状態の数字が3大きくなる
elif next_direction == "left":
s_next = s - 1 # 左に移動するときは状態の数字が1小さくなる

return s_next

行動価値関数Q(s,a)が正しい値になるように学習して更新する処理を実装します。
gammaは時間割引率で未来の報酬を割り引いています。

SarsaとQ学習の違いはこのコードだけになります。
Sarsaは更新時に次の行動を求めて更新に使用していましたが、Q学習では状態の行動価値関数の値のうち、最も大きいものを更新に使用します。

1
2
3
4
5
6
7
8
9
10
# Q学習による行動価値関数Qの更新
def Q_learning(s, a, r, s_next, Q, eta, gamma):

if s_next == 8: # ゴールした場合
Q[s, a] = Q[s, a] + eta * (r - Q[s, a])

else:
Q[s, a] = Q[s, a] + eta * (r + gamma * np.nanmax(Q[s_next,: ]) - Q[s, a])

return Q

Q学習に従って迷路を解く処理を実装します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Q学習で迷路を解く関数の定義、状態と行動の履歴および更新したQを出力
def goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi):
s = 0 # スタート地点
a = a_next = get_action(s, Q, epsilon, pi) # 初期の行動
s_a_history = [[0, np.nan]] # エージェントの移動を記録するリスト

while (1): # ゴールするまでループ
a = a_next # 行動更新

s_a_history[-1][1] = a
# 現在の状態(つまり一番最後なのでindex=-1)に行動を代入

s_next = get_s_next(s, a, Q, epsilon, pi)
# 次の状態を格納

s_a_history.append([s_next, np.nan])
# 次の状態を代入。行動はまだ分からないのでnanにしておく

# 報酬を与え, 次の行動を求めます
if s_next == 8:
r = 1 # ゴールにたどり着いたなら報酬を与える
a_next = np.nan
else:
r = 0
a_next = get_action(s_next, Q, epsilon, pi)
# 次の行動a_nextを求めます。

# 価値関数を更新
Q = Q_learning(s, a, r, s_next, Q, eta, gamma)

# 終了判定
if s_next == 8: # ゴール地点なら終了
break
else:
s = s_next

return [s_a_history, Q]

価値関数の更新を繰り返す処理を実装します。
今回の学習終了条件は、100エピソードを行うこととしました。

Q学習で迷路を解く部分は、各エピソードでの状態価値関数の値を変数Vに格納しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Q学習で迷路を解く
eta = 0.1 # 学習率
gamma = 0.9 # 時間割引率
epsilon = 0.5 # ε-greedy法の初期値
v = np.nanmax(Q, axis=1) # 状態ごとに価値の最大値を求める
is_continue = True
episode = 1

V = [] # エピソードごとの状態価値を格納する
V.append(np.nanmax(Q, axis=1)) # 状態ごとに行動価値の最大値を求める

while is_continue: # is_continueがFalseになるまで繰り返す
print("エピソード:" + str(episode))
# ε-greedyの値を少しずつ小さくする
epsilon = epsilon / 2

# Q学習で迷路を解き、移動した履歴と更新したQを求める
[s_a_history, Q] = goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0)

# 状態価値の変化
new_v = np.nanmax(Q, axis=1) # 状態ごとに行動価値の最大値を求める
print(np.sum(np.abs(new_v - v))) # 状態価値関数の変化を出力
v = new_v
V.append(v) # このエピソード終了時の状態価値関数を追加

print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

# 100エピソード繰り返す
episode = episode + 1
if episode > 100:
break

実行結果 エピソード:1~8

エピソード6以降はずっと最小の4ステップで安定しました。

実行結果 エピソード:92~100

エピソードごとに状態価値関数がどのように変化したのかを可視化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 状態価値の変化を可視化します
from matplotlib import animation
from IPython.display import HTML
import matplotlib.cm as cm # color map

def init():
# 背景画像の初期化
line.set_data([], [])
return (line,)

def animate(i):
# フレームごとの描画内容
# 各マスに状態価値の大きさに基づく色付きの四角を描画
line, = ax.plot([0.5], [2.5], marker="s",
color=cm.jet(V[i][0]), markersize=85) # S0
line, = ax.plot([1.5], [2.5], marker="s",
color=cm.jet(V[i][1]), markersize=85) # S1
line, = ax.plot([2.5], [2.5], marker="s",
color=cm.jet(V[i][2]), markersize=85) # S2
line, = ax.plot([0.5], [1.5], marker="s",
color=cm.jet(V[i][3]), markersize=85) # S3
line, = ax.plot([1.5], [1.5], marker="s",
color=cm.jet(V[i][4]), markersize=85) # S4
line, = ax.plot([2.5], [1.5], marker="s",
color=cm.jet(V[i][5]), markersize=85) # S5
line, = ax.plot([0.5], [0.5], marker="s",
color=cm.jet(V[i][6]), markersize=85) # S6
line, = ax.plot([1.5], [0.5], marker="s",
color=cm.jet(V[i][7]), markersize=85) # S7
line, = ax.plot([2.5], [0.5], marker="s",
color=cm.jet(1.0), markersize=85) # S8
return (line,)

# 初期化関数とフレームごとの描画関数を用いて動画を作成
anim = animation.FuncAnimation(
fig, animate, init_func=init, frames=len(V), interval=200, repeat=False)

HTML(anim.to_jshtml())

最初は青だったマス目が次第に赤く変化していく様子が分かります。
完全に赤色にならないのは時間割引率で状態価値が割り引かれているためです。

ポイントは報酬が得られるゴールから逆向きに状態価値が学習されていくことと、学習後にはスタートからゴールへの道筋ができているということです。

(Google Colaboratoryで動作確認しています。)

参考

つくりながら学ぶ!深層強化学習 サポートページ

迷路をSarsaで解く

価値反復法の1つSarsaで迷路を攻略します。
Sarsaでは1エピソードごとではなく1ステップごとに更新を行います。

まず使用するパッケージをインポートします。

1
2
3
4
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

次に迷路の初期状態を描画します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 初期位置での迷路の様子

# 図を描く大きさと、図の変数名を宣言
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# 赤い壁を描く
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# 状態を示す文字S0~S8を描く
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# 描画範囲の設定と目盛りを消す設定
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom='off', top='off',
labelbottom='off', right='off', left='off', labelleft='off')

# 現在地S0に緑丸を描画する
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

実行結果1

初期の方策を決定するパラメータtheta_0を設定します。

行は状態0~7を表し、列は上、右、下、左へ行動できるかどうかを表します。
状態8はゴールなので方策の定義は不要です。

1
2
3
4
5
6
7
8
9
10
11
# 初期の方策を決定するパラメータtheta_0を設定
# 行は状態0~7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan], # s0
[np.nan, 1, np.nan, 1], # s1
[np.nan, np.nan, 1, 1], # s2
[1, 1, 1, np.nan], # s3
[np.nan, np.nan, 1, 1], # s4
[1, np.nan, np.nan, np.nan], # s5
[1, np.nan, np.nan, np.nan], # s6
[1, 1, np.nan, np.nan], # s7、※s8はゴールなので、方策はなし
])

方策パラメータθ0をランダム方策piに変換する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 方策パラメータtheta_0をランダム方策piに変換する関数の定義
def simple_convert_into_pi_from_theta(theta):
'''単純に割合を計算する'''
[m, n] = theta.shape # thetaの行列サイズを取得
pi = np.zeros((m, n))
for i in range(0, m):
pi[i, :] = theta[i, :] / np.nansum(theta[i, :]) # 割合の計算

pi = np.nan_to_num(pi) # nanを0に変換

return pi

# ランダム行動方策pi_0を求める
pi_0 = simple_convert_into_pi_from_theta(theta_0)

初期の行動価値関数Qを設定します。

行動価値関数は、行が状態sを表し、列が行動aを表します。
最初は正しい行動価値がわからないのでランダムな値を設定します。

1
2
3
4
# 初期の行動価値関数Qを設定
[a, b] = theta_0.shape # 行と列の数をa, bに格納
Q = np.random.rand(a, b) * theta_0
# * theta0をすることで要素ごとに掛け算をし、Qの壁方向の値がnanになる

ε-greedy法を実装します。
一定確率εでランダムに行動し、残りの1-εの確率で行動価値Qが最大になる行動をとります。

get_action()が行動を求める関数で、get_s_next()が行動を引数に次の状態を求める関数になります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# ε-greedy法を実装
def get_action(s, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]

# 行動を決める
if np.random.rand() < epsilon:
# εの確率でランダムに動く
next_direction = np.random.choice(direction, p=pi_0[s, :])
else:
# Qの最大値の行動を採用する
next_direction = direction[np.nanargmax(Q[s, :])]

# 行動をindexに
if next_direction == "up":
action = 0
elif next_direction == "right":
action = 1
elif next_direction == "down":
action = 2
elif next_direction == "left":
action = 3

return action

def get_s_next(s, a, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]
next_direction = direction[a] # 行動aの方向

# 行動から次の状態を決める
if next_direction == "up":
s_next = s - 3 # 上に移動するときは状態の数字が3小さくなる
elif next_direction == "right":
s_next = s + 1 # 右に移動するときは状態の数字が1大きくなる
elif next_direction == "down":
s_next = s + 3 # 下に移動するときは状態の数字が3大きくなる
elif next_direction == "left":
s_next = s - 1 # 左に移動するときは状態の数字が1小さくなる

return s_next

行動価値関数Q(s,a)が正しい値になるように学習して更新する処理を実装します。
gammaは時間割引率で未来の報酬を割り引いています。

1
2
3
4
5
6
7
8
# Sarsaによる行動価値関数Qの更新
def Sarsa(s, a, r, s_next, a_next, Q, eta, gamma):
if s_next == 8: # ゴールした場合
Q[s, a] = Q[s, a] + eta * (r - Q[s, a])
else:
Q[s, a] = Q[s, a] + eta * (r + gamma * Q[s_next, a_next] - Q[s, a])

return Q

Sarsaに従って迷路を解く処理を実装します。
1エピソードごとではなく1ステップごとに更新を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Sarsaで迷路を解く関数の定義、状態と行動の履歴および更新したQを出力
def goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi):
s = 0 # スタート地点
a = a_next = get_action(s, Q, epsilon, pi) # 初期の行動
s_a_history = [[0, np.nan]] # エージェントの移動を記録するリスト

while (1): # ゴールするまでループ
a = a_next # 行動更新

s_a_history[-1][1] = a
# 現在の状態(つまり一番最後なのでindex=-1)に行動を代入

s_next = get_s_next(s, a, Q, epsilon, pi)
# 次の状態を格納

s_a_history.append([s_next, np.nan])
# 次の状態を代入。行動はまだ分からないのでnanにしておく

# 報酬を与え, 次の行動を求めます
if s_next == 8:
r = 1 # ゴールにたどり着いたなら報酬を与える
a_next = np.nan
else:
r = 0
a_next = get_action(s_next, Q, epsilon, pi)
# 次の行動a_nextを求めます。

# 価値関数を更新
Q = Sarsa(s, a, r, s_next, a_next, Q, eta, gamma)

# 終了判定
if s_next == 8: # ゴール地点なら終了
break
else:
s = s_next

return [s_a_history, Q]

価値関数の更新を繰り返す処理を実装します。
今回の学習終了条件は、100エピソードを行うこととしました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Sarsaで迷路を解く
eta = 0.1 # 学習率
gamma = 0.9 # 時間割引率
epsilon = 0.5 # ε-greedy法の初期値
v = np.nanmax(Q, axis=1) # 状態ごとに価値の最大値を求める
is_continue = True
episode = 1

while is_continue: # is_continueがFalseになるまで繰り返す
print("エピソード:" + str(episode))

# ε-greedyの値を少しずつ小さくする
epsilon = epsilon / 2

# Sarsaで迷路を解き、移動した履歴と更新したQを求める
[s_a_history, Q] = goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0)

# 状態価値の変化
new_v = np.nanmax(Q, axis=1) # 状態ごとに価値の最大値を求める
print(np.sum(np.abs(new_v - v))) # 状態価値の変化を出力
v = new_v

print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

# 100エピソード繰り返す
episode = episode + 1
if episode > 100:
break

実行結果 エピソード:1~8
エピソード2ですでに最小ステップ4となっています。エピソード3では6ステップと増えましたがその後は、ずっと4ステップのままです。

実行結果 エピソード:92~100

エピソード100を実行するまでもなくずっと最小の4ステップで安定しました。
今回の迷路と解く問題では方策反復法よりも価値反復法の方が早く学習できるという結果になりました。

(Google Colaboratoryで動作確認しています。)

参考

つくりながら学ぶ!深層強化学習 サポートページ

迷路を方策勾配法で解く

前回はランダムに行動する方策を実装しましたが、今回は方策勾配法に従ってエージェントを動かしてみます。

まず使用するパッケージをインポートします。

1
2
3
4
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

次に迷路の初期状態を描画します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 初期位置での迷路の様子
# 図を描く大きさと、図の変数名を宣言
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# 赤い壁を描く
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# 状態を示す文字S0~S8を描く
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# 描画範囲の設定と目盛りを消す設定
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom='off', top='off',
labelbottom='off', right='off', left='off', labelleft='off')

# 現在地S0に緑丸を描画する
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

実行結果1

初期の方策を決定するパラメータtheta_0を設定します。

行は状態0~7を表し、列は上、右、下、左へ行動できるかどうかを表します。
状態8はゴールなので方策の定義は不要です。

1
2
3
4
5
6
7
8
9
10
11
# 初期の方策を決定するパラメータtheta_0を設定
# 行は状態0~7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan], # s0
[np.nan, 1, np.nan, 1], # s1
[np.nan, np.nan, 1, 1], # s2
[1, 1, 1, np.nan], # s3
[np.nan, np.nan, 1, 1], # s4
[1, np.nan, np.nan, np.nan], # s5
[1, np.nan, np.nan, np.nan], # s6
[1, 1, np.nan, np.nan], # s7、※s8はゴールなので、方策はなし
])

方策パラメータ(theta)をsoftmax関数で行動方策(pi)に変換する関数を定義します。
betaは逆温度と呼ばれ、小さいほど行動がランダムになります。
さらに指数関数(np.exp)を使って割合を計算しています。

softmax関数を使用するとパラメータθが負の値になっても方策を算出できるというメリットがあります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 方策パラメータthetaを行動方策piにソフトマックス関数で変換する手法の定義
def softmax_convert_into_pi_from_theta(theta):
'''ソフトマックス関数で割合を計算する'''
beta = 1.0
[m, n] = theta.shape # thetaの行列サイズを取得
pi = np.zeros((m, n))

exp_theta = np.exp(beta * theta) # thetaをexp(theta)へと変換

for i in range(0, m):
# pi[i, :] = theta[i, :] / np.nansum(theta[i, :])
# simpleに割合の計算の場合
pi[i, :] = exp_theta[i, :] / np.nansum(exp_theta[i, :])
# softmaxで計算の場合

pi = np.nan_to_num(pi) # nanを0に変換

return pi

上記で定義したsoftmax_convert_into_pi_from_theta関数を使ってθ0から方策を算出します。

1
2
3
# 初期の方策pi_0を求める
pi_0 = softmax_convert_into_pi_from_theta(theta_0)
print(pi_0)

実行結果2
学習前のため前回(ランダム行動)の方策と同じ結果になりますが問題ありません。

続いてsoftmax関数による方策に従ってエージェントを行動させる関数を実装します。
1ステップ移動後のエージェントの状態とその時の行動を返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 行動aと1step移動後の状態sを求める関数を定義
def get_action_and_next_s(pi, s):
direction = ["up", "right", "down", "left"]
# pi[s,:]の確率に従って、directionが選択される
next_direction = np.random.choice(direction, p=pi[s, :])

if next_direction == "up":
action = 0
s_next = s - 3 # 上に移動するときは状態の数字が3小さくなる
elif next_direction == "right":
action = 1
s_next = s + 1 # 右に移動するときは状態の数字が1大きくなる
elif next_direction == "down":
action = 2
s_next = s + 3 # 下に移動するときは状態の数字が3大きくなる
elif next_direction == "left":
action = 3
s_next = s - 1 # 左に移動するときは状態の数字が1小さくなる

return [action, s_next]

ゴールにたどり着くまでエージェントを移動させ続ける関数を定義します。
状態と行動の履歴をセットで返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 迷路を解く関数の定義、状態と行動の履歴を出力
def goal_maze_ret_s_a(pi):
s = 0 # スタート地点
s_a_history = [[0, np.nan]] # エージェントの移動を記録するリスト

while (1): # ゴールするまでループ
[action, next_s] = get_action_and_next_s(pi, s)
s_a_history[-1][1] = action
# 現在の状態(つまり一番最後なのでindex=-1)の行動を代入

s_a_history.append([next_s, np.nan])
# 次の状態を代入。行動はまだ分からないのでnanにしておく

if next_s == 8: # ゴール地点なら終了
break
else:
s = next_s

return s_a_history

初期の方策で迷路を解いてみます。

1
2
3
4
# 初期の方策で迷路を解く
s_a_history = goal_maze_ret_s_a(pi_0)
print(s_a_history)
print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

実行結果3(途中略)
スタートからゴールまでの状態と行動のセットが表示されます。
最後にトータルで何ステップかかったかを表示しています。

方策勾配法に従い方策を更新する関数を定義します。

学習率etaは1回の学習で更新される大きさを表します。
学習率が小さすぎるとなかなか学習が進みませんし、大きすぎるときちんと学習することができません。

方策の更新のために下記の3つを入力しています。

  • 現在の方策 theta
  • 方策 pi
  • 現在の方策での実行結果 s_a_history
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# thetaの更新関数を定義します
def update_theta(theta, pi, s_a_history):
eta = 0.1 # 学習率
T = len(s_a_history) - 1 # ゴールまでの総ステップ数

[m, n] = theta.shape # thetaの行列サイズを取得
delta_theta = theta.copy() # Δthetaの元を作成、ポインタ参照なので、delta_theta = thetaはダメ

# delta_thetaを要素ごとに求めます
for i in range(0, m):
for j in range(0, n):
if not(np.isnan(theta[i, j])): # thetaがnanでない場合

SA_i = [SA for SA in s_a_history if SA[0] == i]
# 履歴から状態iのものを取り出すリスト内包表記です

SA_ij = [SA for SA in s_a_history if SA == [i, j]]
# 状態iで行動jをしたものを取り出す

N_i = len(SA_i) # 状態iで行動した総回数
N_ij = len(SA_ij) # 状態iで行動jをとった回数

# 初版では符号の正負に間違いがありました(修正日:180703)
#delta_theta[i, j] = (N_ij + pi[i, j] * N_i) / T
delta_theta[i, j] = (N_ij - pi[i, j] * N_i) / T

new_theta = theta + eta * delta_theta

return new_theta

方策を更新し、方策がどう変化するのかを確認します。

1
2
3
4
# 方策の更新
new_theta = update_theta(theta_0, pi_0, s_a_history)
pi = softmax_convert_into_pi_from_theta(new_theta)
print(pi)

実行結果4
最初の方策から少し変化していることが分かります。

迷路をノーミスでクリアできるまで迷路内の探索とパラメータθの更新を繰り返す処理を実装します。

学習終了条件は課題に応じて調整する必要がありますが、今回は方策変化の絶対値和が10**-4より小さくなったら終了とします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 方策勾配法で迷路を解く
stop_epsilon = 10**-4 # 10^-4よりも方策に変化が少なくなったら学習終了とする

theta = theta_0
pi = pi_0

is_continue = True
count = 1
while is_continue: # is_continueがFalseになるまで繰り返す
s_a_history = goal_maze_ret_s_a(pi) # 方策πで迷路内を探索した履歴を求める
new_theta = update_theta(theta, pi, s_a_history) # パラメータΘを更新
new_pi = softmax_convert_into_pi_from_theta(new_theta) # 方策πの更新

print(np.sum(np.abs(new_pi - pi))) # 方策の変化を出力
print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

if np.sum(np.abs(new_pi - pi)) < stop_epsilon:
is_continue = False
else:
theta = new_theta
pi = new_pi

実行結果5(途中略)
最終的にゴールまで最小ステップ数の4となっていることが分かります。

最終的な方策を確認してみます。

1
2
3
# 最終的な方策を確認
np.set_printoptions(precision=3, suppress=True) # 有効桁数3、指数表示しないという設定
print(pi)

実行結果6

最後に動画でエージェントの移動を可視化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# エージェントの移動の様子を可視化します
# 参考URL http://louistiao.me/posts/notebooks/embedding-matplotlib-animations-in-jupyter-notebooks/
from matplotlib import animation
from IPython.display import HTML

def init():
# 背景画像の初期化
line.set_data([], [])
return (line,)

def animate(i):
# フレームごとの描画内容
state = s_a_history[i][0] # 現在の場所を描く
x = (state % 3) + 0.5 # 状態のx座標は、3で割った余り+0.5
y = 2.5 - int(state / 3) # y座標は3で割った商を2.5から引く
line.set_data(x, y)
return (line,)

# 初期化関数とフレームごとの描画関数を用いて動画を作成
anim = animation.FuncAnimation(fig, animate, init_func=init, frames=len(
s_a_history), interval=500, repeat=False)

HTML(anim.to_jshtml())

スタート地点からまっすぐゴールにたどり着いていることが分かります。

(Google Colaboratoryで動作確認しています。)

参考

つくりながら学ぶ!深層強化学習 サポートページ