強化学習 AlphaZero 20 (どうぶつ将棋)

AlphaZeroのアルゴリズムを使ってどうぶつ将棋の対戦ゲームを作ります。

どうぶつ将棋は、駒の動きを簡易化した将棋で「ライオン」「ゾウ」「キリン」「ヒヨコ」の4種類の駒を使います。
駒の移動可能な方向は、駒に丸が描いてある方向になり、ライオンをとったら勝ちです。
また盤面は3×4の小さなものになります。

最初の盤面は次のようになります。

最初の盤面

ソースコード一覧は下記のようになります。

ソースコード 説明 3目並べとの相違
game.py ゲーム状態 全て
dual_network.py デュアルネットワーク パラメータのみ
pv_mcts.py モンテカルロ木探索 デュアルネットワークの入力変更
self_play.py セルフプレイ部 デュアルネットワークの入力変更
train_network.py パラメータ更新部 なし
evaluate_network.py 新パラメータ評価部 なし
train_cycle.py 学習サイクルの実行 ベストプレイヤーの評価を削除
human_play.py ゲームUI 全て

まずどうぶつ将棋のゲーム状態を作成します。
ゲームの盤面は駒が置かれているときは1、そうでないときは0としています。
相手の盤面は180度回転させて保持します。
持ち駒の有無は、有の場合は配列全要素を1、無の場合は配列全要素を0とします。

game.py
1
2
3
4
5
6
7
8
9
10
# ====================
# 簡易将棋
# ====================

# パッケージのインポート
import random
import math

# ゲームの状態
class State:

自分の駒の配置と相手の駒の配置を長さ15の1次元配列で保持します。
列(3)×行(4)+持ち駒種類数(3)で15です。

この1次元の配列にどの駒があるかは次の駒IDで管理します。

駒ID 説明
0 なし
1 ヒヨコ
2 ゾウ
3 キリン
4 ライオン
5 ヒヨコの持ち駒
6 ゾウの持ち駒
7 キリンの持ち駒

ほかの変数に関しては次の通りです。

  • depthは何ターン目かを表します。
  • dxyは8方向(下、左下、左、左上、上、右上、右、右下)を表します。
    自分の駒から8方向に対して、移動できるかどうかを計算する際に利用します。
game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 初期化
def __init__(self, pieces=None, enemy_pieces=None, depth=0):
# 方向定数
self.dxy = ((0, -1), (1, -1), (1, 0), (1, 1), (0, 1), (-1, 1), (-1, 0), (-1, -1))

# 駒の配置
self.pieces = pieces if pieces != None else [0] * (12+3)
self.enemy_pieces = enemy_pieces if enemy_pieces != None else [0] * (12+3)
self.depth = depth

# 駒の初期配置
if pieces == None or enemy_pieces == None:
self.pieces = [0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 4, 3, 0, 0, 0]
self.enemy_pieces = [0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 4, 3, 0, 0, 0]

負けかどうかを判定します。

game.py
1
2
3
4
5
6
# 負けかどうか
def is_lose(self):
for i in range(12):
if self.pieces[i] == 4: # ライオン存在
return False
return True

引き分けかどうかを判定します。

game.py
1
2
3
# 引き分けかどうか
def is_draw(self):
return self.depth >= 300 # 300手

ゲーム終了かどうかを判定します。

game.py
1
2
3
# ゲーム終了かどうか
def is_done(self):
return self.is_lose() or self.is_draw()

「駒の配置の2つの1次元配列(自分と相手の駒の配列)」をデュアルネットワークの入力(14個の2次元配列)に変換します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# デュアルネットワークの入力の2次元配列の取得
def pieces_array(self):
# プレイヤー毎のデュアルネットワークの入力の2次元配列の取得
def pieces_array_of(pieces):
table_list = []
# 0:ヒヨコ, 1:ゾウ, 2:キリン, 3:ライオン,
for j in range(1, 5):
table = [0] * 12
table_list.append(table)
for i in range(12):
if pieces[i] == j:
table[i] = 1

# 4:ヒヨコの持ち駒, 5:ゾウの持ち駒, 6:キリンの持ち駒
for j in range(1, 4):
flag = 1 if pieces[11+j] > 0 else 0
table = [flag] * 12
table_list.append(table)
return table_list

# デュアルネットワークの入力の2次元配列の取得
return [pieces_array_of(self.pieces), pieces_array_of(self.enemy_pieces)]

駒の移動先と駒の移動元を行動に変換します。

game.py
1
2
3
# 駒の移動先と移動元を行動に変換
def position_to_action(self, position, direction):
return position * 11 + direction

行動を駒の移動先と移動元に変換します。

game.py
1
2
3
# 行動を駒の移動先と移動元に変換
def action_to_position(self, action):
return (int(action/11), action%11)

合法手のリストを取得します。
マスごとに駒の移動時と持ち駒の配置時の合法手を取得します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 合法手のリストの取得
def legal_actions(self):
actions = []
for p in range(12):
# 駒の移動時
if self.pieces[p] != 0:
actions.extend(self.legal_actions_pos(p))

# 持ち駒の配置時
if self.pieces[p] == 0 and self.enemy_pieces[11-p] == 0:
for capture in range(1, 4):
if self.pieces[11+capture] != 0:
actions.append(self.position_to_action(p, 8-1+capture))
return actions

駒の移動時の合法手のリストを取得します。
駒の移動可能な方向を計算し、移動可能時は合法手として追加します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 駒の移動時の合法手のリストの取得
def legal_actions_pos(self, position_src):
actions = []

# 駒の移動可能な方向
piece_type = self.pieces[position_src]
if piece_type > 4: piece_type-4
directions = []
if piece_type == 1: # ヒヨコ
directions = [0]
elif piece_type == 2: # ゾウ
directions = [1, 3, 5, 7]
elif piece_type == 3: # キリン
directions = [0, 2, 4, 6]
elif piece_type == 4: # ライオン
directions = [0, 1, 2, 3, 4, 5, 6, 7]

# 合法手の取得
for direction in directions:
# 駒の移動元
x = position_src%3 + self.dxy[direction][0]
y = int(position_src/3) + self.dxy[direction][1]
p = x + y * 3

# 移動可能時は合法手として追加
if 0 <= x and x <= 2 and 0<= y and y <= 3 and self.pieces[p] == 0:
actions.append(self.position_to_action(p, direction))
return actions

行動に応じた次の状態を取得します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 次の状態の取得
def next(self, action):
# 次の状態の作成
state = State(self.pieces.copy(), self.enemy_pieces.copy(), self.depth+1)

# 行動を(移動先, 移動元)に変換
position_dst, position_src = self.action_to_position(action)

# 駒の移動
if position_src < 8:
# 駒の移動元
x = position_dst%3 - self.dxy[position_src][0]
y = int(position_dst/3) - self.dxy[position_src][1]
position_src = x + y * 3

# 駒の移動
state.pieces[position_dst] = state.pieces[position_src]
state.pieces[position_src] = 0

# 相手の駒が存在する時は取る
piece_type = state.enemy_pieces[11-position_dst]
if piece_type != 0:
if piece_type != 4:
state.pieces[11+piece_type] += 1 # 持ち駒+1
state.enemy_pieces[11-position_dst] = 0

# 持ち駒の配置
else:
capture = position_src-7
state.pieces[position_dst] = capture
state.pieces[11+capture] -= 1 # 持ち駒-1

# 駒の交代
w = state.pieces
state.pieces = state.enemy_pieces
state.enemy_pieces = w
return state

先手化どうかを取得します。

game.py
1
2
3
# 先手かどうか
def is_first_player(self):
return self.depth%2 == 0

ゲーム状態の文字列表示を行います。

各駒は次の文字列で表現されます。

駒の文字表現 説明
H 先手のヒヨコ
Z 先手のゾウ
K 先手のキリン
R 先手のライオン
h 後手のヒヨコ
z 後手のゾウ
k 後手のキリン
r 後手のライオン
game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 文字列表示
def __str__(self):
pieces0 = self.pieces if self.is_first_player() else self.enemy_pieces
pieces1 = self.enemy_pieces if self.is_first_player() else self.pieces
hzkr0 = ('', 'H', 'Z', 'K', 'R')
hzkr1 = ('', 'h', 'z', 'k', 'r')

# 後手の持ち駒
str = ' ['
for i in range(12, 15):
if pieces1[i] >= 2: str += ' ' + hzkr1[i-11]
if pieces1[i] >= 1: str += ' ' + hzkr1[i-11]
str += ' ]\n'

# ボード
for i in range(12):
if pieces0[i] != 0:
str += ' ' + hzkr0[pieces0[i]]
elif pieces1[11-i] != 0:
str += ' ' + hzkr1[pieces1[11-i]]
else:
str += ' -'
if i % 3 == 2:
str += '\n'

# 先手の持ち駒
str += ' ['
for i in range(12, 15):
if pieces0[i] >= 2: str += ' ' + hzkr0[i-11]
if pieces0[i] >= 1: str += ' ' + hzkr0[i-11]
str += ' ]\n'
return str

動作確認用にランダム対ランダムで対戦するコードを追加します。

game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# ランダムで行動選択
def random_action(state):
legal_actions = state.legal_actions()
return legal_actions[random.randint(0, len(legal_actions)-1)]

# 動作確認
if __name__ == '__main__':
# 状態の生成
state = State()

# ゲーム終了までのループ
while True:
# ゲーム終了時
if state.is_done():
break

# 次の状態の取得
state = state.next(random_action(state))

# 文字列表示
print(state)
print()

試しに実行したところ下記のような結果となりました。

最終結果(途中略)

次にデュアルネットワークを実装します。
入力シェイプは、(3, 4, 14)となります。各要素数の意味は「盤面(3×4)×(自分の駒の配置(4)+自分の持ち駒の配置(3)+相手の駒の配置(4)+相手の持ち駒の配置(3))」です。
行動数は132(駒の移動先12×駒の移動元11)となります。

dual_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# ====================
# デュアルネットワークの作成
# ====================

# パッケージのインポート
from tensorflow.keras.layers import Activation, Add, BatchNormalization, Conv2D, Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
import os

# パラメータの準備
DN_FILTERS = 128 # 畳み込み層のカーネル数(本家は256)
DN_RESIDUAL_NUM = 16 # 残差ブロックの数(本家は19)
DN_INPUT_SHAPE = (3, 4, 14) # 入力シェイプ
DN_OUTPUT_SIZE = 132 # 行動数(駒の移動先(12)*駒の移動元(11))

# 畳み込み層の作成
def conv(filters):
return Conv2D(filters, 3, padding='same', use_bias=False,
kernel_initializer='he_normal', kernel_regularizer=l2(0.0005))

# 残差ブロックの作成
def residual_block():
def f(x):
sc = x
x = conv(DN_FILTERS)(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = conv(DN_FILTERS)(x)
x = BatchNormalization()(x)
x = Add()([x, sc])
x = Activation('relu')(x)
return x
return f

# デュアルネットワークの作成
def dual_network():
# モデル作成済みの場合は無処理
if os.path.exists('./model/best.h5'):
return

# 入力層
input = Input(shape=DN_INPUT_SHAPE)

# 畳み込み層
x = conv(DN_FILTERS)(input)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# 残差ブロック x 16
for i in range(DN_RESIDUAL_NUM):
x = residual_block()(x)

# プーリング層
x = GlobalAveragePooling2D()(x)

# ポリシー出力
p = Dense(DN_OUTPUT_SIZE, kernel_regularizer=l2(0.0005),
activation='softmax', name='pi')(x)

# バリュー出力
v = Dense(1, kernel_regularizer=l2(0.0005))(x)
v = Activation('tanh', name='v')(v)

# モデルの作成
model = Model(inputs=input, outputs=[p,v])

# モデルの保存
os.makedirs('./model/', exist_ok=True) # フォルダがない時は生成
model.save('./model/best.h5') # ベストプレイヤーのモデル

# モデルの破棄
K.clear_session()
del model

モンテカルロ木探索を実装します。デュアルネットワークの入力を変更します。(state.pieces_array())

pv_mcts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# ====================
# モンテカルロ木探索の作成
# ====================

# パッケージのインポート
from game import State
from dual_network import DN_INPUT_SHAPE
from math import sqrt
from tensorflow.keras.models import load_model
from pathlib import Path
import numpy as np

# パラメータの準備
PV_EVALUATE_COUNT = 50 # 1推論あたりのシミュレーション回数(本家は1600)

# 推論
def predict(model, state):
# 推論のための入力テ゛ータのシェイフ゜の変換
a, b, c = DN_INPUT_SHAPE
x = np.array(state.pieces_array())
x = x.reshape(c, a, b).transpose(1, 2, 0).reshape(1, a, b, c)

# 推論
y = model.predict(x, batch_size=1)

# 方策の取得
policies = y[0][0][list(state.legal_actions())] # 合法手のみ
policies /= sum(policies) if sum(policies) else 1 # 合計1の確率分布に変換

# 価値の取得
value = y[1][0][0]
return policies, value

# ノードのリストを試行回数のリストに変換
def nodes_to_scores(nodes):
scores = []
for c in nodes:
scores.append(c.n)
return scores

# モンテカルロ木探索のスコアの取得
def pv_mcts_scores(model, state, temperature):
# モンテカルロ木探索のノードの定義
class Node:
# ノードの初期化
def __init__(self, state, p):
self.state = state # 状態
self.p = p # 方策
self.w = 0 # 累計価値
self.n = 0 # 試行回数
self.child_nodes = None # 子ノード群

# 局面の価値の計算
def evaluate(self):
# ゲーム終了時
if self.state.is_done():
# 勝敗結果で価値を取得
value = -1 if self.state.is_lose() else 0

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# 子ノードが存在しない時
if not self.child_nodes:
# ニューラルネットワークの推論で方策と価値を取得
policies, value = predict(model, self.state)

# 累計価値と試行回数の更新
self.w += value
self.n += 1

# 子ノードの展開
self.child_nodes = []
for action, policy in zip(self.state.legal_actions(), policies):
self.child_nodes.append(Node(self.state.next(action), policy))
return value

# 子ノードが存在する時
else:
# アーク評価値が最大の子ノードの評価で価値を取得
value = -self.next_child_node().evaluate()

# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value

# アーク評価値が最大の子ノードを取得
def next_child_node(self):
# アーク評価値の計算
C_PUCT = 1.0
t = sum(nodes_to_scores(self.child_nodes))
pucb_values = []
for child_node in self.child_nodes:
pucb_values.append((-child_node.w / child_node.n if child_node.n else 0.0) +
C_PUCT * child_node.p * sqrt(t) / (1 + child_node.n))

# アーク評価値が最大の子ノードを返す
return self.child_nodes[np.argmax(pucb_values)]

# 現在の局面のノードの作成
root_node = Node(state, 0)

# 複数回の評価の実行
for _ in range(PV_EVALUATE_COUNT):
root_node.evaluate()

# 合法手の確率分布
scores = nodes_to_scores(root_node.child_nodes)
if temperature == 0: # 最大値のみ1
action = np.argmax(scores)
scores = np.zeros(len(scores))
scores[action] = 1
else: # ボルツマン分布でバラつき付加
scores = boltzman(scores, temperature)
return scores

# モンテカルロ木探索で行動選択
def pv_mcts_action(model, temperature=0):
def pv_mcts_action(state):
scores = pv_mcts_scores(model, state, temperature)
return np.random.choice(state.legal_actions(), p=scores)
return pv_mcts_action

# ボルツマン分布
def boltzman(xs, temperature):
xs = [x ** (1 / temperature) for x in xs]
return [x / sum(xs) for x in xs]

セルフプレイを実装します。デュアルネットワークの入力を変更します。(state.pieces_array())

self_play.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# ====================
# セルフプレイ部
# ====================

# パッケージのインポート
from game import State
from pv_mcts import pv_mcts_scores
from dual_network import DN_OUTPUT_SIZE
from datetime import datetime
from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K
from pathlib import Path
import numpy as np
import pickle
import os

# パラメータの準備
SP_GAME_COUNT = 500 # セルフプレイを行うゲーム数(本家は25000)
SP_TEMPERATURE = 1.0 # ボルツマン分布の温度パラメータ

# 先手プレイヤーの価値
def first_player_value(ended_state):
# 1:先手勝利, -1:先手敗北, 0:引き分け
if ended_state.is_lose():
return -1 if ended_state.is_first_player() else 1
return 0

# 学習データの保存
def write_data(history):
now = datetime.now()
os.makedirs('./data/', exist_ok=True) # フォルダがない時は生成
path = './data/{:04}{:02}{:02}{:02}{:02}{:02}.history'.format(
now.year, now.month, now.day, now.hour, now.minute, now.second)
with open(path, mode='wb') as f:
pickle.dump(history, f)

# 1ゲームの実行
def play(model):
# 学習データ
history = []

# 状態の生成
state = State()

while True:
# ゲーム終了時
if state.is_done():
break

# 合法手の確率分布の取得
scores = pv_mcts_scores(model, state, SP_TEMPERATURE)

# 学習データに状態と方策を追加
policies = [0] * DN_OUTPUT_SIZE
for action, policy in zip(state.legal_actions(), scores):
policies[action] = policy
history.append([state.pieces_array(), policies, None])

# 行動の取得
action = np.random.choice(state.legal_actions(), p=scores)

# 次の状態の取得
state = state.next(action)

# 学習データに価値を追加
value = first_player_value(state)
for i in range(len(history)):
history[i][2] = value
value = -value
return history

# セルフプレイ
def self_play():
# 学習データ
history = []

# ベストプレイヤーのモデルの読み込み
model = load_model('./model/best.h5')

# 複数回のゲームの実行
for i in range(SP_GAME_COUNT):
# 1ゲームの実行
h = play(model)
history.extend(h)

# 出力
print('\rSelfPlay {}/{}'.format(i+1, SP_GAME_COUNT), end='')
print('')

# 学習データの保存
write_data(history)

# モデルの破棄
K.clear_session()
del model

パラメータ更新部を実装します。変更はありません。

train_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# ====================
# パラメータ更新部
# ====================

# パッケージのインポート
from dual_network import DN_INPUT_SHAPE
from tensorflow.keras.callbacks import LearningRateScheduler, LambdaCallback
from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K
from pathlib import Path
import numpy as np
import pickle

# パラメータの準備
RN_EPOCHS = 100 # 学習回数

# 学習データの読み込み
def load_data():
history_path = sorted(Path('./data').glob('*.history'))[-1]
with history_path.open(mode='rb') as f:
return pickle.load(f)

# デュアルネットワークの学習
def train_network():
# 学習データの読み込み
history = load_data()
xs, y_policies, y_values = zip(*history)

# 学習のための入力テ゛ータのシェイフ゜の変換
a, b, c = DN_INPUT_SHAPE
xs = np.array(xs)
xs = xs.reshape(len(xs), c, a, b).transpose(0, 2, 3, 1)
y_policies = np.array(y_policies)
y_values = np.array(y_values)

# ベストプレイヤーのモデルの読み込み
model = load_model('./model/best.h5')

# モデルのコンパイル
model.compile(loss=['categorical_crossentropy', 'mse'], optimizer='adam')

# 学習率
def step_decay(epoch):
x = 0.001
if epoch >= 50: x = 0.0005
if epoch >= 80: x = 0.00025
return x
lr_decay = LearningRateScheduler(step_decay)

# 出力
print_callback = LambdaCallback(
on_epoch_begin=lambda epoch,logs:
print('\rTrain {}/{}'.format(epoch + 1,RN_EPOCHS), end=''))

# 学習の実行
model.fit(xs, [y_policies, y_values], batch_size=128, epochs=RN_EPOCHS,
verbose=0, callbacks=[lr_decay, print_callback])
print('')

# 最新プレイヤーのモデルの保存
model.save('./model/latest.h5')

# モデルの破棄
K.clear_session()
del model

新パラメータ評価部を実装します。変更はありません。

evaluate_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# ====================
# 新パラメータ評価部
# ====================

# パッケージのインポート
from game import State
from pv_mcts import pv_mcts_action
from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K
from pathlib import Path
from shutil import copy
import numpy as np

# パラメータの準備
EN_GAME_COUNT = 10 # 1評価あたりのゲーム数(本家は400)
EN_TEMPERATURE = 1.0 # ボルツマン分布の温度

# 先手プレイヤーのポイント
def first_player_point(ended_state):
# 1:先手勝利, 0:先手敗北, 0.5:引き分け
if ended_state.is_lose():
return 0 if ended_state.is_first_player() else 1
return 0.5

# 1ゲームの実行
def play(next_actions):
# 状態の生成
state = State()

# ゲーム終了までループ
while True:
# ゲーム終了時
if state.is_done():
break;

# 行動の取得
next_action = next_actions[0] if state.is_first_player() else next_actions[1]
action = next_action(state)

# 次の状態の取得
state = state.next(action)

# 先手プレイヤーのポイントを返す
return first_player_point(state)

# ベストプレイヤーの交代
def update_best_player():
copy('./model/latest.h5', './model/best.h5')
print('Change BestPlayer')

# ネットワークの評価
def evaluate_network():
# 最新プレイヤーのモデルの読み込み
model0 = load_model('./model/latest.h5')

# ベストプレイヤーのモデルの読み込み
model1 = load_model('./model/best.h5')

# PV MCTSで行動選択を行う関数の生成
next_action0 = pv_mcts_action(model0, EN_TEMPERATURE)
next_action1 = pv_mcts_action(model1, EN_TEMPERATURE)
next_actions = (next_action0, next_action1)

# 複数回の対戦を繰り返す
total_point = 0
for i in range(EN_GAME_COUNT):
# 1ゲームの実行
if i % 2 == 0:
total_point += play(next_actions)
else:
total_point += 1 - play(list(reversed(next_actions)))

# 出力
print('\rEvaluate {}/{}'.format(i + 1, EN_GAME_COUNT), end='')
print('')

# 平均ポイントの計算
average_point = total_point / EN_GAME_COUNT
print('AveragePoint', average_point)

# モデルの破棄
K.clear_session()
del model0
del model1

# ベストプレイヤーの交代
if average_point > 0.5:
update_best_player()
return True
else:
return False

学習サイクルを実行します。ベストプレイヤーの評価は削除しています。
この処理を実行するとベストプレイヤーのモデルが作成されます。(model/best.h5)

学習完了までにCorei5、メモリ4G、GPUなしのPCでまる2日ほどかかりました。

train_cycle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ====================
# 学習サイクルの実行
# ====================

# パッケージのインポート
from dual_network import dual_network
from self_play import self_play
from train_network import train_network
from evaluate_network import evaluate_network

# デュアルネットワークの作成
dual_network()

for i in range(10):
print('Train',i,'====================')
# セルフプレイ部
self_play()

# パラメータ更新部
train_network()

# 新ハ゜ラメータ評価部
evaluate_network()

ゲームUIを実装します。

human_play.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ====================
# 人とAIの対戦
# ====================

# パッケージのインポート
from game import State
from pv_mcts import pv_mcts_action
from tensorflow.keras.models import load_model
from pathlib import Path
from threading import Thread
import tkinter as tk
from PIL import Image, ImageTk

# ベストプレイヤーのモデルの読み込み
model = load_model('./model/best.h5')

# ゲームUIの定義
class GameUI(tk.Frame):

ゲーム状態の初期化で下記の3点を準備します。

  • ゲーム状態
  • モンテカルロ木探索で行動を行う関数
  • キャンバス
human_play.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 初期化
def __init__(self, master=None, model=None):
tk.Frame.__init__(self, master)
self.master.title('簡易将棋')

# ゲーム状態の生成
self.state = State()
self.select = -1 # 選択(-1:なし, 0~11:マス, 12~14:持ち駒)

# 方向定数
self.dxy = ((0, -1), (1, -1), (1, 0), (1, 1), (0, 1), (-1, 1), (-1, 0), (-1, -1))

# PV MCTSで行動選択を行う関数の生成
self.next_action = pv_mcts_action(model, 0.0)

# イメージの準備
self.images = [(None, None, None, None)]
for i in range(1, 5):
image = Image.open('piece{}.png'.format(i))
self.images.append((
ImageTk.PhotoImage(image),
ImageTk.PhotoImage(image.rotate(180)),
ImageTk.PhotoImage(image.resize((40, 40))),
ImageTk.PhotoImage(image.resize((40, 40)).rotate(180))))

# キャンバスの生成
self.c = tk.Canvas(self, width = 240, height = 400, highlightthickness = 0)
self.c.bind('<Button-1>', self.turn_of_human)
self.c.pack()

# 描画の更新
self.on_draw()

人間のターンの処理を行います。

  1. ゲーム終了時はゲーム状態を初期状態に戻します。
  2. 先手でないとき操作不可とします。
  3. 持ち駒の種類を取得します。
  4. 駒の選択と移動の位置の計算を行います。
  5. 駒が未選択の場合は駒を選択し、駒の移動指定を促します。
  6. 駒の選択と移動の位置を行動に変換します。
  7. 合法手でない場合は、駒の選択を解除します。
  8. 合法手の場合は、次の状態を取得して描画の更新を行います。
  9. AIのターンへ遷移します。
human_play.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 人間のターン
def turn_of_human(self, event):
# ゲーム終了時
if self.state.is_done():
self.state = State()
self.on_draw()
return

# 先手でない時
if not self.state.is_first_player():
return

# 持ち駒の種類の取得
captures = []
for i in range(3):
if self.state.pieces[12+i] >= 2: captures.append(1+i)
if self.state.pieces[12+i] >= 1: captures.append(1+i)

# 駒の選択と移動の位置の計算(0?11:マス, 12?14:持ち駒)
p = int(event.x/80) + int((event.y-40)/80) * 3
if 40 <= event.y and event.y <= 360:
select = p
elif event.x < len(captures) * 40 and event.y > 360:
select = 12 + int(event.x/40)
else:
return

# 駒の選択
if self.select < 0:
self.select = select
self.on_draw()
return

# 駒の選択と移動を行動に変換
action = -1
if select < 12:
# 駒の移動時
if self.select < 12:
action = self.state.position_to_action(p, self.position_to_direction(self.select, p))
# 持ち駒の配置時
else:
action = self.state.position_to_action(p, 8-1+captures[self.select-12])

# 合法手でない時
if not (action in self.state.legal_actions()):
self.select = -1
self.on_draw()
return

# 次の状態の取得
self.state = self.state.next(action)
self.select = -1
self.on_draw()

# AIのターン
self.master.after(1, self.turn_of_ai)

AIのターンの処理を行います。

  1. ゲーム終了時は無処理とします。
  2. デュアルネットワークで行動を取得します。
  3. 取得した行動に応じて次の状態を取得し、描画の更新を行います。
human_play.py
1
2
3
4
5
6
7
8
9
10
11
12
# AIのターン
def turn_of_ai(self):
# ゲーム終了時
if self.state.is_done():
return

# 行動の取得
action = self.next_action(self.state)

# 次の状態の取得
self.state = self.state.next(action)
self.on_draw()

駒の移動先を駒の移動方向に変換します。

human_play.py
1
2
3
4
5
6
7
# 駒の移動先を駒の移動方向に変換
def position_to_direction(self, position_src, position_dst):
dx = position_dst%3-position_src%3
dy = int(position_dst/3)-int(position_src/3)
for i in range(8):
if self.dxy[i][0] == dx and self.dxy[i][1] == dy: return i
return 0

駒の描画を行います。

human_play.py
1
2
3
4
5
6
# 駒の描画
def draw_piece(self, index, first_player, piece_type):
x = (index%3)*80
y = int(index/3)*80+40
index = 0 if first_player else 1
self.c.create_image(x, y, image=self.images[piece_type][index], anchor=tk.NW)

持ち駒の描画を行います。

human_play.py
1
2
3
4
5
6
7
8
9
# 持ち駒の描画
def draw_capture(self, first_player, pieces):
index, x, dx, y = (2, 0, 40, 360) if first_player else (3, 200, -40, 0)
captures = []
for i in range(3):
if pieces[12+i] >= 2: captures.append(1+i)
if pieces[12+i] >= 1: captures.append(1+i)
for i in range(len(captures)):
self.c.create_image(x+dx*i, y, image=self.images[captures[i]][index], anchor=tk.NW)

カーソルの描画を行います。

human_play.py
1
2
3
4
5
6
# カーソルの描画
def draw_cursor(self, x, y, size):
self.c.create_line(x+1, y+1, x+size-1, y+1, width = 4.0, fill = '#FF0000')
self.c.create_line(x+1, y+size-1, x+size-1, y+size-1, width = 4.0, fill = '#FF0000')
self.c.create_line(x+1, y+1, x+1, y+size-1, width = 4.0, fill = '#FF0000')
self.c.create_line(x+size-1, y+1, x+size-1, y+size-1, width = 4.0, fill = '#FF0000')

全てのマス目、駒、持ち駒、選択カーソルを描画します。

human_play.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 描画の更新
def on_draw(self):
# マス目
self.c.delete('all')
self.c.create_rectangle(0, 0, 240, 400, width = 0.0, fill = '#EDAA56')
for i in range(1,3):
self.c.create_line(i*80+1, 40, i*80, 360, width = 2.0, fill = '#000000')
for i in range(5):
self.c.create_line(0, 40+i*80, 240, 40+i*80, width = 2.0, fill = '#000000')

# 駒
for p in range(12):
p0, p1 = (p, 11-p) if self.state.is_first_player() else (11-p, p)
if self.state.pieces[p0] != 0:
self.draw_piece(p, self.state.is_first_player(), self.state.pieces[p0])
if self.state.enemy_pieces[p1] != 0:
self.draw_piece(p, not self.state.is_first_player(), self.state.enemy_pieces[p1])

# 持ち駒
self.draw_capture(self.state.is_first_player(), self.state.pieces)
self.draw_capture(not self.state.is_first_player(), self.state.enemy_pieces)

# 選択カーソル
if 0 <= self.select and self.select < 12:
self.draw_cursor(int(self.select%3)*80, int(self.select/3)*80+40, 80)
elif 12 <= self.select:
self.draw_cursor((self.select-12)*40, 360, 40)

ゲームUIを実行します、

human_play.py
1
2
3
4
#ゲームUIの実行
f = GameUI(model = model)
f.pack()
f.mainloop()

human_play.pyを実行するとどうぶつ将棋が始まりAIと対戦することができます。
最初のクリックで移動する駒を選択し、2回目のクリックで移動先を選択します。先手は人間となります。

対戦中

かなり弱いAIで楽勝で勝ててしまいます。もうすこし学習サイクルを増やすべきでしょうか。

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ