将深度学习用于价值近似-DQN

  |  

摘要: DQN 初探

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


在文章 在Q学习中应用含参数函数实现价值近似 中,我们学习了如何在 Q 学习中应用含参数函数实现价值近似。具体地,我们根据 使用神经网络实现强化学习的框架 首先创建一个根据价值函数选择行动的智能体,ValueFunctionAgent,然后将其用在了 gym 的 CartPole 中。

本文我们进一步将 ValueFunctionAgent 改进为 DeepQNetworkAgent,增加网络的复杂性,Agent 价值函数的部分使用 CNN,直接输入画面进行学习,并在 gym 的 Catcher 中应用。

注意:我们在 使用神经网络实现强化学习的框架 中总结了使用神经网络实现强化学习的代码套路,其中包括 Agent, Observer, Trainer, Logger。后面对 DQN 的实现流程完全是按照这个套路来的,这种流程在以后的其它业务中可以迁移使用。

最后我们总结一下提出 DQN 的 DeepMind 发布的 Rainbow 模型,其中有 6 种 DQN 的改进版本及其主要技术。


1. 关于 Catcher 游戏

Catcher 这个游戏可以体现直接输入画面的优点。Catcher 是一个用挡板来接从上面掉下来的球的游戏,接住则奖励 +1,没接住则奖励 -1。

这个游戏通过 pygame 创建,pygame 是一个用 Python 来创建游戏的框架。 PyGame-Learning-Environment(PLE) 是集中了用 pygame 创建的游戏的环境,可以用于强化学习。如果要在 gym 中操作 PLE 则需要插件 gym-ple,安装方法如下

1
2
3
git clone git@github.com:ntasfi/PyGame-Learning-Environment.git
cd PyGame-Learning-Environment
pip install -e .
1
2
3
git clone git@github.com:lusob/gym-ple.git
cd gym-ple
pip install -e .

Catcher 有三种行动: 向左,向右,停止。用 CNN 构建网络学习后,输入画面,就可以输出各种行动的价值。

2. DeepQNetworkAgent 与 DQN

ValueFunctionAgent 中实现的是通过简单的神经网络实现价值函数。这里我们将简单神经网络改为深度神经网络。

首先给出 DeepQNetworkAgent 的代码,可以与 ValueFunctionAgent 进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import random
import argparse
from collections import deque
import numpy as np
from tensorflow import keras as K
from PIL import Image
import gym
import gym_ple
from fn_framework import FNAgent, Trainer, Observer


class DeepQNetworkAgent(FNAgent):

def __init__(self, epsilon, actions):
super().__init__(epsilon, actions)
self._scaler = None
self._teacher_model = None

def initialize(self, experiences, optimizer):
feature_shape = experiences[0].s.shape
self.make_model(feature_shape)
self.model.compile(optimizer, loss="mse")
self.initialized = True
print("Done initialization. From now, begin training!")

def make_model(self, feature_shape):
normal = K.initializers.glorot_normal()
model = K.Sequential()
model.add(K.layers.Conv2D(
32, kernel_size=8, strides=4, padding="same",
input_shape=feature_shape, kernel_initializer=normal,
activation="relu"))
model.add(K.layers.Conv2D(
64, kernel_size=4, strides=2, padding="same",
kernel_initializer=normal,
activation="relu"))
model.add(K.layers.Conv2D(
64, kernel_size=3, strides=1, padding="same",
kernel_initializer=normal,
activation="relu"))
model.add(K.layers.Flatten())
model.add(K.layers.Dense(256, kernel_initializer=normal,
activation="relu"))
model.add(K.layers.Dense(len(self.actions),
kernel_initializer=normal))
self.model = model
self._teacher_model = K.models.clone_model(self.model)

def estimate(self, state):
return self.model.predict(np.array([state]))[0]

def update(self, experiences, gamma):
states = np.array([e.s for e in experiences])
n_states = np.array([e.n_s for e in experiences])

estimateds = self.model.predict(states)
future = self._teacher_model.predict(n_states)

for i, e in enumerate(experiences):
reward = e.r
if not e.d:
reward += gamma * np.max(future[i])
estimateds[i][e.a] = reward

loss = self.model.train_on_batch(states, estimateds)
return loss

def update_teacher(self):
self._teacher_model.set_weights(self.model.get_weights())
  • initialize 负责构建模型,其中 optimizer 由负责学习的 Trainer 传递,loss="lse" 的意思是最小化 TD 误差。
  • make_model 负责创建模型,代码与下图是对应的。

  • updateValueFunctionAgent 中的单层神经网络的处理几乎相同,但是迁移后的价值是根据 self._teacher_model 计算的。这是模型的副本,通过 update_teacher 从主模型复制参数,主模型在每次进行 update 时都会更新参数,而 self._teacher_model 的参数则是固定的,直到运行 update_teacher 为止。
  • 根据 self._teacher_model (在一定时期固定的参数) 来计算迁移后的价值的方法称为 Fixed Target Q-Network

再利用主模型计算迁移后的价值的情况下,由于每次进行学习(train_on_batch)时,参数都会变化,所以值每次都会变化,那么用于学习的 TD 误差将不稳定,最终导致学习也不稳定。因此需要使用参数在一定期间固定的网络来计算迁移后的价值。

DQN 主要在以下三个方面做了改善

  1. 经验回放
  2. Fixed Target Q-Network
  3. 奖励剪裁(Reward clipping)

其中奖励剪裁是指在整个游戏中统一奖励:成功奖励 1,不成功奖励 -1。但是使用这种奖励剪裁有个缺点就是无法对奖励加权,这方面的讨论可以参考以下两篇文章

1
2
《Learning Values across many orders of magnitude》
《Multi-task Deep Reinforcement Learning with PopArt》

3. 测试用的 Agent

在测试用的 Agent 和测试环境 Cartpole 中,对学习过程之外的部分进行测试。因为使用 CNN 学习非常耗时,因此在学习前先把能测试的先测试了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DeepQNetworkAgentTest(DeepQNetworkAgent):

def __init__(self, epsilon, actions):
super().__init__(epsilon, actions)

def make_model(self, feature_shape):
normal = K.initializers.glorot_normal()
model = K.Sequential()
model.add(K.layers.Dense(64, input_shape=feature_shape,
kernel_initializer=normal, activation="relu"))
model.add(K.layers.Dense(len(self.actions), kernel_initializer=normal,
activation="relu"))
self.model = model
self._teacher_model = K.models.clone_model(self.model)

在对 Agent 进行实例化时,先进行一个 test_mode 的判定,再决定初始化测试的 DeepQNetworkAgentTest 还是正常的 DeepQNetworkAgent

1
2
3
4
if not test_mode:
agent = DeepQNetworkAgent(1.0, actions)
else:
agent = DeepQNetworkAgentTest(1.0, actions)

4. Observer

有了测试用的 Agent 之后,接下来就是准备测试环境的 Observer。

输入的是按时间顺序排列的 4 个画面帧,因此我们需要进行合并这 4 个帧。对每个帧做灰度处理,归一化为 0 ~ 1 的值。由于开始时的 4 个帧没有对齐,要复制第一帧 4 次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class CatcherObserver(Observer):

def __init__(self, env, width, height, frame_count):
super().__init__(env)
self.width = width
self.height = height
self.frame_count = frame_count
self._frames = deque(maxlen=frame_count)

def transform(self, state):
grayed = Image.fromarray(state).convert("L")
resized = grayed.resize((self.width, self.height))
resized = np.array(resized).astype("float")
normalized = resized / 255.0 # 归一化为 0~1
if len(self._frames) == 0:
for i in range(self.frame_count):
self._frames.append(normalized)
else:
self._frames.append(normalized)
feature = np.array(self._frames)
# 将特征图的形状由(f, w, h)转换成(h, w, f).
feature = np.transpose(feature, (1, 2, 0))

return feature

5. Trainer

Agent 和 Observer 都有了之后,我们就可以定义学习的 Trainer 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class DeepQNetworkTrainer(Trainer):

def __init__(self, buffer_size=50000, batch_size=32,
gamma=0.99, initial_epsilon=0.5, final_epsilon=1e-3,
learning_rate=1e-3, teacher_update_freq=3, report_interval=10,
log_dir="", file_name=""):
super().__init__(buffer_size, batch_size, gamma,
report_interval, log_dir)
self.file_name = file_name if file_name else "dqn_agent.h5"
self.initial_epsilon = initial_epsilon
self.final_epsilon = final_epsilon
self.learning_rate = learning_rate
self.teacher_update_freq = teacher_update_freq
self.loss = 0
self.training_episode = 0
self._max_reward = -10

def train(self, env, episode_count=1200, initial_count=200,
test_mode=False, render=False, observe_interval=100):
actions = list(range(env.action_space.n))
if not test_mode:
agent = DeepQNetworkAgent(1.0, actions)
else:
agent = DeepQNetworkAgentTest(1.0, actions)
observe_interval = 0
self.training_episode = episode_count

self.train_loop(env, agent, episode_count, initial_count, render,
observe_interval)
return agent

def episode_begin(self, episode, agent):
self.loss = 0

def begin_train(self, episode, agent):
optimizer = K.optimizers.Adam(lr=self.learning_rate, clipvalue=1.0)
agent.initialize(self.experiences, optimizer)
self.logger.set_model(agent.model)
agent.epsilon = self.initial_epsilon
self.training_episode -= episode

def step(self, episode, step_count, agent, experience):
if self.training:
batch = random.sample(self.experiences, self.batch_size)
self.loss += agent.update(batch, self.gamma)

def episode_end(self, episode, step_count, agent):
reward = sum([e.r for e in self.get_recent(step_count)])
self.loss = self.loss / step_count
self.reward_log.append(reward)
if self.training:
self.logger.write(self.training_count, "loss", self.loss)
self.logger.write(self.training_count, "reward", reward)
self.logger.write(self.training_count, "epsilon", agent.epsilon)
if reward > self._max_reward:
agent.save(self.logger.path_of(self.file_name))
self._max_reward = reward
if self.is_event(self.training_count, self.teacher_update_freq):
agent.update_teacher()

diff = (self.initial_epsilon - self.final_epsilon)
decay = diff / self.training_episode
agent.epsilon = max(agent.epsilon - decay, self.final_epsilon)

if self.is_event(episode, self.report_interval):
recent_rewards = self.reward_log[-self.report_interval:]
self.logger.describe("reward", recent_rewards, episode=episode)

buffer_size 比较大,因为需要从画面上学习。
begin_train 初始化模型,并通过 step 进行学习,使用 K.optimizers.Adam 作为最优化方法。
episode_end 记录奖励和误差,用 agent.save 保存模型,如果学习已经开始,则以 self.teacher_update_freq 的频率更新用于计算钱以后的价值的模型
epsilon 有逐渐下降的机制。


6. 实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def main(play, is_test):
file_name = "dqn_agent.h5" if not is_test else "dqn_agent_test.h5"
trainer = DeepQNetworkTrainer(file_name=file_name)
path = trainer.logger.path_of(trainer.file_name)
agent_class = DeepQNetworkAgent

if is_test:
print("Train on test mode")
obs = gym.make("CartPole-v0")
agent_class = DeepQNetworkAgentTest
else:
env = gym.make("Catcher-v0")
obs = CatcherObserver(env, 80, 80, 4)
trainer.learning_rate = 1e-4

if play:
agent = agent_class.load(obs, path)
agent.play(obs, render=True)
else:
trainer.train(obs, test_mode=is_test)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DQN Agent")
parser.add_argument("--play", action="store_true",
help="play with trained model")
parser.add_argument("--test", action="store_true",
help="train by test mode")

args = parser.parse_args()
main(args.play, args.test)

有两组参数:

--test 表示用 Cartole-v0 测试智能体的学习,没有此参数则用 Catcher-v0 正式运行训练或推理
--play 表示用学习好的模型进行推理,没有此参数则为训练。

实验结果

  • test 模式训练:

逻辑正确性测试,资源需求较小。

1
python dqn_agent.py --test

  • test 模式测试
1
python dqn_agent.py --test --play

  • 正式模式训练与测试

注意资源问题。

1
2
python dqn_agent.py
python dqn_agent.py --play

7. DQN 的 6 种改进, Rainbow 模型

提出 DQN 的 DeepMind 也发布了一种 Rainbow 模型,除了 DQN 以外还内置了 6 种改进版本,共 7 种,参考论文:《Model-Based Reinforcement Learning via Meta-Policy Optimization》2018。

下面看一下 6 种改进版本。

(1) Double DQN

提高价值估计精度的方法。Q 值的计算中包含了正负噪声,但是计算最大值时,始终采用正噪声,因此总价值有所高估。(过估计)

Double-DQN 就是将两者分开。

(2) Prioritized Replay

提高学习效率的方法。不是简单地根据经验回放随机采样,而是优先考虑对学习效果较高,即 TD 误差较大的样本进行采样。但是为了使得学习不偏向 TD 误差较大的样本,还需要与随机采样一起进行,调整比例。

(3) Dueling Network

提高价值估计精度的方法,将状态本身的价值和各种状态下的行动的价值分开计算。由此能够分别掌握状态价值和行动价值

(4) Multi-step Learning

提高价值估计精度的方法。这是很早就有的方法,介于 Q 学习和蒙特卡洛方法之间,根据 n-step 奖励和 n-step 之后的状态价值进行校正

(5) Distributional RL

提高价值估计精度的方法。奖励通常用期望值表示,而 Distributional RL 将奖励视为分布,其期望和方差根据状态和行动的变化而变化。通过方差参数,可以表现期望值相同但奖励不同的情况。

(6) Noisy Nets

提高探索效率的方法。Noisy Nets 是一种让网络学习 epsilon 的设定的方法。


Share