一、步骤
1 加载数据集fashion_minst
2 搭建class NeuralNetwork模型
3 设置损失函数,优化器
4 编写评估函数
5 编写训练函数
6 开始训练
7 绘制损失,准确率曲线
二、代码
导包,打印版本号:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
from tqdm.auto import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
print(sys.version_info)
for module in mpl, np, pd, sklearn, torch:
print(module.__name__, module.__version__)
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)
torch的运算过程都是张量,也叫算子(tensor)
torchvision的包可以提供数据集,图片就是datasets:
这里下载到data目录,如果已有数据则不会下载。这段代码可以实现数据向tensor的转换:
做预处理的时候把图片变成tensor,啥都没写的时候就不会转换成tensor
from torchvision import datasets
from torchvision.transforms import ToTensor
from torchvision import transforms
# 定义数据集的变换
transform = transforms.Compose([
])
# fashion_mnist图像分类数据集,衣服分类,60000张训练图片,10000张测试图片
train_ds = datasets.FashionMNIST(
root="data",
train=True,
download=True,
transform=transform
)
test_ds = datasets.FashionMNIST(
root="data",
train=False,
download=True,
transform=transform
)
# torchvision 数据集里没有提供训练集和验证集的划分
# 当然也可以用 torch.utils.data.Dataset 实现人为划分
type(train_ds[0]) # 元组,第一个元素是图片,第二个元素是标签
如果使用了数据类型变换:
img_tensor, label = train_ds[0]
img_tensor.shape #img这时是一个tensor,shape=(1, 28, 28)
在PyTorch中,DataLoader
是一个迭代器,它封装了数据的加载和预处理过程,使得在训练机器学习模型时可以方便地批量加载数据。DataLoader
主要负责以下几个方面:
-
批量加载数据:
DataLoader
可以将数据集(Dataset)切分为更小的批次(batch),每次迭代提供一小批量数据,而不是单个数据点。这有助于模型学习数据中的统计依赖性,并且可以更高效地利用GPU等硬件的并行计算能力。 -
数据打乱:默认情况下,
DataLoader
会在每个epoch(训练周期)开始时打乱数据的顺序。这有助于模型训练时避免陷入局部最优解,并且可以提高模型的泛化能力。 -
多线程数据加载:
DataLoader
支持多线程(通过参数num_workers
)来并行地加载数据,这可以显著减少训练过程中的等待时间,尤其是在处理大规模数据集时。 -
数据预处理:
DataLoader
可以与transforms
结合使用,对加载的数据进行预处理,如归一化、标准化、数据增强等操作。 -
内存管理:
DataLoader
负责管理数据的内存使用,确保在训练过程中不会耗尽内存资源。 -
易用性:
DataLoader
提供了一个简单的接口,可以很容易地集成到训练循环中。
# 从数据集到dataloader
train_loader = torch.utils.data.DataLoader(train_ds, batch_size=32, shuffle=True) #batch_size分批,shuffle洗牌
val_loader = torch.utils.data.DataLoader(test_ds, batch_size=32, shuffle=False)
这里每32个样本就会算一次平均损失,更新一次w。
定义模型:继承nn.Module
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__() # 继承父类的初始化方法,子类有父类的属性
self.flatten = nn.Flatten() # 展平层
self.linear_relu_stack = nn.Sequential(
nn.Linear(784, 300), # in_features=784, out_features=300, 784是输入特征数,300是输出特征数
nn.ReLU(), # 激活函数
nn.Linear(300, 100),#隐藏层神经元数100
nn.ReLU(), # 激活函数
nn.Linear(100, 10),#输出层神经元数10
)
def forward(self, x): # 前向计算,前向传播
# x.shape [batch size, 1, 28, 28],1是通道数
x = self.flatten(x)
# print(f'x.shape--{x.shape}')
# 展平后 x.shape [batch size, 784]
logits = self.linear_relu_stack(x)
# logits.shape [batch size, 10]
return logits #没有经过softmax,称为logits
model = NeuralNetwork()
model的结构:第一层是展平层,然后激活,然后隐藏层,激活,输出层
在训练之前需要测试一下模型能不能用,所以我们随机一个或者从样本拿一个,同尺寸就行:
#为了查看模型运算的tensor尺寸
x = torch.randn(32, 1, 28, 28)
print(x.shape)
logits = model(x) # 把x输入到模型中,得到logits
print(logits.shape)
然后开始训练,pytorch的训练需要自行实现,包括定义损失函数、优化器、训练步,训练
# 1. 定义损失函数 采用交叉熵损失
loss_fct = nn.CrossEntropyLoss() #内部先做softmax,然后计算交叉熵
# 2. 定义优化器 采用SGD
# Optimizers specified in the torch.optim package,随机梯度下降
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
from sklearn.metrics import accuracy_score # sk里面有一个算子,可以计算准确率
@torch.no_grad() # 装饰器,禁止反向传播,节省内存,就是不求导的意思
def evaluating(model, dataloader, loss_fct): # 评估函数,评估也要做一次向前计算,不需要求梯度
loss_list = [] # 记录损失
pred_list = [] # 记录预测
label_list = [] # 记录标签
for datas, labels in dataloader:#10000/32=312
datas = datas.to(device) # 转到GPU
labels = labels.to(device) # 转到GPU 这两行代码torch必写,把tensor放到GPU上
# 前向计算
logits = model(datas) # 进行前向计算
loss = loss_fct(logits, labels) # 验证集损失,loss尺寸是一个数值
loss_list.append(loss.item()) # 记录损失,item是把tensor转换为数值
preds = logits.argmax(axis=-1) # 验证集预测,argmax返回最大值索引,-1就是最后一个维度
print(f'评估中的preds.shape--{preds.shape}')
pred_list.extend(preds.cpu().numpy().tolist())#将PyTorch张量转换为NumPy数组。只有当张量在CPU上时,这个转换才是合法的
# print(preds.cpu().numpy().tolist())
label_list.extend(labels.cpu().numpy().tolist())
acc = accuracy_score(label_list, pred_list) # 计算准确率
return np.mean(loss_list), acc
# 训练
def training(model, train_loader, val_loader, epoch, loss_fct, optimizer, eval_step=500):
#参数分别是模型,训练集,验证集,训练epoch,损失函数,优化器,评估步数(500评估一次)
record_dict = { # 记录字典,用于记录训练过程中的信息
"train": [],
"val": []
}
global_step = 0 # 全局步数,记录训练的步数
model.train() # 进入训练模式,模型可以切换模式
#tqdm是一个进度条库
with tqdm(total=epoch * len(train_loader)) as pbar: # 进度条 加入epoch等于10,就是所有样本搞10次,不断地把样本带进去学习,1875*10,60000/32=1875
for epoch_id in range(epoch): # 训练epoch次
# training
for datas, labels in train_loader: #执行次数是60000/32=1875
datas = datas.to(device) #datas尺寸是[batch_size,1,28,28]
labels = labels.to(device) #labels尺寸是[batch_size]
# 梯度清空
optimizer.zero_grad() # 每次训练前都要把梯度清空,不然会累加
# 模型前向计算
logits = model(datas)
# 计算损失
loss = loss_fct(logits, labels)
# 梯度回传,loss.backward()会计算梯度,loss对模型参数求导
loss.backward()
# 调整优化器,包括学习率的变动等,优化器的学习率会随着训练的进行而减小,更新w,b
optimizer.step() #梯度是计算并存储在模型参数的 .grad 属性中,优化器使用这些存储的梯度来更新模型参数
preds = logits.argmax(axis=-1) # 训练集预测
acc = accuracy_score(labels.cpu().numpy(), preds.cpu().numpy()) # 计算准确率,numpy可以,每个step都算一次
loss = loss.cpu().item() # 损失转到CPU,item()取值,一个数值
# tensor如果只有一个值(标量),一维是向量,二维是矩阵,可以用item()取出值,如果有多个值,则需要用tolist()转为列表
# record
# record
record_dict["train"].append({
"loss": loss, "acc": acc, "step": global_step
}) # 记录训练集信息,每一步的损失,准确率,步数
# evaluating
if global_step % eval_step == 0:
model.eval() # 进入评估模式,不会求梯度
val_loss, val_acc = evaluating(model, val_loader, loss_fct)
record_dict["val"].append({
"loss": val_loss, "acc": val_acc, "step": global_step
})
model.train() # 进入训练模式
# udate step
global_step += 1 # 全局步数加1
pbar.update(1) # 更新进度条
pbar.set_postfix({"epoch": epoch_id}) # 设置进度条显示信息
return record_dict
epoch = 20 #改为40
model = model.to(device)
record = training(model, train_loader, val_loader, epoch, loss_fct, optimizer, eval_step=1000)
#画线要注意的是损失是不一定在零到1之间的
def plot_learning_curves(record_dict, sample_step=1000):
# build DataFrame
train_df = pd.DataFrame(record_dict["train"]).set_index("step").iloc[::sample_step]
val_df = pd.DataFrame(record_dict["val"]).set_index("step")
last_step = train_df.index[-1] # 最后一步的步数
# print(train_df.columns)
print(train_df['acc'])
print(val_df['acc'])
# plot
fig_num = len(train_df.columns) # 画几张图,分别是损失和准确率
fig, axs = plt.subplots(1, fig_num, figsize=(5 * fig_num, 5))
for idx, item in enumerate(train_df.columns):
# print(train_df[item].values)
axs[idx].plot(train_df.index, train_df[item], label=f"train_{item}")
axs[idx].plot(val_df.index, val_df[item], label=f"val_{item}")
axs[idx].grid() # 显示网格
axs[idx].legend() # 显示图例
axs[idx].set_xticks(range(0, train_df.index[-1], 5000)) # 设置x轴刻度
axs[idx].set_xticklabels(map(lambda x: f"{int(x/1000)}k", range(0, last_step, 5000))) # 设置x轴标签
axs[idx].set_xlabel("step")
plt.show()
plot_learning_curves(record) #横坐标是 steps
# dataload for evaluating
model.eval() # 进入评估模式
loss, acc = evaluating(model, val_loader, loss_fct)
print(f"loss: {loss:.4f}\naccuracy: {acc:.4f}")