gym-duckietown - 強化学習環境の紹介④

Duckietownは、MIT(マサチューセッツ工科大学)発の、ロボット工学とAIを学ぶためのプラットフォームです。

自動車(Duckiebot)、道路、信号、標識、障害物、輸送を必要とする人(Duckies)、そしてそれらが住む都市(Duckietown)で構成されています。

Duckietown - https://www.duckietown.org/


gym-duckietownは、Duckietownのシミュレータ環境になります。

duckietown/gym-duckietown - https://github.com/duckietown/gym-duckietown

Gym Retro - 利用できるゲーム一覧

Gym Retro環境で利用できるゲームの一覧を取得します。

Gym Retroで利用できるゲーム一覧

下記のコードを準備します。

[コード]

1
2
3
import retro
for i, game in enumerate(retro.data.list_games()):
print(i, game, retro.data.list_states(game))

実行すると下記のようにゲーム一覧が表示されます。

[実行結果]

0 1942-Nes ['1Player.Level1']
1 1943-Nes ['Level1']
2 3NinjasKickBack-Genesis ['1Player.Colt.Level1']
3 8Eyes-Nes ['1Player.Arabia.Level1']
4 AaahhRealMonsters-Genesis ['Level1']
5 AbadoxTheDeadlyInnerWar-Nes ['Level1']
6 AcceleBrid-Snes ['SilverMarine.DefaultSettings.Level1']
7 ActRaiser2-Snes ['Level1']
8 ActionPachio-Snes ['Level1']
9 AddamsFamily-GameBoy ['Level1']
(途中略)
985 ZombiesAteMyNeighbors-Snes ['Start']
986 ZoolNinjaOfTheNthDimension-Genesis ['Level1']
987 ZoolNinjaOfTheNthDimension-Sms ['Level1']
988 ZoolNinjaOfTheNthDimension-Snes ['Level1']

989種類のゲームが利用できることが確認できます。

ここで表示されるのは「ゲームインテグレーション」と呼ばれ、「開始状態」「報酬」「エピソード完了」という情報が定義されているゲームになります。

Gym Retro - シューティングゲームを実行する② 学習編

前回はシューティングゲームをランダムで実行しましたが、今回は強化学習アルゴリズムを実装してゲームを攻略してみます。

baselinesインストール

baselinesがインストールされていない場合は、下記コマンドを実行してインストールしておきます。

1
2
3
git clone https://github.com/openai/baselines.git
cd baselines
pip install -e .

Airstrikerを学習して実行

Airstrikerを学習して実行するコードは次のようになります。

  • コールバック
    10更新ごとにベストモデルの保存を行います。
  • Airstrikerラッパー
    行動パターンを4096通りから3通りに減らして学習効率を上げています。
  • CustomRewardAndDoneラッパー
    報酬を20で割って範囲を「0~1.0」に近づけます。報酬は「0~1.0」の範囲に収めると、学習が安定する傾向があります。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import retro
import time
import os
import gym
import numpy as np
import datetime
import pytz
from stable_baselines.results_plotter import load_results, ts2xy
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2
from stable_baselines.common import set_global_seeds
from stable_baselines.bench import Monitor
from baselines.common.retro_wrappers import *
from util import log_dir, callback, AirstrikerDiscretizer, CustomRewardAndDoneEnv

# ログフォルダの生成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# コールバック
best_mean_reward = -np.inf
nupdates = 1
def callback(_locals, _globals):
global nupdates
global best_mean_reward

# 10更新毎
if (nupdates + 1) % 10 == 0:
# 平均エピソード長、平均報酬の取得
x, y = ts2xy(load_results(log_dir), 'timesteps')
if len(y) > 0:
# 直近10件の平均報酬
mean_reward = np.mean(y[-10:])

# 平均報酬がベスト報酬以上の時はモデルを保存
update_model = mean_reward > best_mean_reward
if update_model:
best_mean_reward = mean_reward
_locals['self'].save('airstriker_model')

# ログ
print('time: {}, nupdates: {}, mean: {:.2f}, best_mean: {:.2f}, model_update: {}'.format(
datetime.datetime.now(pytz.timezone('Asia/Tokyo')),
nupdates, mean_reward, best_mean_reward, update_model))
nupdates += 1
return True

# Airstrikerラッパー
class AirstrikerDiscretizer(gym.ActionWrapper):
# 初期化
def __init__(self, env):
super(AirstrikerDiscretizer, self).__init__(env)
buttons = ['B', 'A', 'MODE', 'START', 'UP', 'DOWN', 'LEFT', 'RIGHT', 'C', 'Y', 'X', 'Z']
actions = [['LEFT'], ['RIGHT'], ['B']]
self._actions = []
for action in actions:
arr = np.array([False] * 12)
for button in action:
arr[buttons.index(button)] = True
self._actions.append(arr)
self.action_space = gym.spaces.Discrete(len(self._actions))
# 行動の取得
def action(self, a):
return self._actions[a].copy()

# CustomRewardAndDoneラッパー
class CustomRewardAndDoneEnv(gym.Wrapper):
# 初期化
def __init__(self, env):
super(CustomRewardAndDoneEnv, self).__init__(env)
# ステップ
def step(self, action):
state, rew, done, info = self.env.step(action)
# 報酬の変更
rew /= 20
# エピソード完了の変更
if info['gameover'] == 1:
done = True
return state, rew, done, info

# 環境の生成
env = retro.make(game='Airstriker-Genesis', state='Level1')
env = AirstrikerDiscretizer(env) # 行動空間を離散空間に変換
env = CustomRewardAndDoneEnv(env) # 報酬とエピソード完了の変更
env = StochasticFrameSkip(env, n=4, stickprob=0.25) # スティッキーフレームスキップ
env = Downsample(env, 2) # ダウンサンプリング
env = Rgb2gray(env) # グレースケール
env = FrameStack(env, 4) # フレームスタック
env = ScaledFloatFrame(env) # 状態の正規化
env = Monitor(env, log_dir, allow_early_resets=True)
print('行動空間: ', env.action_space)
print('状態空間: ', env.observation_space)

# シードの指定
env.seed(0)
set_global_seeds(0)

# ベクトル化環境の作成
env = DummyVecEnv([lambda: env])

# モデルの作成
model = PPO2('CnnPolicy', env, verbose=0)

# モデルの読み込み
#model = PPO2.load('airstriker_model', env=env, verbose=0)

# モデルの学習
model.learn(total_timesteps=128000, callback=callback)

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()
# スリープ
time.sleep(1/60)
# モデルの推論
action, _ = model.predict(state)
# 1ステップ実行
state, reward, done, info = env.step(action)
total_reward += reward[0]
# エピソード完了
if done:
print('reward:', total_reward)
state = env.reset()
total_reward = 0

上記コードを実行すると最初に学習処理が実行されます。(私の環境ではGPUなしで60分以上かかりました。)

それが完了されると下記のようなウィンドウが表示され、学習したモデルでゲームが実行されます。

実行結果

ランダム行動よりまともにゲームができているようです。

Gym Retro - シューティングゲームを実行する① ランダム編

Gym Retroはファミコン、スーパーファミコン、メガドライブ、PCエンジンなどのレトロゲーム機のエミュレータが搭載されていて、昔の懐かしいゲームを実行することができます。

今回は標準付属されているシューティングゲーム「Airstriker」をランダムで実行してみます。

Gym Retroインストール

Gym Retroをインストールするために下記コマンドを実行します。

1
pip install gym-retro

Airstrikerをランダム実行

Airstrikerをランダム実行するコードは次のようになります。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import retro
import time

# 環境の作成
env = retro.make(game='Airstriker-Genesis', state='Level1')

# ランダム行動による動作確認
state = env.reset()
while True:
# 環境の描画
env.render()

# スリープ
time.sleep(1/60)

# 1ステップ実行
state, reward, done, info = env.step(env.action_space.sample())
print('reward:', reward)

# エピソード完了
if done:
print('done')
state = env.reset()

上記コードを実行すると下記のようなウィンドウが現れてシューティングゲームがランダム行動で実行されます。

実行結果

ランダム行動のためすぐやられてしまいます。

次回はこのシューティングゲームを強化学習で攻略してみます。

Python OpenAI Gym - モデルの読み込み

前回ファイルに保存した学習済みモデルを読み込んで利用してみます。

モデルの読み込み

学習したモデルを読み込むコードは次のようになります。

学習したモデルの読み込みにはload関数を利用します。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import gym
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2

# 環境の作成
env = gym.make('CartPole-v1')
env = DummyVecEnv([lambda: env])

# モデルの読み込み
model = PPO2.load('sample')

# モデルのテスト
state = env.reset()
for i in range(200):
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップ実行
state, rewards, done, info = env.step(action)

# エピソード完了判定
if done:
break

# 環境のクローズ
env.close()

上記コードを実行すると「CartPole-v1」が表示され、棒のバランスがとれていることを確認できます。

Python OpenAI Gym - モデルの保存

モデルを利用するたびに毎回学習するのは時間がかかります。

今回は学習したモデルを保存して、次回はそのモデルを読み込んで利用してみます。

モデルの保存

学習したモデルを保存するコードは次のようになります。

学習処理learn関数は少し時間がかかります。学習したモデルの保存はsave関数を利用します。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gym
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2

# 環境の作成
env = gym.make('CartPole-v1')
env = DummyVecEnv([lambda: env])

# モデルの生成
model = PPO2('MlpPolicy', env, verbose=1)

# モデルの学習
model.learn(total_timesteps=100000)

# モデルの保存
model.save('sample')

上記コードを実行するとsample.zipという学習済みモデルファイルが作成されます。

次回は、この学習済みモデルの読み込みを行います。

Python OpenAI Gym - カスタムGym環境の学習

自作の環境(カスタムGym環境)を学習させて実行してみます。

カスタムGym環境の作成

右への移動を学ぶ環境GoRightを実装します。(前回実行したもののと同じものとなります。)

エージェントが左右に移動する5マスの環境になります。

[コード]

go_right.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
import gym

# 右への移動を学ぶ環境
class GoRight(gym.Env):
# 定数定義
GRID_SIZE = 5
LEFT = 0
RIGHT = 1

# 初期化
def __init__(self):
super(GoRight, self).__init__()
# グリッドのサイズ
self.grid_size = self.GRID_SIZE
# 初期位置の指定
self.agent_pos = self.GRID_SIZE - 1
# 行動空間と状態空間の定義
self.action_space = gym.spaces.Discrete(2)
self.observation_space = gym.spaces.Box(low=0, high=self.GRID_SIZE - 1, shape=(1,), dtype=np.float32)

# 環境のリセット
def reset(self):
# 初期位置の指定
self.agent_pos = 0
# 初期位置をfloat32のnumpy配列に変換
return np.array(self.agent_pos).astype(np.float32)

# 環境の1ステップ実行
def step(self, action):
# 移動
if action == self.LEFT:
self.agent_pos -= 1
elif action == self.RIGHT:
self.agent_pos += 1
self.agent_pos = np.clip(self.agent_pos, 0, self.GRID_SIZE)
# エピソード完了の計算
done = self.agent_pos == self.GRID_SIZE - 1
# 報酬の計算
reward = 1 if done else - 0.1
return np.array(self.agent_pos).astype(np.float32), reward, done, {}

# 環境の描画
def render(self, mode='console', close=False):
# エージェントはA、他は.で表現する
print('.' * self.agent_pos, end='')
print('A', end='')
print('.' * (self.GRID_SIZE - 1 - self.agent_pos))

カスタムGym環境を学習させて実行

PPOで学習させて実行します。

[コード]

train_go_right.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import gym
from go_right import GoRight

from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2

# 環境の生成
env = GoRight()
env = DummyVecEnv([lambda: env])

# モデルの生成
model = PPO2('MlpPolicy', env, verbose=1)

# モデルの読み込み
#model = PPO2.load('go_right_model')

# モデルの学習
model.learn(total_timesteps=12800)

# モデルの保存
model.save('go_right_model')

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップの実行
state, reward, done, info = env.step(action)
total_reward += reward

print('')
# エピソード完了
if done:
# 環境の描画
print('total_reward:', total_reward)
break

上記コードを実行するとコンソールに次のような表示がされます。

[実行結果]

A....

.A...

..A..

...A.

total_reward: [0.7]

学習がうまくいっているようで、一直線に右に向かって移動していることが分かります。