这个代码参考了时间序列预测模型实战案例(三)(LSTM)(Python)(深度学习)时间序列预测(包括运行代码以及代码讲解)_lstm预测模型-CSDN博客
结合我之前所学的lstm-seq2seq里所学习到的知识对其进行预测
import time
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from sklearn.preprocessing import MinMaxScaler
np.random.seed(0)
def calculate_mae(y_true, y_pred):
# 平均绝对误差
mae = np.mean(np.abs(y_true - y_pred))
return mae
true_data = pd.read_csv(r"C:\Users\33746\Desktop\DailyDelhiClimateTrain.csv") # 填你自己的数据地址
target = 'meanpressure'
# 这里加一些数据的预处理, 最后需要的格式是pd.series
true_data = np.array(true_data['meanpressure'])
# 定义窗口大小
test_data_size = 32
# 训练集和测试集的尺寸划分
test_size = 0.15
train_size = 0.85
# 标准化处理
scaler_train = MinMaxScaler(feature_range=(0, 1))
scaler_test = MinMaxScaler(feature_range=(0, 1))
train_data = true_data[:int(train_size * len(true_data))]
test_data = true_data[-int(test_size * len(true_data)):]
print("训练集尺寸:", len(train_data))
print("测试集尺寸:", len(test_data))
train_data_normalized = scaler_train.fit_transform(train_data.reshape(-1, 1))
test_data_normalized = scaler_test.fit_transform(test_data.reshape(-1, 1))
# 转化为深度学习模型需要的类型Tensor
train_data_normalized = torch.FloatTensor(train_data_normalized).view(-1)
test_data_normalized = torch.FloatTensor(test_data_normalized).view(-1)
def create_inout_sequences(input_data, tw, pre_len):
inout_seq = []
L = len(input_data)
for i in range(L - tw):
train_seq = input_data[i:i + tw]
if (i + tw + 4) > len(input_data):
break
train_label = input_data[i + tw:i + tw + pre_len]
inout_seq.append((train_seq, train_label))
return inout_seq
pre_len = 4
train_window = 16
# 定义训练器的的输入
train_inout_seq = create_inout_sequences(train_data_normalized, train_window, pre_len)
class LSTM(nn.Module):
def __init__(self, input_dim=1, hidden_dim=350, output_dim=1):
super(LSTM, self).__init__()
self.hidden_dim = hidden_dim
self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
x = x.unsqueeze(1)
h0_lstm = torch.zeros(1, self.hidden_dim).to(x.device)
c0_lstm = torch.zeros(1, self.hidden_dim).to(x.device)
out, _ = self.lstm(x, (h0_lstm, c0_lstm))
out = out[:, -1]
out = self.fc(out)
return out
lstm_model = LSTM(input_dim=1, output_dim=pre_len, hidden_dim=train_window)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(lstm_model.parameters(), lr=0.001)
epochs = 10
Train = False # 训练还是预测
if Train:
losss = []
lstm_model.train() # 训练模式
start_time = time.time() # 计算起始时间
for i in range(epochs):
for seq, labels in train_inout_seq:
lstm_model.train()
optimizer.zero_grad()
y_pred = lstm_model(seq)
single_loss = loss_function(y_pred, labels)
single_loss.backward()
optimizer.step()
print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')
losss.append(single_loss.detach().numpy())
torch.save(lstm_model.state_dict(), 'save_model.pth')
print(f"模型已保存,用时:{(time.time() - start_time) / 60:.4f} min")
plt.plot(losss)
# 设置图表标题和坐标轴标签
plt.title('Training Error')
plt.xlabel('Epoch')
plt.ylabel('Error')
# 保存图表到本地
plt.savefig('training_error.png')
else:
# 加载模型进行预测
lstm_model.load_state_dict(torch.load('save_model.pth'))
lstm_model.eval() # 评估模式
results = []
reals = []
losss = []
test_inout_seq = create_inout_sequences(test_data_normalized, train_window, pre_len)
for seq, labels in train_inout_seq:
pred = lstm_model(seq)[0].item()
results.append(pred)
mae = calculate_mae(pred, labels.detach().numpy()) # MAE误差计算绝对值(预测值 - 真实值)
reals.append(labels.detach().numpy())
losss.append(mae)
print("模型预测结果:", results)
print("预测误差MAE:", losss)
plt.style.use('ggplot')
# 创建折线图
plt.plot(results, label='real', color='blue') # 实际值
plt.plot(reals, label='forecast', color='red', linestyle='--') # 预测值
# 增强视觉效果
plt.grid(True)
plt.title('real vs forecast')
plt.xlabel('time')
plt.ylabel('value')
plt.legend()
plt.savefig('test——results.png')
train_data = true_data[:int(train_size * len(true_data))]
test_data = true_data[-int(test_size * len(true_data)):]是在干什么
这两行代码的作用是将数据集 true_data
划分为训练集 train_data
和测试集 test_data
。具体来说:
代码解释:
-
train_data = true_data[:int(train_size * len(true_data))]
:-
train_size
是一个比例值(例如0.8,表示80%),表示训练集占总数据集的比例。 -
len(true_data)
返回true_data
的长度。 -
train_size * len(true_data)
计算出训练集的大小。 -
int(train_size * len(true_data))
将计算结果转换为整数,表示训练集的实际样本数量。 -
true_data[:int(train_size * len(true_data))]
使用切片操作从true_data
的开头到训练集的样本数量,提取出训练集。
-
-
test_data = true_data[-int(test_size * len(true_data)):]
:-
test_size
是另一个比例值(例如0.2,表示20%),表示测试集占总数据集的比例。 -
test_size * len(true_data)
计算出测试集的大小。 -
int(test_size * len(true_data))
将计算结果转换为整数,表示测试集的实际样本数量。 -
true_data[-int(test_size * len(true_data)):]
使用切片操作从true_data
的末尾到测试集的样本数量,提取出测试集。
-
示例:
假设 true_data
有100个样本,train_size
为0.8,test_size
为0.2:
-
train_size * len(true_data) = 0.8 * 100 = 80
-
test_size * len(true_data) = 0.2 * 100 = 20
那么:
-
train_data = true_data[:80]
会提取true_data
的前80个样本作为训练集。 -
test_data = true_data[-20:]
会提取true_data
的后20个样本作为测试集。
注意事项:
-
数据顺序:如果
true_data
是有序的(例如时间序列数据),这种划分方式可能会导致训练集和测试集之间存在时间上的重叠,从而引入数据泄露。在这种情况下,应该按照时间顺序进行划分。 -
随机性:如果
true_data
是无序的,这种划分方式是合理的。但如果数据没有被打乱,建议在划分之前先打乱数据,以确保训练集和测试集能够代表整体数据的分布。 -
假设
input_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
,tw = 3
,pre_len = 2
: -
第一次循环:
-
i = 0
-
train_seq = [1, 2, 3]
-
train_label = [4, 5]
-
inout_seq = [([1, 2, 3], [4, 5])]
-
-
第二次循环:
-
i = 1
-
train_seq = [2, 3, 4]
-
train_label = [5, 6]
-
inout_seq = [([1, 2, 3], [4, 5]), ([2, 3, 4], [5, 6])]
-
-
以此类推,直到
i = 7
时,train_seq = [8, 9, 10]
,train_label = []
,此时i + tw + pre_len
超出input_data
的范围,循环结束。