目录
摘要
Abstract
文献阅读:可解释多水平时间序列预测的时间融合Transformer
一、多水平预测(Multi-horizon forecasting)
二、过去相关研究
三、现存问题
四、提出方法
五、模型架构
1、门控残差网络(GRN)
2、变量选择网络
3、静态协变量编码器
4、时间融合解码器
5、分位数输出
六、研究实验
1、数据集
2、评估指标
3、实验过程
4、实验结果
七、框架搭建( tft_model.py)
总结
摘要
本周阅读的文献《Temporal Fusion Transformers for interpretable multi-horizon time series forecasting》介绍了一种新颖的时间序列预测架构——时间融合变换器(Temporal Fusion Transformer, TFT)。该架构的目的在于解决多水平时间序列预测中的复杂输入问题,这些输入包括静态变量、已知未来输入及仅过往观察到的外生时间序列,且它们与目标变量的交互方式未知。与传统的深度学习方法相比,TFT不仅在预测性能上有所突破,更重要的是提高了模型的可解释性,使用户能洞察模型如何处理时间序列中的不同时间尺度关系。TFT的核心设计包括使用循环层处理局部时间依赖,以及应用可解释的自注意力层捕捉长期依赖关系。模型通过特定组件选择相关特征,并利用门控层抑制不必要的信息,从而在广泛场景中达到高性能。与现有方法如DeepAR、DSSM和基于卷积处理的Transformer变体相比,TFT在点预测和不确定性量化方面均展现出显著优势。
Abstract
The literature "Temporary Fusion Transformers for Interpretable Multi Horizon Time Series Forecasting" read this week introduces a novel time series prediction architecture - the Time Fusion Transformer (TFT). The purpose of this architecture is to solve the complex input problem in multi-level time series prediction, which includes static variables, known future inputs, and exogenous time series only observed in the past, and their interaction with the target variable is unknown. Compared with traditional deep learning methods, TFT not only breaks through in predictive performance, but more importantly, improves the interpretability of the model, allowing users to gain insight into how the model handles different time scale relationships in time series. The core design of TFT includes using a loop layer to handle local temporal dependencies and applying an interpretable self attention layer to capture long-term dependencies. The model selects relevant features through specific components and utilizes gating layers to suppress unnecessary information, achieving high performance in a wide range of scenarios. Compared with existing methods such as DeepAR, DSSM, and Transformer variants based on convolutional processing, TFT exhibits significant advantages in point prediction and uncertainty quantification.
文献阅读:可解释多水平时间序列预测的时间融合Transformer
Temporal Fusion Transformers for interpretable multi-horizon time series forecastinghttps://doi.org/10.1016/j.ijforecast.2021.03.012PDF:main.pdf (sciencedirectassets.com)
代码来源:https://github.com/google-research/google-research/tree/master/tft
一、多水平预测(Multi-horizon forecasting)
多水平预测(Multi-horizon forecasting)是指对未来多个时间点上的目标变量进行预测的任务,与一步到位的预测不同,它为用户提供了一整条未来路径的估计,使得用户能够在多个未来时间点上优化其决策,比如零售商可以基于整个即将到来季节的销售预测来优化库存管理,或者临床医生能够依据患者的治疗效果预测来调整长期治疗方案。主要是指模型看到的输入数据的种类和范围扩展了,并且支持动态步数的多步预测。
实际的多水平预测应用程序通常可以访问各种数据源,如图所示,包括关于未来的已知信息(例如即将到来的假期日期)、其他外生时间序列(例如历史客户客流量)和静态元数据(例如商店的位置),而不需要任何关于它们如何交互的先验知识,数据源的异质性(不同方面的数据)以及关于它们相互作用的很少信息使得多水平时间序列预测特别具有挑战性。
在给定的时间序列数据集中,假设有 个唯一的实体,比如零售业中的不同门店或医疗保健中的不同病人。每个实体 关联有一组静态协变量 ,以及在每个时间步 处的输入特征向量和标量目标值。
时间相关的输入特征被细分为两类,其中前者为观测输入,只能在每个时间步上测量且事先未知,而后者为已知输入,可以预先确定(例如,时间t的星期几)。
在许多情况下,预测间隔的规定可能有助于优化决策和风险管理,因为它提供了目标可能采取的最好和最坏情况的值的指示。因此,对多水平预测设置采用分位数回归(例如,在每个时间步长输出第10、50和90个百分位数)。每个分位数预测采用以下形式:
将所有过去的信息合并到一个有限的回溯窗口k中,只使用直到并包括预测开始时间t的目标和已知输入和整个范围内的已知输入
二、过去相关研究
1、用于多步预测的DNNs
深度神经网络(DNNs)在多水平时间序列预测中逐渐兴起,展现出超越传统时间序列模型的强大性能。这些方法可分为迭代式(如基于自回归模型的Deep AR)和直接式(如基于序列到序列模型的方法)两大类。
-
迭代方法:这类方法基于自回归模型,如Deep AR和Deep State-Space Models(DSSM),采用LSTM网络来逐步生成每一步的预测分布,通过递归地将前一时刻的预测作为后一时刻的输入,实现多步预测。
-
直接序列到序列方法:与迭代方法不同,这类方法如基于Transformer的模型,直接从输入序列映射到输出序列,无需递归预测。
2、时间序列注意可解释性
注意力机制允许模型学习并突出显示输入序列中的重要部分,从而为预测结果提供更直观的解释依据。注意力机制逐渐被应用于时间序列分析以增强模型的可解释性。以往的研究,已经探索了基于LSTM和Transformer架构的注意力机制在时间序列上的应用。这些模型通过注意力权重揭示了模型关注输入序列中哪些部分进行预测的机制。
3、DNNs的实例变量重要性
对于实例级别的变量重要性分析,即理解特定样本预测中各变量的贡献,现有的方法分为后验解释方法和内在可解释模型两大类。后验解释方法如LIME、SHAP和RL-LIM,虽能提供关于黑盒模型的输入特征贡献度,但它们通常不考虑输入的时间顺序,限制了在复杂时间序列数据分析中的应用。相比之下,内在可解释模型,如Interpretable Multi-Variable LSTMs,通过结构化设计直接在模型架构中嵌入特征选择机制,使每个变量对预测的贡献显式量化,提高了模型的透明度和可解释性。
三、现存问题
- 没有考虑多水平预测中常见的不同类型的输入:传统上,多水平时间序列预测面临挑战,尤其是在处理混合型输入方面,包括静态变量(即时间不变协变量)、已知未来输入和仅在过去观察到的其他外生时间序列。现有方法通常假设所有外生输入在未来可知,或忽视重要的静态协变量,这导致模型在实际应用中缺乏灵活性和解释性。
- 缺乏可解释性:大多数当前架构为“黑箱”模型,其复杂非线性参数间相互作用使得预测过程难以解释,降低了用户对模型输出的信任度,且给模型调试带来困难。现有的后验解释方法如LIME和SHAP并不适合应用于时间序列,因为它们未考虑输入特征的时间顺序和相互依赖性。
四、提出方法
为应对上述问题,提出了一个基于注意力的框架Temporal Fusion Transformer(TFT),主要是对于多元异构的输入分别处理,逐层筛选过滤非必要的特征,从而去除噪声,保留重要信息,进而增加了可解释性。(其本质就是特征选择)创新点在于:
- 静态协变量编码器:用于生成上下文向量,这些向量在整个网络中被利用,帮助模型理解预测问题的全局背景。
- 门控机制与样本依赖性变量选择:通过这一系列机制最小化无关输入的影响,提升模型对关键信息的聚焦能力。
- 序列到序列层:专门处理已知和观测到的输入,实现局部时序信息的有效整合。
- 时间融合解码器:学习数据集中存在的任何长期依赖关系,增强模型对复杂时序模式的理解能力。
总结优点有:
- 传统上的多步预测通常是单步预测迭代而来的,即将 t+1 时刻的预测值,再作为模型的输入,反复迭代出多步的结果,这样会导致误差积累,就不如一次性预测多步的模型好。TFT 采用 序列到序列(seq2seq 的思路),输入多个历史值,在训练时同时将多步的预测结果纳入loss函数的计算,更好的减少误差提高预测精度。
- 不同类型的数据对于变量的预测都有着不同程度的影响, TFT对输入的特征做了严格的区分,分为静态变量(不会改变的)、可知动态变量(例如时间)、不可知动态变量(例如未来的数据),这样不同类型的数据在模型中起到了不同的重要程度,通过计算重要程度,模型可以更专注于更有意义的特征。
- TFT输出的预测值并不是一个值,而是一个预测区间。在时序预测问题中,每一个预测的时间步预测的置信度是不同的,而预测基本都是给决策服务的,一个要利用预测信息进行决策的人,可以根据预测区间的宽窄去决定自己要多信赖这一次预测的结果,在实际应用中很有价值。
- 通过变量选择模块和多头注意力解释了如何筛选重要特征,通过分析历史信息找到是否存在特定的模式,然后在特定模式上进一步研究数据集的时序是否发生过重大变化。
五、模型架构
TFT的核心是其独特的架构设计,主要组成部分是:
- 门控机制,可以跳过架构中任何未使用的组件,提供自适应深度和网络复杂性,以适应广泛的数据集和场景。
- 变量选择网络,在每个时间步中选择相关的输入变量。
- 静态协变量编码器,将静态特征集成到网络中,通过对上下文向量的编码来调节时间动态。
- 时间处理,从观察到的和已知的时变输入中学习长期和短期的时间关系,使用序列到序列层进行局部处理,其中使用一种新的可解释的多头注意块捕获长期依赖关系。
- 预测区间,通过分位数预测来确定每个预测水平可能目标值的范围。
从下往上分层来看,首先第一层输入层,分三部分输入:静态信息、历史(待预测变量)信息、未来(其他变量)信息;第二层变量选择层,做输入特征的筛选;第三层 LSTM编码层,通过LSTM网络来捕捉点长短期信息;第四层Gate + Add&Norm,其中门控在进一步考虑不同特征的重要性,然后做残差和normalization操作;第五层GRN,与第四层基本一样,可以理解就是在加深网络;第六次Attention层,对不同时刻的信息进行加权,计算注意力值;第七层输出层,做分位数回归预测区间。
1、门控残差网络(GRN)
外部输入和目标之间的精确关系通常是预先未知的,因此很难预测哪些变量是相关的。此外,很难确定所需的非线性处理的程度,在某些情况下,较简单的模型可能是有益的--例如,当数据集较小或有噪声时。为了给模型提供只在需要的地方应用非线性处理的灵活性,提出了门控残差网络(GRN)作为TFT的构建块,作用是控制信息流,主要是为了保持信息通过门控做一个初步特征选择工作。GRN接受主输入和可选的上下文向量,并产生
2、变量选择网络
为应对多变的输入特征重要性,TFT设计了变量选择网络。该网络在每个时间步为模型提供动态的、实例级别的变量权重,这意味着模型能够根据当前预测任务的需求,自动筛选出最相关的输入特征。这种机制减少了无关变量的干扰,增强了模型对关键信息的关注,从而提高预测精度。
变量选择网络包含了上述GRN,针对不同输入做了一个设计,变量选择网络 的工作原理如下:
- 将 GRN 单独应用于每个特征。
- 在所有特征的串联上应用 GRN,其中运用了外部的信息(External Context)去引导学习,然后通过 softmax 产生特征权重。
- 权重再乘以左边特征的一个feature map就可以得到特征选择的一个结果,即单个 GRN 输出的加权总和。
3、静态协变量编码器
区别于其他时间序列预测架构,TFT设计了静态协变量编码器,以集成静态元数据到模型中。通过独立的GRN编码器,它生成四个不同的上下文向量,分别用于条件化时间动态的不同方面,如时间变量选择、局部特征处理以及通过静态信息丰富时序特征。这一设计使得模型能够充分利用静态信息,增强对时间序列动态的解释力。从编码器生成的四个静态变量会流向三个地方:
- 时间变量选择的上下文,给之后的GRN提供外部的上下文变量,作为一个引导作用
- 时间特征的局部处理,给LSTM这两个结果作为初始化去encoder和decoder
- 用静态信息丰富时间特征的上下文,给VSN提供一个外部的上下文变量
4、时间融合解码器
给LSTM Encoder过去的一些特征,再给LSTM Decoder未来的一个特征,然后LSTM的编码器和解码器会再次经过Gate做特征选择的工作,把这些特征都处理完了后会统一流入到这个TFD,所以TFD的输入把过去,未来和现在的信息都整合过来了,整合进来完了之后会流入时间融合解码器内部的三个模块,它综合了LSTM(长短期记忆网络)进行局部时序信息处理和自注意力机制来整合来自任意时间步的信息。这种设计使模型能够高效地处理短时和长时的时序关系,同时保持了对时序动态的深入理解和预测能力。解码器的设计允许对不同类型的输入(如静态、已知、观测的输入)进行有效融合,形成对目标序列的预测。
TFD中的自注意力机制通过修改自Transformer架构的多头注意力机制,以提高模型的可解释性。多头注意力允许模型在不同表示子空间中学习长程依赖关系,每个多头关注输入序列的不同方面,然后通过线性组合这些关注结果,模型能更细致地捕捉和解释不同时间步之间复杂的关系。这种机制不仅提高了模型的学习能力,也为理解模型决策过程提供了途径。
这里对传统的Transformer多头注意力机制进行了一些小改进,传统的针对Q K V 针对每一个头都会有不同权重,但是TFT在这里V是多头共享的参数,Q K 本身是组成Attention的一个重要部分,所以这两个就不用想参数,每一个头都是每一个头的权重。
5、分位数输出
TFT还提供了预测区间的功能,通过量化预测来确定目标值在每个预测时间点的可能范围。这种量化预测不仅给出了点估计,还提供了关于预测不确定性的信息,这对于风险管理和决策制定至关重要。通过学习不同的分位数,模型能够输出一系列的值,反映出目标变量在不同置信水平下的预期范围。
六、研究实验
1、数据集
研究采用了一系列具有代表性的数据集,以全面检验Temporal Fusion Transformer (TFT) 在多水平时间序列预测中的表现和适用性。所选数据集覆盖了从简单到复杂的多种应用场景,具体包括:
- UCI电力负荷图数据集,包含370个客户的用电量,以小时为单位聚合,利用过去一周的数据,预测接下来24小时的消费。
- UCI PEM-SF交通数据集,描述了SF海湾地区440条高速公路的占用率,同样按小时聚合,预测窗口和历史观察期相同。
- Favorita杂货销售数据集,源自Kaggle竞赛,结合了产品和店铺的元数据以及每日采样的其他外生时间序列变量,目标是基于过去90天的信息,预测未来30天的日志销量。
- 波动率数据,OMI实现库提供的31个股票指数的日实现波动率数据,结合日收益率,用以评估模型在小规模且噪声较大的金融数据上的鲁棒性。
2、评估指标
为了全面评估模型的表现,采用了量化损失指标,特别是P50和P90损失,以衡量预测的准确性和不确定性。P50代表中位数损失,反映了模型预测值的中心趋势;而P90损失则评估了模型预测的上端可信区间,反映了模型在极端情况下的表现。
3、实验过程
实验过程包括了对每个数据集的划分,分为训练集、验证集和测试集,用于模型学习、超参数调整和最终性能评估。超参数优化通过随机搜索进行,针对不同的数据集调整了状态大小、丢弃率、批次大小、学习率和最大梯度范数等多个参数。例如,在波动率数据集上进行了240次迭代,在其他数据集上进行了64次。此过程确保了模型在不同数据特性下的最优配置。
4、实验结果
实验结果显示,TFT在各种数据集上显著优于所有比较基准,包括直接和迭代模型。平均而言,TFT的P50和P90损失分别比次优模型低7%和9%,体现了其在点预测和不确定性估计方面的优越性。特别是在处理复杂输入(如零售和波动率数据集)时,TFT通过考虑观测到的输入变量和采用分位数回归来处理非正态分布的目标,展现出了明显的优势。例如,在交通流量数据集上,由于目标分布严重偏斜,TFT的直接模型性能超过了基于高斯分布假设的其他模型。
消融实验
- 门控层:通过将每个GLU层替换为一个简单的线性层,然后是ELU来进行消融。
- 静态协变量编码器:通过将所有上下文向量设置为零(即cs=ce=cc=ch=0)并将所有转换后的静态输入连接到所有与时间相关的过去和未来输入来实现。
- 实例式变量选择网络:通过用可训练系数替换Softmax输出,并移除产生可变选择权重的网络来进行消融。
- 自我注意力层:通过替换可解释的多头关注层的注意矩阵(等式)来消融。
- 用于局部处理的序列到序列层:通过用Vaswani等人使用的标准位置编码取代序列到序列层来进行消融。
结果如图所示,对所有数据集的P50和P90损耗的影响是相似的,所有组件总体上都有助于性能改进。负责捕捉时间关系的组件(如循环层进行局部处理和自注意力层处理长期依赖)对模型性能的影响最大。当这些组件被移除后,P90损失平均增加了超过6%,在特定数据集中甚至超过了20%。除此之外可以发现,不同时间序列数据集对模型组件的依赖程度有所差异。门控层的消融实验显示,它们对模型稳健性有显著贡献,特别是在噪声较大的小型数据集上,如波动率预测数据集,P90损失增加了4.1%。
七、框架搭建( tft_model.py)
1、def linear_layer()
定义Dense线性层,但相比Dense,这里增加了一个TimeDistributed层,即在每个时间步上均进行Dense操作。
def linear_layer(size,
activation=None,
use_time_distributed=False,
use_bias=True):
linear = tf.keras.layers.Dense(size, activation=activation, use_bias=use_bias)
if use_time_distributed:
linear = tf.keras.layers.TimeDistributed(linear)
return linear
2、apply_mlp()
定义两层Dense,MLP多层感知器。
def apply_mlp(inputs,
hidden_size,
output_size,
output_activation=None,
hidden_activation='tanh',
use_time_distributed=False):
if use_time_distributed:
hidden = tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(hidden_size, activation=hidden_activation))(
inputs)
return tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(output_size, activation=output_activation))(
hidden)
else:
hidden = tf.keras.layers.Dense(
hidden_size, activation=hidden_activation)(
inputs)
return tf.keras.layers.Dense(
output_size, activation=output_activation)(
hidden)
3、def apply_gating_layer(),定义GLU门限单元
在Dropout后,分别定义激活函数为sigmoid与无激活函数的Dense层,将两Dense层的输出矩阵相乘即获得门限单元。门限单元的作用即门限,相当于给变量加一个阀门,乘以一个系数(非线性)。
def apply_gating_layer(x,
hidden_layer_size,
dropout_rate=None,
use_time_distributed=True,
activation=None):
if dropout_rate is not None:
x = tf.keras.layers.Dropout(dropout_rate)(x)
if use_time_distributed:
activation_layer = tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(hidden_layer_size, activation=activation))(
x)
gated_layer = tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(hidden_layer_size, activation='sigmoid'))(
x)
else:
activation_layer = tf.keras.layers.Dense(
hidden_layer_size, activation=activation)(
x)
gated_layer = tf.keras.layers.Dense(
hidden_layer_size, activation='sigmoid')(
x)
return tf.keras.layers.Multiply()([activation_layer,
gated_layer]), gated_layer
4、def add_and_norm()残差与归一化网络,防止过拟合。
ef add_and_norm(x_list):
tmp = Add()(x_list)
tmp = LayerNorm()(tmp)
return tmp
5、def gated_residual_network()
定义GRN门限残差网络,属于门限装置。
输入先通过 linear_layer()函数定义的Dense层,使用ELU指数激活函数后,再通过linear_layer()函数定义的Dense层,apply_gating_layer()函数定义的门限层,最后经过残差与归一化网络输出。其作用相当于主成分分析,提取有效特征。
如果还输入了上下文特征矩阵c,a、c同时通过Dense层得到两个特征矩阵后相加。其作用大致相当于同时提取有效(对输出有影响)的上下文特征。
def gated_residual_network(x,
hidden_layer_size,
output_size=None,
dropout_rate=None,
use_time_distributed=True,
additional_context=None,
return_gate=False):
# Setup skip connection
if output_size is None:
output_size = hidden_layer_size
skip = x
else:
linear = Dense(output_size)
if use_time_distributed:
linear = tf.keras.layers.TimeDistributed(linear)
skip = linear(x)
# Apply feedforward network
hidden = linear_layer(
hidden_layer_size,
activation=None,
use_time_distributed=use_time_distributed)(
x)
if additional_context is not None:
hidden = hidden + linear_layer(
hidden_layer_size,
activation=None,
use_time_distributed=use_time_distributed,
use_bias=False)(
additional_context)
hidden = tf.keras.layers.Activation('elu')(hidden)
hidden = linear_layer(
hidden_layer_size,
activation=None,
use_time_distributed=use_time_distributed)(
hidden)
gating_layer, gate = apply_gating_layer(
hidden,
output_size,
dropout_rate=dropout_rate,
use_time_distributed=use_time_distributed,
activation=None)
if return_gate:
return add_and_norm([skip, gating_layer]), gate
else:
return add_and_norm([skip, gating_layer])
6、def get_decoder_mask()
相当与大名鼎鼎的Transformer中的looking_ahead_masking层。因为在实际情况中,一条数据的数据只受历史与当前数据的影响,而与未来状态无影响。因此,我们需要一个矩阵用来盖住下文特征。而该函数创建了一个上三角为1的矩阵,乘以负无穷,加在特征矩阵上后,使第i条数据输出,只受前i-1条数据的影响,之后数据的影响为0,参数为负无穷。
def get_decoder_mask(self_attn_inputs):
len_s = tf.shape(self_attn_inputs)[1]
bs = tf.shape(self_attn_inputs)[:1]
mask = K.cumsum(tf.eye(len_s, batch_shape=bs), 1)
return mask
7、class ScaledDotProductAttention()
实现自注意力机制。
self_attention的实现这里不细讲。主要流程为:
- Q(uary)、K(ey)、V(alue)取相同值
- Q、K矩阵相乘,再除以维度的根号,得到序列内部子注意力系数矩阵
- 子注意力系数与V相乘,得到自注意力矩阵
class ScaledDotProductAttention():
def __init__(self, attn_dropout=0.0):
self.dropout = Dropout(attn_dropout)
self.activation = Activation('softmax')
def __call__(self, q, k, v, mask):
temper = tf.sqrt(tf.cast(tf.shape(k)[-1], dtype='float32'))
attn = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[2, 2]) / temper)(
[q, k]) # shape=(batch, q, k)
if mask is not None:
mmask = Lambda(lambda x: (-1e+9) * (1. - K.cast(x, 'float32')))(
mask) # setting to infinity
attn = Add()([attn, mmask])
attn = self.activation(attn)
attn = self.dropout(attn)
output = Lambda(lambda x: K.batch_dot(x[0], x[1]))([attn, v])
return output, attn
8、 class InterpretableMultiHeadAttention()
可解释的多头自注意力机制的实现。
在我看来,这里的可解释值的是多头的可解释:在传统的多头自注意力机制中,我们将多维的数据切割为单维,每维的数据都投入神经网络,得到各自自注意力矩阵,再将多个矩阵合成单个矩阵,但问题是,我们无法解释每维数据得到的矩阵代表着什么,因为输入的仅是矩阵的一维,我们无法将输出作为数据整体的子注意力的一部分,这是不可解释的。而论文的创新点在于不再将矩阵切成n维后分别投入,而是通过Dense层得到单维的特征矩阵,再投入自注意力机制。
这个方法使矩阵的输入变得合理,有一定的解释性。本来,我想专门调研以下可解释性的Transformer的,但导师说神经网络可解释性的水太深,不是我一个本科生能把握住的~-~。
class InterpretableMultiHeadAttention():
def __init__(self, n_head, d_model, dropout):
self.n_head = n_head
self.d_k = self.d_v = d_k = d_v = d_model // n_head
self.dropout = dropout
self.qs_layers = []
self.ks_layers = []
self.vs_layers = []
# Use same value layer to facilitate interp
vs_layer = Dense(d_v, use_bias=False)
for _ in range(n_head):
self.qs_layers.append(Dense(d_k, use_bias=False))
self.ks_layers.append(Dense(d_k, use_bias=False))
self.vs_layers.append(vs_layer) # use same vs_layer
self.attention = ScaledDotProductAttention()
self.w_o = Dense(d_model, use_bias=False)
def __call__(self, q, k, v, mask=None):
n_head = self.n_head
heads = []
attns = []
for i in range(n_head):
qs = self.qs_layers[i](q)
ks = self.ks_layers[i](k)
vs = self.vs_layers[i](v)
head, attn = self.attention(qs, ks, vs, mask)
head_dropout = Dropout(self.dropout)(head)
heads.append(head_dropout)
attns.append(attn)
head = K.stack(heads) if n_head > 1 else heads[0]
attn = K.stack(attns)
outputs = K.mean(head, axis=0) if n_head > 1 else head
outputs = self.w_o(outputs)
outputs = Dropout(self.dropout)(outputs) # output dropout
return outputs, attn
9、class TemporalFusionTransformer(),TFT的具体搭建
# TFT model definitions.
class TemporalFusionTransformer(object):
def __init__(self, raw_params, use_cudnn=False):
self.name = self.__class__.__name__
params = dict(raw_params) # copy locally
# Data parameters
self.time_steps = int(params['total_time_steps'])
self.input_size = int(params['input_size'])
self.output_size = int(params['output_size'])
self.category_counts = json.loads(str(params['category_counts']))
self.n_multiprocessing_workers = int(params['multiprocessing_workers'])
# Relevant indices for TFT
self._input_obs_loc = json.loads(str(params['input_obs_loc']))
self._static_input_loc = json.loads(str(params['static_input_loc']))
self._known_regular_input_idx = json.loads(
str(params['known_regular_inputs']))
self._known_categorical_input_idx = json.loads(
str(params['known_categorical_inputs']))
self.column_definition = params['column_definition']
# Network params
self.quantiles = [0.1, 0.5, 0.9]
self.use_cudnn = use_cudnn # Whether to use GPU optimised LSTM
self.hidden_layer_size = int(params['hidden_layer_size'])
self.dropout_rate = float(params['dropout_rate'])
self.max_gradient_norm = float(params['max_gradient_norm'])
self.learning_rate = float(params['learning_rate'])
self.minibatch_size = int(params['minibatch_size'])
self.num_epochs = int(params['num_epochs'])
self.early_stopping_patience = int(params['early_stopping_patience'])
self.num_encoder_steps = int(params['num_encoder_steps'])
self.num_stacks = int(params['stack_size'])
self.num_heads = int(params['num_heads'])
# Serialisation options
self._temp_folder = os.path.join(params['model_folder'], 'tmp')
self.reset_temp_folder()
# Extra components to store Tensorflow nodes for attention computations
self._input_placeholder = None
self._attention_components = None
self._prediction_parts = None
print('*** {} params ***'.format(self.name))
for k in params:
print('# {} = {}'.format(k, params[k]))
# Build model
self.model = self.build_model()
def get_tft_embeddings(self, all_inputs):
time_steps = self.time_steps
# Sanity checks
for i in self._known_regular_input_idx:
if i in self._input_obs_loc:
raise ValueError('Observation cannot be known a priori!')
for i in self._input_obs_loc:
if i in self._static_input_loc:
raise ValueError('Observation cannot be static!')
if all_inputs.get_shape().as_list()[-1] != self.input_size:
raise ValueError(
'Illegal number of inputs! Inputs observed={}, expected={}'.format(
all_inputs.get_shape().as_list()[-1], self.input_size))
num_categorical_variables = len(self.category_counts)
num_regular_variables = self.input_size - num_categorical_variables
embedding_sizes = [
self.hidden_layer_size for i, size in enumerate(self.category_counts)
]
embeddings = []
for i in range(num_categorical_variables):
embedding = tf.keras.Sequential([
tf.keras.layers.InputLayer([time_steps]),
tf.keras.layers.Embedding(
self.category_counts[i],
embedding_sizes[i],
input_length=time_steps,
dtype=tf.float32)
])
embeddings.append(embedding)
regular_inputs, categorical_inputs \
= all_inputs[:, :, :num_regular_variables], \
all_inputs[:, :, num_regular_variables:]
embedded_inputs = [
embeddings[i](categorical_inputs[Ellipsis, i])
for i in range(num_categorical_variables)
]
# Static inputs
if self._static_input_loc:
static_inputs = [tf.keras.layers.Dense(self.hidden_layer_size)(
regular_inputs[:, 0, i:i + 1]) for i in range(num_regular_variables)
if i in self._static_input_loc] \
+ [embedded_inputs[i][:, 0, :]
for i in range(num_categorical_variables)
if i + num_regular_variables in self._static_input_loc]
static_inputs = tf.keras.backend.stack(static_inputs, axis=1)
else:
static_inputs = None
def convert_real_to_embedding(x):
"""Applies linear transformation for time-varying inputs."""
return tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(self.hidden_layer_size))(
x)
# Targets
obs_inputs = tf.keras.backend.stack([
convert_real_to_embedding(regular_inputs[Ellipsis, i:i + 1])
for i in self._input_obs_loc
],
axis=-1)
# Observed (a prioir unknown) inputs
wired_embeddings = []
for i in range(num_categorical_variables):
if i not in self._known_categorical_input_idx \
and i + num_regular_variables not in self._input_obs_loc:
e = embeddings[i](categorical_inputs[:, :, i])
wired_embeddings.append(e)
unknown_inputs = []
for i in range(regular_inputs.shape[-1]):
if i not in self._known_regular_input_idx \
and i not in self._input_obs_loc:
e = convert_real_to_embedding(regular_inputs[Ellipsis, i:i + 1])
unknown_inputs.append(e)
if unknown_inputs + wired_embeddings:
unknown_inputs = tf.keras.backend.stack(
unknown_inputs + wired_embeddings, axis=-1)
else:
unknown_inputs = None
# A priori known inputs
known_regular_inputs = [
convert_real_to_embedding(regular_inputs[Ellipsis, i:i + 1])
for i in self._known_regular_input_idx
if i not in self._static_input_loc
]
known_categorical_inputs = [
embedded_inputs[i]
for i in self._known_categorical_input_idx
if i + num_regular_variables not in self._static_input_loc
]
known_combined_layer = tf.keras.backend.stack(
known_regular_inputs + known_categorical_inputs, axis=-1)
return unknown_inputs, known_combined_layer, obs_inputs, static_inputs
def _get_single_col_by_type(self, input_type):
return utils.get_single_col_by_input_type(input_type,
self.column_definition)
def training_data_cached(self):
return TFTDataCache.contains('train') and TFTDataCache.contains('valid')
def cache_batched_data(self, data, cache_key, num_samples=-1):
if num_samples > 0:
TFTDataCache.update(
self._batch_sampled_data(data, max_samples=num_samples), cache_key)
else:
TFTDataCache.update(self._batch_data(data), cache_key)
print('Cached data "{}" updated'.format(cache_key))
(1)__init__函数:实现各种参数的初始化与赋值。
(2)def get_tft_embeddings(self, all_inputs)函数:规范化各项输入,嵌入相同的维度。all_inputs格式为(, 192, 5)。
- 检查预处理数据
- 创建embeddings层
- 生成静态输入
- 处理输出格式
- 生成可观测输入
- 生成已知输入
- 返回可观测输入
10、def _batch_sampled_data(), 将预处理数据切割为batch
def _batch_sampled_data(self, data, max_samples):
if max_samples < 1:
raise ValueError(
'Illegal number of samples specified! samples={}'.format(max_samples))
id_col = self._get_single_col_by_type(InputTypes.ID)
time_col = self._get_single_col_by_type(InputTypes.TIME)
data.sort_values(by=[id_col, time_col], inplace=True)
print('Getting valid sampling locations.')
valid_sampling_locations = []
split_data_map = {}
for identifier, df in data.groupby(id_col):
print('Getting locations for {}'.format(identifier))
num_entries = len(df)
if num_entries >= self.time_steps:
valid_sampling_locations += [
(identifier, self.time_steps + i)
for i in range(num_entries - self.time_steps + 1)
]
split_data_map[identifier] = df
inputs = np.zeros((max_samples, self.time_steps, self.input_size))
outputs = np.zeros((max_samples, self.time_steps, self.output_size))
time = np.empty((max_samples, self.time_steps, 1), dtype=object)
identifiers = np.empty((max_samples, self.time_steps, 1), dtype=object)
if max_samples > 0 and len(valid_sampling_locations) > max_samples:
print('Extracting {} samples...'.format(max_samples))
ranges = [
valid_sampling_locations[i] for i in np.random.choice(
len(valid_sampling_locations), max_samples, replace=False)
]
else:
print('Max samples={} exceeds # available segments={}'.format(
max_samples, len(valid_sampling_locations)))
ranges = valid_sampling_locations
id_col = self._get_single_col_by_type(InputTypes.ID)
time_col = self._get_single_col_by_type(InputTypes.TIME)
target_col = self._get_single_col_by_type(InputTypes.TARGET)
input_cols = [
tup[0]
for tup in self.column_definition
if tup[2] not in {InputTypes.ID, InputTypes.TIME}
]
for i, tup in enumerate(ranges):
if (i + 1 % 1000) == 0:
print(i + 1, 'of', max_samples, 'samples done...')
identifier, start_idx = tup
sliced = split_data_map[identifier].iloc[start_idx -
self.time_steps:start_idx]
inputs[i, :, :] = sliced[input_cols]
outputs[i, :, :] = sliced[[target_col]]
time[i, :, 0] = sliced[time_col]
identifiers[i, :, 0] = sliced[id_col]
sampled_data = {
'inputs': inputs,
'outputs': outputs[:, self.num_encoder_steps:, :],
'active_entries': np.ones_like(outputs[:, self.num_encoder_steps:, :]),
'time': time,
'identifier': identifiers
}
return sampled_data
11、def _build_base_graph(), 构建TFT的神经网络框架。
def _build_base_graph(self):
"""Returns graph defining layers of the TFT."""
# Size definitions.
time_steps = self.time_steps
combined_input_size = self.input_size
encoder_steps = self.num_encoder_steps
# Inputs.
all_inputs = tf.keras.layers.Input(
shape=(
time_steps,
combined_input_size,
))
unknown_inputs, known_combined_layer, obs_inputs, static_inputs \
= self.get_tft_embeddings(all_inputs)
# Isolate known and observed historical inputs.
if unknown_inputs is not None:
historical_inputs = concat([
unknown_inputs[:, :encoder_steps, :],
known_combined_layer[:, :encoder_steps, :],
obs_inputs[:, :encoder_steps, :]
],
axis=-1)
else:
historical_inputs = concat([
known_combined_layer[:, :encoder_steps, :],
obs_inputs[:, :encoder_steps, :]
],
axis=-1)
# Isolate only known future inputs.
future_inputs = known_combined_layer[:, encoder_steps:, :]
def static_combine_and_mask(embedding):
# 添加时序特种车
_, num_static, _ = embedding.get_shape().as_list()
flatten = tf.keras.layers.Flatten()(embedding)
# Nonlinear transformation with gated residual network.
mlp_outputs = gated_residual_network(
flatten,
self.hidden_layer_size,
output_size=num_static,
dropout_rate=self.dropout_rate,
use_time_distributed=False,
additional_context=None)
sparse_weights = tf.keras.layers.Activation('softmax')(mlp_outputs)
sparse_weights = K.expand_dims(sparse_weights, axis=-1)
trans_emb_list = []
for i in range(num_static):
e = gated_residual_network(
embedding[:, i:i + 1, :],
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
trans_emb_list.append(e)
transformed_embedding = concat(trans_emb_list, axis=1)
combined = tf.keras.layers.Multiply()(
[sparse_weights, transformed_embedding])
static_vec = K.sum(combined, axis=1)
return static_vec, sparse_weights
static_encoder, static_weights = static_combine_and_mask(static_inputs)
static_context_variable_selection = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
static_context_enrichment = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
static_context_state_h = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
static_context_state_c = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
12、 def static_combine_and_mask(embedding),用于静态协变量的变量选择,选择对输出造成影响的协变量。
def static_combine_and_mask(embedding):
# Add temporal features
_, num_static, _ = embedding.get_shape().as_list()
flatten = tf.keras.layers.Flatten()(embedding)
# Nonlinear transformation with gated residual network.
mlp_outputs = gated_residual_network(
flatten,
self.hidden_layer_size,
output_size=num_static,
dropout_rate=self.dropout_rate,
use_time_distributed=False,
additional_context=None)
sparse_weights = tf.keras.layers.Activation('softmax')(mlp_outputs)
sparse_weights = K.expand_dims(sparse_weights, axis=-1)
trans_emb_list = []
for i in range(num_static):
e = gated_residual_network(
embedding[:, i:i + 1, :],
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
trans_emb_list.append(e)
transformed_embedding = concat(trans_emb_list, axis=1)
combined = tf.keras.layers.Multiply()(
[sparse_weights, transformed_embedding])
static_vec = K.sum(combined, axis=1)
return static_vec, sparse_weights
static_encoder, static_weights = static_combine_and_mask(static_inputs)
static_context_variable_selection = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
static_context_enrichment = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
static_context_state_h = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
static_context_state_c = gated_residual_network(
static_encoder,
self.hidden_layer_size,
dropout_rate=self.dropout_rate,
use_time_distributed=False)
总结
TFT将高性能的多步预测与对时间序列内在动态的可解释性相结合,能够灵活地处理不同尺度的时间依赖性,同时利用变量选择网络剔除无关噪声,专注于最有意义的输入特征。相比以往研究,它关注不同类型的数据,并计算这些输入特征的重要性,挑选重要特征,去除噪声影响,最后将不同类型的数据进行融合,得到预测区间。TFT框架从这几个方面创新,不管是在模型的预测精度还是可解释上都得到了很大的提升。