torch.distribution
中包含了很多概率分布的实现,本文首先通过均匀分布来说明 Distribution 的具体用法, 然后再解释 von Mises-Fisher 分布的实现, 其公式推导见 von Mises-Fisher Distribution.
1. torch.distribution.Distribution
以下是 Uniform 的源码:
class Uniform(Distribution):
r"""
Generates uniformly distributed random samples from the half-open interval
``[low, high)``.
Example::
>>> m = Uniform(torch.tensor([0.0]), torch.tensor([5.0]))
>>> m.sample() # uniformly distributed in the range [0.0, 5.0)
>>> # xdoctest: +SKIP
tensor([ 2.3418])
Args:
low (float or Tensor): lower range (inclusive).
high (float or Tensor): upper range (exclusive).
"""
# TODO allow (loc,scale) parameterization to allow independent constraints.
arg_constraints = {
"low": constraints.dependent(is_discrete=False, event_dim=0),
"high": constraints.dependent(is_discrete=False, event_dim=0),
}
has_rsample = True
@property
def mean(self):
return (self.high + self.low) / 2
@property
def mode(self):
return nan * self.high
@property
def stddev(self):
return (self.high - self.low) / 12 ** 0.5
@property
def variance(self):
return (self.high - self.low).pow(2) / 12
def __init__(self, low, high, validate_args=None):
self.low, self.high = broadcast_all(low, high)
if isinstance(low, Number) and isinstance(high, Number):
batch_shape = torch.Size()
else:
batch_shape = self.low.size()
super().__init__(batch_shape, validate_args=validate_args)
if self._validate_args and not torch.lt(self.low, self.high).all():
raise ValueError("Uniform is not defined when low>= high")
def expand(self, batch_shape, _instance=None):
new = self._get_checked_instance(Uniform, _instance)
batch_shape = torch.Size(batch_shape)
new.low = self.low.expand(batch_shape)
new.high = self.high.expand(batch_shape)
super(Uniform, new).__init__(batch_shape, validate_args=False)
new._validate_args = self._validate_args
return new
@constraints.dependent_property(is_discrete=False, event_dim=0)
def support(self):
return constraints.interval(self.low, self.high)
def rsample(self, sample_shape=torch.Size()):
shape = self._extended_shape(sample_shape)
rand = torch.rand(shape, dtype=self.low.dtype, device=self.low.device)
return self.low + rand * (self.high - self.low)
def log_prob(self, value):
if self._validate_args:
self._validate_sample(value)
lb = self.low.le(value).type_as(self.low)
ub = self.high.gt(value).type_as(self.low)
return torch.log(lb.mul(ub)) - torch.log(self.high - self.low)
def cdf(self, value):
if self._validate_args:
self._validate_sample(value)
result = (value - self.low) / (self.high - self.low)
return result.clamp(min=0, max=1)
def icdf(self, value):
result = value * (self.high - self.low) + self.low
return result
def entropy(self):
return torch.log(self.high - self.low)
下面将依次从上到下进行解释:
1.1 首先是一个使用例子
import torch
from torch import distributions
m = distributions.Uniform(torch.tensor([0.0]), torch.tensor([5.0]))
s = m.sample()
print(s) # tensor([1.7908])
实际上 Uniform(0.0, 5.0)
也是可以的, 参数说明:
Args:
low (float or Tensor): lower range (inclusive).
high (float or Tensor): upper range (exclusive).
你也可以创建向量的均匀分布, 如:
m = distributions.Uniform(torch.tensor([0.0, 1.0]), torch.tensor([5.0, 1.01]))
s = m.sample()
print(s) # tensor([1.5399, 1.0046])
甚至可以 float
和 Tensor
混合:
m = distributions.Uniform(1.0, torch.tensor([5.0, 1.01]))
s = m.sample()
print(s) # tensor([2.4717, 1.0079])
这是因为 Uniform
中使用了 distributions.utils.broadcast_all(*values)
将参数进行了广播: 先将非 tensor 转化为 tensor, 再通过 torch.broadcast_tensors(*values)
函数进行广播. 本例相当于:
Uniform(torch.tensor([1.0, 1.0]), torch.tensor([5.0, 1.01]))
1.2 arg_constraints
arg_constraints = {
"low": constraints.dependent(is_discrete=False, event_dim=0),
"high": constraints.dependent(is_discrete=False, event_dim=0),
}
对参数做一些限制, 包括类型和范围等, 具体请参见 constraints.py. 这里大概是限制非离散吧, 看不太懂.
然后, class Distribution
构造函数中会对参数进行检查, 除非 validate_args=False
或者执行 Python 命令时加上 -O
. 因为各种分布都继承自 class Distribution
, 所以基本都会检查.
1.3 has_rsample=True
关于 Reparameterization Trick, 文心一言说:
Reparameterization Trick 的基本思想是将随机变量 Z Z Z 表达为某个确定性函数 g ( ϵ ) g(\epsilon) g(ϵ) 的形式,其中 ϵ \epsilon ϵ 是从一个简单的分布(如标准正态分布)中采样得到的。这样,我们可以将关于 Z Z Z 的梯度转化为关于 ϵ \epsilon ϵ 的梯度,而 ϵ \epsilon ϵ 的采样过程是确定的、可微分的。
例如,考虑从均值为 μ \mu μ、标准差为 σ \sigma σ 的正态分布中采样 Z Z Z 的情况。我们可以将 Z Z Z 重写为: Z = μ + σ ⋅ ϵ ⇔ ϵ = Z − μ σ [ 标准化 ] Z = \mu + \sigma \cdot \epsilon ~ \Leftrightarrow ~ \epsilon = \frac{Z-\mu}{\sigma}[标准化] Z=μ+σ⋅ϵ ⇔ ϵ=σZ−μ[标准化] 其中 ϵ \epsilon ϵ 是从标准正态分布中采样得到的。这样,我们就将随机采样 Z Z Z 的过程转化为了一个确定性函数 g ( ϵ ) = μ + σ ⋅ ϵ g(\epsilon) = \mu + \sigma \cdot \epsilon g(ϵ)=μ+σ⋅ϵ。现在,我们可以直接计算关于 ϵ \epsilon ϵ 的梯度,从而间接地得到关于 Z Z Z 的梯度。
其实这么表述有点绕, 大部分的博文也这么讲, 什么"关于 Z Z Z 的梯度"? “关于 ϵ \epsilon ϵ 的梯度”? 平时都是求关于普通变量的梯度, 咋还对随机变量求梯度了?
不过看有一些博文提到 GAN 网络求生成分布的事, 想一想大概就明白了: GAN 网络在寻求一个能生成类似训练数据的分布, 用 Z Z Z 表示服从该分布的随机变量, 想通过梯度优化完成这个求解过程, 就不恰当地表述为 “关于 Z Z Z 的梯度”[甚至有些人表述为"对采样过程的梯度"], 实际上是"分布的梯度", 因为优化过程中变化的是分布, 而不是 Z Z Z 在变化, 是 Z Z Z 服从的分布在变化.
我的理解是, 这个待求分布本身是无法直接表示的, 你不可能从一个未知的分布中采样, 即采样本身是无法实现的, 但它却可以间接地由更简单的、能表示出来的、能采样的分布表示出来, 如上面说的 Z ∼ N ( μ , σ 2 ) Z \sim N(\mu, \sigma^2) Z∼N(μ,σ2), 而 μ , σ \mu, \sigma μ,σ 都是未知的, 用参数化的方式表示为 μ + σ ⋅ ϵ \mu + \sigma \cdot \epsilon μ+σ⋅ϵ 后, 便能方便地从 N ( 0 , 1 ) N(0, 1) N(0,1) 采样, 并对参数 μ , σ \mu, \sigma μ,σ 求梯度以更新分布.
故, 表述为 “关于分布参数的梯度” 更好.
回到 Uniform
, 其 has_rsample=True
应该就是指其有 Reparameterization Trick, 下面的
def rsample(self, sample_shape=torch.Size()):
shape = self._extended_shape(sample_shape)
rand = torch.rand(shape, dtype=self.low.dtype, device=self.low.device)
return self.low + rand * (self.high - self.low)
就是其 Reparameterization 的过程, 通过对
U
(
0
,
1
)
U(0,1)
U(0,1) 的采样 + self.low + rand * (self.high - self.low)
的可微函数, 表示了对
U
(
l
o
w
,
h
i
g
h
)
U(low, high)
U(low,high) 的采样, 且, 如果
self.low = torch.tensor(init_low, requires_grad=True)
self.high = torch.tensor(init_high, requires_grad=True)
就可以通过梯度优化求解想要的 U ( l o w , h i g h ) U(low, high) U(low,high) 了.
1.4 概率分布的属性
包括: m e a n mean mean, m o d e mode mode, s t d std std, v a r i a n c e variance variance, e n t r o p y entropy entropy 等基本属性, 还有一些相关的函数:
- cumulative density/mass function
cdf(value)
; - inverse cumulative density/mass function
icdf(value)
;
这个函数非常有用, Inverse Transform Sampling 中用其进行采样. 从 U ( 0 , 1 ) U(0,1) U(0,1) 中采样一个 u u u, 然后令 x = F − 1 ( u ) x = F^{-1}(u) x=F−1(u) 就是所求随机变量 X X X 的一个采样. - log of the probability density/mass function
log_prob(value)
2. von Mises-Fisher 分布的实现
代码来源于EBSW, 有改动.
2.1 概述
import math
import torch
from torch import distributions
from torch.distributions import constraints
from torch.distributions.kl import register_kl
from torch.nn import functional as func
from hyperspherical_uniform import HypersphericalUniform
from ive import ive # 采样过程并没有用到 ive, 所以我扯那一拨关于 Bessel Function 的梯度问题并没有用.
class VonMisesFisher(distributions.Distribution):
"""
一般来说, 维度 p 固定了, 那么优化的参数就是 kappa 和 mu 了
mu 不在 Bessel Function Ip/2-1 中, 所以梯度计算简单, PyTorch 可自己搞定
kappa 是 Ip/2-1(k) 的参数, 不可导, 则计算梯度需要用户编写 autograd.Function
"""
arg_constraints = { # 对参数的一些限制, 如果 self.xxx 没有被设置为被限制的类型, 则报错
'loc': constraints.real,
'scale': constraints.positive,
}
support = constraints.real # 支撑集
has_rsample = True
_mean_carrier_measure = 0
def __init__(self, loc, scale, validate_args=None, k=20):
"""
:param loc: μ 待优化
:param scale: kappa, 集中参数
:param validate_args: 是否检查类型限制
:param k: 那这个 k 是啥? for sampling algorithm, 采样算法中用到的参数: 预采样个数
"""
self.dtype = loc.dtype
self.loc = loc
self.scale = scale
self.device = loc.device
self.__m = loc.shape[-1] # 维度(p)
# >>> 用于采样算法 >>>
self.__e1 = torch.Tensor([1.0] + [0] * (loc.shape[-1] - 1)).to(self.device) # [1, 0, 0, ...]
self.k = k
self.__normal = distributions.Normal(0, 1)
self.__uniform = distributions.Uniform(0, 1)
self.__beta = distributions.Beta(
torch.tensor((self.__m - 1) / 2, dtype=torch.float64),
torch.tensor((self.__m - 1) / 2, dtype=torch.float64)
)
self.__b, self.__a, self.__d = self._bad()
# <<< 用于采样算法 <<<
super(VonMisesFisher, self).__init__(loc.size(), validate_args=validate_args)
继承 distributions.Distribution
类, 设置了分布的一些参数, 并为 sampling algorithm 做了一些准备.
2.2 mean
@property
def mean(self):
# mean 不应该是 loc=μ 吗? hhh!!! mean 和 mean direction 不是一回事
value1 = ive(self.__m / 2, self.scale)
value2 = ive(self.__m / 2 - 1, self.scale)
Ap_kappa = value1 / value2 # 均值的长度 Ap_kappa = R = |mean(x_i)|
mean_value = Ap_kappa * self.loc
return mean_value
刚开始想当然地以为
μ
=
l
o
c
\mu = loc
μ=loc 就是
m
e
a
n
mean
mean, 其实不然, 这只是平均方向, 这里
m
e
a
n
=
1
N
∑
i
N
x
i
mean=\frac{1}{N}\sum_i^N \bm{x}_i
mean=N1∑iNxi, 叫 expected value.
其中
是均值的长度, 但为何如此? 维基百科也未说明, 于是进行了推导.
代码中的第一类修正 Bessel Function ive
见 Modified Bessel Function of the First Kind.
2.3 标准差 stddev
@property
def stddev(self):
"""
:return: 分布的标准差, 怎么可能是 scale 呢
"""
return self.scale
不太懂, 按理说计算应该是按公式来:
∫
球
∣
x
−
μ
∣
f
p
(
x
;
μ
,
κ
)
d
x
\int_{球} |\bm{x}-\bm{\mu}| f_p(\bm{x};\bm{\mu},\kappa)d\bm{x}
∫球∣x−μ∣fp(x;μ,κ)dx 咱也不会这种积分, 但从极限看, 当
κ
→
+
∞
\kappa \rightarrow +\infin
κ→+∞ 时, 形成
μ
\bm{\mu}
μ 处的狄拉克分布, 标准差为
0
0
0, 所以代码中把 self.scale
当作标准差肯定不对, 1/self.scale
还差不多.
2.4 entropy
def entropy(self):
apk = ive(self.__m / 2, self.scale) / ive((self.__m / 2) - 1, self.scale)
output = -self.scale * apk
return output.view(*(output.shape[:-1])) + self._log_normalization()
此处有详细的推导.
output = -self.scale * apk
已经计算了
−
κ
A
p
(
κ
)
-\kappa A_p(\kappa)
−κAp(κ), 而
−
l
o
g
C
p
(
κ
)
-log C_p(\kappa)
−logCp(κ) 的计算是:
def _log_normalization(self): # -logCp(kappa)
output = -(
(self.__m / 2 - 1) * torch.log(self.scale)
- (self.__m / 2) * math.log(2 * math.pi)
- (self.scale + torch.log(ive(self.__m / 2 - 1, self.scale)))
)
return output.view(*(output.shape[:-1]))
l
o
g
C
p
(
κ
)
=
l
o
g
(
κ
p
/
2
−
1
(
2
π
)
p
/
2
I
p
/
2
−
1
(
κ
)
)
=
(
p
2
−
1
)
l
o
g
(
κ
)
−
p
2
l
o
g
(
2
π
)
−
l
o
g
(
I
p
/
2
−
1
(
κ
)
)
\begin{aligned} log C_p(\kappa) &= log\left( \frac{\kappa^{p/2-1}}{(2\pi)^{p/2} I_{p/2-1}(\kappa)} \right) \\ &= (\frac{p}{2}-1)log\left( \kappa \right) - \frac{p}{2}log\left( 2\pi \right) - log\left( I_{p/2-1}(\kappa) \right) \end{aligned}
logCp(κ)=log((2π)p/2Ip/2−1(κ)κp/2−1)=(2p−1)log(κ)−2plog(2π)−log(Ip/2−1(κ)) 而代码中用的 ive
是
I
p
/
2
−
1
(
κ
)
∗
e
x
p
(
−
κ
)
I_{p/2-1}(\kappa) * exp(-\kappa)
Ip/2−1(κ)∗exp(−κ).
2.5 log_prob
def log_prob(self, x):
return self._log_unnormalized_prob(x) - self._log_normalization()
def _log_unnormalized_prob(self, x): # k<μ,x>
output = self.scale * (self.loc * x).sum(-1, keepdim=True)
return output.view(*(output.shape[:-1]))
概率密度函数的对数.
2.6 sampling
def sample(self, shape=torch.Size()):
with torch.no_grad(): # rsample 是 reparameterized sample, 便于梯度更新以调整分布参数
return self.rsample(shape)
reparameterized 与否采样过程都一样, 不一样的地方就在于有没有参数需要更新, 此处的 sample()
是不更新参数的.
def rsample(self, shape=torch.Size()):
"""
Reparameterized Sample: 从一个简单的分布通过一个参数化变换使得其满足一个更复杂的分布;
此处, loc 是可变参数, 通过 radial-tangential decomposition 采样;
梯度下降更新 loc, 以获得满足要求的 vMF.
:param shape: 样本的形状
:return: [shape|m] 的张量, shape 个 m 维方向向量
"""
shape = shape if isinstance(shape, torch.Size) else torch.Size(shape)
w = (
self._sample_w3(shape=shape)
if self.__m == 3
else self._sample_w_rej(shape=shape)
)
v = (
self.__normal.sample(torch.Size(shape + self.loc.shape))
.to(self.device)
.transpose(0, -1)[1:]
).transpose(0, -1)
v = func.normalize(v, dim=-1)
w_ = torch.sqrt(torch.clamp(1 - (w ** 2), 1e-10))
x = torch.cat((w, w_ * v), -1)
z = self._householder_rotation(x)
return z.type(self.dtype)
rsample
的意思是 reparameterized sampling, 不光要采样, 采样过程中会有待优化参数, 上面已经说过. 这个
v
M
F
vMF
vMF 的采样不算复杂, 主要是拒绝采样, 但推导起来相当麻烦, 感兴趣的见详细过程. 大概的过程是, 根据概率密度:
(其中
ν
=
p
2
−
1
\nu=\frac{p}{2}-1
ν=2p−1). 采样一个
t
t
t, 也就是代码中的 w
, 然后从均匀球上采样一个
p
−
1
p-1
p−1 维的单位向量
v
\bm{v}
v, 拼接
[
t
∣
1
−
t
2
v
]
[t|\sqrt{1-t^2}\bm{v}]
[t∣1−t2v], 就是一个
v
M
F
vMF
vMF 采样.
通过查找 self.loc
, 发现在采样过程中只有在最后的 Householder Transform 处出现, 可见其微分还是简单明了的, 下面会说.
当 p = 3 p=3 p=3 时, 可求得 f r a d i a l ( t ; κ , p ) f_{radial}(t;\kappa,p) fradial(t;κ,p) 的累积分布函数, 再通过 Inverse Transform Sampling 进行采样, 详情见 5.1 p = 3 p=3 p=3 时的 Inverse Transform Sampling. 代码如下:
def _sample_w3(self, shape: torch.Size):
shape = torch.Size(shape + self.scale.shape) # torch.Size 继承自 tuple, 其 + 运算就是连接操作
# https://en.wikipedia.org/wiki/Von_Mises%E2%80%93Fisher_distribution # 3-D sphere
u = self.__uniform.sample(shape).to(self.device)
w = 1 + torch.stack( # 这个公式是按 μ=(0,0,1) 计算的 w, arccosw=φ, 即 w=z
[ # 最后的旋转可能是旋转至按真正的 μ 采样结果
torch.log(u),
torch.log(1 - u) - 2 * self.scale
],
dim=0
).logsumexp(0) / self.scale
return w
Householder Transform
上面还只是采样了 μ = e 1 = [ 1 , 0 , ⋯ , 0 ] \bm{\mu}=e_1=[1,0, \cdots, 0] μ=e1=[1,0,⋯,0] 的情况, 那么一般的 μ \bm{\mu} μ 呢? 好办, 再旋转一下就好了吧?
def _householder_rotation(self, x):
# 关于 self.loc, 也许只在 rotation 的时候用了一下, 前面的采样估计是按
# 某个特定的 μ 进行采样, 采好之后, rotate 一下就相当于按 loc 采样了
# 所以说, 前面那一大坨的计算, 并不涉及 loc 的优化, 它们只是旋转前的 sample, 旋转才是对 loc 梯度有影响的
u = func.normalize(self.__e1 - self.loc, dim=-1)
z = x - 2 * (x * u).sum(-1, keepdim=True) * u
return z
高维情况
高维情况复杂一些. 无法求得
f
r
a
d
i
a
l
(
t
;
κ
,
p
)
f_{radial}(t;\kappa,p)
fradial(t;κ,p) 的累积分布函数, 那么只能用拒绝采样法了. 其数学推导见 5.2
p
>
3
p > 3
p>3 时的 Rejection Sampling. 最终的采样算法是:
兑成代码就是:
def _sample_w_rej(self, shape: torch.Size, eps=1e-20): # 所以这个也是求 z?
# matrix while loop: samples a matrix of [A, k] samples, to avoid looping all together
b, a, d = [
e.repeat(*shape, *([1] * len(self.scale.shape))).reshape(-1, 1)
for e in (self.__b, self.__a, self.__d)
]
w, e, bool_mask = (
torch.zeros_like(b).to(self.device),
torch.zeros_like(b).to(self.device),
torch.eq(torch.ones_like(b), 1).to(self.device)
)
sample_shape = torch.Size([b.shape[0], self.k])
shape = shape + torch.Size(self.scale.shape)
uniform = distributions.Uniform(0 + eps, 1 - eps)
while bool_mask.sum() != 0:
e_ = self.__beta.sample(sample_shape).to(self.device).type(self.dtype)
u = uniform.sample(sample_shape).to(self.device).type(self.dtype)
w_ = (1 - (1 + b) * e_) / (1 - (1 - b) * e_)
t = (2 * a * b) / (1 - (1 - b) * e_)
accept = ((self.__m - 1.0) * t.log() - t + d) > torch.log(u)
accept_idx = self.first_nonzero(accept, dim=-1, invalid_val=-1).unsqueeze(1)
accept_idx_clamped = accept_idx.clamp(0)
# we use .abs(), in order to not get -1 index issues, the -1 is still used afterward
w_ = w_.gather(1, accept_idx_clamped.view(-1, 1))
e_ = e_.gather(1, accept_idx_clamped.view(-1, 1))
reject = accept_idx < 0
accept = ~reject if torch.__version__ >= '1.2.0' else 1 - reject
w[bool_mask * accept] = w_[bool_mask * accept]
e[bool_mask * accept] = e_[bool_mask * accept]
bool_mask[bool_mask * accept] = reject[bool_mask * accept]
return w.reshape(shape)
@staticmethod
def first_nonzero(x, dim, invalid_val=-1):
mask = x > 0
idx = torch.where(
mask.any(dim=dim),
mask.float().argmax(dim=1).squeeze(),
torch.tensor(invalid_val, device=x.device)
)
return idx
def _bad(self):
c = torch.sqrt(4 * self.scale ** 2 + (self.__m - 1) ** 2)
b_true = (-2 * self.scale + c) / (self.__m - 1)
# using Taylor approximation with a smooth swift from 10 < scale < 11
# to avoid numerical errors for large scale
b_app = (self.__m - 1) / (4 * self.scale)
s = torch.min(
torch.max(
torch.tensor([0.0], dtype=self.dtype, device=self.device),
self.scale - 10,
),
torch.tensor([1.0], dtype=self.dtype, device=self.device)
)
b = b_app * s + b_true * (1 - s)
a = (self.__m - 1 + 2 * self.scale + c) / 4
d = (4 * a * b) / (1 + b) - (self.__m - 1) * math.log(self.__m - 1)
return b, a, d
2.7 注册 KL 散度的计算函数
@register_kl(VonMisesFisher, HypersphericalUniform)
def _kl_vmf_uniform(vmf, hyu):
return -vmf.entropy() + hyu.entropy() # √
关于注册器, 见 decorator & register.