Gym Retro - 利用できるゲーム一覧

Gym Retro環境で利用できるゲームの一覧を取得します。

Gym Retroで利用できるゲーム一覧

下記のコードを準備します。

[コード]

1
2
3
import retro
for i, game in enumerate(retro.data.list_games()):
print(i, game, retro.data.list_states(game))

実行すると下記のようにゲーム一覧が表示されます。

[実行結果]

0 1942-Nes ['1Player.Level1']
1 1943-Nes ['Level1']
2 3NinjasKickBack-Genesis ['1Player.Colt.Level1']
3 8Eyes-Nes ['1Player.Arabia.Level1']
4 AaahhRealMonsters-Genesis ['Level1']
5 AbadoxTheDeadlyInnerWar-Nes ['Level1']
6 AcceleBrid-Snes ['SilverMarine.DefaultSettings.Level1']
7 ActRaiser2-Snes ['Level1']
8 ActionPachio-Snes ['Level1']
9 AddamsFamily-GameBoy ['Level1']
(途中略)
985 ZombiesAteMyNeighbors-Snes ['Start']
986 ZoolNinjaOfTheNthDimension-Genesis ['Level1']
987 ZoolNinjaOfTheNthDimension-Sms ['Level1']
988 ZoolNinjaOfTheNthDimension-Snes ['Level1']

989種類のゲームが利用できることが確認できます。

ここで表示されるのは「ゲームインテグレーション」と呼ばれ、「開始状態」「報酬」「エピソード完了」という情報が定義されているゲームになります。

Gym Retro - シューティングゲームを実行する② 学習編

前回はシューティングゲームをランダムで実行しましたが、今回は強化学習アルゴリズムを実装してゲームを攻略してみます。

baselinesインストール

baselinesがインストールされていない場合は、下記コマンドを実行してインストールしておきます。

1
2
3
git clone https://github.com/openai/baselines.git
cd baselines
pip install -e .

Airstrikerを学習して実行

Airstrikerを学習して実行するコードは次のようになります。

  • コールバック
    10更新ごとにベストモデルの保存を行います。
  • Airstrikerラッパー
    行動パターンを4096通りから3通りに減らして学習効率を上げています。
  • CustomRewardAndDoneラッパー
    報酬を20で割って範囲を「0~1.0」に近づけます。報酬は「0~1.0」の範囲に収めると、学習が安定する傾向があります。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import retro
import time
import os
import gym
import numpy as np
import datetime
import pytz
from stable_baselines.results_plotter import load_results, ts2xy
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2
from stable_baselines.common import set_global_seeds
from stable_baselines.bench import Monitor
from baselines.common.retro_wrappers import *
from util import log_dir, callback, AirstrikerDiscretizer, CustomRewardAndDoneEnv

# ログフォルダの生成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# コールバック
best_mean_reward = -np.inf
nupdates = 1
def callback(_locals, _globals):
global nupdates
global best_mean_reward

# 10更新毎
if (nupdates + 1) % 10 == 0:
# 平均エピソード長、平均報酬の取得
x, y = ts2xy(load_results(log_dir), 'timesteps')
if len(y) > 0:
# 直近10件の平均報酬
mean_reward = np.mean(y[-10:])

# 平均報酬がベスト報酬以上の時はモデルを保存
update_model = mean_reward > best_mean_reward
if update_model:
best_mean_reward = mean_reward
_locals['self'].save('airstriker_model')

# ログ
print('time: {}, nupdates: {}, mean: {:.2f}, best_mean: {:.2f}, model_update: {}'.format(
datetime.datetime.now(pytz.timezone('Asia/Tokyo')),
nupdates, mean_reward, best_mean_reward, update_model))
nupdates += 1
return True

# Airstrikerラッパー
class AirstrikerDiscretizer(gym.ActionWrapper):
# 初期化
def __init__(self, env):
super(AirstrikerDiscretizer, self).__init__(env)
buttons = ['B', 'A', 'MODE', 'START', 'UP', 'DOWN', 'LEFT', 'RIGHT', 'C', 'Y', 'X', 'Z']
actions = [['LEFT'], ['RIGHT'], ['B']]
self._actions = []
for action in actions:
arr = np.array([False] * 12)
for button in action:
arr[buttons.index(button)] = True
self._actions.append(arr)
self.action_space = gym.spaces.Discrete(len(self._actions))
# 行動の取得
def action(self, a):
return self._actions[a].copy()

# CustomRewardAndDoneラッパー
class CustomRewardAndDoneEnv(gym.Wrapper):
# 初期化
def __init__(self, env):
super(CustomRewardAndDoneEnv, self).__init__(env)
# ステップ
def step(self, action):
state, rew, done, info = self.env.step(action)
# 報酬の変更
rew /= 20
# エピソード完了の変更
if info['gameover'] == 1:
done = True
return state, rew, done, info

# 環境の生成
env = retro.make(game='Airstriker-Genesis', state='Level1')
env = AirstrikerDiscretizer(env) # 行動空間を離散空間に変換
env = CustomRewardAndDoneEnv(env) # 報酬とエピソード完了の変更
env = StochasticFrameSkip(env, n=4, stickprob=0.25) # スティッキーフレームスキップ
env = Downsample(env, 2) # ダウンサンプリング
env = Rgb2gray(env) # グレースケール
env = FrameStack(env, 4) # フレームスタック
env = ScaledFloatFrame(env) # 状態の正規化
env = Monitor(env, log_dir, allow_early_resets=True)
print('行動空間: ', env.action_space)
print('状態空間: ', env.observation_space)

# シードの指定
env.seed(0)
set_global_seeds(0)

# ベクトル化環境の作成
env = DummyVecEnv([lambda: env])

# モデルの作成
model = PPO2('CnnPolicy', env, verbose=0)

# モデルの読み込み
#model = PPO2.load('airstriker_model', env=env, verbose=0)

# モデルの学習
model.learn(total_timesteps=128000, callback=callback)

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()
# スリープ
time.sleep(1/60)
# モデルの推論
action, _ = model.predict(state)
# 1ステップ実行
state, reward, done, info = env.step(action)
total_reward += reward[0]
# エピソード完了
if done:
print('reward:', total_reward)
state = env.reset()
total_reward = 0

上記コードを実行すると最初に学習処理が実行されます。(私の環境ではGPUなしで60分以上かかりました。)

それが完了されると下記のようなウィンドウが表示され、学習したモデルでゲームが実行されます。

実行結果

ランダム行動よりまともにゲームができているようです。

Gym Retro - シューティングゲームを実行する① ランダム編

Gym Retroはファミコン、スーパーファミコン、メガドライブ、PCエンジンなどのレトロゲーム機のエミュレータが搭載されていて、昔の懐かしいゲームを実行することができます。

今回は標準付属されているシューティングゲーム「Airstriker」をランダムで実行してみます。

Gym Retroインストール

Gym Retroをインストールするために下記コマンドを実行します。

1
pip install gym-retro

Airstrikerをランダム実行

Airstrikerをランダム実行するコードは次のようになります。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import retro
import time

# 環境の作成
env = retro.make(game='Airstriker-Genesis', state='Level1')

# ランダム行動による動作確認
state = env.reset()
while True:
# 環境の描画
env.render()

# スリープ
time.sleep(1/60)

# 1ステップ実行
state, reward, done, info = env.step(env.action_space.sample())
print('reward:', reward)

# エピソード完了
if done:
print('done')
state = env.reset()

上記コードを実行すると下記のようなウィンドウが現れてシューティングゲームがランダム行動で実行されます。

実行結果

ランダム行動のためすぐやられてしまいます。

次回はこのシューティングゲームを強化学習で攻略してみます。

Python OpenAI Gym - モデルの読み込み

前回ファイルに保存した学習済みモデルを読み込んで利用してみます。

モデルの読み込み

学習したモデルを読み込むコードは次のようになります。

学習したモデルの読み込みにはload関数を利用します。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import gym
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2

# 環境の作成
env = gym.make('CartPole-v1')
env = DummyVecEnv([lambda: env])

# モデルの読み込み
model = PPO2.load('sample')

# モデルのテスト
state = env.reset()
for i in range(200):
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップ実行
state, rewards, done, info = env.step(action)

# エピソード完了判定
if done:
break

# 環境のクローズ
env.close()

上記コードを実行すると「CartPole-v1」が表示され、棒のバランスがとれていることを確認できます。

Python OpenAI Gym - モデルの保存

モデルを利用するたびに毎回学習するのは時間がかかります。

今回は学習したモデルを保存して、次回はそのモデルを読み込んで利用してみます。

モデルの保存

学習したモデルを保存するコードは次のようになります。

学習処理learn関数は少し時間がかかります。学習したモデルの保存はsave関数を利用します。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gym
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2

# 環境の作成
env = gym.make('CartPole-v1')
env = DummyVecEnv([lambda: env])

# モデルの生成
model = PPO2('MlpPolicy', env, verbose=1)

# モデルの学習
model.learn(total_timesteps=100000)

# モデルの保存
model.save('sample')

上記コードを実行するとsample.zipという学習済みモデルファイルが作成されます。

次回は、この学習済みモデルの読み込みを行います。

Python OpenAI Gym - カスタムGym環境の学習

自作の環境(カスタムGym環境)を学習させて実行してみます。

カスタムGym環境の作成

右への移動を学ぶ環境GoRightを実装します。(前回実行したもののと同じものとなります。)

エージェントが左右に移動する5マスの環境になります。

[コード]

go_right.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
import gym

# 右への移動を学ぶ環境
class GoRight(gym.Env):
# 定数定義
GRID_SIZE = 5
LEFT = 0
RIGHT = 1

# 初期化
def __init__(self):
super(GoRight, self).__init__()
# グリッドのサイズ
self.grid_size = self.GRID_SIZE
# 初期位置の指定
self.agent_pos = self.GRID_SIZE - 1
# 行動空間と状態空間の定義
self.action_space = gym.spaces.Discrete(2)
self.observation_space = gym.spaces.Box(low=0, high=self.GRID_SIZE - 1, shape=(1,), dtype=np.float32)

# 環境のリセット
def reset(self):
# 初期位置の指定
self.agent_pos = 0
# 初期位置をfloat32のnumpy配列に変換
return np.array(self.agent_pos).astype(np.float32)

# 環境の1ステップ実行
def step(self, action):
# 移動
if action == self.LEFT:
self.agent_pos -= 1
elif action == self.RIGHT:
self.agent_pos += 1
self.agent_pos = np.clip(self.agent_pos, 0, self.GRID_SIZE)
# エピソード完了の計算
done = self.agent_pos == self.GRID_SIZE - 1
# 報酬の計算
reward = 1 if done else - 0.1
return np.array(self.agent_pos).astype(np.float32), reward, done, {}

# 環境の描画
def render(self, mode='console', close=False):
# エージェントはA、他は.で表現する
print('.' * self.agent_pos, end='')
print('A', end='')
print('.' * (self.GRID_SIZE - 1 - self.agent_pos))

カスタムGym環境を学習させて実行

PPOで学習させて実行します。

[コード]

train_go_right.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import gym
from go_right import GoRight

from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2

# 環境の生成
env = GoRight()
env = DummyVecEnv([lambda: env])

# モデルの生成
model = PPO2('MlpPolicy', env, verbose=1)

# モデルの読み込み
#model = PPO2.load('go_right_model')

# モデルの学習
model.learn(total_timesteps=12800)

# モデルの保存
model.save('go_right_model')

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップの実行
state, reward, done, info = env.step(action)
total_reward += reward

print('')
# エピソード完了
if done:
# 環境の描画
print('total_reward:', total_reward)
break

上記コードを実行するとコンソールに次のような表示がされます。

[実行結果]

A....

.A...

..A..

...A.

total_reward: [0.7]

学習がうまくいっているようで、一直線に右に向かって移動していることが分かります。

Python OpenAI Gym - カスタムGym環境の作成

OpenAI Gymで準備されている環境ではなく、自作の環境(カスタムGym環境)を作成してみます。

カスタムGym環境の作成

右への移動を学ぶ環境GoRightを実装します。

エージェントが左右に移動する5マスの環境になります。

[コード]

go_right.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
import gym

# 右への移動を学ぶ環境
class GoRight(gym.Env):
# 定数定義
GRID_SIZE = 5
LEFT = 0
RIGHT = 1

# 初期化
def __init__(self):
super(GoRight, self).__init__()
# グリッドのサイズ
self.grid_size = self.GRID_SIZE
# 初期位置の指定
self.agent_pos = self.GRID_SIZE - 1
# 行動空間と状態空間の定義
self.action_space = gym.spaces.Discrete(2)
self.observation_space = gym.spaces.Box(low=0, high=self.GRID_SIZE - 1, shape=(1,), dtype=np.float32)

# 環境のリセット
def reset(self):
# 初期位置の指定
self.agent_pos = 0
# 初期位置をfloat32のnumpy配列に変換
return np.array(self.agent_pos).astype(np.float32)

# 環境の1ステップ実行
def step(self, action):
# 移動
if action == self.LEFT:
self.agent_pos -= 1
elif action == self.RIGHT:
self.agent_pos += 1
self.agent_pos = np.clip(self.agent_pos, 0, self.GRID_SIZE)
# エピソード完了の計算
done = self.agent_pos == self.GRID_SIZE - 1
# 報酬の計算
reward = 1 if done else - 0.1
return np.array(self.agent_pos).astype(np.float32), reward, done, {}

# 環境の描画
def render(self, mode='console', close=False):
# エージェントはA、他は.で表現する
print('.' * self.agent_pos, end='')
print('A', end='')
print('.' * (self.GRID_SIZE - 1 - self.agent_pos))

カスタムGym環境の動作確認

自作した環境をランダム行動で実行してみます。

gym.make関数を使わずに、GoRightクラスを直接生成しています。

[コード]

test_go_right.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import gym
from go_right import GoRight

# 環境の生成
env = GoRight()

# 1エピソードのループ
state = env.reset()

while True:
# ランダム行動の取得
action = env.action_space.sample()
# 1ステップの実行
state, reward, done, info = env.step(action)
# 環境の描画
env.render()
print('reward:', reward)
# エピソード完了
if done:
print('done')
break

上記コードを実行するとコンソールに次のような表示がされます。
(ランダム行動のため実行結果は毎回異なります。)

[実行結果]

reward: -0.1
.A...
reward: -0.1
A....
reward: -0.1
.A...
reward: -0.1
..A..
reward: -0.1
...A.
reward: 1
....A
done

エージェントAが左右に移動し、最終的に一番右に移動し終了していることが分かります。

Python OpenAI Gym - CartPole(棒たてゲーム)を試す② 強化学習編

Stable Baselinesという強化学習アルゴリズムを使ってCartPoleを実行します。

インストール

下記のコマンドを実行しStable Baselinesを準備します。

1
2
3
4
pip install stable-baselines[mpi]
pip install tensorflow==1.14.0
pip install pyqt5
pip install imageio

強化学習アルゴリズムを使ってCartPole実行

強化学習のモデルを作成し、100,000回学習を行ってからCartPoleを実行してみます。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import gym
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2

# 環境の作成
env = gym.make('CartPole-v0')
env = DummyVecEnv([lambda: env])

# モデルの作成
model = PPO2('MlpPolicy', env, verbose=1)

# モデルの学習
model.learn(total_timesteps=100000)

# モデルのテスト
state = env.reset()
for i in range(200):
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state, deterministic=True)

# 1ステップ実行
state, rewards, done, info = env.step(action)

# エピソード完了判定
if done:
break

# 環境のクローズ
env.close()

実行してみると、棒が倒れることなくうまくバランスをとっていることが確認できます。

実行結果

Python OpenAI Gym - CartPole(棒たてゲーム)を試す①

OpenAI Gymに含まれるCartPole(棒たてゲーム)を試してみます。

インストール

下記のコマンドを実行しOpenAI Gym環境を準備します。

1
pip install gym

CartPole(棒たてゲーム)を実行する

ランダムにカートを動かすコードを準備します。

[コード]

1
2
3
4
5
6
7
8
9
10
import gym

# 環境の作成
env = gym.make('CartPole-v0')

# ランダム行動による動作確認
env.reset()
while True:
env.render()
env.step(env.action_space.sample())

実行すると次のような画面が表示されますが、ランダム行動のためすぐに棒が倒れてしまいます。

実行結果

Python AnyTrading - 投資の強化学習環境を試す② 学習編

FXのトレードを学習させて、利益総額が増えるかどうかを確認します。

インストール

Windowsで実行する場合、Microsoft MPIをインストールする必要があります。

Microsoft MPI - https://www.microsoft.com/en-us/download/details.aspx?id=100593

さらに下記のコマンドを実行し学習のための環境を準備します。

1
2
3
4
pip install stable-baselines[mpi]
pip install tensorflow==1.14.0
pip install pyqt5
pip install imageio

学習投資

FXトレードを学習してから投資を行う処理を実装します。

[コード]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import gym
import gym_anytrading
import matplotlib.pyplot as plt
import os
from gym_anytrading.envs import TradingEnv, ForexEnv, StocksEnv, Actions, Positions
from gym_anytrading.datasets import FOREX_EURUSD_1H_ASK, STOCKS_GOOGL
from stable_baselines.common import set_global_seeds

from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2
from stable_baselines.bench import Monitor

# ログフォルダの作成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# 環境を生成(frame_boundはデータセット内の訓練範囲を開始行数と終了行数で指定)
env = gym.make('forex-v0', frame_bound=(50, 100), window_size=10)
env = Monitor(env, log_dir, allow_early_resets=True)

# シードの指定
env.seed(0)
set_global_seeds(0)

# ベクトル化環境の生成
env = DummyVecEnv([lambda: env])

# モデルの生成
model = PPO2('MlpPolicy', env, verbose=1)

# モデルの読み込み(学習済みデータがある場合)
# model = PPO2.load('trading_model')

# モデルの学習
model.learn(total_timesteps=128000)

# モデルの保存
model.save('trading_model')

# モデルのテスト
env = gym.make('forex-v0', frame_bound=(50, 100), window_size=10)
env.seed(0)
state = env.reset()
while True:
# 行動の取得
action, _ = model.predict(state)
# 1ステップ実行
state, reward, done, info = env.step(action)
# エピソード完了
if done:
print('info:', info)
break

# グラフのプロット
plt.cla()
env.render_all()
plt.show()

実行結果は下記のとおりです。

[実行結果]

info: {'total_reward': 76.70000000000952, 'total_profit': 0.993213633946179, 'position': 1}

累計報酬(total_reward)、純利益(total_profit)、ポジション(position:0がショート、1がロング)が表示されます。

ランダムの時と比べてあまり結果がよくなりませんでした・・・というよりむしろ成績が落ちていて、もう少し理解を深める必要がありそうです。

グラフに表示される結果は以下の通りです。

結果

赤●が「0:Sell」で落ちる予想、緑●が「1:Buy」で上がる予想を表します。