时序差分与蒙特卡洛方法, Q学习

  |  

摘要: 蒙特卡洛方法与时序差分

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


在文章 强化学习环境框架-从一个迷宫环境看MDP的要点 中,我们总结了强化学习的基本概念,以及作为强化学习机制的 MDP,并且以 grid 为例实现了 MDP 环境的代码框架。

有了 MDP 环境之后,我们就要解决如何在其中选择最优行动的问题,也就是 MDP 环境的代码框架中的 agent.policy 方法的实现,具体来说就是价值的计算方法(价值近似)基于价值近似的行动选择方法(策略)

当 MDP 环境的迁移函数 transit_func 和奖励函数 reward_func 已知的时候,就可以使用动态规划法来求解。这种基于迁移函数和奖励函数来学习行动的方法称为基于模型的学习方法。在文章 MDP的动态规划解法-策略迭代和价值迭代 中,我们就是从策略迭代和价值迭代两种思路实践的这种动态规划方法。

当不知道环境信息(迁移函数和奖励函数)的情况下,也可以使用这种基于模型的学习方法对这两种函数进行推算,后面我们有机会再探究这种用法。

此外对于环境信息(迁移函数和奖励函数)未知的情况,还有另一种学习方法:无模型的方法。无模型方法通过自身采取行动来积累经验,并根据经验进行学习的方法。

无模型的学习方法中,通过行动积累经验的过程中有以下 3 个要点

  1. 平衡经验的积累与应用
  2. 根据实际奖励来修正计划,还是根据预测来修正计划
  3. 用经验来更新价值近似还是更新策略

在文章 经验的积累与利用-Epsilon贪心 中,我们研究过第 1 个问题,主要内容是 Epsilon 贪心算法,并以多臂老虎机为例进行实践。

本文我们研究第 2 点,主要涉及到蒙特卡洛方法(根据实际奖励修正)和时序差分学习(根据预测来修正)。并通过 Multi-step learning 方法将蒙特卡洛方法和时序差分统一到一起。最后我们分别实现以下算法,作为代码模板。

  1. 蒙特卡洛方法
  2. 用了 Q 学习实现的 TD 方法

基于经验的修正过程

修正在什么时候进行

根据实际奖励还是根据预测来修正,是对修正的准确性和及时性的折中。

  • 修正在一个回合结束之后进行,则为蒙特卡洛方法
  • 修正在每次行动之后进行,则为时序差分方法,记为 TD(0)

如何进行修正 — TD 误差

t 时刻的状态为 s,t + 1 时刻状态迁移为 s’,获得立即奖励 r。过程如下图

在状态 s 时,价值的估计值为 V(s)。

实际行动后,得到立即奖励 r,并迁移到状态 s’,也就是说实际得到价值 $r + \gamma V(s)$

估计值与实际值之间的差值(t 时刻与 t+1 时刻之间的差异)称为 TD 误差

我们所说的经验,本质上就是这个 TD 误差。

(1) TD 方法

根据经验进行修正的过程,就是缩小 TD 误差的过程,更新方式如下

其中 alpha 为学习率。

(2) 蒙特卡洛方法

前面的 V(s) 的更新方式是用的 t 时刻与 t+1 时刻的差异。随着时刻的增加,我们会不断得到实际奖励 r。到回合结束后,我们就不再需要 V(s’) 了。这就是蒙特卡洛方法。

TD 方法与蒙特卡洛方法更新 V(s) 方式的对比

  • TD 方法
  • 蒙特卡洛方法

TD($\lambda$)方法

将用于修正的时间段定位大于 1 小于 T 的值,则成为 Multi-step Learning 方法。一般用 2 step 或 3 step。

也可以组合多个 step,称为 $TD(\lambda)$ 方法,如下图

对于各个 step 的实际价值,通过乘以系数 $\lambda$ 计算合计值作为总价值,如下:

其中各个 step 对应的价值如下

lambda 可以取 0 ~ 1 之间的值。

lambda 为 0 时,也就是 TD(0),只剩下 $G_{t}^{1}$,与 TD 方法等价。

lambda 为 1 时,也就是 TD(1),是只考虑回合结束的价值 $G_{t}^{T-t}$,与蒙特卡洛方法等价。


代码模板:

(1) 智能体

智能体部分 ELAgent 是基于 Epsilon-Greedy 算法实现的。

文章 强化学习环境框架-从一个迷宫环境看MDP的要点 中有 Agent 代码,但是那篇文章主要是针对 MDP 环境的,Agent 部分很简陋,所以后面针对自己的业务进行开发的时候,环境部分参考那篇文章的代码模板,而智能体部分参考本文以及后续的文章中的代码模板。

ELAgent 使用 Epsilon 的概率 self.epsilon 进行随机行动(探索),除此之外的情况基于价值近似 self.Q 进行行动(利用)。

self.Q[s][a] 是状态 s 下的行动 a 对应的价值。

self.reward_log 是智能体所获得的奖励的记录。

log 方法记录奖励

show_reward_log 方法在指定 episode 时显示 episode 对应的奖励,未指定的时候显示到目前为止的奖励。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import numpy as np
import matplotlib.pyplot as plt


class ELAgent():

def __init__(self, epsilon):
self.Q = {}
self.epsilon = epsilon
self.reward_log = []

def policy(self, s, actions):
if np.random.random() < self.epsilon:
return np.random.randint(len(actions))
else:
if s in self.Q and sum(self.Q[s]) != 0:
return np.argmax(self.Q[s])
else:
return np.random.randint(len(actions))

def init_log(self):
self.reward_log = []

def log(self, reward):
self.reward_log.append(reward)

def show_reward_log(self, interval=50, episode=-1):
if episode > 0:
rewards = self.reward_log[-interval:]
mean = np.round(np.mean(rewards), 3)
std = np.round(np.std(rewards), 3)
print("At Episode {} average reward is {} (+/-{}).".format(
episode, mean, std))
else:
indices = list(range(0, len(self.reward_log), interval))
means = []
stds = []
for i in indices:
rewards = self.reward_log[i:(i + interval)]
means.append(np.mean(rewards))
stds.append(np.std(rewards))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("Reward History")
plt.grid()
plt.fill_between(indices, means - stds, means + stds,
alpha=0.1, color="g")
plt.plot(indices, means, "o-", color="g",
label="Rewards for each {} episode".format(interval))
plt.legend(loc="best")
plt.show()

(2) 环境

环境我们使用 gym 的 FrozenLake-v0 环境类。它是一个 4 乘 4 的迷宫,其中有起点、终点、陷阱。

掉入陷阱则游戏结束,如果到达终点则获得奖励。

默认设置下行动存在打滑的可能。也就是朝着希望的方向前进的概率为 1/3,往希望前进方向的两个垂直方向也各占 1/3。

我们可以通过 gym.envs.registration 中的 register 设定不会打滑。

1
2
3
from gym.envs.registration import register
register(id="FrozenLakeEasy-v0", entry_point="gym.envs.toy_text:FrozenLakeEnv",
kwargs={"is_slippery": False})

行动价值可视化

有了环境实例 env 后,我们就可以定义将行动价值可视化的函数 show_q_value

参数 Q 记录了各种状态(迷宫方阵)下的各种行动(上下左右移动)的价值。为了方便可视化,为每种状态构造 3 乘 3 方阵(详见下面的函数说明)。

show_q_value 函数相当于将 4 乘 4 的迷宫通过 3 乘 3 方阵进行分区并可视化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import gym


def show_q_value(Q):
"""
Show Q-values for FrozenLake-v0.
To show each action's evaluation,
a state is shown as 3 x 3 matrix like following.

+---+---+---+
| | u | | u: up value
| l | m | r | l: left value, r: right value, m: mean value
| | d | | d: down value
+---+---+---+
"""
env = gym.make("FrozenLake-v0")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))

for r in range(nrow):
for c in range(ncol):
s = r * nrow + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True

if state_exist:
# 在展示图中,纵向的序号是反转的
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # LEFT = 0
reward_map[_r - 1][_c] = Q[s][1] # DOWN = 1
reward_map[_r][_c + 1] = Q[s][2] # RIGHT = 2
reward_map[_r + 1][_c] = Q[s][3] # UP = 3
reward_map[_r][_c] = np.mean(Q[s]) # Center

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()

有了 ELAgent, Env, show_q_value,我们可以实现蒙特卡洛方法和 TD 方法了。

代码模板: 蒙特卡洛方法/TD方法

(1) 蒙特卡洛方法

MonteCarloAgent

MonteCarloAgent 由 ELAgent 继承而来,增加 learn 方法。

learn 方法就是用蒙特卡洛方法学习的函数。

N[s][a] 是在某种状态 s 下采取某种行动 a 的次数,计算平均值时需要用。

G 是由以下公式计算得来的折现值

用于对 self.Q[s][a] 进行更新,具体如下,新的 self.Q[s][a] 中有 (1 - alpha) 的比例为现有的 self.Q[s][a],alpha 的比例为 G。

1
self.Q[s][a] += alpha * (G - self.Q[s][a])

episode_count 为回合次数。

计算 G 时,有两种口径:

  • Every-Visit: 各个时刻 range(i, len(experience)) 都考虑。
  • First-Visit: 以该状态和行动首次出现的时刻作为起点的方法 range(first(s, a), len(experience))

下面的代码模板中是 Every-Visit。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import math
from collections import defaultdict
import gym


class MonteCarloAgent(ELAgent):

def __init__(self, epsilon=0.1):
super().__init__(epsilon)

def learn(self, env, episode_count=1000, gamma=0.9,
render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n))
self.Q = defaultdict(lambda: [0] * len(actions))
N = defaultdict(lambda: [0] * len(actions))

for e in range(episode_count):
s = env.reset()
done = False
# 进行游戏,直到回合结束
experience = []
while not done:
if render:
env.render()
a = self.policy(s, actions)
n_state, reward, done, info = env.step(a)
experience.append({"state": s, "action": a, "reward": reward})
s = n_state
else:
self.log(reward)

# 估计各种状态、行动
for i, x in enumerate(experience):
s, a = x["state"], x["action"]

# 计算状态s对应的折现值
G, t = 0, 0
for j in range(i, len(experience)):
G += math.pow(gamma, t) * experience[j]["reward"]
t += 1

N[s][a] += 1 # s, a 对的数量
alpha = 1 / N[s][a]
self.Q[s][a] += alpha * (G - self.Q[s][a])

if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)

MonteCarloAgent 的训练和可视化

1
2
3
4
5
6
7
8
9
10
11
def train():
agent = MonteCarloAgent(epsilon=0.1)
env = gym.make("FrozenLakeEasy-v0")
env.render()
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()


if __name__ == "__main__":
train()

初始化的迷宫如下,由 env.render() 显示:

1
2
3
4
SFFF
FHFH
FFFH
HFFG

S 是起点、G 是终点、H 是陷阱。

  • 训练过程打印的日志如下:

  • 蒙特卡洛法对各状态和行动的评价如下:

各状态和行动在绿色越深的地方评价越高,由此可见整体上朝着终点方向的评价较高,朝着陷阱的方向评价较低。

  • reward 曲线如下:

其中浅绿色区域表示方差。

(2) TD 方法(基于Q学习)

TD 方法有好几种,这里以 Q 学习为基础进行实现。

QLearningAgent

将某种状态下的行动的价值 $Q(s, a)$ 称为 Q 值。对 Q 值进行学习的方法称为 Q 学习

提出 Q 学习的文章为 《Learning from Delayed Rewards》。

Q 学习的学习过程一般使用 TD 方法,除了 TD 方法,还有别的方法可以实现 Q 学习。

下面给出 Q 学习的 TD 方法的代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from collections import defaultdict
import gym


class QLearningAgent(ELAgent):

def __init__(self, epsilon=0.1):
super().__init__(epsilon)

def learn(self, env, episode_count=1000, gamma=0.9,
learning_rate=0.1, render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n))
self.Q = defaultdict(lambda: [0] * len(actions))
for e in range(episode_count):
s = env.reset()
done = False
while not done:
if render:
env.render()
a = self.policy(s, actions)
n_state, reward, done, info = env.step(a)

gain = reward + gamma * max(self.Q[n_state])
estimated = self.Q[s][a]
self.Q[s][a] += learning_rate * (gain - estimated)
s = n_state

else:
self.log(reward)

if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)

self.Q[s][a] 的更新过程就是 TD 方法的公式

gain 是得到的奖励加上折扣率乘以迁移后的价值。gain - estimated 为 TD 误差,即 $r_{t+1} + \gamma V(s_{t+1}) - V(s_{t})$。

在计算迁移后的价值时,采用基于价值的观点。也就是采取使得价值最大的行动 a(max(self.Q[s][a])) 来进行 TD 方法。

estimated 对应 self.Q[s][a]

QLearningAgent 的训练和可视化

1
2
3
4
5
6
7
8
9
10
11
def train():
agent = QLearningAgent()
env = gym.make("FrozenLakeEasy-v0")
env.render()
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()


if __name__ == "__main__":
train()

初始化的迷宫如下,由 env.render() 显示:

1
2
3
4
SFFF
FHFH
FFFH
HFFG

S 是起点、G 是终点、H 是陷阱。

  • 训练过程打印的日志如下:

  • TD 方法(Q 学习)对各状态和行动的评价如下:

  • reward 曲线如下:


总结

蒙特卡洛方法与 TD 方法各自的优缺点与其经验的反映方式密切相关。

蒙特卡洛方法进行到结束的结果可能是偶然的,严重依赖于一个回合的结果,想要降低这种依赖,需要增加学习的回合数量。

Q 学习对回合结果的依赖有所降低,但是因为使用估计值更新的 Q 学习没有蒙特卡洛方法那么可靠。在使用含参数的函数(例如神经网络)来计算估计值的情况下,对参数初始值的依赖程度会变高。

最近的研究中,介于蒙特卡洛方法和 TD 方法之间的 Multi-step Learning 方法比较多。例如下面这些先进方法

  • Rainbow
  • A3C/A2C
  • DDPG
  • APE-X DQN

Share