TimesNet:用于一般时间序列分析的时间二维变化模型(代码解析)

news2025/1/10 14:15:36

前言

  • TimesNet:用于一般时间序列分析的时间二维变化模型论文下载地址,Github项目地址,论文解读系列
  • 本文针对TimesNet模型参数与模型架构开源代码进行讲解,本人水平有限,若出现解读错误,欢迎指出
  • 开源代码中分别实现长短期序列预测、插补、异常检测、分类任务,本文针对长短期序列预测为例进行讲解。
  • 后面会推出自定义项目,以及使用NNI框架对模型关键参数进行调节,也可能会推出模型压缩等相关文章,如果感兴趣请关注点赞收藏,谢谢

参数设定模块(run)

  • 首先打开run.py文件保证在不修改任何参数的情况下,代码可以跑通,这里windows系统需要将代码中--is_training--model_id--model--data参数中required=True选项删除,否则会报错。--num_workers参数需要置为0。
  • 其次需要在项目文件夹下新建子文件夹data用来存放训练数据,可以使用ETTh1数据,这里提供下载地址
  • 运行run.py训练完成不报错就成功了,建议在GPU上进行,CPU几乎没办法训练。

参数含义

  • 下面是各参数含义(注释)
parser = argparse.ArgumentParser(description='TimesNet')

    # basic config
    # 任务类型
    parser.add_argument('--task_name', type=str, default='long_term_forecast',
                        help='task name, options:[long_term_forecast, short_term_forecast, imputation, classification, anomaly_detection]')
    # 置为训练模式
    parser.add_argument('--is_training', type=int, default=1, help='status')
    # 模型名称
    parser.add_argument('--model_id', type=str, default='test', help='model id')
    # 选择模型
    parser.add_argument('--model', type=str, default='TimesNet',
                        help='model name, options: [Autoformer, Transformer, TimesNet]')

    # data loader
    # 数据名称
    parser.add_argument('--data', type=str, default='ETTh1', help='dataset type')
    # 数据所在文件夹
    parser.add_argument('--root_path', type=str, default='./data/', help='root path of the data file')
    # 数据文件全称
    parser.add_argument('--data_path', type=str, default='ETTh1.csv', help='data file')
    # 时间特征处理方式
    parser.add_argument('--features', type=str, default='M',
                        help='forecasting task, options:[M, S, MS]; M:multivariate predict multivariate, S:univariate predict univariate, MS:multivariate predict univariate')
    # 目标列列名
    parser.add_argument('--target', type=str, default='OT', help='target feature in S or MS task')
    # 时间采集粒度
    parser.add_argument('--freq', type=str, default='h',
                        help='freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h')
    # 模型权重保存文件夹
    parser.add_argument('--checkpoints', type=str, default='./checkpoints/', help='location of model checkpoints')

    # forecasting task
    # 回视窗口
    parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
    # 先验序列长度
    parser.add_argument('--label_len', type=int, default=48, help='start token length')
    # 预测窗口长度
    parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
    # 季节模式(针对M4数据集)
    parser.add_argument('--seasonal_patterns', type=str, default='Monthly', help='subset for M4')

    # inputation task
    # 插补任务中数据丢失率
    parser.add_argument('--mask_rate', type=float, default=0.25, help='mask ratio')

    # anomaly detection task
    # 异常检测中异常点占比
    parser.add_argument('--anomaly_ratio', type=float, default=0.25, help='prior anomaly ratio (%)')

    # model define
    # TimesBlock中傅里叶变换,频率排名前k个周期
    parser.add_argument('--top_k', type=int, default=5, help='for TimesBlock')
    # Inception中卷积核个数
    parser.add_argument('--num_kernels', type=int, default=6, help='for Inception')
    # encoder输入特征数
    parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
    # decoder输入特征数
    parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
    # 输出通道数
    parser.add_argument('--c_out', type=int, default=7, help='output size')
    # 线性层隐含神经元个数
    parser.add_argument('--d_model', type=int, default=512, help='dimension of model')
    # 多头注意力机制
    parser.add_argument('--n_heads', type=int, default=8, help='num of heads')
    # encoder层数
    parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
    # decoder层数
    parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')
    # FFN层隐含神经元个数
    parser.add_argument('--d_ff', type=int, default=2048, help='dimension of fcn')
    # 滑动窗口长度
    parser.add_argument('--moving_avg', type=int, default=25, help='window size of moving average')
    # 对Q进行采样,对Q采样的因子数
    parser.add_argument('--factor', type=int, default=1, help='attn factor')
    # 是否下采样操作pooling
    parser.add_argument('--distil', action='store_false',
                        help='whether to use distilling in encoder, using this argument means not using distilling',
                        default=True)
    # dropout率
    parser.add_argument('--dropout', type=float, default=0.1, help='dropout')
    # 时间特征嵌入方式
    parser.add_argument('--embed', type=str, default='timeF',
                        help='time features encoding, options:[timeF, fixed, learned]')
    # 激活函数类型
    parser.add_argument('--activation', type=str, default='gelu', help='activation')
    # 是否输出attention
    parser.add_argument('--output_attention', action='store_true', help='whether to output attention in ecoder')

    # optimization
    # 并行核心数
    parser.add_argument('--num_workers', type=int, default=0, help='data loader num workers')
    # 实验轮数
    parser.add_argument('--itr', type=int, default=1, help='experiments times')
    # 训练迭代次数
    parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')
    # batch size大小
    parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
    # early stopping机制容忍次数
    parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
    # 学习率
    parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')
    parser.add_argument('--des', type=str, default='test', help='exp description')
    # 损失函数
    parser.add_argument('--loss', type=str, default='MSE', help='loss function')
    # 学习率下降策略
    parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')
    # 使用混合精度训练
    parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)

    # GPU
    parser.add_argument('--use_gpu', type=bool, default=False, help='use gpu')
    parser.add_argument('--gpu', type=int, default=0, help='gpu')
    parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)
    parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multile gpus')

    # de-stationary projector params
    parser.add_argument('--p_hidden_dims', type=int, nargs='+', default=[128, 128],
                        help='hidden layer dimensions of projector (List)')
    parser.add_argument('--p_hidden_layers', type=int, default=2, help='number of hidden layers in projector')

我们在exp.train(setting)行打上断点跳到训练主函数exp_long_term_forecasting.py

数据处理模块

_get_data中找到数据处理函数data_factory.py点击进入,可以看到各标准数据集处理方法:

data_dict = {
    'ETTh1': Dataset_ETT_hour,
    'ETTh2': Dataset_ETT_hour,
    'ETTm1': Dataset_ETT_minute,
    'ETTm2': Dataset_ETT_minute,
    'custom': Dataset_Custom,
    'm4': Dataset_M4,
    'PSM': PSMSegLoader,
    'MSL': MSLSegLoader,
    'SMAP': SMAPSegLoader,
    'SMD': SMDSegLoader,
    'SWAT': SWATSegLoader,
    'UEA': UEAloader
}
  • 由于我们的数据集是ETTh1,那么数据处理的方式为Dataset_ETT_hour,我们进入data_loader.py文件,找到Dataset_ETT_hour
  • __init__主要用于传各类参数,这里不过多赘述,主要对__read_data__进行说明
     def __read_data__(self):
        # 数据标准化实例
        self.scaler = StandardScaler()
        # 读取数据
        df_raw = pd.read_csv(os.path.join(self.root_path,
                                          self.data_path))
        # 计算数据起始点
        border1s = [0, 12 * 30 * 24 - self.seq_len, 12 * 30 * 24 + 4 * 30 * 24 - self.seq_len]
        border2s = [12 * 30 * 24, 12 * 30 * 24 + 4 * 30 * 24, 12 * 30 * 24 + 8 * 30 * 24]
        border1 = border1s[self.set_type]
        border2 = border2s[self.set_type]

        # 如果预测对象为多变量预测或多元预测单变量
        if self.features == 'M' or self.features == 'MS':
            # 取除日期列的其他所有列
            cols_data = df_raw.columns[1:]
            df_data = df_raw[cols_data]
        # 若预测类型为S(单特征预测单特征)
        elif self.features == 'S':
            # 取特征列
            df_data = df_raw[[self.target]]

        # 将数据进行归一化
        if self.scale:
            train_data = df_data[border1s[0]:border2s[0]]
            self.scaler.fit(train_data.values)
            data = self.scaler.transform(df_data.values)
        else:
            data = df_data.values
        # 取日期列
        df_stamp = df_raw[['date']][border1:border2]
        # 利用pandas将数据转换为日期格式
        df_stamp['date'] = pd.to_datetime(df_stamp.date)
        # 构建时间特征
        if self.timeenc == 0:
            df_stamp['month'] = df_stamp.date.apply(lambda row: row.month, 1)
            df_stamp['day'] = df_stamp.date.apply(lambda row: row.day, 1)
            df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday(), 1)
            df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour, 1)
            data_stamp = df_stamp.drop(['date'], 1).values
        elif self.timeenc == 1:
            # 时间特征构造函数
            data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq)
            # 转置
            data_stamp = data_stamp.transpose(1, 0)
        
        # 取数据特征列
        self.data_x = data[border1:border2]
        self.data_y = data[border1:border2]
        self.data_stamp = data_stamp
  • 需要注意的是time_features函数,用来提取日期特征,比如't':['month','day','weekday','hour','minute'],表示提取月,天,周,小时,分钟。可以打开timefeatures.py文件进行查阅,同样后期也可以加一些日期编码进去。
  • 同样的,对__getitem__进行说明
    def __getitem__(self, index):
        # 随机取得标签
        s_begin = index

        # 训练区间
        s_end = s_begin + self.seq_len
        # 有标签区间+无标签区间(预测时间步长)
        r_begin = s_end - self.label_len
        r_end = r_begin + self.label_len + self.pred_len

        # 取训练数据
        seq_x = self.data_x[s_begin:s_end]
        seq_y = self.data_y[r_begin:r_end]
        # 取训练数据对应时间特征
        seq_x_mark = self.data_stamp[s_begin:s_end]
        # 取有标签区间+无标签区间(预测时间步长)对应时间特征
        seq_y_mark = self.data_stamp[r_begin:r_end]

        return seq_x, seq_y, seq_x_mark, seq_y_mark
  • 关于这部分数据处理可能有些绕,开源看我在SCInet代码讲解中数据处理那一部分,绘制了数据集划分图。
    请添加图片描述

网络架构

  • 打开models文件夹下的TimesNet.py,可以看到TimesBlockModel

TimesBlock

  • 可以看到一维离散傅里叶变换FFT_for_Period函数逐行代码解析请看注释,示意图如下:
    在这里插入图片描述
def FFT_for_Period(x, k=2):
    # [B, T, C]
    # 一维离散傅里叶变换,沿T维度[B, T, C] --> [B, T//2+1, C]
    xf = torch.fft.rfft(x, dim=1)
    # 通过增幅寻找周期
    # 在每个频率上的平均值,然后对所有频率取平均[B, T//2+1, C] --> [T//2+1]
    frequency_list = abs(xf).mean(0).mean(-1)
    # 将第一个频率值设置为0(直流分量)
    frequency_list[0] = 0
    # 在频率列表中找到前K个最大值的索引
    _, top_list = torch.topk(frequency_list, k)
    # 将top_list张量转换为numpy数组
    top_list = top_list.detach().cpu().numpy()
    # 通过将序列的总长度除以每个顶部频率,计算周期
    period = x.shape[1] // top_list
    # 返回period,计算选定的顶部频率在最后一个维度C上的平均幅度[B, T//2+1, C] --> [B, k]
    return period, abs(xf).mean(-1)[:, top_list]
  • 转到TimesBlock类,论文中提到的parameter-efficient块是一个类似于Inception视觉模型的架构。
    在这里插入图片描述
  • Inception模型采用多分支结构,它将1×1卷积、3×3卷积最大池化堆叠在一起。这种结构既可以增加网络的宽度,又可以增强网络对不同尺寸的适应性。
  • 打开layers文件夹下的Conv_Blocks.py文件。里面实现了两个类Inception_Block_V1Inception_Block_V2。作者在TimesBlock中只使用了V1版本。
  • 代码逐行解析看注释,重点看各卷积层卷积核的变化和padding操作。
class Inception_Block_V1(nn.Module):
    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V1, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        kernels = []
        for i in range(self.num_kernels):
            # 二维卷积层,卷积核(1, 3, 5, 7,...)
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=2 * i + 1, padding=i))
        self.kernels = nn.ModuleList(kernels)
        if init_weight:
            self._initialize_weights()

    # 初始化权重
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        # 新建列表
        res_list = []
        # 遍历每一层2维卷积
        for i in range(self.num_kernels):
            # 将每一层的结果添加到列表中
            res_list.append(self.kernels[i](x))
        # 将列表沿最后一个维度拼接, 然后在最后一个维度求平均
        res = torch.stack(res_list, dim=-1).mean(-1)
        return res


class Inception_Block_V2(nn.Module):
    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V2, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        kernels = []
        for i in range(self.num_kernels // 2):
            # 二维卷积,卷积核为[(1,3), (1,5), (1,7)]
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[1, 2 * i + 3], padding=[0, i + 1]))
            # 二维卷积,卷积核为[(3,1), (5,1), (7,1)]
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[2 * i + 3, 1], padding=[i + 1, 0]))
        # 再添加一个2维卷积,卷积核为(1,1)
        kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=1))
        self.kernels = nn.ModuleList(kernels)
        if init_weight:
            self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        res_list = []
        for i in range(self.num_kernels + 1):
            # 得到每层的输出
            res_list.append(self.kernels[i](x))
        # 将列表沿最后一个维度拼接, 然后在最后一个维度求平均
        res = torch.stack(res_list, dim=-1).mean(-1)
        return res
  • 回到TimesBlock类,再经过一维离散傅里叶变换后,取得频率排名前k的周期,将其进行padding使其符合卷积操作数据维度。将得到的结果进行堆叠拼接。然后把频率权重进行自适应聚合,最后加入残差连接得到输出。具体看代码解析
class TimesBlock(nn.Module):
    def __init__(self, configs):
        super(TimesBlock, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.k = configs.top_k
        # parameter-efficient块
        self.conv = nn.Sequential(
            Inception_Block_V1(configs.d_model, configs.d_ff,
                               num_kernels=configs.num_kernels),
            nn.GELU(),
            Inception_Block_V1(configs.d_ff, configs.d_model,
                               num_kernels=configs.num_kernels)
        )

    def forward(self, x):
        # 得到B, T, N
        B, T, N = x.size()
        # 得到周期以及对应权重
        period_list, period_weight = FFT_for_Period(x, self.k)

        res = []
        for i in range(self.k):
            # 取周期
            period = period_list[i]
            # padding操作
            if (self.seq_len + self.pred_len) % period != 0:
                length = (((self.seq_len + self.pred_len) // period) + 1) * period
                # padding:[batch,length - seq_len - pred_len,feature]
                padding = torch.zeros([x.shape[0], (length - (self.seq_len + self.pred_len)), x.shape[2]]).to(x.device)
                # 沿len维度拼接
                out = torch.cat([x, padding], dim=1)
            # 不需要padding
            else:
                length = (self.seq_len + self.pred_len)
                out = x
            # 重塑[batch,length // period,period,feature] --> [batch,feature,length // period,period](深拷贝)
            out = out.reshape(B, length // period, period,
                              N).permute(0, 3, 1, 2).contiguous()
            # 进入卷积网络
            out = self.conv(out)
            # 重塑结果(batch, -1, feature)
            out = out.permute(0, 2, 3, 1).reshape(B, -1, N)
            res.append(out[:, :(self.seq_len + self.pred_len), :])
        # 将结果沿feature维度拼接
        res = torch.stack(res, dim=-1)
        # 自适应聚合
        period_weight = F.softmax(period_weight, dim=1)
        # 添加两个维度,重复使其与res维度一致
        period_weight = period_weight.unsqueeze(
            1).unsqueeze(1).repeat(1, T, N, 1)
        # 沿feature维度求和
        res = torch.sum(res * period_weight, -1)
        # 残差连接
        res = res + x
        return res

TimesNet

  • 可以看到model类,针对每个任务类型,建立函数,forecast(预测),imputation(插值),anomaly_detection(异常检测),classification(分类)。
  • 详细内部处理请看代码注释
# 预测类函数
    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        # 非稳态Transfromer标准归一化
        # 计算均值
        means = x_enc.mean(1, keepdim=True).detach()
        # 减去均值
        x_enc = x_enc - means
        # 计算标准差
        stdev = torch.sqrt(
            torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
        # 除以标准差
        x_enc /= stdev

        # embedding
        enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B,T,C]
        # [B,T,C] --> [B,C,T] --> [B,T,C]
        enc_out = self.predict_linear(enc_out.permute(0, 2, 1)).permute(
            0, 2, 1)  # align temporal dimension

        # TimesNet
        for i in range(self.layer):
            # layer_norm
            enc_out = self.layer_norm(self.model[i](enc_out))
        # 映射层
        dec_out = self.projection(enc_out)

        # 非稳态Transfromer反归一化
        dec_out = dec_out * \
                  (stdev[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        dec_out = dec_out + \
                  (means[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        return dec_out

    def imputation(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask):
        # 非稳态Transfromer标准归一化
        # 计算均值
        means = torch.sum(x_enc, dim=1) / torch.sum(mask == 1, dim=1)
        means = means.unsqueeze(1).detach()
        # 减去均值
        x_enc = x_enc - means
        x_enc = x_enc.masked_fill(mask == 0, 0)
        # 计算方差
        stdev = torch.sqrt(torch.sum(x_enc * x_enc, dim=1) /
                           torch.sum(mask == 1, dim=1) + 1e-5)
        stdev = stdev.unsqueeze(1).detach()
        # 除以方差
        x_enc /= stdev

        # embedding
        enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B,T,C]
        # TimesNet
        for i in range(self.layer):
            enc_out = self.layer_norm(self.model[i](enc_out))
        # porject back
        dec_out = self.projection(enc_out)

        # 非稳态Transfromer反归一化
        dec_out = dec_out * \
                  (stdev[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        dec_out = dec_out + \
                  (means[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        return dec_out

    def anomaly_detection(self, x_enc):
        # Normalization from Non-stationary Transformer
        means = x_enc.mean(1, keepdim=True).detach()
        x_enc = x_enc - means
        stdev = torch.sqrt(
            torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
        x_enc /= stdev

        # embedding
        enc_out = self.enc_embedding(x_enc, None)  # [B,T,C]
        # TimesNet
        for i in range(self.layer):
            enc_out = self.layer_norm(self.model[i](enc_out))
        # porject back
        dec_out = self.projection(enc_out)

        # De-Normalization from Non-stationary Transformer
        dec_out = dec_out * \
                  (stdev[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        dec_out = dec_out + \
                  (means[:, 0, :].unsqueeze(1).repeat(
                      1, self.pred_len + self.seq_len, 1))
        return dec_out

    def classification(self, x_enc, x_mark_enc):
        # embedding
        enc_out = self.enc_embedding(x_enc, None)  # [B,T,C]
        # TimesNet
        for i in range(self.layer):
            enc_out = self.layer_norm(self.model[i](enc_out))

        # Output
        # the output transformer encoder/decoder embeddings don't include non-linearity
        output = self.act(enc_out)
        output = self.dropout(output)
        # zero-out padding embeddings
        output = output * x_mark_enc.unsqueeze(-1)
        # (batch_size, seq_length * d_model)
        output = output.reshape(output.shape[0], -1)
        output = self.projection(output)  # (batch_size, num_classes)
        return output
  • 看到model类中__init__方法和forward函数
class Model(nn.Module):

    def __init__(self, configs):
        super(Model, self).__init__()
        self.configs = configs
        self.task_name = configs.task_name
        self.seq_len = configs.seq_len
        self.label_len = configs.label_len
        self.pred_len = configs.pred_len
        # TimesBlock堆叠
        self.model = nn.ModuleList([TimesBlock(configs)
                                    for _ in range(configs.e_layers)])
        self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                           configs.dropout)
        self.layer = configs.e_layers
        self.layer_norm = nn.LayerNorm(configs.d_model)
        # 若任务是长期预测或短期预测
        if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
            # 线性层输出pred_len
            self.predict_linear = nn.Linear(
                self.seq_len, self.pred_len + self.seq_len)
            # 映射,线性层,输出c_out
            self.projection = nn.Linear(
                configs.d_model, configs.c_out, bias=True)
        # 若任务为插值或异常检测
        if self.task_name == 'imputation' or self.task_name == 'anomaly_detection':
            # 映射,线性层,输出c_out
            self.projection = nn.Linear(
                configs.d_model, configs.c_out, bias=True)
        # 若任务为分类
        if self.task_name == 'classification':
            # gelu激活函数
            self.act = F.gelu
            # dropout层
            self.dropout = nn.Dropout(configs.dropout)
            # 映射,线性层,输出num_class
            self.projection = nn.Linear(
                configs.d_model * configs.seq_len, configs.num_class)
    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
        if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
            dec_out = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
            return dec_out[:, -self.pred_len:, :]  # [B, L, D]
        if self.task_name == 'imputation':
            dec_out = self.imputation(
                x_enc, x_mark_enc, x_dec, x_mark_dec, mask)
            return dec_out  # [B, L, D]
        if self.task_name == 'anomaly_detection':
            dec_out = self.anomaly_detection(x_enc)
            return dec_out  # [B, L, D]
        if self.task_name == 'classification':
            dec_out = self.classification(x_enc, x_mark_enc)
            return dec_out  # [B, N]
        return None

Embed

  • 在TimesNet中用到了DataEmbedding,其定义在layers文件夹下Embed.py文件中。其他所有informer类模型embedding操作也在这个文件夹中,这里正好做一个整体的解释。
class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        # 在对数空间中计算位置编码
        pe = torch.zeros(max_len, d_model).float()
        pe.require_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float()
                    * -(math.log(10000.0) / d_model)).exp()

        # 使用正弦和余弦函数计算位置编码
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]


class TokenEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(TokenEmbedding, self).__init__()
        padding = 1 if torch.__version__ >= '1.5.0' else 2
        # 通过一维卷积进行标记嵌入
        self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
                                   kernel_size=3, padding=padding, padding_mode='circular', bias=False)
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(
                    m.weight, mode='fan_in', nonlinearity='leaky_relu')

    def forward(self, x):
        x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
        return x


class FixedEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(FixedEmbedding, self).__init__()

        w = torch.zeros(c_in, d_model).float()
        w.require_grad = False

        position = torch.arange(0, c_in).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float()
                    * -(math.log(10000.0) / d_model)).exp()

        # 使用正弦和余弦函数计算位置编码
        w[:, 0::2] = torch.sin(position * div_term)
        w[:, 1::2] = torch.cos(position * div_term)
        # 使用固定嵌入层进行标记嵌入
        self.emb = nn.Embedding(c_in, d_model)
        self.emb.weight = nn.Parameter(w, requires_grad=False)

    def forward(self, x):
        return self.emb(x).detach()


class TemporalEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TemporalEmbedding, self).__init__()

        minute_size = 4
        hour_size = 24
        weekday_size = 7
        day_size = 32
        month_size = 13

        Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
        # 如果频率为't',使用固定嵌入层进行标记嵌入
        if freq == 't':
            self.minute_embed = Embed(minute_size, d_model)
        # 使用固定嵌入层进行小时标记嵌入
        self.hour_embed = Embed(hour_size, d_model)
        # 使用固定嵌入层进行星期几标记嵌入
        self.weekday_embed = Embed(weekday_size, d_model)
        # 使用固定嵌入层进行日期标记嵌入
        self.day_embed = Embed(day_size, d_model)
        # 使用固定嵌入层进行月份标记嵌入
        self.month_embed = Embed(month_size, d_model)

    def forward(self, x):
        x = x.long()
        # 根据频率选择对应的嵌入进行计算
        minute_x = self.minute_embed(x[:, :, 4]) if hasattr(
            self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:, :, 3])
        weekday_x = self.weekday_embed(x[:, :, 2])
        day_x = self.day_embed(x[:, :, 1])
        month_x = self.month_embed(x[:, :, 0])

        # 将不同的嵌入进行相加得到最终的时间特征嵌入
        return hour_x + weekday_x + day_x + month_x + minute_x


class TimeFeatureEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='timeF', freq='h'):
        super(TimeFeatureEmbedding, self).__init__()

        freq_map = {'h': 4, 't': 5, 's': 6,
                    'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
        d_inp = freq_map[freq]
        # 使用线性层进行时间特征嵌入
        self.embed = nn.Linear(d_inp, d_model, bias=False)

    def forward(self, x):
        return self.embed(x)


class DataEmbedding(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        if x_mark is None:
            # 如果没有时间标记,则只使用数值嵌入和位置嵌入
            x = self.value_embedding(x) + self.position_embedding(x)
        else:
            # 如果有时间标记,则使用数值嵌入、时间嵌入和位置嵌入
            x = self.value_embedding(
                x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
        return self.dropout(x)


class DataEmbedding_wo_pos(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_pos, self).__init__()

        # 值嵌入层,将输入的特征向量映射到d_model维度的向量空间
        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        # 位置嵌入层,用于对输入进行位置编码
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        # 时间嵌入层,根据embed_type和freq对输入进行时间编码
        # 如果embed_type不是'timeF',则使用TemporalEmbedding
        # 否则使用TimeFeatureEmbedding
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        # Dropout层,用于在训练过程中进行随机失活
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        if x_mark is None:
            # 如果x_mark为空,则仅应用值嵌入到x
            x = self.value_embedding(x)
        else:
            # 否则,将值嵌入和时间嵌入相加
            x = self.value_embedding(x) + self.temporal_embedding(x_mark)
        return self.dropout(x)


class PatchEmbedding(nn.Module):
    def __init__(self, d_model, patch_len, stride, padding, dropout):
        super(PatchEmbedding, self).__init__()
        # Patching
        # patch_len是每个patch的长度
        self.patch_len = patch_len
        # stride是patch的步幅
        self.stride = stride
        # padding_patch_layer用于对输入进行补零操作
        self.padding_patch_layer = nn.ReplicationPad1d((0, padding))

        # Backbone, Input encoding: projection of feature vectors onto a d-dim vector space
        # value_embedding将每个patch的特征向量映射到d_model维度的向量空间
        self.value_embedding = nn.Linear(patch_len, d_model, bias=False)

        # Positional embedding
        # 位置嵌入层,用于对输入进行位置编码
        self.position_embedding = PositionalEmbedding(d_model)

        # Residual dropout
        # Dropout层
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # 对输入进行patch操作
        n_vars = x.shape[1]
        # 对输入进行补零操作
        x = self.padding_patch_layer(x)
        # 滑动窗口分割输入为多个patch
        x = x.unfold(dimension=-1, size=self.patch_len, step=self.stride)
        # 重塑张量形状
        x = torch.reshape(x, (x.shape[0] * x.shape[1], x.shape[2], x.shape[3]))
        # 输入 encoding
        x = self.value_embedding(x) + self.position_embedding(x)
        return self.dropout(x), n_vars

模型训练

  • 这里我主要注释一下train函数,validtest函数都差不多,只是有些操作不需要删减了而已。
    def train(self, setting):
        # 取得训练、验证、测试数据及数据加载器
        train_data, train_loader = self._get_data(flag='train')
        vali_data, vali_loader = self._get_data(flag='val')
        test_data, test_loader = self._get_data(flag='test')

        path = os.path.join(self.args.checkpoints, setting)
        # 创建模型保存路径
        if not os.path.exists(path):
            os.makedirs(path)

        # 获取当前时间
        time_now = time.time()

        # 取训练步数
        train_steps = len(train_loader)
        # 设置早停参数
        early_stopping = EarlyStopping(patience=self.args.patience, verbose=True)

        # 选择优化器
        model_optim = self._select_optimizer()
        # 选择损失函数
        criterion = self._select_criterion()

        # 如果多GPU并行
        if self.args.use_amp:
            scaler = torch.cuda.amp.GradScaler()

        # 训练次数
        for epoch in range(self.args.train_epochs):
            iter_count = 0
            train_loss = []

            self.model.train()
            epoch_time = time.time()
            for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(train_loader):
                iter_count += 1
                # 梯度归零
                model_optim.zero_grad()

                # 取训练数据
                batch_x = batch_x.float().to(self.device)

                batch_y = batch_y.float().to(self.device)
                batch_x_mark = batch_x_mark.float().to(self.device)
                batch_y_mark = batch_y_mark.float().to(self.device)

                # decoder输入
                dec_inp = torch.zeros_like(batch_y[:, -self.args.pred_len:, :]).float()
                dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)

                # encoder - decoder
                if self.args.use_amp:
                    with torch.cuda.amp.autocast():
                        if self.args.output_attention:
                            outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                        else:
                            outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)

                        f_dim = -1 if self.args.features == 'MS' else 0
                        outputs = outputs[:, -self.args.pred_len:, f_dim:]
                        batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
                        loss = criterion(outputs, batch_y)
                        train_loss.append(loss.item())
                else:
                    if self.args.output_attention:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                    else:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)

                    # 如果预测方式为MS,取最后1列否则取第1列
                    f_dim = -1 if self.args.features == 'MS' else 0
                    outputs = outputs[:, -self.args.pred_len:, f_dim:]
                    batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
                    # 计算损失
                    loss = criterion(outputs, batch_y)
                    # 将损失放入train_loss列表中
                    train_loss.append(loss.item())

                # 记录训练过程
                if (i + 1) % 100 == 0:
                    print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
                    speed = (time.time() - time_now) / iter_count
                    left_time = speed * ((self.args.train_epochs - epoch) * train_steps - i)
                    print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
                    iter_count = 0
                    time_now = time.time()

                if self.args.use_amp:
                    scaler.scale(loss).backward()
                    scaler.step(model_optim)
                    scaler.update()
                else:
                    # 反向传播
                    loss.backward()
                    # 更新梯度
                    model_optim.step()

            print("Epoch: {} cost time: {}".format(epoch + 1, time.time() - epoch_time))
            train_loss = np.average(train_loss)
            vali_loss = self.vali(vali_data, vali_loader, criterion)
            test_loss = self.vali(test_data, test_loader, criterion)

            print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
                epoch + 1, train_steps, train_loss, vali_loss, test_loss))
            early_stopping(vali_loss, self.model, path)
            if early_stopping.early_stop:
                print("Early stopping")
                break

            # 更新学习率
            adjust_learning_rate(model_optim, epoch + 1, self.args)
        # 保存模型
        best_model_path = path + '/' + 'checkpoint.pth'
        self.model.load_state_dict(torch.load(best_model_path))

        return self.model

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/548176.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回归分析-简单线性回归推导

回归分析-简单线性回归模型 在大数据分析中,回归分析是一种预测性的建模技术,它研究的是因变量(目标)和自变量(预测器)之间的关系。这种技术通常用于预测分析,时间序列模型以及发现变量之间的因…

浏览器免费安装ChatGPT插件与国内免费ChatGPT分享

文章目录 一、什么是ChatGPT?ChatGPT的功能: 二、如果在Edge上安装chatgpt插件三、国内免费ChatGPT四、ChatGPT程序员的影响五、Chatgp能取代程序员吗? 一、什么是ChatGPT? ChatGPT是一种基于自然语言处理的机器学习算法&#xf…

【牛客刷题】 选择题整理day3~day4

在知识的海洋里彻底疯狂!!! 文章目录 1. try-catch-finally2. 方法调用3. 接口4. 1. try-catch-finally try-catch-finally的执行机制是 try块是不能被省略的,用于包含可能会抛出异常的代码,如果没有try块&#xff0c…

Linux之DNAT策略及应用与tcpdump抓包

目录 一、DNAT的介绍 二、DNAT实验设计 三、DNAT具体实验步骤操作 第一步:配置好网卡与环境 第二步:web服务器安装httpd服务,且开启httpd服务 第三步:对网关服务器进行操作 1.设置路由转发 2.设置SNAT 四、tcpdump抓包工具…

yooasset+hybridclr在android,ios端热更新测试

Hybridclr+YooAsset+Unity Run android+iOS 这个工程是用来学习YooAsset和Hybridcl,来做unity资源和代码热更新, 实现了android 和ios 双端,跑通。 源码在文章最后。 版本 很新的版本 用到的YooAsset 1.4.13 用到的HybridCLR 2.4.2 yooAsset git网址 https://github.co…

CVTE C++软开全程面试(一面、二面、群面、HR面)

一面,面了一个钟,问了很多问题,大部分是计算机的基础知识,我也只能记录下一部分。 C的继承问题,protected成员被public、protected和private继承的情况。 下面是关于protected成员在不同类型继承中的访问权限&#xff…

声音合成——Foley Sound——DECASE项目——多模态智能感知与应用——论文翻译

文章目录 概述论文翻译CONDITIONAL SOUND GENERATION USING NEURAL DISCRETE TIME-FREQUENCY REPRESENTATION LEARNINGAbstractSampleRNN是啥? Introduction个人总结(省流)补充个人感想 Approach2.1 Discrete time-frequency省流总结2.1.1 Mu…

http2

HTTP: HTTP/2 - High Performance Browser Networking (OReilly) 以下内容都是上面这篇文章的一些总结(或者说翻译hiahia) http2是由谷歌的SPDY之上演变而来的。主要涉及的技术包括: 头部压缩,多路复用,请求优先级 …

使用Docker Dockerfile构建php LNMP集成开发环境,并运行Thinkphp5

宿主机环境 系统:MAC、Windows10 Docker版本:Docker version 23.0.5 Docker Desktop:Dockerdesktop官方地址 前言 这篇主要介绍如何在Mac、Windows10使用docker搭建LNMP集成开发环境。下面我会写Dockerfile编译安装Nginxphp基础环境。mysql、redis基…

多维时序 | MATLAB实现基于贝叶斯线性回归(Bayesian Regression)的多变量时间序列预测

多维时序 | MATLAB实现基于贝叶斯线性回归(Bayesian Regression)的多变量输入回归预测 目录 多维时序 | MATLAB实现基于贝叶斯线性回归(Bayesian Regression)的多变量输入回归预测预测效果基本介绍模型描述程序设计参考资料预测效果 基本介绍 多维时序 | MATLAB实现基于贝叶斯线…

Rocky Linux 8.8 发布 - CentOS 的权威替代

Rocky Linux 8.8 发布 - CentOS 的权威替代 Rocky Linux 由 CentOS 项目的创始人 Gregory Kurtzer 领导 请访问原文链接:https://sysin.org/blog/rocky-linux-8/,查看最新版。原创作品,转载请保留出处。 作者主页:sysin.org 以…

Linux-Shell编程

一,shell编程的概念 1.0Shell与内核的关系 内核是Linux系统的核心,它是操作系统的最底层部分,负责管理计算机的硬件资源,例如CPU、内存、磁盘等。内核还提供了许多系统调用,供应用程序使用,例如打开文件、…

数字员工IN淄博:淄博烧烤火出圈,政务服务很圈粉!

“小饼烤炉加蘸料,灵魂烧烤三件套”,淄博烧烤近期在各大社交媒体平台火爆出圈,不少人慕名前往亲身体验人间烟火气,让这座传统工业城市再度名声鹊起,焕活了淄博文旅市场的发展潜能。淄博人民的淳朴热情造就了这一次淄博…

STM32F401RET6 LQFP64 (Nucleo-F401RE) SPI通信(主从双机SPI通信)

STM32F401RET6 LQFP64 (Nucleo-F401RE) SPI通信(主从双机SPI通信) 1.1 SPI总线介绍 SPI 通讯使用 3 条总线及片选线,3 条总线分别为 SCK、MOSI、MISO,片选线为NSS(CS) NSS 信号线由高变低 ,是 SPI 通讯的起始信号 。…

Redis系列--redis集群

一、redis集群介绍 一、简介与注意事项 由于数据量过大,当单个master挂了再slave进行选举时,会有一定时间内无法进行写操作,会出现数据的丢失。也就数说单个master复制集难以承担,因此需要对多个复制集进行集群,形成水…

AcWing 243. 一个简单的整数问题2

题目描述 题目链接:AcWing 243. 一个简单的整数问题2 给定一个长度为 N 的数列 A,以及 M 条指令,每条指令可能是以下两种之一: C l r d,表示把 A[l],A[l1],…,A[r] 都加上 d。 Q l r,表示询问数列中第 l∼…

【5.20】五、安全测试——安全测试工具

目录 5.4 常见的安全测试工具 1. Web漏洞扫描工具——AppScan 2. 端口扫描工具——Nmap 3. 抓包工具——Fiddler 4. Web渗透测试工具——Metasploit 小提示:Kali Linux 5.4 常见的安全测试工具 安全测试是一个非常复杂的过程,测试所使用到的工具也…

自学网络安全/Web安全,一般人我还是劝你算了吧

由于我之前写了不少网络安全技术相关的文章,不少读者朋友知道我是从事网络安全相关的工作,于是经常有人私信问我: 我刚入门网络安全,该怎么学? 要学哪些东西? 有哪些方向? 怎么选&a…

【leetcode】1373. 二叉搜索子树的最大键值和

二叉搜索子树的最大键值和 问题描述问题简单分析提交之旅第一次提交-失败第二次提交-失败第三次提交-成功 问题描述 二叉搜索子树的最大键值和 给你一棵以 root 为根的 二叉树 ,请你返回 任意 二叉搜索子树的最大键值和。 二叉搜索树的定义如下: 任意节…

JVM学习(六)

1. JAVA 集合 1.1. 接口继承关系和实现 集合类存放于 Java.util 包中,主要有 3 种:set(集)、list(列表包含 Queue)和 map(映射)。 1. Collection:Collection 是集合 List、Set、Queue 的最基本的接口。 2. Iterato…