Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PAY-172] Add monitoring for reactions #3105

Merged
merged 3 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion discovery-provider/src/queries/get_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from src.utils.helpers import redis_get_or_restore, redis_set_and_dump
from src.utils.prometheus_metric import PrometheusMetric, PrometheusType
from src.utils.redis_constants import (
LAST_REACTIONS_INDEX_TIME_KEY,
LAST_SEEN_NEW_REACTION_TIME_KEY,
challenges_last_processed_event_redis_key,
index_eth_last_completion_redis_key,
latest_block_hash_redis_key,
Expand Down Expand Up @@ -170,6 +172,10 @@ class GetHealthArgs(TypedDict):
# Number of seconds play counts are allowed to drift
plays_count_max_drift: Optional[int]

# Reactions max drift
reactions_max_indexing_drift: Optional[int]
reactions_max_last_reaction_drift: Optional[int]


def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict, bool]:
"""
Expand Down Expand Up @@ -226,6 +232,11 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict,
rewards_manager_health_info = get_rewards_manager_health_info(redis)
user_bank_health_info = get_user_bank_health_info(redis)
spl_audio_info = get_spl_audio_info(redis)
reactions_health_info = get_reactions_health_info(
redis,
args.get("reactions_max_indexing_drift"),
args.get("reactions_max_last_reaction_drift"),
)

# fetch latest db state if:
# we explicitly don't want to use redis cache or
Expand Down Expand Up @@ -309,6 +320,7 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict,
"user_bank": user_bank_health_info,
"openresty_public_key": openresty_public_key,
"spl_audio_info": spl_audio_info,
"reactions": reactions_health_info,
}

block_difference = abs(latest_block_num - latest_indexed_block_num)
Expand Down Expand Up @@ -370,7 +382,10 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict,
)

is_unhealthy = (
unhealthy_blocks or unhealthy_challenges or play_health_info["is_unhealthy"]
unhealthy_blocks
or unhealthy_challenges
or play_health_info["is_unhealthy"]
or reactions_health_info["is_unhealthy"]
)

return health_results, is_unhealthy
Expand Down Expand Up @@ -529,6 +544,47 @@ def get_user_bank_health_info(
}


def get_reactions_health_info(
redis: Redis,
max_indexing_drift: Optional[int] = None,
max_reaction_drift: Optional[int] = None,
):
now = datetime.now()
last_index_time = redis.get(LAST_REACTIONS_INDEX_TIME_KEY)
last_index_time = int(last_index_time) if last_index_time else None
last_reaction_time = redis.get(LAST_SEEN_NEW_REACTION_TIME_KEY)
last_reaction_time = int(last_reaction_time) if last_reaction_time else None

last_index_time = (
datetime.fromtimestamp(last_index_time) if last_index_time else None
)
last_reaction_time = (
datetime.fromtimestamp(last_reaction_time) if last_reaction_time else None
)

indexing_delta = (
(now - last_index_time).total_seconds() if last_index_time else None
)
reaction_delta = (
(now - last_reaction_time).total_seconds() if last_reaction_time else None
)

is_unhealthy_indexing = bool(
indexing_delta and max_indexing_drift and indexing_delta > max_indexing_drift
)
is_unhealthy_reaction = bool(
reaction_delta and max_reaction_drift and reaction_delta > max_reaction_drift
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how useful this one will be for determining overall health, since it's possible nobody reacts. I see it's an optional param for now though but still curious if it needs to be part of the overall is_healthy calculation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking you can set it super conservatively - like if nobody reacts in 2 days or something. Without something like this, the indexing check alone won't catch if there's a logic error somewhere later in that function that processes the reactions. Wdyt

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My biggest concern would be a spiralling reselection and degraded user experience because nobody reacted for a while, but as long as we don't include the argument in libs and keep it optional, I'm ok with it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good thought - would definitely only include this for our monitoring system

)

is_unhealthy = is_unhealthy_indexing or is_unhealthy_reaction

return {
"indexing_delta": indexing_delta,
"reaction_delta": reaction_delta,
"is_unhealthy": is_unhealthy,
}


def get_spl_audio_info(redis: Redis, max_drift: Optional[int] = None) -> SolHealthInfo:
if redis is None:
raise Exception("Invalid arguments for get_spl_audio_info")
Expand Down
6 changes: 6 additions & 0 deletions discovery-provider/src/queries/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ def health_check():
"challenge_events_age_max_drift", type=int
),
"plays_count_max_drift": request.args.get("plays_count_max_drift", type=int),
"reactions_max_indexing_drift": request.args.get(
"reactions_max_indexing_drift", type=int
),
"reactions_max_last_reaction_drift": request.args.get(
"reactions_max_last_reaction_drift", type=int
),
}

(health_results, error) = get_health(args)
Expand Down
9 changes: 8 additions & 1 deletion discovery-provider/src/tasks/index_reactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
from src.tasks.aggregates import init_task_and_acquire_lock
from src.tasks.celery_app import celery
from src.utils.config import shared_config
from src.utils.redis_constants import (
LAST_REACTIONS_INDEX_TIME_KEY,
LAST_SEEN_NEW_REACTION_TIME_KEY,
)
from src.utils.session_manager import SessionManager
from src.utils.update_indexing_checkpoints import (
get_last_indexed_checkpoint,
Expand Down Expand Up @@ -68,7 +72,7 @@ def fetch_reactions_from_identity(start_index) -> List[ReactionResponse]:
return new_reactions_response.json()["reactions"]


def index_identity_reactions(session: Session, _):
def index_identity_reactions(session: Session, redis: Redis):
try:
last_checkpoint = get_last_indexed_checkpoint(
session, IDENTITY_INDEXING_CHECKPOINT_NAME
Expand All @@ -77,6 +81,8 @@ def index_identity_reactions(session: Session, _):
new_reactions: List[ReactionResponse] = fetch_reactions_from_identity(
last_checkpoint
)
redis.set(LAST_REACTIONS_INDEX_TIME_KEY, int(datetime.now().timestamp()))

if not len(new_reactions):
return

Expand All @@ -92,6 +98,7 @@ def index_identity_reactions(session: Session, _):
logger.info(
f"Indexed {len(reaction_models)} reactions, new checkpoint: {new_checkpoint}"
)
redis.set(LAST_SEEN_NEW_REACTION_TIME_KEY, int(datetime.now().timestamp()))
except Exception as e:
logger.error(f"index_reactions: error {e}")

Expand Down
4 changes: 4 additions & 0 deletions discovery-provider/src/utils/redis_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@
latest_sol_plays_slot_key = "latest_sol_slot:plays"
latest_sol_listen_count_milestones_slot_key = "latest_sol_slot:listen_count_milestones"
latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager"

# Reactions
LAST_REACTIONS_INDEX_TIME_KEY = "reactions_last_index_time"
LAST_SEEN_NEW_REACTION_TIME_KEY = "reactions_last_new_reaction_time"