机器学习课程学习周报九
文章目录
- 机器学习课程学习周报九
- 摘要
- Abstract
- 一、机器学习部分
- 1.1 Word Embedding
- 1.1.1 词嵌入的基本概念
- 1.1.2 word2vec连续词袋模型CBOW
- 1.1.3 word2vec跳字模型Skip-gram
- 1.2 Transformer代码实践
- Dataset
- Dataloader
- Model
- Learning rate schedule
- Model Function
- Validate
- Main Function
- Dataset of inference
- Main function of inference
- 总结
摘要
本周的学习重点是词嵌入技术和Transformer模型的应用。在词嵌入部分,我探讨了词嵌入的基本概念以及word2vec的两种模型:CBOW和Skip-gram。在代码实践中,我学习了如何使用Transformer模型对音频数据进行说话者识别,包括数据集的加载、模型的构建和训练过程。
Abstract
This week’s focus was on word embedding techniques and the application of the Transformer model. In the word embedding section, I explored the basic concepts and two models of word2vec: CBOW and Skip-gram. In the code practice, I learned how to use the Transformer model for speaker identification in audio data, covering dataset loading, model construction, and the training process.
一、机器学习部分
1.1 Word Embedding
1.1.1 词嵌入的基本概念
词嵌入(Word Embedding),是种将词汇表中的词映射为固定长度向量的技术。通过词嵌入,可以将One-Hot编码表示的高维稀疏向量转为低维连续的向量。
为了进一步说明词与词之间的关系,使用降维算法可以将词嵌入向量降维至2维,从而在平面中绘制。其中,语义相近的词语,对应的向量位置也更相近。例如,cat与kitten(小猫)的含义相近,它们的距离就相近,cat与其他词的语义差距大,其之间的距离就相对较远。
词嵌入向量不仅可以表达语义的相似性,还可以通过向量的数学关系,描述词语之间的语义关联。一般通过特定的词嵌入算法,如word2vec、fasttext、Glove等训练一个通用的嵌入矩阵。这个矩阵的每一行都代表了一个词向量,这些词向量的表达方式一旦训练完成,就可以应用在不同的NLP任务中。
具体来说,嵌入矩阵的行数表示词汇表中词语的个数,而嵌入矩阵的列数表示词向量的维度,嵌入矩阵记作 E E E。
以句子“我喜欢学习数学”为例,将句中的词转换为词嵌入向量:首先进行切词,得到“我”、“喜欢”、“学习”、“数学”,将词语进行One-Hot编码,并将这些词语的One-Hot向量组成为句子矩阵,记作 V V V。
将大小为 4 × 5000 4 \times 5000 4×5000的One-Hot句子矩阵 V V V乘上大小为 5000 × 128 5000 \times 128 5000×128嵌入矩阵 E E E,可以得到 4 × 128 4 \times 128 4×128的矩阵,这个矩阵的每一行都代表一个词的嵌入向量。实际上,将矩阵 V V V和矩阵 E E E相乘,即是将词对应的嵌入向量从嵌入矩阵中取出。
词嵌入算法与One-Hot编码相比,优势在于:
- 词嵌入将文本中的词,通过一个低维向量来表达,相比于万维的One-Hot编码方式,效率上有了质的提升。
- 通过词嵌入表示的词语,可以理解词语的语义,并进行词语推理。语义相似的词在向量空间上也会更相近。
- One-Hot编码不具有通用性,不同语料得到的One-Hot表示一般不同。而嵌入矩阵是通用的,同一份词向量,可以用在不同的NLP任务中。
1.1.2 word2vec连续词袋模型CBOW
CBOW(Continuous Bag of Words)连续词袋模型,根据上下文词汇预测目标词,这个模型的目标在于迭代出词嵌入矩阵 E E E(Embeddings)。以“We are about to study the idea of deep learning.”为例,使用词语study的上下文预测study这个词语,以介绍CBOW模型。
首先,对于某个词的上下文,需要提前设置一个上下文窗口长度:
设置好窗口的长度后,需要通过窗口内的词语,预测目标词。
CBOW模型接收上下文词语,将上下文词语转换为最有可能的目标词。
CBOW模型最前端是embeddings层,用以接收One-Hot形式的词语输入,embeddings层是一个 N × V N \times V N×V的矩阵, N N N是词表中的词语个数, V V V是词向量的维度,这就是我们希望训练后希望得到的嵌入矩阵 E E E。上图中输入词为“We”,转换为对应的One-Hot向量,然后乘上嵌入矩阵 E E E,得到词的嵌入向量,这一步是从矩阵中选择一个特定的行,从embeddings中查找“We”的词嵌入向量。
由于某个词的上下文中,包含了多个词语,这些词语会被同时输入至embeddings层,每个词语都会被转换为一个词向量。并将多个上下文的词向量直接相加,然后取平均,embeddings层输出语义信息平均的向量
v
v
v。
在embeddings层后,会连接一个线性层,上图中用红色区域表示。一般这个线性层不设置激活函数,这个线性层的权重矩阵的维度是 V × N V \times N V×N, V V V是词嵌入向量的维度, N N N是词表中词语的个数。具体来说,将所有上下文词向量的平均值 v v v,这个向量的大小是 1 × V 1 \times V 1×V,输入至该线性层,输出大小为 1 × N 1 \times N 1×N的向量,然后经过softmax函数就算出一个最有可能的输出词,softmax函数的输出是概率分布,图上直接挑选概率最大的位置赋为1,表示预测目标词的One-Hot编码,这只是一种简化。
1.1.3 word2vec跳字模型Skip-gram
Skip-gram会根据目标词预测上下文词,而CBOW是根据上下文词预测目标词,这两种方法的最终目标都是迭代出词向量字典embeddings。Skip-gram在迭代时,调整词向量:使目标词的词向量与其上下文的词向量尽可能的接近,使目标词的词向量与非上下文词的词向量尽可能的远。判断两个词向量是否相似,使用向量的点积:
A = ( a 1 , a 2 , … , a n ) B = ( b 1 , b 2 , … , b n ) A ⋅ B = a 1 b 1 + a 2 b 2 + … + a n b n \begin{array}{l}A = \left( {{a_1},{a_2}, \ldots ,{a_n}} \right)\\B = \left( {{b_1},{b_2}, \ldots ,{b_n}} \right)\\A \cdot B = {a_1}{b_1} + {a_2}{b_2} + \ldots + {a_n}{b_n}\end{array} A=(a1,a2,…,an)B=(b1,b2,…,bn)A⋅B=a1b1+a2b2+…+anbn
向量的点积,衡量了两个向量在同一方向上的强度。点积越大,两个向量越相似,它们对应的词语的语义就越接近。
同样地,以“We are about to study the idea of deep learning.”为例,并先设定一个上下文窗口长度。
Skip-gram模型是一个神经网络,其中包含了两个嵌入层in_embedding和out_embedding。模型接受One-Hot编码的向量输入,in_embeddings将One-Hot转换为词嵌入向量,out_embedding将输入的目标词的嵌入向量与词表中全部词语的嵌入向量做点积,计算语义相似度,最后输出一个词汇表大小的概率分布,它表示了词汇表中的每个词是目标词的上下文的可能性。
1.2 Transformer代码实践
本次代码作业旨在学习使用Transformer模型,给定一段音频序列,预测音频的speaker的id。
Dataset
数据集使用VoxCeleb2,训练数据包含56666个预处理过的音频特征文件,并包含相应的标签;测试集包含4000个预处理过的音频特征文件,不包含对应标签;标签总共有600种类别,每一种类别代表一个speaker。
音频数据的预处理过程如上所示
数据集的文件格式如上所示
import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
class myDataset(Dataset):
def __init__(self, data_dir, segment_len=128):
self.data_dir = data_dir
self.segment_len = segment_len
# 加载从speaker的名称到其对应id的映射。
mapping_path = Path(data_dir) / "mapping.json"
mapping = json.load(mapping_path.open())
self.speaker2id = mapping["speaker2id"]
# 加载训练数据的元数据。
metadata_path = Path(data_dir) / "metadata.json"
metadata = json.load(open(metadata_path))["speakers"]
# 获取speaker的总数。
self.speaker_num = len(metadata.keys())
self.data = []
for speaker in metadata.keys():
for utterances in metadata[speaker]:
self.data.append([utterances["feature_path"], self.speaker2id[speaker]])
def __len__(self):
return len(self.data)
def __getitem__(self, index):
feat_path, speaker = self.data[index]
# 加载处理为mel-spectrogram形式的音频数据。
mel = torch.load(os.path.join(self.data_dir, feat_path))
# 分割mel-spectrogram为128帧。
if len(mel) > self.segment_len:
start = random.randint(0, len(mel) - self.segment_len)
mel = torch.FloatTensor(mel[start:start+self.segment_len])
else:
mel = torch.FloatTensor(mel)
# 将speaker的id的数据类型改为long,以便之后计算损失。
speaker = torch.FloatTensor([speaker]).long()
return mel, speaker
def get_speaker_number(self):
return self.speaker_num
Dataloader
import torch
from torch.utils.data import DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
def collate_batch(batch):
# 处理一个批次的音频特征
mel, speaker = zip(*batch)
# 因为我们是分批次训练模型的,所以我们需要在同一批次中填充特征,使它们的长度相同。
mel = pad_sequence(mel, batch_first=True, padding_value=-20) # 填充大小为log 10^(-20) ,这是个非常小的数值.
# mel: (batch size, length, 40)
return mel, torch.FloatTensor(speaker).long()
def get_dataloader(data_dir, batch_size, n_workers):
dataset = myDataset(data_dir)
speaker_num = dataset.get_speaker_number()
# 按照9:1的比例划分数据集为训练集和验证集
trainlen = int(0.9 * len(dataset))
lengths = [trainlen, len(dataset) - trainlen]
trainset, validset = random_split(dataset, lengths)
train_loader = DataLoader(
trainset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
num_workers=n_workers,
pin_memory=True,
collate_fn=collate_batch,
)
valid_loader = DataLoader(
validset,
batch_size=batch_size,
num_workers=n_workers,
drop_last=True,
pin_memory=True,
collate_fn=collate_batch,
)
return train_loader, valid_loader, speaker_num
Model
import torch
import torch.nn as nn
import torch.nn.functional as F
class Classifier(nn.Module):
def __init__(self, d_model=80, n_spks=600, dropout=0.1):
super().__init__()
# 神经网络的预处理层,将输入特征维度从40维映射到d_model维度
self.prenet = nn.Linear(40, d_model)
# 这里改进可以将Transformer模型变为Conformer模型
# Conformer,https://arxiv.org/abs/2005.08100
self.encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, dim_feedforward=256, nhead=2
)# Transformer编码器层,用于对特征进行编码
self.pred_layer = nn.Sequential(
nn.Linear(d_model, d_model),
nn.ReLU(),
nn.Linear(d_model, n_spks),# 输出对600个speaker的预测结果
)
def forward(self, mels):
"""
args:
mels: (batch size, length, 40)
return:
out: (batch size, n_spks)
"""
# out: (batch size, length, d_model)
out = self.prenet(mels)# 经过预处理层处理输入特征
# out: (length, batch size, d_model)
out = out.permute(1, 0, 2)# 改变特征维度的顺序以符合Transformer的输入要求
# The encoder layer expect features in the shape of (length, batch size, d_model).
out = self.encoder_layer(out)# 使用Transformer编码器层对特征进行编码
# out: (batch size, length, d_model)
out = out.transpose(0, 1)# 调整特征的维度顺序
# mean pooling
stats = out.mean(dim=1)# mean pooling,对特征进行平均池化
# out: (batch, n_spks)
out = self.pred_layer(stats)# 使用全连接层进行最终的预测输出
return out
Learning rate schedule
import math
import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR
def get_cosine_schedule_with_warmup(
optimizer: Optimizer,
num_warmup_steps: int,
num_training_steps: int,
num_cycles: float = 0.5,
last_epoch: int = -1,
):
"""
创建一个学习率调度器,学习率按余弦函数的值逐渐减小,同时在初始阶段有一个线性增加学习率的预热阶段。
Args:
optimizer (:class:`~torch.optim.Optimizer`):
需要调度学习率的优化器。
num_warmup_steps (:obj:`int`):
预热阶段的步数。
num_training_steps (:obj:`int`):
总的训练步数。
num_cycles (:obj:`float`, `optional`, defaults to 0.5):
余弦调度中的波数(默认值是从最大值降到0按照半余弦波进行)。
last_epoch (:obj:`int`, `optional`, defaults to -1):
恢复训练时的最后一个周期索引。
Return:
:obj:`torch.optim.lr_scheduler.LambdaLR`,带有适当调度的学习率。
"""
def lr_lambda(current_step):
# 预热阶段
if current_step < num_warmup_steps:
return float(current_step) / float(max(1, num_warmup_steps))
# 余弦退火
progress = float(current_step - num_warmup_steps) / float(
max(1, num_training_steps - num_warmup_steps)
)
return max(
0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))
)
return LambdaLR(optimizer, lr_lambda, last_epoch)
Model Function
import torch
def model_fn(batch, model, criterion, device):
"""通过模型前向传播一个批次的数据。"""
mels, labels = batch
mels = mels.to(device)# 将输入特征移动到指定设备上
labels = labels.to(device)# 将标签移动到指定设备上
outs = model(mels)# 通过模型进行前向传播得到输出
loss = criterion(outs, labels)# 使用给定的损失函数计算损失值
# 获取具有最高概率的说话人id。
preds = outs.argmax(1)
# 计算预测准确率。
accuracy = torch.mean((preds == labels).float())
return loss, accuracy
Validate
from tqdm import tqdm
import torch
def valid(dataloader, model, criterion, device):
"""在验证集上进行验证。"""
model.eval() # 将模型设置为评估模式,不进行梯度计算
running_loss = 0.0 # 累积损失
running_accuracy = 0.0 # 累积准确率
pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit=" uttr") # 进度条
for i, batch in enumerate(dataloader):
with torch.no_grad(): # 不需要计算梯度
loss, accuracy = model_fn(batch, model, criterion, device)
running_loss += loss.item() # 累积损失值
running_accuracy += accuracy.item() # 累积准确率
pbar.update(dataloader.batch_size) # 更新进度条
pbar.set_postfix(
loss=f"{running_loss / (i+1):.2f}",
accuracy=f"{running_accuracy / (i+1):.2f}",
)
pbar.close() # 关闭进度条
model.train() # 将模型设置回训练模式
return running_accuracy / len(dataloader) # 返回平均准确率
Main Function
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split
def parse_args():
"""arguments"""
config = {
"data_dir": "./Dataset",
"save_path": "model.ckpt",
"batch_size": 32,
"n_workers": 8,
"valid_steps": 2000,
"warmup_steps": 1000,
"save_steps": 10000,
"total_steps": 70000,
}
return config
def main(
data_dir,
save_path,
batch_size,
n_workers,
valid_steps,
warmup_steps,
total_steps,
save_steps,
):
"""Main function."""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[Info]: Use {device} now!")
train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
train_iterator = iter(train_loader)
print(f"[Info]: Finish loading data!",flush = True)
model = Classifier(n_spks=speaker_num).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = AdamW(model.parameters(), lr=1e-3)
scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)
print(f"[Info]: Finish creating model!",flush = True)
best_accuracy = -1.0
best_state_dict = None
pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")
for step in range(total_steps):
# 获取数据
try:
batch = next(train_iterator)
except StopIteration:
train_iterator = iter(train_loader)
batch = next(train_iterator)
loss, accuracy = model_fn(batch, model, criterion, device)
batch_loss = loss.item()
batch_accuracy = accuracy.item()
# 更新模型
loss.backward()
optimizer.step()
scheduler.step()
optimizer.zero_grad()
# 记录日志
pbar.update()
pbar.set_postfix(
loss=f"{batch_loss:.2f}",
accuracy=f"{batch_accuracy:.2f}",
step=step + 1,
)
# 进行验证
if (step + 1) % valid_steps == 0:
pbar.close()
valid_accuracy = valid(valid_loader, model, criterion, device)
# 保存最佳模型
if valid_accuracy > best_accuracy:
best_accuracy = valid_accuracy
best_state_dict = model.state_dict()
pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")
# 保存到目前为止最佳的模型
if (step + 1) % save_steps == 0 and best_state_dict is not None:
torch.save(best_state_dict, save_path)
pbar.write(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")
pbar.close()
if __name__ == "__main__":
main(**parse_args())
Dataset of inference
import os
import json
import torch
from pathlib import Path
from torch.utils.data import Dataset
class InferenceDataset(Dataset):
def __init__(self, data_dir):
# 加载测试数据的元数据
testdata_path = Path(data_dir) / "testdata.json"
metadata = json.load(testdata_path.open())
self.data_dir = data_dir
self.data = metadata["utterances"]
def __len__(self):
return len(self.data)
def __getitem__(self, index):
# 获取单个样本
utterance = self.data[index]
feat_path = utterance["feature_path"]
mel = torch.load(os.path.join(self.data_dir, feat_path))
return feat_path, mel
def inference_collate_batch(batch):
"""对一个数据批次进行整理。"""
feat_paths, mels = zip(*batch)
return feat_paths, torch.stack(mels)
Main function of inference
import json
import csv
from pathlib import Path
from tqdm.notebook import tqdm
import torch
from torch.utils.data import DataLoader
def parse_args():
"""arguments"""
config = {
"data_dir": "./Dataset",
"model_path": "./model.ckpt",
"output_path": "./output.csv",
}
return config
def main(
data_dir,
model_path,
output_path,
):
"""Main function."""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[Info]: Use {device} now!")
mapping_path = Path(data_dir) / "mapping.json"
mapping = json.load(mapping_path.open())
dataset = InferenceDataset(data_dir)
dataloader = DataLoader(
dataset,
batch_size=1,
shuffle=False,
drop_last=False,
num_workers=8,
collate_fn=inference_collate_batch,
)
print(f"[Info]: Finish loading data!",flush = True)
speaker_num = len(mapping["id2speaker"])
model = Classifier(n_spks=speaker_num).to(device)
model.load_state_dict(torch.load(model_path))
model.eval()
print(f"[Info]: Finish creating model!",flush = True)
results = [["Id", "Category"]]
for feat_paths, mels in tqdm(dataloader):
with torch.no_grad():
mels = mels.to(device)
outs = model(mels)
preds = outs.argmax(1).cpu().numpy()
for feat_path, pred in zip(feat_paths, preds):
results.append([feat_path, mapping["id2speaker"][str(pred)]])
with open(output_path, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerows(results)
if __name__ == "__main__":
main(**parse_args())
总结
本周通过对词嵌入和Transformer模型的深入学习,我掌握了如何有效地将文本和音频数据转换为可用于机器学习模型的特征表示。词嵌入技术为NLP任务提供了更好的语义理解能力,而Transformer模型则在处理序列数据上表现十分出色。下周计划进入生成对抗网络(GAN)的学习。