ANN-with-Keras

  |  

本文内容整理自 《Hands-on Machine Learning》 第十章第二节


用 Keras 实现 MLP

Keras

Keras 基础信息

  • 代码
  • 时间: 2015
  • 作者: Francois Chollet
  • 文档
  • build/train/evaluate/execute all sort of NN

Keras 的 computation backend

目前 Keras 可以跑在 Apache mxnet; Apple Core ML JavaScript, PlaidML(所有 GPU 都可用,不限于 Nvidia)

Tensorflow 也有一个 Keras 的实现 tf.keras,只支持 Tensorflow 作为 Backend,好处是可以用 Tensorflow 的其它特性,例如数据相关的 API 等等。

Keras API 的两种实现: 多 backend Keras 和 tf.keras

PyTorch API 与 Keras 很像,并且它们都受了 sklearn 和 Chainer 的启发。

1
2
import tensorflow.keras as keras
print(keras.__version__)

(1) 用 Sequential API 建立模型

分类问题

在 Fashion MNIST 上建立图像分类器。

载入数据

1
2
fashion_mnist = keras.datasets.fashion_mnist
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist.load_data()

数据以 28 * 28 的二维 array 存储而不是 784 的一维 array。各个像素均为 0~255 的整数(uint8)而不是 0.0~255.0。

由于要用梯度下降训练神经网络,我们将输入特征归一化。并分一下验证集。

1
2
X_valid, X_train = X_train_full[:5000] / 255.0, X_train_full[5000:] / 255.0
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

对于 Fashion MNIST,类标签0, 1, … 无法表示具体含义,我们还需要一个对应的标签名的列表。

1
2
class_names = ["T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
"Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

用 Keras Sequential API 建立模型

1
2
3
4
5
model = keras.Sequential()
model.add(keras.layers.Flatten(input_shape=[28, 28]))
model.add(keras.layers.Dense(300, activation="relu"))
model.add(keras.layers.Dense(100, activation="relu"))
model.add(keras.layers.Dense(10, activation="softmax"))
  • keras.Sequential() creates a Sequential model: just composed of a single stack of layers connected sequentially
  • Flatten layer: its role is to convert each input image into a 1D array
    • 可以用 keras.layers.InputLayer 作第一层,input_shape=[28,28].
  • Dense Layer: keras.layers.Dense 第一个参数是神经元个数,Dense 的意思是各层只管理自己的权重矩阵。
  • https://keras.io/api/layers/activations/ 可以查看其它激活函数

可以一次性将所有层以列表的形式提供给 keras.models.Sequential()

1
2
3
4
5
6
model = keras.models.Sequential([
keras.layers.Flatten(input_shape=[28, 28]),
keras.layers.Dense(300, activation="relu"),
keras.layers.Dense(100, activation="relu"),
keras.layers.Dense(10, activation="softmax")
])

第一个隐藏层共 784 * 300 个权重加 300 个偏置,共计 235500 个参数。

model.summary() 展示模型所有的层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Model: "sequential_1"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
flatten (Flatten) (None, 784) 0
_________________________________________________________________
dense (Dense) (None, 300) 235500
_________________________________________________________________
dense_1 (Dense) (None, 100) 30100
_________________________________________________________________
dense_2 (Dense) (None, 10) 1010
=================================================================
Total params: 266,610
Trainable params: 266,610
Non-trainable params: 0
_________________________________________________________________

查看特定层的信息

1
2
3
4
5
hidden1 = model.layers[1]
hidden1.name # 结果为 dense,与 model.summary() 的信息对应

model.get_layer("dense") is hidden1 # 结果为 True
weights, biases = hidden1.get_weights()
  • Dense Layer 会将权重随机初始化,偏置初始化为 0。要用其它初始化方法,可以用 kernel_initializer/bias_initializer 参数。
  • https://keras.io/api/layers/initializers/ 可以查看其它初始化方法

compile the model

建立模型后,必须调用 compile 方法,指定所用的 loss 和 optimizer。此外还可以提供一个额外的 metrics 列表,用于 training 和 evaluation 时的计算。

1
2
3
4
model.compile(loss="sparse_categorical_crossentropy"
,optimizer="sgd"
,metrics=["accuracy"]
)

关于 sparse labels 和 one-hot 向量

  • 使用 sparse_categorical_crossentropy 是因为用的是 sparse labels: 对每个样本,输出只有一个类标签(0 ~ 9 的某个数)。如果每个类都有一个概率值,比如 one-hot 向量,则我们要用 “categorical_crossentropy”
  • sparse labels 转 one-hot 向量: keras.utils.to_categorical()
  • ont-hot 向量转 sparse labels: np.argmax(axis=1)

Training and Evaluating the Model

1
history = model.fit(X_train, y_train, epochs=30, validation_data=(X_valid, y_valid))
  • 如果有类别不平衡的问题,调用 fit 时需要设置 class_weight 参数,它会给不同的类不同的权重,这个权重在算 loss 时会使用。
  • 如果需要样本权重,设置 sample_weight 这个参数。当某些数据是专家标注,某些数据是众包平台标注的时候,这个参数很有用。
  • 返回 History 对象,history.params 包含训练参数 verbose, epoch, steps, history.history 包含每个 epoch 的 loss, accuracy, val_loss, val_accuracy,可以用于画图。
1
2
3
4
5
6
7
import pandas as pd
import matplotlib.pyplot as plt

pd.DataFrame(history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

注意,验证误差是在每个 epoch 末尾计算的,而训练误差是 opech 中的每一轮都会算的。所以训练曲线应该向左平移半个 epoch 再与验证曲线对比。

如果对模型的验证集性能不满意,可以开始调参:

  • 学习率
  • 其它优化器
  • 层数,层的神经元个数
  • 激活函数
  • batch_size

如果对模型的验证集型能满意,开始在测试集上做预测

1
model.evaluate(X_test, y_test)

Inference

1
2
y_proba = model.predict(X_test)
y_pred = model.predict_classes(X_test)

回归问题

数据加载

用 sklearn 的 fetch_california_housing 获取数据,这个数据比价简单,仅含数值型特征,没有缺失值。

1
2
3
4
5
6
7
8
9
10
11
12
13
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()

X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_valid = scaler.transform(X_valid)
X_test = scaler.transform(X_test)

用 Sequential API 建立模型, 训练, 预测

与分类模型的主要区别

  • 输出层只要一个神经元且没有激活函数
  • 损失函数是 MSE
1
2
3
4
5
6
7
8
9
10
11
12
model = keras.models.Sequential([
keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
keras.layers.Dense(1)
])

model.compile(loss="mean_squared_error", optimizer="sgd")

history = model.fit(X_train, y_train, epochs=20, validation_data=(X_valid, y_valid))

mse_test = model.evaluate(X_test, y_test)

y_pred = model.predict(X_test)

(2) 用 Functional API 建立更复杂的模型

以 Wide & Deep neural network 模型解决回归问题为例。网络结构如下图

Wide & Deep NN 网络结构

这种结构的好处是可以同时学习深层特征和简单规则。Keras 实现如下

1
2
3
4
5
6
7
input_ = keras.layers.Input(shape=X_train.shape[1:])
hidden1 = keras.layers.Dense(30, activation="relu")(input_)
hidden2 = keras.layers.Dense(30, activation="relu")(hidden1)
concat = keras.layers.Concatenate()([input_, hidden2])
output = keras.layers.Dense(1)(concat)

model = keras.Model(inputs=[input_], outputs=[output])
  • 首先创建一个 Input 对象,包含 shape 和 dtype 信息。一个模型可以有多个 Input 对象。
  • 创建第一个隐藏层 hidden1 = keras.layers.Dense(30, activation="relu")(input_) 好像是进行了函数调用一样,因此我们管它叫 Functional API。我们只是告诉 Keras 层之间是如何连接的,没有实际数据要处理。
  • 创建第二个隐藏层
  • 创建 Concatenate 层,它依然像一个函数调用
  • 创建输出层,单个神经元,没有激活函数,仍然像函数一样调用

建立好模型之后,后续的过程与前面的例子中就一模一样了。

处理 Multiple inputs

下面考虑一部分特征给 deep path, 另一部分给 wide path 的情况(两部分可能有重合)。网络结构如下图

Wide & Deep NN 网络结构

加上我们想把特征 0~4 给 wide path,2~7 给 deep path

1
2
3
4
5
6
7
8
input_A = keras.layers.Input(shape=[5], name="wide_input")
input_B = keras.layers.Input(shape=[6], name="deep_input")
hidden1 = keras.layers.Dense(30, activation="relu")(input_B)
hidden2 = keras.layers.Dense(30, activation="relu")(hidden1)
concat = keras.layers.concatenate([input_A, hidden2])
output = keras.layers.Dense(1, name="output")(concat)

model = keras.Model(inputs=[input_A, input_B], outputs=[output])

训练时候需要对数据做一些额外处理,得到 input_A 和 input_B

1
2
3
4
5
6
7
8
9
10
model.compile(loss="mse", optimizer=keras.optimizers.SGD(lr=1e-3))

X_train_A, X_train_B = X_train[:, :5], X_train[:, 2:]
X_valid_A, X_valid_B = X_valid[:, :5], X_valid[:, 2:]
X_test_A, X_test_B = X_test[:, :5], X_test[:, 2:]

history = model.fit((X_train_A, X_train_B), y_train, epochs=20,
validation_data=((X_valid_A, X_valid_B), y_valid))
mse_test = model.evaluate((X_test_A, X_test_B), y_test)
y_pred = model.predict((X_test_A, X_test_B))

处理 Multiple outputs

需要处理多输出的几个场景

  1. 任务本身需要多输出,比如检测任务中,需要对图片中的主要物体进行定位(对物体中心坐标,宽度和高度的回归问题)和分类。
  2. 同一个数据集上的多任务训练。按常规做法可以对每个任务单独训练 NN。但是也可以只训练一个 NN 只是每个任务单独一个输出,例如人脸的多任务分类,一个输出分类表情,一个输出分类是否戴了眼镜。
  3. 正则化技术: 增加辅助输出。例如下面的网络结构

增加辅助输出的正则化

实现如下,对比上面的多 Input 的实现代码,仅仅改了原有的 output 一行,增加了 aux_output 一行,然后在 model 定义时给 outputs 增加一个 aux_output。

1
2
3
4
5
6
7
8
9
input_A = keras.layers.Input(shape=[5], name="wide_input")
input_B = keras.layers.Input(shape=[6], name="deep_input")
hidden1 = keras.layers.Dense(30, activation="relu")(input_B)
hidden2 = keras.layers.Dense(30, activation="relu")(hidden1)
concat = keras.layers.concatenate([input_A, hidden2])
output = keras.layers.Dense(1, name="main_output")(concat)
aux_output = keras.layers.Dense(1, name="aux_output")(hidden2)

model = keras.Model(inputs=[input_A, input_B], outputs=[output, aux_output])

定义了多输出后,每个输出需要自己的 loss,因此 compile 时候,需要提供 loss 的列表。如果只给了一个 loss,则 Keras 会认为所有输出都用同一个 loss。
Keras 默认会将所有 loss 简单相加等到训练的总 loss。我们可以提供不同 output 对应的 loss 的权重。

1
model.compile(loss=["mse", "mse"], loss_weights=[0.9, 0.1], optimizer="sgd")

定义了多输出后,在训练模型时,需要提供每个输出的标签。如果多个输出预测的是同一个东西,则它们可以用同一套标签。例如增加辅助输出的正则化后,原来的 y_train 改成 [y_train, y_train]

1
2
3
4
history = model.fit([X_train_A, X_train_B], [y_train, y_train]
,epochs=20
,validation_data=([X_valid_A, X_valid_B], [y_valid, y_valid])
)

评估的时候,Keras 会给出 total loss 以及各个输出对应的 loss。

1
total_loss, main_loss, aux_loss = model.evaluate([X_test_A, X_test_B], [y_test, y_test])

预测阶段,predict() 回给出各个输出对应的输出结果

1
y_pred_main, y_pred_aux = model.predict([X_test_A, X_test_B])

(3) Subclassing API 建立动态网络

Sequential API 和 Functional API 都是声明式的: 事先声明要用什么样的层,各层之间是如何连接的,然后再喂给模型训练或预测数据。

这样做有很多好处:

  • 模型很容易保存,复制,分享
  • 网络结构可以可视化分析
  • 可以检查推理时的 shape 和 type,易于排查错误

但是也有缺点,那就是它们是静态的。有些模型中包含循环,条件分支,变化的 shape,以及其它动态的行为。遇到这些情况时,Sequential API 和 Functional API 就不好使了,要用 Subclassing API 。

继承 keras.Model 类,在构造函数中创建需要的层,然后在 call 方法中定义计算流程。

下面我们将前面用 Functional API 实现的 Deep and Wide 模型用 Subclassing API 实现一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class WideAndDeepModel(keras.Model):
def __init__(self, uints=30, activation="relu", **kwargs):
super().__init__(**kwargs) # handle standard args (e.g. name)
self.hidden1 = keras.layers.Dense(uints, activation=activation)
self.hidden2 = keras.layers.Dense(uints, activation=activation)
self.main_output = keras.layers.Dense(1)
self.aux_output = keras.layers.Dense(1)

def call(self, inputs):
input_A, input_B = inputs
hidden1 = self.hidden1(input_B)
hidden2 = self.hidden2(hidden1)
concat = keras.layers.concatenate([input_A, hidden2])
main_output = self.main_output(concat)
aux_output = self.aux_output(hidden2)
return main_output, aux_output

model = WideAndDeepModel()

compile, train, evaluate, prediction 部分均不变,与 Functional API 实现一样。

这个实现方式与 Functional API 差不多,只是不用创建 inputs。inputs 是 call() 的参数。这样就将层的创建和使用分离了
最大的好处就是可以在 call() 中做很多事情了,比如 for 循环,if 语句,low-level TensorFlow operation 等等。

这样的灵活性是有成本的:

  • 你的模型的实现隐藏在 call() 里,因此 Keras 无法知晓,这样模型就不能保存或复制。
  • 调用 summary() 只能得到层的列表,层之间是如何连接的是提供不出来的。
  • Keras 也不能提前检查类型和 shape,使得出错的几率增加。

保存和恢复模型

Sequential 和 Functional API

保存模型: keras 会用 HDF5 格式保存模型结构和参数。并且会保存 optimizer 及其超参数。

1
2
3
4
model = keras.layers.Sequential([...]) # or keras.Model([...])
model.compile([...])
model.fit([...])
model.save("my_keras_model.h5")

加载模型

1
model = keras.models.load_model("my_keras_model.h5")

保存 checkpoints : 使用 Callbacks

fit() 可以接受一个 callbacks 参数,callbacks 中持有一些对象,模型在 epoch 的开始和结束时会调用这些对象。

1
2
3
... # build and compile
checkpoint_cb = keras.callbacks.ModelCheckpoint("my_keras_model.h5")
history = model.fit(X_train, y_train, epochs=10, callbacks=[checkpoint_cb])

如果使用了验证集,可以用 save_best_only=True 去保存 checkpoint。只保存验证集上最好的模型,可以防止训练时间过长导致的过拟合。

1
2
3
4
5
... # build and compile
checkpoint_cb = keras.callbacks.ModelCheckpoint("my_keras_model.h5", save_best_only=True)
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid), callbacks=[checkpoint_cb])

model = keras.models.load_model("my_keras_model.h5") # roll back to best model

另一种实现 Early stoppint 的方式是使用 EarlyStopping callback。当多个 epochs (由 patience 参数定义)验证集的性能均无提高的时候,训练就会中断。

1
2
3
4
5
early_stopping_cb = keras.callbacks.EarlyStopping(patience=10,
restore_best_weights=True)
history = model.fit(X_train, y_train, epochs=100,
validation_data=(X_valid, y_valid),
callbacks=[checkpoint_cb, early_stopping_cb])

自定义 callback 例子:展示验证集 loss 与训练集 loss 的比值

1
2
3
4
5
6
7
class PrintValTrainRatioCallback(keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs):
print("\nval/train: {:.2f}".format(logs["val_loss"] / logs["loss"]))

history = model.fit(X_train, y_train, epochs=100,
validation_data=(X_valid, y_valid),
callbacks=[checkpoint_cb, early_stopping_cb, PrintValTrainRatioCallback()])

类似地,可以实现下列函数

1
2
3
4
5
6
on_train_begin()
on_train_end()
on_epoch_begin()
on_epoch_end()
on_batch_begin()
on_batch_end()

评估和预测时也可以用 callback,尤其是 debug 时。

对于 evaluate() 可以实现下列 callback

1
2
3
4
on_test_begin()
on_test_end()
on_test_batch_begin()
on_test_batch_end()

对于 predict() 可以实现下列 callback

1
2
3
4
on_predict_begin()
on_predict_end()
on_predict_batch_begin()
or on_predict_batch_end()

TensorBoard

TensorBoard 可以做的事情

  • view the learning curves during training
  • compare learning curves between multiple runs
  • visualize the computation graph
  • analyze training statistics
  • view images generated by your model
  • visualize complex multidimensional data projected down to 3D and automatically clustered for you

首先要你要可视化的数据输出到一个二进制 log 文件(Event 文件).
每条二进制记录称为一个 summary.

TensorBoard 服务会监控 log 目录,自动找出变化并更新可视化结果。

1
2
3
4
5
6
7
8
9
10
11
import os
logdir = os.path.join(os.curdir, "my_logs")

def get_run_logdir():
import time
run_id = time.strftime("run_%Y_%m_%d-%H_%M_%S")
return os.path.join(logdir, run_id)

run_log_dir = get_run_logdir()

tensorboard_cb = keras.callbacks.TensorBoard(run_log_dir)

然后启动 TensorBoard 服务

1
tensorboard --logdir=./my_logs --port=6006

在 jupyter notebook 中也可以启动 TensorBoard

1
2
%load_ext tensorboard
%tensorboard --logdir=./my_logs --port=6006

此外,tf.summary 提供了 lower-level API。下面的代码用 create_file_writer 创建了一个 SummaryWriter。此 weiter 用作 context,往二进制 log 里写 scalars, histograms, images, audio, 和 text,这些都可以在 TensorBoard 中看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
test_logdir = get_run_logdir()
writer = tf.summary.create_file_writer(test_logdir)

with writer.as_default():
for step in range(1, 1000 + 1):
tf.summary.scalar("my_scalar", np.sin(step / 10), step=step)

data = (np.random.randn(100) + 2) * step / 100 # some random data
tf.summary.histogram("my_hist", data, buckets=50, step=step)

images = np.random.rand(2, 32, 32, 3) # random 32×32 RGB images
tf.summary.image("my_images", images * step / 1000, step=step)

texts = ["The step is " + str(step), "Its square is " + str(step**2)]
tf.summary.text("my_text", texts, step=step)

sine_wave = tf.math.sin(tf.range(12000) / 48000 * 2 * np.pi * step)
audio = tf.reshape(tf.cast(sine_wave, tf.float32), [1, -1, 1])
tf.summary.audio("my_audio", audio, sample_rate=48000, step=step)

Share