深層強化学習 ブロック崩し(breakout)をA2Cで攻略 -プレイ編-

前回学習したデータを使ってブロック崩し(breakout)をプレイします。

まず必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# パッケージのimport
import numpy as np
from collections import deque
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import gym
from gym import spaces
from gym.spaces.box import Box

import matplotlib.pyplot as plt
%matplotlib inline

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0*1, frames[0].shape[0]/72.0*1),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=20)

anim.save('7breakout_play.mp4') # 動画のファイル名と保存です

実行環境を設定します。
おさらいとなりますがブロック崩し(breakout)の学習には4つの工夫をします。

  1. No-Operation
    実行環境をリセットするときに0~30ステップのいずれかの間何もしない行動を実施します。
    => ゲーム開始の初期状態を様々にし、特定の開始情報に特化しないようにするためです。
  2. Episodic Life
    5機ライフがありますが、1回失敗したときにゲーム終了とします。
    ただし崩したブロックはそのままの状態で次の試行を開始するようにします。
    => 多様な状態に対して学習ができるようにするためです。
  3. Max and Skip
    4フレームごとに行動を判断させ、4フレーム連続で同じ行動をするようにします。
    => 60Hzでゲームが進行すると早すぎるためエージェントの行動を15Hzにするためです。
  4. Warp frame
    縦210ピクセル、横160ピクセルのRGB値を縦横84ピクセルずつのグレースケール画像へと変換します。
    => 学習しやすくするためです。

また上記の4工夫とPyTorch環境に合わせるためのクラスWrapPyTorchを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# 実行環境の設定
import cv2
cv2.ocl.setUseOpenCL(False)

class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30):
'''工夫1のNo-Operationです。リセット後適当なステップの間何もしないようにし、
ゲーム開始の初期状態を様々にすることfで、特定の開始状態のみで学習するのを防ぐ'''
gym.Wrapper.__init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

def reset(self, **kwargs):
""" Do no-op action for a number of steps in [1, noop_max]."""
self.env.reset(**kwargs)
if self.override_num_noops is not None:
noops = self.override_num_noops
else:
noops = self.unwrapped.np_random.randint(1, self.noop_max + 1) # pylint: disable=E1101
assert noops > 0
obs = None
for _ in range(noops):
obs, _, done, _ = self.env.step(self.noop_action)
if done:
obs = self.env.reset(**kwargs)
return obs

def step(self, ac):
return self.env.step(ac)

class EpisodicLifeEnv(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める'''
gym.Wrapper.__init__(self, env)
self.lives = 0
self.was_real_done = True

def step(self, action):
obs, reward, done, info = self.env.step(action)
self.was_real_done = done
# check current lives, make loss of life terminal,
# then update lives to handle bonus lives
lives = self.env.unwrapped.ale.lives()
if lives < self.lives and lives > 0:
# for Qbert sometimes we stay in lives == 0 condtion for a few frames
# so its important to keep lives > 0, so that we only reset once
# the environment advertises done.
done = True
self.lives = lives
return obs, reward, done, info

def reset(self, **kwargs):
'''5機とも失敗したら、本当にリセット'''
if self.was_real_done:
obs = self.env.reset(**kwargs)
else:
# no-op step to advance from terminal/lost life state
obs, _, _, _ = self.env.step(0)
self.lives = self.env.unwrapped.ale.lives()
return obs

class MaxAndSkipEnv(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の3、4フレームの最大値をとった画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)
return max_frame, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

class WarpFrame(gym.ObservationWrapper):
def __init__(self, env):
'''工夫4のWarp frameです。画像サイズをNatureのDQN論文と同じ84x84の白黒にします'''
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255,
shape=(self.height, self.width, 1), dtype=np.uint8)

def observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (self.width, self.height),
interpolation=cv2.INTER_AREA)
return frame[:, :, None]

class WrapPyTorch(gym.ObservationWrapper):
def __init__(self, env=None):
'''PyTorchのミニバッチのインデックス順に変更するラッパー'''
super(WrapPyTorch, self).__init__(env)
obs_shape = self.observation_space.shape
self.observation_space = Box(
self.observation_space.low[0, 0, 0],
self.observation_space.high[0, 0, 0],
[obs_shape[2], obs_shape[1], obs_shape[0]],
dtype=self.observation_space.dtype)

def observation(self, observation):
return observation.transpose(2, 0, 1)

再生用の実行環境を実装します。

  • EpisodicLifeEnvPlay 関数で1度でも失敗したらブロックの状態を最初から完全になり直します。
  • MaxAndSkipEnvPlay クラスで4フレーム目だけを画像として出力します。
  • make_env_play 関数で実行環境を生成します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 再生用の実行環境
class EpisodicLifeEnvPlay(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める。
今回は再生用に、1機失敗したときのリセット時もブロックの状態をリセットします'''
gym.Wrapper.__init__(self, env)

def step(self, action):
obs, reward, done, info = self.env.step(action)
# ライフ(残機)が始め5あるが、1つでも減ると終了にする
if self.env.unwrapped.ale.lives() < 5:
done = True
return obs, reward, done, info

def reset(self, **kwargs):
'''1回でも失敗したら完全リセット'''
obs = self.env.reset(**kwargs)
return obs

class MaxAndSkipEnvPlay(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の4フレームの画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break

return obs, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

# 実行環境生成関数の定義

# 並列実行環境
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv

def make_env(env_id, seed, rank):
def _thunk():
'''_thunk()がマルチプロセス環境のSubprocVecEnvを実行するのに必要'''

env = gym.make(env_id)
#env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
#env = EpisodicLifeEnv(env)
env = EpisodicLifeEnvPlay(env)
env = WarpFrame(env)
env = WrapPyTorch(env)

return env

return _thunk

def make_env_play(env_id, seed, rank):
'''再生用の実行環境'''
env = gym.make(env_id)
env = MaxAndSkipEnvPlay(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
env = EpisodicLifeEnvPlay(env)
return env

定数を設定します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定数の設定
ENV_NAME = 'BreakoutNoFrameskip-v4'
# Breakout-v0ではなく、BreakoutNoFrameskip-v4を使用
# v0はフレームが自動的に2-4のランダムにskipされますが、今回はフレームスキップはさせないバージョンを使用
NUM_SKIP_FRAME = 4 # skipするframe数です
NUM_STACK_FRAME = 4 # 状態として連続的に保持するframe数です
NOOP_MAX = 30 # reset時に何もしないフレームを挟む(No-operation)フレーム数の乱数上限です
NUM_PROCESSES = 16 # 並列して同時実行するプロセス数です
NUM_ADVANCED_STEP = 5 # 何ステップ進めて報酬和を計算するのか設定
GAMMA = 0.99 # 時間割引率

TOTAL_FRAMES=10e6 # 学習に使用する総フレーム数
NUM_UPDATES = int(TOTAL_FRAMES / NUM_ADVANCED_STEP / NUM_PROCESSES) # ネットワークの総更新回数
# NUM_UPDATESは125,000となる

A2Cの損失関数の計算をするための定数を設定します。

1
2
3
4
5
6
7
8
9
# A2Cの損失関数の計算のための定数設定
value_loss_coef = 0.5
entropy_coef = 0.01
max_grad_norm = 0.5

# 学習手法RMSpropの設定
lr = 7e-4
eps = 1e-5
alpha = 0.99

GPU使用の設定を行います。(GPUがなくても実行できます。)

1
2
3
4
# GPUの使用の設定
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)

メモリオブジェクトの定義を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# メモリオブジェクトの定義
class RolloutStorage(object):
'''Advantage学習するためのメモリクラスです'''

def __init__(self, num_steps, num_processes, obs_shape):

self.observations = torch.zeros(
num_steps + 1, num_processes, *obs_shape).to(device)
# *を使うと()リストの中身を取り出す
# obs_shape→(4,84,84)
# *obs_shape→ 4 84 84

self.masks = torch.ones(num_steps + 1, num_processes, 1).to(device)
self.rewards = torch.zeros(num_steps, num_processes, 1).to(device)
self.actions = torch.zeros(
num_steps, num_processes, 1).long().to(device)

# 割引報酬和を格納
self.returns = torch.zeros(num_steps + 1, num_processes, 1).to(device)
self.index = 0 # insertするインデックス

def insert(self, current_obs, action, reward, mask):
'''次のindexにtransitionを格納する'''
self.observations[self.index + 1].copy_(current_obs)
self.masks[self.index + 1].copy_(mask)
self.rewards[self.index].copy_(reward)
self.actions[self.index].copy_(action)

self.index = (self.index + 1) % NUM_ADVANCED_STEP # インデックスの更新

def after_update(self):
'''Advantageするstep数が完了したら、最新のものをindex0に格納'''
self.observations[0].copy_(self.observations[-1])
self.masks[0].copy_(self.masks[-1])

def compute_returns(self, next_value):
'''Advantageするステップ中の各ステップの割引報酬和を計算する'''

# 注意:5step目から逆向きに計算しています
# 注意:5step目はAdvantage1となる。4ステップ目はAdvantage2となる。・・・
self.returns[-1] = next_value
for ad_step in reversed(range(self.rewards.size(0))):
self.returns[ad_step] = self.returns[ad_step + 1] * \
GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]

A2Cのディープ・ニューラルネットワークの構築を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# A2Cのディープ・ニューラルネットワークの構築
def init(module, gain):
'''層の結合パラメータを初期化する関数を定義'''
nn.init.orthogonal_(module.weight.data, gain=gain)
nn.init.constant_(module.bias.data, 0)
return module

class Flatten(nn.Module):
'''コンボリューション層の出力画像を1次元に変換する層を定義'''
def forward(self, x):
return x.view(x.size(0), -1)

class Net(nn.Module):
def __init__(self, n_out):
super(Net, self).__init__()

# 結合パラメータの初期化関数
def init_(module): return init(
module, gain=nn.init.calculate_gain('relu'))

# コンボリューション層の定義
self.conv = nn.Sequential(
# 画像サイズの変化84*84→20*20
init_(nn.Conv2d(NUM_STACK_FRAME, 32, kernel_size=8, stride=4)),
# stackするflameは4画像なのでinput=NUM_STACK_FRAME=4である、出力は32とする、
# sizeの計算 size = (Input_size - Kernel_size + 2*Padding_size)/ Stride_size + 1

nn.ReLU(),
# 画像サイズの変化20*20→9*9
init_(nn.Conv2d(32, 64, kernel_size=4, stride=2)),
nn.ReLU(),
init_(nn.Conv2d(64, 64, kernel_size=3, stride=1)), # 画像サイズの変化9*9→7*7
nn.ReLU(),
Flatten(), # 画像形式を1次元に変換
init_(nn.Linear(64 * 7 * 7, 512)), # 64枚の7×7の画像を、512次元のoutputへ
nn.ReLU()
)

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=1.0)

# Criticの定義
self.critic = init_(nn.Linear(512, 1)) # 状態価値なので出力は1つ

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=0.01)

# Actorの定義
self.actor = init_(nn.Linear(512, n_out)) # 行動を決めるので出力は行動の種類数

# ネットワークを訓練モードに設定
self.train()

def forward(self, x):
'''ネットワークのフォワード計算を定義します'''
input = x / 255.0 # 画像のピクセル値0-255を0-1に正規化する
conv_output = self.conv(input) # Convolution層の計算
critic_output = self.critic(conv_output) # 状態価値の計算
actor_output = self.actor(conv_output) # 行動の計算

return critic_output, actor_output

def act(self, x):
'''状態xから行動を確率的に求めます'''
value, actor_output = self(x)
probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action = probs.multinomial(num_samples=1)

return action

def get_value(self, x):
'''状態xから状態価値を求めます'''
value, actor_output = self(x)

return value

def evaluate_actions(self, x, actions):
'''状態xから状態価値、実際の行動actionsのlog確率とエントロピーを求めます'''
value, actor_output = self(x)

log_probs = F.log_softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action_log_probs = log_probs.gather(1, actions) # 実際の行動のlog_probsを求める

probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
dist_entropy = -(log_probs * probs).sum(-1).mean()

return value, action_log_probs, dist_entropy

エージェントが持つ頭脳となるクラスを定義します。
このクラスは全エージェントで共有されます。

前回の「学習編」で作成した学習データ weight_end.pth を使います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# エージェントが持つ頭脳となるクラスを定義、全エージェントで共有する
class Brain(object):
def __init__(self, actor_critic):

self.actor_critic = actor_critic # actor_criticはクラスNetのディープ・ニューラルネットワーク

# 結合パラメータをロードする場合
filename = 'weight_end.pth'
param = torch.load(filename, map_location='cpu')
self.actor_critic.load_state_dict(param)

# パラメータ更新の勾配法の設定
self.optimizer = optim.RMSprop(
actor_critic.parameters(), lr=lr, eps=eps, alpha=alpha)

def update(self, rollouts):
'''advanced計算した5つのstepの全てを使って更新します'''
obs_shape = rollouts.observations.size()[2:] # torch.Size([4, 84, 84])
num_steps = NUM_ADVANCED_STEP
num_processes = NUM_PROCESSES

values, action_log_probs, dist_entropy = self.actor_critic.evaluate_actions(
rollouts.observations[:-1].view(-1, *obs_shape),
rollouts.actions.view(-1, 1))

# 注意:各変数のサイズ
# rollouts.observations[:-1].view(-1, *obs_shape) torch.Size([80, 4, 84, 84])
# rollouts.actions.view(-1, 1) torch.Size([80, 1])
# values torch.Size([80, 1])
# action_log_probs torch.Size([80, 1])
# dist_entropy torch.Size([])

values = values.view(num_steps, num_processes,
1) # torch.Size([5, 16, 1])
action_log_probs = action_log_probs.view(num_steps, num_processes, 1)

advantages = rollouts.returns[:-1] - values # torch.Size([5, 16, 1])
value_loss = advantages.pow(2).mean()

action_gain = (advantages.detach() * action_log_probs).mean()
# detachしてadvantagesを定数として扱う

total_loss = (value_loss * value_loss_coef -
action_gain - dist_entropy * entropy_coef)

self.optimizer.zero_grad() # 勾配をリセット
total_loss.backward() # バックプロパゲーションを計算
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_grad_norm)
# 一気に結合パラメータが変化しすぎないように、勾配の大きさは最大0.5までにする

self.optimizer.step() # 結合パラメータを更新

Breakoutを実行する環境のクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Breakoutを実行する環境のクラス
NUM_PROCESSES = 1

class Environment:
def run(self):

# seedの設定
seed_num = 1
torch.manual_seed(seed_num)
if use_cuda:
torch.cuda.manual_seed(seed_num)

# 実行環境を構築
torch.set_num_threads(seed_num)
envs = [make_env(ENV_NAME, seed_num, i) for i in range(NUM_PROCESSES)]
envs = SubprocVecEnv(envs) # マルチプロセスの実行環境にする

# 全エージェントが共有して持つ頭脳Brainを生成
n_out = envs.action_space.n # 行動の種類は4
actor_critic = Net(n_out).to(device) # GPUへ
global_brain = Brain(actor_critic)

# 格納用変数の生成
obs_shape = envs.observation_space.shape # (1, 84, 84)
obs_shape = (obs_shape[0] * NUM_STACK_FRAME,
*obs_shape[1:]) # (4, 84, 84)
# torch.Size([16, 4, 84, 84])
current_obs = torch.zeros(NUM_PROCESSES, *obs_shape).to(device)
rollouts = RolloutStorage(
NUM_ADVANCED_STEP, NUM_PROCESSES, obs_shape) # rolloutsのオブジェクト
episode_rewards = torch.zeros([NUM_PROCESSES, 1]) # 現在の試行の報酬を保持
final_rewards = torch.zeros([NUM_PROCESSES, 1]) # 最後の試行の報酬和を保持

# 初期状態の開始
obs = envs.reset()
obs = torch.from_numpy(obs).float() # torch.Size([16, 1, 84, 84])
current_obs[:, -1:] = obs # flameの4番目に最新のobsを格納

# advanced学習用のオブジェクトrolloutsの状態の1つ目に、現在の状態を保存
rollouts.observations[0].copy_(current_obs)

# 描画用の環境(再生用に追加)
env_play = make_env_play(ENV_NAME, seed_num, 0)
obs_play = env_play.reset()

# 動画にするために画像を格納する変数(再生用に追加)
frames = []
main_end = False

# 実行ループ
for j in tqdm(range(NUM_UPDATES)):

# 報酬が基準を超えたら終わりにする(再生用に追加)
if main_end:
break

# advanced学習するstep数ごとに計算
for step in range(NUM_ADVANCED_STEP):

# 行動を求める
with torch.no_grad():
action = actor_critic.act(rollouts.observations[step])

cpu_actions = action.squeeze(1).cpu().numpy() # tensorをNumPyに

# 1stepの並列実行、なお返り値のobsのsizeは(16, 1, 84, 84)
obs, reward, done, info = envs.step(cpu_actions)

# 報酬をtensorに変換し、試行の総報酬に足す
# sizeが(16,)になっているのを(16, 1)に変換
reward = np.expand_dims(np.stack(reward), 1)
reward = torch.from_numpy(reward).float()
episode_rewards += reward

# 各実行環境それぞれについて、doneならmaskは0に、継続中ならmaskは1にする
masks = torch.FloatTensor(
[[0.0] if done_ else [1.0] for done_ in done])

# 最後の試行の総報酬を更新する
final_rewards *= masks # 継続中の場合は1をかけ算してそのまま、done時には0を掛けてリセット
# 継続中は0を足す、done時にはepisode_rewardsを足す
final_rewards += (1 - masks) * episode_rewards

# 画像を取得する(再生用に追加)
obs_play, reward_play, _, _ = env_play.step(cpu_actions[0])
frames.append(obs_play) # 変換した画像を保存
if done[0]: # 並列環境の1つ目が終了した場合
print(episode_rewards[0][0].numpy()) # 報酬

# 報酬が300を超えたら終わりにする
#if (episode_rewards[0][0].numpy()) > 300:
if (episode_rewards[0][0].numpy()) > 400:
main_end = True
break
else:
obs_view = env_play.reset()
frames = [] # 保存した画像をリセット

# 試行の総報酬を更新する
episode_rewards *= masks # 継続中のmaskは1なのでそのまま、doneの場合は0に

# masksをGPUへ
masks = masks.to(device)

# 現在の状態をdone時には全部0にする
# maskのサイズをtorch.Size([16, 1])→torch.Size([16, 1, 1 ,1])へ変換して、かけ算
current_obs *= masks.unsqueeze(2).unsqueeze(2)

# frameをstackする
# torch.Size([16, 1, 84, 84])
obs = torch.from_numpy(obs).float()
current_obs[:, :-1] = current_obs[:, 1:] # 0~2番目に1~3番目を上書き
current_obs[:, -1:] = obs # 4番目に最新のobsを格納

# メモリオブジェクトに今stepのtransitionを挿入
rollouts.insert(current_obs, action.data, reward, masks)

# advancedのfor loop終了

# advancedした最終stepの状態から予想する状態価値を計算
with torch.no_grad():
next_value = actor_critic.get_value(
rollouts.observations[-1]).detach()

# 全stepの割引報酬和を計算して、rolloutsの変数returnsを更新
rollouts.compute_returns(next_value)

# ネットワークとrolloutの更新
# global_brain.update(rollouts)
rollouts.after_update()

# 実行ループ終わり
display_frames_as_gif(frames) # 動画の保存と再生

実行します。

1
2
3
# 実行
breakout_env = Environment()
frames = breakout_env.run()

実行結果(途中略)

出力される動画ファイル 7breakout_play.mp4 は下記のようになります。

うまく壁の端からブロックを消して裏側に通したくさんのブロックを崩しています。
画面上部の方のブロックを消した方が高得点なので、報酬を-1から1にクリッピングしない方が裏側に通すように学習しやすくなります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ