-
Notifications
You must be signed in to change notification settings - Fork 558
Euler 2.0 在知识图谱上的应用
本章的章节安排如下:
在本章,我们将介绍Euler 2.0是如何在知识图谱上应用的。
知识图谱是一类特殊的图数据,带有一定的语义,并且可复用于多个场景的。其是对人类知识的一种结构化的友好表示。在知识图谱中,往往使用三元组(head, relation, tail)来表示知识,其中head和tail都是实体用节点表示,relation表示关系用边表示,比如(清华大学, 坐落于, 北京)。知识图谱表示了实体之间复杂的语义关系,利用知识图谱可以帮助人们进行事物的理解和推理,比如在推荐系统中,除了建模用户与商品之间的交互以外,可以利用电商知识图谱来丰富商品的语义,挖掘潜在的商品关系等,来辅助对用户行为进行预测。
本章以知识图谱表示学习方法TransE,知识图谱fb15k为例,介绍如何利用Euler2.0构建表示学习方法来解决知识图谱上的表示学习,完整的代码见这里。
Note:
在本章中,会对相应的一些句子加粗或者在代码中做注释,来显式地区别与其他场景的应用的一些不同处理。与其他场景的联系和区别的详细对比见这里。
与在有属性图上的应用中的数据准备类似,也是先生成知识图谱数据,然后对知识图谱数据进行加载。
如下所示,为生成fb15k数据对应的JSON文件,其中Meta文件为可选文件,这里不提供Meta文件。完整的代码见这里。
通过convert2json()函数生成相应的JSON文件,需要注意的是,fb15k中节点没有属性信息,边的属性用关系类型特征存储。
def convert2json(self, convert_dir, out_dir):
    """Convert the raw FB15k triple files into Euler's graph JSON format.

    Reads ``freebase_mtr100_mte100-{train,test,valid}.txt`` from
    ``convert_dir``, assigns dense integer ids to entities and relations,
    and writes one JSON graph file to ``out_dir``.

    Side effects: also writes the test-split edge list ("src dst 1" per
    line) to ``self.edge_id_file`` and the test-split source-node ids to
    ``self.node_id_file``.

    Args:
        convert_dir: directory containing the raw FB15k text files.
        out_dir: path of the JSON graph file to create.
    """

    def add_node(node_id, node_type, weight):
        # FB15k nodes carry no attribute information, so "features" is empty.
        return {
            "id": node_id,
            "type": node_type,
            "weight": weight,
            "features": [],
        }

    def add_edge(src, dst, rel_id, edge_type, weight):
        # Bug fix: the original line `edge_buf["features"] = []、` ended
        # with a full-width comma, which is a Python syntax error.
        edge_buf = {
            "src": src,
            "dst": dst,
            "type": edge_type,
            "weight": weight,
            "features": [],
        }
        # Store the relation type between the two nodes as a dense edge
        # feature named "id".
        edge_buf["features"].append({
            "name": "id",
            "type": "dense",
            "value": [rel_id],
        })
        return edge_buf

    entity_map = {}    # raw entity string -> dense integer id
    relation_map = {}  # raw relation string -> dense integer id
    entity_id = 0
    relation_id = 0
    buf = {"nodes": [], "edges": []}
    # Context managers guarantee every output file is closed even on error
    # (the original left them dangling if an exception was raised mid-way).
    with open(self.edge_id_file, 'w') as out_test, \
            open(self.node_id_file, 'w') as node_out_test, \
            open(out_dir, 'w') as out:
        for file_type in ['train', 'test', 'valid']:
            in_path = (convert_dir + '/freebase_mtr100_mte100-' +
                       file_type + '.txt')
            with open(in_path, 'r') as in_file:
                for line in in_file:
                    triple = line.strip().split()
                    if triple[0] not in entity_map:
                        entity_map[triple[0]] = entity_id
                        entity_id += 1
                        buf["nodes"].append(add_node(entity_map[triple[0]],
                                                     file_type, 1))
                    if triple[2] not in entity_map:
                        entity_map[triple[2]] = entity_id
                        entity_id += 1
                        buf["nodes"].append(add_node(entity_map[triple[2]],
                                                     file_type, 1))
                    if triple[1] not in relation_map:
                        relation_map[triple[1]] = relation_id
                        relation_id += 1
                    # Every triple contributes one edge, whether or not its
                    # relation was seen before.
                    buf["edges"].append(add_edge(entity_map[triple[0]],
                                                 entity_map[triple[2]],
                                                 relation_map[triple[1]],
                                                 file_type, 1))
                    if file_type == 'test':
                        out_test.write(str(entity_map[triple[0]]) + " " +
                                       str(entity_map[triple[2]]) + " 1\n")
                        node_out_test.write(str(entity_map[triple[0]]) + "\n")
        out.write(json.dumps(buf))
    # Pair each label with the matching count (the original swapped the
    # Entity/Node argument sources; the values happen to be equal).
    print("Total Entity: {}, Relation: {}, Node: {}, Edge: {}."
          .format(entity_id, relation_id,
                  len(buf["nodes"]), len(buf["edges"])))
与有属性图类似,这里利用Euler2.0 python工具(详细介绍见这里)将JSON和Meta文件转化成对应的二进制文件。
def convert2euler(self, convert_dir, out_dir):
    """Compile the JSON graph file into Euler's binary graph format.

    Args:
        convert_dir: path of the JSON graph file produced by convert2json().
        out_dir: output directory for the binary graph partitions.
    """
    # Removed an unused `dir_name = os.path.dirname(os.path.realpath(__file__))`
    # local from the original version; it was never read.
    g = EulerGenerator(convert_dir,
                       self.meta_file,
                       out_dir,
                       self.partition_num)
    g.do()
与有属性图类似,知识图谱数据生成之后,用户需要在训练的时候加载对应的数据,并在每个Batch获取具体的数据(需要注意的是,TransE的训练的每一个样本是一条边而不是一个节点)方式如下:
import tf_euler

# Load the FB15k knowledge-graph dataset shipped with Euler's dataset
# registry, then load the (pre-converted) graph into memory.
euler_graph = tf_euler.dataset.get_dataset('fb15k')
euler_graph.load_graph()
# Each TransE training sample is an edge, so a batch of training data is
# produced by sampling edges of the training edge type with
# tf_euler.sample_edge.
def get_train_from_input(self, inputs, params):
    """Sample a batch of training edges; ``inputs`` is the batch size."""
    batch = tf_euler.sample_edge(inputs, params['train_edge_type'])
    batch.set_shape([inputs, 3])
    return batch
Euler-2.0已经实现了知识图谱相关的TransX系列算法,包括TransE,TransH,TransR,TransD等。
TransX系列算法的核心想法就是先将三元组(head, relation, tail)中的每一个的id都转化成embedding向量,然后依据某种向量距离的约束来训练三元组中的embedding。
因此,Euler-2.0将TransX系列算法抽象出一个基类TransX(详见这里),用户在实现TransX系列算法或者设计新的距离约束的时候,只需要继承该基类,然后实现具体的generate_embedding()和loss_fn()即可。
这里以TransE为例,进行具体的介绍。
该类封装了利用Euler2.0生成训练样本的负节点对以及一些工具函数,如距离计算,embedding正则等等。
class TransX(tf_euler.utils.layers.Layer):
    """Abstract base layer for the TransX family of knowledge-graph
    embedding models (TransE, TransH, TransR, TransD).

    Encapsulates negative sampling via Euler, triplet scoring by
    translation distance, and ranking metrics (MRR / MR / Hit@10).
    Subclasses must implement generate_embedding() and loss_fn().
    """

    def __init__(self, node_type, edge_type,
                 node_max_id, edge_max_id,
                 ent_dim, rel_dim,
                 num_negs=5, l1=True, metric_name='mrr',
                 corrupt='both',
                 **kwargs):
        # node_type/edge_type: Euler graph types used when sampling.
        # ent_dim/rel_dim: entity and relation embedding dimensions.
        # num_negs: negative entities sampled per positive triplet.
        # l1: use L1 distance when True, otherwise L2 (euclidean).
        # metric_name: one of 'mrr', 'mr', 'hit10'.
        # corrupt: which side of the triplet to corrupt for negatives:
        #   'front' (head), 'tail', or 'both'.
        super(TransX, self).__init__(**kwargs)
        self.node_type = node_type
        self.edge_type = edge_type
        self.node_max_id = node_max_id
        self.edge_max_id = edge_max_id
        self.ent_dim = ent_dim
        self.rel_dim = rel_dim
        self.num_negs = num_negs
        self.l1 = l1
        self.metric_name = metric_name
        self.corrupt = corrupt
        # id -> embedding lookup tables for entities and relations.
        self.entity_encoder = tf_euler.utils.layers.Embedding(node_max_id+1,
                                                              ent_dim)
        self.relation_encoder = tf_euler.utils.layers.Embedding(edge_max_id+1,
                                                                rel_dim)

    def generate_negative(self, batch_size):
        """Sample batch_size * num_negs corrupt entity ids from the graph."""
        return tf_euler.sample_node(batch_size * self.num_negs, self.node_type)

    def generate_triplets(self, inputs):
        """Split a batch of edges into (src, dst, neg, rel) id tensors.

        ``inputs`` is a [batch_size, 3] edge tensor; the relation id is
        read back from the dense edge feature named 'id' that
        convert2json() stored on each edge.
        """
        batch_size = tf.shape(inputs)[0]
        src, dst, _ = tf.split(inputs, [1, 1, 1], axis=1)
        rel = tf_euler.get_edge_dense_feature(inputs, ['id'], [1])
        rel = tf.cast(rel, tf.int64)
        neg = self.generate_negative(batch_size)
        src = tf.reshape(src, [batch_size, 1])
        dst = tf.reshape(dst, [batch_size, 1])
        neg = tf.reshape(neg, [batch_size, self.num_negs])
        rel = tf.reshape(rel, [batch_size, 1])
        return src, dst, neg, rel

    def norm_emb(self, embedding):
        """L2-normalize the trailing ent_dim axis, preserving the shape."""
        output_shape = embedding.shape
        embedding = tf.reshape(embedding, [-1, self.ent_dim])
        embedding = tf.nn.l2_normalize(embedding, 1)
        # Statically-unknown dims come back as None; map them to -1 so the
        # reshape accepts them.
        output_shape = [d if d is not None else -1
                        for d in output_shape.as_list()]
        embedding = tf.reshape(embedding, output_shape)
        return embedding

    def calculate_scores(self, src_emb, rel_emb, dst_emb):
        """Score triplets as the negated translation distance
        -||h + r - t|| (L1 or L2); larger scores mean more plausible."""
        if self.l1:
            scores = tf.norm(src_emb + rel_emb - dst_emb, ord=1, axis=-1)
        else:
            scores = tf.norm(src_emb + rel_emb - dst_emb,
                             ord='euclidean', axis=-1)
        scores = tf.negative(scores)
        return scores

    def _mrr(self, pos_scores, neg_scores):
        """Mean reciprocal rank of the positive among its negatives."""
        # The positive score is concatenated last.  The double-top_k trick
        # converts scores into 0-based ranks, so ranks[:, :, -1] is the
        # positive's rank; +1 makes it 1-based for the reciprocal.
        scores_all = tf.concat([neg_scores, pos_scores], axis=2)
        size = tf.shape(scores_all)[2]
        _, indices_of_ranks = tf.nn.top_k(scores_all, k=size)
        _, ranks = tf.nn.top_k(-indices_of_ranks, k=size)
        return tf.reduce_mean(tf.reciprocal(tf.to_float(ranks[:, :, -1] + 1)))

    def _mr(self, pos_scores, neg_scores):
        """Mean rank (0-based) of the positive among its negatives."""
        scores_all = tf.concat([neg_scores, pos_scores], axis=2)
        size = tf.shape(scores_all)[2]
        _, indices_of_ranks = tf.nn.top_k(scores_all, k=size)
        # The positive occupies the largest column index (size-1), so the
        # argmax over the sorted indices yields its position in the
        # descending-score ordering, i.e. its 0-based rank.
        ranks = tf.argmax(indices_of_ranks, -1)
        return tf.reduce_mean(ranks)

    def _hit10(self, pos_scores, neg_scores):
        """Fraction of positives ranked within the top 10 (Hit@10)."""
        scores_all = tf.concat([neg_scores, pos_scores], axis=2)
        size = tf.shape(scores_all)[2]
        _, indices_of_ranks = tf.nn.top_k(scores_all, k=size)
        ranks = tf.argmax(indices_of_ranks, -1)
        return tf.reduce_mean(tf.cast(tf.less(ranks, 10), tf.float32))

    def loss_fn(self, pos_scores, neg_scores):
        """Subclass hook: compute the training loss from triplet scores."""
        raise NotImplementedError()

    def calculate_energy(self, src_emb, dst_emb, neg_emb, rel_emb):
        """Score positive and corrupted triplets, then compute loss and
        the configured ranking metric.

        Returns:
            (loss, metric) scalar tensors.
        """
        # expand true triplets embedding dims to fit corrupted triplets
        src_expand_emb = tf.tile(src_emb, [1, self.num_negs, 1])
        rel_expand_emb = tf.tile(rel_emb, [1, self.num_negs, 1])
        dst_expand_emb = tf.tile(dst_emb, [1, self.num_negs, 1])
        pos_scores = self.calculate_scores(src_emb, rel_emb, dst_emb)
        pos_scores = tf.reshape(pos_scores, [-1, 1, 1])
        if self.corrupt == 'front':
            # Replace the head entity with sampled negatives.
            neg_scores = self.calculate_scores(neg_emb,
                                               rel_expand_emb,
                                               dst_expand_emb)
            neg_scores = tf.reshape(neg_scores, [-1, 1, self.num_negs])
        elif self.corrupt == 'tail':
            # Replace the tail entity with sampled negatives.
            neg_scores = self.calculate_scores(src_expand_emb,
                                               rel_expand_emb,
                                               neg_emb)
            neg_scores = tf.reshape(neg_scores, [-1, 1, self.num_negs])
        else:
            # 'both': corrupt head and tail, yielding 2*num_negs negatives.
            front_neg_scores = self.calculate_scores(neg_emb,
                                                     rel_expand_emb,
                                                     dst_expand_emb)
            tail_neg_scores = self.calculate_scores(src_expand_emb,
                                                    rel_expand_emb,
                                                    neg_emb)
            neg_scores = tf.reshape(tf.concat([front_neg_scores,
                                               tail_neg_scores], axis=-1),
                                    [-1, 1, self.num_negs*2])
        loss = self.loss_fn(pos_scores, neg_scores)
        if self.metric_name == 'mrr':
            metric = self._mrr(pos_scores, neg_scores)
        elif self.metric_name == 'mr':
            metric = self._mr(pos_scores, neg_scores)
        elif self.metric_name == 'hit10':
            metric = self._hit10(pos_scores, neg_scores)
        else:
            msg = 'Metric name :{} not in list [mrr, mr, hit10]'\
                .format(self.metric_name)
            raise ValueError(msg)
        return loss, metric

    def generate_embedding(self, src, dst, neg, rel):
        """Subclass hook: map (src, dst, neg, rel) id tensors to
        embedding tensors."""
        raise NotImplementedError()

    def call(self, inputs):
        """Layer entry point: a batch of edges in, a ModelOutput out."""
        src, dst, neg, rel = self.generate_triplets(inputs)
        src_emb, dst_emb, neg_emb, rel_emb = \
            self.generate_embedding(src, dst, neg, rel)
        loss, metric = self.calculate_energy(src_emb, dst_emb,
                                             neg_emb, rel_emb)
        # Flatten the embeddings for export / downstream use.
        src_emb = tf.reshape(src_emb, [-1, self.ent_dim])
        dst_emb = tf.reshape(dst_emb, [-1, self.ent_dim])
        rel_emb = tf.reshape(rel_emb, [-1, self.rel_dim])
        return ModelOutput(embedding=[src_emb, rel_emb, dst_emb],
                           loss=loss,
                           metric_name=self.metric_name,
                           metric=metric)
在实现TransE的时候,用户可以通过继承基类TransX,并且实现generate_embedding()和loss_fn() 来实现。
- generate_embedding()定义了对于知识图谱的三元组以及负样例的每个id的embedding生成。
- loss_fn()定义了依据哪一种距离约束来优化embedding。
class TransE(TransX):
    """TransE model: score(h, r, t) = -||h + r - t||, trained with a
    margin-based ranking loss against sampled negatives.

    Implements the two TransX hooks: generate_embedding() (simple lookup
    plus L2 normalization) and loss_fn() (margin ranking loss).
    """

    def __init__(self, node_type, edge_type,
                 node_max_id, edge_max_id,
                 ent_dim, rel_dim,
                 num_negs=5, margin=1.,
                 l1=True, metric_name='mrr',
                 corrupt='both',
                 *args, **kwargs):
        # TransE models h + r ≈ t in one shared space, so entity and
        # relation dims must match.  Validate before building encoders
        # (the original validated after); also fixed the original error
        # message, whose backslash line-continuation embedded a newline
        # and indentation whitespace into the message text.
        if ent_dim != rel_dim:
            raise ValueError(
                'Entity dim and Relation dim should be equal in TransE')
        super(TransE, self).__init__(node_type, edge_type,
                                     node_max_id, edge_max_id,
                                     ent_dim, rel_dim,
                                     num_negs=num_negs, l1=l1,
                                     metric_name=metric_name,
                                     corrupt=corrupt,
                                     *args, **kwargs)
        self.margin = margin  # ranking-loss margin

    def generate_embedding(self, src, dst, neg, rel):
        """Look up and L2-normalize embeddings for a triplet batch."""
        src_emb = self.norm_emb(self.entity_encoder(src))
        dst_emb = self.norm_emb(self.entity_encoder(dst))
        neg_emb = self.norm_emb(self.entity_encoder(neg))
        rel_emb = self.norm_emb(self.relation_encoder(rel))
        return src_emb, dst_emb, neg_emb, rel_emb

    def loss_fn(self, pos_scores, neg_scores):
        """Margin ranking loss: mean(max(margin + mean_neg - pos, 0)).

        The two reshape branches of the original differed only in the
        negative count, so that count is computed once: 'both' corrupts
        head and tail, doubling the negatives per positive.
        """
        pos_scores = tf.reshape(pos_scores, [-1, 1, 1])
        num_neg_cols = (2 * self.num_negs if self.corrupt == 'both'
                        else self.num_negs)
        neg_scores_mean = tf.reduce_mean(
            tf.reshape(neg_scores, [-1, num_neg_cols]),
            axis=-1, keep_dims=True)
        neg_scores_mean = tf.reshape(neg_scores_mean, [-1, 1, 1])
        return tf.reduce_mean(tf.maximum(self.margin + neg_scores_mean -
                                         pos_scores, 0))
# Build the TransE model from the loaded FB15k dataset.
# NOTE(review): ``flags_obj`` and ``num_negs`` are assumed to be defined
# earlier in the full script (command-line flags) — confirm against the
# complete source.
euler_graph = tf_euler.dataset.get_dataset('fb15k')
euler_graph.load_graph()
model = TransE(
    node_type=euler_graph.train_node_type[0],
    edge_type=euler_graph.train_edge_type,
    node_max_id=euler_graph.max_node_id,
    edge_max_id=euler_graph.max_edge_id,
    ent_dim=flags_obj.embedding_dim,
    rel_dim=flags_obj.embedding_dim,  # must equal ent_dim for TransE
    num_negs=num_negs,
    margin=flags_obj.margin,
    l1=flags_obj.L1,
    metric_name=flags_obj.metric_name,
    corrupt=flags_obj.corrupt)
Euler-2.0提供了NodeEstimator GraphEstimator EdgeEstimator类和相应接口(详见这里),方便用户快速的完成模型训练,预测,embedding导出任务。其中NodeEstimator为点分类模型,GraphEstimator为图分类模型,EdgeEstimator为边分类模型(link prediction任务)。
由于每次输入的样本是一个三元组,即一条边,因此这里利用EdgeEstimator来训练TransE。
# Hyper-parameters and I/O locations consumed by EdgeEstimator.
params = {'train_edge_type': euler_graph.train_edge_type[0],
          'batch_size': flags_obj.batch_size,
          'optimizer': flags_obj.optimizer,
          'learning_rate': flags_obj.learning_rate,
          'log_steps': flags_obj.log_steps,
          'model_dir': flags_obj.model_dir,
          'id_file': euler_graph.edge_id_file,
          'infer_dir': flags_obj.model_dir,
          'infer_type': flags_obj.infer_type,
          'total_step': num_steps}
config = tf.estimator.RunConfig(log_step_count_steps=None)
model_estimator = EdgeEstimator(model, params, config)

# Dispatch table replaces the original if/elif chain; unknown modes
# raise the same ValueError as before.
run_modes = {'train': model_estimator.train,
             'evaluate': model_estimator.evaluate,
             'infer': model_estimator.infer}
if flags_obj.run_mode not in run_modes:
    raise ValueError('Run mode not exist!')
run_modes[flags_obj.run_mode]()