-
Notifications
You must be signed in to change notification settings - Fork 736
/
lib.rs
2074 lines (1833 loc) · 80.3 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
mod attestation;
mod attestation_id;
mod attestation_storage;
mod attester_slashing;
mod bls_to_execution_changes;
mod max_cover;
mod metrics;
mod persistence;
mod reward_cache;
mod sync_aggregate_id;
pub use crate::bls_to_execution_changes::ReceivedPreCapella;
pub use attestation::{earliest_attestation_validators, AttMaxCover};
pub use attestation_storage::{AttestationRef, SplitAttestation};
pub use max_cover::MaxCover;
pub use persistence::{
PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14,
PersistedOperationPoolV15, PersistedOperationPoolV5,
};
pub use reward_cache::RewardCache;
use crate::attestation_storage::{AttestationMap, CheckpointKey};
use crate::bls_to_execution_changes::BlsToExecutionChanges;
use crate::sync_aggregate_id::SyncAggregateId;
use attester_slashing::AttesterSlashingMaxCover;
use max_cover::maximum_cover;
use parking_lot::{RwLock, RwLockWriteGuard};
use rand::seq::SliceRandom;
use rand::thread_rng;
use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::per_block_processing::{
get_slashable_indices_modular, verify_exit, VerifySignatures,
};
use state_processing::{SigVerifiedOp, VerifyOperation};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::marker::PhantomData;
use std::ptr;
use types::{
sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, AbstractExecPayload,
Attestation, AttestationData, AttesterSlashing, BeaconState, BeaconStateError, ChainSpec,
Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock, SignedBlsToExecutionChange,
SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Validator,
};
type SyncContributions<T> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeContribution<T>>>>;
#[derive(Default, Debug)]
pub struct OperationPool<T: EthSpec + Default> {
/// Map from attestation ID (see below) to vectors of attestations.
attestations: RwLock<AttestationMap<T>>,
/// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID.
sync_contributions: SyncContributions<T>,
/// Set of attester slashings, and the fork version they were verified against.
attester_slashings: RwLock<HashSet<SigVerifiedOp<AttesterSlashing<T>, T>>>,
/// Map from proposer index to slashing.
proposer_slashings: RwLock<HashMap<u64, SigVerifiedOp<ProposerSlashing, T>>>,
/// Map from exiting validator to their exit data.
voluntary_exits: RwLock<HashMap<u64, SigVerifiedOp<SignedVoluntaryExit, T>>>,
/// Map from credential changing validator to their position in the queue.
bls_to_execution_changes: RwLock<BlsToExecutionChanges<T>>,
/// Reward cache for accelerating attestation packing.
reward_cache: RwLock<RewardCache>,
_phantom: PhantomData<T>,
}
#[derive(Debug, PartialEq)]
pub enum OpPoolError {
GetAttestationsTotalBalanceError(BeaconStateError),
GetBlockRootError(BeaconStateError),
SyncAggregateError(SyncAggregateError),
RewardCacheUpdatePrevEpoch(BeaconStateError),
RewardCacheUpdateCurrEpoch(BeaconStateError),
RewardCacheGetBlockRoot(BeaconStateError),
RewardCacheWrongEpoch,
RewardCacheValidatorUnknown(BeaconStateError),
RewardCacheOutOfBounds,
IncorrectOpPoolVariant,
}
#[derive(Default)]
pub struct AttestationStats {
/// Total number of attestations for all committeees/indices/votes.
pub num_attestations: usize,
/// Number of unique `AttestationData` attested to.
pub num_attestation_data: usize,
/// Maximum number of aggregates for a single `AttestationData`.
pub max_aggregates_per_data: usize,
}
impl From<SyncAggregateError> for OpPoolError {
fn from(e: SyncAggregateError) -> Self {
OpPoolError::SyncAggregateError(e)
}
}
impl<T: EthSpec> OperationPool<T> {
/// Create a new operation pool.
pub fn new() -> Self {
Self::default()
}
/// Insert a sync contribution into the pool. We don't aggregate these contributions until they
/// are retrieved from the pool.
///
/// ## Note
///
/// This function assumes the given `contribution` is valid.
pub fn insert_sync_contribution(
&self,
contribution: SyncCommitteeContribution<T>,
) -> Result<(), OpPoolError> {
let aggregate_id = SyncAggregateId::new(contribution.slot, contribution.beacon_block_root);
let mut contributions = self.sync_contributions.write();
match contributions.entry(aggregate_id) {
Entry::Vacant(entry) => {
// If no contributions exist for the key, insert the given contribution.
entry.insert(vec![contribution]);
}
Entry::Occupied(mut entry) => {
// If contributions exists for this key, check whether there exists a contribution
// with a matching `subcommittee_index`. If one exists, check whether the new or
// old contribution has more aggregation bits set. If the new one does, add it to the
// pool in place of the old one.
let existing_contributions = entry.get_mut();
match existing_contributions
.iter_mut()
.find(|existing_contribution| {
existing_contribution.subcommittee_index == contribution.subcommittee_index
}) {
Some(existing_contribution) => {
// Only need to replace the contribution if the new contribution has more
// bits set.
if existing_contribution.aggregation_bits.num_set_bits()
< contribution.aggregation_bits.num_set_bits()
{
*existing_contribution = contribution;
}
}
None => {
// If there has been no previous sync contribution for this subcommittee index,
// add it to the pool.
existing_contributions.push(contribution);
}
}
}
};
Ok(())
}
/// Calculate the `SyncAggregate` from the sync contributions that exist in the pool for the
/// slot previous to the slot associated with `state`. Return the calculated `SyncAggregate` if
/// contributions exist at this slot, or else `None`.
pub fn get_sync_aggregate(
&self,
state: &BeaconState<T>,
) -> Result<Option<SyncAggregate<T>>, OpPoolError> {
// Sync aggregates are formed from the contributions from the previous slot.
let slot = state.slot().saturating_sub(1u64);
let block_root = *state
.get_block_root(slot)
.map_err(OpPoolError::GetBlockRootError)?;
let id = SyncAggregateId::new(slot, block_root);
self.sync_contributions
.read()
.get(&id)
.map(|contributions| SyncAggregate::from_contributions(contributions))
.transpose()
.map_err(|e| e.into())
}
/// Total number of sync contributions in the pool.
pub fn num_sync_contributions(&self) -> usize {
self.sync_contributions
.read()
.values()
.map(|contributions| contributions.len())
.sum()
}
/// Remove sync contributions which are too old to be included in a block.
pub fn prune_sync_contributions(&self, current_slot: Slot) {
// Prune sync contributions that are from before the previous slot.
self.sync_contributions.write().retain(|_, contributions| {
// All the contributions in this bucket have the same data, so we only need to
// check the first one.
contributions.first().map_or(false, |contribution| {
current_slot <= contribution.slot.saturating_add(Slot::new(1))
})
});
}
/// Insert an attestation into the pool, aggregating it with existing attestations if possible.
///
/// ## Note
///
/// This function assumes the given `attestation` is valid.
pub fn insert_attestation(
&self,
attestation: Attestation<T>,
attesting_indices: Vec<u64>,
) -> Result<(), AttestationValidationError> {
self.attestations
.write()
.insert(attestation, attesting_indices);
Ok(())
}
/// Total number of attestations in the pool, including attestations for the same data.
pub fn num_attestations(&self) -> usize {
self.attestation_stats().num_attestations
}
pub fn attestation_stats(&self) -> AttestationStats {
self.attestations.read().stats()
}
/// Return all valid attestations for the given epoch, for use in max cover.
#[allow(clippy::too_many_arguments)]
fn get_valid_attestations_for_epoch<'a>(
&'a self,
checkpoint_key: &'a CheckpointKey,
all_attestations: &'a AttestationMap<T>,
state: &'a BeaconState<T>,
reward_cache: &'a RewardCache,
total_active_balance: u64,
validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
spec: &'a ChainSpec,
) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send {
all_attestations
.get_attestations(checkpoint_key)
.filter(|att| {
att.data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= att.data.slot + T::slots_per_epoch()
})
.filter(validity_filter)
.filter_map(move |att| {
AttMaxCover::new(att, state, reward_cache, total_active_balance, spec)
})
}
/// Get a list of attestations for inclusion in a block.
///
/// The `validity_filter` is a closure that provides extra filtering of the attestations
/// before an approximately optimal bundle is constructed. We use it to provide access
/// to the fork choice data from the `BeaconChain` struct that doesn't logically belong
/// in the operation pool.
pub fn get_attestations(
&self,
state: &BeaconState<T>,
prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
spec: &ChainSpec,
) -> Result<Vec<Attestation<T>>, OpPoolError> {
// Attestations for the current fork, which may be from the current or previous epoch.
let (prev_epoch_key, curr_epoch_key) = CheckpointKey::keys_for_state(state);
let all_attestations = self.attestations.read();
let total_active_balance = state
.get_total_active_balance()
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
// Update the reward cache.
let reward_timer = metrics::start_timer(&metrics::BUILD_REWARD_CACHE_TIME);
let mut reward_cache = self.reward_cache.write();
reward_cache.update(state)?;
let reward_cache = RwLockWriteGuard::downgrade(reward_cache);
drop(reward_timer);
// Split attestations for the previous & current epochs, so that we
// can optimise them individually in parallel.
let mut num_prev_valid = 0_i64;
let mut num_curr_valid = 0_i64;
let prev_epoch_att = self
.get_valid_attestations_for_epoch(
&prev_epoch_key,
&*all_attestations,
state,
&reward_cache,
total_active_balance,
prev_epoch_validity_filter,
spec,
)
.inspect(|_| num_prev_valid += 1);
let curr_epoch_att = self
.get_valid_attestations_for_epoch(
&curr_epoch_key,
&*all_attestations,
state,
&reward_cache,
total_active_balance,
curr_epoch_validity_filter,
spec,
)
.inspect(|_| num_curr_valid += 1);
let prev_epoch_limit = if let BeaconState::Base(base_state) = state {
std::cmp::min(
T::MaxPendingAttestations::to_usize()
.saturating_sub(base_state.previous_epoch_attestations.len()),
T::MaxAttestations::to_usize(),
)
} else {
T::MaxAttestations::to_usize()
};
let (prev_cover, curr_cover) = rayon::join(
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME);
// If we're in the genesis epoch, just use the current epoch attestations.
if prev_epoch_key == curr_epoch_key {
vec![]
} else {
maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations")
}
},
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME);
maximum_cover(
curr_epoch_att,
T::MaxAttestations::to_usize(),
"curr_epoch_attestations",
)
},
);
metrics::set_gauge(&metrics::NUM_PREV_EPOCH_ATTESTATIONS, num_prev_valid);
metrics::set_gauge(&metrics::NUM_CURR_EPOCH_ATTESTATIONS, num_curr_valid);
Ok(max_cover::merge_solutions(
curr_cover,
prev_cover,
T::MaxAttestations::to_usize(),
))
}
/// Remove attestations which are too old to be included in a block.
pub fn prune_attestations(&self, current_epoch: Epoch) {
self.attestations.write().prune(current_epoch);
}
/// Insert a proposer slashing into the pool.
pub fn insert_proposer_slashing(
&self,
verified_proposer_slashing: SigVerifiedOp<ProposerSlashing, T>,
) {
self.proposer_slashings.write().insert(
verified_proposer_slashing.as_inner().proposer_index(),
verified_proposer_slashing,
);
}
/// Insert an attester slashing into the pool.
pub fn insert_attester_slashing(
&self,
verified_slashing: SigVerifiedOp<AttesterSlashing<T>, T>,
) {
self.attester_slashings.write().insert(verified_slashing);
}
/// Get proposer and attester slashings for inclusion in a block.
///
/// This function computes both types of slashings together, because
/// attester slashings may be invalidated by proposer slashings included
/// earlier in the block.
pub fn get_slashings_and_exits(
&self,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> (
Vec<ProposerSlashing>,
Vec<AttesterSlashing<T>>,
Vec<SignedVoluntaryExit>,
) {
let proposer_slashings = filter_limit_operations(
self.proposer_slashings.read().values(),
|slashing| {
slashing.signature_is_still_valid(&state.fork())
&& state
.validators()
.get(slashing.as_inner().signed_header_1.message.proposer_index as usize)
.map_or(false, |validator| !validator.slashed)
},
|slashing| slashing.as_inner().clone(),
T::MaxProposerSlashings::to_usize(),
);
// Set of validators to be slashed, so we don't attempt to construct invalid attester
// slashings.
let mut to_be_slashed = proposer_slashings
.iter()
.map(|s| s.proposer_index())
.collect();
let attester_slashings = self.get_attester_slashings(state, &mut to_be_slashed);
let voluntary_exits = self.get_voluntary_exits(
state,
|exit| !to_be_slashed.contains(&exit.message.validator_index),
spec,
);
(proposer_slashings, attester_slashings, voluntary_exits)
}
/// Get attester slashings taking into account already slashed validators.
///
/// This function *must* remain private.
fn get_attester_slashings(
&self,
state: &BeaconState<T>,
to_be_slashed: &mut HashSet<u64>,
) -> Vec<AttesterSlashing<T>> {
let reader = self.attester_slashings.read();
let relevant_attester_slashings = reader.iter().flat_map(|slashing| {
if slashing.signature_is_still_valid(&state.fork()) {
AttesterSlashingMaxCover::new(slashing.as_inner(), to_be_slashed, state)
} else {
None
}
});
maximum_cover(
relevant_attester_slashings,
T::MaxAttesterSlashings::to_usize(),
"attester_slashings",
)
.into_iter()
.map(|cover| {
to_be_slashed.extend(cover.covering_set().keys());
cover.intermediate().clone()
})
.collect()
}
/// Prune proposer slashings for validators which are exited in the finalized epoch.
pub fn prune_proposer_slashings(&self, head_state: &BeaconState<T>) {
prune_validator_hash_map(
&mut self.proposer_slashings.write(),
|_, validator| validator.exit_epoch <= head_state.finalized_checkpoint().epoch,
head_state,
);
}
/// Prune attester slashings for all slashed or withdrawn validators, or attestations on another
/// fork.
pub fn prune_attester_slashings(&self, head_state: &BeaconState<T>) {
self.attester_slashings.write().retain(|slashing| {
// Check that the attestation's signature is still valid wrt the fork version.
let signature_ok = slashing.signature_is_still_valid(&head_state.fork());
// Slashings that don't slash any validators can also be dropped.
let slashing_ok =
get_slashable_indices_modular(head_state, slashing.as_inner(), |_, validator| {
// Declare that a validator is still slashable if they have not exited prior
// to the finalized epoch.
//
// We cannot check the `slashed` field since the `head` is not finalized and
// a fork could un-slash someone.
validator.exit_epoch > head_state.finalized_checkpoint().epoch
})
.map_or(false, |indices| !indices.is_empty());
signature_ok && slashing_ok
});
}
/// Total number of attester slashings in the pool.
pub fn num_attester_slashings(&self) -> usize {
self.attester_slashings.read().len()
}
/// Total number of proposer slashings in the pool.
pub fn num_proposer_slashings(&self) -> usize {
self.proposer_slashings.read().len()
}
/// Insert a voluntary exit that has previously been checked elsewhere.
pub fn insert_voluntary_exit(&self, exit: SigVerifiedOp<SignedVoluntaryExit, T>) {
self.voluntary_exits
.write()
.insert(exit.as_inner().message.validator_index, exit);
}
/// Get a list of voluntary exits for inclusion in a block.
fn get_voluntary_exits<F>(
&self,
state: &BeaconState<T>,
filter: F,
spec: &ChainSpec,
) -> Vec<SignedVoluntaryExit>
where
F: Fn(&SignedVoluntaryExit) -> bool,
{
filter_limit_operations(
self.voluntary_exits.read().values(),
|exit| {
filter(exit.as_inner())
&& exit.signature_is_still_valid(&state.fork())
&& verify_exit(state, None, exit.as_inner(), VerifySignatures::False, spec)
.is_ok()
},
|exit| exit.as_inner().clone(),
T::MaxVoluntaryExits::to_usize(),
)
}
/// Prune if validator has already exited at or before the finalized checkpoint of the head.
pub fn prune_voluntary_exits(&self, head_state: &BeaconState<T>) {
prune_validator_hash_map(
&mut self.voluntary_exits.write(),
// This condition is slightly too loose, since there will be some finalized exits that
// are missed here.
//
// We choose simplicity over the gain of pruning more exits since they are small and
// should not be seen frequently.
|_, validator| validator.exit_epoch <= head_state.finalized_checkpoint().epoch,
head_state,
);
}
/// Check if an address change equal to `address_change` is already in the pool.
///
/// Return `None` if no address change for the validator index exists in the pool.
pub fn bls_to_execution_change_in_pool_equals(
&self,
address_change: &SignedBlsToExecutionChange,
) -> Option<bool> {
self.bls_to_execution_changes
.read()
.existing_change_equals(address_change)
}
/// Insert a BLS to execution change into the pool, *only if* no prior change is known.
///
/// Return `true` if the change was inserted.
pub fn insert_bls_to_execution_change(
&self,
verified_change: SigVerifiedOp<SignedBlsToExecutionChange, T>,
received_pre_capella: ReceivedPreCapella,
) -> bool {
self.bls_to_execution_changes
.write()
.insert(verified_change, received_pre_capella)
}
/// Get a list of execution changes for inclusion in a block.
///
/// They're in random `HashMap` order, which isn't exactly fair, but isn't unfair either.
pub fn get_bls_to_execution_changes(
&self,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Vec<SignedBlsToExecutionChange> {
filter_limit_operations(
self.bls_to_execution_changes.read().iter_lifo(),
|address_change| {
address_change.signature_is_still_valid(&state.fork())
&& state
.get_validator(address_change.as_inner().message.validator_index as usize)
.map_or(false, |validator| {
!validator.has_eth1_withdrawal_credential(spec)
})
},
|address_change| address_change.as_inner().clone(),
T::MaxBlsToExecutionChanges::to_usize(),
)
}
/// Get a list of execution changes to be broadcast at the Capella fork.
///
/// The list that is returned will be shuffled to help provide a fair
/// broadcast of messages.
pub fn get_bls_to_execution_changes_received_pre_capella(
&self,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Vec<SignedBlsToExecutionChange> {
let mut changes = filter_limit_operations(
self.bls_to_execution_changes
.read()
.iter_received_pre_capella(),
|address_change| {
address_change.signature_is_still_valid(&state.fork())
&& state
.get_validator(address_change.as_inner().message.validator_index as usize)
.map_or(false, |validator| {
!validator.has_eth1_withdrawal_credential(spec)
})
},
|address_change| address_change.as_inner().clone(),
usize::max_value(),
);
changes.shuffle(&mut thread_rng());
changes
}
/// Removes `broadcasted` validators from the set of validators that should
/// have their BLS changes broadcast at the Capella fork boundary.
pub fn register_indices_broadcasted_at_capella(&self, broadcasted: &HashSet<u64>) {
self.bls_to_execution_changes
.write()
.register_indices_broadcasted_at_capella(broadcasted);
}
/// Prune BLS to execution changes that have been applied to the state more than 1 block ago.
pub fn prune_bls_to_execution_changes<Payload: AbstractExecPayload<T>>(
&self,
head_block: &SignedBeaconBlock<T, Payload>,
head_state: &BeaconState<T>,
spec: &ChainSpec,
) {
self.bls_to_execution_changes
.write()
.prune(head_block, head_state, spec)
}
/// Prune all types of transactions given the latest head state and head fork.
pub fn prune_all<Payload: AbstractExecPayload<T>>(
&self,
head_block: &SignedBeaconBlock<T, Payload>,
head_state: &BeaconState<T>,
current_epoch: Epoch,
spec: &ChainSpec,
) {
self.prune_attestations(current_epoch);
self.prune_sync_contributions(head_state.slot());
self.prune_proposer_slashings(head_state);
self.prune_attester_slashings(head_state);
self.prune_voluntary_exits(head_state);
self.prune_bls_to_execution_changes(head_block, head_state, spec);
}
/// Total number of voluntary exits in the pool.
pub fn num_voluntary_exits(&self) -> usize {
self.voluntary_exits.read().len()
}
/// Returns all known `Attestation` objects.
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_all_attestations(&self) -> Vec<Attestation<T>> {
self.attestations
.read()
.iter()
.map(|att| att.clone_as_attestation())
.collect()
}
/// Returns all known `Attestation` objects that pass the provided filter.
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_filtered_attestations<F>(&self, filter: F) -> Vec<Attestation<T>>
where
F: Fn(&AttestationData) -> bool,
{
self.attestations
.read()
.iter()
.filter(|att| filter(&att.attestation_data()))
.map(|att| att.clone_as_attestation())
.collect()
}
/// Returns all known `AttesterSlashing` objects.
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_all_attester_slashings(&self) -> Vec<AttesterSlashing<T>> {
self.attester_slashings
.read()
.iter()
.map(|slashing| slashing.as_inner().clone())
.collect()
}
/// Returns all known `ProposerSlashing` objects.
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_all_proposer_slashings(&self) -> Vec<ProposerSlashing> {
self.proposer_slashings
.read()
.iter()
.map(|(_, slashing)| slashing.as_inner().clone())
.collect()
}
/// Returns all known `SignedVoluntaryExit` objects.
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_all_voluntary_exits(&self) -> Vec<SignedVoluntaryExit> {
self.voluntary_exits
.read()
.iter()
.map(|(_, exit)| exit.as_inner().clone())
.collect()
}
/// Returns all known `SignedBlsToExecutionChange` objects.
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_all_bls_to_execution_changes(&self) -> Vec<SignedBlsToExecutionChange> {
self.bls_to_execution_changes
.read()
.iter_fifo()
.map(|address_change| address_change.as_inner().clone())
.collect()
}
}
/// Filter up to a maximum number of operations out of an iterator.
fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>(
operations: I,
filter: F,
mapping: G,
limit: usize,
) -> Vec<V>
where
I: IntoIterator<Item = &'a T>,
F: Fn(&T) -> bool,
G: Fn(&T) -> V,
T: Clone,
{
operations
.into_iter()
.filter(|x| filter(*x))
.take(limit)
.map(mapping)
.collect()
}
/// Remove all entries from the given hash map for which `prune_if` returns true.
///
/// The keys in the map should be validator indices, which will be looked up
/// in the state's validator registry and then passed to `prune_if`.
/// Entries for unknown validators will be kept.
fn prune_validator_hash_map<T, F, E: EthSpec>(
map: &mut HashMap<u64, SigVerifiedOp<T, E>>,
prune_if: F,
head_state: &BeaconState<E>,
) where
F: Fn(u64, &Validator) -> bool,
T: VerifyOperation<E>,
{
map.retain(|&validator_index, op| {
op.signature_is_still_valid(&head_state.fork())
&& head_state
.validators()
.get(validator_index as usize)
.map_or(true, |validator| !prune_if(validator_index, validator))
});
}
/// Compare two operation pools.
impl<T: EthSpec + Default> PartialEq for OperationPool<T> {
fn eq(&self, other: &Self) -> bool {
if ptr::eq(self, other) {
return true;
}
*self.attestations.read() == *other.attestations.read()
&& *self.sync_contributions.read() == *other.sync_contributions.read()
&& *self.attester_slashings.read() == *other.attester_slashings.read()
&& *self.proposer_slashings.read() == *other.proposer_slashings.read()
&& *self.voluntary_exits.read() == *other.voluntary_exits.read()
}
}
#[cfg(all(test, not(debug_assertions)))]
mod release_tests {
use super::attestation::earliest_attestation_validators;
use super::*;
use beacon_chain::test_utils::{
test_spec, BeaconChainHarness, EphemeralHarnessType, RelativeSyncCommittee,
};
use lazy_static::lazy_static;
use maplit::hashset;
use state_processing::{common::get_attesting_indices_from_state, VerifyOperation};
use std::collections::BTreeSet;
use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT;
use types::*;
pub const MAX_VALIDATOR_COUNT: usize = 4 * 32 * 128;
lazy_static! {
/// A cached set of keys.
static ref KEYPAIRS: Vec<Keypair> = types::test_utils::generate_deterministic_keypairs(MAX_VALIDATOR_COUNT);
}
fn get_harness<E: EthSpec>(
validator_count: usize,
spec: Option<ChainSpec>,
) -> BeaconChainHarness<EphemeralHarnessType<E>> {
let harness = BeaconChainHarness::builder(E::default())
.spec_or_default(spec)
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
harness.advance_slot();
harness
}
/// Test state for attestation-related tests.
fn attestation_test_state<E: EthSpec>(
num_committees: usize,
) -> (BeaconChainHarness<EphemeralHarnessType<E>>, ChainSpec) {
let spec = test_spec::<E>();
let num_validators =
num_committees * E::slots_per_epoch() as usize * spec.target_committee_size;
let harness = get_harness::<E>(num_validators, Some(spec.clone()));
(harness, spec)
}
/// Test state for sync contribution-related tests.
async fn sync_contribution_test_state<E: EthSpec>(
num_committees: usize,
) -> (BeaconChainHarness<EphemeralHarnessType<E>>, ChainSpec) {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0));
let num_validators =
num_committees * E::slots_per_epoch() as usize * spec.target_committee_size;
let harness = get_harness::<E>(num_validators, Some(spec.clone()));
let state = harness.get_current_state();
harness
.add_attested_blocks_at_slots(
state,
Hash256::zero(),
&[Slot::new(1)],
(0..num_validators).collect::<Vec<_>>().as_slice(),
)
.await;
(harness, spec)
}
#[test]
fn test_earliest_attestation() {
let (harness, ref spec) = attestation_test_state::<MainnetEthSpec>(1);
// Only run this test on the phase0 hard-fork.
if spec.altair_fork_epoch != None {
return;
}
let mut state = harness.get_current_state();
let slot = state.slot();
let committees = state
.get_beacon_committees_at_slot(slot)
.unwrap()
.into_iter()
.map(BeaconCommittee::into_owned)
.collect::<Vec<_>>();
let num_validators =
MainnetEthSpec::slots_per_epoch() as usize * spec.target_committee_size;
let attestations = harness.make_attestations(
(0..num_validators).collect::<Vec<_>>().as_slice(),
&state,
Hash256::zero(),
SignedBeaconBlockHash::from(Hash256::zero()),
slot,
);
for (atts, aggregate) in &attestations {
let att2 = aggregate.as_ref().unwrap().message.aggregate.clone();
let att1 = atts
.into_iter()
.map(|(att, _)| att)
.take(2)
.fold::<Option<Attestation<MainnetEthSpec>>, _>(None, |att, new_att| {
if let Some(mut a) = att {
a.aggregate(&new_att);
Some(a)
} else {
Some(new_att.clone())
}
})
.unwrap();
let att1_indices = get_attesting_indices_from_state(&state, &att1).unwrap();
let att2_indices = get_attesting_indices_from_state(&state, &att2).unwrap();
let att1_split = SplitAttestation::new(att1.clone(), att1_indices);
let att2_split = SplitAttestation::new(att2.clone(), att2_indices);
assert_eq!(
att1.aggregation_bits.num_set_bits(),
earliest_attestation_validators(
&att1_split.as_ref(),
&state,
state.as_base().unwrap()
)
.num_set_bits()
);
state
.as_base_mut()
.unwrap()
.current_epoch_attestations
.push(PendingAttestation {
aggregation_bits: att1.aggregation_bits.clone(),
data: att1.data.clone(),
inclusion_delay: 0,
proposer_index: 0,
})
.unwrap();
assert_eq!(
committees.get(0).unwrap().committee.len() - 2,
earliest_attestation_validators(
&att2_split.as_ref(),
&state,
state.as_base().unwrap()
)
.num_set_bits()
);
}
}
/// End-to-end test of basic attestation handling.
#[test]
fn attestation_aggregation_insert_get_prune() {
let (harness, ref spec) = attestation_test_state::<MainnetEthSpec>(1);
let op_pool = OperationPool::<MainnetEthSpec>::new();
let mut state = harness.get_current_state();
let slot = state.slot();
let committees = state
.get_beacon_committees_at_slot(slot)
.unwrap()
.into_iter()
.map(BeaconCommittee::into_owned)
.collect::<Vec<_>>();
assert_eq!(
committees.len(),
1,
"we expect just one committee with this many validators"
);
let num_validators =
MainnetEthSpec::slots_per_epoch() as usize * spec.target_committee_size;
let attestations = harness.make_attestations(
(0..num_validators).collect::<Vec<_>>().as_slice(),
&state,
Hash256::zero(),
SignedBeaconBlockHash::from(Hash256::zero()),
slot,
);
for (atts, _) in attestations {
for (att, _) in atts {
let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap();
op_pool.insert_attestation(att, attesting_indices).unwrap();
}
}
assert_eq!(op_pool.num_attestations(), committees.len());
// Before the min attestation inclusion delay, get_attestations shouldn't return anything.
assert_eq!(
op_pool
.get_attestations(&state, |_| true, |_| true, spec)
.expect("should have attestations")
.len(),
0
);
// Then once the delay has elapsed, we should get a single aggregated attestation.
*state.slot_mut() += spec.min_attestation_inclusion_delay;
let block_attestations = op_pool
.get_attestations(&state, |_| true, |_| true, spec)
.expect("Should have block attestations");
assert_eq!(block_attestations.len(), committees.len());
let agg_att = &block_attestations[0];
assert_eq!(
agg_att.aggregation_bits.num_set_bits(),
spec.target_committee_size as usize
);
// Prune attestations shouldn't do anything at this point.
op_pool.prune_attestations(state.current_epoch());
assert_eq!(op_pool.num_attestations(), committees.len());
// But once we advance to more than an epoch after the attestation, it should prune it
// out of existence.
*state.slot_mut() += 2 * MainnetEthSpec::slots_per_epoch();
op_pool.prune_attestations(state.current_epoch());
assert_eq!(op_pool.num_attestations(), 0);
}