Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSM oracle hot fixes #442

Merged
merged 27 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
83e0a0d
fix: get_stuck_node_operators args order
madlabman Apr 29, 2024
89c0736
chore(config): do not allow infinite cycle run
madlabman Apr 29, 2024
14b865e
chore: Checkpoint types
madlabman Apr 29, 2024
483a981
fix: find non-missed slots for stuck events search
madlabman Apr 29, 2024
c93d2b5
chore: web3py csm logging
madlabman Apr 29, 2024
512810e
chore: typings for CSModule
madlabman Apr 29, 2024
f616c46
fix: performance cache invalidation and storage
madlabman Apr 29, 2024
7188c20
refactor: extract current frame slots computation
madlabman Apr 29, 2024
88d4b03
chore: fix linters
madlabman Apr 29, 2024
2623773
chore: pinch of logging
madlabman Apr 29, 2024
cdca06c
fix: tweak checkpoints factory
madlabman Apr 30, 2024
1059ec2
chore: docs and logs
madlabman Apr 30, 2024
bce38a4
chore: check cache invariant
madlabman Apr 30, 2024
4d3c13a
fix: rework cache storage once again
madlabman Apr 30, 2024
185ae48
refactor: simplify performance cache
madlabman May 2, 2024
b5f414f
chore: get_first_non_missed_slot forward mode checks
madlabman May 6, 2024
7a0f6ab
chore: wording here and there
madlabman May 6, 2024
cb34b0c
refactor: seq -> sequence
madlabman May 6, 2024
14e9f8d
chore: prepare_checkpoints rearrangements
madlabman May 6, 2024
b934a05
chore: add TODO
madlabman May 6, 2024
5f28fb9
chore: simplify get_stuck_keys_events left block calculation
madlabman May 6, 2024
98238f8
fix: assert False in tree.py
madlabman May 6, 2024
bbe4b5e
docs: upd README
madlabman May 6, 2024
9db8bf8
chore: remove unused import
madlabman May 6, 2024
ba4a1d3
chore: raise ValueError on unsupported type in tree encoder
madlabman May 6, 2024
42b7a8c
chore: drop YAML support TODO
madlabman May 6, 2024
7280417
fix: check for missing epochs in validate_state
madlabman May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ In manual mode all sleeps are disabled and `ALLOW_REPORTING_IN_BUNKER_MODE` is T
| `DAEMON` | If False Oracle runs one cycle and ask for manual input to send report. | False | `True` |
| `TX_GAS_ADDITION` | Used to modify gas parameter that used in transaction. (gas = estimated_gas + TX_GAS_ADDITION) | False | `100000` |
| `CYCLE_SLEEP_IN_SECONDS` | The time between cycles of the oracle's activity | False | `12` |
| `MAX_CYCLE_LIFETIME_IN_SECONDS` | The maximum time for a cycle to continue (0 = infinity) | False | `3000` |
| `MAX_CYCLE_LIFETIME_IN_SECONDS` | The maximum time for a cycle to continue | False | `3000` |
| `SUBMIT_DATA_DELAY_IN_SLOTS` | The difference in slots between submit data transactions from Oracles. It is used to prevent simultaneous sending of transactions and, as a result, transactions revert. | False | `6` |
| `HTTP_REQUEST_TIMEOUT_EXECUTION` | Timeout for HTTP execution layer requests | False | `120` |
| `HTTP_REQUEST_TIMEOUT_CONSENSUS` | Timeout for HTTP consensus layer requests | False | `300` |
Expand Down Expand Up @@ -305,7 +305,7 @@ export KEYS_API_URI=...
export LIDO_LOCATOR_ADDRESS=...
export CSM_ORACLE_ADDRESS=...
export CSM_MODULE_ADDRESS=...
export MAX_CYCLE_LIFETIME_IN_SECONDS=0
export MAX_CYCLE_LIFETIME_IN_SECONDS=60000 # Reasonable high value to make sure the oracle have enough time.
madlabman marked this conversation as resolved.
Show resolved Hide resolved
```

Run oracle module
Expand Down

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def main(module_name: OracleModule):
logger.info({'msg': 'Initialize keys api client.'})
kac = KeysAPIClientModule(variables.KEYS_API_URI, web3)

logger.info({'msg': 'Check configured providers.'})
check_providers_chain_ids(web3, cc, kac)

web3.attach_modules({
Expand Down
30 changes: 24 additions & 6 deletions src/modules/csm/checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import time
from threading import Thread, Lock
from typing import cast
from typing import Any, Iterable, cast

from src.modules.csm.typings import FramePerformance
from src.providers.consensus.client import ConsensusClient
Expand Down Expand Up @@ -38,15 +38,33 @@ def prepare_checkpoints(
def _prepare_checkpoint(_slot: SlotNumber, _duty_epochs: list[EpochNumber]):
return Checkpoint(self.cc, self.converter, self.frame_performance, _slot, _duty_epochs)

processing_delay = finalized_epoch - (max(self.frame_performance.processed_epochs, default=0) or l_epoch)
checkpoint_step = min(self.MAX_CHECKPOINT_STEP, max(processing_delay, self.MIN_CHECKPOINT_STEP))
def _max_in_seq(items: Iterable[Any]) -> Any:
sorted_ = sorted(items)
assert sorted_
item = sorted_[0]
for curr in sorted_:
if curr - item > 1:
break
item = curr
return item

l_epoch = _max_in_seq((l_epoch, *self.frame_performance.processed_epochs))
vgorkavenko marked this conversation as resolved.
Show resolved Hide resolved
processing_delay = finalized_epoch - l_epoch

if l_epoch == r_epoch:
madlabman marked this conversation as resolved.
Show resolved Hide resolved
logger.info({"msg": "All epochs processed, no checkpoints required"})
madlabman marked this conversation as resolved.
Show resolved Hide resolved
return []

if processing_delay < self.MIN_CHECKPOINT_STEP and finalized_epoch < r_epoch:
madlabman marked this conversation as resolved.
Show resolved Hide resolved
logger.info({"msg": f"Minimum checkpoint step is not reached, current delay is {processing_delay}"})
vgorkavenko marked this conversation as resolved.
Show resolved Hide resolved
return []

duty_epochs = cast(list[EpochNumber], list(range(l_epoch, r_epoch + 1)))
checkpoints: list[Checkpoint] = []
checkpoint_epochs = []
for index, epoch in enumerate(duty_epochs, 1):
checkpoint_epochs.append(epoch)
if index % checkpoint_step == 0 or epoch == r_epoch:
if index % self.MAX_CHECKPOINT_STEP == 0 or epoch == r_epoch:
madlabman marked this conversation as resolved.
Show resolved Hide resolved
checkpoint_slot = self.converter.get_epoch_last_slot(EpochNumber(epoch + 1))
checkpoints.append(_prepare_checkpoint(checkpoint_slot, checkpoint_epochs))
logger.info(
Expand All @@ -69,7 +87,7 @@ class Checkpoint:

slot: SlotNumber # last slot of the epoch
duty_epochs: list[EpochNumber] # max 255 elements
block_roots: list[BlockRoot] # max 8192 elements
block_roots: list[BlockRoot | None] # max 8192 elements

def __init__(
self,
Expand Down Expand Up @@ -118,7 +136,7 @@ def _await_all_threads(self):

def _select_roots_to_check(
self, duty_epoch: EpochNumber
) -> list[BlockRoot]:
) -> list[BlockRoot | None]:
# inspired by the spec
# https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#get_block_root_at_slot
roots_to_check = []
Expand Down
77 changes: 57 additions & 20 deletions src/modules/csm/csm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from src.providers.execution.contracts.CSFeeOracle import CSFeeOracle
from src.typings import BlockStamp, EpochNumber, ReferenceBlockStamp, SlotNumber, ValidatorIndex
from src.utils.cache import global_lru_cache as lru_cache
from src.utils.slot import get_first_non_missed_slot
from src.utils.web3converter import Web3Converter
from src.web3py.extensions.lido_validators import NodeOperatorId, StakingModule, ValidatorsByNodeOperator
from src.web3py.typings import Web3
Expand Down Expand Up @@ -52,6 +53,9 @@ def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecute
collected = self._collect_data(last_finalized_blockstamp)
if not collected:
# The data is not fully collected yet, wait for the next epoch
madlabman marked this conversation as resolved.
Show resolved Hide resolved
logger.info(
{"msg": "Data required for the report is not fully collected yet, waiting for the next finalized epoch"}
madlabman marked this conversation as resolved.
Show resolved Hide resolved
)
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH
# pylint:disable=duplicate-code
report_blockstamp = self.get_blockstamp_for_report(last_finalized_blockstamp)
Expand All @@ -72,9 +76,21 @@ def build_report(self, blockstamp: ReferenceBlockStamp) -> tuple:
self._print_collect_result()

threshold = self.frame_performance.avg_perf * self.w3.csm.oracle.perf_threshold(blockstamp.block_hash)
# NOTE: r_block is guaranteed to be <= ref_slot, and the check
# in the inner frames assures the l_block <= r_block.
stuck_operators = self.w3.csm.get_csm_stuck_node_operators(
self._slot_to_block_identifier(self.frame_performance.l_slot),
self._slot_to_block_identifier(self.frame_performance.r_slot),
get_first_non_missed_slot(
self.w3.cc,
self.frame_performance.l_slot,
blockstamp.slot_number,
direction='forward',
).message.body.execution_payload.block_hash,
get_first_non_missed_slot(
self.w3.cc,
self.frame_performance.r_slot,
blockstamp.slot_number,
direction='back',
).message.body.execution_payload.block_hash,
)

operators = self.module_validators_by_node_operators(blockstamp)
Expand Down Expand Up @@ -178,32 +194,27 @@ def module_validators_by_node_operators(self, blockstamp: BlockStamp) -> Validat
def _collect_data(self, blockstamp: BlockStamp) -> bool:
"""Ongoing report data collection before the report ref slot and it's submission"""
logger.info({"msg": "Collecting data for the report"})
converter = Web3Converter(self.get_chain_config(blockstamp), self.get_frame_config(blockstamp))

r_ref_slot = self.get_current_frame(blockstamp).ref_slot
l_ref_slot = self.w3.csm.get_csm_last_processing_ref_slot(blockstamp)
if not l_ref_slot:
# The very first report, no previous ref slot
l_ref_slot = r_ref_slot - converter.get_epoch_last_slot(EpochNumber(converter.frame_config.epochs_per_frame))
if l_ref_slot == r_ref_slot:
# We are between reports, next report slot didn't happen yet. Predicting the next ref slot for the report
# To calculate epochs range to collect the data
r_ref_slot = converter.get_epoch_last_slot(EpochNumber(
converter.get_epoch_by_slot(l_ref_slot) + converter.frame_config.epochs_per_frame
))

l_ref_slot, r_ref_slot = self.current_frame_range(blockstamp)
logger.info({"msg": f"Frame for performance data collect: ({l_ref_slot};{r_ref_slot}]"})

# TODO: To think about the proper cache invalidation conditions.
if self.frame_performance:
if self.frame_performance.l_slot < l_ref_slot:
# If the cache is in memory, its left border should follow up the last ref slot.
assert self.frame_performance.l_slot <= l_ref_slot
# If the frame is extending we can reuse the cache.
if r_ref_slot > self.frame_performance.r_slot:
self.frame_performance.r_slot = r_ref_slot
# If the collected data overlaps the current frame, the cache should be invalidated.
if l_ref_slot > self.frame_performance.l_slot or r_ref_slot < self.frame_performance.r_slot:
self.frame_performance = None
madlabman marked this conversation as resolved.
Show resolved Hide resolved

if not self.frame_performance:
self.frame_performance = FramePerformance.try_read(l_ref_slot) or FramePerformance(
l_slot=l_ref_slot, r_slot=r_ref_slot
self.frame_performance = FramePerformance.try_read(
l_slot=l_ref_slot,
r_slot=r_ref_slot,
)

converter = self.converter(blockstamp)
# Finalized slot is the first slot of justifying epoch, so we need to take the previous
finalized_epoch = EpochNumber(converter.get_epoch_by_slot(blockstamp.slot_number) - 1)

Expand All @@ -223,9 +234,35 @@ def _collect_data(self, blockstamp: BlockStamp) -> bool:
logger.info({"msg": f"Processing checkpoint for slot {checkpoint.slot}"})
logger.info({"msg": f"Processing {len(checkpoint.duty_epochs)} epochs"})
checkpoint.process(blockstamp)
logger.info({"msg": f"All epochs processed in {time.time() - start:.2f} seconds"})
if checkpoints:
logger.info({"msg": f"All epochs processed in {time.time() - start:.2f} seconds"})
madlabman marked this conversation as resolved.
Show resolved Hide resolved
return self.frame_performance.is_coherent

def current_frame_range(self, blockstamp: BlockStamp) -> tuple[SlotNumber, SlotNumber]:
l_ref_slot = self.w3.csm.get_csm_last_processing_ref_slot(blockstamp)
r_ref_slot = self.get_current_frame(blockstamp).ref_slot

converter = self.converter(blockstamp)

# The very first report, no previous ref slot.
if not l_ref_slot:
l_ref_slot = SlotNumber(
r_ref_slot - converter.get_epoch_last_slot(EpochNumber(converter.frame_config.epochs_per_frame))
)
madlabman marked this conversation as resolved.
Show resolved Hide resolved

# We are between reports, next report slot didn't happen yet. Predicting the next ref slot for the report
# to calculate epochs range to collect the data.
if l_ref_slot == r_ref_slot:
r_ref_slot = converter.get_epoch_last_slot(
EpochNumber(converter.get_epoch_by_slot(l_ref_slot) + converter.frame_config.epochs_per_frame)
)

return l_ref_slot, r_ref_slot

@lru_cache(maxsize=1)
def converter(self, blockstamp: BlockStamp) -> Web3Converter:
return Web3Converter(self.get_chain_config(blockstamp), self.get_frame_config(blockstamp))

def _print_collect_result(self):
assert self.frame_performance
assigned = 0
Expand Down
42 changes: 30 additions & 12 deletions src/modules/csm/typings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import os
import pickle
from dataclasses import dataclass, field
from pathlib import Path
from statistics import mean
from typing import Any, Self

Expand Down Expand Up @@ -41,15 +43,15 @@ def perf(self) -> float:
return self.included / self.assigned


# INFO: Using slots here to compare after loading and object from pickle.
@dataclass(slots=True)
@dataclass(slots=True, repr=False)
class FramePerformance:
"""Data structure to store required data for performance calculation within the given frame."""

l_slot: SlotNumber
r_slot: SlotNumber

aggr_per_val: dict[ValidatorIndex, AttestationsAggregate] = field(default_factory=dict)

processed_epochs: set[EpochNumber] = field(default_factory=set)
processed_roots: set[BlockRoot] = field(default_factory=set)

Expand All @@ -61,8 +63,11 @@ class FramePerformance:

__schema__: str | None = None

EXTENSION = ".pkl"

def __post_init__(self) -> None:
self.__schema__ = self.schema()
logger.info({"msg": f"New instance of {repr(self)} created"})

@classmethod
def schema(cls) -> str:
Expand Down Expand Up @@ -92,31 +97,44 @@ def dump(
perf_data.included += 1 if validator['included'] else 0
self.aggr_per_val[key] = perf_data

with open(self.filename(self.l_slot), mode="wb") as f:
with self.buffer.open(mode="wb") as f:
pickle.dump(self, f)

os.replace(self.buffer, self.file(self.l_slot, self.r_slot))

@classmethod
def try_read(cls, l_slot: SlotNumber) -> Self | None:
def try_read(cls, l_slot: SlotNumber, r_slot: SlotNumber) -> Self:
"""Used to restore the object from the persistent storage."""

filename = cls.filename(l_slot)
obj = None
file = cls.file(l_slot, r_slot)
obj: Self | None = None

try:
with open(filename, mode="rb") as f:
with file.open(mode="rb") as f:
obj = pickle.load(f)
assert obj

logger.info({"msg": f"{repr(obj)} read from {file.absolute()}"})

# TODO: To think about a better way to check for schema changes.
if cls.schema() != obj.__schema__:
raise ValueError("Schema mismatch")
except Exception as e: # pylint: disable=broad-exception-caught
logger.info({"msg": f"Unable to restore FramePerformance instance from {filename}", "error": str(e)})
logger.info({"msg": f"Unable to restore FramePerformance instance from {file.absolute()}", "error": str(e)})

return obj
return obj or cls(l_slot=l_slot, r_slot=r_slot)

@property
def is_coherent(self) -> bool:
return (self.r_slot - self.l_slot) // 32 == len(self.processed_epochs)

@staticmethod
def filename(l_slot: SlotNumber) -> str:
return f"{l_slot}.pkl"
@classmethod
def file(cls, l_slot: SlotNumber, r_slot: SlotNumber) -> Path:
return Path(f"{l_slot}_{r_slot}").with_suffix(cls.EXTENSION)

@property
def buffer(self) -> Path:
return self.file(self.l_slot, self.r_slot).with_suffix(".buf")

def __repr__(self) -> str:
return f"{self.__class__.__name__}(l_slot={self.l_slot},r_slot={self.r_slot})"
madlabman marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion src/modules/submodules/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def process_report(self, blockstamp: ReferenceBlockStamp) -> None:
logger.info({'msg': 'Build report.', 'value': report_data})

report_hash = self._encode_data_hash(report_data)
logger.info({'msg': 'Calculate report hash.', 'value': report_hash})
logger.info({'msg': 'Calculate report hash.', 'value': repr(report_hash)})
# We need to check whether report has unexpected data before sending.
# otherwise we have to check it manually.
if not self.is_reporting_allowed(blockstamp):
Expand Down
22 changes: 13 additions & 9 deletions src/providers/execution/contracts/CSModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from itertools import groupby
from typing import Callable, Iterable, cast

from eth_typing.evm import ChecksumAddress
from eth_typing import BlockNumber, ChecksumAddress
from web3.contract.contract import Contract, ContractEvent
from web3.types import BlockIdentifier, EventData

Expand All @@ -24,13 +24,15 @@ def __init__(self, address: ChecksumAddress | None = None) -> None:
def get_stuck_node_operators(self, l_block: BlockIdentifier, r_block: BlockIdentifier) -> Iterable:
"""Returns node operators assumed to be stuck for the given frame (defined by the blockstamps)"""

l_block_number = int(self.w3.eth.get_block(l_block)["number"]) # type: ignore
l_block_number = self.w3.eth.get_block(l_block).get("number", BlockNumber(0))
r_block_number = self.w3.eth.get_block(r_block).get("number", BlockNumber(0))
assert l_block_number <= r_block_number

by_no_id: Callable[[EventData], int] = lambda e: int(
e["args"]["nodeOperatorId"]
) # TODO: Maybe it's int already?

events = sorted(self.get_stuck_keys_events(r_block), key=by_no_id)
events = sorted(self.get_stuck_keys_events(r_block_number), key=by_no_id)
for no_id, group in groupby(events, key=by_no_id):
last_event = sorted(tuple(group), key=lambda e: e["blockNumber"])[-1]
# Operators unstucked at the very beginning of the frame are fine.
Expand All @@ -42,12 +44,12 @@ def get_stuck_node_operators(self, l_block: BlockIdentifier, r_block: BlockIdent

yield no_id

def get_stuck_keys_events(self, block: BlockIdentifier) -> Iterable[EventData]:
# TODO: Make a cache for the events?
def get_stuck_keys_events(self, r_block: BlockNumber) -> Iterable[EventData]:
"""Fetch all the StuckSigningKeysCountChanged events up to the given block (closed interval)"""

r_block = int(self.w3.eth.get_block(block)["number"]) # type: ignore
l_block = r_block - variables.EVENTS_SEARCH_STEP
l_block = 0 if l_block < 0 else l_block
l_block = BlockNumber(r_block - variables.EVENTS_SEARCH_STEP)
l_block = BlockNumber(0) if l_block < 0 else l_block
madlabman marked this conversation as resolved.
Show resolved Hide resolved

while l_block >= 0:
logger.info({"msg": f"Fetching stuck node operators events in range [{l_block};{r_block}]"})
Expand All @@ -60,10 +62,12 @@ def get_stuck_keys_events(self, block: BlockIdentifier) -> Iterable[EventData]:
if not self.is_deployed(l_block):
break

r_block = l_block - 1
l_block = r_block - variables.EVENTS_SEARCH_STEP
r_block = BlockNumber(l_block - 1)
l_block = BlockNumber(r_block - variables.EVENTS_SEARCH_STEP)

# TODO: Move to a base contract class.
def is_deployed(self, block: BlockIdentifier) -> bool:
logger.info({"msg": f"Check that the contract {self.__class__.__name__} id deployed at {block=}"})
madlabman marked this conversation as resolved.
Show resolved Hide resolved
return self.w3.eth.get_code(self.address, block_identifier=block) != b""

def is_paused(self, block: BlockIdentifier = "latest") -> bool:
Expand Down
Loading
Loading