Skip to content

Euler 2.0 点和边的属性索引的应用

origin edited this page Jun 29, 2020 · 2 revisions

本章的章节安排如下:

在本章,我们将在Euler2.0是如何应用点和边的属性索引。

Euler2.0 提供了对于点和边的属性进行索引的功能(详见这里),用户在节点或边进行采样的时候可以通过该功能按照属性对于相应的节点和边进行过滤,以此来满足用户的个性化需求。

在应用点和边的属性索引的时候,需要以下几步:

  • 创建索引meta文件
  • Euler 2.0图数据生成
  • 使用索引

创建索引meta文件

首先需要在Euler2.0图数据生成的时候,除了提供JSON文件之外,还有创建对应的索引meta文件(详见这里),索引meta文件定义了对节点或者边的哪些特征创建哪一种索引,以支持后续的基于索引的查询。

这里我们可以通过python来创建meta文件,如下,这里针对节点的price, category和brand分别构建了三种不同类型的索引。

def generate_index_meta(self):
    meta = {}
    meta['node'] = {}
    meta['node']['features'] = {}
    meta['node']['features']['price'] = \
        "price:float:uint64_t:range_index"

    meta['node']['features']['category'] = \
        "category:string:uint64_t:hash_index"
    meta['node']['features']['brand'] = \
        "brand:string:uint64_t:neighbor_index" 
    meta['edge'] = {}
    meta_json = json.dumps(meta)
    print(meta_json)
    with open(self.meta_file ,'w') as out:
        out.write(meta_json)

Euler 2.0 图数据生成

在创建了索引meta文件之后,用户需要将meta传入图生成工具中(即下图的convert_meta),这样以来,生成的euler2.0图数据二进制文件就会包含索引信息。

def convert2euler(self, convert_dir, out_dir):
    dir_name = os.path.dirname(os.path.realpath(__file__))
    convert_meta = self.meta_file
    g = EulerGenerator(convert_dir,
                       convert_meta,
                       out_dir,
                       self.partition_num)
    g.do()

使用索引

创建好索引之后,生成图数据之后,用户可以利用索引来实现个性化的需求。这里将以节点为对象(对边进行索引使用类似),从节点采样以及其邻居节点采样两个角度来介绍索引的使用。

全局节点采样

在有些场景下,用户可能希望对满足要求的部分进行embedding学习。在此情况下,用户可以利用Euler2.0提供的索引功能来很方便的完成该需求。

以下以几个例子为例来进行介绍

Example 1

可以在图数据加载的时候,即生成每一个batch 数据的时候,对于sample的节点按照某种方式过滤。如下所示,是对price大于100 的节点进行采样

#通过tf_euler.sample_node采样训练的节点,生成Batch data,
def get_train_from_input(self, inputs, params):
    #对于price大于100的节点进行采样
    result = tf_euler.sample_node(inputs, params['train_node_type'],'price > 100')
    result.set_shape([self.params['batch_size']])
    return result

Example 2

对于category为1的节点进行采样

#通过tf_euler.sample_node采样训练的节点,生成Batch data,
def get_train_from_input(self, inputs, params):
    #对于category为1的节点进行采样
    result = tf_euler.sample_node(inputs, params['train_node_type'],'category = 1')
    result.set_shape([self.params['batch_size']])
    return result

邻居节点采样

有些场景下,用户对于聚合的节点有要求,可能希望只聚合满足某种条件的一部分邻居节点。用户可以利用Euler2.0提供的所有功能来实现对于邻居节点的过滤。

为了在GNN中应用该功能,可以通过以下两种方式实现:

不基于Message Passing的框架来实现GNN时:

在采样邻居的时候,直接调用Euler 2.0的基于条件的邻居采样op实现:

Example 3

采样brand为1的邻居节点

tf_euler.sample_neighbor(nodes, edge_types, count, default_node=-1, condition='brand = 1')

基于Message Passing的框架来实现GNN时:

需要实现DataFlow(定义对应的采样过滤方式),然后在实现GNN Encoder的时候,创建对应的sampler。

Example 4

采样brand为1的邻居节点

#实现DataFlow
class MyDataFlow(UniqueDataFlow):
    def __init__(self, fanouts, metapath,
                 add_self_loops=True,
                 max_id=-1,
                 **kwargs):
        super(SageDataFlow, self).__init__(num_hops=len(metapath),
                                           add_self_loops=add_self_loops)
        self.fanouts = fanouts
        self.metapath = metapath
        self.max_id = max_id

    def get_neighbors(self, n_id):
        neighbors = []
        neighbor_src = []
        for hop_edge_types, count in zip(self.metapath, self.fanouts):
            n_id = tf.reshape(n_id, [-1])
            #只采邻居中brand为1的邻居
            one_neighbor, _w, _ = tf_euler.sample_neighbor(
                n_id, hop_edge_types, count, default_node=self.max_id+1,condition='brand = 1')
            
            neighbors.append(tf.reshape(one_neighbor, [-1]))
            node_src = tf.range(tf.size(n_id))
            node_src = tf.tile(tf.reshape(node_src, [-1, 1]), [1, count])
            node_src = tf.reshape(node_src, [-1])
            neighbor_src.append(node_src)
            new_n_id = tf.reshape(one_neighbor, [-1])
            n_id = tf.concat([new_n_id, n_id], axis=0)
            n_id, _ = tf.unique(tf.reshape(n_id, [-1]))
        return neighbors, neighbor_src

#在实现GNN Encoder的时候,创建对应的sampler
class GNN(BaseGNNNet):
    def __init__(self, conv, flow,
                 dims, fanouts, metapath,
                 feature_idx, feature_dim,
                 add_self_loops=False,
                 max_id=-1, **kwargs):
        super(GNN, self).__init__(conv=conv,
                                  flow=flow,
                                  dims=dims,
                                  fanouts=fanouts,
                                  metapath=metapath,
                                  add_self_loops=add_self_loops,
                                  max_id=max_id,
                                  **kwargs)

        self.sampler = MyDataFlow(fanouts, metapath, add_self_loops, max_id=max_id)
Clone this wiki locally