強化学習 AlphaZero 11 (三目並べ AlphaZero3)

AlphaZeroのベースとなるモンテカルロ木探索を作成します。

ニューラルネットワークで「方策」と「価値」を取得します。
「方策」はアーク評価値の計算、「価値」は累計価値の更新に利用します。

まず必要なパッケージをインポートします。

pv_mcts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ====================
# モンテカルロ木探索の作成
# ====================

# パッケージのインポート
from game import State
from dual_network import DN_INPUT_SHAPE
from math import sqrt
from tensorflow.keras.models import load_model
from pathlib import Path
import numpy as np

# パラメータの準備
PV_EVALUATE_COUNT = 50 # 1推論あたりのシミュレーション回数(本家は1600)

ニューラルネットワークの推論を行います。

  1. 推論のために入力データのシェイプを変換します。
    自分の石の配置と相手の石の配置を(1, 3, 3, 2)に変換します。
  2. 推論を行います。
    model.predict()を使います。
  3. 方策を取得します。
    合法手のみ抽出し、合計で割って合法手のみの確率分布に変換します。
  4. 価値を取得します。
    価値の配列から値のみを抽出します。
pv_mcts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 推論
def predict(model, state):
# 推論のための入力テ゛ータのシェイフ゜の変換
a, b, c = DN_INPUT_SHAPE
x = np.array([state.pieces, state.enemy_pieces])
x = x.reshape(c, a, b).transpose(1, 2, 0).reshape(1, a, b, c)

# 推論
y = model.predict(x, batch_size=1)

# 方策の取得
policies = y[0][0][list(state.legal_actions())] # 合法手のみ
policies /= sum(policies) if sum(policies) else 1 # 合計1の確率分布に変換

# 価値の取得
value = y[1][0][0]
return policies, value

ノードのリストを試行回数のリストに変換します。

pv_mcts.py
1
2
3
4
5
6
# ノードのリストを試行回数のリストに変換
def nodes_to_scores(nodes):
scores = []
for c in nodes:
scores.append(c.n)
return scores

モンテカルロ木探索のスコアを取得します。

  1. モンテカルロ木探索のノードを定義します。
  2. 現在の局面のノードを作成します。
  3. 複数回の評価を実行します。

4.合法手の確率分布を返します。

pv_mcts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# モンテカルロ木探索のスコアの取得
def pv_mcts_scores(model, state, temperature):
# モンテカルロ木探索のノードの定義
class Node:
# ノードの初期化
def __init__(self, state, p):
self.state = state # 状態
self.p = p # 方策
self.w = 0 # 累計価値
self.n = 0 # 試行回数
self.child_nodes = None # 子ノード群

# 局面の価値の計算
def evaluate(self):
# ゲーム終了時
if self.state.is_done():
# 勝敗結果で価値を取得
value = -1 if self.state.is_lose() else 0

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# 子ノードが存在しない時
if not self.child_nodes:
# ニューラルネットワークの推論で方策と価値を取得
policies, value = predict(model, self.state)

# 累計価値と試行回数の更新
self.w += value
self.n += 1

# 子ノードの展開
self.child_nodes = []
for action, policy in zip(self.state.legal_actions(), policies):
self.child_nodes.append(Node(self.state.next(action), policy))
return value

# 子ノードが存在する時
else:
# アーク評価値が最大の子ノードの評価で価値を取得
value = -self.next_child_node().evaluate()

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# アーク評価値が最大の子ノードを取得
def next_child_node(self):
# アーク評価値の計算
C_PUCT = 1.0
t = sum(nodes_to_scores(self.child_nodes))
pucb_values = []
for child_node in self.child_nodes:
pucb_values.append((-child_node.w / child_node.n if child_node.n else 0.0) +
C_PUCT * child_node.p * sqrt(t) / (1 + child_node.n))

# アーク評価値が最大の子ノードを返す
return self.child_nodes[np.argmax(pucb_values)]

# 現在の局面のノードの作成
root_node = Node(state, 0)

# 複数回の評価の実行
for _ in range(PV_EVALUATE_COUNT):
root_node.evaluate()

# 合法手の確率分布
scores = nodes_to_scores(root_node.child_nodes)
if temperature == 0: # 最大値のみ1
action = np.argmax(scores)
scores = np.zeros(len(scores))
scores[action] = 1
else: # ボルツマン分布でバラつき付加
scores = boltzman(scores, temperature)
return scores

モンテカルロ木探索で行動を選択します。

pv_mcts.py
1
2
3
4
5
6
7
# モンテカルロ木探索で行動選択
def pv_mcts_action(model, temperature=0):
def pv_mcts_action(state):
scores = pv_mcts_scores(model, state, temperature)
return np.random.choice(state.legal_actions(), p=scores)
return pv_mcts_action

ボルツマン分布によるバラつきを付加します。

pv_mcts.py
1
2
3
4
5
# ボルツマン分布
def boltzman(xs, temperature):
xs = [x ** (1 / temperature) for x in xs]
return [x / sum(xs) for x in xs]

動作確認のためにモンテカルロ木探索行動選択「pv_mcts_action」を使ってゲーム終了まで実行します。

pv_mcts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 動作確認
if __name__ == '__main__':
# モデルの読み込み
path = sorted(Path('./model').glob('*.h5'))[-1]
model = load_model(str(path))

# 状態の生成
state = State()

# モンテカルロ木探索で行動取得を行う関数の生成
next_action = pv_mcts_action(model, 1.0)

# ゲーム終了までループ
while True:
# ゲーム終了時
if state.is_done():
break

# 行動の取得
action = next_action(state)

# 次の状態の取得
state = state.next(action)

# 文字列表示
print(state)

実行結果は下記の通りです。

結果

今回は引き分けとなりました。

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 10 (三目並べ AlphaZero2)

AlphaZeroでは局面に応じて「方策」「価値」を出力する「デュアルネットワーク」を使います。
ResNetの残差ブロックでゲームの盤面の特徴を抽出し、「ポリシー出力」と「バリュー出力」の2つに分岐させて、「方策」(次の一手)と「価値」(勝敗予測)の2つを推論します。

dual_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# ====================
# デュアルネットワークの作成
# ====================

# パッケージのインポート
from tensorflow.keras.layers import Activation, Add, BatchNormalization, Conv2D, Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
import os

# パラメータの準備
DN_FILTERS = 128 # 畳み込み層のカーネル数(本家は256)
DN_RESIDUAL_NUM = 16 # 残差ブロックの数(本家は19)
DN_INPUT_SHAPE = (3, 3, 2) # 入力シェイプ
DN_OUTPUT_SIZE = 9 # 行動数(配置先(3*3))

# 畳み込み層の作成
def conv(filters):
return Conv2D(filters, 3, padding='same', use_bias=False,
kernel_initializer='he_normal', kernel_regularizer=l2(0.0005))

# 残差ブロックの作成
def residual_block():
def f(x):
sc = x
x = conv(DN_FILTERS)(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = conv(DN_FILTERS)(x)
x = BatchNormalization()(x)
x = Add()([x, sc])
x = Activation('relu')(x)
return x
return f

デュアルネットワークの作成手順は次の通りです。

  1. モデル作成済みの場合は何も処理しません。
  2. モデルを作成します。
    入力層、畳み込み層、残差ブロック×16、プーリング層、ポリシー出力、バリュー出力、モデルと順番に作成します。
  3. モデルを保存します。
  4. モデルを破棄します。
dual_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# デュアルネットワークの作成
def dual_network():
# モデル作成済みの場合は無処理
if os.path.exists('./model/best.h5'):
return

# 入力層
input = Input(shape=DN_INPUT_SHAPE)

# 畳み込み層
x = conv(DN_FILTERS)(input)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# 残差ブロック x 16
for i in range(DN_RESIDUAL_NUM):
x = residual_block()(x)

# プーリング層
x = GlobalAveragePooling2D()(x)

# ポリシー出力
p = Dense(DN_OUTPUT_SIZE, kernel_regularizer=l2(0.0005),
activation='softmax', name='pi')(x)

# バリュー出力
v = Dense(1, kernel_regularizer=l2(0.0005))(x)
v = Activation('tanh', name='v')(v)

# モデルの作成
model = Model(inputs=input, outputs=[p,v])

# モデルの保存
os.makedirs('./model/', exist_ok=True) # フォルダがない時は生成
model.save('./model/best.h5') # ベストプレイヤーのモデル

# モデルの破棄
K.clear_session()
del model

# 動作確認
if __name__ == '__main__':
dual_network()

実行すると、「./model.best.h5」ファイルが出力されます。
これはまだ未学習のモデルとなります。

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 9 (三目並べ AlphaZero1)

三目並べをAlphaZeroで解いていきます。
今回はAlphaZeroを実装する事前準備として、ゲームの全体像と三目並べを実装します。

AlphaZeroは、モンテカルロ木探索をベースに次の2点を組み込んでいます。

  • 深層学習の曲面から最善手を予測する。
  • 強化学習の自己対戦による経験を組み合わせる。

AlphaZeroによる三目並べのポイントは次の通りです。

  • 目的は勝つことです。
  • 状態は局面です。
  • 行動は手を打つことです。
  • 報酬は勝ったら+1、負けたら-1です。
  • 学習方法はモンテカルロ木検索+ResNet+セルフプレイです。
  • パラメータ更新間隔は1エピソードごとです。

AlphaZeroの強化学習は下記のようになります。

  1. デュアルネットワークの作成
    ニューラルネットワークを定義し、ベストプレイヤー(過去最強)のモデルを作成します。
  2. セルフプレイ部
    AI同士でゲーム終了までプレイします。
  3. パラメータ更新部
    セルフプレイで作成した学習データで最新プレイヤーを作成します。
    初期状態としてはベストプレイヤーをコピーして作成します。
  4. 新パラメータ更新部
    最新プレイヤーとベストプレイヤーで対戦し、勝ち越した場合はベストプレイヤーを更新します。

2~4を繰り返して学習を行います。

まず三目並べを実装します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# パッケージのインポート
import random
import math

# ゲーム状態
class State:
# 初期化
def __init__(self, pieces=None, enemy_pieces=None):
# 石の配置
self.pieces = pieces if pieces != None else [0] * 9
self.enemy_pieces = enemy_pieces if enemy_pieces != None else [0] * 9

# 石の数の取得
def piece_count(self, pieces):
count = 0
for i in pieces:
if i == 1:
count += 1
return count

# 負けかどうか
def is_lose(self):
# 3並びかどうか
def is_comp(x, y, dx, dy):
for k in range(3):
if y < 0 or 2 < y or x < 0 or 2 < x or \
self.enemy_pieces[x+y*3] == 0:
return False
x, y = x+dx, y+dy
return True

# 負けかどうか
if is_comp(0, 0, 1, 1) or is_comp(0, 2, 1, -1):
return True
for i in range(3):
if is_comp(0, i, 1, 0) or is_comp(i, 0, 0, 1):
return True
return False

# 引き分けかどうか
def is_draw(self):
return self.piece_count(self.pieces) + self.piece_count(self.enemy_pieces) == 9

# ゲーム終了かどうか
def is_done(self):
return self.is_lose() or self.is_draw()

# 次の状態の取得
def next(self, action):
pieces = self.pieces.copy()
pieces[action] = 1
return State(self.enemy_pieces, pieces)

# 合法手のリストの取得
def legal_actions(self):
actions = []
for i in range(9):
if self.pieces[i] == 0 and self.enemy_pieces[i] == 0:
actions.append(i)
return actions

# 先手かどうか
def is_first_player(self):
return self.piece_count(self.pieces) == self.piece_count(self.enemy_pieces)

# 文字列表示
def __str__(self):
ox = (' o', ' x') if self.is_first_player() else (' x', ' o')
str = ''
for i in range(9):
if self.pieces[i] == 1:
str += ox[0]
elif self.enemy_pieces[i] == 1:
str += ox[1]
else:
str += ' -'
if i % 3 == 2:
str += '\n'
return str

ランダムで行動を選択する関数を定義します。

game.py
1
2
3
4
# ランダムで行動選択
def random_action(state):
legal_actions = state.legal_actions()
return legal_actions[random.randint(0, len(legal_actions)-1)]

アルファベータ法で行動を選択する関数を定義します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# アルファベータ法で状態価値計算
def alpha_beta(state, alpha, beta):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 合法手の状態価値の計算
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -beta, -alpha)
if score > alpha:
alpha = score

# 現ノードのベストスコアが親ノードを超えたら探索終了
if alpha >= beta:
return alpha

# 合法手の状態価値の最大値を返す
return alpha

# アルファベータ法で行動選択
def alpha_beta_action(state):
# 合法手の状態価値の計算
best_action = 0
alpha = -float('inf')
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -float('inf'), -alpha)
if score > alpha:
best_action = action
alpha = score

# 合法手の状態価値の最大値を持つ行動を返す
return best_action

プレイアウト関数を定義します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# プレイアウト
def playout(state):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 次の状態の状態価値
return -playout(state.next(random_action(state)))

# 最大値のインデックスを返す
def argmax(collection):
return collection.index(max(collection))

モンテカルロ木探索で行動を選択する関数を定義します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# モンテカルロ木探索の行動選択
def mcts_action(state):
# モンテカルロ木探索のノード
class node:
# 初期化
def __init__(self, state):
self.state = state # 状態
self.w = 0 # 累計価値
self.n = 0 # 試行回数
self.child_nodes = None # 子ノード群

# 評価
def evaluate(self):
# ゲーム終了時
if self.state.is_done():
# 勝敗結果で価値を取得
value = -1 if self.state.is_lose() else 0 # 負けは-1、引き分けは0

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# 子ノードが存在しない時
if not self.child_nodes:
# プレイアウトで価値を取得
value = playout(self.state)

# 累計価値と試行回数の更新
self.w += value
self.n += 1

# 子ノードの展開
if self.n == 10:
self.expand()
return value

# 子ノードが存在する時
else:
# UCB1が最大の子ノードの評価で価値を取得
value = -self.next_child_node().evaluate()

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# 子ノードの展開
def expand(self):
legal_actions = self.state.legal_actions()
self.child_nodes = []
for action in legal_actions:
self.child_nodes.append(node(self.state.next(action)))

# UCB1が最大の子ノードを取得
def next_child_node(self):
# 試行回数nが0の子ノードを返す
for child_node in self.child_nodes:
if child_node.n == 0:
return child_node

# UCB1の計算
t = 0
for c in self.child_nodes:
t += c.n
ucb1_values = []
for child_node in self.child_nodes:
ucb1_values.append(-child_node.w/child_node.n+2*(2*math.log(t)/child_node.n)**0.5)

# UCB1が最大の子ノードを返す
return self.child_nodes[argmax(ucb1_values)]

# ルートノードの生成
root_node = node(state)
root_node.expand()

# ルートノードを100回評価
for _ in range(100):
root_node.evaluate()

# 試行回数の最大値を持つ行動を返す
legal_actions = state.legal_actions()
n_list = []
for c in root_node.child_nodes:
n_list.append(c.n)
return legal_actions[argmax(n_list)]

動作確認のためのコードを実装します。ランダムとランダムで対戦するコードです。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動作確認
if __name__ == '__main__':
# 状態の生成
state = State()

# ゲーム終了までのループ
while True:
# ゲーム終了時
if state.is_done():
break

# 次の状態の取得
state = state.next(random_action(state))

# 文字列表示
print(state)
print()

1手ごとの局面が表示され、ゲーム終了まで実行されます。(1エピソードに対応)

結果

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 8 (三目並べ モンテカルロ木探索)

三目並べをモンテカルロ木探索で解いていきます。

モンテカルロ木探索は、原始モンテカルロ探索をベースに「有望な手」をより深く調べることで改善を試みます。具体的には、選択・評価・展開・更新の4操作を行います。

まず、三目並べを実装します。(前回の原始モンテカルロ探索で定義したものと同じです。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 三目並べの作成
import random

# ゲームの状態
class State:
# 初期化
def __init__(self, pieces=None, enemy_pieces=None):
# 石の配置
self.pieces = pieces if pieces != None else [0] * 9
self.enemy_pieces = enemy_pieces if enemy_pieces != None else [0] * 9

# 石の数の取得
def piece_count(self, pieces):
count = 0
for i in pieces:
if i == 1:
count += 1
return count

# 負けかどうか
def is_lose(self):
# 3並びかどうか
def is_comp(x, y, dx, dy):
for k in range(3):
if y < 0 or 2 < y or x < 0 or 2 < x or \
self.enemy_pieces[x+y*3] == 0:
return False
x, y = x+dx, y+dy
return True

# 負けかどうか
if is_comp(0, 0, 1, 1) or is_comp(0, 2, 1, -1):
return True
for i in range(3):
if is_comp(0, i, 1, 0) or is_comp(i, 0, 0, 1):
return True
return False

# 引き分けかどうか
def is_draw(self):
return self.piece_count(self.pieces) + self.piece_count(self.enemy_pieces) == 9

# ゲーム終了かどうか
def is_done(self):
return self.is_lose() or self.is_draw()

# 次の状態の取得
def next(self, action):
pieces = self.pieces.copy()
pieces[action] = 1
return State(self.enemy_pieces, pieces)

# 合法手のリストの取得
def legal_actions(self):
actions = []
for i in range(9):
if self.pieces[i] == 0 and self.enemy_pieces[i] == 0:
actions.append(i)
return actions

# 先手かどうか
def is_first_player(self):
return self.piece_count(self.pieces) == self.piece_count(self.enemy_pieces)

# 文字列表示
def __str__(self):
ox = ('o', 'x') if self.is_first_player() else ('x', 'o')
str = ''
for i in range(9):
if self.pieces[i] == 1:
str += ox[0]
elif self.enemy_pieces[i] == 1:
str += ox[1]
else:
str += '-'
if i % 3 == 2:
str += '\n'
return str

ランダムで行動を選択する関数を定義します。対戦相手用です。

1
2
3
4
# ランダムで行動選択
def random_action(state):
legal_actions = state.legal_actions()
return legal_actions[random.randint(0, len(legal_actions)-1)]

アルファベータ法で行動を選択する関数を定義します。対戦相手用です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# アルファベータ法で状態価値計算
def alpha_beta(state, alpha, beta):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 合法手の状態価値の計算
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -beta, -alpha)
if score > alpha:
alpha = score

# 現ノードのベストスコアが親ノードを超えたら探索終了
if alpha >= beta:
return alpha

# 合法手の状態価値の最大値を返す
return alpha

# アルファベータ法で行動選択
def alpha_beta_action(state):
# 合法手の状態価値の計算
best_action = 0
alpha = -float('inf')
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -float('inf'), -alpha)
if score > alpha:
best_action = action
alpha = score

# 合法手の状態価値の最大値を持つ行動を返す
return best_action

プレイアウト関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
# プレイアウト
def playout(state):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 次の状態の状態価値
return -playout(state.next(random_action(state)))

原始モンテカルロ探索で行動を選択する関数を定義します。対戦相手用です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 原始モンテカルロ探索で行動選択
def mcs_action(state):
# 合法手ごとに10回プレイアウトした時の状態価値の合計の計算
legal_actions = state.legal_actions()
values = [0] * len(legal_actions)
for i, action in enumerate(legal_actions):
for _ in range(10):
values[i] += -playout(state.next(action))

# 合法手の状態価値の合計の最大値を持つ行動を返す
return legal_actions[argmax(values)]

# 最大値のインデックスを返す
def argmax(collection, key=None):
return collection.index(max(collection))

モンテカルロ木探索で行動を選択する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import math

# モンテカルロ木探索の行動選択
def mcts_action(state):
# モンテカルロ木探索のノードの定義
class Node:
# ノードの初期化
def __init__(self, state):
self.state = state # 状態
self.w = 0 # 累計価値
self.n = 0 # 試行回数
self.child_nodes = None # 子ノード群

# 局面の価値の計算
def evaluate(self):
# ゲーム終了時
if self.state.is_done():
# 勝敗結果で価値を取得
value = -1 if self.state.is_lose() else 0 # 負けは-1、引き分けは0

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# 子ノードが存在しない時
if not self.child_nodes:
# プレイアウトで価値を取得
value = playout(self.state)

# 累計価値と試行回数の更新
self.w += value
self.n += 1

# 子ノードの展開
if self.n == 10:
self.expand()
return value

# 子ノードが存在する時
else:
# UCB1が最大の子ノードの評価で価値を取得
value = -self.next_child_node().evaluate()

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# 子ノードの展開
def expand(self):
legal_actions = self.state.legal_actions()
self.child_nodes = []
for action in legal_actions:
self.child_nodes.append(Node(self.state.next(action)))

# UCB1が最大の子ノードの取得
def next_child_node(self):
# 試行回数が0の子ノードを返す
for child_node in self.child_nodes:
if child_node.n == 0:
return child_node

# UCB1の計算
t = 0
for c in self.child_nodes:
t += c.n
ucb1_values = []
for child_node in self.child_nodes:
ucb1_values.append(-child_node.w/child_node.n+(2*math.log(t)/child_node.n)**0.5)

# UCB1が最大の子ノードを返す
return self.child_nodes[argmax(ucb1_values)]

# 現在の局面のノードの作成
root_node = Node(state)
root_node.expand()

# 100回のシミュレーションを実行
for _ in range(100):
root_node.evaluate()

# 試行回数の最大値を持つ行動を返す
legal_actions = state.legal_actions()
n_list = []
for c in root_node.child_nodes:
n_list.append(c.n)
return legal_actions[argmax(n_list)]

モンテカルロ木探索とランダムおよびアルファベータ法、原始モンテカルロ探索の対戦を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# モンテカルロ木探索とランダムおよびアルファベータ法、原始モンテカルロ探索の対戦

# パラメータ
EP_GAME_COUNT = 100 # 1評価あたりのゲーム数

# 先手プレイヤーのポイント
def first_player_point(ended_state):
# 1:先手勝利, 0:先手敗北, 0.5:引き分け
if ended_state.is_lose():
return 0 if ended_state.is_first_player() else 1
return 0.5

# 1ゲームの実行
def play(next_actions):
# 状態の生成
state = State()

# ゲーム終了までループ
while True:
# ゲーム終了時
if state.is_done():
break

# 行動の取得
next_action = next_actions[0] if state.is_first_player() else next_actions[1]
action = next_action(state)

# 次の状態の取得
state = state.next(action)

# 先手プレイヤーのポイントを返す
return first_player_point(state)

# 任意のアルゴリズムの評価
def evaluate_algorithm_of(label, next_actions):
# 複数回の対戦を繰り返す
total_point = 0
for i in range(EP_GAME_COUNT):
# 1ゲームの実行
if i % 2 == 0:
total_point += play(next_actions)
else:
total_point += 1 - play(list(reversed(next_actions)))

# 出力
print('\rEvaluate {}/{}'.format(i + 1, EP_GAME_COUNT), end='')
print('')

# 平均ポイントの計算
average_point = total_point / EP_GAME_COUNT
print(label.format(average_point * 100))

# VSランダム
next_actions = (mcts_action, random_action)
evaluate_algorithm_of('モンテカルロ木探索 VS ランダム {:.2f}%\n', next_actions)

# VSアルファベータ法
next_actions = (mcts_action, alpha_beta_action)
evaluate_algorithm_of('モンテカルロ木探索 VS アルファベータ法 {:.2f}%\n', next_actions)

# VSモンテカルロ法
next_actions = (mcts_action, mcs_action)
evaluate_algorithm_of('モンテカルロ木探索 VS 原始モンテカルロ探索 {:.2f}%\n', next_actions)

結果は次の通りです。
結果
ランダムとの対戦では勝率93%と圧勝ですが、アルファベータ法との対戦では勝率36.5%となり原始モンテカルロ探索よりは勝率が10%以上あがりました。

モンテカルロ木探索と原始モンテカルロ探索の対戦では67.00%と原始モンテカルロ探索よりモンテカルロ木探索がやや強い結果となりました。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 7 (三目並べ 原始モンテカルロ探索)

三目並べを原始モンテカルロ探索で解いていきます。

原始モンテカルロ探索はランダムシミュレーションによって状態価値を計算する手法です。
何回もランダムプレイを行い、勝率の高い手を価値が高いと評価します。

まず、三目並べを実装します。(前回のアルファベータ法で定義したものと同じです。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 三目並べの作成
import random

# ゲームの状態
class State:
# 初期化
def __init__(self, pieces=None, enemy_pieces=None):
# 石の配置
self.pieces = pieces if pieces != None else [0] * 9
self.enemy_pieces = enemy_pieces if enemy_pieces != None else [0] * 9

# 石の数の取得
def piece_count(self, pieces):
count = 0
for i in pieces:
if i == 1:
count += 1
return count

# 負けかどうか
def is_lose(self):
# 3並びかどうか
def is_comp(x, y, dx, dy):
for k in range(3):
if y < 0 or 2 < y or x < 0 or 2 < x or \
self.enemy_pieces[x+y*3] == 0:
return False
x, y = x+dx, y+dy
return True

# 負けかどうか
if is_comp(0, 0, 1, 1) or is_comp(0, 2, 1, -1):
return True
for i in range(3):
if is_comp(0, i, 1, 0) or is_comp(i, 0, 0, 1):
return True
return False

# 引き分けかどうか
def is_draw(self):
return self.piece_count(self.pieces) + self.piece_count(self.enemy_pieces) == 9

# ゲーム終了かどうか
def is_done(self):
return self.is_lose() or self.is_draw()

# 次の状態の取得
def next(self, action):
pieces = self.pieces.copy()
pieces[action] = 1
return State(self.enemy_pieces, pieces)

# 合法手のリストの取得
def legal_actions(self):
actions = []
for i in range(9):
if self.pieces[i] == 0 and self.enemy_pieces[i] == 0:
actions.append(i)
return actions

# 先手かどうか
def is_first_player(self):
return self.piece_count(self.pieces) == self.piece_count(self.enemy_pieces)

# 文字列表示
def __str__(self):
ox = ('o', 'x') if self.is_first_player() else ('x', 'o')
str = ''
for i in range(9):
if self.pieces[i] == 1:
str += ox[0]
elif self.enemy_pieces[i] == 1:
str += ox[1]
else:
str += '-'
if i % 3 == 2:
str += '\n'
return str

ランダムで行動を選択する関数を定義します。対戦相手用です。

1
2
3
4
# ランダムで行動選択
def random_action(state):
legal_actions = state.legal_actions()
return legal_actions[random.randint(0, len(legal_actions)-1)]

アルファベータ法で行動を選択する関数を定義します。対戦相手用です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# アルファベータ法で状態価値計算
def alpha_beta(state, alpha, beta):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 合法手の状態価値の計算
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -beta, -alpha)
if score > alpha:
alpha = score

# 現ノードのベストスコアが親ノードを超えたら探索終了
if alpha >= beta:
return alpha

# 合法手の状態価値の最大値を返す
return alpha

# アルファベータ法で行動選択
def alpha_beta_action(state):
# 合法手の状態価値の計算
best_action = 0
alpha = -float('inf')
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -float('inf'), -alpha)
if score > alpha:
best_action = action
alpha = score

# 合法手の状態価値の最大値を持つ行動を返す
return best_action

いよいよ原始モンテカルロ探索の実装にはいります。
まずはプレイアウト関数を定義します。プレイアウトとはたんに現在の局面からゲーム終了までプレイすることです。

1
2
3
4
5
6
7
8
9
10
11
12
# プレイアウト
def playout(state):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 次の状態の状態価値
return -playout(state.next(random_action(state)))

原始モンテカルロ探索で行動選択する関数を定義します。
合法手ごとに10回プレイアウトした時の状態価値の合計の計算し、その合計が最も大きな行動を選択します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 原始モンテカルロ探索で行動選択
def mcs_action(state):
# 合法手ごとに10回プレイアウトした時の状態価値の合計の計算
legal_actions = state.legal_actions()
values = [0] * len(legal_actions)
for i, action in enumerate(legal_actions):
for _ in range(10):
values[i] += -playout(state.next(action))

# 合法手の状態価値の合計の最大値を持つ行動を返す
return legal_actions[argmax(values)]

# 最大値のインデックスを返す
def argmax(collection, key=None):
return collection.index(max(collection))

原始モンテカルロ探索とランダム・アルファベータ法で対戦してみます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 原始モンテカルロ探索とランダム・アルファベータ法の対戦

# パラメータ
EP_GAME_COUNT = 100 # 1評価あたりのゲーム数

# 先手プレイヤーのポイント
def first_player_point(ended_state):
# 1:先手勝利, 0:先手敗北, 0.5:引き分け
if ended_state.is_lose():
return 0 if ended_state.is_first_player() else 1
return 0.5

# 1ゲームの実行
def play(next_actions):
# 状態の生成
state = State()

# ゲーム終了までループ
while True:
# ゲーム終了時
if state.is_done():
break

# 行動の取得
next_action = next_actions[0] if state.is_first_player() else next_actions[1]
action = next_action(state)

# 次の状態の取得
state = state.next(action)

# 先手プレイヤーのポイントを返す
return first_player_point(state)

# 任意のアルゴリズムの評価
def evaluate_algorithm_of(label, next_actions):
# 複数回の対戦を繰り返す
total_point = 0
for i in range(EP_GAME_COUNT):
# 1ゲームの実行
if i % 2 == 0:
total_point += play(next_actions)
else:
total_point += 1 - play(list(reversed(next_actions)))

# 出力
print('\rEvaluate {}/{}'.format(i + 1, EP_GAME_COUNT), end='')
print('')

# 平均ポイントの計算
average_point = total_point / EP_GAME_COUNT
print(label.format(average_point))

# VSランダム
next_actions = (mcs_action, random_action)
evaluate_algorithm_of('VS_Random {:.3f}', next_actions)

# VSアルファベータ法
next_actions = (mcs_action, alpha_beta_action)
evaluate_algorithm_of('VS_AlphaBeta {:.3f}', next_actions)

結果は次の通りです。
結果
ランダムとの対戦では勝率93%と圧勝ですが、アルファベータ法との対戦では勝率23.5%となかなか厳しい結果となりました。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 6 (三目並べ アルファベータ法)

三目並べをアルファベータ法で解いていきます。

アルファベータ法は、ミニマックス法から計算しなくても同じ計算結果になる部分を読まない処理(枝刈り)を実装し、高速化したものです。探索手順はミニマックス法と同じです。

まず、三目並べを実装します。(前回のミニマックス法で定義したものと同じです。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 三目並べの作成
import random

# ゲームの状態
class State:
# 初期化
def __init__(self, pieces=None, enemy_pieces=None):
# 石の配置
self.pieces = pieces if pieces != None else [0] * 9
self.enemy_pieces = enemy_pieces if enemy_pieces != None else [0] * 9

# 石の数の取得
def piece_count(self, pieces):
count = 0
for i in pieces:
if i == 1:
count += 1
return count

# 負けかどうか
def is_lose(self):
# 3並びかどうか
def is_comp(x, y, dx, dy):
for k in range(3):
if y < 0 or 2 < y or x < 0 or 2 < x or \
self.enemy_pieces[x+y*3] == 0:
return False
x, y = x+dx, y+dy
return True

# 負けかどうか
if is_comp(0, 0, 1, 1) or is_comp(0, 2, 1, -1):
return True
for i in range(3):
if is_comp(0, i, 1, 0) or is_comp(i, 0, 0, 1):
return True
return False

# 引き分けかどうか
def is_draw(self):
return self.piece_count(self.pieces) + self.piece_count(self.enemy_pieces) == 9

# ゲーム終了かどうか
def is_done(self):
return self.is_lose() or self.is_draw()

# 次の状態の取得
def next(self, action):
pieces = self.pieces.copy()
pieces[action] = 1
return State(self.enemy_pieces, pieces)

# 合法手のリストの取得
def legal_actions(self):
actions = []
for i in range(9):
if self.pieces[i] == 0 and self.enemy_pieces[i] == 0:
actions.append(i)
return actions

# 先手かどうか
def is_first_player(self):
return self.piece_count(self.pieces) == self.piece_count(self.enemy_pieces)

# 文字列表示
def __str__(self):
ox = ('o', 'x') if self.is_first_player() else ('x', 'o')
str = ''
for i in range(9):
if self.pieces[i] == 1:
str += ox[0]
elif self.enemy_pieces[i] == 1:
str += ox[1]
else:
str += '-'
if i % 3 == 2:
str += '\n'
return str

ミニマックス法で状態価値を計算します。(前回と同じです。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# ミニマックス法で状態価値計算
def mini_max(state):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 合法手の状態価値の計算
best_score = -float('inf')
for action in state.legal_actions():
score = -mini_max(state.next(action))
if score > best_score:
best_score = score

# 合法手の状態価値の最大値を返す
return best_score

# ミニマックス法で行動選択
def mini_max_action(state):
# 合法手の状態価値の計算
best_action = 0
best_score = -float('inf')
for action in state.legal_actions():
score = -mini_max(state.next(action))
if score > best_score:
best_action = action
best_score = score

# 合法手の状態価値の最大値を持つ行動を返す
return best_action

アルファベータ法で状態価値を計算計算します。

引数のalphaは親ノードの親ノード(つまり自分)のベストスコアで、betaは親ノードのベストスコアです。

  • alphaは自分のベストスコアなので、alpha以上のスコアを取得したらalphaを更新します。
  • betaは親ノードのベストスコアなのでalpha以下だったら探索を終了します。

メソッドを再帰的に呼び出すとき、alphaとbetaを左右入れ替えてるのがポイントです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# アルファベータ法で状態価値計算
def alpha_beta(state, alpha, beta):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 合法手の状態価値の計算
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -beta, -alpha)
if score > alpha:
alpha = score

# 現ノードのベストスコアが親ノードを超えたら探索終了
if alpha >= beta:
return alpha

# 合法手の状態価値の最大値を返す
return alpha

アルファベータ法で状態に応じて行動を返す関数を実装します。
ミニマックス法との違いは次の2点です

  • best_scoreをalphaとする。
  • betaの初期値に-float(‘inf’)を設定する。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# アルファベータ法で行動選択
def alpha_beta_action(state):
# 合法手の状態価値の計算
best_action = 0
alpha = -float('inf')
str = ['','']
for action in state.legal_actions():
score = -alpha_beta(state.next(action), -float('inf'), -alpha)
if score > alpha:
best_action = action
alpha = score

str[0] = '{}{:2d},'.format(str[0], action)
str[1] = '{}{:2d},'.format(str[1], score)
print('action:', str[0], '\nscore: ', str[1], '\n')

# 合法手の状態価値の最大値を持つ行動を返す
return best_action

最後に、アルファベータ法とミニマックス法で対戦します。
先手はアルファベータ法、後手はミニマックス法を使います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# アルファベータ法とミニマックス法の対戦

# 状態の生成
state = State()

# ゲーム終了までのループ
while True:
# ゲーム終了時
if state.is_done():
break

# 行動の取得
if state.is_first_player():
print('----- アルファベータ法で行動選択 -----')
action = alpha_beta_action(state)
else:
print('----- ミニマックス法で行動選択 -----')
action = mini_max_action(state)

# 次の状態の取得
state = state.next(action)

# 文字列表示
print(state)

結果は次の通りです。

結果
アルファベータ法の方が実行速度が上がりましたが、強さは互角になりました。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 5 (三目並べ ミニマックス法)

三目並べをミニマックス法で解いていきます。

ミニマックス法は、自分は自分にとって最善手を選び、相手は自分にとって最悪手を選ぶと仮定して最善手を探すアルゴリズムです。

ポイントは次の通りです。

  • next()は行動に応じて次の状態を取得する。
  • 行動は石を配置するマスの位置を0~8で指定する。
  • legal_actions()は選択可能な行動を取得します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 三目並べの作成
import random

# ゲームの状態
class State:
# 初期化
def __init__(self, pieces=None, enemy_pieces=None):
# 石の配置
self.pieces = pieces if pieces != None else [0] * 9
self.enemy_pieces = enemy_pieces if enemy_pieces != None else [0] * 9

# 石の数の取得
def piece_count(self, pieces):
count = 0
for i in pieces:
if i == 1:
count += 1
return count

# 負けかどうか
def is_lose(self):
# 3並びかどうか
def is_comp(x, y, dx, dy):
for k in range(3):
if y < 0 or 2 < y or x < 0 or 2 < x or \
self.enemy_pieces[x+y*3] == 0:
return False
x, y = x+dx, y+dy
return True

# 負けかどうか
if is_comp(0, 0, 1, 1) or is_comp(0, 2, 1, -1):
return True
for i in range(3):
if is_comp(0, i, 1, 0) or is_comp(i, 0, 0, 1):
return True
return False

# 引き分けかどうか
def is_draw(self):
return self.piece_count(self.pieces) + self.piece_count(self.enemy_pieces) == 9

# ゲーム終了かどうか
def is_done(self):
return self.is_lose() or self.is_draw()

# 次の状態の取得
def next(self, action):
pieces = self.pieces.copy()
pieces[action] = 1
return State(self.enemy_pieces, pieces)

# 合法手のリストの取得
def legal_actions(self):
actions = []
for i in range(9):
if self.pieces[i] == 0 and self.enemy_pieces[i] == 0:
actions.append(i)
return actions

# 先手かどうか
def is_first_player(self):
return self.piece_count(self.pieces) == self.piece_count(self.enemy_pieces)

# 文字列表示
def __str__(self):
ox = ('o', 'x') if self.is_first_player() else ('x', 'o')
str = ''
for i in range(9):
if self.pieces[i] == 1:
str += ox[0]
elif self.enemy_pieces[i] == 1:
str += ox[1]
else:
str += '-'
if i % 3 == 2:
str += '\n'
return str

ランダムで行動選択する関数を実装します。
leagl_actions()で合法手を取得し、その中からランダムに手を選びます。

1
2
3
4
# ランダムで行動選択
def random_action(state):
legal_actions = state.legal_actions()
return legal_actions[random.randint(0, len(legal_actions)-1)]

ランダムとランダムで三目並べを対戦させます。
ゲーム終了まで行動の取得と次の状態の取得を繰り返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# ランダムとランダムで対戦

# 状態の生成
state = State()

# ゲーム終了までのループ
while True:
# ゲーム終了時
if state.is_done():
break;

# 行動の取得
action = random_action(state)

# 次の状態の取得
state = state.next(action)

# 文字列表示
print(state)
print()


結果

ミニマックス法で状態価値の計算を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ミニマックス法で状態価値計算
def mini_max(state):
# 負けは状態価値-1
if state.is_lose():
return -1

# 引き分けは状態価値0
if state.is_draw():
return 0

# 合法手の状態価値の計算
best_score = -float('inf')
for action in state.legal_actions():
score = -mini_max(state.next(action))
if score > best_score:
best_score = score

# 合法手の状態価値の最大値を返す
return best_score

ミニマックス法で状態に応じて行動を返す関数を実装します。
Stateを渡すと行動(石を置くマス:0~8)を返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ミニマックス法で行動選択
def mini_max_action(state):
# 合法手の状態価値の計算
best_action = 0
best_score = -float('inf')
str = ['','']
for action in state.legal_actions():
score = -mini_max(state.next(action))
if score > best_score:
best_action = action
best_score = score

str[0] = '{}{:2d},'.format(str[0], action)
str[1] = '{}{:2d},'.format(str[1], score)
print('action:', str[0], '\nscore: ', str[1], '\n')

# 合法手の状態価値の最大値を持つ行動を返す
return best_action

ミニマックス法とランダムで対戦します。
先手はmini_max_action()、後手はrandmo_action()を使います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ミニマックス法とランダムで対戦

# 状態の生成
state = State()

# ゲーム終了までのループ
while True:
# ゲーム終了時
if state.is_done():
break

# 行動の取得
if state.is_first_player():
action = mini_max_action(state)
else:
action = random_action(state)

# 次の状態の取得
state = state.next(action)

# 文字列表示
print(state)
print()

実行ごとに結果がかわるので複数回実行してみてください。
ただランダムが勝つことはありません。

結果

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 4 (カートポール Deep Q-network)

カートポールをDQN(Deep Q-network)で解いていきます。

カートポールは棒を倒さないようにバランスをとるゲームです。

今回の強化学習のポイントは下記です。

  • 目的は棒を倒さないようにバランスをとること。
  • 1エピソードは棒を倒すまで。
  • 状態は「カートの位置」「カートの速度」「棒の角度」「棒の角速度」の4種類。
  • 行動は「カートの左移動」「カートの右移動」の2種類。
  • 報酬はエピソード完了時に190ステップ以上で+1.
  • パラメータ更新間隔は行動1回ごと。

DQNでは行動価値関数を表形式ではなく、ニューラルネットワークで表現します。
またそれ以外にもDQNでは次の4つの工夫がされています。

  • Experience Replayd(経験を貯めておきあとでランダムに学習する)
  • Fixed Target Q-Network(更新計算のためにニューラルネットワークをもう1つ作る)
  • Reward Clipping(報酬のスケールを-1,0,1に統一する)
  • Huber Loss(誤差が大きい場合でも安定している関数huberを使う)

まずはGoogle Colabaratoryで動作確認するために環境を構築します。
tensorflowのバージョンは1.13.1が必須のため、次のコマンドを実行します。

1
2
!pip uninstall tensorflow -y
!pip install tensorflow==1.13.1

カートポールの様子をアニメーションで確認するために、ディスプレイの設定を行います。

1
2
3
4
5
6
7
8
9
10
11
12
# ディスプレイ設定のインストール
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg> /dev/null
!pip install pyglet==1.3.2
!pip install pyopengl
!pip install pyvirtualdisplay

# ディスプレイ設定の適用
from pyvirtualdisplay import Display
import os
disp = Display(visible=0, size=(1024, 768))
disp.start()
os.environ['DISPLAY'] = ':' + str(disp.display) + '.' + str(disp.screen)

ここでいったんランタイムの再接続が必要になります。

次に必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
# パッケージのインポート
import gym
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from collections import deque
from tensorflow.losses import huber_loss

パラメータを準備します。パラメータの内容はコメントを参照して下さい。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# パラメータの準備
NUM_EPISODES = 500 # エピソード数
MAX_STEPS = 200 # 最大ステップ数
GAMMA = 0.99 # 時間割引率
WARMUP = 10 # 無操作ステップ数

# 探索パラメータ
E_START = 1.0 # εの初期値
E_STOP = 0.01 # εの最終値
E_DECAY_RATE = 0.001 # εの減衰率

# メモリパラメータ
MEMORY_SIZE = 10000 # 経験メモリのサイズ
BATCH_SIZE = 32 # バッチサイズ

行動評価関数となるニューラルネットワークモデルを作成します。
全結合層を4つ重ねたモデルで、入力は状態数、出力数は行動数になります。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 行動価値関数の定義
class QNetwork:
# 初期化
def __init__(self, state_size, action_size):
# モデルの作成
self.model = Sequential()
self.model.add(Dense(16, activation='relu', input_dim=state_size))
self.model.add(Dense(16, activation='relu'))
self.model.add(Dense(16, activation='relu'))
self.model.add(Dense(action_size, activation='linear'))

# モデルのコンパイル
self.model.compile(loss=huber_loss, optimizer=Adam(lr=0.001))

経験メモリを定義します。メモリの内容は「状態」「行動」「報酬」「次の状態」の4種類となります。
dequeはmaxlen以上の要素を追加すると自動的に先頭の要素が削除される関数です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 経験メモリの定義
class Memory():
# 初期化
def __init__(self, memory_size):
self.buffer = deque(maxlen=memory_size)

# 経験の追加
def add(self, experience):
self.buffer.append(experience)

# バッチサイズ分の経験をランダムに取得
def sample(self, batch_size):
idx = np.random.choice(np.arange(len(self.buffer)), size=batch_size, replace=False)
return [self.buffer[i] for i in idx]

# 経験メモリのサイズ
def __len__(self):
return len(self.buffer)

環境を作成します。OpenAI Gymのgym.make()を使用します。
さらに、定義したQNetworkクラスとMemoryクラスを利用して、2つのニューラルネットワークと経験メモリを作成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 環境の作成
env = gym.make('CartPole-v0')
state_size = env.observation_space.shape[0] # 行動数
action_size = env.action_space.n # 状態数

# main-networkの作成
main_qn = QNetwork(state_size, action_size)

# target-networkの作成
target_qn = QNetwork(state_size, action_size)

# 経験メモリの作成
memory = Memory(MEMORY_SIZE)

学習を開始します。

  1. 環境をリセットする。
  2. 定義したエピソード数だけエピソードを繰り返す。
  3. target-networkを更新する。
  4. 1エピソード分、ゲーム終了まで処理を実行する。
  5. εを減らす。
  6. ランダムまたは行動価値関数に従って、行動を取得する。
  7. 行動に応じて状態を報酬を得る。
  8. エピソード完了時に190ステップ以上で報酬+1、成功回数に1加算する。
  9. エピソード完了でないとき報酬に0を指定し、経験メモリに経験を追加する。
  10. 行動価値を更新する。
    1. ニューラルネットワークの入力と出力を準備する。
    2. バッチサイズ分の経験をランダムに取得する。
    3. ニューラルネットワークの入力と出力を生成する。
    4. 行動価値を更新する。
  11. エピソード完了時のログを表示する。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# 学習の開始

# 環境の初期化
state = env.reset()
state = np.reshape(state, [1, state_size])

# エピソード数分のエピソードを繰り返す
total_step = 0 # 総ステップ数
success_count = 0 # 成功数
for episode in range(1, NUM_EPISODES+1):
step = 0 # ステップ数

# target-networkの更新
target_qn.model.set_weights(main_qn.model.get_weights())

# 1エピソードのループ
for _ in range(1, MAX_STEPS+1):
step += 1
total_step += 1

# εを減らす
epsilon = E_STOP + (E_START - E_STOP)*np.exp(-E_DECAY_RATE*total_step)

# ランダムな行動を選択
if epsilon > np.random.rand():
action = env.action_space.sample()
# 行動価値関数で行動を選択
else:
action = np.argmax(main_qn.model.predict(state)[0])

# 行動に応じて状態と報酬を得る
next_state, _, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, state_size])

# エピソード完了時
if done:
# 報酬の指定
if step >= 190:
success_count += 1
reward = 1
else:
success_count = 0
reward = 0

# 次の状態に状態なしを代入
next_state = np.zeros(state.shape)

# 経験の追加
if step > WARMUP:
memory.add((state, action, reward, next_state))
# エピソード完了でない時
else:
# 報酬の指定
reward = 0

# 経験の追加
if step > WARMUP:
memory.add((state, action, reward, next_state))

# 状態に次の状態を代入
state = next_state

# 行動価値関数の更新
if len(memory) >= BATCH_SIZE:
# ニューラルネットワークの入力と出力の準備
inputs = np.zeros((BATCH_SIZE, 4)) # 入力(状態)
targets = np.zeros((BATCH_SIZE, 2)) # 出力(行動ごとの価値)

# バッチサイズ分の経験をランダムに取得
minibatch = memory.sample(BATCH_SIZE)

# ニューラルネットワークの入力と出力の生成
for i, (state_b, action_b, reward_b, next_state_b) in enumerate(minibatch):

# 入力に状態を指定
inputs[i] = state_b

# 採った行動の価値を計算
if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
target = reward_b + GAMMA * np.amax(target_qn.model.predict(next_state_b)[0])
else:
target = reward_b

# 出力に行動ごとの価値を指定
targets[i] = main_qn.model.predict(state_b)
targets[i][action_b] = target # 採った行動の価値

# 行動価値関数の更新
main_qn.model.fit(inputs, targets, epochs=1, verbose=0)

# エピソード完了時
if done:
# エピソードループを抜ける
break

# エピソード完了時のログ表示
print('エピソード: {}, ステップ数: {}, epsilon: {:.4f}'.format(episode, step, epsilon))

# 5回連続成功で学習終了
if success_count >= 5:
break

# 環境のリセット
state = env.reset()
state = np.reshape(state, [1, state_size])

最大ステップ数の200に少しずつ近づいていきます。
結果(エピソード1から20)
結果(エピソード90から114)

アニメーションフレームを作成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 評価
frames = [] # アニメーションフレーム

# 環境のリセット
state = env.reset()
state = np.reshape(state, [1, state_size])

# 1エピソードのループ
step = 0 # ステップ数
for step in range(1, MAX_STEPS+1):
step += 1

# アニメーションフレームの追加
frames.append(env.render(mode='rgb_array'))

# 最適行動を選択
action = np.argmax(main_qn.model.predict(state)[0])

# 行動に応じて状態と報酬を得る
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, state_size])

# エピソード完了時
if done:
# 次の状態に状態なしを代入
next_state = np.zeros(state.shape)

# エピソードループを抜ける
break
else:
# 状態に次の状態を代入
state = next_state

# エピソード完了時のログ表示
print('ステップ数: {}'.format(step))

結果

アニメーションフレームをアニメーションに変換して最終動作を確認します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# JSAnimationのインストール
!pip install JSAnimation

# パッケージのインポート
import matplotlib.pyplot as plt
from matplotlib import animation
from JSAnimation.IPython_display import display_animation
from IPython.display import display

# アニメーション再生の定義
def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

# アニメーションの定期処理
def animate(i):
patch.set_data(frames[i])

# アニメーション再生
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
display(display_animation(anim, default_mode='loop'))

# アニメーション再生
display_frames_as_gif(frames)

最終的にうまくポールのバランスととることができています。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 3 (迷路ゲーム Sarsa/Q-learning)

迷路ゲームをSarsaとQ-learningで解いていきます。

SarsaとQ-learningは価値反復法と呼ばれ、ある行動をとるたびに状態価値を増やす手法です。
(Sarsaは収束が遅い一方局所解に陥りにくく、Q-learningは収束が早い一方で局所解に陥りやすいと言われています。)

今回の強化学習のポイントは下記です。

  • 目的はゴールすること。
  • 状態は位置。
  • 行動は上下左右の4種類。
  • 報酬はゴールしたら+1。
  • パラメータl更新間隔は行動1回ごと。

価値反復法の学習手順は次の通りです。

  1. ランダム行動の準備する。
  2. 行動価値関数を準備する。
  3. 行動に従って、次の状態を取得する。
  4. ランダムまたは行動価値関数に従って、行動を取得する。
  5. 行動価値関数を更新する。
  6. ゴールするまで3~5を繰り返す。

まずは必要なパッケージをインポートします。

1
2
3
4
5
6
# パッケージのインポート
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from matplotlib import animation
from IPython.display import HTML

迷路を作成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 迷路の作成
fig = plt.figure(figsize=(3, 3))

# 壁
plt.plot([0, 3], [3, 3], color='k')
plt.plot([0, 3], [0, 0], color='k')
plt.plot([0, 0], [0, 2], color='k')
plt.plot([3, 3], [1, 3], color='k')
plt.plot([1, 1], [1, 2], color='k')
plt.plot([2, 3], [2, 2], color='k')
plt.plot([2, 1], [1, 1], color='k')
plt.plot([2, 2], [0, 1], color='k')

# 数字
for i in range(3):
for j in range(3):
plt.text(0.5+i, 2.5-j, str(i+j*3), size=20, ha='center', va='center')

# 円
circle, = plt.plot([0.5], [2.5], marker='o', color='#d3d3d3', markersize=40)

# 目盛りと枠の非表示
plt.tick_params(axis='both', which='both', bottom='off', top= 'off',
labelbottom='off', right='off', left='off', labelleft='off')
plt.box('off')

結果

ランダム行動を準備します。
パラメータθと方策を作成し、np.random.choice()でばらつきを加えて作成します。
前回の方策勾配法とは違い、今回パラメータθと方策の更新はありません。

1
2
3
4
5
6
7
8
9
10
# パラメータθの初期値の準備
theta_0 = np.array([
[np.nan, 1, 1, np.nan], # 0
[np.nan, 1, 1, 1], # 1
[np.nan, np.nan, np.nan, 1], # 2
[1, np.nan, 1, np.nan], # 3
[1, 1, np.nan, np.nan], # 4
[np.nan, np.nan, 1, 1], # 5
[1, 1, np.nan, np.nan], # 6
[np.nan, np.nan, np.nan, 1]]) # 7

パラメータθから方策のへの変換には単純な割合計算を行います。

1
2
3
4
5
6
7
8
9
# パラメータθを方策に変換
def get_pi(theta):
# 割合の計算
[m, n] = theta.shape
pi = np.zeros((m, n))
for i in range(0, m):
pi[i, :] = theta[i, :] / np.nansum(theta[i, :])
pi = np.nan_to_num(pi)
return pi

パラメータθの初期値を変換します。列の合計が1になっています。

1
2
3
# パラメータθの初期値を方策に変換
pi_0 = get_pi(theta_0)
print(pi_0)

結果

行動価値関数を表形式で準備します。
学習前は行動価値関数は不明なので、移動可能が方向は乱数、移動不可の方向は欠損値(np.nan)で初期化します。

1
2
3
4
# 行動価値関数の準備
[a, b] = theta_0.shape
Q = np.random.rand(a, b) * theta_0 * 0.01
print(Q)

結果

行動に従って次の状態を取得する関数を作成します。

1
2
3
4
5
6
7
8
9
10
# 行動に従って次の状態を取得
def get_s_next(s, a):
if a == 0: # 上
return s - 3
elif a == 1: # 右
return s + 1
elif a == 2: # 下
return s + 3
elif a == 3: # 左
return s - 1

確率ε(0以上1以下)でランダムに行動し、確率1-εで行動価値関数の行動を選択します。

1
2
3
4
5
6
7
8
# ランダムまたは行動価値関数に従って行動を取得
def get_a(s, Q, epsilon, pi_0):
if np.random.rand() < epsilon:
# ランダムに行動を選択
return np.random.choice([0, 1, 2, 3], p=pi_0[s])
else:
# 行動価値関数で行動を選択
return np.nanargmax(Q[s])

Sarsaの行動価値関数の更新は、パラメータθに学習係数とTD誤差を掛けた値を加算します。

1
2
3
4
5
6
7
8
9
10
# Sarsaによる行動価値関数の更新
def sarsa(s, a, r, s_next, a_next, Q):
eta = 0.1 # 学習係数
gamma = 0.9 # 時間割引率

if s_next == 8:
Q[s, a] = Q[s, a] + eta * (r - Q[s, a])
else:
Q[s, a] = Q[s, a] + eta * (r + gamma * Q[s_next, a_next] - Q[s, a])
return Q

Q-learningの行動価値関数の更新は、次ステップの価値最大の行動を使用します。
(Sarsaと異なりランダム性を含みません)

1
2
3
4
5
6
7
8
9
10
# Q学習による行動価値関数の更新
def q_learning(s, a, r, s_next, a_next, Q):
eta = 0.1 # 学習係数
gamma = 0.9 # 時間割引率

if s_next == 8:
Q[s, a] = Q[s, a] + eta * (r - Q[s, a])
else:
Q[s, a] = Q[s, a] + eta * (r + gamma * np.nanmax(Q[s_next, :]) - Q[s, a])
return Q

1エピソードを実行して履歴と行動価値を取得します。
履歴は[状態, 行動]のリストです。

Sarsaの場合は行動価値関数としてsarsa()を使い、Q-learnigの場合はq-learning()を使います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 1エピソードの実行
def play(Q, epsilon, pi, type):
s = 0 # 状態
a = a_next = get_a(s, Q, epsilon, pi) # 行動の初期値
s_a_history = [[0, np.nan]] # 状態と行動の履歴

# エピソード完了までループ
while True:
# 行動に従って次の状態の取得
a = a_next
s_next = get_s_next(s, a)

# 履歴の更新
s_a_history[-1][1] = a
s_a_history.append([s_next, np.nan])

# 終了判定
if s_next == 8:
r = 1
a_next = np.nan
else:
r = 0
# 行動価値関数Qに従って行動の取得
a_next = get_a(s_next, Q, epsilon, pi)

# 行動価値関数の更新
if type == 1:
Q = sarsa(s, a, r, s_next, a_next, Q)
else:
Q = q_learning(s, a, r, s_next, a_next, Q)

# 終了判定
if s_next == 8:
break
else:
s = s_next

# 履歴と行動価値関数を返す
return [s_a_history, Q]

エピソードを繰り返し実行して学習を行います。今回は10回エピソードを実行しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
epsilon = 0.5  # ε-greedy法のεの初期値
# 行動価値関数を初期化
Q = np.random.rand(a, b) * theta_0 * 0.01

# エピソードを繰り返し実行して学習
print('=== Sarsa ===')
for episode in range(10):
# ε-greedyの値を少しずつ小さくする
epsilon = epsilon / 2

# 1エピソード実行して履歴と行動価値関数を取得
[s_a_history, Q] = play(Q, epsilon, pi_0, 1)

# 出力
print('エピソード: {}, ステップ: {}'.format(
episode, len(s_a_history)-1))

epsilon = 0.5 # ε-greedy法のεの初期値
# 行動価値関数を初期化
Q = np.random.rand(a, b) * theta_0 * 0.01

# エピソードを繰り返し実行して学習
print('=== Q-learning ===')
for episode in range(10):
# ε-greedyの値を少しずつ小さくする
epsilon = epsilon / 2

# 1エピソード実行して履歴と行動価値関数を取得
[s_a_history, Q] = play(Q, epsilon, pi_0, 2)

# 出力
print('エピソード: {}, ステップ: {}'.format(
episode, len(s_a_history)-1))

SarsaとQ-learningでそれぞれ最短ステップの4に収束していることがわかります。
またSarsaよりQ-learningの方が収束が早いこともわかります。
結果

最後の履歴をもとにアニメーション表示を行ってみます。
(Sarsa、Q-learningともに最終的には同じ結果になります。)

1
2
3
4
5
6
7
8
9
10
# アニメーションの定期処理を行う関数
def animate(i):
state = s_a_history[i][0]
circle.set_data((state % 3) + 0.5, 2.5 - int(state / 3))
return circle

# アニメーションの表示
anim = animation.FuncAnimation(fig, animate, \
frames=len(s_a_history), interval=1000, repeat=False)
HTML(anim.to_jshtml())

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

強化学習 AlphaZero 2 (迷路ゲーム 方策勾配法)

迷路ゲームを方策勾配法で解いていきます。

方策勾配法では成功時の行動を重要と考え、その行動を多く取り入れる手法です。

  • 目的はゴールすること。
  • 状態は位置。
  • 行動は上下左右の4種類。
  • 報酬はゴールした行動を重要視。
  • パラメータ更新間隔は1エピソード(ゴールするまで)

学習手順は次の通りです。

  1. パラメータθを準備。
  2. パラメータθを方策に変換。
  3. 方策に従って、行動をゴールするまで繰り返す。
  4. 成功した行動を多く取り入れるようにパラメータθを更新する。
  5. 方策の変化量が閾値以下になるまで2~4を繰り返す。

パラメータθは深層学習の重みパラメータにあたるものです。

まずは必要なパッケージをインポートします。

1
2
3
4
5
6
# パッケージのインポート
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from matplotlib import animation
from IPython.display import HTML

迷路を作成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 迷路の作成
fig = plt.figure(figsize=(3, 3))

# 壁
plt.plot([0, 3], [3, 3], color='k')
plt.plot([0, 3], [0, 0], color='k')
plt.plot([0, 0], [0, 2], color='k')
plt.plot([3, 3], [1, 3], color='k')
plt.plot([1, 1], [1, 2], color='k')
plt.plot([2, 3], [2, 2], color='k')
plt.plot([2, 1], [1, 1], color='k')
plt.plot([2, 2], [0, 1], color='k')

# 数字
for i in range(3):
for j in range(3):
plt.text(0.5+i, 2.5-j, str(i+j*3), size=20, ha='center', va='center')

# 円
circle, = plt.plot([0.5], [2.5], marker='o', color='#d3d3d3', markersize=40)

# 目盛りと枠の非表示
plt.tick_params(axis='both', which='both', bottom='off', top= 'off',
labelbottom='off', right='off', left='off', labelleft='off')
plt.box('off')

結果

パラメータθを準備します。
学習前は正しいパラメータθは不明なので、移動可能な方向は1、移動不可な方向はnp.nan(欠損値)で初期化します。
スタートのマスはインデックス0で、ゴールのマス(インデックス8)は存在しません。

1
2
3
4
5
6
7
8
9
10
# パラメータθの初期値の準備
theta_0 = np.array([
[np.nan, 1, 1, np.nan], # 0 上,右,下,左
[np.nan, 1, 1, 1], # 1
[np.nan, np.nan, np.nan, 1], # 2
[1, np.nan, 1, np.nan], # 3
[1, 1, np.nan, np.nan], # 4
[np.nan, np.nan, 1, 1], # 5
[1, 1, np.nan, np.nan], # 6
[np.nan, np.nan, np.nan, 1]]) # 7

パラメータθを方策に変換します。
変換関数にはソフトマックス関数を利用します。マスごとに合計を1になる実数値に落とし込む関数です。
例)[np,nan, 1, 1, np.nan] -> [0, 0.5, 0.5, 0]

1
2
3
4
5
6
7
8
9
10
# パラメータθを方策に変換
def get_pi(theta):
# ソフトマックス関数で変換
[m, n] = theta.shape
pi = np.zeros((m, n))
exp_theta = np.exp(theta)
for i in range(0, m):
pi[i, :] = exp_theta[i, :] / np.nansum(exp_theta[i, :])
pi = np.nan_to_num(pi)
return pi

パラメータθの初期値を方策に変換します。

1
2
3
# パラメータθの初期値を方策に変換
pi_0 = get_pi(theta_0)
print(pi_0)

列の合計が1になっていることが分かります。
結果

方策に従って行動を取得します。
np.random.choice()にて引数pの確率分布に従って、配列の要素をランダムに返します。

1
2
3
4
# 方策に従って行動を取得
def get_a(pi, s):
# 方策の確率に従って行動を返す
return np.random.choice([0, 1, 2, 3], p=pi[s])

行動に従って次の状態を取得する関数を定義します、
3 x 3 の迷路なので、左右移動は±1、上下移動は±3になります。

1
2
3
4
5
6
7
8
9
10
# 行動に従って次の状態を取得
def get_s_next(s, a):
if a == 0: # 上
return s - 3
elif a == 1: # 右
return s + 1
elif a == 2: # 下
return s + 3
elif a == 3: # 左
return s - 1

1エピソードを実行して、履歴を取得します。履歴は、[状態, 行動]のリストです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 1エピソード実行して履歴取得
def play(pi):
s = 0 # 状態
s_a_history = [[0, np.nan]] # 状態と行動の履歴

# エピソード完了までループ
while True:
# 方策に従って行動を取得
a = get_a(pi, s)

# 行動に従って次の状態を取得
s_next = get_s_next(s, a)

# 履歴の更新
s_a_history[-1][1] = a
s_a_history.append([s_next, np.nan])

# 終了判定
if s_next == 8:
break
else:
s = s_next

return s_a_history

1エピソードの実行と履歴を確認します。
ゴールまでにどのような経路をたどり、何ステップかかったかを把握することができます。

1
2
3
4
# 1エピソードの実行と履歴の確認
s_a_history = play(pi_0)
print(s_a_history)
print('1エピソードのステップ数:{}'.format(len(s_a_history)+1))

結果

パラメータθを更新します。
パラメータθに「学習係数」と「パラメータθの変化量」を掛けた値を加算します。
学習係数は1回の学習で更新される大きさです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def update_theta(theta, pi, s_a_history):
eta = 0.1 # 学習係数
total = len(s_a_history) - 1 # ゴールまでにかかった総ステップ数
[s_count, a_count] = theta.shape # 状態数, 行動数

# パラメータθの変化量の計算
delta_theta = theta.copy()
for i in range(0, s_count):
for j in range(0, a_count):
if not(np.isnan(theta[i, j])):
# ある状態である行動を採る回数
sa_ij = [sa for sa in s_a_history if sa == [i, j]]
n_ij = len(sa_ij)

# ある状態でなんらかの行動を採る回数
sa_i = [sa for sa in s_a_history if sa[0] == i]
n_i = len(sa_i)

# パラメータθの変化量
delta_theta[i, j] = (n_ij + pi[i, j] * n_i) / total

# パラメータθの更新
return theta + eta * delta_theta

エピソードを繰り返し実行して学習を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
stop_epsilon = 10**-3 # しきい値
theta = theta_0 # パラメータθ
pi = pi_0 # 方策

# エピソードを繰り返し実行して学習
for episode in range(1000):
# 1エピソード実行して履歴取得
s_a_history = play(pi)

# パラメータθの更新
theta = update_theta(theta, pi, s_a_history)

# 方策の更新
pi_new = get_pi(theta)

# 方策の変化量
pi_delta = np.sum(np.abs(pi_new-pi))
pi = pi_new

# 出力
print('エピソード: {}, ステップ: {}, 方策変化量: {:.4f}'.format(
episode, len(s_a_history)-1, pi_delta))

# 終了判定
if pi_delta < stop_epsilon: # 方策の変化量がしきい値以下
break

結果(エピソード0から27)
(中略)
結果(エピソード208から231)
ゴールへの最短ステップ数である4に少しずつ近づいているのが分かります。

最後の履歴をもとにアニメーション表示を行ってみます。

# アニメーションの定期処理を行う関数
def animate(i):
    state = s_a_history[i][0]
    circle.set_data((state % 3) + 0.5, 2.5 - int(state / 3))
    return circle

# アニメーションの表示
anim = animation.FuncAnimation(fig, animate, \
        frames=len(s_a_history), interval=1000, repeat=False)
HTML(anim.to_jshtml())```

{% youtube BUoy31PYMMY %}


([Google Colaboratory](https://colab.research.google.com/notebooks/welcome.ipynb)で動作確認しています。)

参考
> <cite>AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 <a href="https://www.borndigital.co.jp/book/14383.html" > サポートページ</a> </cite>