カスタムGym環境作成(27) - 広げたマップを強化学習で攻略(学習率を深ぼり2回目)

今回も、学習率を調整して結果がどう変わるか見ていきたいと思います。

[広くしたマップイメージ]

学習率の微調整2回目

前回の結果より、学習率1.5付近の結果も比較良かったので、さらに学習率1.6から2.5の結果も確認してみます。

ソースの修正箇所は、26-35行目となります。

[ソース]

train7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 警告を非表示
import os
import warnings
warnings.simplefilter('ignore')
import tensorflow as tf
tf.get_logger().setLevel("ERROR")

import gym
from env7 import MyEnv

from stable_baselines.common.vec_env import DummyVecEnv
#from stable_baselines import PPO2
from stable_baselines import ACKTR
from stable_baselines.bench import Monitor

# ログフォルダの作成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# 環境の生成
env = MyEnv()
env = Monitor(env, log_dir, allow_early_resets=True)
env = DummyVecEnv([lambda: env])

# モデルの生成
model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.6)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.7)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.8)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.9)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=2.0)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=2.1)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=2.2)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=2.3)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=2.4)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=2.5)

# モデルの学習
model.learn(total_timesteps=128000)

# モデルの保存
model.save('model7')

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップの実行
state, reward, done, info = env.step(action)
total_reward += reward
print('reward:', reward, 'total_reward', total_reward)
print('-----------')

print('')
# エピソード完了
if done:
# 環境の描画
print('total_reward:', total_reward)
break

学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
1.6
1.7
1.8
1.9
2.0
2.1
2.2
2.3
2.4
2.5

学習率1.9と2.0は全エピソードの半分くらいゴールしているんですが、完全攻略はできていません。

もう一歩のような気もするんですが・・・😔

カスタムGym環境作成(26) - 広げたマップを強化学習で攻略(学習率を深ぼり)

今回は、学習率を微調整して結果がどう変わるか見ていきたいと思います。

[広くしたマップイメージ]

学習率の微調整

前回の結果より、学習率0.1~1.0付近でのゴール回数が多いように感じられました。

今回は学習率を0.5から1.5まで0.1ずつ増やしてその結果を確認していきたいと思います。

ソースの修正箇所は、26-36行目となります。

[ソース]

train7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 警告を非表示
import os
import warnings
warnings.simplefilter('ignore')
import tensorflow as tf
tf.get_logger().setLevel("ERROR")

import gym
from env7 import MyEnv

from stable_baselines.common.vec_env import DummyVecEnv
#from stable_baselines import PPO2
from stable_baselines import ACKTR
from stable_baselines.bench import Monitor

# ログフォルダの作成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# 環境の生成
env = MyEnv()
env = Monitor(env, log_dir, allow_early_resets=True)
env = DummyVecEnv([lambda: env])

# モデルの生成
model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.5)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.6)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.7)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.8)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.9)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.0)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.1)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.2)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.3)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.4)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.5)

# モデルの学習
model.learn(total_timesteps=128000)

# モデルの保存
model.save('model7')

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップの実行
state, reward, done, info = env.step(action)
total_reward += reward
print('reward:', reward, 'total_reward', total_reward)
print('-----------')

print('')
# エピソード完了
if done:
# 環境の描画
print('total_reward:', total_reward)
break

学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.5
0.6
0.7
0.8
0.9
1.0
1.1
1.2
1.3
1.4
1.5

ゴール回数は増えているように見えますが、今回も攻略するまでには至りませんでした。

次回ももう少し学習率を調整を行ってみたいと思います。

カスタムGym環境作成(25) - 広げたマップを強化学習で攻略(報酬の簡易化)

前回は、学習アルゴリズムを変更(PPO2アルゴリズム使用)しましたがぜんぜんゴールできない結果となってしまいました。

学習アルゴリズムはACKTRに戻します。

[広くしたマップイメージ]

いろいろ対策を行いまして、こんがらがってきてしまったので今回は一旦報酬をシンプルにしてみたいと思います。

報酬の簡易化

具体的には、ゴール時の報酬を500にし、それ以外の報酬を-1とします。

報酬をシンプルにし、とにかくゴールできるように学習をしてほしいという狙いです。

修正箇所は、カスタムGym環境の99-120行目となります。

[ソース]

env7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import sys

import gym
import numpy as np
import gym.spaces

class MyEnv(gym.Env):
FIELD_TYPES = [
'S', # 0: スタート
'G', # 1: ゴール
' ', # 2: 平地
'山', # 3: 山(歩けない)
'☆', # 4: プレイヤー
'三', # 5: 橋
'川', # 6: 川
'林', # 7: 林
]
MAP = np.array([
[0, 3, 3, 3, 2, 3, 7, 7, 3, 2, 2, 3],
[2, 2, 2, 2, 2, 2, 7, 3, 2, 2, 2, 2],
[3, 2, 3, 2, 3, 7, 7, 2, 2, 2, 2, 3],
[3, 2, 2, 2, 7, 7, 6, 6, 3, 3, 2, 2],
[7, 7, 3, 2, 6, 6, 6, 3, 7, 7, 3, 3],
[3, 2, 3, 2, 3, 7, 3, 1, 2, 7, 2, 2],
[3, 7, 2, 2, 3, 2, 3, 3, 3, 3, 2, 7],
[2, 2, 3, 2, 2, 3, 2, 3, 2, 3, 3, 2],
[7, 3, 2, 7, 7, 2, 2, 2, 7, 6, 6, 7],
[2, 3, 2, 6, 6, 6, 2, 3, 2, 7, 2, 2],
[3, 3, 2, 3, 2, 3, 6, 2, 3, 2, 2, 3],
[7, 2, 2, 2, 3, 3, 6, 2, 7, 3, 2, 2],
])
#MAX_STEPS = 2000
MAX_STEPS = 5000

def __init__(self):
super().__init__()
# action_space, observation_space, reward_range を設定する
self.action_space = gym.spaces.Discrete(4) # 上下左右
self.observation_space = gym.spaces.Box(
low=0,
high=len(self.FIELD_TYPES),
shape=self.MAP.shape
)
self.reset()

def reset(self):
# 諸々の変数を初期化する
self.pos = self._find_pos('S')[0]
self.goal = self._find_pos('G')[0]
self.river = self._find_pos('川')
self.trees = self._find_pos('林')
self.start = self._find_pos('S')[0]
self.done = False
self.steps = 0
return self._observe()

def step(self, action):
# 1ステップ進める処理を記述。戻り値は observation, reward, done(ゲーム終了したか), info(追加の情報の辞書)
# 左上の座標を(0, 0)とする
if action == 0: # 右移動
next_pos = self.pos + [0, 1]
elif action == 1: # 左移動
next_pos = self.pos + [0, -1]
elif action == 2: # 下移動
next_pos = self.pos + [1, 0]
elif action == 3: # 上移動
next_pos = self.pos + [-1, 0]

if self._is_movable(next_pos):
self.pos = next_pos
moved = True
else:
moved = False

self.steps += 1
observation = self._observe()
reward = self._get_reward(self.pos, moved)
self.done = self._is_done()
return observation, reward, self.done, {}

def render(self, mode='console', close=False):
for row in self._observe():
for elem in row:
print(self.FIELD_TYPES[elem], end='')
print()

def _close(self):
pass

def _seed(self, seed=None):
pass

def _get_reward(self, pos, moved):
# 報酬を返す。
# - ゴールにたどり着くと 3000 ポイント
# - 川に入ったら -10 ポイント
# - 林に入ったら -3 ポイント
# - 1ステップごとに-1ポイント(できるだけ短いステップでゴールにたどり着きたい)
'''
if (self.start == pos).all(): # スタート位置に戻ってきたときのマイナス報酬
return -10
if moved:
if (self.goal == pos).all():
#return 3000
return 15000
for x in self.river:
if (x == pos).all():
return -10
for x in self.trees:
if (x == pos).all():
return -3
return -1
#return -3
else: # エージェントが動かなかった場合
return -10
'''
if (self.goal == pos).all():
return 500
else:
return -1

def _is_movable(self, pos):
# マップの中にいるか、歩けない場所にいないか
return (
0 <= pos[0] < self.MAP.shape[0]
and 0 <= pos[1] < self.MAP.shape[1]
and self.FIELD_TYPES[self.MAP[tuple(pos)]] != '山'
)

def _observe(self):
# マップにプレイヤーの位置を重ねて返す
observation = self.MAP.copy()
observation[tuple(self.pos)] = self.FIELD_TYPES.index('☆')
return observation

def _is_done(self):
# 最大で self.MAX_STEPS まで
if (self.pos == self.goal).all():
return True
elif self.steps > self.MAX_STEPS:
return True
else:
return False

def _find_pos(self, field_type):
return np.array(list(zip(*np.where(self.MAP == self.FIELD_TYPES.index(field_type)))))

学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.01
0.05
0.1
0.5
1.0

学習完了(マップ攻略)とはいきませんでしたが、学習率0.1~1.0付近でのゴール回数が多いようです。

次回は、この学習率付近をもう少し深ぼりして調査したいと思います。

カスタムGym環境作成(23) - 広げたマップを強化学習で攻略(学習アルゴリム変更)

前回は、これまでの改善点をまとめましたが、まだ学習アルゴリズムの変更を試していないことに気づきました。

[広くしたマップイメージ]

というわけで今回は別の学習アルゴリズムを試してみます。

学習アルゴリズム変更

いままでは学習アルゴリズムとしてACKTRを使っていましたがPPO2に変更します。

修正箇所は12-13行目26-30行目となります。

[ソース]

train7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 警告を非表示
import os
import warnings
warnings.simplefilter('ignore')
import tensorflow as tf
tf.get_logger().setLevel("ERROR")

import gym
from env7 import MyEnv

from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2
#from stable_baselines import ACKTR
from stable_baselines.bench import Monitor

# ログフォルダの作成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# 環境の生成
env = MyEnv()
env = Monitor(env, log_dir, allow_early_resets=True)
env = DummyVecEnv([lambda: env])

# モデルの生成
model = PPO2('MlpPolicy', env, verbose=1, learning_rate=0.01)
#model = PPO2('MlpPolicy', env, verbose=1, learning_rate=0.05)
#model = PPO2('MlpPolicy', env, verbose=1, learning_rate=0.1)
#model = PPO2('MlpPolicy', env, verbose=1, learning_rate=0.5)
#model = PPO2('MlpPolicy', env, verbose=1, learning_rate=1.0)

# モデルの学習
model.learn(total_timesteps=128000)

# モデルの保存
model.save('model7')

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップの実行
state, reward, done, info = env.step(action)
total_reward += reward
print('reward:', reward, 'total_reward', total_reward)
print('-----------')

print('')
# エピソード完了
if done:
# 環境の描画
print('total_reward:', total_reward)
break

学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.01
0.05
0.1
0.5
1.0

ほとんどゴール地点から動かず、全くゴールまでたどり着いていません。

このカスタム環境にはPPO2アルゴリズムは向いていないようです。

一旦学習アルゴリズムはACKTRに戻して、次回は別の改善を行いたいと思います。

カスタムGym環境作成(22) - 広げたマップを強化学習で攻略(改善点まとめ)

これまで広げたマップに対していろいろ改善を行ってきましたが、まだ完全攻略するに至っていません。

[広くしたマップイメージ]

今後の戦略を練るために、一旦これまでの改善内容をまとめたいと思います。

改善点まとめ

  1. エージェントが動かない場合のマイナス報酬を増やす。
    エージェントが動かないステップが多かったので、エージェントが動かない場合のマイナス報酬を増やしてみました。(-1 ⇒ -10)

  2. スタート地点に戻ったときの報酬を設定。
    エージェントがスタート地点に戻った時の報酬を設定していなかったので、-10というマイナス報酬を設定しました。

  3. ゴール時の報酬をアップ。
    最終報酬がプラスになることがほとんどなかったので、ゴール時の報酬をアップさせました。(3000 ⇒ 15000)

  4. 学習回数を増やす。
    回数は少なめですがゴールすることあったので、もう少し頑張れば攻略できるのではないかと思い学習回数を5倍に増やしてみました。(128000 ⇒ 128000×5)

次回は別の改善を行い完全攻略を目指します。

カスタムGym環境作成(21) - 広げたマップを強化学習で攻略(学習回数を増やす)

前回、広げたマップに対してゴールしたときの報酬を増やみましたが、ほとんどゴールにたどり着くことができませんでした。

[広くしたマップイメージ]

今回は単純に学習回数を増やして、攻略を目指します。

学習回数を増やす

これまでの改善で、(たまたまかもしれませんが)何回かはゴールすることもあるようなのでもう少し学習回数を増やせば攻略できるのではないか・・・と考えました。

そこで今回は単純に学習回数をこれまでの5倍に増やしてみます。

修正箇所は32行目となります。

[ソース]

train7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 警告を非表示
import os
import warnings
warnings.simplefilter('ignore')
import tensorflow as tf
tf.get_logger().setLevel("ERROR")

import gym
from env7 import MyEnv

from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import ACKTR
from stable_baselines.bench import Monitor

# ログフォルダの作成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# 環境の生成
env = MyEnv()
env = Monitor(env, log_dir, allow_early_resets=True)
env = DummyVecEnv([lambda: env])

# モデルの生成
model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.01)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.05)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.1)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.5)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.0)

# モデルの学習
model.learn(total_timesteps=128000*5)

# モデルの保存
model.save('model7')

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップの実行
state, reward, done, info = env.step(action)
total_reward += reward
print('reward:', reward, 'total_reward', total_reward)
print('-----------')

print('')
# エピソード完了
if done:
# 環境の描画
print('total_reward:', total_reward)
break

前回と同じように学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.01
0.05
0.1
0.5
1.0

どの学習率でも同じような結果となりました。

ゴールまでたどり着く(最終報酬がプラスになる)ことは何度もあるのですが、その状態が続くことはありません。

つまりきちんと学習できていません。

行動報酬や学習回数を調整してきましたが、もしかしたら別の対策(切り口)が必要なのかもしれません。

次回は一旦これまでの改善点をまとめたいと思います。

カスタムGym環境作成(20) - 広げたマップを強化学習で攻略(報酬を改善3)

前回、広げたマップに対してエージェントがスタート位置に移動したときのマイナス報酬を増やしてみましたが、ゴールにたどり着くことができませんでした。

[広くしたマップイメージ]

今回は再度報酬を見直して、攻略を目指します。

最終報酬がプラスにならない問題の対策

これまでの結果を見るとプラス報酬になることがほとんどありませんでした。

そこで今回はゴール時の報酬を見直してみました。

具体的にはゴール時の報酬を3000から15000に変更してみました。

理由は、最終報酬が-20000となるケースが一番悪い結果のようなので、それ以前にゴールにたどり着けば最終報酬がプラスになるのではないかと考えたからです。

修正箇所は103~104行目となります。

[ソース]

env7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# activate openai_gym
import sys

import gym
import numpy as np
import gym.spaces

class MyEnv(gym.Env):
FIELD_TYPES = [
'S', # 0: スタート
'G', # 1: ゴール
' ', # 2: 平地
'山', # 3: 山(歩けない)
'☆', # 4: プレイヤー
'三', # 5: 橋
'川', # 6: 川
'林', # 7: 林
]
MAP = np.array([
[0, 3, 3, 3, 2, 3, 7, 7, 3, 2, 2, 3],
[2, 2, 2, 2, 2, 2, 7, 3, 2, 2, 2, 2],
[3, 2, 3, 2, 3, 7, 7, 2, 2, 2, 2, 3],
[3, 2, 2, 2, 7, 7, 6, 6, 3, 3, 2, 2],
[7, 7, 3, 2, 6, 6, 6, 3, 7, 7, 3, 3],
[3, 2, 3, 2, 3, 7, 3, 1, 2, 7, 2, 2],
[3, 7, 2, 2, 3, 2, 3, 3, 3, 3, 2, 7],
[2, 2, 3, 2, 2, 3, 2, 3, 2, 3, 3, 2],
[7, 3, 2, 7, 7, 2, 2, 2, 7, 6, 6, 7],
[2, 3, 2, 6, 6, 6, 2, 3, 2, 7, 2, 2],
[3, 3, 2, 3, 2, 3, 6, 2, 3, 2, 2, 3],
[7, 2, 2, 2, 3, 3, 6, 2, 7, 3, 2, 2],
])
MAX_STEPS = 2000

def __init__(self):
super().__init__()
# action_space, observation_space, reward_range を設定する
self.action_space = gym.spaces.Discrete(4) # 上下左右
self.observation_space = gym.spaces.Box(
low=0,
high=len(self.FIELD_TYPES),
shape=self.MAP.shape
)
self.reset()

def reset(self):
# 諸々の変数を初期化する
self.pos = self._find_pos('S')[0]
self.goal = self._find_pos('G')[0]
self.river = self._find_pos('川')
self.trees = self._find_pos('林')
self.start = self._find_pos('S')[0]
self.done = False
self.steps = 0
return self._observe()

def step(self, action):
# 1ステップ進める処理を記述。戻り値は observation, reward, done(ゲーム終了したか), info(追加の情報の辞書)
# 左上の座標を(0, 0)とする
if action == 0: # 右移動
next_pos = self.pos + [0, 1]
elif action == 1: # 左移動
next_pos = self.pos + [0, -1]
elif action == 2: # 下移動
next_pos = self.pos + [1, 0]
elif action == 3: # 上移動
next_pos = self.pos + [-1, 0]

if self._is_movable(next_pos):
self.pos = next_pos
moved = True
else:
moved = False

self.steps += 1
observation = self._observe()
reward = self._get_reward(self.pos, moved)
self.done = self._is_done()
return observation, reward, self.done, {}

def render(self, mode='console', close=False):
for row in self._observe():
for elem in row:
print(self.FIELD_TYPES[elem], end='')
print()

def _close(self):
pass

def _seed(self, seed=None):
pass

def _get_reward(self, pos, moved):
# 報酬を返す。
# - ゴールにたどり着くと 3000 ポイント
# - 川に入ったら -10 ポイント
# - 林に入ったら -3 ポイント
# - 1ステップごとに-1ポイント(できるだけ短いステップでゴールにたどり着きたい)
if (self.start == pos).all(): # スタート位置に戻ってきたときのマイナス報酬
return -10
if moved:
if (self.goal == pos).all():
#return 3000
return 15000
for x in self.river:
if (x == pos).all():
return -10
for x in self.trees:
if (x == pos).all():
return -3
return -1
else: # エージェントが動かなかった場合
return -10

def _is_movable(self, pos):
# マップの中にいるか、歩けない場所にいないか
return (
0 <= pos[0] < self.MAP.shape[0]
and 0 <= pos[1] < self.MAP.shape[1]
and self.FIELD_TYPES[self.MAP[tuple(pos)]] != '山'
)

def _observe(self):
# マップにプレイヤーの位置を重ねて返す
observation = self.MAP.copy()
observation[tuple(self.pos)] = self.FIELD_TYPES.index('☆')
return observation

def _is_done(self):
# 最大で self.MAX_STEPS まで
if (self.pos == self.goal).all():
return True
elif self.steps > self.MAX_STEPS:
return True
else:
return False

def _find_pos(self, field_type):
return np.array(list(zip(*np.where(self.MAP == self.FIELD_TYPES.index(field_type)))))

報酬を修正したこのカスタム環境に対して、前回と同じように学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

各実行結果をまとめると下記のようになりました。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.01
0.05
0.1
0.5
1.0

ゴールしたときの報酬を増やしたおかげで、ゴールしたときのエピソードがどこかは分かりやすくなったのですが、どの学習率でもたまにラッキーゴールをした感じで、マップを攻略した状態にはなっていません。

マップを攻略したモデルであれば、つねにプラス報酬であり続けるはずですので。。。

次回は、学習ステップ数を増やしてマップを攻略できるかどうか試したいと思います。

カスタムGym環境作成(19) - 広げたマップを強化学習で攻略(報酬を改善2)

前回、広げたマップに対してエージェントが移動しない場合のマイナス報酬を修正して攻略を試みましたが、ゴールにたどり着くことができませんでした。

[広くしたマップイメージ]

今回は再度報酬を見直して、攻略を目指します。

ゴール近くから動かないことの対策

前回の結果を見ていてまたまた問題だと思ったのが、スタート地点付近からエージェントが全く動かないことが多いということです。

そのため前回、移動しない場合のマイナス報酬をよりマイナスにしたのですが、それでもスタート位置付近にいるケースが多々ありました。

という訳で今回はスタート地点に移動したときの報酬を-10としてみました。

修正箇所は52行目、99~100行目となります。

[ソース]

env7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# activate openai_gym
import sys

import gym
import numpy as np
import gym.spaces

class MyEnv(gym.Env):
FIELD_TYPES = [
'S', # 0: スタート
'G', # 1: ゴール
' ', # 2: 平地
'山', # 3: 山(歩けない)
'☆', # 4: プレイヤー
'三', # 5: 橋
'川', # 6: 川
'林', # 7: 林
]
MAP = np.array([
[0, 3, 3, 3, 2, 3, 7, 7, 3, 2, 2, 3],
[2, 2, 2, 2, 2, 2, 7, 3, 2, 2, 2, 2],
[3, 2, 3, 2, 3, 7, 7, 2, 2, 2, 2, 3],
[3, 2, 2, 2, 7, 7, 6, 6, 3, 3, 2, 2],
[7, 7, 3, 2, 6, 6, 6, 3, 7, 7, 3, 3],
[3, 2, 3, 2, 3, 7, 3, 1, 2, 7, 2, 2],
[3, 7, 2, 2, 3, 2, 3, 3, 3, 3, 2, 7],
[2, 2, 3, 2, 2, 3, 2, 3, 2, 3, 3, 2],
[7, 3, 2, 7, 7, 2, 2, 2, 7, 6, 6, 7],
[2, 3, 2, 6, 6, 6, 2, 3, 2, 7, 2, 2],
[3, 3, 2, 3, 2, 3, 6, 2, 3, 2, 2, 3],
[7, 2, 2, 2, 3, 3, 6, 2, 7, 3, 2, 2],
])
MAX_STEPS = 2000

def __init__(self):
super().__init__()
# action_space, observation_space, reward_range を設定する
self.action_space = gym.spaces.Discrete(4) # 上下左右
self.observation_space = gym.spaces.Box(
low=0,
high=len(self.FIELD_TYPES),
shape=self.MAP.shape
)
self.reset()

def reset(self):
# 諸々の変数を初期化する
self.pos = self._find_pos('S')[0]
self.goal = self._find_pos('G')[0]
self.river = self._find_pos('川')
self.trees = self._find_pos('林')
self.start = self._find_pos('S')[0]
self.done = False
self.steps = 0
return self._observe()

def step(self, action):
# 1ステップ進める処理を記述。戻り値は observation, reward, done(ゲーム終了したか), info(追加の情報の辞書)
# 左上の座標を(0, 0)とする
if action == 0: # 右移動
next_pos = self.pos + [0, 1]
elif action == 1: # 左移動
next_pos = self.pos + [0, -1]
elif action == 2: # 下移動
next_pos = self.pos + [1, 0]
elif action == 3: # 上移動
next_pos = self.pos + [-1, 0]

if self._is_movable(next_pos):
self.pos = next_pos
moved = True
else:
moved = False

self.steps += 1
observation = self._observe()
reward = self._get_reward(self.pos, moved)
self.done = self._is_done()
return observation, reward, self.done, {}

def render(self, mode='console', close=False):
for row in self._observe():
for elem in row:
print(self.FIELD_TYPES[elem], end='')
print()

def _close(self):
pass

def _seed(self, seed=None):
pass

def _get_reward(self, pos, moved):
# 報酬を返す。
# - ゴールにたどり着くと 3000 ポイント
# - 川に入ったら -10 ポイント
# - 林に入ったら -3 ポイント
# - 1ステップごとに-1ポイント(できるだけ短いステップでゴールにたどり着きたい)
if (self.start == pos).all(): # スタート位置に戻ってきたときのマイナス報酬
return -10
if moved:
if (self.goal == pos).all():
return 3000
for x in self.river:
if (x == pos).all():
return -10
for x in self.trees:
if (x == pos).all():
return -3
return -1
else: # エージェントが動かなかった場合
return -10

def _is_movable(self, pos):
# マップの中にいるか、歩けない場所にいないか
return (
0 <= pos[0] < self.MAP.shape[0]
and 0 <= pos[1] < self.MAP.shape[1]
and self.FIELD_TYPES[self.MAP[tuple(pos)]] != '山'
)

def _observe(self):
# マップにプレイヤーの位置を重ねて返す
observation = self.MAP.copy()
observation[tuple(self.pos)] = self.FIELD_TYPES.index('☆')
return observation

def _is_done(self):
# 最大で self.MAX_STEPS まで
if (self.pos == self.goal).all():
return True
elif self.steps > self.MAX_STEPS:
return True
else:
return False

def _find_pos(self, field_type):
return np.array(list(zip(*np.where(self.MAP == self.FIELD_TYPES.index(field_type)))))

報酬を修正したこのカスタム環境に対して、前回と同じように学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

各実行結果をまとめると下記のようになりました。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.01
0.05
0.1
0.5
1.0
5.0
10.0
50.0

学習率が1.0以下の場合は、スタート地点から移動したまにゴールすることもありますが、学習が完了することはありませんでした。

学習率が5.0以上の場合は、移動をあきらめたのかスタート地点付近にとどまり続けてしまっています。

次回はまた報酬を見直しゴールを目指します。

(また学習率を5.0以上に設定した場合ですが、結果があまりにも悪いのでもう検証しないことにします)

カスタムGym環境作成(18) - 広げたマップを強化学習で攻略(報酬を改善1)

前回、広げたマップに対していろいろな学習率で攻略を試みましたが、ゴールにたどり着くことができませんでした。

[広くしたマップイメージ]

今回はカスタム環境の報酬を改善して、攻略を目指します。

報酬を改善

前回の結果を見ていて一番問題だと思ったのが、スタート地点付近からエージェントが全く動かないことでした。

これを改善するために、エージェントが動かない場合のマイナス報酬を増やして(-1 ⇒ -10)みたいと思います。

修正箇所は108~109行目となります。

[ソース]

env7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# activate openai_gym
import sys

import gym
import numpy as np
import gym.spaces

class MyEnv(gym.Env):
FIELD_TYPES = [
'S', # 0: スタート
'G', # 1: ゴール
' ', # 2: 平地
'山', # 3: 山(歩けない)
'☆', # 4: プレイヤー
'三', # 5: 橋
'川', # 6: 川
'林', # 7: 林
]
MAP = np.array([
[0, 3, 3, 3, 2, 3, 7, 7, 3, 2, 2, 3],
[2, 2, 2, 2, 2, 2, 7, 3, 2, 2, 2, 2],
[3, 2, 3, 2, 3, 7, 7, 2, 2, 2, 2, 3],
[3, 2, 2, 2, 7, 7, 6, 6, 3, 3, 2, 2],
[7, 7, 3, 2, 6, 6, 6, 3, 7, 7, 3, 3],
[3, 2, 3, 2, 3, 7, 3, 1, 2, 7, 2, 2],
[3, 7, 2, 2, 3, 2, 3, 3, 3, 3, 2, 7],
[2, 2, 3, 2, 2, 3, 2, 3, 2, 3, 3, 2],
[7, 3, 2, 7, 7, 2, 2, 2, 7, 6, 6, 7],
[2, 3, 2, 6, 6, 6, 2, 3, 2, 7, 2, 2],
[3, 3, 2, 3, 2, 3, 6, 2, 3, 2, 2, 3],
[7, 2, 2, 2, 3, 3, 6, 2, 7, 3, 2, 2],
])
MAX_STEPS = 2000

def __init__(self):
super().__init__()
# action_space, observation_space, reward_range を設定する
self.action_space = gym.spaces.Discrete(4) # 上下左右
self.observation_space = gym.spaces.Box(
low=0,
high=len(self.FIELD_TYPES),
shape=self.MAP.shape
)
self.reset()

def reset(self):
# 諸々の変数を初期化する
self.pos = self._find_pos('S')[0]
self.goal = self._find_pos('G')[0]
self.river = self._find_pos('川')
self.trees = self._find_pos('林')
self.done = False
self.steps = 0
return self._observe()

def step(self, action):
# 1ステップ進める処理を記述。戻り値は observation, reward, done(ゲーム終了したか), info(追加の情報の辞書)
# 左上の座標を(0, 0)とする
if action == 0: # 右移動
next_pos = self.pos + [0, 1]
elif action == 1: # 左移動
next_pos = self.pos + [0, -1]
elif action == 2: # 下移動
next_pos = self.pos + [1, 0]
elif action == 3: # 上移動
next_pos = self.pos + [-1, 0]

if self._is_movable(next_pos):
self.pos = next_pos
moved = True
else:
moved = False

self.steps += 1
observation = self._observe()
reward = self._get_reward(self.pos, moved)
self.done = self._is_done()
return observation, reward, self.done, {}

def render(self, mode='console', close=False):
for row in self._observe():
for elem in row:
print(self.FIELD_TYPES[elem], end='')
print()

def _close(self):
pass

def _seed(self, seed=None):
pass

def _get_reward(self, pos, moved):
# 報酬を返す。
# - ゴールにたどり着くと 3000 ポイント
# - 川に入ったら -10 ポイント
# - 林に入ったら -3 ポイント
# - 1ステップごとに-1ポイント(できるだけ短いステップでゴールにたどり着きたい)
if moved:
if (self.goal == pos).all():
return 3000
for x in self.river:
if (x == pos).all():
return -10
for x in self.trees:
if (x == pos).all():
return -3
return -1
else: # エージェントが動かなかった場合
return -10

def _is_movable(self, pos):
# マップの中にいるか、歩けない場所にいないか
return (
0 <= pos[0] < self.MAP.shape[0]
and 0 <= pos[1] < self.MAP.shape[1]
and self.FIELD_TYPES[self.MAP[tuple(pos)]] != '山'
)

def _observe(self):
# マップにプレイヤーの位置を重ねて返す
observation = self.MAP.copy()
observation[tuple(self.pos)] = self.FIELD_TYPES.index('☆')
return observation

def _is_done(self):
# 最大で self.MAX_STEPS まで
if (self.pos == self.goal).all():
return True
elif self.steps > self.MAX_STEPS:
return True
else:
return False

def _find_pos(self, field_type):
return np.array(list(zip(*np.where(self.MAP == self.FIELD_TYPES.index(field_type)))))

報酬を修正したこのカスタム環境に対して、前回と同じように学習率を変更しながら実行し、それぞれの最終結果と平均報酬遷移(グラフ)を確認します。

各実行結果をまとめると下記のようになりました。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.01
0.05
0.1
0.5
1.0
5.0
10.0
50.0

学習率0.05の最終結果をみるとたまたまゴールしているようです。

その他の学習率の平均報酬遷移グラフをみるとたまにゴールすることはあるようですが、きちんと学習して常にゴールをするという段階には達していません。

次回はまた別の報酬を変更して、広げたマップの攻略を目指します。

カスタムGym環境作成(17) - 広げたマップを強化学習で攻略

今回は、広げたマップを強化学習で攻略していきます。

[広くしたマップイメージ]

広げたマップマップを強化学習

前々回に実装したカスタムGym環境(env7.py)を9行目で読み込み、強化学習を行います。

学習アルゴリズムはACKTR(25~32行目)で、学習率は0.01から50.0に段階的に変更して結果がどのように変わるか確認していきます。

学習ステップ数は128000(35行目)としています。

[ソース]

train7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 警告を非表示
import os
import warnings
warnings.simplefilter('ignore')
import tensorflow as tf
tf.get_logger().setLevel("ERROR")

import gym
from env7 import MyEnv

from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import ACKTR
from stable_baselines.bench import Monitor

# ログフォルダの作成
log_dir = './logs/'
os.makedirs(log_dir, exist_ok=True)

# 環境の生成
env = MyEnv()
env = Monitor(env, log_dir, allow_early_resets=True)
env = DummyVecEnv([lambda: env])

# モデルの生成
model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.01)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.05)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.1)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=0.5)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=1.0)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=5.0)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=10.0)
#model = ACKTR('MlpPolicy', env, verbose=1, learning_rate=50.0)

# モデルの学習
model.learn(total_timesteps=128000)

# モデルの保存
model.save('model7')

# モデルのテスト
state = env.reset()
total_reward = 0
while True:
# 環境の描画
env.render()

# モデルの推論
action, _ = model.predict(state)

# 1ステップの実行
state, reward, done, info = env.step(action)
total_reward += reward
print('reward:', reward, 'total_reward', total_reward)
print('-----------')

print('')
# エピソード完了
if done:
# 環境の描画
print('total_reward:', total_reward)
break

各実行結果をまとめると下記のようになりました。

[結果]

学習率 最終位置・最終報酬 平均報酬遷移
0.01
0.05
0.1
0.5
1.0
5.0
10.0
50.0

全然ゴールできていません。

ランダムで実行していたときは、半分ほどゴールまでたどり着いていたのでランダム実行よりはるかに劣る結果ということになります😥

スタート地点付近からほとんど動かないことが多く、学習率の調整以前の問題だと考えられます。

ということで次回はカスタム環境の報酬設定を見直していきたいと思います。