Implement time_now for sarplus #1719

Merged 8 commits on May 24, 2022
91 changes: 50 additions & 41 deletions contrib/sarplus/python/pysarplus/SARPlus.py
@@ -49,13 +49,20 @@ def __init__(
col_rating (str): rating column name
col_timestamp (str): timestamp column name
table_prefix (str): name prefix of the generated tables
similarity_type (str): ['cooccurrence', 'jaccard', 'lift'] option for computing item-item similarity
time_decay_coefficient (float): number of days till ratings are decayed by 1/2
time_now (int | None): current time for time decay calculation
similarity_type (str): ['cooccurrence', 'jaccard', 'lift']
option for computing item-item similarity
time_decay_coefficient (float): number of days till
ratings are decayed by 1/2. denominator in time
decay. Zero makes time decay irrelevant
time_now (int | None): current time for time decay
calculation
timedecay_formula (bool): flag to apply time decay
threshold (int): item-item co-occurrences below this threshold will be removed
cache_path (str): user specified local cache directory for recommend_k_items(). If specified,
recommend_k_items() will do C++ based fast predictions.
threshold (int): item-item co-occurrences below this
threshold will be removed
cache_path (str): user specified local cache directory for
recommend_k_items(). If specified,
recommend_k_items() will do C++ based fast
predictions.
"""
assert threshold > 0

@@ -67,7 +74,7 @@ def __init__(
"col_timestamp": col_timestamp,
"prefix": table_prefix,
"time_now": time_now,
"time_decay_coefficient": time_decay_coefficient,
"time_decay_half_life": time_decay_coefficient * 24 * 60 * 60,
"threshold": threshold,
}

@@ -79,50 +86,52 @@ def __init__(
def _format(self, string, **kwargs):
return string.format(**self.header, **kwargs)

# denominator in time decay. Zero makes time decay irrelevant
# toggle the computation of time decay group by formula
# current time for time decay calculation
# cooccurrence matrix threshold
def fit(self, df):
"""Main fit method for SAR.

Expects the dataframes to have row_id, col_id columns which are indexes,
i.e. contain the sequential integer index of the original alphanumeric user and item IDs.
Dataframe also contains rating and timestamp as floats; timestamp is in seconds since Epoch by default.
Expects the dataframes to have row_id, col_id columns which
are indexes, i.e. contain the sequential integer index of the
original alphanumeric user and item IDs. Dataframe also
contains rating and timestamp as floats; timestamp is in
seconds since Epoch by default.

Arguments:
df (pySpark.DataFrame): input dataframe which contains the index of users and items.
df (pySpark.DataFrame): input dataframe which contains the
index of users and items.
"""
# threshold - items below this number get set to zero in cooccurrence counts

df.createOrReplaceTempView(self._format("{prefix}df_train_input"))

if self.timedecay_formula:
# WARNING: previously we would take the last value in training dataframe and set it
# as a matrix U element
# for each user-item pair. Now with time decay, we compute a sum over ratings given
# by a user in the case
# when T=np.inf, so user gets a cumulative sum of ratings for a particular item and
# not the last rating.
# Time Decay
# does a group by on user item pairs and apply the formula for time decay there
# Time T parameter is in days and input time is in seconds,
# so we do dt/60/(T*24*60)=dt/(T*24*3600)
# the following is the query which we want to run

query = self._format(
"""
SELECT
{col_user}, {col_item},
SUM({col_rating} * EXP(-log(2) * (latest_timestamp - CAST({col_timestamp} AS long)) / ({time_decay_coefficient} * 3600 * 24))) as {col_rating}
FROM {prefix}df_train_input,
(SELECT CAST(MAX({col_timestamp}) AS long) latest_timestamp FROM {prefix}df_train_input)
GROUP BY {col_user}, {col_item}
CLUSTER BY {col_user}
"""
)
# With time decay, we compute a sum over ratings given by
# a user in the case when T=np.inf, so user gets a
# cumulative sum of ratings for a particular item and not
# the last rating. Time Decay does a group by on user
# item pairs and apply the formula for time decay there
# Time T parameter is in days and input time is in
# seconds, so we do dt/60/(T*24*60)=dt/(T*24*3600) the
# following is the query which we want to run

if self.header["time_now"] is None:
query = self._format("""
SELECT CAST(MAX({col_timestamp}) AS long)
FROM {prefix}df_train_input
""")
self.header["time_now"] = self.spark.sql(query).first()[0]

query = self._format("""
SELECT {col_user},
{col_item},
SUM(
{col_rating} *
POW(2, (CAST({col_timestamp} AS LONG) - {time_now}) / {time_decay_half_life})
) AS {col_rating}
FROM {prefix}df_train_input
GROUP BY {col_user}, {col_item}
CLUSTER BY {col_user}
Comment on lines +122 to +131

Collaborator:
I noticed there is a change and we don't have the exponential anymore. If you get the same data in SAR and SAR+ with the time decay, do you get the same result?

Collaborator Author:
You are right. Tests to verify are needed.

Collaborator Author:
Added

""")

# replace with timedecayed version
# replace with time-decayed version
df = self.spark.sql(query)
else:
# since SQL is case-insensitive, this check needs to be performed similar
@@ -262,7 +271,7 @@ def get_user_affinity(self, test):
SELECT a.{col_user}, a.{col_item}, CAST(a.{col_rating} AS double) {col_rating}
FROM {prefix}df_train a INNER JOIN {prefix}df_test_users b ON a.{col_user} = b.{col_user}
DISTRIBUTE BY {col_user}
SORT BY {col_user}, {col_item}
SORT BY {col_user}, {col_item}
"""
)

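Editor's note on the reviewer's question above: dropping EXP does not change the math. The old expression EXP(-log(2) * (latest_timestamp - t) / half_life) equals POW(2, (t - time_now) / half_life) whenever time_now is the latest training timestamp, so the only behavioral change is that time_now can now be supplied explicitly. A minimal NumPy sketch with hypothetical numbers (not taken from the PR):

```python
import numpy as np

# Hypothetical values, for illustration only.
half_life = 30 * 24 * 60 * 60        # time_decay_coefficient = 30 days, in seconds
t = 1_600_000_000                    # one rating's timestamp (seconds since Epoch)
time_now = t + 45 * 24 * 60 * 60     # reference time, 45 days after the rating

# Old query: EXP(-log(2) * (latest_timestamp - CAST(timestamp AS long)) / half_life)
old = np.exp(-np.log(2) * (time_now - t) / half_life)

# New query: POW(2, (CAST(timestamp AS LONG) - time_now) / half_life)
new = 2.0 ** ((t - time_now) / half_life)

assert np.isclose(old, new)          # both halve a rating every half_life seconds
```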
24 changes: 24 additions & 0 deletions contrib/sarplus/python/tests/test_pyspark_sar.py
@@ -319,6 +319,30 @@ def test_user_affinity(spark, demo_usage_data, sar_settings, header):
atol=sar_settings["ATOL"],
)

# Set time_now to 60 days later
user_affinity_ref = pd.read_csv(
sar_settings["FILE_DIR"] + "user_aff_2_months_later.csv"
).iloc[:, 1:].squeeze()
user_affinity_ref = user_affinity_ref[user_affinity_ref > 0]

two_months = 2 * 30 * (24 * 60 * 60)
model = SARPlus(
spark,
**header,
timedecay_formula=True,
time_decay_coefficient=30,
time_now=demo_usage_data[header["col_timestamp"]].max() + two_months,
similarity_type="cooccurrence",
)
model.fit(spark.createDataFrame(demo_usage_data))
df_test = pd.DataFrame({header["col_user"]: [sar_settings["TEST_USER_ID"]]})
df_test = spark.createDataFrame(df_test)
user_affinity = model.get_user_affinity(df_test).toPandas()
user_affinity = user_affinity.set_index(header["col_item"])[header["col_rating"]]
user_affinity = user_affinity[user_affinity_ref.index]

assert np.allclose(user_affinity_ref, user_affinity, atol=sar_settings["ATOL"])


# Tests 8-10
@pytest.mark.parametrize(
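For context, a minimal usage sketch of the new parameter on the PySpark side; the SparkSession `spark`, the `ratings` DataFrame, and the column names below are assumptions for illustration, not part of the PR:

```python
from pysarplus import SARPlus

# Sketch only: column names and the reference time are hypothetical.
model = SARPlus(
    spark,
    col_user="UserId",
    col_item="ItemId",
    col_rating="Rating",
    col_timestamp="Timestamp",
    timedecay_formula=True,
    time_decay_coefficient=30,   # 30-day half-life
    time_now=1_600_000_000,      # explicit reference time, in seconds since Epoch
    similarity_type="jaccard",
)
model.fit(ratings)               # ratings: a Spark DataFrame with the columns above
```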
2 changes: 1 addition & 1 deletion contrib/sarplus/scala/build.sbt
@@ -5,7 +5,7 @@

import Utils._

// Denpendency configuration
// Dependency configuration

lazy val sparkVer = settingKey[String]("spark version")
lazy val hadoopVer = settingKey[String]("hadoop version")
14 changes: 7 additions & 7 deletions tests/conftest.py
@@ -5,7 +5,7 @@
# https://docs.pytest.org/en/latest/fixture.html:
# "If during implementing your tests you realize that you want to use a fixture function from multiple test files
# you can move it to a conftest.py file. You don't need to import the module you defined your fixtures to use in a test,
# it automatically gets discovered by pytest and thus you can simply receive fixture objects by naming them as
# it automatically gets discovered by pytest, and thus you can simply receive fixture objects by naming them as
# an input argument in the test."

import calendar
@@ -481,14 +481,14 @@ def test_specs():

@pytest.fixture(scope="module")
def affinity_matrix(test_specs):
"""Generate a random user/item affinity matrix. By increasing the likehood of 0 elements we simulate
"""Generate a random user/item affinity matrix. By increasing the likelihood of 0 elements we simulate
a typical recommending situation where the input matrix is highly sparse.

Args:
users (int): number of users (rows).
items (int): number of items (columns).
ratings (int): rating scale, e.g. 5 meaning rates are from 1 to 5.
spars: probability of obtaining zero. This roughly corresponds to the sparseness.
test_specs["users"] (int): number of users (rows).
test_specs["items"] (int): number of items (columns).
test_specs["ratings"] (int): rating scale, e.g. 5 meaning rates are from 1 to 5.
test_specs["spars"]: probability of obtaining zero. This roughly corresponds to the sparseness.
of the generated matrix. If spars = 0 then the affinity matrix is dense.

Returns:
@@ -512,7 +512,7 @@ def affinity_matrix(test_specs):
X, ratio=test_specs["ratio"], seed=test_specs["seed"]
)

return (Xtr, Xtst)
return Xtr, Xtst


# DeepRec Fixtures
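The body of the `affinity_matrix` fixture is not part of this diff; as a rough sketch of the behavior its docstring describes (the helper name and defaults below are hypothetical):

```python
import numpy as np

def random_affinity_matrix(users, items, ratings, spars, seed=42):
    """Sketch: draw integer ratings in [1, ratings], then zero out entries
    with probability `spars` to simulate a highly sparse affinity matrix."""
    rng = np.random.default_rng(seed)
    X = rng.integers(1, ratings + 1, size=(users, items))
    X[rng.random((users, items)) < spars] = 0
    return X
```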
18 changes: 18 additions & 0 deletions tests/unit/recommenders/models/test_sar_singlenode.py
@@ -226,6 +226,24 @@ def test_user_affinity(demo_usage_data, sar_settings, header):
atol=sar_settings["ATOL"],
)

# Set time_now to 60 days later
two_months = 2 * 30 * (24 * 60 * 60)
model = SARSingleNode(
similarity_type="cooccurrence",
timedecay_formula=True,
time_decay_coefficient=30,
time_now=demo_usage_data[header["col_timestamp"]].max() + two_months,
**header
)
model.fit(demo_usage_data)
true_user_affinity_url = sar_settings["FILE_DIR"] + "user_aff_2_months_later.csv"
true_user_affinity = pd.read_csv(true_user_affinity_url).iloc[:, 1:]
user_index = model.user2index[sar_settings["TEST_USER_ID"]]
item_indexes = pd.Series(model.item2index)[true_user_affinity.columns]
sar_user_affinity = model.user_affinity[user_index].toarray().flatten()[item_indexes]
true_user_affinity = true_user_affinity.astype(sar_user_affinity.dtype)
assert np.allclose(true_user_affinity, sar_user_affinity, atol=sar_settings["ATOL"])


@pytest.mark.parametrize(
"threshold,similarity_type,file",
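An editorial sanity check on the two new tests, not part of the PR: shifting time_now forward by a fixed amount rescales every time-decayed rating by the same constant factor, 2 ** (-shift / half_life), so with a 30-day half-life and a 60-day shift the "2 months later" reference affinities should simply be the baseline affinities scaled by 0.25.

```python
# With time_decay_coefficient = 30 (days) and time_now moved 60 days past the
# latest timestamp, every rating's decay factor shrinks by the same constant.
half_life_days = 30
shift_days = 2 * 30
scale = 2.0 ** (-shift_days / half_life_days)
assert abs(scale - 0.25) < 1e-12
```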