diff --git a/contrib/sarplus/python/pysarplus/SARPlus.py b/contrib/sarplus/python/pysarplus/SARPlus.py
index 1a0aa7ac8e..c3733e970a 100644
--- a/contrib/sarplus/python/pysarplus/SARPlus.py
+++ b/contrib/sarplus/python/pysarplus/SARPlus.py
@@ -49,13 +49,20 @@ def __init__(
             col_rating (str): rating column name
             col_timestamp (str): timestamp column name
             table_prefix (str): name prefix of the generated tables
-            similarity_type (str): ['cooccurrence', 'jaccard', 'lift'] option for computing item-item similarity
-            time_decay_coefficient (float): number of days till ratings are decayed by 1/2
-            time_now (int | None): current time for time decay calculation
+            similarity_type (str): ['cooccurrence', 'jaccard', 'lift']
+                option for computing item-item similarity
+            time_decay_coefficient (float): number of days till
+                ratings are decayed by 1/2. denominator in time
+                decay. Zero makes time decay irrelevant
+            time_now (int | None): current time for time decay
+                calculation
             timedecay_formula (bool): flag to apply time decay
-            threshold (int): item-item co-occurrences below this threshold will be removed
-            cache_path (str): user specified local cache directory for recommend_k_items(). If specified,
-                recommend_k_items() will do C++ based fast predictions.
+            threshold (int): item-item co-occurrences below this
+                threshold will be removed
+            cache_path (str): user specified local cache directory for
+                recommend_k_items(). If specified,
+                recommend_k_items() will do C++ based fast
+                predictions.
         """
 
         assert threshold > 0
@@ -67,7 +74,7 @@ def __init__(
             "col_timestamp": col_timestamp,
             "prefix": table_prefix,
             "time_now": time_now,
-            "time_decay_coefficient": time_decay_coefficient,
+            "time_decay_half_life": time_decay_coefficient * 24 * 60 * 60,
             "threshold": threshold,
         }
 
@@ -79,50 +86,52 @@ def __init__(
     def _format(self, string, **kwargs):
         return string.format(**self.header, **kwargs)
 
-    # denominator in time decay. Zero makes time decay irrelevant
-    # toggle the computation of time decay group by formula
-    # current time for time decay calculation
-    # cooccurrence matrix threshold
    def fit(self, df):
         """Main fit method for SAR.
 
-        Expects the dataframes to have row_id, col_id columns which are indexes,
-        i.e. contain the sequential integer index of the original alphanumeric user and item IDs.
-        Dataframe also contains rating and timestamp as floats; timestamp is in seconds since Epoch by default.
+        Expects the dataframes to have row_id, col_id columns which
+        are indexes, i.e. contain the sequential integer index of the
+        original alphanumeric user and item IDs. Dataframe also
+        contains rating and timestamp as floats; timestamp is in
+        seconds since Epoch by default.
 
         Arguments:
-            df (pySpark.DataFrame): input dataframe which contains the index of users and items.
+            df (pySpark.DataFrame): input dataframe which contains the
+                index of users and items.
         """
 
-        # threshold - items below this number get set to zero in cooccurrence counts
         df.createOrReplaceTempView(self._format("{prefix}df_train_input"))
 
         if self.timedecay_formula:
-            # WARNING: previously we would take the last value in training dataframe and set it
-            # as a matrix U element
-            # for each user-item pair. Now with time decay, we compute a sum over ratings given
-            # by a user in the case
-            # when T=np.inf, so user gets a cumulative sum of ratings for a particular item and
-            # not the last rating. 
-            # Time Decay
-            # does a group by on user item pairs and apply the formula for time decay there
-            # Time T parameter is in days and input time is in seconds,
-            # so we do dt/60/(T*24*60)=dt/(T*24*3600)
-            # the following is the query which we want to run
-
-            query = self._format(
-                """
-            SELECT
-                 {col_user}, {col_item},
-                 SUM({col_rating} * EXP(-log(2) * (latest_timestamp - CAST({col_timestamp} AS long)) / ({time_decay_coefficient} * 3600 * 24))) as {col_rating}
-            FROM {prefix}df_train_input,
-                 (SELECT CAST(MAX({col_timestamp}) AS long) latest_timestamp FROM {prefix}df_train_input)
-            GROUP BY {col_user}, {col_item}
-            CLUSTER BY {col_user}
-            """
-            )
+            # With time decay, we compute a sum over ratings given by
+            # a user in the case when T=np.inf, so the user gets a
+            # cumulative sum of ratings for a particular item and not
+            # just the last rating. Time decay does a group by on
+            # user-item pairs and applies the decay formula there. The
+            # time parameter T is in days and the input time is in
+            # seconds, so dt/60/(T*24*60) = dt/(T*24*3600). The
+            # following is the query which we want to run.
+
+            if self.header["time_now"] is None:
+                query = self._format("""
+                    SELECT CAST(MAX({col_timestamp}) AS long)
+                    FROM {prefix}df_train_input
+                """)
+                self.header["time_now"] = self.spark.sql(query).first()[0]
+
+            query = self._format("""
+                SELECT {col_user},
+                       {col_item},
+                       SUM(
+                           {col_rating} *
+                           POW(2, (CAST({col_timestamp} AS LONG) - {time_now}) / {time_decay_half_life})
+                       ) AS {col_rating}
+                FROM {prefix}df_train_input
+                GROUP BY {col_user}, {col_item}
+                CLUSTER BY {col_user}
+            """)
 
-            # replace with timedecayed version
+            # replace with time-decayed version
             df = self.spark.sql(query)
         else:
             # since SQL is case-insensitive, this check needs to be performed similar
@@ -262,7 +271,7 @@ def get_user_affinity(self, test):
             SELECT a.{col_user}, a.{col_item}, CAST(a.{col_rating} AS double) {col_rating}
             FROM {prefix}df_train a INNER JOIN {prefix}df_test_users b ON a.{col_user} = b.{col_user}
             DISTRIBUTE BY {col_user}
-            SORT BY {col_user}, {col_item} 
+            SORT BY {col_user}, {col_item}
         """
         )
 
diff --git a/contrib/sarplus/python/tests/test_pyspark_sar.py b/contrib/sarplus/python/tests/test_pyspark_sar.py
index 1ebcc3767d..2570f5db8b 100644
--- a/contrib/sarplus/python/tests/test_pyspark_sar.py
+++ b/contrib/sarplus/python/tests/test_pyspark_sar.py
@@ -319,6 +319,30 @@ def test_user_affinity(spark, demo_usage_data, sar_settings, header):
         atol=sar_settings["ATOL"],
     )
 
+    # Set time_now to 60 days later
+    user_affinity_ref = pd.read_csv(
+        sar_settings["FILE_DIR"] + "user_aff_2_months_later.csv"
+    ).iloc[:, 1:].squeeze()
+    user_affinity_ref = user_affinity_ref[user_affinity_ref > 0]
+
+    two_months = 2 * 30 * (24 * 60 * 60)
+    model = SARPlus(
+        spark,
+        **header,
+        timedecay_formula=True,
+        time_decay_coefficient=30,
+        time_now=demo_usage_data[header["col_timestamp"]].max() + two_months,
+        similarity_type="cooccurrence",
+    )
+    model.fit(spark.createDataFrame(demo_usage_data))
+    df_test = pd.DataFrame({header["col_user"]: [sar_settings["TEST_USER_ID"]]})
+    df_test = spark.createDataFrame(df_test)
+    user_affinity = model.get_user_affinity(df_test).toPandas()
+    user_affinity = user_affinity.set_index(header["col_item"])[header["col_rating"]]
+    user_affinity = user_affinity[user_affinity_ref.index]
+
+    assert np.allclose(user_affinity_ref, user_affinity, atol=sar_settings["ATOL"])
+
 
 # Tests 8-10
 @pytest.mark.parametrize(
diff --git a/contrib/sarplus/scala/build.sbt b/contrib/sarplus/scala/build.sbt
index 9ee7eea51a..7883c972da 100644
--- a/contrib/sarplus/scala/build.sbt
+++ b/contrib/sarplus/scala/build.sbt
@@ -5,7 +5,7 @@
 
 import Utils._
 
-// Denpendency configuration
+// Dependency configuration
 lazy val sparkVer = settingKey[String]("spark version")
 lazy val hadoopVer = settingKey[String]("hadoop version")
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 64c46557d1..5511d300a6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -5,7 +5,7 @@
 # https://docs.pytest.org/en/latest/fixture.html:
 # "If during implementing your tests you realize that you want to use a fixture function from multiple test files
 # you can move it to a conftest.py file. You don't need to import the module you defined your fixtures to use in a test,
-# it automatically gets discovered by pytest and thus you can simply receive fixture objects by naming them as
+# it automatically gets discovered by pytest, and thus you can simply receive fixture objects by naming them as
 # an input argument in the test."
 
 import calendar
@@ -481,14 +481,14 @@ def test_specs():
 
 @pytest.fixture(scope="module")
 def affinity_matrix(test_specs):
-    """Generate a random user/item affinity matrix. By increasing the likehood of 0 elements we simulate
+    """Generate a random user/item affinity matrix. By increasing the likelihood of 0 elements we simulate
     a typical recommending situation where the input matrix is highly sparse.
 
     Args:
-        users (int): number of users (rows).
-        items (int): number of items (columns).
-        ratings (int): rating scale, e.g. 5 meaning rates are from 1 to 5.
-        spars: probability of obtaining zero. This roughly corresponds to the sparseness.
+        test_specs["users"] (int): number of users (rows).
+        test_specs["items"] (int): number of items (columns).
+        test_specs["ratings"] (int): rating scale, e.g. 5 meaning rates are from 1 to 5.
+        test_specs["spars"]: probability of obtaining zero. This roughly corresponds to the sparseness.
         of the generated matrix. If spars = 0 then the affinity matrix is dense.
 
     Returns:
@@ -512,7 +512,7 @@ def affinity_matrix(test_specs):
         X, ratio=test_specs["ratio"], seed=test_specs["seed"]
     )
 
-    return (Xtr, Xtst)
+    return Xtr, Xtst
 
 
 # DeepRec Fixtures
diff --git a/tests/unit/recommenders/models/test_sar_singlenode.py b/tests/unit/recommenders/models/test_sar_singlenode.py
index f675764c07..73ecee2837 100644
--- a/tests/unit/recommenders/models/test_sar_singlenode.py
+++ b/tests/unit/recommenders/models/test_sar_singlenode.py
@@ -226,6 +226,24 @@ def test_user_affinity(demo_usage_data, sar_settings, header):
         atol=sar_settings["ATOL"],
     )
 
+    # Set time_now to 60 days later
+    two_months = 2 * 30 * (24 * 60 * 60)
+    model = SARSingleNode(
+        similarity_type="cooccurrence",
+        timedecay_formula=True,
+        time_decay_coefficient=30,
+        time_now=demo_usage_data[header["col_timestamp"]].max() + two_months,
+        **header
+    )
+    model.fit(demo_usage_data)
+    true_user_affinity_url = sar_settings["FILE_DIR"] + "user_aff_2_months_later.csv"
+    true_user_affinity = pd.read_csv(true_user_affinity_url).iloc[:, 1:]
+    user_index = model.user2index[sar_settings["TEST_USER_ID"]]
+    item_indexes = pd.Series(model.item2index)[true_user_affinity.columns]
+    sar_user_affinity = model.user_affinity[user_index].toarray().flatten()[item_indexes]
+    true_user_affinity = true_user_affinity.astype(sar_user_affinity.dtype)
+    assert np.allclose(true_user_affinity, sar_user_affinity, atol=sar_settings["ATOL"])
+
 
 @pytest.mark.parametrize(
     "threshold,similarity_type,file",
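
Note on the time-decay change above: the diff replaces the inline EXP(-log(2) * (latest_timestamp - t) / (coefficient * 3600 * 24)) weighting with POW(2, (t - {time_now}) / {time_decay_half_life}), where the half-life is precomputed in seconds in __init__ and time_now defaults to the latest training timestamp. The two forms are algebraically identical when time_now is that latest timestamp, and fixing time_now two months past it with a 30-day half-life scales every affinity by 2**(-60/30) = 1/4, which is the behaviour the new user_aff_2_months_later.csv tests check. The following is a minimal NumPy sketch of that arithmetic, illustrative only; the helper name decayed_affinity and the sample numbers are hypothetical, not part of the sarplus API.

import numpy as np

# Sketch of the time-decay weighting used in fit(): each rating is
# multiplied by 2 ** ((timestamp - time_now) / half_life_seconds),
# then summed per user/item pair.
def decayed_affinity(ratings, timestamps, time_now, half_life_seconds):
    weights = np.power(2.0, (timestamps - time_now) / half_life_seconds)
    return float(np.sum(ratings * weights))

half_life = 30 * 24 * 60 * 60                    # time_decay_coefficient = 30 days, in seconds
ratings = np.array([4.0, 5.0])
timestamps = np.array([0.0, 30 * 24 * 60 * 60])  # two events, 30 days apart

# Default behaviour: time_now is the latest timestamp in the training data.
now = timestamps.max()
base = decayed_affinity(ratings, timestamps, now, half_life)  # 4*0.5 + 5*1.0 = 7.0

# Same value as the previous EXP(-log(2) * (latest - t) / half_life) form.
old_form = float(np.sum(ratings * np.exp(-np.log(2) * (now - timestamps) / half_life)))
assert np.isclose(base, old_form)

# Setting time_now 60 days later multiplies every affinity by 2 ** (-60 / 30) = 1/4.
later = decayed_affinity(ratings, timestamps, now + 2 * 30 * 24 * 60 * 60, half_life)
assert np.isclose(later, base / 4)

Because shifting time_now rescales every rating for a user by the same constant, it changes only the magnitude of the affinities, never the ranking of items for that user.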