股价预测源码原版来源
github https://github.com/zhouhaoyi/Informer2020
数据
工欲善其事必先利其器,这是我经常说的。所以了解我们的数据处理很重要,毕竟要的就是地地地地地地道。
源码中的date数据很重要。但是我们要知道我们下载的代码具有可拓展性,对于各类数据切分很精准,所以我们并不需要每个部分都要去理解。什么意思?就是我们理解处理一次性代码就行了,并且学会构建数据集。这样就能跑自己的数据集了。话不多说开始看!!
首先要下载数据集 ,下载的话可以看百度网盘链接
链接:https://pan.baidu.com/s/1i6_AQNw7YLzdU7pdkoWgnQ?pwd=1234
提取码:1234
ok我们看到这个文件夹exp/exp_informer.py没错这就是主要的模型文件,都说万事开头看train
ok我们之间拉到train.py文件发现第一行发现定义了三个数据读取的操作
train_data, train_loader = self._get_data(flag='train')
vali_data, vali_loader = self._get_data(flag='val')
test_data, test_loader = self._get_data(flag='test')
发现我们需要进入._get_data函数里面
首先官源代码的定义是先定义数据集合的名称等像下面这样,那么定义的具体意义是:Dataset_ETT_hour(以小时为划分间隙去划分数据),Dataset_ETT_minute就是以分钟,Dataset_Custom就是正常切割,我们具体看这个
data_dict = {
'ETTh1': Dataset_ETT_hour,
'ETTh2': Dataset_ETT_hour,
'ETTm1': Dataset_ETT_minute,
'ETTm2': Dataset_ETT_minute,
'WTH': Dataset_Custom,
'ECL': Dataset_Custom,
'Solar': Dataset_Custom,
'custom': Dataset_Custom,
'wq': Dataset_Custom # 添加'wq'数据集
}
Data = data_dict[self.args.data] # 根据用户提供的数据集名称,从字典中选择相应的数据集类
Dataset_Custom,进行参数初始化后会调用__read_data__()函数去读取数据。其实主要的目的就是根据我们传入的初始化的数据切分间隔不同,从而产生符合我们要求的数据集。最重要的参数就是 freq,它有(‘h’, ‘d’, 等)h代表小时,d代表day按天切分,w代表week表示按周切分,m代表month按月切分,但是要注意,我们提供的数据必须含有date数据列,不然会报没有date数据列的错误。因为源码中切分了日期列。然后就是我们要提供 cols 参数,则使用用户指定的列,并移除目标列。如果没有提供 cols 参数,则使用所有列,并移除目标列和日期列。
class Dataset_Custom(Dataset):
def __init__(self, root_path, flag='train', size=None,
features='S', data_path='ETTh1.csv',
target='OT', scale=True, inverse=False, timeenc=0, freq='h', cols=None):
"""
初始化自定义数据集类,用于加载和处理时间序列数据。
:param root_path: 数据集根路径
:param flag: 数据集类型('train', 'val', 'test')
:param size: 序列长度、标签长度和预测长度列表
:param features: 特征类型('S', 'M', 'MS')
:param data_path: 数据文件路径
:param target: 要预测的目标列名
:param scale: 是否标准化
:param inverse: 是否反标准化
:param timeenc: 是否进行时间特征编码
:param freq: 时间频率('h', 'd', 等)
:param cols: 使用的列
"""
if size is None:
self.seq_len = 24 * 4 * 4 # 序列长度,默认24*4*4(4天)
self.label_len = 24 * 4 # 标签长度也就是训练的时候对照的label,默认24*4(1天)
self.pred_len = 24 * 4 # 预测长度,默认24*4(1天)
else:
self.seq_len = size[0] # 设置序列长度
self.label_len = size[1] # 设置标签长度
self.pred_len = size[2] # 设置预测长度
assert flag in ['train', 'test', 'val'] # 确保flag在指定的范围内
type_map = {'train': 0, 'val': 1, 'test': 2} # 数据集类型映射
self.set_type = type_map[flag] # 根据flag设置数据集类型
self.features = features # 特征类型
self.target = target # 目标列名
self.scale = scale # 是否标准化
self.inverse = inverse # 是否反标准化
self.timeenc = timeenc # 是否进行时间特征编码
self.freq = freq # 时间频率
self.cols = cols # 使用的列
self.root_path = root_path # 数据集根路径
self.data_path = data_path # 数据文件路径
self.__read_data__() # 读取数据
注意这里的time_feature函数,这个是根据freq来处理时间序列的方法,具体就是把相应的日期根据字典规则转换成为numpy数组元素,举个例子,在timeenc=0的情况下 2024-01-01 00:00:00->[ 1 1 0 0],这个数组中第一列表示月份,都是 1,因为所有日期都是在一月。第二列表示日期中的日,都是 1。第三列表示星期几,都是 0(星期一)。第四列表示小时,从 0 到 9。具体的可以自行查看该函数。
def __read_data__(self):
"""读取并处理数据。"""
self.scaler = StandardScaler() # 创建标准化器
df_raw = pd.read_csv(os.path.join(self.root_path, self.data_path)) # 读取数据文件
# 根据是否提供cols参数设置数据列
if self.cols:
cols = self.cols.copy() # 使用用户提供的列
cols.remove(self.target) # 移除目标列
else:
cols = list(df_raw.columns) # 使用所有列
cols.remove(self.target) # 移除目标列
cols.remove('date') # 移除日期列
df_raw = df_raw[['date'] + cols + [self.target]] # 重新排列列顺序
num_train = int(len(df_raw) * 0.7) # 训练集样本数,占比70%
num_test = int(len(df_raw) * 0.2) # 测试集样本数,占比20%
num_vali = len(df_raw) - num_train - num_test # 验证集样本数,剩余部分
border1s = [0, num_train - self.seq_len, len(df_raw) - num_test - self.seq_len] # 起始边界
border2s = [num_train, num_train + num_vali, len(df_raw)] # 结束边界
border1 = border1s[self.set_type] # 根据数据集类型设置起始边界
border2 = border2s[self.set_type] # 根据数据集类型设置结束边界
# 选择特征列,M和MS使用多列,S使用单列
if self.features in ['M', 'MS']:
cols_data = df_raw.columns[1:] # 除去日期列的所有列
df_data = df_raw[cols_data] # 提取多列数据
elif self.features == 'S':
df_data = df_raw[[self.target]] # 仅提取目标列数据
# 数据标准化
if self.scale:
train_data = df_data[border1s[0]:border2s[0]] # 训练集数据
self.scaler.fit(train_data.values) # 计算训练集均值和标准差
data = self.scaler.transform(df_data.values) # 标准化所有数据
else:
data = df_data.values # 不标准化
df_stamp = df_raw[['date']][border1:border2] # 提取日期列
df_stamp['date'] = pd.to_datetime(df_stamp.date) # 将日期列转换为datetime对象
data_stamp = time_features(df_stamp, timeenc=self.timeenc, freq=self.freq) # 生成时间特征
self.data_x = data[border1:border2] # 设置输入数据
if self.inverse:
self.data_y = df_data.values[border1:border2] # 设置目标数据,反标准化
else:
self.data_y = data[border1:border2] # 设置目标数据,标准化
self.data_stamp = data_stamp # 设置时间特征数据
这里的 __getitem__函数是用于进行dataloder取数据的时候用的,它会按size进行取我们的训练数据集和标签数据集
def __getitem__(self, index):
"""
根据索引获取数据样本。
"""
s_begin = index # 序列起始位置
s_end = s_begin + self.seq_len # 序列结束位置
r_begin = s_end - self.label_len # 标签起始位置
r_end = r_begin + self.label_len + self.pred_len # 标签结束位置
seq_x = self.data_x[s_begin:s_end] # 获取输入序列
if self.inverse:
seq_y = np.concatenate([self.data_x[r_begin:r_begin + self.label_len], self.data_y[r_begin + self.label_len:r_end]], 0) # 获取目标序列,反标准化
else:
seq_y = self.data_y[r_begin:r_end] # 获取目标序列,标准化
seq_x_mark = self.data_stamp[s_begin:s_end] # 获取训练的输入序列时间特征
seq_y_mark = self.data_stamp[r_begin:r_end] # 获取训练标签的目标序列时间特征
return seq_x, seq_y, seq_x_mark, seq_y_mark # 返回输入序列、目标序列及其时间特征
def __len__(self):
"""
返回数据集的长度。
"""
return len(self.data_x) - self.seq_len - self.pred_len + 1 # 数据集样本数
def inverse_transform(self, data):
"""
反标准化数据。
"""
return self.scaler.inverse_transform(data) # 将标准化数据转换为原始数据
参数设置
关于原文中的参数我进行翻译了一下,应该具体意思是这样对应接下来讲的参数
定义参数
我把参数部分进行了简化大概是这样
args = argparse.Namespace(
model='informer', # 模型名称,可以是[informer, informerstack, informerlight(TBD)]
data='wq', # 数据集名称
root_path='Informer2020/data/ETT', # 数据文件的根路径
data_path='601398.XSHG(工商银行14-24).csv', # 数据文件名称
features='MS', # 预测任务类型,可以是[M, S, MS]
target='close', # 在S或MS任务中使用的目标特征
freq='b', # 时间特征编码的频率,可以是[s(秒级), t(分钟级), h(小时级), d(天级), b(工作日级), w(周级), m(月级)]
checkpoints='./checkpoints/', # 模型检查点的位置
seq_len=96, # 编码器的输入序列长度
label_len=48, # 解码器的开始标记序列长度
pred_len=24, # 预测序列长度
enc_in=6, # 编码器输入维度
dec_in=6, # 解码器输入维度
c_out=1, # 模型输出维度
d_model=512, # 模型维度
n_heads=8, # 注意力头的数量
e_layers=2, # 编码器层数
d_layers=1, # 解码器层数
s_layers='3,2,1', # 堆叠编码器层数
d_ff=2048, # 前馈神经网络的维度
factor=5, # 稀疏注意力的因子 也就是加入你均摊的q
padding=0, # 填充类型
distil=True, # 是否在编码器中使用蒸馏
dropout=0.05, # dropout概率
attn='prob', # 编码器中使用的注意力类型,可以是[prob, full]
embed='timeF', # 时间特征编码的方式,可以是[timeF, fixed, learned]
activation='gelu', # 激活函数
output_attention=False, # 是否输出编码器的注意力权重
do_predict=False, # 是否预测未见过的未来数据
mix=True, # 是否在生成解码器中使用混合注意力
cols=None, # 从数据文件中选择的特定列作为输入特征
num_workers=0, # 数据加载器的工作进程数
itr=1, # 实验的重复次数
train_epochs=10, # 训练的轮数
batch_size=32, # 训练输入数据的批次大小
patience=500000000, # 早停的耐心值
learning_rate=0.0001, # 优化器的学习率
des='test', # 实验描述
loss='mse', # 损失函数
lradj='type1', # 学习率调整的方式
use_amp=False, # 是否使用自动混合精度训练
inverse=False, # 是否反转输出数据
use_gpu=True, # 是否使用GPU
gpu=0, # 用于训练和推理的GPU编号
use_multi_gpu=False, # 是否使用多GPU
devices='0,1,2,3' # 多GPU设备ID
)
网络主体
通过这些参数确实能很不错的获得一些效果,但是具体还是得进行informer的主要结构配合才行,我把train的早停策略给删除了
我们可以通过在Iexp/exp_informer.py里面的train函数里面
def train(self, setting):
# 获取训练、验证和测试数据及其数据加载器
train_data, train_loader = self._get_data(flag='train')
vali_data, vali_loader = self._get_data(flag='val')
test_data, test_loader = self._get_data(flag='test')
# 创建保存模型检查点的目录
path = os.path.join(self.args.checkpoints, setting)
if not os.path.exists(path):
os.makedirs(path)
time_now = time.time() # 记录当前时间
train_steps = len(train_loader) # 训练步骤的数量
model_optim = self._select_optimizer() # 选择优化器
criterion = self._select_criterion() # 选择损失函数
# 如果使用自动混合精度(AMP),初始化GradScaler
if self.args.use_amp:
scaler = torch.cuda.amp.GradScaler()
# 开始训练循环
for epoch in range(self.args.train_epochs):
iter_count = 0
train_loss = []
self.model.train() # 设置模型为训练模式
epoch_time = time.time() # 记录当前时间
for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(train_loader):
iter_count += 1
model_optim.zero_grad() # 清零梯度
# 处理一个批次的数据
pred, true = self._process_one_batch(
train_data, batch_x, batch_y, batch_x_mark, batch_y_mark)
loss = criterion(pred, true) # 计算损失
train_loss.append(loss.item())
if (i + 1) % 100 == 0: # 每100个批次打印一次训练信息
print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
speed = (time.time() - time_now) / iter_count # 计算每次迭代的速度
left_time = speed * ((self.args.train_epochs - epoch) * train_steps - i) # 估计剩余时间
print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
iter_count = 0
time_now = time.time()
# 使用AMP进行反向传播和优化
if self.args.use_amp:
scaler.scale(loss).backward()
scaler.step(model_optim)
scaler.update()
else:
loss.backward() # 反向传播计算梯度
model_optim.step() # 更新模型参数
print("Epoch: {} cost time: {}".format(epoch + 1, time.time() - epoch_time))
train_loss = np.average(train_loss) # 计算平均训练损失
vali_loss = self.vali(vali_data, vali_loader, criterion) # 计算验证损失
test_loss = self.vali(test_data, test_loader, criterion) # 计算测试损失
print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
epoch + 1, train_steps, train_loss, vali_loss, test_loss))
adjust_learning_rate(model_optim, epoch + 1, self.args) # 调整学习率
# 加载并返回最佳模型
best_model_path = path + '/' + 'checkpoint.pth'
self.model.load_state_dict(torch.load(best_model_path))
return self.model
进入```_process_one_batch(train_data, batch_x, batch_y, batch_x_mark, batch_y_mark),进行训练。
def _process_one_batch(self, dataset_object, batch_x, batch_y, batch_x_mark, batch_y_mark):
# 将输入和目标数据转换为浮点类型,并移动到指定的设备(GPU或CPU)
# batch_x 维度: [batch_size, seq_len, feature_dim]
# batch_y 维度: [batch_size, label_len + pred_len, feature_dim]
batch_x = batch_x.float().to(self.device)
batch_y = batch_y.float()
# 将时间特征标记数据转换为浮点类型,并移动到指定的设备(GPU或CPU)
# batch_x_mark 维度: [batch_size, seq_len, time_feature_dim]
# batch_y_mark 维度: [batch_size, label_len + pred_len, time_feature_dim]
batch_x_mark = batch_x_mark.float().to(self.device)
batch_y_mark = batch_y_mark.float().to(self.device)
# 生成解码器输入
if self.args.padding == 0:
# 如果使用零填充,则创建一个全零的张量作为解码器输入
# dec_inp 维度: [batch_size, pred_len, feature_dim]
dec_inp = torch.zeros([batch_y.shape[0], self.args.pred_len, batch_y.shape[-1]]).float()
elif self.args.padding == 1:
# 如果使用一填充,则创建一个全一的张量作为解码器输入
# dec_inp 维度: [batch_size, pred_len, feature_dim]
dec_inp = torch.ones([batch_y.shape[0], self.args.pred_len, batch_y.shape[-1]]).float()
# 将解码器输入拼接到标签长度的数据前面
# dec_inp 维度: [batch_size, label_len + pred_len, feature_dim]
dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)
# 编码器-解码器模型
if self.args.use_amp:
# 如果使用自动混合精度(AMP)
with torch.cuda.amp.autocast():
if self.args.output_attention:
# 如果输出注意力,则返回输出和注意力
# outputs 维度: [batch_size, pred_len, feature_dim]
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
else:
# 否则只返回输出
# outputs 维度: [batch_size, pred_len, feature_dim]
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
else:
if self.args.output_attention:
# 如果输出注意力,则返回输出和注意力
# outputs 维度: [batch_size, pred_len, feature_dim]
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
else:
# 否则只返回输出
# outputs 维度: [batch_size, pred_len, feature_dim]
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
# 如果使用逆变换,则对输出进行逆变换
if self.args.inverse:
outputs = dataset_object.inverse_transform(outputs)
# 根据特征类型调整目标数据的维度
# 如果特征类型是 'MS',则选择最后一个维度,否则选择第一个维度
f_dim = -1 if self.args.features == 'MS' else 0
# 调整后的 batch_y 维度: [batch_size, pred_len, feature_dim]
batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
# 返回模型输出和调整后的目标数据
return outputs, batch_y
embedding层
embedding有输入的编码转换和mask编码转换,但是informer官源代码加入了时间序列比transformer的只加位置编码多了时间序列编码。那么它的原理是特征列利用卷积层映射为d_model列+ position_embedding + 时间特征利用全连接层映射为d_model列。
class DataEmbedding(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
"""
DataEmbedding 初始化函数。
参数:
c_in (int): 输入通道数。
d_model (int): 嵌入维度。
embed_type (str): 嵌入类型,可以是['fixed', 'learned', 'timeF']。
freq (str): 时间频率,可以是['h', 'd', 'm', 'w', ...]。
dropout (float): Dropout 概率。
"""
super(DataEmbedding, self).__init__()
# Token Embedding,用于数值特征的嵌入
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
# 位置嵌入,用于加入位置信息
self.position_embedding = PositionalEmbedding(d_model=d_model)
# 时间特征嵌入,用于加入时间信息
# 如果 embed_type 不是 'timeF',使用 TemporalEmbedding,否则使用 TimeFeatureEmbedding
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type, freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)
# Dropout 层
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
"""
前向传播函数。
参数:
x (Tensor): 输入数据,形状为 [B, L, c_in],其中 B 是批量大小,L 是序列长度,c_in 是输入通道数。
x_mark (Tensor): 时间特征数据,形状为 [B, L, 时间特征数]。
返回:
Tensor: 嵌入后的数据,形状为 [B, L, d_model]。
"""
# 特征列利用卷积层映射为d_model列+ position_embedding + 时间特征利用全连接层映射为d_model列
# value_embedding: [B, L, d_model]
# position_embedding: [B, L, d_model]
# temporal_embedding: [B, L, d_model]
x = self.value_embedding(x) + self.position_embedding(x) + self.temporal_embedding(x_mark)
# 进行 Dropout
return self.dropout(x)
encoder层
我们先看输入 [batch,seq_len,feature]
那么是多少根据原文,但是我改了一下数据集变成是**[32,95,6],其实特征维度是多少都无所谓,因为我们的模型是进行时序预测,那我们要做的事情是什么?那肯定是用前面的数据取预测后面的数据。我们encoder的名字是编码。那作用肯定是和名字一样,encoder 的最终输出是一个上下文向量,这个向量包含了对输入时间序列的抽象表示。输出将被传递给 decoder,用于生成未来时间步的预测。所以主要的作用还是特征提取。**
首先我们要讲一个参数 e_layers =[3,2,1],这是什么参数,我们看到原理图。就是下面两幅图的意思,我们的encoder输入会有多个编码器,但是我们需要控制参数,所以我们要确定我们的encoder的输入序列的比例。
举个例子
假设输入序列的长度为 96,e_layers 为 [3, 2, 1],则 inp_lens 为 [0, 1, 2]。对于每个编码器:
第一个编码器处理整个输入序列(长度为 96)。
第二个编码器处理输入序列的后半部分(长度为 48)。
第三个编码器处理输入序列的最后四分之一(长度为 24)。
看到代码 我们的informer主体里面的encoder是
self.encoder = EncoderStack(encoders, inp_lens)
那么我们要知道encoders和inp_lens是什么,inp_lens已经解释过了,那么encoders是什么呢?
看代码,那么下面的意思是,我们每个 Encoder 实例由多个 EncoderLayer 和 ConvLayer 组成。每个 EncoderLayer 包含一个 AttentionLayer、两个 ConvLayer(前馈网络的一部分),以及层归一化和 dropout 层。e_layers 列表定义了每个 Encoder 实例中 EncoderLayer 的数量。
encoders = [ # 创建一个由多个 Encoder 实例组成的列表
Encoder( # 创建一个 Encoder 实例
[
EncoderLayer( # 创建多个 EncoderLayer 实例并组成列表
AttentionLayer( # 创建 AttentionLayer 实例
Attn(False, factor, attention_dropout=dropout, output_attention=output_attention),
# 初始化注意力层,Attn 是 ProbAttention 或 FullAttention,取决于 attn 参数
d_model, # 模型维度
n_heads, # 注意力头的数量
mix=False # 是否使用混合注意力,这里为 False
),
d_model, # 模型维度
d_ff, # 前馈网络的维度
dropout=dropout, # dropout 率
activation=activation # 激活函数,可以是 relu 或 gelu
) for l in range(el) # 创建 el 个 EncoderLayer
],
[
ConvLayer( # 创建多个 ConvLayer 实例并组成列表
d_model # 卷积层的输入和输出通道数
) for l in range(el-1) # 创建 el-1 个 ConvLayer,如果 distil 为 True
] if distil else None, # 如果 distil 为 True,则添加卷积层,否则为 None
norm_layer=torch.nn.LayerNorm(d_model) # 添加层归一化层,归一化维度为 d_model
) for el in e_layers # 对 e_layers 中的每个元素 el 执行上述操作
]
那么我们从小的看起,在models/encoder.py里面
1.初始化 Encoder 类
attn_layers: 注意力层的列表,每一层都是一个 EncoderLayer 实例。
conv_layers: 卷积层的列表,可选。如果存在,则每一层都是一个 ConvLayer 实例。
norm_layer: 层归一化层,用于在编码最后进行归一化。和transformer一样。
def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
super(Encoder, self).__init__()
self.attn_layers = nn.ModuleList(attn_layers)
self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
self.norm = norm_layer
2.前向传播
x: 输入张量,形状为 [B, L, D],即 (batch_size, sequence_length, d_model)。
attn_mask: 注意力掩码,用于遮掩特定位置的注意力计算,可选。
attns: 用于存储每一层的注意力权重。
def forward(self, x, attn_mask=None):
attns = []
if self.conv_layers is not None:
for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
x, attn = attn_layer(x, attn_mask=attn_mask)
x = conv_layer(x)
attns.append(attn)
x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
attns.append(attn)
else:
for attn_layer in self.attn_layers:
x, attn = attn_layer(x, attn_mask=attn_mask)
attns.append(attn)
if self.norm is not None:
x = self.norm(x)
return x, attns
3处理包含卷积层的情况
如果存在卷积层,则每个注意力层和卷积层交替进行。
先通过 attn_layer 计算注意力,再通过 conv_layer 进行卷积操作。
收集每一层的注意力权重。
if self.conv_layers is not None:
for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
x, attn = attn_layer(x, attn_mask=attn_mask)
x = conv_layer(x)
attns.append(attn)
x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
attns.append(attn)
4 处理不包含卷积层的情况:
else:
for attn_layer in self.attn_layers:
x, attn = attn_layer(x, attn_mask=attn_mask)
attns.append(attn)
我们继续来看我们informer的主体之一 EncoderStack
class EncoderStack(nn.Module):
def __init__(self, encoders, inp_lens):
super(EncoderStack, self).__init__()
self.encoders = nn.ModuleList(encoders) # 将传入的 encoder 列表转换为 ModuleList
self.inp_lens = inp_lens # 输入长度的列表,用于决定每个 encoder 处理的输入长度
def forward(self, x, attn_mask=None):
# x [B, L, D],即 (batch_size, sequence_length, d_model)
x_stack = [] # 用于存储每个 encoder 的输出
attns = [] # 用于存储每个 encoder 的注意力权重
for i_len, encoder in zip(self.inp_lens, self.encoders):
# 计算每个 encoder 处理的输入序列长度
inp_len = x.shape[1] // (2**i_len)
# 取输入序列的最后 inp_len 个时间步,并传入 encoder 进行编码
x_s, attn = encoder(x[:, -inp_len:, :])
# 将每个 encoder 的输出和注意力权重添加到对应的列表中
x_stack.append(x_s)
attns.append(attn)
# 将所有 encoder 的输出在序列长度维度上连接起来
x_stack = torch.cat(x_stack, -2)
return x_stack, attns # 返回最终编码后的输出和所有的注意力权重
EncoderStack 类由多个 Encoder 组成,每个 Encoder 包含多个 EncoderLayer 和可选的 ConvLayer。
inp_lens 用于确定每个编码器处理的输入序列长度,通过取输入序列的最后 inp_len 个时间步的数据来进行编码。最终将所有编码器的输出在序列长度维度上连接起来,返回编码后的输出和所有注意力权重。
接下来是Encoder的EncoderLayer 和ConvLayer,个人理解ConvLayer主要是降低维度的作用,也就是对应原文图中金字塔的梯形部分。
EncoderLayer 就是梯形减少参数下面的一层怎么说呢我用一张图表示,3表示ConvLaye,1表示 EncoderLayer。
class ConvLayer(nn.Module):
def __init__(self, c_in):
super(ConvLayer, self).__init__()
# 根据 torch 版本设置 padding 大小
padding = 1 if torch.__version__>='1.5.0' else 2
# 一维卷积层,输入和输出通道数相同
self.downConv = nn.Conv1d(in_channels=c_in,
out_channels=c_in,
kernel_size=3,
padding=padding,
padding_mode='circular')
# 批归一化层
self.norm = nn.BatchNorm1d(c_in)
# 激活函数,使用 ELU
self.activation = nn.ELU()
# 最大池化层,池化窗口大小为 3,步长为 2,padding 为 1
self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
def forward(self, x):
# 输入 x 的维度为 [B, L, D] =[batch,sq_len,d_model]
# permute 改变维度顺序为 [B, D, L],以适应 Conv1d 的输入要求
x = self.downConv(x.permute(0, 2, 1))
# 进行批归一化
x = self.norm(x)
# 应用激活函数
x = self.activation(x)
# 最大池化
x = self.maxPool(x)
# 恢复原始维度顺序 [B, L, D]
x = x.transpose(1,2)
return x
class EncoderLayer(nn.Module):
def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
super(EncoderLayer, self).__init__()
d_ff = d_ff or 4*d_model # 如果没有指定d_ff,则默认值为4倍的d_model
self.attention = attention # 自注意力机制
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1) # 第一个卷积层
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1) # 第二个卷积层
self.norm1 = nn.LayerNorm(d_model) # 第一个LayerNorm层
self.norm2 = nn.LayerNorm(d_model) # 第二个LayerNorm层
self.dropout = nn.Dropout(dropout) # Dropout层
self.activation = F.relu if activation == "relu" else F.gelu # 激活函数,支持ReLU和GELU
def forward(self, x, attn_mask=None):
# 输入 x 的维度为 [B, L, D],其中 B 是批大小,L 是序列长度,D 是特征维度
# 计算自注意力,x,x,x传入三个x代表分别代表qkv特征,new_x 是注意力输出,attn 是注意力权重
new_x, attn = self.attention(
x, x, x,
attn_mask = attn_mask
)
x = x + self.dropout(new_x) # 残差连接并应用dropout
# 第一个LayerNorm层
y = x = self.norm1(x)
# 将y的最后一个维度转换到第一个位置,以便进行1D卷积
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1)))) # 第一个卷积层后应用激活函数和dropout
y = self.dropout(self.conv2(y).transpose(-1, 1)) # 第二个卷积层后应用dropout,并将维度转换回去
# 第二个LayerNorm层,返回最后的输出和注意力权重
return self.norm2(x + y), attn # 残差连接并应用第二个LayerNorm层
P注意力
在attn.py里面的
注意力层
其实很简单,定义好qkv和mask后之间调用注意力层, self.inner_attention = attention这里是进行注意力计算。
class AttentionLayer(nn.Module):
def __init__(self, attention, d_model, n_heads,
d_keys=None, d_values=None, mix=False):
super(AttentionLayer, self).__init__()
d_keys = d_keys or (d_model//n_heads)
d_values = d_values or (d_model//n_heads)
self.inner_attention = attention#这里定义注意力计算
self.query_projection = nn.Linear(d_model, d_keys * n_heads)
self.key_projection = nn.Linear(d_model, d_keys * n_heads)
self.value_projection = nn.Linear(d_model, d_values * n_heads)
self.out_projection = nn.Linear(d_values * n_heads, d_model)
self.n_heads = n_heads
self.mix = mix
def forward(self, queries, keys, values, attn_mask):
B, L, _ = queries.shape
_, S, _ = keys.shape
H = self.n_heads
queries = self.query_projection(queries).view(B, L, H, -1)
keys = self.key_projection(keys).view(B, S, H, -1)
values = self.value_projection(values).view(B, S, H, -1)
out, attn = self.inner_attention(
queries,
keys,
values,
attn_mask
)
if self.mix:
out = out.transpose(2,1).contiguous()
out = out.view(B, L, -1)
return self.out_projection(out), attn
然后就是class ProbAttention(nn.Module): 里面的forward函数,ProbAttention定义了三个函数, def _get_initial_context是初始化上下文 context。上下文是注意力机制的输出之一,用于存储注意力得分和键值的加权和。如果没有掩码,则计算 V 的平均值,并扩展为查询的长度 L_Q。如果使用掩码,计算累积和(cumulative sum)作为上下文。_update_context是更新上下文 context,这是通过计算注意力得分和加权和实现的。计算得分。
根据公式计算
def forward(self, queries, keys, values, attn_mask):
B, L_Q, H, D = queries.shape
_, L_K, _, _ = keys.shape
# 转置查询、键和值的维度,从 [B, L, H, D] 变为 [B, H, L, D] ,因为要把头也就是H放前面好让每个头单独计算
queries = queries.transpose(2, 1)
keys = keys.transpose(2, 1)
values = values.transpose(2, 1)
# 计算采样数量 U_part 和 u,分别对应 L_K 和 L_Q 的对数值乘以 factor ,L_K 和 L_Q是对应的K,Q键值的数量
U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item() # c*ln(L_k)
u = self.factor * np.ceil(np.log(L_Q)).astype('int').item() # c*ln(L_q)
# 确保采样数量不超过实际的 L_K 和 L_Q
U_part = U_part if U_part < L_K else L_K
u = u if u < L_Q else L_Q
# 根据采样数量计算查询和键的相关性得分,并选出得分最高的 u 个查询
scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)
# 计算缩放因子
scale = self.scale or 1. / sqrt(D)
if scale is not None:
scores_top = scores_top * scale
# 初始化上下文,使用值 V 和查询长度 L_Q
context = self._get_initial_context(values, L_Q)
# 使用选出的 top_k 查询更新上下文
context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)
# 转置上下文维度,从 [B, H, L, D] 变为 [B, L, H, D] 因为前面转置为了计算,后面转置为了维度匹配
return context.transpose(2, 1).contiguous(), attn
decoder
来到解码操作了,解码简单,通过encoder 的输入,和自己的decoder输入,再经过两个注意力机制(自注意力机制,和交叉注意力机制,和transformer一样)得到输出。
# 解码器输入序列的起始部分长度
label_len = 2
# 预测序列长度
pred_len = 3
# 假设我们有一段历史股价数据
history_data = [100, 101, 102, 103, 104]
# 模型的输入包括过去 label_len 的数据和预测序列长度 pred_len
# 例如,输入解码器的序列长度为 label_len + pred_len
dec_inp = torch.cat([history_data[-label_len:], torch.zeros([pred_len])], dim=0)
print(dec_inp)
# 输出可能是 [103, 104, 0, 0, 0],表示解码器的初始输入
如果设置 label_len=2,pred_len=3,那么解码器的输入将包括过去2天的数据作为起始部分,预测未来3天的股价。
在这个例子中,label_len 被设置为2,pred_len 被设置为3,解码器的初始输入包括过去2天的数据和未来3天的预测。label_len 的意义在于提供解码器的初始输入部分,它决定了解码器在进行预测时可以利用的历史上下文信息长度。通过调整 label_len,可以控制解码器使用的历史数据长度,从而影响预测结果的准确性。
通过上面这个例子让我知道了,解码器的原理以及pre_len和dec_inp之间的关系,在解码器中,label_len 提供了解码器所需的历史上下文信息(也就是),输入序列的总长度为 label_len + pred_len。其中,label_len 是解码器的初始输入长度,pred_len 是解码器需要预测的未来时间步数。所以原文中的 label_len + pred_len的意思是
那我们的注意力机制在哪?DecoderLayer,我们的Decoder里面包含很多个DecoderLayer层。
class DecoderLayer(nn.Module):
def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
dropout=0.1, activation="relu"):
super(DecoderLayer, self).__init__()
d_ff = d_ff or 4 * d_model # 默认设置前馈神经网络的隐藏层维度为模型维度的4倍
self.self_attention = self_attention # 自注意力机制
self.cross_attention = cross_attention # 交叉注意力机制
#下面的就是通道变换的全连接神经网络的内容,与transformer一致
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1) #d_model->diff
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1) #diff->d_model
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout) # dropout层,用于防止过拟合
self.activation = F.relu if activation == "relu" else F.gelu # 激活函数
def forward(self, x, cross, x_mask=None, cross_mask=None):
# 自注意力机制
x = x + self.dropout(self.self_attention(
x, x, x,
attn_mask=x_mask
)[0])
x = self.norm1(x)
# 交叉注意力机制
x = x + self.dropout(self.cross_attention(
x, cross, cross,
attn_mask=cross_mask
)[0])
y = x = self.norm2(x)
# 前馈神经网络部分
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
y = self.dropout(self.conv2(y).transpose(-1, 1))
return self.norm3(x + y)
训练
代码如下,创建1.py 然后就是参数修改,参数备注我已经备注好了,看着来。然后就是,数据集的问题,data_parser里面有我自己创建的数据集跟着改就行。
import argparse
import os
import torch
from exp.exp_informer import Exp_Informer
# 定义参数
args = argparse.Namespace(
model='informer', # 模型名称,可以是[informer, informerstack, informerlight(TBD)]
data='wq', # 数据集名称
root_path='Informer2020/data/ETT', # 数据文件的根路径
data_path='601398.XSHG(工商银行14-24).csv', # 数据文件名称
features='MS', # 预测任务类型,可以是[M, S, MS]
target='close', # 在S或MS任务中使用的目标特征
freq='d', # 时间特征编码的频率,可以是[s(秒级), t(分钟级), h(小时级), d(天级), b(工作日级), w(周级), m(月级)]
checkpoints='./checkpoints/', # 模型检查点的位置
seq_len=6, # 编码器的输入序列长度
label_len=3, # 是解码器输入序列的起始部分的长度。它表示在解码过程中,从过去数据中选取多少个时间步作为解码器的起始输入。
pred_len=1, # 预测序列长度 要预测的天数
enc_in=6, # 编码器输入维度
dec_in=6, # 解码器输入维度
c_out=1, # 模型输出维度
d_model=512, # 模型维度
n_heads=8, # 注意力头的数量
e_layers=2, # 编码器层数
d_layers=1, # 解码器层数
s_layers='3,2,1', # 堆叠编码器层数
d_ff=2048, # 前馈神经网络的维度
factor=5, # 稀疏注意力的因子 也就是加入你均摊的q
padding=0, # 填充类型
distil=True, # 是否在编码器中使用蒸馏
dropout=0.05, # dropout概率
attn='prob', # 编码器中使用的注意力类型,可以是[prob, full]
embed='timeF', # 时间特征编码的方式,可以是[timeF, fixed, learned]
activation='relu', # 激活函数
output_attention=False, # 是否输出编码器的注意力权重
do_predict=False, # 是否预测未见过的未来数据
mix=True, # 是否在生成解码器中使用混合注意力
cols=None, # 从数据文件中选择的特定列作为输入特征
num_workers=0, # 数据加载器的工作进程数
itr=1, # 实验的重复次数
train_epochs=50, # 训练的轮数
batch_size=8, # 训练输入数据的批次大小
patience=500000000, # 早停的耐心值
learning_rate=0.0001, # 优化器的学习率
des='test', # 实验描述
loss='mse', # 损失函数
lradj='type1', # 学习率调整的方式
use_amp=False, # 是否使用自动混合精度训练
inverse=False, # 是否反转输出数据
use_gpu=True, # 是否使用GPU
gpu=0, # 用于训练和推理的GPU编号
use_multi_gpu=False, # 是否使用多GPU
devices='0,1,2,3' # 多GPU设备ID
)
# 检查是否可以使用GPU
args.use_gpu = True if torch.cuda.is_available() and args.use_gpu else False
# 如果使用多GPU,设置多GPU的设备ID
if args.use_gpu and args.use_multi_gpu:
args.devices = args.devices.replace(' ', '')
device_ids = args.devices.split(',')
args.device_ids = [int(id_) for id_ in device_ids]
args.gpu = args.device_ids[0]
# 数据集信息配置
#
data_parser = {
'ETTh1': {'data': 'ETTh1.csv', 'T': 'OT', 'M': [7, 7, 7], 'S': [1, 1, 1], 'MS': [7, 7, 1]},
'ETTh2': {'data': 'ETTh2.csv', 'T': 'OT', 'M': [7, 7, 7], 'S': [1, 1, 1], 'MS': [7, 7, 1]},
'ETTm1': {'data': 'ETTm1.csv', 'T': 'OT', 'M': [7, 7, 7], 'S': [1, 1, 1], 'MS': [7, 7, 1]},
'ETTm2': {'data': 'ETTm2.csv', 'T': 'OT', 'M': [7, 7, 7], 'S': [1, 1, 1], 'MS': [7, 7, 1]},
'WTH': {'data': 'WTH.csv', 'T': 'WetBulbCelsius', 'M': [12, 12, 12], 'S': [1, 1, 1], 'MS': [12, 12, 1]},
'ECL': {'data': 'ECL.csv', 'T': 'MT_320', 'M': [321, 321, 321], 'S': [1, 1, 1], 'MS': [321, 321, 1]},
'Solar': {'data': 'solar_AL.csv', 'T': 'POWER_136', 'M': [137, 137, 137], 'S': [1, 1, 1], 'MS': [137, 137, 1]},
#这是我新加的数据集一开始是数据文件的名称可以自定义 data:数据文件名(全称),T:你要预测的标签列,M:预测变量数(对于我这个数据给了六列特征预测一列,则为[6(编码器输入维度),6(解码器输入维度),1(要预测的列数)]),
'wq': {'data': '601398.XSHG(工商银行14-24).csv', 'T': 'close', 'MS': [6, 6, 2], 'S': [1, 1, 1], 'MS': [6, 6, 1]},
}
# 根据数据集名称设置数据相关参数
if args.data in data_parser.keys():
data_info = data_parser[args.data]
args.data_path = data_info['data']
args.target = data_info['T']
args.enc_in, args.dec_in, args.c_out = data_info[args.features]
# 处理堆叠编码器层参数
args.s_layers = [int(s_l) for s_l in args.s_layers.replace(' ', '').split(',')]
args.detail_freq = args.freq
args.freq = args.freq[-1:]
print('Args in experiment:')
print(args)
# 实验类实例化
Exp = Exp_Informer
for ii in range(args.itr):
# 设置实验记录
setting = '{}_{}_ft{}_sl{}_ll{}_pl{}_dm{}_nh{}_el{}_dl{}_df{}_at{}_fc{}_eb{}_dt{}_mx{}_{}_{}'.format(
args.model, args.data, args.features, args.seq_len, args.label_len, args.pred_len,
args.d_model, args.n_heads, args.e_layers, args.d_layers, args.d_ff, args.attn, args.factor,
args.embed, args.distil, args.mix, args.des, ii
)
exp = Exp(args) # 设置实验
print('>>>>>>>start training : {}>>>>>>>>>>>>>>>>>>>>>>>>>>'.format(setting))
exp.train(setting)
print('>>>>>>>testing : {}<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<'.format(setting))
exp.test(setting)
if args.do_predict:
print('>>>>>>>predicting : {}<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<'.format(setting))
exp.predict(setting, True)
torch.cuda.empty_cache()