From 9694e9e508d9f3bd289944975b9b773e5605b5c2 Mon Sep 17 00:00:00 2001 From: Michael Piazza Date: Tue, 17 May 2022 00:41:28 +0000 Subject: [PATCH 1/3] Add monitoring for reactions --- discovery-provider/src/queries/get_health.py | 67 ++++++++++++++++++- .../src/queries/health_check.py | 6 ++ .../src/tasks/index_reactions.py | 9 ++- .../src/utils/redis_constants.py | 4 ++ 4 files changed, 83 insertions(+), 3 deletions(-) diff --git a/discovery-provider/src/queries/get_health.py b/discovery-provider/src/queries/get_health.py index 49bb03f3c05..46adb923a31 100644 --- a/discovery-provider/src/queries/get_health.py +++ b/discovery-provider/src/queries/get_health.py @@ -1,7 +1,8 @@ import logging import os import time -from datetime import datetime +from datetime import date, datetime +from optparse import Option from typing import Dict, Optional, Tuple, TypedDict, cast from elasticsearch import Elasticsearch @@ -25,6 +26,8 @@ from src.utils.helpers import redis_get_or_restore, redis_set_and_dump from src.utils.prometheus_metric import PrometheusMetric, PrometheusType from src.utils.redis_constants import ( + LAST_REACTIONS_INDEX_TIME_KEY, + LAST_SEEN_NEW_REACTION_TIME_KEY, challenges_last_processed_event_redis_key, index_eth_last_completion_redis_key, latest_block_hash_redis_key, @@ -170,6 +173,13 @@ class GetHealthArgs(TypedDict): # Number of seconds play counts are allowed to drift plays_count_max_drift: Optional[int] + # Userbank max drift + userbank_max_drift: Optional[int] + + # Reactions max drift + reactions_max_indexing_drift: Optional[int] + reactions_max_last_reaction_drift: Optional[int] + def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, bool]: """ @@ -226,6 +236,11 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, rewards_manager_health_info = get_rewards_manager_health_info(redis) user_bank_health_info = get_user_bank_health_info(redis) spl_audio_info = get_spl_audio_info(redis) + reactions_health_info = get_reactions_health_info( + redis, + args.get("reactions_max_indexing_drift"), + args.get("reactions_max_last_reaction_drift"), + ) # fetch latest db state if: # we explicitly don't want to use redis cache or @@ -309,6 +324,7 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, "user_bank": user_bank_health_info, "openresty_public_key": openresty_public_key, "spl_audio_info": spl_audio_info, + "reactions": reactions_health_info, } block_difference = abs(latest_block_num - latest_indexed_block_num) @@ -370,7 +386,10 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, ) is_unhealthy = ( - unhealthy_blocks or unhealthy_challenges or play_health_info["is_unhealthy"] + unhealthy_blocks + or unhealthy_challenges + or play_health_info["is_unhealthy"] + or reactions_health_info["is_unhealthy"] ) return health_results, is_unhealthy @@ -529,6 +548,50 @@ def get_user_bank_health_info( } +def get_reactions_health_info( + redis: Redis, + max_indexing_drift: Optional[int] = None, + max_reaction_drift: Optional[int] = None, +): + now = datetime.now() + last_index_time = redis.get(LAST_REACTIONS_INDEX_TIME_KEY) + last_index_time = int(last_index_time) if last_index_time else None + last_reaction_time = redis.get(LAST_SEEN_NEW_REACTION_TIME_KEY) + last_reaction_time = int(last_reaction_time) if last_reaction_time else None + + last_index_time = ( + datetime.fromtimestamp(last_index_time) if last_index_time else None + ) + last_reaction_time = ( + datetime.fromtimestamp(last_reaction_time) if last_reaction_time else None + ) + + indexing_delta = ( + (now - last_index_time).total_seconds() if last_index_time else None + ) + reaction_delta = ( + (now - last_reaction_time).total_seconds() if last_reaction_time else None + ) + + is_unhealthy_indexing = bool( + indexing_delta and max_indexing_drift and indexing_delta > max_indexing_drift + ) + is_unhealthy_reaction = bool( + reaction_delta and max_reaction_drift and reaction_delta > max_reaction_drift + ) + logger.info( + f"reactions | {indexing_delta} {max_indexing_drift} {is_unhealthy_indexing}" + ) + + is_unhealthy = is_unhealthy_indexing or is_unhealthy_reaction + + return { + "indexing_delta": indexing_delta, + "reaction_delta": reaction_delta, + "is_unhealthy": is_unhealthy, + } + + def get_spl_audio_info(redis: Redis, max_drift: Optional[int] = None) -> SolHealthInfo: if redis is None: raise Exception("Invalid arguments for get_spl_audio_info") diff --git a/discovery-provider/src/queries/health_check.py b/discovery-provider/src/queries/health_check.py index acbb94ac9c5..05980b51599 100644 --- a/discovery-provider/src/queries/health_check.py +++ b/discovery-provider/src/queries/health_check.py @@ -51,6 +51,12 @@ def health_check(): "challenge_events_age_max_drift", type=int ), "plays_count_max_drift": request.args.get("plays_count_max_drift", type=int), + "reactions_max_indexing_drift": request.args.get( + "reactions_max_indexing_drift", type=int + ), + "reactions_max_last_reaction_drift": request.args.get( + "reactions_max_last_reaction_drift", type=int + ), } (health_results, error) = get_health(args) diff --git a/discovery-provider/src/tasks/index_reactions.py b/discovery-provider/src/tasks/index_reactions.py index 3c4f4157fed..86cb755f6bf 100644 --- a/discovery-provider/src/tasks/index_reactions.py +++ b/discovery-provider/src/tasks/index_reactions.py @@ -10,6 +10,10 @@ from src.tasks.aggregates import init_task_and_acquire_lock from src.tasks.celery_app import celery from src.utils.config import shared_config +from src.utils.redis_constants import ( + LAST_REACTIONS_INDEX_TIME_KEY, + LAST_SEEN_NEW_REACTION_TIME_KEY, +) from src.utils.session_manager import SessionManager from src.utils.update_indexing_checkpoints import ( get_last_indexed_checkpoint, @@ -68,7 +72,7 @@ def fetch_reactions_from_identity(start_index) -> List[ReactionResponse]: return new_reactions_response.json()["reactions"] -def index_identity_reactions(session: Session, _): +def index_identity_reactions(session: Session, redis: Redis): try: last_checkpoint = get_last_indexed_checkpoint( session, IDENTITY_INDEXING_CHECKPOINT_NAME @@ -77,6 +81,8 @@ def index_identity_reactions(session: Session, _): new_reactions: List[ReactionResponse] = fetch_reactions_from_identity( last_checkpoint ) + redis.set(LAST_REACTIONS_INDEX_TIME_KEY, int(datetime.now().timestamp())) + if not len(new_reactions): return @@ -92,6 +98,7 @@ def index_identity_reactions(session: Session, _): logger.info( f"Indexed {len(reaction_models)} reactions, new checkpoint: {new_checkpoint}" ) + redis.set(LAST_SEEN_NEW_REACTION_TIME_KEY, int(datetime.now().timestamp())) except Exception as e: logger.error(f"index_reactions: error {e}") diff --git a/discovery-provider/src/utils/redis_constants.py b/discovery-provider/src/utils/redis_constants.py index fc8e52c3490..8a16302d4bd 100644 --- a/discovery-provider/src/utils/redis_constants.py +++ b/discovery-provider/src/utils/redis_constants.py @@ -44,3 +44,7 @@ latest_sol_plays_slot_key = "latest_sol_slot:plays" latest_sol_listen_count_milestones_slot_key = "latest_sol_slot:listen_count_milestones" latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager" + +# Reactions +LAST_REACTIONS_INDEX_TIME_KEY = "reactions_last_index_time" +LAST_SEEN_NEW_REACTION_TIME_KEY = "reactions_last_new_reaction_time" \ No newline at end of file From 02656fb4c115ec4018ac1d1bfa9464eaa5380bad Mon Sep 17 00:00:00 2001 From: Michael Piazza Date: Tue, 17 May 2022 00:47:54 +0000 Subject: [PATCH 2/3] Remove log --- discovery-provider/src/queries/get_health.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/discovery-provider/src/queries/get_health.py b/discovery-provider/src/queries/get_health.py index 46adb923a31..94dd881f655 100644 --- a/discovery-provider/src/queries/get_health.py +++ b/discovery-provider/src/queries/get_health.py @@ -579,9 +579,6 @@ def get_reactions_health_info( is_unhealthy_reaction = bool( reaction_delta and max_reaction_drift and reaction_delta > max_reaction_drift ) - logger.info( - f"reactions | {indexing_delta} {max_indexing_drift} {is_unhealthy_indexing}" - ) is_unhealthy = is_unhealthy_indexing or is_unhealthy_reaction From 3ae26464f38d6f887d23be8e88f12886d1f6bce3 Mon Sep 17 00:00:00 2001 From: Michael Piazza Date: Tue, 17 May 2022 16:26:50 +0000 Subject: [PATCH 3/3] PR fixes --- discovery-provider/src/queries/get_health.py | 6 +----- discovery-provider/src/utils/redis_constants.py | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/discovery-provider/src/queries/get_health.py b/discovery-provider/src/queries/get_health.py index 94dd881f655..0abbfb0aa4f 100644 --- a/discovery-provider/src/queries/get_health.py +++ b/discovery-provider/src/queries/get_health.py @@ -1,8 +1,7 @@ import logging import os import time -from datetime import date, datetime -from optparse import Option +from datetime import datetime from typing import Dict, Optional, Tuple, TypedDict, cast from elasticsearch import Elasticsearch @@ -173,9 +172,6 @@ class GetHealthArgs(TypedDict): # Number of seconds play counts are allowed to drift plays_count_max_drift: Optional[int] - # Userbank max drift - userbank_max_drift: Optional[int] - # Reactions max drift reactions_max_indexing_drift: Optional[int] reactions_max_last_reaction_drift: Optional[int] diff --git a/discovery-provider/src/utils/redis_constants.py b/discovery-provider/src/utils/redis_constants.py index 8a16302d4bd..5741017a9ff 100644 --- a/discovery-provider/src/utils/redis_constants.py +++ b/discovery-provider/src/utils/redis_constants.py @@ -47,4 +47,4 @@ # Reactions LAST_REACTIONS_INDEX_TIME_KEY = "reactions_last_index_time" -LAST_SEEN_NEW_REACTION_TIME_KEY = "reactions_last_new_reaction_time" \ No newline at end of file +LAST_SEEN_NEW_REACTION_TIME_KEY = "reactions_last_new_reaction_time"