PointNet数据预处理+网络训练
- 数据预处理
- 分类网络的训练
- 分割网络训练
- 分类和分割的结果
数据预处理
数据预处理部分,这里仅以 ShapeNetDataset 这一个数据集类为例进行介绍。
class ShapeNetDataset(data.Dataset):
    """ShapeNet part dataset that serves fixed-size, normalized point clouds.

    Each item is read from a ``.pts`` file (point coordinates) and its matching
    ``.seg`` file (per-point labels), resampled to ``npoints`` points with
    replacement, centered on its centroid and scaled by its largest point norm.

    Args:
        root: Dataset directory containing ``synsetoffset2category.txt``,
            ``train_test_split/`` and one folder per category id.
        npoints: Number of points drawn (with replacement) for every item.
        classification: If true, items are ``(points, class_index)``;
            otherwise they are ``(points, per_point_labels)``.
        class_choice: Optional container of category names to keep; ``None``
            keeps every category listed in the category file.
        split: Name substituted into ``shuffled_{split}_file_list.json``.
        data_augmentation: If true, apply a random rotation and Gaussian
            jitter to every item.
    """

    def __init__(self,
                 root,
                 npoints=2500,
                 classification=False,
                 class_choice=None,
                 split='train',
                 data_augmentation=True):
        self.npoints = npoints
        self.root = root
        # Text file with one "<category name> <folder id>" pair per line.
        self.catfile = os.path.join(self.root, 'synsetoffset2category.txt')
        self.cat = {}
        self.data_augmentation = data_augmentation
        self.classification = classification
        self.seg_classes = {}

        with open(self.catfile, 'r') as f:
            for line in f:
                ls = line.strip().split()
                if len(ls) < 2:
                    # Skip blank or malformed lines instead of raising IndexError.
                    continue
                self.cat[ls[0]] = ls[1]
        if class_choice is not None:
            self.cat = {k: v for k, v in self.cat.items() if k in class_choice}
        self.id2cat = {v: k for k, v in self.cat.items()}

        # The split file is a JSON list of "<prefix>/<folder id>/<uuid>" strings.
        splitfile = os.path.join(self.root, 'train_test_split',
                                 'shuffled_{}_file_list.json'.format(split))
        with open(splitfile, 'r') as f:
            filelist = json.load(f)

        # category name -> list of (points file, labels file) path pairs
        self.meta = {item: [] for item in self.cat}
        for entry in filelist:
            _, category, uuid = entry.split('/')
            name = self.id2cat.get(category)
            if name is None:
                # Entry belongs to a category that was filtered out.
                continue
            self.meta[name].append(
                (os.path.join(self.root, category, 'points', uuid + '.pts'),
                 os.path.join(self.root, category, 'points_label', uuid + '.seg')))

        # Flat index used by __getitem__: (category name, points path, labels path)
        self.datapath = [(item, pts, seg)
                         for item in self.cat
                         for pts, seg in self.meta[item]]

        # Class indices follow the alphabetical order of the category names.
        self.classes = dict(zip(sorted(self.cat), range(len(self.cat))))
        print(self.classes)

        # Number of part labels per category, stored next to this source file.
        seg_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                '../misc/num_seg_classes.txt')
        with open(seg_file, 'r') as f:
            for line in f:
                ls = line.strip().split()
                if len(ls) < 2:
                    continue
                self.seg_classes[ls[0]] = int(ls[1])
        # Only the first selected category's part count is exposed.
        self.num_seg_classes = self.seg_classes[list(self.cat.keys())[0]]
        print(self.seg_classes, self.num_seg_classes)

    def __getitem__(self, index):
        """Load, resample, normalize and optionally augment item *index*.

        Returns:
            ``(points, cls)`` when ``self.classification`` is true, where
            ``cls`` is a one-element int64 tensor; otherwise ``(points, seg)``
            with one int64 label per sampled point. ``points`` is float32.
        """
        category, pts_path, seg_path = self.datapath[index]
        cls = self.classes[category]
        point_set = np.loadtxt(pts_path).astype(np.float32)
        seg = np.loadtxt(seg_path).astype(np.int64)

        # Resample to a fixed size; the same indices select points and labels.
        choice = np.random.choice(len(seg), self.npoints, replace=True)
        point_set = point_set[choice, :]
        seg = seg[choice]

        # Center on the centroid of the sampled points.
        point_set = point_set - np.expand_dims(np.mean(point_set, axis=0), 0)
        # Scale so the farthest point has unit norm. When every sampled point
        # coincides the radius is 0; skip the division to avoid NaNs.
        dist = np.max(np.sqrt(np.sum(point_set ** 2, axis=1)), 0)
        if dist > 0:
            point_set = point_set / dist

        if self.data_augmentation:
            # Random rotation mixing columns 0 and 2 (column 1 is untouched).
            theta = np.random.uniform(0, np.pi * 2)
            rotation_matrix = np.array([[np.cos(theta), -np.sin(theta)],
                                        [np.sin(theta), np.cos(theta)]])
            point_set[:, [0, 2]] = point_set[:, [0, 2]].dot(rotation_matrix)
            # Gaussian jitter with mean 0 and standard deviation 0.02.
            point_set += np.random.normal(0, 0.02, size=point_set.shape)

        point_set = torch.from_numpy(point_set)
        seg = torch.from_numpy(seg)
        cls = torch.from_numpy(np.array([cls]).astype(np.int64))

        if self.classification:
            return point_set, cls
        return point_set, seg

    def __len__(self):
        """Return the number of (points, labels) file pairs in this split."""
        return len(self.datapath)
分类网络的训练
from __future__ import print_function
import argparse
import os
import random
import torch
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import matplotlib.pyplot as plt
from pointnet.my_dataset import ShapeNetDataset
from pointnet.my_model import PointNetCls,feature_transform_regularizer
import torch.nn.functional as F
from tqdm import tqdm
# Per-epoch history buffers; the training loop appends to them and
# plot_metrics reads them once training is finished.
train_losses, test_losses = [], []
train_accuracies, test_accuracies = [], []
learning_rates = []
# Plotting helper for the classification run.
def plot_metrics(train_losses, test_losses, train_accuracies, test_accuracies, learning_rates):
    """Draw loss, accuracy and learning-rate curves and save them to result.png.

    Every argument is a per-epoch sequence; the x axis is the index range of
    ``train_losses``, so all sequences are expected to share its length.
    """
    epoch_axis = range(len(train_losses))
    # (subplot slot, y-axis label, title, curves drawn in that panel)
    panels = (
        (1, 'Loss', 'Loss Curve',
         ((train_losses, 'Training Loss'), (test_losses, 'Test Loss'))),
        (2, 'Accuracy', 'Accuracy Curve',
         ((train_accuracies, 'Training Accuracy'), (test_accuracies, 'Test Accuracy'))),
        (3, 'Learning Rate', 'Learning Rate Curve',
         ((learning_rates, 'Learning Rate'),)),
    )

    plt.figure(figsize=(12, 6))
    for slot, y_label, title, curves in panels:
        plt.subplot(2, 2, slot)
        for series, label in curves:
            plt.plot(epoch_axis, series, label=label)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
        plt.title(title)
    plt.tight_layout()

    # Write the figure to disk and release it.
    plt.savefig("result.png")
    plt.close()
# Command-line options for the classification training script.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--batchSize', type=int, default=32, help='input batch size')
parser.add_argument(
    '--num_points', type=int, default=2500, help='number of points sampled per shape')
parser.add_argument(
    '--workers', type=int, help='number of data loading workers', default=4)
parser.add_argument(
    '--nepoch', type=int, default=250, help='number of epochs to train for')
parser.add_argument('--outf', type=str, default='cls', help='output folder')
parser.add_argument('--model', type=str, default='', help='model path')
parser.add_argument('--dataset', type=str, required=True, help="dataset path")
parser.add_argument('--dataset_type', type=str, default='shapenet', help="dataset type shapenet|modelnet40")
parser.add_argument('--feature_transform', action='store_true', help="use feature transform")
opt = parser.parse_args()
print(opt)


def blue(x):
    """Wrap *x* in ANSI escape codes so it prints in blue on capable terminals."""
    return '\033[94m' + x + '\033[0m'


# Pick a fresh seed per run and print it so the run can be identified later.
opt.manualSeed = random.randint(1, 10000)
print("Random Seed: ", opt.manualSeed)
random.seed(opt.manualSeed)
torch.manual_seed(opt.manualSeed)
# Build the train/test datasets; ShapeNet is the only type wired up here.
if opt.dataset_type != 'shapenet':
    exit("wrong dataset type!!!")

dataset = ShapeNetDataset(
    root=opt.dataset, classification=True, npoints=opt.num_points)
# The test split is read without augmentation.
test_dataset = ShapeNetDataset(
    root=opt.dataset, classification=True, split='test',
    npoints=opt.num_points, data_augmentation=False)

# Both loaders share batch size, shuffling and worker count.
_loader_options = dict(
    batch_size=opt.batchSize, shuffle=True, num_workers=int(opt.workers))
dataloader = torch.utils.data.DataLoader(dataset, **_loader_options)
testdataloader = torch.utils.data.DataLoader(test_dataset, **_loader_options)

print(len(dataset), len(test_dataset))
num_classes = len(dataset.classes)
print('classes', num_classes)
# Create the checkpoint directory; an existing directory is fine, but any
# other failure (e.g. permissions) now surfaces instead of being swallowed.
os.makedirs(opt.outf, exist_ok=True)

# Build the classification network.
classifier = PointNetCls(k=num_classes, feature_transform=opt.feature_transform)
# Optionally warm-start from a previously saved state dict.
if opt.model != '':
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    classifier.load_state_dict(torch.load(opt.model))
# Move the model to the GPU before constructing the optimizer, as the
# torch.optim documentation recommends.
classifier.cuda()

# Optimizer (Adam) -- not a loss function.
optimizer = optim.Adam(classifier.parameters(), lr=0.001, betas=(0.9, 0.999))
# Learning-rate schedule -- not an activation function: the LR is multiplied
# by 0.5 every 20 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

# Number of batches per epoch, as an integer (the last batch may be short).
num_batch = len(dataloader)
# Train for the requested number of epochs, recording per-epoch metrics.
for epoch in range(opt.nepoch):
    epoch_train_loss = 0
    epoch_train_correct = 0
    num_batch = len(dataloader)
    for i, data in enumerate(dataloader, 0):
        # The dataset yields the class index as a one-element tensor per item,
        # so column 0 of the batched target is the class index per sample.
        points, target = data
        target = target[:, 0]
        # Swap dims 1 and 2 before feeding the network
        # (presumably (batch, points, coords) -> (batch, coords, points)).
        points = points.transpose(2, 1)
        points, target = points.cuda(), target.cuda()

        optimizer.zero_grad()
        classifier = classifier.train()
        pred, trans, trans_feat = classifier(points)
        # target is already 1-D; squeezing it would break a batch of size 1.
        loss = F.nll_loss(pred, target)
        if opt.feature_transform:
            loss += feature_transform_regularizer(trans_feat) * 0.001
        loss.backward()
        optimizer.step()

        epoch_train_loss += loss.item()
        pred_choice = pred.data.max(1)[1]
        correct = pred_choice.eq(target.data).cpu().sum()
        epoch_train_correct += correct.item()
        # Divide by the actual batch length so a short final batch is not
        # reported with a deflated accuracy.
        print('[%d: %d/%d] train loss: %f accuracy: %f' % (
            epoch, i, num_batch, loss.item(), correct.item() / float(target.size(0))))

        # Every 10 steps, report loss/accuracy on one (shuffled) test batch.
        if i % 10 == 0:
            classifier = classifier.eval()
            with torch.no_grad():
                points, target = next(iter(testdataloader))
                target = target[:, 0]
                points = points.transpose(2, 1)
                points, target = points.cuda(), target.cuda()
                pred, _, _ = classifier(points)
                loss = F.nll_loss(pred, target)
                pred_choice = pred.data.max(1)[1]
                correct = pred_choice.eq(target.data).cpu().sum()
            print('[%d: %d/%d] %s loss: %f accuracy: %f' % (
                epoch, i, num_batch, blue('test'), loss.item(),
                correct.item() / float(target.size(0))))

    # Per-epoch training averages: loss over batches, accuracy over samples.
    train_losses.append(epoch_train_loss / num_batch)
    train_accuracies.append(epoch_train_correct / float(len(dataset)))
    # Record the learning rate that was actually used during this epoch.
    learning_rates.append(optimizer.param_groups[0]['lr'])

    # Loss and accuracy over the whole test set.
    classifier = classifier.eval()
    epoch_test_loss = 0
    epoch_test_correct = 0
    with torch.no_grad():
        for data in testdataloader:
            points, target = data
            target = target[:, 0]
            points = points.transpose(2, 1)
            points, target = points.cuda(), target.cuda()
            pred, _, _ = classifier(points)
            loss = F.nll_loss(pred, target)
            epoch_test_loss += loss.item()
            pred_choice = pred.data.max(1)[1]
            correct = pred_choice.eq(target.data).cpu().sum()
            epoch_test_correct += correct.item()
    test_losses.append(epoch_test_loss / len(testdataloader))
    test_accuracies.append(epoch_test_correct / float(len(test_dataset)))

    torch.save(classifier.state_dict(), '%s/cls_model_%d.pth' % (opt.outf, epoch))

    # Advance the LR schedule only after this epoch's optimizer steps; calling
    # it first skips the initial value of the schedule on PyTorch >= 1.1.
    scheduler.step()
# Plot the curves collected during training.
plot_metrics(train_losses, test_losses, train_accuracies, test_accuracies, learning_rates)

# Final accuracy over the whole test set.
total_correct = 0
total_testset = 0
classifier = classifier.eval()
# Inference only: no_grad avoids building autograd graphs for every batch.
with torch.no_grad():
    # Iterating the loader directly lets tqdm show the total batch count.
    for data in tqdm(testdataloader):
        points, target = data
        target = target[:, 0]
        points = points.transpose(2, 1)
        points, target = points.cuda(), target.cuda()
        pred, _, _ = classifier(points)
        pred_choice = pred.data.max(1)[1]
        correct = pred_choice.eq(target.data).cpu().sum()
        total_correct += correct.item()
        total_testset += points.size()[0]
print("final accuracy {}".format(total_correct / float(total_testset)))
分割网络训练
from __future__ import print_function
import argparse
import os
import random
import torch.optim as optim
import torch.utils.data
import matplotlib.pyplot as plt
from pointnet.my_dataset import ShapeNetDataset
from pointnet.my_model import PointNetDenseCls,feature_transform_regularizer
import torch.nn.functional as F
from tqdm import tqdm
import numpy as np
def plot_metrics(train_losses, test_losses, train_accuracies, test_accuracies, mIOUs):
    """Draw loss, accuracy and mIOU curves and save them to seg_result.png.

    Every argument is a per-epoch sequence; the x axis is the index range of
    ``train_losses``, so all sequences are expected to share its length.
    The highest mIOU is annotated when at least one value is present.
    """
    epochs = range(len(train_losses))
    plt.figure(figsize=(16, 12))

    # Loss curves
    plt.subplot(2, 2, 1)
    plt.plot(epochs, train_losses, label='Training Loss')
    plt.plot(epochs, test_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Loss Curve')

    # Accuracy curves
    plt.subplot(2, 2, 2)
    plt.plot(epochs, train_accuracies, label='Training Accuracy')
    plt.plot(epochs, test_accuracies, label='Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Accuracy Curve')

    # mIOU curve
    plt.subplot(2, 2, 3)
    plt.plot(epochs, mIOUs, label='mIOUs')
    plt.xlabel('Epoch')
    plt.ylabel('mIOU')
    plt.legend()
    plt.title('mIOUs')

    # Annotate the maximum mIOU (only the maximum is marked). np.max and
    # np.argmax raise on an empty sequence, so skip this when no epoch ran.
    if len(mIOUs) > 0:
        max_mIOU = np.max(mIOUs)
        plt.annotate(f'Max mIOU: {max_mIOU:.2f}', xy=(np.argmax(mIOUs), max_mIOU), xytext=(10, -20),
                     textcoords='offset points', arrowprops=dict(arrowstyle='->', color='blue'), fontsize=10)

    plt.tight_layout()
    # Write the figure to disk and release it.
    plt.savefig("seg_result.png")
    plt.close()
# Command-line options for the segmentation training script.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--batchSize', type=int, default=32, help='input batch size')
parser.add_argument(
    '--workers', type=int, help='number of data loading workers', default=4)
parser.add_argument(
    '--nepoch', type=int, default=25, help='number of epochs to train for')
parser.add_argument('--outf', type=str, default='seg', help='output folder')
parser.add_argument('--model', type=str, default='', help='model path')
parser.add_argument('--dataset', type=str, required=True, help="dataset path")
parser.add_argument('--class_choice', type=str, default='Chair', help="class_choice")
parser.add_argument('--feature_transform', action='store_true', help="use feature transform")
opt = parser.parse_args()
print(opt)

# Pick a fresh seed per run and print it so the run can be identified later.
opt.manualSeed = random.randint(1, 10000)
print("Random Seed: ", opt.manualSeed)
random.seed(opt.manualSeed)
# The dataset resamples and augments with np.random, so NumPy's generator is
# seeded as well; otherwise the printed seed does not pin down that randomness.
np.random.seed(opt.manualSeed)
torch.manual_seed(opt.manualSeed)
# Segmentation datasets restricted to the single requested category.
dataset = ShapeNetDataset(
    root=opt.dataset, classification=False, class_choice=[opt.class_choice])
# The test split is read without augmentation.
testset = ShapeNetDataset(
    root=opt.dataset, classification=False, class_choice=[opt.class_choice],
    split='test', data_augmentation=False)

# Both loaders share batch size, shuffling and worker count.
_loader_options = dict(
    batch_size=opt.batchSize, shuffle=True, num_workers=int(opt.workers))
dataloader = torch.utils.data.DataLoader(dataset, **_loader_options)
testdataloader = torch.utils.data.DataLoader(testset, **_loader_options)

print(len(dataset), len(testset))
# Number of part labels for the chosen category.
num_classes = dataset.num_seg_classes
print('classes', num_classes)
# Create the checkpoint directory; an existing directory is fine, but any
# other failure (e.g. permissions) now surfaces instead of being swallowed.
os.makedirs(opt.outf, exist_ok=True)


def blue(x):
    """Wrap *x* in ANSI escape codes so it prints in blue on capable terminals."""
    return '\033[94m' + x + '\033[0m'


# Build the per-point segmentation network.
classifier = PointNetDenseCls(k=num_classes, feature_transform=opt.feature_transform)
# Optionally warm-start from a previously saved state dict.
if opt.model != '':
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    classifier.load_state_dict(torch.load(opt.model))
# Move the model to the GPU before constructing the optimizer, as the
# torch.optim documentation recommends.
classifier.cuda()

optimizer = optim.Adam(classifier.parameters(), lr=0.001, betas=(0.9, 0.999))
# The LR is multiplied by 0.5 every 20 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

# NOTE: true division, so this can be fractional; it is kept as-is because
# the training loop's progress print and epoch averages read it.
num_batch = len(dataset) / opt.batchSize

# Lists to store data for plotting
train_losses = []
train_accuracies = []
test_losses = []
test_accuracies = []
mious = []
# Train for the requested number of epochs; after each epoch evaluate loss,
# accuracy and mIOU on the test split and save a checkpoint.
for epoch in range(opt.nepoch):
    epoch_train_loss = 0
    epoch_train_correct = 0
    epoch_train_points = 0
    batches_per_epoch = len(dataloader)
    for i, data in enumerate(dataloader, 0):
        points, target = data
        # Swap dims 1 and 2 before feeding the network
        # (presumably (batch, points, coords) -> (batch, coords, points)).
        points = points.transpose(2, 1)
        points, target = points.cuda(), target.cuda()
        optimizer.zero_grad()
        classifier = classifier.train()
        pred, trans, trans_feat = classifier(points)
        # Flatten to one row per point; labels are shifted down by one
        # (presumably the .seg files are 1-based -- confirm against the data).
        pred = pred.view(-1, num_classes)
        target = target.view(-1, 1)[:, 0] - 1
        loss = F.nll_loss(pred, target)
        if opt.feature_transform:
            loss += feature_transform_regularizer(trans_feat) * 0.001
        loss.backward()
        optimizer.step()

        pred_choice = pred.data.max(1)[1]
        correct = pred_choice.eq(target.data).cpu().sum()
        # Use the real number of points in this batch rather than assuming
        # batchSize * 2500, which is wrong for a short final batch.
        batch_points = target.numel()
        print('[%d: %d/%d] train loss: %f accuracy: %f' % (
            epoch, i, batches_per_epoch, loss.item(), correct.item() / float(batch_points)))

        epoch_train_loss += loss.item()
        epoch_train_correct += correct.item()
        epoch_train_points += batch_points

    # Loss and accuracy over the whole test set.
    classifier = classifier.eval()
    epoch_test_loss = 0
    epoch_test_correct = 0
    epoch_test_points = 0
    with torch.no_grad():
        for data in testdataloader:
            points, target = data
            points = points.transpose(2, 1)
            points, target = points.cuda(), target.cuda()
            pred, _, _ = classifier(points)
            pred = pred.view(-1, num_classes)
            target = target.view(-1, 1)[:, 0] - 1
            loss = F.nll_loss(pred, target)
            pred_choice = pred.data.max(1)[1]
            correct = pred_choice.eq(target.data).cpu().sum()
            epoch_test_correct += correct.item()
            epoch_test_loss += loss.item()
            epoch_test_points += target.numel()
    test_loss = epoch_test_loss / len(testdataloader)
    test_accuracy = epoch_test_correct / float(epoch_test_points)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    # Per-epoch training averages: loss over batches, accuracy over points.
    train_losses.append(epoch_train_loss / batches_per_epoch)
    train_accuracies.append(epoch_train_correct / float(epoch_train_points))
    print('[%d] %s loss: %f accuracy: %f' % (
        epoch, blue('test'), test_loss, test_accuracy))
    torch.save(classifier.state_dict(), '%s/seg_model_%s_%d.pth' % (opt.outf, opt.class_choice, epoch))

    ## benchmark mIOU
    # Computed every epoch so that mious has one entry per epoch, matching
    # the x axis used when the curves are plotted.
    shape_ious = []
    with torch.no_grad():
        for data in tqdm(testdataloader):
            points, target = data
            points = points.transpose(2, 1)
            points, target = points.cuda(), target.cuda()
            pred, _, _ = classifier(points)
            pred_choice = pred.data.max(2)[1]
            pred_np = pred_choice.cpu().data.numpy()
            target_np = target.cpu().data.numpy() - 1
            for shape_idx in range(target_np.shape[0]):
                part_ious = []
                for part in range(num_classes):
                    predicted = pred_np[shape_idx] == part
                    actual = target_np[shape_idx] == part
                    intersection = np.sum(np.logical_and(predicted, actual))
                    union = np.sum(np.logical_or(predicted, actual))
                    if union == 0:
                        # Part absent from both prediction and ground truth:
                        # count its IoU as 1.
                        iou = 1
                    else:
                        iou = intersection / float(union)
                    part_ious.append(iou)
                shape_ious.append(np.mean(part_ious))
    mious.append(np.mean(shape_ious))
    print("mIOU for class {}: {}".format(opt.class_choice, np.mean(shape_ious)))

    # Advance the LR schedule only after this epoch's optimizer steps; calling
    # it first skips the initial value of the schedule on PyTorch >= 1.1.
    scheduler.step()
# Dump every recorded curve to plot_data.txt (a heading line followed by a
# comma-separated value line per curve), then render the figure.
_curve_sections = (
    ('Train Losses:\n', train_losses),
    ('Train Accuracies:\n', train_accuracies),
    ('Test Losses:\n', test_losses),
    ('Test Accuracies:\n', test_accuracies),
    ('mIOUs:\n', mious),
)
with open('plot_data.txt', 'w') as f:
    for heading, values in _curve_sections:
        f.write(heading)
        f.write(','.join(map(str, values)) + '\n')
# The with-block closes the file on exit.
plot_metrics(train_losses, test_losses, train_accuracies, test_accuracies, mious)
分类和分割的结果
shapenet分割结果
shapenet分类结果