カートポールをDQN(Deep Q-network)で解いていきます。
カートポールは棒を倒さないようにバランスをとるゲームです。
今回の強化学習のポイントは下記です。
目的は棒を倒さないようにバランスをとること。
1エピソードは棒を倒すまで。
状態は「カートの位置」「カートの速度」「棒の角度」「棒の角速度」の4種類。
行動は「カートの左移動」「カートの右移動」の2種類。
報酬はエピソード完了時に190ステップ以上で+1.
パラメータ更新間隔は行動1回ごと。
DQNでは行動価値関数を表形式ではなく、ニューラルネットワークで表現します。 またそれ以外にもDQNでは次の4つの工夫がされています。
Experience Replayd(経験を貯めておきあとでランダムに学習する)
Fixed Target Q-Network(更新計算のためにニューラルネットワークをもう1つ作る)
Reward Clipping(報酬のスケールを-1,0,1に統一する)
Huber Loss(誤差が大きい場合でも安定している関数huberを使う)
まずはGoogle Colabaratoryで動作確認するために環境を構築します。 tensorflowのバージョンは1.13.1が必須のため、次のコマンドを実行します。
1 2 !pip uninstall tensorflow -y !pip install tensorflow==1.13.1
カートポールの様子をアニメーションで確認するために、ディスプレイの設定を行います。
1 2 3 4 5 6 7 8 9 10 11 12 # ディスプレイ設定のインストール !apt-get -qq -y install xvfb freeglut3-dev ffmpeg> /dev/null !pip install pyglet==1.3.2 !pip install pyopengl !pip install pyvirtualdisplay # ディスプレイ設定の適用 from pyvirtualdisplay import Display import os disp = Display(visible=0, size=(1024, 768)) disp.start() os.environ['DISPLAY'] = ':' + str(disp.display) + '.' + str(disp.screen)
ここでいったんランタイムの再接続が必要になります。
次に必要なパッケージをインポートします。
1 2 3 4 5 6 7 8 import gymimport numpy as npfrom keras.models import Sequentialfrom keras.layers import Densefrom keras.optimizers import Adamfrom collections import dequefrom tensorflow.losses import huber_loss
パラメータを準備します。パラメータの内容はコメントを参照して下さい。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 NUM_EPISODES = 500 MAX_STEPS = 200 GAMMA = 0.99 WARMUP = 10 E_START = 1.0 E_STOP = 0.01 E_DECAY_RATE = 0.001 MEMORY_SIZE = 10000 BATCH_SIZE = 32
行動評価関数となるニューラルネットワークモデルを作成します。 全結合層を4つ重ねたモデルで、入力は状態数、出力数は行動数になります。
1 2 3 4 5 6 7 8 9 10 11 12 13 class QNetwork : def __init__ (self, state_size, action_size ): self.model = Sequential() self.model.add(Dense(16 , activation='relu' , input_dim=state_size)) self.model.add(Dense(16 , activation='relu' )) self.model.add(Dense(16 , activation='relu' )) self.model.add(Dense(action_size, activation='linear' )) self.model.compile (loss=huber_loss, optimizer=Adam(lr=0.001 ))
経験メモリを定義します。メモリの内容は「状態」「行動」「報酬」「次の状態」の4種類となります。 dequeはmaxlen以上の要素を追加すると自動的に先頭の要素が削除される関数です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Memory (): def __init__ (self, memory_size ): self.buffer = deque(maxlen=memory_size) def add (self, experience ): self.buffer.append(experience) def sample (self, batch_size ): idx = np.random.choice(np.arange(len (self.buffer)), size=batch_size, replace=False ) return [self.buffer[i] for i in idx] def __len__ (self ): return len (self.buffer)
環境を作成します。OpenAI Gymのgym.make()を使用します。 さらに、定義したQNetworkクラスとMemoryクラスを利用して、2つのニューラルネットワークと経験メモリを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 env = gym.make('CartPole-v0' ) state_size = env.observation_space.shape[0 ] action_size = env.action_space.n main_qn = QNetwork(state_size, action_size) target_qn = QNetwork(state_size, action_size) memory = Memory(MEMORY_SIZE)
学習を開始します。
環境をリセットする。
定義したエピソード数だけエピソードを繰り返す。
target-networkを更新する。
1エピソード分、ゲーム終了まで処理を実行する。
εを減らす。
ランダムまたは行動価値関数に従って、行動を取得する。
行動に応じて状態を報酬を得る。
エピソード完了時に190ステップ以上で報酬+1、成功回数に1加算する。
エピソード完了でないとき報酬に0を指定し、経験メモリに経験を追加する。
行動価値を更新する。 1. ニューラルネットワークの入力と出力を準備する。 2. バッチサイズ分の経験をランダムに取得する。 3. ニューラルネットワークの入力と出力を生成する。 4. 行動価値を更新する。
エピソード完了時のログを表示する。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 state = env.reset() state = np.reshape(state, [1 , state_size]) total_step = 0 success_count = 0 for episode in range (1 , NUM_EPISODES+1 ): step = 0 target_qn.model.set_weights(main_qn.model.get_weights()) for _ in range (1 , MAX_STEPS+1 ): step += 1 total_step += 1 epsilon = E_STOP + (E_START - E_STOP)*np.exp(-E_DECAY_RATE*total_step) if epsilon > np.random.rand(): action = env.action_space.sample() else : action = np.argmax(main_qn.model.predict(state)[0 ]) next_state, _, done, _ = env.step(action) next_state = np.reshape(next_state, [1 , state_size]) if done: if step >= 190 : success_count += 1 reward = 1 else : success_count = 0 reward = 0 next_state = np.zeros(state.shape) if step > WARMUP: memory.add((state, action, reward, next_state)) else : reward = 0 if step > WARMUP: memory.add((state, action, reward, next_state)) state = next_state if len (memory) >= BATCH_SIZE: inputs = np.zeros((BATCH_SIZE, 4 )) targets = np.zeros((BATCH_SIZE, 2 )) minibatch = memory.sample(BATCH_SIZE) for i, (state_b, action_b, reward_b, next_state_b) in enumerate (minibatch): inputs[i] = state_b if not (next_state_b == np.zeros(state_b.shape)).all (axis=1 ): target = reward_b + GAMMA * np.amax(target_qn.model.predict(next_state_b)[0 ]) else : target = reward_b targets[i] = main_qn.model.predict(state_b) targets[i][action_b] = target main_qn.model.fit(inputs, targets, epochs=1 , verbose=0 ) if done: break print ('エピソード: {}, ステップ数: {}, epsilon: {:.4f}' .format (episode, step, epsilon)) if success_count >= 5 : break state = env.reset() state = np.reshape(state, [1 , state_size])
最大ステップ数の200に少しずつ近づいていきます。
アニメーションフレームを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 frames = [] state = env.reset() state = np.reshape(state, [1 , state_size]) step = 0 for step in range (1 , MAX_STEPS+1 ): step += 1 frames.append(env.render(mode='rgb_array' )) action = np.argmax(main_qn.model.predict(state)[0 ]) next_state, reward, done, _ = env.step(action) next_state = np.reshape(next_state, [1 , state_size]) if done: next_state = np.zeros(state.shape) break else : state = next_state print ('ステップ数: {}' .format (step))
アニメーションフレームをアニメーションに変換して最終動作を確認します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 !pip install JSAnimation import matplotlib.pyplot as pltfrom matplotlib import animationfrom JSAnimation.IPython_display import display_animationfrom IPython.display import displaydef display_frames_as_gif (frames ): plt.figure(figsize=(frames[0 ].shape[1 ]/72.0 , frames[0 ].shape[0 ]/72.0 ), dpi=72 ) patch = plt.imshow(frames[0 ]) plt.axis('off' ) def animate (i ): patch.set_data(frames[i]) anim = animation.FuncAnimation(plt.gcf(), animate, frames=len (frames), interval=50 ) display(display_animation(anim, default_mode='loop' )) display_frames_as_gif(frames)
最終的にうまくポールのバランスととることができています。
VIDEO
(Google Colaboratory で動作確認しています。)
参考
AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ