PyGAT图注意力模型
PyGAT实现的分类器: https://www.aliyundrive.com/s/vfK8ndntpyc
还在发烧,不是特别清醒,就简单写了写。用GAT进行关系预测,GAT可能是只做中间层,不过本来在GAT这一层就为了能懂就简化了很多地方了,如果再加别的,预测正确率大概率很低。尝试了直接用GAT预测边权(没用稀疏矩阵的版本),内存不够没办法跑(需要至少100G+),试了少一些节点,也基本预测不出来,所以这里只介绍GAT基本实现和GAT进行分类。
代码是用ANACONDA的IPYTHON虚拟环境里运行的。
需要安装pytorch,在conda prompt输入以下命令:
conda install pytorch torchvision torchaudio cpuonly -c pytorch
不用conda可以↓,选Pip,得到Pip的命令安装PyTorch。CUDA没有对应的版本就选CPU。
https://pytorch.org/
GAT
GAT对于一个图,按照其输入的节点特征预测输出新的节点的特征。
GAT是一种能够直接作用于图并且利用其结构信息的卷积神经网络,可用于网络中的半监督学习问题,学习网络中结点的特征与网络结构的信息。主要思想是对每个结点的邻居及其自身的信息作加权平均,按照其输入的节点特征输出新的节点特征。图注意力模型用注意力机制对邻近节点特征加权求和,每个节点可以根据邻节点的特征,为其分配不同的权值,将权重称为注意力系数,然后根据注意力系数进行加权求和,得到节点的新特征。
图注意力模型训练得到的是一个计算好节点的注意力权重的图,得到该图以后,就可以输入节点特征,得到新的节点特征。这个新的特征是什么取决于使用图注意力模型的目的。
GAT模型
假设输入特征维度为x,输出特征维度为y。
GAT是图注意力层+MUTIHEAD机制,MUTIHEAD机制相当于有多个图注意力层,每个层输出是n1,n2,n3…个新特征,最后再将这些特征转变为y个特征,也就是最终输出的特征。
注意力系数计算
构建图注意力层的第一步是计算注意力系数,对所有节点计算他的所有相邻节点的注意力系数。计算注意力系数公式如下:
•eij:节点i对邻居j的注意力系数
•W:可学习的参数矩阵
•a:可学习参数,一个向量,将多维特征转化为一个数
得到注意力系数后,将该节点与所有邻居节点的注意力系数做归一化处理,就得到了注意力系数。这个归一化是指数归一化,并且归一化之前要使用L e a k y R e L U 进行非线性激活。归一后就得到了节点i和节点j的注意力系数。
聚合
得到计算好的注意力系数,将特征进行加权求和,经激活函数激活后就得到了每个节点的新特征。
计算的例子
https://zhuanlan.zhihu.com/p/412270208这里有一个图注意力模型的具体的计算的例子,可以看一下,就知道GAT是怎么由输入得到输出的了,这里的GAT代码是PyGAT的官方代码,对cora数据集分类,如果能看懂就知道GAT怎么用了,或者可以继续看下去,用了PyGAT的模型,训练的部分简化了,删除了稀疏矩阵运算的版本。
模型定义
图注意力层的定义如下:
class GraphAttentionLayer(nn.Module):
"""
Simple GAT layer, similar to https://arxiv.org/abs/1710.10903
"""
def __init__(self, in_features, out_features, dropout, alpha, concat=True):
super(GraphAttentionLayer, self).__init__()
self.dropout = dropout #dropout表示随机放弃多少邻节点,一般0.2
self.in_features = in_features #输入特征
self.out_features = out_features #输出特征
self.alpha = alpha #激活函数用的参数,0.2
self.concat = concat
self.W = nn.Parameter(torch.empty(size=(in_features, out_features))) #参数矩阵W
nn.init.xavier_uniform_(self.W.data, gain=1.414) #初始化
self.a = nn.Parameter(torch.empty(size=(2*out_features, 1))) #参数向量a
nn.init.xavier_uniform_(self.a.data, gain=1.414) #初始化
self.leakyrelu = nn.LeakyReLU(self.alpha) #激活函数
注意力层计算的过程在forward函数中,输入是h,原特征矩阵,输出是新特征矩阵。
def forward(self, h, adj):
Wh = torch.mm(h, self.W) # h.shape: (N, in_features), Wh.shape: (N, out_features)
e = self._prepare_attentional_mechanism_input(Wh)
zero_vec = -9e15*torch.ones_like(e)
attention = torch.where(adj > 0, e, zero_vec) #邻接矩阵为0的位置表示没有边,注意力系数为0
attention = F.softmax(attention, dim=1) #指数归一化
attention = F.dropout(attention, self.dropout, training=self.training) #drop 0.2 的邻节点的注意力系数
h_prime = torch.matmul(attention, Wh) #输出特征
if self.concat:
return F.elu(h_prime) #激活后的输出特征
else:
return h_prime
GAT模型就是在图注意力层基础上MUTIHEAD机制的实现,但是MUTIHEAD机制不是图注意力模型必须的,只有1个图注意力层就可以算是一个图注意力模型了。
下面看一下GAT模型的定义,MUTIHEAD机制的实现不重要,所以不看具体实现,只看参数。init中,nfeat是输入特征的维度,nhid是中间层,也就是如果有多个注意力层的情况下,注意力层的输出特征维度,nclass是最终的输出维度,因为这个GAT实现的是一个分类器,所以输出特征数就是类别数,即如果有n类,输出特征就应该有n个,每个输出值表示该样本是该类别的概率。例如每个样本都有一些自己的特征,一共有三类,把特征以及该样本和其他样本的关系输入到训练好的GAT,输出特征是0.1,0.5,0.4,就表示该样本是第一类,第二类,第三类的概率是0.1,0.5,0.4。至于为什么输出是概率,就涉及到模型训练的损失函数了,这部分内容可以看一下交叉熵相关的知识。顶着烧的我反正是没太看懂
class GAT(nn.Module):
def __init__(self, nfeat, nhid, nclass, dropout, alpha, nheads):
def forward(self, x, adj):
模型训练
GAT能把输入特征变换为输出特征,是需要训练的。GAT一共两个可变参数,参数矩阵W和参数a,已经在模型定义时用nn.Parameter声明成参数了,接下来只要定义训练函数,就能训练出合适的模型了。
怎么训练出合适的模型,让模型能够得到预期的输出,取决于模型的任务。对于分类任务来说,应该用已有数据的标签来训练模型,让模型得到的样本标签能够尽量贴近真实标签。
首先要实例化一个模型,假设实例化一个三分类GAT模型:
hidden = int(labels.max()) + 1 #直接把中间层特征数也设置成输出特征数了
model = GAT(nfeat=features.shape[1],
nhid=hidden, #中间层特征数,模型可以在每一层有不同的输出特征数
nclass=int(labels.max()) + 1, #类别即输出特征数
dropout=0.2,
nheads= 1,
alpha=0.2)
训练模型用的优化器也要实例化一个:
patience = 100 #100次LOSS没有下降,就停止训练
epochs = 1000
optimizer = optim.Adam(model.parameters(),
lr=0.001, #学习率,训练慢了可以调大一点
weight_decay=5e-4)
单轮次的训练是这样的,就是用GAT算输出,然后用输出和正确结果计算损失,然后使用优化器进行优化,优化器会会优化参数。损失函数就是上面提到的交叉熵。其中labels是ONEHOT编码,例如有三个类别type1,type2,type3,编码就是001,010,100,至于为什么使用ONEHOT编码,这也是和交叉熵有关的。
def train(epoch):
t = time.time()
model.train()
optimizer.zero_grad()
output = model(features, adj) # 算输出
loss_train = F.nll_loss(output[idx_train], labels[idx_train]) # nll_loss损失函数
acc_train = accuracy(output[idx_train], labels[idx_train])
loss_train.backward()
optimizer.step() # 使用优化器优化
loss_val = F.nll_loss(output[idx_train], labels[idx_train])
acc_val = accuracy(output[idx_train], labels[idx_train])
# 输出单轮训练结果
print('Epoch: {:04d}'.format(epoch+1),
'loss_train: {:.4f}'.format(loss_train.data.item()),
'acc_train: {:.4f}'.format(acc_train.data.item()),
'loss_val: {:.4f}'.format(loss_val.data.item()),
'acc_val: {:.4f}'.format(acc_val.data.item()),
'time: {:.4f}s'.format(time.time() - t))
return loss_val.data.item()
这样的训练进行多轮,直到所有轮次训练完或者损失函数算的loss值不再变换。
t_total = time.time()
loss_values = []
bad_counter = 0
best = epochs + 1
best_epoch = 0
for epoch in range(epochs):
loss_values.append(train(epoch))
torch.save(model.state_dict(), '{}.pkl'.format(epoch))
if loss_values[-1] < best:
best = loss_values[-1]
best_epoch = epoch
bad_counter = 0
else:
bad_counter += 1
if bad_counter == patience:
break
files = glob.glob('*.pkl')
for file in files:
epoch_nb = int(file.split('.')[0])
if epoch_nb < best_epoch:
os.remove(file)
train.py里用随机生成的数据进行了训练,运行一下就可以看到模型训练的过程结果,训练好的最佳参数的模型会保存到一个pkl文件里,最后的测试部分,会读入文件中的模型,测试中有一个idx_test是用于测试的数据的下标,我没有定义测试数据,所以将测试的这部分注释掉了。
(原来的数据集中标签是一整份的,idx_train是用于训练的样本下标,idx_test是用于测试的样本下标,例如idx_train是0-4,表示labels[0:4]都是训练用的)