38实战Kaggle
比赛:图像分类 (CIFAR-10
)
比赛链接:CIFAR-10 - Object Recognition in Images | Kaggle
导入包
import os
import glob
import pandas as pd
import numpy as np
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
from torch import nn
from d2l import torch as d2l
import liliPytorch as lp
import csv
预处理:数据集分析
# 获取精简数据集
#@save
d2l.DATA_HUB['cifar10_tiny'] = (d2l.DATA_URL + 'kaggle_cifar10_tiny.zip',
'2068874e4b9a9f0fb07ebe0ad2b29754449ccacd')
# 如果使用完整的Kaggle竞赛的数据集,设置demo为False
demo = True
if demo:
data_dir = d2l.download_extract('cifar10_tiny')
else:
data_dir = '../data/cifar-10/'
train_path = '../data/kaggle_cifar10_tiny/train.csv'
file_path = '../data/kaggle_cifar10_tiny/'
# 读取数据
train_data = pd.read_csv(train_path)
# 查看数据
print(train_data['label'].value_counts())
# """
# label
# automobile 112
# frog 107
# truck 103
# horse 102
# airplane 102
# deer 99
# bird 99
# ship 99
# cat 92
# dog 85
# """
1.数据处理与加载
train_path = '../data/kaggle_cifar10_tiny/train.csv'
test_path = '../data/kaggle_cifar10_tiny/test.csv'
file_path = '../data/kaggle_cifar10_tiny/'
# 统计label种类,并排序
cifar_labels = sorted(list(set(train_data['label'])))
# 将label对应编号
labels_to_num = dict(zip(cifar_labels, range(len(cifar_labels))))
# print(labels_to_num)
"""
{'airplane': 0, 'automobile': 1, 'bird': 2, 'cat': 3, 'deer': 4,
'dog': 5, 'frog': 6, 'horse': 7, 'ship': 8, 'truck': 9}
"""
# 将编号对应label,用于后续预测
num_to_labels = {value : key for key, value in labels_to_num.items()}
# print(num_to_labels)
"""
{0: 'airplane', 1: 'automobile', 2: 'bird', 3: 'cat', 4: 'deer',
5: 'dog', 6: 'frog', 7: 'horse', 8: 'ship', 9: 'truck'}
"""
def get_image_filenames(folder_path, extensions=['.png', '.jpg', '.jpeg']):
# 获取指定文件夹中的所有图片文件
image_files = []
for ext in extensions:
image_files.extend(glob.glob(os.path.join(folder_path, f'*{ext}')))
# 返回图片文件名列表
return [os.path.basename(image) for image in image_files]
def save_filenames_to_csv(filenames, csv_path):
with open(csv_path, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# 写入CSV的第一行
writer.writerow(['id'])
# 写入每个文件名
for filename in filenames:
writer.writerow([filename])
# 获取测试图片名
test_images_path = '../data/kaggle_cifar10_tiny/test'
image_filenames = get_image_filenames(test_images_path)
# 保存到CSV文件
save_filenames_to_csv(image_filenames, file_path + 'test.csv')
class CifarDataset(Dataset):
def __init__(self, csv_path, file_path, mode='train', valid_ratio=0.2, resize_height=224, resize_width=224):
"""
初始化 LeavesDataset 对象。
参数:
csv_path (str): 包含图像路径和标签的 CSV 文件路径。
file_path (str): 图像文件所在目录的路径。
mode (str, optional): 数据集的模式。可以是 'train', 'valid' 或 'test'。默认值为 'train'。
valid_ratio (float, optional): 用于验证的数据比例。默认值为 0.2。
resize_height (int, optional): 调整图像高度的大小。默认值为 224。
resize_width (int, optional): 调整图像宽度的大小。默认值为 224。
"""
# 存储图像调整大小的高度和宽度
self.resize_height = resize_height
self.resize_width = resize_width
# 存储图像文件路径和模式(train/valid/test)
if mode == 'train' or mode == 'valid':
self.file_path = file_path + 'train/'
else:
self.file_path = file_path + 'test/'
self.mode = mode
# 读取包含图像路径和标签的 CSV 文件
self.data_info = pd.read_csv(csv_path, header=0)
# 获取样本总数
self.data_len = len(self.data_info.index)
# 计算训练集样本数
self.train_len = int(self.data_len * (1 - valid_ratio))
# 根据模式处理数据
if self.mode == 'train':
# 训练模式下的图像和标签
self.train_img = np.asarray(self.data_info.iloc[0:self.train_len, 0])
self.train_label = np.asarray(self.data_info.iloc[0:self.train_len, 1])
self.image_arr = self.train_img
self.label_arr = self.train_label
elif self.mode == 'valid':
# 验证模式下的图像和标签
self.valid_img = np.asarray(self.data_info.iloc[self.train_len:, 0])
self.valid_label = np.asarray(self.data_info.iloc[self.train_len:, 1])
self.image_arr = self.valid_img
self.label_arr = self.valid_label
elif self.mode == 'test':
# 测试模式下的图像
self.test_img = np.asarray(self.data_info.iloc[:, 0])
self.image_arr = self.test_img
# 获取图像数组的长度
self.len_image = len(self.image_arr)
print(f'扫描所有 {mode} 数据,共 {self.len_image} 张图像')
def __getitem__(self, idx):
"""
获取指定索引的图像和标签。
参数:
idx (int): 标签文本对应编号的索引
返回:
如果是测试模式,返回图像张量;
否则返回图像张量和标签。
"""
# 打开图像文件
if self.mode == 'test':
self.img = Image.open(self.file_path + str(self.image_arr[idx]))
else :
self.img = Image.open(self.file_path + str(self.image_arr[idx]) + '.png')
if self.mode == 'train':
# 训练模式下的数据增强
trans =torchvision.transforms.Compose([
torchvision.transforms.Resize((self.resize_height, self.resize_width)),
torchvision.transforms.RandomHorizontalFlip(p=0.5),
torchvision.transforms.RandomVerticalFlip(p=0.5),
torchvision.transforms.RandomResizedCrop(32, scale=(0.64, 1.0),ratio=(1.0, 1.0)),
torchvision.transforms.RandomRotation(degrees=30),
# torchvision.transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
# torchvision.transforms.RandomResizedCrop(size=self.resize_height, scale=(0.8, 1.0)),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
self.img = trans(self.img)
else:
# 验证和测试模式下的简单处理
trans = torchvision.transforms.Compose([
torchvision.transforms.Resize((self.resize_height, self.resize_width)),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
self.img = trans(self.img)
if self.mode == 'test':
return self.img
else:
# 获取标签文本对应的编号
self.label = labels_to_num[self.label_arr[idx]]
return self.img, self.label
def __call__(self, idx):
"""
使对象可以像函数一样被调用。
参数:
idx (int):标签文本对应编号的索引
返回:
调用 __getitem__ 方法并返回结果。
"""
return self.__getitem__(idx)
def __len__(self):
"""
获取数据集的长度。
返回:
数据集中图像的数量。
"""
return self.len_image
train_dataset = CifarDataset(train_path,file_path, mode='train', valid_ratio=0.1, resize_height=40, resize_width=40)
valid_dataset = CifarDataset(train_path, file_path, mode='valid',valid_ratio=0.1, resize_height=40, resize_width=40)
test_dataset = CifarDataset(test_path, file_path, mode='test',valid_ratio=0.1, resize_height=40, resize_width=40)
batch_size = 32
train_iter = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True, num_workers=0)
valid_iter = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=True, num_workers=0)
test_iter = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False, num_workers=0)
2.模型训练
def train_batch(net, X, y, loss, trainer, devices):
"""使用多GPU训练一个小批量数据。
参数:
net: 神经网络模型。
X: 输入数据,张量或张量列表。
y: 标签数据。
loss: 损失函数。
trainer: 优化器。
devices: GPU设备列表。
返回:
train_loss_sum: 当前批次的训练损失和。
train_acc_sum: 当前批次的训练准确度和。
"""
# 如果输入数据X是列表类型
if isinstance(X, list):
# 将列表中的每个张量移动到第一个GPU设备
X = [x.to(devices[0]) for x in X]
else:
X = X.to(devices[0])# 如果X不是列表,直接将X移动到第一个GPU设备
y = y.to(devices[0])# 将标签数据y移动到第一个GPU设备
net.train() # 设置网络为训练模式
trainer.zero_grad()# 梯度清零
pred = net(X) # 前向传播,计算预测值
l = loss(pred, y) # 计算损失
l.sum().backward()# 反向传播,计算梯度
trainer.step() # 更新模型参数
train_loss_sum = l.sum()# 计算当前批次的总损失
train_acc_sum = d2l.accuracy(pred, y)# 计算当前批次的总准确度
return train_loss_sum, train_acc_sum# 返回训练损失和与准确度和
def train(net, train_iter, valid_iter, num_epochs, lr, wd, devices, lr_period, lr_decay,param_group=True):
# trainer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9,weight_decay=wd)
trainer = torch.optim.Adam(net.parameters(), lr=lr,weight_decay=wd)
scheduler = torch.optim.lr_scheduler.StepLR(trainer, lr_period, lr_decay)
loss = nn.CrossEntropyLoss(reduction="none")
num_batches, timer = len(train_iter), d2l.Timer()
legend = ['train loss', 'train acc']
if valid_iter is not None:
legend.append('valid acc')
animator = lp.Animator(xlabel='epoch', xlim=[1, num_epochs],
legend=legend)
net = nn.DataParallel(net, device_ids=devices).to(devices[0])
for epoch in range(num_epochs):
net.train()
metric = lp.Accumulator(3)
for i, (features, labels) in enumerate(train_iter):
timer.start()
l, acc = train_batch(net, features, labels,loss, trainer, devices)
metric.add(l, acc, labels.shape[0])
timer.stop()
train_l = metric[0] / metric[2] # 计算训练损失
train_acc = metric[1] / metric[2] # 计算训练准确率
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches,(train_l , train_acc,None))
if valid_iter is not None:
valid_acc = d2l.evaluate_accuracy_gpu(net, valid_iter)
animator.add(epoch + 1, (None, None, valid_acc))
scheduler.step()
print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
f'valid_acc {valid_acc:.3f}')
measures = (f'train loss {metric[0] / metric[2]:.3f}, '
f'train acc {metric[1] / metric[2]:.3f}')
if valid_iter is not None:
measures += f', valid acc {valid_acc:.3f}'
print(measures + f'\n{metric[2] * num_epochs / timer.sum():.1f}'
f' examples/sec on {str(devices)}')
3.定义超参数
# 定义模型
net = d2l.resnet18(len(cifar_labels),3)
devices, num_epochs, lr, wd = d2l.try_all_gpus(), 100, 3e-4, 5e-4
lr_period, lr_decay = 4, 0.9
train(net, train_iter, valid_iter, num_epochs, lr, wd, devices, lr_period, lr_decay)
plt.show()
# train loss 0.153, train acc 0.955, valid acc 0.469
# 873.5 examples/sec on [device(type='cuda', index=0)]
4.模型预测
# 针对测试集进行分类预测
def predict(net, data_loader, devices):
"""
使用模型进行预测
参数:
net (torch.nn.Module): 要进行预测的模型
data_loader (torch.utils.data.DataLoader): 数据加载器,用于提供待预测的数据
devices (list): 计算设备列表(CPU或GPU)
返回:
all_preds (list): 包含所有预测结果的列表
"""
all_preds = [] # 存储所有预测结果
net.to(devices[0]) # 将模型移动到指定设备
net.eval() # 设置模型为评估模式
with torch.no_grad(): # 在不需要计算梯度的上下文中进行
for X in data_loader: # 遍历数据加载器
X = X.to(devices[0]) # 将数据移动到指定设备
outputs = net(X) # 前向传播,计算模型输出
_, preds = torch.max(outputs, 1) # 获取预测结果
all_preds.extend(preds.cpu().numpy()) # 将预测结果添加到列表中
return all_preds # 返回所有预测结果
# 调用预测函数
predictions = predict(net, test_iter, devices)
# 映射预测结果到标签
mapped_predictions = [num_to_labels[int(i)] for i in predictions]
# 读取测试数据
test_data = pd.read_csv(test_path)
# 将预测结果添加到测试数据中
test_data['label'] = pd.Series(mapped_predictions)
# 创建提交文件
submission = pd.concat([test_data['id'], test_data['label']], axis=1)
# 保存提交文件
submission.to_csv(file_path + 'submission.csv', index=False)