# 12MovieSimilaritiesLarge.py
# Forked from itratrahman/mapreduce_with_mrjobs.
# To run on 20 EMR nodes:
# !python MovieSimilaritiesLarge.py -r emr --num-ec2-instances=20 --items=ml-1m/movies.dat ml-1m/ratings.dat
# Troubleshooting EMR jobs (substitute your job ID):
# !python -m mrjob.tools.emr.fetch_logs --find-failure j-1NXMMBNEQHAFT
import io
from itertools import combinations
from math import sqrt

from mrjob.job import MRJob
from mrjob.step import MRStep
class MovieSimilarities(MRJob):
    """Find pairs of similarly-rated movies from '::'-delimited ratings data.

    The job runs three chained MapReduce steps (see steps()):

    1. Parse "userID::movieID::rating::timestamp" lines and collect every
       (movieID, rating) a user produced.
    2. Emit every pair of movies rated by the same user, then score each
       pair with the cosine similarity of its co-rating vectors.
    3. Replace movie IDs with titles and key the output by (title, score).
    """

    # Quality thresholds applied in reducer_compute_similarity. Both are
    # strict lower bounds: a pair needs MORE than this many co-ratings and
    # a score ABOVE this value to be emitted.
    MIN_CO_RATINGS = 10
    MIN_SCORE = 0.95

    # File name that load_movie_names() opens, relative to the task's
    # working directory.
    MOVIE_NAMES_FILE = "movies.dat"

    def configure_options(self):
        """Register the ``--items`` file option on top of mrjob's defaults."""
        # NOTE(review): newer mrjob releases appear to rename these hooks to
        # configure_args()/add_file_arg() -- confirm against the mrjob
        # version this job is deployed with before upgrading.
        super(MovieSimilarities, self).configure_options()
        self.add_file_option('--items', help='Path to movies.dat')

    def load_movie_names(self):
        """Populate ``self.movieNames`` with a movie ID -> title mapping.

        Reads "movieID::title::..." lines from MOVIE_NAMES_FILE. A header
        line whose first field is 'movieId' and lines with fewer than two
        fields (e.g. blank lines) are skipped.

        Raises:
            IOError/OSError: if the file cannot be opened.
            ValueError: if a movie ID field is not an integer.
        """
        # NOTE(review): the file name is fixed rather than taken from the
        # --items option value; presumably the file passed via --items is
        # made available in the working directory under its own base name,
        # so it must be called movies.dat -- verify against mrjob's
        # file-option behaviour.
        self.movieNames = {}
        # io.open decodes while reading and yields text on both Python 2
        # and Python 3; undecodable bytes are dropped. Calling .decode() on
        # each title instead only works on Python 2, because Python 3 text
        # strings have no decode() method.
        with io.open(self.MOVIE_NAMES_FILE,
                     encoding='utf-8', errors='ignore') as f:
            for line in f:
                fields = line.split('::')
                if len(fields) < 2 or fields[0] == 'movieId':
                    continue  # blank/short line or header
                self.movieNames[int(fields[0])] = fields[1]

    def steps(self):
        """Return the three MapReduce steps in execution order."""
        return [
            MRStep(mapper=self.mapper_parse_input,
                   reducer=self.reducer_ratings_by_user),
            MRStep(mapper=self.mapper_create_item_pairs,
                   reducer=self.reducer_compute_similarity),
            # Titles are only needed once IDs are turned into names, so the
            # lookup table is loaded in this step's mapper_init.
            MRStep(mapper=self.mapper_sort_similarities,
                   mapper_init=self.load_movie_names,
                   reducer=self.reducer_output_similarities)]

    def mapper_parse_input(self, key, line):
        """Emit ``userID => (movieID, rating)`` for one ratings line.

        Args:
            key: unused.
            line: one "userID::movieID::rating::timestamp" record.

        Lines that do not split into exactly four fields are counted under
        the 'malformed_lines' counter and skipped instead of aborting the
        job; a header line (userID field equal to 'userId') is skipped too.
        """
        fields = line.split('::')
        if len(fields) != 4:
            self.increment_counter('MovieSimilarities', 'malformed_lines', 1)
            return
        userID, movieID, rating, _timestamp = fields
        if userID != 'userId':  # Skip first line
            yield userID, (movieID, float(rating))

    def reducer_ratings_by_user(self, user_id, itemRatings):
        """Group all (movieID, rating) pairs of one user into a single list."""
        yield user_id, [(movieID, rating) for movieID, rating in itemRatings]

    def mapper_create_item_pairs(self, user_id, itemRatings):
        """Emit every pair of movies one user rated, with both ratings.

        Each unordered pair from combinations() is emitted in both orders
        so that similarities are bi-directional. The number of emitted
        records grows quadratically with the number of ratings per user.
        """
        for (movieID1, rating1), (movieID2, rating2) in \
                combinations(itemRatings, 2):
            yield (movieID1, movieID2), (rating1, rating2)
            yield (movieID2, movieID1), (rating2, rating1)

    def cosine_similarity(self, ratingPairs):
        """Compute the cosine similarity between two rating vectors.

        Args:
            ratingPairs: iterable of (ratingX, ratingY) pairs; consumed in
                a single pass, so a one-shot iterator is fine.

        Returns:
            (score, numPairs): the cosine similarity and the number of
            pairs seen. score is 0 when either vector has zero magnitude
            (including when ratingPairs is empty).
        """
        numPairs = 0
        sum_xx = sum_yy = sum_xy = 0
        for ratingX, ratingY in ratingPairs:
            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1

        denominator = sqrt(sum_xx) * sqrt(sum_yy)
        # Guard against division by zero for all-zero vectors.
        score = sum_xy / float(denominator) if denominator else 0
        return (score, numPairs)

    def reducer_compute_similarity(self, moviePair, ratingPairs):
        """Emit ``moviePair => (score, numPairs)`` for strong matches only.

        A pair is emitted only when it has more than MIN_CO_RATINGS
        co-ratings and a similarity score above MIN_SCORE.
        """
        score, numPairs = self.cosine_similarity(ratingPairs)
        if numPairs > self.MIN_CO_RATINGS and score > self.MIN_SCORE:
            yield moviePair, (score, numPairs)

    def mapper_sort_similarities(self, moviePair, scores):
        """Re-key a scored pair as ``(title1, score) => (title2, n)``.

        Requires load_movie_names() to have run first (it is this step's
        mapper_init). A movie ID with no known title falls back to the raw
        ID instead of raising KeyError.
        """
        score, n = scores
        movie1, movie2 = moviePair
        name1 = self.movieNames.get(int(movie1), movie1)
        name2 = self.movieNames.get(int(movie2), movie2)
        yield (name1, score), (name2, n)

    def reducer_output_similarities(self, movieScore, similarN):
        """Emit final rows: ``movie => (similar movie, score, co-ratings)``."""
        movie1, score = movieScore
        for movie2, n in similarN:
            yield movie1, (movie2, score, n)
if __name__ == '__main__':
    # Run the job only when this file is executed as a script, not when it
    # is imported.
    MovieSimilarities.run()