Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: epoch monitoring #41

Merged
merged 13 commits into from
Aug 21, 2023
Merged

Feat: epoch monitoring #41

merged 13 commits into from
Aug 21, 2023

Conversation

anomit
Copy link
Member

@anomit anomit commented Aug 20, 2023

Fixes #40

Checklist

  • My branch is up-to-date with upstream/develop branch.
  • Everything works and tested for Python 3.8.0 and above.
  • I ran pre-commit checks against my changes.
  • I've written tests against my changes and all the current present tests are passing.

Current behaviour

There are no APIs that expose the processing status of a snapshot according to the state transitions it goes through before being finalized.

For more details, refer #40

New expected behaviour

The changes included in this PR capture the state transitions as detailed in #40 as per the following data models as an epoch is released until SnapshotFinalized event is received by the processor distributor for the specific epoch.


EPOCH_RELEASED

Redis key being set:

def epoch_id_epoch_released_key(epoch_id):
return f'epochID:{epoch_id}:epochReleased'

await self._redis_conn.set(
epoch_id_epoch_released_key(_.epochId),
int(time.time()),
)

It contains the UNIX timestamp at which the EpochReleased event was received from the protocol state contract.


For each of the state IDs corresponding to a state transition, except EPOCH_RELEASED, a Redis hashtable entry is set against every project ID or type being processed by the snapshotter. The key of this hashtable is in the following format

def epoch_id_project_to_state_mapping(epoch_id, state_id):
return f'epochID:{epoch_id}:stateID:{state_id}:processingStatus'

and each value against a hashtable entry of a project ID follows the data model:

class SnapshotterStateUpdate(BaseModel):
status: str
error: Optional[str] = None
extra: Optional[Dict[str, Any]] = None
timestamp: int


PRELOAD

For every project type's preloader specifications, the status of all the preloading dependencies being satisfied is captured here:

for project_type in self._project_type_config_mapping:
project_config = self._project_type_config_mapping[project_type]
if not project_config.preload_tasks:
continue
self._logger.debug(
'Expected list of successful preloading for project type {}: {}',
project_type,
project_config.preload_tasks,
)
if all([t in succesful_preloads for t in project_config.preload_tasks]):
self._logger.info(
'Preloading dependency satisfied for project type {} epoch {}. Distributing snapshot build tasks...',
project_type, epoch.epochId,
)
asyncio.ensure_future(
self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(epoch.epochId, SnapshotterStates.PRELOAD.value),
mapping={
project_type: SnapshotterStateUpdate(
status='success', timestamp=int(time.time())
).json()
}
)
)
await self._distribute_callbacks_snapshotting(project_type, epoch)

SNAPSHOT_BUILD

The snapshot builders as configured in projects.json are executed. Also refer to the case study of the current implementation of Pooler for a detailed look at snapshot building for base as well as aggregates.

await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(
epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
),
mapping={
project_id: SnapshotterStateUpdate(
status='failed', error=str(e), timestamp=int(time.time()),
).json(),
},
)
else:
await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(
epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
),
mapping={
project_id: SnapshotterStateUpdate(
status='success', timestamp=int(time.time()),
).json(),
},
)

SNAPSHOT_SUBMIT_PAYLOAD_COMMIT

Captures the status of propagation of the built snapshot to the payload commit service in Audit Protocol for further submission to the protocol state contract.

except Exception as e:
self._logger.opt(exception=True).error(
(
'Exception committing snapshot to commit payload queue:'
' {} | dump: {}'
),
snapshot,
e,
)
await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(
epoch.epochId, SnapshotterStates.SNAPSHOT_SUBMIT_PAYLOAD_COMMIT.value,
),
mapping={
project_id: SnapshotterStateUpdate(
status='failed', error=str(e), timestamp=int(time.time()),
).json(),
},
)
else:
await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(
epoch.epochId, SnapshotterStates.SNAPSHOT_SUBMIT_PAYLOAD_COMMIT.value,
),
mapping={
project_id: SnapshotterStateUpdate(
status='success', timestamp=int(time.time()),
).json(),
},
)

SNAPSHOT_SUBMIT_PROTOCOL_CONTRACT

The snapshot submission transaction from the relayer to the protocol state smart contract was successful and a SnapshotSubmitted event was generated

elif message_type == 'SnapshotSubmitted':
try:
msg_obj: PowerloomSnapshotSubmittedMessage = PowerloomSnapshotSubmittedMessage.parse_raw(message.body)
except:
pass
else:
await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(msg_obj.epochId, SnapshotterStates.SNAPSHOT_SUBMIT_PROTOCOL_CONTRACT.value),
mapping={
msg_obj.projectId: SnapshotterStateUpdate(
status='success', timestamp=int(time.time()), extra={'snapshotCid': msg_obj.snapshotCid}
).json()
},
)

SNAPSHOT_FINALIZE

Captures the finalized snapshot accepted against an epoch via a SnapshotFinalized event.

await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(msg_obj.epochId, SnapshotterStates.SNAPSHOT_FINALIZE.value),
mapping={
msg_obj.projectId: SnapshotterStateUpdate(
status='success', timestamp=int(time.time()), extra={'snapshot_cid': msg_obj.snapshotCid}
).json()
}
)


Core API ([core_api.py](https://github.com/PowerLoom/pooler/blob/bcc245d228acce504ba803b9b50fd89c8eb05984/snapshotter/core_api.py)) now exposes an internal API endpoint GET /internal/snapshotter/epochProcessingStatus that returns a paginated response of the state transition status of snapshots against project IDs, upto 30 most recent epochs.

Request:

curl -X 'GET' \
  'http://localhost:8002/internal/snapshotter/epochProcessingStatus?page=1&size=10' \
  -H 'accept: application/json'

Sample response:

{
    "items": [
      {
        "epochId": 43523,
        "transitionStatus": {
          "EPOCH_RELEASED": {
            "status": "success",
            "error": null,
            "extra": null,
            "timestamp": 1692530595
          },
          "PRELOAD": {
            "pairContract_pair_total_reserves": {
              "status": "success",
              "error": null,
              "extra": null,
              "timestamp": 1692530595
            },
          },
          "SNAPSHOT_BUILD": {
            "aggregate_24h_stats_lite:35ee1886fa4665255a0d0486c6079c4719c82f0f62ef9e96a98f26fde2e8a106:UNISWAPV2": {
              "status": "success",
              "error": null,
              "extra": null,
              "timestamp": 1692530596
            },
          },
          "SNAPSHOT_SUBMIT_PAYLOAD_COMMIT": {

          },
         "RELAYER_SEND": {

         },
        "SNAPSHOT_FINALIZE": {

        },
      },
    }
   ],
   "total": 30,
   "page": 1,
   "size": 10,
   "pages": 3
}

Change logs

Added

  • Core API query endpoint GET /internal/snapshotter/epochProcessingStatus

  • Record state transition of a snapshot at every step

  • Send health pings to onchain consensus reporting service every 2 seconds

if settings.reporting.service_url:
try:
self._httpx_client.post(
url=urljoin(settings.reporting.service_url, '/ping'),
json=SnapshotterPing(instanceID=settings.instance_id).dict(),
)
except Exception as e:
if settings.logs.trace_enabled:
self._logger.opt(exception=True).error('Error while pinging reporting service: {}', e,)
else:
self._logger.error(
'Error while pinging reporting service: {}', e,
)

  • Send reports on crashed callback workers as well as complete shutdown of running snapshotter instances

elif signum in [SIGINT, SIGTERM, SIGQUIT]:
self._shutdown_initiated = True
if settings.reporting.service_url:
self._logger.debug('Sending shutdown signal to reporting service')
send_failure_notifications_sync(
client=self._httpx_client,
message=SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.SHUTDOWN_INITIATED.value,
projectID='',
epochId='',
timeOfReporting=datetime.now().isoformat(),
)
)

if settings.reporting.service_url:
send_failure_notifications_sync(
client=self._httpx_client,
message=SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.CRASHED_CHILD_WORKER.value,
projectID='',
epochId='',
timeOfReporting=datetime.now().isoformat(),
extra=json.dumps(
{
'worker_name': callback_worker_name,
'pid': pid,
'worker_class': callback_worker_class,
'worker_unique_id': callback_worker_unique_id,
'respawned_pid': worker_obj.pid,
}
),
)
)

Deployment Instructions

Pull the latest code and restart.

Ensure the issue reporting service URL in config/settings.json is set the base URL of the reporting service entry point of onchain-consensus

"reporting": {
"slack_url": "https://slack-reporting-url",
"service_url": "https://powerloom-reporting-url"
},

Copy link
Member

@xadahiya xadahiya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving since new changes are deployed on one of the Staging instances and have been working without any issues.
Suggested improvements are also incorporated in the latest commits.

@anomit anomit merged commit 56e058b into main Aug 21, 2023
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Internal API for snapshot processing status per epoch
2 participants