写在前面

眨眼间,一周的时间从指间悄然流逝。今天要做的是“基于神经网络的手写数字识别”。2021.12.6

基于神经网络的手写数字识别

实验目的

掌握神经网络的设计原理,熟练掌握神经网络的训练和使用方法,能够使用Python语言,针对手写数字分类的训练和使用,实现一个三层全连接神经网络模型。具体包括:
1)实现三层神经网络模型来进行手写数字分类,建立一个简单而完整的神经网络工程。通过本实验理解神经网络中基本模块的作用和模块间的关系,为后续建立更复杂的神经网络实验奠定基础。
2)利用Python实现神经网络基本单元的前向传播(正向传播)和反向传播,加深对神经网络中基本单元的理解,包括全连接层、激活函数、损失函数等基本单元。
3)利用Python实现神经网络的构建和训练,实现神经网络所使用的梯度下降算法,加深对神经网络训练过程的理解。

背景知识

神经网络的组成

一个完整的神经网络通常由多个基本的网络层堆叠而成。本实验中的三层全连接神经网络由三个全连接层构成,在每两个全连接层之间插入 ReLU 激活函数以引入非线性变换,最后使用 Softmax 层计算交叉熵损失,如图 2.1 所示。因此本实验中使用的基本单元包括全连接层、ReLU 激活函数、Softmax 损失函数。

全连接层

全连接层以一维向量作为输入,输入与权重相乘后再与偏置相加得到输出向量。假设全连接层的输入为 m维列向量 x,输出为 n 维列向量 y。

用于手写数字分类的三层全连接神经网络

全连接层的权重 W 是二维矩阵,维度为 m×n,偏置 b 是 n 维列向量。前向传播时,全连接层的输出的计算公式为(注意偏置可以是向量,计算每一个输出使用不同的值;偏置也可以是一个标量,计算同一层的输出使用同一个值)

全连接层

UReLU激活函数

UReLU激活函数

Softmax 损失层

图1

图2

神经网络训练

图3

图4

图5

图6

示例代码

数据集

数据集采用 MNIST 手写数字库(老师直接提供,也可在 http://yann.lecun.com/exdb/mnist/自行下载)。该数据集包含一个训练集和一个测试集,其中训练集有 60000 个样本,测试集有 10000 个样本。每个样本都由灰度图像(即单通道图像)及其标记组成,图像大小为 28×28。MNIST 数据集包含 4 个文件,分别是训练集图像、训练集标记、测试集图像、测试集标记 。

总体设计

设计一个三层神经网络实现手写数字图像分类。该网络包含两个隐层和一个输出层,其中输入神经元个数由输入数据维度决定,输出层的神经元个数由数据集包含的类别决定,两个隐层的神经元个数可以作为超参数自行设置。对于手写数字图像的分类问题,输入数据为手写数字图像,原始图像一般可表示为二维矩阵(灰度图像)或三维矩阵(彩色图像),在输入神经网络前会将图像矩阵调整为一维向量作为输入。待分类的类别数一般是提前预设的,如手写数字包含 0 至 9 共 10 个类别,则神经网络的输出神经元个数为 10。

为了便于迭代开发,工程实现时采用模块化的方式来实现整个神经网络的处理,共划分为5大模块:

1)数据加载模块:从文件中读取数据,并进行预处理,其中预处理包括归一化、维度变换等处理。如果需要人为对数据进行随机数据扩增,则数据扩增处理也在数据加载模块中实现。
2)基本单元模块:实现神经网络中不同类型的网络层的定义、前向传播、反向传播等功能。
3)网络结构模块:利用基本单元模块建一个完整的神经网络。
4)网络训练(training)模块:用训练集对神经网络进行训练。对建立的神经网络结构,实现神经网络的前向传播、神经网络的反向传播、对神经网络进行参数更新、保存神经网络参数等基本操作,以及训练函数主体。
5)网络推断(inference)模块:使用训练得到的网络模型,对测试样本进行预测(也称为测试或推断)。具体操作包括加载训练得到的模型参数、神经网络的前向传播等。

数据加载模块

本实验釆用的数据集是MNIST手写数字库。该数据集中的图像数据和标记数据采用表2.1中的IDX文件格式存放。图像的像素值按行优先顺序存放,取值范围为[0,255],其中0表示黑色,255表示白色。

表  MNIST 数据集 IDX 文件格式

首先编写读取 MNIST 数据集文件并预处理的子函数,程序示例如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def load_mnist(self, file_dir, is_images = 'True'):
# Read binary data
bin_file = open(file_dir, 'rb')
bin_data = bin_file.read()
bin_file.close()
# Analysis file header
if is_images:
# Read images
fmt_header = '>iiii'
magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
else:
# Read labels
fmt_header = '>ii'
magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
num_rows, num_cols = 1, 1
data_size = num_images * num_rows * num_cols
mat_data = struct.unpack_from('>' + str(data_size) + 'B', bin_data, struct.calcsize(fmt_header))
mat_data = np.reshape(mat_data, [num_images, num_rows * num_cols])
print('Load images from %s, number: %d, data shape: %s' % (file_dir, num_images, str(mat_data.shape)))
return mat_data

然后调用该子函数对 MN1ST 数据集中的 4 个文件分别进行读取和预处理,并将处理过的训练和测试数据存储在 NumPy矩阵中(训练模型时可以快速读取该矩阵中的数据)。实现该功能的程序示例如下 所示。

1
2
3
4
5
6
7
8
9
10
def load_data(self):
# TODO: 调用函数 load_mnist 读取和预处理 MNIST 中训练数据和测试数据的图像和标记
print('Loading MNIST data from files...')
train_images = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_DATA), True)
train_labels = #Your Code Here#
test_images = #Your Code Here#
test_labels = #Your Code Here#
self.train_data = np.append(train_images, train_labels, axis=1)
self.test_data = np.append(test_images, test_labels, axis=1)
# self.test_data = np.concatenate((self.train_data, self.test_data), axis=0)

TODO 提示:代码中已有如下定义,直接按照 train_images 的代码套用即可:
TRAIN_DATA = “train-images-idx3-ubyte”
TRAIN_LABEL = “train-labels-idx1-ubyte”
TEST_DATA = “t10k-images-idx3-ubyte”
TEST_LABEL = “t10k-labels-idx1-ubyte”

基本单元模块

全连接层的实现示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class FullyConnectedLayer(object):
def __init__(self, num_input, num_output): # 全连接层初始化
self.num_input = num_input
self.num_output = num_output
print('\tFully connected layer with input %d, output %d.' % (self.num_input, self.num_output))
def init_param(self, std=0.01): # 参数初始化
self.weight = np.random.normal(loc=0.0, scale=std, size=(self.num_input, self.num_output))
self.bias = np.zeros([1, self.num_output])
def forward(self, input): # 前向传播计算
start_time = time.time()
self.input = input
# TODO:全连接层的前向传播,计算输出结果
self.output = #Your Code Here#
return self.output
def backward(self, top_diff): # 反向传播的计算
# TODO:全连接层的反向传播,计算参数梯度和本层损失
self.d_weight = #Your Code Here#
self.d_bias = #Your Code Here#
bottom_diff = #Your Code Here#
return bottom_diff
def update_param(self, lr): # 参数更新
# TODO:对全连接层参数利用参数进行更新
self.weight = #Your Code Here#
self.bias = #Your Code Here#
def load_param(self, weight, bias): # 参数加载
assert self.weight.shape == weight.shape
assert self.bias.shape == bias.shape
self.weight = weight
self.bias = bias
def save_param(self): # 参数保存
return self.weight, self.bias

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class ReLULayer(object):
def __init__(self):
print('\tReLU layer.')
def forward(self, input): # 前向传播的计算
start_time = time.time()
self.input = input
# TODO:ReLU层的前向传播,计算输出结果
output = #Your Code Here#
return output
def backward(self, top_diff): # 反向传播的计算
# TODO:ReLU层的反向传播,计算本层损失
bottom_diff = #Your Code Here#
bottom_diff[self.input<0] = 0
return bottom_diff

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class SoftmaxLossLayer(object):
def __init__(self):
print('\tSoftmax loss layer.')
def forward(self, input): # 前向传播的计算
# TODO:softmax 损失层的前向传播,计算输出结果
input_max = np.max(input, axis=1, keepdims=True)
input_exp = np.exp(input - input_max)
self.prob = #Your Code Here#
return self.prob
def get_loss(self, label): # 计算损失
self.batch_size = self.prob.shape[0]
self.label_onehot = np.zeros_like(self.prob)
self.label_onehot[np.arange(self.batch_size), label] = 1.0
loss = -np.sum(np.log(self.prob) * self.label_onehot) / self.batch_size
return loss
def backward(self): # 反向传播的计算
# TODO:softmax 损失层的反向传播,计算本层损失
bottom_diff = #Your Code Here#
return bottom_diff

网络结构模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MNIST_MLP(object):
def __init__(self, batch_size=100, input_size=784, hidden1=32, hidden2=16, out_classes=10, lr=0.01, max_epoch=2, print_iter=100):
# 神经网络初始化
self.batch_size = batch_size
self.input_size = input_size
self.hidden1 = hidden1
self.hidden2 = hidden2
self.out_classes = out_classes
self.lr = lr
self.max_epoch = max_epoch
self.print_iter = print_iter
def build_model(self): # 建立网络结构
# TODO:建立三层神经网络结构
print('Building multi-layer perception model...')
self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
self.relu1 = ReLULayer()
#Your Code Here#
#Your Code Here#
self.fc3 = FullyConnectedLayer(self.hidden2, self.out_classes)
self.softmax = SoftmaxLossLayer()
self.update_layer_list = [self.fc1, self.fc2, self.fc3]
def init_model(self):
print('Initializing parameters of each layer in MLP...')
for layer in self.update_layer_list:
layer.init_param()

网络训练( training)

神经网络训练流程如图2.9所示。在完成数据加载模块和网络结构模块实现之后,需要实现训练模块。本实验中三层神经网络的网络训练模块程序示例如图2.10所示。神经网络的训练模块通常拆解为若干步骤,包括神经网络的前向传播、神经网络的反向传播、神经网络参数更新、神经网络参数保存等基本操作。这些网络训练模块的基本操作以及训练主体用神经网络类的成员函数来定义:

神经网络训练流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def forward(self, input):  # 神经网络的前向传播
# TODO:神经网络的前向传播
h1 = self.fc1.forward(input)
h1 = self.relu1.forward(h1)
#Your Code Here#
#Your Code Here#
#Your Code Here#
prob = self.softmax.forward(h3)
return prob

def backward(self): # 神经网络的反向传播
# TODO:神经网络的反向传播
dloss = self.softmax.backward()
#Your Code Here#
#Your Code Here#
#Your Code Here#
dh1 = self.relu1.backward(dh2)
dh1 = self.fc1.backward(dh1)

def update(self, lr): # 神经网络的参数更新
for layer in self.update_layer_list:
layer.update_param(lr)

def train(self): # 训练函数
max_batch = self.train_data.shape[0] // self.batch_size
print('Start training...')
for idx_epoch in range(self.max_epoch):
self.shuffle_data()
for idx_batch in range(max_batch):
batch_images = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, :-1]
batch_labels = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, -1]
prob = self.forward(batch_images)
loss = self.softmax.get_loss(batch_labels)
self.backward()
self.update(self.lr)
if idx_batch % self.print_iter == 0:
print('Epoch %d, iter %d, loss: %.6f' % (idx_epoch, idx_batch, loss))

网络推断( inference )

整个神经网络推断流程如图2.11所示。完成神经网络的训练之后,可以用训练得到的模型对测试数据进行预测,以评估模型的精度。工程实现中同样常将一个神经网络的推断模块拆解为若干步骤,包括神经网络模型参数加载、前向传播、精度计算等基本操作。这些网络推断模块的基本操作以及推断主体用神经网络类的成员函数来定义:

•神经网络的前向传播:网络推断模块中的神经网络前向传播操作与网络训练模块中的前向传播操作完全一致,因此可以直接调用网络训练模块中的神经网络前向传播函数。
•神经网络参数加载:读取神经网络训练模块保存的模型参数文件,并加载有参数的网络层的参数值。
•神经网络推断函数主体:在进行神经网络推断前,需要从模型参数文件中加载神经网络的参数。在神经网络推断过程中,循环每次读取一定批量的测试数据,随后进行整个神经网络的前向传播计算得到神经网络的输出结果。得到整个测试数据集的输出结果后,与测试数据集的标记进行比对,利用相关的评价函数计算模型的精度,如手写数字分类问题使用分类平均正确率作为模型的评函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def load_model(self, param_dir): # 加载神经网络权值
print('Loading parameters from file ' + param_dir)
params = np.load(param_dir, allow_pickle=True).item()
self.fc1.load_param(params['w1'], params['b1'])
self.fc2.load_param(params['w2'], params['b2'])
self.fc3.load_param(params['w3'], params['b3'])
def evaluate(self): #推断函数
pred_results = np.zeros([self.test_data.shape[0]])
start_time = time.time()
for idx in range(self.test_data.shape[0]//self.batch_size):
batch_images = self.test_data[idx*self.batch_size:(idx+1)*self.batch_size, :-1]
prob = self.forward(batch_images)
end = time.time()
pred_labels = np.argmax(prob, axis=1)
pred_results[idx*self.batch_size:(idx+1)*self.batch_size] = pred_labels
print("All evaluate time: %f"%(time.time()-start_time))
accuracy = np.mean(pred_results == self.test_data[:,-1])
print('Accuracy in test set: %f' % accuracy)

完整实验流程

完成神经网络的各个模块之后,调用这些模块就可以实现用三层神经网络进行手写数字图像分类的完整流程。。首先实例化三层神经网络对应的类,指定神经网络的超参数,如每层的神经元个数。其次进行数据的加载和预处理。再调用网络结构模块建立神经网络,随后进行网络初始化,在该过程中网络结构模块会自动调用基本单元模块实例化神经网络中的每个层。然后调用网络训练模块训练整个网络,之后将训练得到的模型参数保存到文件中。最后从文件中读取训练得到的模型参数,之后调用网络推断模块测试网络的精度。

1
2
3
4
5
6
7
8
9
10
11
12
if __name__ == '__main__':
h1, h2, e = 32, 16, 1
mlp = MNIST_MLP(hidden1=h1, hidden2=h2, max_epoch=e)
mlp.load_data()
mlp.build_model()
mlp.init_model()
start_time = time.time()
mlp.train()
print("All train time: %f"%(time.time()-start_time))
mlp.save_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
mlp.load_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
mlp.evaluate()

实验评估

实验内容

1)请在代码中有TODO的地方填空,将程序补充完整,在报告中写出相应代码,并给出自己的理解。
2) mlp.load_data()执行到最后时,train_images、train_labels、test_images、test_labels 的维度是多少?即多少行多少列,用(x,y)来表示。self.train_data 和 self.test_data 的维度是多少?
3)本案例中的神经网络一共有几层?每层有多少个神经元?如果要增加或减少层数,应该怎么做(简单描述即可不用编程)?如果要增加或减少某一层的节点,应该怎么做(简单描述)?如果要把 softmax 换成 sigmoid,应该怎么做(简单描述)?
4) 在 train()函数中,max_batch = self.train_data.shape[0] // self.batch_size 这一句的意义是什么?self.shuffle_data()的意义是什么?

5)最 终 evaluate()函数输出的 Accuracy in test set 是多少?请想办法提高该数值。本小题的评估标准设定如下:
• 60 分标准:给定全连接层、ReLU 层、Softmax 损失层的前向传播的输入矩阵、参数值、反向
传播的输入,可以得到正确的前向传播的输出矩阵、反向传播的输出和参数梯度。
• 80 分标准:实现正确的三层神经网络,并进行训练和推断,使最后训练得到的模型在 MNIST
测试数据集上的平均分类正确率高于 92%。
• 90 分标准:实现正确的三层神经网络,并进行训练和推断,调整和训练相关的超参数,使最后
训练得到的模型在 MNIST 测试数据集上的平均分类正确率高于 95%。
• 100 分标准:在三层神经网络基础上设计自己的神经网络结构,并进行训练和推断,使最后训
练得到的模型在 MN1ST测试数据集上的平均分类正确率高于 98%。

实验结果与分析

步骤解析

数据加载模块

1
2
3
4
5
6
7
8
9
10
def load_data(self):
# TODO: 调用函数 load_mnist 读取和预处理 MNIST 中训练数据和测试数据的图像和标记
print('Loading MNIST data from files...')
train_images = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_DATA), True)
train_labels = self.load_mnist(os.path.join(MNIST_DIR,TRAIN_LABEL),False)
test_images = self.load_mnist(os.path.join(MNIST_DIR,TEST_DATA),True)
test_labels = self.load_mnist(os.path.join(MNIST_DIR,TEST_LABEL ),False)
self.train_data = np.append(train_images, train_labels, axis=1)
self.test_data = np.append(test_images, test_labels, axis=1)
# self.test_data = np.concatenate((self.train_data, self.test_data), axis=0)

加载数据集,train_labels、train_images、test_images、test_labels。MNIST_DIR用于定位文件,True代表加载图像,False代表加载标签。

基本单元模块

全连接层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class FullyConnectedLayer(object):
def __init__(self, num_input, num_output): # 全连接层初始化
self.num_input = num_input
self.num_output = num_output
print('\tFully connected layer with input %d, output %d.' % (self.num_input, self.num_output))
def init_param(self, std=0.01): # 参数初始化
self.weight = np.random.normal(loc=0.0, scale=std, size=(self.num_input, self.num_output))
self.bias = np.zeros([1, self.num_output])
def forward(self, input): # 前向传播计算
start_time = time.time()
self.input = input
# TODO:全连接层的前向传播,计算输出结果
self.output = #Your Code Here#
return self.output
def backward(self, top_diff): # 反向传播的计算
# TODO:全连接层的反向传播,计算参数梯度和本层损失

#Your Code Here#
self.d_weight = np.dot(self.input.T,top_diff)
self.d_bias = top_diff
bottom_diff = np.dot(top_diff,self.weight.T)
#Your Code Here#

return bottom_diff
def update_param(self, lr): # 参数更新
# TODO:对全连接层参数利用参数进行更新

#Your Code Here#
self.weight = self.weight-lr*self.d_weight
self.bias = self.bias-lr*self.d_bias
#Your Code Here#

def load_param(self, weight, bias): # 参数加载
assert self.weight.shape == weight.shape
assert self.bias.shape == bias.shape
self.weight = weight
self.bias = bias
def save_param(self): # 参数保存
return self.weight, self.bias
ReLU层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ReLULayer(object):
def __init__(self):
print('\tReLU layer.')
def forward(self, input): # 前向传播的计算
start_time = time.time()
self.input = input
# TODO:ReLU层的前向传播,计算输出结果

#Your Code Here#
output = np.maximum(self.input,0)
#Your Code Here#

return output
def backward(self, top_diff): # 反向传播的计算
# TODO:ReLU层的反向传播,计算本层损失

#Your Code Here#
bottom_diff = top_diff
#Your Code Here#

bottom_diff[self.input<0] = 0
return bottom_diff
Softmax层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class SoftmaxLossLayer(object):
def __init__(self):
print('\tSoftmax loss layer.')
def forward(self, input): # 前向传播的计算
# TODO:softmax 损失层的前向传播,计算输出结果
input_max = np.max(input, axis=1, keepdims=True)
input_exp = np.exp(input - input_max)

#Your Code Here#
self.prob = input_exp/np.tile(np.sum(input_exp,axis=1),(10,1)).T
#Your Code Here#

return self.prob
def get_loss(self, label): # 计算损失
self.batch_size = self.prob.shape[0]
self.label_onehot = np.zeros_like(self.prob)
self.label_onehot[np.arange(self.batch_size), label] = 1.0
loss = -np.sum(np.log(self.prob) * self.label_onehot) / self.batch_size
return loss
def backward(self): # 反向传播的计算
# TODO:softmax 损失层的反向传播,计算本层损失

#Your Code Here#
bottom_diff = (self.prob-self.label_onehot)/ self.batch_size
#Your Code Here#

return bottom_diff

网络结构模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class MNIST_MLP(object):
def __init__(self, batch_size=100, input_size=784, hidden1=32, hidden2=16, out_classes=10, lr=0.01, max_epoch=2, print_iter=100):
# 神经网络初始化
self.batch_size = batch_size
self.input_size = input_size
self.hidden1 = hidden1
self.hidden2 = hidden2
self.out_classes = out_classes
self.lr = lr
self.max_epoch = max_epoch
self.print_iter = print_iter
def build_model(self): # 建立网络结构
# TODO:建立三层神经网络结构
print('Building multi-layer perception model...')
self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
self.relu1 = ReLULayer()

#Your Code Here#
self.fc2 = FullyConnectedLayer(self.hidden1,self.hidden2)
self.relu2 = ReLULayer()
#Your Code Here#

self.fc3 = FullyConnectedLayer(self.hidden2, self.out_classes)
self.softmax = SoftmaxLossLayer()
self.update_layer_list = [self.fc1, self.fc2, self.fc3]
def init_model(self):
print('Initializing parameters of each layer in MLP...')
for layer in self.update_layer_list:
layer.init_param()

网络训练模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def forward(self, input):  # 神经网络的前向传播
# TODO:神经网络的前向传播
h1 = self.fc1.forward(input)
h1 = self.relu1.forward(h1)
#Your Code Here#
h2 = self.fc2.forward(h1)
h2 = self.relu2.forward(h2)
#Your Code Here#

prob = self.softmax.forward(h3)
return prob

def backward(self): # 神经网络的反向传播
# TODO:神经网络的反向传播
dloss = self.softmax.backward()

#Your Code Here#
dh2 = self.fc3.backward(dloss)
dh2 = self.relu2.backward(dh2)
dh1 = self.fc2.backward(dh2)
#Your Code Here#

dh1 = self.relu1.backward(dh2)
dh1 = self.fc1.backward(dh1)

def update(self, lr): # 神经网络的参数更新
for layer in self.update_layer_list:
layer.update_param(lr)

def train(self): # 训练函数
max_batch = self.train_data.shape[0] // self.batch_size
print('Start training...')
for idx_epoch in range(self.max_epoch):
self.shuffle_data()
for idx_batch in range(max_batch):
batch_images = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, :-1]
batch_labels = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, -1]
prob = self.forward(batch_images)
loss = self.softmax.get_loss(batch_labels)
self.backward()
self.update(self.lr)
if idx_batch % self.print_iter == 0:
print('Epoch %d, iter %d, loss: %.6f' % (idx_epoch, idx_batch, loss))

shuffle的作用及max_batch的意义

max_batch = self.train_data.shape[0] // self.batch_size 表示分组,即一个样本计算一次偏导更新一次权值还是多个样本计算一次偏导,更新一次权值。

以猫狗分类为例, 假如数据集是

Dog,Dog,Dog,… ,Dog,Dog,Dog,Cat,Cat,Cat,Cat,… ,Cat,Cat

所有的狗都在猫前面,如果不shuffle,模型训练一段时间内只看到了Dog,必然会过拟合于Dog,一段时间内又只能看到Cat,必然又过拟合于Cat,这样的模型泛化能力必然很差。 那如果Dog和Cat一直交替,会不会就不过拟合了呢?

Dog,Cat,Dog,Cat,Dog ,Cat,Dog,…

依然会过拟合,模型是会记住训练数据路线的,为什么呢?

当用随机梯度下降法训练神经网络时,通常的做法是洗牌数据。在纠结细节的情况下,让我们用一个极端的例子来解释为什么shuffle是有用的。假设你正在训练一个分类器来区分猫和狗,你的训练集是50,000只猫后面跟着50,000只狗。如果你不洗牌,你的训练成绩就会很差。 严格地说,这个问题是由梯度噪声中的序列相关性和参数更新的不可交换性引起的。首先我们需要明白固定的数据集顺序,意味着给定迭代步,对应此迭代步的训练数据是固定的。 假如目标函数是J=f(w,b)J=f(w,b),使用梯度下降优化JJ。给定权重取值w、bw、b和迭代步step的情况下,固定的数据集顺序意味着固定的训练样本,也就意味着权值更新的方向是固定的,而无顺序的数据集,意味着更新方向是随机的。所以固定的数据集顺序,严重限制了梯度优化方向的可选择性,导致收敛点选择空间严重变少,容易导致过拟合。所以模型是会记住数据路线的,所以shuffle很重要,一定shuffle。

摘自 https://deepindeed.cn/2019/12/23/Data-Shuffle/

完整代码:

全连接神经网络

layers_1.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# coding=utf-8
import numpy as np
import struct
import os
import time

class FullyConnectedLayer(object):
def __init__(self, num_input, num_output): # 全连接层初始化
self.num_input = num_input
self.num_output = num_output
print('\tFully connected layer with input %d, output %d.' % (self.num_input, self.num_output))
def init_param(self, std=0.01): # 参数初始化
self.weight = np.random.normal(loc=0.0, scale=std, size=(self.num_input, self.num_output))
self.bias = np.zeros([1, self.num_output])
def forward(self, input): # 前向传播计算
start_time = time.time()
self.input = input
# TODO:全连接层的前向传播,计算输出结果
# 修改代码处
self.output = np.matmul(self.input,self.weight)+self.bias
return self.output
def backward(self, top_diff): # 反向传播的计算
# TODO:全连接层的反向传播,计算参数梯度和本层损失
# 修改代码处
self.d_weight = np.dot(self.input.T,top_diff)
self.d_bias = top_diff
bottom_diff = np.dot(top_diff,self.weight.T)
# 修改代码处
return bottom_diff
def update_param(self, lr): # 参数更新
# TODO:对全连接层参数利用参数进行更新
# 修改代码处
self.weight = self.weight-lr*self.d_weight
self.bias = self.bias-lr*self.d_bias
# 修改代码处
def load_param(self, weight, bias): # 参数加载
assert self.weight.shape == weight.shape
assert self.bias.shape == bias.shape
self.weight = weight
self.bias = bias
def save_param(self): # 参数保存
return self.weight, self.bias

class ReLULayer(object):
def __init__(self):
print('\tReLU layer.')
def forward(self, input): # 前向传播的计算
start_time = time.time()
self.input = input
# TODO:ReLU层的前向传播,计算输出结果
output = np.maximum(self.input,0)
return output
def backward(self, top_diff): # 反向传播的计算
# TODO:ReLU层的反向传播,计算本层损失
bottom_diff = top_diff
bottom_diff[self.input<0] = 0
return bottom_diff

class SoftmaxLossLayer(object):
def __init__(self):
print('\tSoftmax loss layer.')
def forward(self, input): # 前向传播的计算
# TODO:softmax 损失层的前向传播,计算输出结果
input_max = np.max(input, axis=1, keepdims=True)
input_exp = np.exp(input - input_max)
self.prob = input_exp/np.tile(np.sum(input_exp,axis=1),(10,1)).T
return self.prob
def get_loss(self, label): # 计算损失
self.batch_size = self.prob.shape[0]
self.label_onehot = np.zeros_like(self.prob)
self.label_onehot[np.arange(self.batch_size), label] = 1.0
loss = -np.sum( self.label_onehot*np.log(self.prob)) / self.batch_size
return loss
def backward(self): # 反向传播的计算
# TODO:softmax 损失层的反向传播,计算本层损失
bottom_diff = (self.prob-self.label_onehot)/ self.batch_size
return bottom_diff

mnist_mlp_cpu.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# coding=utf-8
import numpy as np
import struct
import os
import time

from layers_1 import FullyConnectedLayer, ReLULayer, SoftmaxLossLayer

MNIST_DIR = "./mnist_data"
TRAIN_DATA = "train-images-idx3-ubyte"
TRAIN_LABEL = "train-labels-idx1-ubyte"
TEST_DATA = "t10k-images-idx3-ubyte"
TEST_LABEL = "t10k-labels-idx1-ubyte"

def show_matrix(mat, name):
#print(name + str(mat.shape) + ' mean %f, std %f' % (mat.mean(), mat.std()))
pass

class MNIST_MLP(object):
def __init__(self, batch_size=100, input_size=784, hidden1=32, hidden2=16, out_classes=10, lr=0.01, max_epoch=2, print_iter=100):
# 神经网络初始化
self.batch_size = batch_size
self.input_size = input_size
self.hidden1 = hidden1
self.hidden2 = hidden2
self.out_classes = out_classes
self.lr = lr
self.max_epoch = max_epoch
self.print_iter = print_iter

def load_mnist(self, file_dir, is_images = 'True'):
# Read binary data
bin_file = open(file_dir, 'rb')
bin_data = bin_file.read()
bin_file.close()
# Analysis file header
if is_images:
# Read images
fmt_header = '>iiii'
magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
else:
# Read labels
fmt_header = '>ii'
magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
num_rows, num_cols = 1, 1
data_size = num_images * num_rows * num_cols
mat_data = struct.unpack_from('>' + str(data_size) + 'B', bin_data, struct.calcsize(fmt_header))
mat_data = np.reshape(mat_data, [num_images, num_rows * num_cols])
print('Load images from %s, number: %d, data shape: %s' % (file_dir, num_images, str(mat_data.shape)))
return mat_data

def load_data(self):
# TODO: 调用函数 load_mnist 读取和预处理 MNIST 中训练数据和测试数据的图像和标记
print('Loading MNIST data from files...')
train_images = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_DATA), True)
train_labels = self.load_mnist(os.path.join(MNIST_DIR,TRAIN_LABEL),False)
test_images = self.load_mnist(os.path.join(MNIST_DIR,TEST_DATA),True)
test_labels = self.load_mnist(os.path.join(MNIST_DIR,TEST_LABEL ),False)
self.train_data = np.append(train_images, train_labels, axis=1)
self.test_data = np.append(test_images, test_labels, axis=1)
# self.test_data = np.concatenate((self.train_data, self.test_data), axis=0)

def shuffle_data(self):
print('Randomly shuffle MNIST data...')
np.random.shuffle(self.train_data)

def build_model(self): # 建立网络结构
# TODO:建立三层神经网络结构
print('Building multi-layer perception model...')
self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
self.relu1 = ReLULayer()
self.fc2 = FullyConnectedLayer(self.hidden1,self.hidden2)
self.relu2 = ReLULayer()
self.fc3 = FullyConnectedLayer(self.hidden2, self.out_classes)
self.softmax = SoftmaxLossLayer()
self.update_layer_list = [self.fc1, self.fc2, self.fc3]

def init_model(self):
print('Initializing parameters of each layer in MLP...')
for layer in self.update_layer_list:
layer.init_param()

def load_model(self, param_dir): # 加载神经网络权值
print('Loading parameters from file ' + param_dir)
params = np.load(param_dir, allow_pickle=True).item()
self.fc1.load_param(params['w1'], params['b1'])
self.fc2.load_param(params['w2'], params['b2'])
self.fc3.load_param(params['w3'], params['b3'])

def save_model(self, param_dir):
print('Saving parameters to file ' + param_dir)
params = {}
params['w1'], params['b1'] = self.fc1.save_param()
params['w2'], params['b2'] = self.fc2.save_param()
params['w3'], params['b3'] = self.fc3.save_param()
np.save(param_dir, params)

def forward(self, input): # 神经网络的前向传播
# TODO:神经网络的前向传播
h1 = self.fc1.forward(input)
h1 = self.relu1.forward(h1)
h2 = self.fc2.forward(h1)
h2 = self.relu2.forward(h2)
h3 = self.fc3.forward(h2)
self.prob = self.softmax.forward(h3)
return self.prob

def backward(self): # 神经网络的反向传播
# TODO:神经网络的反向传播
dloss = self.softmax.backward()
dh2 = self.fc3.backward(dloss)
dh2 = self.relu2.backward(dh2)
dh1 = self.fc2.backward(dh2)
dh1 = self.relu1.backward(dh1)
dh1 = self.fc1.backward(dh1)

def update(self, lr): # 神经网络的参数更新
for layer in self.update_layer_list:
layer.update_param(lr)

def train(self): # 训练函数
max_batch = self.train_data.shape[0] // self.batch_size
print('Start training...')
for idx_epoch in range(self.max_epoch):
self.shuffle_data()
for idx_batch in range(max_batch):
batch_images = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, :-1]
batch_labels = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, -1]
prob = self.forward(batch_images)
loss = self.softmax.get_loss(batch_labels)
self.backward()
self.update(self.lr)
if idx_batch % self.print_iter == 0:
print('Epoch %d, iter %d, loss: %.6f' % (idx_epoch, idx_batch, loss))

def evaluate(self): #推断函数
pred_results = np.zeros([self.test_data.shape[0]])
start_time = time.time()
for idx in range(self.test_data.shape[0]//self.batch_size):
batch_images = self.test_data[idx*self.batch_size:(idx+1)*self.batch_size, :-1]
prob = self.forward(batch_images)
end = time.time()
pred_labels = np.argmax(prob, axis=1)
pred_results[idx*self.batch_size:(idx+1)*self.batch_size] = pred_labels
print("All evaluate time: %f"%(time.time()-start_time))
accuracy = np.mean(pred_results == self.test_data[:,-1])
print('Accuracy in test set: %f' % accuracy)


if __name__ == '__main__':
h1, h2, e = 32, 16, 1
mlp = MNIST_MLP(hidden1=h1, hidden2=h2, max_epoch=e)
mlp.load_data()
mlp.build_model()
mlp.init_model()
start_time = time.time()
mlp.train()
print("All train time: %f"%(time.time()-start_time))
mlp.save_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
mlp.load_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
mlp.evaluate()

卷积神经网络

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense,Dropout,Flatten,Convolution2D,MaxPooling2D
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np

# 载入数据集
mnist = tf.keras.datasets.mnist
(x_train,y_train),(x_test,y_test) = mnist.load_data()

x_train = x_train.reshape(-1,28,28,1)/255.0
x_test = x_test.reshape(-1,28,28,1)/255.0

y_train = tf.keras.utils.to_categorical(y_train,num_classes=10)
y_test = tf.keras.utils.to_categorical(y_test,num_classes=10)

model = Sequential()

model.add(Convolution2D(
input_shape = (28,28,1),
filters = 32,
kernel_size = 5,
strides = 1,
padding = 'same',
activation = 'relu'
))

model.add(MaxPooling2D(
pool_size = 2,
strides =2,
padding = 'same'
))

model.add(Convolution2D(64,5,strides = 1,padding = 'same',activation = 'relu'))
model.add(MaxPooling2D(2,2,'same'))

model.add(Flatten())
model.add(Dense(1024,activation='relu'))

model.add(Dropout(0.5))
model.add(Dense(10,activation = 'softmax'))

adam = Adam(lr = 1e-4)
model.compile(optimizer=adam,loss='categorical_crossentropy',metrics=['accuracy'])
model.fit(x_train,y_train,batch_size = 64,epochs = 10,validation_data = (x_test,y_test))

model.save('mnist.h5')

效果:

卷积神经网络