赛题背景:
其实总结起来就是一句话,这个项目是基于目前的深度伪装技术,就是通过大量人脸的原数据集进行模型训练之后,能够生成伪造的人脸视频。这项目就是教我们如何去实现这个DeepFake技术。
Task1:了解Deepfake和跑通baseline
代码架构如下:
-
模型定义:使用
timm
库创建一个预训练的resnet18
模型。 -
训练/验证数据加载:使用
torch.utils.data.DataLoader
来加载训练集和验证集数据,并通过定义的transforms进行数据增强。 -
训练与验证过程:
-
定义了
train
函数来执行模型在一个epoch上的训练过程,包括前向传播、损失计算、反向传播和参数更新。 -
定义了
validate
函数来评估模型在验证集上的性能,计算准确率。
-
-
性能评估:使用准确率(Accuracy)作为性能评估的主要指标,并在每个epoch后输出验证集上的准确率。
-
提交:最后,将预测结果保存到CSV文件中,准备提交到Kaggle比赛。
代码解释如下:
详见代码注释吧
这份代码后续还是要好好精读理解一下的吧,好好分析一下,顺便提升一下代码能力。--7.15
from PIL import Image
Image.open('/kaggle/input/deepfake/phase1/trainset/63fee8a89581307c0b4fd05a48e0ff79.jpg')
import torch
torch.manual_seed(0)
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data.dataset import Dataset
import timm
import time
import pandas as pd
import numpy as np
import cv2
from PIL import Image
from tqdm import tqdm_notebook
train_label = pd.read_csv('/kaggle/input/deepfake/phase1/trainset_label.txt')
val_label = pd.read_csv('/kaggle/input/deepfake/phase1/valset_label.txt')
train_label['path'] = '/kaggle/input/deepfake/phase1/trainset/' + train_label['img_name']
val_label['path'] = '/kaggle/input/deepfake/phase1/valset/' + val_label['img_name']
train_label['target'].value_counts()
val_label['target'].value_counts()
train_label.head(10)
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self, name, fmt=':f'):
self.name = name
self.fmt = fmt
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def __str__(self):
fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
return fmtstr.format(**self.__dict__)
class ProgressMeter(object):
def __init__(self, num_batches, *meters):
self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
self.meters = meters
self.prefix = ""
def pr2int(self, batch):
entries = [self.prefix + self.batch_fmtstr.format(batch)]
entries += [str(meter) for meter in self.meters]
print('\t'.join(entries))
def _get_batch_fmtstr(self, num_batches):
num_digits = len(str(num_batches // 1))
fmt = '{:' + str(num_digits) + 'd}'
return '[' + fmt + '/' + fmt.format(num_batches) + ']'
def validate(val_loader, model, criterion):
batch_time = AverageMeter('Time', ':6.3f')
losses = AverageMeter('Loss', ':.4e')
top1 = AverageMeter('Acc@1', ':6.2f')
progress = ProgressMeter(len(val_loader), batch_time, losses, top1)
# switch to evaluate mode
model.eval()
with torch.no_grad():
end = time.time()
for i, (input, target) in tqdm_notebook(enumerate(val_loader), total=len(val_loader)):
input = input.cuda()
target = target.cuda()
# compute output
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
acc = (output.argmax(1).view(-1) == target.float().view(-1)).float().mean() * 100
losses.update(loss.item(), input.size(0))
top1.update(acc, input.size(0))
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
# TODO: this should also be done with the ProgressMeter
print(' * Acc@1 {top1.avg:.3f}'
.format(top1=top1))
return top1
def predict(test_loader, model, tta=10):
# switch to evaluate mode
model.eval()
test_pred_tta = None
for _ in range(tta):
test_pred = []
with torch.no_grad():
end = time.time()
for i, (input, target) in tqdm_notebook(enumerate(test_loader), total=len(test_loader)):
input = input.cuda()
target = target.cuda()
# compute output
output = model(input)
output = F.softmax(output, dim=1)
output = output.data.cpu().numpy()
test_pred.append(output)
test_pred = np.vstack(test_pred)
if test_pred_tta is None:
test_pred_tta = test_pred
else:
test_pred_tta += test_pred
return test_pred_tta
def train(train_loader, model, criterion, optimizer, epoch):
batch_time = AverageMeter('Time', ':6.3f')
losses = AverageMeter('Loss', ':.4e')
top1 = AverageMeter('Acc@1', ':6.2f')
progress = ProgressMeter(len(train_loader), batch_time, losses, top1)
# switch to train mode
model.train()
end = time.time()
for i, (input, target) in enumerate(train_loader):
input = input.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
# compute output
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
losses.update(loss.item(), input.size(0))
acc = (output.argmax(1).view(-1) == target.float().view(-1)).float().mean() * 100
top1.update(acc, input.size(0))
# compute gradient and do SGD step
optimizer.zero_grad()
loss.backward()
optimizer.step()
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
if i % 100 == 0:
progress.pr2int(i)
class FFDIDataset(Dataset):
def __init__(self, img_path, img_label, transform=None):
self.img_path = img_path
self.img_label = img_label
if transform is not None:
self.transform = transform
else:
self.transform = None
def __getitem__(self, index):
img = Image.open(self.img_path[index]).convert('RGB')
if self.transform is not None:
img = self.transform(img)
return img, torch.from_numpy(np.array(self.img_label[index]))
def __len__(self):
return len(self.img_path)
import timm
model = timm.create_model('resnet18', pretrained=True, num_classes=2)
model = model.cuda()
train_loader = torch.utils.data.DataLoader(
FFDIDataset(train_label['path'].head(1000), train_label['target'].head(1000),
transforms.Compose([
transforms.Resize((256, 256)),
transforms.RandomHorizontalFlip(),
transforms.RandomVerticalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
), batch_size=40, shuffle=True, num_workers=4, pin_memory=True
)
val_loader = torch.utils.data.DataLoader(
FFDIDataset(val_label['path'].head(1000), val_label['target'].head(1000),
transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
), batch_size=40, shuffle=False, num_workers=4, pin_memory=True
)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.Adam(model.parameters(), 0.005)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.85)
best_acc = 0.0
for epoch in range(2):
scheduler.step()
print('Epoch: ', epoch)
train(train_loader, model, criterion, optimizer, epoch)
val_acc = validate(val_loader, model, criterion)
if val_acc.avg.item() > best_acc:
best_acc = round(val_acc.avg.item(), 2)
torch.save(model.state_dict(), f'./model_{best_acc}.pt')
test_loader = torch.utils.data.DataLoader(
FFDIDataset(val_label['path'], val_label['target'],
transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
), batch_size=40, shuffle=False, num_workers=4, pin_memory=True
)
val_label['y_pred'] = predict(test_loader, model, 1)[:, 1]
val_label[['img_name', 'y_pred']].to_csv('submit.csv', index=None)
本来是想直接在本地运行的,但是这个数据集实在是太大了,受限于操作和设备,只能在kaggle云运行这个代码咯,结果如下:
提交上kaggle进行评分:
Inclusion・The Global Multimedia Deepfake Detection | Kaggle
结果挺差的,毕竟这就是个普通的原始代码,啥都没优化的,参数也没调,只能先这样咯,后续task再来优化调整咔咔上分吧。