Day08 【基于余弦相似度实现的表示型文本匹配】

news2025/4/16 10:51:02

基于余弦相似度实现的表示型文本匹配

      • 目标
      • 数据准备
      • 参数配置
      • 数据处理
        • 初始化方法
        • 加载数据
        • 编码句子
        • 补齐与截断
        • 重写父类两个方法
        • 采样策略
        • 加载词表和方案
      • 模型构建
        • SentenceEncoder类
        • SiameseNetwork类
        • 优化器配置
      • 主程序
      • 验证与评估
      • 推理预测
      • 测试结果

目标

本文基于给定的词表,将输入的文本基于jieba分词分割为若干个词,然后将词基于词表进行初步编码,而后基于不同的采样策略过网络层,得到文本的词嵌入特征向量,最后计算文本之间特征向量的余弦相似度,从而实现一个简单表示型文本的匹配方法。

数据准备

词表文件chars.txt

类别标签文件schema.json

{
  "停机保号": 0,
  "密码重置": 1,
  "宽泛业务问题": 2,
  "亲情号码设置与修改": 3,
  "固话密码修改": 4,
  "来电显示开通": 5,
  "亲情号码查询": 6,
  "密码修改": 7,
  "无线套餐变更": 8,
  "月返费查询": 9,
  "移动密码修改": 10,
  "固定宽带服务密码修改": 11,
  "UIM反查手机号": 12,
  "有限宽带障碍报修": 13,
  "畅聊套餐变更": 14,
  "呼叫转移设置": 15,
  "短信套餐取消": 16,
  "套餐余量查询": 17,
  "紧急停机": 18,
  "VIP密码修改": 19,
  "移动密码重置": 20,
  "彩信套餐变更": 21,
  "积分查询": 22,
  "话费查询": 23,
  "短信套餐开通立即生效": 24,
  "固话密码重置": 25,
  "解挂失": 26,
  "挂失": 27,
  "无线宽带密码修改": 28
}

训练集数据train.json训练集数据

验证集数据valid.json验证集数据

参数配置

config.py

# -*- coding: utf-8 -*-

"""
配置参数信息
"""
# -*- coding: utf-8 -*-

"""
配置参数信息
"""
Config = {
    "model_path": "model_output",
    "schema_path": "../data/schema.json",
    "train_data_path": "../data/train.json",
    "valid_data_path": "../data/valid.json",
    "vocab_path":"../chars.txt",
    "max_length": 20,
    "hidden_size": 128,
    "epoch": 10,
    "batch_size": 32,
    "epoch_data_size": 200,     #每轮训练中采样数量
    "positive_sample_rate":0.5,  #正样本比例
    "optimizer": "adam",
    "learning_rate": 1e-3,
}

数据处理

loader.py

# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
数据加载
"""


class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.schema = load_schema(config["schema_path"])
        self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
        self.data_type = None  #用来标识加载的是训练集还是测试集 "train" or "test"
        self.load()

    def load(self):
        self.data = []
        self.knwb = defaultdict(list)
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                #加载训练集
                if isinstance(line, dict):
                    self.data_type = "train"
                    questions = line["questions"]
                    label = line["target"]
                    for question in questions:
                        input_id = self.encode_sentence(question)
                        input_id = torch.LongTensor(input_id)
                        self.knwb[self.schema[label]].append(input_id)
                #加载测试集
                else:
                    self.data_type = "test"
                    assert isinstance(line, list)
                    question, label = line
                    input_id = self.encode_sentence(question)
                    input_id = torch.LongTensor(input_id)
                    label_index = torch.LongTensor([self.schema[label]])
                    self.data.append([input_id, label_index])
        return

    def encode_sentence(self, text):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        input_id = self.padding(input_id)
        return input_id

    #补齐或截断输入的序列,使其可以在一个batch内运算
    def padding(self, input_id):
        input_id = input_id[:self.config["max_length"]]
        input_id += [0] * (self.config["max_length"] - len(input_id))
        return input_id

    def __len__(self):
        if self.data_type == "train":
            return self.config["epoch_data_size"]
        else:
            assert self.data_type == "test", self.data_type
            return len(self.data)

    def __getitem__(self, index):
        if self.data_type == "train":
            return self.random_train_sample() #随机生成一个训练样本
        else:
            return self.data[index]

    #依照一定概率生成负样本或正样本
    #负样本从随机两个不同的标准问题中各随机选取一个
    #正样本从随机一个标准问题中随机选取两个
    def random_train_sample(self):
        standard_question_index = list(self.knwb.keys())
        #随机正样本
        if random.random() <= self.config["positive_sample_rate"]:
            p = random.choice(standard_question_index)
            #如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
            if len(self.knwb[p]) < 2:
                return self.random_train_sample()
            else:
                s1, s2 = random.sample(self.knwb[p], 2)
                return [s1, s2, torch.LongTensor([1])]
        #随机负样本
        else:
            p, n = random.sample(standard_question_index, 2)
            s1 = random.choice(self.knwb[p])
            s2 = random.choice(self.knwb[n])
            return [s1, s2, torch.LongTensor([-1])]



#加载字表或词表
def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  #0留给padding位置,所以从1开始
    return token_dict

#加载schema
def load_schema(schema_path):
    with open(schema_path, encoding="utf8") as f:
        return json.loads(f.read())

#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl

主要实现一个数据加载器 DataGenerator,用于从指定的数据路径加载数据,并根据给定的配置进行处理。

初始化方法
def __init__(self, data_path, config):
    self.config = config
    self.path = data_path
    self.vocab = load_vocab(config["vocab_path"])
    self.config["vocab_size"] = len(self.vocab)
    self.schema = load_schema(config["schema_path"])
    self.train_data_size = config["epoch_data_size"]
    self.data_type = None
    self.load()
  • data_path: 数据文件路径,包含数据(训练或测试数据)。
  • config: 配置字典,包含词汇表路径、最大序列长度、批次大小等。
  • vocab: 通过load_vocab函数加载的词汇表。
  • schema: 通过load_schema函数加载的schema,定义了类别与标签的映射。
  • train_data_size: 设置每个训练周期的数据大小(如数据采样的数量)。
  • data_type: 表示数据是训练数据还是测试数据。
加载数据
def load(self):
    self.data = []
    self.knwb = defaultdict(list)
    with open(self.path, encoding="utf8") as f:
        for line in f:
            line = json.loads(line)
            # 加载训练集
            if isinstance(line, dict):
                self.data_type = "train"
                questions = line["questions"]
                label = line["target"]
                for question in questions:
                    input_id = self.encode_sentence(question)
                    input_id = torch.LongTensor(input_id)
                    self.knwb[self.schema[label]].append(input_id)
            # 加载测试集
            else:
                self.data_type = "test"
                assert isinstance(line, list)
                question, label = line
                input_id = self.encode_sentence(question)
                input_id = torch.LongTensor(input_id)
                label_index = torch.LongTensor([self.schema[label]])
                self.data.append([input_id, label_index])
  • 数据从 data_path 指定的文件加载。每行数据可以是一个字典(训练数据)或者一个列表(测试数据)。
  • 对于训练数据,每个问题(questions)都通过 encode_sentence 进行编码并加入到 knwb 字典中,knwb 的键是 schema[label],值是对应的句子编码。
  • 对于测试数据,问题和标签被直接处理并存储到 data 列表中。
编码句子
def encode_sentence(self, text):
    input_id = []
    if self.config["vocab_path"] == "words.txt":
        for word in jieba.cut(text):
            input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
    else:
        for char in text:
            input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
    input_id = self.padding(input_id)
    return input_id
  • 根据 vocab_path 来决定是按词(jieba.cut 分词)还是按字符(逐个字符编码)进行句子的编码。
  • 若词或字符不在词汇表中,则使用 [UNK](未知词)表示。
  • 最后,通过 padding 方法将编码后的句子长度统一为 max_length
补齐与截断
def padding(self, input_id):
    input_id = input_id[:self.config["max_length"]]
    input_id += [0] * (self.config["max_length"] - len(input_id))
    return input_id
  • 将输入序列截断或填充至 max_length,保证所有句子有相同的长度。
重写父类两个方法
def __len__(self):
    if self.data_type == "train":
        return self.config["epoch_data_size"]
    else:
        assert self.data_type == "test", self.data_type
        return len(self.data)

def __getitem__(self, index):
    if self.data_type == "train":
        return self.random_train_sample()  # 随机生成一个训练样本
    else:
        return self.data[index]       
  • 返回数据集的大小。如果是训练集,返回 epoch_data_size(即每个训练周期的数据量);如果是测试集,返回实际数据量。
  • 如果是训练数据,调用 random_train_sample() 方法来随机生成一个训练样本。
  • 如果是测试数据,直接返回 data 中的对应项。
采样策略
def random_train_sample(self):
    standard_question_index = list(self.knwb.keys())
    # 随机正样本
    if random.random() <= self.config["positive_sample_rate"]:
        p = random.choice(standard_question_index)
        # 如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
        if len(self.knwb[p]) < 2:
            return self.random_train_sample()
        else:
            s1, s2 = random.sample(self.knwb[p], 2)
            return [s1, s2, torch.LongTensor([1])]
    # 随机负样本
    else:
        p, n = random.sample(standard_question_index, 2)
        s1 = random.choice(self.knwb[p])
        s2 = random.choice(self.knwb[n])
        return [s1, s2, torch.LongTensor([-1])]

采样策略解释:

  1. 正样本采样

    • 首先随机选择一个标准问题集(p)。
    • 若该标准问题集下有至少两个问题,则随机选取两个问题作为正样本(标签为1)。
    • 如果该问题集下的问题不足两个,则重新进行正样本采样。
  2. 负样本采样

    • 随机选择两个不同的标准问题集(pn)。
    • 分别从这两个标准问题集中各选取一个问题,组成负样本(标签为-1)。

这两种采样策略用于生成训练数据,其中正样本表示两个问题来自同一类别,负样本表示两个问题来自不同类别。

加载词表和方案
def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  # 0 留给 padding 位置,所以从 1 开始
    return token_dict

def load_schema(schema_path):
    with open(schema_path, encoding="utf8") as f:
        return json.loads(f.read())
  • load_vocab: 加载词汇表,每个词与一个唯一的索引对应(从1开始,0用于填充)。
  • load_schema: 加载类别与标签的映射文件,通常是一个 JSON 文件。

模型构建

model.py

# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
"""
建立网络模型结构
"""

class SentenceEncoder(nn.Module):
    def __init__(self, config):
        super(SentenceEncoder, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        max_length = config["max_length"]
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        # self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        self.layer = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.5)

    #输入为问题字符编码
    def forward(self, x):
        x = self.embedding(x)
        #使用lstm
        # x, _ = self.lstm(x)
        #使用线性层
        x = self.layer(x)
        x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()
        return x


class SiameseNetwork(nn.Module):
    def __init__(self, config):
        super(SiameseNetwork, self).__init__()
        self.sentence_encoder = SentenceEncoder(config)

        self.loss = nn.CosineEmbeddingLoss()

    # 计算余弦距离  1-cos(a,b)
    # cos=1时两个向量相同,余弦距离为0;cos=0时,两个向量正交,余弦距离为1
    def cosine_distance(self, tensor1, tensor2):
        tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
        tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
        cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)
        return 1 - cosine

    def cosine_triplet_loss(self, a, p, n, margin=None):
        ap = self.cosine_distance(a, p)
        an = self.cosine_distance(a, n)
        if margin is None:
            diff = ap - an + 0.1
        else:
            diff = ap - an + margin.squeeze()
        return torch.mean(diff[diff.gt(0)]) #greater than

    #sentence : (batch_size, max_length)
    def forward(self, sentence1, sentence2=None, target=None):
        #同时传入两个句子
        if sentence2 is not None:
            vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)
            vector2 = self.sentence_encoder(sentence2)
            #如果有标签,则计算loss
            if target is not None:
                return self.loss(vector1, vector2, target.squeeze())
            #如果无标签,计算余弦距离
            else:
                return self.cosine_distance(vector1, vector2)
        #单独传入一个句子时,认为正在使用向量化能力
        else:
            return self.sentence_encoder(sentence1)


def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)
        

这段代码主要定义一个Siamese 孪生网络模型,用于处理两个句子之间的相似性计算。代码包括了一个句子编码器(SentenceEncoder)和一个主要的网络类(SiameseNetwork),以及一个优化器选择函数。

SentenceEncoder类

SentenceEncoder 是一个简单的神经网络模块,用于将输入句子转换为一个固定维度的向量表示。

  • 构造函数(__init__:

    • hidden_size: 神经网络中的隐藏层维度。
    • vocab_size: 词汇表的大小。加 1 是为了考虑填充(padding)词。
    • max_length: 输入句子的最大长度(这个值在代码中没有使用,但可能在实际使用中会有用)。
    • self.embedding: 使用 nn.Embedding 来将词汇表中的每个词映射到一个隐藏维度的向量。padding_idx=0 表示索引为0的词(即填充词)不参与梯度计算。
    • self.layer: 一个全连接层,输入和输出的维度都是 hidden_size,该层用于变换嵌入后的表示。
    • self.dropout: 用于在训练时随机丢弃一部分神经元,以减少过拟合。丢弃率为0.5。
  • forward 方法:

    • 输入:x(形状为 (batch_size, max_length) 的张量,即一个批次的词汇索引)。
    • self.embedding(x): 将词汇索引转换为词嵌入(形状为 (batch_size, max_length, hidden_size))。
    • self.layer(x): 对嵌入后的表示应用一个线性层(形状变为 (batch_size, max_length, hidden_size))。
    • nn.functional.max_pool1d: 对每个句子(按批次处理)应用最大池化。由于我们处理的是句子,因此进行池化后,x.transpose(1, 2) 的形状是 (batch_size, hidden_size, max_length),使用 max_pool1d 聚合最大特征值,最后将其维度变为 (batch_size, hidden_size)
SiameseNetwork类

SiameseNetwork 是一个典型的孪生网络(Siamese Network),用于计算两个输入句子(或输入向量)之间的相似性。

  • 构造函数(__init__:

    • 初始化 SentenceEncoder 模型,作为句子编码器。
    • self.loss 使用 CosineEmbeddingLoss 作为损失函数,这是一个基于余弦相似度的损失函数,用于衡量两个向量之间的相似性。它期望输入为两个向量及其相似度标签。
  • cosine_distance 方法:

    • 计算两个输入张量 tensor1tensor2 之间的余弦距离。余弦距离的定义是: cosine_distance = 1 − cos ⁡ ( θ ) \text{cosine\_distance} = 1 - \cos(\theta) cosine_distance=1cos(θ)
      其中 cos ⁡ ( θ ) \cos(\theta) cos(θ) 是两个向量的余弦相似度。
    • torch.nn.functional.normalize: 对输入张量进行归一化,使得每个向量的范数(长度)为1,这样计算余弦相似度时只需要考虑方向而不考虑长度。
    • torch.sum(torch.mul(tensor1, tensor2), axis=-1): 计算归一化后的两个向量的点积,即余弦相似度。
  • cosine_triplet_loss 方法:

    • 这是一个用于三元组损失(Triplet Loss)的实现,通常用于训练深度学习模型区分相似和不相似的样本。
    • a 是锚点(anchor),p 是正样本(positive),n 是负样本(negative)。
    • apan 分别是锚点与正样本、锚点与负样本之间的余弦距离。
    • diff 计算的是正负样本之间的差异,若差异大于 0 则返回损失。margin 用于调节该差异。
    • 返回 diff 中大于0的部分的平均值(即只有当正样本与负样本之间的距离差大于某个阈值时,才会产生损失)。
  • forward 方法:

    • 输入:sentence1sentence2(可选),以及 target(标签,表示相似性)。sentence1sentence2 是两个句子的索引。
    • 如果 sentence2 存在,表示需要计算这两个句子的相似性:
      • self.sentence_encoder(sentence1): 通过句子编码器获取 sentence1 的向量表示。
      • self.sentence_encoder(sentence2): 获取 sentence2 的向量表示。
      • 如果提供了 target,计算损失;如果没有提供标签,计算余弦距离。
    • 如果只传入 sentence1,表示仅需对该句子进行向量化。
    • nn.CosineEmbeddingLoss() 计算两个向量之间的余弦相似度损失。目标标签为 1 时,最小化余弦相似度;为 -1 时,最小化负的余弦相似度。其计算公式为:
      余弦相似度:
      cosine_similarity = i n p u t 1 ⋅ i n p u t 2 ∥ i n p u t 1 ∥ ∥ i n p u t 2 ∥ \text{cosine\_similarity} = \frac{input1 \cdot input2}{\|input1\| \|input2\|} cosine_similarity=input1∥∥input2∥input1input2
      损失:
      loss = { max ⁡ ( 0 , 1 − cosine_similarity ) for target 1 max ⁡ ( 0 , cosine_similarity + 1 ) for target -1 \text{loss} = \left\{ \begin{array}{ll} \max(0, 1 - \text{cosine\_similarity}) & \text{for target 1} \\ \max(0, \text{cosine\_similarity} + 1) & \text{for target -1} \end{array} \right. loss={max(0,1cosine_similarity)max(0,cosine_similarity+1)for target 1for target -1
优化器配置

此函数用于根据配置选择优化器。支持的优化器有 AdamSGD

  • 参数:

    • config: 一个字典,包含优化器选择(optimizer)和学习率(learning_rate)等配置。
    • model: 要优化的模型。
  • 返回: 根据 config 中的配置返回一个优化器(AdamSGD)。

总结

  • SentenceEncoder: 将输入句子通过词嵌入和线性层转换为固定维度的向量表示。
  • SiameseNetwork: 用于计算两个句子之间的相似性。它使用 CosineEmbeddingLoss 来训练模型,或者在没有标签的情况下计算句子之间的余弦距离。
  • choose_optimizer: 根据配置选择优化器。

主程序

main.py

# -*- coding: utf-8 -*-

import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SiameseNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""

def main(config):
    #创建保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    #加载训练数据
    train_data = load_data(config["train_data_path"], config)
    #加载模型
    model = SiameseNetwork(config)
    # 标识是否使用gpu
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    #加载优化器
    optimizer = choose_optimizer(config, model)
    #加载效果测试类
    evaluator = Evaluator(config, model, logger)
    #训练
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            optimizer.zero_grad()
            if cuda_flag:
                batch_data = [d.cuda() for d in batch_data]
            input_id1, input_id2, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            loss = model(input_id1, input_id2, labels)
            train_loss.append(loss.item())
            # if index % int(len(train_data) / 2) == 0:
            #     logger.info("batch loss %f" % loss)
            loss.backward()
            optimizer.step()
        logger.info("epoch average loss: %f" % np.mean(train_loss))
        evaluator.eval(epoch)
    model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
    torch.save(model.state_dict(), model_path)
    return

if __name__ == "__main__":
    main(Config)

用于训练Siamese网络模型的主程序。首先,创建用于保存模型的目录并加载训练数据。接着,初始化Siamese网络模型,并检查是否有可用的GPU来加速训练。然后,选择合适的优化器和效果测试类。训练过程通过多个epoch进行,每个epoch中,遍历训练数据并计算损失,执行反向传播和优化步骤。每个epoch结束时,计算并输出平均损失,并调用Evaluator类进行模型评估。最后,训练完成后将模型保存到指定路径。

验证与评估

evaluate.py

# -*- coding: utf-8 -*-
import torch
from loader import load_data

"""
模型效果测试
"""

class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        # 由于效果测试需要训练集当做知识库,再次加载训练集。
        # 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
        self.train_data = load_data(config["train_data_path"], config)
        self.stats_dict = {"correct":0, "wrong":0}  #用于存储测试结果

    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.question_ids = []
        for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
            for question_id in question_ids:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
                self.question_ids.append(question_id)
        with torch.no_grad():
            question_matrixs = torch.stack(self.question_ids, dim=0)
            if torch.cuda.is_available():
                question_matrixs = question_matrixs.cuda()
            self.knwb_vectors = self.model(question_matrixs)
            #将所有向量都作归一化 v / |v|
            self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
        return

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"correct":0, "wrong":0}  #清空前一轮的测试结果
        self.model.eval()
        self.knwb_to_vector()
        for index, batch_data in enumerate(self.valid_data):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            with torch.no_grad():
                test_question_vectors = self.model(input_id) #不输入labels,使用模型当前参数进行预测
            self.write_stats(test_question_vectors, labels)
        self.show_stats()
        return

    def write_stats(self, test_question_vectors, labels):
        assert len(labels) == len(test_question_vectors)
        for test_question_vector, label in zip(test_question_vectors, labels):
            #通过一次矩阵乘法,计算输入问题和知识库中所有问题的相似度
            #test_question_vector shape [vec_size]   knwb_vectors shape = [n, vec_size]
            res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
            hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
            if int(hit_index) == int(label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

    def show_stats(self):
        correct = self.stats_dict["correct"]
        wrong = self.stats_dict["wrong"]
        self.logger.info("预测集合条目总量:%d" % (correct +wrong))
        self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
        self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
        self.logger.info("--------------------")
        return

定义一个 Evaluator 类,用于评估模型在验证数据集上的表现,尤其是在知识库中找到与输入问题最匹配的答案。核心流程如下:

  1. 初始化__init__方法加载验证数据集和训练数据集,并初始化评估统计字典stats_dict,该字典用于存储正确和错误的预测数量。

  2. 向量化知识库knwb_to_vector方法将训练集中的问题转化为向量,以便与模型的输出进行匹配。每个问题的向量通过模型生成,并且进行归一化。方法将所有问题与标准问题的编号进行映射,以便后续验证预测是否正确。

  3. 评估模型性能eval方法用于在每个训练轮次中评估模型的效果。通过调用knwb_to_vector生成知识库向量,接着使用模型预测验证集中的问题向量。模型输出的向量与知识库中所有问题的向量进行相似度计算,判断预测是否与真实标签匹配。

  4. 统计和显示结果write_stats方法统计每个问题的预测是否正确,show_stats方法输出评估结果,包括总预测数、正确预测数、错误预测数和准确率。

总之,这段代码实现了一个基于向量相似度的模型评估流程,用于衡量模型在匹配问题和知识库的准确度。

推理预测

predict.py

# -*- coding: utf-8 -*-
import jieba
import torch
from loader import load_data
from config import Config
from model import SiameseNetwork, choose_optimizer

"""
模型效果测试
"""

class Predictor:
    def __init__(self, config, model, knwb_data):
        self.config = config
        self.model = model
        self.train_data = knwb_data
        if torch.cuda.is_available():
            self.model = model.cuda()
        else:
            self.model = model.cpu()
        self.model.eval()
        self.knwb_to_vector()

    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.question_ids = []
        self.vocab = self.train_data.dataset.vocab
        self.schema = self.train_data.dataset.schema
        self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
        for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
            for question_id in question_ids:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
                self.question_ids.append(question_id)
        with torch.no_grad():
            question_matrixs = torch.stack(self.question_ids, dim=0)
            if torch.cuda.is_available():
                question_matrixs = question_matrixs.cuda()
            self.knwb_vectors = self.model(question_matrixs)
            #将所有向量都作归一化 v / |v|
            self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
        return

    def encode_sentence(self, text):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        return input_id

    def predict(self, sentence):
        input_id = self.encode_sentence(sentence)
        input_id = torch.LongTensor([input_id])
        if torch.cuda.is_available():
            input_id = input_id.cuda()
        with torch.no_grad():
            test_question_vector = self.model(input_id) #不输入labels,使用模型当前参数进行预测
            res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
            hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号 
        return  self.index_to_standard_question[hit_index]

if __name__ == "__main__":
    knwb_data = load_data(Config["train_data_path"], Config)
    model = SiameseNetwork(Config)
    model.load_state_dict(torch.load("model_output/epoch_10.pth"))
    pd = Predictor(Config, model, knwb_data)

    while True:
        # sentence = "固定宽带服务密码修改"
        sentence = input("请输入问题:")
        res = pd.predict(sentence)
        print(res)

这段代码实现一个基于Siamese网络的问答匹配系统,用于根据用户输入的问题在知识库中找到最相似的问题并返回对应答案。具体流程如下:

  1. 类初始化Predictor类负责加载配置、模型和训练数据。模型根据可用的硬件环境(GPU或CPU)进行加载,并设置为评估模式(model.eval())。同时,调用knwb_to_vector方法将知识库中的问题转化为向量,便于后续匹配。

  2. 知识库向量化knwb_to_vector方法将知识库中的所有问题编码为向量,并将其归一化,存储在knwb_vectors中。此过程在每次测试时重新进行,以保证每次使用模型的最新参数。

  3. 文本编码encode_sentence方法负责将输入问题转换为模型所需的索引形式。如果配置文件指定使用词汇表words.txt,则对句子进行分词,并将词语映射为词汇表中的索引。

  4. 问题匹配predict方法接收用户输入的问题,编码为索引后,利用模型计算问题的向量表示。然后,计算输入问题向量与知识库中所有问题向量的相似度,找到最匹配的问题并返回其对应的标准问题。

  5. 主程序:在主程序中,加载预训练的模型,并进入一个循环,等待用户输入问题。每次输入后,模型预测并输出最相似的标准问题。

这个系统基于Siamese网络,通过计算向量的相似度来进行高效的问答匹配。

测试结果

请输入问题:手机暂时不用能办理停机吗
停机保号
请输入问题:用手机怎样对VIP密码进行修改
VIP密码修改
请输入问题:去营业厅修改移动密码必须是本人吗
移动密码修改
请输入问题:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2335907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【leetcode hot 100 72】编辑距离

解法一&#xff1a;递归 解法二&#xff1a;&#xff08;动态规划&#xff09;①定义&#xff1a;dp[i][j]为word1中前i个字符转化为word2中前j个字符所需操作数;dp[m1][n1] ②初始状态&#xff1a;dp[0][j]j(0变为j&#xff0c;需要j步)&#xff0c;dp[i][0]i(i变为0&#xff…

Java练习——day1(反射)

文章目录 练习1练习2练习3思考封装原则与反射合理使用反射“破坏”封装的场景 练习1 编写代码&#xff0c;通过反射获取String类的所有公共方法名称&#xff0c;并按字母顺序打印。 示例代码&#xff1a; import java.lang.reflect.Method; import java.util.Arrays;public …

Docker 安装 Elasticsearch 8.x

Docker 安装 Elasticsearch 8.x 前言一、准备工作二、设置容器的目录结构三、启动一个临时的容器来复制配置文件四、复制配置文件到本地目录五、删除临时容器六、创建并运行容器&#xff0c;挂载本地目录七、修改文件配置监听端口八、端口配置&#xff1a;Host 网络模式 vs Por…

Vue工程化开发脚手架Vue CLI

开发Vue有两种方式 核心包传统开发模式&#xff1a;基于html / css / js 文件&#xff0c;直接引入核心包&#xff0c;开发 Vue。工程化开发模式&#xff1a;基于构建工具&#xff08;例如&#xff1a;webpack&#xff09;的环境中开发Vue。 脚手架Vue CLI Vue CLl 是 Vue 官方…

开源智慧巡检——无人机油田AI视频监控的未来之力

油田巡检&#xff0c;关乎能源命脉&#xff0c;却常受困于广袤地形、高危环境和人工效率瓶颈。管道泄漏、设备故障、非法闯入——这些隐患稍有疏忽&#xff0c;便可能酿成大患。传统巡检已无法满足现代油田对安全与效率的需求&#xff0c;而无人机油田巡检系统正以智能化之力重…

Django从零搭建卖家中心登陆与注册实战

在电商系统开发中&#xff0c;卖家中心是一个重要的组成部分&#xff0c;而用户注册与登陆则是卖家中心的第一步。本文将详细介绍如何使用Django框架从零开始搭建一个功能完善的卖家注册页面&#xff0c;包括前端界面设计和后端逻辑实现。 一、项目概述 我们将创建一个名为sel…

MySQL表的使用(4)

首先回顾一下之前所学的增删查改&#xff0c;这些覆盖了平时使用的80% 我们上节课中学习到了MySQL的约束 其中Primary key 是主键约束&#xff0c;我们今天要学习的是外键约束 插入一个表 外键约束 父表 子表 这条记录中classid为5时候&#xff0c;不能插入&#xff1b; 删除…

ollama修改配置使用多GPU,使用EvalScope进行模型压力测试,查看使用负载均衡前后的性能区别

文章目录 省流结论机器配置不同量化模型占用显存1. 创建虚拟环境2. 创建测试jsonl文件3. 新建测试脚本3. 默认加载方式&#xff0c;单卡运行模型3.1 7b模型输出213 tok/s3.1 32b模型输出81 tok/s3.1 70b模型输出43tok/s 4. 使用负载均衡&#xff0c;多卡运行4.1 7b模型输出217t…

Dijkstra算法求解最短路径—— 从零开始的图论讲解(2)

前言 在本系列第一期:从零开始的图论讲解(1)——图的概念,图的存储,图的遍历与图的拓扑排序-CSDN博客 笔者给大家介绍了 图的概念,如何存图,如何简单遍历图,已经什么是图的拓扑排序 按照之前的学习规划&#xff0c;今天笔者将继续带大家深入了解图论中的一个核心问题&#x…

[连载]Transformer架构详解

Transformer: Attention Is All You Need Paper 地址&#xff1a;https://arxiv.org/abs/1706.03762 Paper 代码&#xff1a;https://github.com/tensorflow/tensor2tensor Paper 作者&#xff1a;Ashish Vaswani,Noam Shazeer,Niki Parmar,Jakob Uszkoreit,Llion Jones,Aidan…

LVGL Video控件和Radiobtn控件详解

LVGL Video控件和Radiobtn控件详解 一、 Video控件详解1. 概述2. 创建和初始化3. 基本属性设置4. 视频控制5. 回调函数6. 高级功能7. 注意事项 二、Radiobtn控件详解1. 概述2. 创建和初始化3. 属性设置4. 状态控制5. 组管理6. 事件处理7. 样式设置8. 注意事项 三、效果展示四、…

组合数哭唧唧

前言&#xff1a;手写一个简单的组合数&#xff0c;但是由于长期没写&#xff0c;导致一些细节没处理好 题目链接 #include<bits/stdc.h> using namespace std; #define endl "\n"#define int long longconst int N (int)2e510; const int Mod (int)1e97;int…

NLP高频面试题(四十二)——RAG系统评估:方法、指标与实践指南

1. 引言:RAG系统概述与评估挑战 检索增强生成(Retrieval-Augmented Generation,简称 RAG)是近年来自然语言处理领域的一个重要进展。RAG系统在大型语言模型生成文本的过程中引入了外部检索模块,从外部知识库获取相关信息,以缓解纯生成模型可能出现的幻觉和知识盲点。通过…

Linux路漫漫

目录 Vim模式 基本操作 文本编辑 更多功能 1. 直接启动 Vim 2. 打开一个已存在的文件 3. 打开多个文件 4. 以只读模式打开文件 5. 从指定行号开始编辑 6. 快速打开并执行命令 7. 检查是否安装了 Vim 8. 退出 Vim 前提条件 SCP 命令格式 具体操作 1. Windows 命…

游戏引擎学习第227天

今天的计划 今天的工作重点是进行吸引模式&#xff08;attract mode&#xff09;的开发&#xff0c;主要是处理游戏的进出和其他一些小的细节问题&#xff0c;这些是之前想要整理和清理的部分。我做了一些工作&#xff0c;将游戏代码中的不同部分分离到逻辑上独立的区域&#…

一键直达:用n8n打造谷歌邮箱到Telegram的实时通知流

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 &#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 一键直达&#xff1a;用n8n打造谷歌邮箱到Telegram的实时通知流 前言n8n的强大之处实现简便性实战…

【QT】 QT定时器的使用

QT定时器的使用 1. QTimer介绍&#xff08;1&#xff09;QTimer的使用方法步骤示例代码1&#xff1a;定时器的启动和关闭现象&#xff1a;示例代码2&#xff1a;定时器每隔1s在标签上切换图片现象&#xff1a; (2)实际开发的作用 2.日期 QDate(1)主要方法 3.时间 QTime(1)主要方…

【自动化测试】如何获取cookie,跳过登录的简单操作

前言 &#x1f31f;&#x1f31f;本期讲解关于自动化测试函数相关知识介绍~~~ &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 &#x1f525; 你的点赞就是小编不断更新的最大动力 &#x1f386;那么废话…

每天五分钟深度学习PyTorch:RNN CELL模型原理以及搭建

本文重点 RNN Cell(循环神经网络单元)是循环神经网络(RNN)的核心组成部分,用于处理序列数据中的每个时间步,并维护隐藏状态以捕获序列中的时间依赖关系。 RNN CELL的结构 RNN是一个循环结构,它可以看作是RNN CELL的循环,RNN CELL的结构如下图所示,RNN CELL不断进行…

【基于开源insightface的人脸检测,人脸识别初步测试】

简介 InsightFace是一个基于深度学习的开源人脸识别项目,由蚂蚁金服的深度学习团队开发。该项目提供了人脸检测、人脸特征提取、人脸识别等功能,支持多种操作系统和深度学习框架。本文将详细介绍如何在Ubuntu系统上安装和实战InsightFace项目。 目前github有非常多的人脸识…