摘要
本实验旨在构建并训练一个卷积神经网络(CNN)模型,用于图像分类任务。实验中使用了PyTorch框架,并在GPU(或MPS,如果可用)上执行训练和测试。实验结果表明,所构建的CNN模型在训练集和测试集上均取得了良好的性能。图像分类是计算机视觉领域中的一个基础任务,旨在识别图像中的主要对象。卷积神经网络(CNN)因其在图像处理任务中的卓越性能而广泛使用。本实验通过构建一个简单的CNN模型来解决图像分类问题。
方法
数据集
实验使用了自定义的数据集,数据集包含图像路径和对应的标签。数据集被分为训练集和测试集。
数据预处理
使用torchvision.transforms
对图像进行预处理,包括调整图像大小到256x256像素,并将图像转换为Tensor。
# 数据预处理转换
data_transforms = {
'train': transforms.Compose([
transforms.Resize([256, 256]), # 调整图像大小
transforms.ToTensor(), # 将图像转换为Tensor
]),
'valid': transforms.Compose([
transforms.Resize([256, 256]), # 调整图像大小
transforms.ToTensor(), # 将图像转换为Tensor
])
}
自定义数据集
# 自定义数据集类,继承自PyTorch的Dataset类
class data1(Dataset):
def __init__(self, file_path, transform=None):
self.file_path = file_path # 文件路径,假设是一个文本文件,每行包含图像路径和标签
self.imgs = [] # 存储图像路径的列表
self.labels = [] # 存储标签的列表
self.transform = transform # 预处理和数据增强的transform
# 打开文件并读取数据
with open(self.file_path) as f:
# 对文件中的每一行进行处理
samples = [x.strip().split(' ') for x in f.readlines()]
for img_path, label in samples:
self.imgs.append(img_path) # 将图像路径添加到imgs列表
self.labels.append(label) # 将标签添加到labels列表
def __len__(self):
# 返回数据集中的样本数
return len(self.imgs)
def __getitem__(self, idx):
# 根据索引idx获取一个样本
image = Image.open(self.imgs[idx]) # 打开图像文件
if self.transform: # 如果定义了transform
image = self.transform(image) # 应用transform到图像上
label = self.labels[idx] # 获取对应的标签
label = torch.from_numpy(np.array(label, dtype=np.int64)) # 将标签转换为整数类型的Tensor
return image, label # 返回图像和标签
创建数据加载器
# 创建训练数据的数据加载器
train_dataloader = DataLoader(
training_data, # 训练数据集实例
batch_size=64, # 每个批次的样本数量
shuffle=True # 是否在每个epoch开始时打乱数据
)
# 创建测试数据的数据加载器
test_dataloader = DataLoader(
test_data, # 测试数据集实例
batch_size=64, # 每个批次的样本数量
shuffle=True # 是否在每个epoch开始时打乱数据
)
选择设备
# 选择设备
device = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
print(f"{device}")
定义CNN模型
# 定义CNN模型类,继承自PyTorch的nn.Module
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__() # 调用父类构造函数
# 定义第一个卷积层,输入通道为3(RGB图像),输出通道为16,卷积核大小为5,步长为1,填充为2
self.conv1 = nn.Sequential(
nn.Conv2d(in_channels=3, out_channels=16, kernel_size=5, stride=1, padding=2),
nn.ReLU(), # ReLU激活函数
nn.MaxPool2d(kernel_size=2), # 最大池化层,池化窗口大小为2
)
# 定义第二个卷积层,输入通道为16,输出通道为32,卷积核大小为5,步长为1,填充为2
self.conv2 = nn.Sequential(
nn.Conv2d(16, 32, 5, 1, 2),
nn.ReLU(), # ReLU激活函数
nn.Conv2d(32, 32, 5, 1, 2), # 另一个卷积层,增加深度但保持通道数不变
nn.ReLU(), # ReLU激活函数
nn.MaxPool2d(2), # 最大池化层,池化窗口大小为2
)
# 定义第三个卷积层,输入通道为32,输出通道为128,卷积核大小为5,步长为1,填充为2
self.conv3 = nn.Sequential(
nn.Conv2d(32, 128, 5, 1, 2),
nn.ReLU(), # ReLU激活函数
)
# 定义一个全连接层,输入特征数为128*64*64(这需要根据实际的输入图像尺寸和卷积层的输出尺寸来调整),输出特征数为20(假设有20个类别)
self.out = nn.Linear(128 * 64 * 64, 20)
def forward(self, x):
# 定义前向传播过程
x = self.conv1(x) # 通过第一个卷积层
x = self.conv2(x) # 通过第二个卷积层
x = self.conv3(x) # 通过第三个卷积层
x = x.view(x.size(0), -1) # 展平特征图,准备输入到全连接层
output = self.out(x) # 通过全连接层得到输出
return output
# 实例化模型
model = CNN()
# 将模型移动到指定的设备(如GPU或CPU)
model = model.to(device)
定义训练函数
# 定义训练函数
def train(dataloader, model, loss_fn, optimizer):
model.train()
batch_size_num = 1
for x, y in dataloader:
x, y = x.to(device), y.to(device)
pred = model(x) # 使用model(x)而不是model.forward(x)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
loss_value = loss.item()
print(f"{loss_value} {batch_size_num}")
batch_size_num += 1
定义测试函数
# 定义测试函数
def test(dataloader, model, loss_fn):
size = len(dataloader.dataset)
num_batches = len(dataloader)
model.eval()
test_loss, correct = 0, 0
with torch.no_grad():
for x, y in dataloader:
x, y = x.to(device), y.to(device)
pred = model(x) # 使用model(x)而不是model.forward(x)
test_loss += loss_fn(pred, y).item()
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
print(f"Test result: \n Accuracy: {(100*correct)}%, Avg loss: {test_loss}")
定义损失函数和优化器
# 定义损失函数和优化器
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
训练与测试模型
# 训练和测试模型
train(train_dataloader, model, loss_fn, optimizer)
test(test_dataloader, model, loss_fn)
训练多个周期
epochs = 10
for t in range(epochs):
print(f"epoch{t+1}\n")
train(train_dataloader, model, loss_fn, optimizer)
print("done")
test(test_dataloader, model, loss_fn)
完整代码
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
from PIL import Image
from torchvision import transforms
# 数据预处理转换
data_transforms = {
'train': transforms.Compose([
transforms.Resize([256, 256]), # 调整图像大小
transforms.ToTensor(), # 将图像转换为Tensor
]),
'valid': transforms.Compose([
transforms.Resize([256, 256]), # 调整图像大小
transforms.ToTensor(), # 将图像转换为Tensor
])
}
# 自定义数据集类
class data1(Dataset):
def __init__(self, file_path, transform=None):
self.file_path = file_path
self.imgs = []
self.labels = []
self.transform = transform
with open(self.file_path) as f:
samples = [x.strip().split(' ') for x in f.readlines()]
for img_path, label in samples:
self.imgs.append(img_path)
self.labels.append(label)
def __len__(self):
return len(self.imgs)
def __getitem__(self, idx):
image = Image.open(self.imgs[idx])
if self.transform:
image = self.transform(image)
label = self.labels[idx]
label = torch.from_numpy(np.array(label, dtype=np.int64))
return image, label
# 实例化训练和测试数据集
training_data = data1(file_path='./train.txt', transform=data_transforms['train'])
test_data = data1(file_path='./test.txt', transform=data_transforms['valid'])
# 创建数据加载器
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)
# 选择设备
device = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
print(f"{device}")
# 定义CNN模型
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()
self.conv1 = nn.Sequential(
nn.Conv2d(in_channels=3, out_channels=16, kernel_size=5, stride=1, padding=2),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2),
)
self.conv2 = nn.Sequential(
nn.Conv2d(16, 32, 5, 1, 2),
nn.ReLU(),
nn.Conv2d(32, 32, 5, 1, 2),
nn.ReLU(),
nn.MaxPool2d(2),
)
self.conv3 = nn.Sequential(nn.Conv2d(32, 128, 5, 1, 2), nn.ReLU())
self.out = nn.Linear(128 * 64 * 64, 20) # 这里的尺寸需要根据实际情况调整
def forward(self, x):
x = self.conv1(x)
x = self.conv2(x)
x = self.conv3(x)
x = x.view(x.size(0), -1)
output = self.out(x)
return output
# 实例化模型并移动到选择的设备
model = CNN().to(device)
# 定义训练函数
def train(dataloader, model, loss_fn, optimizer):
model.train()
batch_size_num = 1
for x, y in dataloader:
x, y = x.to(device), y.to(device)
pred = model(x) # 使用model(x)而不是model.forward(x)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
loss_value = loss.item()
print(f"{loss_value} {batch_size_num}")
batch_size_num += 1
# 定义测试函数
def test(dataloader, model, loss_fn):
size = len(dataloader.dataset)
num_batches = len(dataloader)
model.eval()
test_loss, correct = 0, 0
with torch.no_grad():
for x, y in dataloader:
x, y = x.to(device), y.to(device)
pred = model(x) # 使用model(x)而不是model.forward(x)
test_loss += loss_fn(pred, y).item()
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
print(f"Test result: \n Accuracy: {(100*correct)}%, Avg loss: {test_loss}")
# 定义损失函数和优化器
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 训练和测试模型
train(train_dataloader, model, loss_fn, optimizer)
test(test_dataloader, model, loss_fn)
# 训练多个周期
epochs = 10
for t in range(epochs):
print(f"epoch{t+1}\n")
train(train_dataloader, model, loss_fn, optimizer)
print("done")
test(test_dataloader, model, loss_fn)
结果
模型在训练集上的表现优于测试集,这可能是由于过拟合。训练过程中的损失逐渐减小,准确率逐渐提高。