Diffusion Models视频生成
前言:目前开源的DiT视频生成模型不是很多,Open-Sora是开发者生态最好的一个,涵盖了DiT、时空DiT、3D VAE、Rectified Flow、因果卷积等Diffusion视频生成的经典知识点。本篇博客从Open-Sora的代码出发,深入解读背后的原理。
目录
3D VAE原理
代码剖析
2D VAE
时间VAE
因果3D卷积
3D VAE原理
此前的扩散模型绝大多数使用的都是2D VAE,特别是SDXL的VAE相当好用,很多人都直接拿来用。但是在DiT-based的视频模型中,如果不在时间维度上再做压缩,token序列会过长,模型就很难训得动了。因此非常有必要在时间维度上进行压缩,3D VAE应运而生。
Open-Sora的方案是在2D VAE的基础上,再添加一个时间VAE。与EasyAnimate和CogVideoX采用Full Attention的方案相比,这种做法存在一定劣势,但可以充分利用已有2D VAE的权重,成本更低。
代码剖析
2D VAE
2D VAE使用的是华为PixArt发布的SDXL VAE权重:
# Config for the 2D VAE: a "VideoAutoencoderKL" whose weights are loaded from
# the "vae" subfolder of the pretrained PixArt checkpoint named below.
# `micro_batch_size` and `local_files_only` are read from names defined
# elsewhere in the config.
vae_2d = {
    "type": "VideoAutoencoderKL",
    "from_pretrained": "PixArt-alpha/pixart_sigma_sdxlvae_T5_diffusers",
    "subfolder": "vae",
    "micro_batch_size": micro_batch_size,
    "local_files_only": local_files_only,
}
时间VAE
# Config for the temporal VAE; no pretrained checkpoint is specified here.
vae_temporal = {
    "type": "VAE_Temporal_SD",
    "from_pretrained": None,
}
@MODELS.register_module()
class VAE_Temporal(nn.Module):
    """VAE whose latent is shorter than its input along the first size axis (time).

    ``encode`` pads the input in front along ``dim=2`` so that its length
    becomes a multiple of ``time_downsample_factor``, runs the encoder and
    ``quant_conv``, and wraps the result in a ``DiagonalGaussianDistribution``.
    ``decode`` runs ``post_quant_conv`` and the decoder, then drops the frames
    that correspond to that padding.

    Args:
        in_out_channels: Channel count forwarded to the encoder and decoder;
            also stored as ``out_channels``.
        latent_embed_dim: Latent width of the decoder input. The encoder is
            built with twice this width because its output feeds the
            distribution moments.
        embed_dim: Channel count of the sampled latent ``z``.
        filters: Forwarded to ``Encoder`` / ``Decoder``.
        num_res_blocks: Forwarded to ``Encoder`` / ``Decoder``.
        channel_multipliers: Forwarded to ``Encoder`` / ``Decoder``.
        temporal_downsample: Sequence of booleans; the number of true entries
            ``k`` sets ``time_downsample_factor = 2 ** k``.
        num_groups: Forwarded to ``Encoder`` / ``Decoder`` (for nn.GroupNorm).
        activation_fn: Forwarded to ``Encoder`` / ``Decoder``.
    """

    def __init__(
        self,
        in_out_channels=4,
        latent_embed_dim=4,
        embed_dim=4,
        filters=128,
        num_res_blocks=4,
        channel_multipliers=(1, 2, 2, 4),
        temporal_downsample=(True, True, False),
        num_groups=32,  # for nn.GroupNorm
        activation_fn="swish",
    ):
        super().__init__()

        # Each enabled stage halves the temporal length.
        self.time_downsample_factor = 2 ** sum(temporal_downsample)
        self.patch_size = (self.time_downsample_factor, 1, 1)
        self.out_channels = in_out_channels

        # NOTE: following MAGVIT, conv in bias=False in encoder first conv
        self.encoder = Encoder(
            in_out_channels=in_out_channels,
            latent_embed_dim=latent_embed_dim * 2,
            filters=filters,
            num_res_blocks=num_res_blocks,
            channel_multipliers=channel_multipliers,
            temporal_downsample=temporal_downsample,
            num_groups=num_groups,  # for nn.GroupNorm
            activation_fn=activation_fn,
        )
        self.quant_conv = CausalConv3d(2 * latent_embed_dim, 2 * embed_dim, 1)

        self.post_quant_conv = CausalConv3d(embed_dim, latent_embed_dim, 1)
        self.decoder = Decoder(
            in_out_channels=in_out_channels,
            latent_embed_dim=latent_embed_dim,
            filters=filters,
            num_res_blocks=num_res_blocks,
            channel_multipliers=channel_multipliers,
            temporal_downsample=temporal_downsample,
            num_groups=num_groups,  # for nn.GroupNorm
            activation_fn=activation_fn,
        )

    def _time_padding(self, num_frames):
        """Return the number of frames needed to reach a multiple of the factor.

        The result is 0 when ``num_frames`` is already a multiple of
        ``time_downsample_factor``; otherwise it is the distance to the next
        multiple. Python's ``%`` takes the sign of the divisor, so
        ``(-n) % f`` yields exactly that value.
        """
        return (-num_frames) % self.time_downsample_factor

    def get_latent_size(self, input_size):
        """Return the latent size for the first three entries of ``input_size``.

        ``None`` entries are passed through unchanged. The first entry (time)
        is rounded up to a multiple of ``time_downsample_factor`` before being
        divided by the patch size; the other two are divided by a patch size
        of 1.
        """
        latent_size = []
        for axis in range(3):
            size = input_size[axis]
            if size is None:
                latent_size.append(None)
                continue
            if axis == 0:
                # Account for the padding that encode() adds.
                size = size + self._time_padding(size)
            latent_size.append(size // self.patch_size[axis])
        return latent_size

    def encode(self, x):
        """Encode ``x`` and return the posterior distribution over latents.

        ``x`` is padded along ``dim=2`` with ``(time_padding, 0)`` before
        encoding (assumes dim 2 is the frame axis — confirm with caller).
        """
        time_padding = self._time_padding(x.shape[2])
        x = pad_at_dim(x, (time_padding, 0), dim=2)
        encoded_feature = self.encoder(x)
        moments = self.quant_conv(encoded_feature).to(x.dtype)
        return DiagonalGaussianDistribution(moments)

    def decode(self, z, num_frames=None):
        """Decode latent ``z`` and strip the frames that came from padding.

        Args:
            z: Latent to decode.
            num_frames: Frame count of the original (unpadded) input. When
                ``None`` the original length is unknown, so nothing is
                trimmed and the full decoder output is returned.
        """
        # The default of None used to crash on `None % int`; treat it as
        # "no padding to remove" instead.
        time_padding = 0 if num_frames is None else self._time_padding(num_frames)
        z = self.post_quant_conv(z)
        x = self.decoder(z)
        return x[:, :, time_padding:]

    def forward(self, x, sample_posterior=True):
        """Encode ``x``, draw (or take the mode of) a latent, and decode it.

        Returns:
            Tuple ``(recon_video, posterior, z)``.
        """
        posterior = self.encode(x)
        z = posterior.sample() if sample_posterior else posterior.mode()
        recon_video = self.decode(z, num_frames=x.shape[2])
        return recon_video, posterior, z
因果3D卷积
class CausalConv3d(nn.Module):
    """3D convolution that pads the time axis only at the front.

    Height and width receive symmetric padding of half the kernel size, while
    all temporal padding is placed before the first frame and none after the
    last one.

    Args:
        chan_in: Number of input channels.
        chan_out: Number of output channels.
        kernel_size: Int or ``(time, height, width)`` tuple; the spatial
            sizes must be odd.
        pad_mode: Padding mode handed to ``F.pad``.
        strides: Optional full stride tuple; its first entry is used as the
            temporal stride. When ``None``, ``stride`` is taken from
            ``kwargs`` (default 1) and applied to the time axis only.
        **kwargs: Remaining keyword arguments forwarded to ``nn.Conv3d``
            (``dilation`` is consumed here and applied to the time axis).
    """

    def __init__(
        self,
        chan_in,
        chan_out,
        kernel_size: Union[int, Tuple[int, int, int]],
        pad_mode="constant",
        strides=None,  # allow custom stride
        **kwargs,
    ):
        super().__init__()

        kernel_size = cast_tuple(kernel_size, 3)
        kt, kh, kw = kernel_size
        assert is_odd(kh) and is_odd(kw)

        time_dilation = kwargs.pop("dilation", 1)

        # Work out the temporal stride and the full stride tuple together.
        if strides is None:
            time_stride = kwargs.pop("stride", 1)
            conv_stride = (time_stride, 1, 1)
        else:
            time_stride = strides[0]
            conv_stride = strides

        self.pad_mode = pad_mode
        self.time_pad = time_dilation * (kt - 1) + (1 - time_stride)

        # F.pad consumes pairs starting from the last dimension:
        # (width left/right, height top/bottom, time front/back).
        spatial_h = kh // 2
        spatial_w = kw // 2
        self.time_causal_padding = (
            spatial_w,
            spatial_w,
            spatial_h,
            spatial_h,
            self.time_pad,
            0,
        )

        self.conv = nn.Conv3d(
            chan_in,
            chan_out,
            kernel_size,
            stride=conv_stride,
            dilation=(time_dilation, 1, 1),
            **kwargs,
        )

    def forward(self, x):
        """Apply the causal padding, then the convolution."""
        padded = F.pad(x, self.time_causal_padding, mode=self.pad_mode)
        return self.conv(padded)