Staging to master to add NCF hyperparameter tuning #1102

Merged · 64 commits · May 22, 2020

Commits (64)

5b1e551
update nni to 1.5
seanytak Apr 16, 2020
eb0c50b
Merge branch 'seanytak/ncf_test_tuning' of github.com:seanytak/recomm…
Apr 16, 2020
6e485a1
update nni==1.5
seanytak Apr 17, 2020
67e54de
update nni surprise svd for nni 1.5
seanytak Apr 17, 2020
f20cf2e
add ncf training using nni
seanytak Apr 21, 2020
457d363
add ncf tuning harness notebook
seanytak Apr 21, 2020
7a4f24e
add model comparison for svd and ncf
seanytak Apr 22, 2020
70d5420
clean up ncf training
seanytak Apr 23, 2020
75c2999
clean up ncf markdown
seanytak Apr 23, 2020
d71ab3f
removed unused params in ncf training
seanytak Apr 23, 2020
77214ef
update readme
seanytak Apr 23, 2020
bed9d6a
Merge branch 'staging' into seanytak/ncf_test_tuning
miguelgfierro Apr 23, 2020
e8d9f0f
added initial files
Apr 23, 2020
f195b36
updated notebook
Apr 23, 2020
3ebd747
Merge branch 'staging' into cheetm/add_lightfm_deepdive
miguelgfierro Apr 24, 2020
038745a
added unittests and updated scripts
Apr 24, 2020
5ab3cae
Merge branch 'staging' into seanytak/ncf_test_tuning
miguelgfierro Apr 24, 2020
f9253fa
Merge branch 'cheetm/add_lightfm_deepdive' of github.com:cheetm/recom…
Knostromo Apr 24, 2020
af3a133
added unittests and updated scripts
Knostromo Apr 24, 2020
f98a084
addressed some comments
Knostromo Apr 24, 2020
4cb36ae
remove svd components
seanytak Apr 27, 2020
4f491fb
fix up final test results reporting
seanytak Apr 27, 2020
68738b6
addressed some more comments
Knostromo Apr 27, 2020
e2bd87f
fixed latex
Knostromo Apr 27, 2020
9ec6c8b
fixed latex
Knostromo Apr 27, 2020
b5c2c6a
Merge branch 'staging' into seanytak/ncf_test_tuning
miguelgfierro Apr 28, 2020
42d0f61
Merge branch 'staging' into cheetm/add_lightfm_deepdive
miguelgfierro Apr 28, 2020
dc1947a
addressed more comments
Knostromo Apr 28, 2020
1d21c6e
Merge branch 'cheetm/add_lightfm_deepdive' of github.com:cheetm/recom…
Knostromo Apr 28, 2020
ead671c
remove output from cell with warnings
seanytak Apr 29, 2020
3f86742
fix output on nni svd surprise
seanytak Apr 29, 2020
1902400
fix notebook outputs for demonstration purposes
seanytak Apr 29, 2020
be04df2
remove old svd references in nni_ncf
seanytak Apr 30, 2020
4f8a3f1
remove old svd model saving
seanytak Apr 30, 2020
069d050
Merge pull request #1092 from seanytak/seanytak/ncf_test_tuning
miguelgfierro May 1, 2020
2d5e892
Merge branch 'master' into staging
miguelgfierro May 1, 2020
04f3657
Merge branch 'staging' into cheetm/add_lightfm_deepdive
miguelgfierro May 6, 2020
9af8055
updated scripts to work with reco metrics
Knostromo May 8, 2020
b2fd3f3
slight cleanup
Knostromo May 11, 2020
bb44da5
removed auc import from util py
Knostromo May 11, 2020
241ef2f
corrected typo and added sec 3.6
Knostromo May 11, 2020
4e78675
Merge pull request #1096 from cheetm/cheetm/add_lightfm_deepdive
miguelgfierro May 13, 2020
2f7ed86
tf is going bananas
miguelgfierro May 18, 2020
1544354
error with azure storage library
miguelgfierro May 18, 2020
8106c03
test comment
miguelgfierro May 18, 2020
1564033
blacked
miguelgfierro May 18, 2020
f632a5c
format
miguelgfierro May 18, 2020
9547cf8
format
miguelgfierro May 18, 2020
aff2c4b
format
miguelgfierro May 18, 2020
03add11
DRY
miguelgfierro May 18, 2020
6a2a880
minor
miguelgfierro May 18, 2020
1541979
license
miguelgfierro May 18, 2020
ee530ee
format
miguelgfierro May 18, 2020
5bb464c
format
miguelgfierro May 18, 2020
50b9a90
format
miguelgfierro May 18, 2020
a304749
refact
miguelgfierro May 18, 2020
4cd67ca
minor
miguelgfierro May 19, 2020
bd44402
put subparameters into nested arguments
zzn2 May 20, 2020
a61880d
Merge branch 'staging' into master
zzn2 May 21, 2020
d9420ea
Merge pull request #1107 from zzn2/master
miguelgfierro May 21, 2020
2a16dd0
sys.executable instead of root python
miguelgfierro May 22, 2020
ebb66ee
formating
miguelgfierro May 22, 2020
3ef022e
Merge branch 'staging' into random_fixes
miguelgfierro May 22, 2020
73b6622
Merge pull request #1105 from microsoft/random_fixes
miguelgfierro May 22, 2020
1 change: 1 addition & 0 deletions README.md
@@ -65,6 +65,7 @@ The table below lists the recommender algorithms currently available in the repo
| Deep Knowledge-Aware Network (DKN)<sup>*</sup> | [Python CPU / Python GPU](notebooks/00_quick_start/dkn_synthetic.ipynb) | Content-Based Filtering | Deep learning algorithm incorporating a knowledge graph and article embeddings to provide powerful news or article recommendations |
| Extreme Deep Factorization Machine (xDeepFM)<sup>*</sup> | [Python CPU / Python GPU](notebooks/00_quick_start/xdeepfm_criteo.ipynb) | Hybrid | Deep learning based algorithm for implicit and explicit feedback with user/item features |
| FastAI Embedding Dot Bias (FAST) | [Python CPU / Python GPU](notebooks/00_quick_start/fastai_movielens.ipynb) | Collaborative Filtering | General purpose algorithm with embeddings and biases for users and items |
| LightFM/Hybrid Matrix Factorization | [Python CPU](notebooks/02_model/lightfm_deep_dive.ipynb) | Hybrid | Hybrid matrix factorization algorithm for both implicit and explicit feedback |
| LightGBM/Gradient Boosting Tree<sup>*</sup> | [Python CPU](notebooks/00_quick_start/lightgbm_tinycriteo.ipynb) / [PySpark](notebooks/02_model/mmlspark_lightgbm_criteo.ipynb) | Content-Based Filtering | Gradient Boosting Tree algorithm for fast training and low memory usage in content-based problems |
| GRU4Rec | [Python CPU / Python GPU](notebooks/00_quick_start/sequential_recsys_amazondataset.ipynb) | Collaborative Filtering | Sequential-based algorithm that aims to capture both long and short-term user preferences using recurrent neural networks |
| Neural Recommendation with Long- and Short-term User Representations (LSTUR)<sup>*</sup> | [Python CPU / Python GPU](notebooks/00_quick_start/lstur_synthetic.ipynb) | Content-Based Filtering | Neural recommendation algorithm with long- and short-term user interest modeling |
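
The LightFM row added above points to the new deep-dive notebook. For orientation only, a minimal hybrid matrix factorization sketch with the lightfm package; the toy users, items, and genre tags below are hypothetical and not taken from the notebook.

```python
import numpy as np
from lightfm import LightFM
from lightfm.data import Dataset

# Hypothetical toy data: three users, four items, two genre tags.
users = ["u1", "u2", "u3"]
items = ["i1", "i2", "i3", "i4"]
tags = ["action", "drama"]
ratings = [("u1", "i1", 5.0), ("u1", "i2", 3.0), ("u2", "i2", 4.0), ("u3", "i4", 2.0)]
item_tags = [("i1", ["action"]), ("i2", ["drama"]), ("i3", ["action"]), ("i4", ["drama"])]

# Build the interaction and item-feature matrices.
dataset = Dataset()
dataset.fit(users=users, items=items, item_features=tags)
interactions, weights = dataset.build_interactions(ratings)
item_features = dataset.build_item_features(item_tags)

# Hybrid matrix factorization: latent factors plus item-feature embeddings.
model = LightFM(no_components=16, loss="warp")
model.fit(interactions, item_features=item_features, epochs=10)

# Score all items for the first user (indices follow the order passed to dataset.fit).
scores = model.predict(0, np.arange(len(items)), item_features=item_features)
print(scores)
```
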
1 change: 0 additions & 1 deletion contrib/sarplus/python/pysarplus/SARModel.py
@@ -29,4 +29,3 @@ def find_or_raise(extension):

def predict(self, items, ratings, top_k, remove_seen):
return self.model.predict(items, ratings, top_k, remove_seen)
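
The change here only drops a trailing blank line, but `predict` is the entry point that `SARPlus.recommend_k_items` below calls from its pandas UDF. A minimal, hedged sketch of calling the C++-backed model directly; the cache path and arrays are hypothetical, and the cache must first have been produced by the `com.microsoft.sarplus` Spark writer.

```python
import numpy as np
from pysarplus import SARModel

# Hypothetical local path to a cache directory written by com.microsoft.sarplus.
model = SARModel("/dbfs/tmp/sarplus_cache")

item_indices = np.array([0, 3, 7])       # item ids mapped into the contiguous index space
affinities = np.array([5.0, 3.0, 1.0])   # the user's affinity for each of those items

# top_k=10, remove_seen=True, mirroring the call in SARPlus.recommend_k_items.
preds = model.predict(item_indices, affinities, 10, True)
for p in preds:
    print(p.id, p.score)
```
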

158 changes: 104 additions & 54 deletions contrib/sarplus/python/pysarplus/SARPlus.py
@@ -5,7 +5,14 @@
import logging
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, IntegerType, FloatType
from pyspark.sql.types import (
StringType,
DoubleType,
StructType,
StructField,
IntegerType,
FloatType,
)
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pysarplus import SARModel

@@ -14,7 +21,8 @@
SIM_LIFT = "lift"

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('sarplus')
log = logging.getLogger("sarplus")


class SARPlus:
"""SAR implementation for PySpark"""
@@ -31,7 +39,7 @@ def __init__(
time_decay_coefficient=30,
time_now=None,
timedecay_formula=False,
threshold=1
threshold=1,
):
assert threshold > 0

@@ -44,7 +52,7 @@ def __init__(
"prefix": table_prefix,
"time_now": time_now,
"time_decay_coefficient": time_decay_coefficient,
"threshold": threshold
"threshold": threshold,
}

self.similarity_type = similarity_type
@@ -83,24 +91,27 @@ def fit(self, df):
# the following is the query which we want to run

query = self.f(
"""
"""
SELECT
{col_user}, {col_item},
SUM({col_rating} * EXP(-log(2) * (latest_timestamp - CAST({col_timestamp} AS long)) / ({time_decay_coefficient} * 3600 * 24))) as {col_rating}
FROM {prefix}df_train_input,
(SELECT CAST(MAX({col_timestamp}) AS long) latest_timestamp FROM {prefix}df_train_input)
GROUP BY {col_user}, {col_item}
CLUSTER BY {col_user}
""")
"""
)

# replace with timedecayed version
df = self.spark.sql(query)
else:
# since SQL is case insensitive, this check needs to be performed similarly
if self.header['col_timestamp'].lower() in [s.name.lower() for s in df.schema]:
if self.header["col_timestamp"].lower() in [
s.name.lower() for s in df.schema
]:
# we need to de-duplicate items by using the latest item
query = self.f(
"""
"""
SELECT {col_user}, {col_item}, {col_rating}
FROM
(
@@ -112,7 +123,7 @@ def fit(self, df):
WHERE latest = 1
"""
)

df = self.spark.sql(query)

df.createOrReplaceTempView(self.f("{prefix}df_train"))
@@ -128,7 +139,8 @@ def fit(self, df):
GROUP BY A.{col_item}, B.{col_item}
HAVING COUNT(*) >= {threshold}
CLUSTER BY i1, i2
""")
"""
)

item_cooccurrence = self.spark.sql(query)
item_cooccurrence.write.mode("overwrite").saveAsTable(
@@ -148,7 +160,7 @@ def fit(self, df):
self.item_similarity = item_cooccurrence
elif self.similarity_type == SIM_JACCARD:
query = self.f(
"""
"""
SELECT i1, i2, value / (M1.margin + M2.margin - value) AS value
FROM {prefix}item_cooccurrence A
INNER JOIN {prefix}item_marginal M1 ON A.i1 = M1.i
@@ -159,7 +171,7 @@ def fit(self, df):
self.item_similarity = self.spark.sql(query)
elif self.similarity_type == SIM_LIFT:
query = self.f(
"""
"""
SELECT i1, i2, value / (M1.margin * M2.margin) AS value
FROM {prefix}item_cooccurrence A
INNER JOIN {prefix}item_marginal M1 ON A.i1 = M1.i
@@ -169,19 +181,22 @@ def fit(self, df):
)
self.item_similarity = self.spark.sql(query)
else:
raise ValueError("Unknown similarity type: {0}".format(self.similarity_type))

raise ValueError(
"Unknown similarity type: {0}".format(self.similarity_type)
)

# store upper triangular
log.info("sarplus.fit 2/2: compute similiarity metric %s..." % self.similarity_type)
log.info(
"sarplus.fit 2/2: compute similiarity metric %s..." % self.similarity_type
)
self.item_similarity.write.mode("overwrite").saveAsTable(
self.f("{prefix}item_similarity_upper")
)

# expand upper triangular to full matrix

query = self.f(
"""
"""
SELECT i1, i2, value
FROM
(
@@ -223,7 +238,7 @@ def get_user_affinity(self, test):
)

query = self.f(
"""
"""
SELECT a.{col_user}, a.{col_item}, CAST(a.{col_rating} AS double) {col_rating}
FROM {prefix}df_train a INNER JOIN {prefix}df_test_users b ON a.{col_user} = b.{col_user}
DISTRIBUTE BY {col_user}
@@ -233,53 +248,79 @@ def get_user_affinity(self, test):

return self.spark.sql(query)

def recommend_k_items(self, test, cache_path, top_k=10, remove_seen=True, n_user_prediction_partitions=200):
def recommend_k_items(
self,
test,
cache_path,
top_k=10,
remove_seen=True,
n_user_prediction_partitions=200,
):

# create item id to continuous index mapping
log.info("sarplus.recommend_k_items 1/3: create item index")
self.spark.sql(self.f("SELECT i1, row_number() OVER(ORDER BY i1)-1 idx FROM (SELECT DISTINCT i1 FROM {prefix}item_similarity) CLUSTER BY i1"))\
.write.mode("overwrite").saveAsTable(self.f("{prefix}item_mapping"))
self.spark.sql(
self.f(
"SELECT i1, row_number() OVER(ORDER BY i1)-1 idx FROM (SELECT DISTINCT i1 FROM {prefix}item_similarity) CLUSTER BY i1"
)
).write.mode("overwrite").saveAsTable(self.f("{prefix}item_mapping"))

# map similarity matrix into index space
self.spark.sql(self.f("""
self.spark.sql(
self.f(
"""
SELECT a.idx i1, b.idx i2, is.value
FROM {prefix}item_similarity is, {prefix}item_mapping a, {prefix}item_mapping b
WHERE is.i1 = a.i1 AND i2 = b.i1
"""))\
.write.mode("overwrite").saveAsTable(self.f("{prefix}item_similarity_mapped"))
"""
)
).write.mode("overwrite").saveAsTable(self.f("{prefix}item_similarity_mapped"))

cache_path_output = cache_path
if cache_path.startswith('dbfs:'):
cache_path_input = '/dbfs' + cache_path[5:]
if cache_path.startswith("dbfs:"):
cache_path_input = "/dbfs" + cache_path[5:]
else:
cache_path_input = cache_path

# export similarity matrix for C++ backed UDF
log.info("sarplus.recommend_k_items 2/3: prepare similarity matrix")

self.spark.sql(self.f("SELECT i1, i2, CAST(value AS DOUBLE) value FROM {prefix}item_similarity_mapped ORDER BY i1, i2"))\
.coalesce(1)\
.write.format("com.microsoft.sarplus").mode("overwrite")\
.save(cache_path_output)
self.spark.sql(
self.f(
"SELECT i1, i2, CAST(value AS DOUBLE) value FROM {prefix}item_similarity_mapped ORDER BY i1, i2"
)
).coalesce(1).write.format("com.microsoft.sarplus").mode("overwrite").save(
cache_path_output
)

self.get_user_affinity(test).createOrReplaceTempView(self.f("{prefix}user_affinity"))
self.get_user_affinity(test).createOrReplaceTempView(
self.f("{prefix}user_affinity")
)

# map item ids to index space
pred_input = self.spark.sql(self.f("""
pred_input = self.spark.sql(
self.f(
"""
SELECT {col_user}, idx, rating
FROM
(
SELECT {col_user}, b.idx, {col_rating} rating
FROM {prefix}user_affinity JOIN {prefix}item_mapping b ON {col_item} = b.i1
)
CLUSTER BY {col_user}
"""))
"""
)
)

schema = StructType([
StructField("userID", pred_input.schema[self.header['col_user']].dataType, True),
StructField("itemID", IntegerType(), True),
StructField("score", FloatType(), True)
])
schema = StructType(
[
StructField(
"userID", pred_input.schema[self.header["col_user"]].dataType, True
),
StructField("itemID", IntegerType(), True),
StructField("score", FloatType(), True),
]
)

# make sure only the header is pickled
local_header = self.header
@@ -291,33 +332,42 @@ def sar_predict_udf(df):
# The cache_path points to the file written by com.microsoft.sarplus
# This has exactly the memory layout we need, and since the file is
# memory mapped, the memory consumption only happens once per worker
# for all python processes
# for all python processes
model = SARModel(cache_path_input)
preds = model.predict(df['idx'].values, df['rating'].values, top_k, remove_seen)

user = df[local_header['col_user']].iloc[0]
preds = model.predict(
df["idx"].values, df["rating"].values, top_k, remove_seen
)

user = df[local_header["col_user"]].iloc[0]

preds_ret = pd.DataFrame(
[(user, x.id, x.score) for x in preds],
columns=range(3))
[(user, x.id, x.score) for x in preds], columns=range(3)
)

return preds_ret

log.info("sarplus.recommend_k_items 3/3: compute recommendations")

df_preds = pred_input\
.repartition(n_user_prediction_partitions, self.header['col_user'])\
.groupby(self.header['col_user'])\
df_preds = (
pred_input.repartition(
n_user_prediction_partitions, self.header["col_user"]
)
.groupby(self.header["col_user"])
.apply(sar_predict_udf)
)

df_preds.createOrReplaceTempView(self.f("{prefix}predictions"))

return self.spark.sql(self.f("""
return self.spark.sql(
self.f(
"""
SELECT userID {col_user}, b.i1 {col_item}, score
FROM {prefix}predictions p, {prefix}item_mapping b
WHERE p.itemID = b.idx
"""))

"""
)
)

def recommend_k_items_slow(self, test, top_k=10, remove_seen=True):
"""Recommend top K items for all users which are in the test set.

@@ -331,9 +381,9 @@ def recommend_k_items_slow(self, test, top_k=10, remove_seen=True):
if remove_seen:
raise ValueError("Not implemented")

self.get_user_affinity(test)\
.write.mode("overwrite")\
.saveAsTable(self.f("{prefix}user_affinity"))
self.get_user_affinity(test).write.mode("overwrite").saveAsTable(
self.f("{prefix}user_affinity")
)

# user_affinity * item_similarity
# filter top-k
@@ -357,4 +407,4 @@
top_k=top_k,
)

return self.spark.sql(query)
return self.spark.sql(query)
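
Since this file was largely re-indented by black, a short usage sketch may help keep the overall flow in view. It is a hedged sketch only: the column names, table prefix, and cache path are hypothetical, and the constructor arguments are inferred from the `header` dictionary built in `__init__` above rather than from a full signature.

```python
from pysarplus import SARPlus

# spark: an active SparkSession (e.g. in a Databricks notebook).
# Hypothetical column names and storage locations below.
model = SARPlus(
    spark,
    col_user="userId",
    col_item="itemId",
    col_rating="rating",
    col_timestamp="timestamp",
    table_prefix="sarplus_",
    similarity_type="jaccard",  # assumed to match the SIM_JACCARD constant above
    timedecay_formula=True,
    time_decay_coefficient=30,
)

# train_df / test_df are Spark DataFrames with the columns named above.
model.fit(train_df)

top_k = model.recommend_k_items(
    test_df,
    cache_path="dbfs:/tmp/sarplus_cache",  # written via com.microsoft.sarplus, read back under /dbfs
    top_k=10,
    remove_seen=True,
)
top_k.show()
```
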
2 changes: 1 addition & 1 deletion contrib/sarplus/python/pysarplus/__init__.py
@@ -1,2 +1,2 @@
from .SARModel import SARModel
from .SARPlus import SARPlus
from .SARPlus import SARPlus
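
As a closing note on the reformatted `fit` query in SARPlus.py above: with `timedecay_formula=True`, each rating is weighted by `EXP(-log(2) * age / (time_decay_coefficient * 3600 * 24))`, i.e. the affinity halves every `time_decay_coefficient` days. A small self-contained check with hypothetical numbers:

```python
import math

# Hypothetical example: a rating of 4.0 observed 60 days before the latest event,
# with the default time_decay_coefficient of 30 days (a 30-day half-life).
rating = 4.0
age_seconds = 60 * 24 * 3600
half_life_seconds = 30 * 24 * 3600

decayed = rating * math.exp(-math.log(2) * age_seconds / half_life_seconds)
print(decayed)  # ~1.0 — two half-lives reduce the affinity to a quarter
```
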