PyTorchで手書き数字画像の分類課題MNIST

PyTorchはディープラーニング用パッケージです。
PyTorchを使用して手書き数字の画像データ(MNIST)を分類するディープラーニングを実装します。

まずは手書き数字の画像データMNISTをダウンロードします。
変数mnistにデータが格納されます。

1
2
3
# 手書き数字の画像データMNISTをダウンロード
from sklearn.datasets import fetch_openml
mnist = fetch_openml('mnist_784', version=1, data_home=".") # data_homeは保存先を指定します

PyTorchによるディープラーニングは下記の手順で行います。

  1. データの前処理
  2. DataLoaderの作成
  3. ネットワークの構築
  4. 誤差関数と最適化手法の設定
  5. 学習と推論の設定
  6. 学習と推論の実行

データの前処理では、データをニューラルネットワークに投入できるように加工します。

1
2
3
4
5
6
7
# 1. データの前処理(画像データとラベルに分割し、正規化)
X = mnist.data / 255 # 0-255を0-1に正規化
y = mnist.target

import numpy as np
y = np.array(y)
y = y.astype(np.int32)

MNISTのデータの1つ目を可視化してみます。

1
2
3
4
5
6
# MNISTのデータの1つ目を可視化する
import matplotlib.pyplot as plt
%matplotlib inline

plt.imshow(X[0].reshape(28, 28), cmap='gray')
print("この画像データのラベルは{:.0f}です".format(y[0]))

実行結果1

正規化したMNISTデータをPyTorchで扱えるようにDataLoaderという変数に変換します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 2. DataLoderの作成
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

# 2.1 データを訓練とテストに分割(6:1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=1/7, random_state=0)

# 2.2 データをPyTorchのTensorに変換
X_train = torch.Tensor(X_train)
X_test = torch.Tensor(X_test)
y_train = torch.LongTensor(y_train)
y_test = torch.LongTensor(y_test)

# 2.3 データとラベルをセットにしたDatasetを作成
ds_train = TensorDataset(X_train, y_train)
ds_test = TensorDataset(X_test, y_test)

# 2.4 データセットのミニバッチサイズを指定した、Dataloaderを作成
# Chainerのiterators.SerialIteratorと似ている
loader_train = DataLoader(ds_train, batch_size=64, shuffle=True)
loader_test = DataLoader(ds_test, batch_size=64, shuffle=False)

ニューラルネットワークを構築します。
‘fc’は全結合(Fully Connecteed)層を意味し、’relu’は活性化関数にReLU関数を使用することを意味します。

1
2
3
4
5
6
7
8
9
10
11
# 3. ネットワークの構築
from torch import nn

model = nn.Sequential()
model.add_module('fc1', nn.Linear(28*28*1, 100))
model.add_module('relu1', nn.ReLU())
model.add_module('fc2', nn.Linear(100, 100))
model.add_module('relu2', nn.ReLU())
model.add_module('fc3', nn.Linear(100, 10))

print(model)

実行結果2

ネットワークの誤差関数と最適化手法の設定を行います。

分類問題では誤差関数にクロスエントロピー誤差関数を使用します。
最適化手法にはAdamというアルゴリズムを使います。

1
2
3
4
5
6
7
8
# 4. 誤差関数と最適化手法の設定
from torch import optim

# 誤差関数の設定
loss_fn = nn.CrossEntropyLoss() # 変数名にはcriterionが使われることも多い

# 重みを学習する際の最適化手法の選択
optimizer = optim.Adam(model.parameters(), lr=0.01)

学習と推論での動作を設定します。

学習では訓練データを入力して出力を求め、出力と正解との誤差を計算し、誤差をバックプロパゲーションして結合パラメータを更新・学習させます。
引数のepochとはデータを一通り使用する1試行のことを意味します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 5. 学習と推論の設定
# 5-1. 学習1回でやることを定義します
def train(epoch):
model.train() # ネットワークを学習モードに切り替える

# データローダーから1ミニバッチずつ取り出して計算する
for data, targets in loader_train:

optimizer.zero_grad() # 一度計算された勾配結果を0にリセット
outputs = model(data) # 入力dataをinputし、出力を求める
loss = loss_fn(outputs, targets) # 出力と訓練データの正解との誤差を求める
loss.backward() # 誤差のバックプロパゲーションを求める
optimizer.step() # バックプロパゲーションの値で重みを更新する

print("epoch{}:終了\n".format(epoch))

推論ではテストデータを入力して出力を求め、正解と一致した割合を計算します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 5. 学習と推論の設定
# 5-2. 推論1回でやることを定義します
def test():
model.eval() # ネットワークを推論モードに切り替える
correct = 0

# データローダーから1ミニバッチずつ取り出して計算する
with torch.no_grad(): # 微分は推論では必要ない
for data, targets in loader_test:

outputs = model(data) # 入力dataをinputし、出力を求める

# 推論する
_, predicted = torch.max(outputs.data, 1) # 確率が最大のラベルを求める
correct += predicted.eq(targets.data.view_as(predicted)).sum() # 正解と一緒だったらカウントアップ

# 正解率を出力
data_num = len(loader_test.dataset) # データの総数
print('\nテストデータの正解率: {}/{} ({:.0f}%)\n'.format(correct, data_num, 100. * correct / data_num))

試しに学習をせずにテストデータで推論してみます。

1
2
# 学習なしにテストデータで推論
test()

実行結果3
正解率は8%となりました。

次にニューラルネットワークの結合パラメータを学習させてから推論を行います。
6万件の訓練データに対して3epoch学習させます。

1
2
3
4
5
# 6. 学習と推論の実行
for epoch in range(3):
train(epoch)

test()

実行結果4
学習後には正解率が95%となり、手書き数字をほぼ正しく認識できるようになりました

試しに2020番目の画像データ推論し、予測結果と画像データ、正解を表示してみます。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 2020番目の画像データを推論してみる
index = 2020

model.eval() # ネットワークを推論モードに切り替える
data = X_test[index]
output = model(data) # 入力dataをinputし、出力を求める
_, predicted = torch.max(output.data, 0) # 確率が最大のラベルを求める

print("予測結果は{}".format(predicted))

X_test_show = (X_test[index]).numpy()
plt.imshow(X_test_show.reshape(28, 28), cmap='gray')
print("この画像データの正解ラベルは{:.0f}です".format(y_test[index]))

実行結果5

なかなか癖のある数字ですが、正しく判定できていることが分かります。

(Google Colaboratoryで動作確認しています。)


参考 > つくりながら学ぶ!深層強化学習 サポートページ

CartPoleをQ学習で制御

CartPoleをQ学習で制御していきます。

実装するクラスは下記の3つです。

  • Agentクラス
    カートを表します。
    Q関数を更新する関数と次の行動を決定する関数があります。
    Brainクラスのオブジェクトをメンバーに持ちます。
  • Brainクラス
    Agentの頭脳となるクラスです。
    Q学習を実装します。
    状態を離散化する関数とQテーブルを更新する関数とQテーブルから行動を決定する関数があります。
  • Environmentクラス
    OpenAI Gymの実行環境です。

まず必要なパッケージをインポートします。

1
2
3
4
5
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画を描画するための関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
interval=50)

anim.save('3_4movie_cartpole.mp4') # 動画ファイルを作成します。

各定数を定義します。
CartPole-v0は200ステップ 棒が立ち続ければゲーム攻略となります。(MAX_STEPS=200)

1
2
3
4
5
6
7
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
NUM_DIZITIZED = 6 # 各状態の離散値への分割数
GAMMA = 0.99 # 時間割引率
ETA = 0.5 # 学習係数
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 1000 # 最大試行回数

Agentクラスを実装します。

初期関数initでCartPoleの状態数と行動数を受け取り、自分の頭脳となるBrainクラスを生成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Agent:
'''CartPoleのエージェントクラスです、棒付き台車そのものになります'''
def __init__(self, num_states, num_actions):
self.brain = Brain(num_states, num_actions) # エージェントが行動を決定するための頭脳を生成

def update_Q_function(self, observation, action, reward, observation_next):
'''Q関数の更新'''
self.brain.update_Q_table(
observation, action, reward, observation_next)

def get_action(self, observation, step):
'''行動の決定'''
action = self.brain.decide_action(observation, step)
return action

Brainクラスを実装します。
Qテーブルの更新と行動の決定を行います。

試行回数が少ないときは探索行動が多くなるようなε-greedy法としています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Brain:
'''エージェントが持つ脳となるクラスです、Q学習を実行します'''
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

# Qテーブルを作成。行数は状態を分割数^(4変数)にデジタル変換した値、列数は行動数を示す
self.q_table = np.random.uniform(low=0, high=1, size=(
NUM_DIZITIZED**num_states, num_actions))

def bins(self, clip_min, clip_max, num):
'''観測した状態(連続値)を離散値にデジタル変換する閾値を求める'''
return np.linspace(clip_min, clip_max, num + 1)[1:-1]

def digitize_state(self, observation):
'''観測したobservation状態を、離散値に変換する'''
cart_pos, cart_v, pole_angle, pole_v = observation
digitized = [
np.digitize(cart_pos, bins=self.bins(-2.4, 2.4, NUM_DIZITIZED)),
np.digitize(cart_v, bins=self.bins(-3.0, 3.0, NUM_DIZITIZED)),
np.digitize(pole_angle, bins=self.bins(-0.5, 0.5, NUM_DIZITIZED)),
np.digitize(pole_v, bins=self.bins(-2.0, 2.0, NUM_DIZITIZED))
]
return sum([x * (NUM_DIZITIZED**i) for i, x in enumerate(digitized)])

def update_Q_table(self, observation, action, reward, observation_next):
'''QテーブルをQ学習により更新'''
state = self.digitize_state(observation) # 状態を離散化
state_next = self.digitize_state(observation_next) # 次の状態を離散化
Max_Q_next = max(self.q_table[state_next][:])
self.q_table[state, action] = self.q_table[state, action] + \
ETA * (reward + GAMMA * Max_Q_next - self.q_table[state, action])

def decide_action(self, observation, episode):
'''ε-greedy法で徐々に最適行動のみを採用する'''
state = self.digitize_state(observation)
epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):
action = np.argmax(self.q_table[state][:])
else:
action = np.random.choice(self.num_actions) # 0,1の行動をランダムに返す
return action

Environmentクラスを定義します。
10回連続で195ステップ以上棒が立ち続ければ学習成功としています。

成功後に動画保存のため1回再実行しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Environment:
'''CartPoleを実行する環境のクラスです'''
def __init__(self):
self.env = gym.make(ENV) # 実行する課題を設定
num_states = self.env.observation_space.shape[0] # 課題の状態の数4を取得
num_actions = self.env.action_space.n # CartPoleの行動(右に左に押す)の2を取得
self.agent = Agent(num_states, num_actions) # 環境内で行動するAgentを生成

def run(self):
'''実行'''
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
is_episode_final = False # 最終試行フラグ
frames = [] # 動画用に画像を格納する変数

for episode in range(NUM_EPISODES): # 試行数分繰り返す
observation = self.env.reset() # 環境の初期化

for step in range(MAX_STEPS): # 1エピソードのループ

if is_episode_final is True: # 最終試行ではframesに各時刻の画像を追加していく
frames.append(self.env.render(mode='rgb_array'))

# 行動を求める
action = self.agent.get_action(observation, episode)

# 行動a_tの実行により、s_{t+1}, r_{t+1}を求める
observation_next, _, done, _ = self.env.step(
action) # rewardとinfoは使わないので_にする

# 報酬を与える
if done: # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
if step < 195:
reward = -1 # 途中でこけたら罰則として報酬-1を与える
complete_episodes = 0 # 195step以上連続で立ち続けた試行数をリセット
else:
reward = 1 # 立ったまま終了時は報酬1を与える
complete_episodes += 1 # 連続記録を更新
else:
reward = 0 # 途中の報酬は0

# step+1の状態observation_nextを用いて,Q関数を更新する
self.agent.update_Q_function(
observation, action, reward, observation_next)

# 観測の更新
observation = observation_next

# 終了時の処理
if done:
print('{0} Episode: Finished after {1} time steps'.format(
episode, step + 1))
break

if is_episode_final is True: # 最終試行では動画を保存と描画
display_frames_as_gif(frames)
break

if complete_episodes >= 10: # 10連続成功なら
print('10回連続成功')
is_episode_final = True # 次の試行を描画を行う最終試行とする

Environmentクラスのオブジェクトを生成し、関数runを実行します。

1
2
3
# main
cartpole_env = Environment()
cartpole_env.run()

実行結果 エピソード0~18

実行結果 エピソード261~276(終了)
275エピソードで学習が完了しています。(195ステップ以上を10エピソード連続で成功)

強化学習後のプレイ動画は’3_4movie_cartpole.mp4’ファイルで保存されています。

棒が倒れずにうまくバランスがとれていることが分かります。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

多変数・連続値で示される情報の表形式表現「CartPole」

CartPoleの状態を1つの変数に落とし込みます。

まずCartPoleの状態は次の4つの情報で表現されます。

変数 説明
カート位置 -2.4~2.4
カート速度 -Inf~Inf
棒の角度 -41.8°~41.8°
棒の角速度 -Inf~Inf

これら4つの情報は連続値となります。
連続値のままですとQ関数を用いて表形式で表現できません。

表形式で表現するために、連続値を離散化してデジタル化します。

離散化とは、連続した値を不連続な値に分割することです
例えばカート位置を6つの値(0~5)で離散化する場合には、-2.4~-1.6の場合は0、-1.6~-0.8の場合は1・・・というように変換します。
また-2.4を超えてマイナスになる可能性もあるので-Inf~-1.6を0とし、1.6~-Infを5とします。
変換後の0~5の値は離散変数と呼ばれます。

ほかの3つの情報も6つの値で離散化しますと変数が4種類ありますので6の4乗、つまり1296種類のデジタル値で表現できることになります。

CartPoleでとれる行動はカートを右に押すか、カートを左に押すの2通りですのでCartPoleのQ関数は1296行×2列の表形式で表現できます。
Q関数は各状態で各行動をとったときにその後得られる予定の割引報酬和を表します。

では状態の離散化を実装していきます。
ますは使用するパッケージをインポートします。

1
2
3
4
5
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

定数を定義します。

1
2
3
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
NUM_DIZITIZED = 6 # 各状態の離散値への分割数

CartPoleを実行し、環境の初期状態を取得します。

1
2
3
# CartPoleを実行してみる
env = gym.make(ENV) # 実行する課題を設定
observation = env.reset() # 環境の初期化

離散化の閾値を計算する関数を定義します。

1
2
3
4
# 離散化の閾値を求める
def bins(clip_min, clip_max, num):
'''観測した状態(連続値)を離散値にデジタル変換する閾値を求める'''
return np.linspace(clip_min, clip_max, num + 1)[1:-1]

np.linspace()関数は等間隔の数列を生成する命令です。
動作確認のための下記のコードを実行します。

1
np.linspace(-2.4, 2.4, 6 + 1)

実行結果1

閾値としましては上記で得られる配列の最初と最後の要素は不要なのでスライスします。

1
np.linspace(-2.4, 2.4, 6 + 1)[1:-1]

実行結果2

bins()関数で求めた閾値に応じて、連続変数を離散化する関数を作成します。4変数を一度に変換します。
np.digitizeは状態変数のリストをbinsに応じてデジタル値に変換します。

最後に4つの離散変数を6進数で計算しreturnしています。

1
2
3
4
5
6
7
8
9
10
def digitize_state(observation):
'''観測したobservation状態を、離散値に変換する'''
cart_pos, cart_v, pole_angle, pole_v = observation
digitized = [
np.digitize(cart_pos, bins=bins(-2.4, 2.4, NUM_DIZITIZED)),
np.digitize(cart_v, bins=bins(-3.0, 3.0, NUM_DIZITIZED)),
np.digitize(pole_angle, bins=bins(-0.5, 0.5, NUM_DIZITIZED)),
np.digitize(pole_v, bins=bins(-2.0, 2.0, NUM_DIZITIZED))]
print('digitized', digitized)
return sum([x * (NUM_DIZITIZED**i) for i, x in enumerate(digitized)])

動作確認のためdigitize_state()関数をコールして結果を確認します。

1
print('digitize_state', digitize_state(observation))

実行結果3

CartPoleの状態を離散化し表形式表現できましたので、次回はQ学習を実装しCartPoleを制御していきます。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

倒立振子課題「CartPole」

今回は強化学習アルゴリズムを実装したり、性能を比較するための実行環境であるOpenAI Gymを使います。

OpenAI Gymでは課題となる実行環境がいくつか用意されていますが、定番の倒立振子課題 CartPole を試します。
倒立振子課題は台車の上に回転軸を固定した棒を立て、その棒が倒れないように台車を右・左と細かく動かす制御課題です。

ますは必要なライブラリをインストールします。(Pythonバージョンは3.6です)

1
2
3
4
5
6
pip install gym
pip install matplotlib
pip install JSAnimation
pip uninstall pyglet -y
pip install pyglet==1.2.4
conda install -c conda-forge ffmpeg

Jupyter Notebookを開きコーディングを始めます。

使用するパッケージは次の通りです。

1
2
3
4
5
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

動画を描画するための関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 動画の描画関数の宣言
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')

def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('3_2movie_cartpole.mp4') # 動画を保存

CartPoleを実行するところを実装します。
今回はきちんとした制御をせず、ただランダムに左右動かします。

gym.make()はOpenAIの実行環境を立ち上げる関数です。
環境を実行するとき一番最初にenv.reset()をコールしておきます。
env.step()はゲーム環境を1step進める関数で、引数に0を渡すと左に、1を渡すと右にカートを移動します。

返値は次の4つを返します。

変数 説明
observation カートと棒の状態
reward 即時報酬
カートの位置が±2.4以内 かつ 棒が20.9度以上傾いてない場合 -> 1
カートの位置が±2.4を超える または 棒が20.9度以上傾いている場合 -> 0
done 終了状態になるとTrue
info デバッグなどの必要情報
1
2
3
4
5
6
7
8
9
# CartPoleをランダムに動かす
frames = []
env = gym.make('CartPole-v0')
observation = env.reset() # 最初に環境のresetが必要

for step in range(0, 200):
frames.append(env.render(mode='rgb_array')) # framesに各時刻の画像を追加していく
action = np.random.choice(2) # 0(カートを左に押す), 1(カートを右に押す)をランダムに返す
observation, reward, done, info = env.step(action) # actionを実行する

変数framesに格納された画像を「3_2movie_cartpole.mp4」に保存します。

1
2
# 動画を保存
display_frames_as_gif(frames)

保存した動画は下記のようになります。

ランダムなので棒はすぐに倒れてしまいます。次回は棒が倒れないように強化学習していきます。


参考 > つくりながら学ぶ!深層強化学習 サポートページ

迷路をQ学習で解く

価値反復法の1つQ学習で迷路を攻略します。
Q学習では1エピソードごとではなく1ステップごとに更新を行います。

まず使用するパッケージをインポートします。

1
2
3
4
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

次に迷路の初期状態を描画します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 初期位置での迷路の様子

# 図を描く大きさと、図の変数名を宣言
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# 赤い壁を描く
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# 状態を示す文字S0~S8を描く
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# 描画範囲の設定と目盛りを消す設定
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom='off', top='off',
labelbottom='off', right='off', left='off', labelleft='off')

# 現在地S0に緑丸を描画する
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

実行結果1

初期の方策を決定するパラメータtheta_0を設定します。

行は状態0~7を表し、列は上、右、下、左へ行動できるかどうかを表します。
状態8はゴールなので方策の定義は不要です。

1
2
3
4
5
6
7
8
9
10
11
# 初期の方策を決定するパラメータtheta_0を設定
# 行は状態0~7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan], # s0
[np.nan, 1, np.nan, 1], # s1
[np.nan, np.nan, 1, 1], # s2
[1, 1, 1, np.nan], # s3
[np.nan, np.nan, 1, 1], # s4
[1, np.nan, np.nan, np.nan], # s5
[1, np.nan, np.nan, np.nan], # s6
[1, 1, np.nan, np.nan], # s7、※s8はゴールなので、方策はなし
])

方策パラメータθ0をランダム方策piに変換する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 方策パラメータtheta_0をランダム方策piに変換する関数の定義
def simple_convert_into_pi_from_theta(theta):
'''単純に割合を計算する'''

[m, n] = theta.shape # thetaの行列サイズを取得
pi = np.zeros((m, n))
for i in range(0, m):
pi[i, :] = theta[i, :] / np.nansum(theta[i, :]) # 割合の計算

pi = np.nan_to_num(pi) # nanを0に変換

return pi

# ランダム行動方策pi_0を求める
pi_0 = simple_convert_into_pi_from_theta(theta_0)

初期の行動価値関数Qを設定します。

行動価値関数は、行が状態sを表し、列が行動aを表します。
最初は正しい行動価値がわからないのでランダムな値を設定します。

1
2
3
4
5
# 初期の行動価値関数Qを設定

[a, b] = theta_0.shape # 行と列の数をa, bに格納
Q = np.random.rand(a, b) * theta_0 * 0.1
# *theta0をすることで要素ごとに掛け算をし、Qの壁方向の値がnanになる

ε-greedy法を実装します。
一定確率εでランダムに行動し、残りの1-εの確率で行動価値Qが最大になる行動をとります。

get_action()が行動を求める関数で、get_s_next()が行動を引数に次の状態を求める関数になります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# ε-greedy法を実装
def get_action(s, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]

# 行動を決める
if np.random.rand() < epsilon:
# εの確率でランダムに動く
next_direction = np.random.choice(direction, p=pi_0[s, :])
else:
# Qの最大値の行動を採用する
next_direction = direction[np.nanargmax(Q[s, :])]

# 行動をindexに
if next_direction == "up":
action = 0
elif next_direction == "right":
action = 1
elif next_direction == "down":
action = 2
elif next_direction == "left":
action = 3

return action

def get_s_next(s, a, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]
next_direction = direction[a] # 行動aの方向

# 行動から次の状態を決める
if next_direction == "up":
s_next = s - 3 # 上に移動するときは状態の数字が3小さくなる
elif next_direction == "right":
s_next = s + 1 # 右に移動するときは状態の数字が1大きくなる
elif next_direction == "down":
s_next = s + 3 # 下に移動するときは状態の数字が3大きくなる
elif next_direction == "left":
s_next = s - 1 # 左に移動するときは状態の数字が1小さくなる

return s_next

行動価値関数Q(s,a)が正しい値になるように学習して更新する処理を実装します。
gammaは時間割引率で未来の報酬を割り引いています。

SarsaとQ学習の違いはこのコードだけになります。
Sarsaは更新時に次の行動を求めて更新に使用していましたが、Q学習では状態の行動価値関数の値のうち、最も大きいものを更新に使用します。

1
2
3
4
5
6
7
8
9
10
# Q学習による行動価値関数Qの更新
def Q_learning(s, a, r, s_next, Q, eta, gamma):

if s_next == 8: # ゴールした場合
Q[s, a] = Q[s, a] + eta * (r - Q[s, a])

else:
Q[s, a] = Q[s, a] + eta * (r + gamma * np.nanmax(Q[s_next,: ]) - Q[s, a])

return Q

Q学習に従って迷路を解く処理を実装します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Q学習で迷路を解く関数の定義、状態と行動の履歴および更新したQを出力
def goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi):
s = 0 # スタート地点
a = a_next = get_action(s, Q, epsilon, pi) # 初期の行動
s_a_history = [[0, np.nan]] # エージェントの移動を記録するリスト

while (1): # ゴールするまでループ
a = a_next # 行動更新

s_a_history[-1][1] = a
# 現在の状態(つまり一番最後なのでindex=-1)に行動を代入

s_next = get_s_next(s, a, Q, epsilon, pi)
# 次の状態を格納

s_a_history.append([s_next, np.nan])
# 次の状態を代入。行動はまだ分からないのでnanにしておく

# 報酬を与え, 次の行動を求めます
if s_next == 8:
r = 1 # ゴールにたどり着いたなら報酬を与える
a_next = np.nan
else:
r = 0
a_next = get_action(s_next, Q, epsilon, pi)
# 次の行動a_nextを求めます。

# 価値関数を更新
Q = Q_learning(s, a, r, s_next, Q, eta, gamma)

# 終了判定
if s_next == 8: # ゴール地点なら終了
break
else:
s = s_next

return [s_a_history, Q]

価値関数の更新を繰り返す処理を実装します。
今回の学習終了条件は、100エピソードを行うこととしました。

Q学習で迷路を解く部分は、各エピソードでの状態価値関数の値を変数Vに格納しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Q学習で迷路を解く
eta = 0.1 # 学習率
gamma = 0.9 # 時間割引率
epsilon = 0.5 # ε-greedy法の初期値
v = np.nanmax(Q, axis=1) # 状態ごとに価値の最大値を求める
is_continue = True
episode = 1

V = [] # エピソードごとの状態価値を格納する
V.append(np.nanmax(Q, axis=1)) # 状態ごとに行動価値の最大値を求める

while is_continue: # is_continueがFalseになるまで繰り返す
print("エピソード:" + str(episode))
# ε-greedyの値を少しずつ小さくする
epsilon = epsilon / 2

# Q学習で迷路を解き、移動した履歴と更新したQを求める
[s_a_history, Q] = goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0)

# 状態価値の変化
new_v = np.nanmax(Q, axis=1) # 状態ごとに行動価値の最大値を求める
print(np.sum(np.abs(new_v - v))) # 状態価値関数の変化を出力
v = new_v
V.append(v) # このエピソード終了時の状態価値関数を追加

print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

# 100エピソード繰り返す
episode = episode + 1
if episode > 100:
break

実行結果 エピソード:1~8

エピソード6以降はずっと最小の4ステップで安定しました。

実行結果 エピソード:92~100

エピソードごとに状態価値関数がどのように変化したのかを可視化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 状態価値の変化を可視化します
from matplotlib import animation
from IPython.display import HTML
import matplotlib.cm as cm # color map

def init():
# 背景画像の初期化
line.set_data([], [])
return (line,)

def animate(i):
# フレームごとの描画内容
# 各マスに状態価値の大きさに基づく色付きの四角を描画
line, = ax.plot([0.5], [2.5], marker="s",
color=cm.jet(V[i][0]), markersize=85) # S0
line, = ax.plot([1.5], [2.5], marker="s",
color=cm.jet(V[i][1]), markersize=85) # S1
line, = ax.plot([2.5], [2.5], marker="s",
color=cm.jet(V[i][2]), markersize=85) # S2
line, = ax.plot([0.5], [1.5], marker="s",
color=cm.jet(V[i][3]), markersize=85) # S3
line, = ax.plot([1.5], [1.5], marker="s",
color=cm.jet(V[i][4]), markersize=85) # S4
line, = ax.plot([2.5], [1.5], marker="s",
color=cm.jet(V[i][5]), markersize=85) # S5
line, = ax.plot([0.5], [0.5], marker="s",
color=cm.jet(V[i][6]), markersize=85) # S6
line, = ax.plot([1.5], [0.5], marker="s",
color=cm.jet(V[i][7]), markersize=85) # S7
line, = ax.plot([2.5], [0.5], marker="s",
color=cm.jet(1.0), markersize=85) # S8
return (line,)

# 初期化関数とフレームごとの描画関数を用いて動画を作成
anim = animation.FuncAnimation(
fig, animate, init_func=init, frames=len(V), interval=200, repeat=False)

HTML(anim.to_jshtml())

最初は青だったマス目が次第に赤く変化していく様子が分かります。
完全に赤色にならないのは時間割引率で状態価値が割り引かれているためです。

ポイントは報酬が得られるゴールから逆向きに状態価値が学習されていくことと、学習後にはスタートからゴールへの道筋ができているということです。

(Google Colaboratoryで動作確認しています。)

参考

つくりながら学ぶ!深層強化学習 サポートページ

迷路をSarsaで解く

価値反復法の1つSarsaで迷路を攻略します。
Sarsaでは1エピソードごとではなく1ステップごとに更新を行います。

まず使用するパッケージをインポートします。

1
2
3
4
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

次に迷路の初期状態を描画します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 初期位置での迷路の様子

# 図を描く大きさと、図の変数名を宣言
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# 赤い壁を描く
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# 状態を示す文字S0~S8を描く
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# 描画範囲の設定と目盛りを消す設定
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom='off', top='off',
labelbottom='off', right='off', left='off', labelleft='off')

# 現在地S0に緑丸を描画する
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

実行結果1

初期の方策を決定するパラメータtheta_0を設定します。

行は状態0~7を表し、列は上、右、下、左へ行動できるかどうかを表します。
状態8はゴールなので方策の定義は不要です。

1
2
3
4
5
6
7
8
9
10
11
# 初期の方策を決定するパラメータtheta_0を設定
# 行は状態0~7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan], # s0
[np.nan, 1, np.nan, 1], # s1
[np.nan, np.nan, 1, 1], # s2
[1, 1, 1, np.nan], # s3
[np.nan, np.nan, 1, 1], # s4
[1, np.nan, np.nan, np.nan], # s5
[1, np.nan, np.nan, np.nan], # s6
[1, 1, np.nan, np.nan], # s7、※s8はゴールなので、方策はなし
])

方策パラメータθ0をランダム方策piに変換する関数を定義します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 方策パラメータtheta_0をランダム方策piに変換する関数の定義
def simple_convert_into_pi_from_theta(theta):
'''単純に割合を計算する'''
[m, n] = theta.shape # thetaの行列サイズを取得
pi = np.zeros((m, n))
for i in range(0, m):
pi[i, :] = theta[i, :] / np.nansum(theta[i, :]) # 割合の計算

pi = np.nan_to_num(pi) # nanを0に変換

return pi

# ランダム行動方策pi_0を求める
pi_0 = simple_convert_into_pi_from_theta(theta_0)

初期の行動価値関数Qを設定します。

行動価値関数は、行が状態sを表し、列が行動aを表します。
最初は正しい行動価値がわからないのでランダムな値を設定します。

1
2
3
4
# 初期の行動価値関数Qを設定
[a, b] = theta_0.shape # 行と列の数をa, bに格納
Q = np.random.rand(a, b) * theta_0
# * theta0をすることで要素ごとに掛け算をし、Qの壁方向の値がnanになる

ε-greedy法を実装します。
一定確率εでランダムに行動し、残りの1-εの確率で行動価値Qが最大になる行動をとります。

get_action()が行動を求める関数で、get_s_next()が行動を引数に次の状態を求める関数になります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# ε-greedy法を実装
def get_action(s, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]

# 行動を決める
if np.random.rand() < epsilon:
# εの確率でランダムに動く
next_direction = np.random.choice(direction, p=pi_0[s, :])
else:
# Qの最大値の行動を採用する
next_direction = direction[np.nanargmax(Q[s, :])]

# 行動をindexに
if next_direction == "up":
action = 0
elif next_direction == "right":
action = 1
elif next_direction == "down":
action = 2
elif next_direction == "left":
action = 3

return action

def get_s_next(s, a, Q, epsilon, pi_0):
direction = ["up", "right", "down", "left"]
next_direction = direction[a] # 行動aの方向

# 行動から次の状態を決める
if next_direction == "up":
s_next = s - 3 # 上に移動するときは状態の数字が3小さくなる
elif next_direction == "right":
s_next = s + 1 # 右に移動するときは状態の数字が1大きくなる
elif next_direction == "down":
s_next = s + 3 # 下に移動するときは状態の数字が3大きくなる
elif next_direction == "left":
s_next = s - 1 # 左に移動するときは状態の数字が1小さくなる

return s_next

行動価値関数Q(s,a)が正しい値になるように学習して更新する処理を実装します。
gammaは時間割引率で未来の報酬を割り引いています。

1
2
3
4
5
6
7
8
# Sarsaによる行動価値関数Qの更新
def Sarsa(s, a, r, s_next, a_next, Q, eta, gamma):
if s_next == 8: # ゴールした場合
Q[s, a] = Q[s, a] + eta * (r - Q[s, a])
else:
Q[s, a] = Q[s, a] + eta * (r + gamma * Q[s_next, a_next] - Q[s, a])

return Q

Sarsaに従って迷路を解く処理を実装します。
1エピソードごとではなく1ステップごとに更新を行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Sarsaで迷路を解く関数の定義、状態と行動の履歴および更新したQを出力
def goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi):
s = 0 # スタート地点
a = a_next = get_action(s, Q, epsilon, pi) # 初期の行動
s_a_history = [[0, np.nan]] # エージェントの移動を記録するリスト

while (1): # ゴールするまでループ
a = a_next # 行動更新

s_a_history[-1][1] = a
# 現在の状態(つまり一番最後なのでindex=-1)に行動を代入

s_next = get_s_next(s, a, Q, epsilon, pi)
# 次の状態を格納

s_a_history.append([s_next, np.nan])
# 次の状態を代入。行動はまだ分からないのでnanにしておく

# 報酬を与え, 次の行動を求めます
if s_next == 8:
r = 1 # ゴールにたどり着いたなら報酬を与える
a_next = np.nan
else:
r = 0
a_next = get_action(s_next, Q, epsilon, pi)
# 次の行動a_nextを求めます。

# 価値関数を更新
Q = Sarsa(s, a, r, s_next, a_next, Q, eta, gamma)

# 終了判定
if s_next == 8: # ゴール地点なら終了
break
else:
s = s_next

return [s_a_history, Q]

価値関数の更新を繰り返す処理を実装します。
今回の学習終了条件は、100エピソードを行うこととしました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Sarsaで迷路を解く
eta = 0.1 # 学習率
gamma = 0.9 # 時間割引率
epsilon = 0.5 # ε-greedy法の初期値
v = np.nanmax(Q, axis=1) # 状態ごとに価値の最大値を求める
is_continue = True
episode = 1

while is_continue: # is_continueがFalseになるまで繰り返す
print("エピソード:" + str(episode))

# ε-greedyの値を少しずつ小さくする
epsilon = epsilon / 2

# Sarsaで迷路を解き、移動した履歴と更新したQを求める
[s_a_history, Q] = goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0)

# 状態価値の変化
new_v = np.nanmax(Q, axis=1) # 状態ごとに価値の最大値を求める
print(np.sum(np.abs(new_v - v))) # 状態価値の変化を出力
v = new_v

print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

# 100エピソード繰り返す
episode = episode + 1
if episode > 100:
break

実行結果 エピソード:1~8
エピソード2ですでに最小ステップ4となっています。エピソード3では6ステップと増えましたがその後は、ずっと4ステップのままです。

実行結果 エピソード:92~100

エピソード100を実行するまでもなくずっと最小の4ステップで安定しました。
今回の迷路と解く問題では方策反復法よりも価値反復法の方が早く学習できるという結果になりました。

(Google Colaboratoryで動作確認しています。)

参考

つくりながら学ぶ!深層強化学習 サポートページ

迷路を方策勾配法で解く

前回はランダムに行動する方策を実装しましたが、今回は方策勾配法に従ってエージェントを動かしてみます。

まず使用するパッケージをインポートします。

1
2
3
4
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

次に迷路の初期状態を描画します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 初期位置での迷路の様子
# 図を描く大きさと、図の変数名を宣言
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# 赤い壁を描く
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# 状態を示す文字S0~S8を描く
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# 描画範囲の設定と目盛りを消す設定
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom='off', top='off',
labelbottom='off', right='off', left='off', labelleft='off')

# 現在地S0に緑丸を描画する
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

実行結果1

初期の方策を決定するパラメータtheta_0を設定します。

行は状態0~7を表し、列は上、右、下、左へ行動できるかどうかを表します。
状態8はゴールなので方策の定義は不要です。

1
2
3
4
5
6
7
8
9
10
11
# 初期の方策を決定するパラメータtheta_0を設定
# 行は状態0~7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan], # s0
[np.nan, 1, np.nan, 1], # s1
[np.nan, np.nan, 1, 1], # s2
[1, 1, 1, np.nan], # s3
[np.nan, np.nan, 1, 1], # s4
[1, np.nan, np.nan, np.nan], # s5
[1, np.nan, np.nan, np.nan], # s6
[1, 1, np.nan, np.nan], # s7、※s8はゴールなので、方策はなし
])

方策パラメータ(theta)をsoftmax関数で行動方策(pi)に変換する関数を定義します。
betaは逆温度と呼ばれ、小さいほど行動がランダムになります。
さらに指数関数(np.exp)を使って割合を計算しています。

softmax関数を使用するとパラメータθが負の値になっても方策を算出できるというメリットがあります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 方策パラメータthetaを行動方策piにソフトマックス関数で変換する手法の定義
def softmax_convert_into_pi_from_theta(theta):
'''ソフトマックス関数で割合を計算する'''
beta = 1.0
[m, n] = theta.shape # thetaの行列サイズを取得
pi = np.zeros((m, n))

exp_theta = np.exp(beta * theta) # thetaをexp(theta)へと変換

for i in range(0, m):
# pi[i, :] = theta[i, :] / np.nansum(theta[i, :])
# simpleに割合の計算の場合
pi[i, :] = exp_theta[i, :] / np.nansum(exp_theta[i, :])
# softmaxで計算の場合

pi = np.nan_to_num(pi) # nanを0に変換

return pi

上記で定義したsoftmax_convert_into_pi_from_theta関数を使ってθ0から方策を算出します。

1
2
3
# 初期の方策pi_0を求める
pi_0 = softmax_convert_into_pi_from_theta(theta_0)
print(pi_0)

実行結果2
学習前のため前回(ランダム行動)の方策と同じ結果になりますが問題ありません。

続いてsoftmax関数による方策に従ってエージェントを行動させる関数を実装します。
1ステップ移動後のエージェントの状態とその時の行動を返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 行動aと1step移動後の状態sを求める関数を定義
def get_action_and_next_s(pi, s):
direction = ["up", "right", "down", "left"]
# pi[s,:]の確率に従って、directionが選択される
next_direction = np.random.choice(direction, p=pi[s, :])

if next_direction == "up":
action = 0
s_next = s - 3 # 上に移動するときは状態の数字が3小さくなる
elif next_direction == "right":
action = 1
s_next = s + 1 # 右に移動するときは状態の数字が1大きくなる
elif next_direction == "down":
action = 2
s_next = s + 3 # 下に移動するときは状態の数字が3大きくなる
elif next_direction == "left":
action = 3
s_next = s - 1 # 左に移動するときは状態の数字が1小さくなる

return [action, s_next]

ゴールにたどり着くまでエージェントを移動させ続ける関数を定義します。
状態と行動の履歴をセットで返します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 迷路を解く関数の定義、状態と行動の履歴を出力
def goal_maze_ret_s_a(pi):
s = 0 # スタート地点
s_a_history = [[0, np.nan]] # エージェントの移動を記録するリスト

while (1): # ゴールするまでループ
[action, next_s] = get_action_and_next_s(pi, s)
s_a_history[-1][1] = action
# 現在の状態(つまり一番最後なのでindex=-1)の行動を代入

s_a_history.append([next_s, np.nan])
# 次の状態を代入。行動はまだ分からないのでnanにしておく

if next_s == 8: # ゴール地点なら終了
break
else:
s = next_s

return s_a_history

初期の方策で迷路を解いてみます。

1
2
3
4
# 初期の方策で迷路を解く
s_a_history = goal_maze_ret_s_a(pi_0)
print(s_a_history)
print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

実行結果3(途中略)
スタートからゴールまでの状態と行動のセットが表示されます。
最後にトータルで何ステップかかったかを表示しています。

方策勾配法に従い方策を更新する関数を定義します。

学習率etaは1回の学習で更新される大きさを表します。
学習率が小さすぎるとなかなか学習が進みませんし、大きすぎるときちんと学習することができません。

方策の更新のために下記の3つを入力しています。

  • 現在の方策 theta
  • 方策 pi
  • 現在の方策での実行結果 s_a_history
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# thetaの更新関数を定義します
def update_theta(theta, pi, s_a_history):
eta = 0.1 # 学習率
T = len(s_a_history) - 1 # ゴールまでの総ステップ数

[m, n] = theta.shape # thetaの行列サイズを取得
delta_theta = theta.copy() # Δthetaの元を作成、ポインタ参照なので、delta_theta = thetaはダメ

# delta_thetaを要素ごとに求めます
for i in range(0, m):
for j in range(0, n):
if not(np.isnan(theta[i, j])): # thetaがnanでない場合

SA_i = [SA for SA in s_a_history if SA[0] == i]
# 履歴から状態iのものを取り出すリスト内包表記です

SA_ij = [SA for SA in s_a_history if SA == [i, j]]
# 状態iで行動jをしたものを取り出す

N_i = len(SA_i) # 状態iで行動した総回数
N_ij = len(SA_ij) # 状態iで行動jをとった回数

# 初版では符号の正負に間違いがありました(修正日:180703)
#delta_theta[i, j] = (N_ij + pi[i, j] * N_i) / T
delta_theta[i, j] = (N_ij - pi[i, j] * N_i) / T

new_theta = theta + eta * delta_theta

return new_theta

方策を更新し、方策がどう変化するのかを確認します。

1
2
3
4
# 方策の更新
new_theta = update_theta(theta_0, pi_0, s_a_history)
pi = softmax_convert_into_pi_from_theta(new_theta)
print(pi)

実行結果4
最初の方策から少し変化していることが分かります。

迷路をノーミスでクリアできるまで迷路内の探索とパラメータθの更新を繰り返す処理を実装します。

学習終了条件は課題に応じて調整する必要がありますが、今回は方策変化の絶対値和が10**-4より小さくなったら終了とします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 方策勾配法で迷路を解く
stop_epsilon = 10**-4 # 10^-4よりも方策に変化が少なくなったら学習終了とする

theta = theta_0
pi = pi_0

is_continue = True
count = 1
while is_continue: # is_continueがFalseになるまで繰り返す
s_a_history = goal_maze_ret_s_a(pi) # 方策πで迷路内を探索した履歴を求める
new_theta = update_theta(theta, pi, s_a_history) # パラメータΘを更新
new_pi = softmax_convert_into_pi_from_theta(new_theta) # 方策πの更新

print(np.sum(np.abs(new_pi - pi))) # 方策の変化を出力
print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

if np.sum(np.abs(new_pi - pi)) < stop_epsilon:
is_continue = False
else:
theta = new_theta
pi = new_pi

実行結果5(途中略)
最終的にゴールまで最小ステップ数の4となっていることが分かります。

最終的な方策を確認してみます。

1
2
3
# 最終的な方策を確認
np.set_printoptions(precision=3, suppress=True) # 有効桁数3、指数表示しないという設定
print(pi)

実行結果6

最後に動画でエージェントの移動を可視化します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# エージェントの移動の様子を可視化します
# 参考URL http://louistiao.me/posts/notebooks/embedding-matplotlib-animations-in-jupyter-notebooks/
from matplotlib import animation
from IPython.display import HTML

def init():
# 背景画像の初期化
line.set_data([], [])
return (line,)

def animate(i):
# フレームごとの描画内容
state = s_a_history[i][0] # 現在の場所を描く
x = (state % 3) + 0.5 # 状態のx座標は、3で割った余り+0.5
y = 2.5 - int(state / 3) # y座標は3で割った商を2.5から引く
line.set_data(x, y)
return (line,)

# 初期化関数とフレームごとの描画関数を用いて動画を作成
anim = animation.FuncAnimation(fig, animate, init_func=init, frames=len(
s_a_history), interval=500, repeat=False)

HTML(anim.to_jshtml())

スタート地点からまっすぐゴールにたどり着いていることが分かります。

(Google Colaboratoryで動作確認しています。)

参考

つくりながら学ぶ!深層強化学習 サポートページ

迷路内をランダムに探索させる

3×3の迷路をランダムに探索してゴールを目指すエージェントを実装します。
S0地点がスタート位置で、S8地点がゴール位置になります。

使用するパッケージをインポートします。

1
2
3
4
# 使用するパッケージの宣言
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

次に迷路の初期状態を描画します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 初期位置での迷路の様子
# 図を描く大きさと、図の変数名を宣言
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# 赤い壁を描く
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# 状態を示す文字S0~S8を描く
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# 描画範囲の設定と目盛りを消す設定
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom='off', top='off',
labelbottom='off', right='off', left='off', labelleft='off')

# 現在地S0に緑丸を描画する
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

実行結果1

エージェントを実装します。エージェントは緑色の丸で表示します。

エージェントがどのように行動するのかを決めたルールは方策(Policy)といいます。
初期の方策を決定するパラメータtheta_0を設定します。

行は状態0~7を表し、列は上、右、下、左へ行動できるかどうかを表します。
状態8はゴールなので方策の定義は不要です。

1
2
3
4
5
6
7
8
9
10
11
12
# 初期の方策を決定するパラメータtheta_0を設定

# 行は状態0~7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan], # s0
[np.nan, 1, np.nan, 1], # s1
[np.nan, np.nan, 1, 1], # s2
[1, 1, 1, np.nan], # s3
[np.nan, np.nan, 1, 1], # s4
[1, np.nan, np.nan, np.nan], # s5
[1, np.nan, np.nan, np.nan], # s6
[1, 1, np.nan, np.nan], # s7、※s8はゴールなので、方策はなし
])

パラメータtheta_0を割合に変換して確率にします。

1
2
3
4
5
6
7
8
9
10
11
# 方策パラメータthetaを行動方策piに変換する関数の定義
def simple_convert_into_pi_from_theta(theta):
'''単純に割合を計算する'''
[m, n] = theta.shape # thetaの行列サイズを取得
pi = np.zeros((m, n))
for i in range(0, m):
pi[i, :] = theta[i, :] / np.nansum(theta[i, :]) # 割合の計算

pi = np.nan_to_num(pi) # nanを0に変換

return pi

初期の方策pi_0を算出します。

1
2
# 初期の方策pi_0を求める
pi_0 = simple_convert_into_pi_from_theta(theta_0)

初期の方策pi_0を表示します。

1
2
# 初期の方策pi_0を表示
pi_0

実行結果2

続いて、方策pi_0に従ってエージェントを行動させます。
1step移動後の状態sを求める関数get_next_sを定義します。

迷路の位置は0~8の番号で定義しているため、上に移動する場合は数字を3小さくすればよいことになります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 1step移動後の状態sを求める関数を定義
def get_next_s(pi, s):
direction = ["up", "right", "down", "left"]

next_direction = np.random.choice(direction, p=pi[s, :])
# pi[s,:]の確率に従って、directionが選択される
if next_direction == "up":
s_next = s - 3 # 上に移動するときは状態の数字が3小さくなる
elif next_direction == "right":
s_next = s + 1 # 右に移動するときは状態の数字が1大きくなる
elif next_direction == "down":
s_next = s + 3 # 下に移動するときは状態の数字が3大きくなる
elif next_direction == "left":
s_next = s - 1 # 左に移動するときは状態の数字が1小さくなる

return s_next

迷路内をエージェントがゴールするまで移動させる関数を定義します。
ゴールにたどり着くまでwhile文で移動し続け、状態の軌跡をstate_historyに格納しています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 迷路内をエージェントがゴールするまで移動させる関数の定義
def goal_maze(pi):
s = 0 # スタート地点
state_history = [0] # エージェントの移動を記録するリスト

while (1): # ゴールするまでループ
next_s = get_next_s(pi, s)
state_history.append(next_s) # 記録リストに次の状態(エージェントの位置)を追加

if next_s == 8: # ゴール地点なら終了
break
else:
s = next_s

return state_history

方策pi_0に従ってエージェントを移動させます。

1
2
# 迷路内をゴールを目指して、移動
state_history = goal_maze(pi_0)

ゴールするまでの移動の軌跡と、合計何ステップかかったかを確認します。

1
2
print(state_history)
print("迷路を解くのにかかったステップ数は" + str(len(state_history) - 1) + "です")

実行結果3
ランダムに移動しているので、状態の軌跡は実行するたびに変わります。


迷路内をエージェントが移動する様子を動画にしてみます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# エージェントの移動の様子を可視化します
from matplotlib import animation
from IPython.display import HTML

def init():
'''背景画像の初期化'''
line.set_data([], [])
return (line,)

def animate(i):
'''フレームごとの描画内容'''
state = state_history[i] # 現在の場所を描く
x = (state % 3) + 0.5 # 状態のx座標は、3で割った余り+0.5
line.set_data(x, y)
return (line,)

# 初期化関数とフレームごとの描画関数を用いて動画を作成する
anim = animation.FuncAnimation(fig, animate, init_func=init, frames=len(
state_history), interval=200, repeat=False)

HTML(anim.to_jshtml())

動画を見ると何回もさまよいながら最終的にはゴールにたどり着く様子を見ることができます。

(Google Colaboratoryで動作確認しています。)

参考

つくりながら学ぶ!深層強化学習 サポートページ

ResNet (Resiidual Network) で画像分類

ResNet (Resiidual Network) で画像分類を行います。

ResNetでは残差ブロックというショートカット構造があることが特徴です。
畳み込み層にショートカットコネクション(迂回ルート)を追加し、畳み込み層で学習が不要になった場合迂回を行いより深い層の学習ができるようになります。

まずGoogle Colaboratoryのランタイムのハードウェアアクセラレータを「TPU」に設定します。
ハードウェアアクセラレータの設定

次にtensorflowのバージョンを1.13.1にします。

1
2
!pip uninstall tensorflow -y
!pip install tensorflow==1.13.1

インストール後にランタイムをリスタートする必要があります。


ResNetで必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
11
12
# パッケージのインポート
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.layers import Activation, Add, BatchNormalization, Conv2D, Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

データセットの準備を行います。

データセットの配列は下記のようになります。

配列 説明
train_images 訓練画像の配列
train_labels 訓練ラベルの配列
test_images テスト画像の配列
test_labels テストラベルの配列
1
2
# データセットの準備
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

実行結果1

データセットのシェイプを確認します、

1
2
3
4
5
# データセットのシェイプの確認
print(train_images.shape)
print(train_labels.shape)
print(test_images.shape)
print(test_labels.shape)

実行結果2

訓練データと訓練ラベルは50000件、テスト画像とテストラベルは10000件です。
画像サイズは32×32×3です。RGB画像のため画像サイズ(32×32)×3となります。


先頭10件の訓練画像を確認します。

1
2
3
4
5
# データセットの画像の確認
for i in range(10):
plt.subplot(2, 5, i+1)
plt.imshow(train_images[i])
plt.show()

実行結果3

先頭10件の訓練ラベルの確認を行います。

1
2
# データセットのラベルの確認
print(train_labels[0:10])

実行結果4

ラベルの意味は下記の通りです。

ID 説明
0 airplane(飛行機)
1 automobile(自動車)
2 bird(鳥)
3 cat(猫)
4 deer(鹿)
5 dog(犬)
6 frog(カエル)
7 horse(馬)
8 ship(船)
9 truck(トラック)

データセットの前処理と確認を行います。

訓練ラベルとテストラベルのone-hot表現への変換を行います。

1
2
3
4
5
6
7
8
9
10
11
# データセットの前処理
train_images = train_images
train_labels = to_categorical(train_labels)
test_images = test_images
test_labels = to_categorical(test_labels)

# データセットの前処理後のシェイプの確認
print(train_images.shape)
print(train_labels.shape)
print(test_images.shape)
print(test_labels.shape)

実行結果5

畳み込み層の生成を行います。

Conv2Dのコンストラクタの引数は次の通りです。

引数 データ型 説明
filters int カーネル数
kernel_size int or tuple カーネルサイズ
strides int or tuple ストライド
padding str パディングで入力と同じサイズに戻すときはsame、戻さないときはvalid
user_bias boot バイアスを加えるかどうか
kernel_initializer str カーネルの重み行列の初期値。he_normalは正規分布による初期化
kernel_regularizer Regularizer kernelの重みに適用させる正則化。l2はL2正則化を利用
1
2
3
4
# 畳み込み層の生成
def conv(filters, kernel_size, strides=1):
return Conv2D(filters, kernel_size, strides=strides, padding='same', use_bias=False,
kernel_initializer='he_normal', kernel_regularizer=l2(0.0001))

残差ブロックAを生成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 残差ブロックAの生成
def first_residual_unit(filters, strides):
def f(x):
# →BN→ReLU
x = BatchNormalization()(x)
b = Activation('relu')(x)

# 畳み込み層→BN→ReLU
x = conv(filters // 4, 1, strides)(b)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# 畳み込み層→BN→ReLU
x = conv(filters // 4, 3)(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# 畳み込み層→
x = conv(filters, 1)(x)

# ショートカットのシェイプサイズを調整
sc = conv(filters, 1, strides)(b)

# Add
return Add()([x, sc])
return f

残差ブロックBを生成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 残差ブロックBの生成
def residual_unit(filters):
def f(x):
sc = x

# →BN→ReLU
x = BatchNormalization()(x)
x = Activation('relu')(x)

# 畳み込み層→BN→ReLU
x = conv(filters // 4, 1)(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# 畳み込み層→BN→ReLU
x = conv(filters // 4, 3)(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# 畳み込み層→
x = conv(filters, 1)(x)

# Add
return Add()([x, sc])
return f

残差ブロックAと残差ブロックB x 17を生成します。

1
2
3
4
5
6
7
8
# 残差ブロックAと残差ブロックB x 17の生成
def residual_block(filters, strides, unit_size):
def f(x):
x = first_residual_unit(filters, strides)(x)
for i in range(unit_size-1):
x = residual_unit(filters)(x)
return x
return f

畳み込み層を行い残差ブロック54個とプーリング層を重ねます。(特徴の抽出)
次に、全結合を1つ重ねます。(分類)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 入力データのシェイプ
input = Input(shape=(32,32, 3))

# 畳み込み層
x = conv(16, 3)(input)

# 残差ブロック x 54
x = residual_block(64, 1, 18)(x)
x = residual_block(128, 2, 18)(x)
x = residual_block(256, 2, 18)(x)

# →BN→ReLU
x = BatchNormalization()(x)
x = Activation('relu')(x)

# プーリング層
x = GlobalAveragePooling2D()(x)

# 全結合層
output = Dense(10, activation='softmax', kernel_regularizer=l2(0.0001))(x)

# モデルの作成
model = Model(inputs=input, outputs=output)

TPUモデルへの変換への変換を行います。

1
2
3
4
5
6
7
8
9
# TPUモデルへの変換
import tensorflow as tf
import os
tpu_model = tf.contrib.tpu.keras_to_tpu_model(
model,
strategy=tf.contrib.tpu.TPUDistributionStrategy(
tf.contrib.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
)
)

コンパイルを行います。
損失関数はcategorical_crossentropy、最適化関数はSGD、評価指標accを設定します。

1
2
# コンパイル
tpu_model.compile(loss='categorical_crossentropy', optimizer=SGD(momentum=0.9), metrics=['acc'])

ImageDataGeneratorを使って、データセット画像の正規化と水増しを行います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ImageDataGeneratorの準備
train_gen = ImageDataGenerator(
featurewise_center=True,
featurewise_std_normalization=True,
width_shift_range=0.125,
height_shift_range=0.125,
horizontal_flip=True)
test_gen = ImageDataGenerator(
featurewise_center=True,
featurewise_std_normalization=True)

# データセット全体の統計量を予め計算
for data in (train_gen, test_gen):
data.fit(train_images)

LearningRateSchedulerを準備します。
LearningRateSchedulerは学習中に学習率を変化させるコールバックです。
学習率をはじめに0.1、80エポック以降0.01、120エポック以降0.001となるように設定します。
学習率は正解から遠い時には大きく、正解に近くなった時には小さく更新することで、速く正確に正解にだとりつけるように調整します。

1
2
3
4
5
6
7
# LearningRateSchedulerの準備
def step_decay(epoch):
x = 0.1
if epoch >= 80: x = 0.01
if epoch >= 120: x = 0.001
return x
lr_decay = LearningRateScheduler(step_decay)

訓練画像と訓練ラベルの配列をモデルに渡して、学習を行います。

1
2
3
4
5
6
7
8
9
# 学習
batch_size = 128
history = tpu_model.fit_generator(
train_gen.flow(train_images, train_labels, batch_size=batch_size),
epochs=100,
steps_per_epoch=train_images.shape[0] // batch_size,
validation_data=test_gen.flow(test_images, test_labels, batch_size=batch_size),
validation_steps=test_images.shape[0] // batch_size,
callbacks=[lr_decay])

実行結果6(途中略)

学習結果をグラフ表示します。

1
2
3
4
5
6
7
# グラフの表示
plt.plot(history.history['acc'], label='acc')
plt.plot(history.history['val_acc'], label='val_acc')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(loc='best')
plt.show()

実行結果7

テスト画像とテストラベルの配列をモデルに渡して評価を実行し、正解率を取得します。

1
2
3
4
5
6
# 評価
batch_size = 128
test_loss, test_acc = tpu_model.evaluate_generator(
test_gen.flow(test_images, test_labels, batch_size=batch_size),
steps=10)
print('loss: {:.3f}\nacc: {:.3f}'.format(test_loss, test_acc ))

実行結果8
正解率は94.2%となりました。
畳み込みニューラルネットワークでは73.5%だったのでかなりの性能向上です。


先頭10件のテスト画像の判定を行い、予測結果を表示します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 推論する画像の表示
for i in range(10):
plt.subplot(2, 5, i+1)
plt.imshow(test_images[i])
plt.show()

# 推論したラベルの表示
test_predictions = tpu_model.predict_generator(
test_gen.flow(test_images[0:16], shuffle = False, batch_size=16),
steps=16) # 学習データ数を16個に増やす
test_predictions = np.argmax(test_predictions, axis=1)[0:10] # 結果数を10個に減らす
labels = ['airplane', 'automobile', 'bird', 'cat', 'deer',
'dog', 'frog', 'horse', 'ship', 'truck']
print([labels[n] for n in test_predictions])

実行結果9

全ての画像を正しく判定できていることが分かります。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

畳み込みニューラルネットワークで画像分類

畳み込みニューラルネットワークで写真の画像分類を行います。
畳み込みニューラルネットワークを使って効率よくデータの特徴を抽出することができます。

データセットはCIFAR-10を使用します。10個に分類された画像と正解ラベルを集めたデータセットで、訓練データ50000件、テストデータ10000件が含まれています。

まずGoogle Colaboratoryのランタイムのハードウェアアクセラレータを「TPU」に設定します。
ハードウェアアクセラレータの設定

次にtensorflowのバージョンを1.13.1にします。

1
2
!pip uninstall tensorflow -y
!pip install tensorflow==1.13.1

インストール後にランタイムをリスタートする必要があります。


畳み込みニューラルネットワークで必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
# パッケージのインポート
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Activation, Dense, Dropout, Conv2D, Flatten, MaxPool2D
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

データセットの準備を行います。

データセットの配列は下記のようになります。

配列 説明
train_images 訓練画像の配列
train_labels 訓練ラベルの配列
test_images テスト画像の配列
test_labels テストラベルの配列
1
2
# データセットの準備
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

データセットのシェイプを確認します、

1
2
3
4
5
# データセットのシェイプの確認
print(train_images.shape)
print(train_labels.shape)
print(test_images.shape)
print(test_labels.shape)

実行結果1
訓練データと訓練ラベルは50000件、テスト画像とテストラベルは10000件です。
画像サイズは32×32×3です。RGB画像のため画像サイズ(32×32)×3となります。


先頭10件の訓練画像を確認します。

1
2
3
4
5
# データセットの画像の確認
for i in range(10):
plt.subplot(2, 5, i+1)
plt.imshow(train_images[i])
plt.show()

実行結果2
先頭10件の訓練ラベルの確認を行います。

1
2
# データセットのラベルの確認
print(train_labels[0:10])

実行結果3

ラベルの意味は下記の通りです。

ID 説明
0 airplane(飛行機)
1 automobile(自動車)
2 bird(鳥)
3 cat(猫)
4 deer(鹿)
5 dog(犬)
6 frog(カエル)
7 horse(馬)
8 ship(船)
9 truck(トラック)

データセットの前処理と確認を行います。 訓練画像とテスト画像の正規化を行います。画像のRGBは「0~255」なので255で割って「0.0~1.0」に変換します。
1
2
3
4
5
6
7
# データセットの画像の前処理
train_images = train_images.astype('float32')/255.0
test_images = test_images.astype('float32')/255.0

# データセットの画像の前処理後のシェイプの確認
print(train_images.shape)
print(test_images.shape)
![実行結果4](/img/chap3/3-4.png) 訓練ラベルとテストラベルをone-hot表現に変換します。
1
2
3
4
5
6
7
# データセットのラベルの前処理
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)

# データセットのラベルの前処理後のシェイプの確認
print(train_labels.shape)
print(test_labels.shape)
![実行結果5](/img/chap3/3-5.png) 畳み込みニューラルネットワークのモデルを作成します。 モデルのネットワーク構造は下記の通りです。
  1. 特徴の抽出を行う。(畳み込みブロック)
  2. (1回目)畳み込み層→畳み込み層→プーリング層→Dropout
  3. (2回目)畳み込み層→畳み込み層→プーリング層→Dropout
  4. 1次元に変換する。(Flatten)
  5. 分類を行う。
  6. 全結合層
  7. Dropout
  8. 全結合層

利用するクラスは次の通りです。

クラス 説明
Conv2D 畳み込み層。引数はカーネル数、カーネルサイズ、活性化関数、パティング。
MaxPool2D プーリング層(Maxプーリング)。引数はプーリング適用領域。
Dense 全結合層。引数はユニット数と活性化関数。
Dropout ドロップアウト。引数は無効にする割合。
Flatten 層の入出力を1次元に変換。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#モデルの作成
model = Sequential()

# Conv→Conv→Pool→Dropout
model.add(Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=(32, 32, 3)))
model.add(Conv2D(32, (3, 3), activation='relu', padding='same'))
model.add(MaxPool2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

# Conv→Conv→Pool→Dropout
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(MaxPool2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

# Flatten→Dense→Dropout→Dense
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))

TPUモデルへの変換を行います。

1
2
3
4
5
6
7
8
9
# TPUモデルへの変換
import tensorflow as tf
import os
tpu_model = tf.contrib.tpu.keras_to_tpu_model(
model,
strategy=tf.contrib.tpu.TPUDistributionStrategy(
tf.contrib.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
)
)

畳み込みニューラルネットワークモデルのコンパイルを行います。

1
2
# コンパイル
tpu_model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=0.001), metrics=['acc'])

訓練画像と訓練ラベルの配列をモデルに渡して学習を開始します。

1
2
3
# 学習
history = tpu_model.fit(train_images, train_labels, batch_size=128,
epochs=20, validation_split=0.1)

実行結果6(途中略)
畳み込みニューラルネットワークの学習結果をグラフ表示します。

1
2
3
4
5
6
7
# グラフの表示
plt.plot(history.history['acc'], label='acc')
plt.plot(history.history['val_acc'], label='val_acc')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(loc='best')
plt.show()

実行結果7
学習が進むと正解率が上がっていくことが分かります。


テスト画像とテストラベルの配列をモデルに渡して評価を行い正解率を取得します。

1
2
3
# 評価
test_loss, test_acc = tpu_model.evaluate(test_images, test_labels)
print('loss: {:.3f}\nacc: {:.3f}'.format(test_loss, test_acc ))

実行結果8
正解率は73.5%となりました。


最後に、先頭10件のテストデータの推論を行い予測結果を取得します。

1
2
3
4
5
6
7
8
9
10
11
12
# 推論する画像の表示
for i in range(10):
plt.subplot(2, 5, i+1)
plt.imshow(test_images[i])
plt.show()

# 推論したラベルの表示
test_predictions = tpu_model.predict(test_images[0:16]) # 学習データ数を16個に増やす
test_predictions = np.argmax(test_predictions, axis=1)[0:10] # 結果数を10個に減らす
labels = ['airplane', 'automobile', 'bird', 'cat', 'deer',
'dog', 'frog', 'horse', 'ship', 'truck']
print([labels[n] for n in test_predictions])

実行結果9

画像と推測結果を比較すると正解率が90%であることが分かります。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ