-
Notifications
You must be signed in to change notification settings - Fork 0
/
bm25.py
152 lines (127 loc) · 5.05 KB
/
bm25.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import math
from six import iteritems
PARAM_K1 = 1.5
PARAM_B = 0.75
EPSILON = 0.25
class BM25(object):
"""Implementation of Best Matching 25 ranking function.
Attributes
----------
corpus_size : int
Size of corpus (number of documents).
avgdl : float
Average length of document in `corpus`.
doc_freqs : list of dicts of int
Dictionary with terms frequencies for each document in `corpus`. Words used as keys and frequencies as values.
idf : dict
Dictionary with inversed documents frequencies for whole `corpus`. Words used as keys and frequencies as values.
doc_len : list of int
List of document lengths.
"""
def __init__(self, corpus):
"""
Parameters
----------
corpus : list of list of str
Given corpus.
"""
self.corpus_size = 0
self.corpus = corpus
self.avgdl = 0
self.doc_freqs = []
self.idf = {}
self.doc_len = []
self.nd = {}
self._initialize(corpus)
def _initialize(self, corpus):
"""Calculates frequencies of terms in documents and in corpus. Also computes inverse document frequencies."""
nd = {} # word -> number of documents with word
num_doc = 0
for document in corpus:
self.corpus_size += 1
self.doc_len.append(len(document))
num_doc += len(document)
frequencies = {}
for word in document:
if word not in frequencies:
frequencies[word] = 0
frequencies[word] += 1
self.doc_freqs.append(frequencies)
for word, freq in iteritems(frequencies):
if word not in nd:
nd[word] = 0
nd[word] += 1
self.avgdl = float(num_doc) / self.corpus_size
# collect idf sum to calculate an average idf for epsilon value
idf_sum = 0
# collect words with negative idf to set them a special epsilon value.
# idf can be negative if word is contained in more than half of documents
negative_idfs = []
for word, freq in iteritems(nd):
idf = math.log(self.corpus_size - freq + 0.5) - math.log(freq + 0.5)
self.idf[word] = idf
idf_sum += idf
if idf < 0:
negative_idfs.append(word)
self.average_idf = float(idf_sum) / len(self.idf)
eps = EPSILON * self.average_idf
for word in negative_idfs:
self.idf[word] = eps
def get_score(self, document, index):
"""Computes BM25 score of given `document` in relation to item of corpus selected by `index`.
Parameters
----------
document : list of str
Document to be scored.
index : int
Index of document in corpus selected to score with `document`.
Returns
-------
float
BM25 score.
"""
score = 0
doc_freqs = self.doc_freqs[index]
for word in document:
if word not in doc_freqs:
continue
score += (self.idf[word] * doc_freqs[word] * (PARAM_K1 + 1)
/ (doc_freqs[word] + PARAM_K1 * (1 - PARAM_B + PARAM_B * self.doc_len[index] / self.avgdl)))
# score += self.idf[word] * doc_freqs[word]
return score
def get_scores(self, document):
"""Computes and returns BM25 scores of given `document` in relation to
every item in corpus.
Parameters
----------
document : list of str
Document to be scored.
Returns
-------
list of float
BM25 scores.
"""
scores = [self.get_score(document, index) for index in range(self.corpus_size)]
return scores
def get_words_score(self,document, index):
words_score = {}
doc_freqs = self.doc_freqs[index]
for word in document:
if word not in doc_freqs:
continue
score = (self.idf[word] * doc_freqs[word] * (PARAM_K1 + 1)
/ (doc_freqs[word] + PARAM_K1 * (1 - PARAM_B + PARAM_B * self.doc_len[index] / self.avgdl)))
# score = self.idf[word] * doc_freqs[word]
if word not in words_score:
words_score[word] = score
else:
words_score[word] = max(words_score[word],score)
word_score_tuples = [(word, score) for word, score in words_score.items()]
word_score_tuples = sorted(word_score_tuples,key=lambda x:x[1],reverse=True)
return word_score_tuples
def get_keywords(self, index, mention_tokens):
document = self.corpus[index]
intersection = list(set(document).intersection(set(mention_tokens)))
words_scores = self.get_words_score(intersection,index)
keywords = [ws[0] for ws in words_scores if ws[1] > 0 ]
return keywords