一、基于pytorch的网络编写一个分词模型
#coding:utf8
import torch
import torch.nn as nn
import jieba
import numpy as np
import random
import json
from torch.utils.data import DataLoader
"""
基于pytorch的网络编写一个分词模型
我们使用jieba分词的结果作为训练数据
看看是否可以得到一个效果接近的神经网络模型
"""
# TorchModel 类定义了一个包含嵌入层、RNN 层和线性分类层的神经网络
# 使用 nn.Embedding 将字符映射到高维空间,通过 nn.RNN 处理序列信息,使用 nn.Linear 进行分类。
class TorchModel(nn.Module):
def __init__(self, input_dim, hidden_size, num_rnn_layers, vocab):
super(TorchModel, self).__init__()
self.embedding = nn.Embedding(len(vocab)+1, input_dim, padding_idx=0)
self.rnn_layer = nn.RNN(input_size=input_dim,
hidden_size=hidden_size,
batch_first=True,
num_layers=num_rnn_layers
)
self.classify = nn.Linear(hidden_size, 2)
self.loss_func = nn.CrossEntropyLoss(ignore_index=-100)
def forward(self, x, y=None):
x = self.embedding(x) #input shape: (batch_size, sen_len), output shape:(batch_size, sen_len, input_dim)
x, _ = self.rnn_layer(x) #output shape:(batch_size, sen_len, hidden_size)
y_pred = self.classify(x) #output shape:(batch_size, sen_len, 2) -> y_pred.view(-1, 2) (batch_size*sen_len, 2)
if y is not None:
return self.loss_func(y_pred.view(-1, 2), y.view(-1))
else:
return y_pred
# Dataset 类用于处理语料库数据,从文件中读取句子
# 使用 sentence_to_sequence 将句子转换为数字序列,使用 sequence_to_label 生成标记序列,使用 padding 方法将序列和标签填充到固定长度。
class Dataset:
def __init__(self, corpus_path, vocab, max_length):
self.vocab = vocab
self.corpus_path = corpus_path
self.max_length = max_length
self.load()
def load(self):
self.data = []
with open(self.corpus_path, encoding="utf8") as f:
for line in f:
sequence = sentence_to_sequence(line, self.vocab)
label = sequence_to_label(line)
sequence, label = self.padding(sequence, label)
sequence = torch.LongTensor(sequence)
label = torch.LongTensor(label)
self.data.append([sequence, label])
#使用部分数据做展示,使用全部数据训练时间会相应变长
if len(self.data) > 10000:
break
#将文本截断或补齐到固定长度
def padding(self, sequence, label):
sequence = sequence[:self.max_length]
sequence += [0] * (self.max_length - len(sequence))
label = label[:self.max_length]
label += [-100] * (self.max_length - len(label))
return sequence, label
def __len__(self):
return len(self.data)
def __getitem__(self, item):
return self.data[item]
#文本转化为数字序列,为embedding做准备
def sentence_to_sequence(sentence, vocab):
sequence = [vocab.get(char, vocab['unk']) for char in sentence]
return sequence
#基于结巴生成分级结果的标注
def sequence_to_label(sentence):
words = jieba.lcut(sentence)
label = [0] * len(sentence)
pointer = 0
for word in words:
pointer += len(word)
label[pointer - 1] = 1
return label
#加载字表
def build_vocab(vocab_path):
vocab = {}
with open(vocab_path, "r", encoding="utf8") as f:
for index, line in enumerate(f):
char = line.strip()
vocab[char] = index + 1 #每个字对应一个序号
vocab['unk'] = len(vocab) + 1
return vocab
#建立数据集
def build_dataset(corpus_path, vocab, max_length, batch_size):
dataset = Dataset(corpus_path, vocab, max_length) #diy __len__ __getitem__
data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size) #torch
return data_loader
def main():
epoch_num = 5 #训练轮数
batch_size = 20 #每次训练样本个数
char_dim = 50 #每个字的维度
hidden_size = 100 #隐含层维度
num_rnn_layers = 1 #rnn层数
max_length = 20 #样本最大长度
learning_rate = 1e-3 #学习率
vocab_path = "chars.txt" #字表文件路径
corpus_path = "../corpus.txt" #语料文件路径
vocab = build_vocab(vocab_path) #建立字表
data_loader = build_dataset(corpus_path, vocab, max_length, batch_size) #建立数据集
model = TorchModel(char_dim, hidden_size, num_rnn_layers, vocab) #建立模型
optim = torch.optim.Adam(model.parameters(), lr=learning_rate) #建立优化器
#训练开始
for epoch in range(epoch_num):
model.train()
watch_loss = []
for x, y in data_loader:
optim.zero_grad() #梯度归零
loss = model.forward(x, y) #计算loss
loss.backward() #计算梯度
optim.step() #更新权重
watch_loss.append(loss.item())
print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
#保存模型
torch.save(model.state_dict(), "model.pth")
return
#最终预测
def predict(model_path, vocab_path, input_strings):
#配置保持和训练时一致
char_dim = 50 # 每个字的维度
hidden_size = 100 # 隐含层维度
num_rnn_layers = 1 # rnn层数
vocab = build_vocab(vocab_path) #建立字表
model = TorchModel(char_dim, hidden_size, num_rnn_layers, vocab) #建立模型
model.load_state_dict(torch.load(model_path)) #加载训练好的模型权重
model.eval()
for input_string in input_strings:
#逐条预测
x = sentence_to_sequence(input_string, vocab)
with torch.no_grad():
result = model.forward(torch.LongTensor([x]))[0]
result = torch.argmax(result, dim=-1) #预测出的01序列
#在预测为1的地方切分,将切分后文本打印出来
for index, p in enumerate(result):
if p == 1:
print(input_string[index], end=" ")
else:
print(input_string[index], end="")
print()
if __name__ == "__main__":
# main()
input_strings = ["同时国内有望出台新汽车刺激方案",
"沪胶后市有望延续强势",
"经过两个交易日的强势调整后",
"昨日上海天然橡胶期货价格再度大幅上扬"]
predict("model.pth", "chars.txt", input_strings)
模型分析
- 模型定义:
TorchModel
类定义了一个包含嵌入层、RNN 层和线性分类层的神经网络,使用nn.Embedding
将字符映射到高维空间,通过nn.RNN
处理序列信息,使用nn.Linear
进行分类。- 数据集处理:
Dataset
类用于处理语料库数据,从文件中读取句子,使用sentence_to_sequence
将句子转换为数字序列,使用sequence_to_label
生成标记序列,使用padding
方法将序列和标签填充到固定长度。build_vocab
函数从文件中构建词汇表,build_dataset
函数使用Dataset
类和DataLoader
进行批处理。
- 训练部分:
main
函数中,设置超参数,构建模型和优化器,进行多轮训练,计算损失、反向传播和更新参数,保存训练好的模型。
- 预测部分:
predict
函数加载训练好的模型和词汇表,对输入句子进行分词预测,将预测为 1 的位置进行分词,将结果打印输出。
- 数据预处理:
sentence_to_sequence
函数将输入的句子中的字符根据词汇表转换为数字序列,未在词汇表中的字符使用unk
的索引。sequence_to_label
函数利用结巴分词的结果,将分词结束位置标记为 1,其余为 0,生成标记序列。Dataset
类的padding
方法确保所有序列和标签具有相同的长度,便于批处理。
- 模型架构:
TorchModel
类的embedding
层将输入的数字序列映射到高维空间,rnn_layer
处理序列信息,classify
层将 RNN 的输出映射到 2 个类别(分词或不分词)。- 训练时使用
CrossEntropyLoss
计算损失,预测时使用torch.argmax
找到最可能的类别。
- 训练和预测流程:
main
函数设置训练的超参数,创建数据集和模型,使用Adam
优化器进行优化,保存训练好的模型。predict
函数加载训练好的模型,对输入句子进行分词预测并输出结果。
二、DAG(有向无环图)法做分词
import jieba
#词典,每个词后方存储的是其词频,仅为示例,也可自行添加
Dict = {"经常":0.1,
"经":0.05,
"有":0.1,
"常":0.001,
"有意见":0.1,
"歧":0.001,
"意见":0.2,
"分歧":0.2,
"见":0.05,
"意":0.05,
"见分歧":0.05,
"分":0.1}
#根据上方词典,对于输入文本,构造一个存储有所有切分方式的信息字典
#学术叫法为有向无环图,DAG(Directed Acyclic Graph),不理解也不用纠结,只当是个专属名词就好
#这段代码直接来自于jieba分词
# jieba.cut
def calc_dag(sentence):
DAG = {}
n = len(sentence)
for k in range(n):
i = k
tmplist = []
while i < n:
frag = sentence[k: i+1]
if frag in Dict:
tmplist.append(i)
i += 1
if not tmplist:
tmplist = [k]
DAG[k] = tmplist
return DAG
sentence = "经常有意见分歧"
print(calc_dag(sentence))
#结果应该为{0: [0, 1], 1: [1], 2: [2, 4], 3: [3, 4], 4: [4, 6], 5: [5, 6], 6: [6]}
#0:[0,1]代表句子中的第0个字,可以单独成词,或与第1个字一起成词
#2:[2,4]代表句子中的第2个字,可以单独成词,或第2-4个字一起成词
#依次类推
#这个字典中实际上就存储了所有可能的切分方式的信息
#将DAG中的信息解码(还原)出来,用文本展示出所有切分方式
class DAGDecode:
#通过两个队列来实现
def __init__(self, sentence):
self.sentence = sentence
self.DAG = calc_dag(sentence) #使用了上方的函数
self.length = len(sentence)
self.unfinish_path = [[]] #保存待解码序列的队列
self.finish_path = [] #保存解码完成的序列的队列
#对于每一个序列,检查是否需要继续解码
#不需要继续解码的,放入解码完成队列
#需要继续解码的,将生成的新队列,放入待解码队列
#path形如:["经常", "有", "意见"]
def decode_next(self, path):
path_length = len("".join(path))
if path_length == self.length: #已完成解码
self.finish_path.append(path)
return
candidates = self.DAG[path_length]
new_paths = []
for candidate in candidates:
new_paths.append(path + [self.sentence[path_length:candidate+1]])
self.unfinish_path += new_paths #放入待解码对列
return
#递归调用序列解码过程
def decode(self):
while self.unfinish_path != []:
path = self.unfinish_path.pop(0) #从待解码队列中取出一个序列
self.decode_next(path) #使用该序列进行解码
sentence = "经常有意见分歧"
dd = DAGDecode(sentence)
dd.decode()
print(dd.finish_path)
代码分析
一、函数和类的功能分析:
-
calc_dag(sentence)
函数:- 功能:
- 该函数的主要目的是根据输入的句子和预定义的词典
Dict
构建一个有向无环图(DAG),用于存储句子中所有可能的词切分信息。
- 该函数的主要目的是根据输入的句子和预定义的词典
- 实现步骤:
- 首先,初始化一个空字典
DAG
用于存储结果。 - 获取输入句子的长度
n
。 - 遍历句子中的每个字符,从当前字符开始,通过不断增加子串长度,检查子串是否在
Dict
中。 - 若子串在
Dict
中,将该子串结束字符的索引添加到tmplist
中。 - 若
tmplist
为空,说明当前字符没有可切分的词,将当前字符索引添加到tmplist
。 - 最后将
k
作为键,tmplist
作为值存储在DAG
中。
- 首先,初始化一个空字典
- 功能:
-
DAGDecode
类:__init__(self, sentence)
方法:- 功能:
- 对输入的句子进行初始化操作,为后续的解码操作准备所需的数据结构。
- 实现步骤:
- 存储输入的句子。
- 调用
calc_dag(sentence)
函数生成有向无环图,并存储在self.DAG
中。 - 存储句子的长度。
- 初始化两个队列:
self.unfinish_path
存储待解码的序列,初始化为只包含一个空列表的列表;self.finish_path
存储已完成解码的序列,初始化为空列表。
- 功能:
decode_next(self, path)
方法:- 功能:
- 对于给定的部分解码路径,判断是否完成解码,若未完成则根据
self.DAG
生成新的待解码路径并添加到self.unfinish_path
中,若完成则添加到self.finish_path
中。
- 对于给定的部分解码路径,判断是否完成解码,若未完成则根据
- 实现步骤:
- 计算当前
path
所代表的字符串的长度。 - 若长度等于句子长度,说明解码完成,将
path
加入self.finish_path
。 - 若未完成,根据
self.DAG
中存储的信息,找出可能的下一个词的结束位置,生成新的解码路径并添加到self.unfinish_path
中。
- 计算当前
- 功能:
decode(self)
方法:- 功能:
- 循环从
self.unfinish_path
中取出路径,调用decode_next
方法进行解码,直到self.unfinish_path
为空。
- 循环从
- 实现步骤:
- 只要
self.unfinish_path
不为空,就取出其中的一个元素。 - 调用
decode_next
方法对该元素进行解码。
- 只要
- 功能:
二、代码逻辑总结:
- 首先,使用
calc_dag(sentence)
函数对输入的句子构建一个有向无环图,该图以字典的形式存储了从每个字符开始的所有可能的词切分信息。例如对于输入"经常有意见分歧"
,会得到{0: [0, 1], 1: [1], 2: [2, 4], 3: [3, 4], 4: [4, 6], 5: [5, 6], 6: [6]}
。 - 然后,
DAGDecode
类利用这个有向无环图进行解码操作:- 在
__init__
阶段,存储句子、有向无环图、句子长度,并初始化待解码和已完成解码的队列。 decode_next
方法会根据当前的部分解码结果判断是否继续解码,若继续解码,会根据DAG
生成新的可能路径添加到待解码队列中,若完成则添加到已完成队列中。decode
方法通过不断调用decode_next
方法处理待解码队列中的元素,最终将所有可能的句子切分方式存储在finish_path
中。
- 在
三、代码解释示例:
- 以输入句子
"经常有意见分歧"
为例:- 在
calc_dag
函数中:- 从
k = 0
开始,"经"
在Dict
中,"经常"
也在Dict
中,所以DAG[0] = [0, 1]
。 - 对于
k = 1
,只有"常"
在Dict
中,所以DAG[1] = [1]
。 - 对于
k = 2
,"有"
在Dict
中,"有意见"
也在Dict
中,所以DAG[2] = [2, 4]
。 - 以此类推,最终得到完整的
DAG
。
- 从
- 在
DAGDecode
类中:- 初始化时,
unfinish_path = [[]]
,finish_path = []
。 - 第一次调用
decode_next
对[]
进行处理,会根据DAG[0]
生成["经"]
和["经常"]
等新路径添加到unfinish_path
。 - 不断循环调用
decode_next
,直到unfinish_path
为空,最终得到所有可能的句子切分方式存储在finish_path
中。
- 初始化时,
- 在