Skip to content

Commit

Permalink
restore doppelganger check on connectivity loss (#4616)
Browse files Browse the repository at this point in the history
* restore doppelganger check on connectivity loss

#4398 introduced a
regression in functionality where doppelganger detection would not be
rerun during connectivity loss. This PR reintroduces this check and
makes some adjustments to the implementation to simplify the code flow
for both BN and VC.

* track when check was last performed for each validator (to deal with
late-added validators)
* track when we performed a doppel-detectable activity (attesting) so as
to avoid false positives
* remove nodeStart special case (this should be treated the same as
adding a validator dynamically just after startup)

* allow sync committee duties in doppelganger period

* don't trigger doppelganger when registering duties

* fix crash when expected index response is missing

* fix missing slashingSafe propagation
  • Loading branch information
arnetheduck authored Feb 20, 2023
1 parent 97247ea commit 83f9745
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 215 deletions.
9 changes: 2 additions & 7 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,10 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 6/6 Fail: 0/6 Skip: 0/6
## Validator pool
```diff
+ Activation after check OK
+ Doppelganger for already active validator OK
+ Doppelganger for genesis validator OK
+ Doppelganger for validator that activates in future epoch OK
+ Doppelganger for validator that activates in previous epoch OK
+ Doppelganger for validator that activates in same epoch as check OK
+ Future activation after check OK
```
OK: 7/7 Fail: 0/7 Skip: 0/7
OK: 2/2 Fail: 0/2 Skip: 0/2
## Zero signature sanity checks
```diff
+ SSZ serialization roundtrip of SignedBeaconBlockHeader OK
Expand Down Expand Up @@ -630,4 +625,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 9/9 Fail: 0/9 Skip: 0/9

---TOTAL---
OK: 351/356 Fail: 0/356 Skip: 5/356
OK: 346/351 Fail: 0/351 Skip: 5/351
35 changes: 10 additions & 25 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ type
## of gossip interleaving between nodes so long as they don't gossip at
## the same time.

nodeLaunchSlot*: Slot ##\
## Set once, at node launch. This functions as a basic protection against
## false positives from attestations persisting within the gossip network
## across quick restarts.

Eth2Processor* = object
## The Eth2Processor is the entry point for untrusted message processing -
## when we receive messages from various sources, we pass them to the
Expand Down Expand Up @@ -164,7 +159,6 @@ proc new*(T: type Eth2Processor,
(ref Eth2Processor)(
doppelgangerDetectionEnabled: doppelgangerDetectionEnabled,
doppelgangerDetection: DoppelgangerProtection(
nodeLaunchSlot: getBeaconTime().slotOrZero,
broadcastStartEpoch: FAR_FUTURE_EPOCH),
blockProcessor: blockProcessor,
validatorMonitor: validatorMonitor,
Expand Down Expand Up @@ -263,8 +257,7 @@ proc setupDoppelgangerDetection*(self: var Eth2Processor, slot: Slot) =
if self.doppelgangerDetectionEnabled:
notice "Setting up doppelganger detection",
epoch = slot.epoch,
broadcast_epoch = self.doppelgangerDetection.broadcastStartEpoch,
nodestart_epoch = self.doppelgangerDetection.nodeLaunchSlot.epoch()
broadcast_epoch = self.doppelgangerDetection.broadcastStartEpoch

proc clearDoppelgangerProtection*(self: var Eth2Processor) =
self.doppelgangerDetection.broadcastStartEpoch = FAR_FUTURE_EPOCH
Expand All @@ -278,25 +271,17 @@ proc checkForPotentialDoppelganger(
if not self.doppelgangerDetectionEnabled:
return

if attestation.data.slot <= self.doppelgangerDetection.nodeLaunchSlot + 1:
return

let broadcastStartEpoch = self.doppelgangerDetection.broadcastStartEpoch

for validatorIndex in attesterIndices:
let
validatorPubkey = self.dag.validatorKey(validatorIndex).get().toPubKey()
validator = self.validatorPool[].getValidator(validatorPubkey)

if not(isNil(validator)):
if validator.triggersDoppelganger(broadcastStartEpoch):
warn "Doppelganger attestation",
validator = shortLog(validator),
validator_index = validatorIndex,
activation_epoch = validator.activationEpoch,
broadcast_epoch = broadcastStartEpoch,
attestation = shortLog(attestation)
quitDoppelganger()
pubkey = self.dag.validatorKey(validatorIndex).get().toPubKey()

if self.validatorPool[].triggersDoppelganger(
pubkey, attestation.data.slot.epoch):
warn "Doppelganger attestation",
validator = shortLog(pubkey),
validator_index = validatorIndex,
attestation = shortLog(attestation)
quitDoppelganger()

proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
Expand Down
6 changes: 3 additions & 3 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) =

node.network.updateSyncnetsMetadata(syncnets)

proc updateDoppelganger(node: BeaconNode, epoch: Epoch) =
proc doppelgangerChecked(node: BeaconNode, epoch: Epoch) =
if not node.processor[].doppelgangerDetectionEnabled:
return

Expand All @@ -989,7 +989,7 @@ proc updateDoppelganger(node: BeaconNode, epoch: Epoch) =
# active
if epoch > node.processor[].doppelgangerDetection.broadcastStartEpoch:
for validator in node.attachedValidators[]:
validator.updateDoppelganger(epoch - 1)
validator.doppelgangerChecked(epoch - 1)

proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
## Subscribe to subnets that we are providing stability for or aggregating
Expand Down Expand Up @@ -1104,7 +1104,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
addMessageHandlers[gossipFork](node, forkDigests[gossipFork], slot)

node.gossipState = targetGossipState
node.updateDoppelganger(slot.epoch)
node.doppelgangerChecked(slot.epoch)
node.updateAttestationSubnetHandlers(slot)
node.updateBlocksGossipStatus(slot, isBehind)
node.updateLightClientGossipStatus(slot, isBehind)
Expand Down
3 changes: 1 addition & 2 deletions beacon_chain/nimbus_signing_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ proc installApiHandlers*(node: SigningNode) =
if validator_key.isErr():
return errorResponse(Http400, InvalidValidatorPublicKey)
let key = validator_key.get()
let validator = node.attachedValidators.getValidator(key)
if isNil(validator):
let validator = node.attachedValidators.getValidator(key).valueOr:
return errorResponse(Http404, ValidatorNotFoundError)
validator

Expand Down
5 changes: 3 additions & 2 deletions beacon_chain/validator_client/attestation_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type
proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
duty: DutyAndProof): Future[bool] {.async.} =
let vc = service.client
let validator = vc.getValidatorForDuties(duty.data.pubkey, adata.slot).valueOr:
let validator = vc.getValidatorForDuties(
duty.data.pubkey, adata.slot, true).valueOr:
return false
let fork = vc.forkAtEpoch(adata.slot.epoch)

Expand Down Expand Up @@ -260,7 +261,7 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
block:
var res: seq[AggregateItem]
for duty in duties:
let validator = vc.attachedValidators[].getValidatorForDuties(
let validator = vc.getValidatorForDuties(
duty.data.pubkey, slot).valueOr:
continue

Expand Down
40 changes: 22 additions & 18 deletions beacon_chain/validator_client/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,11 @@ proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff =
vc.beaconClock.now() - deadline

proc getValidatorForDuties*(vc: ValidatorClientRef,
key: ValidatorPubKey, slot: Slot): Opt[AttachedValidator] =
vc.attachedValidators[].getValidatorForDuties(key, slot)
key: ValidatorPubKey, slot: Slot,
doppelActivity = false,
slashingSafe = false): Opt[AttachedValidator] =
vc.attachedValidators[].getValidatorForDuties(
key, slot, doppelActivity, slashingSafe)

proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
# If schedule is present, it MUST not be empty.
Expand Down Expand Up @@ -518,22 +521,23 @@ proc addValidator*(vc: ValidatorClientRef, keystore: KeystoreData) =

proc removeValidator*(vc: ValidatorClientRef,
pubkey: ValidatorPubKey) {.async.} =
let validator = vc.attachedValidators[].getValidator(pubkey)
if not(isNil(validator)):
case validator.kind
of ValidatorKind.Local:
discard
of ValidatorKind.Remote:
# We must close all the REST clients running for the remote validator.
let pending =
block:
var res: seq[Future[void]]
for item in validator.clients:
res.add(item[0].closeWait())
res
await allFutures(pending)
# Remove validator from ValidatorPool.
vc.attachedValidators[].removeValidator(pubkey)
let validator = vc.attachedValidators[].getValidator(pubkey).valueOr:
return
# Remove validator from ValidatorPool.
vc.attachedValidators[].removeValidator(pubkey)

case validator.kind
of ValidatorKind.Local:
discard
of ValidatorKind.Remote:
# We must close all the REST clients running for the remote validator.
let pending =
block:
var res: seq[Future[void]]
for item in validator.clients:
res.add(item[0].closeWait())
res
await allFutures(pending)

proc getFeeRecipient*(vc: ValidatorClientRef, pubkey: ValidatorPubKey,
validatorIdx: ValidatorIndex,
Expand Down
14 changes: 7 additions & 7 deletions beacon_chain/validator_client/doppelganger_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ logScope: service = ServiceName
proc getCheckingList*(vc: ValidatorClientRef, epoch: Epoch): seq[ValidatorIndex] =
var res: seq[ValidatorIndex]
for validator in vc.attachedValidators[]:
if validator.index.isSome and validator.triggersDoppelganger(epoch):
if validator.index.isSome and
(validator.doppelCheck.isNone or validator.doppelCheck.get() < epoch):
res.add validator.index.get()
res

Expand All @@ -36,12 +37,11 @@ proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
let vindex = item.index
for validator in vc.attachedValidators[]:
if validator.index == Opt.some(vindex):
if item.is_live:
if validator.triggersDoppelganger(epoch):
vc.doppelExit.fire()
return
else:
validator.updateDoppelganger(epoch)
validator.doppelgangerChecked(epoch)

if item.is_live and validator.triggersDoppelganger(epoch):
vc.doppelExit.fire()
return

proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
let vc = service.client
Expand Down
12 changes: 7 additions & 5 deletions beacon_chain/validator_client/duties_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
list: seq[AttachedValidator]

for item in validators:
var validator = vc.attachedValidators[].getValidator(item.validator.pubkey)
if isNil(validator):
let validator = vc.attachedValidators[].getValidator(item.validator.pubkey)
if validator.isNone():
missing.add(validatorLog(item.validator.pubkey, item.index))
else:
validator.updateValidator(Opt.some ValidatorAndIndex(
validator.get().updateValidator(Opt.some ValidatorAndIndex(
index: item.index,
validator: item.validator))
updated.add(validatorLog(item.validator.pubkey, item.index))
list.add(validator)
list.add(validator.get())

if len(updated) > 0:
info "Validator indices updated",
Expand Down Expand Up @@ -198,7 +198,9 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
var pendingRequests: seq[Future[SignatureResult]]
var validators: seq[AttachedValidator]
for item in addOrReplaceItems:
let validator = vc.attachedValidators[].getValidator(item.duty.pubkey)
let validator =
vc.attachedValidators[].getValidator(item.duty.pubkey).valueOr:
continue
let fork = vc.forkAtEpoch(item.duty.slot.epoch)
let future = validator.getSlotSignature(
fork, genesisRoot, item.duty.slot)
Expand Down
8 changes: 4 additions & 4 deletions beacon_chain/validator_client/sync_committee_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
vindex = duty.validator_index
subcommitteeIdx = getSubcommitteeIndex(
duty.validator_sync_committee_index)
validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr: return false
validator = vc.getValidatorForDuties(
duty.pubkey, slot, slashingSafe = true).valueOr: return false
message =
block:
let res = await getSyncCommitteeMessage(validator, fork,
Expand Down Expand Up @@ -212,10 +213,9 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
var validators: seq[(AttachedValidator, SyncSubcommitteeIndex)]

for duty in duties:
let validator = vc.attachedValidators[].getValidatorForDuties(
duty.pubkey, slot).valueOr:
continue
let
validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr:
continue
subCommitteeIdx =
getSubcommitteeIndex(duty.validator_sync_committee_index)
future = validator.getSyncCommitteeSelectionProof(
Expand Down
3 changes: 1 addition & 2 deletions beacon_chain/validators/keystore_management.nim
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,7 @@ proc removeValidator*(pool: var ValidatorPool,
publicKey: ValidatorPubKey,
kind: KeystoreKind): KmResult[RemoveValidatorStatus] {.
raises: [Defect].} =
let validator = pool.getValidator(publicKey)
if isNil(validator):
let validator = pool.getValidator(publicKey).valueOr:
return ok(RemoveValidatorStatus.notFound)
if validator.kind.toKeystoreKind() != kind:
return ok(RemoveValidatorStatus.notFound)
Expand Down
31 changes: 21 additions & 10 deletions beacon_chain/validators/validator_duties.nim
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,18 @@ proc addValidators*(node: BeaconNode) =
v = node.attachedValidators[].addValidator(keystore, feeRecipient, gasLimit)
v.updateValidator(data)

proc getValidator*(node: BeaconNode, idx: ValidatorIndex): Opt[AttachedValidator] =
let key = ? node.dag.validatorKey(idx)
node.attachedValidators[].getValidator(key.toPubKey())

proc getValidatorForDuties*(
node: BeaconNode,
idx: ValidatorIndex, slot: Slot): Opt[AttachedValidator] =
idx: ValidatorIndex, slot: Slot,
doppelActivity = false, slashingSafe = false): Opt[AttachedValidator] =
let key = ? node.dag.validatorKey(idx)

node.attachedValidators[].getValidatorForDuties(key.toPubKey(), slot)
node.attachedValidators[].getValidatorForDuties(
key.toPubKey(), slot, doppelActivity, slashingSafe)

proc isSynced*(node: BeaconNode, head: BlockRef): SyncStatus =
## TODO This function is here as a placeholder for some better heurestics to
Expand Down Expand Up @@ -1036,7 +1042,8 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
epochRef.shufflingRef, slot, committee_index)

for index_in_committee, validator_index in committee:
let validator = node.getValidatorForDuties(validator_index, slot).valueOr:
let validator = node.getValidatorForDuties(
validator_index, slot, true).valueOr:
continue

let
Expand Down Expand Up @@ -1107,7 +1114,8 @@ proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =

for subcommitteeIdx in SyncSubcommitteeIndex:
for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getValidatorForDuties(valIdx, slot).valueOr:
let validator = node.getValidatorForDuties(
valIdx, slot, slashingSafe = true).valueOr:
continue
asyncSpawn createAndSendSyncCommitteeMessage(node, validator, slot,
subcommitteeIdx, head)
Expand Down Expand Up @@ -1174,7 +1182,8 @@ proc handleSyncCommitteeContributions(

for subcommitteeIdx in SyncSubCommitteeIndex:
for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getValidatorForDuties(valIdx, slot).valueOr:
let validator = node.getValidatorForDuties(
valIdx, slot, slashingSafe = true).valueOr:
continue

asyncSpawn signAndSendContribution(
Expand Down Expand Up @@ -1475,9 +1484,10 @@ proc updateValidators(
# checking all validators would significantly slow down this loop when there
# are many inactive keys
for i in node.dutyValidatorCount..validators.high:
let v = node.attachedValidators[].getValidator(validators[i].pubkey)
if v != nil:
v.index = Opt.some ValidatorIndex(i)
let
v = node.attachedValidators[].getValidator(validators[i].pubkey).valueOr:
continue
v.index = Opt.some ValidatorIndex(i)

node.dutyValidatorCount = validators.len

Expand Down Expand Up @@ -1656,9 +1666,10 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
let committee = get_beacon_committee(shufflingRef, slot, committee_index)

for index_in_committee, validator_index in committee:
let validator = node.getValidatorForDuties(validator_index, slot).valueOr:
continue
let
validator = node.getValidator(validator_index).valueOr:
continue

subnet_id = compute_subnet_for_attestation(
committees_per_slot, slot, committee_index)
slotSigRes = await validator.getSlotSignature(
Expand Down
Loading

0 comments on commit 83f9745

Please sign in to comment.