Skip to content

Commit

Permalink
Merge pull request #41 from PowerLoom/feat/epoch-monitoring
Browse files Browse the repository at this point in the history
Feat: epoch monitoring
  • Loading branch information
anomit authored Aug 21, 2023
2 parents 56c3dd7 + 2b1dbd5 commit 56e058b
Show file tree
Hide file tree
Showing 13 changed files with 472 additions and 74 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- [Preloading](#preloading)
- [Base Snapshot Generation](#base-snapshot-generation)
- [Snapshot Finalization](#snapshot-finalization)
- [Aggregation and data composition](#aggregation-and-data-composition---snapshot-generation-of-higher-order-data-points-on-base-snapshots)
- [Aggregation and data composition - snapshot generation of higher-order data points on base snapshots](#aggregation-and-data-composition---snapshot-generation-of-higher-order-data-points-on-base-snapshots)
- [Major Components](#major-components)
- [System Event Detector](#system-event-detector)
- [Process Hub Core](#process-hub-core)
Expand Down Expand Up @@ -373,7 +373,7 @@ In this section, let us take a look at the data composition abilities of Pooler
Required reading:
* [Base Snapshot Generation](#base-snapshot-generation) and
* [configuring `config/projects.json`](#configuration)
* [Aggregation and data composition](#aggregation-and-data-composition---snapshot-generation-of-higher-order-datapoints-on-base-snapshots)
* [Aggregation and data composition](#aggregation-and-data-composition---snapshot-generation-of-higher-order-data-points-on-base-snapshots)
As you can notice in [`config/projects.example.json`](config/projects.example.json), each project config needs to have the following components
Expand Down
36 changes: 34 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ fastapi = "^0.95.1"
ifps-client = {git = "https://git@github.com/PowerLoom/py-ipfs-client.git"}
aiorwlock = "^1.3.0"
aio-pika = "^9.1.4"
fastapi-pagination = "^0.12.8"


[build-system]
Expand Down
103 changes: 103 additions & 0 deletions snapshotter/core_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import json
from typing import List

from fastapi import Depends
from fastapi import FastAPI
from fastapi import Request
from fastapi import Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi_pagination import add_pagination
from fastapi_pagination import Page
from fastapi_pagination import paginate
from ipfs_client.main import AsyncIPFSClientSingleton
from pydantic import Field
from redis import asyncio as aioredis
from web3 import Web3

Expand All @@ -20,8 +27,14 @@
from snapshotter.utils.data_utils import get_snapshotter_status
from snapshotter.utils.default_logger import logger
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import SnapshotterEpochProcessingReportItem
from snapshotter.utils.models.data_models import SnapshotterStates
from snapshotter.utils.models.data_models import SnapshotterStateUpdate
from snapshotter.utils.redis.rate_limiter import load_rate_limiter_scripts
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key
from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping
from snapshotter.utils.redis.redis_keys import epoch_process_report_cached_key
from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key
from snapshotter.utils.rpc import RpcHelper

Expand All @@ -46,6 +59,11 @@
# setup CORS origins stuff
origins = ['*']
app = FastAPI()
# for pagination of epoch processing status reports
Page = Page.with_custom_options(
size=Field(10, ge=1, le=30),
)
add_pagination(app)
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
Expand Down Expand Up @@ -460,3 +478,88 @@ async def get_snapshotter_project_level_status(
await incr_success_calls_count(auth_redis_conn, rate_limit_auth_dep)

return snapshotter_project_status.dict(exclude_none=True, exclude_unset=True)


@app.get('/internal/snapshotter/epochProcessingStatus')
async def get_snapshotter_epoch_processing_status(
request: Request,
response: Response,
rate_limit_auth_dep: RateLimitAuthCheck = Depends(
rate_limit_auth_check,
),
) -> Page[SnapshotterEpochProcessingReportItem]:
if not (
rate_limit_auth_dep.rate_limit_passed and
rate_limit_auth_dep.authorized and
rate_limit_auth_dep.owner.active == UserStatusEnum.active
):
return inject_rate_limit_fail_response(rate_limit_auth_dep)
redis_conn: aioredis.Redis = request.app.state.redis_pool
_ = await redis_conn.get(epoch_process_report_cached_key)
if _:
epoch_processing_final_report = list(
map(
lambda x: SnapshotterEpochProcessingReportItem.parse_obj(x),
json.loads(_),
),
)
return paginate(epoch_processing_final_report)
epoch_processing_final_report: List[SnapshotterEpochProcessingReportItem] = list()
try:
[current_epoch_data] = await request.app.state.anchor_rpc_helper.web3_call(
[request.app.state.protocol_state_contract.functions.currentEpoch()],
redis_conn=request.app.state.redis_pool,
)
current_epoch = {
'begin': current_epoch_data[0],
'end': current_epoch_data[1],
'epochId': current_epoch_data[2],
}

except Exception as e:
rest_logger.exception(
'Exception in get_current_epoch',
e=e,
)
response.status_code = 500
return {
'status': 'error',
'message': f'Unable to get current epoch, error: {e}',
}
current_epoch_id = current_epoch['epochId']
for epoch_id in range(current_epoch_id, current_epoch_id - 30 - 1, -1):
epoch_specific_report = SnapshotterEpochProcessingReportItem.construct()
epoch_specific_report.epochId = epoch_id
epoch_release_status = await redis_conn.get(
epoch_id_epoch_released_key(epoch_id=epoch_id),
)
if not epoch_release_status:
continue
epoch_specific_report.transitionStatus = dict()
if epoch_release_status:
epoch_specific_report.transitionStatus['EPOCH_RELEASED'] = SnapshotterStateUpdate(
status='success', timestamp=int(epoch_release_status),
)
else:
epoch_specific_report.transitionStatus['EPOCH_RELEASED'] = None
for state in SnapshotterStates:
state_report_entries = await redis_conn.hgetall(
name=epoch_id_project_to_state_mapping(epoch_id=epoch_id, state_id=state.value),
)
if state_report_entries:
project_state_report_entries = dict()
epoch_specific_report.transitionStatus[state.value] = dict()
project_state_report_entries = {
project_id.decode('utf-8'): SnapshotterStateUpdate.parse_raw(project_state_entry)
for project_id, project_state_entry in state_report_entries.items()
}
epoch_specific_report.transitionStatus[state.value] = project_state_report_entries
else:
epoch_specific_report.transitionStatus[state.value] = None
epoch_processing_final_report.append(epoch_specific_report)
await redis_conn.set(
epoch_process_report_cached_key,
json.dumps(list(map(lambda x: x.json(), epoch_processing_final_report))),
ex=60,
)
return paginate(epoch_processing_final_report)
61 changes: 60 additions & 1 deletion snapshotter/process_hub_core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime
import json
import os
import threading
from urllib.parse import urljoin
import uuid
from multiprocessing import Process
from signal import SIGCHLD
Expand All @@ -11,6 +13,7 @@
from threading import Thread
from typing import Dict
from typing import Optional
import httpx

import psutil
import pydantic
Expand All @@ -20,12 +23,14 @@
from snapshotter.settings.config import settings
from snapshotter.system_event_detector import EventDetectorProcess
from snapshotter.utils.aggregation_worker import AggregationAsyncWorker
from snapshotter.utils.callback_helpers import send_failure_notifications_sync
from snapshotter.utils.default_logger import logger
from snapshotter.utils.delegate_worker import DelegateAsyncWorker
from snapshotter.utils.exceptions import SelfExitException
from snapshotter.utils.helper_functions import cleanup_proc_hub_children
from snapshotter.utils.models.data_models import ProcessorWorkerDetails
from snapshotter.utils.models.data_models import ProcessorWorkerDetails, SnapshotterIssue, SnapshotterReportState
from snapshotter.utils.models.data_models import SnapshotWorkerDetails
from snapshotter.utils.models.data_models import SnapshotterPing
from snapshotter.utils.models.message_models import ProcessHubCommand
from snapshotter.utils.rabbitmq_helpers import RabbitmqSelectLoopInteractor
from snapshotter.utils.redis.redis_conn import provide_redis_conn
Expand All @@ -52,6 +57,14 @@ def __init__(self, name, **kwargs):
self._spawned_cb_processes_map: Dict[str, Dict[str, Optional[SnapshotWorkerDetails]]] = (
dict()
) # separate map for callback worker spawns. unique ID -> dict(unique_name, pid)
self._httpx_client = httpx.Client(
base_url=settings.reporting.service_url,
limits=httpx.Limits(
max_keepalive_connections=2,
max_connections=2,
keepalive_expiry=300,
),
)
self._thread_shutdown_event = threading.Event()
self._shutdown_initiated = False

Expand Down Expand Up @@ -95,6 +108,7 @@ def signal_handler(self, signum, frame):
callback_worker_name = worker_process_details.unique_name
callback_worker_unique_id = unique_id
callback_worker_class = cb_worker_type

break

if (
Expand Down Expand Up @@ -130,6 +144,26 @@ def signal_handler(self, signum, frame):
worker_obj.pid,
pid,
)
if settings.reporting.service_url:
send_failure_notifications_sync(
client=self._httpx_client,
message=SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.CRASHED_CHILD_WORKER.value,
projectID='',
epochId='',
timeOfReporting=datetime.now().isoformat(),
extra=json.dumps(
{
'worker_name': callback_worker_name,
'pid': pid,
'worker_class': callback_worker_class,
'worker_unique_id': callback_worker_unique_id,
'respawned_pid': worker_obj.pid,
}
),
)
)
return

for cb_worker_type, worker_pid in self._spawned_processes_map.items():
Expand All @@ -151,6 +185,18 @@ def signal_handler(self, signum, frame):
self._spawned_processes_map[cb_worker_type] = proc_obj.pid
elif signum in [SIGINT, SIGTERM, SIGQUIT]:
self._shutdown_initiated = True
if settings.reporting.service_url:
self._logger.debug('Sending shutdown signal to reporting service')
send_failure_notifications_sync(
client=self._httpx_client,
message=SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.SHUTDOWN_INITIATED.value,
projectID='',
epochId='',
timeOfReporting=datetime.now().isoformat(),
)
)
self.rabbitmq_interactor.stop()
# raise GenericExitOnSignal

Expand Down Expand Up @@ -217,6 +263,19 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None):
name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes',
mapping=proc_id_map,
)
if settings.reporting.service_url:
try:
self._httpx_client.post(
url=urljoin(settings.reporting.service_url, '/ping'),
json=SnapshotterPing(instanceID=settings.instance_id).dict(),
)
except Exception as e:
if settings.logs.trace_enabled:
self._logger.opt(exception=True).error('Error while pinging reporting service: {}', e,)
else:
self._logger.error(
'Error while pinging reporting service: {}', e,
)
self._logger.error(
(
'Caught thread shutdown notification event. Deleting process'
Expand Down
Loading

0 comments on commit 56e058b

Please sign in to comment.