経験の蓄積と活用のバランス Epsion-Greedey法

経験の蓄積と活用のトレードオフのバランスをとる手法としてEpsilon-Greedy法を実装します。

何枚かのコインから1枚を選んで、投げた時表が出れば報酬が得られるゲームを考えます。
各コインの表が出る確率はバラバラです。

必要なパッケージをインポートします。

1
2
import random
import numpy as np

コイントスゲームの実装を行います。
head_probsは配列のパラメータで各コインの表が出る確率を指定します。

max_episode_stepsはコイントスを行う回数で、この回数の実行して表がでた回数が報酬となります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class CoinToss():

def __init__(self, head_probs, max_episode_steps=30):
self.head_probs = head_probs
self.max_episode_steps = max_episode_steps
self.toss_count = 0

def __len__(self):
return len(self.head_probs)

def reset(self):
self.toss_count = 0

def step(self, action):
final = self.max_episode_steps - 1
if self.toss_count > final:
raise Exception("The step count exceeded maximum. Please reset env.")
else:
done = True if self.toss_count == final else False

if action >= len(self.head_probs):
raise Exception("The No.{} coin doesn't exist.".format(action))
else:
head_prob = self.head_probs[action]
if random.random() < head_prob:
reward = 1.0
else:
reward = 0.0
self.toss_count += 1
return reward, done

エージェントを作成します。

policy関数で、epsilonの確率でランダムにコインを選択し(探索)、それ以外の確率で各コインの期待値にそってコインを選択します(活用)。
play関数は、コイントスを行う処理です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class EpsilonGreedyAgent():

def __init__(self, epsilon):
self.epsilon = epsilon
self.V = []

def policy(self):
coins = range(len(self.V))
if random.random() < self.epsilon:
return random.choice(coins)
else:
return np.argmax(self.V)

def play(self, env):
# Initialize estimation.
N = [0] * len(env)
self.V = [0] * len(env)

env.reset()
done = False
rewards = []
while not done:
selected_coin = self.policy()
reward, done = env.step(selected_coin)
rewards.append(reward)

n = N[selected_coin]
coin_average = self.V[selected_coin]
new_average = (coin_average * n + reward) / (n + 1)
N[selected_coin] += 1
self.V[selected_coin] = new_average

return rewards

5枚のコインを用意し、コイントスの回数を変えながら、各エピソードにおける1回のコイントスあたりの報酬を記録していきます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if __name__ == "__main__":
import pandas as pd
import matplotlib.pyplot as plt

def main():
env = CoinToss([0.1, 0.5, 0.1, 0.9, 0.1])
epsilons = [0.0, 0.1, 0.2, 0.5, 0.8]
game_steps = list(range(10, 310, 10))
result = {}
for e in epsilons:
agent = EpsilonGreedyAgent(epsilon=e)
means = []
for s in game_steps:
env.max_episode_steps = s
rewards = agent.play(env)
means.append(np.mean(rewards))
result["epsilon={}".format(e)] = means
result["coin toss count"] = game_steps
result = pd.DataFrame(result)
result.set_index("coin toss count", drop=True, inplace=True)
result.plot.line(figsize=(10, 5))
plt.show()

main()

実行結果

epsilon=0.1と0.2ではコイントスの回数とともに報酬が向上していることが分かります。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

価値の定義と算出 Bellman Equation

価値を再帰的かつ期待値で表現する手法をBellman Equationと呼びます。(Valueベース)
Bellman Equationを使えば各状態の価値が計算可能となります。

まず価値を返す関数を定義します。

1
2
3
def V(s, gamma=0.99):
V = R(s) + gamma * max_V_on_next_state(s)
return V

報酬関数を定義します。
エピソード終了のとき”happy_end”であれば1を返し、”bad_end”であれば-1を返します。
エピソードが終了していなければ0を返します。

1
2
3
4
5
6
7
def R(s):
if s == "happy_end":
return 1
elif s == "bad_end":
return -1
else:
return 0

全ての行動でV(s)を計算し値が最大になる価値を返します。
評価vの計算式は確率遷移×遷移先の価値となります。

upかdownかを繰り返していき5回行動したら終了となります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def max_V_on_next_state(s):
# If game end, expected value is 0.
if s in ["happy_end", "bad_end"]:
return 0

actions = ["up", "down"]
values = []
for a in actions:
transition_probs = transit_func(s, a)
v = 0
for next_state in transition_probs:
prob = transition_probs[next_state] # 確率遷移
v += prob * V(next_state) # 遷移先の価値
values.append(v)
return max(values)

遷移関数を定義します。

  • 引数sには”state”や”state_up_up”、”state_down_down”などが受け渡されます。
  • 引数aは”up”か”down”が設定されます。
  • エピソード完了時は1要素が返り、途中の場合は2要素が返ります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def transit_func(s, a):
actions = s.split("_")[1:]
LIMIT_GAME_COUNT = 5
HAPPY_END_BORDER = 4
MOVE_PROB = 0.9

def next_state(state, action):
return "_".join([state, action])

if len(actions) == LIMIT_GAME_COUNT:
# 最大行動数と一致するのでエピソード終了
up_count = sum([1 if a == "up" else 0 for a in actions])
state = "happy_end" if up_count >= HAPPY_END_BORDER else "bad_end"
prob = 1.0
return {state: prob}
else:
opposite = "up" if a == "down" else "down"
return {
next_state(s, a): MOVE_PROB,
next_state(s, opposite): 1 - MOVE_PROB
}

実際に価値V(s)の計算を行ってみます。

1
2
3
4
if __name__ == "__main__":
print(V("state"))
print(V("state_up_up"))
print(V("state_down_down"))

実行結果

upの数が多い方が評価されます。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

マルコフ決定過程(MDP)

マルコフ決定過程(MDP)に従う環境を構築します。

マルコフ決定過程(MDP)は次のようなルールに従います。

  • 遷移先の状態は直前の状態とそこでの行動のみに依存する。
  • 報酬は直前の状態と遷移先に依存する。

今回は次のような迷路を解く環境を実装します。

迷路

まずは必要なパッケージをインポートします。

1
2
3
import random
from enum import Enum
import numpy as np

状態を表すクラスを定義します。
縦位置をrow、横位置をcolumnで表します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class State():

def __init__(self, row=-1, column=-1):
self.row = row
self.column = column

def __repr__(self):
return "<State: [{}, {}]>".format(self.row, self.column)

def clone(self):
return State(self.row, self.column)

def __hash__(self):
return hash((self.row, self.column))

def __eq__(self, other):
return self.row == other.row and self.column == other.column

行動を表すクラスを定義します。
行動は上下左右への移動4種類です。

1
2
3
4
5
class Action(Enum):
UP = 1
DOWN = -1
LEFT = 2
RIGHT = -2

環境の実体となるクラスを定義します。
迷路の定義を2次元配列のgridで受け取ります。

gridの要素は次のような意味となります。

意味
0 移動可能な場所を表します。
-1 ダメージを受ける場所でゲーム終了となります。
1 報酬を得られる場所でゲーム終了となります。
9 壁を意味し移動することができない場所です。

default_rewardは基本の報酬となり、この変数をマイナスにすることで意味なく行動することを防ぎ、早くゴールに向かうことを促します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Environment():

def __init__(self, grid, move_prob=0.8):
# grid is 2d-array. Its values are treated as an attribute.
# Kinds of attribute is following.
# 0: ordinary cell
# -1: damage cell (game end)
# 1: reward cell (game end)
# 9: block cell (can't locate agent)
self.grid = grid
self.agent_state = State()

# Default reward is minus. Just like a poison swamp.
# It means the agent has to reach the goal fast!
self.default_reward = -0.04

# Agent can move to a selected direction in move_prob.
# It means the agent will move different direction
# in (1 - move_prob).
self.move_prob = move_prob
self.reset()

@property
def row_length(self):
return len(self.grid)

@property
def column_length(self):
return len(self.grid[0])

@property
def actions(self):
return [Action.UP, Action.DOWN,
Action.LEFT, Action.RIGHT]

@property
def states(self):
states = []
for row in range(self.row_length):
for column in range(self.column_length):
# Block cells are not included to the state.
if self.grid[row][column] != 9:
states.append(State(row, column))
return states

遷移関数を定義します。
選択した行動にはmove_prob(80%)の行動確率を設定し、反対の行動には0%の行動確率を設定します。
残りの2方向の移動には10%の行動確率を設定します。
(トータルの行動確率は100%になります。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def transit_func(self, state, action):
transition_probs = {}
if not self.can_action_at(state):
# Already on the terminal cell.
return transition_probs

opposite_direction = Action(action.value * -1)

for a in self.actions:
prob = 0
if a == action:
prob = self.move_prob
elif a != opposite_direction:
prob = (1 - self.move_prob) / 2

next_state = self._move(state, a)
if next_state not in transition_probs:
transition_probs[next_state] = prob
else:
transition_probs[next_state] += prob

return transition_probs

行動できる場所(状態)かどうかを判定する関数を定義します。

1
2
3
4
5
def can_action_at(self, state):
if self.grid[state.row][state.column] == 0:
return True
else:
return False

ある状態である行動をすると、次にどの状態になるかを返す関数を定義します。
迷路の範囲外への移動を防いだり、壁にぶつかったかどうかはこの関数内で判断します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _move(self, state, action):
if not self.can_action_at(state):
raise Exception("Can't move from here!")

next_state = state.clone()

# Execute an action (move).
if action == Action.UP:
next_state.row -= 1
elif action == Action.DOWN:
next_state.row += 1
elif action == Action.LEFT:
next_state.column -= 1
elif action == Action.RIGHT:
next_state.column += 1

# Check whether a state is out of the grid.
if not (0 <= next_state.row < self.row_length):
next_state = state
if not (0 <= next_state.column < self.column_length):
next_state = state

# Check whether the agent bumped a block cell.
if self.grid[next_state.row][next_state.column] == 9:
next_state = state

return next_state

報酬関数を定義します。
ある状態で報酬が得られるかどうか、ダメージを受けるかどうかを判定します。
ゲームが終了するかどうかもこの報酬関数で判定します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def reward_func(self, state):
reward = self.default_reward
done = False

# Check an attribute of next state.
attribute = self.grid[state.row][state.column]
if attribute == 1:
# Get reward! and the game ends.
reward = 1
done = True
elif attribute == -1:
# Get damage! and the game ends.
reward = -1
done = True

return reward, done

エージェントの位置を初期化する関数を定義します。
ゲーム開始時や、ゲームが終わり再度ゲームを開始する場合に使用します。

1
2
3
4
def reset(self):
# Locate the agent at lower left corner.
self.agent_state = State(self.row_length - 1, 0)
return self.agent_state

行動を行う関数を定義します。
行動を受け取り、遷移関数から遷移先を算出し、さらに報酬関数から即時報酬を取得します。

1
2
3
4
5
6
def step(self, action):
next_state, reward, done = self.transit(self.agent_state, action)
if next_state is not None:
self.agent_state = next_state

return next_state, reward, done

遷移関数を定義します。
行動を受け取り、遷移関数を使って行動確率を取得します。
行動確率から実際にどう行動するかどうかを最終決定します。(np.random.choice関数を使用)
決定した行動より遷移先と報酬、終了したかどうかの結果が導きだされます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def transit(self, state, action):
transition_probs = self.transit_func(state, action)
if len(transition_probs) == 0:
return None, None, True

next_states = []
probs = []
for s in transition_probs:
next_states.append(s)
probs.append(transition_probs[s])

next_state = np.random.choice(next_states, p=probs)
reward, done = self.reward_func(next_state)
return next_state, reward, done

エージェントを定義します。
エージェントのpolicyは状態を受け取って行動を決める関数ですが、今回は単純にランダム行動をとるようにしています。

1
2
3
4
5
6
7
class Agent():

def __init__(self, env):
self.actions = env.actions

def policy(self, state):
return random.choice(self.actions)

環境内でエージェントを動作させるコードを実装します。
迷路の定義(grid)を行い、それをもとにして環境(Environment)作成します。
作成した環境をエージェントに渡して、そのエージェントを行動させることでゲームが実行されます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def main():
# Make grid environment.
grid = [
[0, 0, 0, 1],
[0, 9, 0, -1],
[0, 0, 0, 0]
]
env = Environment(grid)
agent = Agent(env)

# Try 10 game.
for i in range(10):
# Initialize position of agent.
state = env.reset()
total_reward = 0
done = False

while not done:
action = agent.policy(state)
next_state, reward, done = env.step(action)
total_reward += reward
state = next_state

print("Episode {}: Agent gets {} reward.".format(i, total_reward))

if __name__ == "__main__":
main()

実行結果

単純なランダム行動ですが、10ゲーム行い10回分の報酬を取得できることを確認できます。

参考

Pythonで学ぶ強化学習 -入門から実践まで- サンプルコード

進化戦略と遺伝的アルゴリズム

ニューラルネットワークの学習では勾配法が使われることが一般的ですが、勾配法とは違うアプローチとして「進化戦略」と「遺伝的アルゴリズム」があります。

進化戦略

パラメータを複数生成し、各パラメータを使った場合のモデルを評価します。
評価がよいものに近いパラメータをさらに生成し、評価を行うというプロセスを繰り返します。
(多くの候補から優秀なものを絞り込むというアプローチです)

遺伝的アルゴリズム

進化戦略と基本は同じですが、評価が高かったパラメータ同士を混ぜる(交叉)、ランダムなパラメータを入れる(突然変異)という操作を行います。

模倣学習

模倣学習では、専門家や上手な人の行動を記録しておいてそれと近い行動をとるようにエージェントを学習させます。
少ないデータで望ましい行動を短時間で学習させることができる模倣学習はとても重要な学習手法です。

模倣学習には2つの問題があります。

  • 状態数が多い場合、上手な人の行動をとりきるのが困難になる。
  • 行動を記録するのが難しい状態がある。

模倣学習の方法として次の4つがあります。

1.Forward Training

各タイムステップの個別戦略を作っておいてそれをつなぎ合わせて全体戦略とします。
単純な教師あり学習より実際の状態遷移分布に近いデータで各戦略を学習させることができます。

2.SMILe

複数の戦略を混合していく手法です。
最初の戦略は上手な人の行動だけから学習し、その後は学習した戦略を混ぜていきます。

3.DAgger

戦略ではなくデータを混ぜ合わせていき、そこから学習して戦略を作成していきます。
具体的には各ステップで得られた状態とその状態における上手な人の行動のペアを学習データに足していきます。

4.GAIL

上手な人の模倣を見破られないようにする手法です。
模倣する側と模倣を見破る側の2つのモデルが存在し、一方は模倣を行いもう一方は鑑定を行う設定で学習を行います。(敵対的学習)

探索の概要

探索

現在の状態を開始点として、数手先をどう展開するかを先読みし、展開先の状態を評価します。
その状態評価をもとに現在の状態での最良の一手を選ぶ手法です。

探索では状態の展開を表すのにゲーム木でモデル化します。

ゲーム木

完全ゲーム木

ゲームの開始から選択できるすべての手を含んだゲーム木です。
これがあれば絶対に負けない戦略を立てることができますが、完全ゲーム木のノード数は膨大なため計算が不可能なことがほとんどです。

部分ゲーム木

現在の状態から時間内に探索できる分だけを含んだゲーム木です。
有効だと思われるノードは深く探索し、有効でないノードは途中で探索を打ち切ります。
強さはいかに効率が高い部分ゲーム木を手に入れられるかということにかかってきます。

ニューロンとニューラルネットワーク

ニューロン

ニューロンは人間の脳内にある神経細胞のことです。

深層学習でのニューロンは、人間脳内の神経細胞を模したものです。
このニューロンは重みパラメータと閾値(バイアス)を持っています。

  • 重みパラメータ
    ニューロン同士のつながりの強さを表します。
  • 閾値(バイアス)
    脳細胞の感度のようなものになります。
    入力信号と重みパラメータを掛け合わせたものが閾値を超えた時に次のニューロンへ信号を送ります。(発火)

深層強化学習が行われることで、上記2つのパラメータが調整されていきます。

ニューラルネットワーク

ニューロンを複数並べたものを層といいます。
層を積み重ねたものがニューラルネットワークとなります。

  • 入力層
    最初にある層で入力を受け付けます。
    入力データの数がニューロン数となります。
  • 出力層
    最後にある層で出力を行います。
    出力する数(答えの数)がニューロン数となります。
  • 隠れ層
    入力層と出力層の間にある層です。
    複数作成することが可能で、4層以上のニューラルネットワークがディープニューラルネットワークと呼ばれます。(入力層×1、隠れ層×2、出力層×1)

コンピュータの能力向上や、インターネットの広がりで学習データが容易に収集できるようになり深層強化学習は広く普及しました。

深層学習 ニューラルネットワークで回帰

ニューラルネットワークで数値データの予測を行う推定モデルを作成します。
住宅情報から価格を予測します。

まずは必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
10
# パッケージのインポート
from tensorflow.keras.datasets import boston_housing
from tensorflow.keras.layers import Activation, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

データセットの準備を行います。

各データの内容は次の通りです。

変数名 内容
train_data 訓練データの配列
train_labels 訓練ラベルの配列
test_data テストデータの配列
test_labels テストラベルの配列
1
2
# データセットの準備
(train_data, train_labels), (test_data, test_labels) = boston_housing.load_data()

実行結果1

データセットのシェイプを確認します。

1
2
3
4
5
# データセットのシェイプの確認
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)

実行結果2(一部略)
訓練データと訓練ラベルは404件、テストデータとテストラベルは102件です。
データの13は住宅情報の種類数です。

訓練データの先頭10件を表示します。

1
2
3
4
# データセットのデータの確認
column_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
df = pd.DataFrame(train_data, columns=column_names)
df.head()

実行結果3

訓練ラベルの先頭10件を表示します。

1
2
# データセットのラベルの確認
print(train_labels[0:10])

実行結果4

学習前の準備として、訓練データと訓練ラベルをシャッフルします。
似たデータを連続して学習すると偏りが生じてしまうのを防ぐためです。

1
2
3
4
# データセットのシャッフルの前処理
order = np.argsort(np.random.random(train_labels.shape))
train_data = train_data[order]
train_labels = train_labels[order]

訓練データとテストデータの正規化を行います。
データを一定の方法で変換し同じ単位で比較しやすくするためです。

具体的には平均0、分散1で正規化を行います。

1
2
3
4
5
# データセットの正規化の前処理
mean = train_data.mean(axis=0)
std = train_data.std(axis=0)
train_data = (train_data - mean) / std
test_data = (test_data - mean) / std

データセットのデータが平均0、分散1になっていることを確認します。

1
2
3
4
# データセットの前処理後のデータの確認
column_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
df = pd.DataFrame(train_data, columns=column_names)
df.head()

実行結果5(一部略)

モデルを作成します。今回は全結合層を3つ重ねた簡単なモデルとなります。

1
2
3
4
5
# モデルの作成
model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(13,)))
model.add(Dense(64, activation='relu'))
model.add(Dense(1))

ニューラルネットワークモデルのコンパイルを行います。

  • 損失関数 mse
    平均二乗誤差 Mean Squared Error - 実際の値と予測値との誤差の二乗を平均したものです。
    0に近いほど予測精度が高いことになります。
  • 最適化関数 Adam
    lrは学習率です。
  • 評価指標 mae
    平均絶対誤差 Mean Absolute Error - 実際の値と予測値との絶対値を平均したものです。
    0に近いほど予測精度が高いことになります。
1
2
# コンパイル
model.compile(loss='mse', optimizer=Adam(lr=0.001), metrics=['mae'])

EarlyStoppingの準備を行います。
任意のエポック数改善がないと学習を停止します。

1
2
# EarlyStoppingの準備
early_stop = EarlyStopping(monitor='val_loss', patience=30)

学習を行います。callbacksにEarlyStoppingを指定しています。

1
2
# 学習
history = model.fit(train_data, train_labels, epochs=500, validation_split=0.2, callbacks=[early_stop])

実行結果6(途中略)

学習中に出力される情報の意味は次の通りです。

情報 説明
loss 訓練データの誤差です。0に近いほどよい結果となります。
mean_absolute_error 訓練データの平均絶対誤差です。0に近いほどよい結果となります。
val_loss 検証データの誤差です。0に近いほどよい結果となります。
val_mean_absolute_error 検証データの平均絶対誤差です。0に近いほどよい結果となります。

上記のデータうち、訓練データの平均絶対誤差(mae)と検証データの平均絶対誤差(val_mae)をグラフ表示します。

1
2
3
4
5
6
7
8
# グラフの表示
plt.plot(history.history['mean_absolute_error'], label='train mae')
plt.plot(history.history['val_mean_absolute_error'], label='val mae')
plt.xlabel('epoch')
plt.ylabel('mae [1000$]')
plt.legend(loc='best')
plt.ylim([0,5])
plt.show()

実行結果7

テストデータとテストラベルを推定モデルに渡して評価を行い、平均絶対誤差を算出します。

1
2
3
# 評価
test_loss, test_mae = model.evaluate(test_data, test_labels)
print('loss:{:.3f}\nmae: {:.3f}'.format(test_loss, test_mae))

実行結果8

平均絶対誤差は2.655となりました。

テストデータの先頭10件の推論を行い、予測結果を出力します。

1
2
3
4
5
6
# 推論する値段の表示
print(np.round(test_labels[0:10]))

# 推論した値段の表示
test_predictions = model.predict(test_data[0:10]).flatten()
print(np.round(test_predictions))

実行結果9

実際の価格に近い価格が推論されているような気がします。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

深層学習 ニューラルネットワークで分類

手書き数字を分類するためにニューラルネットワークを作成し、実際の数字を推論するモデルを作ります。

まずは必要なパッケージをインポートします。

1
2
3
4
5
6
7
8
9
# パッケージのインポート
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Activation, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

データセットの準備を行います。

各データの内容は次の通りです。

変数名 内容
train_images 訓練画像の配列
train_labels 訓練ラベルの配列
test_images テスト画像の配列
test_labels テストラベルの配列
1
2
# データセットの準備
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

実行結果1

データセットのシェイプを確認します。

1
2
3
4
5
# データセットのシェイプの確認
print(train_images.shape)
print(train_labels.shape)
print(test_images.shape)
print(test_labels.shape)

実行結果2

訓練画像データは60000×画像サイズ(28×28)です。
訓練ラベルデータは60000の1次元配列となります。

データセットの画像を確認するために先頭の10件を表示します。

1
2
3
4
5
# データセットの画像の確認
for i in range(10):
plt.subplot(1, 10, i+1)
plt.imshow(train_images[i], 'gray')
plt.show()

実行結果3

データセットのラベルを確認するために先頭の10件を表示します。

1
2
# データセットのラベルの確認
print(train_labels[0:10])

実行結果4

学習を開始する事前準備として、データセットをニューラルネットワークに適した形に変換します。
具体的には、画像データを28×28の2次元配列から1次元配列(786)に変換します。

1
2
3
4
5
6
7
# データセットの画像の前処理
train_images = train_images.reshape((train_images.shape[0], 784))
test_images = test_images.reshape((test_images.shape[0], 784))

# データセットの画像の前処理後のシェイプの確認
print(train_images.shape)
print(test_images.shape)

実行結果5

ラベルデータに関しても、ニューラルネットワークに適した形に変換します。
具体的にはone-hot表現に変えます。
one-hot表現とは、ある1要素が1でほかの要素が0である配列です。
ラベルが8の場合は[0, 0, 0, 0, 0, 0, 0, 0, 1, 0]という配列になります。

1
2
3
4
5
6
7
# データセットのラベルの前処理
train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)

# データセットのラベルの前処理後のシェイプの確認
print(train_labels.shape)
print(test_labels.shape)

実行結果6

ニューラルネットワークのモデルを作成します。

入力層のシェイプは画像データに合わせて786で、出力層はラベルデータに合わせて10とします。
ユニット数と隠れ層の数は自由に決められますが今回はユニット数256と隠れ層128としました。

層とユニット数を増やすと複雑な特徴をとらえることができるようになる半面、学習時間が多くかかるようになってしまいます。
またユニット数が多くなると重要性の低い特徴を抽出して過学習になってしまう可能性があります。

Dropoutは過学習を防いでモデルの精度をあげるための手法となります。
任意の層のユニットをランダムに無効にして特定ニューロンへの依存を防ぎ汎化性能を上げます。

活性化関数は結合層の後に適用する関数で層からの出力に対して特定の関数を経由し最終的な出力値を決めます。活性化関数を使用することで線形分離不可能なデータも分類することができるようになります。

1
2
3
4
5
6
# モデルの作成
model = Sequential()
model.add(Dense(256, activation='sigmoid', input_shape=(784,))) # 入力層
model.add(Dense(128, activation='sigmoid')) # 隠れ層
model.add(Dropout(rate=0.5)) # ドロップアウト
model.add(Dense(10, activation='softmax')) # 出力層

ニューラルネットワークのモデルをコンパイルします。

  • 損失関数 [loss]
    モデルの予測値と正解データの誤差を計算する関数です。
  • 最適化関数 [optimizer]
    損失関数の結果が0に近づくように重みパラメータとバイアスを最適化する関数です。
  • 評価指標 [metrics]
    モデル性能を測定するための指標です。測定結果は、学習を行うfit()の戻り値に格納されます。
1
2
# コンパイル
model.compile(loss='categorical_crossentropy', optimizer=SGD(lr=0.1), metrics=['acc'])

訓練画像と訓練モデルを使って学習を実行します。

1
2
# 学習
history = model.fit(train_images, train_labels, batch_size=500, epochs=5, validation_split=0.2)

実行結果7

学習中に出力される情報の意味は次の通りです。

情報 説明
loss 訓練データの誤差です。0に近いほどよい結果となります。
acc 訓練データの正解率です。1に近いほどよい結果となります。
val_loss 検証データの誤差です。0に近いほどよい結果となります。
val_acc 検証データの正解率です。1に近いほどよい結果となります。

上記のデータうち、訓練データの正解率(acc)と検証データの正解率(val_acc)をグラフ表示します。

1
2
3
4
5
6
7
# グラフの表示
plt.plot(history.history['acc'], label='acc')
plt.plot(history.history['val_acc'], label='val_acc')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(loc='best')
plt.show()

実行結果8

テスト画像とテストラベルをモデルに渡して評価を行います。

1
2
3
# 評価
test_loss, test_acc = model.evaluate(test_images, test_labels)
print('loss: {:.3f}\nacc: {:.3f}'.format(test_loss, test_acc ))

実行結果9

正解率は91.0%となりました。

先頭10件のテスト画像の推論を行い、画像データと予測結果を合わせて表示します。

1
2
3
4
5
6
7
8
9
10
# 推論する画像の表示
for i in range(10):
plt.subplot(1, 10, i+1)
plt.imshow(test_images[i].reshape((28, 28)), 'gray')
plt.show()

# 推論したラベルの表示
test_predictions = model.predict(test_images[0:10])
test_predictions = np.argmax(test_predictions, axis=1)
print(test_predictions)

実行結果10

90%の正解率であることが確認できます。

(Google Colaboratoryで動作確認しています。)

参考

AlphaZero 深層学習・強化学習・探索 人工知能プログラミング実践入門 サポートページ

逆強化学習

逆強化学習では報酬関数を推定します。そのメリットは次の通りです。

  • 人が報酬を設定する必要がない。
  • 他タスクでの利用が可能になる。
  • 人間や動物の行動理解に利用できる。

逆強化学習の手順は以下のようになります。

  1. エキスパートの行動を評価する。(戦略、状態遷移等)
  2. 報酬関数の初期化を行う。
  3. 報酬関数を利用し戦略を学習する。
  4. 学習した戦略の評価が、エキスパートの評価結果(手順1)と近くなるよう報酬関数を更新する。
  5. 手順3に戻り処理を繰り返す。

逆強化学習のデメリットとしては学習に時間がかかるということです。
通常の強化学習手順である手順3だけでも時間がかかるところを、逆強化学習ではその手順3を繰り返し行う必要があるためです。