Paper and Code Walkthrough: Learning Latent Dynamics for Planning from Pixels
Table of Contents
- Paper and Code Walkthrough: Learning Latent Dynamics for Planning from Pixels
- 3. Recurrent State Space Model
- Latent dynamics
- Variational encoder
- Training objective
- Deterministic path
- 4. Latent Overshooting
- Limited capacity
- Multi-step prediction
- Latent overshooting
- Pipeline Walkthrough
- 1. Initialize dataset D with S random seed episodes.
- 2. Initialize model parameters θ randomly.
- 3. Main training loop
3. Recurrent State Space Model
For planning, we need to evaluate thousands of action sequences at every time step of the agent. Therefore, we use a recurrent state-space model (RSSM) that can predict forward purely in latent space, similar to recently proposed models
For planning, thousands of action sequences must be evaluated at every time step of the agent (my understanding: at each step the planner works with an action distribution rather than a single deterministic action, so many candidate actions exist at every step). The authors therefore propose the RSSM, a model that can predict forward purely in latent space, similar to previously proposed models.
This model can be thought of as a non-linear Kalman filter or sequential VAE.
This model can be viewed either as a non-linear Kalman filter or as a sequential VAE.
Instead of an extensive comparison to prior architectures, we highlight two findings that can guide future designs of dynamics models: our experiments show that both stochastic and deterministic paths in the transition model are crucial for successful planning.
Instead of an extensive comparison with earlier architectures, the authors highlight a finding that can guide future dynamics-model design: having both a stochastic and a deterministic path in the transition model is crucial for successful planning.
Latent dynamics
We consider sequences $\{o_t, a_t, r_t\}_{t=1}^{T}$ with discrete time step $t$, image observations $o_t$, continuous action vectors $a_t$, and scalar rewards $r_t$.
That is, each trajectory consists of image observations $o_t$, continuous action vectors $a_t$, and scalar rewards $r_t$, indexed by the discrete time step $t$.
A typical latent state-space model is shown in Figure 2b and resembles the structure of a partially observable Markov decision process.
A typical latent state-space model is shown in Figure 2b; it resembles a partially observable Markov decision process.
It defines the generative process of the images and rewards using a hidden state sequence $\{s_t\}_{t=1}^{T}$, where we assume a fixed initial state $s_0$ without loss of generality.
Figure 2b defines the generative process of the images and rewards through the hidden state sequence $\{s_t\}_{t=1}^{T}$; without loss of generality, a fixed initial state $s_0$ is assumed. Figure 2b is shown below.
Latent dynamics model designs. In this example, the model observes the first two time steps and predicts the third. Circles represent stochastic variables and squares deterministic variables. Solid lines denote the generative process and dashed lines the inference model.
Latent dynamics model designs. The model observes the first two time steps and predicts the third. Circles denote stochastic variables and squares deterministic variables; solid lines denote the generative process and dashed lines the inference model.
Transition model: $s_t \sim p(s_t \mid s_{t-1}, a_{t-1})$
Observation model: $o_t \sim p(o_t \mid s_t)$
Reward model: $r_t \sim p(r_t \mid s_t)$
My reading of these three components, ignoring the dashed lines in the figure for now: (1) each state $s_1$, $s_2$, $s_3$ generates the observation $o_i$ and reward $r_i$ of its own time step, $i = 1, 2, 3$; (2) moving from state $s_1$ to state $s_2$ requires the action $a_1$, which can be thought of as an external force applied to the system.
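Putting the three components together (and using the fixed initial state $s_0$), the generative process factorizes as

$$p(o_{1:T}, r_{1:T}, s_{1:T} \mid a_{1:T}) = \prod_{t=1}^{T} p(s_t \mid s_{t-1}, a_{t-1})\, p(o_t \mid s_t)\, p(r_t \mid s_t).$$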
The transition model is Gaussian with mean and variance parameterized by a feed-forward neural network, the observation model is Gaussian with mean parameterized by a deconvolutional neural network and identity covariance, and the reward model is a scalar Gaussian with mean parameterized by a feed-forward neural network and unit variance.
All three components are Gaussian. The transition model has its mean and variance parameterized by a feed-forward network; the observation model, since it outputs an image, has its mean produced by a deconvolutional network with identity covariance; and the reward model is a scalar Gaussian whose mean comes from a feed-forward network, with unit variance.
The code below is the observation model. The state and the RNN hidden state are both low-dimensional vectors, so reconstructing an image from them naturally uses transposed convolutions.
# Imports shared by the code snippets in this post
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal

class ObservationModel(nn.Module):
"""
p(o_t | s_t, h_t)
Observation model to reconstruct image observation (3, 64, 64)
from state and rnn hidden state
"""
def __init__(self, state_dim, rnn_hidden_dim):
super(ObservationModel, self).__init__()
self.fc = nn.Linear(state_dim + rnn_hidden_dim, 1024)
self.dc1 = nn.ConvTranspose2d(1024, 128, kernel_size=5, stride=2)
self.dc2 = nn.ConvTranspose2d(128, 64, kernel_size=5, stride=2)
self.dc3 = nn.ConvTranspose2d(64, 32, kernel_size=6, stride=2)
self.dc4 = nn.ConvTranspose2d(32, 3, kernel_size=6, stride=2)
def forward(self, state, rnn_hidden):
hidden = self.fc(torch.cat([state, rnn_hidden], dim=1))
hidden = hidden.view(hidden.size(0), 1024, 1, 1)
hidden = F.relu(self.dc1(hidden))
hidden = F.relu(self.dc2(hidden))
hidden = F.relu(self.dc3(hidden))
obs = self.dc4(hidden)
return obs
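As a quick sanity check (my own sketch, not part of the repository), the transposed-convolution arithmetic $(\text{in}-1)\cdot\text{stride}+\text{kernel}$ takes the $1\times1$ feature map back to $64\times64$: $1 \to 5 \to 13 \to 30 \to 64$. The sizes below are assumed values:

obs_model = ObservationModel(state_dim=30, rnn_hidden_dim=200)
state = torch.randn(8, 30)        # batch of 8 latent states
rnn_hidden = torch.randn(8, 200)  # matching RNN hidden states
print(obs_model(state, rnn_hidden).shape)  # torch.Size([8, 3, 64, 64])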
The next block is the reward model, which is quite similar to the observation model. Since the reward is a scalar, plain fully-connected layers suffice and no image handling is needed. Note that in the code, `hidden_dim` is the width of the hidden layers, not a quantity from the paper.
class RewardModel(nn.Module):
"""
p(r_t | s_t, h_t)
Reward model to predict reward from state and rnn hidden state
"""
def __init__(self, state_dim, rnn_hidden_dim, hidden_dim=300, act=F.relu):
super(RewardModel, self).__init__()
self.fc1 = nn.Linear(state_dim + rnn_hidden_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, hidden_dim)
self.fc4 = nn.Linear(hidden_dim, 1)
self.act = act
def forward(self, state, rnn_hidden):
hidden = self.act(self.fc1(torch.cat([state, rnn_hidden], dim=1)))
hidden = self.act(self.fc2(hidden))
hidden = self.act(self.fc3(hidden))
reward = self.fc4(hidden)
return reward
A deconvolutional (transposed-convolution) network works, loosely speaking, in the opposite direction of a convolutional network: instead of reducing the spatial dimensions of an image through convolution, it builds an image up, often starting from a low-dimensional code or noise. Deconvolutional networks are also used to recover features or signals that a convolutional pipeline may have discarded as unimportant, for example signals lost by being convolved with others; deconvolution of signals is used in image synthesis and analysis.
https://zhuanlan.zhihu.com/p/372516381
Note that the log-likelihood under a Gaussian distribution with unit variance equals the mean squared error up to a constant.
Note: under a Gaussian distribution with unit variance, the log-likelihood equals the mean squared error up to a constant, which is why the reconstruction losses in the code are simple MSE terms.
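Concretely, for a unit-variance Gaussian the density gives

$$\ln \mathcal{N}(o_t \mid \mu_t, I) = -\tfrac{1}{2}\,\lVert o_t - \mu_t \rVert^2 + \text{const},$$

so maximizing the log-likelihood is equivalent to minimizing the squared reconstruction error.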
Variational encoder
Since the model is non-linear, we cannot directly compute the state posteriors that are needed for parameter learning.
Because the model is non-linear, the state posteriors needed for parameter learning cannot be computed in closed form.
Instead, we use an encoder $q(s_{1:T} \mid o_{1:T}, a_{1:T}) = \prod_{t=1}^{T} q(s_t \mid s_{t-1}, a_{t-1}, o_t)$ to infer approximate state posteriors from past observations and actions, where $q(s_t \mid s_{t-1}, a_{t-1}, o_t)$ is a diagonal Gaussian with mean and variance parameterized by a convolutional neural network followed by a feed-forward neural network.
An encoder is used to infer approximate state posteriors from past observations and actions, where $q(s_t \mid s_{t-1}, a_{t-1}, o_t)$ is a diagonal Gaussian whose mean and variance come from a convolutional network followed by a feed-forward network.
Why do we need a posterior? My understanding: we know the previous latent state $s_{t-1}$, the previous action $a_{t-1}$, and the current observation $o_t$. The forward (prior) direction would be $o_t \sim p(o_t \mid s_{t-1}, a_{t-1}, s_t) \approx p(o_t \mid h_{t-1}, s_t)$, where $h_{t-1}$ aggregates past states, actions, and hidden information (see the code). But here the current latent state $s_t$ is unknown while the observation $o_t$ is known, so the inferred distribution over $s_t$ conditioned on the hidden information and on $o_t$ (written $q(s_t \mid h_t, o_t)$ in the code) is a posterior.
def posterior(self, rnn_hidden, embedded_obs):
    """
    Compute posterior q(s_t | h_t, o_t)
    """
    hidden = self.act(self.fc_rnn_hidden_embedded_obs(
        torch.cat([rnn_hidden, embedded_obs], dim=1)))
    mean = self.fc_state_mean_posterior(hidden)
    stddev = F.softplus(self.fc_state_stddev_posterior(hidden)) + self._min_stddev
    return Normal(mean, stddev)
The snippet above is how the posterior is obtained in the code: `embedded_obs` is the observation after the convolutional encoder; it is concatenated with `rnn_hidden`, passed through a fully-connected layer, and then two linear heads map the result to the mean and standard deviation of the posterior.
We use the filtering posterior that conditions on past observations since we are ultimately interested in using the model for planning, but one may also use the full smoothing posterior during training.
The authors use the filtering posterior, conditioned only on past observations, because the model is ultimately used for planning; other work may instead use the full smoothing posterior during training.
Training objective
Using the encoder, we construct a variational bound on the data log-likelihood. For simplicity, we write losses for predicting only the observations — the reward losses follow by analogy.
Using the encoder, a variational bound on the data log-likelihood is constructed. For simplicity the losses are written only for the observations; the reward losses follow by analogy.
The derivation of this bound is in the appendix of the paper. Qualitatively, the bound maximizes the probability of reconstructing the observation $o_t$ from the latent state $s_t$ while keeping the inferred posteriors as close as possible to the transition (prior) distributions.
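For reference, the one-step bound (reconstructed here from the paper's Equation 3, using the notation above) has the familiar ELBO form of a reconstruction term minus a KL term between the filtering posterior and the transition prior:

$$\ln p(o_{1:T} \mid a_{1:T}) \ge \sum_{t=1}^{T} \Big( \mathbb{E}_{q(s_t \mid o_{\le t}, a_{<t})}\big[\ln p(o_t \mid s_t)\big] - \mathbb{E}_{q(s_{t-1} \mid o_{\le t-1}, a_{<t-1})}\big[\operatorname{KL}\big(q(s_t \mid o_{\le t}, a_{<t}) \,\big\|\, p(s_t \mid s_{t-1}, a_{t-1})\big)\big] \Big).$$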
Estimating the outer expectations using a single re-parameterized sample yields an efficient objective for inference and learning in non-linear latent variable models that can be optimized using gradient ascent.
Estimating the outer expectations with a single reparameterized sample yields an objective that can be optimized efficiently with gradient ascent for inference and learning in non-linear latent-variable models.
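A minimal sketch (my own, not from the repository) of what "a single reparameterized sample" means with torch.distributions; the tensor sizes are arbitrary:

import torch
from torch.distributions import Normal

# Illustrative posterior parameters; in the real model they come from the encoder network.
mean = torch.zeros(4, 30, requires_grad=True)
stddev = torch.ones(4, 30)
posterior = Normal(mean, stddev)
s = posterior.rsample()   # reparameterized: s = mean + stddev * eps, so gradients flow into mean
loss = (s ** 2).sum()
loss.backward()           # mean.grad is populated; posterior.sample() would not allow this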
Deterministic path
Despite its generality, the purely stochastic transitions make it difficult for the transition model to reliably remember information for multiple time steps. In theory, this model could learn to set the variance
to zero for some state components, but the optimization procedure may not find this solution.
Purely stochastic transitions, despite their generality, make it hard for the transition model to reliably carry information across many time steps. In theory the model could learn to set the variance of some state components to zero, but the optimization procedure may never find that solution.
This motivates including a deterministic sequence of activation vectors $\{h_t\}_{t=1}^{T}$ that allow the model to access not just the last state but all previous states deterministically.
This motivates adding a deterministic sequence of activation vectors $\{h_t\}_{t=1}^{T}$, through which the model can access not only the previous state but, deterministically, all earlier states.
Deterministic state model: $h_t = f(h_{t-1}, s_{t-1}, a_{t-1})$
Stochastic state model: $s_t \sim p(s_t \mid h_t)$
Observation model: $o_t \sim p(o_t \mid s_t, h_t)$
Reward model: $r_t \sim p(r_t \mid s_t, h_t)$
As before, squares denote deterministic quantities and circles stochastic ones; ignore the dashed lines for now.
Compared with the model of Figure 2b, this model is driven by the deterministic activation vectors $\{h_t\}_{t=1}^{T}$, from which the stochastic state, the observation model, and the reward model are derived. The sequence $\{h_t\}_{t=1}^{T}$ evolves deterministically: each $h_t$ is produced by a function rather than sampled from a distribution.
$f(h_{t-1}, s_{t-1}, a_{t-1})$ is implemented as a recurrent neural network (RNN). Intuitively, we can understand this model as splitting the state into a stochastic part $s_t$ and a deterministic part $h_t$, which depend on the stochastic and deterministic parts at the previous time step through the RNN.
$f(h_{t-1}, s_{t-1}, a_{t-1})$ is implemented with an RNN. In effect, the state is split into a stochastic part $s_t$ and a deterministic part $h_t$, each depending on the stochastic and deterministic parts of the previous time step through the RNN.
We use the encoder $q(s_{1:T} \mid o_{1:T}, a_{1:T}) = \prod_{t=1}^{T} q(s_t \mid h_t, o_t)$ to parameterize the approximate state posteriors.
This corresponds to the dashed lines in Figure 2c: the distribution over the stochastic states $s_{1:T}$ is obtained from the sequences of observations and actions (through $h_t$ and $o_t$).
Importantly, all information about the observations must pass through the sampling step of the encoder to avoid a deterministic shortcut from inputs to reconstructions.
All information about the observations must pass through the sampling step of the encoder, which prevents a deterministic shortcut from the inputs to the reconstructions.
The following code is the encoder network. Each convolution is followed by a `relu()` activation, and the last layer flattens the feature map into a vector.
class Encoder(nn.Module):
"""
Encoder to embed image observation (3, 64, 64) to vector (1024,)
"""
def __init__(self):
super(Encoder, self).__init__()
self.cv1 = nn.Conv2d(3, 32, kernel_size=4, stride=2)
self.cv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
self.cv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2)
self.cv4 = nn.Conv2d(128, 256, kernel_size=4, stride=2)
def forward(self, obs):
hidden = F.relu(self.cv1(obs))
hidden = F.relu(self.cv2(hidden))
hidden = F.relu(self.cv3(hidden))
embedded_obs = F.relu(self.cv4(hidden)).reshape(hidden.size(0), -1)
return embedded_obs
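As a quick check (my own sketch), the convolution arithmetic $\lfloor(\text{in}-\text{kernel})/\text{stride}\rfloor + 1$ shrinks the $64\times64$ image to $31 \to 14 \to 6 \to 2$, and $256 \times 2 \times 2 = 1024$, which is exactly the embedding size assumed by `fc_rnn_hidden_embedded_obs` in the RSSM class below:

encoder = Encoder()
obs = torch.randn(8, 3, 64, 64)   # batch of preprocessed image observations
print(encoder(obs).shape)         # torch.Size([8, 1024])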
4. Latent Overshooting
This objective function contains reconstruction terms for the observations and KL-divergence regularizers for the approximate posteriors.
The objective in Equation (3) thus consists of reconstruction terms for the observations and KL regularizers on the approximate posteriors.
A limitation of this objective is that the stochastic path of the transition function $p(s_t \mid s_{t-1}, a_{t-1})$ is only trained via the KL-divergence regularizers for one-step predictions: the gradient flows through $p(s_t \mid s_{t-1}, a_{t-1})$ directly into $q(s_{t-1})$ but never traverses a chain of multiple $p(s_t \mid s_{t-1}, a_{t-1})$.
The weakness of this objective is that the stochastic path of the transition model is only trained through the one-step KL regularizers: the gradient flows through $p(s_t \mid s_{t-1}, a_{t-1})$ directly into $q(s_{t-1})$, but never traverses a chain of several transition steps.
In this section, we generalize this variational bound to latent overshooting, which trains all multi-step predictions in latent space.
In this section, the variational bound is generalized to latent overshooting, which trains all multi-step predictions in latent space.
Limited capacity
If we could train our model to make perfect one-step predictions, it would also make perfect multi-step predictions, so this would not be a problem. However, when using a model with limited capacity and restricted distributional family, training the model only on one-step predictions until convergence does in general not coincide with the model that is best at multi-step predictions.
If the model could be trained to make perfect one-step predictions, it would also make perfect multi-step predictions, and this would not be a problem. However, with limited capacity and a restricted distributional family, training only on one-step predictions until convergence does in general not yield the model that is best at multi-step predictions.
Therefore, we take inspiration from Amos et al. (2018) and earlier related ideas (Krishnan et al., 2015; Lamb et al., 2016; Chiappa et al., 2017), and train the model on multi-step predictions of all distances. We develop this idea for latent sequence models, showing that multi-step predictions can be improved by a loss in latent space, without having to generate additional images.
The authors therefore take inspiration from Amos et al. (2018) and earlier related ideas, and train the model on multi-step predictions of all distances, developing this idea for latent sequence models: multi-step predictions can be improved through a loss in latent space, without having to generate additional images.
Multi-step prediction
We start by generalizing the standard variational bound (Equation 3) from training one-step predictions to training multi-step predictions of a fixed distance $d$.
The standard variational bound is first generalized from one-step predictions to multi-step predictions of a fixed distance $d$.
For ease of notation, we omit actions in the conditioning set here; every distribution over $s_t$ is conditioned upon $a_{<t}$.
To simplify notation, the actions are omitted from the conditioning sets; every distribution over $s_t$ is implicitly conditioned on $a_{<t}$.
We first define multi-step predictions, which are computed by repeatedly applying the transition model and integrating out the intermediate states.
Multi-step predictions are defined by repeatedly applying the transition model and integrating out the intermediate states.
With $d = 1$ this reduces to ordinary one-step prediction.
The variational bound can then be generalized to these multi-step predictions.
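For reference, the $d$-step transition referred to here can be written (reconstructing the definition in the paper, with actions omitted from the conditioning as above) as

$$p_d(s_t \mid s_{t-d}) := \mathbb{E}_{p_{d-1}(s_{t-1} \mid s_{t-d})}\big[\,p(s_t \mid s_{t-1})\,\big], \qquad p_1(s_t \mid s_{t-1}) := p(s_t \mid s_{t-1}),$$

i.e. the transition model applied $d$ times with the intermediate states integrated out.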
By Jensen's inequality, this objective again consists of two parts: (1) maximize the probability of the current observation under the current stochastic state; (2) minimize the KL divergence between the inferred posteriors and the $d$-step predictions.
Maximizing this objective trains the multi-step predictive distribution. This reflects the fact that during planning, the model makes predictions without having access to all the preceding observations.
Maximizing this objective trains the multi-step predictive distribution, reflecting the fact that during planning the model must make predictions without access to all the preceding observations.
Since the latent state sequence is Markovian, for $d \ge 1$ we have $I(s_t; s_{t-d}) \le I(s_t; s_{t-1})$ and thus $\mathbb{E}[\ln p_d(o_{1:T})] \le \mathbb{E}[\ln p(o_{1:T})]$. Hence, every bound on the multi-step predictive distribution is also a bound on the one-step predictive distribution in expectation over the data set.
Because the latent state sequence is Markovian, for any distance $d \ge 1$ we have $I(s_t; s_{t-d}) \le I(s_t; s_{t-1})$, and therefore $\mathbb{E}[\ln p_d(o_{1:T})] \le \mathbb{E}[\ln p(o_{1:T})]$. Consequently, in expectation over the data set, any bound on the multi-step predictive distribution is also a bound on the one-step predictive distribution.
Latent overshooting
We introduced a bound on predictions of a given distance $d$. However, for planning we need accurate predictions not just for a fixed distance but for all distances up to the planning horizon. We introduce latent overshooting for this, an objective function for latent sequence models that generalizes the standard variational bound (Equation 3) to train the model on multi-step predictions of all distances $1 \le d \le D$.
A bound for a single distance $d$ is not enough: planning requires accurate predictions for every distance up to the planning horizon. Latent overshooting is therefore introduced, an objective for latent sequence models that generalizes the standard variational bound (Equation 3) to train on multi-step predictions of all distances $1 \le d \le D$.
Latent overshooting can be interpreted as a regularizer in latent space that encourages consistency between one-step and multi-step predictions, which we know should be equivalent in expectation over the data set.
Latent overshooting can be interpreted as a regularizer in latent space that encourages consistency between one-step and multi-step predictions, which we know should agree in expectation over the data set.
We include weighting factors $\{\beta_d\}_{d=1}^{D}$ analogously to the $\beta$-VAE. While we set all $\beta_{>1}$ to the same value for simplicity, they could be chosen to let the model focus more on long-term or short-term predictions.
Weighting factors $\{\beta_d\}_{d=1}^{D}$ are included, analogous to the $\beta$-VAE. For simplicity all $\beta_{>1}$ are set to the same value, but they could be chosen to make the model focus more on long-term or on short-term predictions.
In practice, we stop gradients of the posterior distributions for overshooting distances d > 1 d > 1 d>1, so that the multi-step predictions are trained towards the informed posteriors, but not the other way around.
In practice, the gradients of the posterior distributions are stopped for overshooting distances $d > 1$, so that the multi-step predictions are trained towards the informed posteriors, but not the other way around.
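The training loop shown later in this post only implements the standard one-step objective, so as a sketch of the idea (my own, with hypothetical arguments: `posterior_dist` would be the informed posterior from the encoder, `overshoot_prior_dist` a prior obtained by applying `rssm.prior` d times open-loop), a stop-gradient overshooting KL term could look like this:

from torch.distributions import Normal, kl_divergence

def overshooting_kl(posterior_dist, overshoot_prior_dist, beta_d=1.0):
    # Hypothetical helper, not part of the repository shown in this post.
    # Detach the posterior so that only the multi-step prior is pulled towards
    # the informed posterior, not the other way around.
    detached_posterior = Normal(posterior_dist.mean.detach(),
                                posterior_dist.stddev.detach())
    return beta_d * kl_divergence(detached_posterior, overshoot_prior_dist).sum(dim=-1).mean()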
The code below is the RSSM model class. For reference, keep Figure 2c in mind.
- `__init__`: initializes the dimensions of the deterministic state, the stochastic state, and the action, and defines the networks and constants.
- `forward`: returns the prior $p(s_{t+1} \mid h_{t+1})$ used for forward prediction and the posterior $p(s_{t+1} \mid h_{t+1}, o_{t+1})$.
- `prior`: computes the forward prediction $p(s_{t+1} \mid h_{t+1})$. It first combines the stochastic state and the action through `self.fc_state_action()` into a temporary hidden tensor `hidden`, feeds it together with the `rnn_hidden` tensor into `self.rnn` to obtain the new `rnn_hidden`, and then maps that back through a linear layer plus activation into a deterministic tensor `hidden`; this realizes the computation of $h_{t+1}$ from the paper. Finally, `hidden` is passed through `self.fc_state_mean_prior` and `self.fc_state_stddev_prior` to obtain the distribution $p(s_{t+1} \mid h_{t+1})$. The function returns this Normal distribution together with the recurrent variable `rnn_hidden`.
- `posterior`: concatenates the recurrent variable `rnn_hidden` with the image vector produced by the convolutional encoder into an intermediate tensor `hidden`, and then outputs the posterior distribution through `self.fc_state_mean_posterior` and `self.fc_state_stddev_posterior`.
- A small detail: `softplus` is $y = \log(1 + e^{x})$ and can be viewed as a smooth version of `ReLU`. According to research in neuroscience, `softplus` and `ReLU` resemble the firing-rate function of biological neurons more closely than earlier activation functions. https://blog.csdn.net/Suan2014/article/details/77162042
class RecurrentStateSpaceModel(nn.Module):
"""
This class includes multiple components
Deterministic state model: h_t+1 = f(h_t, s_t, a_t)
Stochastic state model (prior): p(s_t+1 | h_t+1)
State posterior: q(s_t | h_t, o_t)
NOTE: actually, this class takes embedded observation by Encoder class
min_stddev is added to stddev same as original implementation
Activation function for this class is F.relu same as original implementation
"""
def __init__(self, state_dim, action_dim, rnn_hidden_dim,
hidden_dim=200, min_stddev=0.1, act=F.relu):
super(RecurrentStateSpaceModel, self).__init__()
self.state_dim = state_dim
self.action_dim = action_dim
self.rnn_hidden_dim = rnn_hidden_dim
self.fc_state_action = nn.Linear(state_dim + action_dim, hidden_dim)
self.fc_rnn_hidden = nn.Linear(rnn_hidden_dim, hidden_dim)
self.fc_state_mean_prior = nn.Linear(hidden_dim, state_dim)
self.fc_state_stddev_prior = nn.Linear(hidden_dim, state_dim)
self.fc_rnn_hidden_embedded_obs = nn.Linear(rnn_hidden_dim + 1024, hidden_dim)
self.fc_state_mean_posterior = nn.Linear(hidden_dim, state_dim)
self.fc_state_stddev_posterior = nn.Linear(hidden_dim, state_dim)
self.rnn = nn.GRUCell(hidden_dim, rnn_hidden_dim)
self._min_stddev = min_stddev
self.act = act
def forward(self, state, action, rnn_hidden, embedded_next_obs):
"""
h_t+1 = f(h_t, s_t, a_t)
Return prior p(s_t+1 | h_t+1) and posterior p(s_t+1 | h_t+1, o_t+1)
for model training
"""
next_state_prior, rnn_hidden = self.prior(state, action, rnn_hidden)
next_state_posterior = self.posterior(rnn_hidden, embedded_next_obs)
return next_state_prior, next_state_posterior, rnn_hidden
def prior(self, state, action, rnn_hidden):
"""
h_t+1 = f(h_t, s_t, a_t)
Compute prior p(s_t+1 | h_t+1)
"""
hidden = self.act(self.fc_state_action(torch.cat([state, action], dim=1)))
rnn_hidden = self.rnn(hidden, rnn_hidden)
hidden = self.act(self.fc_rnn_hidden(rnn_hidden))
mean = self.fc_state_mean_prior(hidden)
stddev = F.softplus(self.fc_state_stddev_prior(hidden)) + self._min_stddev
return Normal(mean, stddev), rnn_hidden
def posterior(self, rnn_hidden, embedded_obs):
"""
Compute posterior q(s_t | h_t, o_t)
"""
hidden = self.act(self.fc_rnn_hidden_embedded_obs(
torch.cat([rnn_hidden, embedded_obs], dim=1)))
mean = self.fc_state_mean_posterior(hidden)
stddev = F.softplus(self.fc_state_stddev_posterior(hidden)) + self._min_stddev
return Normal(mean, stddev)
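A minimal usage sketch of the class above (my own example; the dimensions are assumed values):

state_dim, action_dim, rnn_hidden_dim = 30, 1, 200   # assumed sizes
rssm = RecurrentStateSpaceModel(state_dim, action_dim, rnn_hidden_dim)
state = torch.zeros(8, state_dim)                    # s_t
action = torch.zeros(8, action_dim)                  # a_t
rnn_hidden = torch.zeros(8, rnn_hidden_dim)          # h_t
embedded_next_obs = torch.randn(8, 1024)             # Encoder output for o_t+1
prior, posterior, rnn_hidden = rssm(state, action, rnn_hidden, embedded_next_obs)
next_state = posterior.rsample()                     # sample s_t+1 with reparameterization
print(next_state.shape, rnn_hidden.shape)            # torch.Size([8, 30]) torch.Size([8, 200])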
Optimizer initialization. All network parameters in this project are collected and optimized jointly. The learning rate is 0.001, and `eps (float, optional)` is a term added to the denominator to improve numerical stability (default: $1\mathrm{e}{-8}$).
all_params = (list(encoder.parameters()) +
list(rssm.parameters()) +
list(obs_model.parameters()) +
list(reward_model.parameters()))
optimizer = Adam(all_params, lr=args.lr, eps=args.eps)
Pipeline Walkthrough
The training pseudocode is walked through below.
1. Initialize dataset D with S random seed episodes.
# collect initial experience with random action
for episode in range(args.seed_episodes):
obs = env.reset()
done = False
while not done:
action = env.action_space.sample()
next_obs, reward, done, _ = env.step(action)
replay_buffer.push(obs, action, reward, done)
obs = next_obs
2. Initialize model parameters θ randomly.
# define models and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder().to(device)
rssm = RecurrentStateSpaceModel(args.state_dim,
env.action_space.shape[0],
args.rnn_hidden_dim).to(device)
obs_model = ObservationModel(args.state_dim, args.rnn_hidden_dim).to(device)
reward_model = RewardModel(args.state_dim, args.rnn_hidden_dim).to(device)
all_params = (list(encoder.parameters()) +
list(rssm.parameters()) +
list(obs_model.parameters()) +
list(reward_model.parameters()))
optimizer = Adam(all_params, lr=args.lr, eps=args.eps)
3. Main training loop
This step also needs the `CEMAgent()` class. But first, the code below is the image preprocessing function, which additionally adds a small amount of noise.
import numpy as np

def preprocess_obs(obs, bit_depth=5):
"""
Reduces the bit depth of image for the ease of training
and convert to [-0.5, 0.5]
In addition, add uniform random noise same as original implementation
"""
obs = obs.astype(np.float32)
reduced_obs = np.floor(obs / 2 ** (8 - bit_depth))
normalized_obs = reduced_obs / 2**bit_depth - 0.5
normalized_obs += np.random.uniform(0.0, 1.0 / 2**bit_depth, normalized_obs.shape)
return normalized_obs
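As a worked example of the quantization above (my own check): with `bit_depth=5`, a pixel value of 255 becomes `floor(255 / 2**3) = 31`, then `31 / 2**5 - 0.5 = 0.46875`, before uniform noise in `[0, 1/32)` is added, so all observations end up roughly in `[-0.5, 0.5]`:

example = preprocess_obs(np.array([[0, 128, 255]], dtype=np.uint8))
print(example)  # approximately [[-0.5, 0.0, 0.46875]] plus small uniform noise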
`def __call__(self, obs)` means that once the class is instantiated, the instance itself can be called like a function to invoke this method.
The cross-entropy method (CEM) is used for planning; see the planner pseudocode in the paper. The basic idea of each iteration: sample candidate action sequences from the current action distribution, roll each candidate out in the model to accumulate its predicted rewards, collect the candidates with the highest rewards, and refit the action distribution to them. The rollout is executed open-loop over multiple steps.
class CEMAgent:
"""
Action planning by Cross Entropy Method (CEM) in learned RSSM Model
"""
def __init__(self, encoder, rssm, reward_model,
horizon, N_iterations, N_candidates, N_top_candidates):
self.encoder = encoder
self.rssm = rssm
self.reward_model = reward_model
self.horizon = horizon # 12
self.N_iterations = N_iterations # 10
self.N_candidates = N_candidates # 1000
self.N_top_candidates = N_top_candidates # 100
self.device = next(self.reward_model.parameters()).device
self.rnn_hidden = torch.zeros(1, rssm.rnn_hidden_dim, device=self.device)
def __call__(self, obs):
# Preprocess observation and transpose for torch style (channel-first)
obs = preprocess_obs(obs)
obs = torch.as_tensor(obs, device=self.device)
obs = obs.transpose(1, 2).transpose(0, 1).unsqueeze(0)
with torch.no_grad():
# Compute starting state for planning
# while taking information from current observation (posterior)
embedded_obs = self.encoder(obs)
# First compute the state posterior from the RNN hidden state and the embedded observation.
state_posterior = self.rssm.posterior(self.rnn_hidden, embedded_obs)
# Initialize action distribution
# Corresponds to the pseudocode in the paper: "Initialize factorized belief over action sequences q(a_t:t+H) <- Normal(0, I)."
action_dist = Normal(
torch.zeros((self.horizon, self.rssm.action_dim), device=self.device),
torch.ones((self.horizon, self.rssm.action_dim), device=self.device)
)
# Iteratively improve action distribution with CEM
for itr in range(self.N_iterations):
# Sample action candidates and transpose to
# (self.horizon, self.N_candidates, action_dim) for parallel exploration
action_candidates = \
action_dist.sample([self.N_candidates]).transpose(0, 1)
# Initialize reward, state, and rnn hidden state
# The size of state is (self.N_candidates, state_dim)
# The size of rnn hidden is (self.N_candidates, rnn_hidden_dim)
# These are for parallel exploration
total_predicted_reward = torch.zeros(self.N_candidates, device=self.device)
state = state_posterior.sample([self.N_candidates]).squeeze()
rnn_hidden = self.rnn_hidden.repeat([self.N_candidates, 1])
# Compute total predicted reward by open-loop prediction using prior
for t in range(self.horizon):
next_state_prior, rnn_hidden = \
self.rssm.prior(state, action_candidates[t], rnn_hidden)
# state is repeatedly overwritten here, so this is an open-loop multi-step prediction
state = next_state_prior.sample()
total_predicted_reward += self.reward_model(state, rnn_hidden).squeeze()
# update action distribution using top-k samples
top_indexes = \
total_predicted_reward.argsort(descending=True)[: self.N_top_candidates]
top_action_candidates = action_candidates[:, top_indexes, :]
mean = top_action_candidates.mean(dim=1)
stddev = (top_action_candidates - mean.unsqueeze(1)
).abs().sum(dim=1) / (self.N_top_candidates - 1)
action_dist = Normal(mean, stddev)
# Return only first action (replan each state based on new observation)
action = mean[0]
# update rnn hidden state for next step planning
with torch.no_grad():
_, self.rnn_hidden = self.rssm.prior(state_posterior.sample(),
action.unsqueeze(0),
self.rnn_hidden)
return action.cpu().numpy()
def reset(self):
self.rnn_hidden = torch.zeros(1, self.rssm.rnn_hidden_dim, device=self.device)
Next comes the training loop. `cem_agent` returns a deterministic action, to which exploration noise is added; the transitions collected from the environment are stored in the replay buffer.
# main training loop
for episode in range(args.seed_episodes, args.all_episodes):
# collect experiences
start = time.time()
cem_agent = CEMAgent(encoder, rssm, reward_model,
args.horizon, args.N_iterations,
args.N_candidates, args.N_top_candidates)
obs = env.reset()
done = False
total_reward = 0
while not done:
action = cem_agent(obs)
action += np.random.normal(0, np.sqrt(args.action_noise_var),
env.action_space.shape[0])
next_obs, reward, done, _ = env.step(action)
replay_buffer.push(obs, action, reward, done)
obs = next_obs
total_reward += reward
writer.add_scalar('total reward at train', total_reward, episode)
print('episode [%4d/%4d] is collected. Total reward is %f' %
(episode + 1, args.all_episodes, total_reward))
print('elasped time for interaction: %.2fs' % (time.time() - start))
Next, the model parameters are updated:
# update model parameters
start = time.time()
for update_step in range(args.collect_interval):
observations, actions, rewards, _ = \
replay_buffer.sample(args.batch_size, args.chunk_length)
# preprocess observations and transpose tensor for RNN training
observations = preprocess_obs(observations)
observations = torch.as_tensor(observations, device=device)
observations = observations.transpose(3, 4).transpose(2, 3)
observations = observations.transpose(0, 1)
actions = torch.as_tensor(actions, device=device).transpose(0, 1)
rewards = torch.as_tensor(rewards, device=device).transpose(0, 1)
# embed observations with CNN
embedded_observations = encoder(
observations.reshape(-1, 3, 64, 64)).view(args.chunk_length, args.batch_size, -1)
# prepare Tensor to maintain states sequence and rnn hidden states sequence
states = torch.zeros(
args.chunk_length, args.batch_size, args.state_dim, device=device)
rnn_hiddens = torch.zeros(
args.chunk_length, args.batch_size, args.rnn_hidden_dim, device=device)
# initialize state and rnn hidden state with 0 vector
state = torch.zeros(args.batch_size, args.state_dim, device=device)
rnn_hidden = torch.zeros(args.batch_size, args.rnn_hidden_dim, device=device)
# compute state and rnn hidden sequences and kl loss
# Compute the KL divergence between the prior and the posterior, averaged over the chunk
kl_loss = 0
for l in range(args.chunk_length - 1):
next_state_prior, next_state_posterior, rnn_hidden = \
rssm(state, actions[l], rnn_hidden, embedded_observations[l + 1])
state = next_state_posterior.rsample()
states[l + 1] = state
rnn_hiddens[l + 1] = rnn_hidden
kl = kl_divergence(next_state_prior, next_state_posterior).sum(dim=1)
kl_loss += kl.clamp(min=args.free_nats).mean()
kl_loss /= (args.chunk_length - 1)
# compute reconstructed observations and predicted rewards
# Forward pass through the observation model and the reward model
flatten_states = states.view(-1, args.state_dim)
flatten_rnn_hiddens = rnn_hiddens.view(-1, args.rnn_hidden_dim)
recon_observations = obs_model(flatten_states, flatten_rnn_hiddens).view(
args.chunk_length, args.batch_size, 3, 64, 64)
predicted_rewards = reward_model(flatten_states, flatten_rnn_hiddens).view(
args.chunk_length, args.batch_size, 1)
# compute loss for observation and reward
obs_loss = 0.5 * mse_loss(
recon_observations[1:], observations[1:], reduction='none').mean([0, 1]).sum()
reward_loss = 0.5 * mse_loss(predicted_rewards[1:], rewards[:-1])
# add all losses and update model parameters with gradient descent
loss = kl_loss + obs_loss + reward_loss
optimizer.zero_grad()
loss.backward()
clip_grad_norm_(all_params, args.clip_grad_norm)
optimizer.step()
# print losses and add tensorboard
print('update_step: %3d loss: %.5f, kl_loss: %.5f, obs_loss: %.5f, reward_loss: % .5f'
% (update_step + 1,
loss.item(), kl_loss.item(), obs_loss.item(), reward_loss.item()))
total_update_step = episode * args.collect_interval + update_step
writer.add_scalar('overall loss', loss.item(), total_update_step)
writer.add_scalar('kl loss', kl_loss.item(), total_update_step)
writer.add_scalar('obs loss', obs_loss.item(), total_update_step)
writer.add_scalar('reward loss', reward_loss.item(), total_update_step)
print('elasped time for update: %.2fs' % (time.time() - start))
Finally, the evaluation step, which measures the score without exploration noise:
# test to get score without exploration noise
if (episode + 1) % args.test_interval == 0:
start = time.time()
cem_agent = CEMAgent(encoder, rssm, reward_model,
args.horizon, args.N_iterations,
args.N_candidates, args.N_top_candidates)
obs = env.reset()
done = False
total_reward = 0
while not done:
action = cem_agent(obs)
obs, reward, done, _ = env.step(action)
total_reward += reward
writer.add_scalar('total reward at test', total_reward, episode)
print('Total test reward at episode [%4d/%4d] is %f' %
(episode + 1, args.all_episodes, total_reward))
print('elasped time for test: %.2fs' % (time.time() - start))