前言
- TimesNet:用于一般时间序列分析的时间二维变化模型论文下载地址,Github项目地址,论文解读系列
- 本文针对TimesNet模型参数与模型架构开源代码进行讲解,本人水平有限,若出现解读错误,欢迎指出
- 开源代码中分别实现长短期序列预测、插补、异常检测、分类任务,本文针对长短期序列预测为例进行讲解。
- 后面会推出自定义项目,以及使用NNI框架对模型关键参数进行调节,也可能会推出模型压缩等相关文章,如果感兴趣请关注点赞收藏,谢谢
参数设定模块(run)
- 首先打开
run.py
文件保证在不修改任何参数的情况下,代码可以跑通,这里windows系统需要将代码中--is_training
、--model_id
、--model
、--data
参数中required=True
选项删除,否则会报错。--num_workers
参数需要置为0。 - 其次需要在项目文件夹下新建子文件夹
data
用来存放训练数据,可以使用ETTh1
数据,这里提供下载地址 - 运行
run.py
训练完成不报错就成功了,建议在GPU上进行,CPU几乎没办法训练。
参数含义
- 下面是各参数含义(注释)
parser = argparse.ArgumentParser(description='TimesNet')
# basic config
# 任务类型
parser.add_argument('--task_name', type=str, default='long_term_forecast',
help='task name, options:[long_term_forecast, short_term_forecast, imputation, classification, anomaly_detection]')
# 置为训练模式
parser.add_argument('--is_training', type=int, default=1, help='status')
# 模型名称
parser.add_argument('--model_id', type=str, default='test', help='model id')
# 选择模型
parser.add_argument('--model', type=str, default='TimesNet',
help='model name, options: [Autoformer, Transformer, TimesNet]')
# data loader
# 数据名称
parser.add_argument('--data', type=str, default='ETTh1', help='dataset type')
# 数据所在文件夹
parser.add_argument('--root_path', type=str, default='./data/', help='root path of the data file')
# 数据文件全称
parser.add_argument('--data_path', type=str, default='ETTh1.csv', help='data file')
# 时间特征处理方式
parser.add_argument('--features', type=str, default='M',
help='forecasting task, options:[M, S, MS]; M:multivariate predict multivariate, S:univariate predict univariate, MS:multivariate predict univariate')
# 目标列列名
parser.add_argument('--target', type=str, default='OT', help='target feature in S or MS task')
# 时间采集粒度
parser.add_argument('--freq', type=str, default='h',
help='freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h')
# 模型权重保存文件夹
parser.add_argument('--checkpoints', type=str, default='./checkpoints/', help='location of model checkpoints')
# forecasting task
# 回视窗口
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
# 先验序列长度
parser.add_argument('--label_len', type=int, default=48, help='start token length')
# 预测窗口长度
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
# 季节模式(针对M4数据集)
parser.add_argument('--seasonal_patterns', type=str, default='Monthly', help='subset for M4')
# inputation task
# 插补任务中数据丢失率
parser.add_argument('--mask_rate', type=float, default=0.25, help='mask ratio')
# anomaly detection task
# 异常检测中异常点占比
parser.add_argument('--anomaly_ratio', type=float, default=0.25, help='prior anomaly ratio (%)')
# model define
# TimesBlock中傅里叶变换,频率排名前k个周期
parser.add_argument('--top_k', type=int, default=5, help='for TimesBlock')
# Inception中卷积核个数
parser.add_argument('--num_kernels', type=int, default=6, help='for Inception')
# encoder输入特征数
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
# decoder输入特征数
parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
# 输出通道数
parser.add_argument('--c_out', type=int, default=7, help='output size')
# 线性层隐含神经元个数
parser.add_argument('--d_model', type=int, default=512, help='dimension of model')
# 多头注意力机制
parser.add_argument('--n_heads', type=int, default=8, help='num of heads')
# encoder层数
parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
# decoder层数
parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')
# FFN层隐含神经元个数
parser.add_argument('--d_ff', type=int, default=2048, help='dimension of fcn')
# 滑动窗口长度
parser.add_argument('--moving_avg', type=int, default=25, help='window size of moving average')
# 对Q进行采样,对Q采样的因子数
parser.add_argument('--factor', type=int, default=1, help='attn factor')
# 是否下采样操作pooling
parser.add_argument('--distil', action='store_false',
help='whether to use distilling in encoder, using this argument means not using distilling',
default=True)
# dropout率
parser.add_argument('--dropout', type=float, default=0.1, help='dropout')
# 时间特征嵌入方式
parser.add_argument('--embed', type=str, default='timeF',
help='time features encoding, options:[timeF, fixed, learned]')
# 激活函数类型
parser.add_argument('--activation', type=str, default='gelu', help='activation')
# 是否输出attention
parser.add_argument('--output_attention', action='store_true', help='whether to output attention in ecoder')
# optimization
# 并行核心数
parser.add_argument('--num_workers', type=int, default=0, help='data loader num workers')
# 实验轮数
parser.add_argument('--itr', type=int, default=1, help='experiments times')
# 训练迭代次数
parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')
# batch size大小
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
# early stopping机制容忍次数
parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
# 学习率
parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')
parser.add_argument('--des', type=str, default='test', help='exp description')
# 损失函数
parser.add_argument('--loss', type=str, default='MSE', help='loss function')
# 学习率下降策略
parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')
# 使用混合精度训练
parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)
# GPU
parser.add_argument('--use_gpu', type=bool, default=False, help='use gpu')
parser.add_argument('--gpu', type=int, default=0, help='gpu')
parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)
parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multile gpus')
# de-stationary projector params
parser.add_argument('--p_hidden_dims', type=int, nargs='+', default=[128, 128],
help='hidden layer dimensions of projector (List)')
parser.add_argument('--p_hidden_layers', type=int, default=2, help='number of hidden layers in projector')
我们在exp.train(setting)
行打上断点跳到训练主函数exp_long_term_forecasting.py
。
数据处理模块
在_get_data
中找到数据处理函数data_factory.py
点击进入,可以看到各标准数据集处理方法:
data_dict = {
'ETTh1': Dataset_ETT_hour,
'ETTh2': Dataset_ETT_hour,
'ETTm1': Dataset_ETT_minute,
'ETTm2': Dataset_ETT_minute,
'custom': Dataset_Custom,
'm4': Dataset_M4,
'PSM': PSMSegLoader,
'MSL': MSLSegLoader,
'SMAP': SMAPSegLoader,
'SMD': SMDSegLoader,
'SWAT': SWATSegLoader,
'UEA': UEAloader
}
- 由于我们的数据集是
ETTh1
,那么数据处理的方式为Dataset_ETT_hour
,我们进入data_loader.py
文件,找到Dataset_ETT_hour
类 __init__
主要用于传各类参数,这里不过多赘述,主要对__read_data__
进行说明
def __read_data__(self):
# 数据标准化实例
self.scaler = StandardScaler()
# 读取数据
df_raw = pd.read_csv(os.path.join(self.root_path,
self.data_path))
# 计算数据起始点
border1s = [0, 12 * 30 * 24 - self.seq_len, 12 * 30 * 24 + 4 * 30 * 24 - self.seq_len]
border2s = [12 * 30 * 24, 12 * 30 * 24 + 4 * 30 * 24, 12 * 30 * 24 + 8 * 30 * 24]
border1 = border1s[self.set_type]
border2 = border2s[self.set_type]
# 如果预测对象为多变量预测或多元预测单变量
if self.features == 'M' or self.features == 'MS':
# 取除日期列的其他所有列
cols_data = df_raw.columns[1:]
df_data = df_raw[cols_data]
# 若预测类型为S(单特征预测单特征)
elif self.features == 'S':
# 取特征列
df_data = df_raw[[self.target]]
# 将数据进行归一化
if self.scale:
train_data = df_data[border1s[0]:border2s[0]]
self.scaler.fit(train_data.values)
data = self.scaler.transform(df_data.values)
else:
data = df_data.values
# 取日期列
df_stamp = df_raw[['date']][border1:border2]
# 利用pandas将数据转换为日期格式
df_stamp['date'] = pd.to_datetime(df_stamp.date)
# 构建时间特征
if self.timeenc == 0:
df_stamp['month'] = df_stamp.date.apply(lambda row: row.month, 1)
df_stamp['day'] = df_stamp.date.apply(lambda row: row.day, 1)
df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday(), 1)
df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour, 1)
data_stamp = df_stamp.drop(['date'], 1).values
elif self.timeenc == 1:
# 时间特征构造函数
data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq)
# 转置
data_stamp = data_stamp.transpose(1, 0)
# 取数据特征列
self.data_x = data[border1:border2]
self.data_y = data[border1:border2]
self.data_stamp = data_stamp
- 需要注意的是
time_features
函数,用来提取日期特征,比如't':['month','day','weekday','hour','minute']
,表示提取月,天,周,小时,分钟。可以打开timefeatures.py
文件进行查阅,同样后期也可以加一些日期编码进去。 - 同样的,对
__getitem__
进行说明
def __getitem__(self, index):
# 随机取得标签
s_begin = index
# 训练区间
s_end = s_begin + self.seq_len
# 有标签区间+无标签区间(预测时间步长)
r_begin = s_end - self.label_len
r_end = r_begin + self.label_len + self.pred_len
# 取训练数据
seq_x = self.data_x[s_begin:s_end]
seq_y = self.data_y[r_begin:r_end]
# 取训练数据对应时间特征
seq_x_mark = self.data_stamp[s_begin:s_end]
# 取有标签区间+无标签区间(预测时间步长)对应时间特征
seq_y_mark = self.data_stamp[r_begin:r_end]
return seq_x, seq_y, seq_x_mark, seq_y_mark
- 关于这部分数据处理可能有些绕,开源看我在SCInet代码讲解中数据处理那一部分,绘制了数据集划分图。
网络架构
- 打开
models
文件夹下的TimesNet.py
,可以看到TimesBlock
和Model
类
TimesBlock
- 可以看到一维离散傅里叶变换
FFT_for_Period
函数逐行代码解析请看注释,示意图如下:
def FFT_for_Period(x, k=2):
# [B, T, C]
# 一维离散傅里叶变换,沿T维度[B, T, C] --> [B, T//2+1, C]
xf = torch.fft.rfft(x, dim=1)
# 通过增幅寻找周期
# 在每个频率上的平均值,然后对所有频率取平均[B, T//2+1, C] --> [T//2+1]
frequency_list = abs(xf).mean(0).mean(-1)
# 将第一个频率值设置为0(直流分量)
frequency_list[0] = 0
# 在频率列表中找到前K个最大值的索引
_, top_list = torch.topk(frequency_list, k)
# 将top_list张量转换为numpy数组
top_list = top_list.detach().cpu().numpy()
# 通过将序列的总长度除以每个顶部频率,计算周期
period = x.shape[1] // top_list
# 返回period,计算选定的顶部频率在最后一个维度C上的平均幅度[B, T//2+1, C] --> [B, k]
return period, abs(xf).mean(-1)[:, top_list]
- 转到
TimesBlock
类,论文中提到的parameter-efficient
块是一个类似于Inception
视觉模型的架构。
Inception
模型采用多分支结构,它将1×1卷积、3×3卷积最大池化堆叠在一起。这种结构既可以增加网络的宽度,又可以增强网络对不同尺寸的适应性。- 打开
layers
文件夹下的Conv_Blocks.py
文件。里面实现了两个类Inception_Block_V1
、Inception_Block_V2
。作者在TimesBlock
中只使用了V1
版本。 - 代码逐行解析看注释,重点看各卷积层卷积核的变化和
padding
操作。
class Inception_Block_V1(nn.Module):
def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
super(Inception_Block_V1, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.num_kernels = num_kernels
kernels = []
for i in range(self.num_kernels):
# 二维卷积层,卷积核(1, 3, 5, 7,...)
kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=2 * i + 1, padding=i))
self.kernels = nn.ModuleList(kernels)
if init_weight:
self._initialize_weights()
# 初始化权重
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
def forward(self, x):
# 新建列表
res_list = []
# 遍历每一层2维卷积
for i in range(self.num_kernels):
# 将每一层的结果添加到列表中
res_list.append(self.kernels[i](x))
# 将列表沿最后一个维度拼接, 然后在最后一个维度求平均
res = torch.stack(res_list, dim=-1).mean(-1)
return res
class Inception_Block_V2(nn.Module):
def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
super(Inception_Block_V2, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.num_kernels = num_kernels
kernels = []
for i in range(self.num_kernels // 2):
# 二维卷积,卷积核为[(1,3), (1,5), (1,7)]
kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[1, 2 * i + 3], padding=[0, i + 1]))
# 二维卷积,卷积核为[(3,1), (5,1), (7,1)]
kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[2 * i + 3, 1], padding=[i + 1, 0]))
# 再添加一个2维卷积,卷积核为(1,1)
kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=1))
self.kernels = nn.ModuleList(kernels)
if init_weight:
self._initialize_weights()
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
def forward(self, x):
res_list = []
for i in range(self.num_kernels + 1):
# 得到每层的输出
res_list.append(self.kernels[i](x))
# 将列表沿最后一个维度拼接, 然后在最后一个维度求平均
res = torch.stack(res_list, dim=-1).mean(-1)
return res
- 回到
TimesBlock
类,再经过一维离散傅里叶变换后,取得频率排名前k的周期,将其进行padding使其符合卷积操作数据维度。将得到的结果进行堆叠拼接。然后把频率权重进行自适应聚合,最后加入残差连接得到输出。具体看代码解析
class TimesBlock(nn.Module):
def __init__(self, configs):
super(TimesBlock, self).__init__()
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.k = configs.top_k
# parameter-efficient块
self.conv = nn.Sequential(
Inception_Block_V1(configs.d_model, configs.d_ff,
num_kernels=configs.num_kernels),
nn.GELU(),
Inception_Block_V1(configs.d_ff, configs.d_model,
num_kernels=configs.num_kernels)
)
def forward(self, x):
# 得到B, T, N
B, T, N = x.size()
# 得到周期以及对应权重
period_list, period_weight = FFT_for_Period(x, self.k)
res = []
for i in range(self.k):
# 取周期
period = period_list[i]
# padding操作
if (self.seq_len + self.pred_len) % period != 0:
length = (((self.seq_len + self.pred_len) // period) + 1) * period
# padding:[batch,length - seq_len - pred_len,feature]
padding = torch.zeros([x.shape[0], (length - (self.seq_len + self.pred_len)), x.shape[2]]).to(x.device)
# 沿len维度拼接
out = torch.cat([x, padding], dim=1)
# 不需要padding
else:
length = (self.seq_len + self.pred_len)
out = x
# 重塑[batch,length // period,period,feature] --> [batch,feature,length // period,period](深拷贝)
out = out.reshape(B, length // period, period,
N).permute(0, 3, 1, 2).contiguous()
# 进入卷积网络
out = self.conv(out)
# 重塑结果(batch, -1, feature)
out = out.permute(0, 2, 3, 1).reshape(B, -1, N)
res.append(out[:, :(self.seq_len + self.pred_len), :])
# 将结果沿feature维度拼接
res = torch.stack(res, dim=-1)
# 自适应聚合
period_weight = F.softmax(period_weight, dim=1)
# 添加两个维度,重复使其与res维度一致
period_weight = period_weight.unsqueeze(
1).unsqueeze(1).repeat(1, T, N, 1)
# 沿feature维度求和
res = torch.sum(res * period_weight, -1)
# 残差连接
res = res + x
return res
TimesNet
- 可以看到
model
类,针对每个任务类型,建立函数,forecast
(预测),imputation
(插值),anomaly_detection
(异常检测),classification
(分类)。 - 详细内部处理请看代码注释
# 预测类函数
def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
# 非稳态Transfromer标准归一化
# 计算均值
means = x_enc.mean(1, keepdim=True).detach()
# 减去均值
x_enc = x_enc - means
# 计算标准差
stdev = torch.sqrt(
torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
# 除以标准差
x_enc /= stdev
# embedding
enc_out = self.enc_embedding(x_enc, x_mark_enc) # [B,T,C]
# [B,T,C] --> [B,C,T] --> [B,T,C]
enc_out = self.predict_linear(enc_out.permute(0, 2, 1)).permute(
0, 2, 1) # align temporal dimension
# TimesNet
for i in range(self.layer):
# layer_norm
enc_out = self.layer_norm(self.model[i](enc_out))
# 映射层
dec_out = self.projection(enc_out)
# 非稳态Transfromer反归一化
dec_out = dec_out * \
(stdev[:, 0, :].unsqueeze(1).repeat(
1, self.pred_len + self.seq_len, 1))
dec_out = dec_out + \
(means[:, 0, :].unsqueeze(1).repeat(
1, self.pred_len + self.seq_len, 1))
return dec_out
def imputation(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask):
# 非稳态Transfromer标准归一化
# 计算均值
means = torch.sum(x_enc, dim=1) / torch.sum(mask == 1, dim=1)
means = means.unsqueeze(1).detach()
# 减去均值
x_enc = x_enc - means
x_enc = x_enc.masked_fill(mask == 0, 0)
# 计算方差
stdev = torch.sqrt(torch.sum(x_enc * x_enc, dim=1) /
torch.sum(mask == 1, dim=1) + 1e-5)
stdev = stdev.unsqueeze(1).detach()
# 除以方差
x_enc /= stdev
# embedding
enc_out = self.enc_embedding(x_enc, x_mark_enc) # [B,T,C]
# TimesNet
for i in range(self.layer):
enc_out = self.layer_norm(self.model[i](enc_out))
# porject back
dec_out = self.projection(enc_out)
# 非稳态Transfromer反归一化
dec_out = dec_out * \
(stdev[:, 0, :].unsqueeze(1).repeat(
1, self.pred_len + self.seq_len, 1))
dec_out = dec_out + \
(means[:, 0, :].unsqueeze(1).repeat(
1, self.pred_len + self.seq_len, 1))
return dec_out
def anomaly_detection(self, x_enc):
# Normalization from Non-stationary Transformer
means = x_enc.mean(1, keepdim=True).detach()
x_enc = x_enc - means
stdev = torch.sqrt(
torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
x_enc /= stdev
# embedding
enc_out = self.enc_embedding(x_enc, None) # [B,T,C]
# TimesNet
for i in range(self.layer):
enc_out = self.layer_norm(self.model[i](enc_out))
# porject back
dec_out = self.projection(enc_out)
# De-Normalization from Non-stationary Transformer
dec_out = dec_out * \
(stdev[:, 0, :].unsqueeze(1).repeat(
1, self.pred_len + self.seq_len, 1))
dec_out = dec_out + \
(means[:, 0, :].unsqueeze(1).repeat(
1, self.pred_len + self.seq_len, 1))
return dec_out
def classification(self, x_enc, x_mark_enc):
# embedding
enc_out = self.enc_embedding(x_enc, None) # [B,T,C]
# TimesNet
for i in range(self.layer):
enc_out = self.layer_norm(self.model[i](enc_out))
# Output
# the output transformer encoder/decoder embeddings don't include non-linearity
output = self.act(enc_out)
output = self.dropout(output)
# zero-out padding embeddings
output = output * x_mark_enc.unsqueeze(-1)
# (batch_size, seq_length * d_model)
output = output.reshape(output.shape[0], -1)
output = self.projection(output) # (batch_size, num_classes)
return output
- 看到
model
类中__init__
方法和forward
函数
class Model(nn.Module):
def __init__(self, configs):
super(Model, self).__init__()
self.configs = configs
self.task_name = configs.task_name
self.seq_len = configs.seq_len
self.label_len = configs.label_len
self.pred_len = configs.pred_len
# TimesBlock堆叠
self.model = nn.ModuleList([TimesBlock(configs)
for _ in range(configs.e_layers)])
self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
self.layer = configs.e_layers
self.layer_norm = nn.LayerNorm(configs.d_model)
# 若任务是长期预测或短期预测
if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
# 线性层输出pred_len
self.predict_linear = nn.Linear(
self.seq_len, self.pred_len + self.seq_len)
# 映射,线性层,输出c_out
self.projection = nn.Linear(
configs.d_model, configs.c_out, bias=True)
# 若任务为插值或异常检测
if self.task_name == 'imputation' or self.task_name == 'anomaly_detection':
# 映射,线性层,输出c_out
self.projection = nn.Linear(
configs.d_model, configs.c_out, bias=True)
# 若任务为分类
if self.task_name == 'classification':
# gelu激活函数
self.act = F.gelu
# dropout层
self.dropout = nn.Dropout(configs.dropout)
# 映射,线性层,输出num_class
self.projection = nn.Linear(
configs.d_model * configs.seq_len, configs.num_class)
def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
dec_out = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
return dec_out[:, -self.pred_len:, :] # [B, L, D]
if self.task_name == 'imputation':
dec_out = self.imputation(
x_enc, x_mark_enc, x_dec, x_mark_dec, mask)
return dec_out # [B, L, D]
if self.task_name == 'anomaly_detection':
dec_out = self.anomaly_detection(x_enc)
return dec_out # [B, L, D]
if self.task_name == 'classification':
dec_out = self.classification(x_enc, x_mark_enc)
return dec_out # [B, N]
return None
Embed
- 在TimesNet中用到了
DataEmbedding
,其定义在layers
文件夹下Embed.py
文件中。其他所有informer
类模型embedding
操作也在这个文件夹中,这里正好做一个整体的解释。
class PositionalEmbedding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEmbedding, self).__init__()
# 在对数空间中计算位置编码
pe = torch.zeros(max_len, d_model).float()
pe.require_grad = False
position = torch.arange(0, max_len).float().unsqueeze(1)
div_term = (torch.arange(0, d_model, 2).float()
* -(math.log(10000.0) / d_model)).exp()
# 使用正弦和余弦函数计算位置编码
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
return self.pe[:, :x.size(1)]
class TokenEmbedding(nn.Module):
def __init__(self, c_in, d_model):
super(TokenEmbedding, self).__init__()
padding = 1 if torch.__version__ >= '1.5.0' else 2
# 通过一维卷积进行标记嵌入
self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
kernel_size=3, padding=padding, padding_mode='circular', bias=False)
for m in self.modules():
if isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(
m.weight, mode='fan_in', nonlinearity='leaky_relu')
def forward(self, x):
x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
return x
class FixedEmbedding(nn.Module):
def __init__(self, c_in, d_model):
super(FixedEmbedding, self).__init__()
w = torch.zeros(c_in, d_model).float()
w.require_grad = False
position = torch.arange(0, c_in).float().unsqueeze(1)
div_term = (torch.arange(0, d_model, 2).float()
* -(math.log(10000.0) / d_model)).exp()
# 使用正弦和余弦函数计算位置编码
w[:, 0::2] = torch.sin(position * div_term)
w[:, 1::2] = torch.cos(position * div_term)
# 使用固定嵌入层进行标记嵌入
self.emb = nn.Embedding(c_in, d_model)
self.emb.weight = nn.Parameter(w, requires_grad=False)
def forward(self, x):
return self.emb(x).detach()
class TemporalEmbedding(nn.Module):
def __init__(self, d_model, embed_type='fixed', freq='h'):
super(TemporalEmbedding, self).__init__()
minute_size = 4
hour_size = 24
weekday_size = 7
day_size = 32
month_size = 13
Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
# 如果频率为't',使用固定嵌入层进行标记嵌入
if freq == 't':
self.minute_embed = Embed(minute_size, d_model)
# 使用固定嵌入层进行小时标记嵌入
self.hour_embed = Embed(hour_size, d_model)
# 使用固定嵌入层进行星期几标记嵌入
self.weekday_embed = Embed(weekday_size, d_model)
# 使用固定嵌入层进行日期标记嵌入
self.day_embed = Embed(day_size, d_model)
# 使用固定嵌入层进行月份标记嵌入
self.month_embed = Embed(month_size, d_model)
def forward(self, x):
x = x.long()
# 根据频率选择对应的嵌入进行计算
minute_x = self.minute_embed(x[:, :, 4]) if hasattr(
self, 'minute_embed') else 0.
hour_x = self.hour_embed(x[:, :, 3])
weekday_x = self.weekday_embed(x[:, :, 2])
day_x = self.day_embed(x[:, :, 1])
month_x = self.month_embed(x[:, :, 0])
# 将不同的嵌入进行相加得到最终的时间特征嵌入
return hour_x + weekday_x + day_x + month_x + minute_x
class TimeFeatureEmbedding(nn.Module):
def __init__(self, d_model, embed_type='timeF', freq='h'):
super(TimeFeatureEmbedding, self).__init__()
freq_map = {'h': 4, 't': 5, 's': 6,
'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
d_inp = freq_map[freq]
# 使用线性层进行时间特征嵌入
self.embed = nn.Linear(d_inp, d_model, bias=False)
def forward(self, x):
return self.embed(x)
class DataEmbedding(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
super(DataEmbedding, self).__init__()
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
self.position_embedding = PositionalEmbedding(d_model=d_model)
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
d_model=d_model, embed_type=embed_type, freq=freq)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
if x_mark is None:
# 如果没有时间标记,则只使用数值嵌入和位置嵌入
x = self.value_embedding(x) + self.position_embedding(x)
else:
# 如果有时间标记,则使用数值嵌入、时间嵌入和位置嵌入
x = self.value_embedding(
x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
return self.dropout(x)
class DataEmbedding_wo_pos(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
super(DataEmbedding_wo_pos, self).__init__()
# 值嵌入层,将输入的特征向量映射到d_model维度的向量空间
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
# 位置嵌入层,用于对输入进行位置编码
self.position_embedding = PositionalEmbedding(d_model=d_model)
# 时间嵌入层,根据embed_type和freq对输入进行时间编码
# 如果embed_type不是'timeF',则使用TemporalEmbedding
# 否则使用TimeFeatureEmbedding
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
d_model=d_model, embed_type=embed_type, freq=freq)
# Dropout层,用于在训练过程中进行随机失活
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
if x_mark is None:
# 如果x_mark为空,则仅应用值嵌入到x
x = self.value_embedding(x)
else:
# 否则,将值嵌入和时间嵌入相加
x = self.value_embedding(x) + self.temporal_embedding(x_mark)
return self.dropout(x)
class PatchEmbedding(nn.Module):
def __init__(self, d_model, patch_len, stride, padding, dropout):
super(PatchEmbedding, self).__init__()
# Patching
# patch_len是每个patch的长度
self.patch_len = patch_len
# stride是patch的步幅
self.stride = stride
# padding_patch_layer用于对输入进行补零操作
self.padding_patch_layer = nn.ReplicationPad1d((0, padding))
# Backbone, Input encoding: projection of feature vectors onto a d-dim vector space
# value_embedding将每个patch的特征向量映射到d_model维度的向量空间
self.value_embedding = nn.Linear(patch_len, d_model, bias=False)
# Positional embedding
# 位置嵌入层,用于对输入进行位置编码
self.position_embedding = PositionalEmbedding(d_model)
# Residual dropout
# Dropout层
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# 对输入进行patch操作
n_vars = x.shape[1]
# 对输入进行补零操作
x = self.padding_patch_layer(x)
# 滑动窗口分割输入为多个patch
x = x.unfold(dimension=-1, size=self.patch_len, step=self.stride)
# 重塑张量形状
x = torch.reshape(x, (x.shape[0] * x.shape[1], x.shape[2], x.shape[3]))
# 输入 encoding
x = self.value_embedding(x) + self.position_embedding(x)
return self.dropout(x), n_vars
模型训练
- 这里我主要注释一下
train
函数,valid
和test
函数都差不多,只是有些操作不需要删减了而已。
def train(self, setting):
# 取得训练、验证、测试数据及数据加载器
train_data, train_loader = self._get_data(flag='train')
vali_data, vali_loader = self._get_data(flag='val')
test_data, test_loader = self._get_data(flag='test')
path = os.path.join(self.args.checkpoints, setting)
# 创建模型保存路径
if not os.path.exists(path):
os.makedirs(path)
# 获取当前时间
time_now = time.time()
# 取训练步数
train_steps = len(train_loader)
# 设置早停参数
early_stopping = EarlyStopping(patience=self.args.patience, verbose=True)
# 选择优化器
model_optim = self._select_optimizer()
# 选择损失函数
criterion = self._select_criterion()
# 如果多GPU并行
if self.args.use_amp:
scaler = torch.cuda.amp.GradScaler()
# 训练次数
for epoch in range(self.args.train_epochs):
iter_count = 0
train_loss = []
self.model.train()
epoch_time = time.time()
for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(train_loader):
iter_count += 1
# 梯度归零
model_optim.zero_grad()
# 取训练数据
batch_x = batch_x.float().to(self.device)
batch_y = batch_y.float().to(self.device)
batch_x_mark = batch_x_mark.float().to(self.device)
batch_y_mark = batch_y_mark.float().to(self.device)
# decoder输入
dec_inp = torch.zeros_like(batch_y[:, -self.args.pred_len:, :]).float()
dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)
# encoder - decoder
if self.args.use_amp:
with torch.cuda.amp.autocast():
if self.args.output_attention:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
else:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
f_dim = -1 if self.args.features == 'MS' else 0
outputs = outputs[:, -self.args.pred_len:, f_dim:]
batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
loss = criterion(outputs, batch_y)
train_loss.append(loss.item())
else:
if self.args.output_attention:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
else:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
# 如果预测方式为MS,取最后1列否则取第1列
f_dim = -1 if self.args.features == 'MS' else 0
outputs = outputs[:, -self.args.pred_len:, f_dim:]
batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
# 计算损失
loss = criterion(outputs, batch_y)
# 将损失放入train_loss列表中
train_loss.append(loss.item())
# 记录训练过程
if (i + 1) % 100 == 0:
print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
speed = (time.time() - time_now) / iter_count
left_time = speed * ((self.args.train_epochs - epoch) * train_steps - i)
print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
iter_count = 0
time_now = time.time()
if self.args.use_amp:
scaler.scale(loss).backward()
scaler.step(model_optim)
scaler.update()
else:
# 反向传播
loss.backward()
# 更新梯度
model_optim.step()
print("Epoch: {} cost time: {}".format(epoch + 1, time.time() - epoch_time))
train_loss = np.average(train_loss)
vali_loss = self.vali(vali_data, vali_loader, criterion)
test_loss = self.vali(test_data, test_loader, criterion)
print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
epoch + 1, train_steps, train_loss, vali_loss, test_loss))
early_stopping(vali_loss, self.model, path)
if early_stopping.early_stop:
print("Early stopping")
break
# 更新学习率
adjust_learning_rate(model_optim, epoch + 1, self.args)
# 保存模型
best_model_path = path + '/' + 'checkpoint.pth'
self.model.load_state_dict(torch.load(best_model_path))
return self.model