目录
一、模型定义组件——重构线性回归
二、模型的加载和保存
2、序列化保存对象和加载
3、保存模型参数
一、模型定义组件——重构线性回归
回顾之前的手动构建线性回归案例:
1.构建数据集;2.加载数据集(数据集转换为迭代器);3.参数初始化;4.线性回归(模型函数,前向回归);5.损失函数(均分误差,反向传播对象);6.优化器(梯度更新);7.训练数据集;8.预测数据。
import math
import torch
import random
from sklearn.datasets import make_regression
def build_data():
"""
构建数据集
"""
# 噪声
noise = random.randint(1,5)
# 样本数量
sample = 1000
# 目标y的真实偏置 bias
bias = 0.5
# coef:真实系数 coef=True 表示希望函数返回生成数据的真实系数
x,y,coef = make_regression(n_samples=sample,n_features=4,bias=bias,noise=noise,coef=True,random_state=666)
# 数据转换成张量
x = torch.tensor(x,dtype=torch.float32)
y = torch.tensor(y,dtype=torch.float32)
coef = torch.tensor(coef,dtype=torch.float32)
return x,y,coef,bias
def load_data(x,y):
"""
加载数据集
将数据集转换为迭代器,以便在训练过程中进行批量处理。
"""
# 单批次数量
batch_size = 16
# 样本总数量
n_samples = x.shape[0]
# 一轮训练的次数
n_batches = math.ceil(n_samples/batch_size)
# 构建数据索引
indices = list(range(n_samples)
# 打乱索引
random.shuffle(indices)
# 从每批次中取出的数据
for i in range(0,n_batches):
start = i*batch_size
end = min((i+1)*batch_size,n_samples)
# 数据下标切片
index = indices[start,end]
# 返回数据
return x[index],y[index]
def initialize(n_feature):
"""
参数初始化
随机初始化权重w, 并将偏置b初始化为1
"""
torch.manual_seed(66)
# 权重 正态分布
w = torch.randn(n_feature,required_grad=True,dtype=torch.float32)
# 偏置
b = torch.tensor(0.0,required_grad=True,dtype=torch.float32)
return w,b
def regressor(x,w,b):
"""
线性回归
模型函数 "前向传播"
"""
return x@w + b
def MSE(y_pred,y_true):
"""
损失函数
均分误差 反向传播的对象
"""
return torch.mean((y_pred-y_true)**2)
def optim_step(w,b,dw,db,lr):
"""
优化器
梯度更新 向梯度下降的方向更新
"""
# 修改的不是原tenser而是tensor的data
w.data -= lr*dw.data
b.data -= lr*db.data
def train():
"""
训练数据集
"""
# 创建数据
x,y,coef,bias = build_data()
# 初始化参数
w,b = initialize(x.shape[0])
# 设置训练参数
lr = 0.1 # 学习率
epoch = 500 # 迭代次数
# 训练数据
# 迭代循环
for i in range(epoch):
total_loss = 0 # 误差总和
count = 0 # 训练次数
# 批次循环
for batch_x,batch_y_true in load_data(x,y):
count += 1
# 代入线性回归得出预测值
batch_y_pred = regressor(x,w,b)
# 计算损失函数
loss = MSE(batch_y_pred,btach_y_true)
tatol_loss += loss
# 梯度清零
if w.grad is not None:
w.data.zero_()
if b.grad is not None:
b.data.zero_()
# 反向传播 计算梯度
loss.backward()
# 梯度更新 得出预测w和b
w,b = optim_step(w,b,w.grad,b.grad,lr)
# 打印数据
print(f'epoch:{i},loss:{total_loss/count}')
return w.data,b.data,coef,bias
def detect(x,w,b):
"""
预测数据
"""
return torch.matmul(x.type(torch.float32),w) + b
if __name__ == "__main__":
w,b,coef,bias = train()
print(f'真实系数:{coef},真实偏置:{bias}')
print(f'预测系数:{w},预测偏置:{b}')
y_pred = detect(torch.tensor([[4,5,6,6],[7,8,8,9]]),w,b)
print(f'y_pred:{y_pred}')
这个手动实现的过程对深度学习的思维很有帮助,现在结合上一篇的官方数据加载器,我们将它重构:
import torch
from sklearn.datasets import make_regression
from torch.utils.data import DataLoader,TensorDataset
def build_dataset():
"""
构建数据集
"""
noise = random.randint(1,5)
bias = 14.5
X,y,coef = make_regression(n_samples=1000,
n_features=4,
coef=True,
bias=bias,
noise=noise,
random_state=66)
X = torch.tensor(X,dtype=torch.float32)
y = torch.tensor(y,dtype=torch.float32)
return X,y,coef,bias
def train():
"""
训练数据集
"""
# 01 加载数据
X,y,coef,bias = build_dataset()
# 02 构建模型
"""
torch.nn.Linear(in_features,out_features)
in_features 输入的特征数量——w数量
out_features 输出的数量——y数量
"""
model = torch.nn.Linear(X.shape[1],1)
# 03 初始化参数
# 若不手动初始化则会自动初始化 这里选择自动初始化
# 04 构建损失函数
loss_fn = torch.nn.MSELoss() # 均方误差
# 05 构建优化器
sgd = torch.optim.SGD(model.parameter(),lr) # 传入模型参数和学习率
# 06 训练
epoch = 500
# 06.1 循环次数
for i in range(epoch):
# 06.2 计算损失
data_loader = DataLoader(data,batch_size=16,shuffle=True) # 按小批次划分并随机打乱
total_loss = 0
count = 0
for x,y in data_loader:
count += 1
y_pred = model(x) # 模型预测的输出值
loss = loss_fn(y_pred,y)
total_loss += loss
# 06.3 梯度清零
sgd.zero_grad()
# 06.4 反向传播
loss.backward()
# 06.5 更新参数
sgd.step()
print(f'epoch:{epoch},loss:{total_loss/count}') # 打印每一批次的结果
# 07 保存模型参数
print(f'weight:{model.weight},bias:{model.bias}')
print(f'true_weight:{coef},true_bias:{bias}')
if __name__ == '__main__':
train()
可见得方便了许多。
二、模型的加载和保存
训练一个模型通常需要大量的数据、时间和计算资源。通过保存训练好的模型,可以满足后续的模型部署、模型更新、迁移学习、训练恢复等各种业务需要求。
1、标准网络模型构建
class MyModle(nn.Module):
"""
标准网络模型构建
"""
def __init__(self, input_size, output_size):
super(MyModle, self).__init__()
self.fc1 = nn.Linear(input_size, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.fc2(x)
output = self.fc3(x)
return output
2、序列化保存对象和加载
import torch
import torch.nn as nn
class MyModle(nn.Module):
"""
标准网络模型构建
"""
def __init__(self, input_size, output_size):
super(MyModle, self).__init__()
self.fc1 = nn.Linear(input_size, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.fc2(x)
output = self.fc3(x)
return output
def train01():
"""
保存
"""
model = MyModle(10,5)
# 序列化方式保存模型对象
torch.save(model, "./data/model.pkl")
def detect01():
"""
加载
"""
# 注意设备问题
model = torch.load("./data/model.pkl", map_location="cpu")
print(model)
if __name__ == "__main__":
test01()
test02()
3、保存模型参数
更常用的保存和加载方式,只需要保存权重、偏执、准确率等相关参数,都可以在加载后打印观察。
import torch
import torch.nn as nn
import torch.optim as optim
class MyModle(nn.Module):
"""
标准网络模型构建
"""
def __init__(self, input_size, output_size):
super(MyModle, self).__init__()
self.fc1 = nn.Linear(input_size, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.fc2(x)
output = self.fc3(x)
return output
def train02():
model = MyModle(input_size=128, output_size=32)
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 自己构建要存储的模型参数
save_dict = {
"init_params": {
"input_size": 128, # 输入特征数
"output_size": 32, # 输出特征数
},
"accuracy": 0.99, # 模型准确率
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
}
torch.save(save_dict, "model_dict.pth")
def detect02():
save_dict = torch.load("model_dict.pth")
model = MyModle(
input_size=save_dict["init_params"]["input_size"],
output_size=save_dict["init_params"]["output_size"],
)
# 初始化模型参数
model.load_state_dict(save_dict["model_state_dict"])
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 初始化优化器参数
optimizer.load_state_dict(save_dict["optimizer_state_dict"])
# 打印模型信息
print(save_dict["accuracy"])
print(model)
if __name__ == "__main__":
train02()
detect02()