Unity ML-AgentsのBrain

Agentが観測した状態に応じて、行動を決定するオブジェクトとなります。
1つのBrainで複数のAgentsの行動を決定することもできます。
Brainには下記の4種類があります。

1.External

外部の自作MLライブラリ(Tensorflowなど)を使用します。
学習時に設定されます。

2.Internal

プロジェクトに埋め込まれた推論モデルを使用します。
推論時に設定されます。

3.Player

プレイヤー(人間)の入力に従って行動します。
学習環境の動作確認時などに利用します。

4.Heuristic

ルールベース(プログラム)に従って行動します。

Unity ML-Agentsのプロセス

Unity ML-Agentsは、Unityで機械学習の学習環境を構築するためのフレームワークです。
Unity ML-Agentsでの2プロセスに関して説明します。

学習プロセス

学習用Pythonスクリプトが学習環境となるUnityで強化学習を行います。
学習結果は推論モデルとして保存されます。

推論プロセス

学習結果となる推論モデルをつかってUnityで動作します。
推論モデルは与えられたデータから推論結果を導き出すものです。

Intrinsic Curiosity Module(ICM)

Intrinsic Curiosity Module(ICM)

まだ見たことのない場面に対する好奇心を報酬として学習させる手法です。

ICMでは次の2つのモデルを同時に学習します。

  • 逆モデル
    2つの状態からその間に選択した行動を予測する。
  • 順モデル
    状態と選択した行動から次の状態を予測する。
    この予測が外れるほど多くの報酬を与える。

これらによってエージェントにとって未知である行動を取るほど報酬を多く受け取ることになります。
迷路を探索してさまざまな行動をとる必要があるゲーム等に最適な学習方法です。

Recurrent Neural Network(RNN)とLong Short-Term Memory(LSTM)

Recurrent Neural Network(RNN)

時系列を扱えるニューラルネットワークです。

強化学習では通常「現在の環境」に応じて行動を決定しますが、RNNを利用することで「過去の環境」の状態も踏まえて行動を決定することができるようになります。
エージェントが記憶を持つようなイメージです。

ただRNNでは長期記憶の学習がうまくできないという問題があるため、次で説明するLSTMで長期記憶ができるように改善します。

Long Short-Term Memory(LSTM)

長期的な依存関係を学習することのできるRNNの特別な一種で、1つ前の入力データをうまく扱うことに特化した「LSTMブロック」を組み込みます。

多種多様な問題にとてもよく動作し、現在では広く使用されています。

カリキュラム学習

カリキュラム学習はタスクの難易度を徐々に上げていくことにより、効率的な学習を可能にする手法です。

例としましては「足し算・引き算」を学んだあと、「掛け算・引き算」を学び、そのあとで「面積の計算」を学習します。
学んだ知識をその後の学習に生かすことで、より難しい問題を解くことが可能になります。

この手法は機械学習にも適用可能で、簡単なタスクを訓練することでより困難なタスクを達成することを目指します。

Unity ML-Agents V0.4.0bがなんとか動くようになった件

強化学習を実践していく上でシミュレータが必要なことがわかり、シミュレータとしてはUnityが便利だということがわかり、さらにUnity ML-Agentsを使うとPythonからUnityを動作させることが分かりました。
そして参考文献を探したところ目的にはまった下記の書籍を見つけましたがバージョンが古いせいでしょうか、まったく動作させることができずしばらく放置していました。

しかしなんとか動作させることができるようになったので備忘録としてまとめておきます。

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

【手順1】Unityインストール

書籍(P.41)にしたがってUnityをインストールします。
私の環境ではUnityのバージョンは2017.3.1f1 (64-bit)を使用しました。

【手順2】Unity ML-Agents v0.4.0bダウンロード

サポートサイトに記載されていますが、下記のリンクからUnity ML-Agents v0.4.0bをダウンロードします。
https://github.com/Unity-Technologies/ml-agents/tree/0.4.0b

※2019/11/16現在の最新バージョンは0.11なのでかなり古いバージョンです。

【手順3】TensorFlowSharpプラグインダウンロード

書籍(P.42)にしたがってTensorFlowSharpプラグインをダウンロードをインストールします。


【手順4】Pythonインストール

書籍(P.43)にしたがってPythonをインストールし、仮想環境を構築します。
Pythonバージョンは3.6で問題ありません。

【手順5】Pythonパッケージインストール

書籍(P.44)に該当する箇所ですが、【手順2】でダウンロードしたファイルを使ってPythonパッケージをインストールします。
ダウンロードしたものの中にpythonフォルダがありますのでそのフォルダに移動してインストールコマンドを実行します。

1
2
cd (ダウンロードした中のpythonフォルダ)
pip install .

【手順6】Numpyバージョン変更

ここが一番はまったポイントでした。
Numpyのバージョンは1.17.4がインストールされていたのですがこれを1.14.5に落とします。

1
2
pip uninstall numpy
pip install numpy==1.14.5

(動作確認時に配列関連エラーでExceptionが発生していたのでnumpyのバージョンを疑い、結果動作させることができるようになりました。)


【手順7】プロジェクト設定

書籍(P.47~P.49)にしたがってプロジェクトの設定を行います。

【手順8】Unity Editorで動作確認

書籍(P.50~P.52)のexeを作っての実行はうまくいかないのであきらめました。
書籍(P.63)「Unity Editor上での学習」の方を実行したところ問題なく動作確認できました。

以上で一通り書籍に書かれているサンプルを実行させることができるようになります。

もしこれでもうまくいかない場合のため、動作確認ができた環境のライブラリバージョン一覧を書いておきますので、参考にして頂ければと思います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
> pip list
Package Version
------------------ -------------------
absl-py 0.8.1
astor 0.8.0
atomicwrites 1.3.0
attrs 19.3.0
backcall 0.1.0
bleach 1.5.0
certifi 2019.9.11
colorama 0.4.1
cycler 0.10.0
decorator 4.4.1
defusedxml 0.6.0
docopt 0.6.2
entrypoints 0.3
gast 0.3.2
grpcio 1.11.0
html5lib 0.9999999
importlib-metadata 0.23
ipykernel 5.1.3
ipython 7.9.0
ipython-genutils 0.2.0
ipywidgets 7.5.1
jedi 0.15.1
Jinja2 2.10.3
jsonschema 3.1.1
jupyter 1.0.0
jupyter-client 5.3.4
jupyter-console 6.0.0
jupyter-core 4.6.1
kiwisolver 1.1.0
Markdown 3.1.1
MarkupSafe 1.1.1
matplotlib 3.1.1
mistune 0.8.4
more-itertools 7.2.0
nbconvert 5.6.1
nbformat 4.4.0
notebook 6.0.2
numpy 1.14.5
packaging 19.2
pandocfilters 1.4.2
parso 0.5.1
pickleshare 0.7.5
Pillow 6.2.1
pip 19.3.1
pluggy 0.13.0
prometheus-client 0.7.1
prompt-toolkit 2.0.10
protobuf 3.5.2
py 1.8.0
Pygments 2.4.2
pyparsing 2.4.5
pyrsistent 0.15.5
pytest 5.2.3
python-dateutil 2.8.1
pywin32 227
pywinpty 0.5.5
PyYAML 5.1.2
pyzmq 18.1.1
qtconsole 4.5.5
Send2Trash 1.5.0
setuptools 41.6.0.post20191030
six 1.13.0
tensorboard 1.7.0
tensorflow 1.7.1
termcolor 1.1.0
terminado 0.8.3
testpath 0.4.4
tornado 6.0.3
traitlets 4.3.3
unityagents 0.4.0
wcwidth 0.1.7
Werkzeug 0.16.0
wheel 0.33.6
widgetsnbextension 3.5.1
wincertstore 0.2
zipp 0.6.0

私と同じように動作確認をあきらめてしまった方の一助になれば幸いです。

経験の蓄積と活用のバランス Epsion-Greedey法

経験の蓄積と活用のトレードオフのバランスをとる手法としてEpsilon-Greedy法を実装します。

何枚かのコインから1枚を選んで、投げた時表が出れば報酬が得られるゲームを考えます。
各コインの表が出る確率はバラバラです。

必要なパッケージをインポートします。

1
2
import random
import numpy as np

コイントスゲームの実装を行います。
head_probsは配列のパラメータで各コインの表が出る確率を指定します。

max_episode_stepsはコイントスを行う回数で、この回数の実行して表がでた回数が報酬となります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class CoinToss():

def __init__(self, head_probs, max_episode_steps=30):
self.head_probs = head_probs
self.max_episode_steps = max_episode_steps
self.toss_count = 0

def __len__(self):
return len(self.head_probs)

def reset(self):
self.toss_count = 0

def step(self, action):
final = self.max_episode_steps - 1
if self.toss_count > final:
raise Exception("The step count exceeded maximum. Please reset env.")
else:
done = True if self.toss_count == final else False

if action >= len(self.head_probs):
raise Exception("The No.{} coin doesn't exist.".format(action))
else:
head_prob = self.head_probs[action]
if random.random() < head_prob:
reward = 1.0
else:
reward = 0.0
self.toss_count += 1
return reward, done

エージェントを作成します。

policy関数で、epsilonの確率でランダムにコインを選択し(探索)、それ以外の確率で各コインの期待値にそってコインを選択します(活用)。
play関数は、コイントスを行う処理です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class EpsilonGreedyAgent():

def __init__(self, epsilon):
self.epsilon = epsilon
self.V = []

def policy(self):
coins = range(len(self.V))
if random.random() < self.epsilon:
return random.choice(coins)
else:
return np.argmax(self.V)

def play(self, env):
# Initialize estimation.
N = [0] * len(env)
self.V = [0] * len(env)

env.reset()
done = False
rewards = []
while not done:
selected_coin = self.policy()
reward, done = env.step(selected_coin)
rewards.append(reward)

n = N[selected_coin]
coin_average = self.V[selected_coin]
new_average = (coin_average * n + reward) / (n + 1)
N[selected_coin] += 1
self.V[selected_coin] = new_average

return rewards

5枚のコインを用意し、コイントスの回数を変えながら、各エピソードにおける1回のコイントスあたりの報酬を記録していきます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if __name__ == "__main__":
import pandas as pd
import matplotlib.pyplot as plt

def main():
env = CoinToss([0.1, 0.5, 0.1, 0.9, 0.1])
epsilons = [0.0, 0.1, 0.2, 0.5, 0.8]
game_steps = list(range(10, 310, 10))
result = {}
for e in epsilons:
agent = EpsilonGreedyAgent(epsilon=e)
means = []
for s in game_steps:
env.max_episode_steps = s
rewards = agent.play(env)
means.append(np.mean(rewards))
result["epsilon={}".format(e)] = means
result["coin toss count"] = game_steps
result = pd.DataFrame(result)
result.set_index("coin toss count", drop=True, inplace=True)
result.plot.line(figsize=(10, 5))
plt.show()

main()

実行結果

epsilon=0.1と0.2ではコイントスの回数とともに報酬が向上していることが分かります。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

価値の定義と算出 Bellman Equation

価値を再帰的かつ期待値で表現する手法をBellman Equationと呼びます。(Valueベース)
Bellman Equationを使えば各状態の価値が計算可能となります。

まず価値を返す関数を定義します。

1
2
3
def V(s, gamma=0.99):
V = R(s) + gamma * max_V_on_next_state(s)
return V

報酬関数を定義します。
エピソード終了のとき”happy_end”であれば1を返し、”bad_end”であれば-1を返します。
エピソードが終了していなければ0を返します。

1
2
3
4
5
6
7
def R(s):
if s == "happy_end":
return 1
elif s == "bad_end":
return -1
else:
return 0

全ての行動でV(s)を計算し値が最大になる価値を返します。
評価vの計算式は確率遷移×遷移先の価値となります。

upかdownかを繰り返していき5回行動したら終了となります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def max_V_on_next_state(s):
# If game end, expected value is 0.
if s in ["happy_end", "bad_end"]:
return 0

actions = ["up", "down"]
values = []
for a in actions:
transition_probs = transit_func(s, a)
v = 0
for next_state in transition_probs:
prob = transition_probs[next_state] # 確率遷移
v += prob * V(next_state) # 遷移先の価値
values.append(v)
return max(values)

遷移関数を定義します。

  • 引数sには”state”や”state_up_up”、”state_down_down”などが受け渡されます。
  • 引数aは”up”か”down”が設定されます。
  • エピソード完了時は1要素が返り、途中の場合は2要素が返ります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def transit_func(s, a):
actions = s.split("_")[1:]
LIMIT_GAME_COUNT = 5
HAPPY_END_BORDER = 4
MOVE_PROB = 0.9

def next_state(state, action):
return "_".join([state, action])

if len(actions) == LIMIT_GAME_COUNT:
# 最大行動数と一致するのでエピソード終了
up_count = sum([1 if a == "up" else 0 for a in actions])
state = "happy_end" if up_count >= HAPPY_END_BORDER else "bad_end"
prob = 1.0
return {state: prob}
else:
opposite = "up" if a == "down" else "down"
return {
next_state(s, a): MOVE_PROB,
next_state(s, opposite): 1 - MOVE_PROB
}

実際に価値V(s)の計算を行ってみます。

1
2
3
4
if __name__ == "__main__":
print(V("state"))
print(V("state_up_up"))
print(V("state_down_down"))

実行結果

upの数が多い方が評価されます。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

マルコフ決定過程(MDP)

マルコフ決定過程(MDP)に従う環境を構築します。

マルコフ決定過程(MDP)は次のようなルールに従います。

  • 遷移先の状態は直前の状態とそこでの行動のみに依存する。
  • 報酬は直前の状態と遷移先に依存する。

今回は次のような迷路を解く環境を実装します。

迷路

まずは必要なパッケージをインポートします。

1
2
3
import random
from enum import Enum
import numpy as np

状態を表すクラスを定義します。
縦位置をrow、横位置をcolumnで表します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class State():

def __init__(self, row=-1, column=-1):
self.row = row
self.column = column

def __repr__(self):
return "<State: [{}, {}]>".format(self.row, self.column)

def clone(self):
return State(self.row, self.column)

def __hash__(self):
return hash((self.row, self.column))

def __eq__(self, other):
return self.row == other.row and self.column == other.column

行動を表すクラスを定義します。
行動は上下左右への移動4種類です。

1
2
3
4
5
class Action(Enum):
UP = 1
DOWN = -1
LEFT = 2
RIGHT = -2

環境の実体となるクラスを定義します。
迷路の定義を2次元配列のgridで受け取ります。

gridの要素は次のような意味となります。

意味
0 移動可能な場所を表します。
-1 ダメージを受ける場所でゲーム終了となります。
1 報酬を得られる場所でゲーム終了となります。
9 壁を意味し移動することができない場所です。

default_rewardは基本の報酬となり、この変数をマイナスにすることで意味なく行動することを防ぎ、早くゴールに向かうことを促します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Environment():

def __init__(self, grid, move_prob=0.8):
# grid is 2d-array. Its values are treated as an attribute.
# Kinds of attribute is following.
# 0: ordinary cell
# -1: damage cell (game end)
# 1: reward cell (game end)
# 9: block cell (can't locate agent)
self.grid = grid
self.agent_state = State()

# Default reward is minus. Just like a poison swamp.
# It means the agent has to reach the goal fast!
self.default_reward = -0.04

# Agent can move to a selected direction in move_prob.
# It means the agent will move different direction
# in (1 - move_prob).
self.move_prob = move_prob
self.reset()

@property
def row_length(self):
return len(self.grid)

@property
def column_length(self):
return len(self.grid[0])

@property
def actions(self):
return [Action.UP, Action.DOWN,
Action.LEFT, Action.RIGHT]

@property
def states(self):
states = []
for row in range(self.row_length):
for column in range(self.column_length):
# Block cells are not included to the state.
if self.grid[row][column] != 9:
states.append(State(row, column))
return states

遷移関数を定義します。
選択した行動にはmove_prob(80%)の行動確率を設定し、反対の行動には0%の行動確率を設定します。
残りの2方向の移動には10%の行動確率を設定します。
(トータルの行動確率は100%になります。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def transit_func(self, state, action):
transition_probs = {}
if not self.can_action_at(state):
# Already on the terminal cell.
return transition_probs

opposite_direction = Action(action.value * -1)

for a in self.actions:
prob = 0
if a == action:
prob = self.move_prob
elif a != opposite_direction:
prob = (1 - self.move_prob) / 2

next_state = self._move(state, a)
if next_state not in transition_probs:
transition_probs[next_state] = prob
else:
transition_probs[next_state] += prob

return transition_probs

行動できる場所(状態)かどうかを判定する関数を定義します。

1
2
3
4
5
def can_action_at(self, state):
if self.grid[state.row][state.column] == 0:
return True
else:
return False

ある状態である行動をすると、次にどの状態になるかを返す関数を定義します。
迷路の範囲外への移動を防いだり、壁にぶつかったかどうかはこの関数内で判断します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _move(self, state, action):
if not self.can_action_at(state):
raise Exception("Can't move from here!")

next_state = state.clone()

# Execute an action (move).
if action == Action.UP:
next_state.row -= 1
elif action == Action.DOWN:
next_state.row += 1
elif action == Action.LEFT:
next_state.column -= 1
elif action == Action.RIGHT:
next_state.column += 1

# Check whether a state is out of the grid.
if not (0 <= next_state.row < self.row_length):
next_state = state
if not (0 <= next_state.column < self.column_length):
next_state = state

# Check whether the agent bumped a block cell.
if self.grid[next_state.row][next_state.column] == 9:
next_state = state

return next_state

報酬関数を定義します。
ある状態で報酬が得られるかどうか、ダメージを受けるかどうかを判定します。
ゲームが終了するかどうかもこの報酬関数で判定します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def reward_func(self, state):
reward = self.default_reward
done = False

# Check an attribute of next state.
attribute = self.grid[state.row][state.column]
if attribute == 1:
# Get reward! and the game ends.
reward = 1
done = True
elif attribute == -1:
# Get damage! and the game ends.
reward = -1
done = True

return reward, done

エージェントの位置を初期化する関数を定義します。
ゲーム開始時や、ゲームが終わり再度ゲームを開始する場合に使用します。

1
2
3
4
def reset(self):
# Locate the agent at lower left corner.
self.agent_state = State(self.row_length - 1, 0)
return self.agent_state

行動を行う関数を定義します。
行動を受け取り、遷移関数から遷移先を算出し、さらに報酬関数から即時報酬を取得します。

1
2
3
4
5
6
def step(self, action):
next_state, reward, done = self.transit(self.agent_state, action)
if next_state is not None:
self.agent_state = next_state

return next_state, reward, done

遷移関数を定義します。
行動を受け取り、遷移関数を使って行動確率を取得します。
行動確率から実際にどう行動するかどうかを最終決定します。(np.random.choice関数を使用)
決定した行動より遷移先と報酬、終了したかどうかの結果が導きだされます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def transit(self, state, action):
transition_probs = self.transit_func(state, action)
if len(transition_probs) == 0:
return None, None, True

next_states = []
probs = []
for s in transition_probs:
next_states.append(s)
probs.append(transition_probs[s])

next_state = np.random.choice(next_states, p=probs)
reward, done = self.reward_func(next_state)
return next_state, reward, done

エージェントを定義します。
エージェントのpolicyは状態を受け取って行動を決める関数ですが、今回は単純にランダム行動をとるようにしています。

1
2
3
4
5
6
7
class Agent():

def __init__(self, env):
self.actions = env.actions

def policy(self, state):
return random.choice(self.actions)

環境内でエージェントを動作させるコードを実装します。
迷路の定義(grid)を行い、それをもとにして環境(Environment)作成します。
作成した環境をエージェントに渡して、そのエージェントを行動させることでゲームが実行されます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def main():
# Make grid environment.
grid = [
[0, 0, 0, 1],
[0, 9, 0, -1],
[0, 0, 0, 0]
]
env = Environment(grid)
agent = Agent(env)

# Try 10 game.
for i in range(10):
# Initialize position of agent.
state = env.reset()
total_reward = 0
done = False

while not done:
action = agent.policy(state)
next_state, reward, done = env.step(action)
total_reward += reward
state = next_state

print("Episode {}: Agent gets {} reward.".format(i, total_reward))

if __name__ == "__main__":
main()

実行結果

単純なランダム行動ですが、10ゲーム行い10回分の報酬を取得できることを確認できます。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

進化戦略と遺伝的アルゴリズム

ニューラルネットワークの学習では勾配法が使われることが一般的ですが、勾配法とは違うアプローチとして「進化戦略」と「遺伝的アルゴリズム」があります。

進化戦略

パラメータを複数生成し、各パラメータを使った場合のモデルを評価します。
評価がよいものに近いパラメータをさらに生成し、評価を行うというプロセスを繰り返します。
(多くの候補から優秀なものを絞り込むというアプローチです)

遺伝的アルゴリズム

進化戦略と基本は同じですが、評価が高かったパラメータ同士を混ぜる(交叉)、ランダムなパラメータを入れる(突然変異)という操作を行います。