如果结局早已注定,那么过程就将大于结局
—— 25.3.18
自回归语言模型:由前文预测后文的语言模型
特点:单向
训练方式:利用前n个字预测第n+1个字,实现一个mask矩阵,送入Bert模型,让其前文看不到后文,作一个生成式训练
数据文件
通过网盘分享的文件:文本生成
链接: https://pan.baidu.com/s/1Az9WLH1LfEyk_5ih8db7jw?pwd=6uv6 提取码: 6uv6
--来自百度网盘超级会员v3的分享
一、模型定义
1.模型初始化
代码运行流程
# LanguageModel初始化流程树状图
├── 1. 父类初始化
│ └── `super().__init__()` → 继承父类(如`nn.Module`)的属性和方法
├── 2. 加载预训练BERT模型
│ ├── `self.bert = BertModel.from_pretrained(...)`
│ │ ├── `pretrain_model_path`: 预训练模型路径(如`bert-base-uncased`)
│ │ ├── `return_dict=False`: 强制返回元组而非字典(兼容旧版代码)
│ │ └── `attn_implementation='eager'`: 使用标准注意力实现(非Flash Attention等优化)
│ └── 输出特征维度:`hidden_size`(与BERT模型配置一致)
├── 3. 定义分类层
│ └── `self.classify = nn.Linear(hidden_size, vocab_size)`
│ ├── `hidden_size`: BERT输出的隐藏层维度(如768)
│ └── `vocab_size`: 目标词汇表大小(如30522)
└── 4. 定义损失函数
└── `self.loss = nn.functional.cross_entropy`
└── 计算预测logits与真实标签的交叉熵损失
hidden_size:与BERT模型的隐藏层维度一致(例如BERT-base为768),用于连接分类层输入维度。
vocab_size:输出层词汇表大小,通常与预训练模型的词汇表匹配
pretrain_model_path: 预训练模型的本地路径或HuggingFace模型标识符
self.bert:加载预训练的BERT模型,用于文本编码
self.classify:将BERT输出的隐藏状态映射到词汇表空间,生成预测logits
self.loss:计算预测logits与真实标签的交叉熵损失
BertModel.from_pretrained():从预训练模型(如BERT、RoBERTa等)加载模型权重和配置,支持自定义加载路径或HuggingFace模型标识符
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
pretrained_model_name_or_path | str | 是 | 无 | 预训练模型名称(如bert-base-uncased )或本地路径 |
config | PretrainedConfig | 否 | None | 自定义模型配置(覆盖默认配置) |
cache_dir | str | 否 | None | 模型缓存目录(避免重复下载) |
force_download | bool | 否 | False | 强制重新下载模型文件 |
resume_download | bool | 否 | False | 断点续传下载 |
output_loading_info | bool | 否 | False | 是否返回加载过程的详细信息 |
attn_implementation | str | 否 | "eager" | 注意力实现方式(如"flash_attention_2" 加速) |
nn.Linear():定义全连接层,对输入数据执行线性变换(y = xA^T + b
),适用于特征映射和分类层
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
in_features | int | 是 | 无 | 输入特征维度(如BERT隐藏层维度768) |
out_features | int | 是 | 无 | 输出特征维度(如词汇表大小30522) |
bias | bool | 否 | True | 是否启用偏置项(b ) |
nn.functional.cross_entropy:计算交叉熵损失,结合log_softmax
和nll_loss
,适用于分类任务(如情感分析、文本分类)
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
input | Tensor | 是 | 无 | 未归一化的预测值(形状[batch, num_classes] ) |
target | Tensor | 是 | 无 | 真实标签(类别索引或概率分布) |
weight | Tensor | 否 | None | 类别权重(平衡样本不均衡问题) |
ignore_index | int | 否 | -100 | 忽略指定索引的标签计算 |
reduction | str | 否 | "mean" | 损失聚合方式("none" 、"mean" 、"sum" ) |
label_smoothing | float | 否 | 0.0 | 标签平滑系数(缓解过拟合) |
def __init__(self, hidden_size, vocab_size, pretrain_model_path):
super(LanguageModel, self).__init__()
# self.embedding = nn.Embedding(len(vocab), input_dim)
# self.layer = nn.LSTM(input_dim, input_dim, num_layers=1, batch_first=True)
self.bert = BertModel.from_pretrained(pretrain_model_path, return_dict=False, attn_implementation='eager')
self.classify = nn.Linear(hidden_size, vocab_size)
self.loss = nn.functional.cross_entropy
2.前向传播,计算损失 ⭐
代码运行流程
# forward方法流程
├── **输入判断**
│ └── 根据`y`是否存在选择训练或推理模式
│ ├── **训练模式** (`y is not None`)
│ │ ├── 1. 构建注意力掩码
│ │ │ └── `mask = torch.tril(torch.ones((x.shape[0], x.shape[1], x.shape[1])))` [1](@ref)
│ │ │ └── 生成下三角矩阵(允许当前token关注过去token,禁止关注未来token)
│ │ ├── 2. BERT编码
│ │ │ └── `x, _ = self.bert(x, attention_mask=mask)`
│ │ │ ├── 输入`x`: 输入序列的token IDs [1](@ref)
│ │ │ └── `attention_mask`: 限制自注意力范围(防止模型“偷看”未来信息) [1](@ref)
│ │ ├── 3. 分类层预测
│ │ │ └── `y_pred = self.classify(x)`
│ │ │ └── 将BERT输出映射到词汇表空间(`vocab_size`维度)[4,6](@ref)
│ │ └── 4. 计算损失
│ │ └── `return self.loss(y_pred.view(-1, ...), y.view(-1))`
│ │ └── 交叉熵损失计算(展平维度适配`[batch*seq_len, vocab_size]`)[7,9](@ref)
│ └── **推理模式** (`y is None`)
│ ├── 1. BERT编码(无掩码)
│ │ └── `x, _ = self.bert(x)`
│ │ └── 全注意力模式(允许所有token互相关注)
│ ├── 2. 分类层预测
│ │ └── `y_pred = self.classify(x)`
│ └── 3. 返回概率分布
│ └── `return torch.softmax(y_pred, dim=-1)` [8](@ref)
│ └── 输出每个token的类别概率(维度:`[batch_size, seq_len, vocab_size]`)
x:输入序列的token ID张量
y:真实标签张量(训练模式时存在)
y_pred:分类层输出的预测logits
mask:生成下三角注意力掩码矩阵
self.loss:计算预测与标签的交叉熵损失
torch.trill():生成一个下三角矩阵,主对角线以下的元素保留,其余置零。常用于因果掩码(如自注意力机制中防止模型“偷看”未来信息)
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
input | Tensor | 是 | 无 | 输入张量 |
diagonal | int | 否 | 0 | 对角线偏移量: - 0 :主对角线- >0 :主对角线上方第k条对角线- <0 :主对角线下方第k条对角线 |
torch.ones():生成指定形状的全1张量,常用于初始化或占位符
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
size | int或tuple | 是 | 无 | 输出张量的形状,如(2,3) |
dtype | torch.dtype | 否 | None | 数据类型(如torch.float32 ) |
device | torch.device | 否 | CPU | 设备(如"cuda:0" ) |
requires_grad | bool | 否 | False | 是否需计算梯度 |
.shape():返回张量的维度信息(类型为torch.Size
),非函数而是属性
torch.cuda.is_available():检测当前系统是否支持CUDA(即是否有可用GPU),返回布尔值
cuda():将张量或模型从CPU移动到GPU,加速计算
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
device | int或torch.device | 否 | 当前默认GPU | 目标GPU设备索引(如0 或"cuda:0" ) |
.view():改变张量的形状(类似reshape
),但需张量内存连续
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
*shape | int或-1 | 是 | 无 | 目标形状,-1 表示自动计算维度大小 |
torch.softmax():对张量在指定维度上计算Softmax,输出概率分布(总和为1)
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
input | Tensor | 是 | 无 | 输入张量 |
dim | int | 是 | 无 | 计算Softmax的维度(如dim=1 表示按行计算) |
# 当输入真实标签,返回loss值;无真实标签,返回预测值
def forward(self, x, y=None):
if y is not None:
# 训练时,构建一个下三角的mask矩阵,让上下文之间没有交互
mask = torch.tril(torch.ones((x.shape[0], x.shape[1], x.shape[1])))
if torch.cuda.is_available():
mask = mask.cuda()
x, _ = self.bert(x, attention_mask=mask)
y_pred = self.classify(x) # output shape:(batch_size, vocab_size)
return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
else:
# 预测时,可以不使用mask
x, _ = self.bert(x)
y_pred = self.classify(x) # output shape:(batch_size, vocab_size)
return torch.softmax(y_pred, dim=-1)
二、加载语料
代码运行流程
load_corpus(path)
├── 1. 初始化corpus字符串
│ └── corpus = ""
├── 2. 打开文件
│ ├── with open(path, encoding="gbk") as f:
│ │ └── 以"gbk"编码只读模式打开文件(自动处理文件关闭)
│ └── 异常处理:若文件不存在或编码错误,抛出IOError/UnicodeDecodeError(隐式)
├── 3. 逐行读取文件内容
│ └── for line in f:
│ ├── line.strip(): 移除行首尾的空白字符(包括换行符"\n")
│ └── corpus += line.strip(): 拼接处理后的行到corpus字符串
└── 4. 返回结果
└── return corpus → 合并后的完整文本字符串
path:需要加载的文本文件的路径(如"./data.txt"
),支持绝对或相对路径。
corpus:初始为空字符串,用于存储逐行读取并处理后的完整文本。
f:文件对象,通过open()
打开的文件句柄,支持迭代逐行读取。
line:每次循环读取的一行原始内容(包含换行符)
open():打开文件并返回文件对象,用于文件的读写操作。支持文本或二进制模式,是Python中文件I/O的核心函数
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
**name ** | str | 是 | 无 | 文件路径(如"data.txt" ),支持绝对或相对路径 2 。 |
**mode ** | str | 否 | 'r' | 文件打开模式: - 'r' :只读- 'w' :写入(覆盖)- 'a' :追加写入- 'b' :二进制模式(如'rb' )- '+' :读写模式(如'r+' )。 |
**buffering ** | int | 否 | -1 | 缓冲策略: - 0 :无缓冲(仅限二进制模式)- 1 :行缓冲(文本模式)- >1 :指定缓冲区大小(字节) |
strip():移除字符串开头和结尾的指定字符序列,默认删除空白符(如空格、换行符\n
、制表符\t
等)
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
**chars ** | str | 否 | None | 指定要删除的字符集合: - 若提供,则删除开头和结尾中所有属于 chars 的字符,直到遇到不在其中的字符为止;- 若未提供,默认删除空白符。 |
# 加载语料
def load_corpus(path):
corpus = ""
with open(path, encoding="gbk") as f:
for line in f:
corpus += line.strip()
return corpus
三、 随机生成样本
代码运行流程
build_sample(tokenizer, window_size, corpus)
├── 1. 随机生成起始位置
│ └── start = random.randint(0, len(corpus)-1-window_size)
│ ├── 生成范围:0 ≤ start ≤ len(corpus)-window_size-1
│ └── 确保窗口末尾不越界
├── 2. 截取输入窗口与目标序列
│ ├── end = start + window_size → 窗口结束位置
│ ├── window = corpus[start:end] → 输入文本(前n字)
│ └── target = corpus[start+1:end+1] → 目标文本(后n字,输入右移1位)
├── 3. 文本编码为模型输入
│ ├── x = tokenizer.encode(window, ...)
│ │ ├── add_special_tokens=False → 不添加特殊标记(如[CLS]/[SEP])
│ │ ├── padding='max_length' → 填充至固定长度(max_length=10)
│ │ └── truncation=True → 超长时截断
│ └── y = tokenizer.encode(target, ...) → 同x逻辑
└── 4. 返回样本
└── return x, y → 输入序列x与目标序列y
tokenizer:分词器对象,将文本转换为模型可处理的数字序列
window_size:输入窗口的长度(字符数)
corpis:原始文本语料
start:窗口起始位置
end:窗口结束位置
window:输入文本片段
target:目标文本片段
x:输入序列的编码
y:目标序列的编码
random.randint():生成一个在闭区间 [a, b]
内的随机整数(包含 a
和 b
),常用于模拟离散随机事件(如掷骰子、随机抽样等)
参数 | 类型 | 必选 | 默认值 | 说明 | 示例 |
---|---|---|---|---|---|
**a ** | int | 是 | 无 | 随机数的最小值(包含) | random.randint(1, 10) → 7 |
**b ** | int | 是 | 无 | 随机数的最大值(包含) | random.randint(1, 10) → 3 |
tokenizer.encode():将文本转换为模型可处理的数字序列(Token ID 列表),常用于自然语言处理任务(如BERT、GPT等模型的输入预处理)
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
**text ** | str 或 List[str] | 是 | 无 | 输入文本(支持单句或批量文本) |
**add_special_tokens ** | bool | 否 | True | 是否添加特殊标记(如BERT的[CLS] 和[SEP] ) |
**padding ** | str 或 bool | 否 | False | 填充策略:True /"max_length" 填充至max_length 长度,"longest" 填充到批次最长序列长度 |
**truncation ** | str 或 bool | 否 | False | 截断策略:True /"longest_first" 优先截断较长部分,"only_first" 仅截断第一个句子 |
**max_length ** | int | 否 | 模型默认(如512) | 序列最大长度(超出部分截断或填充) |
**return_tensors ** | str | 否 | None | 返回张量类型:"pt" (PyTorch)或"tf" (TensorFlow) |
# 随机生成一个样本
# 从文本中截取随机窗口,前n个字作为输入,最后一个字作为输出
def build_sample(tokenizer, window_size, corpus):
start = random.randint(0, len(corpus) - 1 - window_size)
end = start + window_size
window = corpus[start:end]
target = corpus[start + 1:end + 1] # 输入输出错开一位
x = tokenizer.encode(window, add_special_tokens=False, padding='max_length', truncation=True,
max_length=10) # 将字转换成序号
y = tokenizer.encode(target, add_special_tokens=False, padding='max_length', truncation=True, max_length=10)
return x, y
四、建立模型
vocab:词汇表的大小或词汇表对象。通常表示模型中词汇的数量,用于定义模型的输入维度。
char_dim:字符嵌入的维度,表示每个字符的向量表示的长度。
pretrain_model_path:预训练模型的路径,用于加载预训练权重或配置。
model:返回一个 LanguageModel
实例,该实例已经初始化并加载了预训练权重
# 建立模型
def build_model(vocab, char_dim, pretrain_model_path):
model = LanguageModel(768, 21128, pretrain_model_path)
return model
五、采样策略选择
代码运行流程
sampling_strategy(prob_distribution)
├── 1. 随机选择采样策略
│ ├── if random.random() > 0.1:
│ │ └── strategy = "greedy" → 90%概率使用贪婪搜索
│ └── else:
│ └── strategy = "sampling" → 10%概率使用随机采样
├── 2. 根据策略选择token
│ ├── if strategy == "greedy":
│ │ └── return int(torch.argmax(prob_distribution)) → 选择概率最大的token
│ └── elif strategy == "sampling":
│ ├── prob_distribution = prob_distribution.cpu().numpy() → 将概率分布转换为NumPy数组
│ └── return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution) → 根据概率分布随机采样
└── 3. 返回选择的token ID
└── return token_id → 返回生成的token ID
prob_distribution:模型输出的概率分布,表示每个token的未归一化概率(logits)
strategy:采样策略
random.random():生成一个 [0.0, 1.0)
之间的随机浮点数,常用于需要随机值的场景
torch.argmax():返回张量中最大值的索引,常用于分类任务中获取最大概率的类别索引
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
input | torch.Tensor | 是 | 无 | 输入张量。 |
dim | int | 否 | None | 指定沿哪个维度计算最大值索引。 |
keepdim | bool | 否 | False | 是否保留原维度。 |
cpu():将张量从 GPU 转移到 CPU,适用于需要在 CPU 上处理数据的场景
numpy():将 PyTorch 张量转换为 NumPy 数组,便于与 NumPy 库进行交互
np.random.choice():从给定数组中随机选择元素,支持指定采样概率和是否允许重复
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
a | int 或 array-like | 是 | 无 | 输入数组或整数。 |
size | int 或 tuple | 否 | None | 输出数组的形状。 |
replace | bool | 否 | True | 是否允许重复采样。 |
p | array-like | 否 | None | 每个元素的采样概率。 |
list():将可迭代对象(如元组、字符串、集合等)转换为列表,便于修改和操作
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
iterable | 可迭代对象 | 否 | 无 | 将可迭代对象转换为列表。 |
def sampling_strategy(prob_distribution):
if random.random() > 0.1:
strategy = "greedy"
else:
strategy = "sampling"
if strategy == "greedy":
return int(torch.argmax(prob_distribution))
elif strategy == "sampling":
prob_distribution = prob_distribution.cpu().numpy()
return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution)
六、模型效果测试
代码运行流程
generate_sentence(openings, model, tokenizer, window_size)
├── 1. 设置模型为评估模式
│ └── model.eval() → 禁用dropout和batchnorm等训练专用行为
├── 2. 禁用梯度计算
│ └── with torch.no_grad() → 减少内存占用,加速推理
├── 3. 初始化生成字符
│ └── pred_char = "" → 用于存储当前生成的字符
├── 4. 文本生成循环
│ ├── 终止条件:
│ │ ├── pred_char == "\n" → 生成换行符
│ │ └── len(openings) > 30 → 生成文本超过30字
│ ├── 更新生成文本
│ │ └── openings += pred_char → 将新生成的字符追加到文本中
│ ├── 编码输入文本
│ │ ├── x = tokenizer.encode(openings, add_special_tokens=False) → 将文本转换为数字序列
│ │ └── x = torch.LongTensor([x]) → 转换为PyTorch张量
│ ├── 设备转移
│ │ └── if torch.cuda.is_available(): x = x.cuda() → 将输入数据移至GPU(若可用)
│ ├── 模型推理
│ │ └── y = model(x)[0][-1] → 获取模型输出的最后一个token的logits
│ ├── 采样策略
│ │ └── index = sampling_strategy(y) → 根据logits选择下一个token(如贪婪搜索或随机采样)
│ └── 解码生成字符
│ └── pred_char = ''.join(tokenizer.decode(index)) → 将token ID转换为字符
└── 5. 返回生成文本
└── return openings → 返回生成的完整文本
openings:生成文本的起始片段(如"今天天气")
model:用于生成文本的预训练或微调模型
tokenizer:分词器对象,将文本转换为模型输入的数字序列,并将模型输出解码为文本
window_size:输入窗口大小,限制模型输入的长度
pred_char:当前生成的字符
x:编码后的输入文本(数字序列)
y:模型输出,模型预测的logits(未归一化的概率分布)
index:通过采样策略从logits中选择的token ID
eval():在测试或验证时使用,确保模型行为一致,不会随机丢弃神经元或更新 BatchNorm
的统计量
torch.no_grad():在推理阶段使用,避免不必要的梯度计算和存储
tokenizer.encode():将文本转换为模型输入的数字序列
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
text | str 或 List[str] | 是 | 无 | 输入文本,支持单句或批量文本。 |
add_special_tokens | bool | 否 | True | 是否添加特殊标记(如 [CLS] 和 [SEP] )。 |
max_length | int | 否 | 模型默认(如 512) | 最大序列长度,超出部分截断或填充。 |
truncation | str 或 bool | 否 | False | 截断策略(如 "longest_first" )。 |
return_tensors | str | 否 | None | 返回张量类型(如 "pt" 或 "tf" )。 |
torch.LongTensor():创建 64 位整型张量,常用于索引或标签数据
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
data | list 或 numpy.ndarray | 是 | 无 | 输入数据,转换为 64 位整型张量。 |
torch.cuda.is_available():返回布尔值,指示是否可以使用 GPU
cuda():将张量或模型从CPU移动到GPU,加速计算
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
device | int或torch.device | 否 | 当前默认GPU | 目标GPU设备索引(如0 或"cuda:0" ) |
join():将可迭代对象中的元素连接为字符串,使用指定分隔符
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
iterable | 可迭代对象 | 是 | 无 | 包含要连接的字符串或字符。 |
sep | str | 否 | "" | 分隔符,默认为空字符串。 |
tokenizer.decode():将 token ID 序列解码为文本
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
token_ids | List[int] 或 torch.Tensor | 是 | 无 | 要解码的 token ID 序列。 |
skip_special_tokens | bool | 否 | False | 是否跳过特殊标记(如 [CLS] 和 [SEP] )。 |
clean_up_tokenization_spaces | bool | 否 | True | 是否清理多余的空格。 |
# 文本生成测试代码
def generate_sentence(openings, model, tokenizer, window_size):
# reverse_vocab = dict((y, x) for x, y in vocab.items())
model.eval()
with torch.no_grad():
pred_char = ""
# 生成了换行符,或生成文本超过30字则终止迭代
while pred_char != "\n" and len(openings) <= 30:
openings += pred_char
x = tokenizer.encode(openings, add_special_tokens=False)
x = torch.LongTensor([x])
if torch.cuda.is_available():
x = x.cuda()
y = model(x)[0][-1]
index = sampling_strategy(y)
pred_char = ''.join(tokenizer.decode(index))
return openings
七、模型训练
代码运行流程
train(corpus_path, save_weight=True)
├── 1. 初始化训练参数
│ ├── epoch_num = 20 → 训练轮数
│ ├── batch_size = 128 → 每次训练样本个数
│ ├── train_sample = 10000 → 每轮训练样本总数
│ ├── char_dim = 768 → 每个字的维度
│ ├── window_size = 10 → 样本文本长度
│ ├── vocab_size = 21128 → 字表大小
│ └── learning_rate = 0.001 → 学习率
├── 2. 加载预训练模型和分词器
│ ├── pretrain_model_path = r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese" → 预训练模型路径
│ └── tokenizer = BertTokenizer.from_pretrained(pretrain_model_path) → 加载分词器
├── 3. 加载语料和构建模型
│ ├── corpus = load_corpus(corpus_path) → 加载语料
│ ├── model = build_model(vocab_size, char_dim, pretrain_model_path) → 构建模型
│ └── if torch.cuda.is_available(): model = model.cuda() → 将模型移至GPU(若可用)
├── 4. 初始化优化器
│ └── optim = torch.optim.Adam(model.parameters(), lr=learning_rate) → 使用Adam优化器
├── 5. 训练循环
│ ├── for epoch in range(epoch_num): → 遍历每轮训练
│ │ ├── model.train() → 设置模型为训练模式
│ │ ├── watch_loss = [] → 初始化损失列表
│ │ ├── for batch in range(int(train_sample / batch_size)): → 遍历每个批次
│ │ │ ├── x, y = build_dataset(batch_size, tokenizer, window_size, corpus) → 构建训练样本
│ │ │ ├── if torch.cuda.is_available(): x, y = x.cuda(), y.cuda() → 将数据移至GPU(若可用)
│ │ │ ├── optim.zero_grad() → 梯度归零
│ │ │ ├── loss = model(x, y) → 计算损失
│ │ │ ├── loss.backward() → 反向传播计算梯度
│ │ │ └── optim.step() → 更新模型参数
│ │ │ └── watch_loss.append(loss.item()) → 记录损失值
│ │ └── print("第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss))) → 打印每轮平均损失
│ │ └── print(generate_sentence("让他在半年之前,就不能做出", model, tokenizer, window_size)) → 生成示例句子
│ │ └── print(generate_sentence("李慕站在山路上,深深的呼吸", model, tokenizer, window_size)) → 生成示例句子
├── 6. 保存模型权重
│ ├── if not save_weight: return → 不保存权重
│ └── else:
│ ├── base_name = os.path.basename(corpus_path).replace("txt", "pth") → 生成模型文件名
│ ├── model_path = os.path.join("model", base_name) → 生成模型保存路径
│ └── torch.save(model.state_dict(), model_path) → 保存模型权重
└── 7. 返回
└── return → 函数结束
corpus_path:语料文件的路径,用于加载训练数据。
save_weight:控制是否保存训练后的模型权重,默认为 True
。
epoch_num:训练轮数
batch_size:每次训练的样本数量
train_sample:每轮训练样本总数
char_dim:每个字符的向量维度
window_size:窗口长度大小
vocab_size:词汇表中字符的数量
learning_rate:优化器的学习率
pretrain_model_path:预训练模型的路径,用于加载分词器和模型权重
tokenizer:将文本转换为模型输入的数字序列
corpus:加载的语料数据,用于训练模型
model:用于训练的语言模型
optim:用于更新模型参数的优化器
watch_loss:记录每轮训练中每个批次的损失值
x,y:输入数据和目标数据,用于训练模型。
base_name:根据语料路径生成的模型文件名。
model_path:模型权重的保存路径。
BertTokenizer.from_pretrained():加载预训练的分词器
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
pretrained_model_name_or_path | str 或 os.PathLike | 是 | 无 | 预训练模型的名称或路径。 |
*init_inputs | 任意 | 否 | 无 | 额外的初始化参数。 |
**kwargs | 任意 | 否 | 无 | 其他可选参数,如 do_lower_case 、cls_token_id 等。 |
torch.cuda.is_available():返回布尔值,指示是否可以使用 GPU。
model.cuda():将模型移至 GPU。
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
device | int 或 str | 否 | "cuda:0" | 指定 GPU 设备,如 "cuda:0" 或 "cuda:1" 。 |
torch.optim.Adam():创建 Adam 优化器
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
params | Iterable | 是 | 无 | 需要优化的参数,通常是 model.parameters() 。 |
lr | float | 否 | 0.001 | 学习率。 |
betas | Tuple[float, float] | 否 | (0.9, 0.999) | Adam 算法中的 beta1 和 beta2 参数。 |
eps | float | 否 | 1e-8 | 数值稳定性参数。 |
weight_decay | float | 否 | 0 | 权重衰减(L2 正则化)。 |
amsgrad | bool | 否 | False | 是否使用 AMSGrad 变体。 |
model.parameters():获取模型的可训练参数
model.train():启用 Dropout
和 BatchNorm
等训练专用行为
optim.zero_grad():将模型参数的梯度归零,避免梯度累积。
loss.backward():反向传播计算梯度
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
retain_graph | bool | 否 | False | 是否保留计算图以供后续反向传播。 |
create_graph | bool | 否 | False | 是否创建计算图以支持高阶导数计算。 |
optim.step():根据梯度更新模型参数
append():将元素添加到列表末尾
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
item | 任意 | 是 | 无 | 要添加到列表末尾的元素。 |
item():返回张量的 Python 标量值
np.mean():计算数组的均值
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
a | array_like | 是 | 无 | 输入数组。 |
axis | int 或 tuple | 否 | None | 沿指定轴计算均值。 |
dtype | dtype | 否 | None | 输出数组的数据类型。 |
out | ndarray | 否 | None | 输出数组。 |
os.path.basename():返回路径中的文件名部分
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
path | str | 是 | 无 | 文件路径。 |
replace():替换字符串中的子字符串
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
old | str | 是 | 无 | 要替换的子字符串。 |
new | str | 是 | 无 | 替换后的新字符串。 |
count | int | 否 | -1 | 替换次数,默认替换所有。 |
os.path.join():将多个路径部分连接成一个完整路径
参数 | 类型 | 必选 | 默认值 | 说明 |
---|---|---|---|---|
*paths | str | 是 | 无 | 要连接的路径部分。 |
state_dict():获取模型的状态字典,包含所有可训练参数。
def train(corpus_path, save_weight=True):
epoch_num = 20 # 训练轮数
batch_size = 128 # 每次训练样本个数
train_sample = 10000 # 每轮训练总共训练的样本总数
char_dim = 768 # 每个字的维度
window_size = 10 # 样本文本长度
vocab_size = 21128 # 字表大小
learning_rate = 0.001 # 学习率
pretrain_model_path = r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese"
tokenizer = BertTokenizer.from_pretrained(pretrain_model_path)
corpus = load_corpus(corpus_path) # 加载语料
model = build_model(vocab_size, char_dim, pretrain_model_path) # 建立模型
if torch.cuda.is_available():
model = model.cuda()
optim = torch.optim.Adam(model.parameters(), lr=learning_rate) # 建立优化器
print("文本词表模型加载完毕,开始训练")
for epoch in range(epoch_num):
model.train()
watch_loss = []
for batch in range(int(train_sample / batch_size)):
x, y = build_dataset(batch_size, tokenizer, window_size, corpus) # 构建一组训练样本
if torch.cuda.is_available():
x, y = x.cuda(), y.cuda()
optim.zero_grad() # 梯度归零
loss = model(x, y) # 计算loss
loss.backward() # 计算梯度
optim.step() # 更新权重
watch_loss.append(loss.item())
print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
print(generate_sentence("让他在半年之前,就不能做出", model, tokenizer, window_size))
print(generate_sentence("李慕站在山路上,深深的呼吸", model, tokenizer, window_size))
if not save_weight:
return
else:
base_name = os.path.basename(corpus_path).replace("txt", "pth")
model_path = os.path.join("model", base_name)
torch.save(model.state_dict(), model_path)
return
八、main函数
if __name__ == "__main__":
# build_vocab_from_corpus("corpus/all.txt")
train(r"F:\人工智能NLP\NLP\HomeWork\demo9.1_Bert语言模型生成文本\corpus.txt", False)
九、整体代码
# coding:utf8
import torch
import torch.nn as nn
import numpy as np
import math
import random
import os
import re
from transformers import BertTokenizer, BertModel
"""
基于pytorch的Bert语言模型
"""
class LanguageModel(nn.Module):
def __init__(self, hidden_size, vocab_size, pretrain_model_path):
super(LanguageModel, self).__init__()
# self.embedding = nn.Embedding(len(vocab), input_dim)
# self.layer = nn.LSTM(input_dim, input_dim, num_layers=1, batch_first=True)
self.bert = BertModel.from_pretrained(pretrain_model_path, return_dict=False, attn_implementation='eager')
self.classify = nn.Linear(hidden_size, vocab_size)
self.loss = nn.functional.cross_entropy
# 当输入真实标签,返回loss值;无真实标签,返回预测值
def forward(self, x, y=None):
if y is not None:
# 训练时,构建一个下三角的mask矩阵,让上下文之间没有交互
mask = torch.tril(torch.ones((x.shape[0], x.shape[1], x.shape[1])))
if torch.cuda.is_available():
mask = mask.cuda()
x, _ = self.bert(x, attention_mask=mask)
y_pred = self.classify(x) # output shape:(batch_size, vocab_size)
return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
else:
# 预测时,可以不使用mask
x, _ = self.bert(x)
y_pred = self.classify(x) # output shape:(batch_size, vocab_size)
return torch.softmax(y_pred, dim=-1)
# 加载字表
# def build_vocab(vocab_path):
# vocab = {"<pad>":0}
# with open(vocab_path, encoding="utf8") as f:
# for index, line in enumerate(f):
# char = line[:-1] #去掉结尾换行符
# vocab[char] = index + 1 #留出0位给pad token
# return vocab
# 加载语料
def load_corpus(path):
corpus = ""
with open(path, encoding="gbk") as f:
for line in f:
corpus += line.strip()
return corpus
# 随机生成一个样本
# 从文本中截取随机窗口,前n个字作为输入,最后一个字作为输出
def build_sample(tokenizer, window_size, corpus):
start = random.randint(0, len(corpus) - 1 - window_size)
end = start + window_size
window = corpus[start:end]
target = corpus[start + 1:end + 1] # 输入输出错开一位
x = tokenizer.encode(window, add_special_tokens=False, padding='max_length', truncation=True,
max_length=10) # 将字转换成序号
y = tokenizer.encode(target, add_special_tokens=False, padding='max_length', truncation=True, max_length=10)
return x, y
# 建立数据集
# sample_length 输入需要的样本数量。需要多少生成多少
# vocab 词表
# window_size 样本长度
# corpus 语料字符串
def build_dataset(sample_length, tokenizer, window_size, corpus):
dataset_x = []
dataset_y = []
for i in range(sample_length):
x, y = build_sample(tokenizer, window_size, corpus)
dataset_x.append(x)
dataset_y.append(y)
return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)
# 建立模型
def build_model(vocab, char_dim, pretrain_model_path):
model = LanguageModel(768, 21128, pretrain_model_path)
return model
def sampling_strategy(prob_distribution):
if random.random() > 0.1:
strategy = "greedy"
else:
strategy = "sampling"
if strategy == "greedy":
return int(torch.argmax(prob_distribution))
elif strategy == "sampling":
prob_distribution = prob_distribution.cpu().numpy()
return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution)
# 文本生成测试代码
def generate_sentence(openings, model, tokenizer, window_size):
# reverse_vocab = dict((y, x) for x, y in vocab.items())
model.eval()
with torch.no_grad():
pred_char = ""
# 生成了换行符,或生成文本超过30字则终止迭代
while pred_char != "\n" and len(openings) <= 30:
openings += pred_char
x = tokenizer.encode(openings, add_special_tokens=False)
x = torch.LongTensor([x])
if torch.cuda.is_available():
x = x.cuda()
y = model(x)[0][-1]
index = sampling_strategy(y)
pred_char = ''.join(tokenizer.decode(index))
return openings
def train(corpus_path, save_weight=True):
epoch_num = 20 # 训练轮数
batch_size = 128 # 每次训练样本个数
train_sample = 10000 # 每轮训练总共训练的样本总数
char_dim = 768 # 每个字的维度
window_size = 10 # 样本文本长度
vocab_size = 21128 # 字表大小
learning_rate = 0.001 # 学习率
pretrain_model_path = r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese"
tokenizer = BertTokenizer.from_pretrained(pretrain_model_path)
corpus = load_corpus(corpus_path) # 加载语料
model = build_model(vocab_size, char_dim, pretrain_model_path) # 建立模型
if torch.cuda.is_available():
model = model.cuda()
optim = torch.optim.Adam(model.parameters(), lr=learning_rate) # 建立优化器
print("文本词表模型加载完毕,开始训练")
for epoch in range(epoch_num):
model.train()
watch_loss = []
for batch in range(int(train_sample / batch_size)):
x, y = build_dataset(batch_size, tokenizer, window_size, corpus) # 构建一组训练样本
if torch.cuda.is_available():
x, y = x.cuda(), y.cuda()
optim.zero_grad() # 梯度归零
loss = model(x, y) # 计算loss
loss.backward() # 计算梯度
optim.step() # 更新权重
watch_loss.append(loss.item())
print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
print(generate_sentence("让他在半年之前,就不能做出", model, tokenizer, window_size))
print(generate_sentence("李慕站在山路上,深深的呼吸", model, tokenizer, window_size))
if not save_weight:
return
else:
base_name = os.path.basename(corpus_path).replace("txt", "pth")
model_path = os.path.join("model", base_name)
torch.save(model.state_dict(), model_path)
return
if __name__ == "__main__":
# build_vocab_from_corpus("corpus/all.txt")
train(r"F:\人工智能NLP\NLP\HomeWork\demo9.1_Bert语言模型生成文本\corpus.txt", False)