-
Notifications
You must be signed in to change notification settings - Fork 78
/
UserCF.py
160 lines (145 loc) · 6.24 KB
/
UserCF.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# -*- coding: utf-8 -*-
"""
User-based Collaborative filtering.
Created on 2018-04-15
@author: fuxuemingzhu
"""
import collections
from operator import itemgetter
import math
from collections import defaultdict
import similarity
import utils
from utils import LogTime
class UserBasedCF:
    """
    User-based Collaborative filtering.
    Top-N recommendation.

    Typical usage is ``fit(trainset)`` followed by ``recommend(user)``,
    ``test(testset)`` or ``predict(testset)``.
    """

    def __init__(self, k_sim_user=20, n_rec_movie=10, use_iif_similarity=False, save_model=True):
        """
        Init UserBasedCF with k_sim_user and n_rec_movie.
        :param k_sim_user: number of most similar users consulted per recommendation
        :param n_rec_movie: number of movies returned per recommendation
        :param use_iif_similarity: passed through to similarity.calculate_user_similarity;
            also selects the 'user_sim_mat-iif' model name instead of 'user_sim_mat'
        :param save_model: whether fit() saves a freshly trained model
        :return: None
        """
        print("UserBasedCF start...\n")
        self.k_sim_user = k_sim_user
        self.n_rec_movie = n_rec_movie
        self.trainset = None
        self.testset = None
        self.save_model = save_model
        self.use_iif_similarity = use_iif_similarity
        # Filled in by fit(). Created here so that the "not fitted" guards in
        # recommend()/test() raise their intended errors instead of AttributeError.
        self.user_sim_mat = None
        self.movie_popular = None
        self.movie_count = None

    def fit(self, trainset):
        """
        Fit the trainset by calculating the user similarity matrix.
        A previously saved model is loaded when available; otherwise a new
        model is trained from ``trainset`` (and saved when ``save_model`` is set).
        :param trainset: train dataset
        :return: None
        """
        model_manager = utils.ModelManager()
        sim_mat_name = 'user_sim_mat-iif' if self.use_iif_similarity else 'user_sim_mat'
        try:
            self.user_sim_mat = model_manager.load_model(sim_mat_name)
            self.movie_popular = model_manager.load_model('movie_popular')
            self.movie_count = model_manager.load_model('movie_count')
            # The saved trainset replaces the one passed in on this path.
            # NOTE(review): 'trainset' is loaded here but never saved by this
            # method; presumably it is persisted by the caller - confirm.
            self.trainset = model_manager.load_model('trainset')
            print('User origin similarity model has saved before.\nLoad model success...\n')
        except OSError:
            # Any load failing with OSError is treated as "no saved model".
            print('No model saved before.\nTrain a new model...')
            self.user_sim_mat, self.movie_popular, self.movie_count = \
                similarity.calculate_user_similarity(trainset=trainset,
                                                     use_iif_similarity=self.use_iif_similarity)
            self.trainset = trainset
            print('Train a new model success.')
            if self.save_model:
                model_manager.save_model(self.user_sim_mat, sim_mat_name)
                model_manager.save_model(self.movie_popular, 'movie_popular')
                model_manager.save_model(self.movie_count, 'movie_count')
                print('The new model has saved success.\n')

    def recommend(self, user):
        """
        Find K similar users and recommend N movies for the user.
        :param user: The user we recommend movies to.
        :return: list of the N best score movies, best first. The list is empty
            when the user is not in the trainset or has no similar users.
        :raises NotImplementedError: if the model has not been fitted yet.
        """
        if not self.user_sim_mat or not self.n_rec_movie or \
                not self.trainset or not self.movie_popular or not self.movie_count:
            raise NotImplementedError('UserCF has not init or fit method has not called yet.')
        if user not in self.trainset:
            print('The user (%s) not in trainset.' % user)
            # An empty list keeps the return type uniform for callers that iterate it.
            return []
        watched_movies = self.trainset[user]
        # A user without a row in the similarity matrix simply has no neighbours.
        neighbours = sorted(self.user_sim_mat.get(user, {}).items(),
                            key=itemgetter(1), reverse=True)[:self.k_sim_user]
        predict_score = collections.defaultdict(int)
        for similar_user, similarity_factor in neighbours:
            for movie, rating in self.trainset[similar_user].items():
                if movie in watched_movies:
                    continue
                # predict the user's "interest" for each movie:
                # the predict_score is sum(similarity_factor * rating)
                predict_score[movie] += similarity_factor * rating
        ranked = sorted(predict_score.items(), key=itemgetter(1), reverse=True)
        # return the N best score movies
        return [movie for movie, _ in ranked[:self.n_rec_movie]]

    def test(self, testset):
        """
        Test the recommendation system by recommending movies to every user in
        the trainset and comparing against the testset. Prints precision,
        recall, coverage and popularity.
        :param testset: test dataset
        :return: None
        :raises ValueError: if the model has not been fitted yet.
        """
        if not self.n_rec_movie or not self.trainset or not self.movie_popular or not self.movie_count:
            raise ValueError('UserCF has not init or fit method has not called yet.')
        self.testset = testset
        print('Test recommendation system start...')
        N = self.n_rec_movie
        # variables for precision and recall
        hit = 0
        rec_count = 0
        test_count = 0
        # variables for coverage
        all_rec_movies = set()
        # variables for popularity
        popular_sum = 0
        # record the time the calculation has spent.
        test_time = LogTime(print_step=1000)
        for user in self.trainset:
            test_movies = self.testset.get(user, {})
            rec_movies = self.recommend(user)  # type:list
            for movie in rec_movies:
                if movie in test_movies:
                    hit += 1
                all_rec_movies.add(movie)
                popular_sum += math.log(1 + self.movie_popular[movie])
            # Every user counts as N recommendations, even if fewer were produced.
            rec_count += N
            test_count += len(test_movies)
            # log steps and times (reporting cadence is set by print_step above).
            test_time.count_time()
        # Guard the ratios: test_count is 0 when no trainset user appears in the testset.
        precision = hit / (1.0 * rec_count) if rec_count else 0.0
        recall = hit / (1.0 * test_count) if test_count else 0.0
        coverage = len(all_rec_movies) / (1.0 * self.movie_count)
        popularity = popular_sum / (1.0 * rec_count) if rec_count else 0.0
        print('Test recommendation system success.')
        test_time.finish()
        print('precision=%.4f\trecall=%.4f\tcoverage=%.4f\tpopularity=%.4f\n' %
              (precision, recall, coverage, popularity))

    def predict(self, testset):
        """
        Recommend movies to all users in testset.
        :param testset: test dataset
        :return: `dict` : maps each user to a one-element list that wraps the
            user's recommendation list.
        """
        movies_recommend = defaultdict(list)
        print('Predict scores start...')
        # record the time the calculation has spent.
        predict_time = LogTime(print_step=500)
        for user in testset:
            rec_movies = self.recommend(user)  # type:list
            movies_recommend[user].append(rec_movies)
            # log steps and times.
            predict_time.count_time()
        print('Predict scores success.')
        predict_time.finish()
        return movies_recommend