1. 图同构网络:Weisfeiler-Lehman 测试与图神经网络的表达力
本节介绍一个关于图神经网络表达力的经典工作,以及随之产生的另一个重要的模型——图同构网络。图同构问题指的是验证两个图在拓扑结构上是否相同。Weisfeiler-Lehman 测试是一种有效的检验两个图是否同构的近似方法。当我们要判断两个图是否同构时,先通过聚合节点和它们邻居的标签,再通过散列函数得到节点新的标签,不断重复,直到每个节点的标签稳定不变。如果在某些迭代中,两个图的节点标签不同,则可以判定这两个图是不同的。在Weisfeiler-Lehman测试的过程中,K次迭代之后,我们会得到关于一个节点的高度为K 的子树。Weisfeiler-Lehman子树常被用于核方法中,来计算两个图的相似度。
类似于消息传递网络中所归纳的框架,大部分基于空域的图神经网络都可以归结为两个步骤:聚合邻接点信息和更新节点信息。
与Weisfeiler-Lehman测试一样,在表达网络结构的时候,一个节点的表征会由该节点的父节点的子树信息聚合而成。在图同构网络的论文中,作者证明了Weisfeiler-Lehman测试是图神经网络表征能力的上限。
定理 :设G₁和G₂ 为任意非同构图。如果一个图神经网络遵循领域聚合方案, 将G₁和G₂ 映射到不同的嵌入,则Weisfeiler-Lehman测试也判定G₁和G₂不是同构的。这说明,图神经网络的表达能力不会超过Weisfeiler-Lehman测试的区分能力。那么,我们有没有办法得到和Weisfeiler-Lehman测试一样强大的图神经网络呢?Weisfeiler-Lehman 测试最大的特点是其对每个节点的子树的聚合函数采用的是单射的散列函数。那么,是否将图神经网络的聚合函数也改成单射函数就能达到和Weisfeiler-Lehman测试一样的效果呢?
定理4:设A:G→Rd(上标)是一个遵循邻域聚合方案的图神经网络。通过足够的迭代次数(在图神经网络层数多的情况下),如果满足以下条件,则 A可以通过 Weisfeiler-Lehman测试把非同构的两个图G₁ 和G₂ 映射到不同的嵌入:
(1)A 在每次迭代时所采用的节点状态更新公式:
其中φ是单射函数,f 是一个作用在多重集上的函数,也是单射函数。
(2)从节点嵌入整合到最终的图嵌入时,A 所采用的读取函数运行在节点嵌入的多重集{hvk}上,也是一个单射函数。单射指的是不同的输入值一定会对应到不同的函数值。这个结论说明,要设计与Weisfeiler-Lehman一样强大的图卷积网络,最重要的条件是设计一个单射的聚合函数。
其实,求和函数在多重集上就是一个单射函数。由此,我们得到了一个新模型——图同构网络,只需要把聚合函数改为求和函数,就可以提升图神经网络的表达力:
2.图卷积网络实战
init部分
#导入确保代码在Python 2中也能使用Python 3的特性
from __future__ import print_function
#从layers模块导入所有内容
from __future__ import division
from .layers import *
#从models模块导入所有内容
from .models import *
#utils模块导入所有内容
from .utils import *
layers部分
import math
import torch
#从 PyTorch 的 nn 模块中导入 Parameter 类,它用于定义可学习的参数
from torch.nn.parameter import Parameter
#从 PyTorch 的 nn 模块中导入 Module 类,它是所有神经网络模块的基类
from torch.nn.modules.module import Module
#定义一个名为 GraphConvolution 的类,它继承自 Module 类
class GraphConvolution(Module):
#bias=True 是一个常见的参数设置,它用于决定是否在模型的某些层中添加偏置项。偏置项是神经网络中的一个参数,它允许模型在特征空间中进行平移,从而提高模型的灵活性和学习能力
#接收输入特征数 in_features、输出特征数 out_features
def __init__(self,in_features,out_features,bias=True):
super(GraphConvolution,self).__init__()
self.in_features=in_features
self.out_features=out_features
#创建一个权重矩阵 weight,它是一个 Parameter 对象,用于存储可学习的权重参数
self.weight=Parameter(torch.FloatTensor(in_features,out_features))
if bias:
self.bias=Parameter(torch.FloatTensor(out_features))
else:
#如果 bias 参数为 False,则不创建偏置向量,而是将 bias 注册为一个不存在的参数
self.register_parameter('bias',None)
#调用 reset_parameters 方法来初始化权重和偏置参数
self.reset_parameters()
def reset_parameters(self):
#计算权重的初始化标准差,使用输出特征数的平方根的倒数
stdv=1./math.sqrt(self.weight.size(1))
#使用均匀分布初始化权重,范围在 [-stdv, stdv] 之间
self.weight.data.uniform_(-stdv,stdv)
#定义 forward 方法,它是模型的前向传播方法,接收输入数据 input 和邻接矩阵 adj
def forward(self,input,adj):
#定义 forward 方法,它是模型的前向传播方法,接收输入数据 input 和邻接矩阵 adj
support=torch.mm(input,self.weight)
#使用矩阵乘法计算输入数据和权重的乘积,得到变换后的特征
output=torch.spmm(adj,support)
if self.bias is not None:
return output+self.bias
else:
return output
#定义 __repr__ 方法,用于返回类的字符串表示
def __repr__(self):
return self.__class__.__name +'('\
+str(self.in_features)+'->'\
+str(self.out_features)+')'
models部分
#导入 PyTorch 的神经网络模块
import torch.nn as nn
#导入 PyTorch 的函数模块,它包含了一些常用的函数,比如激活函数。
import torch.nn.functional as F
#从 pygcn 库中导入 GraphConvolution 类,这是一个图卷积层的实现
from pygcn.layers import GraphConvolution
#定义了一个名为 GCN 的类,它继承自 nn.Module
class GCN(nn.Module):
#nfeat:输入特征的数量,nhid:隐藏层的特征数量
#nclass:输出类别的数量,dropout:Dropout 层的丢弃概率
def __init__(self,nfeat,nhid,nclass,dropout):
super(GCN,self).__init__()
#self.gc1=GraphConvolution(nfeat,nhid):创建第一个图卷积层,将输入特征从 nfeat 转换到 nhid。
self.gc1=GraphConvolution(nfeat,nhid)
#self.gc2=GraphConvolution(nhid,nclass):创建第二个图卷积层,将隐藏层特征从 nhid 转换到输出类别 nclass
self.gc2=GraphConvolution(nhid,nclass)
#将Dropout 层的丢弃概率设置为传入的 dropout 参数
self.dropout=dropout
#定义了模型的前向传播函数,它接收两个参数
#x:输入的特征矩阵,adj:邻接矩阵,表示图结构
def forward(self,x,adj):
#通过第一个图卷积层 gc1 传递 x 和 adj,然后应用 ReLU 激活函数
x=F.relu(self.gc1(x,adj))
#在激活后的特征上应用 Dropout 层
x=F.dropout(x,self.dropout,training=self.training)
#通过第二个图卷积层 gc2 传递 x 和 adj
x=self.gc2(x,adj)
#在最后一层上应用 log-softmax 函数,得到每个类别的对数概率,并返回结果
#Softmax 目的是将一个向量或一个批量的向量中的元素值转换成概率分布
return F.log_softmax(x,dim=1)
utils部分(函数或代码模块)
import numpy as np
#导入 SciPy 库中的稀疏矩阵模块,用于处理稀疏矩阵sparse稀疏矩阵
import scipy.sparse as sp
import torch
#定义一个函数,将标签编码为 one-hot 格式
def encode_onehot(labels):
classes=set(labels)
#从标签中提取所有唯一的类别
classes_dict={c:np.identity(len(classes))[i,:] for i,c in enumerate(classes)}
labels_onehot=np.array(list(map(classes_dict.get,labels)),
dtype=np.int32)
#将所有标签转换为 one-hot 编码格式
return labels_onehot
#返回 one-hot 编码的标签数组
def load_data(path="../data/cora/",dataset='cora'):
print('Loading {} dataset ...'.format(dataset))
idx_features_labels=np.genfromtxt("{}{}.content".format(path,dataset),
dtype=np.dtype(str))
#从文件中加载节点的特征和标签 sp.csr_matriX稀疏矩阵
features=sp.csr_matrix(idx_features_labels[:,1,-1],dtype=np.int32)
#创建一个稀疏矩阵,包含节点的特征
idx_map={j:i for i,j in enumerate(idx)}
#创建一个映射,将索引映射到一个新值
edges_unorded=np.genformtxt("{}{}.cites".format(path,dataset),
dtype=np.int32)
#从文件中加载引用(边)信息
edges=np.array(list(map(idx_map.get,edges_unorded.flatten())),
dtype=np.int32).reshape(edges_unorded.shape)
shape=(labels.shape[0],labels.shape[0]), dtype=np.float32)
#创建一个 COO 格式的稀疏邻接矩阵
adj=sp.coo_matrix((np.ones(edges.shape[0]),(edges[:,0],edges[:,1])),
shape=(labels.shape[0],labels.shape[0]),
dtype=np.float32)
#构建对称邻接矩阵是图数据处理中的一个常见步骤
adj=adj+adj.T.multiply(adj.T>adj)-adj.multiply(adj.T>adj)
#确保邻接矩阵是对称的
features=normalize(features)
#对特征矩阵进行归一化处理,归一化:数据按照一定的比例缩放,使其落在特定的区间内
adj=normalize(adj+sp.eye(adj.shape[0]))
#对邻接矩阵进行归一化,并添加自环,添加自环是指在图中添加一条边,使得这条边连接一个顶点和它自身。
idx_train=range(140)
#定义训练集的索引
idx_val=range(200,500)
#定义验证集的索引
idx_test=range(500,1500)
#定义测试集的索引
features=torch.FloatTensor(np.array(features.todense()))
#将特征矩阵转换为 PyTorch 的 FloatTensor
labels=torch.LongTensor(np.where(labels)[1])
#将标签转换为 PyTorch 的 LongTensor
adj=sparse_mx_to_torch_sparse_tensor(adj)
#将稀疏邻接矩阵转换为 PyTorch 的稀疏张量
idx_train=torch.LongTensor(idx_train)
#将训练集索引转换为 PyTorch 的 LongTensor
idx_val=torch.LongTensor(idx_val)
#将验证集索引转换为 PyTorch 的 LongTensor
idx_test=torch.LongTensor(idx_test)
#将测试集索引转换为 PyTorch 的 LongTensor。
return adj,features,labels,idx_train,idx_val,idx_test
def normalize(mx):
#定义一个函数,用于归一化矩阵
rowsum=np.array(mx.sum(1))
#计算每一行的和
r_inv=np.power(rowsum,-1).flatten()
#计算每一行的倒数
r_mat_inv=sp.diags(r_inv)
#创建一个对角矩阵,对角线上是行的倒数
mx=r_mat_inv.dot(mx)
#用对角矩阵乘以原矩阵,进行归一化,
#mx=r_mat_inv.dot(mx)是矩阵乘法操作
return mx
def accuracy(output,labels):
#定义一个函数,用于计算准确率
preds=output.max(1)[1].type_as(labels)
#获取预测的类别
correct=preds.eq(labels).double()
#计算预测正确的数量
correct=correct.sum()
#将正确的数量相加
return correct/len(labels)
#返回准确率
def sparse_mx_to_torch_sparse_tensor(sparse_mx):
#定义一个函数,将 SciPy 的稀疏矩阵转换为 PyTorch 的稀疏张量
sparse_mx=sparse_mx.tocoo().astype(np.float32)
#将稀疏矩阵转换为 COO 格式
indices=torch.from_numpy(
np.vstack((sparse_mx.row,sparse_mx.col)).astype(np.int64))
#创建一个包含行和列索引的张量
values=torch.from_numpy(sparse_mx.data)
#创建一个包含数据值的张量
shape=torch.Size(sparse_mx.shape)
return torch.sparse.FloatTensor(indices,values,shape)
#返回一个 PyTorch 的稀疏 FloatTensor
train部分
from __future__ import division
#从 __future__ 模块导入 division,使得除法 / 总是产生浮点数结果
from __future__ import print_function
#从 __future__ 模块导入 print_function,确保在 Python 2 中使用 Python 3 的打印语法
import time
import argparse
#导入argparse模块,用于解析命令行参数
import numpy as np
import torch
import torch.nn.functional as F
import torch.optim as optim
#从 torch 导入 optim 模块,提供优化算法
from pygcn.utils import load_data, accuracy
#从 pygcn 包的 utils 模块导入 load_data 和 accuracy 函数
from pygcn.models import GCN
#从 pygcn 包的 models 模块导入 GCN 类
parser = argparse.ArgumentParser()
#创建一个 ArgumentParser 对象,用于解析命令行参数
parser.add_argument('--no-cuda', action='store_true', default=False,
help='Disables CUDA training.')
#添加一个命令行参数,用于禁用 CUDA 训练
parser.add_argument('--fastmode', action='store_true', default=False,
help='Validate during training pass.')
#添加一个命令行参数,用于在训练过程中进行验证
parser.add_argument('--seed', type=int, default=42, help='Random seed.')
#添加一个命令行参数,用于设置随机种子
parser.add_argument('--epochs',type=int,default=42,help='Random seed.')
#添加一个命令行参数,用于设置训练的轮数
parser.add_argument('--lr',type=float,default=0.01,
help='Inital learning rate.')
#添加一个命令行参数,用于设置初始学习率
parser.add_argument('--weight_deacy',type=float,default=5e-4,
help='Weight deacy(L2 loss on parameters).')
#添加一个命令行参数,用于设置权重衰减
parser.add_argument('--hidden',type=int,default=16,
help='Number of hidden units.')
#添加一个命令行参数,用于设置隐藏层单元的数量
parser.add_argument('--dropout',type=float,default=0.5,
help='Dropout rate (1-keep probability).')
#添加一个命令行参数,用于设置 dropout 率
args=parser.parse_args()
#解析命令行参数
args.cuda=not args.no_cuda and torch.cuda.is_available()
#设置 cuda 标志,如果命令行参数 --no-cuda 没有被设置且 CUDA 可用,则启用 CUDA
np.random.seed(args.seed)
#设置 NumPy 的随机种子
torch.manual_seed(args.seed)
#设置 PyTorch 的随机种子
if args.cuda:
torch.cuda.manual_seed(args.seed)
#如果使用 CUDA,则设置 CUDA 的随机种子
#load data
adj,features,labels,idx_train,idx_val,idx_test=load_data()
#调用 load_data 函数加载数据
#model and optimizer
model =GCN(nfeat=features.shape[1],#nfeat:特征的数量
nhid=args.hidden,#nhid:隐藏层的单元数
nclass=labels.max().item+1,#nclass:类别的数量
dropout=args.dropout)#dropout 比率,用于正则化以防止过拟合
#创建一个 GCN 模型实例,并设置相应的参数
optimizer=optim.Adam(model.parameters(),
lr=args.lr,weight_deacy=args.weight_deacy)
#创建一个 Adam 优化器实例,并设置学习率和权重衰减
if args.cuda:
model.cuda()
features=features.cuda()
adj=adj.cuda()
labels=labels.cuda()
idx_train=idx_train.cuda()
idx_val=idx_val.cuda()
idx_test=idx_test.cuda()
#如果使用 CUDA,则将模型和数据迁移到 GPU
def train(epoch):
#定义 train 函数,用于执行一个训练周期
t=time.time()
#记录训练开始的时间
model.train()
#设置模型为训练模式
optimizer.zero_grad()
#清空优化器的梯度
output=model(features,adj)
#前向传播,计算模型输出
loss_train=F.nll_loss(output[idx_train],labels[idx_train])
#计算训练集上的损失
acc_train=accuracy(output[idx_train],labels[idx_train])
#计算训练集上的准确率
loss_train.backward()
#反向传播,计算梯度
optimizer.step()
#更新模型参数
if not args.fastmode:
#(fast mode)通常是指一种优化的执行模式,旨在提高程序的运行速度,通常是以牺牲一些功能或降低准确性为代价
model.eval()
output=model(features,adj)
loss_val=F.nll_loss(output[idx_val],labels[idx_val])
acc_val=accuracy(output[idx_val],labels[idx_val])
print('Epoch:{:04d}'.format(loss_train.item()),
'loss_train:{:.4f}'.format(loss_train.item()),
'acc_train:{:.4f}'.format(acc_train.item()),
'loss_val:{:.4f}'.format(acc_val.item()),
'time:{:.4f}s'.format(time.time()-t))
#如果不使用快速模式,则在每个训练周期后进行验证,并计算验证集上的损失和准确率
def test():
#定义 test 函数,用于测试模型
model.eval()
#设置模型为评估模式
output=model(features,adj)
#前向传播,计算模型输出
loss_test=F.nll_loss(output[idx_test],labels[idx_test])
#计算测试集上的损失
print("Test set results:",
"loss={:.4f}".format(acc_test.item()))
#打印测试集的结果
t_total=time.time()
#记录总的开始时间
for epoch in range(args.epochs):
train(epoch)
#进行指定次数的训练周期
print("Optimization Finished!")
#训练完成后打印信息
print('Total time elapsed:{:.4f}s'.format(time.time()-t_total))
#打印总的执行时间
test()
#调用 test 函数进行测试
#前向传播是模型评估和预测的基础,它使得模型能够根据训练过程中学到的权重和偏差来对新的输入数据做出响应
#1.在训练过程中,前向传播用于计算预测值,然后通过反向传播算法计算损失函数的梯度,并更新模型的权重
# 2.在模型部署或推理阶段,前向传播用于生成最终的预测结果