A Physics Perspective on Diffusion Generative Models: Latent Diffusion Models


https://zhuanlan.zhihu.com/p/692996885
https://zhuanlan.zhihu.com/p/693255617
The previous two articles introduced the diffusion process and implemented diffusion and reverse diffusion for 1-D and 2-D Gaussian mixtures, using a neural network that predicts the score function to run the reverse-diffusion inference. This section introduces the text-to-image diffusion model used in industry: Stable Diffusion.

  1. How to learn to generate new content (forward / reverse diffusion)
  2. How to associate text with images (e.g. a text-image representation model such as CLIP)
  3. How to compress images (autoencoders)
  4. The loss used to learn the score function
  5. The U-Net architecture for processing images
  6. How to inject conditioning (the U-Net architecture + self-/cross-attention)

Later sections implement code for each of the pieces above; by the end we will have a running Stable-Diffusion-like model.

Fundamentals

Let's start with forward diffusion. In the simplest case, the relevant diffusion equation is
$$x(t + \Delta t) = x(t) + \sigma(t)\,\sqrt{\Delta t}\; r$$
where $\sigma(t) > 0$ is the "noise strength", $\Delta t$ is the step size, and $r \sim \mathcal{N}(0, 1)$ is a standard normal random variable. In essence, we repeatedly add normally distributed noise to our sample. Typically the noise strength $\sigma(t)$ is chosen to vary with time (i.e., to grow as $t$ increases).
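As a quick illustration (a minimal sketch, not part of the original article), this update rule can be simulated directly; the helper below assumes a constant noise strength:

import torch

def forward_diffusion_step(x, sigma_t, dt):
    """One forward-diffusion step: x <- x + sigma(t) * sqrt(dt) * r, with r ~ N(0, I)."""
    r = torch.randn_like(x)
    return x + sigma_t * (dt ** 0.5) * r

x = torch.zeros(10000)           # start every sample at x0 = 0
for _ in range(100):             # integrate to t = 100 * 0.01 = 1 with constant sigma = 1
    x = forward_diffusion_step(x, sigma_t=1.0, dt=0.01)
print(x.std())                   # should be close to sqrt(sigma^2 * t) = 1.0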
We can reverse this diffusion process with a similar update rule:
$$x(t - \Delta t) = x(t) + \sigma(t)^2\, s(x, t)\,\Delta t + \sigma(t)\,\sqrt{\Delta t}\; r$$
where $s(x, t) := \nabla_x \log p(x, t)$ is called the score function. If we know this function, we can reverse the forward diffusion and turn noise back into the kind of samples we started with.
If our initial sample is always the single point $x_0 = 0$ and the noise strength is a constant $\sigma$, then $p(x, t) = \mathcal{N}(0, \sigma^2 t)$, so the score function is just
$$s(x, t) = \nabla_x \log p(x, t) = -\,\frac{x}{\sigma^2 t}$$
In general, however, we do not know the score function in advance; instead, we have to learn it. One way to learn it is to train a neural network on a denoising objective, i.e., to train it to `denoise' samples:
$$L = \mathbb{E}_{x_0 \sim p_0(x_0)}\left[\;\left\| \, s_\theta(x_{\mathrm{noised}}) + \frac{x_{\mathrm{noised}} - x_0}{\sigma^2 \Delta t} \,\right\|^2\right]$$
where $p_0(x_0)$ is our target distribution (e.g., pictures of cats and dogs) and $x_{\mathrm{noised}}$ is the state of a target sample $x_0$ after one step of forward diffusion, so that $x_{\mathrm{noised}} - x_0$ is just a normally distributed random variable.
Here is another way to write the objective that is closer to the actual implementation:
$$L = \mathbb{E}_{t \sim \mathcal{U}(0, 1)}\;\mathbb{E}_{x_0 \sim p_0,\; \epsilon \sim \mathcal{N}(0, I)}\left[\,\left\| \sigma(t)\, s_\theta\big(x_0 + \sigma(t)\,\epsilon,\; t\big) + \epsilon \right\|^2\right]$$

Pixel-Space Diffusion Models

Unconditional Diffusion Models

Processing Images with a U-Net

The previous part reviewed the basics of diffusion models; the key point is that learning the score function lets us turn pure noise into the thing we want. We will approximate the score function with a neural network. But when working with images, the network needs to handle them gracefully and reflect the right inductive biases (e.g., about spatial scale).
Since the score function is also a function of time, we need a way to make sure the network responds correctly to changes in time. For that we use a time embedding.
The code below uses a time embedding to let the network process time. The idea is that instead of telling the network a single number (the current time), we express the current time through many sinusoidal features. Telling the network the current time in many different ways makes it easier for the network to respond to changes in time.
This lets the network successfully learn a time-dependent score function s(x, t).

#@title Get some modules to let time interact

class GaussianFourierProjection(nn.Module):
    """Gaussian random features for encoding time steps."""  
    def __init__(self, embed_dim, scale=30.):
        super().__init__()
        # Randomly sample weights (frequencies) during initialization. 
        # These weights (frequencies) are fixed during optimization and are not trainable.
        self.W = nn.Parameter(torch.randn(embed_dim // 2) * scale, requires_grad=False)
    def forward(self, x):
        # Cosine(2 pi freq x), Sine(2 pi freq x)
        x_proj = x[:, None] * self.W[None, :] * 2 * np.pi
        return torch.cat([torch.sin(x_proj), torch.cos(x_proj)], dim=-1)


class Dense(nn.Module):
    """A fully connected layer that reshapes outputs to feature maps.
  Allow time repr to input additively from the side of a convolution layer.
  """
    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.dense = nn.Linear(input_dim, output_dim)
    def forward(self, x):
        return self.dense(x)[..., None, None] 
# This broadcasts the 2d tensor to 4d, adding the same value across space.
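
A minimal shape check for these two helper modules (illustrative only; the sizes are arbitrary):

time_embed = GaussianFourierProjection(embed_dim=256)
dense = Dense(256, 32)
t = torch.rand(16)               # a batch of 16 random times in [0, 1)
emb = time_embed(t)              # sinusoidal features of t, shape (16, 256)
bias = dense(emb)                # shape (16, 32, 1, 1): broadcastable over the H and W of a feature map
print(emb.shape, bias.shape)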
Defining the U-Net architecture
#@title Defining a time-dependent score-based model (double click to expand or collapse)

class UNet(nn.Module):
  """A time-dependent score-based model built upon U-Net architecture."""

  def __init__(self, marginal_prob_std, channels=[32, 64, 128, 256], embed_dim=256):
    """Initialize a time-dependent score-based network.

    Args:
      marginal_prob_std: A function that takes time t and gives the standard
        deviation of the perturbation kernel p_{0t}(x(t) | x(0)).
      channels: The number of channels for feature maps of each resolution.
      embed_dim: The dimensionality of Gaussian random feature embeddings.
    """
    super().__init__()
    # Gaussian random feature embedding layer for time
    self.time_embed = nn.Sequential(
          GaussianFourierProjection(embed_dim=embed_dim),
          nn.Linear(embed_dim, embed_dim)
          )
    # Encoding layers where the resolution decreases
    self.conv1 = nn.Conv2d(1, channels[0], 3, stride=1, bias=False)
    self.dense1 = Dense(embed_dim, channels[0])
    self.gnorm1 = nn.GroupNorm(4, num_channels=channels[0])

    self.conv2 = nn.Conv2d(channels[0], channels[1], 3, stride=2, bias=False)
    self.dense2 = Dense(embed_dim, channels[1])
    self.gnorm2 = nn.GroupNorm(32, num_channels=channels[1])
    
    self.conv3 = nn.Conv2d(channels[1], channels[2], 3, stride=2, bias=False)
    self.dense3 = Dense(embed_dim, channels[2])
    self.gnorm3 = nn.GroupNorm(32, num_channels=channels[2])
    
    self.conv4 = nn.Conv2d(channels[2], channels[3], 3, stride=2, bias=False)
    self.dense4 = Dense(embed_dim, channels[3])
    self.gnorm4 = nn.GroupNorm(32, num_channels=channels[3])   
     

    # Decoding layers where the resolution increases
    self.tconv4 = nn.ConvTranspose2d(channels[3], channels[2], 3, stride=2, bias=False)
    self.dense5 = Dense(embed_dim, channels[2])
    self.tgnorm4 = nn.GroupNorm(32, num_channels=channels[2])
    self.tconv3 = nn.ConvTranspose2d(channels[2] + channels[2], channels[1], 3, stride=2, bias=False, output_padding=1)    
    self.dense6 = Dense(embed_dim, channels[1])
    self.tgnorm3 = nn.GroupNorm(32, num_channels=channels[1])
    self.tconv2 = nn.ConvTranspose2d(channels[1] + channels[1], channels[0], 3, stride=2, bias=False, output_padding=1)    
    self.dense7 = Dense(embed_dim, channels[0])
    self.tgnorm2 = nn.GroupNorm(32, num_channels=channels[0])
    self.tconv1 = nn.ConvTranspose2d(channels[0] + channels[0], 1, 3, stride=1)
    
    # The swish activation function
    self.act = lambda x: x * torch.sigmoid(x)
    self.marginal_prob_std = marginal_prob_std
  
  def forward(self, x, t, y=None): 
    # Obtain the Gaussian random feature embedding for t   
    embed = self.act(self.time_embed(t))    
    # Encoding path
    h1 = self.conv1(x)  + self.dense1(embed)   
    ## Incorporate information from t
    ## Group normalization
    h1 = self.act(self.gnorm1(h1))
    h2 = self.conv2(h1) + self.dense2(embed)
    h2 = self.act(self.gnorm2(h2))
    
    #   apply activation function
    h3 = self.conv3(h2) + self.dense3(embed)
    h3 = self.act(self.gnorm3(h3))
   
    h4 = self.conv4(h3) + self.dense4(embed)
    h4 = self.act(self.gnorm4(h4))

    # Decoding path
    h = self.tconv4(h4)
    ## Skip connection from the encoding path
    h += self.dense5(embed)
    h = self.act(self.tgnorm4(h))
    h = self.tconv3(torch.cat([h, h3], dim=1))
    h += self.dense6(embed)
    h = self.act(self.tgnorm3(h))
    h = self.tconv2(torch.cat([h, h2], dim=1))
    h += self.dense7(embed)
    h = self.act(self.tgnorm2(h))
    h = self.tconv1(torch.cat([h, h1], dim=1))

    # Normalize output
    h = h / self.marginal_prob_std(t)[:, None, None, None]
    return h
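
Before training, it helps to sanity-check the network's input/output contract. The snippet below is a hypothetical check that uses a dummy stand-in for marginal_prob_std (the real one is defined in the next section); the score network should return a tensor with the same shape as its input.

dummy_std = lambda t: torch.ones_like(t)   # stand-in for marginal_prob_std, defined later
unet = UNet(marginal_prob_std=dummy_std)
x = torch.rand(2, 1, 28, 28)               # two MNIST-sized images
t = torch.rand(2)                          # one diffusion time per sample
print(unet(x, t).shape)                    # expected: torch.Size([2, 1, 28, 28]), same as the input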

Training the U-Net to learn the score function

We now combine the U-Net defined above with the approach for learning the score function: we define a loss function and then train the network in the usual way. The forward process is the SDE
$$dx = \sigma^t\, dw$$
This process adds noise to the samples x, with a noise scale that grows exponentially in time.
Given this forward process and a starting point x(0), the distribution of x(t) at any time t has an analytical form:
$$p(x(t) \mid x(0)) = \mathcal{N}\big(x(0),\; \sigma(t)^2\big)$$
We call $\sigma(t)$ the marginal standard deviation, i.e., the standard deviation of this conditional distribution. In this particular case,
$$\sigma^2(t) = \int_0^t (\sigma^\tau)^2 \, d\tau = \int_0^t \sigma^{2\tau} \, d\tau = \frac{\sigma^{2t} - 1}{2 \log \sigma}$$

Diffusion constant and noise strength
#@title Diffusion constant and noise strength
device = 'cuda' #@param ['cuda', 'cpu'] {'type':'string'}

def marginal_prob_std(t, sigma):
    """Compute the mean and standard deviation of $p_{0t}(x(t) | x(0))$.

  Args:    
    t: A vector of time steps.
    sigma: The $\sigma$ in our SDE.  
  
  Returns:
    The standard deviation.
  """    
    t = torch.tensor(t, device=device)
    return torch.sqrt((sigma**(2 * t) - 1.) / 2. / np.log(sigma))

def diffusion_coeff(t, sigma):
    """Compute the diffusion coefficient of our SDE.

  Args:
    t: A vector of time steps.
    sigma: The $\sigma$ in our SDE.
  
  Returns:
    The vector of diffusion coefficients.
  """
    return torch.tensor(sigma**t, device=device)

sigma =  25.0#@param {'type':'number'}
marginal_prob_std_fn = functools.partial(marginal_prob_std, sigma=sigma)
diffusion_coeff_fn = functools.partial(diffusion_coeff, sigma=sigma)
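
As a sanity check (a hypothetical snippet, not in the original notebook; it assumes the device configured above), we can compare the closed-form marginal_prob_std against the standard deviation obtained by simulating the SDE dx = sigma^t dw directly:

n_steps, n_samples, t_end = 1000, 100000, 0.5
dt = t_end / n_steps
x_sim = torch.zeros(n_samples)
for i in range(n_steps):
    t_i = (i + 1) * dt
    x_sim = x_sim + (sigma ** t_i) * np.sqrt(dt) * torch.randn(n_samples)   # dx = sigma^t dw
print(x_sim.std())                      # empirical std at t = 0.5
print(marginal_prob_std(0.5, sigma))    # closed-form sqrt((sigma^(2t) - 1) / (2 log sigma))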
Defining the loss function

The loss function is defined below. We sample random noise with strength std[:, None, None, None], making sure it has the same shape as 𝐱, and then use this noise to perturb 𝐱.

def loss_fn(model, x, marginal_prob_std, eps=1e-5):
  """The loss function for training score-based generative models.

  Args:
    model: A PyTorch model instance that represents a 
      time-dependent score-based model.
    x: A mini-batch of training data.    
    marginal_prob_std: A function that gives the standard deviation of 
      the perturbation kernel.
    eps: A tolerance value for numerical stability.
  """
  # Sample time uniformly in 0, 1
  random_t = torch.rand(x.shape[0], device=x.device) * (1. - eps) + eps 
  # Find the noise std at the time `t`
  std = marginal_prob_std(random_t)
  
  z = torch.randn_like(x)             # get normally distributed noise
  perturbed_x = x + z * std[:, None, None, None]
  
  score = model(perturbed_x, random_t)  # use the model to predict the score
  loss = torch.mean(torch.sum((score * std[:, None, None, None] + z)**2, dim=(1,2,3)))
  return loss
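
A quick smoke test (not part of the original training script) that wires the U-Net from earlier into this loss, just to confirm that shapes line up and gradients flow; it assumes the device configured above:

toy_model = UNet(marginal_prob_std=marginal_prob_std_fn).to(device)
x_dummy = torch.rand(8, 1, 28, 28, device=device)
loss = loss_fn(toy_model, x_dummy, marginal_prob_std_fn)
loss.backward()
print(loss.item())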
Defining the sampler
#@title Sampler code
num_steps = 500#@param {'type':'integer'}
def Euler_Maruyama_sampler(score_model, 
              marginal_prob_std,
              diffusion_coeff, 
              batch_size=64, 
              x_shape=(1, 28, 28),
              num_steps=num_steps, 
              device='cuda', 
              eps=1e-3, y=None):
  """Generate samples from score-based models with the Euler-Maruyama solver.

  Args:
    score_model: A PyTorch model that represents the time-dependent score-based model.
    marginal_prob_std: A function that gives the standard deviation of
      the perturbation kernel.
    diffusion_coeff: A function that gives the diffusion coefficient of the SDE.
    batch_size: The number of samples to generate by calling this function once.
    num_steps: The number of sampling steps. 
      Equivalent to the number of discretized time steps.
    device: 'cuda' for running on GPUs, and 'cpu' for running on CPUs.
    eps: The smallest time step for numerical stability.
  
  Returns:
    Samples.    
  """
  t = torch.ones(batch_size, device=device)
  init_x = torch.randn(batch_size, *x_shape, device=device) \
    * marginal_prob_std(t)[:, None, None, None]
  time_steps = torch.linspace(1., eps, num_steps, device=device)
  step_size = time_steps[0] - time_steps[1]
  x = init_x
  with torch.no_grad():
    for time_step in tqdm(time_steps):      
      batch_time_step = torch.ones(batch_size, device=device) * time_step
      g = diffusion_coeff(batch_time_step)
      mean_x = x + (g**2)[:, None, None, None] * score_model(x, batch_time_step, y=y) * step_size
      x = mean_x + torch.sqrt(step_size) * g[:, None, None, None] * torch.randn_like(x)      
  # Do not include any noise in the last sampling step.
  return mean_x

Putting the training pipeline together

Now we put the modules above together and train unconditionally on the MNIST data.

#@title Training the alternate U-Net model (double click to expand or collapse)

score_model = torch.nn.DataParallel(UNet_res(marginal_prob_std=marginal_prob_std_fn))
score_model = score_model.to(device)

n_epochs =   75#@param {'type':'integer'}
## size of a mini-batch
batch_size =  1024 #@param {'type':'integer'}
## learning rate
lr=10e-4 #@param {'type':'number'}

dataset = MNIST('.', train=True, transform=transforms.ToTensor(), download=True)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)

optimizer = Adam(score_model.parameters(), lr=lr)
scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: max(0.2, 0.98 ** epoch))
tqdm_epoch = trange(n_epochs)
for epoch in tqdm_epoch:
  avg_loss = 0.
  num_items = 0
  for x, y in data_loader:
    x = x.to(device)    
    loss = loss_fn(score_model, x, marginal_prob_std_fn)
    optimizer.zero_grad()
    loss.backward()    
    optimizer.step()
    avg_loss += loss.item() * x.shape[0]
    num_items += x.shape[0]
  scheduler.step()
  lr_current = scheduler.get_last_lr()[0]
  print('{} Average Loss: {:5f} lr {:.1e}'.format(epoch, avg_loss / num_items, lr_current))
  # Print the averaged training loss so far.
  tqdm_epoch.set_description('Average Loss: {:5f}'.format(avg_loss / num_items))
  # Update the checkpoint after each epoch of training.
  torch.save(score_model.state_dict(), 'ckpt_res.pth')
Visualizing the training results


def save_samples_uncond(score_model, suffix=""):
  score_model.eval()
  ## Generate samples using the specified sampler.
  sample_batch_size = 64 #@param {'type':'integer'}
  num_steps = 250 #@param {'type':'integer'}
  sampler = Euler_Maruyama_sampler #@param ['Euler_Maruyama_sampler', 'pc_sampler', 'ode_sampler'] {'type': 'raw'}
  # score_model.eval()
  ## Generate samples using the specified sampler.
  samples = sampler(score_model, 
          marginal_prob_std_fn,
          diffusion_coeff_fn, 
          sample_batch_size, 
          num_steps=num_steps,
          device=device,
          )

      ## Sample visualization.
  samples = samples.clamp(0.0, 1.0)
  sample_grid = make_grid(samples, nrow=int(np.sqrt(sample_batch_size)))
  sample_np = sample_grid.permute(1, 2, 0).cpu().numpy()
  plt.imsave(f"uncondition_diffusion{suffix}.png", sample_np,)
  plt.figure(figsize=(6,6))
  plt.axis('off')
  plt.imshow(sample_np, vmin=0., vmax=1.)
  plt.show()

uncond_score_model = torch.nn.DataParallel(UNet_res(marginal_prob_std=marginal_prob_std_fn))
uncond_score_model.load_state_dict(torch.load("ckpt_res.pth"))
save_samples_uncond(uncond_score_model, suffix="_res")

Conditional Diffusion Models

Conditional generation with attention

Beyond generating images of the digits 0-9, we also want conditional generation: for example, we want to specify which digit to generate.
Attention is not strictly necessary for conditional generation, but it has proven very useful for making it work well, so in this part we use attention to build the conditional diffusion model.

Text conditioning

Here, rather than using a full CLIP model, we define our own vector representations for the digits 0-9.
We use an nn.Embedding layer to map the indices 0-9 to vectors.

class WordEmbed(nn.Module):
  def __init__(self, vocab_size, embed_dim):
    super(WordEmbed, self).__init__()
    self.embed = nn.Embedding(vocab_size+1, embed_dim)
  
  def forward(self, ids):
    return self.embed(ids)
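
For example (an illustrative usage sketch):

word_embed = WordEmbed(vocab_size=10, embed_dim=256)
digit_ids = torch.tensor([3, 7, 7, 0])    # a mini-batch of digit labels
print(word_embed(digit_ids).shape)        # torch.Size([4, 256]): one vector per label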
Injecting the text condition with attention

We typically implement attention with three parts:

  1. CrossAttention: a module that performs self-attention / cross-attention over a sequence.
  2. TransformerBlock: combines self-/cross-attention with a feed-forward network.
  3. SpatialTransformer: to use attention inside the U-Net, it flattens a spatial tensor into a sequence, applies the transformer, and converts it back.

We implement the CrossAttention class below, and also mark where attention is added inside the U-Net.
Let's briefly review the math behind attention. QKV (query-key-value) attention represents queries, keys, and values as vectors. These help us relate the words/images on one side of a translation task to the other side.
These vectors are linear functions of the 𝐞 vectors (the encoder hidden states) and the 𝐡 vectors (the decoder hidden states):
$$\mathbf{q}_i = W_q \mathbf{h}_i, \qquad \mathbf{k}_j = W_k \mathbf{e}_j, \qquad \mathbf{v}_j = W_v \mathbf{e}_j$$
To decide what to "attend" to, we compute the inner product (i.e., similarity) between each key 𝐤_j and the query 𝐪_i. To keep typical values from being too large or too small, we normalize by the length/dimension of the query vector 𝐪_i.
The final attention distribution comes from taking a softmax over all of these scores:
$$a_{ij} = \frac{\exp\!\big(\mathbf{q}_i \cdot \mathbf{k}_j / \sqrt{d}\big)}{\sum_{j'} \exp\!\big(\mathbf{q}_i \cdot \mathbf{k}_{j'} / \sqrt{d}\big)}$$
The attention distribution is used to pick out a relevant combination of features. For example, when translating the English phrase "European Union" into French, getting the right answer ("Union européenne") requires attending to both words jointly rather than trying to translate each word completely separately. Mathematically, we take a weighted average of the values 𝐯_j using the attention distribution:
$$\mathbf{c}_i = \sum_j a_{ij}\, \mathbf{v}_j$$
We now implement the CrossAttention class, and also the TransformerBlock class.
Note that the relevant matrix multiplications can be carried out with torch.einsum. For example, to multiply an $M \times N$ matrix A by an $N \times K$ matrix B, i.e., compute AB, we can write:
torch.einsum('ij,jk->ik', A, B)
If we instead want to compute $A B^T$, we can write:
torch.einsum('ij,kj->ik', A, B)
The function automatically handles moving the dimensions around correctly. In machine learning it is often important to batch operations such as matrix multiplication; in that case you have tensors rather than matrices, but the expression looks very similar:
torch.einsum('bij,bkj->bik', A, B)
where b indexes the batch elements. One last note: you can use any letters you like in place of i, j, and so on.
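Here is a tiny, self-contained demo of these three einsum patterns (the shapes are arbitrary, chosen only to make the index notation concrete):

import torch

A = torch.randn(2, 3)                         # M x N
B = torch.randn(3, 4)                         # N x K
AB = torch.einsum('ij,jk->ik', A, B)          # ordinary matrix product, shape (2, 4)

C = torch.randn(5, 3)                         # K x N
ACt = torch.einsum('ij,kj->ik', A, C)         # A @ C.T, shape (2, 5)

Ab = torch.randn(8, 2, 3)                     # batch x M x N
Cb = torch.randn(8, 5, 3)                     # batch x K x N
out = torch.einsum('bij,bkj->bik', Ab, Cb)    # batched A @ C.T, shape (8, 2, 5)
print(AB.shape, ACt.shape, out.shape)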

The attention modules
class CrossAttention(nn.Module):
  def __init__(self, embed_dim, hidden_dim, context_dim=None, num_heads=1,):
    """
    Note: For simplicity reason, we just implemented 1-head attention. 
    Feel free to implement multi-head attention! with fancy tensor manipulations.
    """
    super(CrossAttention, self).__init__()
    self.hidden_dim = hidden_dim
    self.context_dim = context_dim
    self.embed_dim = embed_dim
    self.query = nn.Linear(hidden_dim, embed_dim, bias=False)
    if context_dim is None:
      self.self_attn = True 
      self.key = nn.Linear(hidden_dim, embed_dim, bias=False)     ###########
      self.value = nn.Linear(hidden_dim, hidden_dim, bias=False)  ############
    else:
      self.self_attn = False 
      self.key = nn.Linear(context_dim, embed_dim, bias=False)   #############
      self.value = nn.Linear(context_dim, hidden_dim, bias=False) ############
    
    
  def forward(self, tokens, context=None):
    # tokens: with shape [batch, sequence_len, hidden_dim]
    # context: with shape [batch, contex_seq_len, context_dim]
    if self.self_attn:
        Q = self.query(tokens)
        K = self.key(tokens)  
        V = self.value(tokens) 
    else:
        # implement Q, K, V for the Cross attention 
        Q = self.query(tokens)
        K = self.key(context)  
        V = self.value(context) 
    
    scoremats = torch.einsum("BTH,BSH->BTS", Q, K)  # inner product of Q and K, a tensor
    attnmats = F.softmax(scoremats / math.sqrt(self.embed_dim), dim=-1)  # softmax of scoremats
    ctx_vecs = torch.einsum("BTS,BSH->BTH", attnmats, V)  # weighted average value vectors by attnmats
    return ctx_vecs


class TransformerBlock(nn.Module):
  """The transformer block that combines self-attn, cross-attn and feed forward neural net"""
  def __init__(self, hidden_dim, context_dim):
    super(TransformerBlock, self).__init__()
    self.attn_self = CrossAttention(hidden_dim, hidden_dim, )
    self.attn_cross = CrossAttention(hidden_dim, hidden_dim, context_dim)

    self.norm1 = nn.LayerNorm(hidden_dim)
    self.norm2 = nn.LayerNorm(hidden_dim)
    self.norm3 = nn.LayerNorm(hidden_dim)
    # A 2-layer MLP with 3*hidden_dim hidden units and a GELU nonlinearity
    self.ffn  = nn.Sequential(
        nn.Linear(hidden_dim, 3*hidden_dim),
        nn.GELU(),
        nn.Linear(3*hidden_dim, hidden_dim)
    )
    
  def forward(self, x, context=None):
    # The "+ x" terms are residual connections
    x = self.attn_self(self.norm1(x)) + x
    x = self.attn_cross(self.norm2(x), context=context) + x
    x = self.ffn(self.norm3(x)) + x
    return x 

class SpatialTransformer(nn.Module):
  def __init__(self, hidden_dim, context_dim):
    super(SpatialTransformer, self).__init__()
    self.transformer = TransformerBlock(hidden_dim, context_dim)

  def forward(self, x, context=None):
    b, c, h, w = x.shape
    x_in = x
    # Combine the spatial dimensions and move the channel dimension to the end
    x = rearrange(x, "b c h w->b (h w) c")
    # Apply the sequence transformer
    x = self.transformer(x, context)
    # Reverse the process
    x = rearrange(x, 'b (h w) c -> b c h w', h=h, w=w)
    # Residual connection
    return x + x_in
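
A hypothetical shape walk-through of the three modules above (the dimensions are arbitrary; they only illustrate that attention preserves the token and spatial shapes):

tokens  = torch.randn(2, 49, 64)     # batch of 2, 7x7 = 49 spatial tokens, hidden_dim 64
context = torch.randn(2, 1, 256)     # one conditioning token per sample (e.g. a digit embedding)

self_attn  = CrossAttention(embed_dim=32, hidden_dim=64)                   # context_dim=None -> self-attention
cross_attn = CrossAttention(embed_dim=32, hidden_dim=64, context_dim=256)  # attends to the context sequence
print(self_attn(tokens).shape, cross_attn(tokens, context).shape)          # both (2, 49, 64)

feat = torch.randn(2, 64, 7, 7)      # a spatial feature map as it would appear inside the U-Net
spatial_tf = SpatialTransformer(hidden_dim=64, context_dim=256)
print(spatial_tf(feat, context).shape)   # (2, 64, 7, 7): shape preserved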
U-Net with Transformer blocks

Now we can interleave SpatialTransformer layers between the convolution layers!
They are used inside the forward function. Look over the architecture and, if you like, add extra attention layers.

class UNet_Tranformer(nn.Module):
  """A time-dependent score-based model built upon U-Net architecture."""

  def __init__(self, marginal_prob_std, channels=[32, 64, 128, 256], embed_dim=256, 
               text_dim=256, nClass=10):
    """Initialize a time-dependent score-based network.

    Args:
      marginal_prob_std: A function that takes time t and gives the standard
        deviation of the perturbation kernel p_{0t}(x(t) | x(0)).
      channels: The number of channels for feature maps of each resolution.
      embed_dim: The dimensionality of Gaussian random feature embeddings of time.
      text_dim:  the embedding dimension of text / digits. 
      nClass:    number of classes you want to model.
    """
    super().__init__()
    # Gaussian random feature embedding layer for time
    self.time_embed = nn.Sequential(
        GaussianFourierProjection(embed_dim=embed_dim),
        nn.Linear(embed_dim, embed_dim)
        )
    # Encoding layers where the resolution decreases
    self.conv1 = nn.Conv2d(1, channels[0], 3, stride=1, bias=False)
    self.dense1 = Dense(embed_dim, channels[0])
    self.gnorm1 = nn.GroupNorm(4, num_channels=channels[0])
    
    self.conv2 = nn.Conv2d(channels[0], channels[1], 3, stride=2, bias=False)
    self.dense2 = Dense(embed_dim, channels[1])
    self.gnorm2 = nn.GroupNorm(32, num_channels=channels[1])
    
    self.conv3 = nn.Conv2d(channels[1], channels[2], 3, stride=2, bias=False)
    self.dense3 = Dense(embed_dim, channels[2])
    self.gnorm3 = nn.GroupNorm(32, num_channels=channels[2])
    self.attn3 = SpatialTransformer(channels[2], text_dim) 
    
    self.conv4 = nn.Conv2d(channels[2], channels[3], 3, stride=2, bias=False)
    self.dense4 = Dense(embed_dim, channels[3])
    self.gnorm4 = nn.GroupNorm(32, num_channels=channels[3])    
    
    self.attn4 = SpatialTransformer(channels[3], text_dim) 

    # Decoding layers where the resolution increases
    self.tconv4 = nn.ConvTranspose2d(channels[3], channels[2], 3, stride=2, bias=False)
    self.dense5 = Dense(embed_dim, channels[2])
    self.tgnorm4 = nn.GroupNorm(32, num_channels=channels[2])   

    self.tconv3 = nn.ConvTranspose2d(channels[2], channels[1], 3, stride=2, bias=False, output_padding=1)     #  + channels[2]
    self.dense6 = Dense(embed_dim, channels[1])
    self.tgnorm3 = nn.GroupNorm(32, num_channels=channels[1])
    
    self.tconv2 = nn.ConvTranspose2d(channels[1], channels[0], 3, stride=2, bias=False, output_padding=1)     #  + channels[1]
    self.dense7 = Dense(embed_dim, channels[0])
    self.tgnorm2 = nn.GroupNorm(32, num_channels=channels[0])
    self.tconv1 = nn.ConvTranspose2d(channels[0], 1, 3, stride=1) #  + channels[0]
    
    # The swish activation function
    self.act = nn.SiLU() # lambda x: x * torch.sigmoid(x)
    self.marginal_prob_std = marginal_prob_std
    self.cond_embed = nn.Embedding(nClass, text_dim)
  
  def forward(self, x, t, y=None): 
    # Obtain the Gaussian random feature embedding for t   
    embed = self.act(self.time_embed(t))    
    y_embed = self.cond_embed(y).unsqueeze(1)
    # Encoding path
    h1 = self.conv1(x) + self.dense1(embed) 
    ## Incorporate information from t
    ## Group normalization
    h1 = self.act(self.gnorm1(h1))
    h2 = self.conv2(h1) + self.dense2(embed)
    h2 = self.act(self.gnorm2(h2))
    h3 = self.conv3(h2) + self.dense3(embed)
    h3 = self.act(self.gnorm3(h3))
    h3 = self.attn3(h3, y_embed) # Use your attention layers
    h4 = self.conv4(h3) + self.dense4(embed)
    h4 = self.act(self.gnorm4(h4))
    h4 = self.attn4(h4, y_embed)  # additional attention layer at the lowest resolution

    # Decoding path
    h = self.tconv4(h4) + self.dense5(embed)
    ## Skip connection from the encoding path
    h = self.act(self.tgnorm4(h))
    h = self.tconv3(h + h3) + self.dense6(embed)
    h = self.act(self.tgnorm3(h))
    h = self.tconv2(h + h2) + self.dense7(embed)
    h = self.act(self.tgnorm2(h))
    h = self.tconv1(h + h1)

    # Normalize output
    h = h / self.marginal_prob_std(t)[:, None, None, None]
    return h

Conditional denoising loss function

Here we modify the loss function so that the label information y is used during training.

def loss_fn_cond(model, x, y, marginal_prob_std, eps=1e-5):
  """The loss function for training score-based generative models.

  Args:
    model: A PyTorch model instance that represents a 
      time-dependent score-based model.
    x: A mini-batch of training data.
    y: A mini-batch of condition labels (the digit classes).
    marginal_prob_std: A function that gives the standard deviation of 
      the perturbation kernel.
    eps: A tolerance value for numerical stability.
  """
  random_t = torch.rand(x.shape[0], device=x.device) * (1. - eps) + eps  
  z = torch.randn_like(x)
  std = marginal_prob_std(random_t)
  perturbed_x = x + z * std[:, None, None, None]
  score = model(perturbed_x, random_t, y=y)
  loss = torch.mean(torch.sum((score * std[:, None, None, None] + z)**2, dim=(1,2,3)))
  return loss

Training the attention-based model

#@title A handy training function
def train_diffusion_model(dataset,
                          score_model,
                          n_epochs =   100,
                          batch_size =  1024,
                          lr=10e-4,
                          model_name="transformer"):

  data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)

  optimizer = Adam(score_model.parameters(), lr=lr)
  scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: max(0.2, 0.98 ** epoch))
  tqdm_epoch = trange(n_epochs)
  for epoch in tqdm_epoch:
    avg_loss = 0.
    num_items = 0
    for x, y in tqdm(data_loader):
      x = x.to(device)    
      loss = loss_fn_cond(score_model, x, y, marginal_prob_std_fn)
      optimizer.zero_grad()
      loss.backward()    
      optimizer.step()
      avg_loss += loss.item() * x.shape[0]
      num_items += x.shape[0]
    scheduler.step()
    lr_current = scheduler.get_last_lr()[0]
    print('{} Average Loss: {:5f} lr {:.1e}'.format(epoch, avg_loss / num_items, lr_current))
    # Print the averaged training loss so far.
    tqdm_epoch.set_description('Average Loss: {:5f}'.format(avg_loss / num_items))
    # Update the checkpoint after each epoch of training.
    torch.save(score_model.state_dict(), f'ckpt_{model_name}.pth')

# Feel free to play with hyperparameters for training!
score_model = torch.nn.DataParallel(UNet_Tranformer(marginal_prob_std=marginal_prob_std_fn))
score_model = score_model.to(device)
train_diffusion_model(dataset, score_model,
                      n_epochs =   100,
                      batch_size =  1024,
                      lr=10e-4,
                      model_name="transformer")

Visualization


def visualize_digit_embedding(digit_embed):
  cossim_mat = []
  for i in range(10):
    cossim = torch.cosine_similarity(digit_embed, digit_embed[i:i+1,:]).cpu()
    cossim_mat.append(cossim)
  cossim_mat = torch.stack(cossim_mat)
  cossim_mat_nodiag = cossim_mat + torch.diag_embed(torch.nan * torch.ones(10))
  plt.imshow(cossim_mat_nodiag)
  plt.show()
  return cossim_mat

cossim_mat = visualize_digit_embedding(score_model.module.cond_embed.weight.data)


def save_samples_cond(score_model, suffix):
  score_model.eval()
  for digit in range(10):
    ## Generate samples using the specified sampler.
    sample_batch_size = 64 #@param {'type':'integer'}
    num_steps = 500 #@param {'type':'integer'}
    sampler = Euler_Maruyama_sampler #@param ['Euler_Maruyama_sampler', 'pc_sampler', 'ode_sampler'] {'type': 'raw'}
    # score_model.eval()
    ## Generate samples using the specified sampler.
    samples = sampler(score_model, 
            marginal_prob_std_fn,
            diffusion_coeff_fn, 
            sample_batch_size, 
            num_steps=num_steps,
            device=device,
            y=digit*torch.ones(sample_batch_size, dtype=torch.long))

        ## Sample visualization.
    samples = samples.clamp(0.0, 1.0)
    sample_grid = make_grid(samples, nrow=int(np.sqrt(sample_batch_size)))
    sample_np = sample_grid.permute(1, 2, 0).cpu().numpy()
    plt.imsave(f"condition_diffusion{suffix}_digit%d.png"%digit, sample_np,)
    plt.figure(figsize=(6,6))
    plt.axis('off')
    plt.imshow(sample_np, vmin=0., vmax=1.)
    plt.show()
save_samples_cond(score_model,"_res") # model with res connection


Latent-Space Diffusion

Instead of diffusing in pixel space (i.e., corrupting and then denoising every pixel of each image), we can diffuse in some latent space.
This has several advantages. The obvious one is speed: compressing the images before running forward/reverse diffusion makes both generation and training faster. Another is that, if chosen carefully, the latent space can be a more natural or more interpretable space in which to work with images. For example, given a set of pictures of heads, some latent direction might correspond to the orientation of the head.
If we have no prior preference for a particular latent space, we can simply throw an autoencoder at the problem and hope it finds something suitable.
In this section we use an autoencoder to compress the MNIST images to a much smaller size and plug it into the rest of our diffusion pipeline.

Autoencoder

An autoencoder first "encodes" an image into some latent representation, and then "decodes" an image back from that latent representation.

class AutoEncoder(nn.Module):
  """A time-dependent score-based model built upon U-Net architecture."""

  def __init__(self, channels=[4, 8, 32],):
    """Initialize a time-dependent score-based network.
    Args:
      channels: The number of channels for feature maps of each resolution.
      embed_dim: The dimensionality of Gaussian random feature embeddings.
    """
    super().__init__()
    # Encoding layers where the resolution decreases
    self.encoder = nn.Sequential(nn.Conv2d(1, channels[0], 3, stride=1, bias=True),
                   nn.BatchNorm2d(channels[0]),
                   nn.SiLU(),
                   nn.Conv2d(channels[0], channels[1], 3, stride=2, bias=True),
                   nn.BatchNorm2d(channels[1]),
                   nn.SiLU(),
                   nn.Conv2d(channels[1], channels[2], 3, stride=1, bias=True),
                   nn.BatchNorm2d(channels[2]),
                   ) #nn.SiLU(),
    self.decoder = nn.Sequential(nn.ConvTranspose2d(channels[2], channels[1], 3, stride=1, bias=True),
                   nn.BatchNorm2d(channels[1]),
                   nn.SiLU(),
                   nn.ConvTranspose2d(channels[1], channels[0], 3, stride=2, bias=True, output_padding=1),
                   nn.BatchNorm2d(channels[0]),
                   nn.SiLU(),
                   nn.ConvTranspose2d(channels[0], 1, 3, stride=1, bias=True),
                   nn.Sigmoid(),)
  
  def forward(self, x):
    encoded = self.encoder(x)
    output = self.decoder(encoded)
    return output

x_tmp = torch.randn(1,1,28,28)
print(AutoEncoder()(x_tmp).shape)
assert AutoEncoder()(x_tmp).shape == x_tmp.shape, "Check conv layer spec! the autoencoder input output shape not align"
Training the autoencoder with a perceptual loss
from lpips import LPIPS
device = 'cuda' #@param ['cuda', 'cpu'] {'type':'string'}
# Define the loss function, MSE and LPIPS 
lpips = LPIPS(net="squeeze").cuda()
loss_fn_ae = lambda x, xhat: \
    nn.functional.mse_loss(x, xhat) + \
    lpips(x.repeat(1,3,1,1), xhat.repeat(1,3,1,1)).mean()

ae_model = AutoEncoder([4, 4, 4]).cuda()
n_epochs =   50  #@param {'type':'integer'}
## size of a mini-batch
batch_size =  2048   #@param {'type':'integer'}
## learning rate
lr=10e-4 #@param {'type':'number'}

dataset = MNIST('.', train=True, transform=transforms.ToTensor(), download=True)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=0)

optimizer = Adam(ae_model.parameters(), lr=lr)
tqdm_epoch = trange(n_epochs)
for epoch in tqdm_epoch:
  avg_loss = 0.
  num_items = 0
  for x, y in data_loader:
    x = x.to(device)    
    z = ae_model.encoder(x)
    x_hat = ae_model.decoder(z)
    loss = loss_fn_ae(x, x_hat) #loss_fn_cond(score_model, x, y, marginal_prob_std_fn)
    optimizer.zero_grad()
    loss.backward()    
    optimizer.step()
    avg_loss += loss.item() * x.shape[0]
    num_items += x.shape[0]
  print('{} Average Loss: {:5f}'.format(epoch, avg_loss / num_items))
  # Print the averaged training loss so far.
  tqdm_epoch.set_description('Average Loss: {:5f}'.format(avg_loss / num_items))
  # Update the checkpoint after each epoch of training.
  torch.save(ae_model.state_dict(), 'ckpt_ae.pth')
Visualizing the trained autoencoder

The cell below visualizes the results. The autoencoder's output should look almost identical to the original images.

#@title Visualize trained autoencoder
ae_model.eval()
x, y = next(iter(data_loader))
x_hat = ae_model(x.to(device)).cpu()
plt.figure(figsize=(6,6.5))
plt.axis('off')
plt.imshow(make_grid(x[:64,:,:,:].cpu()).permute([1,2,0]), vmin=0., vmax=1.)
plt.title("Original")
plt.show()

plt.figure(figsize=(6,6.5))
plt.axis('off')
plt.imshow(make_grid(x_hat[:64,:,:,:].cpu()).permute([1,2,0]), vmin=0., vmax=1.)
plt.title("AE Reconstructed")
plt.show()


Building the latent-state dataset

Let's use the autoencoder to convert the MNIST images into latent-space representations. We will train our diffusion generative model on these compressed images.

batch_size = 4096
dataset = MNIST('.', train=True, transform=transforms.ToTensor(), download=True)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4)
ae_model.requires_grad_(False)
ae_model.eval()
zs = []
ys = []
for x, y in tqdm(data_loader):
  z = ae_model.encoder(x.to(device)).cpu()
  zs.append(z)
  ys.append(y)

zdata = torch.cat(zs, )
ydata = torch.cat(ys, )

print(zdata.shape)
print(ydata.shape)
print(zdata.mean(), zdata.var())

from torch.utils.data import TensorDataset
latent_dataset = TensorDataset(zdata, ydata)

A transformer U-Net for the latent variables

This is a U-Net (with self-attention / cross-attention) similar to the one defined earlier, except that it operates on compressed latents rather than full-size images.

class Latent_UNet_Tranformer(nn.Module):
  """A time-dependent score-based model built upon U-Net architecture."""

  def __init__(self, marginal_prob_std, channels=[4, 64, 128, 256], embed_dim=256, 
               text_dim=256, nClass=10):
    """Initialize a time-dependent score-based network.

    Args:
      marginal_prob_std: A function that takes time t and gives the standard
        deviation of the perturbation kernel p_{0t}(x(t) | x(0)).
      channels: The number of channels for feature maps of each resolution.
      embed_dim: The dimensionality of Gaussian random feature embeddings.
    """
    super().__init__()
    # Gaussian random feature embedding layer for time
    self.time_embed = nn.Sequential(
        GaussianFourierProjection(embed_dim=embed_dim),
        nn.Linear(embed_dim, embed_dim))
    # Encoding layers where the resolution decreases
    self.conv1 = nn.Conv2d(channels[0], channels[1], 3, stride=1, bias=False)
    self.dense1 = Dense(embed_dim, channels[1])
    self.gnorm1 = nn.GroupNorm(4, num_channels=channels[1])
    self.conv2 = nn.Conv2d(channels[1], channels[2], 3, stride=2, bias=False)
    self.dense2 = Dense(embed_dim, channels[2])
    self.gnorm2 = nn.GroupNorm(4, num_channels=channels[2])
    self.attn2 = SpatialTransformer(channels[2], text_dim)
    self.conv3 = nn.Conv2d(channels[2], channels[3], 3, stride=2, bias=False)
    self.dense3 = Dense(embed_dim, channels[3])
    self.gnorm3 = nn.GroupNorm(4, num_channels=channels[3])   
    self.attn3 = SpatialTransformer(channels[3], text_dim) 
    
    self.tconv3 = nn.ConvTranspose2d(channels[3], channels[2], 3, stride=2, bias=False, )    
    self.dense6 = Dense(embed_dim, channels[2])
    self.tgnorm3 = nn.GroupNorm(4, num_channels=channels[2])
    self.attn6 = SpatialTransformer(channels[2], text_dim)
    self.tconv2 = nn.ConvTranspose2d(channels[2], channels[1], 3, stride=2, bias=False, output_padding=1)     # + channels[2]
    self.dense7 = Dense(embed_dim, channels[1])
    self.tgnorm2 = nn.GroupNorm(4, num_channels=channels[1])
    self.tconv1 = nn.ConvTranspose2d(channels[1], channels[0], 3, stride=1) # + channels[1]
    
    # The swish activation function
    self.act = nn.SiLU() # lambda x: x * torch.sigmoid(x)
    self.marginal_prob_std = marginal_prob_std
    self.cond_embed = nn.Embedding(nClass, text_dim)
  
  def forward(self, x, t, y=None): 
    # Obtain the Gaussian random feature embedding for t   
    embed = self.act(self.time_embed(t))    
    y_embed = self.cond_embed(y).unsqueeze(1)
    # Encoding path
    ## Incorporate information from t
    h1 = self.conv1(x) + self.dense1(embed)
    ## Group normalization
    h1 = self.act(self.gnorm1(h1))
    h2 = self.conv2(h1) + self.dense2(embed)
    h2 = self.act(self.gnorm2(h2))
    h2 = self.attn2(h2, y_embed)
    h3 = self.conv3(h2) + self.dense3(embed)
    h3 = self.act(self.gnorm3(h3))
    h3 = self.attn3(h3, y_embed)

    # Decoding path
    ## Skip connection from the encoding path
    h = self.tconv3(h3) + self.dense6(embed)
    h = self.act(self.tgnorm3(h))
    h = self.attn6(h, y_embed)
    h = self.tconv2(h + h2)
    h += self.dense7(embed)
    h = self.act(self.tgnorm2(h))
    h = self.tconv1(h + h1)

    # Normalize output
    h = h / self.marginal_prob_std(t)[:, None, None, None]
    return h

Training the latent diffusion model

Finally, we put everything together, combining the latent-space representation with our attention-augmented U-Net to learn the score function. (It may not actually work all that well... but at least it shows that, with this many moving parts, this becomes a nontrivial engineering problem.)
Run the cell below to train our latent diffusion model!

#@title Training Latent diffusion model

continue_training = False #@param {type:"boolean"}
if not continue_training:
  print("initilize new score model...")
  latent_score_model = torch.nn.DataParallel(
      Latent_UNet_Tranformer(marginal_prob_std=marginal_prob_std_fn,
                  channels=[4, 16, 32, 64], ))
  latent_score_model = latent_score_model.to(device)


n_epochs =   250 #@param {'type':'integer'}
## size of a mini-batch
batch_size =  1024 #@param {'type':'integer'}
## learning rate
lr=1e-4 #@param {'type':'number'}

latent_data_loader = DataLoader(latent_dataset, batch_size=batch_size, shuffle=True, )
latent_score_model.train()
optimizer = Adam(latent_score_model.parameters(), lr=lr)
scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: max(0.5, 0.995 ** epoch))
tqdm_epoch = trange(n_epochs)
for epoch in tqdm_epoch:
  avg_loss = 0.
  num_items = 0
  for z, y in latent_data_loader:
    z = z.to(device)    
    loss = loss_fn_cond(latent_score_model, z, y, marginal_prob_std_fn)
    optimizer.zero_grad()
    loss.backward()    
    optimizer.step()
    avg_loss += loss.item() * z.shape[0]
    num_items += z.shape[0]
  scheduler.step()
  lr_current = scheduler.get_last_lr()[0]
  print('{} Average Loss: {:5f} lr {:.1e}'.format(epoch, avg_loss / num_items, lr_current))
  # Print the averaged training loss so far.
  tqdm_epoch.set_description('Average Loss: {:5f}'.format(avg_loss / num_items))
  # Update the checkpoint after each epoch of training.
  torch.save(latent_score_model.state_dict(), 'ckpt_latent_diff_transformer.pth')

Visualization

digit = 4 #@param {'type':'integer'}
sample_batch_size = 64 #@param {'type':'integer'}
num_steps = 500 #@param {'type':'integer'}
sampler = Euler_Maruyama_sampler #@param ['Euler_Maruyama_sampler', 'pc_sampler', 'ode_sampler'] {'type': 'raw'}
latent_score_model.eval()
## Generate samples using the specified sampler.
samples_z = sampler(latent_score_model, 
        marginal_prob_std_fn,
        diffusion_coeff_fn, 
        sample_batch_size, 
        num_steps=num_steps,
        device=device,
        x_shape=(4,10,10),
        y=digit*torch.ones(sample_batch_size, dtype=torch.long))

## Sample visualization.
decoder_samples = ae_model.decoder(samples_z).clamp(0.0, 1.0)
sample_grid = make_grid(decoder_samples, nrow=int(np.sqrt(sample_batch_size)))
%matplotlib inline
import matplotlib.pyplot as plt

plt.figure(figsize=(6,6))
plt.axis('off')
plt.imshow(sample_grid.permute(1, 2, 0).cpu(), vmin=0., vmax=1.)
plt.show()

# play with architectures
latent_score_model = torch.nn.DataParallel(
    Latent_UNet_Tranformer(marginal_prob_std=marginal_prob_std_fn,
                channels=[4, 16, 32, 64], ))
latent_score_model = latent_score_model.to(device)

train_diffusion_model(latent_dataset, latent_score_model,
                      n_epochs =   100,
                      batch_size =  1024,
                      lr=10e-4,
                      model_name="transformer_latent")

Below, we examine some of the samples in more detail to study the quality of the generated images. You may see some rather strange things.

cossim_mat = visualize_digit_embedding(latent_score_model.module.cond_embed.weight.data)
digit = 8 #@param {'type':'integer'}
sample_batch_size = 64 #@param {'type':'integer'}
num_steps = 250 #@param {'type':'integer'}
sampler = Euler_Maruyama_sampler #@param ['Euler_Maruyama_sampler', 'pc_sampler', 'ode_sampler'] {'type': 'raw'}
latent_score_model.eval()
## Generate samples using the specified sampler.
samples_z = sampler(latent_score_model, 
        marginal_prob_std_fn,
        diffusion_coeff_fn, 
        sample_batch_size, 
        num_steps=num_steps,
        device=device,
        x_shape=(4,10,10),
        y=digit*torch.ones(sample_batch_size, dtype=torch.long))

## Sample visualization.
decoder_samples = ae_model.decoder(samples_z).clamp(0.0, 1.0)
sample_grid = make_grid(decoder_samples, nrow=int(np.sqrt(sample_batch_size)))
%matplotlib inline
import matplotlib.pyplot as plt

plt.figure(figsize=(6,6))
plt.axis('off')
plt.imshow(sample_grid.permute(1, 2, 0).cpu(), vmin=0., vmax=1.)
plt.show()

