前言: 先简单说明下,关于AG_NEWS情感分类的案例,网上很多博客写了,但是要么代码有问题,要么数据集不对,要么API过时,在这里我再更新一篇博客。希望对各位同学有一定的应用效果。
1、DataSets 数据集的处理
不同的深度学习模型训练大致相同,第一步都是对数据集的处理
1.1 下载数据集
这里以 AG_NEWS
数据集为例
1、 通过 AG_NEWS 直接导入的
import torch
from torchtext.datasets import AG_NEWS
train_iter = AG_NEWS(root="./data", splits=('train', 'test'))
root
很明显可以看出来是下载的路径,后面splits
是划分的数据集,据说这种写法执行后,就会再data文件夹下有好几个文件,由于我这边一直下载不成功,所以暂时就没有使用这种办法,如果有朋友能够下载成功,可以在评论区里说一下。
2、 通过 读取
CSV
表格形成数据集
我这边目前就是采用的这种办法,也就是直接读取csv
# 选择合适的tokenizer(分词器)
train_dataset = AgNewDataSet("./data/train.csv", tokenizer)
valid_dataset = AgNewDataSet("./data/test.csv", tokenizer)
不要着急AgNewDataSet
,这个类是我自己写的加载CSV
的一个类,下面会详细的介绍这个类
1.2 处理数据集
数据集加载类
class AgNewDataSet(Dataset):
def __init__(self, csv_file, tokenizer):
self.index = 0 # 迭代时候时候用到的索引
self.csv_file = csv_file # csv文件路径
self.tokenizer = tokenizer # 分词器
self.data = self._load_data() # 加载数据的函数
# 添加 __iter__ 函数表明可以迭代
def __iter__(self):
return self
# 这个函数表明迭代的行为
def __next__(self):
if self.index < len(self.data):
item = self.data[self.index]
self.index += 1
return item
else:
self.index = 0 # 重置索引为 0,使迭代重新开始
raise StopIteration
def _load_data(self):
df = pd.read_csv(self.csv_file)
dataTmep = []
for _, row in df.iterrows():
label = row["label"] # 将类别从1到4映射为0到3
context = row["title"] + row["text"]
dataTmep.append((label, context))
return dataTmep
# 分词生成器
def yield_tokens(self):
for text, _ in self.data:
yield tokenizer(text)
def __len__(self):
return len(self.data)
def __getitem__(self, index):
if index < 0 or index >= len(self):
raise IndexError("索引超出范围")
return self.data[index]
def get_vocab_count(self):
all_token = []
for index in range(len(self.data)):
text = self.data[index][0]
token2 = word_tokenize(text)
all_token.extend(token2)
vocab_count = len(all_token)
return vocab_count
def getRandomData(self, num_samples):
random_data = random.sample(range(len(self.data)), num_samples)
first_data = []
second_data = []
for index in range(len(self.data)):
if index in random_data:
first_data.append(self.__getitem__(index))
else:
second_data.append(self.__getitem__(index))
return first_data, second_data
简单对上面的类进行说明下
1、__init__
中传入csv文件和 分分词器,如果你下载的csv中没有 head
,请打开csv
文件,手动添加下 label,title,text
三个head
,如果不明白打开csv文件看第一行就能明白了。然后在第一行里面调用了加载数据集的函数
2、__iter__
表示可以迭代,并返回自己,这个没啥好说的
3、__next__
表示具体怎么迭代,怎么获取数据了,这个方法有个注意点,就是在迭代完把 index设置为0,丛头开始,要不然可能会报错
4、_load_data
这个函数主要就是读取csv文件,并将数据形成(label,text)
的格式,然后返回去,最终保存在self.data
当中
5、yield_tokens
这是一个分词生成器,生成器函数,并使用生成器对象按需生成值,这在处理大量数据或需要延迟计算时非常有用。在这里根据需要返回分词的结果
6、__len__
返回数据的长度
7、__getitem__
根据索引返回一个数据
7、get_vocab_count
根据索引数据集中所有的单词,但是没有去重
7、__getitem__
根据索引返回一个数据
7、getRandomData
随机取一定量的样本数据,这个在本例中没有用到
2、预处理
# 分词生成器
def yield_token(data_iter):
for _, text in data_iter:
yield tokenizer(text)
# 构建词汇表
vocab = build_vocab_from_iterator(yield_token(train_dataset), specials=["<unk>"])
# 设置默认索引,当某个单词不在词汇表中,则返回0
vocab.set_default_index(vocab["<unk>"])
# 将词汇表保存到vocab.pkl文件中
with open('vocab.pkl', 'wb') as f:
pickle.dump(vocab, f)
# 使用分词器构建PipeLine
text_pipeline = lambda x: vocab(tokenizer(x))
lable_pipeline = lambda x: int(x) - 1
# 定义 collate_batch 函数 在 DataLoader 中会使用,对传入的数据进行批量处理
def collate_batch(batch):
# 存放Label以及text的列表 offset存放每条text的偏移量
label_list, text_list, offset = [], [], [0]
for (_label, _text) in batch:
label_list.append(lable_pipeline(_label))
processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
text_list.append(processed_text)
# 将每一条数据的长度放入offsets列表中
offset.append(processed_text.size(0))
label_list = torch.tensor(label_list, dtype=torch.int64)
# 计算出每一条text的偏移量
offsets = torch.tensor(offset[:-1]).cumsum(dim=0)
text_list = torch.cat(text_list)
return label_list.to(device), text_list.to(device), offsets.to(device)
dataLoader = DataLoader(train_dataset, batch_size=64, shuffle=False, collate_fn=collate_batch)
预处理,主要是对数据进行基本的分割,调整格式等一些列操作。重点介绍下几个函数
1、vocab = build_vocab_from_iterator(yield_token(train_dataset), specials=["<unk>"])
主要是将训练集中的单词变成词汇表
specials=["<unk>"]
在自然语言处理任务中, 通常表示未知的单词或标记。这样的列表可以在处理文本数据时用作特殊标记的集合。
下面是使用build_vocab_from_iterator
函数的一般步骤:
导入必要的库和模块:
from torchtext.vocab import build_vocab_from_iterator
创建一个文本数据的迭代器,该迭代器产生一系列文本样本。这可以是一个列表、文件和数据库等。
text_iterator = ["This is a sample sentence", "Another example sentence"]
使用build_vocab_from_iterator函数来构建词汇表:
vocab = build_vocab_from_iterator(text_iterator)
可选:根据需要设置其他参数,例如指定特殊标记(如、等)、最小词频等。
vocab = build_vocab_from_iterator(text_iterator, specials=["<unk>", "<pad>"], min_freq=2)
现在,您可以访问词汇表中的单词和索引了:
word = "example"
index = vocab[word]
print(f"The index of '{word}' is {index}")
执行完你会看到:
The index of example is 2562
保存词汇表: 保存词汇表主要是后面模型保存在使用的时候会用到,因为我们模型在训练完毕,后面再次使用就不用从头开始训练,直接加载词汇表,加载模型就能预测了
# 将词汇表保存到vocab.pkl文件中
with open('vocab.pkl', 'wb') as f:
pickle.dump(vocab, f)
构建分词器: 这里使用的是lambda
表达式,当然也可以写成函数
# 使用分词器构建PipeLine
text_pipeline = lambda x: vocab(tokenizer(x))
lable_pipeline = lambda x: int(x) - 1
collate_batch
函数:这个函数是DataLoader
对象中需要传递的一个参数,用于对批数据进行处理,具体处理可以看上面的代码
3、定义模型
# 定义模型
class TextClassficationModel(nn.Module):
def __init__(self, vocab_size, embed_dim, num_class):
super(TextClassficationModel, self).__init__()
self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
self.fc = nn.Linear(embed_dim, num_class)
self.initWeight()
def initWeight(self):
initrange = 0.5
self.embedding.weight.data.uniform_(-initrange, initrange)
self.fc.weight.data.uniform_(-initrange, initrange)
self.fc.bias.data.zero_()
def forward(self, text, offsets):
embedded = self.embedding(text, offsets)
return self.fc(embedded)
在这个模型里,我们使用了EmbeddingBag
是PyTorch
中的一个类,用于进行文本嵌入(text embedding)
的操作。它可以将文本序列映射为密集的低维向量表示。
EmbeddingBag
的主要作用是将输入的文本序列转换为固定长度的向量表示。与Embedding
层不同,EmbeddingBag
允许输入的文本序列具有可变长度,且可以处理不定长的序列。
以下是EmbeddingBag
的一般使用方法:
导入所需的PyTorch模块:
import torch
import torch.nn as nn
创建一个EmbeddingBag实例,并指定词汇表大小和嵌入维度:
vocab_size = 10000 # 词汇表大小
embed_dim = 300 # 嵌入维度
embedding_bag = nn.EmbeddingBag(vocab_size, embed_dim)
准备输入数据,这可以是一个整数类型的张量,表示文本序列:
input_data = torch.tensor([1, 2, 3, 4, 5]) # 输入的文本序列
offsets = torch.tensor([0,3]) # 指示每个示例的偏移量
# 使用embedding_bag对输入数据进行嵌入操作:
output = embedding_bag(input_data, offsets)
其中,offsets
是一个1维整数张量,表示不同文本序列的起始位置。在上面的示例中,offsets
指示了一个文本序列的起始位置:[0, 3]。offsets是一个1维整数张量,其中的值表示每个示例的起始位置索引。例如,offsets=[0, 3]表示有两个示例,第一个示例的起始位置是索引0,第二个示例的起始位置是索引3。
最后,output
是一个具有固定长度的向量表示,其中每个输入序列对应一个嵌入向量。可以根据具体的任务和需求使用这些向量进行下游任务的处理,例如分类、聚类等。
其中很重要一点EmbeddingBag
自带平均池化
self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True, mode='mean')
当然在其他博客中,我们还可以看到,使用Embedding
的,如果是使用Embedding
的话,那么就需要再添加一层此话,而EmbeddingBag
中只需要更改mode
参数就可以自动计算了。
4、训练前准备
num_class = len(set([label for (label, text) in train_dataset]))
emsize = 64 # 词嵌入维度
vocab_size = len(vocab)
print("num_class :",num_class)
model = TextClassficationModel(vocab_size, emsize, num_class).to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=5)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)
这里就不多说,绝大部分的模型训练都要设置的参数,比如优化器、损失函数。
torch.optim.lr_scheduler.StepLR是PyTorch中的一个学习率调度器(learning rate scheduler),用于在训练过程中动态地调整学习率。StepLR调度器按照一定的步长(step size)降低学习率。在每个步长的时候,学习率会乘以一个给定的衰减因子(decay factor)。这种调度方式常用于训练过程中进行学习率的阶梯性衰减。
5、训练和验证函数
def train(local_data, epoch):
model.train()
total_acc, total_count = 0, 0
log_interval = 500
start_time = time.time()
for idx, (label, text, offsets) in enumerate(local_data):
predicted_label = model(text, offsets)
loss = criterion(predicted_label, label)
loss.backward()
# 在执行梯度裁剪之前将稀疏张量转换为密集张量
dense_grads = [param.grad.to_dense() for param in model.parameters() if param.grad is not None]
clip_grad_norm_(dense_grads, 0.1)
optimizer.step()
optimizer.zero_grad()
total_acc += (predicted_label.argmax(1) == label).sum().item()
total_count += label.size(0)
if idx % log_interval == 0 and idx > 0:
print(' | epoch {:3d} | {:5d}/{:5d} batches | accuracy {:8.3f} '.format(epoch,
idx, len(local_data),
total_acc / total_count))
total_count, total_acc = 0.0, 0.0
start_time = time.time()
def evaluate(evaluate_dataloader):
model.eval()
total_acc, total_count = 0, 0
with torch.no_grad():
for idx, (label, text, offsets) in enumerate(evaluate_dataloader):
predicted_label = model(text, offsets)
loss = criterion(predicted_label, label)
# 这里的argmax(1) 表示在一行预测概率值中寻找一个最大索引的标签
total_acc += (predicted_label.argmax(1) == label).sum().item()
total_count += label.size(0)
return total_acc / total_count
这里的训练和验证都是很常规的写法,就不多了,就是更新梯度、计算梯度、优化学习率。
6、开始训练
train_datasets = to_map_style_dataset(train_dataset)
test_datasets = to_map_style_dataset(valid_dataset)
num_train = int(len(train_datasets) * 0.95)
split_train_, split_valid_ = random_split(train_datasets, [num_train, len(train_datasets) - num_train])
train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_datasets, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=collate_batch)
EPOCHS = 5 # epoch
total_accu = None
for epoch in range(1, EPOCHS + 1):
epoch_start_time = time.time()
train(train_dataloader,epoch)
accu_val = evaluate(valid_dataloader)
if total_accu is not None and total_accu > accu_val:
scheduler.step()
else:
total_accu = accu_val
print('-' * 59)
print('| end of epoch {:3d} | time: {:5.2f}s | valid accuracy {:8.3f} '
.format(epoch, time.time() - epoch_start_time, accu_val))
print('-' * 59)
上面将数据集划分后,使用DataLoader封装数据集
| epoch 1 | 500/ 1782 batches | accuracy 0.699
| epoch 1 | 1000/ 1782 batches | accuracy 0.859
| epoch 1 | 1500/ 1782 batches | accuracy 0.875
-----------------------------------------------------------
| end of epoch 1 | time: 6.26s | valid accuracy 0.871
-----------------------------------------------------------
| epoch 2 | 500/ 1782 batches | accuracy 0.899
| epoch 2 | 1000/ 1782 batches | accuracy 0.899
| epoch 2 | 1500/ 1782 batches | accuracy 0.904
-----------------------------------------------------------
| end of epoch 2 | time: 4.45s | valid accuracy 0.876
-----------------------------------------------------------
| epoch 3 | 500/ 1782 batches | accuracy 0.915
| epoch 3 | 1000/ 1782 batches | accuracy 0.915
| epoch 3 | 1500/ 1782 batches | accuracy 0.915
-----------------------------------------------------------
| end of epoch 3 | time: 4.68s | valid accuracy 0.895
-----------------------------------------------------------
7、验证
print('Checking the results of test dataset.')
accu_test = evaluate(test_dataloader)
print('test accuracy {:8.3f}'.format(accu_test))
8、保存模型
with open('vocab.pkl', 'wb') as f:
pickle.dump(vocab, f)
model = model.to('cpu')
res = predict(ex_text_str, text_pipeline)
print("This is a %s news" % ag_news_label[res])
torch.save(model,"nlpModel.pth")
最后我们得到一个模型和一个词汇表
9、加载模型和词汇表
import torch
import torch.nn as nn
from torchtext.data.utils import get_tokenizer # 分词器
from torchtext.vocab import build_vocab_from_iterator
import pickle
# 定义模型
class TextClassficationModel(nn.Module):
def __init__(self, vocab_size, embed_dim, num_class):
super(TextClassficationModel, self).__init__()
self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
self.fc = nn.Linear(embed_dim, num_class)
self.initWeight()
def initWeight(self):
initrange = 0.5
self.embedding.weight.data.uniform_(-initrange, initrange)
self.fc.weight.data.uniform_(-initrange, initrange)
self.fc.bias.data.zero_()
def forward(self, text, offsets):
embedded = self.embedding(text, offsets)
return self.fc(embedded)
# 构建英文分词器
tokenizer = get_tokenizer('basic_english')
with open('vocab.pkl', 'rb') as f:
vocab = pickle.load(f)
ag_news_label = {1: "World",
2: "Sports",
3: "Business",
4: "Sci/Tec"}
text_pipeline = lambda x: vocab(tokenizer(x))
ex_text_str = "Sure! Here's a 50-word sports news snippet for you:Manchester United defeats Liverpool 2-0 in a thrilling Premier League clash. With goals from Marcus Rashford and Mohamed Salah, the match showcased intense competition and skill. United's victory strengthens their position at the top of the league table, while Liverpool looks to bounce back in their upcoming matches. Excitement continues to build in the football world"
# 预测
def predict(text, pipeline):
with torch.no_grad():
text = torch.tensor(pipeline(text))
output = model(text, torch.tensor([0]))
return output.argmax(1).item() + 1
model = torch.load("nlpModel.pth")
model = model.to("cpu")
predict(ex_text_str,text_pipeline)
res = predict(ex_text_str, text_pipeline)
print("This is a %s news" % ag_news_label[res])
从上面代码可以知道,我们加载模型和词汇表,然后预测,很简单的逻辑,这里不多说了。好了这个案例就简单的说到这里。