1. Algorithm Introduction
Reference paper: Elman J L. Finding structure in time[J]. Cognitive Science, 1990, 14(2): 179-211. Cited more than 16,000 times on Google Scholar!
Any discussion of recurrent architectures has to start with their progenitor, the RNN. First, let's build some intuition. Imagine you are reading a gripping novel. Each time you turn to a new page, your brain does not process that page in isolation; it combines it with everything you have read so far to understand the plot, the characters, and the setting. This is a process of "memory" and "contextual understanding": the information in your head is not isolated, but continuous and interconnected.

An RNN is a neural network that mimics exactly this kind of memory. A traditional feedforward network assumes every input is independent, with no relation to what came before or after. An RNN is different: it is designed specifically for sequential data, i.e. data whose elements are ordered and related to their neighbors, such as time series (stock prices, temperature readings) or natural language (sentences, dialogue). Inside an RNN there is a special recurrent connection that carries information from one time step to the next, just as the previous page of a book shapes how you understand the next one. This property lets an RNN remember earlier inputs and exploit that memory to make better decisions or predictions on later ones.

With that intuition in place, let's look at what an RNN actually is and how it runs.
2. RNN Algorithm Principles
The core idea of an RNN is to exploit the temporal information in a sequence. Unlike a traditional feedforward network, an RNN introduces recurrent connections that let the network retain information from previous time steps, as the animated GIF above illustrates (source: RNN):
Here X denotes the input at the current moment. Take a weather-forecasting task as an example: we will use the temperature (T), humidity (H), and wind speed (W) of the past three days (the day before yesterday, yesterday, and today) to predict the next day's temperature. For this task the input is

$$X = [X_{T_{t-2}}, X_{T_{t-1}}, X_{T_t}, X_{H_{t-2}}, X_{H_{t-1}}, X_{H_t}, X_{W_{t-2}}, X_{W_{t-1}}, X_{W_t}]$$

that is, X is a 1×9 vector. The output is tomorrow's temperature $y_{T_{t+1}}$. In other words, we need to build an RNN model $f(x)$:

$$y_{T_{t+1}} = f(X_{T_{t-2}}, X_{T_{t-1}}, X_{T_t}, X_{H_{t-2}}, X_{H_{t-1}}, X_{H_t}, X_{W_{t-2}}, X_{W_{t-1}}, X_{W_t})$$
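Note that although X is written above as a flat 1×9 vector, the RNN actually consumes it as a sequence of 3 time steps with 3 features each. Here is a minimal sketch of that reshaping; the numbers are made up purely for illustration, and this (time step, feature) layout matches what the training code in Section 3 generates:

    import numpy as np

    # Hypothetical raw sample, ordered [T_{t-2}, T_{t-1}, T_t, H_{t-2}, ..., W_t]
    x_flat = np.array([12.1, 13.4, 14.0,    # temperatures
                       0.61, 0.58, 0.55,    # humidities
                       3.2,  2.9,  3.5])    # wind speeds

    # Group into (feature, day) rows, then transpose to (day, feature):
    # row 0 = [T, H, W] for day t-2, row 1 for t-1, row 2 for t.
    x_seq = x_flat.reshape(3, 3).T
    print(x_seq.shape)  # (3, 3)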
So how do we actually implement this model? Let me break it down step by step:
2.1 The Basic RNN Structure
The heart of an RNN is its recurrent structure, which lets information flow along the sequence. For our weather-forecasting task, the basic RNN computation can be written as:
$$h_t = \tanh(W_{hh} h_{t-1} + W_{xh} x_t + b_h)$$

$$y_t = W_{hy} h_t + b_y$$
where:
- $h_t$ is the hidden state at the current time step
- $x_t$ is the input at the current time step
- $W_{hh}$, $W_{xh}$, $W_{hy}$ are the weight matrices, i.e. the network's learnable parameters (the unknowns we solve for)
- $b_h$, $b_y$ are the bias terms
- $\tanh$ is the activation function
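To make the two formulas concrete, here is a minimal one-step NumPy sketch. The sizes (hidden_size = 4, input_size = 3) and the random values are illustrative assumptions, not values from this article:

    import numpy as np

    hidden_size, input_size = 4, 3
    rng = np.random.default_rng(0)

    W_hh = rng.normal(size=(hidden_size, hidden_size)) * 0.01
    W_xh = rng.normal(size=(hidden_size, input_size)) * 0.01
    W_hy = rng.normal(size=(1, hidden_size)) * 0.01
    b_h = np.zeros((hidden_size, 1))
    b_y = np.zeros((1, 1))

    h_prev = np.zeros((hidden_size, 1))       # h_{t-1}
    x_t = rng.normal(size=(input_size, 1))    # [T, H, W] at time t

    h_t = np.tanh(np.dot(W_hh, h_prev) + np.dot(W_xh, x_t) + b_h)  # hidden-state update
    y_t = np.dot(W_hy, h_t) + b_y                                  # output projection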
2.2 Computation Flow
For our weather-forecasting task, the computation proceeds as follows:
a) Initialize the hidden state $h_0$ (usually a zero vector)
b) For each of the three time steps (from day t-2 to day t), update the hidden state:
$$h_{t-2} = \tanh(W_{hh} h_0 + W_{xh} [X_{T_{t-2}}, X_{H_{t-2}}, X_{W_{t-2}}] + b_h)$$

$$h_{t-1} = \tanh(W_{hh} h_{t-2} + W_{xh} [X_{T_{t-1}}, X_{H_{t-1}}, X_{W_{t-1}}] + b_h)$$

$$h_t = \tanh(W_{hh} h_{t-1} + W_{xh} [X_{T_t}, X_{H_t}, X_{W_t}] + b_h)$$

c) Finally, we use the last hidden state to predict tomorrow's temperature:

$$y_{T_{t+1}} = W_{hy} h_t + b_y$$
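Written out literally in NumPy, the three unrolled steps look like this (a sketch under the same illustrative shapes as the one-step example above; x_tm2, x_tm1, x_t stand in for the three daily [T, H, W] vectors):

    import numpy as np

    hidden_size, input_size = 4, 3
    rng = np.random.default_rng(1)
    W_hh = rng.normal(size=(hidden_size, hidden_size)) * 0.01
    W_xh = rng.normal(size=(hidden_size, input_size)) * 0.01
    W_hy = rng.normal(size=(1, hidden_size)) * 0.01
    b_h, b_y = np.zeros((hidden_size, 1)), np.zeros((1, 1))

    # Stand-ins for the [T, H, W] vectors of the day before yesterday, yesterday, today
    x_tm2, x_tm1, x_t = [rng.normal(size=(input_size, 1)) for _ in range(3)]

    h0 = np.zeros((hidden_size, 1))                                   # a) initialize h_0
    h_tm2 = np.tanh(np.dot(W_hh, h0) + np.dot(W_xh, x_tm2) + b_h)     # b) step 1
    h_tm1 = np.tanh(np.dot(W_hh, h_tm2) + np.dot(W_xh, x_tm1) + b_h)  # b) step 2
    h_t = np.tanh(np.dot(W_hh, h_tm1) + np.dot(W_xh, x_t) + b_h)      # b) step 3
    y_pred = np.dot(W_hy, h_t) + b_y                                  # c) tomorrow's temperature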
To tie this all together, I hand-rolled an RNN class (the Adam optimizer it uses is defined in the complete training code in Section 3):
import numpy as np

class RNN:
    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        # Initialize weights with small random values
        self.Wxh = np.random.randn(hidden_size, input_size) * 0.01
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.Why = np.random.randn(output_size, hidden_size) * 0.01
        # Initialize biases to zero
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))
        # Adam optimizer (class defined in Section 3), holding references to the parameters
        self.optimizer = Adam({
            'Wxh': self.Wxh, 'Whh': self.Whh, 'Why': self.Why,
            'bh': self.bh, 'by': self.by
        })

    def forward(self, inputs):
        h = np.zeros((self.hidden_size, 1))
        self.last_inputs = inputs
        self.last_hs = {0: h}
        # Forward pass: unroll the recurrence over the time steps
        for t, x in enumerate(inputs):
            h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)
            self.last_hs[t + 1] = h
        # The output is computed from the final hidden state only
        y = np.dot(self.Why, h) + self.by
        return y, h

    def backward(self, d_y):
        n = len(self.last_inputs)
        # Initialize gradients
        d_Wxh = np.zeros_like(self.Wxh)
        d_Whh = np.zeros_like(self.Whh)
        d_Why = np.zeros_like(self.Why)
        d_bh = np.zeros_like(self.bh)
        d_by = np.zeros_like(self.by)
        # Gradient flowing into the final hidden state
        d_h = np.dot(self.Why.T, d_y)
        # Backpropagation through time, from the last step to the first
        for t in reversed(range(n)):
            # Derivative of tanh is (1 - tanh^2)
            temp = (1 - self.last_hs[t + 1] ** 2) * d_h
            d_Wxh += np.dot(temp, self.last_inputs[t].T)
            d_Whh += np.dot(temp, self.last_hs[t].T)
            d_bh += temp
            d_h = np.dot(self.Whh.T, temp)
        # Output-layer gradients depend only on the final hidden state
        d_Why = np.dot(d_y, self.last_hs[n].T)
        d_by = d_y
        # Update parameters with the Adam optimizer
        self.optimizer.step({
            'Wxh': d_Wxh, 'Whh': d_Whh, 'Why': d_Why,
            'bh': d_bh, 'by': d_by
        })
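One practical caveat: plain RNNs are prone to exploding gradients when unrolled over longer sequences, and the backward() above does not guard against this. A common remedy (my addition, not part of the original class) is to clip the gradients just before the optimizer step:

    # Inside backward(), immediately before self.optimizer.step(...):
    # clip each gradient element-wise into [-1, 1] to tame exploding gradients
    for d in (d_Wxh, d_Whh, d_Why, d_bh, d_by):
        np.clip(d, -1.0, 1.0, out=d)

For the three-step toy task in this post it rarely matters, but it becomes important for longer sequences.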
3. Complete Training Code
import numpy as np

class Adam:
    def __init__(self, params, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.params = params
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        # First- and second-moment estimates for every parameter
        self.m = {k: np.zeros_like(v) for k, v in params.items()}
        self.v = {k: np.zeros_like(v) for k, v in params.items()}
        self.t = 0

    def step(self, grads):
        self.t += 1
        for k in self.params.keys():
            # Update biased moment estimates
            self.m[k] = self.beta1 * self.m[k] + (1 - self.beta1) * grads[k]
            self.v[k] = self.beta2 * self.v[k] + (1 - self.beta2) * (grads[k] ** 2)
            # Bias correction
            m_hat = self.m[k] / (1 - self.beta1 ** self.t)
            v_hat = self.v[k] / (1 - self.beta2 ** self.t)
            # In-place update, so the RNN's attributes see the new values
            self.params[k] -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon)
class RNN:
    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        # Initialize weights with small random values
        self.Wxh = np.random.randn(hidden_size, input_size) * 0.01
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.Why = np.random.randn(output_size, hidden_size) * 0.01
        # Initialize biases to zero
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))
        # Adam optimizer, holding references to the parameters
        self.optimizer = Adam({
            'Wxh': self.Wxh, 'Whh': self.Whh, 'Why': self.Why,
            'bh': self.bh, 'by': self.by
        })

    def forward(self, inputs):
        h = np.zeros((self.hidden_size, 1))
        self.last_inputs = inputs
        self.last_hs = {0: h}
        # Forward pass: unroll the recurrence over the time steps
        for t, x in enumerate(inputs):
            h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)
            self.last_hs[t + 1] = h
        # The output is computed from the final hidden state only
        y = np.dot(self.Why, h) + self.by
        return y, h

    def backward(self, d_y):
        n = len(self.last_inputs)
        # Initialize gradients
        d_Wxh = np.zeros_like(self.Wxh)
        d_Whh = np.zeros_like(self.Whh)
        d_Why = np.zeros_like(self.Why)
        d_bh = np.zeros_like(self.bh)
        d_by = np.zeros_like(self.by)
        # Gradient flowing into the final hidden state
        d_h = np.dot(self.Why.T, d_y)
        # Backpropagation through time, from the last step to the first
        for t in reversed(range(n)):
            temp = (1 - self.last_hs[t + 1] ** 2) * d_h
            d_Wxh += np.dot(temp, self.last_inputs[t].T)
            d_Whh += np.dot(temp, self.last_hs[t].T)
            d_bh += temp
            d_h = np.dot(self.Whh.T, temp)
        # Output-layer gradients depend only on the final hidden state
        d_Why = np.dot(d_y, self.last_hs[n].T)
        d_by = d_y
        # Update parameters with the Adam optimizer
        self.optimizer.step({
            'Wxh': d_Wxh, 'Whh': d_Whh, 'Why': d_Why,
            'bh': d_bh, 'by': d_by
        })
# Generate synthetic data
def generate_data(num_samples, time_steps):
    X = np.random.rand(num_samples, time_steps, 3)  # 3 features: temperature, humidity, wind speed
    # Target: simply the mean temperature over the window plus noise
    y = np.sum(X[:, :, 0], axis=1) / 3 + np.random.normal(0, 0.1, num_samples)
    return X, y.reshape(-1, 1)

# Normalize with given statistics, so train and test share the same scaling
def normalize(data, mean, std):
    return (data - mean) / std

# Generate training and test data
X_train, y_train = generate_data(1000, 3)
X_test, y_test = generate_data(200, 3)

# Compute normalization statistics on the training set only (avoids test-set leakage)
X_mean, X_std = np.mean(X_train), np.std(X_train)
y_mean, y_std = np.mean(y_train), np.std(y_train)
X_train_norm = normalize(X_train, X_mean, X_std)
y_train_norm = normalize(y_train, y_mean, y_std)
X_test_norm = normalize(X_test, X_mean, X_std)
y_test_norm = normalize(y_test, y_mean, y_std)

# Initialize the RNN
input_size = 3
hidden_size = 64
output_size = 1
rnn = RNN(input_size, hidden_size, output_size)

# Training loop: one sample at a time (stochastic gradient descent)
def train(rnn, X, y, epochs):
    for epoch in range(epochs):
        total_loss = 0
        for i in range(len(X)):
            inputs = [X[i][t].reshape(-1, 1) for t in range(3)]
            target = y[i]
            # Forward pass
            output, _ = rnn.forward(inputs)
            # Squared-error loss
            loss = np.sum((output - target) ** 2)
            total_loss += loss
            # Backward pass; d_y is the gradient of the loss w.r.t. the output
            d_y = 2 * (output - target)
            rnn.backward(d_y)
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {total_loss / len(X)}")

# Train the model
train(rnn, X_train_norm, y_train_norm, epochs=100)

# Evaluation: mean squared error on held-out data
def evaluate(rnn, X, y):
    total_loss = 0
    for i in range(len(X)):
        inputs = [X[i][t].reshape(-1, 1) for t in range(3)]
        target = y[i]
        output, _ = rnn.forward(inputs)
        loss = np.sum((output - target) ** 2)
        total_loss += loss
    return total_loss / len(X)

# Evaluate the model
test_loss = evaluate(rnn, X_test_norm, y_test_norm)
print(f"Test Loss: {test_loss}")

# Prediction for a single (normalized) sample
def predict(rnn, X):
    inputs = [X[t].reshape(-1, 1) for t in range(3)]
    output, _ = rnn.forward(inputs)
    return output

# Example prediction
sample_data = X_test_norm[0]
prediction = predict(rnn, sample_data)
print(f"Sample input: {X_test[0]}")
print(f"Normalized prediction: {prediction[0][0]}")

# De-normalize the prediction with the training-set statistics
denormalized_prediction = prediction[0][0] * y_std + y_mean
print(f"Denormalized prediction: {denormalized_prediction}")
print(f"Actual value: {y_test[0][0]}")
Writing these posts is no small effort, so if this one helped you, please give it a like, a comment, and a follow. This humble author thanks you on bended knee!