- 🍨 本文为🔗365天深度学习训练营 中的学习记录博客
- 🍖 原作者:K同学啊 | 接辅导、项目定制
目录
- 0. 总结:
- 1. 基础模型
- a. 数据加载
- b. 数据预处理
- c. 模型搭建与初始化
- d. 训练函数
- e. 评估函数
- f.拆分数据集运行模型
- g. 结果可视化
- h. 测试指定数据
- 2. TextCNN(通用模型-待拓展)
- 3. Bert(高级模型-待拓展)
0. 总结:
之前有学习过文本预处理的环节,对文本处理的主要方式有以下三种:
1:词袋模型(one-hot编码)
2:TF-IDF
3:Word2Vec(词向量(Word Embedding) 以及Word2vec(Word Embedding 的方法之一))
详细介绍及中英文分词详见pytorch文本分类(一):文本预处理
上上期主要介绍Embedding,及EmbeddingBag 使用示例(对词索引向量转化为词嵌入向量) ,上期主要介绍:应用三种模型的英文分类
本期将主要介绍中文基本分类(熟悉流程)、拓展:textCNN分类(通用模型)、拓展:Bert分类(模型进阶)
1. 基础模型
a. 数据加载
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms,datasets
import numpy as np
import pandas as pd
import os,PIL,pathlib,warnings
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore") # 忽略警告信息
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
plt.rcParams['figure.dpi'] = 100 # 分辨率
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device
device(type='cuda')
# 加载自定义中文数据集
train_data = pd.read_csv('./data/N4-train.csv',sep = '\t',header = None)
train_data.head()
0 | 1 | |
---|---|---|
0 | 还有双鸭山到淮阴的汽车票吗13号的 | Travel-Query |
1 | 从这里怎么回家 | Travel-Query |
2 | 随便播放一首专辑阁楼里的佛里的歌 | Music-Play |
3 | 给看一下墓王之王嘛 | FilmTele-Play |
4 | 我想看挑战两把s686打突变团竞的游戏视频 | Video-Play |
train_data[0]
0 还有双鸭山到淮阴的汽车票吗13号的
1 从这里怎么回家
2 随便播放一首专辑阁楼里的佛里的歌
3 给看一下墓王之王嘛
4 我想看挑战两把s686打突变团竞的游戏视频
...
12095 一千六百五十三加三千一百六十五点六五等于几
12096 稍小点客厅空调风速
12097 黎耀祥陈豪邓萃雯畲诗曼陈法拉敖嘉年杨怡马浚伟等到场出席
12098 百事盖世群星星光演唱会有谁
12099 下周一视频会议的闹钟帮我开开
Name: 0, Length: 12100, dtype: object
# 构建数据迭代器
def custom_data_iter(texts,labels):
for x,y in zip(texts,labels):
yield x,y
train_iter = custom_data_iter(train_data[0].values[:],train_data[1].values[:])
b. 数据预处理
# 构建词典
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import jieba
# 中文分词方法
tokenizer = jieba.lcut # jieba.cut返回的是一个生成器,而jieba.lcut返回的是一个列表
def yield_tokens(data_iter):
for text,_ in data_iter:
yield tokenizer(text)
vocab = build_vocab_from_iterator(yield_tokens(train_iter),specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])
Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\Cheng\AppData\Local\Temp\jieba.cache
Loading model cost 0.549 seconds.
Prefix dict has been built successfully.
tokenizer('我想看和平精英上战神必备技巧的游戏视频')
['我', '想', '看', '和平', '精英', '上', '战神', '必备', '技巧', '的', '游戏', '视频']
vocab(['我','想','看','和平','精英','上','战神','必备','技巧','的','游戏','视频'])
[2, 10, 13, 973, 1079, 146, 7724, 7574, 7793, 1, 186, 28]
text_pipeline = lambda x:vocab(tokenizer(x))
label_pipeline = lambda x:label_name.index(x)
label_name = list(set(train_data[1].values[:]))
print(label_name)
['HomeAppliance-Control', 'Audio-Play', 'Other', 'Weather-Query', 'Music-Play', 'Travel-Query', 'TVProgram-Play', 'Alarm-Update', 'Video-Play', 'Calendar-Query', 'FilmTele-Play', 'Radio-Listen']
print(text_pipeline('我想看和平精英上战神必备技巧的游戏视频'))
print(label_pipeline('Video-Play'))
[2, 10, 13, 973, 1079, 146, 7724, 7574, 7793, 1, 186, 28]
8
# 生成数据批次和迭代器
from torch.utils.data import DataLoader
def collate_batch(batch):
label_list,text_list,offsets = [],[],[0]
for (_text,_label) in batch:
# 标签列表
label_list.append(label_pipeline(_label))
# 文本列表
processed_text = torch.tensor(text_pipeline(_text),dtype = torch.int64)
text_list.append(processed_text)
# 偏移量(即语句的总词汇量)
offsets.append(processed_text.size(0))
label_list = torch.tensor(label_list,dtype = torch.int64)
text_list = torch.cat(text_list)
offsets = torch.tensor(offsets[:-1]).cumsum(dim=0) # 返回维度dim中输入元素的累计和
return text_list.to(device),label_list.to(device),offsets.to(device)
# 数据加载器,调用示例
dataloader = DataLoader(train_iter,
batch_size = 8,
shuffle = False,
collate_fn = collate_batch)
c. 模型搭建与初始化
from torch import nn
class TextClassificationModel(nn.Module):
def __init__(self,vocab_size,embed_dim,num_class):
super(TextClassificationModel,self).__init__()
self.embedding = nn.EmbeddingBag(vocab_size, # 词典大小
embed_dim, # 嵌入维度
sparse=False
)
self.fc = nn.Linear(embed_dim,num_class)
self.init_weights()
def init_weights(self):
initrange = 0.5
self.embedding.weight.data.uniform_(-initrange,initrange) # 初始化权重
self.fc.weight.data.uniform_(-initrange,initrange)
self.fc.bias.data.zero_()
def forward(self,text,offsets):
embedded = self.embedding(text,offsets)
return self.fc(embedded)
# 初始化模型
num_class = len(label_name)
vocab_size = len(vocab)
em_size = 64
model = TextClassificationModel(vocab_size,em_size,num_class).to(device)
d. 训练函数
import time
def train(dataloader):
size = len(dataloader.dataset) # 训练集的大小
num_batches = len(dataloader) # 批次数目, (size/batch_size,向上取整)
train_acc,train_loss = 0,0 # 初始化训练损失和正确率
for idx,(text,label,offsets) in enumerate(dataloader):
# 计算预测误差
predicted_label = model(text,offsets) # 网络输出
loss = criterion(predicted_label,label) # 计算网络输出和真实值之间的差距,label为真实值,计算二者差值即为损失
# 反向传播
optimizer.zero_grad() # grad属性归零
loss.backward() # 反向传播
torch.nn.utils.clip_grad_norm_(model.parameters(),0.1)
optimizer.step() # 每一步自动更新
# 记录acc与loss
train_acc += (predicted_label.argmax(1) == label).sum().item()
train_loss += loss.item()
train_acc /= size
train_loss /= num_batches
return train_acc,train_loss
e. 评估函数
import time
def evaluate(dataloader):
test_acc,test_loss,total_count = 0,0,0
with torch.no_grad():
for idx,(text,label,offsets) in enumerate(dataloader):
# 计算预测误差
predicted_label = model(text,offsets)
loss = criterion(predicted_label,label)
# 记录测试数据
test_acc += (predicted_label.argmax(1) == label).sum().item()
test_loss += loss.item()
total_count += label.size(0)
return test_acc/total_count,test_loss/total_count
f.拆分数据集运行模型
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset
# 超参数
EPOCHS = 10 # epoch
LR = 5 # 学习率
BATCH_SIZE = 64 # batch size for training
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)
total_accu = None
# 构建数据集
train_iter = custom_data_iter(train_data[0].values[:], train_data[1].values[:])
train_dataset = to_map_style_dataset(train_iter)
split_train_, split_valid_ = random_split(train_dataset,
[int(len(train_dataset)*0.8),int(len(train_dataset)*0.2)])
train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=collate_batch)
import copy
train_acc = []
train_loss = []
test_acc = []
test_loss = []
best_acc = None # 设置一个最佳准确率,作为最佳模型的判别指标
for epoch in range(1, EPOCHS + 1):
epoch_start_time = time.time()
model.train() # 切换为训练模式
epoch_train_acc,epoch_train_loss = train(train_dataloader)
model.eval() # 切换为测试模式
epoch_test_acc,epoch_test_loss = evaluate(valid_dataloader)
if best_acc is not None and best_acc > epoch_test_acc:
scheduler.step()
else:
best_acc = epoch_test_acc
best_model = copy.deepcopy(model)
train_acc.append(epoch_train_acc)
train_loss.append(epoch_train_loss)
test_acc.append(epoch_test_acc)
test_loss.append(epoch_test_loss)
# 获取当前的学习率
lr = optimizer.state_dict()['param_groups'][0]['lr']
template = ('Epoch:{:2d},Train_acc:{:.1f}%,Train_loss:{:.3f},Test_acc:{:.1f}%,Test_loss:{:.3f},Lr:{:.2E}')
print(template.format(epoch,epoch_train_acc*100,epoch_train_loss,epoch_test_acc*100,epoch_test_loss,lr))
print('Done!')
Epoch: 1,Train_acc:63.8%,Train_loss:1.339,Test_acc:79.5%,Test_loss:0.012,Lr:5.00E+00
Epoch: 2,Train_acc:83.2%,Train_loss:0.592,Test_acc:84.8%,Test_loss:0.008,Lr:5.00E+00
Epoch: 3,Train_acc:88.2%,Train_loss:0.413,Test_acc:86.8%,Test_loss:0.007,Lr:5.00E+00
Epoch: 4,Train_acc:91.1%,Train_loss:0.313,Test_acc:87.6%,Test_loss:0.006,Lr:5.00E+00
Epoch: 5,Train_acc:93.3%,Train_loss:0.241,Test_acc:89.8%,Test_loss:0.006,Lr:5.00E+00
Epoch: 6,Train_acc:95.0%,Train_loss:0.189,Test_acc:89.8%,Test_loss:0.006,Lr:5.00E-01
Epoch: 7,Train_acc:96.7%,Train_loss:0.144,Test_acc:89.6%,Test_loss:0.006,Lr:5.00E-02
Epoch: 8,Train_acc:96.8%,Train_loss:0.139,Test_acc:89.6%,Test_loss:0.006,Lr:5.00E-03
Epoch: 9,Train_acc:96.8%,Train_loss:0.139,Test_acc:89.6%,Test_loss:0.006,Lr:5.00E-04
Epoch:10,Train_acc:96.8%,Train_loss:0.139,Test_acc:89.6%,Test_loss:0.005,Lr:5.00E-05
Done!
g. 结果可视化
epochs_range = range(EPOCHS)
plt.figure(figsize=(12,3))
plt.subplot(1,2,1)
plt.plot(epochs_range,train_acc,label='Training Accuracy')
plt.plot(epochs_range,test_acc,label='Test Accuracy')
plt.legend(loc = 'lower right')
plt.title('Training and Validation Accuracy')
plt.subplot(1,2,2)
plt.plot(epochs_range,train_loss,label='Train Loss')
plt.plot(epochs_range,test_loss,label='Test Loss')
plt.legend(loc = 'lower right')
plt.title('Training and Validation Loss')
plt.show()
h. 测试指定数据
注意以下俩种测试方法,必须保证模型和数据在同样的设备上(GPU或CPU)
def predict(text, text_pipeline):
with torch.no_grad():
text = torch.tensor(text_pipeline(text))
output = model(text, torch.tensor([0]))
return output.argmax(1).item()
# ex_text_str = "随便播放一首专辑阁楼里的佛里的歌"
ex_text_str = "还有双鸭山到淮阴的汽车票吗13号的"
model = model.to("cpu")
print("该文本的类别是:%s" %label_name[predict(ex_text_str, text_pipeline)])
该文本的类别是:Travel-Query
def predict(text, text_pipeline, model, device):
model.eval() # 切换为评估模式
with torch.no_grad():
# 将文本和偏移量张量都移动到相同的设备
text = torch.tensor(text_pipeline(text)).to(device)
offsets = torch.tensor([0]).to(device)
output = model(text, offsets)
return output.argmax(1).item()
ex_text_str = "还有双鸭山到淮阴的汽车票吗13号的"
# 确保模型在设备上
model = model.to(device)
print("该文本的类别是:%s" % label_name[predict(ex_text_str, text_pipeline, model, device)])
该文本的类别是:Travel-Query
2. TextCNN(通用模型-待拓展)
3. Bert(高级模型-待拓展)