diff --git a/discovery-provider/src/queries/get_health.py b/discovery-provider/src/queries/get_health.py index 49bb03f3c05..0abbfb0aa4f 100644 --- a/discovery-provider/src/queries/get_health.py +++ b/discovery-provider/src/queries/get_health.py @@ -25,6 +25,8 @@ from src.utils.helpers import redis_get_or_restore, redis_set_and_dump from src.utils.prometheus_metric import PrometheusMetric, PrometheusType from src.utils.redis_constants import ( + LAST_REACTIONS_INDEX_TIME_KEY, + LAST_SEEN_NEW_REACTION_TIME_KEY, challenges_last_processed_event_redis_key, index_eth_last_completion_redis_key, latest_block_hash_redis_key, @@ -170,6 +172,10 @@ class GetHealthArgs(TypedDict): # Number of seconds play counts are allowed to drift plays_count_max_drift: Optional[int] + # Reactions max drift + reactions_max_indexing_drift: Optional[int] + reactions_max_last_reaction_drift: Optional[int] + def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, bool]: """ @@ -226,6 +232,11 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, rewards_manager_health_info = get_rewards_manager_health_info(redis) user_bank_health_info = get_user_bank_health_info(redis) spl_audio_info = get_spl_audio_info(redis) + reactions_health_info = get_reactions_health_info( + redis, + args.get("reactions_max_indexing_drift"), + args.get("reactions_max_last_reaction_drift"), + ) # fetch latest db state if: # we explicitly don't want to use redis cache or @@ -309,6 +320,7 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, "user_bank": user_bank_health_info, "openresty_public_key": openresty_public_key, "spl_audio_info": spl_audio_info, + "reactions": reactions_health_info, } block_difference = abs(latest_block_num - latest_indexed_block_num) @@ -370,7 +382,10 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, ) is_unhealthy = ( - unhealthy_blocks or unhealthy_challenges or play_health_info["is_unhealthy"] + unhealthy_blocks + or unhealthy_challenges + or play_health_info["is_unhealthy"] + or reactions_health_info["is_unhealthy"] ) return health_results, is_unhealthy @@ -529,6 +544,47 @@ def get_user_bank_health_info( } +def get_reactions_health_info( + redis: Redis, + max_indexing_drift: Optional[int] = None, + max_reaction_drift: Optional[int] = None, +): + now = datetime.now() + last_index_time = redis.get(LAST_REACTIONS_INDEX_TIME_KEY) + last_index_time = int(last_index_time) if last_index_time else None + last_reaction_time = redis.get(LAST_SEEN_NEW_REACTION_TIME_KEY) + last_reaction_time = int(last_reaction_time) if last_reaction_time else None + + last_index_time = ( + datetime.fromtimestamp(last_index_time) if last_index_time else None + ) + last_reaction_time = ( + datetime.fromtimestamp(last_reaction_time) if last_reaction_time else None + ) + + indexing_delta = ( + (now - last_index_time).total_seconds() if last_index_time else None + ) + reaction_delta = ( + (now - last_reaction_time).total_seconds() if last_reaction_time else None + ) + + is_unhealthy_indexing = bool( + indexing_delta and max_indexing_drift and indexing_delta > max_indexing_drift + ) + is_unhealthy_reaction = bool( + reaction_delta and max_reaction_drift and reaction_delta > max_reaction_drift + ) + + is_unhealthy = is_unhealthy_indexing or is_unhealthy_reaction + + return { + "indexing_delta": indexing_delta, + "reaction_delta": reaction_delta, + "is_unhealthy": is_unhealthy, + } + + def get_spl_audio_info(redis: Redis, max_drift: Optional[int] = None) -> SolHealthInfo: if redis is None: raise Exception("Invalid arguments for get_spl_audio_info") diff --git a/discovery-provider/src/queries/health_check.py b/discovery-provider/src/queries/health_check.py index acbb94ac9c5..05980b51599 100644 --- a/discovery-provider/src/queries/health_check.py +++ b/discovery-provider/src/queries/health_check.py @@ -51,6 +51,12 @@ def health_check(): "challenge_events_age_max_drift", type=int ), "plays_count_max_drift": request.args.get("plays_count_max_drift", type=int), + "reactions_max_indexing_drift": request.args.get( + "reactions_max_indexing_drift", type=int + ), + "reactions_max_last_reaction_drift": request.args.get( + "reactions_max_last_reaction_drift", type=int + ), } (health_results, error) = get_health(args) diff --git a/discovery-provider/src/tasks/index_reactions.py b/discovery-provider/src/tasks/index_reactions.py index 3c4f4157fed..86cb755f6bf 100644 --- a/discovery-provider/src/tasks/index_reactions.py +++ b/discovery-provider/src/tasks/index_reactions.py @@ -10,6 +10,10 @@ from src.tasks.aggregates import init_task_and_acquire_lock from src.tasks.celery_app import celery from src.utils.config import shared_config +from src.utils.redis_constants import ( + LAST_REACTIONS_INDEX_TIME_KEY, + LAST_SEEN_NEW_REACTION_TIME_KEY, +) from src.utils.session_manager import SessionManager from src.utils.update_indexing_checkpoints import ( get_last_indexed_checkpoint, @@ -68,7 +72,7 @@ def fetch_reactions_from_identity(start_index) -> List[ReactionResponse]: return new_reactions_response.json()["reactions"] -def index_identity_reactions(session: Session, _): +def index_identity_reactions(session: Session, redis: Redis): try: last_checkpoint = get_last_indexed_checkpoint( session, IDENTITY_INDEXING_CHECKPOINT_NAME @@ -77,6 +81,8 @@ def index_identity_reactions(session: Session, _): new_reactions: List[ReactionResponse] = fetch_reactions_from_identity( last_checkpoint ) + redis.set(LAST_REACTIONS_INDEX_TIME_KEY, int(datetime.now().timestamp())) + if not len(new_reactions): return @@ -92,6 +98,7 @@ def index_identity_reactions(session: Session, _): logger.info( f"Indexed {len(reaction_models)} reactions, new checkpoint: {new_checkpoint}" ) + redis.set(LAST_SEEN_NEW_REACTION_TIME_KEY, int(datetime.now().timestamp())) except Exception as e: logger.error(f"index_reactions: error {e}") diff --git a/discovery-provider/src/utils/redis_constants.py b/discovery-provider/src/utils/redis_constants.py index fc8e52c3490..5741017a9ff 100644 --- a/discovery-provider/src/utils/redis_constants.py +++ b/discovery-provider/src/utils/redis_constants.py @@ -44,3 +44,7 @@ latest_sol_plays_slot_key = "latest_sol_slot:plays" latest_sol_listen_count_milestones_slot_key = "latest_sol_slot:listen_count_milestones" latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager" + +# Reactions +LAST_REACTIONS_INDEX_TIME_KEY = "reactions_last_index_time" +LAST_SEEN_NEW_REACTION_TIME_KEY = "reactions_last_new_reaction_time"