Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VC: Hardening and optimizing time handling. #4743

Merged
merged 8 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions beacon_chain/beacon_clock.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ from times import Time, getTime, fromUnix, `<`, `-`, inNanoseconds

export chronos.Duration, Moment, now

const
NANOSECONDS_PER_SLOT* = SECONDS_PER_SLOT * 1_000_000_000'u64
cheatfate marked this conversation as resolved.
Show resolved Hide resolved
NANOSECONDS_PER_EPOCH* = NANOSECONDS_PER_SLOT * SLOTS_PER_EPOCH

type
BeaconClock* = object
## The beacon clock represents time as it passes on a beacon chain. Beacon
Expand Down Expand Up @@ -70,18 +74,39 @@ proc fromNow*(c: BeaconClock, slot: Slot): tuple[inFuture: bool, offset: Duratio
c.fromNow(slot.start_beacon_time())

proc durationToNextSlot*(c: BeaconClock): Duration =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would remove durationToNextSlot and fromNow - both of these are compound operations that mix "current time" from the global clock with "time arithmetic" which leads to problems and confusion in code that uses them, since getting the "current time" generally should only be done in a controlled way:

  • once per "block" of arithmetic (ie we should not get the current time twice when performing arithmetic because there's an inherent race condition when the clock between the calls)
  • not reused between await boundaries

therefore:

  • remove fromNow, durationToNext* etc
  • use now consistently when the current time is needed as the only function that accesses the global clock
  • introduce a time difference operation like durationFrom(a, b: BeaconTime): tuple[future: bool, dur: Duration] as is done in fromNow
  • replace all code in both bn and vc to use the above functions
    • fromNow becomes slot.start_beacon_time().durationFrom(clock.now())
    • durationToNextSlot becomes let now = clock.now(); saturate((now.slotOrZero() + 1).start_beacon_time().durationFrom(now)) more or less (slotOrZero is incorrect for pre-genesis - it needs to return duration to genesis for any time before genesis)
    • the above can be added as helpers that take a BeaconTime and do the saturation if need be, but these helpers should never access the clock itself
    • more durationFrom can be added for slot vs time etc to make it less verbose

durationFrom can also be durationTo or some other name or shape - doesn't matter which direction it goes in, but since Duration is undefined for negative values, we need to return a tuple - we also sometimes need to know that genesis hasn't happened yet, so we don't always want to saturate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposed solution do not support negative Slot values which is happened when you are running before genesis. So it become impossible to provide such UX experience like it was done now. Because its impossible to supply negative Slot values it is very hard to find proper argument value for durationFrom/durationTo. Its why durationToNextSlot looks the most appropriate name for the procedure which is able to calculate duration to next slot which is either before or after genesis.

let (afterGenesis, slot) = c.now().toSlot()
if afterGenesis:
c.fromNow(slot + 1'u64).offset
let
currentTime = c.now()
currentSlot = currentTime.toSlot()

if currentSlot.afterGenesis:
let nextSlot = currentSlot.slot + 1
chronos.nanoseconds(
(nextSlot.start_beacon_time() - currentTime).nanoseconds)
else:
c.fromNow(Slot(0)).offset
# absoluteTime = BeaconTime(-currentTime.ns_since_genesis).
let
absoluteTime = Slot(0).start_beacon_time() +
(Slot(0).start_beacon_time() - currentTime)
timeToNextSlot = absoluteTime - currentSlot.slot.start_beacon_time()
chronos.nanoseconds(timeToNextSlot.nanoseconds)

proc durationToNextEpoch*(c: BeaconClock): Duration =
let (afterGenesis, slot) = c.now().toSlot()
if afterGenesis:
c.fromNow((slot.epoch + 1).start_slot()).offset
let
currentTime = c.now()
currentSlot = currentTime.toSlot()

if currentSlot.afterGenesis:
let nextEpochSlot = (currentSlot.slot.epoch() + 1).start_slot()
chronos.nanoseconds(
(nextEpochSlot.start_beacon_time() - currentTime).nanoseconds)
else:
c.fromNow(Epoch(0).start_slot()).offset
# absoluteTime = BeaconTime(-currentTime.ns_since_genesis).
let
absoluteTime = Slot(0).start_beacon_time() +
(Slot(0).start_beacon_time() - currentTime)
timeToNextEpoch = absoluteTime -
currentSlot.slot.epoch().start_slot().start_beacon_time()
chronos.nanoseconds(timeToNextEpoch.nanoseconds)

func saturate*(d: tuple[inFuture: bool, offset: Duration]): Duration =
if d.inFuture: d.offset else: seconds(0)
Expand Down
204 changes: 141 additions & 63 deletions beacon_chain/nimbus_validator_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import
libp2p/crypto/crypto,
./rpc/rest_key_management_api,
./validator_client/[
common, fallback_service, duties_service, fork_service,
common, fallback_service, duties_service, fork_service, block_service,
doppelganger_service, attestation_service, sync_committee_service]

const
PREGENESIS_EPOCHS_COUNT = 1

proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
info "Initializing genesis", nodes_count = len(vc.beaconNodes)
var nodes = vc.beaconNodes
Expand Down Expand Up @@ -93,16 +96,22 @@ proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} =
proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
# This procedure performs initialization of BeaconClock using current genesis
# information. It also performs waiting for genesis.
let res = BeaconClock.init(vc.beaconGenesis.genesis_time)
let currentSlot = res.now().slotOrZero()
let currentEpoch = currentSlot.epoch()
info "Initializing beacon clock",
genesis_time = vc.beaconGenesis.genesis_time,
current_slot = currentSlot, current_epoch = currentEpoch
let genesisTime = res.fromNow(start_beacon_time(Slot(0)))
let
res = BeaconClock.init(vc.beaconGenesis.genesis_time)
currentTime = res.now()
currentSlot = currentTime.slotOrZero()
currentEpoch = currentSlot.epoch()
genesisTime = res.fromNow(Slot(0))

if genesisTime.inFuture:
notice "Waiting for genesis", genesisIn = genesisTime.offset
await sleepAsync(genesisTime.offset)
info "Initializing beacon clock",
genesis_time = vc.beaconGenesis.genesis_time,
current_slot = "<n/a>", current_epoch = "<n/a>",
time_to_genesis = genesisTime.offset
else:
info "Initializing beacon clock",
genesis_time = vc.beaconGenesis.genesis_time,
current_slot = currentSlot, current_epoch = currentEpoch
return res

proc initMetrics(vc: ValidatorClientRef): Future[bool] {.async.} =
Expand Down Expand Up @@ -139,58 +148,66 @@ proc shutdownSlashingProtection(vc: ValidatorClientRef) =
info "Closing slashing protection", path = vc.config.validatorsDir()
vc.attachedValidators[].slashingProtection.close()

proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
lastSlot: Slot): Future[bool] {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
## wallTime: current system time - we will strive to perform all duties up
## to this point in time
## lastSlot: the last slot that we successfully processed, so we know where to
## start work from - there might be jumps if processing is delayed
proc runVCSlotLoop(vc: ValidatorClientRef) {.async.} =
var
startTime = vc.beaconClock.now()
curSlot = startTime.slotOrZero()
nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
timeToNextSlot = nextSlot.start_beacon_time() - startTime

let
# The slot we should be at, according to the clock
beaconTime = wallTime
wallSlot = wallTime.toSlot()
info "Scheduling first slot action",
start_time = shortLog(startTime),
current_slot = shortLog(curSlot),
next_slot = shortLog(nextSlot),
time_to_next_slot = shortLog(timeToNextSlot)

let
# If everything was working perfectly, the slot that we should be processing
expectedSlot = lastSlot + 1
delay = wallTime - expectedSlot.start_beacon_time()
var currentSlot = Opt.some(curSlot)

if checkIfShouldStopAtEpoch(wallSlot.slot, vc.config.stopAtEpoch):
return true
while true:
currentSlot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff,
true)
if currentSlot.isNone():
## Fatal log line should be printed by checkedWaitForNextSlot().
return

if len(vc.beaconNodes) > 1:
let
counts = vc.getNodeCounts()
# Good nodes are nodes which can be used for ALL the requests.
goodNodes = counts.data[int(RestBeaconNodeStatus.Synced)]
# Viable nodes are nodes which can be used only SOME of the requests.
viableNodes = counts.data[int(RestBeaconNodeStatus.OptSynced)] +
counts.data[int(RestBeaconNodeStatus.NotSynced)] +
counts.data[int(RestBeaconNodeStatus.Compatible)]
# Bad nodes are nodes which can't be used at all.
badNodes = counts.data[int(RestBeaconNodeStatus.Offline)] +
counts.data[int(RestBeaconNodeStatus.Online)] +
counts.data[int(RestBeaconNodeStatus.Incompatible)]
info "Slot start",
slot = shortLog(wallSlot.slot),
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
validators = vc.attachedValidators[].count(),
good_nodes = goodNodes, viable_nodes = viableNodes, bad_nodes = badNodes,
delay = shortLog(delay)
else:
info "Slot start",
slot = shortLog(wallSlot.slot),
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
validators = vc.attachedValidators[].count(),
node_status = $vc.beaconNodes[0].status,
delay = shortLog(delay)

return false
wallTime = vc.beaconClock.now()
wallSlot = currentSlot.get()
delay = wallTime - wallSlot.start_beacon_time()

if checkIfShouldStopAtEpoch(wallSlot, vc.config.stopAtEpoch):
return

if len(vc.beaconNodes) > 1:
let
counts = vc.getNodeCounts()
# Good nodes are nodes which can be used for ALL the requests.
goodNodes = counts.data[int(RestBeaconNodeStatus.Synced)]
# Viable nodes are nodes which can be used only SOME of the requests.
viableNodes = counts.data[int(RestBeaconNodeStatus.OptSynced)] +
counts.data[int(RestBeaconNodeStatus.NotSynced)] +
counts.data[int(RestBeaconNodeStatus.Compatible)]
# Bad nodes are nodes which can't be used at all.
badNodes = counts.data[int(RestBeaconNodeStatus.Offline)] +
counts.data[int(RestBeaconNodeStatus.Online)] +
counts.data[int(RestBeaconNodeStatus.Incompatible)]
info "Slot start",
slot = shortLog(wallSlot),
epoch = shortLog(wallSlot.epoch()),
attestationIn = vc.getDurationToNextAttestation(wallSlot),
blockIn = vc.getDurationToNextBlock(wallSlot),
validators = vc.attachedValidators[].count(),
good_nodes = goodNodes, viable_nodes = viableNodes,
bad_nodes = badNodes, delay = shortLog(delay)
else:
info "Slot start",
slot = shortLog(wallSlot),
epoch = shortLog(wallSlot.epoch()),
attestationIn = vc.getDurationToNextAttestation(wallSlot),
blockIn = vc.getDurationToNextBlock(wallSlot),
validators = vc.attachedValidators[].count(),
node_status = $vc.beaconNodes[0].status,
delay = shortLog(delay)

proc new*(T: type ValidatorClientRef,
config: ValidatorClientConf,
Expand Down Expand Up @@ -224,6 +241,8 @@ proc new*(T: type ValidatorClientRef,
config: config,
beaconNodes: beaconNodes,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
preGenesisEvent: newAsyncEvent(),
genesisEvent: newAsyncEvent(),
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(),
doppelExit: newAsyncEvent(),
Expand All @@ -239,6 +258,8 @@ proc new*(T: type ValidatorClientRef,
config: config,
beaconNodes: beaconNodes,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
preGenesisEvent: newAsyncEvent(),
genesisEvent: newAsyncEvent(),
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(),
indicesAvailable: newAsyncEvent(),
Expand All @@ -260,8 +281,8 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =

vc.beaconGenesis = await vc.initGenesis()
info "Genesis information", genesis_time = vc.beaconGenesis.genesis_time,
genesis_fork_version = vc.beaconGenesis.genesis_fork_version,
genesis_root = vc.beaconGenesis.genesis_validators_root
genesis_fork_version = vc.beaconGenesis.genesis_fork_version,
genesis_root = vc.beaconGenesis.genesis_validators_root

vc.beaconClock = await vc.initClock()

Expand Down Expand Up @@ -295,6 +316,7 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =
vc.dutiesService = await DutiesServiceRef.init(vc)
vc.doppelgangerService = await DoppelgangerServiceRef.init(vc)
vc.attestationService = await AttestationServiceRef.init(vc)
vc.blockService = await BlockServiceRef.init(vc)
vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc)
vc.keymanagerServer = keymanagerInitResult.server
if vc.keymanagerServer != nil:
Expand Down Expand Up @@ -322,12 +344,65 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =

return vc

proc runPreGenesisWaitingLoop(vc: ValidatorClientRef) {.async.} =
var breakLoop = false
while not(breakLoop):
let
genesisTime = vc.beaconClock.fromNow(Slot(0))
currentEpoch = vc.beaconClock.now().toSlot().slot.epoch()

if not(genesisTime.inFuture) or currentEpoch < PREGENESIS_EPOCHS_COUNT:
break

notice "Waiting for genesis",
genesis_time = vc.beaconGenesis.genesis_time,
time_to_genesis = genesisTime.offset

breakLoop =
try:
await sleepAsync(vc.beaconClock.durationToNextSlot())
false
except CancelledError:
debug "Pre-genesis waiting loop was interrupted"
true
except CatchableError as exc:
error "Pre-genesis waiting loop failed with unexpected error",
err_name = $exc.name, err_msg = $exc.msg
true
vc.preGenesisEvent.fire()

proc runGenesisWaitingLoop(vc: ValidatorClientRef) {.async.} =
var breakLoop = false
while not(breakLoop):
let genesisTime = vc.beaconClock.fromNow(Slot(0))

if not(genesisTime.inFuture):
break

notice "Waiting for genesis",
genesis_time = vc.beaconGenesis.genesis_time,
time_to_genesis = genesisTime.offset

breakLoop =
try:
await sleepAsync(vc.beaconClock.durationToNextSlot())
false
except CancelledError:
debug "Genesis waiting loop was interrupted"
true
except CatchableError as exc:
error "Genesis waiting loop failed with unexpected error",
err_name = $exc.name, err_msg = $exc.msg
true
vc.genesisEvent.fire()

proc asyncRun*(vc: ValidatorClientRef) {.async.} =
vc.fallbackService.start()
vc.forkService.start()
vc.dutiesService.start()
vc.doppelgangerService.start()
vc.attestationService.start()
vc.blockService.start()
vc.syncCommitteeService.start()

if not isNil(vc.keymanagerServer):
Expand All @@ -337,7 +412,12 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} =

let doppelEventFut = vc.doppelExit.wait()
try:
vc.runSlotLoopFut = runSlotLoop(vc, vc.beaconClock.now(), onSlotStart)
# Waiting for `GENESIS - PREGENESIS_EPOCHS_COUNT` loop.
await vc.runPreGenesisWaitingLoop()
# Waiting for `GENESIS` loop.
await vc.runGenesisWaitingLoop()
# Main processing loop.
vc.runSlotLoopFut = vc.runVCSlotLoop()
vc.runKeystoreCachePruningLoopFut =
runKeystorecachePruningLoop(vc.keystoreCache)
discard await race(vc.runSlotLoopFut, doppelEventFut)
Expand All @@ -355,8 +435,6 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} =
if doppelEventFut.completed():
# Critically, database has been shut down - the rest doesn't matter, we need
# to stop as soon as possible
# TODO we need to actually quit _before_ any other async tasks have had the
# chance to happen
quitDoppelganger()

debug "Stopping main processing loop"
Expand All @@ -373,10 +451,10 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} =
pending.add(vc.dutiesService.stop())
pending.add(vc.doppelgangerService.stop())
pending.add(vc.attestationService.stop())
pending.add(vc.blockService.stop())
pending.add(vc.syncCommitteeService.stop())
if not isNil(vc.keymanagerServer):
pending.add(vc.keymanagerServer.stop())

await allFutures(pending)

template runWithSignals(vc: ValidatorClientRef, body: untyped): bool =
Expand Down
Loading