对象创建:
model = LLGC(description.size(1), label.max().item()+1, args.drop_out, args.use_bias).to(device)
模型使用:
output = model(train_features)
LLGC:
# Lorentzian MODEL
class LLGC(nn.Module):
def __init__(self, nfeat, nclass, drop_out, use_bias):
super(LLGC, self).__init__()
self.drop_out = drop_out
self.use_bias = use_bias
self.nclass = nclass
self.c = torch.tensor([1.0]).to("cuda")
self.manifold = getattr(manifolds_LLGC, "Lorentzian")()
#创建了manifolds中的一个lorentzian类的对象,赋值给self.manifold
self.W = LorentzLinear(self.manifold, nfeat, nclass, self.c, self.drop_out, self.use_bias)
def forward(self, x, batch_size):
x_loren = self.manifold.normalize_input(x, self.c)
#normalize_input操作内部带有对数映射,self.c为曲率。x_loren为对数映射后的结果
x_w = self.W(x_loren)
x_tan = self.manifold.log_map_zero(x_w, self.c)
return x_tan[:batch_size]
- 欧式空间中的点到流形的映射
- 计算
- 流形映射到欧式空间
1. 欧式空间中的点到流形的映射
目标是使用self.manifold.normalize_input将欧式空间的目标特征x映射到流形上,返回x_loren
x_loren = self.manifold.normalize_input(x, self.c)
创建一个全零张量。
将全零张量和输入张量拼接,增加一个额外的维度。
调用exp_map_zero。
def normalize_input(self, x, c):
# print('=====normalize original input===========')
num_nodes = x.size(0)
zeros = torch.zeros(num_nodes, 1, dtype=x.dtype, device=x.device)
x_tan = torch.cat((zeros, x), dim=1)
return self.exp_map_zero(x_tan, c)
创建流形上的基点
def exp_map_zero(self, dp, c, is_res_normalize=True, is_dp_normalize=True):
zeros = torch.zeros_like(dp)
zeros[:, 0] = c ** 0.5
return self.exp_map_x(zeros, dp, c, is_res_normalize, is_dp_normalize)
exp_map_x 方法通过指数映射将一个点 p 和切向量 dp 映射回洛伦兹流形上。
首先规范化切向量 dp。
然后计算其洛伦兹范数。
接着通过指数映射公式将其映射到流形上,并可选地对结果进行规范化。
def exp_map_x(self, p, dp, c, is_res_normalize=True, is_dp_normalize=True):
if is_dp_normalize:
dp = self.normalize_tangent(p, dp, c)
dp_lnorm = self.l_inner(dp, dp, keep_dim=True)
dp_lnorm = torch.sqrt(torch.clamp(dp_lnorm + self.eps[p.dtype], 1e-6))
dp_lnorm_cut = torch.clamp(dp_lnorm, max=50)
sqrt_c = c ** 0.5
res = (torch.cosh(dp_lnorm_cut / sqrt_c) * p) + sqrt_c * (torch.sinh(dp_lnorm_cut / sqrt_c) * dp / dp_lnorm)
if is_res_normalize:
res = self.normalize(res, c)
return res
normalize_tangent 方法的目的是规范化洛伦兹流形上切向量,使其满足洛伦兹内积 <p, p_tan>_L = 0
def normalize_tangent(self, p, p_tan, c):
"""
Normalize tangent vectors to place the vectors satisfies <p, p_tan>_L=0
:param p: the tangent spaces at p. size:[nodes, feature]
:param p_tan: the tangent vector in tangent space at p
"""
d = p_tan.size(1) - 1
p_tail = p.narrow(1, 1, d)
p_tan_tail = p_tan.narrow(1, 1, d)
ptpt = torch.sum(p_tail * p_tan_tail, dim=1, keepdim=True)
p_head = torch.sqrt(c + torch.sum(torch.pow(p_tail, 2), dim=1, keepdim=True) + self.eps[p.dtype])
return torch.cat((ptpt / p_head, p_tan_tail), dim=1)
计算内积
def l_inner(self, x, y, keep_dim=False):
# input shape [node, features]
d = x.size(-1) - 1
xy = x * y
xy = torch.cat((-xy.narrow(1, 0, 1), xy.narrow(1, 1, d)), dim=1)
return torch.sum(xy, dim=1, keepdim=keep_dim)
目的是将一个向量 p 规范化,以确保它位于双曲面上。
这个过程可以理解为确保该向量符合双曲空间的几何结构。
def normalize(self, p, c):
"""
Normalize vector to confirm it is located on the hyperboloid
:param p: [nodes, features(d + 1)]
:param c: parameter of curvature
"""
d = p.size(-1) - 1
narrowed = p.narrow(-1, 1, d)
if self.max_norm:
narrowed = torch.renorm(narrowed.view(-1, d), 2, 0, self.max_norm)
first = c + torch.sum(torch.pow(narrowed, 2), dim=-1, keepdim=True)
first = torch.sqrt(first)
return torch.cat((first, narrowed), dim=1)
2. 计算
x_w = self.W(x_loren)
其中LorentzLinear的类定义如下:
class LorentzLinear(nn.Module):
# Lorentz Hyperbolic Graph Neural Layer
def __init__(self, manifold, in_features, out_features, c, drop_out, use_bias):
super(LorentzLinear, self).__init__()
# print("LorentzLinear")
self.manifold = manifold
self.in_features = in_features
self.out_features = out_features
self.c = c
self.drop_out = drop_out
self.use_bias = use_bias
self.bias = nn.Parameter(torch.Tensor(out_features-1)) # -1 when use mine mat-vec multiply
self.weight = nn.Parameter(torch.Tensor(out_features - 1, in_features)) # -1, 0 when use mine mat-vec multiply
self.reset_parameters()
def reset_parameters(self):
init.xavier_uniform_(self.weight, gain=math.sqrt(2))
init.constant_(self.bias, 0)
def forward(self, x):
drop_weight = F.dropout(self.weight, self.drop_out, training=self.training)
mv = self.manifold.matvec_regular(drop_weight, x, self.bias, self.c, self.use_bias)
return mv
dropout的输入可以是特征,也可以是权值矩阵。
总归返回的是,以概率p随机给元素置零之后的输入。
对输入执行对数映射。
分割映射结果。
矩阵乘法(将 x_tail 和权重矩阵 m 进行矩阵乘法。注意,这里对 m 执行了转置操作,以确保维度匹配)。
拼接结果,恢复到原有的维度。
首先执行 normalize_tangent_zero,将数据归一化到洛伦兹流形的切空间。再通过 执行指数映射,将数据映射回洛伦兹流形。
检查 mx 中的元素是否为零,用零替换掉 mx 中满足某个条件的部分。
def matvec_regular(self, m, x, b, c, use_bias):
d = x.size(1) - 1
x_tan = self.log_map_zero(x, c)
x_head = x_tan.narrow(1, 0, 1)
x_tail = x_tan.narrow(1, 1, d)
mx = x_tail @ m.transpose(-1, -2)
if use_bias:
mx_b = mx + b
else:
mx_b = mx
mx = torch.cat((x_head, mx_b), dim=1)
mx = self.normalize_tangent_zero(mx, c)
mx = self.exp_map_zero(mx, c)
cond = (mx==0).prod(-1, keepdim=True, dtype=torch.uint8)
res = torch.zeros(1, dtype=mx.dtype, device=mx.device)
res = torch.where(cond, res, mx)
return res
def log_map_zero(self, y, c, is_tan_normalize=True):
zeros = torch.zeros_like(y)
zeros[:, 0] = c ** 0.5
return self.log_map_x(zeros, y, c, is_tan_normalize)
对数映射的作用是将洛伦兹流形上的点投影到某个点 x 的切空间中(即欧几里得空间)。
通过内积调整 y,得到一个新的向量 tmp_vector。
计算 tmp_vector 的范数。
计算切向量 y_tan。
如果 is_tan_normalize 为真,则对计算得到的切向量 y_tan 进行归一化处理,确保它满足洛伦兹切空间的约束。
def log_map_x(self, x, y, c, is_tan_normalize=True):
"""
Logarithmic map at x: project hyperboloid vectors to a tangent space at x
:param x: vector on hyperboloid
:param y: vector to project a tangent space at x
:param normalize: whether normalize the y_tangent
:return: y_tangent
"""
xy_distance = self.induced_distance(x, y, c)
tmp_vector = y + self.l_inner(x, y, keep_dim=True) / c * x
tmp_norm = torch.sqrt(self.l_inner(tmp_vector, tmp_vector) + self.eps[x.dtype])
y_tan = xy_distance.unsqueeze(-1) / tmp_norm.unsqueeze(-1) * tmp_vector
if is_tan_normalize:
y_tan = self.normalize_tangent(x, y_tan, c)
return y_tan
这里通过 induced_distance 方法计算向量 x 和 y 在洛伦兹流形上的距离,这实际上是两点在洛伦兹空间的测地线距离。
def induced_distance(self, x, y, c):
xy_inner = self.l_inner(x, y)
sqrt_c = c ** 0.5
return sqrt_c * arcosh(-xy_inner / c + self.eps[x.dtype])
3. 流形映射到欧式空间
x_tan = self.manifold.log_map_zero(x_w, self.c)