基于Python 3.9版本演示
一、写在前面
上一节,我们学了经验模态分解(Empirical Mode Decomposition,EMD)。
如同结尾所说,“那么,做这些分解有什么作用呢?有大佬基于这些分解出来的序列分别作预测,然后再次合并,达到提升预测性能的作用。”
二、EMD&LSTM-ARIMA组合策略
该组合策略主要是将传统的经验模态分解(EMD)方法和现代的机器学习技术(LSTM 和 ARIMA 模型)相结合,用于增强时序数据的预测能力。下面是这个策略的具体描述:
(1)经验模态分解 (EMD):
1)首先,使用 EMD 方法处理原始时序数据,将其分解为多个内模函数(IMF)和一个剩余信号。这一步骤的目的是提取数据中的不同频率成分,每个 IMF 代表原始信号的不同频率层次,而剩余信号包含了趋势信息。
2)EMD 是一种自适应方法,适用于非线性和非平稳时间序列数据分析,可以揭示隐藏在复杂数据集中的简单结构和成分。
(2)LSTM 和 ARIMA 模型的应用:
1)将不同的 IMF 成分分配给不同的预测模型:选定的IMF由 LSTM 模型处理,通常选择那些更具高频和复杂动态的成分;而趋势性较强的成分(包括剩余信号)则交由 ARIMA 模型进行分析。
2)LSTM (长短期记忆网络):适合处理和预测时间序列数据中的长期依赖关系,因此用于捕捉和预测时序数据中的非线性模式和复杂关系。
3)ARIMA (自回归积分滑动平均模型):擅长处理线性关系和趋势变化,适用于具有明显趋势或季节性的时间序列数据。
三、EMD&LSTM-ARIMA组合策略代码Pyhton实现
下面,我使用的是之前分享过的肺结核的数据做演示:
Pyhon代码:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
import torch
import torch.nn as nn
import torch.optim as optim
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_absolute_error, mean_squared_error
# 读取数据
file_path = 'pone.0277314.s006.xlsx'
data = pd.read_excel(file_path)
# 提取时间和PTB病例数
time_series = data['Time']
ptb_cases = data['PTB cases']
# 将时间转换为数值形式
time_numeric = np.arange(len(time_series))
def get_envelope_mean(signal):
"""计算信号的上包络线和下包络线的均值"""
maxima = np.where(np.r_[True, signal[1:] > signal[:-1]] & np.r_[signal[:-1] > signal[1:], True])[0]
minima = np.where(np.r_[True, signal[1:] < signal[:-1]] & np.r_[signal[:-1] < signal[1:], True])[0]
if len(maxima) < 2 or len(minima) < 2:
return np.zeros_like(signal)
upper_env = CubicSpline(maxima, signal[maxima])(time_numeric)
lower_env = CubicSpline(minima, signal[minima])(time_numeric)
return (upper_env + lower_env) / 2
def sift(signal, max_iter=1000, tol=1e-6):
"""对信号进行sifting操作,提取IMF"""
h = signal
for _ in range(max_iter):
m = get_envelope_mean(h)
h1 = h - m
if np.mean(np.abs(h - h1)) < tol:
break
h = h1
return h
def emd(signal, max_imfs=6):
"""进行EMD分解"""
residual = signal
imfs = []
for _ in range(max_imfs):
imf = sift(residual)
imfs.append(imf)
residual = residual - imf
if np.all(np.abs(residual) < 1e-6):
break
return np.array(imfs), residual
# 执行EMD分解
imfs, residual = emd(ptb_cases.values)
# 绘制分解结果
num_imfs = imfs.shape[0]
plt.figure(figsize=(12, 9))
for i in range(num_imfs):
plt.subplot(num_imfs + 1, 1, i + 1)
plt.plot(time_series, imfs[i], label=f'IMF {i + 1}')
plt.legend()
plt.subplot(num_imfs + 1, 1, num_imfs + 1)
plt.plot(time_series, residual, label='Residual')
plt.legend()
plt.tight_layout()
plt.show()
# LSTM模型
class LSTMModel(nn.Module):
def __init__(self, input_size=1, hidden_layer_size=50, output_size=1):
super(LSTMModel, self).__init__()
self.hidden_layer_size = hidden_layer_size
self.lstm = nn.LSTM(input_size, hidden_layer_size)
self.linear = nn.Linear(hidden_layer_size, output_size)
self.hidden_cell = (torch.zeros(1,1,self.hidden_layer_size),
torch.zeros(1,1,self.hidden_layer_size))
def forward(self, input_seq):
lstm_out, self.hidden_cell = self.lstm(input_seq.view(len(input_seq) ,1, -1), self.hidden_cell)
predictions = self.linear(lstm_out.view(len(input_seq), -1))
return predictions[-1]
def train_lstm_model(train_data, n_steps):
model = LSTMModel()
loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 200
for epoch in range(epochs):
for seq in range(len(train_data) - n_steps):
model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer_size),
torch.zeros(1, 1, model.hidden_layer_size))
seq_train = torch.FloatTensor(train_data[seq:seq + n_steps])
label = torch.FloatTensor(train_data[seq + n_steps:seq + n_steps + 1])
optimizer.zero_grad()
y_pred = model(seq_train)
single_loss = loss_function(y_pred, label)
single_loss.backward()
optimizer.step()
if epoch % 50 == 0:
print(f'Epoch {epoch+1} loss: {single_loss.item()}')
return model
def arima_model(train_data, order):
model = ARIMA(train_data, order=order)
model_fit = model.fit()
return model_fit
n_steps = 10
imfs_lstm = [3, 4] # 分配给LSTM的IMFs索引
imfs_arima = [0, 1, 2] # 分配给ARIMA的IMFs索引
lstm_predictions = np.zeros(len(time_numeric))
arima_predictions = np.zeros(len(time_numeric))
# LSTM预测
for idx in imfs_lstm:
print(f'Training LSTM for IMF {idx+1}')
train_data = imfs[idx].flatten()
model = train_lstm_model(train_data, n_steps)
for i in range(n_steps, len(train_data)):
seq = torch.FloatTensor(train_data[i-n_steps:i])
with torch.no_grad():
lstm_predictions[i] += model(seq).item()
print(f'LSTM predictions for IMF {idx+1} completed')
# ARIMA预测
for idx in imfs_arima:
print(f'Training ARIMA for IMF {idx+1}')
train_data = imfs[idx]
model_fit = arima_model(train_data, order=(5, 1, 0))
arima_predictions += model_fit.predict(start=0, end=len(train_data) - 1)
print(f'ARIMA predictions for IMF {idx+1} completed')
# 合并预测结果
final_predictions = lstm_predictions + arima_predictions
# 计算误差
mae = mean_absolute_error(ptb_cases, final_predictions)
mse = mean_squared_error(ptb_cases, final_predictions)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((ptb_cases - final_predictions) / ptb_cases)) * 100
# 打印误差
print(f'MAE: {mae}')
print(f'MSE: {mse}')
print(f'RMSE: {rmse}')
print(f'MAPE: {mape}')
# 绘制预测结果
plt.figure(figsize=(12, 6))
plt.plot(time_numeric, ptb_cases, label='Original Data')
plt.plot(time_numeric, final_predictions, label='Predicted Data')
plt.legend()
plt.show()
输出:
跟原图对比:
发现了没,似乎是整体向下偏移了一波。让GPT帮忙优化一下算法。
五、优化后
根据每个模型的误差(MAE)微调一下试试:
Pyhon代码:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
import torch
import torch.nn as nn
import torch.optim as optim
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_absolute_error, mean_squared_error
# 读取数据
file_path = 'pone.0277314.s006.xlsx'
data = pd.read_excel(file_path)
# 提取时间和PTB病例数
time_series = data['Time']
ptb_cases = data['PTB cases']
# 将时间转换为数值形式
time_numeric = np.arange(len(time_series))
def get_envelope_mean(signal):
"""计算信号的上包络线和下包络线的均值"""
maxima = np.where(np.r_[True, signal[1:] > signal[:-1]] & np.r_[signal[:-1] > signal[1:], True])[0]
minima = np.where(np.r_[True, signal[1:] < signal[:-1]] & np.r_[signal[:-1] < signal[1:], True])[0]
if len(maxima) < 2 or len(minima) < 2:
return np.zeros_like(signal)
upper_env = CubicSpline(maxima, signal[maxima])(time_numeric)
lower_env = CubicSpline(minima, signal[minima])(time_numeric)
return (upper_env + lower_env) / 2
def sift(signal, max_iter=1000, tol=1e-6):
"""对信号进行sifting操作,提取IMF"""
h = signal
for _ in range(max_iter):
m = get_envelope_mean(h)
h1 = h - m
if np.mean(np.abs(h - h1)) < tol:
break
h = h1
return h
def emd(signal, max_imfs=6):
"""进行EMD分解"""
residual = signal
imfs = []
for _ in range(max_imfs):
imf = sift(residual)
imfs.append(imf)
residual = residual - imf
if np.all(np.abs(residual) < 1e-6):
break
return np.array(imfs), residual
# 执行EMD分解
imfs, residual = emd(ptb_cases.values)
# 绘制分解结果
num_imfs = imfs.shape[0]
plt.figure(figsize=(12, 9))
for i in range(num_imfs):
plt.subplot(num_imfs + 1, 1, i + 1)
plt.plot(time_series, imfs[i], label=f'IMF {i + 1}')
plt.legend()
plt.subplot(num_imfs + 1, 1, num_imfs + 1)
plt.plot(time_series, residual, label='Residual')
plt.legend()
plt.tight_layout()
plt.show()
# LSTM模型
class LSTMModel(nn.Module):
def __init__(self, input_size=1, hidden_layer_size=50, output_size=1):
super(LSTMModel, self).__init__()
self.hidden_layer_size = hidden_layer_size
self.lstm = nn.LSTM(input_size, hidden_layer_size)
self.linear = nn.Linear(hidden_layer_size, output_size)
self.hidden_cell = (torch.zeros(1,1,self.hidden_layer_size),
torch.zeros(1,1,self.hidden_layer_size))
def forward(self, input_seq):
lstm_out, self.hidden_cell = self.lstm(input_seq.view(len(input_seq) ,1, -1), self.hidden_cell)
predictions = self.linear(lstm_out.view(len(input_seq), -1))
return predictions[-1]
def train_lstm_model(train_data, n_steps):
model = LSTMModel(hidden_layer_size=100) # 调整隐藏层大小
loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # 调整学习率
epochs = 300 # 增加训练轮数
for epoch in range(epochs):
for seq in range(len(train_data) - n_steps):
model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer_size),
torch.zeros(1, 1, model.hidden_layer_size))
seq_train = torch.FloatTensor(train_data[seq:seq + n_steps])
label = torch.FloatTensor(train_data[seq + n_steps:seq + n_steps + 1])
optimizer.zero_grad()
y_pred = model(seq_train)
single_loss = loss_function(y_pred, label)
single_loss.backward()
optimizer.step()
if epoch % 50 == 0:
print(f'Epoch {epoch+1} loss: {single_loss.item()}')
return model
def arima_model(train_data, order):
model = ARIMA(train_data, order=order)
model_fit = model.fit()
return model_fit
n_steps = 10
imfs_lstm = [3, 4] # 分配给LSTM的IMFs索引
imfs_arima = [0, 1, 2] # 分配给ARIMA的IMFs索引
lstm_predictions = np.zeros(len(time_numeric))
arima_predictions = np.zeros(len(time_numeric))
# LSTM预测
for idx in imfs_lstm:
print(f'Training LSTM for IMF {idx+1}')
train_data = imfs[idx].flatten()
model = train_lstm_model(train_data, n_steps)
for i in range(n_steps, len(train_data)):
seq = torch.FloatTensor(train_data[i-n_steps:i])
with torch.no_grad():
lstm_predictions[i] += model(seq).item()
print(f'LSTM predictions for IMF {idx+1} completed')
# ARIMA预测
for idx in imfs_arima:
print(f'Training ARIMA for IMF {idx+1}')
train_data = imfs[idx]
model_fit = arima_model(train_data, order=(5, 1, 0))
arima_predictions += model_fit.predict(start=0, end=len(train_data) - 1)
print(f'ARIMA predictions for IMF {idx+1} completed')
# 合并预测结果
final_predictions = lstm_predictions + arima_predictions
# 计算LSTM和ARIMA模型的误差
lstm_error = np.mean(ptb_cases - lstm_predictions)
arima_error = np.mean(ptb_cases - arima_predictions)
# 根据误差平移预测结果
final_predictions += (lstm_error + arima_error) / 2
# 计算误差
mae = mean_absolute_error(ptb_cases, final_predictions)
mse = mean_squared_error(ptb_cases, final_predictions)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((ptb_cases - final_predictions) / ptb_cases)) * 100
# 打印误差
print(f'MAE: {mae}')
print(f'MSE: {mse}')
print(f'RMSE: {rmse}')
print(f'MAPE: {mape}')
# 绘制预测结果
plt.figure(figsize=(12, 6))
plt.plot(time_numeric, ptb_cases, label='Original Data')
plt.plot(time_numeric, final_predictions, label='Predicted Data')
plt.legend()
plt.show()
看看结果:
效果也不是太好。
六、最后
下一期,我们来测试一下其他矫正方法。