-
Notifications
You must be signed in to change notification settings - Fork 558
Euler 2.0 Message Passing接口
本章的章节内容安排如下:
Euler2.0对GNN类的算法模型进行了Message Passing范式的抽象,将一个GNN类的算法模型定义为子图抽样模块,图卷积模块,以及可选的池化模块的组合。Message Passing范式能够提高高稀疏不规则数据的计算效率,同时更利于算法模型的创新或组合。
图神经网络的模型,可以看作消息传递的过程,每一个节点会发出它自己所含有的消息,也会接受其他节点传来的消息。然后将得到的所有信息做聚合,计算节点新的表示。
在Euler2.0中,为了提高分布式并行计算能力,Message Passing接口会对训练的全图做子图抽样,得到一个有汇聚方向的二部图。然后在这个二部图上进行消息传递过程,得到汇聚目标节点的新的表示。汇聚目标节点新的表示会经过可选的模块,完成一个GNN类算法的计算。Message Passing范式下的计算流程如下图所示。
Message Passing通过Euler2.0提供的一系列OP来构造各种各样的子图抽样方式,来生成一组表示从源节点到目标节点信息汇聚的二部图。目前,Message Passing提供的子图生成方式包括:
DataFlow | Paper | Note |
---|---|---|
whole | - | 输入节点到输入节点邻接关系的二部图 |
Full | GCN | 输入节点到输入节点全部邻居邻接关系的二部图 |
sage | GraphSage | 输入节点到采样邻居邻接关系的二部图 |
fast | FastGCN | 输入节点到随机采样节点邻接关系的二部图(最后一层为全部邻居) |
adapt | AdaptiveGCN | 输入节点到Layerwise采样邻居邻接关系的二部图(最后一层为全部邻居) |
layerwise | - | 输入节点到Layerwise采样邻居邻接关系的二部图(第一层为采样邻居) |
relation | RGCN | 输入节点到输入节点全部邻居邻接关系的二部图(可获取边信息) |
Messsage Passing同样支持通过Euler2.0 OP自定义一个DataFlow。一个自定义的DataFlow可以通过实现继承UniqueDataFlow/NeighborDataFlow的类,并提供get_neighbor方法来完成。get_neighbor方法的输入为源节点,返回值为邻居节点以及与邻居节点对应的目标节点index。下面的代码为GCN Dataflow的示例。
import tensorflow as tf
import tf_euler
from tf_euler.python.dataflow.neighbor_dataflow import UniqueDataFlow, NeighborDataFlow
class GCNDataFlow(UniqueDataFlow):
def __init__(self, metapath,
add_self_loops=True):
super(GCNDataFlow, self).__init__(num_hops=len(metapath),
add_self_loops=add_self_loops)
self.metapath = metapath
def get_neighbors(self, n_id):
'''
The neighbor sampler in a mini-batch of n_id.
It returns: neighbors: a list of 'tensor';
neighbor_src: a list of 'tensor'
Input:
n_id: input nodes
Output:
neighbors: [[n_id's neighbor], [n_id's neighbor's neighbor], ...]
neighbor_src: [[n_neighbor_src_idx], [n_neighbor_neighbor_src_idx], ...]
'''
neighbors = []
neighbor_src = []
for i in range(len(self.metapath)):
n_id = tf.reshape(n_id, [-1])
one_neighbor = tf_euler.get_full_neighbor(n_id,
self.metapath[i])[0]
neighbors.append(tf.reshape(one_neighbor.values, [-1]))
one_indices = one_neighbor.indices[:, 0]
neighbor_src.append(tf.cast(one_indices, tf.int32))
new_n_id = tf.reshape(one_neighbor.values, [-1])
n_id = tf.concat([new_n_id, n_id], axis=0)
n_id, _ = tf.unique(n_id)
return neighbors, neighbor_src
Message Passing中的图卷积模块定义了信息在二部图上的消息传递汇聚方式。convolution函数在Message Passing范式下的过程和公式如下所示:
该公式由三部分组成:
- 消息传递函数:表示源节点与目标节点所含有消息在连接边的传递过程
- 消息汇聚函数:表示目标节点如何处理汇聚所有源节点传递过来的消息
- 消息更新函数:表示目标节点如何更新自己所含有的消息
可以通过Messsage Passing接口自定义一个Convolution函数。一个自定义的Convolution可以通过实现继承Conv的类,并完成表示消息汇聚函数的apply_edge,表示消息更新函数的apply_node,使用(或自定义)表示消息汇聚函数的scatter_op来组成。下面的代码为GCN Convlution的示例。
import tensorflow as tf
from tf_euler.python.convolution import conv
from tf_euler.python.euler_ops import mp_ops
class GCNConv(conv.Conv):
def __init__(self, dim, **kwargs):
super(GCNConv, self).__init__(aggr='add', **kwargs)
self.fc = tf.layers.Dense(dim, use_bias=False)
@staticmethod
def norm(edge_index, size):
edge_weight = tf.ones([tf.shape(edge_index)[1], 1])
def deg_inv_sqrt(i):
deg = mp_ops.scatter_add(edge_weight, edge_index[i], size[i])
return deg ** -0.5
return tuple(map(deg_inv_sqrt, [0, 1]))
def __call__(self, x, edge_index, size=None, **kwargs):
'''
x和edge_index参数共同描述了一个二部子图
x以二元组[src_node_emb, dst_node_emb],表示该二部图中src_node和dst_node集合的embedding
edge_index以连续紧密编号的二元组[src_node_idx, dst_node_idx]表示该二部图中src_node和dst_node之间的链接关系
a--c x = [[a, b], [c, d]]
\ edge_index = [[0, 0, 1], ->表示存在3条边 a, a, b
b--d [0, 1, 1]] c, d, d
下面的讲解均以该图为例
'''
# 计算子图度的normalize
norm = self.norm(edge_index, size)
'''
gather_feature函数会将输入的list,分别按照edge_index做gather拓展
x = [[a, b], [c, d]]
edge_index = [[0, 0, 1], [0, 1, 1]]
gather_x = [[a, a, b], [c, d, d]]
'''
gather_x, gather_norm, = self.gather_feature([x, norm], edge_index)
'''
消息传递apply_edge
apply_edge表示对二部图中每一条边上信息的处理方式
GCN中每一条边保留的信息为:dst_node_emb * src_norm * dst_norm
得到每条边保留下来的message的embedding
分别得到 c->a, d->a, d->b 分别传递的信息embedding
'''
out = self.apply_edge(gather_x[1], gather_norm[0], gather_norm[1])
'''
消息汇聚scatter: mean, add, max, softmax可选
将多组边的信息向src node做汇聚,
定义了a如何处理c 和 d节点分别发送过来的消息embedding,以及b如何处理d节点发送过来的embedding
得到a和b节点汇聚之后的信息结果
'''
out = mp_ops.scatter_(self.aggr, out, edge_index[0], size=size[0])
'''
消息更新apply_node
表示src节点如何处理自己原来的消息embedding和从邻居节点汇聚过来的消息embedding
在GCN中仅将汇聚过来的消息过全链接作为src节点输出的embedding
'''
out = self.apply_node(out)
return out
def apply_edge(self, x_j, norm_i, norm_j):
return norm_i * norm_j * x_j
def apply_node(self, aggr_out):
return self.fc(aggr_out)
使用Message Passing接口,在已有的convolution和dataflow模块下自定义一个算法模型,可以分为以下两步:
- 实现GNN模型:通过继承BaseGNN的方式定义一个GNN模型。实现to_x函数,来表示convlolution过程h0层的embedding结果。
- 定义模型损失及验证metric:可以通过继承Message Passing接口提供的SuperviseMode/UnsuperviseModel完成模型的定义。
Example:
import tensorflow as tf
import tf_euler
from tf_euler.python.mp_utils.base_gnn import BaseGNNNet
from tf_euler.python.mp_utils.base import SuperviseModel, UnsuperviseModel
class GNN(BaseGNNNet):
def __init__(self, conv, flow,
dims, fanouts, metapath,
feature_idx, feature_dim, add_self_loops=True):
super(GNN, self).__init__(conv=conv,
flow=flow,
dims=dims,
fanouts=fanouts,
metapath=metapath,
add_self_loops=add_self_loops)
self.feature_idx = feature_idx
self.feature_dim = feature_dim
def to_x(self, n_id):
# ho层的embedding直接通过结点的dense feature决定
x, = tf_euler.get_dense_feature(n_id,
self.feature_idx,
self.feature_dim)
return x
class SupervisedGCN(SuperviseModel):
def __init__(self, dims, metapath,
feature_idx, feature_dim,
label_idx, label_dim):
super(SupervisedGCN, self).__init__(label_idx,
label_dim)
''''
定义GNN模型的dataflow,convolutin
可选convolution:gcn, sage, gat, tag, agnn, sgcn, graphgcn, appnp,
arma, dna, gin, gated, relation
可选dataflow:full, sage, adapt, layerwise, whole, relation
'''
self.gnn = GNN('gcn', 'full', dims, None, metapath,
feature_idx, feature_dim)
# 定义root结点的表示方法
def embed(self, n_id):
return self.gnn(n_id)