OpenAI-Gym神经网络策略及其训练(策略梯度)

  |  

摘要: OpenAI gym 策略梯度入门

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


在文章 OpenAI-Gym入门 中,我们用 CartPole-v1 环境学习了 OpenAI Gym 的基本用法,并跑了示例程序。

本文我们继续用该环境,来学习在 Gym 中如何写策略。

  • 硬编码简单策略
  • 神经网络策略
    • 评估动作
      • 折扣因子
      • 动作优势
    • 策略梯度
    • 训练和推理

1. 硬编码简单策略

首先我们可以拍脑袋定一个硬编码策略,也就是自己写一个函数,输入 obs 返回 action。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import time

import gym
import numpy as np

"""
CartPole-v1 环境
硬编码策略:
当杆子向左倾斜时,向左加速
当杆子向右倾斜时,向右加速
执行此政策,看获得500回合以上的平均奖励
"""

def basic_policy(obs):
angle = obs[2]
return 0 if angle < 0 else 1

def main():
env = gym.make("CartPole-v1")
totals = []
for episode in range(500):
episode_rewards = 0
obs = env.reset()
env.render()
for step in range(200):
action = basic_policy(obs)
obs, reward, done, info = env.step(action)
episode_rewards += reward
if done:
break
totals.append(episode_rewards)
time.sleep(0.1)
print("totals mean: {:.4f}".format(np.mean(totals)))
print("totals std: {:.4f}".format(np.std(totals)))
print("totals min: {:.4f}".format(np.min(totals)))
print("totals max: {:.4f}".format(np.max(totals)))

if __name__ == "__main__":
main()

结果如下

1
2
3
4
totals mean: 41.9080
totals std: 8.5351
totals min: 24.0000
totals max: 72.0000

2. 神经网络策略

神经网络把观察值作为输入,输出要执行的动作。具体做法是先估计每个动作的概率,然后根据估计的概率随机选择一个动作

对于 CartPole-v1 环境,只有两个动作,因此只需要一个输出神经元,输出左的概率 p,右的概率就是 1 - p。

通过概率 + 随机选择的方式,可以使得智能体可以在探索新动作和利用已知运行良好的动作之间找到平衡。

下面我们用 keras 实现神经网络策略。输入的数量是观察空间的大小,由于问题比较简单,只用 5 个隐藏神经元,输出一个向左概率。

1
2
3
4
5
n_inputs = env.observation_space.shape[0] # 输入为观察空间的大小
model = keras.models.Sequential([
keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]), # 5 个隐藏单元
keras.layers.Dense(1, activation="sigmoid"), # 如果有多个可能的动作,则每个动作有一个输出神经元,用 softmax
])

有了神经网络策略,下面要考虑如何训练。

评估动作

智能体只能通过奖励来指导自己的训练,而奖励是稀疏并且是延迟的。也就是智能体在执行若干次动作之后回合结束,才得到奖励反馈。而智能体并不知道得到这样的奖励反馈应该归因于哪些动作。

为了解决这个问题,一种常见的策略是基于动作后获得的所有奖励的总和来评估该动作,并且在每个步骤中累乘一个折扣因子 gamma,折扣后的奖励的总和称为动作回报

如果折扣因子接近 0,则与立即回报相比,未来回报将不起作用。相反,如果折扣因子接近 1,则远期回报将几乎等于立即回报。如果折扣因子 gamma = 0.95,则未来 13 个步骤的回报大约是即时回报的一半。

一个好的动作之后,可能会跟着不好的动作,造成好动作的回报变低,但是平均而言,好的动作会比坏的动作获得更高回报。

我们想要估计一个动作与其它可能的动作相比平均好多少,这是动作优势,具体做法是执行许多回合,归一化所有动作的回报。具有负优势的动作是不好的,有正优势的动作是好的。这样就有了评估每个动作的方法。想要训练起来,还要策略梯度的概念。

策略梯度

策略梯度算法通过更高回报的梯度来优化策略的参数

step1: 让 NN 策略多次参与游戏,在每个步骤中计算梯度,使所选择动作更可能发生
step2: 运行几个回合之后,就可以计算每个动作的优势
step3: 把每个梯度向量乘以相应的动作优势
优势为正,用梯度使得该动作在将来被选择的可能性较大
优势为负,用梯度使得该动作在将来被选择的可能性较小
step4: 计算所有得到的梯度向量的均值,用来执行梯度下降

下面在 keras 中实现该算法,首先我们定义运行一步的函数,注意给模型 model 输入 obs,最终的到 action 的写法,推理阶段也要这么写(在硬编码策略中是 action = basic_policy(obs))

这里我们先认为采取的任何动作都是正确的操作,以便我们可以计算损失及其梯度(这些梯度将被保存一会儿,稍后我们将根据操作的好坏来对其进行修改)

1
2
3
4
5
6
7
8
9
10
# 训练
def play_one_step(env, obs, model, loss_fn):
with tf.GradientTape() as tape:
left_proba = model(obs[np.newaxis])
action = (tf.random.uniform([1, 1]) > left_proba)
y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
loss = tf.reduce_mean(loss_fn(y_target, left_proba))
grads = tape.gradient(loss, model.trainable_variables)
obs, reward, done, info = env.step(int(action[0, 0].numpy()))
return obs, reward, done, grads

tf.GradientTape() 中,我们首先调用模型,并提供单个观察值。

接下来采样一个 0 ~ 1 之间的随机浮点数,并检查它是否大于 left_proba,相应地返回 0 或者 1 作为 action,0 表示向左,1 表示向右。

然后定义向左移动的目标概率,其值为 1 - action,然后用给定的损失函数计算损失,并用 tape 计算模型可训练变量的损失梯度。

最后执行选定的动作,返回新的观察结果、奖励、会和是否结束、以及梯度。

以下函数用 play_one_step() 函数来执行多个回合,并返回每个回合和每个步骤的所有回报和梯度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
all_rewards = []
all_grads = []
for episode in range(n_episodes):
current_rewards = []
current_grads = []
obs = env.reset()
for step in range(n_max_steps):
obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
current_rewards.append(reward)
current_grads.append(grads)
if done:
break
all_rewards.append(current_rewards)
all_grads.append(current_grads)
return all_rewards, all_grads

返回一个奖励列表的列表,梯度列表的列表(每个回合一个奖励列表,一个梯度列表),梯度列表包含每个步骤一个梯度元组,每个元组包含每个可训练变量的一个梯度张量。

下面的函数计算每个步骤中未来折扣奖励的总和,然后对许多回合的所有折扣奖励进行归一化

1
2
3
4
5
6
7
8
9
10
11
12
def discount_rewards(rewards, discount_factor):
discounted = np.array(rewards)
for step in range(len(rewards) - 2, -1, -1):
discounted[step] += discounted[step + 1] * discount_factor
return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
all_discounted_rewards = [discount_rewards(rewards, discount_factor) for rewards in all_rewards]
flat_rewards = np.concatenate(all_discounted_rewards)
reward_mean = flat_rewards.mean()
reward_std = flat_rewards.std()
return [(discounted_rewards - reward_mean) / reward_std for discounted_rewards in all_discounted_rewards]

下面我们来测试一下以上两个函数

1
2
3
4
5
6
7
rewards1 = [10, 0, -50]
rewards2 = [10, 20]
all_rewards = [rewards1, rewards2]
a = discount_rewards(rewards1, discount_factor=0.8)
print(a)
b = discount_and_normalize_rewards(all_rewards, discount_factor=0.8)
print(b)

结果如下

1
2
[-22 -40 -50]
[array([-0.28435071, -0.86597718, -1.18910299]), array([1.26665318, 1.0727777 ])]

现在我们可以训练了,下面定义一些超参数

1
2
3
4
n_iterations = 30
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95

然后定义损失函数和优化器

1
2
3
4
5
# 损失函数
loss_fn = keras.losses.binary_crossentropy

# 优化器
optimizer = keras.optimizers.Adam(lr=0.01)

然后我们定义训练循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for iteration in range(n_iterations):
all_rewards, all_grads = play_multiple_episodes(env, n_episodes_per_update, n_max_steps, model, loss_fn)
all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)
all_mean_grads = []
for var_index in range(len(model.trainable_variables)):
mean_grads = tf.reduce_mean([
final_reward * all_grads[episode_index][step][var_index]
for episode_index, final_rewards in enumerate(all_final_rewards)
for step, final_reward in enumerate(final_rewards)
], axis=0)
all_mean_grads.append(mean_grads)
optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

model.save(save_path)

训好模型之后,我们可以进行推理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
totals = []
for episode in range(500):
episode_rewards = 0
obs = env.reset()
env.render()
for step in range(200):
left_proba = model(obs[np.newaxis])
action = (1 - int(left_proba > 0.5))
obs, reward, done, info = env.step(action)
episode_rewards += reward
if done:
break
totals.append(episode_rewards)
time.sleep(0.1)
print("totals mean: {:.4f}".format(np.mean(totals)))
print("totals std: {:.4f}".format(np.std(totals)))
print("totals min: {:.4f}".format(np.min(totals)))
print("totals max: {:.4f}".format(np.max(totals)))

结果如下,明显比硬编码策略好

1
2
3
4
totals mean: 150.1460
totals std: 51.0068
totals min: 70.0000
totals max: 200.0000

3. 总结

刚刚我们用策略梯度算法解决了 CartPole 任务,但到更复杂的任务时,效率极低。但是策略梯度算法是其它更强的算法的基础,例如 Actor-Critic 算法。


4. 策略网络的训练和推理完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import gym
import numpy as np
import time

import tensorflow as tf
from tensorflow import keras

def play_one_step(env, obs, model, loss_fn):
"""
训练一个步骤
"""
with tf.GradientTape() as tape:
left_proba = model(obs[np.newaxis])
action = (tf.random.uniform([1, 1]) > left_proba)
y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
loss = tf.reduce_mean(loss_fn(y_target, left_proba))
grads = tape.gradient(loss, model.trainable_variables)
obs, reward, done, info = env.step(int(action[0, 0].numpy()))
return obs, reward, done, grads

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
"""
训练多个回合
"""
all_rewards = []
all_grads = []
for episode in range(n_episodes):
current_rewards = []
current_grads = []
obs = env.reset()
for step in range(n_max_steps):
obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
current_rewards.append(reward)
current_grads.append(grads)
if done:
break
all_rewards.append(current_rewards)
all_grads.append(current_grads)
return all_rewards, all_grads

def discount_rewards(rewards, discount_factor):
"""
计算一个回合中各个步骤的未来折扣奖励总和
"""
discounted = np.array(rewards)
for step in range(len(rewards) - 2, -1, -1):
discounted[step] += discounted[step + 1] * discount_factor
return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
"""
计算多个回合的 discount_rewards 的归一化
"""
all_discounted_rewards = [discount_rewards(rewards, discount_factor) for rewards in all_rewards]
flat_rewards = np.concatenate(all_discounted_rewards)
reward_mean = flat_rewards.mean()
reward_std = flat_rewards.std()
return [(discounted_rewards - reward_mean) / reward_std for discounted_rewards in all_discounted_rewards]

def train(env, save_path):
"""
神经网络策略模型训练
"""
# 定义模型
n_inputs = env.observation_space.shape[0] # 输入为观察空间的大小
model = keras.models.Sequential([
keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]), # 5 个隐藏单元
keras.layers.Dense(1, activation="sigmoid"), # 如果有多个可能的动作,则每个动作有一个输出神经元,用 softmax
])

# 超参数
n_iterations = 30
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95

# 损失函数
loss_fn = keras.losses.binary_crossentropy

# 优化器
optimizer = keras.optimizers.Adam(lr=0.01)

# 训练循环
for iteration in range(n_iterations):
all_rewards, all_grads = play_multiple_episodes(env, n_episodes_per_update, n_max_steps, model, loss_fn)
all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)
all_mean_grads = []
for var_index in range(len(model.trainable_variables)):
mean_grads = tf.reduce_mean([
final_reward * all_grads[episode_index][step][var_index]
for episode_index, final_rewards in enumerate(all_final_rewards)
for step, final_reward in enumerate(final_rewards)
], axis=0)
all_mean_grads.append(mean_grads)
optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

model.save(save_path)

def inference(env, model):
"""
神经网络策略模型推理
"""
totals = []
for episode in range(500):
episode_rewards = 0
obs = env.reset()
env.render()
for step in range(200):
left_proba = model(obs[np.newaxis])
action = (1 - int(left_proba > 0.5))
obs, reward, done, info = env.step(action)
episode_rewards += reward
if done:
break
totals.append(episode_rewards)
time.sleep(0.1)
print("totals mean: {:.4f}".format(np.mean(totals)))
print("totals std: {:.4f}".format(np.std(totals)))
print("totals min: {:.4f}".format(np.min(totals)))
print("totals max: {:.4f}".format(np.max(totals)))

def main():
env = gym.make("CartPole-v1")

save_path = "demo/model/rl_nn_model.h5"
# train(env, save_path)

model = keras.models.load_model(save_path)
inference(env, model)


if __name__ == "__main__":
main()

Share