针对序列(时间、文本)数据的网络结构
- P183--循环神经网络(RNN, Recurrent Neural Network 1980s)
- (1)模型结构说明
- (2)创新性说明
- (3)示例代码:类似古诗词风格的文本生成
- P184--长短期记忆网络(LSTM, Long Short-Term Memory 1997)
- (1)模型结构说明
- (2)创新性说明
- (3)示例代码:Apple股票价格预测
- P185--门控循环单元(GRU, Gated Recurrent Unit 2014)
- (1)模型结构说明
- (2)创新性说明
- (3)示例代码:模拟天气数据的多输出预测
运行系统:macOS Sequoia 15.0
Python编译器:PyCharm 2024.1.4 (Community Edition)
Python版本:3.12
TensorFlow版本:2.17.0
Pytorch版本:2.4.1
往期链接:
1-5 | 6-10 | 11-20 | 21-30 | 31-40 | 41-50 |
---|
51-60:函数 | 61-70:类 | 71-80:编程范式及设计模式 |
---|
81-90:Python编码规范 | 91-100:Python自带常用模块-1 |
---|
101-105:Python自带模块-2 | 106-110:Python自带模块-3 |
---|
111-115:Python常用第三方包-频繁使用 | 116-120:Python常用第三方包-深度学习 |
---|
121-125:Python常用第三方包-爬取数据 | 126-130:Python常用第三方包-为了乐趣 |
---|
131-135:Python常用第三方包-拓展工具1 | 136-140:Python常用第三方包-拓展工具2 |
---|
Python项目实战
141-145 | 146-150 | 151-155 | 156-160 | 161-165 | 166-170 | 171-175 |
---|
176-180:卷积结构 | 181-182:卷积结构(续) |
---|
P183–循环神经网络(RNN, Recurrent Neural Network 1980s)
(1)模型结构说明
循环神经网络(Recurrent Neural Network,简称RNN)是一类专门用于处理序列数据的神经网络。它在自然语言处理、时间序列预测、语音识别等领域具有广泛的应用。
RNN的核心思想是引入循环连接,使得网络能够记忆之前的信息,从而处理序列数据中的时间依赖性。其基本结构如下所示:
- 输入层:每个输入 x t x_t xt代表序列中的一个元素(例如,一个词或一个时间步的数值)。
- 隐藏层【记忆在这里实现】:隐藏状态
h
t
h_t
ht是根据前一个隐藏状态
h
t
−
1
h_{t-1}
ht−1和当前输入
x
t
x_t
xt计算得出的。这个过程通常用以下公式表示:
h t = t a n h ( W h ⋅ h t − 1 + W x ⋅ x t + b ) h_t = tanh(W_{h}\cdot h_{t-1} + W_{x} \cdot x_{t}+b) ht=tanh(Wh⋅ht−1+Wx⋅xt+b) - 输出层:输出
y
t
y_t
yt是通过当前的隐藏状态生成的,公式如下:
y t = W y ⋅ h t + c y_t = W_{y} \cdot h_{t}+c yt=Wy⋅ht+c
+-----------------+
| 输入序列 |
+-----------------+
|
v
+------------------+
| 隐藏状态 |<--- h_{t-1}
+------------------+
|
v
+-------------+
| 输出 |
+-------------+
(2)创新性说明
-
1. 处理序列数据的能力
与传统前馈神经网络不同,RNN通过循环连接能够处理任意长度的序列数据,适应性更强。它能够记忆之前的信息,捕捉序列中的时间依赖性。 -
2. 共享参数
在RNN中,同一组权重参数 W h W_h Wh和 W x W_x Wx在不同时间步之间共享。这不仅减少了模型的参数数量,还使模型能够在不同时间步之间泛化学习到的模式。 -
3. 动态计算图
RNN的循环结构使其具备动态计算图的特点,能够处理不同长度的输入序列,增加了模型的灵活性。 -
4. 基础性地位
RNN作为序列模型的基础,为后续更复杂的模型(如长短期记忆网络LSTM、门控循环单元GRU等)的发展奠定了基础,推动了深度学习在序列数据处理领域的应用和研究。
(3)示例代码:类似古诗词风格的文本生成
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense, Embedding
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# 准备更多的训练数据
poems = [
"春眠不觉晓,处处闻啼鸟。夜来风雨声,花落知多少。",
"床前明月光,疑是地上霜。举头望明月,低头思故乡。",
"白日依山尽,黄河入海流。欲穷千里目,更上一层楼。",
"红豆生南国,春来发几枝。愿君多采撷,此物最相思。",
"君问归期未有期,巴山夜雨涨秋池。何当共剪西窗烛,却话巴山夜雨时。",
"独坐幽篁里,弹琴复长啸。深林人不知,明月来相照。",
"天门中断楚江开,碧水东流至此回。两岸青山相对出,孤帆一片日边来。",
"过故人庄,故人具鸡黍。召儿烹鸡,唤妇载酒。稻花香里说丰年,听取蛙声一片。",
"山中相送罢,日暮掩柴扉。春草明年绿,王孙归不归。",
"葡萄美酒夜光杯,欲饮琵琶马上催。醉卧沙场君莫笑,古来征战几人回。"
]
# 将所有诗句连接成一个长文本
text = "".join(poems)
# 创建字符级别的Tokenizer
tokenizer = Tokenizer(char_level=True)
tokenizer.fit_on_texts([text])
total_chars = len(tokenizer.word_index) + 1
# 生成输入序列和目标
seq_length = 20
sequences = []
for i in range(len(text) - seq_length):
sequences.append(text[i:i+seq_length+1])
X = []
y = []
for seq in sequences:
X.append(seq[:-1])
y.append(seq[-1])
X = tokenizer.texts_to_sequences(X)
X = pad_sequences(X, maxlen=seq_length)
y = tokenizer.texts_to_sequences(y)
y = np.array(y).reshape(-1, 1)
X = np.array(X)
y = tf.keras.utils.to_categorical(y, num_classes=total_chars)
# 构建RNN模型
model = Sequential([
Embedding(total_chars, 64, input_length=seq_length),
SimpleRNN(256, return_sequences=True),
SimpleRNN(256),
Dense(total_chars, activation='softmax')
])
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# 训练模型
model.fit(X, y, epochs=10330, batch_size=32, validation_split=0.1)
# 生成文本函数
def generate_text(seed_text, num_chars):
generated_text = seed_text
for _ in range(num_chars):
x = tokenizer.texts_to_sequences([generated_text[-seq_length:]])
x = pad_sequences(x, maxlen=seq_length)
y_pred = model.predict(x)[0]
predicted_char_index = np.argmax(y_pred)
predicted_char = tokenizer.index_word[predicted_char_index]
generated_text += predicted_char
return generated_text
# 生成新文本
print(generate_text("君问归期", 20))
在不更改模型RNN结构的基础上,可通过以下方式提高输出质量:
- 进一步增加SimpleRNN层的神经元数量。
- 增加SimpleRNN层的数量。
- 调整学习率或使用学习率调度器。
- 增加训练轮次【最重要】
- 收集更多的训练数据。
P184–长短期记忆网络(LSTM, Long Short-Term Memory 1997)
(1)模型结构说明
LSTM(Long Short-Term Memory)是一种特殊的RNN(循环神经网络)结构,由Hochreiter & Schmidhuber在1997年提出。专门设计用于解决长序列数据中的梯度消失问题。以下是 LSTM 的组成部分:
-
输入门:决定当前输入 x t x_t xt对记忆单元的影响。
i t = σ ( W i ⋅ [ h t − 1 , x t ] + b i i_t = \sigma(W_i \cdot [h_{t-1}, x_t] +b_i it=σ(Wi⋅[ht−1,xt]+bi -
遗忘门:决定上一状态 C t − 1 C_{t-1} Ct−1中的信息保留多少。
f t = σ ( W f ⋅ [ h t − 1 , x t ] + b f f_t = \sigma(W_f \cdot [h_{t-1}, x_t] +b_f ft=σ(Wf⋅[ht−1,xt]+bf -
记忆单元:更新当前记忆状态,结合遗忘门和输入门。
C t = f t ⊙ C t − 1 + i t ⊙ C ~ t C_t = f_t \odot C_{t-1}+i_t\odot \tilde{C}_{t} Ct=ft⊙Ct−1+it⊙C~t -
输出门:决定当前隐藏状态 h t h_t ht的输出。
o t = σ ( W o ⋅ [ h t − 1 , x t ] + b 0 o_t = \sigma(W_o \cdot [h_{t-1}, x_t] +b_0 ot=σ(Wo⋅[ht−1,xt]+b0 -
隐藏状态:结合当前记忆单元和输出门的结果。
h t = o t ⊙ t a n h ( C t ) h_t=o_t \odot tanh(C_t) ht=ot⊙tanh(Ct)
┌───────────┐
│ 输入 │
└─────┬─────┘
│
▼
┌────────────┐
│ 忘记门 │
│ (f_t) │
└─────┬─────┘
│
▼
┌────────────┐
│ 输入门 │
│ (i_t) │
└─────┬─────┘
│
▼
┌────────────┐
│ 记忆单元 │
│ (C_t) │
└─────┬─────┘
│
▼
┌────────────┐
│ 输出门 │
│ (o_t) │
└─────┬─────┘
│
▼
┌────────────┐
│ 输出 │
└────────────┘
(2)创新性说明
LSTM的主要创新点和优势包括:
-
解决了传统RNN的长期依赖问题:通过引入门控机制,LSTM能够有效地学习长期依赖关系,避免了传统RNN中的梯度消失问题。
-
门控机制:LSTM引入了输入门、遗忘门和输出门,这些门可以控制信息的流动,使网络能够选择性地记忆或遗忘信息。
-
记忆单元:LSTM包含一个独立的记忆单元(cell state),可以长期保存重要信息。
-
更好的梯度流动:LSTM的结构允许梯度在长序列中更容易地流动,减少了梯度消失和梯度爆炸的问题。
-
灵活性:LSTM可以处理不同长度的序列,适用于各种序列建模任务。
(3)示例代码:Apple股票价格预测
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
# 获取股票数据
stock = yf.Ticker("AAPL")
df = stock.history(period="5y")
data = df['Close'].values.reshape(-1, 1)
# 数据归一化
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)
# 准备训练数据
def create_dataset(dataset, time_step=60):
X, y = [], []
for i in range(len(dataset) - time_step):
X.append(dataset[i:(i + time_step), 0])
y.append(dataset[i + time_step, 0])
return np.array(X), np.array(y)
time_step = 60
X, y = create_dataset(scaled_data, time_step)
# 划分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
# 重塑输入为 3D 格式 [samples, time_steps, features]
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
# 构建LSTM模型
model = Sequential([
LSTM(50, return_sequences=True, input_shape=(time_step, 1)),
LSTM(50, return_sequences=True),
LSTM(50),
Dense(1)
])
model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
# 训练模型
history = model.fit(X_train, y_train, validation_data=(X_test, y_test),
epochs=50, batch_size=32, verbose=1)
# 使用模型进行预测
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
# 反归一化
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
y_train_actual = scaler.inverse_transform(y_train.reshape(-1, 1))
y_test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))
# 创建时间索引
time_index = df.index[time_step:]
# 绘制结果
plt.figure(figsize=(16,8))
# 绘制训练集
plt.plot(time_index[:train_size], y_train_actual, label='Actual Train', alpha=0.7)
plt.plot(time_index[:train_size], train_predict, label='Predicted Train', alpha=0.7)
# 绘制测试集
plt.plot(time_index[train_size:], y_test_actual, label='Actual Test', alpha=0.7)
plt.plot(time_index[train_size:], test_predict, label='Predicted Test', alpha=0.7)
plt.title('Apple Stock Price Prediction')
plt.xlabel('Date')
plt.ylabel('Price (USD)')
plt.legend()
plt.show()
# 输出模型性能
train_rmse = np.sqrt(np.mean((train_predict - y_train_actual)**2))
test_rmse = np.sqrt(np.mean((test_predict - y_test_actual)**2))
print(f"Train RMSE: {train_rmse}")
print(f"Test RMSE: {test_rmse}")
# 绘制损失曲线
plt.figure(figsize=(12,6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()
P185–门控循环单元(GRU, Gated Recurrent Unit 2014)
(1)模型结构说明
GRU(Gated Recurrent Unit)是一种由Cho等人在2014年提出的循环神经网络(RNN)结构。GRU旨在解决传统RNN中的梯度消失问题,同时具有较低的计算复杂度。它在许多任务中表现出与LSTM相似的效果,但通常训练速度更快。
GRU的主要结构包括两个门:
- 更新门(Update Gate):决定了前一时刻的状态信息保留到当前状态的程度。
- 重置门(Reset Gate):决定了如何将新的输入信息与前一状态结合。
GRU的计算过程如下:
1. 计算更新门
z
t
=
σ
(
W
z
⋅
[
h
t
−
1
,
x
t
]
z_t = \sigma(W_z \cdot [h_{t-1}, x_t]
zt=σ(Wz⋅[ht−1,xt]
2. 计算重置门
r
t
=
σ
(
W
r
⋅
[
h
t
−
1
,
x
t
]
r_t = \sigma(W_r \cdot [h_{t-1}, x_t]
rt=σ(Wr⋅[ht−1,xt]
3. 计算候选隐藏状态
h
~
t
=
t
a
n
h
(
W
⋅
[
r
t
∗
h
t
−
1
,
x
t
]
)
\tilde{h}_{t}=tanh(W\cdot [r_t \ast h_{t-1}, x_t])
h~t=tanh(W⋅[rt∗ht−1,xt])
4. 计算当前隐藏状态
h
t
=
(
1
−
z
t
)
∗
h
t
−
1
+
z
t
∗
h
~
t
h_t = (1-z_t)\ast h_{t-1} + z_t \ast \tilde h_{t}
ht=(1−zt)∗ht−1+zt∗h~t
+-----------------+
| 输入序列 |
+-----------------+
|
+------------------+
| 重置门 (r) |
+------------------+
|
+------------------+
| 更新门 (z) |
+------------------+
|
+------------------+
| 候选状态 |
+------------------+
|
+-----------------+
| 输出状态 |
+-----------------+
(2)创新性说明
简化结构: GRU只有两个门,相比于LSTM的三个门,结构更简单,参数更少。
自适应记忆: 通过更新门和重置门,GRU能够自适应地决定保留多少历史信息和引入多少新信息。
解决梯度问题: GRU有效缓解了梯度消失和梯度爆炸问题,适合捕捉长期依赖关系。
性能可比: 在许多任务中,GRU的性能与LSTM相当,但训练速度更快,计算效率更高。
(3)示例代码:模拟天气数据的多输出预测
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense
# macos系统显示中文
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
# 生成模拟数据
np.random.seed(0)
def generate_time_series(n, freq, noise):
x = np.linspace(0, 4*np.pi, n)
return np.sin(freq * x) + np.random.normal(0, noise, n)
n_samples = 10000
n_features = 6
# 生成特征
temp = generate_time_series(n_samples, 0.5, 0.1) * 10 + 20 # 温度
humidity = generate_time_series(n_samples, 0.3, 0.05) * 20 + 50 # 湿度
wind_speed = np.abs(generate_time_series(n_samples, 0.7, 0.1)) * 5 # 风速
traffic_volume = np.abs(generate_time_series(n_samples, 1, 0.2)) * 1000 # 交通流量
industrial_activity = np.abs(generate_time_series(n_samples, 0.2, 0.1)) * 100 # 工业活动
time_of_day = np.tile(np.arange(24), n_samples // 24 + 1)[:n_samples] # 一天中的小时
# 创建目标变量
aqi = (
0.3 * temp +
0.2 * humidity +
0.1 * wind_speed +
0.2 * traffic_volume +
0.2 * industrial_activity +
np.random.normal(0, 5, n_samples)
)
aqi = np.clip(aqi, 0, 500) # 限制AQI在0-500范围内
energy_consumption = (
0.4 * temp +
0.1 * humidity +
0.3 * industrial_activity +
0.2 * np.sin(2 * np.pi * time_of_day / 24) + # 日周期
np.random.normal(0, 10, n_samples)
) * 100 # 缩放到更现实的值
energy_consumption = np.clip(energy_consumption, 0, None) # 确保非负
# 创建数据框
df = pd.DataFrame({
'temp': temp,
'humidity': humidity,
'wind_speed': wind_speed,
'traffic_volume': traffic_volume,
'industrial_activity': industrial_activity,
'time_of_day': time_of_day,
'next_aqi': np.roll(aqi, -24),
'next_energy': np.roll(energy_consumption, -24)
})
df = df.iloc[:-24] # 删除最后24行,因为它们没有对应的"下一天"值
# 准备数据
features = ['temp', 'humidity', 'wind_speed', 'traffic_volume', 'industrial_activity', 'time_of_day']
target = ['next_aqi', 'next_energy']
X = df[features].values
y = df[target].values
# 创建时间序列数据
def create_sequences(X, y, time_steps=1):
Xs, ys = [], []
for i in range(len(X) - time_steps):
Xs.append(X[i:(i + time_steps)])
ys.append(y[i + time_steps - 1])
return np.array(Xs), np.array(ys)
time_steps = 24 # 使用过去24小时的数据
X_seq, y_seq = create_sequences(X, y, time_steps)
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.2, random_state=42)
# 标准化数据
scaler_X = StandardScaler()
X_train_scaled = scaler_X.fit_transform(X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
X_test_scaled = scaler_X.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)
scaler_y = StandardScaler()
y_train_scaled = scaler_y.fit_transform(y_train)
y_test_scaled = scaler_y.transform(y_test)
# 构建GRU模型
model = Sequential([
GRU(64, input_shape=(time_steps, len(features)), return_sequences=True),
GRU(32),
Dense(16, activation='relu'),
Dense(2) # 2个输出:AQI和能源消耗
])
model.compile(optimizer='adam', loss='mse')
# 训练模型
history = model.fit(X_train_scaled, y_train_scaled,
epochs=50, batch_size=32,
validation_split=0.2,
verbose=1)
# 评估模型
loss = model.evaluate(X_test_scaled, y_test_scaled)
print(f"Test loss: {loss}")
# 使用模型进行预测
predictions_scaled = model.predict(X_test_scaled)
predictions = scaler_y.inverse_transform(predictions_scaled)
# 打印一些预测结果
for i in range(5):
print(f"真实值: AQI {y_test[i][0]:.2f}, 能源消耗 {y_test[i][1]:.2f} kWh")
print(f"预测值: AQI {predictions[i][0]:.2f}, 能源消耗 {predictions[i][1]:.2f} kWh")
print()
# 绘制损失曲线
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
plt.show()
# 绘制预测结果
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.scatter(y_test[:, 0], predictions[:, 0])
plt.plot([y_test[:, 0].min(), y_test[:, 0].max()], [y_test[:, 0].min(), y_test[:, 0].max()], 'r--', lw=2)
plt.xlabel('True AQI')
plt.ylabel('Predicted AQI')
plt.title('AQI Prediction')
plt.subplot(1, 2, 2)
plt.scatter(y_test[:, 1], predictions[:, 1])
plt.plot([y_test[:, 1].min(), y_test[:, 1].max()], [y_test[:, 1].min(), y_test[:, 1].max()], 'r--', lw=2)
plt.xlabel('True Energy Consumption')
plt.ylabel('Predicted Energy Consumption')
plt.title('Energy Consumption Prediction')
plt.tight_layout()
plt.show()