通过含参数函数实现策略-策略梯度

  |  

摘要: 策略梯度:通过含参数函数实现策略

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


在文章 在Q学习中应用含参数函数实现价值近似 中,我们学习了如何在 Q 学习中应用含参数函数实现价值近似。具体地,我们根据 使用神经网络实现强化学习的框架 首先创建一个根据价值函数选择行动的智能体,ValueFunctionAgent,然后将其用在了 gym 的 CartPole 中。

策略也可以用含参数函数表示,这是一种以状态为自变量,输出行动或行动概率的函数。

本文我们先从理论上研究在用含参数函数表示策略时如何训练,并形成策略梯度算法;然后将策略梯度算法实现出来,形成代码模板,并将代码模板应用在 CartPole 环境中,可以将 PolicyGradientAgentValueFunctionAgent 进行对比。

最后我们针对策略梯度中使用的价值进行几个讨论,并为 A2C 留下引子。


策略梯度算法

在价值近似中,有一个明确的目标:估计值更接近于实际值,因此再用含参数函数实现价值近似的时候,如何学习是比较明确的。

但是用含参数函数实现策略的时候,参数更新不太容易。因为策略输出的行动和行动概率无法与可以计算的价值直接比较,此时如何学习就是关键问题了。

价值的期望值是一个线索,可以参考文章 MDP的动态规划解法-策略迭代和价值迭代 中策略迭代的内容。

在文章 价值的定义与计算-贝尔曼方程 中,我们引入贝尔曼方程的时候,通过将各种行动的概率和价值相乘计算得到了期望值。公式如下:

上式是状态的价值,这里我们要考虑的是策略的价值。计算的过程只是增加一个概率,由根据策略进行状态迁移的概率、行动概率、行动价值计算期望值 $J(\theta)$

$$ J(\theta) \propto \sum\limits_{s\in S}d^{\pi_{\theta}}(s)\sum\limits_{a\in A}\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s, a) $$

其中:

  • $d^{\pi_{\theta}}(s)$ 是根据策略 $\pi_{\theta}$ (具有 $\theta$ 的参数) 迁移到状态 s 的概率。
  • $\pi_{\theta}(a|s)$ 是采取行动 a 的概率(行动概率)
  • $Q^{\pi_{\theta}(s, a)}$ 是行动价值

上面公式的含义是:将行动价值乘以行动概率,得到状态价值,然后将状态价值乘以迁移概率,得到期望值。

注意这个公式是一个比例公式而不是等式,因为 $J(\theta)$ 与平均会和长度成正比。详细推导参考 《强化学习(第2版)》

下面的问题就是如何使 $J(\theta)$ 最大化。直观上,要将较高的概率分配给预计能得到较高奖励的行动,将较低的概率分配给预计得到较低奖励的行动。

我们可以使用梯度法完成这个直观上的想法,也就是在最小化 $-J(\theta)$ (最大化 $J(\theta)$)时,用梯度对策略的参数进行最优化,这种方法称为策略梯度

$J(\theta)$ 的梯度可以通过策略梯度定理推导,最终结果如下,详细过程在书《强化学习(第2版)》 中有。

基于上式,我们可以将期望值的移动方向,也就是 $\nabla J(\theta)$ 转换为【概率乘以值】这样的期望值形式。

根据对数微分,将 $\nabla \pi_{\theta}(a|s)$ 做如下变形

将 $\nabla J(\theta)$ 代入上式

其中 $d^{\pi_{\theta}}(s)$, $\pi_{\theta}(a|s)$ 是概率,$\nabla ln\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s, a)$ 是值。

上式可以写为

$$ \nabla J(\theta) \propto E_{\pi_{\theta}}[\nabla ln\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s, a)] $$

其中 $\nabla ln\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s, a)$ 可以这样理解

  • 梯度 $\nabla ln\pi_{\theta}(a|s)$ 是移动方向
  • 行动价值 $Q^{\pi_{\theta}}(s, a)$ 是移动的程度

下图可以辅助理解 $\theta$ 是如何学习的。

在图中,有两个参数 $\theta_{1}$ 和 $\theta_{2}$,形成一个坐标系,当前的 $\theta$ 由坐标系中的一个点表示。

对于每个 (s, a),计算梯度 $\nabla ln\pi_{\theta}(a|s)$ 即参数移动的方向。将其乘以 $Q^{\pi_{\theta}}(s, a)$ 也就是移动的程度,其平均值(期望值) $\nabla J(\theta)$ 就是整体前进的方向。


在策略梯度中使用经验回放

在策略梯度中,我们以提高期望值为目标盘进行学习。

在计算期望值时,用了状态迁移概率和行动概率,但此时的概率必须是当前策略下的概率。如果使用过去的行动记录,则在当前策略以外的策略下采取的行动也将影响概率的计算,从而无法进行准确的评价。在价值近似中,由于适中选择价值最大的行动,没有这个问题。

Off-Policy Actor Critic

有一种方法可以在策略梯度中使用经验回放,即 Off-Policy Actor-Critic,参考 2012 年的论文《Off-Policy Actor Critic》。这种方法通过 Off-Policy 策略进行经验的采样,由于使用已知的策略进行采样,因此称为 Behaviour policy($\beta$)

正常的 Actor Critic 可以参考这篇文章:组合使用基于价值和基于策略 — Actor Critic 学习模式,另外在文章 强化学习方法的分类总结 中,若干方法的对比表格中,有 Off-policy Actor Critic,可以参考。

Off-Policy Actor Critic 根据 Behaviour policy 的状态迁移来计算期望值。即使状态迁移不是实际策略,如果 Behaviour policy 迁移到各种状态,则在任何状态下均有效的策略,其期望值应高于仅在特定状态下才有效的策略。 然而 Behaviour policy ($\beta(a|s)$) 与实际的策略 ($\pi(a|s)$) 的行动概率不同,因此需要调整。为了进行这个调整,我们引入实际策略相对于 Behaviour policy 的权重 $\frac{\pi_{\theta}(a|s)}{\beta_{\theta}(a|s)}$,下面是 Off-Policy Actor Critic 的梯度:

如果策略 $\pi(a|s)$ 是确定性的,则在计算期望值时就不用考虑行动概率了,这就免除了通过权重来调整行动概率的必要。确定性策略梯度(DPG)就是这样更新的。DDPG 就是用了深度学习的 DPG

使用 Off-Policy Actor-Critic 更新策略,可以收集到比 Behaviour policy 更广泛的经验。Off-Policy Actor-Critic 算法已经发展成 Actor Critic with Experience Replay 算法。参考 2016 年的论文《Sample Efficient Actor-Critic with Experience Replay》。


策略梯度的实现(代码模板)

这里我们用的环境与 在Q学习中应用含参数函数实现价值近似 中一样,也是 CartPole。

(1) Agent (PolicyGradientAgent)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import random
import argparse
import joblib

import numpy as np
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
from tensorflow import keras as K
import gym


class PolicyGradientAgent(FNAgent):

def __init__(self, actions):
# PolicyGradientAgent使用自身的策略(而非epsilon)
super().__init__(epsilon=0.0, actions=actions)
self.estimate_probs = True
self.scaler = StandardScaler()
self._updater = None

def save(self, model_path):
super().save(model_path)
joblib.dump(self.scaler, self.scaler_path(model_path))

@classmethod
def load(cls, env, model_path):
actions = list(range(env.action_space.n))
agent = cls(actions)
agent.model = K.models.load_model(model_path)
agent.initialized = True
agent.scaler = joblib.load(agent.scaler_path(model_path))
return agent

def scaler_path(self, model_path):
fname, _ = os.path.splitext(model_path)
fname += "_scaler.pkl"
return fname

def initialize(self, experiences, optimizer):
states = np.vstack([e.s for e in experiences])
feature_size = states.shape[1]
self.model = K.models.Sequential([
K.layers.Dense(10, activation="relu", input_shape=(feature_size,)),
K.layers.Dense(10, activation="relu"),
K.layers.Dense(len(self.actions), activation="softmax")
])
self.set_updater(optimizer)
self.scaler.fit(states)
self.initialized = True
print("Done initialization. From now, begin training!")

def set_updater(self, optimizer):
actions = tf.compat.v1.placeholder(shape=(None), dtype="int32")
rewards = tf.compat.v1.placeholder(shape=(None), dtype="float32")
one_hot_actions = tf.one_hot(actions, len(self.actions), axis=1)
action_probs = self.model.output
selected_action_probs = tf.reduce_sum(one_hot_actions * action_probs,
axis=1)
clipped = tf.clip_by_value(selected_action_probs, 1e-10, 1.0)
loss = - tf.math.log(clipped) * rewards
loss = tf.reduce_mean(loss)

updates = optimizer.get_updates(loss=loss,
params=self.model.trainable_weights)
self._updater = K.backend.function(
inputs=[self.model.input,
actions, rewards],
outputs=[loss],
updates=updates)

def estimate(self, s):
normalized = self.scaler.transform(s)
action_probs = self.model.predict(normalized)[0]
return action_probs

def update(self, states, actions, rewards):
normalizeds = self.scaler.transform(states)
actions = np.array(actions)
rewards = np.array(rewards)
self._updater([normalizeds, actions, rewards])
  • __init__ 将 epsilon 设置为 0,因为如果有随机的行动,则无法计算基于当前策略的概率。
  • 使用神经网络实现强化学习的框架Agent 的派生类需要实现 initializeestimateupdate
  • initialize 中定义的模型与价值函数的实现相同。参考 在Q学习中应用含参数函数实现价值近似,其中网络是 sklearn.neural_network.MLPRegressor 构建的,本文用的是 Keras。输出的不是各种状态下各种行动的价值,而是采取各种行动的概率,最后的 activation 是计算概率的 softmax。
  • set_updater 用来更新参数,必须计算 $\ln\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s,a)$ 的期望值。(这个函数是 PG 的核心)
    • 首先根据行动概率 $\pi_{\theta}(a|s)$ (代码中的 selected_action_probs)计算,过程如下图
    • 然后取对数,因为概率值过小会使得对数无穷大,需要对概率值的范围限制 (clipped)
    • 计算 $Q^{\pi_{\theta}}(s,a)$ 使用了奖励的折现值 (代码中的rewards),也就是通过蒙特卡洛方法计算价值。
    • tf.reduce_mean 计算期望值,为了用梯度法使其最大化,将该值取负。
    • 至此我们完成了 $\ln\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s,a)$ 的期望值的计算,其梯度的计算由 optimizer.get_updates 完成。然后就得到了 $\nabla\ln\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s,a)$ 的期望值 $\nabla J(\theta)$。
    • K.backend.function 生成进行参数更新的函数 (updates),用来将更新函数化。此函数接收状态 (self.model.input)、实际采取的行动 (actions),以及相应的奖励 (rewards) 作为参数。

(2) Observer

处理环境的 Observer 与 在Q学习中应用含参数函数实现价值近似 中相同。

1
2
3
4
class CartPoleObserver(Observer):

def transform(self, state):
return np.array(state).reshape((1, -1))

(3) Trainer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class PolicyGradientTrainer(Trainer):

def __init__(self, buffer_size=256, batch_size=32, gamma=0.9,
report_interval=10, log_dir=""):
super().__init__(buffer_size, batch_size, gamma,
report_interval, log_dir)

def train(self, env, episode_count=220, initial_count=-1, render=False):
actions = list(range(env.action_space.n))
agent = PolicyGradientAgent(actions)
self.train_loop(env, agent, episode_count, initial_count, render)
return agent

def episode_begin(self, episode, agent):
if agent.initialized:
self.experiences = []

def make_batch(self, policy_experiences):
length = min(self.batch_size, len(policy_experiences))
batch = random.sample(policy_experiences, length)
states = np.vstack([e.s for e in batch])
actions = [e.a for e in batch]
rewards = [e.r for e in batch]
scaler = StandardScaler()
rewards = np.array(rewards).reshape((-1, 1))
rewards = scaler.fit_transform(rewards).flatten()
return states, actions, rewards

def episode_end(self, episode, step_count, agent):
rewards = [e.r for e in self.get_recent(step_count)]
self.reward_log.append(sum(rewards))

if not agent.initialized:
if len(self.experiences) == self.buffer_size:
optimizer = K.optimizers.Adam(lr=0.01)
agent.initialize(self.experiences, optimizer)
self.training = True
else:
policy_experiences = []
for t, e in enumerate(self.experiences):
s, a, r, n_s, d = e
d_r = [_r * (self.gamma ** i) for i, _r in
enumerate(rewards[t:])]
d_r = sum(d_r)
d_e = Experience(s, a, d_r, n_s, d)
policy_experiences.append(d_e)

agent.update(*self.make_batch(policy_experiences))

if self.is_event(episode, self.report_interval):
recent_rewards = self.reward_log[-self.report_interval:]
self.logger.describe("reward", recent_rewards, episode=episode)
  • episode_end: 使用当前的策略运行一个回合后进行更新,更新使用的价值是折现值 (代码中的 rewards[t:])。折现值计算过程如下图

  • agent.update 更新策略。通过 scaler 对折现值进行归一化,然后传递值 (make_batch)。

(4) 实验结果与分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def main(play):
env = CartPoleObserver(gym.make("CartPole-v0"))
trainer = PolicyGradientTrainer()
path = trainer.logger.path_of("policy_gradient_agent.h5")

if play:
agent = PolicyGradientAgent.load(env, path)
agent.play(env)
else:
trained = trainer.train(env)
trainer.logger.plot("Rewards", trainer.reward_log,
trainer.report_interval)
trained.save(path)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="PG Agent")
parser.add_argument("--play", action="store_true",
help="play with trained model")

args = parser.parse_args()
main(args.play)

实验结果如下

策略梯度的运行结果

作为对比,我们将价值函数的学习结果贴在下面对比,具体参考文章 在Q学习中应用含参数函数实现价值近似

价值函数的运行结果

可以看到,策略梯度要花更长时间才能达到足够大的奖励。


关于策略梯度中使用的价值的几个讨论

(1) biased 价值与 unbiased 价值

前面的代码框架中使用的是折现值,通过蒙特卡洛方法计算,称为 biased 价值。

如果使用 $Q^{\pi_{\theta}}(s, a)$,也就是价值的估计值,称为 unbiased 价值,则无须等待回合结束,可以减少对特定回合经验的依赖。

(2) Actor Critic 框架与 Compatible Function Approximation Theorem

如果使用 Actor Critic 框架(参考文章 组合使用基于价值和基于策略 — Actor Critic 学习模式),则包含一个策略函数行为价值函数,其中策略函数充当演员(Actor),生成行为与环境交互;行为价值函数充当评论家(Critic),负责评价演员的表现,并指导演员的后续行为动作。

Critic 的行为价值函数是基于策略(Actor) $\pi_{\theta}$ 的一个近似

  • Critic是一个神经网络: 输入是状态和动作,输出是Q值(或者输入是状态,输出是状态价值V)。这个网络的训练是为了让它的预测结果尽可能地逼近真值。网络的预测值为 $Q(s, a)$ ,网络的实际值用下一状态的 Q 表示,即: $r + \gamma Q(s’, a’)$ ,两者进行作差,然后用一些常用的loss函数进行网络训练。

  • Actor也是一个神经网络,输入是状态,输出是离散动作的分布函数的关键参数(或者离散动作的概率)。这个网络的目标是如果这次被选中的动作获得了正的奖励,那么在下一次被选中的概率增大,如果这次被选中的动作获得了负的奖励,那么在下一次被选中的概率减小。

因为在Critic 的行为价值函数中用神经网络进行了近似。所以在计算Actor网络的策略梯度时也是近似公式

两者什么时候能够严格地相等呢,相容函数近似定理(Compatible Function Approximation Theorem) 说明了这个问题。定理的条件和结论如下:

如果下面两个条件满足:

  1. 近似价值函数的梯度与分值函数的梯度相同,即 $\nabla_{w}Q_{w}(s,a) = \nabla_{\theta}\ln\pi_{\theta}(s,a)$
  2. 近似价值函数的参数 w 能够最小化 $\epsilon = E_{\pi_{\theta}}[(Q_{\pi_{\theta}}(s,a) - Q_{w}(s,a))^{2}]$

那么策略梯度 $\nabla_{\theta}J(\theta)$ 是准确的,即

此时可以根据与价值函数相同的评价结果来决定策略

(3) 行动的相对价值

这是一种通过减去状态价值来评价行动的一种方法。提出这种方法是因为各种状态下的行动价值 $Q(s, a)$,相比行动更依赖于状态。

此时行动价值的计算如下:

其中 V(s) 是状态价值。得到的 A(s, a) 称为 Advantage

可以将折现值作为 $Q_{w}(s, a)$,并从中减去 $V(s)$ 计算 Advantage。

使用 Advantage 时,梯度如下

如果将 $\pi_{\theta}(a|s)$ 作为 Actor,将计算 Advantage 所需的 $V(s)$ 作为 Critic,则可以通过 Actor Critic 的方式进行学习。这就是 Advantage Actor Critic (A2C)算法。

使用价值的估计值 $Q_{w}(s, a)$ 和行动的相对价值 Advantage,学习过程比仅使用折现值时更稳定。


Share