1. 实现SRN
import numpy as np
inputs = np.array([[1., 1.],
[1., 1.],
[2., 2.]]) # 初始化输入序列
print('inputs is ', inputs)
state_t = np.zeros(2, ) # 初始化存储器
print('state_t is ', state_t)
w1, w2, w3, w4, w5, w6, w7, w8 = 1., 1., 1., 1., 1., 1., 1., 1.
U1, U2, U3, U4 = 1., 1., 1., 1.
for input_t in inputs:
print('inputs is ', input_t)
print('state_t is ', state_t)
in_h1 = np.dot([w1, w3], input_t) + np.dot([U2, U4], state_t)
in_h2 = np.dot([w2, w4], input_t) + np.dot([U1, U3], state_t)
state_t = in_h1, in_h2
output_y1 = np.dot([w5, w7], [in_h1, in_h2])
output_y2 = np.dot([w6, w8], [in_h1, in_h2])
print('output_y is ', output_y1, output_y2)
import numpy as np
inputs = np.array([[1., 1.],
[1., 1.],
[2., 2.]]) # 初始化输入序列
print('inputs is ', inputs)
state_t = np.zeros(2, ) # 初始化存储器
print('state_t is ', state_t)
w1, w2, w3, w4, w5, w6, w7, w8 = 1., 1., 1., 1., 1., 1., 1., 1.
U1, U2, U3, U4 = 1., 1., 1., 1.
for input_t in inputs:
print('inputs is ', input_t)
print('state_t is ', state_t)
in_h1 = np.tanh(np.dot([w1, w3], input_t) + np.dot([U2, U4], state_t))
in_h2 = np.tanh(np.dot([w2, w4], input_t) + np.dot([U1, U3], state_t))
state_t = in_h1, in_h2
output_y1 = np.dot([w5, w7], [in_h1, in_h2])
output_y2 = np.dot([w6, w8], [in_h1, in_h2])
print('output_y is ', output_y1, output_y2)
import torch
batch_size = 1
seq_len = 3 # 序列长度
input_size = 2 # 输入序列维度
hidden_size = 2 # 隐藏层维度
output_size = 2 # 输出层维度
# RNNCell
cell = torch.nn.RNNCell(input_size=input_size, hidden_size=hidden_size)
# 初始化参数 https://zhuanlan.zhihu.com/p/342012463
for name, param in cell.named_parameters():
if name.startswith("weight"):
# 线性层
liner = torch.nn.Linear(hidden_size, output_size)
liner.weight.data = torch.Tensor([[1, 1], [1, 1]])
liner.bias.data = torch.Tensor([0.0])
seq = torch.Tensor([[[1, 1]],
[[1, 1]],
[[2, 2]]])
hidden = torch.zeros(batch_size, hidden_size)
output = torch.zeros(batch_size, output_size)
for idx, input in enumerate(seq):
print('=' * 20, idx, '=' * 20)
print('Input :', input)
print('hidden :', hidden)
hidden = cell(input, hidden)
output = liner(hidden)
print('output :', output)
import torch
batch_size = 1
seq_len = 3
input_size = 2
hidden_size = 2
num_layers = 1
output_size = 2
cell = torch.nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
for name, param in cell.named_parameters(): # 初始化参数
if name.startswith("weight"):
# 线性层
liner = torch.nn.Linear(hidden_size, output_size)
liner.weight.data = torch.Tensor([[1, 1], [1, 1]])
liner.bias.data = torch.Tensor([0.0])
inputs = torch.Tensor([[[1, 1]],
[[1, 1]],
[[2, 2]]])
hidden = torch.zeros(num_layers, batch_size, hidden_size)
out, hidden = cell(inputs, hidden)
print('Input :', inputs[0])
print('hidden:', 0, 0)
print('Output:', liner(out[0]))
print('Input :', inputs[1])
print('hidden:', out[0])
print('Output:', liner(out[1]))
print('Input :', inputs[2])
print('hidden:', out[1])
print('Output:', liner(out[2]))
2. 实现“序列到序列”
import torch
input_size = 4
hidden_size = 4
batch_size = 1
idx2char = ['e', 'h', 'l', 'o']
x_data = [1, 0, 2, 2, 3]
y_data = [3, 1, 2, 3, 2]
one_hot_lookup = [[1, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]]
x_one_hot = [one_hot_lookup[x] for x in x_data]
inputs = torch.Tensor(x_one_hot).view(-1, batch_size, input_size)
labels = torch.LongTensor(y_data).view(-1, 1)
class Model(torch.nn.Module):
def __init__(self, input_size, hidden_size, batch_size):
super(Model, self).__init__()
self.batch_size = batch_size
self.input_size = input_size
self.hidden_size = hidden_size
self.rnncell = torch.nn.RNNCell(input_size=self.input_size,
def forward(self, input, hidden):
hidden = self.rnncell(input, hidden)
return hidden
def init_hidden(self):
return torch.zeros(self.batch_size, self.hidden_size)
net = Model(input_size, hidden_size, batch_size)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.1)
for epoch in range(15): # 循环15轮
loss = 0
optimizer.zero_grad() # 每一次计算前先把梯度归零
hidden = net.init_hidden() # 初始化 h_0
print('Predicted string:', end='')
for input, label in zip(inputs, labels): # 输入seq_length, batch_size, input_size
hidden = net(input, hidden)
loss += criterion(hidden, label) # 所有的损失要用累加,不能用item
_, idx = hidden.max(dim=1) # 找到hidden 里面最大的值,认为是最有可能的
print(idx2char[idx.item()], end='')
loss.backward() # 反向传播
optimizer.step() # 更新参数
print(', Epoch [%d/15] loss=%.4f' % (epoch + 1, loss.item()))
3. “编码器-解码器”的简单实现
import torch
import numpy as np
import torch.nn as nn
import torch.utils.data as Data
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# ?: Symbol that will fill in blank sequence if current batch data size is short than n_step
letter = [c for c in 'SE?abcdefghijklmnopqrstuvwxyz']
letter2idx = {n: i for i, n in enumerate(letter)}
seq_data = [['man', 'women'], ['black', 'white'], ['king', 'queen'], ['girl', 'boy'], ['up', 'down'], ['high', 'low']]
# Seq2Seq Parameter
n_step = max([max(len(i), len(j)) for i, j in seq_data]) # max_len(=5)
n_hidden = 128
n_class = len(letter2idx) # classfication problem
batch_size = 3
def make_data(seq_data):
enc_input_all, dec_input_all, dec_output_all = [], [], []
for seq in seq_data:
for i in range(2):
seq[i] = seq[i] + '?' * (n_step - len(seq[i])) # 'man??', 'women'
enc_input = [letter2idx[n] for n in (seq[0] + 'E')] # ['m', 'a', 'n', '?', '?', 'E']
dec_input = [letter2idx[n] for n in ('S' + seq[1])] # ['S', 'w', 'o', 'm', 'e', 'n']
dec_output = [letter2idx[n] for n in (seq[1] + 'E')] # ['w', 'o', 'm', 'e', 'n', 'E']
dec_output_all.append(dec_output) # not one-hot
# make tensor
return torch.Tensor(enc_input_all), torch.Tensor(dec_input_all), torch.LongTensor(dec_output_all)
enc_input_all: [6, n_step+1 (because of 'E'), n_class]
dec_input_all: [6, n_step+1 (because of 'S'), n_class]
dec_output_all: [6, n_step+1 (because of 'E')]
enc_input_all, dec_input_all, dec_output_all = make_data(seq_data)
class TranslateDataSet(Data.Dataset):
def __init__(self, enc_input_all, dec_input_all, dec_output_all):
self.enc_input_all = enc_input_all
self.dec_input_all = dec_input_all
self.dec_output_all = dec_output_all
def __len__(self): # return dataset size
return len(self.enc_input_all)
def __getitem__(self, idx):
return self.enc_input_all[idx], self.dec_input_all[idx], self.dec_output_all[idx]
loader = Data.DataLoader(TranslateDataSet(enc_input_all, dec_input_all, dec_output_all), batch_size, True)
# Model
class Seq2Seq(nn.Module):
def __init__(self):
super(Seq2Seq, self).__init__()
self.encoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # encoder
self.decoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # decoder
self.fc = nn.Linear(n_hidden, n_class)
def forward(self, enc_input, enc_hidden, dec_input):
# enc_input(=input_batch): [batch_size, n_step+1, n_class]
# dec_inpu(=output_batch): [batch_size, n_step+1, n_class]
enc_input = enc_input.transpose(0, 1) # enc_input: [n_step+1, batch_size, n_class]
dec_input = dec_input.transpose(0, 1) # dec_input: [n_step+1, batch_size, n_class]
# h_t : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
_, h_t = self.encoder(enc_input, enc_hidden)
# outputs : [n_step+1, batch_size, num_directions(=1) * n_hidden(=128)]
outputs, _ = self.decoder(dec_input, h_t)
model = self.fc(outputs) # model : [n_step+1, batch_size, n_class]
return model
model = Seq2Seq().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(5000):
for enc_input_batch, dec_input_batch, dec_output_batch in loader:
# make hidden shape [num_layers * num_directions, batch_size, n_hidden]
h_0 = torch.zeros(1, batch_size, n_hidden).to(device)
(enc_input_batch, dec_intput_batch, dec_output_batch) = (
enc_input_batch.to(device), dec_input_batch.to(device), dec_output_batch.to(device))
# enc_input_batch : [batch_size, n_step+1, n_class]
# dec_intput_batch : [batch_size, n_step+1, n_class]
# dec_output_batch : [batch_size, n_step+1], not one-hot
pred = model(enc_input_batch, h_0, dec_intput_batch)
# pred : [n_step+1, batch_size, n_class]
pred = pred.transpose(0, 1) # [batch_size, n_step+1(=6), n_class]
loss = 0
for i in range(len(dec_output_batch)):
# pred[i] : [n_step+1, n_class]
# dec_output_batch[i] : [n_step+1]
loss += criterion(pred[i], dec_output_batch[i])
if (epoch + 1) % 1000 == 0:
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
def translate(word):
enc_input, dec_input, _ = make_data([[word, '?' * n_step]])
enc_input, dec_input = enc_input.to(device), dec_input.to(device)
# make hidden shape [num_layers * num_directions, batch_size, n_hidden]
hidden = torch.zeros(1, 1, n_hidden).to(device)
output = model(enc_input, hidden, dec_input)
# output : [n_step+1, batch_size, n_class]
predict = output.data.max(2, keepdim=True)[1] # select n_class dimension
decoded = [letter[i] for i in predict]
translated = ''.join(decoded[:decoded.index('E')])
return translated.replace('?', '')
print('man ->', translate('man'))
print('mans ->', translate('mans'))
print('king ->', translate('king'))
print('black ->', translate('black'))
print('up ->', translate('up'))
import torch
from torch.autograd import Variable
from torch import nn
input_size = 10
batch_size = 5
hidden_size = 20
num_layers = 2
model = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
x = torch.rand(3, batch_size, input_size) #这里理解为生成batch_size条数据,每条数据长度为3,特征维度为input_size
h0 = torch.zeros(num_layers, batch_size, hidden_size) #这里定义最初始输入的记忆状态,直接设置为0就行
y, h = model(x, h0)
print('y.shape=', y.shape)
print('h.shape=', h.shape)
h1 = H(h0 + x0) #h0为初始化输入,可以直接设置为0
out1 = h1[-1] #也就是所有神经元输出的最后一层
h2 = H(h1 + x1)
out2 = h2[-1]
h3 = H(h2 + x2)
out3 = h3[-1]
最后训练结束,输出的h就是最后一个时刻模型所有神经元的输出,也就是h3,我们上面定义的模型有2层,每层神经元数目为20,于是得到的输出尺寸应该为[2, 20],又因为上面我设置了5条神经元数据,每条都对应一个[2, 20]的输出矩阵,所以最终的输出矩阵尺寸为[2, 5, 20],记得中间的维度是针对第几条数据,希望读者能理解输出和输入都是表示什么。
那么很好理解了吧,模型最后输出的y,就是每个时刻元素的输出,也就是[out1, out2, out3],每个out都是模型最后一层神经元的输出,尺寸为[1, 20],这里我设置的数据每条时间序列有3个元素,每个元素对应一个时刻的输出,于是尺寸为[3, 20],又因为有5条数据,所以最终尺寸为[3, 5, 20]。
这里注意下,RNN的输入格式不同于CNN,CNN模型的输入尺寸是batch_size放前面,也就是[batch_size, input_size_1, input_size_2],而RNN是放中间,也就是
[input_size_1, batch_size, input_size_2],希望各位同学注意下。
import torch
from torch.autograd import Variable
from torch import nn
input_size = 10
batch_size = 5
hidden_size = 20
num_layers = 2
model = nn.RNNCell(input_size=input_size, hidden_size=hidden_size)
x = torch.rand(3, batch_size, input_size) #这里理解为生成batch_size条数据,每条数据长度为3,特征维度为input_size
h0 = torch.zeros(batch_size, hidden_size) #这里定义最初始输入的记忆状态,直接设置为0就行
for i in range(3):
h0 = model(x[i], h0)
y = h0
序列数据(Sequence Data) 是指按照一定顺序排列的数据集合,其中的每个元素被称为序列的一个项(item)。
这是一种通用的端到端序列学习方法,它基于编码-解码(Encoder - Decoder)的架构。一般情况下,输入序列和输出序列有不同的长度(比如机器翻译)。这就需要一个更高级的设置,尤其在没有进一步语境的「序列到序列模型」时。下面是其工作原理:
- 一个 RNN 层(或其中的堆栈)作为「编码器」:它处理输入序列并反馈其内部状态。注意我们抛弃了编码器 RNN 的输出,只恢复其状态。该状态在下一步中充当解码器的「语境」。
- 另一个 RNN 层作为「解码器」:在给定目标序列先前字母的情况下,它被训练以预测目标序列的下一个字符。具体讲,它被训练把目标序列转化为相同序列,但接下来被一个时间步抵消,这一训练过程在语境中被称为「teacher forcing」。更重要的是,编码器把其状态向量用作初始状态,如此编码器获得了其将要生成的信息。实际上,在给定 targets[...t] 的情况下,解码器学习生成 targets[t+1...],前提是在输入序列上。
Seq2Seq 模型通常由两个主要部分组成:编码器(Encoder)和解码器(Decoder)。编码器将输入序列编码成一个固定长度的上下文向量(context vector),然后解码器根据这个上下文向量生成目标序列。
编码器是一个循环神经网络(RNN),如 LSTM 或 GRU,用于处理输入序列,并生成一个上下文向量。这个向量总结了输入序列的全部信息。
解码器也是一个 RNN,使用编码器生成的上下文向量作为初始输入,并逐步生成目标序列的每一个元素。
在训练过程中,解码器在每一步生成一个单词,并使用该单词作为下一步的输入。这种方法被称为教师强制(Teacher Forcing)。
人工智能 - 循环神经网络RNN完全解析:从基础理论到PyTorch实战 - 个人文章 - SegmentFault 思否
RNN 主要由输入层、隐藏层和输出层组成。其中,隐藏层之间的节点不再无连接,而是有连接的,并且隐藏层的输入不仅包括输入层的输出,还包括上一时刻隐藏层的输出。