八 手动构建模型实战
1 构建数据集
epoch:使用训练集的全部数据对模型进行一次完整的训练,被称为一代训练
batch:使用训练集中的部分样本对模型权重进行一次反向传播和参数更新,这部分样本被称为一批数据
iteration:使用一个batch数据对模型进行一次参数更新的过程,被称为一次迭代
1 .构建数据集
API
使用 sklearn 的 make_regression 方法来构建一个模拟的回归数据集。
make_regression 方法的参数解释:
- n_samples: 生成的样本数量,决定了数据集的规模。
- n_features: 生成的特征数量,决定了数据维度。
- noise: 添加到目标变量的噪声标准差,用于模拟真实世界数据的不完美。
- coef: 如果为 True, 会返回生成数据的真实系数,用于了解特征与目标变量间的真实关系。
- random_state: 随机数生成的种子,确保在多次运行中能够复现相同的结果。
返回:
- X: 生成的特征矩阵。
- y: 生成的目标变量。
- coef: 如果在调用时 coef 参数为 True,则还会返回真实系数。
def build_data():
    """Build a simulated regression dataset with sklearn's make_regression.

    Returns:
        X (Tensor): (1000, 4) float32 feature matrix.
        y (Tensor): (1000,) float32 target values.
        coef (ndarray): true coefficients used by the generator.
        b (float): true bias term used by the generator.
    """
    noise = 14.6      # std-dev of the noise added to the targets
    n_samples = 1000  # dataset size
    b = 0.5           # true bias
    X, y, coef = make_regression(n_samples=n_samples, n_features=4, coef=True, bias=b, noise=noise, random_state=666)
    # FIX: the dataset is plain data, not trainable parameters.
    # requires_grad=True here wasted memory and accumulated useless
    # gradients into X and y on every loss.backward().
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.float32)
    return X, y, coef, b
2 .构建数据加载器
将数据分批次加载到模型进行训练
def data_loader(x, y):
    """Yield shuffled (features, targets) mini-batches of size 16."""
    batch_size = 16               # samples per batch
    total = len(x)
    order = list(range(total))    # sample positions, shuffled once per call
    random.shuffle(order)
    # walk the shuffled index list 16 positions at a time;
    # the final batch may be smaller than batch_size
    for start in range(0, total, batch_size):
        chosen = order[start:start + batch_size]
        yield x[chosen], y[chosen]
if __name__ == "__main__":
    # Build the dataset.
    # FIX: the function defined above is build_data (not build_dataset)
    # and it returns FOUR values (X, y, coef, b); the original call
    # raised NameError and would then have failed to unpack.
    X, y, coef, b = build_data()
    # Iterate the data batch by batch.
    for x, y in data_loader(X, y):
        print(x, y)
2 模型函数
def myregreser(x, w, b):
    """Linear model: one prediction per sample row of x."""
    return torch.matmul(x, w) + b
3 损失函数
def MSE(y_pred, y_true):
    """Mean squared error between predictions and targets."""
    diff = y_pred - y_true
    return (diff * diff).mean()
4 初始化
def initialize(n_features):
    """Create the initial weight vector and bias, both tracked by autograd."""
    torch.manual_seed(666)  # fixed seed -> reproducible initialization
    weight = torch.randn(n_features, dtype=torch.float32, requires_grad=True)
    bias = torch.tensor(14.5, dtype=torch.float32, requires_grad=True)
    return weight, bias
5 优化器
def optim_step(w, b, dw, db, lr):
    """One SGD step: move each parameter against its gradient."""
    # update through .data so autograd does not record the step
    w.data -= lr * dw.data
    b.data -= lr * db.data
6 训练函数
def train():
    """Train the hand-built linear regression with mini-batch SGD.

    Returns:
        (w, b, coef, bias): learned weights/bias plus the true
        generator coefficients/bias for comparison.
    """
    # 1. generate data
    x, y, coef, bias = build_data()
    # 2. initialize parameters
    w, b = initialize(x.shape[1])
    # 3. training configuration
    lr = 0.01
    epoch = 100
    for i in range(epoch):
        e = 0      # accumulated loss over the epoch
        count = 0  # number of batches seen
        for batch_x, batch_y_true in data_loader(x, y):
            y_batch_pred = myregreser(batch_x, w, b)
            loss = MSE(y_batch_pred, batch_y_true)
            e += loss
            count += 1
            # BUG FIX: the original called w.data.zero_() / b.data.zero_(),
            # which wipes the PARAMETERS every batch instead of clearing
            # the gradients. Zero the .grad buffers instead.
            if w.grad is not None:
                w.grad.zero_()
            if b.grad is not None:
                b.grad.zero_()
            # backward pass (gradient computation)
            loss.backward()
            # gradient update
            optim_step(w, b, w.grad, b.grad, lr)
        print(f"epoch:{i},loss:{e/count}")
    return w, b, coef, bias
def detect(x, w, b):
    """Predict targets for x (cast to float32) with learned w and b."""
    features = x.type(torch.float32)
    return torch.matmul(features, w) + b
if __name__ == '__main__':
    # train, then compare learned parameters with the true generator values
    w, b, coef, bais = train()
    print(w, b)
    print(coef, bais)
    # sanity check: predict on two identical hand-made samples
    sample = torch.tensor([[1, 2, 3, 2], [1, 2, 3, 2]])
    y_pred = detect(sample, w, b)
    print(y_pred)
九 模型定义组件
1 基本组件
1.损失函数组件
import torch
import torch.nn as nn
import torch.optim as optim
def test001():
    """Demo of nn.MSELoss: mean squared error between two tensors."""
    y_true = torch.tensor([1, 2, 3, 4, 5, 6], dtype=torch.float32)
    y_pred = torch.tensor([2, 3, 4, 5, 6, 7], dtype=torch.float32)
    loss = nn.MSELoss()  # mean-squared-error criterion
    # FIX: nn.MSELoss is called as loss(input, target), i.e. (pred, true).
    # The value is identical for MSE, but the reversed order becomes a real
    # bug with asymmetric losses (e.g. CrossEntropyLoss), so keep it right.
    e = loss(y_pred, y_true)
    print(e)
2.线性层组件
def test002():
    """Demo of nn.Linear: w1*x1+w2*x2+w3*x3+w4*x4+b with auto-initialized w, b."""
    layer = nn.Linear(4, 1)
    # three identical 4-feature samples
    batch = torch.tensor([[1, 2, 3, 4]] * 3, dtype=torch.float32)
    out = layer(batch)
    print(out)
3.优化器方法
API:
import torch.optim as optim
- params=model.parameters():模型参数获取;
- optimizer=optim.SGD(params):优化器方法;
- optimizer.zero_grad():梯度清零;
- optimizer.step():参数更新;
def test003():
    """One complete gradient-update cycle with nn + optim components."""
    # 1. fake dataset: 400 samples, 5 features, integer-valued
    features = torch.randint(1, 10, (400, 5)).type(torch.float32)
    labels = torch.randint(1, 10, (400, 1)).type(torch.float32)
    # 2. single linear layer
    model = nn.Linear(5, 1)
    # 3. SGD optimizer over the layer's parameters
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    # 4. forward pass
    prediction = model(features)
    # 5. loss
    criterion = nn.MSELoss()
    loss = criterion(prediction, labels)
    print(loss)
    # 6. clear stale gradients (equivalent to w.grad.zero_())
    optimizer.zero_grad()
    # 7. backward pass: differentiate the loss, fill .grad buffers
    loss.backward()
    # 8. apply the parameter update
    optimizer.step()
    # 9. inspect the updated weights
    print(model.weight)
if __name__=='__main__':
    # Run the full gradient-update demo when executed as a script.
    test003()
2 数据加载器
1.构建数据类
需要继承 torch.utils.data.Dataset 并实现以下方法:
①.__init__
方法
初始化数据集对象:加载数据或者定义如何从存储中获取数据的路径和方法
def __init__(self, data, labels):
self.data = data
self.labels = labels
②.__len__
方法
返回样本数量,让Dataloader加载器能够知道数据集的大小
def __len__(self):
return len(self.data)
③.__getitem__
方法
根据索引返回样本:将从数据集中提取一个样本,并可能对样本进行预处理或变换
def __getitem__(self, index):
sample = self.data[index]
label = self.labels[index]
return sample, label
整体:
import torch
from torch.utils.data import Dataset,DataLoader
class my_dataset(Dataset):
    """Minimal Dataset wrapping pre-built feature and label tensors."""

    def __init__(self, x, y):
        super(my_dataset, self).__init__()
        self.data = x     # feature tensor
        self.labels = y   # target tensor

    def __len__(self):
        # lets a DataLoader know how many samples exist
        return len(self.data)

    def __getitem__(self, index):
        # one (sample, label) pair by position
        return self.data[index], self.labels[index]
if __name__ == '__main__':
    # my_dataset(...) -> __init__, len(data) -> __len__, data[i] -> __getitem__
    x = torch.randn(100, 3)
    print(x)
    y = torch.randn(100, 1)
    # dataset wrapping the two tensors
    data = my_dataset(x, y)
    print(len(data))
    print(data[50])
    # loader: shuffled batches of 16
    loader = DataLoader(data, batch_size=16, shuffle=True)
    for bx, by in loader:
        print(bx.shape, by.shape)
2.数据加载器
from torch.utils.data import Dataset,DataLoader
loader=DataLoader(data,batch_size=16,shuffle=True)
for x,y in loader:
print(x.shape,y.shape)
3 数据集加载案例
1.加载excel数据集
import torch
from torch.utils.data import Dataset,DataLoader
import pandas as pd
class my_excel_dataset(Dataset):
    """Dataset backed by an Excel sheet of grading records.

    Reads the sheet, drops identity columns, keeps the assessment
    columns as features and the final score as the label.
    """

    def __init__(self, path):
        super(my_excel_dataset, self).__init__()
        data_pd = pd.read_excel(path)
        # BUG FIX: dropna returns a NEW frame (it is not in-place);
        # the original discarded the result, so empty columns survived.
        data_pd = data_pd.dropna(axis=1, how='all')
        data_pd.columns = ["zubie","st_id","st_name","fengong","expresion","ppt_make","answer","code_show","score","comments"]
        data_pd = data_pd.drop(["zubie","st_id","st_name","fengong","comments"],axis=1)
        # all remaining columns but the last are features; "score" is the target
        self.data = torch.tensor(data_pd.iloc[:, :-1].to_numpy(), dtype=torch.float32)
        self.labels = torch.tensor(data_pd.iloc[:, -1].to_numpy(), dtype=torch.float32)

    def __getitem__(self, index):
        return self.data[index], self.labels[index]

    def __len__(self):
        return len(self.data)
if __name__ == '__main__':
    # load the grading sheet and iterate it in shuffled batches of 4
    dataset = my_excel_dataset("./data/21级大数据答辩成绩表.xlsx")
    batch_iter = DataLoader(dataset, batch_size=4, shuffle=True)
    for features, score in batch_iter:
        print(features, score)
2.加载图片数据集
# import os
"""
os模块的API
"""
# for root,dir,files in os.walk("./data"):
# print(root,dir,files,"666666666666666")
# path=os.path.join("./data","1.png")
# # path="./data"+"/"+"1.png"
# print(path)
#
# _,str=os.path.split("./data/animal/cat")
# print(str)
"""
enumerate 生成枚举下标
"""
# x=["20","hello","9999"]
# for i in range(len(x)):
# print(i,x[i])
# x=["20","hello","9999"]
# for i,el in enumerate(x):
# print(i,el)
import os
from torch.utils.data import Dataset,DataLoader
import cv2
import torch
# import PIL as Image
class my_image_dataset(Dataset):
    """Image-folder dataset laid out as ./root/<class_name>/<image files>.

    Class names come from the first-level directory names; each image's
    label is the index of its parent directory in that list.
    """

    def __init__(self, path):
        self.path = path
        self.classname = []   # class id -> directory name
        self.data = []        # image file paths
        self.label = []       # integer class id per image
        for root, dirs, files in os.walk(path):
            if root == path:
                # top level: the sub-directories define the classes
                self.classname = dirs
            else:
                # class folder: record every file with its class id
                cls_id = self.classname.index(os.path.split(root)[1])
                for name in files:
                    self.data.append(os.path.join(root, name))
                    self.label.append(cls_id)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        # read -> resize to 336x336 -> tensor -> HWC to CHW
        img = cv2.imread(self.data[index])
        img = cv2.resize(img, (336, 336))
        tensor_img = torch.from_numpy(img).permute(2, 0, 1)
        return tensor_img, self.label[index]
if __name__=="__main__":
    # build the folder-backed dataset and inspect one sample
    dataset = my_image_dataset("./data/animal")
    print(dataset[500])
    print(len(dataset))
    print(dataset.classname)
    # shuffled batches of 32 images
    train_loader = DataLoader(dataset, batch_size=32, shuffle=True)
    for imgs, labels in train_loader:
        print(imgs.shape)
        print(labels.shape)
3.加载官方数据集
官方地址:Datasets — Torchvision 0.20 documentation
from torch.utils.data import Dataset,DataLoader
from torchvision import transforms,datasets
def test01():
    """Download MNIST via torchvision and iterate it in batches of 4."""
    transform = transforms.Compose([transforms.ToTensor()])
    # BUG FIX: the dataset object was never assigned, so the
    # DataLoader line below referenced an undefined name `data`.
    data = datasets.MNIST(
        root="./data",
        train=True,
        download=True,
        transform=transform
    )
    for x, y in DataLoader(data, batch_size=4, shuffle=True):
        print(x.shape, y.shape)
if __name__ == "__main__":
    # Run the MNIST loading demo when executed as a script.
    test01()
4 数据增强
提高模型泛化能力(鲁棒性)的一种有效方法,可以模拟更多的训练样本减少过拟合风险
通过torchvision.transforms模块实现,官方提供
1.固定转换
参考:Illustration of transforms — Torchvision 0.20 documentation
2.概率控制转换
3.随机转换
4.数据增强整合
transforms.Compose()
5 重构线性回归
from sklearn.datasets import make_regression
import torch
from torch.utils.data import DataLoader, TensorDataset
def build_dataset():
    """Build a simulated regression dataset.

    Returns:
        X (Tensor): (1000, 4) float32 features.
        y (Tensor): (1000,) float32 targets.
        coef (Tensor): true coefficients used by the generator.
        bias (float): true bias term.
    """
    bias = 14.5
    # BUG FIX: with coef=True make_regression returns THREE values
    # (X, y, coef); the original `X, y = ...` raised a ValueError and
    # `coef` below was an undefined name.
    X, y, coef = make_regression(n_samples=1000, n_features=4, coef=True, bias=bias, noise=0.5, random_state=666)
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.float32)
    coef = torch.tensor(coef, dtype=torch.float32)
    return X, y, coef, bias
def train():
    """Train a linear regression using the nn/optim/DataLoader components."""
    # 1. data
    X, y, coef, bias = build_dataset()
    data = TensorDataset(X, y)
    # DataLoader reshuffles at the start of every epoch, so build it once
    data_loader = DataLoader(data, batch_size=16, shuffle=True)
    # 2. model: one linear layer (weights auto-initialized)
    model = torch.nn.Linear(X.shape[1], 1)
    # 3. loss function
    loss_fn = torch.nn.MSELoss()
    # 4. optimizer
    sgd = torch.optim.SGD(model.parameters(), lr=0.1)
    # 5. training loop
    epochs = 100
    for epoch in range(epochs):
        for x, y_batch in data_loader:
            y_pred = model(x)
            # BUG FIX: y_batch is (N,) while y_pred is (N, 1); without the
            # reshape MSELoss broadcasts them to (N, N) and the computed
            # loss (and every gradient) is wrong.
            loss = loss_fn(y_pred, y_batch.reshape(-1, 1))
            # clear stale gradients
            sgd.zero_grad()
            # backward pass
            loss.backward()
            # parameter update
            sgd.step()
        # print the last batch loss of the epoch to monitor progress
        print(f"epoch:{epoch},loss:{loss}")
    # compare the learned parameters with the true generator values
    print(model.weight, model.bias)
    print(coef, bias)
if __name__ == '__main__':
train()
十 模型的保存和加载
1 标准网络模型构建
import torch
import torch.nn as nn
class MyModle(nn.Module):
    """Three-layer fully-connected network: input -> 128 -> 64 -> output."""

    def __init__(self, input_size, output_size):
        super(MyModle, self).__init__()
        # hidden widths are fixed; only the ends are configurable
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        # straight pass through the three layers (no activations)
        hidden = self.fc2(self.fc1(x))
        return self.fc3(hidden)
2 序列化模型对象
def train():
    """Demonstrate both model-serialization styles, then restore from a state dict."""
    # whole-model serialization (pickles the module object itself)
    model = MyModle(10, 5)
    torch.save(model, './data/model.pkl')
    # state-dict serialization (parameters only)
    torch.save(model.state_dict(), './data/model.pt')
    # restoring: rebuild the architecture, then load the saved parameters
    model = MyModle(10, 5)
    model.load_state_dict(torch.load('./data/model.pt'))
3 保存模型参数
def detect():
    """Reload the pickled whole model onto the CPU and print it."""
    # "cuda:0" is equivalent to "cuda"; map everything to the CPU here
    loaded = torch.load('./data/model.pkl', map_location=torch.device('cpu'))
    print(loaded)
if __name__ == "__main__":
    # Save the model first, then reload it and print the architecture.
    train()
    detect()