1. 引言
对于许多新接触深度学习爱好者来说,玩AutoEncoder总是很有趣的,因为它具有简单的处理逻辑、简易的网络架构,方便可视化潜在的特征空间。在本文中,我将从头开始介绍一个简单的AutoEncoder模型,以及一些可视化潜在特征空间的一些的方法,以便使本文变得生动有趣。
闲话少说,我们直接开始吧!
2. 数据集介绍
在本文中,我们使用FashionMNIST
数据集来完成此任务。
以下是Kaggle上数据集的链接:戳我。
该数据集已在torchvision库中集成;我们可以通过几行代码直接导入和处理该数据集。
为此,首先需要是编写一个collate_fn
函数,将数据集从PIL图像转换为torch张量,并进行相应的pad
操作:
# This function convert the PIL images to tensors then pad them
def collate_fn(batch):
process = transforms.Compose([
transforms.ToTensor(),
transforms.Pad([2])]
)
# x - images; we process each image in the batch
x = [process(data[0]) for data in batch]
x = torch.concat(x).unsqueeze(1)
# y - labels, note that we should convert the labels to LongTensor
y = torch.LongTensor([data[1] for data in batch])
return x, y
3. 实现DataLoader
接着,我们就可以使用以下代码来完成相应的DataLoader的实现:
labels = ["T-shirt/top", "Trouser", "Pullover", "Dress","Coat",
"Sandla", "Shirt", "Sneaker", "Bag", "Ankle boot"]
# download/load dataset
train_data = FashionMNIST("./MNIST_DATA", train=True, download=True)
valid_data = FashionMNIST("./MNIST_DATA", train=False, download=True)
# put datasets into dataloaders
train_loader = DataLoader(train_data, batch_size=config["batch_size"],
shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_data, batch_size=config["batch_size"],
shuffle=False, collate_fn=collate_fn)
接着我们可以使用以下代码来检验上述代码是否符合我们的预期,测试代码如下:
print("Inspecting train data: ")
for _, data in enumerate(train_loader):
print("Batch shape: ", data[0].shape)
fig, ax = plt.subplots(1, 4, figsize=(10, 4))
for i in range(4):
# Ture 3D tensor to 2D tensor due to image's single channel
ax[i].imshow(data[0][i].squeeze(), cmap="gray")
ax[i].axis("off")
ax[i].set_title(labels[data[1][i]])
plt.show()
# And don't forget to break
break
运行结果如下:
观察上图,图像和标签一一对应关系正常,接着我们就可以进入我们的网络设计部分。
4. 实现encoder
我们知道自编码器是由编码器encoder和解码器decoder实现的,其中编码器的作用为将输入的图像编码为特征空间的特征向量,解码器的作用相反,尽可能的将上述特征向量结果恢复为原图。基于此,我们首先来一步步实现编码器。首先,我们来定义模型的基本超参数如下:
# Model parameters:
LAYERS = 3
KERNELS = [3, 3, 3]
CHANNELS = [32, 64, 128]
STRIDES = [2, 2, 2]
LINEAR_DIM = 2048
同时相应的编码器的网络结构设计如下:
class Encoder(nn.Module):
def __init__(self, output_dim=2, use_batchnorm=False, use_dropout=False):
super(Encoder, self).__init__()
# bottleneck dimentionality
self.output_dim = output_dim
# variables deciding if using dropout and batchnorm in model
self.use_dropout = use_dropout
self.use_batchnorm = use_batchnorm
# convolutional layer hyper parameters
self.layers = LAYERS
self.kernels = KERNELS
self.channels = CHANNELS
self.strides = STRIDES
self.conv = self.get_convs()
# layers for latent space projection
self.fc_dim = LINEAR_DIM
self.flatten = nn.Flatten()
self.linear = nn.Linear(self.fc_dim, self.output_dim)
def get_convs(self):
"""
generating convolutional layers based on model's hyper parameters
"""
conv_layers = nn.Sequential()
for i in range(self.layers):
# The input channel of the first layer is 1
if i == 0: conv_layers.append(nn.Conv2d(1,
self.channels[i],
kernel_size=self.kernels[i],
stride=self.strides[i],
padding=1))
else: conv_layers.append(nn.Conv2d(self.channels[i-1],
self.channels[i],
kernel_size=self.kernels[i],
stride=self.strides[i],
padding=1))
if self.use_batchnorm:
conv_layers.append(nn.BatchNorm2d(self.channels[i]))
# Here we use GELU as activation function
conv_layers.append(nn.GELU())
if self.use_dropout:
conv_layers.append(nn.Dropout2d(0.15))
return conv_layers
def forward(self, x):
x = self.conv(x)
x = self.flatten(x)
return self.linear(x)
在Pytorch中torchsummary
是一个非常方便的工具,用于检查和调试模型的网络结构;我们可以检查层、每层中的张量形状以及模型的参数。代码如下:
from torchsummary import summary
# Get the summary of autoencoder architecture
encoder = Encoder(use_batchnorm=True, use_dropout=True).to(DEVICE)
summary(encoder, (1, 32, 32))
pass
得到输出如下:
5. 实现decoder
在我们的例子中,解码器层decoder
是编码器的反向操作;确保每一层的输入和输出形状是很重要的。此外,我们应该调整转置卷积层中的padding
和output_pading
参数,以确保输出图像和输入图像的维度相同。代码实现如下:
class Decoder(nn.Module):
def __init__(self, input_dim=2, use_batchnorm=False, use_dropout=False):
super(Decoder, self).__init__()
# variables deciding if using dropout and batchnorm in model
self.use_dropout = use_dropout
self.use_batchnorm = use_batchnorm
self.fc_dim = LINEAR_DIM
self.input_dim = input_dim
# Conv layer hypyer parameters
self.layers = LAYERS
self.kernels = KERNELS
self.channels = CHANNELS[::-1] # flip the channel dimensions
self.strides = STRIDES
# In decoder, we first do fc project, then conv layers
self.linear = nn.Linear(self.input_dim, self.fc_dim)
self.conv = self.get_convs()
self.output = nn.Conv2d(self.channels[-1], 1, kernel_size=1, stride=1)
def get_convs(self):
conv_layers = nn.Sequential()
for i in range(self.layers):
if i == 0: conv_layers.append(
nn.ConvTranspose2d(self.channels[i],
self.channels[i],
kernel_size=self.kernels[i],
stride=self.strides[i],
padding=1,
output_padding=1)
)
else: conv_layers.append(
nn.ConvTranspose2d(self.channels[i-1],
self.channels[i],
kernel_size=self.kernels[i],
stride=self.strides[i],
padding=1,
output_padding=1
)
)
if self.use_batchnorm and i != self.layers - 1:
conv_layers.append(nn.BatchNorm2d(self.channels[i]))
conv_layers.append(nn.GELU())
if self.use_dropout:
conv_layers.append(nn.Dropout2d(0.15))
return conv_layers
def forward(self, x):
x = self.linear(x)
# reshape 3D tensor to 4D tensor
x = x.reshape(x.shape[0], 128, 4, 4)
x = self.conv(x)
return self.output(x)
相应的解码器实现如下:
decoder = Decoder(use_batchnorm=True, use_dropout=True).to(DEVICE)
summary(decoder, (1, 2))
pass
运行后,结果如下:
6. 实现自编码器
接着,我们将上述编码器和解码器串联起来,代码实现如下:
class AutoEncoder(nn.Module):
def __init__(self):
super(AutoEncoder, self).__init__()
self.encoder = Encoder(output_dim=2, use_batchnorm=True, use_dropout=False)
self.decoder = Decoder(input_dim=2, use_batchnorm=True, use_dropout=False)
def forward(self, x):
return self.decoder(self.encoder(x))
model = AutoEncoder().to(DEVICE)
summary(model, (1, 32, 32))
pass
得到结果如下:
7. 可视化函数
在进入训练部分之前,让我们花一些时间编写一个函数来可视化我们模型的潜在特征空间,即编码后二维特征向量的可视化表示。
def plotting(step:int=0, show=False):
model.eval() # Switch the model to evaluation mode
points = []
label_idcs = []
path = "./ScatterPlots"
if not os.path.exists(path): os.mkdir(path)
for i, data in enumerate(valid_loader):
img, label = [d.to(DEVICE) for d in data]
# We only need to encode the validation images
proj = model.encoder(img)
points.extend(proj.detach().cpu().numpy())
label_idcs.extend(label.detach().cpu().numpy())
del img, label
points = np.array(points)
# Creating a scatter plot
fig, ax = plt.subplots(figsize=(10, 10) if not show else (8, 8))
scatter = ax.scatter(x=points[:, 0], y=points[:, 1], s=2.0,
c=label_idcs, cmap='tab10', alpha=0.9, zorder=2)
ax.spines["right"].set_visible(False)
ax.spines["top"].set_visible(False)
if show:
ax.grid(True, color="lightgray", alpha=1.0, zorder=0)
plt.show()
else:
# Do not show but only save the plot in training
plt.savefig(f"{path}/Step_{step:03d}.png", bbox_inches="tight")
plt.close() # don't forget to close the plot, or it is always in memory
model.train()
以下是训练过程中生成的图;该过程显示了模型的潜在空间随时间的分布,可以看出尽管有个别离群点,整体不同类别的数据在特征空间呈现出聚类趋势:
8. 损失函数
在编写训练和验证函数之前,还有一个步骤是定义目标函数和优化方法。由于自动编码器是一个自监督模型,输入也是网络输出重建图像逼近的对象,因此我们可以使用MSE(均方误差)损失来评估输入和重建图像之间的逐像素损失。当然有很多优化器可供选择,这里我选择的是AdamW,因为我在过去几个月里经常使用它。
criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"], weight_decay=1e-5)
# For mixed precision training
scaler = torch.cuda.amp.GradScaler()
steps = 0 # tracking the training steps
9. 训练函数
接着我们来定义训练一个epoch
的函数,代码实现如下:
def train(model, dataloader, criterion, optimizer, save_distrib=False):
# steps is used to track training progress, purely for latent space plots
global steps
model.train()
train_loss = 0.0
# Process tqdm bar, helpful for monitoring training process
batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True,
leave=False, position=0, desc="Train")
for i, batch in enumerate(dataloader):
optimizer.zero_grad()
x = batch[0].to(DEVICE)
# Here we implement the mixed precision training
with torch.cuda.amp.autocast():
y_recons = model(x)
loss = criterion(y_recons, x)
train_loss += loss.item()
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
batch_bar.set_postfix(
loss=f"{train_loss/(i+1):.4f}",
lr = f"{optimizer.param_groups[0]['lr']:.4f}"
)
batch_bar.update()
# Saving latent space plots
if steps % 10 == 0 and save_distrib and steps <= 400: plotting(steps)
steps += 1
# remove unnecessary cache in CUDA memory
torch.cuda.empty_cache()
del x, y_recons
batch_bar.close()
train_loss /= len(dataloader)
return train_loss
10 验证函数
相应的验证函数的实现稍微简单一点,代码如下:
def validate(model, dataloader, criterion):
model.eval() # Don't forget to turn the model to eval mode
valid_loss = 0.0
# Progress tqdm bar
batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True,
leave=False, position=0, desc="Validation")
for i, batch in enumerate(dataloader):
x = batch[0].to(DEVICE)
with torch.no_grad(): # we don't need gradients in validation
y_recons = model(x)
loss = criterion(y_recons, x)
valid_loss += loss.item()
batch_bar.set_postfix(
loss=f"{valid_loss/(i+1):.4f}",
lr = f"{optimizer.param_groups[0]['lr']:.4f}"
)
batch_bar.update()
torch.cuda.empty_cache()
del x, y_recons
batch_bar.close()
valid_loss /= len(dataloader)
return valid_loss
11 训练过程
接着,我们将上述代码串起来,来实现我们模型的训练,由于FashionMNIST是一个很小的数据集,我们实际上不需要大量训练;初始训练和验证损失非常低,并且在三个epoch
之后没有太大的改进空间。
for i in range(config["epochs"]):
curr_lr = float(optimizer.param_groups[0]["lr"])
train_loss = train(model, train_loader, criterion,
optimizer, save_distrib=True)
valid_loss = validate(model, valid_loader, criterion)
print(f"Epoch {i+1}/{config['epochs']}\nTrain loss: {train_loss:.4f}\t Validation loss: {valid_loss:.4f}\tlr: {curr_lr:.4f}")
输出如下:
12 结果可视化
我们现在可以再次绘制和检查收敛后的特征空间,可视化输出如下:
观察上图可知,相应的聚类后的效果比训练过程中的要好,但有些个别类混合在同一集群中。这个问题可以通过增加编码器输出的特征向量的维度或使用其他损失函数函数来解决。
13 预测效果可视化
为了验证我们的解码器确实学到了东西,我们可以在随机绘制一些离散点来观察解码器重建图像的效果,代码如下:
# randomly sample x and y values
xs = [random.uniform(-6.0, 8.0) for i in range(8)]
ys = [random.uniform(-7.5, 10.0) for i in range(8)]
points = list(zip(xs, ys))
coords = torch.tensor(points).unsqueeze(1).to(DEVICE)
nrows, ncols = 2, 4
fig, axes = plt.subplots(nrows, ncols, figsize=(10, 5))
model.eval()
with torch.no_grad():
generates = [model.decoder(coord) for coord in coords]
# plot points
idx = 0
for row in range(0, nrows):
for col in range(0, ncols):
ax = axes[row, col]
im = generates[idx].squeeze().detach().cpu()
ax.imshow(im, cmap="gray")
ax.axis("off")
coord = coords[idx].detach().cpu().numpy()[0]
ax.set_title(f"({coord[0]:.3f}, {coord[1]:.3f})")
idx += 1
plt.show()
代码输出如下:
14. 总结
本文重点介绍了如何利用Pytorch来实现自编码器,从数据集,到搭建网络结构,以及特征可视化和网络预测输出几个方面,分别进行了详细的阐述,并给出了相应的代码示例。
您学废了吗?
完整代码链接:戳我