模型建立
- 数据初始化
- 根据模型搭建前向传播
- 打印模型结构
前向传播数据初始化
def __init__(self):
super(LeNet, self).__init__()
# 第一层卷积层:
# 输入:灰度图像 (1通道,大小 28x28)
# 输出:6个特征图 (大小 28x28, 通过padding=2保持尺寸不变)
# 卷积核大小为 5x5,步幅为 1
self.c1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, padding=2)
# 激活函数:
# 使用 Sigmoid 作为激活函数,将卷积输出的值压缩到 (0,1) 区间
self.sig = nn.Sigmoid()
# 第一层池化层:
# 输入:6个特征图 (28x28)
# 输出:6个特征图 (14x14, 通过池化降采样)
# 池化窗口大小为 2x2,步幅为 2
self.s2 = nn.AvgPool2d(kernel_size=2, stride=2)
# 第二层卷积层:
# 输入:6个特征图 (14x14)
# 输出:16个特征图 (10x10, 无padding)
# 卷积核大小为 5x5,步幅为 1
self.c3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
# 第二层池化层:
# 输入:16个特征图 (10x10)
# 输出:16个特征图 (5x5, 通过池化降采样)
# 池化窗口大小为 2x2,步幅为 2
self.s4 = nn.AvgPool2d(kernel_size=2, stride=2)
# 展平层:
# 输入:16个特征图 (5x5)
# 输出:展平为长度为 400 的向量
self.flatten = nn.Flatten()
# 全连接层 1:
# 输入:长度为 400 的向量
# 输出:长度为 120 的向量
self.f5 = nn.Linear(400, 120)
# 全连接层 2:
# 输入:长度为 120 的向量
# 输出:长度为 84 的向量
self.f6 = nn.Linear(120, 84)
# 全连接层 3:
# 输入:长度为 84 的向量
# 输出:长度为 10 的向量(对应10个分类标签,如MNIST数字分类)
self.f7 = nn.Linear(84, 10)
构建前向传播
def forward(self, x):
# 前向传播过程
# 1. 卷积层 + 激活函数
x = self.sig(self.c1(x)) # 第一层卷积 + 激活函数
# 2. 池化层
x = self.s2(x) # 第一层池化
# 3. 卷积层 + 激活函数
x = self.sig(self.c3(x)) # 第二层卷积 + 激活函数
# 4. 池化层
x = self.s4(x) # 第二层池化
# 5. 展平
x = self.flatten(x) # 将特征图展平为向量
# 6. 全连接层 1
x = self.sig(self.f5(x)) # 第一全连接层 + 激活函数
# 7. 全连接层 2
x = self.sig(self.f6(x)) # 第二全连接层 + 激活函数
# 8. 全连接层 3
x = self.f7(x) # 第三全连接层(输出分类结果)
return x
打印模型参数
if __name__ == "__main__":
# 判断当前是否有可用的GPU,如果有则使用CUDA(GPU),否则使用CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 初始化模型(LeNet类假设在之前已定义)
# 并将模型加载到指定的设备(CPU或GPU)上
model = LeNet().to(device)
# 打印模型的摘要,包括模型结构、每层的参数数量以及输出形状
# 输入形状指定为 (1, 28, 28),即单通道灰度图像
print(summary(model, (1, 28, 28)))
运行结果
数据预处理
- 加载训练数据集
- 创建数据加载器
from torchvision.datasets import FashionMNIST # 导入 FashionMNIST 数据集
from torchvision import transforms # 图像处理工具
import torch.utils.data as Data # 数据加载工具
import numpy as np # 数值计算工具
import matplotlib.pyplot as plt # 数据可视化工具
# 加载训练数据集
train_data = FashionMNIST(
root='./data', # 数据存储的根目录
train=True, # 加载训练数据
transform=transforms.Compose([
transforms.Resize(size=224), # 将图像调整到 224x224 的大小
transforms.ToTensor() # 将图像转换为 PyTorch 张量
]),
download=True # 如果数据集不存在,则自动下载
)
# 创建数据加载器
train_loader = Data.DataLoader(
dataset=train_data, # 数据集
batch_size=64, # 每批次加载 64 张图片
shuffle=True, # 随机打乱数据
num_workers=0 # 数据加载时使用的子线程数(Windows 环境下设置为 0)
)
# 提取一个批次的数据
for step, (b_x, b_y) in enumerate(train_loader): # 遍历训练数据加载器
if step > 0: # 只提取第一个批次
break
# 将图像数据转换为 NumPy 格式,并移除单一的维度
batch_x = b_x.squeeze().numpy() # b_x: 形状为 [64, 1, 224, 224],squeeze 移除第 2 维
batch_y = b_y.numpy() # 将标签转换为 NumPy 数组
class_label = train_data.classes # 获取数据集中的类别标签
# print(class_label) # 如果需要调试,可以打印类别标签列表
# 打印当前批次的维度
print("The size of batch in train data:", batch_x.shape) # 输出批次大小,例如 [64, 224, 224]
# 可视化一个批次的图像
plt.figure(figsize=(12, 5)) # 创建一个 12x5 的画布
for ii in np.arange(len(batch_y)): # 遍历当前批次的每个样本
plt.subplot(4, 16, ii + 1) # 在 4x16 网格中创建子图
plt.imshow(batch_x[ii, :, :], cmap=plt.cm.gray) # 绘制图像,灰度显示
plt.title(class_label[batch_y[ii]], size=10) # 设置标题为对应的类别标签
plt.axis("off") # 隐藏坐标轴
plt.subplots_adjust(wspace=0.05) # 调整子图之间的间距
plt.show() # 显示图像
模型训练
数据预处理
import torch
from torchvision.datasets import FashionMNIST
from torchvision import transforms
import torch.utils.data as Data
import numpy as np
import matplotlib.pyplot as plt
from model import LeNet
import torch.nn as nn
import pandas as pd
def train_val_data_process():
train_data = FashionMNIST(root='./data',
train=True,
transform=transforms.Compose([transforms.Resize(size=28), transforms.ToTensor()]),
download=True)
#划分训练集与验证集 80%用于训练
train_data, val_data = Data.random_split(train_data, [round(0.8 * len(train_data)), round(0.2 * len(train_data))])
#训练数据加载器
train_dataloader = Data.DataLoader(dataset=train_data,
batch_size=32,
shuffle=True,
num_workers=2)
#验证数据加载器
val_dataloader = Data.DataLoader(dataset=val_data,
batch_size=32,
shuffle=True,
num_workers=2)
return train_dataloader, val_dataloader
# train_val_data_process()
训练逻辑
代码训练逻辑如果是学习新知识考试
初始化(学习计划制定)
- 学生决定使用纸质书还是电子书来学习,类似于代码中选择 GPU 或 CPU
- 优化器类似于学习方法(比如背诵、做笔记),损失函数类似于判断学习效果的标准(是否答对问题)
训练阶段(学习阶段)
- 学生每天从学习材料中抽取一部分内容(批次)
- 学习时通过练习题测试自己的理解,找出错误并改正(前向传播和反向传播)
- 每天记录自己学了多少内容以及正确率(累加损失和正确样本数)
验证阶段(考试)
- 学生在考试中用未见过的题目检验学习效果
- 只计算考试的分数(验证损失和准确率),不再对学习内容进行改动
记录结果(反思总结)
- 每天记录学习和考试的情况(损失和准确率)
- 如果某天考试成绩比历史最好成绩高,就记录这一天的学习成果(保存最优模型参数)
最终总结(保存模型)
- 保存最优模型(保存学习笔记以备后续复习)
训练函数实现
实现逻辑简单理解(训练过程类似应对考试的全流程)
###########################################
####深度学习训练与验证
##########################################
def train_model_process(model, train_dataloader, val_dataloader, num_epochs):
###########################################
###设备设置和优化器初始化
##########################################
# 设定训练所用到的设备,有GPU用GPU没有GPU用CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 使用Adam优化器,学习率为0.001
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 损失函数为交叉熵函数
criterion = nn.CrossEntropyLoss()
# 将模型放入到训练设备中
model = model.to(device)
# 复制当前模型的参数
best_model_wts = copy.deepcopy(model.state_dict())
###########################################
###初始化变量
##########################################
# 最高准确度
best_acc = 0.0
# 训练集损失列表
train_loss_all = []
# 验证集损失列表
val_loss_all = []
# 训练集准确度列表
train_acc_all = []
# 验证集准确度列表
val_acc_all = []
# 当前时间
since = time.time()
###########################################
###训练和验证循环
##########################################
for epoch in range(num_epochs):
print("Epoch {}/{}".format(epoch, num_epochs-1))
print("-"*10)
# 初始化参数
# 训练集损失函数
train_loss = 0.0
# 训练集准确度
train_corrects = 0
# 验证集损失函数
val_loss = 0.0
# 验证集准确度
val_corrects = 0
# 训练集样本数量
train_num = 0
# 验证集样本数量
val_num = 0
# 对每一个mini-batch训练和计算
for step, (b_x, b_y) in enumerate(train_dataloader):
# 将特征放入到训练设备中
b_x = b_x.to(device)
# 将标签放入到训练设备中
b_y = b_y.to(device)
# 设置模型为训练模式
model.train()
# 前向传播过程,输入为一个batch,输出为一个batch中对应的预测
output = model(b_x)
# 查找每一行中最大值对应的行标
pre_lab = torch.argmax(output, dim=1)
# 计算每一个batch的损失函数
loss = criterion(output, b_y)
# 将梯度初始化为0
optimizer.zero_grad()
# 反向传播计算
loss.backward()
# 根据网络反向传播的梯度信息来更新网络的参数,以起到降低loss函数计算值的作用
optimizer.step()
# 对损失函数进行累加(学到了多少内容)
train_loss += loss.item() * b_x.size(0)
# 如果预测正确,则准确度train_corrects加1(正确率)
train_corrects += torch.sum(pre_lab == b_y.data)
# 当前用于训练的样本数量
train_num += b_x.size(0)
###########################################
###验证阶段
##########################################
for step, (b_x, b_y) in enumerate(val_dataloader):
# 将特征放入到验证设备中
b_x = b_x.to(device)
# 将标签放入到验证设备中
b_y = b_y.to(device)
# 设置模型为评估模式
model.eval()
# 前向传播过程,输入为一个batch,输出为一个batch中对应的预测
output = model(b_x)
# 查找每一行中最大值对应的行标
pre_lab = torch.argmax(output, dim=1)
# 计算每一个batch的损失函数
loss = criterion(output, b_y)
# 对损失函数进行累加
val_loss += loss.item() * b_x.size(0)
# 如果预测正确,则准确度train_corrects加1
val_corrects += torch.sum(pre_lab == b_y.data)
# 当前用于验证的样本数量
val_num += b_x.size(0)
###########################################
###记录和打印结果
##########################################
# 计算并保存每一次迭代的loss值和准确率
# 计算并保存训练集的loss值
train_loss_all.append(train_loss / train_num)
# 计算并保存训练集的准确率
train_acc_all.append(train_corrects.double().item() / train_num)
# 计算并保存验证集的loss值
val_loss_all.append(val_loss / val_num)
# 计算并保存验证集的准确率
val_acc_all.append(val_corrects.double().item() / val_num)
print("{} train loss:{:.4f} train acc: {:.4f}".format(epoch, train_loss_all[-1], train_acc_all[-1]))
print("{} val loss:{:.4f} val acc: {:.4f}".format(epoch, val_loss_all[-1], val_acc_all[-1]))
###########################################
###保存最优模型
##########################################
if val_acc_all[-1] > best_acc:
# 保存当前最高准确度
best_acc = val_acc_all[-1]
# 保存当前最高准确度的模型参数
best_model_wts = copy.deepcopy(model.state_dict())
# 计算训练和验证的耗时
time_use = time.time() - since
print("训练和验证耗费的时间{:.0f}m{:.0f}s".format(time_use//60, time_use%60))
# 选择最优参数,保存最优参数的模型
torch.save(best_model_wts, "E:\秋招就业\CNN卷积神经网络\测试用例\LeNet\\best_model.pth")
train_process = pd.DataFrame(data={"epoch":range(num_epochs),
"train_loss_all":train_loss_all,
"val_loss_all":val_loss_all,
"train_acc_all":train_acc_all,
"val_acc_all":val_acc_all,})
return train_process
可视化打印
ef matplot_acc_loss(train_process):
# 显示每次迭代后的训练集和验证集的损失与准确率
plt.figure(figsize=(12, 4)) # 创建一个宽 12、高 4 的画布
# 第一个子图:训练与验证的损失函数变化
plt.subplot(1, 2, 1) # 第 1 行、第 1 列、第 1 个子图
plt.plot(train_process['epoch'], train_process.train_loss_all, "ro-", label="Train loss") # 绘制训练损失曲线
plt.plot(train_process['epoch'], train_process.val_loss_all, "bs-", label="Val loss") # 绘制验证损失曲线
plt.legend() # 显示图例
plt.xlabel("epoch") # x 轴标签
plt.ylabel("Loss") # y 轴标签
# 第二个子图:训练与验证的准确率变化
plt.subplot(1, 2, 2) # 第 1 行、第 2 列、第 2 个子图
plt.plot(train_process['epoch'], train_process.train_acc_all, "ro-", label="Train acc") # 绘制训练准确率曲线
plt.plot(train_process['epoch'], train_process.val_acc_all, "bs-", label="Val acc") # 绘制验证准确率曲线
plt.xlabel("epoch") # x 轴标签
plt.ylabel("Accuracy") # y 轴标签
plt.legend() # 显示图例
plt.show() # 显示绘制的图像
if __name__ == '__main__':
# 加载需要的模型
LeNet = LeNet()
# 加载数据集
train_data, val_data = train_val_data_process()
# 利用现有的模型进行模型的训练
train_process = train_model_process(LeNet, train_data, val_data, num_epochs=20)
matplot_acc_loss(train_process)
训练结果
左边错误率,右边正确率
Epoch 0/19
----------
0 train loss:0.9156 train acc: 0.6530
0 val loss:0.6296 val acc: 0.7583
训练和验证耗费的时间0m24s
Epoch 1/19
----------
1 train loss:0.5498 train acc: 0.7897
1 val loss:0.5201 val acc: 0.8092
训练和验证耗费的时间0m46s
Epoch 2/19
----------
2 train loss:0.4720 train acc: 0.8251
2 val loss:0.4770 val acc: 0.8291
训练和验证耗费的时间1m8s
Epoch 3/19
----------
3 train loss:0.4265 train acc: 0.8422
3 val loss:0.4196 val acc: 0.8462
训练和验证耗费的时间1m31s
Epoch 4/19
----------
4 train loss:0.3963 train acc: 0.8534
4 val loss:0.3953 val acc: 0.8558
训练和验证耗费的时间1m53s
Epoch 5/19
----------
5 train loss:0.3690 train acc: 0.8626
5 val loss:0.3777 val acc: 0.8602
训练和验证耗费的时间2m16s
Epoch 6/19
----------
6 train loss:0.3500 train acc: 0.8687
6 val loss:0.3656 val acc: 0.8663
训练和验证耗费的时间2m38s
Epoch 7/19
----------
7 train loss:0.3332 train acc: 0.8743
7 val loss:0.3465 val acc: 0.8734
训练和验证耗费的时间2m60s
Epoch 8/19
----------
8 train loss:0.3180 train acc: 0.8819
8 val loss:0.3365 val acc: 0.8747
训练和验证耗费的时间3m22s
Epoch 9/19
----------
9 train loss:0.3078 train acc: 0.8842
9 val loss:0.3389 val acc: 0.8735
训练和验证耗费的时间3m44s
Epoch 10/19
----------
10 train loss:0.2982 train acc: 0.8871
10 val loss:0.3327 val acc: 0.8801
训练和验证耗费的时间4m8s
Epoch 11/19
----------
11 train loss:0.2879 train acc: 0.8919
11 val loss:0.3068 val acc: 0.8852
训练和验证耗费的时间4m31s
Epoch 12/19
----------
12 train loss:0.2786 train acc: 0.8954
12 val loss:0.3040 val acc: 0.8870
训练和验证耗费的时间4m54s
Epoch 13/19
----------
13 train loss:0.2704 train acc: 0.8978
13 val loss:0.3089 val acc: 0.8881
训练和验证耗费的时间5m18s
Epoch 14/19
----------
14 train loss:0.2629 train acc: 0.9015
14 val loss:0.2877 val acc: 0.8935
训练和验证耗费的时间5m41s
Epoch 15/19
----------
15 train loss:0.2547 train acc: 0.9033
15 val loss:0.2968 val acc: 0.8901
训练和验证耗费的时间6m5s
Epoch 16/19
----------
16 train loss:0.2508 train acc: 0.9057
16 val loss:0.2915 val acc: 0.8921
训练和验证耗费的时间6m28s
Epoch 17/19
----------
17 train loss:0.2440 train acc: 0.9080
17 val loss:0.3284 val acc: 0.8809
训练和验证耗费的时间6m51s
Epoch 18/19
----------
18 train loss:0.2381 train acc: 0.9102
18 val loss:0.2834 val acc: 0.8954
训练和验证耗费的时间7m13s
Epoch 19/19
----------
19 train loss:0.2318 train acc: 0.9125
19 val loss:0.2842 val acc: 0.8950
训练和验证耗费的时间7m37s
模型测试
测试模型搭建逻辑
- 老师加载学生的答案
- 设备选择(GPU或CPU):老师选择在哪个地方(教室或办公室)批改试卷。如果有更多的资源(GPU),可以更快地批改;否则,就用现有的资源(CPU)
- 模型移动到设备上:类似于老师将试卷放到指定的位置开始批改
- 初始化计数器:老师开始计数正确和总的试题数
- 批改数据(循环遍历测试数据)
- 禁用梯度计算:就像老师在批改试卷时只关注答案,不需要反复修改试卷内容
- 循环遍历试卷:
- 数据迁移:老师将试卷放到批改台上
- 评估模式:老师保持客观,不带情绪地批改试卷
- 前向传播:老师查看学生的答案
- 预测标签:老师判断学生的答案是否正确
- 计算正确数量:如果答案正确,计数器加1
- 更新总数:统计总共批改了多少份试卷
- 最后计算准确率
具体实现
def test_data_process():
test_data = FashionMNIST(root='./data',
train=False,
transform=transforms.Compose([transforms.Resize(size=28), transforms.ToTensor()]),
download=True)
test_dataloader = Data.DataLoader(dataset=test_data,
batch_size=1,
shuffle=True,
num_workers=0)
return test_dataloader
def test_model_process(model, test_dataloader):
# 设定测试所用到的设备,有GPU用GPU没有GPU用CPU
device = "cuda" if torch.cuda.is_available() else 'cpu'
# 将模型放入到测试设备中
model = model.to(device)
# 初始化参数
test_corrects = 0.0
test_num = 0
# 只进行前向传播计算,不计算梯度,从而节省内存,加快运行速度
with torch.no_grad():
for test_data_x, test_data_y in test_dataloader:
# 将特征放入到测试设备中
test_data_x = test_data_x.to(device)
# 将标签放入到测试设备中
test_data_y = test_data_y.to(device)
# 设置模型为评估模式
model.eval()
# 前向传播过程,输入为测试数据集,输出为对每个样本的预测值
output = model(test_data_x)
# 查找每一行中最大值对应的行标
pre_lab = torch.argmax(output, dim=1)
# 如果预测正确,则准确度test_corrects加1
test_corrects += torch.sum(pre_lab == test_data_y.data)
# 将所有的测试样本进行累加
test_num += test_data_x.size(0)
# 计算测试准确率
test_acc = test_corrects.double().item() / test_num
print("测试的准确率为:", test_acc)
if __name__=="__main__":
# 加载模型
model = LeNet()
model.load_state_dict(torch.load('best_model.pth'))
# 加载测试数据
test_dataloader = test_data_process()
# 加载模型测试的函数
test_model_process(model, test_dataloader)
测试结果
如果想要提供精度,提升训练轮数即可