Table of Contents
- I: Introduction to the CIFAR-10 dataset
- II: Code
- (1) Writing the data-loading script
- (2) Building the models
- ①: VGG
- ②: ResNet
- ③: MobileNetV1
- ④: InceptionNet
- (3) Training script
I: Introduction to the CIFAR-10 dataset
The CIFAR-10 dataset is a labeled subset of the 80 Million Tiny Images dataset.
- Dataset download link
The dataset consists of 60,000 32*32 color images in 10 classes, 6,000 images per class: 50,000 training images and 10,000 test images. It can be downloaded with torchvision.datasets.CIFAR10:
train_dataset = datasets.CIFAR10(root=parametes.data_path, train=True,
transform=transforms.ToTensor(), download=False)
test_dataset = datasets.CIFAR10(root=parametes.data_path, train=False,
transform=transforms.ToTensor(), download=False)
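Here parametes is the author's own configuration module. As a minimal self-contained sketch (the ./data root below is an assumption), setting download=True fetches the archive on the first run:
from torchvision import datasets, transforms

# download=True fetches the archive on the first run; './data' is an assumed root
train_dataset = datasets.CIFAR10(root='./data', train=True,
                                 transform=transforms.ToTensor(), download=True)
test_dataset = datasets.CIFAR10(root='./data', train=False,
                                transform=transforms.ToTensor(), download=True)
print(len(train_dataset), len(test_dataset))  # 50000 10000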
As shown below, the download produces 5 training batch files and 1 test batch file, each holding 10,000 images.
These batch files are not image files. To make later training easier, and to be able to inspect the images used during training, we convert them to image files with the following code:
import pickle
import numpy as np
import glob
import os
import cv2
# Helper: load one pickled batch file
def unpickle(file):
    with open(file, 'rb') as fo:
        batch = pickle.load(fo, encoding='bytes')
    return batch
# Class names
label_name = [
"airplane",
"automobile",
"bird",
"cat",
"deer",
"dog",
"frog",
"horse",
"ship",
"truck"
]
# Use train_list (and test_list) to collect the batch file paths
train_list = glob.glob('./cifar-10-batches-py/data_batch_*')
# test_list = glob.glob('./cifar-10-batches-py/test_batch')
# Output directory
save_path = './train/'
# Iterate over the batch files
for l in train_list:
    print(l)
    l_dict = unpickle(l)
    """
    Each batch maps to a dict with 4 keys:
    batch_label: which batch the images belong to
    labels: class indices
    data: image arrays
    filenames: file names
    """
    print(l_dict.keys())
    for im_idx, im_data in enumerate(l_dict[b'data']):
        # Get the class index and file name
        im_label = l_dict[b'labels'][im_idx]
        im_name = l_dict[b'filenames'][im_idx]
        # print(im_label, im_name, im_data)
        # Map the index to the English class name
        im_label_name = label_name[im_label]
        # Reshape the flat array to 3x32x32, then reorder to HWC
        im_data = np.reshape(im_data, [3, 32, 32])
        im_data = np.transpose(im_data, (1, 2, 0))
        # cv2.imshow("im_data", cv2.resize(im_data, (200, 200)))
        # cv2.waitKey(0)
        # Create one folder per class; images of the same class go into the same folder
        os.makedirs(os.path.join(save_path, im_label_name), exist_ok=True)
        # CIFAR stores RGB while OpenCV writes BGR, so convert before saving
        cv2.imwrite(os.path.join(save_path, im_label_name, im_name.decode('utf-8')),
                    cv2.cvtColor(im_data, cv2.COLOR_RGB2BGR))
After conversion, the directory layout is save_path/<class_name>/<file>.png, with one folder per class (airplane, automobile, ..., truck).
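A quick sanity check (a hedged sketch; the glob pattern just grabs whichever file was written first) confirms each saved image is 32x32 with 3 channels:
import cv2
import glob

# Read back any converted image and verify its shape
sample = glob.glob('./train/airplane/*.png')[0]
print(cv2.imread(sample).shape)  # (32, 32, 3)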
II: Code
(1) Writing the data-loading script
import torchvision.datasets
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import os
from PIL import Image
import numpy as np
import glob
# Class names
label_name = [
"airplane",
"automobile",
"bird",
"cat",
"deer",
"dog",
"frog",
"horse",
"ship",
"truck"
]
# Map class names to indices
label_dict = {}
for idx, name in enumerate(label_name):
label_dict[name] = idx
def default_loader(path):
return Image.open(path).convert("RGB")
train_transforms = transforms.Compose([
transforms.RandomHorizontalFlip(),
transforms.ToTensor()
])
class MyDataset(Dataset):
"""
im_list:是一个列表,每一个元素是图片路径
transform:对图片进行增强
loader:使用PIL对图片进行加载
"""
def __init__(self, im_list, transform=None, loader=default_loader):
super(MyDataset, self).__init__()
# imgs为二维列表,每一个子列表中第一个元素存储im_list,第二个通过label_dict映射为索引
imgs = []
for im_item in im_list:
# 路径'./data/test/airplane/aeroplane_s_000002.png'中倒数第二个是标签名
im_label_name = im_item.split("\\")[-2]
imgs.append([im_item, label_dict[im_label_name]])
self.imgs = imgs
self.transform = transform
self.loader = loader
    def __getitem__(self, index):
        im_path, im_label = self.imgs[index]
        # Load the image with PIL
        im_data = self.loader(im_path)
        # If a transform was given, apply the augmentations
        if self.transform is not None:
            im_data = self.transform(im_data)
        return im_data, im_label
def __len__(self):
return len(self.imgs)
im_train_list = glob.glob(r'./data/train/*/*.png')
im_test_list = glob.glob(r'./data/test/*/*.png')
train_dataset = MyDataset(im_train_list, transform=train_transforms)
test_dataset = MyDataset(im_test_list, transform=transforms.ToTensor())
if __name__ == '__main__':
im_train_list = glob.glob(r'./data/train/*/*.png')
im_test_list = glob.glob(r'./data/test/*/*.png')
train_dataset = MyDataset(im_train_list, transform=train_transforms)
test_dataset = MyDataset(im_test_list, transform=transforms.ToTensor())
print(len(train_dataset))
print(len(test_dataset))
train_loader = DataLoader(dataset=train_dataset, batch_size=6, shuffle=True, num_workers=0)
test_loader = DataLoader(dataset=test_dataset, batch_size=6, shuffle=False, num_workers=0)
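    # (Added sketch) Pull one batch to verify shapes: images are [B, 3, 32, 32]
    # float tensors scaled to [0, 1] by ToTensor, labels are class indices
    images, labels = next(iter(train_loader))
    print(images.shape, labels[: 6])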
"""
train_transforms = transforms.Compose([
transforms.RandomResizedCrop((28, 28)),
transforms.RandomHorizontalFlip(),
transforms.RandomVerticalFlip(),
transforms.RandomRotation(90),
transforms.RandomGrayscale(0.1),
transforms.ColorJitter(0.3, 0.3, 0.3, 0.3),
transforms.ToTensor()
])
train_dataset = torchvision.datasets.ImageFolder(root='./data/train', transform=train_transforms)
test_dataset = torchvision.datasets.ImageFolder(root='./data/test', transform=transforms.ToTensor())
print(train_dataset.classes[: 5])
print("-"*30)
print(train_dataset.class_to_idx)
print("-"*30)
print(train_dataset.imgs[: 5])
"""
(2) Building the models
①: VGG
# Config dict used to select a VGG variant
cfgs = {
'vgg11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
'vgg13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
'vgg16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
#'vgg16': [16, 16, 'M', 32, 32, 'M', 64, 64, 64, 'M', 128, 128, 128, 'M', 128, 128, 128, 'M'],
'vgg19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}
# Build the convolutional layers from a config list
def create_conv(cfg):
    layers = []
    in_channels = parametes.init_in_chaneels  # initial input channels from the config module
    # Walk through the config list
    for c in cfg:
        # 'M' adds a max-pooling layer with kernel_size=2, stride=2
        if c == 'M':
            layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
        # A number is the output channel count; all kernels are 3x3 with padding 1
        else:
            conv2d = nn.Conv2d(in_channels=in_channels, out_channels=c, kernel_size=3, padding=1)
            layers += [conv2d, nn.ReLU(True)]
            # The next layer's input channels equal the current output channels
            in_channels = c
    return nn.Sequential(*layers)
# VGG16 network
class VGG16(nn.Module):
def __init__(self, conv, num_classes, init_weights=False):
super(VGG16, self).__init__()
self.conv = conv
self.fc = nn.Sequential(
            # Assumes 224x224 input images
nn.Linear(512*7*7, 4096),
nn.ReLU(True),
nn.Dropout(p=0.5),
nn.Linear(4096, 4096),
nn.ReLU(True),
nn.Dropout(p=0.5),
nn.Linear(4096, num_classes)
)
if init_weights:
self._initialize_weights()
def forward(self, x):
x = self.conv(x)
x = torch.flatten(x, start_dim=1)
x = self.fc(x)
return x
    # Weight initialization (Kaiming)
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.constant_(m.bias, 0)
# Instantiate the network
cfg = model.cfgs['vgg16']
net = model.VGG16(model.create_conv(cfg), parametes.num_classes, True)
net = net.to(parametes.device)
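Note that nn.Linear(512*7*7, 4096) only matches 224x224 inputs: five 2x2 max-pools reduce 224 to 7. With 32x32 CIFAR-10 images the same five pools leave a 512x1x1 feature map, so the classifier's first layer must shrink accordingly. A minimal hedged sketch of that adjustment:
import torch
import torch.nn as nn

# For 32x32 inputs, five stride-2 pools reduce the spatial size to 1x1,
# so the flattened feature vector has 512 elements, not 512*7*7
fc_for_cifar = nn.Sequential(
    nn.Linear(512 * 1 * 1, 4096),
    nn.ReLU(True),
    nn.Dropout(p=0.5),
    nn.Linear(4096, 10)
)
print(fc_for_cifar(torch.randn(2, 512)).shape)  # torch.Size([2, 10])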
Below is VGG13. This style is verbose but very readable:
import torch
import torch.nn as nn
import torch.nn.functional as F
class VGG13(nn.Module):
def __init__(self):
super(VGG13, self).__init__()
# N * 3 * 32 * 32
self.conv1_1 = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=3, padding=1),
nn.BatchNorm2d(64),
nn.ReLU()
)
self.conv1_2 = nn.Sequential(
nn.Conv2d(64, 64, kernel_size=3, padding=1),
nn.BatchNorm2d(64),
nn.ReLU()
)
self.max_pooling1 = nn.MaxPool2d(kernel_size=2, stride=2)
# N * 64 * 16 * 16
self.conv2_1 = nn.Sequential(
nn.Conv2d(64, 128, kernel_size=3, padding=1),
nn.BatchNorm2d(128),
nn.ReLU()
)
self.conv2_2 = nn.Sequential(
nn.Conv2d(128, 128, kernel_size=3, padding=1),
nn.BatchNorm2d(128),
nn.ReLU()
)
self.max_pooling2 = nn.MaxPool2d(kernel_size=2, stride=2)
# N * 128 * 8 * 8
self.conv3_1 = nn.Sequential(
nn.Conv2d(128, 256, kernel_size=3, padding=1),
nn.BatchNorm2d(256),
nn.ReLU()
)
self.conv3_2 = nn.Sequential(
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.BatchNorm2d(256),
nn.ReLU()
)
self.max_pooling3 = nn.MaxPool2d(kernel_size=2, stride=2)
# N * 256 * 4 * 4
self.conv4_1 = nn.Sequential(
nn.Conv2d(256, 512, kernel_size=3, padding=1),
nn.BatchNorm2d(512),
nn.ReLU()
)
self.conv4_2 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.BatchNorm2d(512),
nn.ReLU()
)
self.max_pooling4 = nn.MaxPool2d(kernel_size=2, stride=2)
# N * 512 * 2 * 2
self.conv5_1 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.BatchNorm2d(512),
nn.ReLU()
)
self.conv5_2 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.BatchNorm2d(512),
nn.ReLU()
)
self.max_pooling5 = nn.MaxPool2d(kernel_size=2, stride=2)
# N * 512 * 1 * 1
        # Fully connected layers
self.fc = nn.Sequential(
nn.Linear(512 * 1 * 1, 4096),
nn.ReLU(True),
nn.Dropout(p=0.5),
nn.Linear(4096, 4096),
nn.ReLU(True),
nn.Dropout(p=0.5),
nn.Linear(4096, 10)
)
def forward(self, x):
out = self.conv1_1(x)
out = self.conv1_2(out)
out = self.max_pooling1(out)
out = self.conv2_1(out)
out = self.conv2_2(out)
out = self.max_pooling2(out)
out = self.conv3_1(out)
out = self.conv3_2(out)
out = self.max_pooling3(out)
out = self.conv4_1(out)
out = self.conv4_2(out)
out = self.max_pooling4(out)
out = self.conv5_1(out)
out = self.conv5_2(out)
out = self.max_pooling5(out)
out = torch.flatten(out, start_dim=1)
out = self.fc(out)
return out
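As a quick shape check (a hedged sanity test, not part of the original post), a random CIFAR-sized batch should map to 10 logits:
net = VGG13()
x = torch.randn(4, 3, 32, 32)  # fake batch of four 32x32 RGB images
print(net(x).shape)            # torch.Size([4, 10])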
②: ResNet
import torch
import torch.nn as nn
import torch.nn.functional as F
# Basic residual block (skip connection)
class ResBlock(nn.Module):
def __init__(self, in_channel, out_channel, stride=1):
super(ResBlock, self).__init__()
        # Main branch
        self.layer = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(),
            nn.Conv2d(out_channel, out_channel, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channel),
        )
        # Shortcut branch
        self.shortcut = nn.Sequential()
        # If the channel counts differ or the stride is not 1,
        # the shortcut must also go through a convolution
        if in_channel != out_channel or stride > 1:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, padding=1),
                nn.BatchNorm2d(out_channel),
            )
    def forward(self, x):
        # Main branch
        out1 = self.layer(x)
        # Shortcut branch
        out2 = self.shortcut(x)
        # Result = main branch + shortcut
        out = out1 + out2
        out = F.relu(out)
        return out
class ResNet(nn.Module):
    def make_layer(self, block, out_channel, stride, num_block):
        layer_list = []
        for i in range(num_block):
            # Only the first block in each stage downsamples
            if i == 0:
                in_stride = stride
            else:
                in_stride = 1
            layer_list.append(block(self.in_channel, out_channel, in_stride))
            self.in_channel = out_channel
        return nn.Sequential(*layer_list)
    def __init__(self):
        super(ResNet, self).__init__()
        # The first layer is a plain convolution
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU()
        )
        # Four ResNet stages
        self.in_channel = 32
        self.layer1 = self.make_layer(ResBlock, 64, 2, 2)
        self.layer2 = self.make_layer(ResBlock, 128, 2, 2)
        self.layer3 = self.make_layer(ResBlock, 256, 2, 2)
        self.layer4 = self.make_layer(ResBlock, 512, 2, 2)
        self.fc = nn.Linear(512, 10)
    def forward(self, x):
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 2)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
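One design note: the shortcut here projects with a 3x3 convolution, whereas the original ResNet paper uses a cheaper 1x1 projection. A hedged sketch of that variant (an alternative, not the code above):
import torch.nn as nn

def projection_shortcut(in_channel, out_channel, stride):
    # 1x1 projection shortcut as in the original ResNet paper
    return nn.Sequential(
        nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride, bias=False),
        nn.BatchNorm2d(out_channel),
    )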
③: MobileNetV1
import torch
import torch.nn as nn
import torch.nn.functional as F
class MobileNet(nn.Module):
    # Depthwise separable convolution: a depthwise (grouped) conv followed by a pointwise conv
    def conv_dw(self, in_channel, out_channel, stride):
        return nn.Sequential(
            # Depthwise convolution (one group per input channel)
            nn.Conv2d(in_channel, in_channel, kernel_size=3, stride=stride, padding=1, groups=in_channel,
                      bias=False),
            nn.BatchNorm2d(in_channel),
            nn.ReLU(),
            # Pointwise (1x1) convolution
            nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(),
        )
def __init__(self):
super(MobileNet, self).__init__()
        # Standard convolution layer
self.conv1 = nn.Sequential(
nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(32),
nn.ReLU()
)
        # Depthwise separable convolution blocks
self.convdw2 = self.conv_dw(32, 32, 1)
self.convdw3 = self.conv_dw(32, 64, 2)
self.convdw4 = self.conv_dw(64, 64, 1)
self.convdw5 = self.conv_dw(64, 128, 2)
self.convdw6 = self.conv_dw(128, 128, 1)
self.convdw7 = self.conv_dw(128, 256, 2)
self.convdw8 = self.conv_dw(256, 256, 1)
self.convdw9 = self.conv_dw(256, 512, 2)
        # Fully connected layer
self.fc = nn.Linear(512, 10)
    def forward(self, x):
        out = self.conv1(x)
        out = self.convdw2(out)
        out = self.convdw3(out)
        out = self.convdw4(out)
        out = self.convdw5(out)
        out = self.convdw6(out)
        out = self.convdw7(out)
        out = self.convdw8(out)
        out = self.convdw9(out)
        out = F.avg_pool2d(out, 2)
        out = out.view(-1, 512)
        out = self.fc(out)
        return out
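To see why depthwise separable convolutions are cheaper, a standalone hedged comparison counts parameters for a standard 3x3 conv versus the depthwise + pointwise pair at 128->256 channels:
import torch.nn as nn

def count_params(m):
    return sum(p.numel() for p in m.parameters())

standard = nn.Conv2d(128, 256, kernel_size=3, padding=1, bias=False)
separable = nn.Sequential(
    nn.Conv2d(128, 128, kernel_size=3, padding=1, groups=128, bias=False),  # depthwise
    nn.Conv2d(128, 256, kernel_size=1, bias=False),                         # pointwise
)
print(count_params(standard))   # 294912
print(count_params(separable))  # 33920, roughly 8.7x fewer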
④: InceptionNet
import torch
import torch.nn as nn
import torch.nn.functional as F
"""
InceptionNet属于网中网结构,对输入进行
不同分支的卷积,最后进行concat
"""
def ConvBNRelu(in_channel, out_channel, kernel_size):
return nn.Sequential(
nn.Conv2d(in_channel, out_channel, kernel_size=kernel_size, stride=1, padding=kernel_size//2),
nn.BatchNorm2d(out_channel),
nn.ReLU()
)
class BaseInception(nn.Module):
def __init__(self, in_channel, out_channel_list, reduce_channel_list):
super(BaseInception, self).__init__()
        # Define the 4 branches
        # Branch 1: a single 1x1 convolution
        self.branch1_conv = ConvBNRelu(in_channel, out_channel_list[0], 1)
        # The other conv branches first squeeze channels with a 1x1 conv to cut computation
self.branch2_conv1 = ConvBNRelu(in_channel, reduce_channel_list[0], 1)
self.branch2_conv2 = ConvBNRelu(reduce_channel_list[0], out_channel_list[1], 3)
self.branch3_conv1 = ConvBNRelu(in_channel, reduce_channel_list[1], 1)
self.branch3_conv2 = ConvBNRelu(reduce_channel_list[1], out_channel_list[2], 5)
self.branch4_pool = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
self.branch4_conv = ConvBNRelu(in_channel, out_channel_list[3], 3)
    def forward(self, x):
        out1 = self.branch1_conv(x)
        out2 = self.branch2_conv1(x)
        out2 = self.branch2_conv2(out2)
        out3 = self.branch3_conv1(x)
        out3 = self.branch3_conv2(out3)
        out4 = self.branch4_pool(x)
        out4 = self.branch4_conv(out4)
        # Concatenate the four branches along the channel dimension
        out = torch.cat([out1, out2, out3, out4], dim=1)
        return out
class InceptionNet(nn.Module):
def __init__(self):
super(InceptionNet, self).__init__()
self.block1 = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=1),
nn.BatchNorm2d(64),
nn.ReLU()
)
self.block2 = nn.Sequential(
nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(),
)
self.block3 = nn.Sequential(
BaseInception(in_channel=128, out_channel_list=[64, 64, 64, 64], reduce_channel_list=[16, 16]),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
self.block4 = nn.Sequential(
BaseInception(in_channel=256, out_channel_list=[96, 96, 96, 96], reduce_channel_list=[32, 32]),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
self.fc = nn.Linear(384, 10)
    def forward(self, x):
        out = self.block1(x)
        out = self.block2(out)
        out = self.block3(out)
        out = self.block4(out)
        out = F.avg_pool2d(out, 2)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
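Tracing the channels: block3's BaseInception emits 64*4 = 256 channels and block4's emits 96*4 = 384, which matches nn.Linear(384, 10). A hedged shape check:
net = InceptionNet()
x = torch.randn(2, 3, 32, 32)  # fake CIFAR-sized batch
print(net(x).shape)            # torch.Size([2, 10])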
(3) Training script
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import os
from tensorboardX import SummaryWriter
from VGG13 import VGG13
from dataloader import train_dataset, test_dataset
######################## Parameters ###################################
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # device
batch_size = 64  # batch size
epochs = 200  # total number of training epochs
# Training and test data loaders (no need to shuffle the test set)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
# Network
net = VGG13()
net = net.to(device)
# Loss function (cross entropy)
loss_function = nn.CrossEntropyLoss()
# Optimizer (Adam)
optimizer = torch.optim.Adam(net.parameters(), lr=0.01)
# Learning-rate decay
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.9)
# TensorBoard logging
logger = SummaryWriter('./log')
step = 1
# Training
for epoch in range(1, epochs + 1):
    print("----------- epoch: {} -----------".format(epoch))
    net.train()
    for i, (X, y) in enumerate(train_loader):
        print("\r----------- train batch: {}/{} -----------".format(i, len(train_loader)), end="")
        X = X.to(device)
        y = y.to(device)
        outputs = net(X)
        loss = loss_function(outputs, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        _, pred = torch.max(outputs.data, dim=1)
        correct = pred.eq(y.data).cpu().sum()
        # Use the actual batch size so the last, smaller batch is measured correctly
        accurate = correct * 100.0 / y.size(0)
        # print("train_loss: ", loss.item(), "accurate: ", accurate.item())
        logger.add_scalar('loss/train', loss, step)
        logger.add_scalar('accurate/train', accurate, step)
        # Advance the global step per batch so the TensorBoard curves are not overwritten
        step += 1
    net.eval()
    sum_loss = 0.
    sum_correct = 0
    with torch.no_grad():
        for i, (X, y) in enumerate(test_loader):
            print("\r----------- test batch: {}/{} -----------".format(i, len(test_loader)), end="")
            X = X.to(device)
            y = y.to(device)
            outputs = net(X)
            _, pred = torch.max(outputs.data, dim=1)
            correct = pred.eq(y.data).cpu().sum()
            loss = loss_function(outputs, y)
            sum_loss += loss.item()
            sum_correct += correct.item()
    test_loss = sum_loss / len(test_loader)
    # Divide by the dataset size so a smaller final batch does not skew accuracy
    test_accurate = sum_correct * 100.0 / len(test_dataset)
    logger.add_scalar('loss/test', test_loss, epoch)
    logger.add_scalar('accurate/test', test_accurate, epoch)
print("test_loss: ", test_loss.item(), "test_accurate: ", test_accurate.item())
step += 1
    # Learning-rate decay
    scheduler.step()
logger.close()
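The script logs to TensorBoard but never saves weights. A minimal hedged addition (the ./models path is an assumption) would checkpoint the model inside the epoch loop, e.g. every 10 epochs:
    # Hypothetical checkpointing, placed at the end of the epoch loop
    if epoch % 10 == 0:
        os.makedirs('./models', exist_ok=True)
        torch.save(net.state_dict(), './models/vgg13_epoch{}.pth'.format(epoch))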