代码
class FeedForward(nn.Module):
def __init__(self, config: LMConfig):
super().__init__()
if config.hidden_dim is None:
hidden_dim = 4 * config.dim
hidden_dim = int(2 * hidden_dim / 3)
config.hidden_dim = config.multiple_of * ((hidden_dim + config.multiple_of - 1) // config.multiple_of)
self.w1 = nn.Linear(config.dim, config.hidden_dim, bias=False)
self.w2 = nn.Linear(config.hidden_dim, config.dim, bias=False)
self.w3 = nn.Linear(config.dim, config.hidden_dim, bias=False)
self.dropout = nn.Dropout(config.dropout)
def forward(self, x):
return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x)))
class MoEGate(nn.Module):
def __init__(self, config: LMConfig):
super().__init__()
self.config = config
self.top_k = config.num_experts_per_tok
self.n_routed_experts = config.n_routed_experts
self.scoring_func = config.scoring_func
self.alpha = config.aux_loss_alpha
self.seq_aux = config.seq_aux
self.norm_topk_prob = config.norm_topk_prob
self.gating_dim = config.dim
self.weight = nn.Parameter(torch.empty((self.n_routed_experts, self.gating_dim)))
self.reset_parameters()
def reset_parameters(self) -> None:
import torch.nn.init as init
init.kaiming_uniform_(self.weight, a=math.sqrt(5))
def forward(self, hidden_states):
bsz, seq_len, h = hidden_states.shape
hidden_states = hidden_states.view(-1, h)
logits = F.linear(hidden_states, self.weight, None)
if self.scoring_func == 'softmax':
scores = logits.softmax(dim=-1)
else:
raise NotImplementedError(f'insupportable scoring function for MoE gating: {self.scoring_func}')
topk_weight, topk_idx = torch.topk(scores, k=self.top_k, dim=-1, sorted=False)
if self.top_k > 1 and self.norm_topk_prob:
denominator = topk_weight.sum(dim=-1, keepdim=True) + 1e-20
topk_weight = topk_weight / denominator
if self.training and self.alpha > 0.0:
scores_for_aux = scores
aux_topk = self.top_k
topk_idx_for_aux_loss = topk_idx.view(bsz, -1)
if self.seq_aux:
scores_for_seq_aux = scores_for_aux.view(bsz, seq_len, -1)
ce = torch.zeros(bsz, self.n_routed_experts, device=hidden_states.device)
ce.scatter_add_(1, topk_idx_for_aux_loss,
torch.ones(bsz, seq_len * aux_topk, device=hidden_states.device)).div_(
seq_len * aux_topk / self.n_routed_experts)
aux_loss = (ce * scores_for_seq_aux.mean(dim=1)).sum(dim=1).mean() * self.alpha
else:
mask_ce = F.one_hot(topk_idx_for_aux_loss.view(-1), num_classes=self.n_routed_experts)
ce = mask_ce.float().mean(0)
Pi = scores_for_aux.mean(0)
fi = ce * self.n_routed_experts
aux_loss = (Pi * fi).sum() * self.alpha
else:
aux_loss = 0
return topk_idx, topk_weight, aux_loss
class MOEFeedForward(nn.Module):
def __init__(self, config: LMConfig):
super().__init__()
self.config = config
self.experts = nn.ModuleList([
FeedForward(config)
for _ in range(config.n_routed_experts)
])
self.gate = MoEGate(config)
if config.n_shared_experts is not None:
self.shared_experts = FeedForward(config)
def forward(self, x):
identity = x
orig_shape = x.shape
bsz, seq_len, _ = x.shape
# 使用门控机制选择专家
topk_idx, topk_weight, aux_loss = self.gate(x)
x = x.view(-1, x.shape[-1])
flat_topk_idx = topk_idx.view(-1)
if self.training:
# 训练模式下,重复输入数据
x = x.repeat_interleave(self.config.num_experts_per_tok, dim=0)
y = torch.empty_like(x, dtype=torch.float16)
for i, expert in enumerate(self.experts):
y[flat_topk_idx == i] = expert(x[flat_topk_idx == i]).to(y.dtype) # 确保类型一致
y = (y.view(*topk_weight.shape, -1) * topk_weight.unsqueeze(-1)).sum(dim=1)
y = y.view(*orig_shape)
else:
# 推理模式下,只选择最优专家
y = self.moe_infer(x, flat_topk_idx, topk_weight.view(-1, 1)).view(*orig_shape)
if self.config.n_shared_experts is not None:
y = y + self.shared_experts(identity)
self.aux_loss = aux_loss
return y
@torch.no_grad()
def moe_infer(self, x, flat_expert_indices, flat_expert_weights):
expert_cache = torch.zeros_like(x)
idxs = flat_expert_indices.argsort()
tokens_per_expert = flat_expert_indices.bincount().cpu().numpy().cumsum(0)
token_idxs = idxs // self.config.num_experts_per_tok
# 例如当tokens_per_expert=[6, 15, 20, 26, 33, 38, 46, 52]
# 当token_idxs=[3, 7, 19, 21, 24, 25, 4, 5, 6, 10, 11, 12...]
# 意味着当token_idxs[:6] -> [3, 7, 19, 21, 24, 25, 4]位置的token都由专家0处理,token_idxs[6:15]位置的token都由专家1处理......
for i, end_idx in enumerate(tokens_per_expert):
start_idx = 0 if i == 0 else tokens_per_expert[i - 1]
if start_idx == end_idx:
continue
expert = self.experts[i]
exp_token_idx = token_idxs[start_idx:end_idx]
expert_tokens = x[exp_token_idx]
expert_out = expert(expert_tokens).to(expert_cache.dtype)
expert_out.mul_(flat_expert_weights[idxs[start_idx:end_idx]])
# 使用 scatter_add_ 进行 sum 操作
expert_cache.scatter_add_(0, exp_token_idx.view(-1, 1).repeat(1, x.shape[-1]), expert_out)
return expert_cache
代码解释
解释一下这段代码的主要组成部分:
FeedForward
类:
- 实现了一个基础的前馈网络
- 使用 SwiGLU 激活函数(
F.silu(self.w1(x)) * self.w3(x)
) - 包含三个线性层(w1、w2、w3)和一个 dropout 层
MoEGate
类(门控机制):
- 负责决定每个 token 应该由哪些专家处理
- 主要步骤:
- 计算每个 token 对应每个专家的分数(使用 softmax)
- 选择 top-k 个最高分的专家
- 计算辅助损失(aux_loss)来平衡专家的使用
MOEFeedForward
类(混合专家系统):
-
包含多个专家(FeedForward)和一个门控网络(MoEGate)
-
训练模式:
- 使用门控网络选择每个 token 的专家
- 将输入数据复制多份,分发给不同专家
- 专家并行处理数据
- 根据门控权重合并结果
-
推理模式(
moe_infer
):- 对专家索引排序,将相同专家的 token 批量处理
- 使用
scatter_add_
将专家输出累加到正确位置 - 更高效的推理实现,避免了数据重复
- 特殊功能:
- 支持共享专家(
n_shared_experts
) - 实现了专家负载均衡(通过辅助损失)
- 支持每个 token 选择多个专家(
num_experts_per_tok
)
这是一个典型的 MoE(Mixture of Experts)实现,用于大型语言模型中提高模型容量和计算效率。
示例
# 创建 MoE 实例
dim = 512 # 输入维度
n_routed_experts = 4 # 专家数量
num_experts_per_tok = 2 # 每个token选择的专家数量
moe = MOEFeedForward(
dim=dim,
n_routed_experts=n_routed_experts,
num_experts_per_tok=num_experts_per_tok,
hidden_dim=None, # FFN隐藏层维度,None时自动计算
dropout=0.1 # dropout比率
)
# 创建示例输入
batch_size = 2
seq_len = 10
x = torch.randn(batch_size, seq_len, dim) # 形状: [2, 10, 512]
moe(x)
输出
After gate - topk_idx.shape: torch.Size([20, 2]), topk_weight.shape: torch.Size([20, 2])
After view - x.shape: torch.Size([20, 512]), flat_topk_idx.shape: torch.Size([40])
After repeat_interleave - x.shape: torch.Size([40, 512])
Empty y tensor shape: torch.Size([40, 512])
Expert 0 - input shape: torch.Size([9, 512])
Expert 0 - output shape: torch.Size([9, 512])
Expert 1 - input shape: torch.Size([13, 512])
Expert 1 - output shape: torch.Size([13, 512])
Expert 2 - input shape: torch.Size([11, 512])
Expert 2 - output shape: torch.Size([11, 512])
Expert 3 - input shape: torch.Size([7, 512])
Expert 3 - output shape: torch.Size([7, 512])
Before view - y.shape: torch.Size([40, 512])
topk_weight.shape: torch.Size([20, 2])
After view and sum - y.shape: torch.Size([20, 512])
Final y.shape: torch.Size([2, 10, 512])
相应的torch函数
import torch
# empty: 创建未初始化的张量
x = torch.empty((2, 3)) # 创建形状为 2x3 的未初始化张量
# zeros_like: 创建与输入相同形状的全零张量
a = torch.tensor([[1, 2], [3, 4]])
b = torch.zeros_like(a) # 创建形状为 2x2 的全零张量
print(b) # tensor([[0, 0], [0, 0]])
tensor([[0, 0],
[0, 0]])
import torch.nn.functional as F
x = torch.tensor([[1, 2, 3, 4], [5, 6, 7, 8]])
# view: 改变张量形状
y = x.view(-1) # 展平为一维
print(y) # tensor([1, 2, 3, 4, 5, 6, 7, 8])
# -1 表示自动计算该维度大小
z = x.view(-1, 2) # 重塑为 4x2
print(z) # tensor([[1, 2], [3, 4], [5, 6], [7, 8]])
tensor([1, 2, 3, 4, 5, 6, 7, 8])
tensor([[1, 2],
[3, 4],
[5, 6],
[7, 8]])
# linear: 线性变换 y = xA^T + b
input = torch.randn(2, 3) # 2个样本,每个3维
weight = torch.randn(4, 3) # 输出4维
output = F.linear(input, weight) # 形状变为 [2, 4]
# softmax: 将数值转换为概率分布
logits = torch.tensor([1.0, 2.0, 3.0])
probs = F.softmax(logits, dim=0)
print(probs) # tensor([0.0900, 0.2447, 0.6652])
tensor([0.0900, 0.2447, 0.6652])
# 找出最大的k个值及其索引
x = torch.tensor([1, 5, 2, 8, 3])
values, indices = torch.topk(x, k=2)
print(values) # tensor([8, 5])
print(indices) # tensor([3, 1])
tensor([8, 5])
tensor([3, 1])
x = torch.tensor([1, 2, 3])
# 每个元素重复2次
y = x.repeat_interleave(2)
print(y) # tensor([1, 1, 2, 2, 3, 3])
tensor([1, 1, 2, 2, 3, 3])
# 统计每个数字出现的次数
x = torch.tensor([1, 1, 2, 3, 1, 2])
counts = x.bincount()
print(counts) # tensor([0, 3, 2, 1]) # 0出现0次,1出现3次,2出现2次,3出现1次
tensor([0, 3, 2, 1])
# 在指定位置累加值
src = torch.tensor([[1, 2], [3, 4]], dtype=torch.float) # 指定数据类型为 float
index = torch.tensor([[0, 1], [0, 1]])
out = torch.zeros(2, 2, dtype=torch.float) # 确保与 src 的数据类型相同
out.scatter_add_(0, index, src)
print(out)
tensor([[4., 0.],
[0., 6.]])
# 返回排序后的索引
x = torch.tensor([3, 1, 4, 1, 5])
indices = x.argsort()
print(indices) # tensor([1, 3, 0, 2, 4]) # 最小值在位置1和3,然后是0,2,4
tensor([1, 3, 0, 2, 4])