CartPoleをQ学習で制御

CartPoleをQ学習で制御していきます。

実装するクラスは下記の3つです。

  • Agentクラス
    カートを表します。
    Q関数を更新する関数と次の行動を決定する関数があります。
    Brainクラスのオブジェクトをメンバーに持ちます。
  • Brainクラス
    Agentの頭脳となるクラスです。
    Q学習を実装します。
    状態を離散化する関数とQテーブルを更新する関数とQテーブルから行動を決定する関数があります。
  • Environmentクラス
    OpenAI Gymの実行環境です。

まず必要なパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画を描画するための関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
interval=50)

anim.save('3_4movie_cartpole.mp4') # 動画ファイルを作成します。

各定数を定義します。
CartPole-v0は200ステップ 棒が立ち続ければゲーム攻略となります。(MAX_STEPS=200)

1
2
3
4
5
6
7
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
NUM_DIZITIZED = 6 # 各状態の離散値への分割数
GAMMA = 0.99 # 時間割引率
ETA = 0.5 # 学習係数
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 1000 # 最大試行回数

Agentクラスを実装します。

初期関数initでCartPoleの状態数と行動数を受け取り、自分の頭脳となるBrainクラスを生成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Agent:
'''CartPoleのエージェントクラスです、棒付き台車そのものになります'''
def __init__(self, num_states, num_actions):
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_Q_function(self, observation, action, reward, observation_next):
'''Q関数の更新'''
self.brain.update_Q_table(
observation, action, reward, observation_next)

def get_action(self, observation, step):
'''行動の決定'''
action = self.brain.decide_action(observation, step)
return action

Brainクラスを実装します。
Qテーブルの更新と行動の決定を行います。

試行回数が少ないときは探索行動が多くなるようなε-greedy法としています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Brain:
'''エージェントが持つ脳となるクラスです、Q学習を実行します'''
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# Qテーブルを作成。行数は状態を分割数^(4変数)にデジタル変換した値、列数は行動数を示す
self.q_table = np.random.uniform(low=0, high=1, size=(
NUM_DIZITIZED**num_states, num_actions))

def bins(self, clip_min, clip_max, num):
'''観測した状態(連続値)を離散値にデジタル変換する閾値を求める'''
return np.linspace(clip_min, clip_max, num + 1)[1:-1]

def digitize_state(self, observation):
'''観測したobservation状態を、離散値に変換する'''
cart_pos, cart_v, pole_angle, pole_v = observation
digitized = [
np.digitize(cart_pos, bins=self.bins(-2.4, 2.4, NUM_DIZITIZED)),
np.digitize(cart_v, bins=self.bins(-3.0, 3.0, NUM_DIZITIZED)),
np.digitize(pole_angle, bins=self.bins(-0.5, 0.5, NUM_DIZITIZED)),
np.digitize(pole_v, bins=self.bins(-2.0, 2.0, NUM_DIZITIZED))
]
return sum([x * (NUM_DIZITIZED**i) for i, x in enumerate(digitized)])

def update_Q_table(self, observation, action, reward, observation_next):
'''QテーブルをQ学習により更新'''
state = self.digitize_state(observation) # 状態を離散化
state_next = self.digitize_state(observation_next) # 次の状態を離散化
Max_Q_next = max(self.q_table[state_next][:])
self.q_table[state, action] = self.q_table[state, action] + \
ETA * (reward + GAMMA * Max_Q_next - self.q_table[state, action])

def decide_action(self, observation, episode):
'''ε-greedy法で徐々に最適行動のみを採用する'''
state = self.digitize_state(observation)
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
action = np.argmax(self.q_table[state][:])
else:
action = np.random.choice(self.num_actions) # 0,1の行動をランダムに返す
return action

Environmentクラスを定義します。
10回連続で195ステップ以上棒が立ち続ければ学習成功としています。

成功後に動画保存のため1回再実行しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Environment:
'''CartPoleを実行する環境のクラスです'''
def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態の数4を取得
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
self.agent = Agent(num_states, num_actions) # 環境内で行動するAgentを生成

def run(self):
'''実行'''
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
is_episode_final = False # 最終試行フラグ
frames = [] # 動画用に画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

for step in range(MAX_STEPS): # 1エピソードのループ

if is_episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

# 行動を求める
action = self.agent.get_action(observation, episode)

# 行動a_tの実行により、s_{t+1}, r_{t+1}を求める
observation_next, _, done, _ = self.env.step(
action) # rewardとinfoは使わないので_にする

# 報酬を与える
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
if step < 195:
reward = -1 # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 195step以上連続で立ち続けた試行数をリセット
else:
reward = 1 # 立ったまま終了時は報酬1を与える
complete_episodes += 1 # 連続記録を更新
else:
reward = 0 # 途中の報酬は0

# step+1の状態observation_nextを用いて,Q関数を更新する
self.agent.update_Q_function(
observation, action, reward, observation_next)

# 観測の更新
observation = observation_next

# 終了時の処理
if done:
print('{0} Episode: Finished after {1} time steps'.format(
episode, step + 1))
break

if is_episode_final is True: # 最終試行では動画を保存と描画
display_frames_as_gif(frames)
break

if complete_episodes >= 10: # 10連続成功なら
print('10回連続成功')
is_episode_final = True # 次の試行を描画を行う最終試行とする

Environmentクラスのオブジェクトを生成し、関数runを実行します。

1
2
3
# main
cartpole_env = Environment()
cartpole_env.run()

実行結果 エピソード0~18

実行結果 エピソード261~276(終了)
275エピソードで学習が完了しています。(195ステップ以上を10エピソード連続で成功)

強化学習後のプレイ動画は’3_4movie_cartpole.mp4’ファイルで保存されています。

棒が倒れずにうまくバランスがとれていることが分かります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ