基于DQN强化学习训练一个超级玛丽

机器学习实验室

共 6281字,需浏览 13分钟

 · 2020-12-11

Author:MyEncyclopedia

From:MyEncyclopedia

上一期 MyEncyclopedia文章 通过代码学Sutton强化学习:从Q-Learning 演化到 DQN,我们从原理上讲解了DQN算法,这一期,让我们通过代码来实现DQN 在任天堂经典的超级玛丽游戏中的自动通关吧。本系列将延续通过代码学Sutton 强化学习系列,逐步通过代码实现经典深度强化学习应用在各种游戏环境中。本文所有代码在 

https://github.com/MyEncyclopedia/reinforcement-learning-2nd/tree/master/super_mario

最终训练第一关结果动画

DQN 算法回顾

上期详细讲解了DQN中的两个重要的技术:Target Network 和 Experience Replay,正是有了它们才使得 Deep Q Network在实战中容易收敛,以下是Deepmind 发表在Nature 的 Human-level control through deep reinforcement learning 的完整算法流程。

 

超级玛丽 NES OpenAI 环境

安装基于OpenAI gym的超级玛丽环境执行下面的 pip 命令即可。

pip install gym-super-mario-bros

我们先来看一下游戏环境的输入和输出。下面代码采用随机的action来和游戏交互。有了 组合游戏系列3: 井字棋、五子棋的OpenAI Gym GUI环境 关于OpenAI Gym 的介绍,现在对于其基本的交互步骤已经不陌生了。

import gym_super_mario_bros
from random import random, randrange
from gym_super_mario_bros.actions import RIGHT_ONLY
from nes_py.wrappers import JoypadSpace
from gym import wrappers

env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, RIGHT_ONLY)

# Play randomly
done = False
env.reset()

step = 0
while not done:
    action = randrange(len(RIGHT_ONLY))
    state, reward, done, info = env.step(action)
    print(done, step, info)
    env.render()
    step += 1

env.close()

随机策略的效果如下

注意我们在游戏环境初始化的时候用了参数 RIGHT_ONLY,它定义成五种动作的list,表示仅使用右键的一些组合,适用于快速训练来完成Mario第一关。

RIGHT_ONLY = [
    ['NOOP'],
    ['right'],
    ['right''A'],
    ['right''B'],
    ['right''A''B'],
]

观察一些 info 输出内容,coins表示金币获得数量,flag_get 表示是否取得最后的旗子,time 剩余时间,以及 Mario 大小状态和所在的 x,y位置。

{
   "coins":0,
   "flag_get":False,
   "life":2,
   "score":0,
   "stage":1,
   "status":"small",
   "time":381,
   "world":1,
   "x_pos":594,
   "y_pos":89
}


游戏图像处理

Deep Reinforcement Learning 一般是 end-to-end learning,意味着将游戏的 screen image,即 observed state 直接视为真实状态 state,喂给神经网络去训练。于此相反的另一种做法是,通过游戏环境拿到内部状态,例如所有相关物品的位置和属性作为模型输入。这两种方式的区别在我看来有两点。第一点,用观察到的屏幕像素代替真正的状态 state,在partially observable 的环境时可能因为 non-stationarity 导致无法很好的工作,而拿内部状态利用了额外的作弊信息,在partially observable环境中也可以工作。第二点,第一种方式屏幕像素维度比较高,输入数据量大,需要神经网络的大量训练拟合,第二种方式,内部真实状态往往维度低得多,训练起来很快,但缺点是因为除了内部状态往往还需要游戏相关规则作为输入,因此generalization能力不如前者强。

 

这里,我们当然采样屏幕像素的 end-to-end 方式了,自然首要任务是将游戏帧图像有效处理。超级玛丽游戏环境的屏幕输出是 (240, 256, 3) shape的 numpy array,通过下面一系列的转换,尽可能的在不影响训练效果的情况下减小采样到的数据量。

  1. MaxAndSkipFrameWrapper:每4个frame连在一起,采取同样的动作,降低frame数量

  2. FrameDownsampleWrapper:将原始的 (240, 256, 3) down sample 到 (84, 84, 1)

  3. ImageToPyTorchWrapper:转换成适合 pytorch 的 shape (1, 84, 84) 

  4. FrameBufferWrapper:保存最后4次屏幕采样

  5. NormalizeFloats:Normalize 成 [0., 1.0] 的浮点值

def wrap_environment(env_name: str, action_space: list) -> Wrapper:
    env = make(env_name)
    env = JoypadSpace(env, action_space)
    env = MaxAndSkipFrameWrapper(env)
    env = FrameDownsampleWrapper(env)
    env = ImageToPyTorchWrapper(env)
    env = FrameBufferWrapper(env, 4)
    env = NormalizeFloats(env)
    return env


CNN 模型

模型比较简单,三个卷积层后做 softmax输出,输出维度数为离散动作数。act() 采用了epsilon-greedy 模式,即在epsilon小概率时采取随机动作来 explore,大于epsilon时采取估计的最可能动作来 exploit。

class DQNModel(nn.Module):
    def __init__(self, input_shape, num_actions):
        super(DQNModel, self).__init__()
        self._input_shape = input_shape
        self._num_actions = num_actions

        self.features = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(3264, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(6464, kernel_size=3, stride=1),
            nn.ReLU()
        )

        self.fc = nn.Sequential(
            nn.Linear(self.feature_size, 512),
            nn.ReLU(),
            nn.Linear(512, num_actions)
        )

    def forward(self, x):
        x = self.features(x).view(x.size()[0], -1)
        return self.fc(x)

    def act(self, state, epsilon, device):
        if random() > epsilon:
            state = torch.FloatTensor(np.float32(state)).unsqueeze(0).to(device)
            q_value = self.forward(state)
            action = q_value.max(1)[1].item()
        else:
            action = randrange(self._num_actions)
        return action


Experience Replay 缓存

实现采用了 Pytorch CartPole DQN 的官方代码,本质是一个最大为 capacity 的 list 保存了采样到的 (s, a, r, s', is_done)  五元组。

Transition = namedtuple('Transition', ('state''action''reward''next_state''done'))

class ReplayMemory:

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


DQNAgent

我们将 DQN 的逻辑封装在 DQNAgent 类中。DQNAgent 成员变量包括两个 DQNModel,一个ReplayMemory。

train() 方法中会每隔一定时间将 Target Network 的参数同步成现行Network的参数。在td_loss_backprop()方法中采样 ReplayMemory 中的五元组,通过minimize TD error方式来改进现行 Network 参数 。Loss函数为:

class DQNAgent():

    def act(self, state, episode_idx):
        self.update_epsilon(episode_idx)
        action = self.model.act(state, self.epsilon, self.device)
        return action

    def process(self, episode_idx, state, action, reward, next_state, done):
        self.replay_mem.push(state, action, reward, next_state, done)
        self.train(episode_idx)

    def train(self, episode_idx):
        if len(self.replay_mem) > self.initial_learning:
            if episode_idx % self.target_update_frequency == 0:
                self.target_model.load_state_dict(self.model.state_dict())
            self.optimizer.zero_grad()
            self.td_loss_backprop()
            self.optimizer.step()

    def td_loss_backprop(self):
        transitions = self.replay_mem.sample(self.batch_size)
        batch = Transition(*zip(*transitions))

        state = Variable(FloatTensor(np.float32(batch.state))).to(self.device)
        action = Variable(LongTensor(batch.action)).to(self.device)
        reward = Variable(FloatTensor(batch.reward)).to(self.device)
        next_state = Variable(FloatTensor(np.float32(batch.next_state))).to(self.device)
        done = Variable(FloatTensor(batch.done)).to(self.device)

        q_values = self.model(state)
        next_q_values = self.target_net(next_state)

        q_value = q_values.gather(1, action.unsqueeze(-1)).squeeze(-1)
        next_q_value = next_q_values.max(1)[0]
        expected_q_value = reward + self.gamma * next_q_value * (1 - done)

        loss = (q_value - expected_q_value.detach()).pow(2)
        loss = loss.mean()
        loss.backward()


外层控制代码

最后是外层调用代码,基本和以前文章一样。

def train(env, args, agent):
    for episode_idx in range(args.num_episodes):
        episode_reward = 0.0
        state = env.reset()

        while True:
            action = agent.act(state, episode_idx)
            if args.render:
                env.render()
            next_state, reward, done, stats = env.step(action)
            agent.process(episode_idx, state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward
            if done:
                print(f'{episode_idx}{episode_reward}')
                break



往期精彩:

【原创首发】机器学习公式推导与代码实现30讲.pdf

【原创首发】深度学习语义分割理论与实战指南.pdf

 算法工程师研发技能表

 真正想做算法的,不要害怕内卷

 技术学习不能眼高手低

 技术人要学会自我营销

 做人不能过拟合

求个在看

浏览 29
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报