目录
- 1. 模型介绍
- 2. 问题提出
- 3. 模型具体实现
- 3.1 嵌入和恢复函数
- 3.2 序列生成器和判别器
- 3.3 联合学习编码、生成和迭代
- 4. 代码实现
- 参考文献
时间序列数据在金融、医疗和物联网等领域具有广泛的应用。生成真实感的时间序列数据对于数据增强、隐私保护和模拟研究具有重要意义。TimeGAN是一种结合生成对抗网络(GAN)和序列生成模型优点的时间序列生成模型。本文将详细介绍TimeGAN的原理、模型实现以及相关代码。
1. 模型介绍
TimeGAN由Jinsung Yoon, Daniel Jarrett和Mihaela van der Schaar在2019年提出,是一种用于生成时间序列数据的生成对抗网络。该模型通过结合生成对抗网络(GAN)和有监督学习的优势,不仅能够生成具有高保真度的合成时间序列数据,还能够保留原始数据的时间依赖结构。TimeGAN的主要组件包括嵌入网络、生成器、判别器和有监督网络。
2. 问题提出
在一般数据设置中,每个实例由静态特征(如性别)和时间特征(如生命体征)组成。令 S S S 为静态特征的向量空间, X X X 为时间特征的向量空间, S ∈ S , X ∈ X S \in S, X \in X S∈S,X∈X 为可以用具体值实例化的随机向量,分别表示为 s s s 和 x x x。我们的目标是使用训练数据 D = { ( s n , x n , 1 : T n ) } n = 1 N D = \{(s_n, x_{n,1:T_n})\}_{n=1}^N D={(sn,xn,1:Tn)}n=1N 学习一个密度 p ^ ( S , X 1 : T ) \hat{p}(S, X_{1:T}) p^(S,X1:T),以尽可能逼近真实分布 p ( S , X 1 : T ) p(S, X_{1:T}) p(S,X1:T)。
通过联合分布的自回归分解, p ( S , X 1 : T ) = p ( S ) ∏ t p ( X t ∣ S , X 1 : t − 1 ) p(S, X_{1:T}) = p(S) \prod_t p(X_t|S, X_{1:t-1}) p(S,X1:T)=p(S)∏tp(Xt∣S,X1:t−1),我们可以专注于学习条件分布 p ^ ( X t ∣ S , X 1 : t − 1 ) \hat{p}(X_t|S, X_{1:t-1}) p^(Xt∣S,X1:t−1),以逼近真实的条件分布 p ( X t ∣ S , X 1 : t − 1 ) p(X_t|S, X_{1:t-1}) p(Xt∣S,X1:t−1)。
3. 模型具体实现
3.1 嵌入和恢复函数
嵌入和恢复函数提供特征空间和潜在空间之间的映射,使对抗性网络能够通过低维表示学习数据的时间动态。令 H S , H X H_S, H_X HS,HX 分别表示对应于特征空间 S , X S, X S,X 的潜在向量空间。嵌入函数 e : S × Q t X → H S × Q t H X e: S \times Q_t X \to H_S \times Q_t H_X e:S×QtX→HS×QtHX 将静态和时间特征转换为其潜在编码 h S , h 1 : T = e ( s , x 1 : T ) h_S, h_{1:T} = e(s, x_{1:T}) hS,h1:T=e(s,x1:T)。在本文中,我们通过一个递归网络实现 e e e:
h S = e S ( s ) , h t = e X ( h S , h t − 1 , x t ) h_S = e_S(s), \quad h_t = e_X(h_S, h_{t-1}, x_t) hS=eS(s),ht=eX(hS,ht−1,xt)
其中, e S : S → H S e_S: S \to H_S eS:S→HS 是用于静态特征的嵌入网络, e X : H S × H X × X → H X e_X: H_S \times H_X \times X \to H_X eX:HS×HX×X→HX 是用于时间特征的递归嵌入网络。反向操作中,恢复函数 r : H S × Q t H X → S × Q t X r: H_S \times Q_t H_X \to S \times Q_t X r:HS×QtHX→S×QtX 将静态和时间编码恢复到其特征表示:
s ~ , x ~ 1 : T = r ( h S , h 1 : T ) \tilde{s}, \tilde{x}_{1:T} = r(h_S, h_{1:T}) s~,x~1:T=r(hS,h1:T)
在这里,我们通过一个前馈网络在每一步实现 r r r:
s ~ = r S ( h S ) , x ~ t = r X ( h t ) \tilde{s} = r_S(h_S), \quad \tilde{x}_t = r_X(h_t) s~=rS(hS),x~t=rX(ht)
3.2 序列生成器和判别器
生成器首先在嵌入空间中输出,而不是直接在特征空间中产生合成输出。令 Z S , Z X Z_S, Z_X ZS,ZX 表示定义了已知分布的向量空间,并从中抽取随机向量作为输入以生成 H S , H X H_S, H_X HS,HX。生成函数 g : Z S × Q t Z X → H S × Q t H X g: Z_S \times Q_t Z_X \to H_S \times Q_t H_X g:ZS×QtZX→HS×QtHX 将静态和时间随机向量的元组转换为合成潜在编码 h ^ S , h ^ 1 : T = g ( z S , z 1 : T ) \hat{h}_S, \hat{h}_{1:T} = g(z_S, z_{1:T}) h^S,h^1:T=g(zS,z1:T)。我们通过一个递归网络实现 g g g:
h ^ S = g S ( z S ) , h ^ t = g X ( h ^ S , h ^ t − 1 , z t ) \hat{h}_S = g_S(z_S), \quad \hat{h}_t = g_X(\hat{h}_S, \hat{h}_{t-1}, z_t) h^S=gS(zS),h^t=gX(h^S,h^t−1,zt)
其中, g S : Z S → H S g_S: Z_S \to H_S gS:ZS→HS 是用于静态特征的生成网络, g X : H S × H X × Z X → H X g_X: H_S \times H_X \times Z_X \to H_X gX:HS×HX×ZX→HX 是用于时间特征的递归生成器。随机向量 z S z_S zS 可以从任意选择的分布中采样,而 z t z_t zt 遵循一个随机过程;在这里我们分别使用高斯分布和维纳过程。最后,判别器也从嵌入空间中操作。判别函数 d : H S × Q t H X → [ 0 , 1 ] × Q t [ 0 , 1 ] d: H_S \times Q_t H_X \to [0, 1] \times Q_t[0, 1] d:HS×QtHX→[0,1]×Qt[0,1] 接收静态和时间编码,返回分类结果 y ~ S , y ~ 1 : T = d ( h ~ S , h ~ 1 : T ) \tilde{y}_S, \tilde{y}_{1:T} = d(\tilde{h}_S, \tilde{h}_{1:T}) y~S,y~1:T=d(h~S,h~1:T)。
我们通过一个双向递归网络和一个前馈输出层实现 d d d:
y ~ S = d S ( h ~ S ) , y ~ t = d X ( u → t , u ← t ) \tilde{y}_S = d_S(\tilde{h}_S), \quad \tilde{y}_t = d_X(\overset{\rightarrow}{u}_t, \overset{\leftarrow}{u}_t) y~S=dS(h~S),y~t=dX(u→t,u←t)
其中 u → t = c → X ( h ~ S , h ~ t , u → t − 1 ) \overset{\rightarrow}{u}_t = \overset{\rightarrow}{c}_X(\tilde{h}_S, \tilde{h}_t, \overset{\rightarrow}{u}_{t-1}) u→t=c→X(h~S,h~t,u→t−1) 和 u ← t = c ← X ( h ~ S , h ~ t , u ← t + 1 ) \overset{\leftarrow}{u}_t = \overset{\leftarrow}{c}_X(\tilde{h}_S, \tilde{h}_t, \overset{\leftarrow}{u}_{t+1}) u←t=c←X(h~S,h~t,u←t+1) 分别表示前向和后向隐藏状态序列, c → X \overset{\rightarrow}{c}_X c→X 和 c ← X \overset{\leftarrow}{c}_X c←X 是递归函数, d S , d X d_S, d_X dS,dX 是输出层分类函数。
3.3 联合学习编码、生成和迭代
首先,嵌入和恢复函数作为特征和潜在空间之间的可逆映射,应该能够准确重构原始数据 s , x 1 : T s, x_{1:T} s,x1:T 的潜在表示 h S , h 1 : T h_S, h_{1:T} hS,h1:T。因此,我们的第一个目标函数是重构损失:
L R = E s , x 1 : T ∼ p [ ∥ s − s ~ ∥ 2 + ∑ t ∥ x t − x ~ t ∥ 2 ] L_R = \mathbb{E}_{s,x_{1:T} \sim p} \left[ \|s - \tilde{s}\|^2 + \sum_t \|x_t - \tilde{x}_t\|^2 \right] LR=Es,x1:T∼p[∥s−s~∥2+t∑∥xt−x~t∥2]
在TimeGAN中,生成器在训练过程中会接触到两种类型的输入。首先,在纯开放循环模式下,生成器(自回归的)接收合成嵌入 h ^ S , h ^ 1 : t − 1 \hat{h}_S, \hat{h}_{1:t-1} h^S,h^1:t−1(即它自己的前一个输出)以生成下一个合成向量 h ^ t \hat{h}_t h^t。然后在无监督损失上计算梯度:
L U = E s , x 1 : T ∼ p [ log y S + ∑ t log y t ] + E s , x 1 : T ∼ p ^ [ log ( 1 − y ^ S ) + ∑ t log ( 1 − y ^ t ) ] L_U = \mathbb{E}_{s,x_{1:T} \sim p} \left[ \log y_S + \sum_t \log y_t \right] + \mathbb{E}_{s,x_{1:T} \sim \hat{p}} \left[ \log(1 - \hat{y}_S) + \sum_t \log(1 - \hat{y}_t) \right] LU=Es,x1:T∼p[logyS+t∑logyt]+Es,x1:T∼p^[log(1−y^S)+t∑log(1−y^t)]
为了更有效地实现学习,我们引入了一个额外的监督损失。在闭环模式下,生成器接收实际数据的嵌入序列 h 1 : t − 1 h_{1:t-1} h1:t−1 以生成下一个潜在向量,计算捕获分布 p ( H t ∣ H S , H 1 : t − 1 ) p(H_t|H_S, H_{1:t-1}) p(Ht∣HS,H1:t−1) 和 p ^ ( H t ∣ H S , H 1 : t − 1 ) \hat{p}(H_t|H_S, H_{1:t-1}) p^(Ht∣HS,H1:t−1) 之间差异的损失:
L S = E s , x 1 : T ∼ p [ ∑ t ∥ h t − g X ( h S , h t − 1 , z t ) ∥ 2 ] L_S = \mathbb{E}_{s,x_{1:T} \sim p} \left[ \sum_t \|h_t - g_X(h_S, h_{t-1}, z_t)\|^2 \right] LS=Es,x1:T∼p[t∑∥ht−gX(hS,ht−1,zt)∥2]
优化过程如下:
min θ e , θ r ( λ L S + L R ) \min_{\theta_e, \theta_r}(\lambda L_S + L_R) θe,θrmin(λLS+LR)
min θ g ( η L S + max θ d L U ) \min_{\theta_g} (\eta L_S + \max_{\theta_d} L_U) θgmin(ηLS+θdmaxLU)
其中 λ \lambda λ 和 η \eta η 是超参数,用于平衡不同的损失项。
4. 代码实现
下面是TimeGAN的pytorch实现:
"""
该代码实现了TimeGAN的几个关键网络模块,包括编码器、恢复器、生成器、监督器和判别器。
"""
import torch
import torch.nn as nn
import torch.nn.init as init
def _weights_init(m):
"""
权重初始化函数:根据层类型初始化不同的权重。
"""
classname = m.__class__.__name__
if isinstance(m, nn.Linear):
init.xavier_uniform_(m.weight) # 使用Xavier初始化线性层的权重
m.bias.data.fill_(0) # 将偏置初始化为0
elif classname.find('Conv') != -1:
m.weight.data.normal_(0.0, 0.02) # 卷积层权重初始化
elif classname.find('Norm') != -1:
m.weight.data.normal_(1.0, 0.02) # 归一化层权重初始化
m.bias.data.fill_(0)
elif classname.find("GRU") != -1:
for name, param in m.named_parameters():
if 'weight_ih' in name:
init.xavier_uniform_(param.data) # 使用Xavier初始化GRU输入门权重
elif 'weight_hh' in name:
init.orthogonal_(param.data) # 使用正交初始化GRU隐藏层权重
elif 'bias' in name:
param.data.fill_(0) # 将偏置初始化为0
class Encoder(nn.Module):
"""
编码器:将原始特征空间嵌入到潜在空间中。
参数:
- input: 输入时间序列特征。(L, N, X) = (24, ?, 6)
- h3: (num_layers, N, H). [3, ?, 24]
返回:
- H: 嵌入表示
"""
def __init__(self, opt):
super(Encoder, self).__init__()
self.rnn = nn.GRU(input_size=opt.z_dim, hidden_size=opt.hidden_dim, num_layers=opt.num_layer)
# self.norm = nn.BatchNorm1d(opt.hidden_dim) # 批量归一化(被注释掉)
self.fc = nn.Linear(opt.hidden_dim, opt.hidden_dim)
self.sigmoid = nn.Sigmoid()
self.apply(_weights_init) # 应用权重初始化
def forward(self, input, sigmoid=True):
e_outputs, _ = self.rnn(input) # 输入时间序列进入GRU层
H = self.fc(e_outputs) # 全连接层映射
if sigmoid:
H = self.sigmoid(H) # 使用Sigmoid激活函数
return H
class Recovery(nn.Module):
"""
恢复器:将潜在空间还原到原始空间。
参数:
- H: 潜在表示
- T: 输入时间信息
返回:
- X_tilde: 恢复的数据
"""
def __init__(self, opt):
super(Recovery, self).__init__()
self.rnn = nn.GRU(input_size=opt.hidden_dim, hidden_size=opt.z_dim, num_layers=opt.num_layer)
# self.norm = nn.BatchNorm1d(opt.z_dim) # 批量归一化(被注释掉)
self.fc = nn.Linear(opt.z_dim, opt.z_dim)
self.sigmoid = nn.Sigmoid()
self.apply(_weights_init) # 应用权重初始化
def forward(self, input, sigmoid=True):
r_outputs, _ = self.rnn(input) # 输入潜在表示进入GRU层
X_tilde = self.fc(r_outputs) # 全连接层映射
if sigmoid:
X_tilde = self.sigmoid(X_tilde) # 使用Sigmoid激活函数
return X_tilde
class Generator(nn.Module):
"""
生成器:在潜在空间中生成时间序列数据。
参数:
- Z: 随机变量
- T: 输入时间信息
返回:
- E: 生成的嵌入
"""
def __init__(self, opt):
super(Generator, self).__init__()
self.rnn = nn.GRU(input_size=opt.z_dim, hidden_size=opt.hidden_dim, num_layers=opt.num_layer)
# self.norm = nn.LayerNorm(opt.hidden_dim) # 层归一化(被注释掉)
self.fc = nn.Linear(opt.hidden_dim, opt.hidden_dim)
self.sigmoid = nn.Sigmoid()
self.apply(_weights_init) # 应用权重初始化
def forward(self, input, sigmoid=True):
g_outputs, _ = self.rnn(input) # 输入随机变量进入GRU层
# g_outputs = self.norm(g_outputs) # 进行层归一化(被注释掉)
E = self.fc(g_outputs) # 全连接层映射
if sigmoid:
E = self.sigmoid(E) # 使用Sigmoid激活函数
return E
class Supervisor(nn.Module):
"""
监督器:使用前一个序列生成下一个序列。
参数:
- H: 潜在表示
- T: 输入时间信息
返回:
- S: 基于生成器生成的潜在表示的生成序列
"""
def __init__(self, opt):
super(Supervisor, self).__init__()
self.rnn = nn.GRU(input_size=opt.hidden_dim, hidden_size=opt.hidden_dim, num_layers=opt.num_layer)
# self.norm = nn.LayerNorm(opt.hidden_dim) # 层归一化(被注释掉)
self.fc = nn.Linear(opt.hidden_dim, opt.hidden_dim)
self.sigmoid = nn.Sigmoid()
self.apply(_weights_init) # 应用权重初始化
def forward(self, input, sigmoid=True):
s_outputs, _ = self.rnn(input) # 输入潜在表示进入GRU层
# s_outputs = self.norm(s_outputs) # 进行层归一化(被注释掉)
S = self.fc(s_outputs) # 全连接层映射
if sigmoid:
S = self.sigmoid(S) # 使用Sigmoid激活函数
return S
class Discriminator(nn.Module):
"""
判别器:区分原始和合成的时间序列数据。
参数:
- H: 潜在表示
- T: 输入时间信息
返回:
- Y_hat: 原始与合成时间序列的分类结果
"""
def __init__(self, opt):
super(Discriminator, self).__init__()
self.rnn = nn.GRU(input_size=opt.hidden_dim, hidden_size=opt.hidden_dim, num_layers=opt.num_layer)
# self.norm = nn.LayerNorm(opt.hidden_dim) # 层归一化(被注释掉)
self.fc = nn.Linear(opt.hidden_dim, opt.hidden_dim)
self.sigmoid = nn.Sigmoid()
self.apply(_weights_init) # 应用权重初始化
def forward(self, input, sigmoid=True):
d_outputs, _ = self.rnn(input) # 输入潜在表示进入GRU层
Y_hat = self.fc(d_outputs) # 全连接层映射
if sigmoid:
Y_hat = self.sigmoid(Y_hat) # 使用Sigmoid激活函数
return Y_hat
参考文献
[1] Yoon, J., Jarrett, D., & van der Schaar, M. (2019). Time-series Generative Adversarial Networks. arXiv preprint arXiv:1906.02691.
[2] Goodfellow, I., Pouget-Abadie, J., Mirza, M., Xu, B., Warde-Farley, D., Ozair, S., ... & Bengio, Y. (2014). Generative adversarial nets. Advances in neural information processing systems, 27.