基于余弦相似度实现的表示型文本匹配
- 目标
- 数据准备
- 参数配置
- 数据处理
- 初始化方法
- 加载数据
- 编码句子
- 补齐与截断
- 重写父类两个方法
- 采样策略
- 加载词表和方案
- 模型构建
- SentenceEncoder类
- SiameseNetwork类
- 优化器配置
- 主程序
- 验证与评估
- 推理预测
- 测试结果
目标
本文基于给定的词表,将输入的文本基于jieba分词分割为若干个词,然后将词基于词表进行初步编码,而后基于不同的采样策略过网络层,得到文本的词嵌入特征向量,最后计算文本之间特征向量的余弦相似度,从而实现一个简单表示型文本的匹配方法。
数据准备
词表文件chars.txt
类别标签文件schema.json
{
"停机保号": 0,
"密码重置": 1,
"宽泛业务问题": 2,
"亲情号码设置与修改": 3,
"固话密码修改": 4,
"来电显示开通": 5,
"亲情号码查询": 6,
"密码修改": 7,
"无线套餐变更": 8,
"月返费查询": 9,
"移动密码修改": 10,
"固定宽带服务密码修改": 11,
"UIM反查手机号": 12,
"有限宽带障碍报修": 13,
"畅聊套餐变更": 14,
"呼叫转移设置": 15,
"短信套餐取消": 16,
"套餐余量查询": 17,
"紧急停机": 18,
"VIP密码修改": 19,
"移动密码重置": 20,
"彩信套餐变更": 21,
"积分查询": 22,
"话费查询": 23,
"短信套餐开通立即生效": 24,
"固话密码重置": 25,
"解挂失": 26,
"挂失": 27,
"无线宽带密码修改": 28
}
训练集数据train.json训练集数据
验证集数据valid.json验证集数据
参数配置
config.py
# -*- coding: utf-8 -*-
"""
配置参数信息
"""
# -*- coding: utf-8 -*-
"""
配置参数信息
"""
Config = {
"model_path": "model_output",
"schema_path": "../data/schema.json",
"train_data_path": "../data/train.json",
"valid_data_path": "../data/valid.json",
"vocab_path":"../chars.txt",
"max_length": 20,
"hidden_size": 128,
"epoch": 10,
"batch_size": 32,
"epoch_data_size": 200, #每轮训练中采样数量
"positive_sample_rate":0.5, #正样本比例
"optimizer": "adam",
"learning_rate": 1e-3,
}
数据处理
loader.py
# -*- coding: utf-8 -*-
import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
数据加载
"""
class DataGenerator:
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.vocab = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.vocab)
self.schema = load_schema(config["schema_path"])
self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
self.data_type = None #用来标识加载的是训练集还是测试集 "train" or "test"
self.load()
def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
#加载训练集
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
self.knwb[self.schema[label]].append(input_id)
#加载测试集
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
label_index = torch.LongTensor([self.schema[label]])
self.data.append([input_id, label_index])
return
def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
input_id = self.padding(input_id)
return input_id
#补齐或截断输入的序列,使其可以在一个batch内运算
def padding(self, input_id):
input_id = input_id[:self.config["max_length"]]
input_id += [0] * (self.config["max_length"] - len(input_id))
return input_id
def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)
def __getitem__(self, index):
if self.data_type == "train":
return self.random_train_sample() #随机生成一个训练样本
else:
return self.data[index]
#依照一定概率生成负样本或正样本
#负样本从随机两个不同的标准问题中各随机选取一个
#正样本从随机一个标准问题中随机选取两个
def random_train_sample(self):
standard_question_index = list(self.knwb.keys())
#随机正样本
if random.random() <= self.config["positive_sample_rate"]:
p = random.choice(standard_question_index)
#如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
if len(self.knwb[p]) < 2:
return self.random_train_sample()
else:
s1, s2 = random.sample(self.knwb[p], 2)
return [s1, s2, torch.LongTensor([1])]
#随机负样本
else:
p, n = random.sample(standard_question_index, 2)
s1 = random.choice(self.knwb[p])
s2 = random.choice(self.knwb[n])
return [s1, s2, torch.LongTensor([-1])]
#加载字表或词表
def load_vocab(vocab_path):
token_dict = {}
with open(vocab_path, encoding="utf8") as f:
for index, line in enumerate(f):
token = line.strip()
token_dict[token] = index + 1 #0留给padding位置,所以从1开始
return token_dict
#加载schema
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
主要实现一个数据加载器 DataGenerator
,用于从指定的数据路径加载数据,并根据给定的配置进行处理。
初始化方法
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.vocab = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.vocab)
self.schema = load_schema(config["schema_path"])
self.train_data_size = config["epoch_data_size"]
self.data_type = None
self.load()
data_path
: 数据文件路径,包含数据(训练或测试数据)。config
: 配置字典,包含词汇表路径、最大序列长度、批次大小等。vocab
: 通过load_vocab
函数加载的词汇表。schema
: 通过load_schema
函数加载的schema,定义了类别与标签的映射。train_data_size
: 设置每个训练周期的数据大小(如数据采样的数量)。data_type
: 表示数据是训练数据还是测试数据。
加载数据
def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
# 加载训练集
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
self.knwb[self.schema[label]].append(input_id)
# 加载测试集
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
label_index = torch.LongTensor([self.schema[label]])
self.data.append([input_id, label_index])
- 数据从
data_path
指定的文件加载。每行数据可以是一个字典(训练数据)或者一个列表(测试数据)。 - 对于训练数据,每个问题(
questions
)都通过encode_sentence
进行编码并加入到knwb
字典中,knwb
的键是schema[label]
,值是对应的句子编码。 - 对于测试数据,问题和标签被直接处理并存储到
data
列表中。
编码句子
def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
input_id = self.padding(input_id)
return input_id
- 根据
vocab_path
来决定是按词(jieba.cut
分词)还是按字符(逐个字符编码)进行句子的编码。 - 若词或字符不在词汇表中,则使用
[UNK]
(未知词)表示。 - 最后,通过
padding
方法将编码后的句子长度统一为max_length
。
补齐与截断
def padding(self, input_id):
input_id = input_id[:self.config["max_length"]]
input_id += [0] * (self.config["max_length"] - len(input_id))
return input_id
- 将输入序列截断或填充至
max_length
,保证所有句子有相同的长度。
重写父类两个方法
def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)
def __getitem__(self, index):
if self.data_type == "train":
return self.random_train_sample() # 随机生成一个训练样本
else:
return self.data[index]
- 返回数据集的大小。如果是训练集,返回
epoch_data_size
(即每个训练周期的数据量);如果是测试集,返回实际数据量。 - 如果是训练数据,调用
random_train_sample()
方法来随机生成一个训练样本。 - 如果是测试数据,直接返回
data
中的对应项。
采样策略
def random_train_sample(self):
standard_question_index = list(self.knwb.keys())
# 随机正样本
if random.random() <= self.config["positive_sample_rate"]:
p = random.choice(standard_question_index)
# 如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
if len(self.knwb[p]) < 2:
return self.random_train_sample()
else:
s1, s2 = random.sample(self.knwb[p], 2)
return [s1, s2, torch.LongTensor([1])]
# 随机负样本
else:
p, n = random.sample(standard_question_index, 2)
s1 = random.choice(self.knwb[p])
s2 = random.choice(self.knwb[n])
return [s1, s2, torch.LongTensor([-1])]
采样策略解释:
-
正样本采样:
- 首先随机选择一个标准问题集(
p
)。 - 若该标准问题集下有至少两个问题,则随机选取两个问题作为正样本(标签为1)。
- 如果该问题集下的问题不足两个,则重新进行正样本采样。
- 首先随机选择一个标准问题集(
-
负样本采样:
- 随机选择两个不同的标准问题集(
p
和n
)。 - 分别从这两个标准问题集中各选取一个问题,组成负样本(标签为-1)。
- 随机选择两个不同的标准问题集(
这两种采样策略用于生成训练数据,其中正样本表示两个问题来自同一类别,负样本表示两个问题来自不同类别。
加载词表和方案
def load_vocab(vocab_path):
token_dict = {}
with open(vocab_path, encoding="utf8") as f:
for index, line in enumerate(f):
token = line.strip()
token_dict[token] = index + 1 # 0 留给 padding 位置,所以从 1 开始
return token_dict
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())
load_vocab
: 加载词汇表,每个词与一个唯一的索引对应(从1开始,0用于填充)。load_schema
: 加载类别与标签的映射文件,通常是一个 JSON 文件。
模型构建
model.py
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
"""
建立网络模型结构
"""
class SentenceEncoder(nn.Module):
def __init__(self, config):
super(SentenceEncoder, self).__init__()
hidden_size = config["hidden_size"]
vocab_size = config["vocab_size"] + 1
max_length = config["max_length"]
self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
# self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
self.layer = nn.Linear(hidden_size, hidden_size)
self.dropout = nn.Dropout(0.5)
#输入为问题字符编码
def forward(self, x):
x = self.embedding(x)
#使用lstm
# x, _ = self.lstm(x)
#使用线性层
x = self.layer(x)
x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()
return x
class SiameseNetwork(nn.Module):
def __init__(self, config):
super(SiameseNetwork, self).__init__()
self.sentence_encoder = SentenceEncoder(config)
self.loss = nn.CosineEmbeddingLoss()
# 计算余弦距离 1-cos(a,b)
# cos=1时两个向量相同,余弦距离为0;cos=0时,两个向量正交,余弦距离为1
def cosine_distance(self, tensor1, tensor2):
tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)
return 1 - cosine
def cosine_triplet_loss(self, a, p, n, margin=None):
ap = self.cosine_distance(a, p)
an = self.cosine_distance(a, n)
if margin is None:
diff = ap - an + 0.1
else:
diff = ap - an + margin.squeeze()
return torch.mean(diff[diff.gt(0)]) #greater than
#sentence : (batch_size, max_length)
def forward(self, sentence1, sentence2=None, target=None):
#同时传入两个句子
if sentence2 is not None:
vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)
vector2 = self.sentence_encoder(sentence2)
#如果有标签,则计算loss
if target is not None:
return self.loss(vector1, vector2, target.squeeze())
#如果无标签,计算余弦距离
else:
return self.cosine_distance(vector1, vector2)
#单独传入一个句子时,认为正在使用向量化能力
else:
return self.sentence_encoder(sentence1)
def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)
这段代码主要定义一个Siamese 孪生网络模型,用于处理两个句子之间的相似性计算。代码包括了一个句子编码器(SentenceEncoder
)和一个主要的网络类(SiameseNetwork
),以及一个优化器选择函数。
SentenceEncoder类
SentenceEncoder
是一个简单的神经网络模块,用于将输入句子转换为一个固定维度的向量表示。
-
构造函数(
__init__
):hidden_size
: 神经网络中的隐藏层维度。vocab_size
: 词汇表的大小。加 1 是为了考虑填充(padding)词。max_length
: 输入句子的最大长度(这个值在代码中没有使用,但可能在实际使用中会有用)。self.embedding
: 使用nn.Embedding
来将词汇表中的每个词映射到一个隐藏维度的向量。padding_idx=0
表示索引为0的词(即填充词)不参与梯度计算。self.layer
: 一个全连接层,输入和输出的维度都是hidden_size
,该层用于变换嵌入后的表示。self.dropout
: 用于在训练时随机丢弃一部分神经元,以减少过拟合。丢弃率为0.5。
-
forward
方法:- 输入:
x
(形状为(batch_size, max_length)
的张量,即一个批次的词汇索引)。 self.embedding(x)
: 将词汇索引转换为词嵌入(形状为(batch_size, max_length, hidden_size)
)。self.layer(x)
: 对嵌入后的表示应用一个线性层(形状变为(batch_size, max_length, hidden_size)
)。nn.functional.max_pool1d
: 对每个句子(按批次处理)应用最大池化。由于我们处理的是句子,因此进行池化后,x.transpose(1, 2)
的形状是(batch_size, hidden_size, max_length)
,使用max_pool1d
聚合最大特征值,最后将其维度变为(batch_size, hidden_size)
。
- 输入:
SiameseNetwork类
SiameseNetwork
是一个典型的孪生网络(Siamese Network),用于计算两个输入句子(或输入向量)之间的相似性。
-
构造函数(
__init__
):- 初始化
SentenceEncoder
模型,作为句子编码器。 self.loss
使用CosineEmbeddingLoss
作为损失函数,这是一个基于余弦相似度的损失函数,用于衡量两个向量之间的相似性。它期望输入为两个向量及其相似度标签。
- 初始化
-
cosine_distance
方法:- 计算两个输入张量
tensor1
和tensor2
之间的余弦距离。余弦距离的定义是: cosine_distance = 1 − cos ( θ ) \text{cosine\_distance} = 1 - \cos(\theta) cosine_distance=1−cos(θ)
其中 cos ( θ ) \cos(\theta) cos(θ) 是两个向量的余弦相似度。 torch.nn.functional.normalize
: 对输入张量进行归一化,使得每个向量的范数(长度)为1,这样计算余弦相似度时只需要考虑方向而不考虑长度。torch.sum(torch.mul(tensor1, tensor2), axis=-1)
: 计算归一化后的两个向量的点积,即余弦相似度。
- 计算两个输入张量
-
cosine_triplet_loss
方法:- 这是一个用于三元组损失(Triplet Loss)的实现,通常用于训练深度学习模型区分相似和不相似的样本。
a
是锚点(anchor),p
是正样本(positive),n
是负样本(negative)。ap
和an
分别是锚点与正样本、锚点与负样本之间的余弦距离。diff
计算的是正负样本之间的差异,若差异大于0
则返回损失。margin
用于调节该差异。- 返回
diff
中大于0的部分的平均值(即只有当正样本与负样本之间的距离差大于某个阈值时,才会产生损失)。
-
forward
方法:- 输入:
sentence1
和sentence2
(可选),以及target
(标签,表示相似性)。sentence1
和sentence2
是两个句子的索引。 - 如果
sentence2
存在,表示需要计算这两个句子的相似性:self.sentence_encoder(sentence1)
: 通过句子编码器获取sentence1
的向量表示。self.sentence_encoder(sentence2)
: 获取sentence2
的向量表示。- 如果提供了
target
,计算损失;如果没有提供标签,计算余弦距离。
- 如果只传入
sentence1
,表示仅需对该句子进行向量化。 nn.CosineEmbeddingLoss()
计算两个向量之间的余弦相似度损失。目标标签为1
时,最小化余弦相似度;为-1
时,最小化负的余弦相似度。其计算公式为:
余弦相似度:
cosine_similarity = i n p u t 1 ⋅ i n p u t 2 ∥ i n p u t 1 ∥ ∥ i n p u t 2 ∥ \text{cosine\_similarity} = \frac{input1 \cdot input2}{\|input1\| \|input2\|} cosine_similarity=∥input1∥∥input2∥input1⋅input2
损失:
loss = { max ( 0 , 1 − cosine_similarity ) for target 1 max ( 0 , cosine_similarity + 1 ) for target -1 \text{loss} = \left\{ \begin{array}{ll} \max(0, 1 - \text{cosine\_similarity}) & \text{for target 1} \\ \max(0, \text{cosine\_similarity} + 1) & \text{for target -1} \end{array} \right. loss={max(0,1−cosine_similarity)max(0,cosine_similarity+1)for target 1for target -1
- 输入:
优化器配置
此函数用于根据配置选择优化器。支持的优化器有 Adam
和 SGD
。
-
参数:
config
: 一个字典,包含优化器选择(optimizer
)和学习率(learning_rate
)等配置。model
: 要优化的模型。
-
返回: 根据
config
中的配置返回一个优化器(Adam
或SGD
)。
总结
- SentenceEncoder: 将输入句子通过词嵌入和线性层转换为固定维度的向量表示。
- SiameseNetwork: 用于计算两个句子之间的相似性。它使用
CosineEmbeddingLoss
来训练模型,或者在没有标签的情况下计算句子之间的余弦距离。 - choose_optimizer: 根据配置选择优化器。
主程序
main.py
# -*- coding: utf-8 -*-
import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SiameseNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
"""
模型训练主程序
"""
def main(config):
#创建保存模型的目录
if not os.path.isdir(config["model_path"]):
os.mkdir(config["model_path"])
#加载训练数据
train_data = load_data(config["train_data_path"], config)
#加载模型
model = SiameseNetwork(config)
# 标识是否使用gpu
cuda_flag = torch.cuda.is_available()
if cuda_flag:
logger.info("gpu可以使用,迁移模型至gpu")
model = model.cuda()
#加载优化器
optimizer = choose_optimizer(config, model)
#加载效果测试类
evaluator = Evaluator(config, model, logger)
#训练
for epoch in range(config["epoch"]):
epoch += 1
model.train()
logger.info("epoch %d begin" % epoch)
train_loss = []
for index, batch_data in enumerate(train_data):
optimizer.zero_grad()
if cuda_flag:
batch_data = [d.cuda() for d in batch_data]
input_id1, input_id2, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
loss = model(input_id1, input_id2, labels)
train_loss.append(loss.item())
# if index % int(len(train_data) / 2) == 0:
# logger.info("batch loss %f" % loss)
loss.backward()
optimizer.step()
logger.info("epoch average loss: %f" % np.mean(train_loss))
evaluator.eval(epoch)
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
torch.save(model.state_dict(), model_path)
return
if __name__ == "__main__":
main(Config)
用于训练Siamese网络模型的主程序。首先,创建用于保存模型的目录并加载训练数据。接着,初始化Siamese网络模型,并检查是否有可用的GPU来加速训练。然后,选择合适的优化器和效果测试类。训练过程通过多个epoch进行,每个epoch中,遍历训练数据并计算损失,执行反向传播和优化步骤。每个epoch结束时,计算并输出平均损失,并调用Evaluator
类进行模型评估。最后,训练完成后将模型保存到指定路径。
验证与评估
evaluate.py
# -*- coding: utf-8 -*-
import torch
from loader import load_data
"""
模型效果测试
"""
class Evaluator:
def __init__(self, config, model, logger):
self.config = config
self.model = model
self.logger = logger
self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
# 由于效果测试需要训练集当做知识库,再次加载训练集。
# 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
self.train_data = load_data(config["train_data_path"], config)
self.stats_dict = {"correct":0, "wrong":0} #用于存储测试结果
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.question_ids = []
for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
for question_id in question_ids:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
self.question_ids.append(question_id)
with torch.no_grad():
question_matrixs = torch.stack(self.question_ids, dim=0)
if torch.cuda.is_available():
question_matrixs = question_matrixs.cuda()
self.knwb_vectors = self.model(question_matrixs)
#将所有向量都作归一化 v / |v|
self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
return
def eval(self, epoch):
self.logger.info("开始测试第%d轮模型效果:" % epoch)
self.stats_dict = {"correct":0, "wrong":0} #清空前一轮的测试结果
self.model.eval()
self.knwb_to_vector()
for index, batch_data in enumerate(self.valid_data):
if torch.cuda.is_available():
batch_data = [d.cuda() for d in batch_data]
input_id, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
with torch.no_grad():
test_question_vectors = self.model(input_id) #不输入labels,使用模型当前参数进行预测
self.write_stats(test_question_vectors, labels)
self.show_stats()
return
def write_stats(self, test_question_vectors, labels):
assert len(labels) == len(test_question_vectors)
for test_question_vector, label in zip(test_question_vectors, labels):
#通过一次矩阵乘法,计算输入问题和知识库中所有问题的相似度
#test_question_vector shape [vec_size] knwb_vectors shape = [n, vec_size]
res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
if int(hit_index) == int(label):
self.stats_dict["correct"] += 1
else:
self.stats_dict["wrong"] += 1
return
def show_stats(self):
correct = self.stats_dict["correct"]
wrong = self.stats_dict["wrong"]
self.logger.info("预测集合条目总量:%d" % (correct +wrong))
self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
self.logger.info("--------------------")
return
定义一个 Evaluator
类,用于评估模型在验证数据集上的表现,尤其是在知识库中找到与输入问题最匹配的答案。核心流程如下:
-
初始化:
__init__
方法加载验证数据集和训练数据集,并初始化评估统计字典stats_dict
,该字典用于存储正确和错误的预测数量。 -
向量化知识库:
knwb_to_vector
方法将训练集中的问题转化为向量,以便与模型的输出进行匹配。每个问题的向量通过模型生成,并且进行归一化。方法将所有问题与标准问题的编号进行映射,以便后续验证预测是否正确。 -
评估模型性能:
eval
方法用于在每个训练轮次中评估模型的效果。通过调用knwb_to_vector
生成知识库向量,接着使用模型预测验证集中的问题向量。模型输出的向量与知识库中所有问题的向量进行相似度计算,判断预测是否与真实标签匹配。 -
统计和显示结果:
write_stats
方法统计每个问题的预测是否正确,show_stats
方法输出评估结果,包括总预测数、正确预测数、错误预测数和准确率。
总之,这段代码实现了一个基于向量相似度的模型评估流程,用于衡量模型在匹配问题和知识库的准确度。
推理预测
predict.py
# -*- coding: utf-8 -*-
import jieba
import torch
from loader import load_data
from config import Config
from model import SiameseNetwork, choose_optimizer
"""
模型效果测试
"""
class Predictor:
def __init__(self, config, model, knwb_data):
self.config = config
self.model = model
self.train_data = knwb_data
if torch.cuda.is_available():
self.model = model.cuda()
else:
self.model = model.cpu()
self.model.eval()
self.knwb_to_vector()
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.question_ids = []
self.vocab = self.train_data.dataset.vocab
self.schema = self.train_data.dataset.schema
self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
for question_id in question_ids:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
self.question_ids.append(question_id)
with torch.no_grad():
question_matrixs = torch.stack(self.question_ids, dim=0)
if torch.cuda.is_available():
question_matrixs = question_matrixs.cuda()
self.knwb_vectors = self.model(question_matrixs)
#将所有向量都作归一化 v / |v|
self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
return
def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
return input_id
def predict(self, sentence):
input_id = self.encode_sentence(sentence)
input_id = torch.LongTensor([input_id])
if torch.cuda.is_available():
input_id = input_id.cuda()
with torch.no_grad():
test_question_vector = self.model(input_id) #不输入labels,使用模型当前参数进行预测
res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
return self.index_to_standard_question[hit_index]
if __name__ == "__main__":
knwb_data = load_data(Config["train_data_path"], Config)
model = SiameseNetwork(Config)
model.load_state_dict(torch.load("model_output/epoch_10.pth"))
pd = Predictor(Config, model, knwb_data)
while True:
# sentence = "固定宽带服务密码修改"
sentence = input("请输入问题:")
res = pd.predict(sentence)
print(res)
这段代码实现一个基于Siamese网络的问答匹配系统,用于根据用户输入的问题在知识库中找到最相似的问题并返回对应答案。具体流程如下:
-
类初始化:
Predictor
类负责加载配置、模型和训练数据。模型根据可用的硬件环境(GPU或CPU)进行加载,并设置为评估模式(model.eval()
)。同时,调用knwb_to_vector
方法将知识库中的问题转化为向量,便于后续匹配。 -
知识库向量化:
knwb_to_vector
方法将知识库中的所有问题编码为向量,并将其归一化,存储在knwb_vectors
中。此过程在每次测试时重新进行,以保证每次使用模型的最新参数。 -
文本编码:
encode_sentence
方法负责将输入问题转换为模型所需的索引形式。如果配置文件指定使用词汇表words.txt
,则对句子进行分词,并将词语映射为词汇表中的索引。 -
问题匹配:
predict
方法接收用户输入的问题,编码为索引后,利用模型计算问题的向量表示。然后,计算输入问题向量与知识库中所有问题向量的相似度,找到最匹配的问题并返回其对应的标准问题。 -
主程序:在主程序中,加载预训练的模型,并进入一个循环,等待用户输入问题。每次输入后,模型预测并输出最相似的标准问题。
这个系统基于Siamese网络,通过计算向量的相似度来进行高效的问答匹配。
测试结果
请输入问题:手机暂时不用能办理停机吗
停机保号
请输入问题:用手机怎样对VIP密码进行修改
VIP密码修改
请输入问题:去营业厅修改移动密码必须是本人吗
移动密码修改
请输入问题: