【人工智能概论】 RNN、LSTM、GRU简单入门与应用举例、代码耗时计算
文章目录
- 【人工智能概论】 RNN、LSTM、GRU简单入门与应用举例、代码耗时计算
- 一. RNN简介
- 1.1 概念简介
- 1.2 方法使用简介
- 二. 编码层embedding
- 2.1 embedding的参数
- 2.2 embedding的理解
- 三. Linear层与CrossEntropyLoss层输入输出的几点说明
- 3.1 Linear层
- 3.2 CrossEntropyLoss层
- 3.3 提它俩的意义
- 四. LSTM、GRU
- 4.1 LSTM
- 4.2 GRU
- 4.3 双向RNN、LSTM、GRU
- 五. pack_padded_sequence
- 5.1 引言
- 5.2 如何压缩
- 5.3 解压缩
- 六. 举例
- 七. 代码运行耗时计算
一. RNN简介
1.1 概念简介
- 循环神经网络(Recurrent Neural Network)
- 理念上与CNN类似,都有权值共享的理念在,CNN是一个核扫空间,RNN是一个核扫时间。
- 具体点说RNN有点像是对线性层的复用。
- RNN的结构展示:
- 每个时间步都会产生一个隐变量hi,hi会作为输入的一部分传给下一个时间步,hi会保存之前时间步里的信息。
1.2 方法使用简介
- 可以利用以下的组合实现构建一个RNN结构
RNN = torch.nn.RNN(input_size, hidden_size, num_layers)
outputs, hidden_n = RNN(inputs, hidden_0)
- 其中:
- input_size可以理解为词编码的维度,hidden_size是隐变量的维度,num_layers是RNN的堆叠层数;
- 为了每次输入的都是同一时间步的数据,inputs的形状为
(seqlen,batch_size,input_size)
;- hidden_0指的是初始隐变量h0,它是个先验数据,不知道不妨全给0,其形状为(num_layers,batch_size,hidden_size);
- outputs是所有时间步产生的隐变量,其尺寸为(seqlen,batch_size,hidden_size);
- hidden_n是最后一个时间步的隐变量hn,其尺寸为(num_layers,batch_size,hidden_size)。
二. 编码层embedding
2.1 embedding的参数
nn.Embedding(num_embeddings,embedding_dim)
- num_embeddings代表词典大小尺寸,比如训练时所可能出现的词语一共5000个词,那么就有num_embedding=5000;
- embedding_dim表示嵌入向量的维度,即用一个多少维的向量来表示一个符号。
2.2 embedding的理解
- embedding说起来天花乱坠,但实际上就是一个存储固定大小的词典的嵌入向量的查找表。
- 输入一个编号embedding就返回这个编号对应的嵌入向量,嵌入向量反映了各个编号代表的符号之间的语义关系。
- 输入为一个编号列表,输出为对应的符号嵌入向量列表。
- embedding的输入只能是编号,而编号用torch.LongTensor表示
三. Linear层与CrossEntropyLoss层输入输出的几点说明
3.1 Linear层
- 对于Linear层,输入形状满足(N, ∗ * ∗,in_features)即可,输出就是(N, ∗ * ∗,out_festures)。
- 满足(N, ∗ * ∗,in_features)这个形式就可以做维度变换,即只对最后一维做维度转换,因此RNN的三维数据也可以直接输入线性层。
3.2 CrossEntropyLoss层
- 对于CrossEntropyLoss层,保障第一个输入(Input)是二维数据且形状为(N,C)其中,N在此语境下可以理解为所有句子中文字的总数,C是class_num即种类数量,每个元素对应某个位置是某个字的概率;
- 第二个输入(target)是一维数据且形状为(N),每个元素对应某个位置应该是那个字的编号。
3.3 提它俩的意义
-
搞清楚它们的输入输出尺寸关系,有助于理解做分类的代码:
-
为什么代码中可以直接把RNN的输出输入到线性层,因为Linear层只对最后一维做变换;
-
为什么要把最后Linear层的输出压缩成二维的,因为CrossEntropyLoss层就是这么要求的。
四. LSTM、GRU
- 作为RNN的升级版,LSTM、GRU在性能上有不小的提高,总体上它俩的思路和效果是类似的,只不过GRU的计算量要小一点,推荐用GRU。
4.1 LSTM
-
LSTM遵循以下的计算公式:
-
网络结构图:
-
输入数据为(inputs,h0,c0),输出数据为(outputs,hn,cn),各部分的形状参照RNN即可,h与c的形状是一致的。
4.2 GRU
-
GRU遵循以下的计算公式:
-
网络结构图:
- 用PyTorch实现GRU,用torch.nn.GRU,具体用法与RNN几乎相同,可直接参照RNN。
4.3 双向RNN、LSTM、GRU
- 它们仨都是类似的,以双向RNN为例,双向就是正反向各做一次,然后拼接。
- 此时,隐变量hi的形状就变成了(num_layers
∗
*
∗num_direction,batch_size,hidden_size),outputs的形状就是(seqlen,batch_size,hidden_size
∗
*
∗num_direction),若用了双向则num_direction=2,用bidirectional参数来确定是否用双向。
- 隐层维度从原本的hidden_size变成了hidden_size ∗ * ∗num_direction。
五. pack_padded_sequence
5.1 引言
- 当训练RNN时,如果想要进行批次化训练,就需要截断与填充。
- 因为句子的长短不一,为方便运算,需要截长补短。
pack_padded_sequence
做的是压缩填充字符的操作,以加快RNN的计算效率。
5.2 如何压缩
- 假设一个批次有六个句子,要将这些句子进行填充操作。
- 此处按照句子的长度逆序排序,pads就是填充,不同颜色代表不同时间步输入的单词。
- 到底
pack_padded_sequence
如何进行压缩,如上图,它根据时间步拉平了上面排序过的句子,在每个时间步用一个数值代表该批次当前时间步内有多少个数据,如上图右侧绿色区域所指代的时间步内,只有3个有效输入,即该时间步下batch值为3。 - 该方法会返回一个
PackedSequence
对象实例,其中包含压缩后的数据data
和每个时间步内批次大小batch_sizes
,比如上面就是tensor([4,3,3,2,1,1])。 - 压缩的时候,需要传入实际语句的长度,以方便后面的逆操作,注意,最新版本的PyTorch已经不需要在传入句子之前对句子的长度进行排序了。
- PyTorch的RNN、LSTM、GRU都可以接受
PackedSequence
,并返回新的PackedSequence
。
5.3 解压缩
- 用
pad_packed_sequence
方法把返回的PackedSequence
进行解压缩。
六. 举例
- 根据名字判断国别
- 字/词 -> 编号 -> one_hot -> embedding -> GRU -> 根据最后一个隐变量hn做分类
- 模型结构:
- 具体代码
import csv
import math
import gzip
import time
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pack_padded_sequence
BATCH_SIZE = 256
N_EPOCHS = 100
N_CHARS = 128
HIDDEN_SIZE = 100
N_LAYER = 2
USE_GPU = True
# 准备数据
class NameDataset(Dataset):
def __init__(self,is_train_set = True):
# 读取数据源文件,根据数据文件的不同,读取方式也是各种各样的
filename = 'names_train.csv.gz' if is_train_set else 'names_test.csv.gz'
with gzip.open(filename,'rt') as f:
reader = csv.reader(f)
rows = list(reader)
# 转化后举例rows[1]=['Ajdrna', 'Czech'] Ajdrna为名字,Czech为国家
# 把名字与国家保存在list里
self.names = [row[0] for row in rows]
self.len = len(self.names)
self.countries = [row[1] for row in rows]
# 把国别及其编号保存成字典
self.country_list = list(sorted(set(self.countries))) # 保存国名在list中
# set(self.countries)是通过构建集合来去除重复的国名
# sorted()是按照Ascall码顺序排序
self.country_dict = self.getCountryDict() # 将国名保存在字典里,名字做key,索引号为value
self.country_num = len(self.country_list) # 求出国家的数量
def __getitem__(self,index):
return self.names[index], self.country_dict[self.countries[index]]
# 根据index在list中找到名字,根据index找到国名,然后在找到对应的索引号
def __len__(self):
return self.len # 数据集的长度
def getCountryDict(self):
country_dict = dict() # 构建空字典
for idx, country_name in enumerate(self.country_list,0):# 0表示序号从0开始
country_dict[country_name] = idx # 把国家名和类别号,用键值对组合起来
return country_dict
def idx2country(self,index):
return self.country_list[index] # 根据国家的类别号找出国家的名
def getCountriesNum(self):
return self.country_num # 获取国家的数量
trainset = NameDataset(is_train_set = True)
trainloader = DataLoader(trainset,batch_size=BATCH_SIZE,shuffle=True)
testset = NameDataset(is_train_set=False)
testloader = DataLoader(testset,batch_size=BATCH_SIZE,shuffle=False)
N_COUNTRY = trainset.getCountriesNum() # 获取国家的数量
# 构建模型
def create_tensor(tensor):
# 实际上就是把数据传送到指定设备上
if USE_GPU:
device = torch.device("cuda:0")
tensor = tensor.to(device)
return tensor
class RNNClassifier(torch.nn.Module):
def __init__(self,input_size,hidden_size,output_size,n_layers=1,bidirectional=True):
super(RNNClassifier,self).__init__()
self.hidden_size = hidden_size
self.n_layers = n_layers
self.n_directions = 2 if bidirectional else 1
self.embedding = torch.nn.Embedding(input_size,hidden_size)
self.gru = torch.nn.GRU(hidden_size,hidden_size,n_layers,
bidirectional = bidirectional)
self.fc = torch.nn.Linear(hidden_size*self.n_directions,output_size)
def _init_hidden(self,batch_size):
hidden = torch.zeros(self.n_layers*self.n_directions,
batch_size,self.hidden_size)
return create_tensor(hidden)
def forward(self,input,seq_lengths):
input = input.t()
# batch_size*seqlen --> seqlen*batch_size
batch_size = input.size(1) # 保存batch_size用于生成初始化隐层
hidden = self._init_hidden(batch_size) # 生成初始化隐层
embedding = self.embedding(input) # 编码
gru_input = pack_padded_sequence(embedding,seq_lengths)#压缩padd
# 一种高效的处理方式, seq_lengths是每个句子的真实长度,其返回的是一个packedsequence类的对象
output, hidden = self.gru(gru_input,hidden)
if self.n_directions == 2: # 双层的话要把隐变量拼接一下
hidden_cat = torch.cat([hidden[-1],hidden[-2]],dim=1)
else:
hidden_cat = hidden[-1]
fc_output = self.fc(hidden_cat) # 做分类
return fc_output
# 确定优化策略
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(),lr=0.001)
# 完善训练与测试代码
def name2list(name):
# 把词转化成Ascall码的组合
arr = [ord(c) for c in name]
# ord() 用于获取Ascall码值
return arr,len(arr) # 返回一个元组
def make_tensors(names,countries):
sequences_and_lengths = [name2list(name) for name in names]
name_sequences = [s1[0] for s1 in sequences_and_lengths]
seq_lengths = torch.LongTensor([s1[1] for s1 in sequences_and_lengths])
countries = countries.long()# .long()是转化成长整型
# padding
seq_tensor = torch.zeros(len(name_sequences),seq_lengths.max()).long()
for idx,(seq,seq_len) in enumerate(zip(name_sequences,seq_lengths),0):
seq_tensor[idx,:seq_len] = torch.LongTensor(seq)
seq_lengths,perm_idx = seq_lengths.sort(dim=0,descending =True)
seq_tensor = seq_tensor[perm_idx]
countries = countries[perm_idx]
return create_tensor(seq_tensor),create_tensor(seq_lengths),create_tensor(countries)
def trainModel():
total_loss = 0
for i,(name,countries) in enumerate(trainloader, 1):
inputs, seq_lengths, target = make_tensors(name,countries) # 此时name和country都是python数据要把他们转换成张量
output = classifier(inputs,seq_lengths.to("cpu"))
loss = criterion(output,target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
if i%10 == 0:
print(f'[{time_since(start)}] Epoch{epoch}',end='')
print(f'[{i*len(inputs)}/{len(trainset)}]',end='')
print(f'loss={total_loss/(i*len(inputs))}')
return total_loss
def testModel():
correct = 0
total = len(testset)
print("evaluating trained model ...")
with torch.no_grad():
for i, (names,countries) in enumerate(testloader, 1):
inputs, seq_lengths,target = make_tensors(names,countries)
output = classifier(inputs,seq_lengths.to("cpu"))
pred = output.max(dim=1,keepdim=True)[1]
correct += pred.eq(target.view_as(pred)).sum().item()
percent = '%.2f' %(100*correct/total)
print(f'Test set: Accuracy {correct}/{total} {percent}%')
return correct/total
# 主函数
# 把执行时间转化成多少分多少秒
def time_since(since):
s = time.time() - since
m = math.floor(s/60)
s -= m * 60
return '%dm %ds' %(m,s)
if __name__ == "__main__":
classifier = RNNClassifier(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER)
# 字符数 , 隐层维度 , 国家数量 , GRU的层数
if USE_GPU: # 是否采用GPU
device = torch.device("cuda:0")
classifier.to(device)
start = time.time() # 用于计算持续时间
print("Training for %d epochs..."%N_EPOCHS)
acc_list = [] # 记录每一轮的test数据集上的表现
for epoch in range(1,N_EPOCHS+1):
trainModel()
acc = testModel()
acc_list.append(acc)
print(time_since(start)) # 打印出训练所需时长
# 将表现情况,即准确率,可视化出来
epoch = np.arange(1, len(acc_list) + 1 , 1)
acc_list = np.array(acc_list)
plt.plot(epoch, acc_list)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.grid()
plt.show()
七. 代码运行耗时计算
import time
import math
def time_since(since):
s = time.time() - since
m = math.floor(s/60)
s -= m * 60
return '%dm %ds' %(m,s)
start = time.time()
print("hello world")
print(time_since(start))