Introduction
In this article we use a SiameseNet model to tackle a Chinese text matching task. The pipeline covers the general recipe for text matching, so in later articles we only need to swap in a different model implementation.
Data Preparation
Data preparation involves:
- building the vocabulary (Vocabulary)
- building the dataset (Dataset)
This time we use LCQMC, a general-domain question matching dataset that already comes split into training, validation, and test sets.
Let's load it with pandas.
import pandas as pd

# path template for the LCQMC files; adjust to where the dataset is stored
data_path = "../data/lcqmcdata/{}.txt"

train_df = pd.read_csv(data_path.format("train"), sep="\t", header=None, names=["sentence1", "sentence2", "label"])
train_df.head()
The data looks like this: each row has two sentences to be matched, and the label indicates whether they are similar.
Since the model we build this time uses character-level embeddings rather than word-level ones, there is no need to segment the text with jieba.
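As a quick sanity check (a throwaway snippet, not part of the pipeline), iterating over a Chinese string in Python already yields individual characters, which is exactly the character-level tokenization we need:

list("喜欢打篮球")
# ['喜', '欢', '打', '篮', '球']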
We define a function to read each split of the dataset:
def build_dataframe_from_csv(dataset_csv: str) -> pd.DataFrame:
df = pd.read_csv(
dataset_csv,
sep="\t",
header=None,
names=["sentence1", "sentence2", "label"],
)
return df
train_df = build_dataframe_from_csv("../data/lcqmcdata/train.txt")
test_df = build_dataframe_from_csv("../data/lcqmcdata/test.txt")
dev_df = build_dataframe_from_csv("../data/lcqmcdata/dev.txt")
As in the previous article, we can reuse the code we wrote before; if it is unfamiliar, it is recommended to read the earlier DSSM article (DSSM实战) first.
from collections import defaultdict
from tqdm import tqdm
import numpy as np
import json
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from typing import Tuple
UNK_TOKEN = "<UNK>"
PAD_TOKEN = "<PAD>"
class Vocabulary:
"""Class to process text and extract vocabulary for mapping"""
def __init__(self, token_to_idx: dict = None, tokens: list[str] = None) -> None:
"""
Args:
token_to_idx (dict, optional): a pre-existing map of tokens to indices. Defaults to None.
tokens (list[str], optional): a list of unique tokens with no duplicates. Defaults to None.
"""
assert any(
[tokens, token_to_idx]
), "At least one of these parameters should be set as not None."
if token_to_idx:
self._token_to_idx = token_to_idx
else:
self._token_to_idx = {}
if PAD_TOKEN not in tokens:
tokens = [PAD_TOKEN] + tokens
for idx, token in enumerate(tokens):
self._token_to_idx[token] = idx
self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()}
self.unk_index = self._token_to_idx[UNK_TOKEN]
self.pad_index = self._token_to_idx[PAD_TOKEN]
@classmethod
def build(
cls,
sentences: list[list[str]],
min_freq: int = 2,
reserved_tokens: list[str] = None,
) -> "Vocabulary":
"""Construct the Vocabulary from sentences
Args:
sentences (list[list[str]]): a list of tokenized sequences
min_freq (int, optional): the minimum word frequency to be saved. Defaults to 2.
reserved_tokens (list[str], optional): the reserved tokens to add into the Vocabulary. Defaults to None.
Returns:
            Vocabulary: a Vocabulary instance
"""
token_freqs = defaultdict(int)
for sentence in tqdm(sentences):
for token in sentence:
token_freqs[token] += 1
unique_tokens = (reserved_tokens if reserved_tokens else []) + [UNK_TOKEN]
unique_tokens += [
token
for token, freq in token_freqs.items()
if freq >= min_freq and token != UNK_TOKEN
]
return cls(tokens=unique_tokens)
def __len__(self) -> int:
return len(self._idx_to_token)
def __getitem__(self, tokens: list[str] | str) -> list[int] | int:
"""Retrieve the indices associated with the tokens or the index with the single token
Args:
tokens (list[str] | str): a list of tokens or single token
Returns:
list[int] | int: the indices or the single index
"""
if not isinstance(tokens, (list, tuple)):
return self._token_to_idx.get(tokens, self.unk_index)
return [self.__getitem__(token) for token in tokens]
    def lookup_token(self, indices: list[int] | int) -> list[str] | str:
        """Retrieve the tokens associated with the indices or the token with the single index
Args:
indices (list[int] | int): a list of index or single index
Returns:
list[str] | str: the corresponding tokens (or token)
"""
if not isinstance(indices, (list, tuple)):
return self._idx_to_token[indices]
return [self._idx_to_token[index] for index in indices]
def to_serializable(self) -> dict:
"""Returns a dictionary that can be serialized"""
return {"token_to_idx": self._token_to_idx}
@classmethod
def from_serializable(cls, contents: dict) -> "Vocabulary":
"""Instantiates the Vocabulary from a serialized dictionary
Args:
contents (dict): a dictionary generated by `to_serializable`
Returns:
Vocabulary: the Vocabulary instance
"""
return cls(**contents)
def __repr__(self):
return f"<Vocabulary(size={len(self)})>"
class TMVectorizer:
"""The Vectorizer which vectorizes the Vocabulary"""
def __init__(self, vocab: Vocabulary, max_len: int) -> None:
"""
Args:
vocab (Vocabulary): maps characters to integers
max_len (int): the max length of the sequence in the dataset
"""
self.vocab = vocab
self.max_len = max_len
def _vectorize(
self, indices: list[int], vector_length: int = -1, padding_index: int = 0
) -> np.ndarray:
"""Vectorize the provided indices
Args:
indices (list[int]): a list of integers that represent a sequence
            vector_length (int, optional): an argument for forcing the length of the index vector. Defaults to -1.
padding_index (int, optional): the padding index to use. Defaults to 0.
Returns:
np.ndarray: the vectorized index array
"""
if vector_length <= 0:
vector_length = len(indices)
vector = np.zeros(vector_length, dtype=np.int64)
if len(indices) > vector_length:
vector[:] = indices[:vector_length]
else:
vector[: len(indices)] = indices
vector[len(indices) :] = padding_index
return vector
def _get_indices(self, sentence: list[str]) -> list[int]:
"""Return the vectorized sentence
Args:
sentence (list[str]): list of tokens
Returns:
indices (list[int]): list of integers representing the sentence
"""
return [self.vocab[token] for token in sentence]
def vectorize(
self, sentence: list[str], use_dataset_max_length: bool = True
) -> np.ndarray:
"""
Return the vectorized sequence
Args:
sentence (list[str]): raw sentence from the dataset
use_dataset_max_length (bool): whether to use the global max vector length
Returns:
the vectorized sequence with padding
"""
vector_length = -1
if use_dataset_max_length:
vector_length = self.max_len
indices = self._get_indices(sentence)
vector = self._vectorize(
indices, vector_length=vector_length, padding_index=self.vocab.pad_index
)
return vector
@classmethod
def from_serializable(cls, contents: dict) -> "TMVectorizer":
"""Instantiates the TMVectorizer from a serialized dictionary
Args:
contents (dict): a dictionary generated by `to_serializable`
Returns:
TMVectorizer:
"""
vocab = Vocabulary.from_serializable(contents["vocab"])
max_len = contents["max_len"]
return cls(vocab=vocab, max_len=max_len)
def to_serializable(self) -> dict:
"""Returns a dictionary that can be serialized
Returns:
dict: a dict contains Vocabulary instance and max_len attribute
"""
return {"vocab": self.vocab.to_serializable(), "max_len": self.max_len}
def save_vectorizer(self, filepath: str) -> None:
"""Dump this TMVectorizer instance to file
Args:
filepath (str): the path to store the file
"""
with open(filepath, "w") as f:
json.dump(self.to_serializable(), f)
@classmethod
def load_vectorizer(cls, filepath: str) -> "TMVectorizer":
"""Load TMVectorizer from a file
Args:
filepath (str): the path stored the file
Returns:
TMVectorizer:
"""
with open(filepath) as f:
return TMVectorizer.from_serializable(json.load(f))
class TMDataset(Dataset):
"""Dataset for text matching"""
def __init__(self, text_df: pd.DataFrame, vectorizer: TMVectorizer) -> None:
"""
Args:
text_df (pd.DataFrame): a DataFrame which contains the processed data examples
vectorizer (TMVectorizer): a TMVectorizer instance
"""
self.text_df = text_df
self._vectorizer = vectorizer
def __getitem__(self, index: int) -> Tuple[np.ndarray, np.ndarray, int]:
row = self.text_df.iloc[index]
return (
self._vectorizer.vectorize(row.sentence1),
self._vectorizer.vectorize(row.sentence2),
row.label,
)
def get_vectorizer(self) -> TMVectorizer:
return self._vectorizer
def __len__(self) -> int:
return len(self.text_df)
This is reused verbatim, without changing a single character, which shows why it pays to write generic, reusable classes.
Since iterating over a Chinese string already yields individual characters, we can pass the raw strings directly to Vocabulary's build method.
train_corpus = train_df.sentence1.to_list() + train_df.sentence2.to_list()
train_corpus[0]
'喜欢打篮球的男生喜欢什么样的女生'
The corpus consists entirely of raw Chinese strings like this one.
vocab = Vocabulary.build(train_corpus, min_freq=1)
vocab
100%|██████████| 477532/477532 [00:02<00:00, 210755.76it/s]
<Vocabulary(size=5041)>
The character-level vocabulary contains 5041 tokens. We can confirm that it was indeed split by character:
vocab.to_serializable()
{'token_to_idx': {'<PAD>': 0,
'<UNK>': 1,
'喜': 2,
'欢': 3,
'打': 4,
'篮': 5,
'球': 6,
'的': 7,
'男': 8,
'生': 9,
'什': 10,
'么': 11,
'样': 12,
'女': 13,
'我': 14,
'手': 15,
'机': 16,
'丢': 17,
'了': 18,
',': 19,
'想': 20,
'换': 21,
'个': 22,
'大': 23,
'家': 24,
'觉': 25,
'得': 26,
'她': 27,
'好': 28,
'看': 29,
'吗': 30,
'求': 31,
...
'*': 984,
'治': 985,
'绑': 986,
'定': 987,
'宗': 988,
'辣': 989,
'椒': 990,
'牙': 991,
'执': 992,
'言': 993,
'进': 994,
'条': 995,
'非': 996,
'常': 997,
'歉': 998,
'历': 999,
...}}
With the vocabulary ready, we can construct the vectorizer (args.max_len comes from the argument namespace defined in the training section below, where it is set to 50):
vectorizer = TMVectorizer(vocab, args.max_len)
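As a small usage check (a sketch; the exact indices depend on the vocabulary built above), vectorizing a raw sentence maps each character to its index and pads the result to max_len:

vec = vectorizer.vectorize(train_corpus[0])
print(vec.shape)  # (50,) -- padded to args.max_len
print(vec[:6])    # e.g. [2 3 4 5 6 7] for 喜 欢 打 篮 球 的, per the vocabulary dump above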
Next come the datasets and data loaders:
train_dataset = TMDataset(train_df, vectorizer)
test_dataset = TMDataset(test_df, vectorizer)
dev_dataset = TMDataset(dev_df, vectorizer)
train_data_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
dev_data_loader = DataLoader(dev_dataset, batch_size=args.batch_size)
test_data_loader = DataLoader(test_dataset, batch_size=args.batch_size)
We can inspect the loaders' output:
for x1, x2, y in train_data_loader:
print(x1)
print(x2)
print(y)
break
tensor([[ 80, 1282, 1729, ..., 0, 0, 0],
[ 428, 519, 893, ..., 0, 0, 0],
[ 31, 3441, 750, ..., 0, 0, 0],
...,
[2980, 2777, 872, ..., 0, 0, 0],
[1153, 1661, 105, ..., 0, 0, 0],
[ 330, 2434, 126, ..., 0, 0, 0]])
tensor([[ 80, 1282, 1729, ..., 0, 0, 0],
[ 909, 838, 472, ..., 0, 0, 0],
[ 31, 3441, 750, ..., 0, 0, 0],
...,
[ 996, 997, 18, ..., 0, 0, 0],
[1153, 1661, 49, ..., 0, 0, 0],
[ 126, 22, 181, ..., 0, 0, 0]])
tensor([1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1,
0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0,
1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1,
1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 1,
1, 0, 0, 1, 0, 0, 1, 0])
With that, the data is ready.
Building the Model
import os

import torch
import torch.nn as nn
from argparse import Namespace
class SiameseNet(nn.Module):
    """The Siamese Network implementation."""
def __init__(self, args: Namespace) -> None:
"""
Args:
args (Namespace): arguments for the whole network
"""
super().__init__()
if args.activation.lower() == "relu":
activate_func = nn.ReLU()
else:
activate_func = nn.Tanh()
self.embedding = nn.Sequential(
nn.Embedding(args.vocab_size, args.embedding_dim),
nn.Dropout(args.dropout),
nn.LSTM(
args.embedding_dim,
args.lstm_hidden_dim,
num_layers=args.lstm_num_layers,
dropout=args.lstm_dropout,
batch_first=True,
bidirectional=True,
),
)
self.dense = nn.Sequential(
nn.Dropout(args.dropout),
nn.Linear(args.linear_hidden_dim, args.linear_hidden_dim),
activate_func,
nn.Dropout(args.dropout),
)
def forward(self, sentence1: torch.Tensor, sentence2: torch.Tensor) -> torch.Tensor:
"""Using the same network to compute the representations of two sentences
Args:
sentence1 (torch.Tensor): shape (batch_size, seq_len)
sentence2 (torch.Tensor): shape (batch_size, seq_len)
Returns:
torch.Tensor: the cosine similarity between sentence1 and sentence2
"""
embed_1, _ = self.embedding(sentence1)
embed_2, _ = self.embedding(sentence2)
vector_1 = self.dense(torch.mean(embed_1, dim=1))
vector_2 = self.dense(torch.mean(embed_2, dim=1))
return torch.cosine_similarity(vector_1, vector_2, dim=1, eps=1e-8)
The implementation is similar to DSSM. Following the paper's setup, dropout is applied both between LSTM layers and between the other network layers, except that the dropout rate between the non-LSTM layers is set to 0.1.
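Before training, it can be worth running a quick shape check. The following is a minimal sketch with made-up toy hyperparameters (not the training configuration used below); it feeds two random batches through the network and confirms that one cosine similarity is returned per sentence pair:

# hypothetical smoke test with tiny hyperparameters and random token ids
toy_args = Namespace(
    vocab_size=100,
    embedding_dim=16,
    dropout=0.1,
    lstm_hidden_dim=8,
    lstm_num_layers=2,
    lstm_dropout=0.2,
    linear_hidden_dim=16,  # 2 * lstm_hidden_dim, because the LSTM is bidirectional
    activation="relu",
)
net = SiameseNet(toy_args)
s1 = torch.randint(0, 100, (4, 10))  # (batch_size, seq_len)
s2 = torch.randint(0, 100, (4, 10))
print(net(s1, s2).shape)  # torch.Size([4]) -- one similarity score per pair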
Defining the Contrastive Loss
class ContrastiveLoss(nn.Module):
def __init__(self, m: float = 0.2) -> None:
"""
Args:
m (float, optional): margin. Defaults to 0.2.
"""
super().__init__()
self.m = m
def forward(self, energy: torch.Tensor, label: torch.Tensor) -> torch.Tensor:
"""Computes the contrastive loss between the embeddings of x1 and x2
Args:
energy (torch.Tensor): the cosine similarity between the embeddings of x1 and x2
            label (torch.Tensor): an integer indicating whether x1 and x2 are similar (= 1) or dissimilar (= 0).
Returns:
torch.Tensor:
"""
loss_pos = 0.25 * (1 - energy) ** 2
loss_neg = (
torch.where(
energy < self.m,
torch.full_like(energy, 0),
energy,
)
** 2
)
loss = label * loss_pos + (1 - label) * loss_neg
return loss.sum()
This follows the contrastive loss exactly as defined in the original paper; the energy passed in is the precomputed cosine similarity.
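To make the two branches concrete, here is a small worked example with made-up numbers: similar pairs (label 1) are pulled toward similarity 1, while dissimilar pairs (label 0) are only penalized when their similarity exceeds the margin m:

loss_fn = ContrastiveLoss(m=0.2)
energy = torch.tensor([0.9, 0.1, 0.8])  # cosine similarities for three pairs
label = torch.tensor([1.0, 0.0, 0.0])   # 1 = similar, 0 = dissimilar
# pair 1 (similar):    0.25 * (1 - 0.9)^2 = 0.0025
# pair 2 (dissimilar): 0.1 < 0.2, so no penalty
# pair 3 (dissimilar): 0.8 >= 0.2, so 0.8^2 = 0.64
print(loss_fn(energy, label))  # ≈ tensor(0.6425)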
Training the Model
Define the evaluation metrics:
def metrics(y: torch.Tensor, y_pred: torch.Tensor) -> Tuple[float, float, float, float]:
TP = ((y_pred == 1) & (y == 1)).sum().float() # True Positive
TN = ((y_pred == 0) & (y == 0)).sum().float() # True Negative
    FN = ((y_pred == 0) & (y == 1)).sum().float()  # False Negative
FP = ((y_pred == 1) & (y == 0)).sum().float() # False Positive
p = TP / (TP + FP).clamp(min=1e-8) # Precision
r = TP / (TP + FN).clamp(min=1e-8) # Recall
F1 = 2 * r * p / (r + p).clamp(min=1e-8) # F1 score
    acc = (TP + TN) / (TP + TN + FP + FN).clamp(min=1e-8)  # Accuracy
return acc, p, r, F1
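A quick check with toy labels and predictions (made-up values) shows what these formulas produce:

y = torch.tensor([1, 0, 1, 0])
y_pred = torch.tensor([1, 1, 1, 0])
acc, p, r, f1 = metrics(y, y_pred)
# TP=2, TN=1, FP=1, FN=0 -> accuracy=0.75, precision≈0.667, recall=1.0, F1=0.8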
Define the evaluation function:
def evaluate(
data_iter: DataLoader, model: nn.Module
) -> Tuple[float, float, float, float]:
y_list, y_pred_list = [], []
model.eval()
for x1, x2, y in tqdm(data_iter):
x1 = x1.to(device).long()
x2 = x2.to(device).long()
y = y.float().to(device)
similarity = model(x1, x2)
pred = (similarity > 0.5).int()
y_pred_list.append(pred)
y_list.append(y)
y_pred = torch.cat(y_pred_list, 0)
y = torch.cat(y_list, 0)
acc, p, r, f1 = metrics(y, y_pred)
return acc, p, r, f1
The evaluation function is much like the one for DSSM: a similarity above 0.5 means the two texts are predicted to be similar, otherwise dissimilar. The 0.5 threshold is an empirical choice and worth tuning.
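One simple way to tune it (a sketch, not part of the original code; find_best_threshold is a hypothetical helper) is to sweep a few candidate thresholds on the dev set and keep the one with the best F1:

def find_best_threshold(data_iter, model, thresholds=(0.3, 0.4, 0.5, 0.6, 0.7)):
    """Pick the similarity cutoff with the highest F1 on the given data (hypothetical helper)."""
    sims, labels = [], []
    model.eval()
    with torch.no_grad():
        for x1, x2, y in data_iter:
            sims.append(model(x1.to(device).long(), x2.to(device).long()))
            labels.append(y.float().to(device))
    sims, labels = torch.cat(sims), torch.cat(labels)
    # metrics returns (acc, p, r, f1); index 3 is the F1 score
    return max(thresholds, key=lambda t: metrics(labels, (sims > t).int())[3])

# e.g. best_t = find_best_threshold(dev_data_loader, model)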
Define the training function:
def train(
data_iter: DataLoader,
model: nn.Module,
criterion: ContrastiveLoss,
optimizer: torch.optim.Optimizer,
print_every: int = 500,
verbose=True,
) -> None:
model.train()
for step, (x1, x2, y) in enumerate(tqdm(data_iter)):
x1 = x1.to(device).long()
x2 = x2.to(device).long()
y = y.float().to(device)
similarity = model(x1, x2)
loss = criterion(similarity, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if verbose and (step + 1) % print_every == 0:
pred = (similarity > 0.5).int()
acc, p, r, f1 = metrics(y, pred)
            print(
                f" TRAIN iter={step+1} loss={loss.item():.6f} accuracy={acc:.3f} precision={p:.3f} recall={r:.3f} f1 score={f1:.4f}"
            )
Before training, define all the required arguments:
args = Namespace(
dataset_csv="text_matching/data/lcqmc/{}.txt",
vectorizer_file="vectorizer.json",
model_state_file="model.pth",
save_dir=f"{os.path.dirname(__file__)}/model_storage",
reload_model=False,
cuda=True,
learning_rate=1e-3,
batch_size=128,
num_epochs=10,
max_len=50,
embedding_dim=512,
lstm_hidden_dim=64,
lstm_num_layers=4,
lstm_dropout=0.2,
linear_hidden_dim=128,
activation="relu",
margin=0.3,
dropout=0.1,
min_freq=1,
print_every=500,
verbose=True,
)
Compared with DSSM there are a few changes: the vocabulary is built with min_freq=1, max_len=50 limits sequences to at most 50 characters, and the learning rate is adjusted to 0.001.
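The listing above omits the remaining setup. A minimal sketch of what is assumed before creating the optimizer: choose the device, record the vocabulary size in args (the SiameseNet embedding layer reads args.vocab_size), and instantiate the model:

device = torch.device("cuda" if args.cuda and torch.cuda.is_available() else "cpu")
# the embedding layer needs the vocabulary size, which is only known after building the vocab
args.vocab_size = len(vocab)
model = SiameseNet(args).to(device)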
Finally, define the optimizer and the contrastive loss, then run the training loop.
optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate)
criterion = ContrastiveLoss(args.margin)
for epoch in range(args.num_epochs):
train(
train_data_loader,
model,
criterion,
optimizer,
print_every=args.print_every,
verbose=args.verbose,
)
    print("Begin evaluate on dev set.")
with torch.no_grad():
acc, p, r, f1 = evaluate(dev_data_loader, model)
    print(
        f"EVALUATE [{epoch+1}/{args.num_epochs}] accuracy={acc:.3f} precision={p:.3f} recall={r:.3f} f1 score={f1:.4f}"
    )
model.eval()
acc, p, r, f1 = evaluate(test_data_loader, model)
print(f"TEST accuracy={acc:.3f} precision={p:.3f} recall={r:.3f} f1 score={f1:.4f}")
Because an RNN is involved, training takes somewhat longer than DSSM:
...
TRAIN iter=1500 loss=8.042133 accuracy=0.812 precision=0.912 recall=0.732 f1 score=0.8125
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1866/1866 [01:20<00:00, 23.06it/s]
Begin evaluate on dev set.
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 69/69 [00:00<00:00, 81.56it/s]
EVALUATE [9/10] accuracy=0.698 precision=0.704 recall=0.684 f1 score=0.6937
27%|█████████████████████████████████████████████████▋ | 498/1866 [00:22<01:01, 22.24it/s]
TRAIN iter=500 loss=6.863042 accuracy=0.805 precision=0.943 recall=0.694 f1 score=0.8000
53%|███████████████████████████████████████████████████████████████████████████████████████████████████▍ | 998/1866 [00:44<00:37, 23.39it/s]
TRAIN iter=1000 loss=7.218624 accuracy=0.781 precision=0.903 recall=0.718 f1 score=0.8000
80%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▌ | 1499/1866 [01:05<00:15, 22.97it/s]
TRAIN iter=1500 loss=8.405084 accuracy=0.758 precision=0.926 recall=0.649 f1 score=0.7634
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1866/1866 [01:21<00:00, 22.81it/s]
Begin evaluate on dev set.
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 69/69 [00:00<00:00, 76.84it/s]
EVALUATE [10/10] accuracy=0.711 precision=0.714 recall=0.702 f1 score=0.7082
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 98/98 [00:01<00:00, 78.78it/s]
TEST accuracy=0.763 precision=0.724 recall=0.849 f1 score=0.7816
The final results on the test set are decent: accuracy improves by roughly 5 points over the previous DSSM model.