Skip to content

Commit

Permalink
Merge pull request #34 from TogetherCrew/feat/33-compute-latest-graph…
Browse files Browse the repository at this point in the history
…-analytics

feat: computing neo4j analytics for just the latest date!
  • Loading branch information
amindadgar authored Sep 23, 2024
2 parents 67fe177 + 44d33ad commit 7d8d320
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 151 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name="tc-analyzer-lib",
version="1.4.9",
version="1.4.10",
author="Mohammad Amin Dadgar, TogetherCrew",
maintainer="Mohammad Amin Dadgar",
maintainer_email="dadgaramin96@gmail.com",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ def __init__(
self.projection_utils = ProjectionUtils(self.platform_id, self.graph_schema)

def compute_stats(self, from_start: bool) -> None:
"""
from_start is disabled; we always compute only for the latest date
"""
# possible dates to do the computations
possible_dates = self.projection_utils.get_dates()

# if we didn't want to compute from the day start
if not from_start:
computed_dates = self.get_computed_dates()
possible_dates = possible_dates - computed_dates
# if not from_start:
# computed_dates = self.get_computed_dates()
# possible_dates = possible_dates - computed_dates

for date in possible_dates:
try:
Expand Down
44 changes: 24 additions & 20 deletions tc_analyzer_lib/algorithms/neo4j_analysis/centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def compute_degree_centerality(
the computed_dates will be based on the
network decentrality metric computations
NOTE: `from_start` is disabled and we're always computing for the latest date available
Parameters:
------------
direction : str
Expand Down Expand Up @@ -68,30 +70,32 @@ def compute_degree_centerality(
node = self.graph_schema.user_label
weighted = True if "weighted" not in kwargs.keys() else kwargs["weighted"]
normalize = False if "normalize" not in kwargs.keys() else kwargs["normalize"]
preserve_parallel = (
True
if "preserve_parallel" not in kwargs.keys()
else kwargs["preserve_parallel"]
)
preserve_parallel = kwargs.get("preserve_parallel", True)

recompute_dates = None
if "recompute_dates" in kwargs:
recompute_dates = kwargs["recompute_dates"]
# recompute_dates = None
# if "recompute_dates" in kwargs:
# recompute_dates = kwargs["recompute_dates"]

if weighted and not preserve_parallel:
logging.warn(
logging.warning(
"""preserver_parallel=False with weighted=True
could produce wrong results!"""
)

interacted_with_label = self.graph_schema.interacted_with_rel
query = """
MATCH () -[r:INTERACTED_WITH {platformId: $platform_id}]-()
WITH max(r.date) as latest_date
"""

# determining one line of the query using the direction variable
interaction = f"[r:{interacted_with_label} {{date: latest_date}}]"
if direction == "in_degree":
query = f"MATCH (a:{node})<-[r:{interacted_with_label}]-(b:{node})"
query += f"MATCH (a:{node})<-{interaction}-(b:{node})"
elif direction == "out_degree":
query = f"MATCH (a:{node})-[r:{interacted_with_label}]->(b:{node})"
query += f"MATCH (a:{node})-{interaction}->(b:{node})"
elif direction == "undirected":
query = f"MATCH (a:{node})-[r:{interacted_with_label}]-(b:{node})"
query += f"MATCH (a:{node})-{interaction}-(b:{node})"

results = self.neo4j_ops.gds.run_cypher(
f"""
Expand All @@ -107,14 +111,14 @@ def compute_degree_centerality(
)

dates_to_compute = set(results["date"].value_counts().index)
if not from_start:
projection_utils = ProjectionUtils(self.platform_id, self.graph_schema)

dates_to_compute = self._get_dates_to_compute(
projection_utils, dates_to_compute
)
if recompute_dates is not None:
dates_to_compute = dates_to_compute.union(recompute_dates)
# if not from_start:
# projection_utils = ProjectionUtils(self.platform_id, self.graph_schema)

# dates_to_compute = self._get_dates_to_compute(
# projection_utils, dates_to_compute
# )
# if recompute_dates is not None:
# dates_to_compute = dates_to_compute.union(recompute_dates)

degree_centerality = self.count_degrees(
computation_date=dates_to_compute,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ def compute(self, from_start: bool = False) -> None:

computable_dates = self.projection_utils.get_dates()

# compute for each date
to_compute: set[float]
if from_start:
to_compute = computable_dates
else:
computed_dates = self.get_computed_dates()
to_compute = computable_dates - computed_dates

for date in to_compute:
# # compute for each date
# to_compute: set[float]
# if from_start:
# to_compute = computable_dates
# else:
# computed_dates = self.get_computed_dates()
# to_compute = computable_dates - computed_dates

for date in computable_dates:
try:
self.closeness_computation_wrapper(date)
except Exception as exp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ def compute(self, from_start: bool = False) -> None:
# Getting all possible dates
computable_dates = self.projection_utils.get_dates()

computed_dates = self.get_computed_dates()
# computed_dates = self.get_computed_dates()

# compute for each date
to_compute: set[float]
if from_start:
to_compute = computable_dates
else:
to_compute = computable_dates - computed_dates
# to_compute: set[float]
# if from_start:
# to_compute = computable_dates
# else:
# to_compute = computable_dates - computed_dates

# for the computation date
for date in to_compute:
for date in computable_dates:
try:
self.local_clustering_computation_wrapper(date=date)
except Exception as exp:
Expand Down
16 changes: 8 additions & 8 deletions tc_analyzer_lib/algorithms/neo4j_analysis/louvain.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ def compute(self, from_start: bool = False) -> None:
computable_dates = self.projection_utils.get_dates()

# compute for each date
to_compute: set[float]
if from_start:
to_compute = computable_dates
else:
computed_dates = self.get_computed_dates()
to_compute = computable_dates - computed_dates

for date in to_compute:
# to_compute: set[float]
# if from_start:
# to_compute = computable_dates
# else:
# computed_dates = self.get_computed_dates()
# to_compute = computable_dates - computed_dates

for date in computable_dates:
try:
self.louvain_computation_wrapper(date)
except Exception as exp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def project_temp_graph(
def get_dates(self) -> set[float]:
"""
get all the dates we do have on the INTERACTED_WITH relations
Note: returning only the latest available date
Parameters:
------------
Expand All @@ -124,8 +125,7 @@ def get_dates(self) -> set[float]:
f"""
MATCH (a:{self.user_label})
-[r:{self.between_user_label} {{platformId: $platform_id}}]-()
WITH DISTINCT(r.date) as dates
RETURN dates
RETURN r.date as dates ORDER BY dates DESC LIMIT 1
""",
params={"platform_id": self.platform_id},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ def test_available_date(self):
louvain = ClosenessCentrality(platform_id, graph_schema)
louvain.compute(from_start=False)

yesterday_results = self.neo4j_ops.gds.run_cypher(
f"""
MATCH (user:{graph_schema.user_label})-[r:HAVE_METRICS {{platformId: '{platform_id}', date: {yesterday}}}]->(user)
RETURN r.date as date, r.closenessCentrality as closenessScore
"""
)
# yesterday three users were interacting
assert len(yesterday_results) == 3
assert yesterday_results["date"].iloc[0] == yesterday
# yesterday_results = self.neo4j_ops.gds.run_cypher(
# f"""
# MATCH (user:{graph_schema.user_label})-[r:HAVE_METRICS {{platformId: '{platform_id}', date: {yesterday}}}]->(user)
# RETURN r.date as date, r.closenessCentrality as closenessScore
# """
# )
# # yesterday three users were interacting
# assert len(yesterday_results) == 3
# assert yesterday_results["date"].iloc[0] == yesterday

today_results = self.neo4j_ops.gds.run_cypher(
f"""
Expand Down Expand Up @@ -157,15 +157,15 @@ def test_more_available_data(self):

louvain.compute(from_start=False)

yesterday_results = self.neo4j_ops.gds.run_cypher(
f"""
MATCH (user:{graph_schema.user_label})-[r:HAVE_METRICS {{platformId: '{platform_id}', date: {yesterday}}}]->(user)
RETURN r.date as date, r.closenessCentrality as closenessScore
"""
)
# yesterday 4 users were interacting
assert len(yesterday_results) == 4
assert yesterday_results["date"].iloc[0] == yesterday
# yesterday_results = self.neo4j_ops.gds.run_cypher(
# f"""
# MATCH (user:{graph_schema.user_label})-[r:HAVE_METRICS {{platformId: '{platform_id}', date: {yesterday}}}]->(user)
# RETURN r.date as date, r.closenessCentrality as closenessScore
# """
# )
# # yesterday 4 users were interacting
# assert len(yesterday_results) == 4
# assert yesterday_results["date"].iloc[0] == yesterday

today_results = self.neo4j_ops.gds.run_cypher(
f"""
Expand Down
34 changes: 17 additions & 17 deletions tests/integration/test_closeness_centrality_with_mutual_ties.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,23 +232,23 @@ def test_three_vertices_two_dates_from_start_true(self):
centrality = ClosenessCentrality(platform_id, self.graph_schema)
centrality.compute(from_start=True)

yesterday_results = self.neo4j_ops.gds.run_cypher(
f"""
MATCH (user:{self.graph_schema.user_label})-[r:HAVE_METRICS {{platformId: '{platform_id}', date: {yesterday}}}]->(user)
RETURN user.id as userid, r.date as date, r.closenessCentrality as closenessScore
"""
)
self.assertEqual(len(yesterday_results), 3)

# the yesterday scores should be recomputed
for _, row in yesterday_results.iterrows():
self.assertEqual(row["date"], yesterday)

if row["userid"] == "a" or row["userid"] == "b" or row["userid"] == "c":
expected_score = 1 if row["userid"] == "a" else 2 / 3
self.assertAlmostEqual(row["closenessScore"], expected_score)
else:
raise ValueError("Never should reach here!")
# yesterday_results = self.neo4j_ops.gds.run_cypher(
# f"""
# MATCH (user:{self.graph_schema.user_label})-[r:HAVE_METRICS {{platformId: '{platform_id}', date: {yesterday}}}]->(user)
# RETURN user.id as userid, r.date as date, r.closenessCentrality as closenessScore
# """
# )
# self.assertEqual(len(yesterday_results), 3)

# # the yesterday scores should be recomputed
# for _, row in yesterday_results.iterrows():
# self.assertEqual(row["date"], yesterday)

# if row["userid"] == "a" or row["userid"] == "b" or row["userid"] == "c":
# expected_score = 1 if row["userid"] == "a" else 2 / 3
# self.assertAlmostEqual(row["closenessScore"], expected_score)
# else:
# raise ValueError("Never should reach here!")

today_results = self.neo4j_ops.gds.run_cypher(
f"""
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_decentralization_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ def test_decentralization_score():
)

# because python is not good with equality comparison of float values
assert network_decentrality[yesterday] - 133.33 < 0.1
# assert network_decentrality[yesterday] - 133.33 < 0.1
assert network_decentrality[today] - 66.66 < 0.1
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ def test_partially_connected_coeffs():
)

print(degree_centrality)
assert degree_centrality[yesterday]["1000"] == 2 / 3
# assert degree_centrality[yesterday]["1000"] == 2 / 3
assert degree_centrality[today]["1000"] == 1 / 2

assert degree_centrality[yesterday]["1001"] == 1
# assert degree_centrality[yesterday]["1001"] == 1
assert degree_centrality[today]["1001"] == 1

assert degree_centrality[yesterday]["1002"] == 2 / 3
# assert degree_centrality[yesterday]["1002"] == 2 / 3
assert degree_centrality[today]["1002"] == 3 / 4

assert degree_centrality[yesterday]["1003"] == 1
# assert degree_centrality[yesterday]["1003"] == 1
assert degree_centrality[today]["1003"] == 1 / 2

assert "1004" not in degree_centrality[yesterday]
# assert "1004" not in degree_centrality[yesterday]
assert degree_centrality[today]["1004"] == 1 / 4
21 changes: 6 additions & 15 deletions tests/integration/test_lcc_all_connected.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,13 @@ def test_all_connected_coeffs():
)

user0_id = "1000"
expected_results_user0 = [
[user0_id, yesterday, 1.0],
[user0_id, today, 0.0],
]
assert expected_results_user0 in results[results.userId == user0_id].values
expected_results_user0 = [user0_id, today, 0.0]
assert expected_results_user0 == list(results[results.userId == user0_id].values[0])

user1_id = "1001"
expected_results_user1 = [
[user1_id, yesterday, 1.0],
[user1_id, today, 0.0],
]
assert expected_results_user1 in results[results.userId == user1_id].values
expected_results_user1 = [user1_id, today, 0.0]
assert expected_results_user1 == list(results[results.userId == user1_id].values[0])

user2_id = "1002"
expected_results_user2 = [
[user2_id, yesterday, 1.0],
[user2_id, today, 0.0],
]
assert expected_results_user2 in results[results.userId == user2_id].values
expected_results_user2 = []
assert expected_results_user2 == list(results[results.userId == user2_id].values)
12 changes: 4 additions & 8 deletions tests/integration/test_louvain_algorithm_computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def test_louvain_algorithm_available_data():
user_label = graph_schema.user_label
platform_label = graph_schema.platform_label
interacted_with = graph_schema.interacted_with_rel
interacted_in = graph_schema.interacted_in_rel
is_member = graph_schema.member_relation

# creating some nodes with data
Expand Down Expand Up @@ -52,9 +51,8 @@ def test_louvain_algorithm_available_data():
"""
)

assert len(results) == 2
assert results["date"].iloc[0] in [yesterday, today]
assert results["date"].iloc[1] in [yesterday, today]
assert len(results) == 1
assert results["date"].iloc[0] in [today]


def test_louvain_algorithm_more_available_data():
Expand All @@ -74,7 +72,6 @@ def test_louvain_algorithm_more_available_data():
user_label = graph_schema.user_label
platform_label = graph_schema.platform_label
interacted_with = graph_schema.interacted_with_rel
interacted_in = graph_schema.interacted_in_rel
is_member = graph_schema.member_relation

# creating some nodes with data
Expand Down Expand Up @@ -128,6 +125,5 @@ def test_louvain_algorithm_more_available_data():
"""
)
print(results)
assert len(results) == 2
assert results["date"].iloc[0] in [yesterday, today]
assert results["date"].iloc[1] in [yesterday, today]
assert len(results) == 1
assert results["date"].iloc[0] in [today]
Loading

0 comments on commit 7d8d320

Please sign in to comment.