Create a CN-like metric registry for DN's Prometheus metrics #3404

Merged · 23 commits · Jul 8, 2022

This PR replaces inline Prometheus metric definitions across the discovery provider (DN) with a central registry keyed by a PrometheusMetricNames enum, mirroring the registry pattern already used on the content node (CN). Changes from all commits are shown below.
9 changes: 2 additions & 7 deletions discovery-provider/src/queries/get_celery_tasks.py
@@ -3,7 +3,7 @@
 
 import pytz
 from src.monitors import monitor_names, monitors
-from src.utils.prometheus_metric import PrometheusMetric, PrometheusType
+from src.utils.prometheus_metric import PrometheusMetric, PrometheusMetricNames
 
 logger = logging.getLogger(__name__)
 MONITORS = monitors.MONITORS
@@ -32,12 +32,7 @@ def celery_tasks_prometheus_exporter():
     active_tasks = all_tasks["active_tasks"]
     registered_tasks = all_tasks["registered_celery_tasks"]
 
-    metric = PrometheusMetric(
-        "celery_task_active_duration_seconds",
-        "How long the currently running celery task has been running",
-        labelnames=["task_name"],
-        metric_type=PrometheusType.GAUGE,
-    )
+    metric = PrometheusMetric(PrometheusMetricNames.CELERY_TASK_ACTIVE_DURATION_SECONDS)
 
     active_task_names = []
     for task in active_tasks:
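The pattern above repeats in every file of this PR: call sites stop passing a metric name, help string, label names, and metric type inline, and instead pass a single `PrometheusMetricNames` member. The metadata therefore has to live in one central registry inside `src/utils/prometheus_metric.py`, which is not itself shown in this diff. Below is a minimal sketch of what such a registry could look like; the enum member names come from the diff, but the string values, the `METRIC_REGISTRY` layout, and the constructor body are assumptions for illustration.

```python
# Sketch only: not the actual implementation in src/utils/prometheus_metric.py.
from enum import Enum

from prometheus_client import Gauge, Histogram


class PrometheusType(str, Enum):
    GAUGE = "gauge"
    HISTOGRAM = "histogram"  # assumed second variant, for duration metrics


class PrometheusMetricNames(str, Enum):
    # Members observed in this diff (string values assumed).
    CELERY_TASK_ACTIVE_DURATION_SECONDS = "celery_task_active_duration_seconds"
    HEALTH_CHECK_BLOCK_DIFFERENCE_LATEST = "health_check_block_difference_latest"
    INDEX_BLOCKS_DURATION_SECONDS = "index_blocks_duration_seconds"


# Each metric's help text, labels, and type declared once, instead of
# repeated at every call site (hypothetical layout).
METRIC_REGISTRY = {
    PrometheusMetricNames.CELERY_TASK_ACTIVE_DURATION_SECONDS: {
        "description": "How long the currently running celery task has been running",
        "labelnames": ("task_name",),
        "metric_type": PrometheusType.GAUGE,
    },
}


class PrometheusMetric:
    registered_metrics: dict = {}

    def __init__(self, name: PrometheusMetricNames):
        config = METRIC_REGISTRY[name]
        # Reuse an existing collector so constructing the same metric twice
        # doesn't trip prometheus_client's duplicate-registration error.
        if name not in PrometheusMetric.registered_metrics:
            ctor = Gauge if config["metric_type"] is PrometheusType.GAUGE else Histogram
            PrometheusMetric.registered_metrics[name] = ctor(
                name.value, config["description"], labelnames=config["labelnames"]
            )
        self.metric = PrometheusMetric.registered_metrics[name]
```

With this shape, `PrometheusMetric(PrometheusMetricNames.CELERY_TASK_ACTIVE_DURATION_SECONDS)` resolves everything the old five-line constructor spelled out by hand, and two call sites can no longer drift apart on description or labels.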
20 changes: 8 additions & 12 deletions discovery-provider/src/queries/get_health.py
@@ -24,7 +24,7 @@
 from src.utils.config import shared_config
 from src.utils.elasticdsl import ES_INDEXES, esclient
 from src.utils.helpers import redis_get_or_restore, redis_set_and_dump
-from src.utils.prometheus_metric import PrometheusMetric, PrometheusType
+from src.utils.prometheus_metric import PrometheusMetric, PrometheusMetricNames
 from src.utils.redis_constants import (
     LAST_REACTIONS_INDEX_TIME_KEY,
     LAST_SEEN_NEW_REACTION_TIME_KEY,
@@ -462,17 +462,13 @@ def get_elasticsearch_health_info(
 def health_check_prometheus_exporter():
     health_results, is_unhealthy = get_health({})
 
-    PrometheusMetric(
-        "health_check_block_difference_current",
-        "Difference between the latest block and the latest indexed block",
-        metric_type=PrometheusType.GAUGE,
-    ).save(health_results["block_difference"])
+    PrometheusMetric(PrometheusMetricNames.HEALTH_CHECK_BLOCK_DIFFERENCE_LATEST).save(
+        health_results["block_difference"]
+    )
 
-    PrometheusMetric(
-        "health_check_latest_indexed_block_num_current",
-        "Latest indexed block number",
-        metric_type=PrometheusType.GAUGE,
-    ).save(health_results["web"]["blocknumber"])
+    PrometheusMetric(PrometheusMetricNames.HEALTH_CHECK_INDEXED_BLOCK_NUM_LATEST).save(
+        health_results["web"]["blocknumber"]
+    )
 
 
 PrometheusMetric.register_collector(
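`save()` and `register_collector()` appear above but are defined outside this diff. Continuing the sketch under the same assumptions: `save()` would write a point-in-time value to a gauge, and `register_collector()` would record exporter callbacks such as `health_check_prometheus_exporter` so gauges can be repopulated at scrape time. The method bodies below are guesses; only the method names appear in this PR.

```python
# Hypothetical method bodies, imagined on the PrometheusMetric class
# sketched earlier; only the method names are confirmed by this diff.
class PrometheusMetric:
    collectors: dict = {}

    def save(self, value, labels=None):
        # Select a child series when labels are given, then set the gauge.
        this_metric = self.metric.labels(**labels) if labels else self.metric
        this_metric.set(value)

    @classmethod
    def register_collector(cls, name, collector_fn):
        # Remember exporter callbacks so they can be invoked to refresh
        # gauge values before each /metrics scrape.
        cls.collectors[name] = collector_fn
```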
6 changes: 2 additions & 4 deletions discovery-provider/src/tasks/aggregates/__init__.py
@@ -7,7 +7,7 @@
 from sqlalchemy import text
 from sqlalchemy.orm.session import Session
 from src.models.indexing.block import Block
-from src.utils.prometheus_metric import PrometheusMetric
+from src.utils.prometheus_metric import PrometheusMetric, PrometheusMetricNames
 from src.utils.update_indexing_checkpoints import (
     get_last_indexed_checkpoint,
     save_indexed_checkpoint,
@@ -75,9 +75,7 @@ def update_aggregate_table(
     current_checkpoint,
 ):
     metric = PrometheusMetric(
-        "update_aggregate_table_latency_seconds",
-        "Runtimes for src.task.aggregates:update_aggregate_table()",
-        ("table_name", "task_name"),
+        PrometheusMetricNames.UPDATE_AGGREGATE_TABLE_DURATION_SECONDS
     )
 
     # get name of the caller function
12 changes: 6 additions & 6 deletions discovery-provider/src/tasks/index.py
@@ -50,7 +50,11 @@
     sweep_old_index_blocks_ms,
 )
 from src.utils.indexing_errors import IndexingError
-from src.utils.prometheus_metric import PrometheusMetric, save_duration_metric
+from src.utils.prometheus_metric import (
+    PrometheusMetric,
+    PrometheusMetricNames,
+    save_duration_metric,
+)
 from src.utils.redis_cache import (
     remove_cached_playlist_ids,
     remove_cached_track_ids,
@@ -561,11 +565,7 @@ def index_blocks(self, db, blocks_list):
     block_order_range = range(len(blocks_list) - 1, -1, -1)
     latest_block_timestamp = None
    changed_entity_ids_map = {}
-    metric = PrometheusMetric(
-        "index_blocks_duration_seconds",
-        "Runtimes for src.task.index:index_blocks()",
-        ("scope",),
-    )
+    metric = PrometheusMetric(PrometheusMetricNames.INDEX_BLOCKS_DURATION_SECONDS)
     for i in block_order_range:
         start_time = time.time()
         metric.reset_timer()
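`reset_timer()` in the loop above and the `save_time()` calls elsewhere in this PR suggest a stopwatch API layered on top of `save()`. A plausible shape, again an assumption rather than the actual implementation:

```python
import time


# Hypothetical continuation of the PrometheusMetric sketch.
class PrometheusMetric:
    def reset_timer(self):
        # Restart the stopwatch, e.g. once per block in index_blocks().
        self.start_time = time.time()

    def save_time(self, labels=None):
        # Record seconds elapsed since the last reset under the given labels.
        self.save(time.time() - self.start_time, labels)
```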
18 changes: 8 additions & 10 deletions discovery-provider/src/tasks/index_metrics.py
@@ -15,7 +15,11 @@
 from src.tasks.celery_app import celery
 from src.utils.get_all_other_nodes import get_all_other_nodes
 from src.utils.helpers import redis_get_or_restore, redis_set_and_dump
-from src.utils.prometheus_metric import PrometheusMetric, save_duration_metric
+from src.utils.prometheus_metric import (
+    PrometheusMetric,
+    PrometheusMetricNames,
+    save_duration_metric,
+)
 from src.utils.redis_metrics import (
     METRICS_INTERVAL,
     datetime_format_secondary,
@@ -414,9 +418,7 @@ def update_metrics(self):
         f"index_metrics.py | update_metrics | {self.request.id} | Acquired update_metrics_lock"
     )
     metric = PrometheusMetric(
-        "index_metrics_duration_seconds",
-        "Runtimes for src.task.index_metrics:celery.task()",
-        ("task_name",),
+        PrometheusMetricNames.INDEX_METRICS_DURATION_SECONDS
     )
     sweep_metrics(db, redis)
     refresh_metrics_matviews(db)
@@ -458,9 +460,7 @@ def aggregate_metrics(self):
         f"index_metrics.py | aggregate_metrics | {self.request.id} | Acquired aggregate_metrics_lock"
     )
     metric = PrometheusMetric(
-        "index_metrics_duration_seconds",
-        "Runtimes for src.task.index_metrics:celery.task()",
-        ("task_name",),
+        PrometheusMetricNames.INDEX_METRICS_DURATION_SECONDS
     )
     consolidate_metrics_from_other_nodes(self, db, redis)
     metric.save_time({"task_name": "aggregate_metrics"})
@@ -503,9 +503,7 @@ def synchronize_metrics(self):
         f"index_metrics.py | synchronize_metrics | {self.request.id} | Acquired synchronize_metrics_lock"
     )
     metric = PrometheusMetric(
-        "index_metrics_duration_seconds",
-        "Runtimes for src.task.index_metrics:celery.task()",
-        ("task_name",),
+        PrometheusMetricNames.INDEX_METRICS_DURATION_SECONDS
     )
     synchronize_all_node_metrics(self, db)
     metric.save_time({"task_name": "synchronize_metrics"})
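Note that `update_metrics`, `aggregate_metrics`, and `synchronize_metrics` now all construct the same registered metric, `INDEX_METRICS_DURATION_SECONDS`; the `task_name` label, declared once in the registry, is what keeps their time series apart. An illustrative usage, with `do_aggregation_work()` as a hypothetical stand-in for the real task body:

```python
# All three celery tasks record into one metric; the label distinguishes them.
metric = PrometheusMetric(PrometheusMetricNames.INDEX_METRICS_DURATION_SECONDS)
metric.reset_timer()
do_aggregation_work()  # hypothetical stand-in for consolidate_metrics_from_other_nodes()
metric.save_time({"task_name": "aggregate_metrics"})
```

The scrape output then carries one series per task, e.g. an `index_metrics_duration_seconds` series labeled `task_name="aggregate_metrics"` alongside the `update_metrics` and `synchronize_metrics` series; the exact exposition format (a plain gauge sample versus a histogram's `_bucket`/`_sum`/`_count` families) depends on the metric type declared in the registry.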
15 changes: 7 additions & 8 deletions discovery-provider/src/tasks/index_trending.py
@@ -20,7 +20,11 @@
 from src.trending_strategies.trending_strategy_factory import TrendingStrategyFactory
 from src.trending_strategies.trending_type_and_version import TrendingType
 from src.utils.config import shared_config
-from src.utils.prometheus_metric import PrometheusMetric, save_duration_metric
+from src.utils.prometheus_metric import (
+    PrometheusMetric,
+    PrometheusMetricNames,
+    save_duration_metric,
+)
 from src.utils.redis_cache import set_json_cached_key
 from src.utils.redis_constants import trending_tracks_last_completion_redis_key
 from src.utils.session_manager import SessionManager
@@ -105,9 +109,7 @@ def get_genres(session: Session) -> List[str]:
 def update_view(session: Session, mat_view_name: str):
     start_time = time.time()
     metric = PrometheusMetric(
-        "update_trending_view_duration_seconds",
-        "Runtimes for src.task.index_trending:update_view()",
-        ("mat_view_name",),
+        PrometheusMetricNames.UPDATE_TRENDING_VIEW_DURATION_SECONDS
     )
     session.execute(f"REFRESH MATERIALIZED VIEW {mat_view_name}")
     update_time = time.time() - start_time
@@ -125,10 +127,7 @@ def update_view(session: Session, mat_view_name: str):
 def index_trending(self, db: SessionManager, redis: Redis, timestamp):
     logger.info("index_trending.py | starting indexing")
     update_start = time.time()
-    metric = PrometheusMetric(
-        "index_trending_duration_seconds",
-        "Runtimes for src.task.index_trending:index_trending()",
-    )
+    metric = PrometheusMetric(PrometheusMetricNames.INDEX_TRENDING_DURATION_SECONDS)
 with db.scoped_session() as session:
         genres = get_genres(session)
 
8 changes: 2 additions & 6 deletions discovery-provider/src/tasks/tracks.py
@@ -18,7 +18,7 @@
 from src.utils import helpers, multihash
 from src.utils.indexing_errors import EntityMissingRequiredFieldError, IndexingError
 from src.utils.model_nullable_validator import all_required_fields_present
-from src.utils.prometheus_metric import PrometheusMetric
+from src.utils.prometheus_metric import PrometheusMetric, PrometheusMetricNames
 from src.utils.track_event_constants import (
     track_event_types_arr,
     track_event_types_lookup,
@@ -40,11 +40,7 @@ def track_state_update(
 ) -> Tuple[int, Set]:
     """Return tuple containing int representing number of Track model state changes found in transaction and set of processed track IDs."""
     begin_track_state_update = datetime.now()
-    metric = PrometheusMetric(
-        "track_state_update_duration_seconds",
-        "Runtimes for src.task.tracks:track_state_update()",
-        ("scope",),
-    )
+    metric = PrometheusMetric(PrometheusMetricNames.TRACK_STATE_UPDATE_DURATION_SECONDS)
 
     blockhash = update_task.web3.toHex(block_hash)
     num_total_changes = 0
10 changes: 6 additions & 4 deletions discovery-provider/src/tasks/update_track_is_available.py
@@ -7,7 +7,11 @@
 from src.models.tracks.track import Track
 from src.models.users.user import User
 from src.tasks.celery_app import celery
-from src.utils.prometheus_metric import PrometheusMetric, save_duration_metric
+from src.utils.prometheus_metric import (
+    PrometheusMetric,
+    PrometheusMetricNames,
+    save_duration_metric,
+)
 from src.utils.redis_constants import (
     ALL_UNAVAILABLE_TRACKS_REDIS_KEY,
     UPDATE_TRACK_IS_AVAILABLE_FINISH_REDIS_KEY,
@@ -231,9 +235,7 @@ def update_track_is_available(self) -> None:
     have_lock = update_lock.acquire(blocking=False)
     if have_lock:
         metric = PrometheusMetric(
-            "update_track_is_available_duration_seconds",
-            "Runtimes for src.task.update_track_is_available:celery.task()",
-            ("task_name", "success"),
+            PrometheusMetricNames.UPDATE_TRACK_IS_AVAILABLE_DURATION_SECONDS
         )
         try:
             # TODO: we can deprecate this manual redis timestamp tracker once we confirm
14 changes: 3 additions & 11 deletions discovery-provider/src/tasks/users.py
@@ -21,7 +21,7 @@
 from src.utils import helpers
 from src.utils.indexing_errors import EntityMissingRequiredFieldError, IndexingError
 from src.utils.model_nullable_validator import all_required_fields_present
-from src.utils.prometheus_metric import PrometheusMetric
+from src.utils.prometheus_metric import PrometheusMetric, PrometheusMetricNames
 from src.utils.user_event_constants import user_event_types_arr, user_event_types_lookup
 
 logger = logging.getLogger(__name__)
@@ -40,11 +40,7 @@ def user_state_update(
 ) -> Tuple[int, Set]:
     """Return tuple containing int representing number of User model state changes found in transaction and set of processed user IDs."""
     begin_user_state_update = datetime.now()
-    metric = PrometheusMetric(
-        "user_state_update_duration_seconds",
-        "Runtimes for src.task.users:user_state_update()",
-        ("scope",),
-    )
+    metric = PrometheusMetric(PrometheusMetricNames.USER_STATE_UPDATE_DURATION_SECONDS)
 
     blockhash = update_task.web3.toHex(block_hash)
     num_total_changes = 0
@@ -149,11 +145,7 @@ def process_user_txs_serial(
     user_ids,
     skipped_tx_count,
 ):
-    metric = PrometheusMetric(
-        "user_state_update_duration_seconds",
-        "Runtimes for src.task.users:user_state_update()",
-        ("scope",),
-    )
+    metric = PrometheusMetric(PrometheusMetricNames.USER_STATE_UPDATE_DURATION_SECONDS)
     processed_entries = 0
     for user_tx in user_txs:
         try:
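Several of the touched files import `save_duration_metric` next to the registry, but its body is not part of this diff. One plausible shape, purely as an assumption, is a decorator that times the wrapped celery task; the `metric_group` parameter and the logging fallback are invented here for illustration:

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def save_duration_metric(metric_group="celery_task"):
    # Hypothetical decorator: time the wrapped task; a real implementation
    # would presumably feed the elapsed seconds into a registry-backed
    # metric rather than just logging it.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                return func(*args, **kwargs)
            finally:
                elapsed = time.time() - start
                logger.info("%s | %s took %.3fs", metric_group, func.__name__, elapsed)

        return wrapper

    return decorator
```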