按照阿光的项目
做出了学习笔记,pytorch深度学习实战项目100例
基于LSTM实现春联上联对下联
基于LSTM(长短期记忆网络)实现春联上联对下联是一种有趣且具有挑战性的任务,它涉及到自然语言处理(NLP)中的序列到序列(seq2seq)模型。LSTM是处理序列数据的理想选择,因为它能够记住长期的依赖信息,这对于生成符合语境和文化习俗的春联下联至关重要。
数据
https://github.com/wb14123/couplet-dataset
感谢大佬的分享的对联数据集
对数据集的处理
def data_generator(data):
# 计算每个对联长度的权重
data_probability = [float(len(x)) for wordcount, [x, y] in data.items()] # [每个字数key对应对联list中上联数据的个数]
data_probability = np.array(data_probability) / sum(data_probability) # 标准化至[0,1],这是每个字数的权重
# 随机选择字数,然后随机选择字数对应的上联样本,生成batch
for idx in range(15):
# 随机选字数id,概率为上面计算的字数权重
idx = idx + 1
size = min(batch_size, len(data[idx][0])) # batch_size=64,len(data[idx][0])随机选择的字数key对应的上联个数
# 从上联列表下标list中随机选出大小为size的list
idxs = np.random.choice(len(data[idx][0]), size=size)
# 返回选出的上联X与下联y, 将原本1-d array维度扩展为(row,col,1)
yield data[idx][0][idxs], np.expand_dims(data[idx][1][idxs], axis=2)
# 加载文本数据
def load_data(input_path, output_path):
# 数据读取与切分
def read_data(file_path):
txt = codecs.open(file_path, encoding='utf-8').readlines()
txt = [line.strip().split(' ') for line in txt] # 每行按空格切分
txt = [line for line in txt if len(line) < 16] # 过滤掉字数超过maxlen的对联
return txt
# 产生数据字典
def generate_count_dict(result_dict, x, y):
for i, idx in enumerate(x):
j = len(idx)
if j not in result_dict:
result_dict[j] = [[], []] # [样本数据list,类别标记list]
result_dict[j][0].append(idx)
result_dict[j][1].append(y[i])
return result_dict
# 将字典数据转为numpy
def to_numpy_array(dict):
for count, [x, y] in dict.items():
dict[count][0] = np.array(x)
dict[count][1] = np.array(y)
return dict
x = read_data(input_path)
y = read_data(output_path)
# 获取词表
vocabulary = x + y
# 构造字符级别的特征
string = ''
for words in vocabulary:
for word in words:
string += word
# 所有的词汇表
vocabulary = set(string)
word2idx = {word: i for i, word in enumerate(vocabulary)}
idx2word = {i: word for i, word in enumerate(vocabulary)}
# 训练数据中所有词的个数
vocab_size = len(word2idx.keys()) # 词汇表大小
# 将x和y转为数值
x = [[word2idx[word] for word in sent] for sent in x]
y = [[word2idx[word] for word in sent] for sent in y]
train_dict = {}
train_dict = generate_count_dict(train_dict, x, y)
train_dict = to_numpy_array(train_dict)
return train_dict, vocab_size, idx2word, word2idx
基本想法:
这种场景是典型的 Encoder-Decoder 框架应用问题。
在这个框架中:
- Encoder 负责读取输入序列(上联)并将其转换成一个固定长度的内部表示形式,通常是最后一个时间步的隐藏状态。这个内部表示被视为输入序列的“上下文”或“意义”,包含了生成输出序列所需的所有信息。
- Decoder 接收这个内部表示并开始生成输出序列(下联),一步一步地生成,直到产生序列结束标记或达到特定长度。
构建模型
模型架构:使用seq2seq模型,该模型一般包括一个编码器(encoder)和一个解码器(decoder),两者都可以是LSTM网络。编码器负责处理上联,而解码器则生成下联。
嵌入层:通常在模型的第一层使用嵌入层,将每个字符或词转换为固定大小的向量,这有助于模型更好地理解语言中的语义信息。
# 定义网络结构
class LSTM(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):
super(LSTM, self).__init__()
self.hidden_dim = hidden_dim
self.embeddings = nn.Embedding(vocab_size + 1, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers)
self.linear = nn.Linear(hidden_dim, vocab_size)
def forward(self, x):
time_step, batch_size = x.size() # 124, 16
embeds = self.embeddings(x)
output, (h_n, c_n) = self.lstm(embeds)
output = self.linear(output.reshape(time_step * batch_size, -1))
# 要返回所有时间点的数据,每个时间点对应一个字,也就是vocab_size维度的向量
return output
训练模型
# 加载数据
train_dict, vocab_size, idx2word, word2idx = load_data(input_path, output_path)
# 模型训练
model = LSTM(vocab_size=vocab_size, hidden_dim=hidden_dim,
embedding_dim=embedding_dim, num_layers=num_layers)
Configimizer = optim.Adam(model.parameters(), lr=lr) # 优化器
criterion = nn.CrossEntropyLoss() # 多分类损失函数
model.to(device)
loss_meter = meter.AverageValueMeter()
best_loss = 999 # 保存loss
best_model = None # 保存对应最好准确率的模型参数
for epoch in range(epochs):
model.train() # 开启训练模式
loss_meter.reset()
for x, y in data_generator(train_dict):
x = torch.from_numpy(x).long().transpose(1, 0).contiguous()
x = x.to(device)
y = torch.from_numpy(y).long().transpose(1, 0).contiguous()
y = y.to(device)
Configimizer.zero_grad()
# 形成预测结果
output_ = model(x)
# 计算损失
loss = criterion(output_, y.long().view(-1))
loss.backward()
Configimizer.step()
loss_meter.add(loss.item())
# 打印信息
print("【EPOCH: 】%s" % str(epoch + 1))
print("训练损失为%s" % (str(loss_meter.mean)))
# 保存模型及相关信息
if loss_meter.mean < best_loss:
best_loss = loss_meter.mean
best_model = model.state_dict()
# 在训练结束保存最优的模型参数
if epoch == epochs - 1:
# 保存模型
torch.save(best_model, './best_model.pkl')
测试
import codecs
import numpy as np
import torch
from torch import nn
from torch import optim
from torchnet import meter
# 模型输入参数,需要自己根据需要调整
input_path = 'C:\\Users\\kaai\\AppData\\Local\\Temp\\BNZ.65e95f542f0fca6f\\train\\in.txt'
output_path = 'C:\\Users\\kaai\\AppData\\Local\\Temp\\BNZ.65e95f542f0fca6f\\train\\out.txt'
num_layers = 1 # LSTM层数
hidden_dim = 100 # LSTM中的隐层大小
epochs = 50 # 迭代次数
batch_size = 128 # 每个批次样本大小
embedding_dim = 15 # 每个字形成的嵌入向量大小
lr = 0.01 # 学习率
device = 'cpu' # 设备
# 用于生成训练数据
def data_generator(data):
# 计算每个对联长度的权重
data_probability = [float(len(x)) for wordcount, [x, y] in data.items()] # [每个字数key对应对联list中上联数据的个数]
data_probability = np.array(data_probability) / sum(data_probability) # 标准化至[0,1],这是每个字数的权重
# 随机选择字数,然后随机选择字数对应的上联样本,生成batch
for idx in range(15):
# 随机选字数id,概率为上面计算的字数权重
idx = idx + 1
size = min(batch_size, len(data[idx][0])) # batch_size=64,len(data[idx][0])随机选择的字数key对应的上联个数
# 从上联列表下标list中随机选出大小为size的list
idxs = np.random.choice(len(data[idx][0]), size=size)
# 返回选出的上联X与下联y, 将原本1-d array维度扩展为(row,col,1)
yield data[idx][0][idxs], np.expand_dims(data[idx][1][idxs], axis=2)
# 加载文本数据
def load_data(input_path, output_path):
# 数据读取与切分
def read_data(file_path):
txt = codecs.open(file_path, encoding='utf-8').readlines()
txt = [line.strip().split(' ') for line in txt] # 每行按空格切分
txt = [line for line in txt if len(line) < 16] # 过滤掉字数超过maxlen的对联
return txt
# 产生数据字典
def generate_count_dict(result_dict, x, y):
for i, idx in enumerate(x):
j = len(idx)
if j not in result_dict:
result_dict[j] = [[], []] # [样本数据list,类别标记list]
result_dict[j][0].append(idx)
result_dict[j][1].append(y[i])
return result_dict
# 将字典数据转为numpy
def to_numpy_array(dict):
for count, [x, y] in dict.items():
dict[count][0] = np.array(x)
dict[count][1] = np.array(y)
return dict
x = read_data(input_path)
y = read_data(output_path)
# 获取词表
vocabulary = x + y
# 构造字符级别的特征
string = ''
for words in vocabulary:
for word in words:
string += word
# 所有的词汇表
vocabulary = set(string)
word2idx = {word: i for i, word in enumerate(vocabulary)}
idx2word = {i: word for i, word in enumerate(vocabulary)}
# 训练数据中所有词的个数
vocab_size = len(word2idx.keys()) # 词汇表大小
# 将x和y转为数值
x = [[word2idx[word] for word in sent] for sent in x]
y = [[word2idx[word] for word in sent] for sent in y]
train_dict = {}
train_dict = generate_count_dict(train_dict, x, y)
train_dict = to_numpy_array(train_dict)
return train_dict, vocab_size, idx2word, word2idx
# 定义网络结构
class LSTM(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):
super(LSTM, self).__init__()
self.hidden_dim = hidden_dim
self.embeddings = nn.Embedding(vocab_size + 1, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers)
self.linear = nn.Linear(hidden_dim, vocab_size)
def forward(self, x):
time_step, batch_size = x.size() # 124, 16
embeds = self.embeddings(x)
output, (h_n, c_n) = self.lstm(embeds)
output = self.linear(output.reshape(time_step * batch_size, -1))
# 要返回所有时间点的数据,每个时间点对应一个字,也就是vocab_size维度的向量
return output
def couplet_match(s):
# 将字符串转为数值
x = [word2idx[word] for word in s]
# 将数值向量转为tensor
x = torch.from_numpy(np.array(x).reshape(-1, 1))
# 加载模型
model_path = './best_model.pkl'
model = LSTM(vocab_size=vocab_size, hidden_dim=hidden_dim,
embedding_dim=embedding_dim, num_layers=num_layers)
model.load_state_dict(torch.load(model_path, 'cpu'))
y = model(x)
y = y.argmax(axis=1)
r = ''.join([idx2word[idx.item()] for idx in y])
print('上联:%s,下联:%s' % (s, r))
# 加载数据
train_dict, vocab_size, idx2word, word2idx = load_data(input_path, output_path)
# 测试
sentence = '恭喜发财'
couplet_match(sentence)