diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 6b05dd3df9..47c0a9f531 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -905,6 +905,11 @@ type defaultValue: 0 name: "stop-at-epoch" .}: uint64 + payloadBuilderEnable* {. + desc: "Enable usage of beacon node with external payload builder" + defaultValue: false + name: "payload-builder" .}: bool + beaconNodes* {. desc: "URL addresses to one or more beacon node HTTP REST APIs", defaultValue: @[defaultBeaconNodeUri] diff --git a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim index 1f74a7ce3e..c971da7cf3 100644 --- a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim @@ -87,3 +87,8 @@ proc prepareBeaconProposer*(body: seq[PrepareBeaconProposer]): RestPlainResponse rest, endpoint: "/eth/v1/validator/prepare_beacon_proposer", meth: MethodPost.} ## https://ethereum.github.io/beacon-APIs/#/ValidatorRequiredApi/prepareBeaconProposer + +proc registerValidator*(body: seq[SignedValidatorRegistrationV1]): RestPlainResponse {. + rest, endpoint: "/eth/v1/validator/register_validator", + meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/registerValidator diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 8d7bb3baa9..220ce074c5 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -2087,3 +2087,43 @@ proc prepareBeaconProposer*( debug "Beacon proposer preparation failed", status = response.status, endpoint = apiResponse.node return count + +proc registerValidator*( + vc: ValidatorClientRef, + data: seq[SignedValidatorRegistrationV1] + ): Future[int] {.async.} = + logScope: request = "registerValidators" + let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + {BeaconNodeRole.BlockProposalPublish}, + registerValidator(it, data)) + if len(resp.data) == 0: + # We did not get any response from beacon nodes. + case resp.status + of ApiOperation.Success: + # This should not be happened, there should be present at least one + # successfull response. + return 0 + of ApiOperation.Timeout: + debug "Unable to register validators in time", + timeout = SlotDuration + return 0 + of ApiOperation.Interrupt: + debug "Validator registration was interrupted" + return 00 + of ApiOperation.Failure: + debug "Unexpected error happened while registering validators" + return 0 + else: + var count = 0 + for apiResponse in resp.data: + if apiResponse.data.isErr(): + debug "Unable to register validator with beacon node", + endpoint = apiResponse.node, error = apiResponse.data.error() + else: + let response = apiResponse.data.get() + if response.status == 200: + inc(count) + else: + debug "Unable to register validators with beacon node", + status = response.status, endpoint = apiResponse.node + return count diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 7dbb25c97e..54808e216e 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -18,13 +18,15 @@ import ".."/validators/[keystore_management, validator_pool, slashing_protection], ".."/[conf, beacon_clock, version, nimbus_binary_common] +from std/times import Time, toUnix, fromUnix, getTime + export os, sets, sequtils, chronos, presto, chronicles, confutils, nimbus_binary_common, version, conf, options, tables, results, base10, byteutils, presto_client, eth2_rest_serialization, rest_beacon_client, phase0, altair, helpers, signatures, validator, eth2_merkleization, beacon_clock, keystore_management, slashing_protection, validator_pool, - dynamic_fee_recipients + dynamic_fee_recipients, Time, toUnix, fromUnix, getTime const SYNC_TOLERANCE* = 4'u64 @@ -32,6 +34,8 @@ const HISTORICAL_DUTIES_EPOCHS* = 2'u64 TIME_DELAY_FROM_SLOT* = 79.milliseconds SUBSCRIPTION_BUFFER_SLOTS* = 2'u64 + VALIDATOR_DEFAULT_GAS_LIMIT* = 30_000_000'u64 # Stand-in, reasonable default + EPOCHS_BETWEEN_VALIDATOR_REGISTRATION* = 1 DelayBuckets* = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05, 0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf] @@ -44,6 +48,13 @@ type slot*: Slot proposers*: seq[ValidatorPubKey] + RegistrationKind* {.pure.} = enum + Cached, IncorrectTime, MissingIndex, MissingFee, ErrorSignature, NoSignature + + PendingValidatorRegistration* = object + registration*: SignedValidatorRegistrationV1 + future*: Future[SignatureResult] + ClientServiceRef* = ref object of RootObj name*: string state*: ServiceState @@ -176,6 +187,7 @@ type beaconGenesis*: RestGenesis proposerTasks*: Table[Slot, seq[ProposerTask]] dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore + validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1] rng*: ref HmacDrbgContext ValidatorClientRef* = ref ValidatorClient @@ -736,3 +748,160 @@ proc prepareProposersList*(vc: ValidatorClientRef, res.add(PrepareBeaconProposer(validator_index: index, fee_recipient: feeRecipient.get())) res + +proc isDefault*(reg: SignedValidatorRegistrationV1): bool = + (reg.message.timestamp == 0'u64) or (reg.message.gas_limit == 0'u64) + +proc isExpired*(vc: ValidatorClientRef, + reg: SignedValidatorRegistrationV1, slot: Slot): bool = + let + regTime = fromUnix(int64(reg.message.timestamp)) + regSlot = + block: + let res = vc.beaconClock.toSlot(regTime) + if not(res.afterGenesis): + # This case should not be happend, but it could in case of time jumps + # (time could be modified by admin or ntpd). + return false + uint64(res.slot) + + if regSlot > slot: + # This case should not be happened, but if it happens (time could be + # modified by admin or ntpd). + false + else: + if (slot - regSlot) div SLOTS_PER_EPOCH >= + EPOCHS_BETWEEN_VALIDATOR_REGISTRATION: + false + else: + true + +proc getValidatorRegistraion( + vc: ValidatorClientRef, + validator: AttachedValidator, + timestamp: Time, + fork: Fork + ): Result[PendingValidatorRegistration, RegistrationKind] = + if validator.index.isNone(): + debug "Validator registration missing validator index", + validator = shortLog(validator) + return err(RegistrationKind.MissingIndex) + + let + vindex = validator.index.get() + cached = vc.validatorsRegCache.getOrDefault(validator.pubkey) + currentSlot = + block: + let res = vc.beaconClock.toSlot(timestamp) + if not(res.afterGenesis): + return err(RegistrationKind.IncorrectTime) + res.slot + + if cached.isDefault() or vc.isExpired(cached, currentSlot): + let feeRecipient = vc.getFeeRecipient(validator.pubkey, vindex, + currentSlot.epoch()) + if feeRecipient.isNone(): + debug "Could not get fee recipient for registration data", + validator = shortLog(validator) + return err(RegistrationKind.MissingFee) + + var registration = + SignedValidatorRegistrationV1( + message: ValidatorRegistrationV1( + fee_recipient: + ExecutionAddress(data: distinctBase(feeRecipient.get())), + gas_limit: VALIDATOR_DEFAULT_GAS_LIMIT, + timestamp: uint64(timestamp.toUnix()), + pubkey: validator.pubkey + ) + ) + + let sigfut = validator.getBuilderSignature(fork, registration.message) + if sigfut.finished(): + # This is short-path if we able to create signature locally. + if not(sigfut.done()): + let exc = sigfut.readError() + debug "Got unexpected exception while signing validator registration", + validator = shortLog(validator), error_name = $exc.name, + error_msg = $exc.msg + return err(RegistrationKind.ErrorSignature) + let sigres = sigfut.read() + if sigres.isErr(): + debug "Failed to get signature for validator registration", + validator = shortLog(validator), error = sigres.error() + return err(RegistrationKind.NoSignature) + registration.signature = sigres.get() + # Updating cache table with new signed registration data + vc.validatorsRegCache[registration.message.pubkey] = registration + ok(PendingValidatorRegistration(registration: registration, future: nil)) + else: + # Remote signature service involved, cache will be updated later. + ok(PendingValidatorRegistration(registration: registration, + future: sigfut)) + else: + # Returning cached result. + err(RegistrationKind.Cached) + +proc prepareRegistrationList*( + vc: ValidatorClientRef, + timestamp: Time, + fork: Fork + ): Future[seq[SignedValidatorRegistrationV1]] {.async.} = + + var + messages: seq[SignedValidatorRegistrationV1] + futures: seq[Future[SignatureResult]] + registrations: seq[SignedValidatorRegistrationV1] + total = vc.attachedValidators[].count() + succeed = 0 + bad = 0 + errors = 0 + indexMissing = 0 + feeMissing = 0 + cached = 0 + timed = 0 + + for validator in vc.attachedValidators[].items(): + let res = vc.getValidatorRegistraion(validator, timestamp, fork) + if res.isOk(): + let preg = res.get() + if preg.future.isNil(): + registrations.add(preg.registration) + else: + messages.add(preg.registration) + futures.add(preg.future) + else: + case res.error() + of RegistrationKind.Cached: inc(cached) + of RegistrationKind.IncorrectTime: inc(timed) + of RegistrationKind.NoSignature: inc(bad) + of RegistrationKind.ErrorSignature: inc(errors) + of RegistrationKind.MissingIndex: inc(indexMissing) + of RegistrationKind.MissingFee: inc(feeMissing) + + succeed = len(registrations) + + if len(futures) > 0: + await allFutures(futures) + + for index, future in futures.pairs(): + if future.done(): + let sres = future.read() + if sres.isOk(): + var reg = messages[index] + reg.signature = sres.get() + registrations.add(reg) + # Updating cache table + vc.validatorsRegCache[reg.message.pubkey] = reg + inc(succeed) + else: + inc(bad) + else: + inc(errors) + + debug "Validator registrations prepared", total = total, succeed = succeed, + cached = cached, bad = bad, errors = errors, + index_missing = indexMissing, fee_missing = feeMissing, + incorrect_time = timed + + return registrations diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index ee206d2556..ec267a5e8a 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -17,7 +17,7 @@ logScope: service = ServiceName type DutiesServiceLoop* = enum AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop, - ProposerPreparationLoop + ProposerPreparationLoop, ValidatorRegisterLoop chronicles.formatIt(DutiesServiceLoop): case it @@ -26,6 +26,7 @@ chronicles.formatIt(DutiesServiceLoop): of IndicesLoop: "index_loop" of SyncCommitteeLoop: "sync_committee_loop" of ProposerPreparationLoop: "proposer_prepare_loop" + of ValidatorRegisterLoop: "validator_register_loop" proc checkDuty(duty: RestAttesterDuty): bool = (duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and @@ -551,6 +552,54 @@ proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} = proposers_count = len(proposers), prepared_count = count +proc registerValidators*(service: DutiesServiceRef) {.async.} = + let vc = service.client + let sres = vc.getCurrentSlot() + + var default: seq[SignedValidatorRegistrationV1] + if sres.isSome(): + let + genesisFork = vc.forks[0] + currentSlot = sres.get() + registrations = + try: + await vc.prepareRegistrationList(getTime(), genesisFork) + except CancelledError as exc: + debug "Validator registration preparation was interrupted", + slot = currentSlot, fork = genesisFork + raise exc + except CatchableError as exc: + error "Unexpected error occured while preparing validators " & + "registration data", slot = currentSlot, fork = genesisFork, + err_name = exc.name, err_msg = exc.msg + default + + let count = + if len(registrations) > 0: + try: + await registerValidator(vc, registrations) + except ValidatorApiError as exc: + warn "Unable to register validators", slot = currentSlot, + fork = genesisFork, err_name = exc.name, + err_msg = exc.msg + 0 + except CancelledError as exc: + debug "Validator registration was interrupted", slot = currentSlot, + fork = genesisFork + raise exc + except CatchableError as exc: + error "Unexpected error occured while registering validators", + slot = currentSlot, fork = genesisFork, err_name = exc.name, + err_msg = exc.msg + 0 + else: + 0 + + if count > 0: + debug "Validators registered", slot = currentSlot, + beacon_nodes_count = count, registrations = len(registrations), + validators_count = vc.attachedValidators[].count() + proc waitForNextSlot(service: DutiesServiceRef, serviceLoop: DutiesServiceLoop) {.async.} = let vc = service.client @@ -585,12 +634,23 @@ proc validatorIndexLoop(service: DutiesServiceRef) {.async.} = proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} = let vc = service.client + debug "Beacon proposer preparation loop waiting for validator indices update" await vc.indicesAvailable.wait() while true: await service.prepareBeaconProposers() await service.waitForNextSlot(ProposerPreparationLoop) +proc validatorRegisterLoop(service: DutiesServiceRef) {.async.} = + let vc = service.client + doAssert(vc.config.payloadBuilderEnable) + + debug "Validator registration loop is waiting for initialization" + await allFutures(vc.indicesAvailable.wait(), vc.forksAvailable.wait()) + while true: + await service.registerValidators() + await service.waitForNextSlot(ValidatorRegisterLoop) + proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} = let vc = service.client @@ -616,6 +676,8 @@ template checkAndRestart(serviceLoop: DutiesServiceLoop, future = body proc mainLoop(service: DutiesServiceRef) {.async.} = + let vc = service.client + service.state = ServiceState.Running debug "Service started" @@ -625,6 +687,11 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = indicesFut = service.validatorIndexLoop() syncFut = service.syncCommitteeDutiesLoop() prepareFut = service.proposerPreparationsLoop() + registerFut = + if vc.config.payloadBuilderEnable: + service.validatorRegisterLoop() + else: + nil while true: # This loop could look much more nicer/better, when @@ -632,8 +699,11 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = # become safe to combine loops, breaks and exception handlers. let breakLoop = try: - discard await race(attestFut, proposeFut, indicesFut, syncFut, - prepareFut) + var futures = @[FutureBase(attestFut), FutureBase(proposeFut), + FutureBase(indicesFut), FutureBase(syncFut), + FutureBase(prepareFut)] + if not(isNil(registerFut)): futures.add(FutureBase(registerFut)) + discard await race(futures) checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop()) checkAndRestart(ProposerLoop, proposeFut, service.proposerDutiesLoop()) checkAndRestart(IndicesLoop, indicesFut, service.validatorIndexLoop()) @@ -641,6 +711,9 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = service.syncCommitteeDutiesLoop()) checkAndRestart(ProposerPreparationLoop, prepareFut, service.proposerPreparationsLoop()) + if not(isNil(registerFut)): + checkAndRestart(ValidatorRegisterLoop, registerFut, + service.validatorRegisterLoop()) false except CancelledError: debug "Service interrupted" @@ -655,6 +728,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = pending.add(syncFut.cancelAndWait()) if not(prepareFut.finished()): pending.add(prepareFut.cancelAndWait()) + if not(isNil(registerFut)) and not(registerFut.finished()): + pending.add(registerFut.cancelAndWait()) await allFutures(pending) true except CatchableError as exc: