import cv2
import gym
import numpy as np
from gym import spaces
from gym.spaces.box import Box

# Disable OpenCL in cv2; it can cause trouble when environments
# are run in parallel worker processes.
cv2.ocl.setUseOpenCL(False)
class NoopResetEnv(gym.Wrapper):
    def __init__(self, env, noop_max=30):
        '''Trick 1: No-Operation. After a reset, do nothing for a random
        number of steps so that the game begins from varied initial states,
        preventing the agent from learning only a specific start state.'''
        gym.Wrapper.__init__(self, env)
        self.noop_max = noop_max
        self.override_num_noops = None
        self.noop_action = 0
        assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

    def reset(self, **kwargs):
        '''Do no-op action for a number of steps in [1, noop_max].'''
        self.env.reset(**kwargs)
        if self.override_num_noops is not None:
            noops = self.override_num_noops
        else:
            noops = self.unwrapped.np_random.randint(1, self.noop_max + 1)
        assert noops > 0
        obs = None
        for _ in range(noops):
            obs, _, done, _ = self.env.step(self.noop_action)
            if done:
                obs = self.env.reset(**kwargs)
        return obs

    def step(self, ac):
        return self.env.step(ac)
class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env):
        '''Trick 2: Episodic Life. Treat the loss of a single life as the
        end of an episode, and start the next episode from the state at
        the time of failure instead of restarting the whole game.'''
        gym.Wrapper.__init__(self, env)
        self.lives = 0
        self.was_real_done = True

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.was_real_done = done
        # Signal done when a life is lost, but only while lives remain;
        # lives == 0 means the game is really over.
        lives = self.env.unwrapped.ale.lives()
        if lives < self.lives and lives > 0:
            done = True
        self.lives = lives
        return obs, reward, done, info

    def reset(self, **kwargs):
        '''Reset the game for real only after all five lives are lost;
        otherwise advance one no-op step and continue from there.'''
        if self.was_real_done:
            obs = self.env.reset(**kwargs)
        else:
            obs, _, _, _ = self.env.step(0)
        self.lives = self.env.unwrapped.ale.lives()
        return obs
class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env, skip=4):
        '''Trick 3: Max and Skip. Repeat the same action for 4 consecutive
        frames and use the pixel-wise maximum of the 3rd and 4th frames
        as the observation.'''
        gym.Wrapper.__init__(self, env)
        # Buffer for the most recent two raw observations
        self._obs_buffer = np.zeros(
            (2,) + env.observation_space.shape, dtype=np.uint8)
        self._skip = skip

    def step(self, action):
        '''Repeat action, sum reward, and max over last observations.'''
        total_reward = 0.0
        done = None
        for i in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            if i == self._skip - 2:
                self._obs_buffer[0] = obs
            if i == self._skip - 1:
                self._obs_buffer[1] = obs
            total_reward += reward
            if done:
                break
        max_frame = self._obs_buffer.max(axis=0)
        return max_frame, total_reward, done, info

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)
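
# Why take the max over the last two frames? Some Atari games draw certain
# sprites only on alternating frames because of hardware limits, so a single
# frame can miss an object entirely. The element-wise max keeps a sprite
# visible as long as it appears in either frame. A toy illustration with
# synthetic "frames" (plain numpy, no emulator required):
frame_a = np.array([[0, 255], [0, 0]], dtype=np.uint8)  # sprite drawn top-right only
frame_b = np.array([[0, 0], [255, 0]], dtype=np.uint8)  # sprite drawn bottom-left only
merged = np.maximum(frame_a, frame_b)  # both sprites survive the merge
print(merged)  # [[  0 255]
               #  [255   0]]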
class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env):
        '''Trick 4: Warp frame. Convert frames to 84x84 grayscale,
        the same size as in the Nature DQN paper.'''
        gym.ObservationWrapper.__init__(self, env)
        self.width = 84
        self.height = 84
        self.observation_space = spaces.Box(
            low=0, high=255,
            shape=(self.height, self.width, 1), dtype=np.uint8)

    def observation(self, frame):
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        frame = cv2.resize(frame, (self.width, self.height),
                           interpolation=cv2.INTER_AREA)
        return frame[:, :, None]
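
# To check the preprocessing in isolation (a standalone sketch using a
# synthetic 210x160 RGB frame, Atari's native resolution, so no ROM is
# needed), the conversion should yield an (84, 84, 1) uint8 array:
dummy = np.random.randint(0, 256, (210, 160, 3), dtype=np.uint8)
gray = cv2.cvtColor(dummy, cv2.COLOR_RGB2GRAY)  # (210, 160)
small = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)  # (84, 84)
print(small[:, :, None].shape)  # (84, 84, 1), matching WarpFrame.observation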
class WrapPyTorch(gym.ObservationWrapper):
    def __init__(self, env=None):
        '''Wrapper that reorders observation axes to PyTorch's minibatch
        index order (channels, height, width).'''
        super(WrapPyTorch, self).__init__(env)
        obs_shape = self.observation_space.shape
        self.observation_space = Box(
            self.observation_space.low[0, 0, 0],
            self.observation_space.high[0, 0, 0],
            # (H, W, C) -> (C, H, W), matching the transpose below
            [obs_shape[2], obs_shape[0], obs_shape[1]],
            dtype=self.observation_space.dtype)

    def observation(self, observation):
        # (H, W, C) -> (C, H, W)
        return observation.transpose(2, 0, 1)
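
# Putting the wrappers together: the sketch below is an assumption, not part
# of the classes above; the environment id 'BreakoutNoFrameskip-v4' and the
# wrapper order follow the usual OpenAI Baselines convention. A NoFrameskip
# variant matters here: MaxAndSkipEnv does its own frame skipping, so the
# base environment must not skip frames itself.
def make_env_sketch(env_id='BreakoutNoFrameskip-v4'):
    env = gym.make(env_id)
    env = NoopResetEnv(env, noop_max=30)  # Trick 1: random initial no-ops
    env = MaxAndSkipEnv(env, skip=4)      # Trick 3: act every 4 frames, max-pool
    env = EpisodicLifeEnv(env)            # Trick 2: one life = one episode
    env = WarpFrame(env)                  # Trick 4: 84x84 grayscale
    env = WrapPyTorch(env)                # (C, H, W) for PyTorch
    return env

# obs = make_env_sketch().reset()  # obs.shape == (1, 84, 84)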