Skip to content

Commit

Permalink
restore doppelganger check on connectivity loss
Browse files Browse the repository at this point in the history
#4398 introduced a
regression in functionality where doppelganger detection would not be
rerun during connectivity loss. This PR reintroduces this check and
makes some adjustments to the implementation to simplify the code flow
for both BN and VC.

* track when check was last performed for each validator (to deal with
late-added validators)
* track when we performed a doppel-detectable activity (attesting) so as
to avoid false positives
* remove nodeStart special case (this should be treated the same as
adding a validator dynamically just after startup)
  • Loading branch information
arnetheduck committed Feb 13, 2023
1 parent aee19fe commit 93e4c02
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 136 deletions.
35 changes: 10 additions & 25 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ type
## of gossip interleaving between nodes so long as they don't gossip at
## the same time.

nodeLaunchSlot*: Slot ##\
## Set once, at node launch. This functions as a basic protection against
## false positives from attestations persisting within the gossip network
## across quick restarts.

Eth2Processor* = object
## The Eth2Processor is the entry point for untrusted message processing -
## when we receive messages from various sources, we pass them to the
Expand Down Expand Up @@ -164,7 +159,6 @@ proc new*(T: type Eth2Processor,
(ref Eth2Processor)(
doppelgangerDetectionEnabled: doppelgangerDetectionEnabled,
doppelgangerDetection: DoppelgangerProtection(
nodeLaunchSlot: getBeaconTime().slotOrZero,
broadcastStartEpoch: FAR_FUTURE_EPOCH),
blockProcessor: blockProcessor,
validatorMonitor: validatorMonitor,
Expand Down Expand Up @@ -263,8 +257,7 @@ proc setupDoppelgangerDetection*(self: var Eth2Processor, slot: Slot) =
if self.doppelgangerDetectionEnabled:
notice "Setting up doppelganger detection",
epoch = slot.epoch,
broadcast_epoch = self.doppelgangerDetection.broadcastStartEpoch,
nodestart_epoch = self.doppelgangerDetection.nodeLaunchSlot.epoch()
broadcast_epoch = self.doppelgangerDetection.broadcastStartEpoch

proc clearDoppelgangerProtection*(self: var Eth2Processor) =
self.doppelgangerDetection.broadcastStartEpoch = FAR_FUTURE_EPOCH
Expand All @@ -278,25 +271,17 @@ proc checkForPotentialDoppelganger(
if not self.doppelgangerDetectionEnabled:
return

if attestation.data.slot <= self.doppelgangerDetection.nodeLaunchSlot + 1:
return

let broadcastStartEpoch = self.doppelgangerDetection.broadcastStartEpoch

for validatorIndex in attesterIndices:
let
validatorPubkey = self.dag.validatorKey(validatorIndex).get().toPubKey()
validator = self.validatorPool[].getValidator(validatorPubkey)

if not(isNil(validator)):
if validator.triggersDoppelganger(broadcastStartEpoch):
warn "Doppelganger attestation",
validator = shortLog(validator),
validator_index = validatorIndex,
activation_epoch = validator.activationEpoch,
broadcast_epoch = broadcastStartEpoch,
attestation = shortLog(attestation)
quitDoppelganger()
pubkey = self.dag.validatorKey(validatorIndex).get().toPubKey()

if self.validatorPool[].triggersDoppelganger(
pubkey, attestation.data.slot.epoch):
warn "Doppelganger attestation",
validator = shortLog(pubkey),
validator_index = validatorIndex,
attestation = shortLog(attestation)
quitDoppelganger()

proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
Expand Down
6 changes: 3 additions & 3 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) =

node.network.updateSyncnetsMetadata(syncnets)

proc updateDoppelganger(node: BeaconNode, epoch: Epoch) =
proc doppelgangerChecked(node: BeaconNode, epoch: Epoch) =
if not node.processor[].doppelgangerDetectionEnabled:
return

Expand All @@ -989,7 +989,7 @@ proc updateDoppelganger(node: BeaconNode, epoch: Epoch) =
# active
if epoch > node.processor[].doppelgangerDetection.broadcastStartEpoch:
for validator in node.attachedValidators[]:
validator.updateDoppelganger(epoch - 1)
validator.doppelgangerChecked(epoch - 1)

proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
## Subscribe to subnets that we are providing stability for or aggregating
Expand Down Expand Up @@ -1104,7 +1104,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
addMessageHandlers[gossipFork](node, forkDigests[gossipFork], slot)

node.gossipState = targetGossipState
node.updateDoppelganger(slot.epoch)
node.doppelgangerChecked(slot.epoch)
node.updateAttestationSubnetHandlers(slot)
node.updateBlocksGossipStatus(slot, isBehind)
node.updateLightClientGossipStatus(slot, isBehind)
Expand Down
3 changes: 2 additions & 1 deletion beacon_chain/validator_client/attestation_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type
proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
duty: DutyAndProof): Future[bool] {.async.} =
let vc = service.client
let validator = vc.getValidatorForDuties(duty.data.pubkey, adata.slot).valueOr:
let validator = vc.getValidatorForDuties(
duty.data.pubkey, adata.slot, true).valueOr:
return false
let fork = vc.forkAtEpoch(adata.slot.epoch)

Expand Down
5 changes: 3 additions & 2 deletions beacon_chain/validator_client/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,9 @@ proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff =
vc.beaconClock.now() - deadline

proc getValidatorForDuties*(vc: ValidatorClientRef,
key: ValidatorPubKey, slot: Slot): Opt[AttachedValidator] =
vc.attachedValidators[].getValidatorForDuties(key, slot)
key: ValidatorPubKey, slot: Slot,
doppelActivity = false): Opt[AttachedValidator] =
vc.attachedValidators[].getValidatorForDuties(key, slot, doppelActivity)

proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
# If schedule is present, it MUST not be empty.
Expand Down
14 changes: 7 additions & 7 deletions beacon_chain/validator_client/doppelganger_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ logScope: service = ServiceName
proc getCheckingList*(vc: ValidatorClientRef, epoch: Epoch): seq[ValidatorIndex] =
var res: seq[ValidatorIndex]
for validator in vc.attachedValidators[]:
if validator.index.isSome and validator.triggersDoppelganger(epoch):
if validator.index.isSome and
(validator.doppelCheck.isNone or validator.doppelCheck.get() < epoch):
res.add validator.index.get()
res

Expand All @@ -36,12 +37,11 @@ proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
let vindex = item.index
for validator in vc.attachedValidators[]:
if validator.index == Opt.some(vindex):
if item.is_live:
if validator.triggersDoppelganger(epoch):
vc.doppelExit.fire()
return
else:
validator.updateDoppelganger(epoch)
validator.doppelgangerChecked(epoch)

if item.is_live and validator.triggersDoppelganger(epoch):
vc.doppelExit.fire()
return

proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
let vc = service.client
Expand Down
9 changes: 6 additions & 3 deletions beacon_chain/validators/validator_duties.nim
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ proc addValidators*(node: BeaconNode) =

proc getValidatorForDuties*(
node: BeaconNode,
idx: ValidatorIndex, slot: Slot): Opt[AttachedValidator] =
idx: ValidatorIndex, slot: Slot,
doppelActivity = false): Opt[AttachedValidator] =
let key = ? node.dag.validatorKey(idx)

node.attachedValidators[].getValidatorForDuties(key.toPubKey(), slot)
node.attachedValidators[].getValidatorForDuties(
key.toPubKey(), slot, doppelActivity)

proc isSynced*(node: BeaconNode, head: BlockRef): SyncStatus =
## TODO This function is here as a placeholder for some better heuristics to
Expand Down Expand Up @@ -1014,7 +1016,8 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
epochRef.shufflingRef, slot, committee_index)

for index_in_committee, validator_index in committee:
let validator = node.getValidatorForDuties(validator_index, slot).valueOr:
let validator = node.getValidatorForDuties(
validator_index, slot, true).valueOr:
continue

let
Expand Down
139 changes: 74 additions & 65 deletions beacon_chain/validators/validator_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ export

const
WEB3_SIGNER_DELAY_TOLERANCE = 3.seconds
DOPPELGANGER_EPOCHS_COUNT = 1
## The number of full epochs that we monitor validators for doppelganger
## protection

declareGauge validators,
"Number of validators attached to the beacon node"

logScope: topics = "val_pool"

type
ValidatorKind* {.pure.} = enum
Local, Remote
Expand All @@ -44,9 +43,6 @@ type
index*: ValidatorIndex
validator*: Validator

DoppelgangerStatus {.pure.} = enum
Unknown, Checking, Checked

AttachedValidator* = ref object
data*: KeystoreData
case kind*: ValidatorKind
Expand Down Expand Up @@ -76,9 +72,10 @@ type
# builder should be informed of current validators
externalBuilderRegistration*: Opt[SignedValidatorRegistrationV1]

doppelStatus: DoppelgangerStatus
doppelEpoch*: Opt[Epoch]
## The epoch where doppelganger detection started doing its monitoring
doppelCheck*: Opt[Epoch]
## The epoch where doppelganger detection last performed a check
doppelActivity*: Opt[Epoch]
## The last time we attempted to perform a duty with this validator

lastWarning*: Opt[Slot]

Expand Down Expand Up @@ -246,14 +243,6 @@ proc updateValidator*(

validator.activationEpoch = activationEpoch

if validator.doppelStatus == DoppelgangerStatus.Unknown:
if validator.doppelEpoch.isSome() and activationEpoch != FAR_FUTURE_EPOCH:
let doppelEpoch = validator.doppelEpoch.get()
if doppelEpoch >= validator.activationEpoch + DOPPELGANGER_EPOCHS_COUNT:
validator.doppelStatus = DoppelgangerStatus.Checking
else:
validator.doppelStatus = DoppelgangerStatus.Checked

proc close*(pool: var ValidatorPool) =
## Unlock and close all validator keystore's files managed by ``pool``.
for validator in pool.validators.values():
Expand All @@ -275,52 +264,61 @@ iterator items*(pool: ValidatorPool): AttachedValidator =
for item in pool.validators.values():
yield item

proc triggersDoppelganger*(v: AttachedValidator, epoch: Epoch): bool =
## Returns true iff detected activity in the given epoch would trigger
## doppelganger detection
if v.doppelStatus != DoppelgangerStatus.Checked:
if v.activationEpoch == FAR_FUTURE_EPOCH:
false
elif epoch < v.activationEpoch + DOPPELGANGER_EPOCHS_COUNT:
v.doppelStatus = DoppelgangerStatus.Checked
false
else:
true
proc doppelgangerChecked*(validator: AttachedValidator, epoch: Epoch) =
  ## Call when the validator was checked for activity in the given epoch
  ##
  ## Records `epoch` in `doppelCheck`, proving that the network was observed
  ## for this validator during that epoch. Gaps or repeats relative to the
  ## previously recorded epoch are logged for diagnostics only; the stored
  ## value is overwritten unconditionally (no monotonicity is enforced here).

  if validator.doppelCheck.isNone():
    # First observation for this validator (e.g. one added at runtime)
    debug "Doppelganger first check",
      validator = shortLog(validator), epoch
  elif validator.doppelCheck.get() + 1 != epoch:
    # Non-consecutive epoch: a check was missed (connectivity loss) or repeated
    debug "Doppelganger stale check",
      validator = shortLog(validator),
      checked = validator.doppelCheck.get(), epoch

  validator.doppelCheck = Opt.some epoch

proc doppelgangerActivity*(validator: AttachedValidator, epoch: Epoch) =
  ## Call when we performed a doppelganger-monitored activity in the epoch
  ##
  ## Records `epoch` in `doppelActivity` so that network-observed activity in
  ## this epoch (or earlier) is attributed to this instance rather than to a
  ## doppelganger - `triggersDoppelganger` consults this value to suppress
  ## false positives from our own attestations.
  if validator.doppelActivity.isNone():
    # First duty performed with this validator since it was added
    debug "Doppelganger first activity",
      validator = shortLog(validator), epoch
  elif validator.doppelActivity.get() + 1 != epoch:
    # Non-consecutive epoch: duties were skipped or repeated for this validator
    debug "Doppelganger stale activity",
      validator = shortLog(validator),
      checked = validator.doppelActivity.get(), epoch

  validator.doppelActivity = Opt.some epoch

func triggersDoppelganger*(v: AttachedValidator, epoch: Epoch): bool =
## Returns true iff we have proof that an activity in the given epoch
## triggers doppelganger detection: this means the network was active for this
## validator during the given epoch (via doppelgangerChecked) but the activity
## did not originate from this instance.

if v.doppelActivity.isSome() and v.doppelActivity.get() >= epoch:
false # This was our own activity
elif v.doppelCheck.isNone():
false # Can't prove that the activity triggers the check
else:
false

proc updateDoppelganger*(validator: AttachedValidator, epoch: Epoch) =
## Called when the validator has proven to be inactive in the given epoch -
## this call should be made after the end of `epoch` before acting on duties
## in `epoch + 1`.

if validator.doppelStatus == DoppelgangerStatus.Checked:
return

if validator.doppelEpoch.isNone():
validator.doppelEpoch = Opt.some epoch

let doppelEpoch = validator.doppelEpoch.get()

if validator.doppelStatus == DoppelgangerStatus.Unknown:
if validator.activationEpoch == FAR_FUTURE_EPOCH:
return

# We don't do doppelganger checking for validators that are about to be
# activated since both clients would be waiting for the other to start
# performing duties - this accounts for genesis as well
# The slot is rounded up to ensure we cover all slots
if doppelEpoch + 1 <= validator.activationEpoch + DOPPELGANGER_EPOCHS_COUNT:
validator.doppelStatus = DoppelgangerStatus.Checked
return

validator.doppelStatus = DoppelgangerStatus.Checking

if epoch + 1 >= doppelEpoch + DOPPELGANGER_EPOCHS_COUNT:
validator.doppelStatus = DoppelgangerStatus.Checked
v.doppelCheck.get() == epoch

proc doppelgangerReady*(validator: AttachedValidator, slot: Slot): bool =
  ## Returns true iff the validator has passed doppelganger detection by being
  ## monitored in the previous epoch (or the given epoch is the activation
  ## epoch, in which case we always consider it ready)
  ##
  ## If we checked doppelganger, we allow the check to lag by one slot to avoid
  ## a race condition where the check for epoch N is ongoing and block
  ## production for slot_start(N+1) is about to happen
  let epoch = slot.epoch
  # Ready when: this is the activation epoch itself, or the last recorded
  # check covered the previous epoch, or - for the first slot of an epoch
  # only - the check covered the epoch before that (the one-slot grace above)
  epoch == validator.activationEpoch or
    (validator.doppelCheck.isSome and
      (validator.doppelCheck.get() + 1 == epoch or
        ((validator.doppelCheck.get() + 2).start_slot) == slot))

proc getValidatorForDuties*(
pool: ValidatorPool, key: ValidatorPubKey, slot: Slot):
pool: ValidatorPool, key: ValidatorPubKey, slot: Slot,
doppelActivity = false):
Opt[AttachedValidator] =
## Return validator only if it is ready for duties (has index and has passed
## doppelganger check where applicable)
Expand All @@ -329,16 +327,27 @@ proc getValidatorForDuties*(
return Opt.none(AttachedValidator)

if pool.doppelgangerDetectionEnabled and
validator.triggersDoppelganger(slot.epoch):
# If the validator would trigger for an activity in the given slot, we don't
# return it for duties
not validator.doppelgangerReady(slot):
notice "Doppelganger detection active - " &
"skipping validator duties while observing the network",
validator = shortLog(validator)
"skipping validator duties while observing the network",
validator = shortLog(validator),
doppelCheck = validator.doppelCheck,
activationEpoch = shortLog(validator.activationEpoch)

return Opt.none(AttachedValidator)

if doppelActivity:
# Record the activity
# TODO consider moving to the the "registration point"
validator.doppelgangerActivity(slot.epoch)

return Opt.some(validator)

func triggersDoppelganger*(
    pool: ValidatorPool, pubkey: ValidatorPubKey, epoch: Epoch): bool =
  ## Pool-level convenience wrapper: look up the validator attached under
  ## `pubkey` and defer to the per-validator `triggersDoppelganger`.
  ## Unknown keys (nil lookup result) never trigger.
  let v = pool.getValidator(pubkey)
  v != nil and v.triggersDoppelganger(epoch)

proc signWithDistributedKey(v: AttachedValidator,
request: Web3SignerRequest): Future[SignatureResult]
{.async.} =
Expand Down
Loading

0 comments on commit 93e4c02

Please sign in to comment.