模型训练
模型训练一般分为四个步骤:
构建数据集。
定义神经网络模型。
定义超参、损失函数及优化器。
输入数据集进行训练与评估。
- 数据集加载
import mindspore
from mindspore import nn
# 从 MindSpore 数据集包中导入 vision 和 transforms 模块。
# vision:包含处理图像数据的工具。
# transforms:包含数据转换的工具。
from mindspore.dataset import vision, transforms
# 从 MindSpore 数据集包中导入 MnistDataset 类,用于加载 MNIST 数据集。
from mindspore.dataset import MnistDataset
# 从 download 模块中导入 download 函数,用于下载数据集。
from download import download
# 指定数据集的 URL 地址。
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/" \
"notebook/datasets/MNIST_Data.zip"
# 使用 download 函数下载数据集并解压到当前目录。
path = download(url, "./", kind="zip", replace=True)
# 定义一个数据管道函数,接收数据集路径和批量大小作为参数。
def datapipe(path, batch_size):
# 定义图像数据的转换操作列表。
image_transforms = [
vision.Rescale(1.0 / 255.0, 0), # 缩放图像像素值到 [0, 1] 范围。
vision.Normalize(mean=(0.1307,), std=(0.3081,)), # 标准化图像数据。
vision.HWC2CHW() # 转换图像格式从 HWC(高度、宽度、通道)到 CHW(通道、高度、宽度)。
]
# 定义标签数据的转换操作,将标签转换为 int32 类型。
label_transform = transforms.TypeCast(mindspore.int32)
# 加载指定路径的数据集。
dataset = MnistDataset(path)
# 对数据集的图像应用转换操作。
dataset = dataset.map(image_transforms, 'image')
# 对数据集的标签应用转换操作。
dataset = dataset.map(label_transform, 'label')
# 将数据集分批,每批包含指定数量的样本。
dataset = dataset.batch(batch_size)
# 返回处理后的数据集。
return dataset
# 创建训练数据集,批量大小为 64。
train_dataset = datapipe('MNIST_Data/train', batch_size=64)
# 创建测试数据集,批量大小为 64。
test_dataset = datapipe('MNIST_Data/test', batch_size=64)
- 构建神经网络
# 定义一个神经网络类 Network,继承自 nn.Cell。
class Network(nn.Cell):
# 在初始化方法中定义网络的结构。
def __init__(self):
# 调用父类的初始化方法。
super().__init__()
# 定义一个平坦化层,用于将输入的多维数据展开为一维。
self.flatten = nn.Flatten()
# 定义一个顺序容器 SequentialCell,其中包含多个层顺序连接。
self.dense_relu_sequential = nn.SequentialCell(
# 全连接层,将输入数据的尺寸从 28*28(即 784)转换为 512。
nn.Dense(28*28, 512),
# ReLU 激活函数。
nn.ReLU(),
# 全连接层,将输入数据的尺寸从 512 转换为 512。
nn.Dense(512, 512),
# ReLU 激活函数。
nn.ReLU(),
# 全连接层,将输入数据的尺寸从 512 转换为 10(对应于 10 个类别)。
nn.Dense(512, 10)
)
# 定义前向传播方法,用于计算网络的输出。
def construct(self, x):
# 将输入数据平坦化。
x = self.flatten(x)
# 依次通过顺序容器中的各层,得到最终的输出 logits。
logits = self.dense_relu_sequential(x)
# 返回计算得到的 logits。
return logits
# 创建一个 Network 类的实例,表示定义好的神经网络模型。
model = Network()
3.定义超参、损失函数及优化器。
# 定义训练的参数。
# 训练的轮数,即数据集将被遍历的次数。
epochs = 3
# 每个批次的大小,即一次训练中使用的样本数。
batch_size = 64
# 学习率,即模型参数在每次更新时调整的幅度。
learning_rate = 1e-2
# 定义训练的参数。
# 训练的轮数,即数据集将被遍历的次数。
epochs = 3
# 每个批次的大小,即一次训练中使用的样本数。
batch_size = 64
# 学习率,即模型参数在每次更新时调整的幅度。
learning_rate = 1e-2
# 定义损失函数,用于计算预测结果与实际标签之间的差异。
# 使用交叉熵损失函数(CrossEntropyLoss),这是分类问题中常用的损失函数。
loss_fn = nn.CrossEntropyLoss()
# 定义优化器,用于更新模型的参数。
# 使用随机梯度下降(SGD)优化器。
# model.trainable_params() 获取模型中所有需要训练的参数。
# learning_rate 指定优化器的学习率。
optimizer = nn.SGD(model.trainable_params(), learning_rate=learning_rate)
4.训练与评估
训练
# 定义前向函数,用于计算模型输出和损失。
def forward_fn(data, label):
# 使用模型计算预测值(logits)。
logits = model(data)
# 计算预测值与真实标签之间的损失。
loss = loss_fn(logits, label)
# 返回损失值和预测值。
return loss, logits
# 获取梯度函数,用于计算损失相对于模型参数的梯度。
# mindspore.value_and_grad 会计算前向函数的值和梯度。
# forward_fn: 计算损失的前向函数。
# None: 不需要计算的额外输出。
# optimizer.parameters: 需要计算梯度的参数。
# has_aux=True: 表示前向函数返回多个值。
grad_fn = mindspore.value_and_grad(forward_fn, None, optimizer.parameters, has_aux=True)
# 定义单步训练函数。
def train_step(data, label):
# 计算损失和梯度。
(loss, _), grads = grad_fn(data, label)
# 使用优化器更新模型参数。
optimizer(grads)
# 返回当前步的损失值。
return loss
# 定义训练循环函数。
def train_loop(model, dataset):
# 获取数据集的大小(即批次的数量)。
size = dataset.get_dataset_size()
# 设置模型为训练模式。
model.set_train()
# 枚举数据集的每个批次。
for batch, (data, label) in enumerate(dataset.create_tuple_iterator()):
# 执行单步训练,获取当前批次的损失值。
loss = train_step(data, label)
# 每 100 个批次打印一次损失值和当前批次编号。
if batch % 100 == 0:
loss, current = loss.asnumpy(), batch
print(f"loss: {loss:>7f} [{current:>3d}/{size:>3d}]")
测试函数
# 定义测试循环函数,用于在测试集上评估模型的性能。
def test_loop(model, dataset, loss_fn):
# 获取数据集的批次数量。
num_batches = dataset.get_dataset_size()
# 设置模型为评估模式。
model.set_train(False)
# 初始化总样本数、测试损失和正确预测数。
total, test_loss, correct = 0, 0, 0
# 枚举数据集的每个批次。
for data, label in dataset.create_tuple_iterator():
# 使用模型进行预测。
pred = model(data)
# 累加总样本数。
total += len(data)
# 累加测试损失。
test_loss += loss_fn(pred, label).asnumpy()
# 累加正确预测数。
correct += (pred.argmax(1) == label).asnumpy().sum()
# 计算平均损失。
test_loss /= num_batches
# 计算准确率。
correct /= total
# 打印测试结果,包括准确率和平均损失。
print(f"Test: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
运行
# 定义损失函数和优化器。
loss_fn = nn.CrossEntropyLoss()
optimizer = nn.SGD(model.trainable_params(), learning_rate=learning_rate)
# 执行多个 epoch 的训练循环。
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------------------")
# 执行训练循环。
train_loop(model, train_dataset)
# 在测试集上进行评估。
test_loop(model, test_dataset, loss_fn)
print("Done!")