Preface
In the blink of an eye, another week has slipped away. Today's task: "Handwritten Digit Recognition Based on Neural Networks". 2021.12.6
Handwritten Digit Recognition Based on Neural Networks

Experiment Objectives
Understand the design principles of neural networks, master how to train and use them, and use Python to implement a three-layer fully connected neural network for handwritten digit classification. Specifically:
1) Implement a three-layer neural network for handwritten digit classification and build a simple but complete neural network project. Through this experiment, understand the role of the basic modules in a neural network and the relationships between them, laying the groundwork for building more complex networks in later experiments.
2) Implement the forward propagation and backward propagation of the basic neural network units in Python, deepening the understanding of these units, including fully connected layers, activation functions, and loss functions.
3) Implement the construction and training of the neural network in Python, including the gradient descent algorithm used for training, deepening the understanding of the training process.
Background
Components of a Neural Network
A complete neural network is usually built by stacking several basic layers. The three-layer fully connected network in this experiment consists of three fully connected layers, with a ReLU activation inserted between consecutive fully connected layers to introduce non-linearity, and a Softmax layer at the end to compute the cross-entropy loss, as shown in Figure 2.1. The basic units used in this experiment are therefore the fully connected layer, the ReLU activation function, and the Softmax loss function.
Fully Connected Layer
A fully connected layer takes a one-dimensional vector as input; the input is multiplied by the weights and added to the bias to produce the output vector. Suppose the input of the fully connected layer is an m-dimensional column vector x and the output is an n-dimensional column vector y.
The weight W of the fully connected layer is a two-dimensional matrix of size m×n, and the bias b is an n-dimensional column vector. In forward propagation, the output of the fully connected layer is computed as y = W^T x + b (note that the bias can be a vector, so each output uses a different value, or a single scalar shared by all outputs of the layer).
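As a concrete illustration, here is a minimal sketch of this forward pass using the batch (row-vector) convention adopted by the code later in this report; the sizes are example values chosen only for illustration:

```python
import numpy as np

# Fully connected forward pass: each row of x is one input sample.
batch, m, n = 4, 784, 32                     # example sizes (assumed for illustration)
x = np.random.rand(batch, m)                 # a batch of m-dimensional inputs
W = np.random.normal(0.0, 0.01, (m, n))      # weight matrix, shape (m, n)
b = np.zeros((1, n))                         # bias as a row vector; broadcasts over the batch
y = np.matmul(x, W) + b                      # output, shape (batch, n)
print(y.shape)                               # (4, 32)
```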
ReLU Activation Function
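ReLU applies y = max(x, 0) element-wise; in the backward pass the incoming gradient is kept where the input was positive and zeroed elsewhere. A minimal NumPy sketch (the values are made up for illustration), consistent with the implementation later in this report:

```python
import numpy as np

x = np.array([[-2.0, 0.5, 3.0]])        # example input
y = np.maximum(x, 0)                    # forward pass: negative entries become 0

top_diff = np.array([[1.0, 1.0, 1.0]])  # gradient arriving from the next layer
bottom_diff = top_diff.copy()           # copy so top_diff is not modified in place
bottom_diff[x < 0] = 0                  # backward pass: gradient blocked where x < 0
print(y, bottom_diff)                   # [[0.  0.5 3. ]] [[0. 1. 1.]]
```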
Softmax Loss Layer
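Softmax turns the final layer's scores into a probability distribution, and the cross-entropy loss compares that distribution with the one-hot label; the gradient with respect to the scores is simply (prob - onehot) / batch_size. A minimal sketch with made-up scores and a single sample:

```python
import numpy as np

scores = np.array([[2.0, 1.0, 0.1]])                     # one sample, three classes
label = np.array([0])                                    # true class index

scores = scores - np.max(scores, axis=1, keepdims=True)  # subtract the max for numerical stability
prob = np.exp(scores) / np.sum(np.exp(scores), axis=1, keepdims=True)

onehot = np.zeros_like(prob)
onehot[np.arange(1), label] = 1.0
loss = -np.sum(onehot * np.log(prob)) / 1                # cross-entropy loss over the batch
grad = (prob - onehot) / 1                               # gradient w.r.t. the scores
print(prob, loss)
```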
Neural Network Training
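Training repeats three steps: forward propagation, backward propagation, and a gradient-descent parameter update. With learning rate lr, each parameter is updated as param = param - lr * gradient, which is exactly what the update_param functions later in this report do. A minimal sketch of one update step (variable names are illustrative):

```python
import numpy as np

lr = 0.01                                    # learning rate (example value)
weight = np.random.normal(0.0, 0.01, (3, 2)) # some parameter matrix
d_weight = np.random.rand(3, 2)              # its gradient from backpropagation (made up here)
weight = weight - lr * d_weight              # one gradient-descent update step
```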
Example Code
Dataset
The dataset is the MNIST handwritten digit database (provided by the instructor, or downloadable from http://yann.lecun.com/exdb/mnist/). It contains a training set and a test set; the training set has 60000 samples and the test set has 10000 samples. Each sample consists of a grayscale (single-channel) image of size 28×28 and its label. The MNIST dataset comprises 4 files: training images, training labels, test images, and test labels.
Overall Design
Design a three-layer neural network for handwritten digit image classification. The network contains two hidden layers and one output layer. The number of input neurons is determined by the dimensionality of the input data, the number of output neurons is determined by the number of classes in the dataset, and the sizes of the two hidden layers are hyperparameters that can be set freely. For handwritten digit classification the input is a digit image; the raw image is generally a two-dimensional matrix (grayscale) or a three-dimensional matrix (color) and is flattened into a one-dimensional vector before being fed into the network. The number of classes is fixed in advance: handwritten digits cover the 10 classes 0 through 9, so the network has 10 output neurons.
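With the default sizes used later in this report (input 28×28 = 784, hidden layers of 32 and 16 neurons, 10 output classes), the layer dimensions and total parameter count work out as follows; this is only a quick sanity check, not part of the original assignment:

```python
# Weight shapes for the default 784-32-16-10 configuration.
layer_dims = [(784, 32), (32, 16), (16, 10)]
num_params = sum(m * n + n for m, n in layer_dims)  # weights plus one bias per output unit
print(num_params)                                   # 25818 trainable parameters
```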
To make iterative development easier, the network is implemented in a modular way, divided into 5 modules:
1) Data loading module: reads the data from files and preprocesses it; preprocessing includes normalization, dimension transformation, and similar steps. If random data augmentation is needed, it is also implemented in this module.
2) Basic unit module: implements the definition, forward propagation, and backward propagation of the different layer types in the network.
3) Network structure module: uses the basic unit module to build a complete neural network.
4) Network training module: trains the neural network on the training set. For the constructed network, it implements forward propagation, backward propagation, parameter updates, parameter saving, and the main training loop.
5) Network inference module: uses the trained model to predict on test samples (also called testing or inference). This includes loading the trained model parameters and running forward propagation.
Data Loading Module
The dataset used in this experiment is the MNIST handwritten digit database. Its image data and label data are stored in the IDX file format listed in Table 2.1. Pixel values are stored in row-major order and range over [0, 255], where 0 is black and 255 is white.
First, write a helper function that reads and preprocesses the MNIST data files, as shown below.
```python
def load_mnist(self, file_dir, is_images='True'):
    bin_file = open(file_dir, 'rb')
    bin_data = bin_file.read()
    bin_file.close()
    if is_images:
        fmt_header = '>iiii'
        magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
    else:
        fmt_header = '>ii'
        magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
        num_rows, num_cols = 1, 1
    data_size = num_images * num_rows * num_cols
    mat_data = struct.unpack_from('>' + str(data_size) + 'B', bin_data, struct.calcsize(fmt_header))
    mat_data = np.reshape(mat_data, [num_images, num_rows * num_cols])
    print('Load images from %s, number: %d, data shape: %s' % (file_dir, num_images, str(mat_data.shape)))
    return mat_data
```
Then call this helper on each of the 4 MNIST files to read and preprocess them, and store the processed training and test data in NumPy matrices (so that the data can be read quickly during training). The program below implements this.
```python
def load_data(self):
    print('Loading MNIST data from files...')
    train_images = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_DATA), True)
    train_labels =   # TODO
    test_images =    # TODO
    test_labels =    # TODO
    self.train_data = np.append(train_images, train_labels, axis=1)
    self.test_data = np.append(test_images, test_labels, axis=1)
```
TODO hint: the following constants are already defined in the code; just follow the pattern of the train_images line:
TRAIN_DATA = "train-images-idx3-ubyte"
TRAIN_LABEL = "train-labels-idx1-ubyte"
TEST_DATA = "t10k-images-idx3-ubyte"
TEST_LABEL = "t10k-labels-idx1-ubyte"
Basic Unit Module
Example implementation of the fully connected layer:
```python
class FullyConnectedLayer(object):
    def __init__(self, num_input, num_output):
        self.num_input = num_input
        self.num_output = num_output
        print('\tFully connected layer with input %d, output %d.' % (self.num_input, self.num_output))
    def init_param(self, std=0.01):
        self.weight = np.random.normal(loc=0.0, scale=std, size=(self.num_input, self.num_output))
        self.bias = np.zeros([1, self.num_output])
    def forward(self, input):
        start_time = time.time()
        self.input = input
        self.output =   # TODO
        return self.output
    def backward(self, top_diff):
        self.d_weight =   # TODO
        self.d_bias =     # TODO
        bottom_diff =     # TODO
        return bottom_diff
    def update_param(self, lr):
        self.weight =   # TODO
        self.bias =     # TODO
    def load_param(self, weight, bias):
        assert self.weight.shape == weight.shape
        assert self.bias.shape == bias.shape
        self.weight = weight
        self.bias = bias
    def save_param(self):
        return self.weight, self.bias
```
```python
class ReLULayer(object):
    def __init__(self):
        print('\tReLU layer.')
    def forward(self, input):
        start_time = time.time()
        self.input = input
        output =   # TODO
        return output
    def backward(self, top_diff):
        bottom_diff =   # TODO
        bottom_diff[self.input < 0] = 0
        return bottom_diff
```
```python
class SoftmaxLossLayer(object):
    def __init__(self):
        print('\tSoftmax loss layer.')
    def forward(self, input):
        input_max = np.max(input, axis=1, keepdims=True)
        input_exp = np.exp(input - input_max)
        self.prob =   # TODO
        return self.prob
    def get_loss(self, label):
        self.batch_size = self.prob.shape[0]
        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[np.arange(self.batch_size), label] = 1.0
        loss = -np.sum(np.log(self.prob) * self.label_onehot) / self.batch_size
        return loss
    def backward(self):
        bottom_diff =   # TODO
        return bottom_diff
```
Network Structure Module
```python
class MNIST_MLP(object):
    def __init__(self, batch_size=100, input_size=784, hidden1=32, hidden2=16,
                 out_classes=10, lr=0.01, max_epoch=2, print_iter=100):
        self.batch_size = batch_size
        self.input_size = input_size
        self.hidden1 = hidden1
        self.hidden2 = hidden2
        self.out_classes = out_classes
        self.lr = lr
        self.max_epoch = max_epoch
        self.print_iter = print_iter
    def build_model(self):
        print('Building multi-layer perception model...')
        self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
        self.relu1 = ReLULayer()
        # TODO: define fc2 and relu2 here
        self.fc3 = FullyConnectedLayer(self.hidden2, self.out_classes)
        self.softmax = SoftmaxLossLayer()
        self.update_layer_list = [self.fc1, self.fc2, self.fc3]
    def init_model(self):
        print('Initializing parameters of each layer in MLP...')
        for layer in self.update_layer_list:
            layer.init_param()
```
Network Training
The training workflow is shown in Figure 2.9. After the data loading module and the network structure module are in place, the training module needs to be implemented. The training module of the three-layer network in this experiment is shown in Figure 2.10. The training module is usually broken down into several steps, including forward propagation, backward propagation, parameter updates, and parameter saving. These basic operations and the main training loop are defined as member functions of the network class:
```python
def forward(self, input):
    h1 = self.fc1.forward(input)
    h1 = self.relu1.forward(h1)
    # TODO: forward through fc2, relu2, fc3
    prob = self.softmax.forward(h3)
    return prob

def backward(self):
    dloss = self.softmax.backward()
    # TODO: backward through fc3, relu2, fc2
    dh1 = self.relu1.backward(dh2)
    dh1 = self.fc1.backward(dh1)

def update(self, lr):
    for layer in self.update_layer_list:
        layer.update_param(lr)

def train(self):
    max_batch = self.train_data.shape[0] // self.batch_size
    print('Start training...')
    for idx_epoch in range(self.max_epoch):
        self.shuffle_data()
        for idx_batch in range(max_batch):
            batch_images = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, :-1]
            batch_labels = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, -1]
            prob = self.forward(batch_images)
            loss = self.softmax.get_loss(batch_labels)
            self.backward()
            self.update(self.lr)
            if idx_batch % self.print_iter == 0:
                print('Epoch %d, iter %d, loss: %.6f' % (idx_epoch, idx_batch, loss))
```
Network Inference
The inference workflow is shown in Figure 2.11. After training, the trained model can be used to predict on the test data to evaluate its accuracy. In practice the inference module is likewise broken down into several steps, including loading the model parameters, forward propagation, and accuracy computation. These basic operations and the main inference routine are defined as member functions of the network class:
• Forward propagation: the forward pass used in inference is exactly the same as in training, so the training module's forward function can be called directly.
• Parameter loading: read the model parameter file saved by the training module and load the values into the layers that have parameters.
• Main inference routine: before inference, the network parameters are loaded from the parameter file. During inference, batches of test data are read in a loop, and a forward pass through the whole network produces the output for each batch. Once the outputs for the entire test set are available, they are compared with the test labels and an evaluation function computes the model's accuracy; for handwritten digit classification the average classification accuracy is used.
```python
def load_model(self, param_dir):
    print('Loading parameters from file ' + param_dir)
    params = np.load(param_dir, allow_pickle=True).item()
    self.fc1.load_param(params['w1'], params['b1'])
    self.fc2.load_param(params['w2'], params['b2'])
    self.fc3.load_param(params['w3'], params['b3'])

def evaluate(self):
    pred_results = np.zeros([self.test_data.shape[0]])
    start_time = time.time()
    for idx in range(self.test_data.shape[0] // self.batch_size):
        batch_images = self.test_data[idx*self.batch_size:(idx+1)*self.batch_size, :-1]
        prob = self.forward(batch_images)
        end = time.time()
        pred_labels = np.argmax(prob, axis=1)
        pred_results[idx*self.batch_size:(idx+1)*self.batch_size] = pred_labels
    print("All evaluate time: %f" % (time.time() - start_time))
    accuracy = np.mean(pred_results == self.test_data[:, -1])
    print('Accuracy in test set: %f' % accuracy)
```
Complete Experiment Flow
With all the modules in place, calling them in sequence yields the complete flow of handwritten digit classification with a three-layer network. First, instantiate the class for the three-layer network and specify its hyperparameters, such as the number of neurons in each layer. Next, load and preprocess the data. Then build the network via the network structure module and initialize it; during this step the structure module automatically calls the basic unit module to instantiate each layer. After that, train the whole network with the training module and save the trained parameters to a file. Finally, read the trained parameters back from the file and call the inference module to measure the network's accuracy.
```python
if __name__ == '__main__':
    h1, h2, e = 32, 16, 1
    mlp = MNIST_MLP(hidden1=h1, hidden2=h2, max_epoch=e)
    mlp.load_data()
    mlp.build_model()
    mlp.init_model()
    start_time = time.time()
    mlp.train()
    print("All train time: %f" % (time.time() - start_time))
    mlp.save_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
    mlp.load_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
    mlp.evaluate()
```
Experiment Evaluation
Experiment Tasks
1) Fill in the blanks at every TODO in the code to complete the program; include the corresponding code in your report and explain your understanding of it.
2) When mlp.load_data() finishes, what are the dimensions of train_images, train_labels, test_images, and test_labels, i.e., how many rows and columns, expressed as (x, y)? What are the dimensions of self.train_data and self.test_data?
3) How many layers does the network in this experiment have, and how many neurons are in each layer? How would you add or remove a layer (a brief description is enough, no code needed)? How would you add or remove neurons in a given layer (brief description)? How would you replace softmax with sigmoid (brief description)?
4) In the train() function, what is the purpose of max_batch = self.train_data.shape[0] // self.batch_size? What is the purpose of self.shuffle_data()?
5) What is the Accuracy in test set finally printed by evaluate()? Try to improve it. The grading criteria for this task are:
• 60 points: given the forward-pass input matrices, parameter values, and backward-pass inputs of the fully connected layer, ReLU layer, and Softmax loss layer, produce the correct forward outputs, backward outputs, and parameter gradients.
• 80 points: implement a correct three-layer network, train it, and run inference so that the trained model reaches an average classification accuracy above 92% on the MNIST test set.
• 90 points: implement a correct three-layer network, train it, run inference, and tune the training-related hyperparameters so that the trained model reaches an average classification accuracy above 95% on the MNIST test set.
• 100 points: design your own network structure on top of the three-layer network, train it, and run inference so that the trained model reaches an average classification accuracy above 98% on the MNIST test set.
Results and Analysis
Step-by-Step Walkthrough
Data Loading Module

```python
def load_data(self):
    print('Loading MNIST data from files...')
    train_images = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_DATA), True)
    train_labels = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_LABEL), False)
    test_images = self.load_mnist(os.path.join(MNIST_DIR, TEST_DATA), True)
    test_labels = self.load_mnist(os.path.join(MNIST_DIR, TEST_LABEL), False)
    self.train_data = np.append(train_images, train_labels, axis=1)
    self.test_data = np.append(test_images, test_labels, axis=1)
```
This loads the dataset into train_images, train_labels, test_images, and test_labels. MNIST_DIR locates the data files; passing True loads images and False loads labels.
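To double-check the array dimensions asked about in question 2, a quick shape check can be run after loading; this assumes the MNIST_MLP class from the complete code below and the data files in ./mnist_data. The expected sizes follow from the loader, which returns images as (N, 784) matrices and labels as an (N, 1) column:

```python
# Quick shape check (hypothetical snippet, run e.g. in a separate script or shell):
mlp = MNIST_MLP()
mlp.load_data()
print(mlp.train_data.shape)  # expected (60000, 785): 784 pixel columns plus 1 label column
print(mlp.test_data.shape)   # expected (10000, 785)
# Inside load_data(), train_images is (60000, 784), train_labels is (60000, 1),
# test_images is (10000, 784), and test_labels is (10000, 1).
```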
Basic Unit Module
Fully Connected Layer

```python
class FullyConnectedLayer(object):
    def __init__(self, num_input, num_output):
        self.num_input = num_input
        self.num_output = num_output
        print('\tFully connected layer with input %d, output %d.' % (self.num_input, self.num_output))
    def init_param(self, std=0.01):
        self.weight = np.random.normal(loc=0.0, scale=std, size=(self.num_input, self.num_output))
        self.bias = np.zeros([1, self.num_output])
    def forward(self, input):
        start_time = time.time()
        self.input = input
        self.output = np.matmul(self.input, self.weight) + self.bias  # y = xW + b (row-vector convention)
        return self.output
    def backward(self, top_diff):
        self.d_weight = np.dot(self.input.T, top_diff)   # gradient w.r.t. the weights
        self.d_bias = top_diff                           # note: more commonly np.sum(top_diff, axis=0) is used
        bottom_diff = np.dot(top_diff, self.weight.T)    # gradient passed to the previous layer
        return bottom_diff
    def update_param(self, lr):
        self.weight = self.weight - lr * self.d_weight
        self.bias = self.bias - lr * self.d_bias
    def load_param(self, weight, bias):
        assert self.weight.shape == weight.shape
        assert self.bias.shape == bias.shape
        self.weight = weight
        self.bias = bias
    def save_param(self):
        return self.weight, self.bias
```
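One way to gain confidence in the backward pass above is a small numerical gradient check: perturb one weight, re-run the forward pass, and compare the change in a scalar "loss" with what d_weight predicts. A minimal sketch, assuming the FullyConnectedLayer above (e.g. imported from layers_1.py); the scalar here is just the sum of the outputs, chosen for simplicity:

```python
import numpy as np

fc = FullyConnectedLayer(5, 3)
fc.init_param()
x = np.random.rand(2, 5)            # a tiny batch of 2 samples

out = fc.forward(x)
fc.backward(np.ones_like(out))      # d(sum of outputs)/d(outputs) is all ones

eps = 1e-6
i, j = 1, 2                         # perturb one arbitrary weight entry
fc.weight[i, j] += eps
numeric = (np.sum(fc.forward(x)) - np.sum(out)) / eps
print(numeric, fc.d_weight[i, j])   # the two values should be very close
```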
ReLU Layer

```python
class ReLULayer(object):
    def __init__(self):
        print('\tReLU layer.')
    def forward(self, input):
        start_time = time.time()
        self.input = input
        output = np.maximum(self.input, 0)   # element-wise max(x, 0)
        return output
    def backward(self, top_diff):
        bottom_diff = top_diff               # note: this aliases top_diff, so the next line also modifies it in place
        bottom_diff[self.input < 0] = 0      # block gradients where the input was negative
        return bottom_diff
```
Softmax Layer

```python
class SoftmaxLossLayer(object):
    def __init__(self):
        print('\tSoftmax loss layer.')
    def forward(self, input):
        input_max = np.max(input, axis=1, keepdims=True)
        input_exp = np.exp(input - input_max)    # subtract the row max for numerical stability
        # divide each row by its sum; np.tile(..., (10, 1)).T assumes 10 output classes
        self.prob = input_exp / np.tile(np.sum(input_exp, axis=1), (10, 1)).T
        return self.prob
    def get_loss(self, label):
        self.batch_size = self.prob.shape[0]
        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[np.arange(self.batch_size), label] = 1.0
        loss = -np.sum(np.log(self.prob) * self.label_onehot) / self.batch_size
        return loss
    def backward(self):
        bottom_diff = (self.prob - self.label_onehot) / self.batch_size
        return bottom_diff
```
Network Structure Module

```python
class MNIST_MLP(object):
    def __init__(self, batch_size=100, input_size=784, hidden1=32, hidden2=16,
                 out_classes=10, lr=0.01, max_epoch=2, print_iter=100):
        self.batch_size = batch_size
        self.input_size = input_size
        self.hidden1 = hidden1
        self.hidden2 = hidden2
        self.out_classes = out_classes
        self.lr = lr
        self.max_epoch = max_epoch
        self.print_iter = print_iter
    def build_model(self):
        print('Building multi-layer perception model...')
        self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
        self.relu1 = ReLULayer()
        self.fc2 = FullyConnectedLayer(self.hidden1, self.hidden2)
        self.relu2 = ReLULayer()
        self.fc3 = FullyConnectedLayer(self.hidden2, self.out_classes)
        self.softmax = SoftmaxLossLayer()
        self.update_layer_list = [self.fc1, self.fc2, self.fc3]
    def init_model(self):
        print('Initializing parameters of each layer in MLP...')
        for layer in self.update_layer_list:
            layer.init_param()
```
Network Training Module

```python
def forward(self, input):
    h1 = self.fc1.forward(input)
    h1 = self.relu1.forward(h1)
    h2 = self.fc2.forward(h1)
    h2 = self.relu2.forward(h2)
    h3 = self.fc3.forward(h2)
    prob = self.softmax.forward(h3)
    return prob

def backward(self):
    dloss = self.softmax.backward()
    dh2 = self.fc3.backward(dloss)
    dh2 = self.relu2.backward(dh2)
    dh1 = self.fc2.backward(dh2)
    dh1 = self.relu1.backward(dh1)
    dh1 = self.fc1.backward(dh1)

def update(self, lr):
    for layer in self.update_layer_list:
        layer.update_param(lr)

def train(self):
    max_batch = self.train_data.shape[0] // self.batch_size
    print('Start training...')
    for idx_epoch in range(self.max_epoch):
        self.shuffle_data()
        for idx_batch in range(max_batch):
            batch_images = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, :-1]
            batch_labels = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, -1]
            prob = self.forward(batch_images)
            loss = self.softmax.get_loss(batch_labels)
            self.backward()
            self.update(self.lr)
            if idx_batch % self.print_iter == 0:
                print('Epoch %d, iter %d, loss: %.6f' % (idx_epoch, idx_batch, loss))
```
The role of shuffle and the meaning of max_batch
max_batch = self.train_data.shape[0] // self.batch_size computes how many mini-batches one epoch contains, i.e., it decides how the samples are grouped: instead of computing gradients and updating the weights once per sample, the gradients are computed and the weights updated once per batch of batch_size samples. For example, with 60000 training samples and batch_size = 100, each epoch runs 600 parameter updates.
Take cat-vs-dog classification as an example. Suppose the dataset is
Dog,Dog,Dog,… ,Dog,Dog,Dog,Cat,Cat,Cat,Cat,… ,Cat,Cat
with all the dogs before all the cats. Without shuffling, the model sees only Dog for a long stretch of training and inevitably overfits to Dog, then sees only Cat and overfits to Cat; such a model generalizes poorly. Would alternating Dog and Cat avoid the overfitting?
Dog,Cat,Dog,Cat,Dog ,Cat,Dog,…
It would still overfit: the model can memorize the order of the training data. Why is that?
When training a neural network with stochastic gradient descent, the usual practice is to shuffle the data. Without dwelling on the details, an extreme example shows why shuffling helps. Suppose you are training a classifier to distinguish cats from dogs, and your training set is 50,000 cats followed by 50,000 dogs. If you do not shuffle, training will perform poorly. Strictly speaking, the problem comes from the serial correlation in the gradient noise and the non-exchangeability of the parameter updates. First, a fixed dataset order means that at any given iteration step, the training data seen at that step are fixed. Suppose the objective is J = f(w, b) and gradient descent is used to optimize J. Given the weights w, b and the iteration step, a fixed dataset order means a fixed training sample and therefore a fixed direction for the weight update, whereas an unordered dataset means the update direction is random. A fixed dataset order thus severely restricts the choice of gradient directions, shrinking the space of reachable convergence points and making overfitting more likely. So the model really can memorize the data order, which is why shuffling matters: always shuffle.
Adapted from https://deepindeed.cn/2019/12/23/Data-Shuffle/
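Because the labels sit in the last column of self.train_data, shuffling whole rows (which is what shuffle_data() in the complete code below does with np.random.shuffle) reorders the samples while keeping every image aligned with its own label. A tiny stand-alone sketch of that behavior:

```python
import numpy as np

# Toy stand-in for self.train_data: 6 'images' with 3 pixels each, label in the last column.
images = np.arange(18).reshape(6, 3)
labels = np.array([[0], [0], [0], [1], [1], [1]])
data = np.hstack([images, labels])

np.random.shuffle(data)   # shuffles rows in place, like shuffle_data()
print(data)               # rows are reordered, but each row still ends with its own label
```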
Complete code
Fully connected neural network, layers_1.py:
```python
import numpy as np
import struct
import os
import time

class FullyConnectedLayer(object):
    def __init__(self, num_input, num_output):
        self.num_input = num_input
        self.num_output = num_output
        print('\tFully connected layer with input %d, output %d.' % (self.num_input, self.num_output))
    def init_param(self, std=0.01):
        self.weight = np.random.normal(loc=0.0, scale=std, size=(self.num_input, self.num_output))
        self.bias = np.zeros([1, self.num_output])
    def forward(self, input):
        start_time = time.time()
        self.input = input
        self.output = np.matmul(self.input, self.weight) + self.bias
        return self.output
    def backward(self, top_diff):
        self.d_weight = np.dot(self.input.T, top_diff)
        self.d_bias = top_diff
        bottom_diff = np.dot(top_diff, self.weight.T)
        return bottom_diff
    def update_param(self, lr):
        self.weight = self.weight - lr * self.d_weight
        self.bias = self.bias - lr * self.d_bias
    def load_param(self, weight, bias):
        assert self.weight.shape == weight.shape
        assert self.bias.shape == bias.shape
        self.weight = weight
        self.bias = bias
    def save_param(self):
        return self.weight, self.bias

class ReLULayer(object):
    def __init__(self):
        print('\tReLU layer.')
    def forward(self, input):
        start_time = time.time()
        self.input = input
        output = np.maximum(self.input, 0)
        return output
    def backward(self, top_diff):
        bottom_diff = top_diff
        bottom_diff[self.input < 0] = 0
        return bottom_diff

class SoftmaxLossLayer(object):
    def __init__(self):
        print('\tSoftmax loss layer.')
    def forward(self, input):
        input_max = np.max(input, axis=1, keepdims=True)
        input_exp = np.exp(input - input_max)
        self.prob = input_exp / np.tile(np.sum(input_exp, axis=1), (10, 1)).T
        return self.prob
    def get_loss(self, label):
        self.batch_size = self.prob.shape[0]
        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[np.arange(self.batch_size), label] = 1.0
        loss = -np.sum(self.label_onehot * np.log(self.prob)) / self.batch_size
        return loss
    def backward(self):
        bottom_diff = (self.prob - self.label_onehot) / self.batch_size
        return bottom_diff
```
mnist_mlp_cpu.py:
```python
import numpy as np
import struct
import os
import time
from layers_1 import FullyConnectedLayer, ReLULayer, SoftmaxLossLayer

MNIST_DIR = "./mnist_data"
TRAIN_DATA = "train-images-idx3-ubyte"
TRAIN_LABEL = "train-labels-idx1-ubyte"
TEST_DATA = "t10k-images-idx3-ubyte"
TEST_LABEL = "t10k-labels-idx1-ubyte"

def show_matrix(mat, name):
    pass

class MNIST_MLP(object):
    def __init__(self, batch_size=100, input_size=784, hidden1=32, hidden2=16,
                 out_classes=10, lr=0.01, max_epoch=2, print_iter=100):
        self.batch_size = batch_size
        self.input_size = input_size
        self.hidden1 = hidden1
        self.hidden2 = hidden2
        self.out_classes = out_classes
        self.lr = lr
        self.max_epoch = max_epoch
        self.print_iter = print_iter
    def load_mnist(self, file_dir, is_images='True'):
        bin_file = open(file_dir, 'rb')
        bin_data = bin_file.read()
        bin_file.close()
        if is_images:
            fmt_header = '>iiii'
            magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
        else:
            fmt_header = '>ii'
            magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
            num_rows, num_cols = 1, 1
        data_size = num_images * num_rows * num_cols
        mat_data = struct.unpack_from('>' + str(data_size) + 'B', bin_data, struct.calcsize(fmt_header))
        mat_data = np.reshape(mat_data, [num_images, num_rows * num_cols])
        print('Load images from %s, number: %d, data shape: %s' % (file_dir, num_images, str(mat_data.shape)))
        return mat_data
    def load_data(self):
        print('Loading MNIST data from files...')
        train_images = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_DATA), True)
        train_labels = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_LABEL), False)
        test_images = self.load_mnist(os.path.join(MNIST_DIR, TEST_DATA), True)
        test_labels = self.load_mnist(os.path.join(MNIST_DIR, TEST_LABEL), False)
        self.train_data = np.append(train_images, train_labels, axis=1)
        self.test_data = np.append(test_images, test_labels, axis=1)
    def shuffle_data(self):
        print('Randomly shuffle MNIST data...')
        np.random.shuffle(self.train_data)
    def build_model(self):
        print('Building multi-layer perception model...')
        self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
        self.relu1 = ReLULayer()
        self.fc2 = FullyConnectedLayer(self.hidden1, self.hidden2)
        self.relu2 = ReLULayer()
        self.fc3 = FullyConnectedLayer(self.hidden2, self.out_classes)
        self.softmax = SoftmaxLossLayer()
        self.update_layer_list = [self.fc1, self.fc2, self.fc3]
    def init_model(self):
        print('Initializing parameters of each layer in MLP...')
        for layer in self.update_layer_list:
            layer.init_param()
    def load_model(self, param_dir):
        print('Loading parameters from file ' + param_dir)
        params = np.load(param_dir, allow_pickle=True).item()
        self.fc1.load_param(params['w1'], params['b1'])
        self.fc2.load_param(params['w2'], params['b2'])
        self.fc3.load_param(params['w3'], params['b3'])
    def save_model(self, param_dir):
        print('Saving parameters to file ' + param_dir)
        params = {}
        params['w1'], params['b1'] = self.fc1.save_param()
        params['w2'], params['b2'] = self.fc2.save_param()
        params['w3'], params['b3'] = self.fc3.save_param()
        np.save(param_dir, params)
    def forward(self, input):
        h1 = self.fc1.forward(input)
        h1 = self.relu1.forward(h1)
        h2 = self.fc2.forward(h1)
        h2 = self.relu2.forward(h2)
        h3 = self.fc3.forward(h2)
        self.prob = self.softmax.forward(h3)
        return self.prob
    def backward(self):
        dloss = self.softmax.backward()
        dh2 = self.fc3.backward(dloss)
        dh2 = self.relu2.backward(dh2)
        dh1 = self.fc2.backward(dh2)
        dh1 = self.relu1.backward(dh1)
        dh1 = self.fc1.backward(dh1)
    def update(self, lr):
        for layer in self.update_layer_list:
            layer.update_param(lr)
    def train(self):
        max_batch = self.train_data.shape[0] // self.batch_size
        print('Start training...')
        for idx_epoch in range(self.max_epoch):
            self.shuffle_data()
            for idx_batch in range(max_batch):
                batch_images = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, :-1]
                batch_labels = self.train_data[idx_batch*self.batch_size:(idx_batch+1)*self.batch_size, -1]
                prob = self.forward(batch_images)
                loss = self.softmax.get_loss(batch_labels)
                self.backward()
                self.update(self.lr)
                if idx_batch % self.print_iter == 0:
                    print('Epoch %d, iter %d, loss: %.6f' % (idx_epoch, idx_batch, loss))
    def evaluate(self):
        pred_results = np.zeros([self.test_data.shape[0]])
        start_time = time.time()
        for idx in range(self.test_data.shape[0] // self.batch_size):
            batch_images = self.test_data[idx*self.batch_size:(idx+1)*self.batch_size, :-1]
            prob = self.forward(batch_images)
            end = time.time()
            pred_labels = np.argmax(prob, axis=1)
            pred_results[idx*self.batch_size:(idx+1)*self.batch_size] = pred_labels
        print("All evaluate time: %f" % (time.time() - start_time))
        accuracy = np.mean(pred_results == self.test_data[:, -1])
        print('Accuracy in test set: %f' % accuracy)

if __name__ == '__main__':
    h1, h2, e = 32, 16, 1
    mlp = MNIST_MLP(hidden1=h1, hidden2=h2, max_epoch=e)
    mlp.load_data()
    mlp.build_model()
    mlp.init_model()
    start_time = time.time()
    mlp.train()
    print("All train time: %f" % (time.time() - start_time))
    mlp.save_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
    mlp.load_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))
    mlp.evaluate()
```
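Before switching to the convolutional network below, a simple way to push the MLP's test accuracy higher is to widen the hidden layers and train for more epochs than the defaults (32, 16, one epoch). The values below are illustrative settings to try, not tuned results reported here:

```python
# Hypothetical, untuned settings for the same MNIST_MLP; wider hidden layers
# and more epochs than the 32-16-1-epoch default usually improve test accuracy.
if __name__ == '__main__':
    h1, h2, e = 128, 64, 10
    mlp = MNIST_MLP(hidden1=h1, hidden2=h2, max_epoch=e)
    mlp.load_data()
    mlp.build_model()
    mlp.init_model()
    mlp.train()
    mlp.evaluate()
```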
Convolutional Neural Network

```python
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, Convolution2D, MaxPooling2D
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(-1, 28, 28, 1) / 255.0
x_test = x_test.reshape(-1, 28, 28, 1) / 255.0
y_train = tf.keras.utils.to_categorical(y_train, num_classes=10)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=10)

model = Sequential()
model.add(Convolution2D(
    input_shape=(28, 28, 1),
    filters=32,
    kernel_size=5,
    strides=1,
    padding='same',
    activation='relu'
))
model.add(MaxPooling2D(
    pool_size=2,
    strides=2,
    padding='same'
))
model.add(Convolution2D(64, 5, strides=1, padding='same', activation='relu'))
model.add(MaxPooling2D(2, 2, 'same'))
model.add(Flatten())
model.add(Dense(1024, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))

adam = Adam(lr=1e-4)
model.compile(optimizer=adam, loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=(x_test, y_test))
model.save('mnist.h5')
```
Results: