动手学深度学习55 循环神经网络 RNN 的实现
- 从零开始实现
- 简洁实现
- QA
课件:https://zh-v2.d2l.ai/chapter_recurrent-neural-networks/rnn-scratch.html
从零开始实现
%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
import collections
import re
import random
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt', '090b5e7e70c295757f55df93cb0a180b9691891a')
def read_time_machine():
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
# 替换掉所有符号 转为小写
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])
def tokenize(lines, token='word'):
"""将文本行拆分为单词或字符词元"""
if token == 'word': # 一个单词
return [line.split() for line in lines]
elif token == 'char': # 一个字符 字母
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
tokens = tokenize(lines)
for i in range(11):
print(tokens[i])
class Vocab:
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
return len(self.idx_to_token)
def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
return self._token_freqs
def count_corpus(tokens):
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])
for i in [0, 10]:
print('文本:', tokens[i])
print('索引:', vocab[tokens[i]])
def load_corpus_time_machine(max_tokens=-1):
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char') # 按字符-字母分
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
def seq_data_iter_random(corpus, batch_size, num_steps):
"""使用随机抽样生成一个小批量子序列"""
# 从随机偏移量开始对序列进行分区,随机范围包括num_steps-1
corpus = corpus[random.randint(0, num_steps - 1):]
# 减去1,是因为我们需要考虑标签
num_subseqs = (len(corpus) - 1) // num_steps
# 长度为num_steps的子序列的起始索引
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# 在随机抽样的迭代过程中,
# 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
random.shuffle(initial_indices)
def data(pos):
# 返回从pos位置开始的长度为num_steps的序列
return corpus[pos: pos + num_steps]
num_batches = num_subseqs // batch_size
for i in range(0, batch_size * num_batches, batch_size):
# 在这里,initial_indices包含子序列的随机起始索引
initial_indices_per_batch = initial_indices[i: i + batch_size]
X = [data(j) for j in initial_indices_per_batch]
Y = [data(j + 1) for j in initial_indices_per_batch]
yield torch.tensor(X), torch.tensor(Y)
my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
print('X: ', X, '\nY:', Y)
def seq_data_iter_sequential(corpus, batch_size, num_steps):
"""使用顺序分区生成一个小批量子序列"""
# 从随机偏移量开始划分序列
offset = random.randint(0, num_steps)
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
Xs = torch.tensor(corpus[offset: offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
num_batches = Xs.shape[1] // num_steps
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i: i + num_steps]
Y = Ys[:, i: i + num_steps]
yield X, Y
for X, Y in seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5):
print('X: ', X, '\nY:', Y)
class SeqDataLoader:
"""加载序列数据的迭代器"""
def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = seq_data_iter_random
else:
self.data_iter_fn = seq_data_iter_sequential
self.corpus, self.vocab = load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps
def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
def load_data_time_machine(batch_size, num_steps, use_random_iter=False, max_tokens=10000):
"""返回时光机器数据集的迭代器和词表"""
data_iter = SeqDataLoader(
batch_size, num_steps, use_random_iter, max_tokens)
return data_iter, data_iter.vocab
# num_steps 每一次取多长的序列 时间T
batch_size, num_steps = 32, 35
# vocab 把整数index转成对应的词
train_iter, vocab = load_data_time_machine(batch_size, num_steps)
# 独热编码 [0, 2]表示下标-类别 给一个下标,返回一个向量表示
F.one_hot(torch.tensor([0, 2]), len(vocab))
# 小批量数据形状是二维张量: (批量大小,时间步数)--三维
X = torch.arange(10).reshape((2, 5))
# 转置好处:把时间提前,每次访问x_t都是一个连续的序列
F.one_hot(X.T, 28).shape
# 初始化循环神经网络的模型参数 关键函数
def get_params(vocab_size, num_hiddens, device):
# 多分类,类别个数字典所有字,可以是任意一个字
num_inputs = num_outputs = vocab_size
def normal(shape):
# 最简单初始化 均值为0 方差为1 *0.01 把方差变成0.01
return torch.randn(size=shape, device=device) * 0.01
# 隐藏层参数
# 输入x映射到隐藏层的大小
W_xh = normal((num_inputs, num_hiddens))
# 上一个时刻隐藏层变量映射到下一个时刻隐藏层变量
W_hh = normal((num_hiddens, num_hiddens))
# 隐藏元的b 偏移
b_h = torch.zeros(num_hiddens, device=device)
# 输出层参数
# 隐藏变量到输出的映射
W_hq = normal((num_hiddens, num_outputs))
# 输出层的b 偏移
b_q = torch.zeros(num_outputs, device=device)
# 附加梯度
params = [W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
# 参数列表需要计算梯度
param.requires_grad_(True)
return params
# 在初始化时返回隐藏状态
def init_rnn_state(batch_size, num_hiddens, device):
# 写成tuple LSTM有两个值,为了统一化。
return (torch.zeros((batch_size, num_hiddens), device=device), )
# 给定一个小批量 计算所有时间步【x0-xt】 得到输出?
# 定义了如何在一个时间步内计算隐藏状态和输出
def rnn(inputs, state, params):
# inputs的形状:(时间步数量,批量大小,词表大小)
# state 初始隐藏状态
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
# inputs 3Dtensor 时间的步数,批量大小,词表大小
# X的形状:(批量大小,词表大小)
for X in inputs:
# H 前一个时间的隐藏状态 先更新隐藏状态
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
# 当前时刻的预测 预测下一个时刻的字符是谁
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
# n个矩阵按列拼起来,列数不变,行数是批量大小*时间步数
return torch.cat(outputs, dim=0), (H,)
# 包装rnn函数
class RNNModelScratch:
"""从零开始实现的循环神经网络模型"""
def __init__(self, vocab_size, num_hiddens, device,
get_params, init_state, forward_fn):
self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
self.params = get_params(vocab_size, num_hiddens, device)
self.init_state, self.forward_fn = init_state, forward_fn
# 重写 __call__ 或者 写forward函数都行
def __call__(self, X, state):
# x load的数据集 (批量大小,时间步数) onehot 整型变成浮点型
X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
return self.forward_fn(X, state, self.params)
def begin_state(self, batch_size, device):
return self.init_state(batch_size, self.num_hiddens, device)
num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens,d2l.try_gpu(), get_params,init_rnn_state, rnn)
# 初始状态
state = net.begin_state(X.shape[0], d2l.try_gpu())
Y, new_state = net(X.to(d2l.try_gpu()), state)
print(Y.shape, len(new_state), new_state[0].shape)
# 预测
def predict_ch8(prefix, num_preds, net, vocab, device):
"""
prefix: 给定句子的开头
num_preds: 预测多少词
在prefix后面生成新字符"""
state = net.begin_state(batch_size=1, device=device)
outputs = [vocab[prefix[0]]]
get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
for y in prefix[1:]: # 预热期
_, state = net(get_input(), state)
outputs.append(vocab[y])
for _ in range(num_preds): # 预测num_preds步
y, state = net(get_input(), state)
# 多分类 拿出最大概率的索引
outputs.append(int(y.argmax(dim=1).reshape(1)))
# token转成字符串拼接输出
return ''.join([vocab.idx_to_token[i] for i in outputs])
predict_ch8('time traveller ', 10, net, vocab, d2l.try_gpu())
# 辅助函数 rnn 梯度乘法太多 容易梯度爆炸
def grad_clipping(net, theta):
"""裁剪梯度"""
# 拿出所有参与训练层的参数
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
# 把所有层的梯度拉成一个长向量,计算L2
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
# 梯度太大做映射投影 梯度小不做处理
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
# 训练一个epoch
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
"""训练网络一个迭代周期(定义见第8章)
use_random_iter 随机iterate 下一个batch的样本和上一个batch的样本没有关系 不随机的话 两个batch的样本是相邻的
"""
state, timer = None, d2l.Timer()
metric = d2l.Accumulator(2) # 训练损失之和,词元数量
for X, Y in train_iter:
if state is None or use_random_iter:
# 在第一次迭代或使用随机抽样时初始化state
state = net.begin_state(batch_size=X.shape[0], device=device)
else:
if isinstance(net, nn.Module) and not isinstance(state, tuple):
# state对于nn.GRU是个张量
# detach_()不改变state的值
state.detach_()
else:
# state对于nn.LSTM或对于我们从零开始实现的模型是个张量
for s in state:
s.detach_()
y = Y.T.reshape(-1)
X, y = X.to(device), y.to(device)
y_hat, state = net(X, state)
l = loss(y_hat, y.long()).mean()
if isinstance(updater, torch.optim.Optimizer):
updater.zero_grad()
l.backward()
grad_clipping(net, 1)
updater.step()
else:
l.backward()
grad_clipping(net, 1)
# 因为已经调用了mean函数
updater(batch_size=1)
metric.add(l * y.numel(), y.numel())
# crossentropy 交叉熵 --> 困惑度 做个指数
return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
def train_ch8(net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False):
"""训练模型(定义见第8章)"""
loss = nn.CrossEntropyLoss()
animator = d2l.Animator(xlabel='epoch', ylabel='perplexity', legend=['train'], xlim=[10, num_epochs])
# 初始化
if isinstance(net, nn.Module):
updater = torch.optim.SGD(net.parameters(), lr)
else:
updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
# 训练和预测
for epoch in range(num_epochs):
ppl, speed = train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter)
if (epoch + 1) % 10 == 0:
print(predict('time traveller'))
animator.add(epoch + 1, [ppl])
print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
print(predict('time traveller'))
print(predict('traveller'))
num_epochs, lr = 500, 1
# train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu(), use_random_iter=True)
困惑度 1.1, 91434.3 词元/秒 cuda:0
time travelleryou can show black is white by argument said filby
travelleryou can show black is white by argument said filby
使用随机iter
简洁实现
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
# 加载数据
batch_size, num_steps = 32, 35
train_iter, vocab = load_data_time_machine(batch_size, num_steps)
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
# 初始化隐藏状态
state = torch.zeros((1, batch_size, num_hiddens))
print(state.shape)
# 初始化X Y还是3D
X = torch.rand(size=(num_steps, batch_size, len(vocab)))
Y, state_new = rnn_layer(X, state)
print(Y.shape, state_new.shape)
class RNNModel(nn.Module):
"""循环神经网络模型"""
def __init__(self, rnn_layer, vocab_size, **kwargs):
super(RNNModel, self).__init__(**kwargs)
self.rnn = rnn_layer
self.vocab_size = vocab_size
self.num_hiddens = self.rnn.hidden_size
# 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1
if not self.rnn.bidirectional:
self.num_directions = 1
# 构造自己的输出层
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
else:
self.num_directions = 2
self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)
def forward(self, inputs, state):
X = F.one_hot(inputs.T.long(), self.vocab_size)
X = X.to(torch.float32)
# Y 中间隐藏层的Y 时间步数,batch大小,隐藏层大小
Y, state = self.rnn(X, state)
# 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
# 它的输出形状是(时间步数*批量大小,词表大小)。
output = self.linear(Y.reshape((-1, Y.shape[-1])))
return output, state
def begin_state(self, device, batch_size=1):
if not isinstance(self.rnn, nn.LSTM):
# nn.GRU以张量作为隐状态
return torch.zeros((self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens),
device=device)
else:
# nn.LSTM以元组作为隐状态
return (torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device),
torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device))
# 初始化
device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
predict_ch8('time traveller', 10, net, vocab, device)
num_epochs, lr = 500, 1
train_ch8(net, train_iter, vocab, lr, num_epochs, device)
# 框架实现 把多个小矩阵乘法换成大矩阵乘法
QA
1 num_steps 输入到小批量里面面样本句子的长度。
2 torchserve 开始使用java编写的, triton框架。
3 转置是为了怎么取数据方便。
4 dim=0 在0维度堆积 一列多行的结果
5 normal 初始化权重
6 为什么是批量大小乘以时间长度? 每个批量每个样本要做t次分类【任意时间点都要做一次分类】,所以要乘。
7 不会预测特别长的东西。append没有模型内存爆炸的概率高。
8 H每个时间点都会更新一次。?
9 2批量大小
10 是
11 batch_size 和 time的区别? time是样本句子的长度,和数据相关; batch_sizi 批量大小。
12 prefix–label。预测的时候不要更新模型
13 依赖长度–T
14 单个时间点是一个矩阵–站点数。rnn做气象预测。
15 num_step 是计算多少次 不是隐藏层数
16 每一个帧用cnn抽特征–向量,不需要one-hot. rnn不能处理特别长的序列
17 梯度计算。不做detach会记住前面计算的东西,误差反传会多算东西。只关注当前计算的东西。
18 rnn不能做超长序列【超过100就难用了】。一般随机取。
19 随机取对模型好,不容易overfitting ; 第二rnn做不了超长序列
20 可以做。不去掉标点等。
21 字符预测起来比较简单,输出是28voab,分类数少。
22 定义了两个函数 样本batch随机取和batch相连。
23 h-长为256的向量,很长很难计算。变长类似很长的mlp,容易过拟合。
25 工具成熟 发展没有特别快
26 根据频率采样。把高频词概率压一压–低采样。
27 端侧–主要车上,未来家里所有深度学习都跑在车上–家庭超级计算机。
28 车内–人互动、娱乐–未来; 车外-自动驾驶