強化学習 AlphaZero 2 (迷路ゲーム 方策勾配法)

迷路ゲームを方策勾配法で解いていきます。

方策勾配法では成功時の行動を重要と考え、その行動を多く取り入れる手法です。

  • 目的はゴールすること。
  • 状態は位置。
  • 行動は上下左右の4種類。
  • 報酬はゴールした行動を重要視。
  • パラメータ更新間隔は1エピソード(ゴールするまで)

学習手順は次の通りです。

  1. パラメータθを準備。
  2. パラメータθを方策に変換。
  3. 方策に従って、行動をゴールするまで繰り返す。
  4. 成功した行動を多く取り入れるようにパラメータθを更新する。
  5. 方策の変化量が閾値以下になるまで2~4を繰り返す。

パラメータθは深層学習の重みパラメータにあたるものです。

まずは必要なパッケージをインポートします。

1
2
3
4
5
6
# パッケージのインポート
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from matplotlib import animation
from IPython.display import HTML

迷路を作成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 迷路の作成
fig = plt.figure(figsize=(3, 3))

# 壁
plt.plot([0, 3], [3, 3], color='k')
plt.plot([0, 3], [0, 0], color='k')
plt.plot([0, 0], [0, 2], color='k')
plt.plot([3, 3], [1, 3], color='k')
plt.plot([1, 1], [1, 2], color='k')
plt.plot([2, 3], [2, 2], color='k')
plt.plot([2, 1], [1, 1], color='k')
plt.plot([2, 2], [0, 1], color='k')

# 数字
for i in range(3):
for j in range(3):
plt.text(0.5+i, 2.5-j, str(i+j*3), size=20, ha='center', va='center')

# 円
circle, = plt.plot([0.5], [2.5], marker='o', color='#d3d3d3', markersize=40)

# 目盛りと枠の非表示
plt.tick_params(axis='both', which='both', bottom='off', top= 'off',
labelbottom='off', right='off', left='off', labelleft='off')
plt.box('off')

結果

パラメータθを準備します。
学習前は正しいパラメータθは不明なので、移動可能な方向は1、移動不可な方向はnp.nan(欠損値)で初期化します。
スタートのマスはインデックス0で、ゴールのマス(インデックス8)は存在しません。

1
2
3
4
5
6
7
8
9
10
# パラメータθの初期値の準備
theta_0 = np.array([
[np.nan, 1, 1, np.nan], # 0 上,右,下,左
[np.nan, 1, 1, 1], # 1
[np.nan, np.nan, np.nan, 1], # 2
[1, np.nan, 1, np.nan], # 3
[1, 1, np.nan, np.nan], # 4
[np.nan, np.nan, 1, 1], # 5
[1, 1, np.nan, np.nan], # 6
[np.nan, np.nan, np.nan, 1]]) # 7

パラメータθを方策に変換します。
変換関数にはソフトマックス関数を利用します。マスごとに合計を1になる実数値に落とし込む関数です。
例)[np,nan, 1, 1, np.nan] -> [0, 0.5, 0.5, 0]

1
2
3
4
5
6
7
8
9
10
# パラメータθを方策に変換
def get_pi(theta):
# ソフトマックス関数で変換
[m, n] = theta.shape
pi = np.zeros((m, n))
exp_theta = np.exp(theta)
for i in range(0, m):
pi[i, :] = exp_theta[i, :] / np.nansum(exp_theta[i, :])
pi = np.nan_to_num(pi)
return pi

パラメータθの初期値を方策に変換します。

1
2
3
# パラメータθの初期値を方策に変換
pi_0 = get_pi(theta_0)
print(pi_0)

列の合計が1になっていることが分かります。
結果

方策に従って行動を取得します。
np.random.choice()にて引数pの確率分布に従って、配列の要素をランダムに返します。

1
2
3
4
# 方策に従って行動を取得
def get_a(pi, s):
# 方策の確率に従って行動を返す
return np.random.choice([0, 1, 2, 3], p=pi[s])

行動に従って次の状態を取得する関数を定義します、
3 x 3 の迷路なので、左右移動は±1、上下移動は±3になります。

1
2
3
4
5
6
7
8
9
10
# 行動に従って次の状態を取得
def get_s_next(s, a):
if a == 0: # 上
return s - 3
elif a == 1: # 右
return s + 1
elif a == 2: # 下
return s + 3
elif a == 3: # 左
return s - 1

1エピソードを実行して、履歴を取得します。履歴は、[状態, 行動]のリストです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 1エピソード実行して履歴取得
def play(pi):
s = 0 # 状態
s_a_history = [[0, np.nan]] # 状態と行動の履歴

# エピソード完了までループ
while True:
# 方策に従って行動を取得
a = get_a(pi, s)

# 行動に従って次の状態を取得
s_next = get_s_next(s, a)

# 履歴の更新
s_a_history[-1][1] = a
s_a_history.append([s_next, np.nan])

# 終了判定
if s_next == 8:
break
else:
s = s_next

return s_a_history

1エピソードの実行と履歴を確認します。
ゴールまでにどのような経路をたどり、何ステップかかったかを把握することができます。

1
2
3
4
# 1エピソードの実行と履歴の確認
s_a_history = play(pi_0)
print(s_a_history)
print('1エピソードのステップ数:{}'.format(len(s_a_history)+1))

結果

パラメータθを更新します。
パラメータθに「学習係数」と「パラメータθの変化量」を掛けた値を加算します。
学習係数は1回の学習で更新される大きさです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def update_theta(theta, pi, s_a_history):
eta = 0.1 # 学習係数
total = len(s_a_history) - 1 # ゴールまでにかかった総ステップ数
[s_count, a_count] = theta.shape # 状態数, 行動数

# パラメータθの変化量の計算
delta_theta = theta.copy()
for i in range(0, s_count):
for j in range(0, a_count):
if not(np.isnan(theta[i, j])):
# ある状態である行動を採る回数
sa_ij = [sa for sa in s_a_history if sa == [i, j]]
n_ij = len(sa_ij)

# ある状態でなんらかの行動を採る回数
sa_i = [sa for sa in s_a_history if sa[0] == i]
n_i = len(sa_i)

# パラメータθの変化量
delta_theta[i, j] = (n_ij + pi[i, j] * n_i) / total

# パラメータθの更新
return theta + eta * delta_theta

エピソードを繰り返し実行して学習を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
stop_epsilon = 10**-3 # しきい値
theta = theta_0 # パラメータθ
pi = pi_0 # 方策

# エピソードを繰り返し実行して学習
for episode in range(1000):
# 1エピソード実行して履歴取得
s_a_history = play(pi)

# パラメータθの更新
theta = update_theta(theta, pi, s_a_history)

# 方策の更新
pi_new = get_pi(theta)

# 方策の変化量
pi_delta = np.sum(np.abs(pi_new-pi))
pi = pi_new

# 出力
print('エピソード: {}, ステップ: {}, 方策変化量: {:.4f}'.format(
episode, len(s_a_history)-1, pi_delta))

# 終了判定
if pi_delta < stop_epsilon: # 方策の変化量がしきい値以下
break

結果(エピソード0から27)
(中略)
結果(エピソード208から231)
ゴールへの最短ステップ数である4に少しずつ近づいているのが分かります。

最後の履歴をもとにアニメーション表示を行ってみます。

# アニメーションの定期処理を行う関数
def animate(i):
    state = s_a_history[i][0]
    circle.set_data((state % 3) + 0.5, 2.5 - int(state / 3))
    return circle

# アニメーションの表示
anim = animation.FuncAnimation(fig, animate, \
        frames=len(s_a_history), interval=1000, repeat=False)
HTML(anim.to_jshtml())```

{% youtube BUoy31PYMMY %}


([Google Colaboratory](https://colab.research.google.com/notebooks/welcome.ipynb)で動作確認しています。)

参考
> <cite>AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 <a href="https://www.borndigital.co.jp/book/14383.html" > サポートページ</a> </cite>