ブロックチェーンで広がるIoT(5) - 画像や音声も記録可能

画像や音声も記録可能

IoTが普及してくと、データの自動収集がますます一般的になっていきます。

ブロックチェーンは、取引データだけではなくテキスト・画像・音声も記録することができます。

データを自動的に蓄積し、流通可能になるとビッグデータとしての価値が生まれていきます。

例.防犯カメラ

防犯カメラの映像を数分おきにブロックチェーンに記録することで、映像証拠の信頼性をあげることができます。

画像データのサイズが大きい場合は画像から生成したハッシュ値だけをブロックチェーン上に記録し、画像は手元のハードディスクに保存しておくという方法もあります。

インターネットに接続できる防犯カメラであれば、ハッキングされて画像が差し替えられてしまう可能性もありますが、画像を録画した時点でハッシュ値を算出して署名をつけてブロックチェーンに記録しておけば、改ざんの検知が簡単にできるようになります。

ブロックチェーンで広がるIoT(4) - データが改ざんねつ造されない

データが改ざんねつ造されない

IoT端末は一般家庭から企業まで普及が進んでいっており、様々な分野のデータを取集することができるようになりつつあります。

ブロックチェーンで格納されているデータは改ざんやねつ造されていないということを簡単に検証することができます。

例.マラソン大会

例としてマラソン大会を考えてみます。

マラソン大会の参加者全員がスマホを装着し、10キロ、20キロ、30キロ地点ごとに、互いの記録を登録/承認/書き込みしておけば、あとから主催者や第三者に順位やタイムを書き換えられるのは困難になります。

ブロックチェーンで広がるIoT(3) - 記録したデータを手元に残したまま共有

記録したデータを手元に残したまま共有

データの記録はマイニングによって分散管理台帳(ブロックチェーン)に誰でも追加できます。

データ管理はブロックチェーン・ネットワークの各ノードにより改ざん等の不正を検出することができます。

またブロックチェーンをさかのぼることで、膨大なデータから必要な情報だけを取り出すこともできます。

データのタグ付け

膨大なデータを全てブロックチェーンに記録するのは効率が悪いため、データのタグ(所在や概要、キーワード)だけをを記録しておき、データそのものは個々のPCに保存するといった方法をとることもできます。

ブロックチェーンで広がるIoT(2) - データを時系列で記録

時系列データ記録

ブロックチェーンでデータを格納すると、どのIoT端末から「いつ、どこでから」データを収集したかがブロックに記録されるので、時系列データとして扱えます。

IoT端末を使うと自動でデータを収集することができますが、たとえIoT端末が時間を持っていない場合でも、データをブロックチェーンに書き込んだ時点の時刻は残ります。

書き込まれる時刻は現在時刻ではなく前ブロックから現ブロックまでの相対時間が記録されます。

このため国や地域の時刻や時差を意識しないで済むようになり、時刻の一元管理が不要になっています。

中央集権型のシステムのように、サーバが時刻を一元管理して同期させる仕組みがなくても、各ブロックの前後関係を表すことが可能なのです。

ブロックチェーンで広がるIoT(1) - データ収集

データ収集

データ収集の方法の1つにオープン・データの利用がありますが、これには欠点がありデータ提供者にインセンティブが働きませんでした。

ブロックチェーン上でデータ提供を受けつつ、それの対価をリアルタイムに支払えるようになると、データを提供してもらいやすくなる可能性が高まります。

IoT端末を用いて、リアルタイム性の高い膨大なデータを取引する方法が、データ・マーケットプレイス(例:米国のFactual)で考え出されました。

利用者はデータに対して暗号通貨などで提供者に報酬を支払うので、たくさんの情報がリアルタイムに提供されるようになります。

身の回りから大量のデータが発生しているので、誰もがデータ・マーケットプレイスを通じてデータが売り買いできるようになります。

IoTで取得できる情報例

IoTで取得できる情報の例としては次のようなものが挙げられます。

  • 自動車の走行情報や道路状況
  • スマート・ウォッチの活動量や心拍数の情報
  • 家電製品の利用情報
  • ソーラ・パネルの発電記録や発電効率の情報
  • 医療記録や市販薬の摂取記録
  • 金融取引の売買記録
  • NFCによる電子マネー取引

さまざまな国や企業がデータ作成

市場参加者が増えないと価値形成が偏ったものになり、データの価値が正当に評価されなくなる可能性があります。

さまざまな価値観を持つ国、組織、個人が集まることで、これまでは国や世界のトップ企業でしか収集できなかったような情報を手軽に入手したり、分析できるようになります。

今後、ますますデータの価値が高まっていくのは間違いありません。

ブロックチェーンのポテンシャル

ブロックチェーンとは

ブロックチェーン(Blockchain)は、分散型台帳技術 または 分散型ネットワーク技術 と呼ばれています。

ブロックチェーンは次のような特徴があります。

  • データをいつでも追跡・取得できる。
  • 相手にデータを提供してもらったら対価を払える。
  • センサーや制御、取引のデータを時系列に保存できる。
  • 中心となるサーバがなくてもデータを共有できる。

IoTへの応用

暗号通貨や決済のような金融分野への利用はすでに行われています。

金融分野への利用だけでなくブロックチェーンを使えば、IoTなどの組み込み機器おいていろいろと応用することができるようになります。

具体的には次のようなIoTや人工知能と組み合わせた応用が考えられます。

  • 食品管理
  • 農業支援
  • 契約書
  • 証明書
  • 履歴書管理
  • 課税
  • 特許申請
  • 登記簿
  • 医療記録
  • 議事録
  • 議決権行使の記録
  • 分散型インターネットのプログラム

ブロックチェーンは記録することだけに特化した技術ではなく、分散型インターネットのプログラムのような分野への発展も進んでいます。

Satoshi Nakamoto論文

ビットコインを語るうえで、Satoshi Nakamotoの論文を一読するのは大変有益かと思います。

Satoshi Nakamoto;Bitcoin: A Peer-to-Peer Electronic Cash System.

この論文だけでビットコインを理解することは難しいですが、そのポイントを少々引用していきたいと思います。

中央集権から分散管理

ビットコインが従来の電子通貨と最も異なる点はトランザクション・データを管理する中央集権が存在しないということです。

トランザクション・データを分散生成/処理/管理することによって、中央集権システムの限界を突破し、世界中での流通を可能としました。

電子署名で取引データの正しさを保証

トランザクション・データとはあるビットコインの口座から別のビットコイン口座へ何ビットコイン振り込むかというデータです。

トランザクション・データに振込元の口座の持ち主が電子署名を施すことによって、そのトランザクションは持ち主が意図するものであることを保証できます。

取引の不整合を防ぐ

電子署名は個々のトランザクション・データの正しいことを保証しますが、これだけでは全体の整合性を保証することができません。

ビットコインでは、トランザクション・データを全て公開し、相互監視することによって、データ不整合をチェックし、不整合のあるデータはブロックチェーンに登録しないようにします。

マイニングで取引を承認

トランザクション・データは10分ごとに1つのブロックにまとめて管理されます。

分散管理にすると複数のブロックが競合することがありますが、ビットコインでは最終的には一意のデータに収束します。

その理由は、採掘(マイニング)報酬による競争原理を導入したことであり、競争に勝ったブロックに含まれるトランザクション・データだけが有効であるというルールがあるからです。

マイニングとは「ノンス」を探すこと

各ブロックのハッシュ値は、次のブロックに登録され、そのハッシュ値を含む次のブロックのハッシュ値が、さらにその次のブロックに登録されます。

このようにブロックをチェーン上につなぐのがブロックチェーンの名前の由来です。

ハッシュ値は決められた数のゼロビットを含まなければならず、そのためにノンス(nonce)と呼ばれる値を増加させながらハッシュ値の計算を繰り返さなければなりません。

マイニングする人には「報酬」が支払われる

ブロックを作った人は新しいコインをもらうことができ、中央集権のないシステムとって新しいコインを発行する機会にもなります。

インフレーションを防ぐために、ビットコインの発行には上限があります。

Kaggle - ConnectX(4) - 4枚そろえるボードゲーム

Connect Xコンペに関する4回目の記事です。

Connect X

機械学習ではない、ルールベースのエージェントがありましたので試しに実行してみます。

ルールベースのエージェント

ConnectX Rule-Based

エージェントの実装だけ抽出してみます。

[ソース]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def my_agent(obs, conf):
def get_results(x, y, mark, multiplier):
""" get list of points, lowest cells and "in air" cells of a board[x][y] cell considering mark """
# set board[x][y] as mark
board[x][y] = mark
results = []
# if some points in axis already found - axis blocked
blocked = [False, False, False, False]
# i is amount of marks required to add points
for i in range(conf.inarow, 2, -1):
# points
p = 0
# lowest cell
lc = 0
# "in air" points
ap = 0
# axis S -> N, only if one mark required for victory
if i == conf.inarow and blocked[0] is False:
(p, lc, ap, blocked[0]) = process_results(p, lc, ap,
check_axis(mark, i, x, lambda z : z, y + inarow_m1, lambda z : z - 1))
# axis SW -> NE
if blocked[1] is False:
(p, lc, ap, blocked[1]) = process_results(p, lc, ap,
check_axis(mark, i, x - inarow_m1, lambda z : z + 1, y + inarow_m1, lambda z : z - 1))
# axis E -> W
if blocked[2] is False:
(p, lc, ap, blocked[2]) = process_results(p, lc, ap,
check_axis(mark, i, x + inarow_m1, lambda z : z - 1, y, lambda z : z))
# axis SE -> NW
if blocked[3] is False:
(p, lc, ap, blocked[3]) = process_results(p, lc, ap,
check_axis(mark, i, x + inarow_m1, lambda z : z - 1, y + inarow_m1, lambda z : z - 1))
results.append((p * multiplier, lc, ap))
# restore board[x][y] original value
board[x][y] = 0
return results

def check_axis(mark, inarow, x, x_fun, y, y_fun):
""" check axis (NE -> SW etc.) for lowest cell and amounts of points and "in air" cells """
(x, y, axis_max_range) = get_x_y_and_axis_max_range(x, x_fun, y, y_fun)
zeros_allowed = conf.inarow - inarow
#lowest_cell = y
# lowest_cell calculation turned off
lowest_cell = 0
for i in range(axis_max_range):
x_temp = x
y_temp = y
zeros_remained = zeros_allowed
marks = 0
# amount of empty cells that are "in air" (don't have board bottom or mark under them)
in_air = 0
for j in range(conf.inarow):
if board[x_temp][y_temp] != mark and board[x_temp][y_temp] != 0:
break
elif board[x_temp][y_temp] == mark:
marks += 1
# board[x_temp][y_temp] is 0
else:
zeros_remained -= 1
if (y_temp + 1) < conf.rows and board[x_temp][y_temp + 1] == 0:
in_air -= 1
# if y_temp > lowest_cell:
# lowest_cell = y_temp
if marks == inarow and zeros_remained == 0:
return (sp, lowest_cell, in_air, True)
x_temp = x_fun(x_temp)
y_temp = y_fun(y_temp)
if y_temp < 0 or y_temp >= conf.rows or x_temp < 0 or x_temp >= conf.columns:
return (0, 0, 0, False)
x = x_fun(x)
y = y_fun(y)
return (0, 0, 0, False)

def get_x_y_and_axis_max_range(x, x_fun, y, y_fun):
""" set x and y inside board boundaries and get max range of axis """
axis_max_range = conf.inarow
while y < 0 or y >= conf.rows or x < 0 or x >= conf.columns:
x = x_fun(x)
y = y_fun(y)
axis_max_range -= 1
return (x, y, axis_max_range)

def process_results(p, lc, ap, axis_check_results):
""" process results of check_axis function, return lowest cell and sums of points and "in air" cells """
(points, lowest_cell, in_air, blocked) = axis_check_results
if points > 0:
if lc < lowest_cell:
lc = lowest_cell
ap += in_air
p += points
return (p, lc, ap, blocked)

def get_best_cell(best_cell, current_cell):
""" get best cell by comparing factors of cells """
for i in range(len(current_cell["factors"])):
# index 0 = points, 1 = lowest cell, 2 = "in air" cells
for j in range(3):
# if value of best cell factor is smaller than value of
# the same factor in the current cell
# best cell = current cell and break the loop,
# don't compare lower priority factors
if best_cell["factors"][i][j] < current_cell["factors"][i][j]:
return current_cell
# if value of best cell factor is bigger than value of
# the same factor in the current cell
# break loop and don't compare lower priority factors
if best_cell["factors"][i][j] > current_cell["factors"][i][j]:
return best_cell
return best_cell

def get_factors(results):
""" get list of factors represented by results and ordered by priority from highest to lowest """
factors = []
for i in range(conf.inarow - 2):
if i == 1:
# my checker in this cell means my victory two times
factors.append(results[0][0][i] if results[0][0][i][0] > st else (0, 0, 0))
# opponent's checker in this cell means my defeat two times
factors.append(results[0][1][i] if results[0][1][i][0] > st else (0, 0, 0))
# if there are results of a cell one row above current
if len(results) > 1:
# opponent's checker in cell one row above current means my defeat two times
factors.append(results[1][1][i] if -results[1][1][i][0] > st else (0, 0, 0))
# my checker in cell one row above current means my victory two times
factors.append(results[1][0][i] if -results[1][0][i][0] > st else (0, 0, 0))
else:
for j in range(2):
factors.append((0, 0, 0))
else:
for j in range(2):
factors.append((0, 0, 0))
for j in range(2):
factors.append((0, 0, 0))
# consider only if there is no "in air" cells
if results[0][1][i][2] == 0:
# placing opponent's checker in this cell means opponent's victory
factors.append(results[0][1][i])
else:
factors.append((0, 0, 0))
# placing my checker in this cell means my victory
factors.append(results[0][0][i])
# central column priority
factors.append((1 if i == 1 and shift == 0 else 0, 0, 0))
# if there are results of a cell one row above current
if len(results) > 1:
# opponent's checker in cell one row above current means my defeat
factors.append(results[1][1][i])
# my checker in cell one row above current means my victory
factors.append(results[1][0][i])
else:
for j in range(2):
factors.append((0, 0, 0))
# if there are results of a cell two rows above current
if len(results) > 2:
for i in range(conf.inarow - 2):
# my checker in cell two rows above current means my victory
factors.append(results[2][0][i])
# opponent's checker in cell two rows above current means my defeat
factors.append(results[2][1][i])
else:
for i in range(conf.inarow - 2):
for j in range(2):
factors.append((0, 0, 0))
return factors


# define my mark and opponent's mark
my_mark = obs.mark
opp_mark = 2 if my_mark == 1 else 1

# define board as two dimensional array
board = []
for column in range(conf.columns):
board.append([])
for row in range(conf.rows):
board[column].append(obs.board[conf.columns * row + column])

best_cell = None
board_center = conf.columns // 2
inarow_m1 = conf.inarow - 1

# standard amount of points
sp = 1
# "seven" pattern threshold points
st = 1

# start searching for best_cell from board center
x = board_center

# shift to right or left from board center
shift = 0

# searching for best_cell
while x >= 0 and x < conf.columns:
# find first empty cell starting from bottom of the column
y = conf.rows - 1
while y >= 0 and board[x][y] != 0:
y -= 1
# if column is not full
if y >= 0:
# results of current cell and cells above it
results = []
results.append((get_results(x, y, my_mark, 1), get_results(x, y, opp_mark, 1)))
# if possible, get results of a cell one row above current
if (y - 1) >= 0:
results.append((get_results(x, y - 1, my_mark, -1), get_results(x, y - 1, opp_mark, -1)))
# if possible, get results of a cell two rows above current
if (y - 2) >= 0:
results.append((get_results(x, y - 2, my_mark, 1), get_results(x, y - 2, opp_mark, 1)))

# list of factors represented by results
# ordered by priority from highest to lowest
factors = get_factors(results)

# if best_cell is not yet found
if best_cell is None:
best_cell = {
"column": x,
"factors": factors
}
# compare values of factors in best cell and current cell
else:
current_cell = {
"column": x,
"factors": factors
}
best_cell = get_best_cell(best_cell, current_cell)

# shift x to right or left from board center
if shift >= 0: shift += 1
shift *= -1
x = board_center + shift

# return index of the best cell column
return best_cell["column"]

エージェントの評価

ランダム選択の相手との結果と、NegaMax法の相手との結果(平均報酬)を表示します。

[ソース]

1
2
3
4
5
6
def mean_reward(rewards):
return sum(r[0] for r in rewards) / float(len(rewards))

# Run multiple episodes to estimate its performance.
print("My Agent vs Random Agent:", mean_reward(evaluate("connectx", [my_agent, "random"], num_episodes=10)))
print("My Agent vs Negamax Agent:", mean_reward(evaluate("connectx", [my_agent, "negamax"], num_episodes=10)))

[結果]

ランダム相手には完勝しており、NegaMax法の相手との結果もかなり勝ち越しています。

なかなか強いエージェントかとも思ったのですが、Kaggleに提出したところスコア600でした。

強いエージェントとは言えないようです。

Kaggle - ConnectX(3) - 4枚そろえるボードゲーム

Connect Xコンペに関する3回目の記事です。

Connect X

今回は、スコア1029.1を叩き出している「Cell Swarm」というノートブックのエージェントを参考にさせて頂きました。

Cell Swarm

Cell Swarmノートブックのエージェント

エージェントの実装だけ抽出してみます。

Swarmは「群れ」という意味で、Cell Swarmだと「セルの群れ」とか「セルの集まり」とかいう意味でしょうか。

処理はコメントをご参照ください。

[ソース]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def my_agent(obs, conf):

def evaluate_cell(cell):
""" evaluate qualities of the cell """
# セルの品質を評価。パターンを取得して、そのセルのポイント付けをしているみたい。
cell = get_patterns(cell)
cell = calculate_points(cell)
for i in range(1, conf.rows):
cell = explore_cell_above(cell, i)
return cell

def get_patterns(cell):
""" get swarm and opponent's patterns of each axis of the cell """
# 群れと対戦相手のセルの各軸パターンを取得。
ne = get_pattern(cell["x"], lambda z : z + 1, cell["y"], lambda z : z - 1, conf.inarow)
sw = get_pattern(cell["x"], lambda z : z - 1, cell["y"], lambda z : z + 1, conf.inarow)[::-1]
cell["swarm_patterns"]["NE_SW"] = sw + [{"mark": swarm_mark}] + ne
cell["opp_patterns"]["NE_SW"] = sw + [{"mark": opp_mark}] + ne
e = get_pattern(cell["x"], lambda z : z + 1, cell["y"], lambda z : z, conf.inarow)
w = get_pattern(cell["x"], lambda z : z - 1, cell["y"], lambda z : z, conf.inarow)[::-1]
cell["swarm_patterns"]["E_W"] = w + [{"mark": swarm_mark}] + e
cell["opp_patterns"]["E_W"] = w + [{"mark": opp_mark}] + e
se = get_pattern(cell["x"], lambda z : z + 1, cell["y"], lambda z : z + 1, conf.inarow)
nw = get_pattern(cell["x"], lambda z : z - 1, cell["y"], lambda z : z - 1, conf.inarow)[::-1]
cell["swarm_patterns"]["SE_NW"] = nw + [{"mark": swarm_mark}] + se
cell["opp_patterns"]["SE_NW"] = nw + [{"mark": opp_mark}] + se
s = get_pattern(cell["x"], lambda z : z, cell["y"], lambda z : z + 1, conf.inarow)
n = get_pattern(cell["x"], lambda z : z, cell["y"], lambda z : z - 1, conf.inarow)[::-1]
cell["swarm_patterns"]["S_N"] = n + [{"mark": swarm_mark}] + s
cell["opp_patterns"]["S_N"] = n + [{"mark": opp_mark}] + s
return cell

def get_pattern(x, x_fun, y, y_fun, cells_remained):
""" get pattern of marks in direction """
# ある方向へのマークパターンを取得
pattern = []
x = x_fun(x)
y = y_fun(y)
# if cell is inside swarm's borders
# セルが群れの境界内にある場合
if y >= 0 and y < conf.rows and x >= 0 and x < conf.columns:
pattern.append({
"mark": swarm[x][y]["mark"]
})
# amount of cells to explore in this direction
# ある方向へのセルの総数
cells_remained -= 1
if cells_remained > 1:
pattern.extend(get_pattern(x, x_fun, y, y_fun, cells_remained))
return pattern

def calculate_points(cell):
""" calculate amounts of swarm's and opponent's correct patterns and add them to cell's points """
for i in range(conf.inarow - 1):
# inarow = amount of marks in pattern to consider that pattern as correct
inarow = conf.inarow - i
swarm_points = 0
opp_points = 0
# calculate swarm's points and depth
# 群れのポイントと深さを計算
swarm_points = evaluate_pattern(swarm_points, cell["swarm_patterns"]["E_W"], swarm_mark, inarow)
swarm_points = evaluate_pattern(swarm_points, cell["swarm_patterns"]["NE_SW"], swarm_mark, inarow)
swarm_points = evaluate_pattern(swarm_points, cell["swarm_patterns"]["SE_NW"], swarm_mark, inarow)
swarm_points = evaluate_pattern(swarm_points, cell["swarm_patterns"]["S_N"], swarm_mark, inarow)
# calculate opponent's points and depth
# 対戦相手のポイントと深さを計算
opp_points = evaluate_pattern(opp_points, cell["opp_patterns"]["E_W"], opp_mark, inarow)
opp_points = evaluate_pattern(opp_points, cell["opp_patterns"]["NE_SW"], opp_mark, inarow)
opp_points = evaluate_pattern(opp_points, cell["opp_patterns"]["SE_NW"], opp_mark, inarow)
opp_points = evaluate_pattern(opp_points, cell["opp_patterns"]["S_N"], opp_mark, inarow)
# if more than one mark required for victory
# 勝つために1つ以上のマークが必要かどうか
if i > 0:
# swarm_mark or opp_mark priority
# 自分のマークと対戦相手のマークの優先順位
if swarm_points > opp_points:
cell["points"].append(swarm_points)
cell["points"].append(opp_points)
else:
cell["points"].append(opp_points)
cell["points"].append(swarm_points)
else:
cell["points"].append(swarm_points)
cell["points"].append(opp_points)
return cell

def evaluate_pattern(points, pattern, mark, inarow):
""" get amount of points, if pattern has required amounts of marks and zeros """
# saving enough cells for required amounts of marks and zeros
# マーク数と非マーク数の総数を保存する
for i in range(len(pattern) - (conf.inarow - 1)):
marks = 0
zeros = 0
# check part of pattern for required amounts of marks and zeros
# マーク数と非マーク数の総数をチェックする
for j in range(conf.inarow):
if pattern[i + j]["mark"] == mark:
marks += 1
elif pattern[i + j]["mark"] == 0:
zeros += 1
if marks >= inarow and (marks + zeros) == conf.inarow:
return points + 1
return points

def explore_cell_above(cell, i):
""" add positive or negative points from cell above (if it exists) to points of current cell """
# ポジティブなポイントかネガティブなポイントを追加する
if (cell["y"] - i) >= 0:
cell_above = swarm[cell["x"]][cell["y"] - i]
cell_above = get_patterns(cell_above)
cell_above = calculate_points(cell_above)
# points will be positive or negative
# ポイントがポジティブかネガティブか
n = -1 if i & 1 else 1
# if it is first cell above
# 最初のセルの上かどうか
if i == 1:
# add first 4 points of cell_above["points"] to cell["points"]
# 最初の4ポイントを追加する
cell["points"][2:2] = [n * cell_above["points"][1], n * cell_above["points"][0]]
# if it is not potential "seven" pattern in cell and cell_above has more points
if abs(cell["points"][4]) < 2 and abs(cell["points"][4]) < cell_above["points"][2]:
cell["points"][4:4] = [n * cell_above["points"][2]]
# if it is not potential "seven" pattern in cell and cell_above has more points
if abs(cell["points"][5]) < 2 and abs(cell["points"][5]) < cell_above["points"][3]:
cell["points"][5:5] = [n * cell_above["points"][3]]
else:
cell["points"][7:7] = [n * cell_above["points"][3]]
else:
cell["points"][6:6] = [n * cell_above["points"][2], n * cell_above["points"][3]]
cell["points"].append(n * cell_above["points"][4])
cell["points"].append(n * cell_above["points"][5])
else:
cell["points"].extend(map(lambda z : z * n, cell_above["points"]))
else:
cell["points"].extend([0, 0, 0, 0, 0, 0])
return cell

def choose_best_cell(best_cell, current_cell):
""" compare two cells and return the best one """
# 2つのセルを比較しベストなセルを返す
if best_cell is not None:
for i in range(len(best_cell["points"])):
# compare amounts of points of two cells
# 2つのセルの総ポイントを比較する
if best_cell["points"][i] < current_cell["points"][i]:
best_cell = current_cell
break
if best_cell["points"][i] > current_cell["points"][i]:
break
# if ["points"][i] of cells are equal, compare distance to swarm's center of each cell
# もし["points"][i]セルが等しい場合、各セルの群れの中心への距離を比較する
if best_cell["points"][i] > 0:
if best_cell["distance_to_center"] > current_cell["distance_to_center"]:
best_cell = current_cell
break
if best_cell["distance_to_center"] < current_cell["distance_to_center"]:
break
else:
best_cell = current_cell
return best_cell

###############################################################################
# define swarm's and opponent's marks
# 群れと対戦相手のマークを定義
swarm_mark = obs.mark
opp_mark = 2 if swarm_mark == 1 else 1
# define swarm's center
# 群れの中央位置を定義
swarm_center_horizontal = conf.columns // 2
swarm_center_vertical = conf.rows // 2

# define swarm as two dimensional array of cells
# セルの2次元配列として群れを定義
swarm = []
for column in range(conf.columns):
swarm.append([])
for row in range(conf.rows):
cell = {
"x": column,
"y": row,
"mark": obs.board[conf.columns * row + column],
"swarm_patterns": {},
"opp_patterns": {},
"distance_to_center": abs(row - swarm_center_vertical) + abs(column - swarm_center_horizontal),
"points": []
}
swarm[column].append(cell)

best_cell = None
# start searching for best_cell from swarm center
# 群れの中央から最適なセル位置を検索開始
x = swarm_center_horizontal
# shift to right or left from swarm center
# 群れの中央から右か左にシフト
shift = 0

# searching for best_cell
# 最適なセル位置を検索
while x >= 0 and x < conf.columns:
# find first empty cell starting from bottom of the column
# カラムの底位置からマークされていない最初の位置を見つける
y = conf.rows - 1
while y >= 0 and swarm[x][y]["mark"] != 0:
y -= 1
# if column is not full
# カラムがフルでない場合
if y >= 0:
# current cell evaluates its own qualities
# 現在のセルの評価
current_cell = evaluate_cell(swarm[x][y])
# current cell compares itself against best cell
# 現在のセルと最適なセル位置を比較
best_cell = choose_best_cell(best_cell, current_cell)

# shift x to right or left from swarm center
# 中央から右か左にずらす
if shift >= 0:
shift += 1
shift *= -1
x = swarm_center_horizontal + shift

# return index of the best cell column
# 最適なカラム位置のインデックスを返す
return best_cell["x"]

エージェントの評価

ランダム選択の相手との結果と、NegaMax法の相手との結果(平均報酬)を表示します。

[ソース]

1
2
3
4
5
6
def mean_reward(rewards):
return sum(r[0] for r in rewards) / float(len(rewards))

# Run multiple episodes to estimate its performance.
print("My Agent vs Random Agent:", mean_reward(evaluate("connectx", [my_agent, "random"], num_episodes=10)))
print("My Agent vs Negamax Agent:", mean_reward(evaluate("connectx", [my_agent, "negamax"], num_episodes=10)))

[結果]

ランダム相手には完勝しており、NegaMax法の相手との結果も勝ち越しています。

なかなか強いエージェントみたいです。

Kaggle - ConnectX(2) - 4枚そろえるボードゲーム

Connect Xコンペに関する2回目の記事です。

Connect X

今回はエージェントが受け取る情報を確認したいと思います。

環境ルール(Environment Rules)

サンプルソースを見ますと下記のようなコメントが記述されていました。

[サンプルソース]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def agent(observation, configuration):
# Number of Columns on the Board.(ボードのカラム数)
columns = configuration.columns
# Number of Rows on the Board.(ボードの行数)
rows = configuration.rows
# Number of Checkers "in a row" needed to win.(勝つために並べるコインの数)
inarow = configuration.inarow
# The current serialized Board (rows x columns).(現在のボードの状態。1次元として)
board = observation.board
# Which player the agent is playing as (1 or 2).(どちらのプレイヤーか)
mark = observation.mark

# Return which column to drop a checker (action).(どのカラムに落とすかを返す)
return 0

確認のために、前回作成したソースにエージェントが受け取る情報を表示する処理を追加し実行してみます。

[ソース]

1
2
3
4
5
6
7
8
9
10
11
def my_agent(observation, configuration):
print('observation', observation)
print('configuration', configuration)
print('------------')
from random import choice
return choice([c for c in range(configuration.columns) if observation.board[c] == 0])

env.reset()
# Play as the first agent against default "random" agent.
env.run([my_agent, "random"])
env.render(mode="ipython", width=500, height=450)

[結果]
(一部表示)

サンプルソースの説明に合致した情報が格納されていることが分かります。

そのほかの情報(stepやtimeoutなど)もありますが、必要に応じて参照すればいいと思います。

ポイントとしては、observation.boardでしょうか。

6行7列の配列を1次元配列で表しています。この方が機械学習のデータとして扱いやすいですからね。