模倣学習

模倣学習では、専門家や上手な人の行動を記録しておいてそれと近い行動をとるようにエージェントを学習させます。
少ないデータで望ましい行動を短時間で学習させることができる模倣学習はとても重要な学習手法です。

模倣学習には2つの問題があります。

  • 状態数が多い場合、上手な人の行動をとりきるのが困難になる。
  • 行動を記録するのが難しい状態がある。

模倣学習の方法として次の4つがあります。

1.Forward Training

各タイムステップの個別戦略を作っておいてそれをつなぎ合わせて全体戦略とします。
単純な教師あり学習より実際の状態遷移分布に近いデータで各戦略を学習させることができます。

2.SMILe

複数の戦略を混合していく手法です。
最初の戦略は上手な人の行動だけから学習し、その後は学習した戦略を混ぜていきます。

3.DAgger

戦略ではなくデータを混ぜ合わせていき、そこから学習して戦略を作成していきます。
具体的には各ステップで得られた状態とその状態における上手な人の行動のペアを学習データに足していきます。

4.GAIL

上手な人の模倣を見破られないようにする手法です。
模倣する側と模倣を見破る側の2つのモデルが存在し、一方は模倣を行いもう一方は鑑定を行う設定で学習を行います。(敵対的学習)

探索の概要

探索

現在の状態を開始点として、数手先をどう展開するかを先読みし、展開先の状態を評価します。
その状態評価をもとに現在の状態での最良の一手を選ぶ手法です。

探索では状態の展開を表すのにゲーム木でモデル化します。

ゲーム木

完全ゲーム木

ゲームの開始から選択できるすべての手を含んだゲーム木です。
これがあれば絶対に負けない戦略を立てることができますが、完全ゲーム木のノード数は膨大なため計算が不可能なことがほとんどです。

部分ゲーム木

現在の状態から時間内に探索できる分だけを含んだゲーム木です。
有効だと思われるノードは深く探索し、有効でないノードは途中で探索を打ち切ります。
強さはいかに効率が高い部分ゲーム木を手に入れられるかということにかかってきます。

ニューロンとニューラルネットワーク

ニューロン

ニューロンは人間の脳内にある神経細胞のことです。

深層学習でのニューロンは、人間脳内の神経細胞を模したものです。
このニューロンは重みパラメータと閾値(バイアス)を持っています。

  • 重みパラメータ
    ニューロン同士のつながりの強さを表します。
  • 閾値(バイアス)
    脳細胞の感度のようなものになります。
    入力信号と重みパラメータを掛け合わせたものが閾値を超えた時に次のニューロンへ信号を送ります。(発火)

深層強化学習が行われることで、上記2つのパラメータが調整されていきます。

ニューラルネットワーク

ニューロンを複数並べたものを層といいます。
層を積み重ねたものがニューラルネットワークとなります。

  • 入力層
    最初にある層で入力を受け付けます。
    入力データの数がニューロン数となります。
  • 出力層
    最後にある層で出力を行います。
    出力する数(答えの数)がニューロン数となります。
  • 隠れ層
    入力層と出力層の間にある層です。
    複数作成することが可能で、4層以上のニューラルネットワークがディープニューラルネットワークと呼ばれます。(入力層×1、隠れ層×2、出力層×1)

コンピュータの能力向上や、インターネットの広がりで学習データが容易に収集できるようになり深層強化学習は広く普及しました。

深層学習 ニューラルネットワークで回帰

ニューラルネットワークで数値データの予測を行う推定モデルを作成します。
住宅情報から価格を予測します。

まずは必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
# パッケージのインポート
from tensorflow.keras.datasets import boston_housing
from tensorflow.keras.layers import Activation, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

データセットの準備を行います。

各データの内容は次の通りです。

変数名 内容
train_data 訓練データの配列
train_labels 訓練ラベルの配列
test_data テストデータの配列
test_labels テストラベルの配列
1
2
# データセットの準備
(train_data, train_labels), (test_data, test_labels) = boston_housing.load_data()

実行結果1

データセットのシェイプを確認します。

1
2
3
4
5
# データセットのシェイプの確認
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)

実行結果2(一部略)
訓練データと訓練ラベルは404件、テストデータとテストラベルは102件です。
データの13は住宅情報の種類数です。

訓練データの先頭10件を表示します。

1
2
3
4
# データセットのデータの確認
column_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
df = pd.DataFrame(train_data, columns=column_names)
df.head()

実行結果3

訓練ラベルの先頭10件を表示します。

1
2
# データセットのラベルの確認
print(train_labels[0:10])

実行結果4

学習前の準備として、訓練データと訓練ラベルをシャッフルします。
似たデータを連続して学習すると偏りが生じてしまうのを防ぐためです。

1
2
3
4
# データセットのシャッフルの前処理
order = np.argsort(np.random.random(train_labels.shape))
train_data = train_data[order]
train_labels = train_labels[order]

訓練データとテストデータの正規化を行います。
データを一定の方法で変換し同じ単位で比較しやすくするためです。

具体的には平均0、分散1で正規化を行います。

1
2
3
4
5
# データセットの正規化の前処理
mean = train_data.mean(axis=0)
std = train_data.std(axis=0)
train_data = (train_data - mean) / std
test_data = (test_data - mean) / std

データセットのデータが平均0、分散1になっていることを確認します。

1
2
3
4
# データセットの前処理後のデータの確認
column_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
df = pd.DataFrame(train_data, columns=column_names)
df.head()

実行結果5(一部略)

モデルを作成します。今回は全結合層を3つ重ねた簡単なモデルとなります。

1
2
3
4
5
# モデルの作成
model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(13,)))
model.add(Dense(64, activation='relu'))
model.add(Dense(1))

ニューラルネットワークモデルのコンパイルを行います。

  • 損失関数 mse
    平均二乗誤差 Mean Squared Error - 実際の値と予測値との誤差の二乗を平均したものです。
    0に近いほど予測精度が高いことになります。
  • 最適化関数 Adam
    lrは学習率です。
  • 評価指標 mae
    平均絶対誤差 Mean Absolute Error - 実際の値と予測値との絶対値を平均したものです。
    0に近いほど予測精度が高いことになります。
1
2
# コンパイル
model.compile(loss='mse', optimizer=Adam(lr=0.001), metrics=['mae'])

EarlyStoppingの準備を行います。
任意のエポック数改善がないと学習を停止します。

1
2
# EarlyStoppingの準備
early_stop = EarlyStopping(monitor='val_loss', patience=30)

学習を行います。callbacksにEarlyStoppingを指定しています。

1
2
# 学習
history = model.fit(train_data, train_labels, epochs=500, validation_split=0.2, callbacks=[early_stop])

実行結果6(途中略)

学習中に出力される情報の意味は次の通りです。

情報 説明
loss 訓練データの誤差です。0に近いほどよい結果となります。
mean_absolute_error 訓練データの平均絶対誤差です。0に近いほどよい結果となります。
val_loss 検証データの誤差です。0に近いほどよい結果となります。
val_mean_absolute_error 検証データの平均絶対誤差です。0に近いほどよい結果となります。

上記のデータうち、訓練データの平均絶対誤差(mae)と検証データの平均絶対誤差(val_mae)をグラフ表示します。

1
2
3
4
5
6
7
8
# グラフの表示
plt.plot(history.history['mean_absolute_error'], label='train mae')
plt.plot(history.history['val_mean_absolute_error'], label='val mae')
plt.xlabel('epoch')
plt.ylabel('mae [1000$]')
plt.legend(loc='best')
plt.ylim([0,5])
plt.show()

実行結果7

テストデータとテストラベルを推定モデルに渡して評価を行い、平均絶対誤差を算出します。

1
2
3
# 評価
test_loss, test_mae = model.evaluate(test_data, test_labels)
print('loss:{:.3f}\nmae: {:.3f}'.format(test_loss, test_mae))

実行結果8

平均絶対誤差は2.655となりました。

テストデータの先頭10件の推論を行い、予測結果を出力します。

1
2
3
4
5
6
# 推論する値段の表示
print(np.round(test_labels[0:10]))

# 推論した値段の表示
test_predictions = model.predict(test_data[0:10]).flatten()
print(np.round(test_predictions))

実行結果9

実際の価格に近い価格が推論されているような気がします。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

深層学習 ニューラルネットワークで分類

手書き数字を分類するためにニューラルネットワークを作成し、実際の数字を推論するモデルを作ります。

まずは必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
# パッケージのインポート
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Activation, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

データセットの準備を行います。

各データの内容は次の通りです。

変数名 内容
train_images 訓練画像の配列
train_labels 訓練ラベルの配列
test_images テスト画像の配列
test_labels テストラベルの配列
1
2
# データセットの準備
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

実行結果1

データセットのシェイプを確認します。

1
2
3
4
5
# データセットのシェイプの確認
print(train_images.shape)
print(train_labels.shape)
print(test_images.shape)
print(test_labels.shape)

実行結果2

訓練画像データは60000×画像サイズ(28×28)です。
訓練ラベルデータは60000の1次元配列となります。

データセットの画像を確認するために先頭の10件を表示します。

1
2
3
4
5
# データセットの画像の確認
for i in range(10):
plt.subplot(1, 10, i+1)
plt.imshow(train_images[i], 'gray')
plt.show()

実行結果3

データセットのラベルを確認するために先頭の10件を表示します。

1
2
# データセットのラベルの確認
print(train_labels[0:10])

実行結果4

学習を開始する事前準備として、データセットをニューラルネットワークに適した形に変換します。
具体的には、画像データを28×28の2次元配列から1次元配列(786)に変換します。

1
2
3
4
5
6
7
# データセットの画像の前処理
train_images = train_images.reshape((train_images.shape[0], 784))
test_images = test_images.reshape((test_images.shape[0], 784))

# データセットの画像の前処理後のシェイプの確認
print(train_images.shape)
print(test_images.shape)

実行結果5

ラベルデータに関しても、ニューラルネットワークに適した形に変換します。
具体的にはone-hot表現に変えます。
one-hot表現とは、ある1要素が1でほかの要素が0である配列です。
ラベルが8の場合は[0, 0, 0, 0, 0, 0, 0, 0, 1, 0]という配列になります。

1
2
3
4
5
6
7
# データセットのラベルの前処理
train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)

# データセットのラベルの前処理後のシェイプの確認
print(train_labels.shape)
print(test_labels.shape)

実行結果6

ニューラルネットワークのモデルを作成します。

入力層のシェイプは画像データに合わせて786で、出力層はラベルデータに合わせて10とします。
ユニット数と隠れ層の数は自由に決められますが今回はユニット数256と隠れ層128としました。

層とユニット数を増やすと複雑な特徴をとらえることができるようになる半面、学習時間が多くかかるようになってしまいます。
またユニット数が多くなると重要性の低い特徴を抽出して過学習になってしまう可能性があります。

Dropoutは過学習を防いでモデルの精度をあげるための手法となります。
任意の層のユニットをランダムに無効にして特定ニューロンへの依存を防ぎ汎化性能を上げます。

活性化関数は結合層の後に適用する関数で層からの出力に対して特定の関数を経由し最終的な出力値を決めます。活性化関数を使用することで線形分離不可能なデータも分類することができるようになります。

1
2
3
4
5
6
# モデルの作成
model = Sequential()
model.add(Dense(256, activation='sigmoid', input_shape=(784,))) # 入力層
model.add(Dense(128, activation='sigmoid')) # 隠れ層
model.add(Dropout(rate=0.5)) # ドロップアウト
model.add(Dense(10, activation='softmax')) # 出力層

ニューラルネットワークのモデルをコンパイルします。

  • 損失関数 [loss]
    モデルの予測値と正解データの誤差を計算する関数です。
  • 最適化関数 [optimizer]
    損失関数の結果が0に近づくように重みパラメータとバイアスを最適化する関数です。
  • 評価指標 [metrics]
    モデル性能を測定するための指標です。測定結果は、学習を行うfit()の戻り値に格納されます。
1
2
# コンパイル
model.compile(loss='categorical_crossentropy', optimizer=SGD(lr=0.1), metrics=['acc'])

訓練画像と訓練モデルを使って学習を実行します。

1
2
# 学習
history = model.fit(train_images, train_labels, batch_size=500, epochs=5, validation_split=0.2)

実行結果7

学習中に出力される情報の意味は次の通りです。

情報 説明
loss 訓練データの誤差です。0に近いほどよい結果となります。
acc 訓練データの正解率です。1に近いほどよい結果となります。
val_loss 検証データの誤差です。0に近いほどよい結果となります。
val_acc 検証データの正解率です。1に近いほどよい結果となります。

上記のデータうち、訓練データの正解率(acc)と検証データの正解率(val_acc)をグラフ表示します。

1
2
3
4
5
6
7
# グラフの表示
plt.plot(history.history['acc'], label='acc')
plt.plot(history.history['val_acc'], label='val_acc')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(loc='best')
plt.show()

実行結果8

テスト画像とテストラベルをモデルに渡して評価を行います。

1
2
3
# 評価
test_loss, test_acc = model.evaluate(test_images, test_labels)
print('loss: {:.3f}\nacc: {:.3f}'.format(test_loss, test_acc ))

実行結果9

正解率は91.0%となりました。

先頭10件のテスト画像の推論を行い、画像データと予測結果を合わせて表示します。

1
2
3
4
5
6
7
8
9
10
# 推論する画像の表示
for i in range(10):
plt.subplot(1, 10, i+1)
plt.imshow(test_images[i].reshape((28, 28)), 'gray')
plt.show()

# 推論したラベルの表示
test_predictions = model.predict(test_images[0:10])
test_predictions = np.argmax(test_predictions, axis=1)
print(test_predictions)

実行結果10

90%の正解率であることが確認できます。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

逆強化学習

逆強化学習では報酬関数を推定します。そのメリットは次の通りです。

  • 人が報酬を設定する必要がない。
  • 他タスクでの利用が可能になる。
  • 人間や動物の行動理解に利用できる。

逆強化学習の手順は以下のようになります。

  1. エキスパートの行動を評価する。(戦略、状態遷移等)
  2. 報酬関数の初期化を行う。
  3. 報酬関数を利用し戦略を学習する。
  4. 学習した戦略の評価が、エキスパートの評価結果(手順1)と近くなるよう報酬関数を更新する。
  5. 手順3に戻り処理を繰り返す。

逆強化学習のデメリットとしては学習に時間がかかるということです。
通常の強化学習手順である手順3だけでも時間がかかるところを、逆強化学習ではその手順3を繰り返し行う必要があるためです。

強化学習の弱点

強化学習にはいくつか弱点があります。

弱点① サンプル効率が悪い。

DNNでの強化学習は、入力が画像であればいろいろな課題に対応できるという大きなメリットがあります。どのようなゲームであっても行動の数を調整するだけで同じネットワークで解くことができます。

ただし学習には大量のサンプルを用意する必要があり時間がかかります。

対策の1つとして何度もプレイ可能なシミュレーターを用意するという手がありますが、シミュレータの用意も相応の時間がかかるため深層強化学習の適用は難しいのが現状です。

OpenAI Gymのようなすでに用意されているシミュレータを使うのが近道となります。

弱点② 局所的な行動に陥る・過学習することが多い

大量のサンプルを学習したとしても、最適な行動を獲得できるとは限りません。

失敗する行動パターンは次の2種類です。

  • 局所最適な行動
    報酬は獲得できているものの最適とは言えない行動です。
    そこそこいい成績であればそれ以上頑張らないようなイメージです。
  • 過学習
    ある環境に特化した行動を獲得してしまうことです。
    試験で問題を理解するのではなく答えを覚えてしまうようなイメージです。

弱点③ 再現性が低い。

同じアルゴリズムを同じパラメータで学習したとしても獲得報酬が異なることが多々あります。
これはフレームワークのデフォルト値が影響するためです。

☆弱点を克服するための対策

対応策は次の3点が挙げられます。

  • テスト可能なモジュールに切り分ける。
    各モジュールごとにテストを行うことで全体テストの前に処理をブラッシュアップします。
    あるいはエージェントを複数用意して、エージェントを切り替えて効率的な学習を目指します。
  • 可能な限りログをとる。
    一度の実験からなるべく多くの情報を得るためです。
  • 学習を自動化する。
    学習の実行をスクリプト化し、設定パラメータや実行結果の明確化を目指します。

できるだけ事前に動作確認を行ったうえで実験を行い、可能な限り取得した動作ログからベストの対策を検討・実装したうえで何度もテストを行い、このプロセスを繰り返すことで強化学習の弱点を克服していきます。

強化学習の概要

強化学習では、「エージェント」がある「環境」の中で「行動」し、その行動から得られる「報酬」が最大化するような「推論モデル」を作成します。
推論モデルがあれば学習した状態で「環境」の中を「行動」することができます。

強化学習のサイクルを簡単にまとめると下記のようになります。

  1. エージェントが環境に対して行動を起こします。
  2. 環境が状態の更新と行動の評価を行います。
  3. 状態と報酬をエージェントに知らせます。

強化学習のポイントとなる用語を下記にまとめます。

用語 説明
エージェント - Agent 環境のなかでいろいろと行動し学習を行います。さまざまな試行を行い状態ごとに行動を最適化していきます。
環境 - Environment 行動に対して、状態の更新と行動の評価を行います。
行動 - Action エージェントがいろいろな状態で起こすことができる行動です。
状態 - State 環境の状態です。行動によって変化します。
報酬 - Reward 行動すると得られる報酬です。いい結果のときは正の報酬が得られ、悪い結果のときには負の報酬となります。

深層強化学習 ブロック崩し(breakout)をA2Cで攻略 -プレイ編-

前回学習したデータを使ってブロック崩し(breakout)をプレイします。

まず必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# パッケージのimport
import numpy as np
from collections import deque
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import gym
from gym import spaces
from gym.spaces.box import Box

import matplotlib.pyplot as plt
%matplotlib inline

動画ファイルを保存する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0*1, frames[0].shape[0]/72.0*1),
dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=20)

anim.save('7breakout_play.mp4') # 動画のファイル名と保存です

実行環境を設定します。
おさらいとなりますがブロック崩し(breakout)の学習には4つの工夫をします。

  1. No-Operation
    実行環境をリセットするときに0~30ステップのいずれかの間何もしない行動を実施します。
    => ゲーム開始の初期状態を様々にし、特定の開始情報に特化しないようにするためです。
  2. Episodic Life
    5機ライフがありますが、1回失敗したときにゲーム終了とします。
    ただし崩したブロックはそのままの状態で次の試行を開始するようにします。
    => 多様な状態に対して学習ができるようにするためです。
  3. Max and Skip
    4フレームごとに行動を判断させ、4フレーム連続で同じ行動をするようにします。
    => 60Hzでゲームが進行すると早すぎるためエージェントの行動を15Hzにするためです。
  4. Warp frame
    縦210ピクセル、横160ピクセルのRGB値を縦横84ピクセルずつのグレースケール画像へと変換します。
    => 学習しやすくするためです。

また上記の4工夫とPyTorch環境に合わせるためのクラスWrapPyTorchを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# 実行環境の設定
import cv2
cv2.ocl.setUseOpenCL(False)

class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30):
'''工夫1のNo-Operationです。リセット後適当なステップの間何もしないようにし、
ゲーム開始の初期状態を様々にすることfで、特定の開始状態のみで学習するのを防ぐ'''
gym.Wrapper.__init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

def reset(self, **kwargs):
""" Do no-op action for a number of steps in [1, noop_max]."""
self.env.reset(**kwargs)
if self.override_num_noops is not None:
noops = self.override_num_noops
else:
noops = self.unwrapped.np_random.randint(1, self.noop_max + 1) # pylint: disable=E1101
assert noops > 0
obs = None
for _ in range(noops):
obs, _, done, _ = self.env.step(self.noop_action)
if done:
obs = self.env.reset(**kwargs)
return obs

def step(self, ac):
return self.env.step(ac)

class EpisodicLifeEnv(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める'''
gym.Wrapper.__init__(self, env)
self.lives = 0
self.was_real_done = True

def step(self, action):
obs, reward, done, info = self.env.step(action)
self.was_real_done = done
# check current lives, make loss of life terminal,
# then update lives to handle bonus lives
lives = self.env.unwrapped.ale.lives()
if lives < self.lives and lives > 0:
# for Qbert sometimes we stay in lives == 0 condtion for a few frames
# so its important to keep lives > 0, so that we only reset once
# the environment advertises done.
done = True
self.lives = lives
return obs, reward, done, info

def reset(self, **kwargs):
'''5機とも失敗したら、本当にリセット'''
if self.was_real_done:
obs = self.env.reset(**kwargs)
else:
# no-op step to advance from terminal/lost life state
obs, _, _, _ = self.env.step(0)
self.lives = self.env.unwrapped.ale.lives()
return obs

class MaxAndSkipEnv(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の3、4フレームの最大値をとった画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)
return max_frame, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

class WarpFrame(gym.ObservationWrapper):
def __init__(self, env):
'''工夫4のWarp frameです。画像サイズをNatureのDQN論文と同じ84x84の白黒にします'''
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255,
shape=(self.height, self.width, 1), dtype=np.uint8)

def observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (self.width, self.height),
interpolation=cv2.INTER_AREA)
return frame[:, :, None]

class WrapPyTorch(gym.ObservationWrapper):
def __init__(self, env=None):
'''PyTorchのミニバッチのインデックス順に変更するラッパー'''
super(WrapPyTorch, self).__init__(env)
obs_shape = self.observation_space.shape
self.observation_space = Box(
self.observation_space.low[0, 0, 0],
self.observation_space.high[0, 0, 0],
[obs_shape[2], obs_shape[1], obs_shape[0]],
dtype=self.observation_space.dtype)

def observation(self, observation):
return observation.transpose(2, 0, 1)

再生用の実行環境を実装します。

  • EpisodicLifeEnvPlay 関数で1度でも失敗したらブロックの状態を最初から完全になり直します。
  • MaxAndSkipEnvPlay クラスで4フレーム目だけを画像として出力します。
  • make_env_play 関数で実行環境を生成します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 再生用の実行環境
class EpisodicLifeEnvPlay(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める。
今回は再生用に、1機失敗したときのリセット時もブロックの状態をリセットします'''
gym.Wrapper.__init__(self, env)

def step(self, action):
obs, reward, done, info = self.env.step(action)
# ライフ(残機)が始め5あるが、1つでも減ると終了にする
if self.env.unwrapped.ale.lives() < 5:
done = True
return obs, reward, done, info

def reset(self, **kwargs):
'''1回でも失敗したら完全リセット'''
obs = self.env.reset(**kwargs)
return obs

class MaxAndSkipEnvPlay(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の4フレームの画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break

return obs, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

# 実行環境生成関数の定義

# 並列実行環境
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv

def make_env(env_id, seed, rank):
def _thunk():
'''_thunk()がマルチプロセス環境のSubprocVecEnvを実行するのに必要'''

env = gym.make(env_id)
#env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
#env = EpisodicLifeEnv(env)
env = EpisodicLifeEnvPlay(env)
env = WarpFrame(env)
env = WrapPyTorch(env)

return env

return _thunk

def make_env_play(env_id, seed, rank):
'''再生用の実行環境'''
env = gym.make(env_id)
env = MaxAndSkipEnvPlay(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
env = EpisodicLifeEnvPlay(env)
return env

定数を設定します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定数の設定
ENV_NAME = 'BreakoutNoFrameskip-v4'
# Breakout-v0ではなく、BreakoutNoFrameskip-v4を使用
# v0はフレームが自動的に2-4のランダムにskipされますが、今回はフレームスキップはさせないバージョンを使用
NUM_SKIP_FRAME = 4 # skipするframe数です
NUM_STACK_FRAME = 4 # 状態として連続的に保持するframe数です
NOOP_MAX = 30 # reset時に何もしないフレームを挟む(No-operation)フレーム数の乱数上限です
NUM_PROCESSES = 16 # 並列して同時実行するプロセス数です
NUM_ADVANCED_STEP = 5 # 何ステップ進めて報酬和を計算するのか設定
GAMMA = 0.99 # 時間割引率

TOTAL_FRAMES=10e6 # 学習に使用する総フレーム数
NUM_UPDATES = int(TOTAL_FRAMES / NUM_ADVANCED_STEP / NUM_PROCESSES) # ネットワークの総更新回数
# NUM_UPDATESは125,000となる

A2Cの損失関数の計算をするための定数を設定します。

1
2
3
4
5
6
7
8
9
# A2Cの損失関数の計算のための定数設定
value_loss_coef = 0.5
entropy_coef = 0.01
max_grad_norm = 0.5

# 学習手法RMSpropの設定
lr = 7e-4
eps = 1e-5
alpha = 0.99

GPU使用の設定を行います。(GPUがなくても実行できます。)

1
2
3
4
# GPUの使用の設定
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)

メモリオブジェクトの定義を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# メモリオブジェクトの定義
class RolloutStorage(object):
'''Advantage学習するためのメモリクラスです'''

def __init__(self, num_steps, num_processes, obs_shape):

self.observations = torch.zeros(
num_steps + 1, num_processes, *obs_shape).to(device)
# *を使うと()リストの中身を取り出す
# obs_shape→(4,84,84)
# *obs_shape→ 4 84 84

self.masks = torch.ones(num_steps + 1, num_processes, 1).to(device)
self.rewards = torch.zeros(num_steps, num_processes, 1).to(device)
self.actions = torch.zeros(
num_steps, num_processes, 1).long().to(device)

# 割引報酬和を格納
self.returns = torch.zeros(num_steps + 1, num_processes, 1).to(device)
self.index = 0 # insertするインデックス

def insert(self, current_obs, action, reward, mask):
'''次のindexにtransitionを格納する'''
self.observations[self.index + 1].copy_(current_obs)
self.masks[self.index + 1].copy_(mask)
self.rewards[self.index].copy_(reward)
self.actions[self.index].copy_(action)

self.index = (self.index + 1) % NUM_ADVANCED_STEP # インデックスの更新

def after_update(self):
'''Advantageするstep数が完了したら、最新のものをindex0に格納'''
self.observations[0].copy_(self.observations[-1])
self.masks[0].copy_(self.masks[-1])

def compute_returns(self, next_value):
'''Advantageするステップ中の各ステップの割引報酬和を計算する'''

# 注意:5step目から逆向きに計算しています
# 注意:5step目はAdvantage1となる。4ステップ目はAdvantage2となる。・・・
self.returns[-1] = next_value
for ad_step in reversed(range(self.rewards.size(0))):
self.returns[ad_step] = self.returns[ad_step + 1] * \
GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]

A2Cのディープ・ニューラルネットワークの構築を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# A2Cのディープ・ニューラルネットワークの構築
def init(module, gain):
'''層の結合パラメータを初期化する関数を定義'''
nn.init.orthogonal_(module.weight.data, gain=gain)
nn.init.constant_(module.bias.data, 0)
return module

class Flatten(nn.Module):
'''コンボリューション層の出力画像を1次元に変換する層を定義'''
def forward(self, x):
return x.view(x.size(0), -1)

class Net(nn.Module):
def __init__(self, n_out):
super(Net, self).__init__()

# 結合パラメータの初期化関数
def init_(module): return init(
module, gain=nn.init.calculate_gain('relu'))

# コンボリューション層の定義
self.conv = nn.Sequential(
# 画像サイズの変化84*84→20*20
init_(nn.Conv2d(NUM_STACK_FRAME, 32, kernel_size=8, stride=4)),
# stackするflameは4画像なのでinput=NUM_STACK_FRAME=4である、出力は32とする、
# sizeの計算 size = (Input_size - Kernel_size + 2*Padding_size)/ Stride_size + 1

nn.ReLU(),
# 画像サイズの変化20*20→9*9
init_(nn.Conv2d(32, 64, kernel_size=4, stride=2)),
nn.ReLU(),
init_(nn.Conv2d(64, 64, kernel_size=3, stride=1)), # 画像サイズの変化9*9→7*7
nn.ReLU(),
Flatten(), # 画像形式を1次元に変換
init_(nn.Linear(64 * 7 * 7, 512)), # 64枚の7×7の画像を、512次元のoutputへ
nn.ReLU()
)

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=1.0)

# Criticの定義
self.critic = init_(nn.Linear(512, 1)) # 状態価値なので出力は1つ

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=0.01)

# Actorの定義
self.actor = init_(nn.Linear(512, n_out)) # 行動を決めるので出力は行動の種類数

# ネットワークを訓練モードに設定
self.train()

def forward(self, x):
'''ネットワークのフォワード計算を定義します'''
input = x / 255.0 # 画像のピクセル値0-255を0-1に正規化する
conv_output = self.conv(input) # Convolution層の計算
critic_output = self.critic(conv_output) # 状態価値の計算
actor_output = self.actor(conv_output) # 行動の計算

return critic_output, actor_output

def act(self, x):
'''状態xから行動を確率的に求めます'''
value, actor_output = self(x)
probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action = probs.multinomial(num_samples=1)

return action

def get_value(self, x):
'''状態xから状態価値を求めます'''
value, actor_output = self(x)

return value

def evaluate_actions(self, x, actions):
'''状態xから状態価値、実際の行動actionsのlog確率とエントロピーを求めます'''
value, actor_output = self(x)

log_probs = F.log_softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action_log_probs = log_probs.gather(1, actions) # 実際の行動のlog_probsを求める

probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
dist_entropy = -(log_probs * probs).sum(-1).mean()

return value, action_log_probs, dist_entropy

エージェントが持つ頭脳となるクラスを定義します。
このクラスは全エージェントで共有されます。

前回の「学習編」で作成した学習データ weight_end.pth を使います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# エージェントが持つ頭脳となるクラスを定義、全エージェントで共有する
class Brain(object):
def __init__(self, actor_critic):

self.actor_critic = actor_critic # actor_criticはクラスNetのディープ・ニューラルネットワーク

# 結合パラメータをロードする場合
filename = 'weight_end.pth'
param = torch.load(filename, map_location='cpu')
self.actor_critic.load_state_dict(param)

# パラメータ更新の勾配法の設定
self.optimizer = optim.RMSprop(
actor_critic.parameters(), lr=lr, eps=eps, alpha=alpha)

def update(self, rollouts):
'''advanced計算した5つのstepの全てを使って更新します'''
obs_shape = rollouts.observations.size()[2:] # torch.Size([4, 84, 84])
num_steps = NUM_ADVANCED_STEP
num_processes = NUM_PROCESSES

values, action_log_probs, dist_entropy = self.actor_critic.evaluate_actions(
rollouts.observations[:-1].view(-1, *obs_shape),
rollouts.actions.view(-1, 1))

# 注意:各変数のサイズ
# rollouts.observations[:-1].view(-1, *obs_shape) torch.Size([80, 4, 84, 84])
# rollouts.actions.view(-1, 1) torch.Size([80, 1])
# values torch.Size([80, 1])
# action_log_probs torch.Size([80, 1])
# dist_entropy torch.Size([])

values = values.view(num_steps, num_processes,
1) # torch.Size([5, 16, 1])
action_log_probs = action_log_probs.view(num_steps, num_processes, 1)

advantages = rollouts.returns[:-1] - values # torch.Size([5, 16, 1])
value_loss = advantages.pow(2).mean()

action_gain = (advantages.detach() * action_log_probs).mean()
# detachしてadvantagesを定数として扱う

total_loss = (value_loss * value_loss_coef -
action_gain - dist_entropy * entropy_coef)

self.optimizer.zero_grad() # 勾配をリセット
total_loss.backward() # バックプロパゲーションを計算
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_grad_norm)
# 一気に結合パラメータが変化しすぎないように、勾配の大きさは最大0.5までにする

self.optimizer.step() # 結合パラメータを更新

Breakoutを実行する環境のクラスを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Breakoutを実行する環境のクラス
NUM_PROCESSES = 1

class Environment:
def run(self):

# seedの設定
seed_num = 1
torch.manual_seed(seed_num)
if use_cuda:
torch.cuda.manual_seed(seed_num)

# 実行環境を構築
torch.set_num_threads(seed_num)
envs = [make_env(ENV_NAME, seed_num, i) for i in range(NUM_PROCESSES)]
envs = SubprocVecEnv(envs) # マルチプロセスの実行環境にする

# 全エージェントが共有して持つ頭脳Brainを生成
n_out = envs.action_space.n # 行動の種類は4
actor_critic = Net(n_out).to(device) # GPUへ
global_brain = Brain(actor_critic)

# 格納用変数の生成
obs_shape = envs.observation_space.shape # (1, 84, 84)
obs_shape = (obs_shape[0] * NUM_STACK_FRAME,
*obs_shape[1:]) # (4, 84, 84)
# torch.Size([16, 4, 84, 84])
current_obs = torch.zeros(NUM_PROCESSES, *obs_shape).to(device)
rollouts = RolloutStorage(
NUM_ADVANCED_STEP, NUM_PROCESSES, obs_shape) # rolloutsのオブジェクト
episode_rewards = torch.zeros([NUM_PROCESSES, 1]) # 現在の試行の報酬を保持
final_rewards = torch.zeros([NUM_PROCESSES, 1]) # 最後の試行の報酬和を保持

# 初期状態の開始
obs = envs.reset()
obs = torch.from_numpy(obs).float() # torch.Size([16, 1, 84, 84])
current_obs[:, -1:] = obs # flameの4番目に最新のobsを格納

# advanced学習用のオブジェクトrolloutsの状態の1つ目に、現在の状態を保存
rollouts.observations[0].copy_(current_obs)

# 描画用の環境(再生用に追加)
env_play = make_env_play(ENV_NAME, seed_num, 0)
obs_play = env_play.reset()

# 動画にするために画像を格納する変数(再生用に追加)
frames = []
main_end = False

# 実行ループ
for j in tqdm(range(NUM_UPDATES)):

# 報酬が基準を超えたら終わりにする(再生用に追加)
if main_end:
break

# advanced学習するstep数ごとに計算
for step in range(NUM_ADVANCED_STEP):

# 行動を求める
with torch.no_grad():
action = actor_critic.act(rollouts.observations[step])

cpu_actions = action.squeeze(1).cpu().numpy() # tensorをNumPyに

# 1stepの並列実行、なお返り値のobsのsizeは(16, 1, 84, 84)
obs, reward, done, info = envs.step(cpu_actions)

# 報酬をtensorに変換し、試行の総報酬に足す
# sizeが(16,)になっているのを(16, 1)に変換
reward = np.expand_dims(np.stack(reward), 1)
reward = torch.from_numpy(reward).float()
episode_rewards += reward

# 各実行環境それぞれについて、doneならmaskは0に、継続中ならmaskは1にする
masks = torch.FloatTensor(
[[0.0] if done_ else [1.0] for done_ in done])

# 最後の試行の総報酬を更新する
final_rewards *= masks # 継続中の場合は1をかけ算してそのまま、done時には0を掛けてリセット
# 継続中は0を足す、done時にはepisode_rewardsを足す
final_rewards += (1 - masks) * episode_rewards

# 画像を取得する(再生用に追加)
obs_play, reward_play, _, _ = env_play.step(cpu_actions[0])
frames.append(obs_play) # 変換した画像を保存
if done[0]: # 並列環境の1つ目が終了した場合
print(episode_rewards[0][0].numpy()) # 報酬

# 報酬が300を超えたら終わりにする
#if (episode_rewards[0][0].numpy()) > 300:
if (episode_rewards[0][0].numpy()) > 400:
main_end = True
break
else:
obs_view = env_play.reset()
frames = [] # 保存した画像をリセット

# 試行の総報酬を更新する
episode_rewards *= masks # 継続中のmaskは1なのでそのまま、doneの場合は0に

# masksをGPUへ
masks = masks.to(device)

# 現在の状態をdone時には全部0にする
# maskのサイズをtorch.Size([16, 1])→torch.Size([16, 1, 1 ,1])へ変換して、かけ算
current_obs *= masks.unsqueeze(2).unsqueeze(2)

# frameをstackする
# torch.Size([16, 1, 84, 84])
obs = torch.from_numpy(obs).float()
current_obs[:, :-1] = current_obs[:, 1:] # 0~2番目に1~3番目を上書き
current_obs[:, -1:] = obs # 4番目に最新のobsを格納

# メモリオブジェクトに今stepのtransitionを挿入
rollouts.insert(current_obs, action.data, reward, masks)

# advancedのfor loop終了

# advancedした最終stepの状態から予想する状態価値を計算
with torch.no_grad():
next_value = actor_critic.get_value(
rollouts.observations[-1]).detach()

# 全stepの割引報酬和を計算して、rolloutsの変数returnsを更新
rollouts.compute_returns(next_value)

# ネットワークとrolloutの更新
# global_brain.update(rollouts)
rollouts.after_update()

# 実行ループ終わり
display_frames_as_gif(frames) # 動画の保存と再生

実行します。

1
2
3
# 実行
breakout_env = Environment()
frames = breakout_env.run()

実行結果(途中略)

出力される動画ファイル 7breakout_play.mp4 は下記のようになります。

うまく壁の端からブロックを消して裏側に通したくさんのブロックを崩しています。
画面上部の方のブロックを消した方が高得点なので、報酬を-1から1にクリッピングしない方が裏側に通すように学習しやすくなります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

深層強化学習 ブロック崩し(breakout)をA2Cで攻略 -学習編-

ブロック崩し(breakout)を強化学習A2Cで攻略していきます。

まずOpenAI Gymの環境を並列で動かすために必要なパッケージをインストールします。

1
2
3
4
5
pip install tqdm
pip install opencv-python
git clone https://github.com/openai/baselines.git
cd baselines
pip install -e .

使用するパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
11
12
13
# パッケージのimport
import numpy as np
from collections import deque
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import gym
from gym import spaces
from gym.spaces.box import Box

ブロック崩し(breakout)の学習には4つの工夫をします。

  1. No-Operation
    実行環境をリセットするときに0~30ステップのいずれかの間何もしない行動を実施します。
    => ゲーム開始の初期状態を様々にし、特定の開始情報に特化しないようにするためです。
  2. Episodic Life
    5機ライフがありますが、1回失敗したときにゲーム終了とします。
    ただし崩したブロックはそのままの状態で次の試行を開始するようにします。
    => 多様な状態に対して学習ができるようにするためです。
  3. Max and Skip
    4フレームごとに行動を判断させ、4フレーム連続で同じ行動をするようにします。
    => 60Hzでゲームが進行すると早すぎるためエージェントの行動を15Hzにするためです。
  4. Warp frame
    縦210ピクセル、横160ピクセルのRGB値を縦横84ピクセルずつのグレースケール画像へと変換します。
    => 学習しやすくするためです。

また上記の4工夫とPyTorch環境に合わせるためのクラスWrapPyTorchを定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# 実行環境の設定
import cv2
cv2.ocl.setUseOpenCL(False)

class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30):
'''工夫1のNo-Operationです。リセット後適当なステップの間何もしないようにし、
ゲーム開始の初期状態を様々にすることfで、特定の開始状態のみで学習するのを防ぐ'''
gym.Wrapper.__init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

def reset(self, **kwargs):
""" Do no-op action for a number of steps in [1, noop_max]."""
self.env.reset(**kwargs)
if self.override_num_noops is not None:
noops = self.override_num_noops
else:
noops = self.unwrapped.np_random.randint(
1, self.noop_max + 1) # pylint: disable=E1101
assert noops > 0
obs = None
for _ in range(noops):
obs, _, done, _ = self.env.step(self.noop_action)
if done:
obs = self.env.reset(**kwargs)
return obs

def step(self, ac):
return self.env.step(ac)

class EpisodicLifeEnv(gym.Wrapper):
def __init__(self, env):
'''工夫2のEpisodic Lifeです。1機失敗したときにリセットし、失敗時の状態から次を始める'''
gym.Wrapper.__init__(self, env)
self.lives = 0
self.was_real_done = True

def step(self, action):
obs, reward, done, info = self.env.step(action)
self.was_real_done = done
# check current lives, make loss of life terminal,
# then update lives to handle bonus lives
lives = self.env.unwrapped.ale.lives()
if lives < self.lives and lives > 0:
# for Qbert sometimes we stay in lives == 0 condtion for a few frames
# so its important to keep lives > 0, so that we only reset once
# the environment advertises done.
done = True
self.lives = lives
return obs, reward, done, info

def reset(self, **kwargs):
'''5機とも失敗したら、本当にリセット'''
if self.was_real_done:
obs = self.env.reset(**kwargs)
else:
# no-op step to advance from terminal/lost life state
obs, _, _, _ = self.env.step(0)
self.lives = self.env.unwrapped.ale.lives()
return obs

class MaxAndSkipEnv(gym.Wrapper):
def __init__(self, env, skip=4):
'''工夫3のMax and Skipです。4フレーム連続で同じ行動を実施し、最後の3、4フレームの最大値をとった画像をobsにする'''
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros(
(2,)+env.observation_space.shape, dtype=np.uint8)
self._skip = skip

def step(self, action):
"""Repeat action, sum reward, and max over last observations."""
total_reward = 0.0
done = None
for i in range(self._skip):
obs, reward, done, info = self.env.step(action)
if i == self._skip - 2:
self._obs_buffer[0] = obs
if i == self._skip - 1:
self._obs_buffer[1] = obs
total_reward += reward
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)

return max_frame, total_reward, done, info

def reset(self, **kwargs):
return self.env.reset(**kwargs)

class WarpFrame(gym.ObservationWrapper):
def __init__(self, env):
'''工夫4のWarp frameです。画像サイズをNatureのDQN論文と同じ84x84の白黒にします'''
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255, shape=(self.height, self.width, 1), dtype=np.uint8)

def observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
return frame[:, :, None]

class WrapPyTorch(gym.ObservationWrapper):
def __init__(self, env=None):
'''PyTorchのミニバッチのインデックス順に変更するラッパー'''
super(WrapPyTorch, self).__init__(env)
obs_shape = self.observation_space.shape
self.observation_space = Box(
self.observation_space.low[0, 0, 0],
self.observation_space.high[0, 0, 0],
[obs_shape[2], obs_shape[1], obs_shape[0]],
dtype=self.observation_space.dtype)

def observation(self, observation):
return observation.transpose(2, 0, 1)

マルチプロセルでBreakoutを並列実行する環境を生成する関数make_envを定義します。
OpenAIが用意しているマルチプロセス環境であるクラスSubprocVecEnvを使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 実行環境生成関数の定義
# 並列実行環境
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv

def make_env(env_id, seed, rank):
def _thunk():
'''_thunk()がマルチプロセス環境のSubprocVecEnvを実行するのに必要'''
env = gym.make(env_id)
env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
env.seed(seed + rank) # 乱数シードの設定
env = EpisodicLifeEnv(env)
env = WarpFrame(env)
env = WrapPyTorch(env)

return env

return _thunk

定数を設定します。

Breakout-v0ですとフレームが自動的に2~4のランダムにskipされるため、今回はフレームスキップはさせないBreakoutNoFrameskip-v4を使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 定数の設定
ENV_NAME = 'BreakoutNoFrameskip-v4'
NUM_SKIP_FRAME = 4 # skipするframe数です
NUM_STACK_FRAME = 4 # 状態として連続的に保持するframe数です
NOOP_MAX = 30 # reset時に何もしないフレームを挟む(No-operation)フレーム数の乱数上限です
NUM_PROCESSES = 16 # 並列して同時実行するプロセス数です
NUM_ADVANCED_STEP = 5 # 何ステップ進めて報酬和を計算するのか設定
GAMMA = 0.99 # 時間割引率

TOTAL_FRAMES=10e6 # 学習に使用する総フレーム数
NUM_UPDATES = int(TOTAL_FRAMES / NUM_ADVANCED_STEP / NUM_PROCESSES) # ネットワークの総更新回数
# NUM_UPDATESは125,000となる

# A2Cの損失関数の計算のための定数設定
value_loss_coef = 0.5
entropy_coef = 0.01
max_grad_norm = 0.5

# 学習手法RMSpropの設定
lr = 7e-4
eps = 1e-5
alpha = 0.99

GPU使用の設定を行います。
GPU環境があれば cuda が出力されますが、そうでない場合は cpu が出力されます。

1
2
3
4
# GPUの使用の設定
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)

Advantage学習するためのメモリクラスを定義します。
.to(device)を使用して、GPU環境がある場合には自動的にGPUを使えるようにしています。
PyTorchではCPU環境とGPU環境を意識せずに同じプログラムをどちらの環境でも実行できるのが便利です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# メモリオブジェクトの定義
class RolloutStorage(object):
'''Advantage学習するためのメモリクラスです'''
def __init__(self, num_steps, num_processes, obs_shape):

self.observations = torch.zeros(
num_steps + 1, num_processes, *obs_shape).to(device)
# *を使うと()リストの中身を取り出す
# obs_shape→(4,84,84)
# *obs_shape→ 4 84 84

self.masks = torch.ones(num_steps + 1, num_processes, 1).to(device)
self.rewards = torch.zeros(num_steps, num_processes, 1).to(device)
self.actions = torch.zeros(num_steps, num_processes, 1).long().to(device)

# 割引報酬和を格納
self.returns = torch.zeros(num_steps + 1, num_processes, 1).to(device)
self.index = 0 # insertするインデックス

def insert(self, current_obs, action, reward, mask):
'''次のindexにtransitionを格納する'''
self.observations[self.index + 1].copy_(current_obs)
self.masks[self.index + 1].copy_(mask)
self.rewards[self.index].copy_(reward)
self.actions[self.index].copy_(action)

self.index = (self.index + 1) % NUM_ADVANCED_STEP # インデックスの更新

def after_update(self):
'''Advantageするstep数が完了したら、最新のものをindex0に格納'''
self.observations[0].copy_(self.observations[-1])
self.masks[0].copy_(self.masks[-1])

def compute_returns(self, next_value):
'''Advantageするステップ中の各ステップの割引報酬和を計算する'''

# 注意:5step目から逆向きに計算しています
# 注意:5step目はAdvantage1となる。4ステップ目はAdvantage2となる。・・・
self.returns[-1] = next_value
for ad_step in reversed(range(self.rewards.size(0))):
self.returns[ad_step] = self.returns[ad_step + 1] * \
GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]

A2Cのディープ・ニューラルネットワークの構築を実装します。

コンボリューション層の定義のNUM_STACK_FRAME(=4)は、過去4フレーム分の画像を使って1つの状態として扱いニューラルネットワーク入力とすることを意味します。
1つのフレームではボールの位置しか分かりませんが、2フレームあれば速度が分かり、3フレームあれば加速度が分かるようになります。
今回はDQNのNature論文に合わせて4フレームとしています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# A2Cのディープ・ニューラルネットワークの構築
def init(module, gain):
'''層の結合パラメータを初期化する関数を定義'''
nn.init.orthogonal_(module.weight.data, gain=gain)
nn.init.constant_(module.bias.data, 0)
return module

class Flatten(nn.Module):
'''コンボリューション層の出力画像を1次元に変換する層を定義'''
def forward(self, x):
return x.view(x.size(0), -1)

class Net(nn.Module):
def __init__(self, n_out):
super(Net, self).__init__()

# 結合パラメータの初期化関数
def init_(module): return init(
module, gain=nn.init.calculate_gain('relu'))

# コンボリューション層の定義
self.conv = nn.Sequential(
# 画像サイズの変化84*84→20*20
init_(nn.Conv2d(NUM_STACK_FRAME, 32, kernel_size=8, stride=4)),
# stackするflameは4画像なのでinput=NUM_STACK_FRAME=4である、出力は32とする、
# sizeの計算 size = (Input_size - Kernel_size + 2*Padding_size)/ Stride_size + 1

nn.ReLU(),
# 画像サイズの変化20*20→9*9
init_(nn.Conv2d(32, 64, kernel_size=4, stride=2)),
nn.ReLU(),
init_(nn.Conv2d(64, 64, kernel_size=3, stride=1)), # 画像サイズの変化9*9→7*7
nn.ReLU(),
Flatten(), # 画像形式を1次元に変換
init_(nn.Linear(64 * 7 * 7, 512)), # 64枚の7×7の画像を、512次元のoutputへ
nn.ReLU()
)

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=1.0)

# Criticの定義
self.critic = init_(nn.Linear(512, 1)) # 状態価値なので出力は1つ

# 結合パラメータの初期化関数
def init_(module): return init(module, gain=0.01)

# Actorの定義
self.actor = init_(nn.Linear(512, n_out)) # 行動を決めるので出力は行動の種類数

# ネットワークを訓練モードに設定
self.train()

def forward(self, x):
'''ネットワークのフォワード計算を定義します'''
input = x / 255.0 # 画像のピクセル値0-255を0-1に正規化する
conv_output = self.conv(input) # Convolution層の計算
critic_output = self.critic(conv_output) # 状態価値の計算
actor_output = self.actor(conv_output) # 行動の計算

return critic_output, actor_output

def act(self, x):
'''状態xから行動を確率的に求めます'''
value, actor_output = self(x)
# 190324
# self(x)の動作について、以下のリンクの最下部のFAQに解説を補足しました。
# https://github.com/YutaroOgawa/Deep-Reinforcement-Learning-Book
probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action = probs.multinomial(num_samples=1)
return action

def get_value(self, x):
'''状態xから状態価値を求めます'''
value, actor_output = self(x)
# 190324
# self(x)の動作について、以下のリンクの最下部のFAQに解説を補足しました。
# https://github.com/YutaroOgawa/Deep-Reinforcement-Learning-Book
return value

def evaluate_actions(self, x, actions):
'''状態xから状態価値、実際の行動actionsのlog確率とエントロピーを求めます'''
value, actor_output = self(x)
# 190324
# self(x)の動作について、以下のリンクの最下部のFAQに解説を補足しました。
# https://github.com/YutaroOgawa/Deep-Reinforcement-Learning-Book

log_probs = F.log_softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
action_log_probs = log_probs.gather(1, actions) # 実際の行動のlog_probsを求める

probs = F.softmax(actor_output, dim=1) # dim=1で行動の種類方向に計算
dist_entropy = -(log_probs * probs).sum(-1).mean()

return value, action_log_probs, dist_entropy

Brainクラスを定義します。
勾配降下法にはRMSpropを使用します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# エージェントが持つ頭脳となるクラスを定義、全エージェントで共有する
class Brain(object):
def __init__(self, actor_critic):

self.actor_critic = actor_critic # actor_criticはクラスNetのディープ・ニューラルネットワーク

# 結合パラメータをロードする場合
#filename = 'weight.pth'
#param = torch.load(filename, map_location='cpu')
# self.actor_critic.load_state_dict(param)

# パラメータ更新の勾配法の設定
self.optimizer = optim.RMSprop(
actor_critic.parameters(), lr=lr, eps=eps, alpha=alpha)

def update(self, rollouts):
'''advanced計算した5つのstepの全てを使って更新します'''
obs_shape = rollouts.observations.size()[2:] # torch.Size([4, 84, 84])
num_steps = NUM_ADVANCED_STEP
num_processes = NUM_PROCESSES

values, action_log_probs, dist_entropy = self.actor_critic.evaluate_actions(
rollouts.observations[:-1].view(-1, *obs_shape),
rollouts.actions.view(-1, 1))

# 注意:各変数のサイズ
# rollouts.observations[:-1].view(-1, *obs_shape) torch.Size([80, 4, 84, 84])
# rollouts.actions.view(-1, 1) torch.Size([80, 1])
# values torch.Size([80, 1])
# action_log_probs torch.Size([80, 1])
# dist_entropy torch.Size([])

values = values.view(num_steps, num_processes, 1) # torch.Size([5, 16, 1])
action_log_probs = action_log_probs.view(num_steps, num_processes, 1)

advantages = rollouts.returns[:-1] - values # torch.Size([5, 16, 1])
value_loss = advantages.pow(2).mean()

action_gain = (advantages.detach() * action_log_probs).mean()
# detachしてadvantagesを定数として扱う

total_loss = (value_loss * value_loss_coef -
action_gain - dist_entropy * entropy_coef)

self.optimizer.zero_grad() # 勾配をリセット
total_loss.backward() # バックプロパゲーションを計算
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_grad_norm)
# 一気に結合パラメータが変化しすぎないように、勾配の大きさは最大0.5までにする

self.optimizer.step() # 結合パラメータを更新

実行環境のクラス Environment を定義します。

  • 入力データは画像となります。4フレームで1つの状態を表します。
  • マルチプロセル環境 SuvprocVecEnv を使用しているのでエージェントごとのforループ処理は必要ありません。
  • 実行ループ100回ごとに得点を出力します。この出力で学習状態を確認します。
  • 定期的に結合パラメータを保存します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Breakoutを実行する環境のクラス
class Environment:
def run(self):
# seedの設定
seed_num = 1
torch.manual_seed(seed_num)
if use_cuda:
torch.cuda.manual_seed(seed_num)

# 実行環境を構築
torch.set_num_threads(seed_num)
envs = [make_env(ENV_NAME, seed_num, i) for i in range(NUM_PROCESSES)]
envs = SubprocVecEnv(envs) # マルチプロセスの実行環境にする

# 全エージェントが共有して持つ頭脳Brainを生成
n_out = envs.action_space.n # 行動の種類は4
actor_critic = Net(n_out).to(device) # GPUへ
global_brain = Brain(actor_critic)

# 格納用変数の生成
obs_shape = envs.observation_space.shape # (1, 84, 84)
obs_shape = (obs_shape[0] * NUM_STACK_FRAME,
*obs_shape[1:]) # (4, 84, 84)
# torch.Size([16, 4, 84, 84])
current_obs = torch.zeros(NUM_PROCESSES, *obs_shape).to(device)
rollouts = RolloutStorage(
NUM_ADVANCED_STEP, NUM_PROCESSES, obs_shape) # rolloutsのオブジェクト
episode_rewards = torch.zeros([NUM_PROCESSES, 1]) # 現在の試行の報酬を保持
final_rewards = torch.zeros([NUM_PROCESSES, 1]) # 最後の試行の報酬和を保持

# 初期状態の開始
obs = envs.reset()
obs = torch.from_numpy(obs).float() # torch.Size([16, 1, 84, 84])
current_obs[:, -1:] = obs # flameの4番目に最新のobsを格納

# advanced学習用のオブジェクトrolloutsの状態の1つ目に、現在の状態を保存
rollouts.observations[0].copy_(current_obs)

# 実行ループ
for j in tqdm(range(NUM_UPDATES)):
# advanced学習するstep数ごとに計算
for step in range(NUM_ADVANCED_STEP):

# 行動を求める
with torch.no_grad():
action = actor_critic.act(rollouts.observations[step])

cpu_actions = action.squeeze(1).cpu().numpy() # tensorをNumPyに

# 1stepの並列実行、なお返り値のobsのsizeは(16, 1, 84, 84)
obs, reward, done, info = envs.step(cpu_actions)

# 報酬をtensorに変換し、試行の総報酬に足す
# sizeが(16,)になっているのを(16, 1)に変換
reward = np.expand_dims(np.stack(reward), 1)
reward = torch.from_numpy(reward).float()
episode_rewards += reward

# 各実行環境それぞれについて、doneならmaskは0に、継続中ならmaskは1にする
masks = torch.FloatTensor(
[[0.0] if done_ else [1.0] for done_ in done])

# 最後の試行の総報酬を更新する
final_rewards *= masks # 継続中の場合は1をかけ算してそのまま、done時には0を掛けてリセット
# 継続中は0を足す、done時にはepisode_rewardsを足す
final_rewards += (1 - masks) * episode_rewards

# 試行の総報酬を更新する
episode_rewards *= masks # 継続中のmaskは1なのでそのまま、doneの場合は0に

# masksをGPUへ
masks = masks.to(device)

# 現在の状態をdone時には全部0にする
# maskのサイズをtorch.Size([16, 1])→torch.Size([16, 1, 1 ,1])へ変換して、かけ算
current_obs *= masks.unsqueeze(2).unsqueeze(2)

# frameをstackする
# torch.Size([16, 1, 84, 84])
obs = torch.from_numpy(obs).float()
current_obs[:, :-1] = current_obs[:, 1:] # 0~2番目に1~3番目を上書き
current_obs[:, -1:] = obs # 4番目に最新のobsを格納

# メモリオブジェクトに今stepのtransitionを挿入
rollouts.insert(current_obs, action.data, reward, masks)

# advancedのfor loop終了

# advancedした最終stepの状態から予想する状態価値を計算
with torch.no_grad():
next_value = actor_critic.get_value(
rollouts.observations[-1]).detach()

# 全stepの割引報酬和を計算して、rolloutsの変数returnsを更新
rollouts.compute_returns(next_value)

# ネットワークとrolloutの更新
global_brain.update(rollouts)
rollouts.after_update()

# ログ:途中経過の出力
if j % 100 == 0:
print("finished frames {}, mean/median reward {:.1f}/{:.1f}, min/max reward {:.1f}/{:.1f}".
format(j*NUM_PROCESSES*NUM_ADVANCED_STEP,
final_rewards.mean(),
final_rewards.median(),
final_rewards.min(),
final_rewards.max()))

# 結合パラメータの保存
if j % 12500 == 0:
torch.save(global_brain.actor_critic.state_dict(), 'weight_'+str(j)+'.pth')

# 実行ループの終了
torch.save(global_brain.actor_critic.state_dict(), 'weight_end.pth')

最後に実行します。

1
2
3
# 実行
breakout_env = Environment()
breakout_env.run()

実行途中結果

学習が完了すると学習データ weight_end.pth ファイルが出力されます。
次回はこの学習データを使ってBreakoutをプレイします。


参考 > つくりながら学ぶ!深層強化学習 サポートページ