使用神经网络实现强化学习的框架

  |  

摘要: 用神经网络实现强化学习的框架

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


python_rl

写在前面

在文章 强化学习方法的分类总结 中,我们总结了强化学习的各种学习方式,其中我们详细研究了 Q 学习、蒙特卡洛方法、SARSA、Actor Critic 这四种方法的原理,并实现了代码模板。

回顾前面的各种方法的原理和代码模板,我们发现其中的一个共同要点是计算 Q 值,而实现方式都是以 Q[s][a] 的方式保存各种状态下各种行动的价值。

当状态和行动增加,这种方式变得不可行;此外对于连续值的状态和行动也无法用 Q[s][a] 的方式实现 Q 值的计算。

通过含参数的函数来计算 Q 值是一种解决方式。这种方式不记录所有状态下所有行动的价值,而是用数学式和参数来表达状态、行动与行动价值的关系。

基于这个思路,本文我们就来看一下通过含参数的函数(神经网络),实现价值近似和策略的方法,本文我们重点实现将神经网络用于强化学习的整体框架,具体的价值近似和策略的实现我们后面再学习。本文的一些内容(包括图和代码)主要参考《用Python动手学强化学习》这本书。本书的翻译者程引是一位交大学长,本书非常偏实战,适合快速入门并对经典模型进行实践,还是比较推荐的。

如果我们用 $y = ax + b$ 作为价值近似,则 x 就是 Agent 的状态,y 就是 Agent 动作空间中各种动作的价值。

例如考虑一个网格环境,Agent 的状态为其坐标 (x1, x2),Agent 的动作空间有上下左右,它们的价值记为 (y1, y2, y3, y4)。此时 y = ax + b 可以展开:

我们学习的目标是缩小价值的统计误差(TD误差),以前我们通过更新 Q[s][a] 的方式学习。这里就需要通过优化参数 a, b 的方式学习。

本文我们实现一个将神经网络应用到强化学习的框架,此后我们可以在此框架下将神经网络作为含参数的函数用于价值近似以及策略


神经网络结构

在使用神经网络实现强化学习框架前,我们极简地复习一下神经网络相关的必要知识。

参考: 极简神经网络必要知识


使用神经网络实现强化学习的框架

基于神经网络的强化学习框架由以下四个模块组成:

  1. Agent(智能体): 通过含有参数的函数(神经网络)实现的智能体。
  2. Trainer(训练者): 进行智能体的学习的模块。
  3. Observer(观察者): 进行从环境。
  4. Logger(记录者): 记录学习过程。

整体架构如下图所示:

其中:

Agent 使用含有参数的函数(本文使用神经网络),对状态(state)进行评价,并通过 Epsilon-Greedy 算法决定策略(policy)。
Trainer 基于智能体提供的数据进行训练。
Observer 执行预处理,例如 render。
Logger 记录学习情况的指标,例如 reward, loss 等。

下面我们看一下整体框架的实现代码。将来可以在自定义的环境中继承这个训练框架进行学习。

Agent

首先给出完整代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from collection import namedtuple

import numpy as np

from utils.logger import Logger


logger = Logger("Agent")


Experience = namedtuple("Experience"
,["s", "a", "r", "n_s", "d"]
)

class FNAgent():
def __init__(self, epsilon, actions):
self.epsilon = epsilon
self.actions = actions
self.model = None
self.estimate_probs = False
self.initialized = False

def save(self, model_path):
self.model.save(model_path, overwrite=True, include_optimimzer=False)

@classmethod
def load(cls, env, model_path, epsilon=0.0001):
actions = list(range(env.action_space.n))
agent = cls(epsilon, actions)
agent.initialized = True
return agent

def initialize(self, experiences):
raise NotImplementedError("You have to implwement initialize method")

def estimate(self, s):
raise NotImplementedError("You have to implwement estimate method")

def update(self, experiences, gamma):
raise NotImplementedError("You have to implwement update method")

def policy(self, s):
if np.random.random() < self.epsilon or not self.initialized:
return np.random.randint(len(self.actions))
else:
estimates = self.estimate(s)
if self.estimate_probs:
action = np.random.choice(self.actions, size=1, p=estimates)[0]
return action
else:
return np.argmax(estimates)

def play(self, env, episode_count=5, render=True):
for e in range(episode_count):
s = env.reset()
done = False
episode_reward = 0
while not done:
if render:
env.render()
a = self.policy(s)
n_state, reward, done, info = env.step(a)
episode_reward += reward
s = n_state
else:
logger.info("get reward: {:.6f}".format(episode_reward))

下面解读代码中的每个组件。

  • Experience 是存储智能体经验的数据结构。它由状态(s)、行动(a)、奖励(r)、迁移的状态(n_s)
  • Agent 中的 save 和 load 方法是用于保存、读取已经进行学习的智能体的函数。
  • Agent 中的 initialize, estimate, update 方法由派生类实现,分别用于初始化智能体持有的含参数的函数、通过函数进行预测、更新参数。
  • Agent 中的 policy 方法,如果预测行动的概率,则根据该概率对行动采样。
  • Agent 中的 play 方法,模拟智能体的行动。

Trainer

首先给出完整代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import re
from collection import deque

from utils.logger import Logger as MyLogger


mylogger = MyLogger("Trainer")


class Trainer():
def __init__(self, buffer_size=1024, batch_size=32,
gamma=0.9, report_internal=10, log_dir=""):
self.buffer_size = buffer_size
self.batch_size = batch_size
self.gamma = gamma
self.report_internal = report_internal
self.logger = Logger(log_dir, self.trainer_name)
self.experiences = deque(maxlen=buffer_size)
self.training = False
self.training_count = 0
self.reward_log = []

@property
def trainer_name(self):
class_name = self.__class__.name
snaked = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", class_name)
snaked = re.sub("([a-z0-9])([A-Z])", r"1\_\2", snaked).lower()
snaked = snaked.replace("_trainer", "")
return snaked

def train_loop(self, env, agent, episode=200, initial_count=-1
render=False, observe_interval=0):
self.experiences = deque(maxlen=self.buffer_size)
self.training = False
self.training_count = 0
self.reward_log = []
frames = []

for i in range(episode):
s = env.reset()
done = False
step_count = 0
self.episode_begin(i, agent)
while not done:
if render:
env.render()
if self.training and observe_interval > 0 \
and (self.training_count == 1 or self.training_count % observe_interval == 0):
frames.append(s)

a = agent.police(s)
n_state, reward, done, info = env.step(a)
e = Experience(s, a, reward, n_state, done)
self.experiences.append(e)
if not self.training and len(self.experiences) == self.buffer_size:
self.begin_train(i, agent)
self.training = True

self.step(i, step_count), agent, e
s = n_state
step_count += 1
else:
self.episode_end(i, step_count, agent)

if not self.training and \
initial_count > 0 and i >= initial_count:
self.begin_train(i, agent)
self.training = True

if self.training:
if len(frames) > 0:
self.logger.write_image(self.training_count, frames)
frames = []
self.training_count += 1

def episode_begin(self, episode, agent):
pass

def begin_train(self, episode, agent):
pass

def step(self, episode, step_count, agent, experience):
pass

def episode_end(self, episode, step_count, agent):
pass

def is_event(self, count, interval):
return True if count != 0 and count % interval == 0 else False

def get_recent(self, count):
recent = range(len(self.experiences) - count, len(self.experiences))
return [self.experiences[i] for i in recent]

下面解读代码中的每个组件。

  • Trainer 将智能体的行动记录存储在 self.experiences 中,只是智能体进行学习需要的数据。
  • buffer_size 为 self.experiences 的大小,在超过 buffer_size 的情况下,最早的行动记录将被丢弃,由 deque 实现。
  • 每次学习时,从 self.experiences 中取出的数据大小为 batch_size

像以上这样,将行动记录暂时保存,然后从中进行采样和学习的方法称为经验回放

当然也可以直接将行动结果用于学习,只是学的会不稳定。

书中提供了一张对比有无经验回放的示意图,非常直观。

  • train_loop 是进行学习的循环,其中的处理方式是在指定的 episode 环境中进行游戏,并根据回合的开始、结束以及各个步骤,运行相应的方法 episode_begin、episode_end、step。
  • 当积累 self.buffer_size 次 self.experiences 或者进行了 initial_count 个回合时,重置学习开始的标志 self.training = True。

以上这种处理方式可以很方便地自定义在学习的哪个阶段进行哪种处理。

  • observe_interval 用来指定智能体在环境中运行的频率,具体的实现方式是在运行时以 observe_interval 的频率将画面(状态)存储在 frames 中,并在回合结束后由 self.logger.write_image 写出。

Observer

首先给出完整代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from utils.logger import Logger

logger = Logger("Observer")

class Observer():
def __init__(self, env):
self._env = env
logger.info("创建 Observer")

@property
def action_space(self):
return self._env.action_space

@property
def observation_space(self):
return self._env.observation_space

def reset(self):
return self.transform(self._env.reset())

def render(self):
return self._env.render(mode="human")

def step(self, action):
n_state, reward, done, info = self._env.step(action)
return self.transform(n_state), reward, done, info

def transform(self, state):
raise NotImplementedError("You have to implement transform method")

下面解读代码中的每个组件。

  • Observer 可以简单理解为 env 的封装。
  • transform 将从 env 获得的状态转换为智能体易于处理的形式。
  • 在使用 Observer 进行学习的情况下,在运行时也必须使用 Observer。这是因为学习后的智能体是以 Observer 的转换为前提的。

Logger

首先给出完整代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
import io

import numpy as np
import tensorflow as tf
from PIL import Image
import matplotlib.pyplot as plt

from utils.logger import Logger as MyLogger
from utils.path import creater_folder

mylogger = MyLogger("Logger")

class Logger():
def __init__(self, log_dir="", dir_name=""):
self.log_dir = log_dir
if not log_dir:
self.log_dir = os.path.join(os.path.dirname(__file__), "logs")
creater_folder(self.log_dir)
if dir_name:
self.log_dir = os.path.join(self.log_dir, dir_name)
creater_folder(self.log_dir)

self._callback = tf.compat.v1.keras.callbacks.TensorBoard(self.log_dir)

mylogger.info("创建 Logger")

@property
def writer(self):
return self._callback.writer

def weiter(self, model):
return self._callback.writer

def set_model(self, model):
return self._callback.set_model(model)

def path_of(self, file_name):
return os.path.join(self.log_dir, file_name)

def describe(self, name, values, episode=-1, step=-1):
mean = np.round(np.mean(values), 3)
std = np.round(np.std(values), 3)
desc = "{} is {} (+/-{})".format(name, mean, std)
if episode > 0:
print("At episode {}, {}".format(episode, desc))
elif step > 0:
print("At step {}, {}".format(step, desc))

def plot(self, name, values, interval=10):
indices = list(range(0, len(values), interval))
means = []
stds = []
for i in indeces:
_values = values[i: (i + interval)]
means.append(np.mean(_values))
stds.append(np.std(_values))
means = np.array(means)
stds = np.array(stds)
plt.figure()
plt.title("{} History".format(name))
plt.grid()
plt.fill_between(indeces, means - stds, means + stds, alpha=0.1), color="g")
plt.plot(indeces, means, "o-", color="g")
plt.legend(loc="best")
plt.show()

def write(self, index, name, value):
summary = tf.compat.v1.Summary()
summary_value = summary.value.add()
summary_value.tag = name
summary_value.simple_value = value
self.writer.add_summary(summary, index)
self.writer.flush()

def write_image(self, index, frames):
# 将一个 frames 作为一系列灰度图像处理
last_frames = [f[:, :, -1] for f in frames]
if np.min(last_frames[-1]) < 0:
scale = 127 / np.abs(last_frames[-1]).max()
offset = 128
else:
scale = 255 / np.max(last_frame[-1])
offset = 0
channel = 1 # 灰度
tag = "frames_at_training_{}".format(index)
values = []

for f in last_frames:
height, width = f.shape
array = np.asarray(f * scale + offset, dtype=np.uint8)
image = Image.fromarray(array)
output = io.BayesIO()
image.save(output, format="PNG")
image_string = output.getvalue()
output.close()
image = tf.compat.v1.Summary.Image(height=height, width=width, colorspace=channel, encoded_image_string=image_string)
value = tf.compat.v1.Summary.Value(tag=tag, image=image)
values.append(value)

summary = tf.compat.v1.Summary(value=values)
self.writer.add_summary(summary, index)
self.writer.flush()

Logger 的作用是记录学习的情况,其实现的核心是通过 TensorBoard 引用 write, write_image 等。

write_image 绘制 Observer 处理的智能体正在观看的画面,由此我们可以检查是否已经执行了预期的预处理。


Share