マルコフ決定過程(MDP)

マルコフ決定過程(MDP)に従う環境を構築します。

マルコフ決定過程(MDP)は次のようなルールに従います。

  • 遷移先の状態は直前の状態とそこでの行動のみに依存する。
  • 報酬は直前の状態と遷移先に依存する。

今回は次のような迷路を解く環境を実装します。

迷路

まずは必要なパッケージをインポートします。

1
2
3
import random
from enum import Enum
import numpy as np

状態を表すクラスを定義します。
縦位置をrow、横位置をcolumnで表します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class State():

def __init__(self, row=-1, column=-1):
self.row = row
self.column = column

def __repr__(self):
return "<State: [{}, {}]>".format(self.row, self.column)

def clone(self):
return State(self.row, self.column)

def __hash__(self):
return hash((self.row, self.column))

def __eq__(self, other):
return self.row == other.row and self.column == other.column

行動を表すクラスを定義します。
行動は上下左右への移動4種類です。

1
2
3
4
5
class Action(Enum):
UP = 1
DOWN = -1
LEFT = 2
RIGHT = -2

環境の実体となるクラスを定義します。
迷路の定義を2次元配列のgridで受け取ります。

gridの要素は次のような意味となります。

意味
0 移動可能な場所を表します。
-1 ダメージを受ける場所でゲーム終了となります。
1 報酬を得られる場所でゲーム終了となります。
9 壁を意味し移動することができない場所です。

default_rewardは基本の報酬となり、この変数をマイナスにすることで意味なく行動することを防ぎ、早くゴールに向かうことを促します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Environment():

def __init__(self, grid, move_prob=0.8):
# grid is 2d-array. Its values are treated as an attribute.
# Kinds of attribute is following.
# 0: ordinary cell
# -1: damage cell (game end)
# 1: reward cell (game end)
# 9: block cell (can't locate agent)
self.grid = grid
self.agent_state = State()

# Default reward is minus. Just like a poison swamp.
# It means the agent has to reach the goal fast!
self.default_reward = -0.04

# Agent can move to a selected direction in move_prob.
# It means the agent will move different direction
# in (1 - move_prob).
self.move_prob = move_prob
self.reset()

@property
def row_length(self):
return len(self.grid)

@property
def column_length(self):
return len(self.grid[0])

@property
def actions(self):
return [Action.UP, Action.DOWN,
Action.LEFT, Action.RIGHT]

@property
def states(self):
states = []
for row in range(self.row_length):
for column in range(self.column_length):
# Block cells are not included to the state.
if self.grid[row][column] != 9:
states.append(State(row, column))
return states

遷移関数を定義します。
選択した行動にはmove_prob(80%)の行動確率を設定し、反対の行動には0%の行動確率を設定します。
残りの2方向の移動には10%の行動確率を設定します。
(トータルの行動確率は100%になります。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def transit_func(self, state, action):
transition_probs = {}
if not self.can_action_at(state):
# Already on the terminal cell.
return transition_probs

opposite_direction = Action(action.value * -1)

for a in self.actions:
prob = 0
if a == action:
prob = self.move_prob
elif a != opposite_direction:
prob = (1 - self.move_prob) / 2

next_state = self._move(state, a)
if next_state not in transition_probs:
transition_probs[next_state] = prob
else:
transition_probs[next_state] += prob

return transition_probs

行動できる場所(状態)かどうかを判定する関数を定義します。

1
2
3
4
5
def can_action_at(self, state):
if self.grid[state.row][state.column] == 0:
return True
else:
return False

ある状態である行動をすると、次にどの状態になるかを返す関数を定義します。
迷路の範囲外への移動を防いだり、壁にぶつかったかどうかはこの関数内で判断します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _move(self, state, action):
if not self.can_action_at(state):
raise Exception("Can't move from here!")

next_state = state.clone()

# Execute an action (move).
if action == Action.UP:
next_state.row -= 1
elif action == Action.DOWN:
next_state.row += 1
elif action == Action.LEFT:
next_state.column -= 1
elif action == Action.RIGHT:
next_state.column += 1

# Check whether a state is out of the grid.
if not (0 <= next_state.row < self.row_length):
next_state = state
if not (0 <= next_state.column < self.column_length):
next_state = state

# Check whether the agent bumped a block cell.
if self.grid[next_state.row][next_state.column] == 9:
next_state = state

return next_state

報酬関数を定義します。
ある状態で報酬が得られるかどうか、ダメージを受けるかどうかを判定します。
ゲームが終了するかどうかもこの報酬関数で判定します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def reward_func(self, state):
reward = self.default_reward
done = False

# Check an attribute of next state.
attribute = self.grid[state.row][state.column]
if attribute == 1:
# Get reward! and the game ends.
reward = 1
done = True
elif attribute == -1:
# Get damage! and the game ends.
reward = -1
done = True

return reward, done

エージェントの位置を初期化する関数を定義します。
ゲーム開始時や、ゲームが終わり再度ゲームを開始する場合に使用します。

1
2
3
4
def reset(self):
# Locate the agent at lower left corner.
self.agent_state = State(self.row_length - 1, 0)
return self.agent_state

行動を行う関数を定義します。
行動を受け取り、遷移関数から遷移先を算出し、さらに報酬関数から即時報酬を取得します。

1
2
3
4
5
6
def step(self, action):
next_state, reward, done = self.transit(self.agent_state, action)
if next_state is not None:
self.agent_state = next_state

return next_state, reward, done

遷移関数を定義します。
行動を受け取り、遷移関数を使って行動確率を取得します。
行動確率から実際にどう行動するかどうかを最終決定します。(np.random.choice関数を使用)
決定した行動より遷移先と報酬、終了したかどうかの結果が導きだされます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def transit(self, state, action):
transition_probs = self.transit_func(state, action)
if len(transition_probs) == 0:
return None, None, True

next_states = []
probs = []
for s in transition_probs:
next_states.append(s)
probs.append(transition_probs[s])

next_state = np.random.choice(next_states, p=probs)
reward, done = self.reward_func(next_state)
return next_state, reward, done

エージェントを定義します。
エージェントのpolicyは状態を受け取って行動を決める関数ですが、今回は単純にランダム行動をとるようにしています。

1
2
3
4
5
6
7
class Agent():

def __init__(self, env):
self.actions = env.actions

def policy(self, state):
return random.choice(self.actions)

環境内でエージェントを動作させるコードを実装します。
迷路の定義(grid)を行い、それをもとにして環境(Environment)作成します。
作成した環境をエージェントに渡して、そのエージェントを行動させることでゲームが実行されます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def main():
# Make grid environment.
grid = [
[0, 0, 0, 1],
[0, 9, 0, -1],
[0, 0, 0, 0]
]
env = Environment(grid)
agent = Agent(env)

# Try 10 game.
for i in range(10):
# Initialize position of agent.
state = env.reset()
total_reward = 0
done = False

while not done:
action = agent.policy(state)
next_state, reward, done = env.step(action)
total_reward += reward
state = next_state

print("Episode {}: Agent gets {} reward.".format(i, total_reward))

if __name__ == "__main__":
main()

実行結果

単純なランダム行動ですが、10ゲーム行い10回分の報酬を取得できることを確認できます。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード