from typing import Tuple, Any
import numpy as np
import torch
from torch.nn import Parameter
from dataset import Dataset
from model import Model, DIMENSION, INIT_SCALE
class ComplEx(Model):
"""
The ComplEx class provides a Model implementation in PyTorch for the ComplEx system.
This implementation adheres to the paper "Canonical Tensor Decomposition for Knowledge Base Completion",
and is largely inspired by the official implementation provided by the authors Lacroix, Usunier and Obozinski
at https://github.com/facebookresearch/kbc/tree/master/kbc.
In training or evaluation, our ComplEx class requires samples to be passed as 2-dimensional np.arrays.
Each row corresponds to a sample and contains the integer ids of its head, relation and tail.
Only *direct* samples should be passed to the model.
TODO: add documentation about inverse facts and relations
TODO: explain that the input must always be direct facts only
"""
def __init__(self,
dataset: Dataset,
hyperparameters: dict,
init_random = True):
"""
Constructor for ComplEx model.
:param dataset: the Dataset on which to train and evaluate the model
:param hyperparameters: hyperparameters dictionary.
Must contain at least DIMENSION and INIT_SCALE
"""
# note: the init_random parameter is important because when initializing a KelpieComplEx,
# self.entity_embeddings and self.relation_embeddings must not be initialized as Parameters!
# initialize this object both as a Model and as a nn.Module
Model.__init__(self, dataset)
self.dataset = dataset
self.num_entities = dataset.num_entities # number of entities in dataset
self.num_relations = dataset.num_relations # number of relations in dataset
self.dimension = 2*hyperparameters[DIMENSION] # embedding dimension; it is 2 * the passed dimension because each embedding stores both a real and an imaginary part
self.real_dimension = hyperparameters[DIMENSION] # dimension of the real part of each embedding
self.init_scale = hyperparameters[INIT_SCALE]
# create the embeddings for entities and relations as Parameters.
# We do not use the torch.nn.Embedding module here in order to keep the code uniform with the KelpieComplEx model
# (in which torch.nn.Embedding can not be used, as it does not support the post-training mechanism).
# We have verified that this does not affect performance in any way.
# Each entity embedding and relation embedding is instantiated with size self.dimension (2 * the passed dimension)
# because for each entity and relation we store the real and imaginary parts as separate elements
if init_random:
self.entity_embeddings = Parameter(torch.rand(self.num_entities, self.dimension).cuda(), requires_grad=True)
self.relation_embeddings = Parameter(torch.rand(self.num_relations, self.dimension).cuda(), requires_grad=True)
# initialization step to make the embedding values smaller in the embedding space
with torch.no_grad():
self.entity_embeddings *= self.init_scale
self.relation_embeddings *= self.init_scale
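# Layout sketch (illustrative, not part of the original code, reusing the `model` placeholder
# from the sketch near the top of the class): each row of entity_embeddings stores the real part
# in the first real_dimension positions and the imaginary part in the remaining ones, so a single
# embedding can be read back as a complex vector like this:
#
#   e = model.entity_embeddings[42]          # 42 is a made-up entity id
#   real, imag = e[:model.real_dimension], e[model.real_dimension:]
#   complex_vector = real.detach().cpu().numpy() + 1j * imag.detach().cpu().numpy()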
def score(self, samples: np.array) -> np.array:
"""
Compute scores for the passed samples
:param samples: a 2-dimensional numpy array containing the samples to score, one per row
:return: a numpy array containing the scores of the passed samples
"""
lhs = self.entity_embeddings[samples[:, 0]] # list of entity embeddings for the heads of the facts
rel = self.relation_embeddings[samples[:, 1]] # list of relation embeddings for the relations of the facts
rhs = self.entity_embeddings[samples[:, 2]] # list of entity embeddings for the tails of the facts
return self.score_embeddings(lhs, rel, rhs).detach().cpu().numpy()
def score_embeddings(self,
head_embeddings: torch.Tensor,
rel_embeddings: torch.Tensor,
tail_embeddings: torch.Tensor):
"""
Compute scores for the passed triples of head, relation and tail embeddings.
:param head_embeddings: a torch.Tensor containing the embeddings representing the head entities
:param rel_embeddings: a torch.Tensor containing the embeddings representing the relations
:param tail_embeddings: a torch.Tensor containing the embeddings representing the tail entities
:return: a torch.Tensor containing the scores computed for the passed triples of embeddings
"""
# NOTE: this method is extremely important, because apart from being called by the ComplEx score(samples) method
# it is also used to perform the operations of the paper "Data Poisoning Attack against Knowledge Graph Embedding"
# that we use as a baseline and as a heuristic for our work.
# split the head embedding into real and imaginary components
lhs = head_embeddings[:, :self.real_dimension], head_embeddings[:, self.real_dimension:]
# split the relation embedding into real and imaginary components
rel = rel_embeddings[:, :self.real_dimension], rel_embeddings[:, self.real_dimension:]
# split the tail embedding into real and imaginary components
rhs = tail_embeddings[:, :self.real_dimension], tail_embeddings[:, self.real_dimension:]
# Bilinear product lhs * rel * rhs in complex scenario:
# lhs => lhs[0] + i*lhs[1]
# rel => rel[0] + i*rel[1]
# rhs => rhs[0] - i*rhs[1] (complex conjugate of the tail: negative sign to its imaginary part)
#
# ( lhs[0] + i*lhs[1] ) * ( rel[0] + i*rel[1] ) * ( rhs[0] - i*rhs[1] )
# ( lhs[0]*rel[0] + i*(lhs[0]*rel[1]) + i*(lhs[1]*rel[0]) + i*i*(lhs[1]*rel[1]) ) * ( rhs[0] - i*rhs[1] )
# ( lhs[0]*rel[0] - lhs[1]*rel[1] + i(lhs[0]*rel[1] + lhs[1]*rel[0]) ) * ( rhs[0] - i*rhs[1] )
#
# (lhs[0]*rel[0] - lhs[1]*rel[1]) * rhs[0] + i*(...)*rhs[0] - i*(...)*rhs[1] - i*i*(lhs[0]*rel[1] + lhs[1]*rel[0])*rhs[1]
# (lhs[0]*rel[0] - lhs[1]*rel[1]) * rhs[0] + (lhs[0]*rel[1] + lhs[1]*rel[0]) * rhs[1] + i*(...)
#
# => the real part of the result is:
# (lhs[0] * rel[0] - lhs[1] * rel[1]) * rhs[0] +
# (lhs[0] * rel[1] + lhs[1] * rel[0]) * rhs[1]
return torch.sum(
(lhs[0] * rel[0] - lhs[1] * rel[1]) * rhs[0] +
(lhs[0] * rel[1] + lhs[1] * rel[0]) * rhs[1],
1, keepdim=True
)
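# Sanity check for the real-part formula above (a standalone sketch using plain NumPy
# complex numbers; it is not called anywhere in this file):
#
#   h = np.random.randn(4) + 1j * np.random.randn(4)
#   r = np.random.randn(4) + 1j * np.random.randn(4)
#   t = np.random.randn(4) + 1j * np.random.randn(4)
#   reference = np.sum(h * r * np.conj(t)).real          # Re(<h, r, conj(t)>)
#   manual = np.sum((h.real * r.real - h.imag * r.imag) * t.real +
#                   (h.real * r.imag + h.imag * r.real) * t.imag)
#   assert np.isclose(reference, manual)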
def all_scores(self, samples: np.array) -> np.array:
"""
For each of the passed samples, compute scores for all possible tail entities.
:param samples: a 2-dimensional numpy array containing the samples to score, one per row
:return: a 2-dimensional array that contains one row for each passed sample
and one column for each possible tail entity
"""
# for each fact <cur_head, cur_rel, cur_tail> to predict, get all (cur_head, cur_rel) couples
# and compute the scores using any possible entity as a tail
q = self._get_queries(samples)
rhs = self._get_rhs()
return q @ rhs # 2d matrix: each row corresponds to a sample and has the scores for all entities
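# Shape sketch (illustrative, reusing the `model` and `direct_samples` placeholders from the
# sketch near the top of the class): with B passed samples,
#
#   q = model._get_queries(direct_samples)   # (B, 2 * real_dimension)
#   rhs = model._get_rhs()                   # (2 * real_dimension, num_entities)
#   all_tail_scores = q @ rhs                # (B, num_entities): one score per candidate tail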
def all_scores_relations(self, samples: np.array) -> np.array:
"""
For each of the passed samples, compute scores for all possible relations.
:param samples: a 2-dimensional numpy array containing the samples to score, one per row
:return: a numpy array that, for each passed sample,
contains one score for each possible relation
"""
output = []
all_relation_ids = np.array(range(self.num_relations))
for sample in samples:
samples_to_predict = np.tile(sample, (self.num_relations, 1))
samples_to_predict[:, 1] = all_relation_ids
relation_scores = self.score(samples_to_predict)
output.append(relation_scores)
return np.array(output)
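# Tiling sketch for the loop above (illustrative, with made-up ids and num_relations == 3):
#
#   sample = np.array([4, 0, 9])
#   tiled = np.tile(sample, (3, 1))          # [[4, 0, 9], [4, 0, 9], [4, 0, 9]]
#   tiled[:, 1] = np.arange(3)               # [[4, 0, 9], [4, 1, 9], [4, 2, 9]]
#   # scoring `tiled` yields one score per candidate relation for the fixed (head, tail) pair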
def forward(self, samples: np.array):
"""
Perform forward propagation on the passed samples
:param samples: a 2-dimensional numpy array containing the samples to use in forward propagation, one per row
:return: a tuple containing
- the scores for each passed sample with all possible tails
- a partial result to use in regularization
"""
lhs = self.entity_embeddings[samples[:, 0]] # list of entity embeddings for the heads of the facts
rel = self.relation_embeddings[samples[:, 1]] # list of relation embeddings for the relations of the facts
rhs = self.entity_embeddings[samples[:, 2]] # list of entity embeddings for the tails of the facts
lhs = lhs[:, :self.real_dimension], lhs[:, self.real_dimension:]
rel = rel[:, :self.real_dimension], rel[:, self.real_dimension:]
rhs = rhs[:, :self.real_dimension], rhs[:, self.real_dimension:]
to_score = self.entity_embeddings
to_score = to_score[:, :self.real_dimension], to_score[:, self.real_dimension:]
# this method returns a tuple with two elements:
# element 1 is a matrix that, for each passed sample, contains the scores obtained
# using all entities as tails
#
# element 2 is a tuple of 3 matrices that will then be used for regularization:
# matrix 1: for each head entity, take the real and imaginary components,
# square their elements, sum the two resulting vectors,
# and take the element-wise square root (i.e. the modulus of each complex element)
# matrix 2: the same quantity computed for each relation
# matrix 3: the same quantity computed for each tail entity
return (
(lhs[0] * rel[0] - lhs[1] * rel[1]) @ to_score[0].transpose(0, 1) +
(lhs[0] * rel[1] + lhs[1] * rel[0]) @ to_score[1].transpose(0, 1)
), (
torch.sqrt(lhs[0] ** 2 + lhs[1] ** 2),
torch.sqrt(rel[0] ** 2 + rel[1] ** 2),
torch.sqrt(rhs[0] ** 2 + rhs[1] ** 2)
)
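# Hedged sketch of how the two outputs of forward() are typically consumed by a training
# loop (the optimizer lives outside this file; `batch` and `reg_weight` are placeholders,
# and the N3 weighting follows Lacroix et al. rather than anything defined here):
#
#   predictions, factors = model.forward(batch)
#   targets = torch.from_numpy(batch[:, 2].astype(np.int64)).cuda()
#   loss = torch.nn.functional.cross_entropy(predictions, targets)
#   n3_reg = sum(torch.sum(f ** 3) for f in factors) / batch.shape[0]
#   (loss + reg_weight * n3_reg).backward()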
def _get_rhs(self):
"""
This private method computes, for each fact to score,
a partial factor of the score that only depends on the tail of the fact.
In our scenario, this actually just corresponds to the transposed embeddings of all entities.
Note: This method is used in conjunction with _get_queries
to obtain very efficiently, for each test fact, the score for each possible tail:
scores = output of _get_queries @ output of _get_rhs
:return: a torch.Tensor containing the embeddings of all entities (one per column)
"""
return self.entity_embeddings[:self.dataset.num_entities].transpose(0, 1).detach()
def _get_queries(self, samples: np.array):
"""
This private method computes, for each fact to score,
the partial factor of the score that only depends on the head and relation of the fact
Note: This method is used in conjunction with _get_rhs
to obtain very efficiently, for each test fact, the score for each possible tail:
scores = output of _get_queries @ output of _get_rhs
:param samples: the facts to compute the partial factors for, as a np.array
:return: a torch.Tensor with, in each row, the partial score factor of the head and relation for one fact
"""
# get the embeddings for head and relation of each fact
head_embeddings = self.entity_embeddings[samples[:, 0]]
relation_embeddings = self.relation_embeddings[samples[:, 1]]
# split both head embeddings and relation embeddings into real and imaginary parts
head_embeddings = head_embeddings[:, :self.real_dimension], head_embeddings[:, self.real_dimension:]
relation_embeddings = relation_embeddings[:, :self.real_dimension], relation_embeddings[:, self.real_dimension:]
# obtain a tensor that, in each row, has the partial score factor of the head and relation for one fact
return torch.cat([
head_embeddings[0] * relation_embeddings[0] - head_embeddings[1] * relation_embeddings[1],
head_embeddings[0] * relation_embeddings[1] + head_embeddings[1] * relation_embeddings[0]
], 1)
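# Consistency sketch (illustrative, reusing the `model` and `direct_samples` placeholders):
# the score of a specific fact can be read back from the factored computation used by
# all_scores, and should match model.score(direct_samples) up to floating point error:
#
#   q = model._get_queries(direct_samples)                    # (B, dimension)
#   rhs = model._get_rhs()                                    # (dimension, num_entities)
#   full = q @ rhs                                            # (B, num_entities)
#   gold_cols = torch.from_numpy(direct_samples[:, 2:3].astype(np.int64)).cuda()
#   gold_tail_scores = full.gather(1, gold_cols)              # one entry per passed fact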
def predict_samples(self, samples: np.array) -> Tuple[Any, Any, Any]:
"""
This method takes as input a 2-dimensional numpy array of 'direct' samples,
runs head and tail prediction on each of them
and returns
- the obtained scores for the direct and inverse version of each sample,
- the obtained head and tail ranks for each sample
- the list of predicted entities for each sample
:param samples: a 2-dimensional numpy array containing all the DIRECT samples to predict.
They will be automatically inverted to perform head prediction
:return: three lists, aligned with the passed direct samples, containing respectively
- the scores of each direct sample and of the corresponding inverse sample;
- the head and tail rank for each sample;
- the head and tail predictions for each sample
"""
direct_samples = samples
# make sure that all the passed samples are "direct" samples
assert np.all(direct_samples[:, 1] < self.dataset.num_direct_relations)
# output data structures
scores, ranks, predictions = [], [], []
# invert samples to perform head predictions
inverse_samples = self.dataset.invert_samples(direct_samples)
#obtain scores, ranks and predictions both for direct and inverse samples
direct_scores, tail_ranks, tail_predictions = self.predict_tails(direct_samples)
inverse_scores, head_ranks, head_predictions = self.predict_tails(inverse_samples)
for i in range(direct_samples.shape[0]):
# add to the scores list a couple containing the scores of the direct and of the inverse sample
scores.append((direct_scores[i], inverse_scores[i]))
# add to the ranks list a couple containing the ranks of the head and of the tail
ranks.append((int(head_ranks[i]), int(tail_ranks[i])))
# add to the prediction list a couple containing the lists of predictions
predictions.append((head_predictions[i], tail_predictions[i]))
return scores, ranks, predictions
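# Illustrative sketch of the head-prediction trick used above (head_id, rel_id and tail_id are
# placeholders; the exact inverse-relation id convention is defined in Dataset.invert_samples,
# so the layout shown here is an assumption):
#
#   direct = np.array([[head_id, rel_id, tail_id]])
#   inverse = dataset.invert_samples(direct)   # conceptually [[tail_id, inverse_rel_id, head_id]]
#   # running tail prediction on `inverse` ranks candidate heads for the original direct fact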
def predict_tails(self, samples: np.array) -> Tuple[Any, Any, Any]:
"""
Returns filtered scores, ranks and predicted entities for each passed fact.
:param samples: a 2-dimensional numpy array of triples (head, relation, tail).
The triples can also be "inverse triples" with (tail, inverse_relation_id, head)
:return: the target scores, the filtered ranks and the arrays of predicted tails, one for each passed fact
"""
ranks = torch.ones(len(samples)) # initialize with ONES
with torch.no_grad():
# compute scores for each sample for all possible tails
all_scores = self.all_scores(samples)
# from the obtained scores, extract the scores of the actual facts <cur_head, cur_rel, cur_tail>
targets = torch.zeros(size=(len(samples), 1)).cuda()
for i, (_, _, tail_id) in enumerate(samples):
targets[i, 0] = all_scores[i, tail_id].item()
# set to -1e6 the scores obtained using tail entities that must be filtered out (filtered scenario)
# In this way, those entities will be ignored in rankings
for i, (head_id, rel_id, tail_id) in enumerate(samples):
# get the list of tails to filter out; include the actual target tail entity too.
# Work on a copy so that the shared self.dataset.to_filter lists are not modified in place.
filter_out = list(self.dataset.to_filter[(head_id, rel_id)])
if tail_id not in filter_out:
filter_out.append(tail_id)
all_scores[i, torch.LongTensor(filter_out)] = -1e6
# fill the ranks data structure and convert it to a Python list
ranks += torch.sum((all_scores >= targets).float(), dim=1).cpu() #ranks was initialized with ONES
ranks = ranks.cpu().numpy().tolist()
all_scores = all_scores.cpu().numpy()
targets = targets.cpu().numpy()
# save the list of all obtained scores
scores = [targets[i, 0] for i in range(len(samples))]
predictions = []
for i, (head_id, rel_id, tail_id) in enumerate(samples):
filter_out = list(self.dataset.to_filter[(head_id, rel_id)])
if tail_id not in filter_out:
filter_out.append(tail_id)
predicted_tails = np.where(all_scores[i] > -1e6)[0]
# get all not filtered tails and corresponding scores for current fact
#predicted_tails = np.where(all_scores[i] != -1e6)
predicted_tails_scores = all_scores[i, predicted_tails] #for cur_tail in predicted_tails]
# note: the target tail score and the tail id are in the same position in their respective lists!
#predicted_tails_scores = np.append(predicted_tails_scores, scores[i])
#predicted_tails = np.append(predicted_tails, [tail_id])
# sort the scores and predicted tails list in the same way
permutation = np.argsort(-predicted_tails_scores)
predicted_tails_scores = predicted_tails_scores[permutation]
predicted_tails = predicted_tails[permutation]
# include the score of the target tail in the predictions list
# after ALL entities with greater or equal scores (MIN policy)
j = 0
while j < len(predicted_tails_scores) and predicted_tails_scores[j] >= scores[i]:
j += 1
predicted_tails_scores = np.concatenate((predicted_tails_scores[:j],
np.array([scores[i]]),
predicted_tails_scores[j:]))
predicted_tails = np.concatenate((predicted_tails[:j],
np.array([tail_id]),
predicted_tails[j:]))
# add to the results data structure
predictions.append(predicted_tails) # as a np array!
return scores, ranks, predictions
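# Small worked example of the filtered rank computation in predict_tails (a standalone
# sketch with made-up numbers, not executed anywhere in this file; the gold tail's own
# column has already been set to -1e6, so only other candidates are compared):
#
#   target_score = 0.7
#   candidate_scores = np.array([0.9, 0.7, 0.2, -1e6])    # -1e6 marks a filtered-out tail
#   # MIN policy on ties: the target is ranked after every candidate scoring >= its own score
#   rank = 1 + np.sum(candidate_scores >= target_score)   # -> 3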