強化学習の弱点

強化学習にはいくつか弱点があります。

弱点① サンプル効率が悪い。

DNNでの強化学習は、入力が画像であればいろいろな課題に対応できるという大きなメリットがあります。どのようなゲームであっても行動の数を調整するだけで同じネットワークで解くことができます。

ただし学習には大量のサンプルを用意する必要があり時間がかかります。

対策の1つとして何度もプレイ可能なシミュレーターを用意するという手がありますが、シミュレータの用意も相応の時間がかかるため深層強化学習の適用は難しいのが現状です。

OpenAI Gymのようなすでに用意されているシミュレータを使うのが近道となります。

弱点② 局所的な行動に陥る・過学習することが多い

大量のサンプルを学習したとしても、最適な行動を獲得できるとは限りません。

失敗する行動パターンは次の2種類です。

  • 局所最適な行動
    報酬は獲得できているものの最適とは言えない行動です。
    そこそこいい成績であればそれ以上頑張らないようなイメージです。
  • 過学習
    ある環境に特化した行動を獲得してしまうことです。
    試験で問題を理解するのではなく答えを覚えてしまうようなイメージです。

弱点③ 再現性が低い。

同じアルゴリズムを同じパラメータで学習したとしても獲得報酬が異なることが多々あります。
これはフレームワークのデフォルト値が影響するためです。

☆弱点を克服するための対策

対応策は次の3点が挙げられます。

  • テスト可能なモジュールに切り分ける。
    各モジュールごとにテストを行うことで全体テストの前に処理をブラッシュアップします。
    あるいはエージェントを複数用意して、エージェントを切り替えて効率的な学習を目指します。
  • 可能な限りログをとる。
    一度の実験からなるべく多くの情報を得るためです。
  • 学習を自動化する。
    学習の実行をスクリプト化し、設定パラメータや実行結果の明確化を目指します。

できるだけ事前に動作確認を行ったうえで実験を行い、可能な限り取得した動作ログからベストの対策を検討・実装したうえで何度もテストを行い、このプロセスを繰り返すことで強化学習の弱点を克服していきます。

強化学習の概要

強化学習では、「エージェント」がある「環境」の中で「行動」し、その行動から得られる「報酬」が最大化するような「推論モデル」を作成します。
推論モデルがあれば学習した状態で「環境」の中を「行動」することができます。

強化学習のサイクルを簡単にまとめると下記のようになります。

  1. エージェントが環境に対して行動を起こします。
  2. 環境が状態の更新と行動の評価を行います。
  3. 状態と報酬をエージェントに知らせます。

強化学習のポイントとなる用語を下記にまとめます。

用語 説明
エージェント - Agent 環境のなかでいろいろと行動し学習を行います。さまざまな試行を行い状態ごとに行動を最適化していきます。
環境 - Environment 行動に対して、状態の更新と行動の評価を行います。
行動 - Action エージェントがいろいろな状態で起こすことができる行動です。
状態 - State 環境の状態です。行動によって変化します。
報酬 - Reward 行動すると得られる報酬です。いい結果のときは正の報酬が得られ、悪い結果のときには負の報酬となります。

深層強化学習 ブロック崩し(breakout)をA2Cで攻略 -プレイ編-

前回学習したデータを使ってブロック崩し(breakout)をプレイします。

まず必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# パッケージのimport
import numpy as np
from collections import deque
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import gym
from gym import spaces
from gym.spaces.box import Box

import matplotlib.pyplot as plt
%matplotlib inline

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0*1, frames[0].shape[0]/72.0*1),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=20)

anim.save('7breakout_play.mp4') # 動画のファイル名と保存です

実行環境を設定します。
おさらいとなりますがブロック崩し(breakout)の学習には4つの工夫をします。

  1. No-Operation
    実行環境をリセットするときに0~30ステップのいずれかの間何もしない行動を実施します。
    => ゲーム開始の初期状態を様々にし、特定の開始情報に特化しないようにするためです。
  2. Episodic Life
    5機ライフがありますが、1回失敗したときにゲーム終了とします。
    ただし崩したブロックはそのままの状態で次の試行を開始するようにします。
    => 多様な状態に対して学習ができるようにするためです。
  3. Max and Skip
    4フレームごとに行動を判断させ、4フレーム連続で同じ行動をするようにします。
    => 60Hzでゲームが進行すると早すぎるためエージェントの行動を15Hzにするためです。
  4. Warp frame
    縦210ピクセル、横160ピクセルのRGB値を縦横84ピクセルずつのグレースケール画像へと変換します。
    => 学習しやすくするためです。

また上記の4工夫とPyTorch環境に合わせるためのクラスWrapPyTorchを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# 実行環境の設定
import cv2
cv2.ocl.setUseOpenCL(False)

class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30):
'''工夫1のNo-Operationです。リセット後適当なステップの間何もしないようにし、
ゲーム開始の初期状態を様々にすることfで、特定の開始状態のみで学習するのを防ぐ'''
gym.Wrapper.__init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

def reset(self, **kwargs):
""" Do no-op action for a number of steps in [1, noop_max]."""
self.env.reset(**kwargs)
if self.override_num_noops is not None:
noops = self.override_num_noops
else:
noops = self.unwrapped.np_random.randint(1, self.noop_max + 1) # pylint: disable=E1101
assert noops > 0
obs = None
for _ in range(noops):
obs, _, done, _ = self.env.step(self.noop_action)
if done:
obs = self.env.reset(**kwargs)
return obs

def step(self, ac):
return self.env.step(ac)

class EpisodicLifeEnv(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める'''
gym.Wrapper.__init__(self, env)
self.lives = 0
self.was_real_done = True

def step(self, action):
obs, reward, done, info = self.env.step(action)
self.was_real_done = done
# check current lives, make loss of life terminal,
# then update lives to handle bonus lives
lives = self.env.unwrapped.ale.lives()
if lives < self.lives and lives > 0:
# for Qbert sometimes we stay in lives == 0 condtion for a few frames
# so its important to keep lives > 0, so that we only reset once
# the environment advertises done.
done = True
self.lives = lives
return obs, reward, done, info

def reset(self, **kwargs):
'''5機とも失敗したら、本当にリセット'''
if self.was_real_done:
obs = self.env.reset(**kwargs)
else:
# no-op step to advance from terminal/lost life state
obs, _, _, _ = self.env.step(0)
self.lives = self.env.unwrapped.ale.lives()
return obs

class MaxAndSkipEnv(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の3、4フレームの最大値をとった画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)
return max_frame, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

class WarpFrame(gym.ObservationWrapper):
def __init__(self, env):
'''工夫4のWarp frameです。画像サイズをNatureのDQN論文と同じ84x84の白黒にします'''
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255,
shape=(self.height, self.width, 1), dtype=np.uint8)

def observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (self.width, self.height),
interpolation=cv2.INTER_AREA)
return frame[:, :, None]

class WrapPyTorch(gym.ObservationWrapper):
def __init__(self, env=None):
'''PyTorchのミニバッチのインデックス順に変更するラッパー'''
super(WrapPyTorch, self).__init__(env)
obs_shape = self.observation_space.shape
self.observation_space = Box(
self.observation_space.low[0, 0, 0],
self.observation_space.high[0, 0, 0],
[obs_shape[2], obs_shape[1], obs_shape[0]],
dtype=self.observation_space.dtype)

def observation(self, observation):
return observation.transpose(2, 0, 1)

再生用の実行環境を実装します。

  • EpisodicLifeEnvPlay 関数で1度でも失敗したらブロックの状態を最初から完全になり直します。
  • MaxAndSkipEnvPlay クラスで4フレーム目だけを画像として出力します。
  • make_env_play 関数で実行環境を生成します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 再生用の実行環境
class EpisodicLifeEnvPlay(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める。
今回は再生用に、1機失敗したときのリセット時もブロックの状態をリセットします'''
gym.Wrapper.__init__(self, env)

def step(self, action):
obs, reward, done, info = self.env.step(action)
# ライフ(残機)が始め5あるが、1つでも減ると終了にする
if self.env.unwrapped.ale.lives() < 5:
done = True
return obs, reward, done, info

def reset(self, **kwargs):
'''1回でも失敗したら完全リセット'''
obs = self.env.reset(**kwargs)
return obs

class MaxAndSkipEnvPlay(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の4フレームの画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break

return obs, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

# 実行環境生成関数の定義

# 並列実行環境
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv

def make_env(env_id, seed, rank):
def _thunk():
'''_thunk()がマルチプロセス環境のSubprocVecEnvを実行するのに必要'''

env = gym.make(env_id)
#env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
#env = EpisodicLifeEnv(env)
env = EpisodicLifeEnvPlay(env)
env = WarpFrame(env)
env = WrapPyTorch(env)

return env

return _thunk

def make_env_play(env_id, seed, rank):
'''再生用の実行環境'''
env = gym.make(env_id)
env = MaxAndSkipEnvPlay(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
env = EpisodicLifeEnvPlay(env)
return env

定数を設定します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定数の設定
ENV_NAME = 'BreakoutNoFrameskip-v4'
# Breakout-v0ではなく、BreakoutNoFrameskip-v4を使用
# v0はフレームが自動的に2-4のランダムにskipされますが、今回はフレームスキップはさせないバージョンを使用
NUM_SKIP_FRAME = 4 # skipするframe数です
NUM_STACK_FRAME = 4 # 状態として連続的に保持するframe数です
NOOP_MAX = 30 # reset時に何もしないフレームを挟む(No-operation)フレーム数の乱数上限です
NUM_PROCESSES = 16 # 並列して同時実行するプロセス数です
NUM_ADVANCED_STEP = 5 # 何ステップ進めて報酬和を計算するのか設定
GAMMA = 0.99 # 時間割引率

TOTAL_FRAMES=10e6 # 学習に使用する総フレーム数
NUM_UPDATES = int(TOTAL_FRAMES / NUM_ADVANCED_STEP / NUM_PROCESSES) # ネットワークの総更新回数
# NUM_UPDATESは125,000となる

A2Cの損失関数の計算をするための定数を設定します。

1
2
3
4
5
6
7
8
9
# A2Cの損失関数の計算のための定数設定
value_loss_coef = 0.5
entropy_coef = 0.01
max_grad_norm = 0.5

# 学習手法RMSpropの設定
lr = 7e-4
eps = 1e-5
alpha = 0.99

GPU使用の設定を行います。(GPUがなくても実行できます。)

1
2
3
4
# GPUの使用の設定
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)

メモリオブジェクトの定義を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# メモリオブジェクトの定義
class RolloutStorage(object):
'''Advantage学習するためのメモリクラスです'''

def __init__(self, num_steps, num_processes, obs_shape):

self.observations = torch.zeros(
num_steps + 1, num_processes, *obs_shape).to(device)
# *を使うと()リストの中身を取り出す
# obs_shape→(4,84,84)
# *obs_shape→ 4 84 84

self.masks = torch.ones(num_steps + 1, num_processes, 1).to(device)
self.rewards = torch.zeros(num_steps, num_processes, 1).to(device)
self.actions = torch.zeros(
num_steps, num_processes, 1).long().to(device)

# 割引報酬和を格納
self.returns = torch.zeros(num_steps + 1, num_processes, 1).to(device)
self.index = 0 # insertするインデックス

def insert(self, current_obs, action, reward, mask):
'''次のindexにtransitionを格納する'''
self.observations[self.index + 1].copy_(current_obs)
self.masks[self.index + 1].copy_(mask)
self.rewards[self.index].copy_(reward)
self.actions[self.index].copy_(action)

self.index = (self.index + 1) % NUM_ADVANCED_STEP # インデックスの更新

def after_update(self):
'''Advantageするstep数が完了したら、最新のものをindex0に格納'''
self.observations[0].copy_(self.observations[-1])
self.masks[0].copy_(self.masks[-1])

def compute_returns(self, next_value):
'''Advantageするステップ中の各ステップの割引報酬和を計算する'''

# 注意:5step目から逆向きに計算しています
# 注意:5step目はAdvantage1となる。4ステップ目はAdvantage2となる。・・・
self.returns[-1] = next_value
for ad_step in reversed(range(self.rewards.size(0))):
self.returns[ad_step] = self.returns[ad_step + 1] * \
GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]

A2Cのディープ・ニューラルネットワークの構築を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# A2Cのディープ・ニューラルネットワークの構築
def init(module, gain):
'''層の結合パラメータを初期化する関数を定義'''
nn.init.orthogonal_(module.weight.data, gain=gain)
nn.init.constant_(module.bias.data, 0)
return module

class Flatten(nn.Module):
'''コンボリューション層の出力画像を1次元に変換する層を定義'''
def forward(self, x):
return x.view(x.size(0), -1)

class Net(nn.Module):
def __init__(self, n_out):
super(Net, self).__init__()

# 結合パラメータの初期化関数
def init_(module): return init(
module, gain=nn.init.calculate_gain('relu'))

# コンボリューション層の定義
self.conv = nn.Sequential(
# 画像サイズの変化84*84→20*20
init_(nn.Conv2d(NUM_STACK_FRAME, 32, kernel_size=8, stride=4)),
# stackするflameは4画像なのでinput=NUM_STACK_FRAME=4である、出力は32とする、
# sizeの計算 size = (Input_size - Kernel_size + 2*Padding_size)/ Stride_size + 1

nn.ReLU(),
# 画像サイズの変化20*20→9*9
init_(nn.Conv2d(32, 64, kernel_size=4, stride=2)),
nn.ReLU(),
init_(nn.Conv2d(64, 64, kernel_size=3, stride=1)), # 画像サイズの変化9*9→7*7
nn.ReLU(),
Flatten(), # 画像形式を1次元に変換
init_(nn.Linear(64 * 7 * 7, 512)), # 64枚の7×7の画像を、512次元のoutputへ
nn.ReLU()
)

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=1.0)

# Criticの定義
self.critic = init_(nn.Linear(512, 1)) # 状態価値なので出力は1つ

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=0.01)

# Actorの定義
self.actor = init_(nn.Linear(512, n_out)) # 行動を決めるので出力は行動の種類数

# ネットワークを訓練モードに設定
self.train()

def forward(self, x):
'''ネットワークのフォワード計算を定義します'''
input = x / 255.0 # 画像のピクセル値0-255を0-1に正規化する
conv_output = self.conv(input) # Convolution層の計算
critic_output = self.critic(conv_output) # 状態価値の計算
actor_output = self.actor(conv_output) # 行動の計算

return critic_output, actor_output

def act(self, x):
'''状態xから行動を確率的に求めます'''
value, actor_output = self(x)
probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action = probs.multinomial(num_samples=1)

return action

def get_value(self, x):
'''状態xから状態価値を求めます'''
value, actor_output = self(x)

return value

def evaluate_actions(self, x, actions):
'''状態xから状態価値、実際の行動actionsのlog確率とエントロピーを求めます'''
value, actor_output = self(x)

log_probs = F.log_softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action_log_probs = log_probs.gather(1, actions) # 実際の行動のlog_probsを求める

probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
dist_entropy = -(log_probs * probs).sum(-1).mean()

return value, action_log_probs, dist_entropy

エージェントが持つ頭脳となるクラスを定義します。
このクラスは全エージェントで共有されます。

前回の「学習編」で作成した学習データ weight_end.pth を使います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# エージェントが持つ頭脳となるクラスを定義、全エージェントで共有する
class Brain(object):
def __init__(self, actor_critic):

self.actor_critic = actor_critic # actor_criticはクラスNetのディープ・ニューラルネットワーク

# 結合パラメータをロードする場合
filename = 'weight_end.pth'
param = torch.load(filename, map_location='cpu')
self.actor_critic.load_state_dict(param)

# パラメータ更新の勾配法の設定
self.optimizer = optim.RMSprop(
actor_critic.parameters(), lr=lr, eps=eps, alpha=alpha)

def update(self, rollouts):
'''advanced計算した5つのstepの全てを使って更新します'''
obs_shape = rollouts.observations.size()[2:] # torch.Size([4, 84, 84])
num_steps = NUM_ADVANCED_STEP
num_processes = NUM_PROCESSES

values, action_log_probs, dist_entropy = self.actor_critic.evaluate_actions(
rollouts.observations[:-1].view(-1, *obs_shape),
rollouts.actions.view(-1, 1))

# 注意:各変数のサイズ
# rollouts.observations[:-1].view(-1, *obs_shape) torch.Size([80, 4, 84, 84])
# rollouts.actions.view(-1, 1) torch.Size([80, 1])
# values torch.Size([80, 1])
# action_log_probs torch.Size([80, 1])
# dist_entropy torch.Size([])

values = values.view(num_steps, num_processes,
1) # torch.Size([5, 16, 1])
action_log_probs = action_log_probs.view(num_steps, num_processes, 1)

advantages = rollouts.returns[:-1] - values # torch.Size([5, 16, 1])
value_loss = advantages.pow(2).mean()

action_gain = (advantages.detach() * action_log_probs).mean()
# detachしてadvantagesを定数として扱う

total_loss = (value_loss * value_loss_coef -
action_gain - dist_entropy * entropy_coef)

self.optimizer.zero_grad() # 勾配をリセット
total_loss.backward() # バックプロパゲーションを計算
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_grad_norm)
# 一気に結合パラメータが変化しすぎないように、勾配の大きさは最大0.5までにする

self.optimizer.step() # 結合パラメータを更新

Breakoutを実行する環境のクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Breakoutを実行する環境のクラス
NUM_PROCESSES = 1

class Environment:
def run(self):

# seedの設定
seed_num = 1
torch.manual_seed(seed_num)
if use_cuda:
torch.cuda.manual_seed(seed_num)

# 実行環境を構築
torch.set_num_threads(seed_num)
envs = [make_env(ENV_NAME, seed_num, i) for i in range(NUM_PROCESSES)]
envs = SubprocVecEnv(envs) # マルチプロセスの実行環境にする

# 全エージェントが共有して持つ頭脳Brainを生成
n_out = envs.action_space.n # 行動の種類は4
actor_critic = Net(n_out).to(device) # GPUへ
global_brain = Brain(actor_critic)

# 格納用変数の生成
obs_shape = envs.observation_space.shape # (1, 84, 84)
obs_shape = (obs_shape[0] * NUM_STACK_FRAME,
*obs_shape[1:]) # (4, 84, 84)
# torch.Size([16, 4, 84, 84])
current_obs = torch.zeros(NUM_PROCESSES, *obs_shape).to(device)
rollouts = RolloutStorage(
NUM_ADVANCED_STEP, NUM_PROCESSES, obs_shape) # rolloutsのオブジェクト
episode_rewards = torch.zeros([NUM_PROCESSES, 1]) # 現在の試行の報酬を保持
final_rewards = torch.zeros([NUM_PROCESSES, 1]) # 最後の試行の報酬和を保持

# 初期状態の開始
obs = envs.reset()
obs = torch.from_numpy(obs).float() # torch.Size([16, 1, 84, 84])
current_obs[:, -1:] = obs # flameの4番目に最新のobsを格納

# advanced学習用のオブジェクトrolloutsの状態の1つ目に、現在の状態を保存
rollouts.observations[0].copy_(current_obs)

# 描画用の環境(再生用に追加)
env_play = make_env_play(ENV_NAME, seed_num, 0)
obs_play = env_play.reset()

# 動画にするために画像を格納する変数(再生用に追加)
frames = []
main_end = False

# 実行ループ
for j in tqdm(range(NUM_UPDATES)):

# 報酬が基準を超えたら終わりにする(再生用に追加)
if main_end:
break

# advanced学習するstep数ごとに計算
for step in range(NUM_ADVANCED_STEP):

# 行動を求める
with torch.no_grad():
action = actor_critic.act(rollouts.observations[step])

cpu_actions = action.squeeze(1).cpu().numpy() # tensorをNumPyに

# 1stepの並列実行、なお返り値のobsのsizeは(16, 1, 84, 84)
obs, reward, done, info = envs.step(cpu_actions)

# 報酬をtensorに変換し、試行の総報酬に足す
# sizeが(16,)になっているのを(16, 1)に変換
reward = np.expand_dims(np.stack(reward), 1)
reward = torch.from_numpy(reward).float()
episode_rewards += reward

# 各実行環境それぞれについて、doneならmaskは0に、継続中ならmaskは1にする
masks = torch.FloatTensor(
[[0.0] if done_ else [1.0] for done_ in done])

# 最後の試行の総報酬を更新する
final_rewards *= masks # 継続中の場合は1をかけ算してそのまま、done時には0を掛けてリセット
# 継続中は0を足す、done時にはepisode_rewardsを足す
final_rewards += (1 - masks) * episode_rewards

# 画像を取得する(再生用に追加)
obs_play, reward_play, _, _ = env_play.step(cpu_actions[0])
frames.append(obs_play) # 変換した画像を保存
if done[0]: # 並列環境の1つ目が終了した場合
print(episode_rewards[0][0].numpy()) # 報酬

# 報酬が300を超えたら終わりにする
#if (episode_rewards[0][0].numpy()) > 300:
if (episode_rewards[0][0].numpy()) > 400:
main_end = True
break
else:
obs_view = env_play.reset()
frames = [] # 保存した画像をリセット

# 試行の総報酬を更新する
episode_rewards *= masks # 継続中のmaskは1なのでそのまま、doneの場合は0に

# masksをGPUへ
masks = masks.to(device)

# 現在の状態をdone時には全部0にする
# maskのサイズをtorch.Size([16, 1])→torch.Size([16, 1, 1 ,1])へ変換して、かけ算
current_obs *= masks.unsqueeze(2).unsqueeze(2)

# frameをstackする
# torch.Size([16, 1, 84, 84])
obs = torch.from_numpy(obs).float()
current_obs[:, :-1] = current_obs[:, 1:] # 0~2番目に1~3番目を上書き
current_obs[:, -1:] = obs # 4番目に最新のobsを格納

# メモリオブジェクトに今stepのtransitionを挿入
rollouts.insert(current_obs, action.data, reward, masks)

# advancedのfor loop終了

# advancedした最終stepの状態から予想する状態価値を計算
with torch.no_grad():
next_value = actor_critic.get_value(
rollouts.observations[-1]).detach()

# 全stepの割引報酬和を計算して、rolloutsの変数returnsを更新
rollouts.compute_returns(next_value)

# ネットワークとrolloutの更新
# global_brain.update(rollouts)
rollouts.after_update()

# 実行ループ終わり
display_frames_as_gif(frames) # 動画の保存と再生

実行します。

1
2
3
# 実行
breakout_env = Environment()
frames = breakout_env.run()

実行結果(途中略)

出力される動画ファイル 7breakout_play.mp4 は下記のようになります。

うまく壁の端からブロックを消して裏側に通したくさんのブロックを崩しています。
画面上部の方のブロックを消した方が高得点なので、報酬を-1から1にクリッピングしない方が裏側に通すように学習しやすくなります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 ブロック崩し(breakout)をA2Cで攻略 -学習編-

ブロック崩し(breakout)を強化学習A2Cで攻略していきます。

まずOpenAI Gymの環境を並列で動かすために必要なパッケージをインストールします。

1
2
3
4
5
pip install tqdm
pip install opencv-python
git clone https://github.com/openai/baselines.git
cd baselines
pip install -e .

使用するパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
11
12
13
# パッケージのimport
import numpy as np
from collections import deque
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import gym
from gym import spaces
from gym.spaces.box import Box

ブロック崩し(breakout)の学習には4つの工夫をします。

  1. No-Operation
    実行環境をリセットするときに0~30ステップのいずれかの間何もしない行動を実施します。
    => ゲーム開始の初期状態を様々にし、特定の開始情報に特化しないようにするためです。
  2. Episodic Life
    5機ライフがありますが、1回失敗したときにゲーム終了とします。
    ただし崩したブロックはそのままの状態で次の試行を開始するようにします。
    => 多様な状態に対して学習ができるようにするためです。
  3. Max and Skip
    4フレームごとに行動を判断させ、4フレーム連続で同じ行動をするようにします。
    => 60Hzでゲームが進行すると早すぎるためエージェントの行動を15Hzにするためです。
  4. Warp frame
    縦210ピクセル、横160ピクセルのRGB値を縦横84ピクセルずつのグレースケール画像へと変換します。
    => 学習しやすくするためです。

また上記の4工夫とPyTorch環境に合わせるためのクラスWrapPyTorchを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# 実行環境の設定
import cv2
cv2.ocl.setUseOpenCL(False)

class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30):
'''工夫1のNo-Operationです。リセット後適当なステップの間何もしないようにし、
ゲーム開始の初期状態を様々にすることfで、特定の開始状態のみで学習するのを防ぐ'''
gym.Wrapper.__init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

def reset(self, **kwargs):
""" Do no-op action for a number of steps in [1, noop_max]."""
self.env.reset(**kwargs)
if self.override_num_noops is not None:
noops = self.override_num_noops
else:
noops = self.unwrapped.np_random.randint(
1, self.noop_max + 1) # pylint: disable=E1101
assert noops > 0
obs = None
for _ in range(noops):
obs, _, done, _ = self.env.step(self.noop_action)
if done:
obs = self.env.reset(**kwargs)
return obs

def step(self, ac):
return self.env.step(ac)

class EpisodicLifeEnv(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める'''
gym.Wrapper.__init__(self, env)
self.lives = 0
self.was_real_done = True

def step(self, action):
obs, reward, done, info = self.env.step(action)
self.was_real_done = done
# check current lives, make loss of life terminal,
# then update lives to handle bonus lives
lives = self.env.unwrapped.ale.lives()
if lives < self.lives and lives > 0:
# for Qbert sometimes we stay in lives == 0 condtion for a few frames
# so its important to keep lives > 0, so that we only reset once
# the environment advertises done.
done = True
self.lives = lives
return obs, reward, done, info

def reset(self, **kwargs):
'''5機とも失敗したら、本当にリセット'''
if self.was_real_done:
obs = self.env.reset(**kwargs)
else:
# no-op step to advance from terminal/lost life state
obs, _, _, _ = self.env.step(0)
self.lives = self.env.unwrapped.ale.lives()
return obs

class MaxAndSkipEnv(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の3、4フレームの最大値をとった画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros(
(2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)

return max_frame, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

class WarpFrame(gym.ObservationWrapper):
def __init__(self, env):
'''工夫4のWarp frameです。画像サイズをNatureのDQN論文と同じ84x84の白黒にします'''
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255, shape=(self.height, self.width, 1), dtype=np.uint8)

def observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
return frame[:, :, None]

class WrapPyTorch(gym.ObservationWrapper):
def __init__(self, env=None):
'''PyTorchのミニバッチのインデックス順に変更するラッパー'''
super(WrapPyTorch, self).__init__(env)
obs_shape = self.observation_space.shape
self.observation_space = Box(
self.observation_space.low[0, 0, 0],
self.observation_space.high[0, 0, 0],
[obs_shape[2], obs_shape[1], obs_shape[0]],
dtype=self.observation_space.dtype)

def observation(self, observation):
return observation.transpose(2, 0, 1)

マルチプロセルでBreakoutを並列実行する環境を生成する関数make_envを定義します。
OpenAIが用意しているマルチプロセス環境であるクラスSubprocVecEnvを使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 実行環境生成関数の定義
# 並列実行環境
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv

def make_env(env_id, seed, rank):
def _thunk():
'''_thunk()がマルチプロセス環境のSubprocVecEnvを実行するのに必要'''
env = gym.make(env_id)
env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
env = EpisodicLifeEnv(env)
env = WarpFrame(env)
env = WrapPyTorch(env)

return env

return _thunk

定数を設定します。

Breakout-v0ですとフレームが自動的に2~4のランダムにskipされるため、今回はフレームスキップはさせないBreakoutNoFrameskip-v4を使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 定数の設定
ENV_NAME = 'BreakoutNoFrameskip-v4'
NUM_SKIP_FRAME = 4 # skipするframe数です
NUM_STACK_FRAME = 4 # 状態として連続的に保持するframe数です
NOOP_MAX = 30 # reset時に何もしないフレームを挟む(No-operation)フレーム数の乱数上限です
NUM_PROCESSES = 16 # 並列して同時実行するプロセス数です
NUM_ADVANCED_STEP = 5 # 何ステップ進めて報酬和を計算するのか設定
GAMMA = 0.99 # 時間割引率

TOTAL_FRAMES=10e6 # 学習に使用する総フレーム数
NUM_UPDATES = int(TOTAL_FRAMES / NUM_ADVANCED_STEP / NUM_PROCESSES) # ネットワークの総更新回数
# NUM_UPDATESは125,000となる

# A2Cの損失関数の計算のための定数設定
value_loss_coef = 0.5
entropy_coef = 0.01
max_grad_norm = 0.5

# 学習手法RMSpropの設定
lr = 7e-4
eps = 1e-5
alpha = 0.99

GPU使用の設定を行います。
GPU環境があれば cuda が出力されますが、そうでない場合は cpu が出力されます。

1
2
3
4
# GPUの使用の設定
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)

Advantage学習するためのメモリクラスを定義します。
.to(device)を使用して、GPU環境がある場合には自動的にGPUを使えるようにしています。
PyTorchではCPU環境とGPU環境を意識せずに同じプログラムをどちらの環境でも実行できるのが便利です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# メモリオブジェクトの定義
class RolloutStorage(object):
'''Advantage学習するためのメモリクラスです'''
def __init__(self, num_steps, num_processes, obs_shape):

self.observations = torch.zeros(
num_steps + 1, num_processes, *obs_shape).to(device)
# *を使うと()リストの中身を取り出す
# obs_shape→(4,84,84)
# *obs_shape→ 4 84 84

self.masks = torch.ones(num_steps + 1, num_processes, 1).to(device)
self.rewards = torch.zeros(num_steps, num_processes, 1).to(device)
self.actions = torch.zeros(num_steps, num_processes, 1).long().to(device)

# 割引報酬和を格納
self.returns = torch.zeros(num_steps + 1, num_processes, 1).to(device)
self.index = 0 # insertするインデックス

def insert(self, current_obs, action, reward, mask):
'''次のindexにtransitionを格納する'''
self.observations[self.index + 1].copy_(current_obs)
self.masks[self.index + 1].copy_(mask)
self.rewards[self.index].copy_(reward)
self.actions[self.index].copy_(action)

self.index = (self.index + 1) % NUM_ADVANCED_STEP # インデックスの更新

def after_update(self):
'''Advantageするstep数が完了したら、最新のものをindex0に格納'''
self.observations[0].copy_(self.observations[-1])
self.masks[0].copy_(self.masks[-1])

def compute_returns(self, next_value):
'''Advantageするステップ中の各ステップの割引報酬和を計算する'''

# 注意:5step目から逆向きに計算しています
# 注意:5step目はAdvantage1となる。4ステップ目はAdvantage2となる。・・・
self.returns[-1] = next_value
for ad_step in reversed(range(self.rewards.size(0))):
self.returns[ad_step] = self.returns[ad_step + 1] * \
GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]

A2Cのディープ・ニューラルネットワークの構築を実装します。

コンボリューション層の定義のNUM_STACK_FRAME(=4)は、過去4フレーム分の画像を使って1つの状態として扱いニューラルネットワーク入力とすることを意味します。
1つのフレームではボールの位置しか分かりませんが、2フレームあれば速度が分かり、3フレームあれば加速度が分かるようになります。
今回はDQNのNature論文に合わせて4フレームとしています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# A2Cのディープ・ニューラルネットワークの構築
def init(module, gain):
'''層の結合パラメータを初期化する関数を定義'''
nn.init.orthogonal_(module.weight.data, gain=gain)
nn.init.constant_(module.bias.data, 0)
return module

class Flatten(nn.Module):
'''コンボリューション層の出力画像を1次元に変換する層を定義'''
def forward(self, x):
return x.view(x.size(0), -1)

class Net(nn.Module):
def __init__(self, n_out):
super(Net, self).__init__()

# 結合パラメータの初期化関数
def init_(module): return init(
module, gain=nn.init.calculate_gain('relu'))

# コンボリューション層の定義
self.conv = nn.Sequential(
# 画像サイズの変化84*84→20*20
init_(nn.Conv2d(NUM_STACK_FRAME, 32, kernel_size=8, stride=4)),
# stackするflameは4画像なのでinput=NUM_STACK_FRAME=4である、出力は32とする、
# sizeの計算 size = (Input_size - Kernel_size + 2*Padding_size)/ Stride_size + 1

nn.ReLU(),
# 画像サイズの変化20*20→9*9
init_(nn.Conv2d(32, 64, kernel_size=4, stride=2)),
nn.ReLU(),
init_(nn.Conv2d(64, 64, kernel_size=3, stride=1)), # 画像サイズの変化9*9→7*7
nn.ReLU(),
Flatten(), # 画像形式を1次元に変換
init_(nn.Linear(64 * 7 * 7, 512)), # 64枚の7×7の画像を、512次元のoutputへ
nn.ReLU()
)

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=1.0)

# Criticの定義
self.critic = init_(nn.Linear(512, 1)) # 状態価値なので出力は1つ

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=0.01)

# Actorの定義
self.actor = init_(nn.Linear(512, n_out)) # 行動を決めるので出力は行動の種類数

# ネットワークを訓練モードに設定
self.train()

def forward(self, x):
'''ネットワークのフォワード計算を定義します'''
input = x / 255.0 # 画像のピクセル値0-255を0-1に正規化する
conv_output = self.conv(input) # Convolution層の計算
critic_output = self.critic(conv_output) # 状態価値の計算
actor_output = self.actor(conv_output) # 行動の計算

return critic_output, actor_output

def act(self, x):
'''状態xから行動を確率的に求めます'''
value, actor_output = self(x)
# 190324
# self(x)の動作について、以下のリンクの最下部のFAQに解説を補足しました。
# https://github.com/YutaroOgawa/Deep-Reinforcement-Learning-Book
probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action = probs.multinomial(num_samples=1)
return action

def get_value(self, x):
'''状態xから状態価値を求めます'''
value, actor_output = self(x)
# 190324
# self(x)の動作について、以下のリンクの最下部のFAQに解説を補足しました。
# https://github.com/YutaroOgawa/Deep-Reinforcement-Learning-Book
return value

def evaluate_actions(self, x, actions):
'''状態xから状態価値、実際の行動actionsのlog確率とエントロピーを求めます'''
value, actor_output = self(x)
# 190324
# self(x)の動作について、以下のリンクの最下部のFAQに解説を補足しました。
# https://github.com/YutaroOgawa/Deep-Reinforcement-Learning-Book

log_probs = F.log_softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action_log_probs = log_probs.gather(1, actions) # 実際の行動のlog_probsを求める

probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
dist_entropy = -(log_probs * probs).sum(-1).mean()

return value, action_log_probs, dist_entropy

Brainクラスを定義します。
勾配降下法にはRMSpropを使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# エージェントが持つ頭脳となるクラスを定義、全エージェントで共有する
class Brain(object):
def __init__(self, actor_critic):

self.actor_critic = actor_critic # actor_criticはクラスNetのディープ・ニューラルネットワーク

# 結合パラメータをロードする場合
#filename = 'weight.pth'
#param = torch.load(filename, map_location='cpu')
# self.actor_critic.load_state_dict(param)

# パラメータ更新の勾配法の設定
self.optimizer = optim.RMSprop(
actor_critic.parameters(), lr=lr, eps=eps, alpha=alpha)

def update(self, rollouts):
'''advanced計算した5つのstepの全てを使って更新します'''
obs_shape = rollouts.observations.size()[2:] # torch.Size([4, 84, 84])
num_steps = NUM_ADVANCED_STEP
num_processes = NUM_PROCESSES

values, action_log_probs, dist_entropy = self.actor_critic.evaluate_actions(
rollouts.observations[:-1].view(-1, *obs_shape),
rollouts.actions.view(-1, 1))

# 注意:各変数のサイズ
# rollouts.observations[:-1].view(-1, *obs_shape) torch.Size([80, 4, 84, 84])
# rollouts.actions.view(-1, 1) torch.Size([80, 1])
# values torch.Size([80, 1])
# action_log_probs torch.Size([80, 1])
# dist_entropy torch.Size([])

values = values.view(num_steps, num_processes, 1) # torch.Size([5, 16, 1])
action_log_probs = action_log_probs.view(num_steps, num_processes, 1)

advantages = rollouts.returns[:-1] - values # torch.Size([5, 16, 1])
value_loss = advantages.pow(2).mean()

action_gain = (advantages.detach() * action_log_probs).mean()
# detachしてadvantagesを定数として扱う

total_loss = (value_loss * value_loss_coef -
action_gain - dist_entropy * entropy_coef)

self.optimizer.zero_grad() # 勾配をリセット
total_loss.backward() # バックプロパゲーションを計算
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_grad_norm)
# 一気に結合パラメータが変化しすぎないように、勾配の大きさは最大0.5までにする

self.optimizer.step() # 結合パラメータを更新

実行環境のクラス Environment を定義します。

  • 入力データは画像となります。4フレームで1つの状態を表します。
  • マルチプロセル環境 SuvprocVecEnv を使用しているのでエージェントごとのforループ処理は必要ありません。
  • 実行ループ100回ごとに得点を出力します。この出力で学習状態を確認します。
  • 定期的に結合パラメータを保存します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Breakoutを実行する環境のクラス
class Environment:
def run(self):
# seedの設定
seed_num = 1
torch.manual_seed(seed_num)
if use_cuda:
torch.cuda.manual_seed(seed_num)

# 実行環境を構築
torch.set_num_threads(seed_num)
envs = [make_env(ENV_NAME, seed_num, i) for i in range(NUM_PROCESSES)]
envs = SubprocVecEnv(envs) # マルチプロセスの実行環境にする

# 全エージェントが共有して持つ頭脳Brainを生成
n_out = envs.action_space.n # 行動の種類は4
actor_critic = Net(n_out).to(device) # GPUへ
global_brain = Brain(actor_critic)

# 格納用変数の生成
obs_shape = envs.observation_space.shape # (1, 84, 84)
obs_shape = (obs_shape[0] * NUM_STACK_FRAME,
*obs_shape[1:]) # (4, 84, 84)
# torch.Size([16, 4, 84, 84])
current_obs = torch.zeros(NUM_PROCESSES, *obs_shape).to(device)
rollouts = RolloutStorage(
NUM_ADVANCED_STEP, NUM_PROCESSES, obs_shape) # rolloutsのオブジェクト
episode_rewards = torch.zeros([NUM_PROCESSES, 1]) # 現在の試行の報酬を保持
final_rewards = torch.zeros([NUM_PROCESSES, 1]) # 最後の試行の報酬和を保持

# 初期状態の開始
obs = envs.reset()
obs = torch.from_numpy(obs).float() # torch.Size([16, 1, 84, 84])
current_obs[:, -1:] = obs # flameの4番目に最新のobsを格納

# advanced学習用のオブジェクトrolloutsの状態の1つ目に、現在の状態を保存
rollouts.observations[0].copy_(current_obs)

# 実行ループ
for j in tqdm(range(NUM_UPDATES)):
# advanced学習するstep数ごとに計算
for step in range(NUM_ADVANCED_STEP):

# 行動を求める
with torch.no_grad():
action = actor_critic.act(rollouts.observations[step])

cpu_actions = action.squeeze(1).cpu().numpy() # tensorをNumPyに

# 1stepの並列実行、なお返り値のobsのsizeは(16, 1, 84, 84)
obs, reward, done, info = envs.step(cpu_actions)

# 報酬をtensorに変換し、試行の総報酬に足す
# sizeが(16,)になっているのを(16, 1)に変換
reward = np.expand_dims(np.stack(reward), 1)
reward = torch.from_numpy(reward).float()
episode_rewards += reward

# 各実行環境それぞれについて、doneならmaskは0に、継続中ならmaskは1にする
masks = torch.FloatTensor(
[[0.0] if done_ else [1.0] for done_ in done])

# 最後の試行の総報酬を更新する
final_rewards *= masks # 継続中の場合は1をかけ算してそのまま、done時には0を掛けてリセット
# 継続中は0を足す、done時にはepisode_rewardsを足す
final_rewards += (1 - masks) * episode_rewards

# 試行の総報酬を更新する
episode_rewards *= masks # 継続中のmaskは1なのでそのまま、doneの場合は0に

# masksをGPUへ
masks = masks.to(device)

# 現在の状態をdone時には全部0にする
# maskのサイズをtorch.Size([16, 1])→torch.Size([16, 1, 1 ,1])へ変換して、かけ算
current_obs *= masks.unsqueeze(2).unsqueeze(2)

# frameをstackする
# torch.Size([16, 1, 84, 84])
obs = torch.from_numpy(obs).float()
current_obs[:, :-1] = current_obs[:, 1:] # 0~2番目に1~3番目を上書き
current_obs[:, -1:] = obs # 4番目に最新のobsを格納

# メモリオブジェクトに今stepのtransitionを挿入
rollouts.insert(current_obs, action.data, reward, masks)

# advancedのfor loop終了

# advancedした最終stepの状態から予想する状態価値を計算
with torch.no_grad():
next_value = actor_critic.get_value(
rollouts.observations[-1]).detach()

# 全stepの割引報酬和を計算して、rolloutsの変数returnsを更新
rollouts.compute_returns(next_value)

# ネットワークとrolloutの更新
global_brain.update(rollouts)
rollouts.after_update()

# ログ:途中経過の出力
if j % 100 == 0:
print("finished frames {}, mean/median reward {:.1f}/{:.1f}, min/max reward {:.1f}/{:.1f}".
format(j*NUM_PROCESSES*NUM_ADVANCED_STEP,
final_rewards.mean(),
final_rewards.median(),
final_rewards.min(),
final_rewards.max()))

# 結合パラメータの保存
if j % 12500 == 0:
torch.save(global_brain.actor_critic.state_dict(), 'weight_'+str(j)+'.pth')

# 実行ループの終了
torch.save(global_brain.actor_critic.state_dict(), 'weight_end.pth')

最後に実行します。

1
2
3
# 実行
breakout_env = Environment()
breakout_env.run()

実行途中結果

学習が完了すると学習データ weight_end.pth ファイルが出力されます。
次回はこの学習データを使ってBreakoutをプレイします。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 ブロック崩し(breakout)をランダム操作

OpenAI Gym に用意されている環境の1つブロック崩し(breakout)を実行してみます。

まずbreakout-v0を実行するために、次のコマンドを実行します。

1
2
pip install --no-index -f https://github.com/Kojoley/atari-py/releases atari_py
pip install opencv-python

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

ブロック崩し(Breakout-v0)を指定して環境を作成します。

1
2
3
# ゲームの開始
ENV = 'Breakout-v0' # 使用する課題名
env = gym.make(ENV) # 実行する課題を設定

ゲームの状態と行動を把握するためのコードを実行します。

1
2
3
4
5
6
7
8
# ゲームの状態と行動を把握
# 状態
print('状態', env.observation_space)
# Box(210, 160, 3)

# 行動
print( env.action_space)
print('行動', env.unwrapped.get_action_meanings())

実行結果1

  • 状態 env.observation_space は縦210ピクセル、横160のRGB情報です。
    CartPoleのような物理情報ではなく画面そのものが状態となっているのがポイントです。
  • 行動 env.unwrapped.get_action_meanings() は次の4種類となります。
    1. NOOP : 何もしない
    2. FIRE : 玉を発射
    3. RIGHT : 右へ移動
    4. LEFT : 左へ移動

試しに初期状態の画面を表示してみます。

1
2
3
# 初期状態を描画してみる
observation = env.reset() # 環境の初期化
plt.imshow(observation) # 描画

実行結果2

画面上部に表示される数字の意味は下記の通りです。

  • 左の数字 : 得点
  • 中央の数字 : ライフ(残機)
  • 右の数字 : プレイヤー数/チーム数(今回は無視)

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)

anim.save('7_1breakout.mp4') # 動画ファイルを保存する

ランダムに行動し、その様子を動画ファイルで保存します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
frames = []  # 画像を格納していく変数
observation = env.reset() # 環境の初期化

for step in range(1000): # 最大1000エピソードのループ
frames.append(observation) # 変換せずに画像を保存
action = np.random.randint(0, 4) # 0~3のランダムな行動を求める
observation_next, reward, done, info = env.step(action) # 実行

observation = observation_next # 状態の更新

if done: # 終了したらループから抜ける
break

display_frames_as_gif(frames) # 動画を保存と描画してみよう

出力される動画ファイル 7_1breakout.mp4 は下記のようになります。

以上で、ブロック崩し(Breakout)をランダムに動かすことができました。
次回はこの環境に強化学習を適用します。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 A2C

A2Cという深層強化学習を実装します。

A2Cではエージェントを複数用意して強化学習を行います。
A2Cは「Advantage学習」と「Actor-Critic」を意味します。

  • Advantage学習は2ステップ以上先まで動かしてQ関数を更新します。
  • Actor-Criticは方策反復法と価値反復法の両方を使用します。

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

定数を定義します。
今回はエージェントを16個用意し、Advantageするステップ数を5とします。

1
2
3
4
5
6
7
8
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 1000 # 最大試行回数

NUM_PROCESSES = 32 # 同時に実行する環境
NUM_ADVANCED_STEP = 5 # 何ステップ進めて報酬和を計算するのか設定

A2Cの損失関数の計算のための定数を設定します。

1
2
3
4
# A2Cの損失関数の計算のための定数設定
value_loss_coef = 0.5
entropy_coef = 0.01
max_grad_norm = 0.5

Advantage学習用にメモリクラスを準備します。

  • marks は試行の終端を表します。
  • 関数 insert は現在の transition を RolloutStorage に追加します。
  • 関数 after_update は Advantage する5ステップが終了したら一番最新(後ろ)の内容を先頭に格納します。
  • 関数 compute_returns は各ステップでの割引報酬和を計算します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# メモリクラスの定義
class RolloutStorage(object):
'''Advantage学習するためのメモリクラスです'''

def __init__(self, num_steps, num_processes, obs_shape):

self.observations = torch.zeros(num_steps + 1, num_processes, 4)
self.masks = torch.ones(num_steps + 1, num_processes, 1)
self.rewards = torch.zeros(num_steps, num_processes, 1)
self.actions = torch.zeros(num_steps, num_processes, 1).long()

# 割引報酬和を格納
self.returns = torch.zeros(num_steps + 1, num_processes, 1)
self.index = 0 # insertするインデックス

def insert(self, current_obs, action, reward, mask):
'''次のindexにtransitionを格納する'''
self.observations[self.index + 1].copy_(current_obs)
self.masks[self.index + 1].copy_(mask)
self.rewards[self.index].copy_(reward)
self.actions[self.index].copy_(action)

self.index = (self.index + 1) % NUM_ADVANCED_STEP # インデックスの更新

def after_update(self):
'''Advantageするstep数が完了したら、最新のものをindex0に格納'''
self.observations[0].copy_(self.observations[-1])
self.masks[0].copy_(self.masks[-1])

def compute_returns(self, next_value):
'''Advantageするステップ中の各ステップの割引報酬和を計算する'''

# 注意:5step目から逆向きに計算しています
# 注意:5step目はAdvantage1となる。4ステップ目はAdvantage2となる。
self.returns[-1] = next_value
for ad_step in reversed(range(self.rewards.size(0))):
self.returns[ad_step] = self.returns[ad_step + 1] * \
GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]

A2Cのディープ・ニューラルネットワークの構築するクラスを実装します。

fc層を2つ用意し、Actor側とCritic側の出力を用意します。

  • 関数 act は状態xから行動を確率的に求めます。
  • 関数 get_value は状態xでの状態価値を求めます。
  • 関数 evaluate_actions はネットワークを更新します。
    状態xでの状態価値および実際に行った行動actionsを利用して、行動のlog確率action_log_probsと方策のエントロピーを計算します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# A2Cのディープ・ニューラルネットワークの構築
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

def __init__(self, n_in, n_mid, n_out):
super(Net, self).__init__()
self.fc1 = nn.Linear(n_in, n_mid)
self.fc2 = nn.Linear(n_mid, n_mid)
self.actor = nn.Linear(n_mid, n_out) # 行動を決めるので出力は行動の種類数
self.critic = nn.Linear(n_mid, 1) # 状態価値なので出力は1つ

def forward(self, x):
'''ネットワークのフォワード計算を定義します'''
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))
critic_output = self.critic(h2) # 状態価値の計算
actor_output = self.actor(h2) # 行動の計算

return critic_output, actor_output

def act(self, x):
'''状態xから行動を確率的に求めます'''
value, actor_output = self(x)
# dim=1で行動の種類方向にsoftmaxを計算
action_probs = F.softmax(actor_output, dim=1)
action = action_probs.multinomial(num_samples=1) # dim=1で行動の種類方向に確率計算
return action

def get_value(self, x):
'''状態xから状態価値を求めます'''
value, actor_output = self(x)

return value

def evaluate_actions(self, x, actions):
'''状態xから状態価値、実際の行動actionsのlog確率とエントロピーを求めます'''
value, actor_output = self(x)

log_probs = F.log_softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action_log_probs = log_probs.gather(1, actions) # 実際の行動のlog_probsを求める

probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
entropy = -(log_probs * probs).sum(-1).mean()

return value, action_log_probs, entropy

エージェントが持つ頭脳となるBrainクラスを定義します。
このクラスのオブジェクトは全エージェントで共有されます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# エージェントが持つ頭脳となるクラスを定義、全エージェントで共有する
import torch
from torch import optim

class Brain(object):
def __init__(self, actor_critic):
self.actor_critic = actor_critic # actor_criticはクラスNetのディープ・ニューラルネットワーク
self.optimizer = optim.Adam(self.actor_critic.parameters(), lr=0.01)

def update(self, rollouts):
'''Advantageで計算した5つのstepの全てを使って更新します'''
obs_shape = rollouts.observations.size()[2:] # torch.Size([4, 84, 84])
num_steps = NUM_ADVANCED_STEP
num_processes = NUM_PROCESSES

values, action_log_probs, entropy = self.actor_critic.evaluate_actions(
rollouts.observations[:-1].view(-1, 4),
rollouts.actions.view(-1, 1))

# 注意:各変数のサイズ
# rollouts.observations[:-1].view(-1, 4) torch.Size([80, 4])
# rollouts.actions.view(-1, 1) torch.Size([80, 1])
# values torch.Size([80, 1])
# action_log_probs torch.Size([80, 1])
# entropy torch.Size([])

values = values.view(num_steps, num_processes, 1) # torch.Size([5, 16, 1])
action_log_probs = action_log_probs.view(num_steps, num_processes, 1)

# advantage(行動価値-状態価値)の計算
advantages = rollouts.returns[:-1] - values # torch.Size([5, 16, 1])

# Criticのlossを計算
value_loss = advantages.pow(2).mean()

# Actorのgainを計算、あとでマイナスをかけてlossにする
action_gain = (action_log_probs*advantages.detach()).mean()
# detachしてadvantagesを定数として扱う

# 誤差関数の総和
total_loss = (value_loss * value_loss_coef -
action_gain - entropy * entropy_coef)

# 結合パラメータを更新
self.actor_critic.train() # 訓練モードに
self.optimizer.zero_grad() # 勾配をリセット
total_loss.backward() # バックプロパゲーションを計算
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_grad_norm)
# 一気に結合パラメータが変化しすぎないように、勾配の大きさは最大0.5までにする

self.optimizer.step() # 結合パラメータを更新

実行する環境のクラスを実装します。
このクラスでエージェントを複数生成し Advantage学習による報酬の計算を行います。

  • 関数 run でAdvantageする5ステップ分ずつ各環境を実行します。
    この5ステップ全てのtransitionに対してそれぞれ1~5ステップのAdvantage学習を実施します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# 実行する環境のクラスです
import copy

class Environment:
def run(self):
'''メインの実行'''

# 同時実行する環境数分、envを生成
envs = [gym.make(ENV) for i in range(NUM_PROCESSES)]

# 全エージェントが共有して持つ頭脳Brainを生成
n_in = envs[0].observation_space.shape[0] # 状態は4
n_out = envs[0].action_space.n # 行動は2
n_mid = 32
actor_critic = Net(n_in, n_mid, n_out) # ディープ・ニューラルネットワークの生成
global_brain = Brain(actor_critic)

# 格納用変数の生成
obs_shape = n_in
current_obs = torch.zeros(NUM_PROCESSES, obs_shape) # torch.Size([16, 4])
rollouts = RolloutStorage(NUM_ADVANCED_STEP, NUM_PROCESSES, obs_shape) # rolloutsのオブジェクト
episode_rewards = torch.zeros([NUM_PROCESSES, 1]) # 現在の試行の報酬を保持
final_rewards = torch.zeros([NUM_PROCESSES, 1]) # 最後の試行の報酬を保持
obs_np = np.zeros([NUM_PROCESSES, obs_shape]) # Numpy配列
reward_np = np.zeros([NUM_PROCESSES, 1]) # Numpy配列
done_np = np.zeros([NUM_PROCESSES, 1]) # Numpy配列
each_step = np.zeros(NUM_PROCESSES) # 各環境のstep数を記録
episode = 0 # 環境0の試行数

# 初期状態の開始
obs = [envs[i].reset() for i in range(NUM_PROCESSES)]
obs = np.array(obs)
obs = torch.from_numpy(obs).float() # torch.Size([16, 4])
current_obs = obs # 最新のobsを格納

# advanced学習用のオブジェクトrolloutsの状態の1つ目に、現在の状態を保存
rollouts.observations[0].copy_(current_obs)

# 実行ループ
for j in range(NUM_EPISODES*NUM_PROCESSES): # 全体のforループ
# advanced学習するstep数ごとに計算
for step in range(NUM_ADVANCED_STEP):

# 行動を求める
with torch.no_grad():
action = actor_critic.act(rollouts.observations[step])

# (16,1)→(16,)→tensorをNumPyに
actions = action.squeeze(1).numpy()

# 1stepの実行
for i in range(NUM_PROCESSES):
obs_np[i], reward_np[i], done_np[i], _ = envs[i].step(
actions[i])

# episodeの終了評価と、state_nextを設定
if done_np[i]: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる

# 環境0のときのみ出力
if i == 0:
print('%d Episode: Finished after %d steps' % (
episode, each_step[i]+1))
episode += 1

# 報酬の設定
if each_step[i] < 195:
reward_np[i] = -1.0 # 途中でこけたら罰則として報酬-1を与える
else:
reward_np[i] = 1.0 # 立ったまま終了時は報酬1を与える

each_step[i] = 0 # step数のリセット
obs_np[i] = envs[i].reset() # 実行環境のリセット

else:
reward_np[i] = 0.0 # 普段は報酬0
each_step[i] += 1

# 報酬をtensorに変換し、試行の総報酬に足す
reward = torch.from_numpy(reward_np).float()
episode_rewards += reward

# 各実行環境それぞれについて、doneならmaskは0に、継続中ならmaskは1にする
masks = torch.FloatTensor(
[[0.0] if done_ else [1.0] for done_ in done_np])

# 最後の試行の総報酬を更新する
final_rewards *= masks # 継続中の場合は1をかけ算してそのまま、done時には0を掛けてリセット
# 継続中は0を足す、done時にはepisode_rewardsを足す
final_rewards += (1 - masks) * episode_rewards

# 試行の総報酬を更新する
episode_rewards *= masks # 継続中のmaskは1なのでそのまま、doneの場合は0に

# 現在の状態をdone時には全部0にする
current_obs *= masks

# current_obsを更新
obs = torch.from_numpy(obs_np).float() # torch.Size([16, 4])
current_obs = obs # 最新のobsを格納

# メモリオブジェクトに今stepのtransitionを挿入
rollouts.insert(current_obs, action.data, reward, masks)

# advancedのfor loop終了

# advancedした最終stepの状態から予想する状態価値を計算

with torch.no_grad():
next_value = actor_critic.get_value(
rollouts.observations[-1]).detach()
# rollouts.observationsのサイズはtorch.Size([6, 16, 4])

# 全stepの割引報酬和を計算して、rolloutsの変数returnsを更新
rollouts.compute_returns(next_value)

# ネットワークとrolloutの更新
global_brain.update(rollouts)
rollouts.after_update()

# 全部のNUM_PROCESSESが200step経ち続けたら成功
if final_rewards.sum().numpy() >= NUM_PROCESSES:
print('連続成功')
break

学習を実行します。

1
2
3
# main学習
cartpole_env = Environment()
cartpole_env.run()

実行結果

25エピソードで学習が完了しました。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 Prioritized Experience Replay

Prioritized Experience Replayという深層強化学習を実装します。

  • Prioritized Experience Replayは、学習がきちんと進んでいない状態に対して優先的に学習させる深層強化学習です。
  • 具体的には誤差が大きいtransitionを優先的にExperience Replay時に学習させ、価値関数のニューラルネットワークの出力誤差が小さくなるようにします。

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)

anim.save('6_4movie_cartpole_prioritized_experience_replay.mp4') # 動画ファイルを保存します。

経験(Transition)を表すnamedtupleを生成します。
(状態 state、行動 action、次の状態 next_state、報酬 rewardに容易にアクセスできます。)

1
2
3
# namedtupleを生成
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

定数を宣言します。

1
2
3
4
5
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

経験を保存するメモリクラスを定義します。

  • 経験を保存する関数 push、ランダムに経験を取り出す関数 sampleがあります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 経験を保存するメモリクラスを定義します
class ReplayMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward)をメモリに保存する'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

# namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
self.memory[self.index] = Transition(state, action, state_next, reward)

self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def sample(self, batch_size):
'''batch_size分だけ、ランダムに保存内容を取り出す'''
return random.sample(self.memory, batch_size)

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

TD誤差を格納するメモリクラスを定義します。

  • 関数 get_prioritized_indexes はメモリ格納されている誤差の大きさに応じて確率的にindexを求める関数です。
  • 関数 update_td_erro はメモリに格納されている誤差を更新するための関数です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# TD誤差を格納するメモリクラスを定義します
TD_ERROR_EPSILON = 0.0001 # 誤差に加えるバイアス

class TDerrorMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, td_error):
'''TD誤差をメモリに保存します'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

self.memory[self.index] = td_error
self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

def get_prioritized_indexes(self, batch_size):
'''TD誤差に応じた確率でindexを取得'''
# TD誤差の和を計算
sum_absolute_td_error = np.sum(np.absolute(self.memory))
sum_absolute_td_error += TD_ERROR_EPSILON * len(self.memory) # 微小値を足す

# batch_size分の乱数を生成して、昇順に並べる
rand_list = np.random.uniform(0, sum_absolute_td_error, batch_size)
rand_list = np.sort(rand_list)

# 作成した乱数で串刺しにして、インデックスを求める
indexes = []
idx = 0
tmp_sum_absolute_td_error = 0
for rand_num in rand_list:
while tmp_sum_absolute_td_error < rand_num:
tmp_sum_absolute_td_error += (abs(self.memory[idx]) + TD_ERROR_EPSILON)
idx += 1

# 微小値を計算に使用した関係でindexがメモリの長さを超えた場合の補正
if idx >= len(self.memory):
idx = len(self.memory) - 1
indexes.append(idx)

return indexes

def update_td_error(self, updated_td_errors):
'''TD誤差の更新'''
self.memory = updated_td_errors

ニューラルネットワークを構築するクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ディープ・ニューラルネットワークの構築
# ニューラルネットワークの設定(Chainer風の書き方)
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

def __init__(self, n_in, n_mid, n_out):
super(Net, self).__init__()
self.fc1 = nn.Linear(n_in, n_mid)
self.fc2 = nn.Linear(n_mid, n_mid)
self.fc3 = nn.Linear(n_mid, n_out)

def forward(self, x):
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))
output = self.fc3(h2)
return output

Brainクラスを実装します。
(このクラスがニューラルネットワークを保持します。)

初期化関数にはTD誤差を格納するクラス TDerrorMemory のオブジェクト生成処理を追加します。

  • 関数 replay では最初にExperience Replay を行い、途中から Prioritized Experience Replayに切り替えます。
  • 関数 update_td_error_memory でメモリオブジェクトの保存された全transitionのTD誤差を再計算します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# エージェントが持つ脳となるクラスです、PrioritizedExperienceReplayを実行します
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# 経験を記憶するメモリオブジェクトを生成
self.memory = ReplayMemory(CAPACITY)

# ニューラルネットワークを構築
n_in, n_mid, n_out = num_states, 32, num_actions
self.main_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
self.target_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
print(self.main_q_network) # ネットワークの形を出力

# 最適化手法の設定
self.optimizer = optim.Adam(
self.main_q_network.parameters(), lr=0.0001)

# TD誤差のメモリオブジェクトを生成
self.td_error_memory = TDerrorMemory(CAPACITY)

def replay(self, episode):
'''Experience Replayでネットワークの結合パラメータを学習'''

# 1. メモリサイズの確認
if len(self.memory) < BATCH_SIZE:
return

# 2. ミニバッチの作成
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch(episode)

# 3. 教師信号となるQ(s_t, a_t)値を求める
self.expected_state_action_values = self.get_expected_state_action_values()

# 4. 結合パラメータの更新
self.update_main_q_network()

def decide_action(self, state, episode):
'''現在の状態に応じて、行動を決定する'''
# ε-greedy法で徐々に最適行動のみを採用する
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # ネットワークを推論モードに切り替える
with torch.no_grad():
action = self.main_q_network(state).max(1)[1].view(1, 1)
# ネットワークの出力の最大値のindexを取り出します = max(1)[1]
# .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

else:
# 0,1の行動をランダムに返す
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 0,1の行動をランダムに返す
# actionは[torch.LongTensor of size 1x1]の形になります

return action

def make_minibatch(self, episode):
'''2. ミニバッチの作成'''

# 2.1 メモリからミニバッチ分のデータを取り出す
if episode < 30:
transitions = self.memory.sample(BATCH_SIZE)
else:
# TD誤差に応じてミニバッチを取り出すに変更
indexes = self.td_error_memory.get_prioritized_indexes(BATCH_SIZE)
transitions = [self.memory.memory[n] for n in indexes]

# 2.2 各変数をミニバッチに対応する形に変形
# transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
# つまり、(state, action, state_next, reward)×BATCH_SIZE
# これをミニバッチにしたい。つまり
# (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
batch = Transition(*zip(*transitions))

# 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
# 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
# それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
# 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
# catはConcatenates(結合)のことです。
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

return batch, state_batch, action_batch, reward_batch, non_final_next_states

def get_expected_state_action_values(self):
'''3. 教師信号となるQ(s_t, a_t)値を求める'''

# 3.1 ネットワークを推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()

# 3.2 ネットワークが出力したQ(s_t, a_t)を求める
# self.model(state_batch)は、右左の両方のQ値を出力しており
# [torch.FloatTensor of size BATCH_SIZEx2]になっている。
# ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
# それに対応するQ値をgatherでひっぱり出す。
self.state_action_values = self.main_q_network(
self.state_batch).gather(1, self.action_batch)

# 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
self.batch.next_state)))
# まずは全部0にしておく
next_state_values = torch.zeros(BATCH_SIZE)
a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

# 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
# 最後の[1]で行動に対応したindexが返る
a_m[non_final_mask] = self.main_q_network(self.non_final_next_states).detach().max(1)[1]

# 次の状態があるものだけにフィルターし、size 32を32×1へ
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

# 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
# detach()で取り出す
# squeeze()でsize[minibatch×1]を[minibatch]に。
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

# 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
expected_state_action_values = self.reward_batch + GAMMA * next_state_values

return expected_state_action_values

def update_main_q_network(self):
'''4. 結合パラメータの更新'''

# 4.1 ネットワークを訓練モードに切り替える
self.main_q_network.train()

# 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
# expected_state_action_valuesは
# sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
loss = F.smooth_l1_loss(self.state_action_values, self.expected_state_action_values.unsqueeze(1))

# 4.3 結合パラメータを更新する
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # バックプロパゲーションを計算
self.optimizer.step() # 結合パラメータを更新

def update_target_q_network(self): # DDQNで追加
'''Target Q-NetworkをMainと同じにする'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())

def update_td_error_memory(self): # PrioritizedExperienceReplayで追加
'''TD誤差メモリに格納されているTD誤差を更新する'''

# ネットワークを推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()

# 全メモリでミニバッチを作成
transitions = self.memory.memory
batch = Transition(*zip(*transitions))

state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

# ネットワークが出力したQ(s_t, a_t)を求める
state_action_values = self.main_q_network(state_batch).gather(1, action_batch)

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))

# まずは全部0にしておく、サイズはメモリの長さである
next_state_values = torch.zeros(len(self.memory))
a_m = torch.zeros(len(self.memory)).type(torch.LongTensor)

# 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
# 最後の[1]で行動に対応したindexが返る
a_m[non_final_mask] = self.main_q_network(non_final_next_states).detach().max(1)[1]

# 次の状態があるものだけにフィルターし、size 32を32×1へ
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

# 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
# detach()で取り出す
# squeeze()でsize[minibatch×1]を[minibatch]に。
next_state_values[non_final_mask] = self.target_q_network(
non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

# TD誤差を求める
td_errors = (reward_batch + GAMMA * next_state_values) - state_action_values.squeeze()
# state_action_valuesはsize[minibatch×1]なので、squeezeしてsize[minibatch]へ

# TD誤差メモリを更新、Tensorをdetach()で取り出し、NumPyにしてから、Pythonのリストまで変換
self.td_error_memory.memory = td_errors.detach().numpy().tolist()

棒付き台車を表すAgentクラスを実装します。

  • 関数 memorize でメモリオブジェクトに経験したデータ transition を格納します。
  • 関数 memorize_td_erro ではそのステップでのTD誤差を格納します。
  • 関数 update_td_error_memory は各試行の終わりに実行され TDerrorMemory クラスのオブジェクトに格納されたTD誤差を更新します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
def __init__(self, num_states, num_actions):
'''課題の状態と行動の数を設定する'''
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_q_function(self, episode):
'''Q関数を更新する'''
self.brain.replay(episode)

def get_action(self, state, episode):
'''行動を決定する'''
action = self.brain.decide_action(state, episode)
return action

def memorize(self, state, action, state_next, reward):
'''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
self.brain.memory.push(state, action, state_next, reward)

def update_target_q_function(self):
'''Target Q-NetworkをMain Q-Networkと同じに更新'''
self.brain.update_target_q_network()

def memorize_td_error(self, td_error): # PrioritizedExperienceReplayで追加
'''TD誤差メモリにTD誤差を格納'''
self.brain.td_error_memory.push(td_error)

def update_td_error_memory(self): # PrioritizedExperienceReplayで追加
'''TD誤差メモリに格納されているTD誤差を更新する'''
self.brain.update_td_error_memory()

CartPoleを実行する環境クラスを定義します。
(表形式表現のように離散化は行わず、観測結果 observationをそのままstateとして使用します。)

  • 各ステップでのTD誤差をTD誤差メモリに追加します。
  • Q_Networkの更新には引数 episode を追加します。(update_q_function関数)
  • 各試行の終わりにはTD誤差メモリの中身を更新させます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# CartPoleを実行する環境のクラスです
class Environment:

def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態と行動の数を設定
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
# 環境内で行動するAgentを生成
self.agent = Agent(num_states, num_actions)

def run(self):
'''実行'''
episode_10_list = np.zeros(10) # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
episode_final = False # 最後の試行フラグ
frames = [] # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

state = observation # 観測をそのまま状態sとして使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state = torch.unsqueeze(state, 0) # size 4をsize 1x4に変換

for step in range(MAX_STEPS): # 1エピソードのループ

# 動画描画を行います。
if episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 行動を求める

# 行動a_tの実行により、s_{t+1}とdoneフラグを求める
# actionから.item()を指定して、中身を取り出す
observation_next, _, done, _ = self.env.step(
action.item()) # rewardとinfoは使わないので_にする

# 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
state_next = None # 次の状態はないので、Noneを格納

# 直近10episodeの立てたstep数リストに追加
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))

if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 連続成功記録をリセット
else:
reward = torch.FloatTensor([1.0]) # 立ったまま終了時は報酬1を与える
complete_episodes = complete_episodes + 1 # 連続記録を更新
else:
reward = torch.FloatTensor([0.0]) # 普段は報酬0
state_next = observation_next # 観測をそのまま状態とする
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state_next = torch.unsqueeze(
state_next, 0) # size 4をsize 1x4に変換

# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)

# TD誤差メモリにTD誤差を追加
self.agent.memorize_td_error(0) # 本当はTD誤差を格納するが、0をいれておく

# PrioritizedExperienceReplayでQ関数を更新する
self.agent.update_q_function(episode)

# 観測の更新
state = state_next

# 終了時の処理
if done:
print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))

# TD誤差メモリの中身を更新する
self.agent.update_td_error_memory()

# DDQNで追加、2試行に1度、Target Q-NetworkをMainと同じにコピーする
if(episode % 2 == 0):
self.agent.update_target_q_function()
break

if episode_final is True:
# 動画を保存
display_frames_as_gif(frames)
break

# 10連続で200step経ち続けたら成功
if complete_episodes >= 10:
print('10回連続成功')
episode_final = True # 次の試行を描画を行う最終試行とする

学習を実行します。

1
2
3
# main クラス
cartpole_env = Environment()
cartpole_env.run()

実行結果1 エピソード0~12

実行結果2 エピソード160~176(終了)

175エピソードで学習が完了しました。
出力された動画ファイル’6_4movie_cartpole_prioritized_experience_replay.mp4’は下記のようになります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 Dueling Network

Dueling Networkという深層強化学習を実装します。

Dueling NetworkではQ関数を2つに分けて学習します。

  • 状態sだけで決まるV(s)
  • 行動しだいできまるAdvantage関数 A(s,a)

最後にこの2関数を足してQ(s,a)と決めます。

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('6_3movie_cartpole_dueling_network.mp4') # 動画のファイル名と保存です

経験(Transition)を表すnamedtupleを生成します。
状態 state、行動 action、次の状態 next_state、報酬 rewardに容易にアクセスできます。

1
2
3
# namedtupleを生成
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

定数を宣言します。

1
2
3
4
5
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

経験を保存するメモリクラスを定義します。
経験を保存する関数 push、ランダムに経験を取り出す関数 sampleがあります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 経験を保存するメモリクラスを定義します
class ReplayMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward)をメモリに保存する'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

# namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
self.memory[self.index] = Transition(state, action, state_next, reward)

self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def sample(self, batch_size):
'''batch_size分だけ、ランダムに保存内容を取り出す'''
return random.sample(self.memory, batch_size)

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

Dueling Network型のディープ・ニューラルネットワークを構築します。

Advantage側の層 fc3_advと状態価値の層 fc3_vを作成します。
最後に出力する行動価値 output は上記2つを足したものとなります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Dueling Network型のディープ・ニューラルネットワークの構築
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

def __init__(self, n_in, n_mid, n_out):
super(Net, self).__init__()
self.fc1 = nn.Linear(n_in, n_mid)
self.fc2 = nn.Linear(n_mid, n_mid)
# Dueling Network
self.fc3_adv = nn.Linear(n_mid, n_out) # Advantage側
self.fc3_v = nn.Linear(n_mid, 1) # 価値V側

def forward(self, x):
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))

adv = self.fc3_adv(h2) # この出力はReLUしない
val = self.fc3_v(h2).expand(-1, adv.size(1)) # この出力はReLUしない
# valはadvと足し算するために、サイズを[minibatchx1]から[minibatchx2]にexpandする
# adv.size(1)は出力する行動の種類数の2

output = val + adv - adv.mean(1, keepdim=True).expand(-1, adv.size(1))
# val+advからadvの平均値を引き算する
# adv.mean(1, keepdim=True) で列方向(行動の種類方向)に平均し、サイズが[minibatch×1]
# expandで展開して、サイズ[minibatchx2]

return output

Brainクラスを実装します。
このクラスがニューラルネットワークを保持します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# エージェントが持つ脳となるクラスです、Dueling Networkを実行します
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# 経験を記憶するメモリオブジェクトを生成
self.memory = ReplayMemory(CAPACITY)

# ニューラルネットワークを構築
n_in, n_mid, n_out = num_states, 32, num_actions
self.main_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
self.target_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
print(self.main_q_network) # ネットワークの形を出力

# 最適化手法の設定
self.optimizer = optim.Adam(
self.main_q_network.parameters(), lr=0.0001)

def replay(self):
'''Experience Replayでネットワークの結合パラメータを学習'''

# 1. メモリサイズの確認
if len(self.memory) < BATCH_SIZE:
return

# 2. ミニバッチの作成
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()

# 3. 教師信号となるQ(s_t, a_t)値を求める
self.expected_state_action_values = self.get_expected_state_action_values()

# 4. 結合パラメータの更新
self.update_main_q_network()

def decide_action(self, state, episode):
'''現在の状態に応じて、行動を決定する'''
# ε-greedy法で徐々に最適行動のみを採用する
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # ネットワークを推論モードに切り替える
with torch.no_grad():
action = self.main_q_network(state).max(1)[1].view(1, 1)
# ネットワークの出力の最大値のindexを取り出します = max(1)[1]
# .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

else:
# 0,1の行動をランダムに返す
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 0,1の行動をランダムに返す
# actionは[torch.LongTensor of size 1x1]の形になります

return action

def make_minibatch(self):
'''2. ミニバッチの作成'''

# 2.1 メモリからミニバッチ分のデータを取り出す
transitions = self.memory.sample(BATCH_SIZE)

# 2.2 各変数をミニバッチに対応する形に変形
# transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
# つまり、(state, action, state_next, reward)×BATCH_SIZE
# これをミニバッチにしたい。つまり
# (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
batch = Transition(*zip(*transitions))

# 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
# 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
# それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
# 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
# catはConcatenates(結合)のことです。
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

return batch, state_batch, action_batch, reward_batch, non_final_next_states

def get_expected_state_action_values(self):
'''3. 教師信号となるQ(s_t, a_t)値を求める'''

# 3.1 ネットワークを推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()

# 3.2 ネットワークが出力したQ(s_t, a_t)を求める
# self.model(state_batch)は、右左の両方のQ値を出力しており
# [torch.FloatTensor of size BATCH_SIZEx2]になっている。
# ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
# それに対応するQ値をgatherでひっぱり出す。
self.state_action_values = self.main_q_network(
self.state_batch).gather(1, self.action_batch)

# 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
self.batch.next_state)))
# まずは全部0にしておく
next_state_values = torch.zeros(BATCH_SIZE)

a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

# 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
# 最後の[1]で行動に対応したindexが返る
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states).detach().max(1)[1]

# 次の状態があるものだけにフィルターし、size 32を32×1へ
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

# 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
# detach()で取り出す
# squeeze()でsize[minibatch×1]を[minibatch]に。
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

# 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
expected_state_action_values = self.reward_batch + GAMMA * next_state_values

return expected_state_action_values

def update_main_q_network(self):
'''4. 結合パラメータの更新'''

# 4.1 ネットワークを訓練モードに切り替える
self.main_q_network.train()

# 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
# expected_state_action_valuesは
# sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
loss = F.smooth_l1_loss(self.state_action_values,
self.expected_state_action_values.unsqueeze(1))

# 4.3 結合パラメータを更新する
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # バックプロパゲーションを計算
self.optimizer.step() # 結合パラメータを更新

def update_target_q_network(self): # DDQNで追加
'''Target Q-NetworkをMainと同じにする'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())

棒付き台車を表すAgentクラスを実装します。
関数 memorizeでメモリオブジェクトに経験したデータ transitionを格納します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
def __init__(self, num_states, num_actions):
'''課題の状態と行動の数を設定する'''
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_q_function(self):
'''Q関数を更新する'''
self.brain.replay()

def get_action(self, state, episode):
'''行動を決定する'''
action = self.brain.decide_action(state, episode)
return action

def memorize(self, state, action, state_next, reward):
'''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
self.brain.memory.push(state, action, state_next, reward)

def update_target_q_function(self):
'''Target Q-NetworkをMain Q-Networkと同じに更新'''
self.brain.update_target_q_network()

CartPoleを実行する環境クラスを定義します。
表形式表現のように離散化は行わず、観測結果 observationをそのままstateとして使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# CartPoleを実行する環境のクラスです
class Environment:

def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態と行動の数を設定
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
# 環境内で行動するAgentを生成
self.agent = Agent(num_states, num_actions)

def run(self):
'''実行'''
episode_10_list = np.zeros(10) # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
episode_final = False # 最後の試行フラグ
frames = [] # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

state = observation # 観測をそのまま状態sとして使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state = torch.unsqueeze(state, 0) # size 4をsize 1x4に変換

for step in range(MAX_STEPS): # 1エピソードのループ

# 動画描画をコメントアウトしています
if episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 行動を求める

# 行動a_tの実行により、s_{t+1}とdoneフラグを求める
# actionから.item()を指定して、中身を取り出す
observation_next, _, done, _ = self.env.step(
action.item()) # rewardとinfoは使わないので_にする

# 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
state_next = None # 次の状態はないので、Noneを格納

# 直近10episodeの立てたstep数リストに追加
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))

if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 連続成功記録をリセット
else:
reward = torch.FloatTensor([1.0]) # 立ったまま終了時は報酬1を与える
complete_episodes = complete_episodes + 1 # 連続記録を更新
else:
reward = torch.FloatTensor([0.0]) # 普段は報酬0
state_next = observation_next # 観測をそのまま状態とする
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state_next = torch.unsqueeze(state_next, 0) # size 4をsize 1x4に変換

# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)

# Experience ReplayでQ関数を更新する
self.agent.update_q_function()

# 観測の更新
state = state_next

# 終了時の処理
if done:
print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))

# DDQNで追加、2試行に1度、Target Q-NetworkをMainと同じにコピーする
if(episode % 2 == 0):
self.agent.update_target_q_function()
break

if episode_final is True:
# 動画描画をコメントアウトしています
# 動画を保存と描画
display_frames_as_gif(frames)
break

# 10連続で200step経ち続けたら成功
if complete_episodes >= 10:
print('10回連続成功')
episode_final = True # 次の試行を描画を行う最終試行とする

学習を実行します。

1
2
3
# main クラス
cartpole_env = Environment()
cartpole_env.run()

実行結果1 エピソード0~10

実行結果2 エピソード150~165(終了)

164エピソードで学習が完了しました。
出力された動画ファイル’6_3movie_cartpole_dueling_network.mp4’は下記のようになります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 DDQN (Double-DQN)

DDQN (Double-DQN) では2つのネットワークを使用して Main Q-Networkの更新量を求めます。

  • Main Q-Network
    次の状態でのQ値が最大となる行動を求めるネットワーク
  • Target Q-Network
    Main Q-Networkで求めた行動のQ値を求めるネットワーク

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
print(len(frames))
print(frames[0].shape)
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)

anim.save('6_2movie_cartpole_DDQN.mp4') # 動画のファイル名と保存です

経験(Transition)を表すnamedtupleを生成します。
状態 state、行動 action、次の状態 next_state、報酬 rewardに容易にアクセスできます。

1
2
3
# namedtupleを生成
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

定数を宣言します。

1
2
3
4
5
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

経験データを保存しておくメモリクラス ReplayMemoryを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 経験を保存するメモリクラスを定義します
class ReplayMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward)をメモリに保存する'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

# namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
self.memory[self.index] = Transition(state, action, state_next, reward)

self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def sample(self, batch_size):
'''batch_size分だけ、ランダムに保存内容を取り出す'''
return random.sample(self.memory, batch_size)

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

ディープ・ニューラルネットワークの構築を行うクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ディープ・ニューラルネットワークの構築
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

def __init__(self, n_in, n_mid, n_out):
super(Net, self).__init__()
self.fc1 = nn.Linear(n_in, n_mid)
self.fc2 = nn.Linear(n_mid, n_mid)
self.fc3 = nn.Linear(n_mid, n_out)

def forward(self, x):
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))
output = self.fc3(h2)
return output

エージェントが持つ脳となるクラスを定義します。

replay関数では下記の4処理を行います。

  1. メモリサイズの確認
  2. ミニバッチの作成 → make_minibatch()関数
  3. 教師信号となるQ値を求める → get_expected_state_action_values()関数
  4. 結合パラメータの更新 → update_main_q_network()関数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# エージェントが持つ脳となるクラスです、DDQNを実行します
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# 経験を記憶するメモリオブジェクトを生成
self.memory = ReplayMemory(CAPACITY)

# ニューラルネットワークを構築
n_in, n_mid, n_out = num_states, 32, num_actions
self.main_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
self.target_q_network = Net(n_in, n_mid, n_out) # Netクラスを使用
print(self.main_q_network) # ネットワークの形を出力

# 最適化手法の設定
self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)

def replay(self):
'''Experience Replayでネットワークの結合パラメータを学習'''
# 1. メモリサイズの確認
if len(self.memory) < BATCH_SIZE:
return

# 2. ミニバッチの作成
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()

# 3. 教師信号となるQ(s_t, a_t)値を求める
self.expected_state_action_values = self.get_expected_state_action_values()

# 4. 結合パラメータの更新
self.update_main_q_network()

def decide_action(self, state, episode):
'''現在の状態に応じて、行動を決定する'''
# ε-greedy法で徐々に最適行動のみを採用する
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # ネットワークを推論モードに切り替える
with torch.no_grad():
action = self.main_q_network(state).max(1)[1].view(1, 1)
# ネットワークの出力の最大値のindexを取り出します = max(1)[1]
# .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

else:
# 0,1の行動をランダムに返す
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 0,1の行動をランダムに返す
# actionは[torch.LongTensor of size 1x1]の形になります

return action

def make_minibatch(self):
'''2. ミニバッチの作成'''

# 2.1 メモリからミニバッチ分のデータを取り出す
transitions = self.memory.sample(BATCH_SIZE)

# 2.2 各変数をミニバッチに対応する形に変形
# transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
# つまり、(state, action, state_next, reward)×BATCH_SIZE
# これをミニバッチにしたい。つまり
# (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
batch = Transition(*zip(*transitions))

# 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
# 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
# それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
# 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
# catはConcatenates(結合)のことです。
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

return batch, state_batch, action_batch, reward_batch, non_final_next_states

def get_expected_state_action_values(self):
'''3. 教師信号となるQ(s_t, a_t)値を求める'''

# 3.1 ネットワークを推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()

# 3.2 ネットワークが出力したQ(s_t, a_t)を求める
# self.model(state_batch)は、右左の両方のQ値を出力しており
# [torch.FloatTensor of size BATCH_SIZEx2]になっている。
# ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
# それに対応するQ値をgatherでひっぱり出す。
self.state_action_values = self.main_q_network(
self.state_batch).gather(1, self.action_batch)

# 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
self.batch.next_state)))
# まずは全部0にしておく
next_state_values = torch.zeros(BATCH_SIZE)

a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

# 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
# 最後の[1]で行動に対応したindexが返る
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states).detach().max(1)[1]

# 次の状態があるものだけにフィルターし、size 32を32×1へ
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

# 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
# detach()で取り出す
# squeeze()でsize[minibatch×1]を[minibatch]に。
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

# 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
expected_state_action_values = self.reward_batch + GAMMA * next_state_values

return expected_state_action_values

def update_main_q_network(self):
'''4. 結合パラメータの更新'''

# 4.1 ネットワークを訓練モードに切り替える
self.main_q_network.train()

# 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
# expected_state_action_valuesは
# sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
loss = F.smooth_l1_loss(self.state_action_values,
self.expected_state_action_values.unsqueeze(1))

# 4.3 結合パラメータを更新する
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # バックプロパゲーションを計算
self.optimizer.step() # 結合パラメータを更新

def update_target_q_network(self): # DDQNで追加
'''Target Q-NetworkをMainと同じにする'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())

CartPoleで動くAgentクラスを実装します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
def __init__(self, num_states, num_actions):
'''課題の状態と行動の数を設定する'''
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_q_function(self):
'''Q関数を更新する'''
self.brain.replay()

def get_action(self, state, episode):
'''行動を決定する'''
action = self.brain.decide_action(state, episode)
return action

def memorize(self, state, action, state_next, reward):
'''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
self.brain.memory.push(state, action, state_next, reward)

def update_target_q_function(self):
'''Target Q-NetworkをMain Q-Networkと同じに更新'''
self.brain.update_target_q_network()

CartPoleを実行する環境クラスを実装します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# CartPoleを実行する環境のクラスです
class Environment:

def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態と行動の数を設定
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
# 環境内で行動するAgentを生成
self.agent = Agent(num_states, num_actions)

def run(self):
'''実行'''
episode_10_list = np.zeros(10) # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
episode_final = False # 最後の試行フラグ
frames = [] # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

state = observation # 観測をそのまま状態sとして使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state = torch.unsqueeze(state, 0) # size 4をsize 1x4に変換

for step in range(MAX_STEPS): # 1エピソードのループ

# 動画描画をコメントアウトしています
if episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 行動を求める

# 行動a_tの実行により、s_{t+1}とdoneフラグを求める
# actionから.item()を指定して、中身を取り出す
observation_next, _, done, _ = self.env.step(
action.item()) # rewardとinfoは使わないので_にする

# 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
state_next = None # 次の状態はないので、Noneを格納

# 直近10episodeの立てたstep数リストに追加
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))

if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 連続成功記録をリセット
else:
reward = torch.FloatTensor([1.0]) # 立ったまま終了時は報酬1を与える
complete_episodes = complete_episodes + 1 # 連続記録を更新
else:
reward = torch.FloatTensor([0.0]) # 普段は報酬0
state_next = observation_next # 観測をそのまま状態とする
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state_next = torch.unsqueeze(state_next, 0) # size 4をsize 1x4に変換

# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)

# Experience ReplayでQ関数を更新する
self.agent.update_q_function()

# 観測の更新
state = state_next

# 終了時の処理
if done:
print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))

# DDQNで追加、2試行に1度、Target Q-NetworkをMainと同じにコピーする
if(episode % 2 == 0):
self.agent.update_target_q_function()
break

if episode_final is True:
# 動画描画をコメントアウトしています
# 動画を保存と描画
display_frames_as_gif(frames)
break

# 10連続で200step経ち続けたら成功
if complete_episodes >= 10:
print('10回連続成功')
episode_final = True # 次の試行を描画を行う最終試行とする

学習を実行します。

1
2
3
# main クラス
cartpole_env = Environment()
cartpole_env.run()

実行結果1 エピソード0~20

実行結果2 エピソード20~47(終了)

46エピソードで学習が完了しました。
出力された動画ファイル’6_2movie_cartpole_DDQN.mp4’は下記のようになります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 DQN (Deep Q-Network)

深層強化学習でOpenAI GymのCartPoleを攻略します。
深層強化学習は強化学習にディープラーニングを使用した方法です。

使用するパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('5_4movie_cartpole_DQN.mp4') # 動画ファイルを保存します。

namedtupleを使うことで、値をフィールド名とペアで格納でき、値に対してフィールド名でアクセスできて便利です。
namedtupleの動作を確認するためのサンプルを実装してみます。

1
2
3
4
5
6
7
from collections import namedtuple

Tr = namedtuple('tr', ('name_a', 'value_b'))
Tr_object = Tr('名前Aです', 100)

print(Tr_object)
print(Tr_object.value_b)

実行結果1

経験(Transition)を表すnamedtupleを生成します。
状態 state、行動 action、次の状態 next_state、報酬 rewardに容易にアクセスできます。

1
2
3
# namedtupleを生成
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

定数を宣言します。

1
2
3
4
5
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

ミニバッチ学習を実現するために、経験データを保存しておくメモリクラス ReplayMemoryを定義します。
経験を保存する関数 push、ランダムに経験を取り出す関数 sampleがあります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 経験を保存するメモリクラスを定義します
class ReplayMemory:

def __init__(self, CAPACITY):
self.capacity = CAPACITY # メモリの最大長さ
self.memory = [] # 経験を保存する変数
self.index = 0 # 保存するindexを示す変数

def push(self, state, action, state_next, reward):
'''transition = (state, action, state_next, reward)をメモリに保存する'''
if len(self.memory) < self.capacity:
self.memory.append(None) # メモリが満タンでないときは足す

# namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
self.memory[self.index] = Transition(state, action, state_next, reward)

self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす

def sample(self, batch_size):
'''batch_size分だけ、ランダムに保存内容を取り出す'''
return random.sample(self.memory, batch_size)

def __len__(self):
'''関数lenに対して、現在の変数memoryの長さを返す'''
return len(self.memory)

Brainクラスを実装します。
このクラスがニューラルネットワークを保持します。

関数 replayはメモリクラスからミニバッチを取り出し、ニューラルネットワークの結合パラメータを学習し、Q関数を更新します。
この関数の流れは下記のようになります。

  1. メモリサイズの確認

  2. メモリサイズがミニバッチより小さい間は何もしない。

  3. ミニバッチの作成

  4. メモリからミニバッチ分のデータを取り出す。

  5. 各変数をミニバッチに対応する形に変形する。

  6. 各変数の要素をミニバッチに対応する形に変形する。

  7. 教師信号となるQ値を算出

  8. ネットワークを推論モードに切り替える。

  9. ネットワークが出力したQ値を求める。

  10. maxQ値を求める。

  11. 教師となるQ値をQ学習の式から求める。

  12. 結合パラメータの更新

  13. ネットワークを訓練モードに切り替える。

  14. 損失関数の値を計算する。

  15. 結合パラメータを更新する。

関数 decide_actionはε-greedy法によりランダムな行動 または 現在の状態に対してQ値が最大となる行動のindexを返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# エージェントが持つ脳となるクラスです、DQNを実行します
# Q関数をディープラーニングのネットワークをクラスとして定義
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# 経験を記憶するメモリオブジェクトを生成
self.memory = ReplayMemory(CAPACITY)

# ニューラルネットワークを構築
self.model = nn.Sequential()
self.model.add_module('fc1', nn.Linear(num_states, 32))
self.model.add_module('relu1', nn.ReLU())
self.model.add_module('fc2', nn.Linear(32, 32))
self.model.add_module('relu2', nn.ReLU())
self.model.add_module('fc3', nn.Linear(32, num_actions))

print(self.model) # ネットワークの形を出力

# 最適化手法の設定
self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)

def replay(self):
'''Experience Replayでネットワークの結合パラメータを学習'''
# -----------------------------------------
# 1. メモリサイズの確認
# -----------------------------------------
# 1.1 メモリサイズがミニバッチより小さい間は何もしない
if len(self.memory) < BATCH_SIZE:
return

# -----------------------------------------
# 2. ミニバッチの作成
# -----------------------------------------
# 2.1 メモリからミニバッチ分のデータを取り出す
transitions = self.memory.sample(BATCH_SIZE)

# 2.2 各変数をミニバッチに対応する形に変形
# transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
# つまり、(state, action, state_next, reward)×BATCH_SIZE
# これをミニバッチにしたい。つまり
# (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
batch = Transition(*zip(*transitions))

# 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
# 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
# それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
# 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
# catはConcatenates(結合)のことです。
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

# -----------------------------------------
# 3. 教師信号となるQ(s_t, a_t)値を求める
# -----------------------------------------
# 3.1 ネットワークを推論モードに切り替える
self.model.eval()

# 3.2 ネットワークが出力したQ(s_t, a_t)を求める
# self.model(state_batch)は、右左の両方のQ値を出力しており
# [torch.FloatTensor of size BATCH_SIZEx2]になっている。
# ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
# それに対応するQ値をgatherでひっぱり出す。
state_action_values = self.model(state_batch).gather(1, action_batch)

# 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

# cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
batch.next_state)))
# まずは全部0にしておく
next_state_values = torch.zeros(BATCH_SIZE)

# 次の状態があるindexの最大Q値を求める
# 出力にアクセスし、max(1)で列方向の最大値の[値、index]を求めます
# そしてそのQ値(index=0)を出力します
# detachでその値を取り出します
next_state_values[non_final_mask] = self.model(
non_final_next_states).max(1)[0].detach()

# 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
expected_state_action_values = reward_batch + GAMMA * next_state_values

# -----------------------------------------
# 4. 結合パラメータの更新
# -----------------------------------------
# 4.1 ネットワークを訓練モードに切り替える
self.model.train()

# 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
# expected_state_action_valuesは
# sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
loss = F.smooth_l1_loss(state_action_values,
expected_state_action_values.unsqueeze(1))

# 4.3 結合パラメータを更新する
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # バックプロパゲーションを計算
self.optimizer.step() # 結合パラメータを更新

def decide_action(self, state, episode):
'''現在の状態に応じて、行動を決定する'''
# ε-greedy法で徐々に最適行動のみを採用する
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
self.model.eval() # ネットワークを推論モードに切り替える
with torch.no_grad():
action = self.model(state).max(1)[1].view(1, 1)
# ネットワークの出力の最大値のindexを取り出します = max(1)[1]
# .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

else:
# 0,1の行動をランダムに返す
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 0,1の行動をランダムに返す
# actionは[torch.LongTensor of size 1x1]の形になります

return action

棒付き台車を表すAgentクラスを実装します。
関数 memorizeでメモリオブジェクトに経験したデータ transitionを格納します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
def __init__(self, num_states, num_actions):
'''課題の状態と行動の数を設定する'''
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_q_function(self):
'''Q関数を更新する'''
self.brain.replay()

def get_action(self, state, episode):
'''行動を決定する'''
action = self.brain.decide_action(state, episode)
return action

def memorize(self, state, action, state_next, reward):
'''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
self.brain.memory.push(state, action, state_next, reward)

CartPoleを実行する環境クラスを定義します。
表形式表現のように離散化は行わず、観測結果 observationをそのままstateとして使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# CartPoleを実行する環境のクラスです
class Environment:

def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態数4を取得
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
self.agent = Agent(num_states, num_actions) # 環境内で行動するAgentを生成

def run(self):
'''実行'''
episode_10_list = np.zeros(10) # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
episode_final = False # 最後の試行フラグ
frames = [] # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES): # 最大試行数分繰り返す
observation = self.env.reset() # 環境の初期化

state = observation # 観測をそのまま状態sとして使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # NumPy変数をPyTorchのテンソルに変換
state = torch.unsqueeze(state, 0) # size 4をsize 1x4に変換

for step in range(MAX_STEPS): # 1エピソードのループ

if episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 行動を求める

# 行動a_tの実行により、s_{t+1}とdoneフラグを求める
# actionから.item()を指定して、中身を取り出す
observation_next, _, done, _ = self.env.step(
action.item()) # rewardとinfoは使わないので_にする

# 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
state_next = None # 次の状態はないので、Noneを格納

# 直近10episodeの立てたstep数リストに追加
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))

if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 連続成功記録をリセット
else:
reward = torch.FloatTensor([1.0]) # 立ったまま終了時は報酬1を与える
complete_episodes = complete_episodes + 1 # 連続記録を更新
else:
reward = torch.FloatTensor([0.0]) # 普段は報酬0
state_next = observation_next # 観測をそのまま状態とする
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # numpy変数をPyTorchのテンソルに変換
state_next = torch.unsqueeze(state_next, 0) # size 4をsize 1x4に変換

# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)

# Experience ReplayでQ関数を更新する
self.agent.update_q_function()

# 観測の更新
state = state_next

# 終了時の処理
if done:
print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))
break

if episode_final is True:
# 動画を保存と描画
display_frames_as_gif(frames)
break

# 10連続で200step経ち続けたら成功
if complete_episodes >= 10:
print('10回連続成功')
episode_final = True # 次の試行を描画を行う最終試行とする

学習を実行します。

1
2
3
# main クラス
cartpole_env = Environment()
cartpole_env.run()

実行結果2 エピソード0~10

実行結果3 エピソード70~86(終了)

85エピソードで学習が完了しました。
出力された動画ファイル’5_4movie_cartpole_DQN.mp4’は下記のようになります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ