-
Notifications
You must be signed in to change notification settings - Fork 0
/
movie-similarities.py
130 lines (102 loc) · 4.36 KB
/
movie-similarities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt
import codecs
def loadMovieNames():
    """Load movie titles and genre flags from the local ``u.item`` file.

    Every non-blank line is split on ``|``. Field 0 is parsed as the integer
    movie ID, field 1 is kept as the title, and fields 5 through 22 are kept
    (as strings) as that movie's genre flags.

    Returns:
        A ``(movieNames, movieGenres)`` tuple of dicts, both keyed by the
        integer movie ID.

    Raises:
        OSError: if ``u.item`` cannot be opened.
        ValueError: if a non-blank line does not start with an integer ID.
        IndexError: if a non-blank line has no title field.
    """
    movieNames = {}
    movieGenres = {}
    # Bytes that are not valid ASCII are silently dropped (errors='ignore')
    # instead of aborting the load.
    with open("u.item", encoding='ascii', errors='ignore') as f:
        for line in f:
            # A blank line (for example a trailing newline at end of file)
            # carries no record; int('') would otherwise raise ValueError.
            if not line.strip():
                continue
            # Drop the line terminator so it can never leak into a field.
            fields = line.rstrip('\n').split('|')
            movieID = int(fields[0])
            movieNames[movieID] = fields[1]
            # NOTE(review): this slice keeps 18 fields (indices 5-22);
            # confirm that matches the number of genre columns in u.item.
            movieGenres[movieID] = fields[5:23]
    return movieNames, movieGenres
#Python 3 doesn't let you pass around unpacked tuples,
#so we explicitly extract the ratings now.
def makePairs(userRatings):
    """Re-key one self-joined record by its pair of movies.

    ``userRatings`` has the form
    ``(key, ((movie1, rating1), (movie2, rating2)))``. The leading key is
    discarded and ``((movie1, movie2), (rating1, rating2))`` is returned.
    """
    firstEntry, secondEntry = userRatings[1][0], userRatings[1][1]
    # Transpose the two (movie, rating) entries into (movies), (ratings).
    moviePair, ratingPair = zip(firstEntry, secondEntry)
    return (moviePair, ratingPair)
def filterDuplicates(userRatings):
    """Return True only when the first movie ID sorts before the second.

    ``userRatings`` has the form
    ``(key, ((movie1, rating1), (movie2, rating2)))``. Keeping only records
    with ``movie1 < movie2`` drops self-pairs and the mirrored copy of each
    pair; the ratings themselves are not inspected.
    """
    leftEntry, rightEntry = userRatings[1][0], userRatings[1][1]
    leftMovie, _ = leftEntry
    rightMovie, _ = rightEntry
    return leftMovie < rightMovie
def computeCosineSimilarity(ratingPairs):
    """Compute the cosine similarity over an iterable of (x, y) rating pairs.

    Returns a ``(score, numPairs)`` tuple, where ``numPairs`` is how many
    pairs were consumed and ``score`` is ``sum(x*y)`` divided by
    ``sqrt(sum(x*x)) * sqrt(sum(y*y))``. When that divisor is zero (including
    for an empty iterable) the score is the integer ``0``.
    """
    pairCount = 0
    dotProduct = 0
    squaresX = 0
    squaresY = 0
    # Single pass: the iterable is walked exactly once.
    for x, y in ratingPairs:
        pairCount += 1
        squaresX += x * x
        squaresY += y * y
        dotProduct += x * y

    magnitude = sqrt(squaresX) * sqrt(squaresY)
    if not magnitude:
        # Nothing to divide by; report a zero score.
        return (0, pairCount)
    return (dotProduct / float(magnitude), pairCount)
def filterBadMovies(userRatings):
    """Return True when the record's rating is strictly greater than 2.

    ``userRatings`` is indexed as ``(key, (movie, rating))``; only the rating
    at ``userRatings[1][1]`` is examined.
    """
    movieAndRating = userRatings[1]
    rating = movieAndRating[1]
    return rating > 2
def GetGenres(moviePairs):
    """Count the genre flags that are set for both movies of a pair.

    ``moviePairs[0]`` holds the two movie IDs. Their flag lists are looked up
    in the module-level ``genreDict`` and compared position by position; a
    position counts when both flags convert to a non-zero integer.

    Returns:
        ``((movie1, movie2), score)`` where ``score`` is the sum over those
        positions.
    """
    pairKey = moviePairs[0]
    movie1, movie2 = pairKey[0], pairKey[1]
    flagsFirst = genreDict[movie1]
    flagsSecond = genreDict[movie2]

    sharedCount = 0
    for flagA, flagB in zip(flagsFirst, flagsSecond):
        # `and` yields 0 when the first flag is 0, otherwise the second flag.
        sharedCount += int(flagA) and int(flagB)
    return ((movie1, movie2), sharedCount)
# Spark setup: run locally on all cores and only log errors.
conf = SparkConf().setAppName("MovieSimilarities").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

print("\nLoading movie names...")
nameDict, genreDict = loadMovieNames()

data = sc.textFile("u.data")

# Parse every line into userID => (movieID, rating).
ratings = data.map(lambda line: line.split()).map(
    lambda fields: (int(fields[0]), (int(fields[1]), float(fields[2])))
)

# Drop the ratings rejected by filterBadMovies before pairing anything up.
filteredMovies = ratings.filter(filterBadMovies)

# Self-join on userID so every two movies rated by the same user meet:
# userID => ((movieID, rating), (movieID, rating))
joinedRatings = filteredMovies.join(filteredMovies)

# Keep a single ordering of each pair (and no movie paired with itself).
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Re-key to (movie1, movie2) => (rating1, rating2).
moviePairs = uniqueJoinedRatings.map(makePairs)

# Gather all rating pairs per movie pair:
# (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
moviePairRatings = moviePairs.groupByKey()

# (movie1, movie2) => (score, numPairs); cached because it feeds both the
# genre scoring and the join below.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

# (movie1, movie2) => genreScore
movieGenreSimilarities = moviePairSimilarities.map(GetGenres)

# (movie1, movie2) => ((score, numPairs), genreScore)
movieRatingsAndGenreScore = moviePairSimilarities.join(movieGenreSimilarities)

# Print one sample record from each stage.
separatorLine = "------------------------------------------------------------"
print(movieGenreSimilarities.take(1))
print(separatorLine)
print(moviePairSimilarities.take(1))
print(separatorLine)
print(movieRatingsAndGenreScore.take(1))
# print(genreDict)

# Save the results if desired
#moviePairSimilarities.sortByKey()
#moviePairSimilarities.saveAsTextFile("movie-sims")
# Report the best matches for the movie ID given as the first command-line
# argument; without an argument nothing is reported.
if len(sys.argv) > 1:

    scoreThreshold = 0.97
    coOccurenceThreshold = 50

    movieID = int(sys.argv[1])

    # Each joined record has the shape
    #     ((movie1, movie2), ((score, numPairs), genreScore))
    # so the similarity score is pairSim[1][0][0] and the co-occurrence
    # count is pairSim[1][0][1]. pairSim[1][1] is the plain integer genre
    # score and must not be subscripted.
    filteredResults = movieRatingsAndGenreScore.filter(
        lambda pairSim:
            (pairSim[0][0] == movieID or pairSim[0][1] == movieID)
            and pairSim[1][0][0] > scoreThreshold
            and pairSim[1][0][1] > coOccurenceThreshold
    )

    # Sort by quality: the key ((score, numPairs), genreScore) orders by
    # score first, then co-occurrence count, then genre score.
    results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)

    print("Top 10 similar movies for " + nameDict[movieID])
    for result in results:
        (sim, pair) = result
        (score, strength), genreScore = sim
        # Display the member of the pair that isn't the movie we asked about.
        similarMovieID = pair[0]
        if similarMovieID == movieID:
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\tscore: " + str(score)
              + "\tstrength: " + str(strength)
              + "\tgenre matches: " + str(genreScore))