单卡实现
本文主要从两个方面展开:
1. 将两个或多个 dataset 组合成 PyTorch 中的一个 `ConcatDataset`,这个 dataset 将作为 PyTorch 中 `DataLoader` 的输入。
2. 自定义一个 Sampler(内部为每个子数据集各使用一个 `RandomSampler`)来替代默认的 `RandomSampler`,修改 batch 的产生过程,以确保第一个 batch 中产生第一个任务的数据(image),第二个 batch 中产生下一个任务的数据(video),之后依此交替。
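下面定义了一个 `BatchSchedulerSampler` 类,实现了一个新的 sampler iterator。首先,为每一个单独的 dataset 创建一个 `RandomSampler`;接着,从每个 dataset 的迭代器中依次取出 `batch_size` 个 sample index,并加上该 dataset 在 `ConcatDataset` 中的起始偏移量,使其成为合并后数据集中的全局 index;最后,按数据集轮流拼接这些 index,得到新的 sample index list。较小的数据集在取完后会重新打乱并继续采样,直到最大的数据集被完整遍历一遍,这就是一个 epoch 的长度。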
import math
import torch
from torch.utils.data.sampler import RandomSampler
from torch.utils.data.dataset import ConcatDataset
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from dataset.K600 import *
class BatchSchedulerSampler(torch.utils.data.sampler.Sampler):
    """Iterate over tasks and provide a random batch per task in each mini-batch.

    Yields indices into a ``ConcatDataset`` so that every run of ``batch_size``
    consecutive indices comes from a single sub-dataset, visiting the
    sub-datasets round-robin (``datasets[0]``, ``datasets[1]``, ...,
    ``datasets[0]``, ...).

    One epoch lasts ``ceil(largest / batch_size)`` rounds, where ``largest`` is
    the length of the biggest sub-dataset. Each round draws ``batch_size``
    indices from every sub-dataset. Smaller sub-datasets are reshuffled and
    resampled whenever they run out.

    Use it with a ``DataLoader`` whose ``batch_size`` equals this sampler's
    ``batch_size`` (and ``shuffle=False``). Each loader batch is then one
    sampler chunk, i.e. drawn from a single sub-dataset.

    Args:
        dataset: the combined dataset. It must expose ``.datasets`` and
            ``.cumulative_sizes``, as ``torch.utils.data.ConcatDataset`` does.
        batch_size: number of consecutive indices drawn from one sub-dataset.
        generator: optional ``torch.Generator`` passed to every per-dataset
            ``RandomSampler``, for reproducible shuffling. ``None`` keeps the
            default behavior (torch's global RNG).

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """

    def __init__(self, dataset, batch_size, generator=None):
        if batch_size <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
        self.dataset = dataset
        self.batch_size = batch_size
        self.generator = generator
        self.number_of_datasets = len(dataset.datasets)
        # Use len() instead of a dataset-specific ``.samples`` attribute. It works for
        # any map-style Dataset and matches how ConcatDataset builds cumulative_sizes.
        self.largest_dataset_size = max(len(cur_dataset) for cur_dataset in dataset.datasets)

    def _num_rounds(self):
        """Number of round-robin rounds needed to cover the largest sub-dataset once."""
        return math.ceil(self.largest_dataset_size / self.batch_size)

    def __len__(self):
        # Each round contributes batch_size indices from every sub-dataset.
        return self.batch_size * self._num_rounds() * self.number_of_datasets

    def __iter__(self):
        samplers_list = [
            RandomSampler(cur_dataset, generator=self.generator)
            for cur_dataset in self.dataset.datasets
        ]
        sampler_iterators = [iter(sampler) for sampler in samplers_list]
        # Start offset of each sub-dataset inside the concatenated index space.
        push_index_val = [0] + self.dataset.cumulative_sizes[:-1]

        final_samples_list = []  # indices into the combined dataset
        for _ in range(self._num_rounds()):
            for i in range(self.number_of_datasets):
                for _ in range(self.batch_size):
                    try:
                        local_index = next(sampler_iterators[i])
                    except StopIteration:
                        # This sub-dataset is exhausted: reshuffle it and keep sampling
                        # until the largest sub-dataset has been fully covered.
                        sampler_iterators[i] = iter(samplers_list[i])
                        local_index = next(sampler_iterators[i])
                    final_samples_list.append(local_index + push_index_val[i])
        return iter(final_samples_list)
if __name__ == "__main__":
    # Import explicitly rather than relying on ``from dataset.K600 import *``
    # to re-export ``transforms``; the module-level imports do not provide it.
    from torchvision import transforms

    def build_transform():
        """Return a fresh pipeline: resize to 224x224, convert to tensor, normalize with ImageNet mean/std."""
        return transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    image_dataset = ImageFolder(root='/mnt/workspace/data/imagenet/data/newtrain', transform=build_transform())
    # NOTE(review): VideoFolder comes from dataset.K600 (not shown here); confirm it
    # applies this per-frame pipeline the way the printed shapes below suggest.
    video_dataset = VideoFolder(root='/mnt/workspace/data/k600/train_videos', transform=build_transform())
    joint_dataset = ConcatDataset([image_dataset, video_dataset])

    batch_size = 8
    # The DataLoader batch_size must equal the sampler's batch_size so that each loader
    # batch is exactly one sampler chunk, i.e. drawn from a single sub-dataset.
    # shuffle stays False: ordering is controlled entirely by the sampler.
    dataloader = DataLoader(
        dataset=joint_dataset,
        sampler=BatchSchedulerSampler(dataset=joint_dataset, batch_size=batch_size),
        batch_size=batch_size,
        shuffle=False,
    )

    num_epochs = 1
    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            print(inputs.shape)

    # Example output (image and video batches alternate):
    # torch.Size([8, 3, 224, 224])
    # torch.Size([8, 3, 16, 224, 224])
    # torch.Size([8, 3, 224, 224])
    # torch.Size([8, 3, 16, 224, 224])
DDP多卡实现
在多卡训练中使用分布式数据并行(DDP)时,需要基于 `DistributedSampler` 而不是 `RandomSampler` 来实现,以确保每个进程都能正确地获取各自的数据子集。下面是实现一个 `BatchSchedulerDistributedSampler` 类来支持多卡训练的示例: