Skip to content

Commit

Permalink
Add a new slot ticker and use it on attestation aggregation (#12377)
Browse files Browse the repository at this point in the history
* Add slot ticker with intervals

* add flags for aggregation duration

* misspelling

* hide flags

* fix flags and default durations

* lint

* wait for initial sync

* deep source

* add log

* Preston's review

* fix error message

---------

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
potuz and prylabs-bulldozer[bot] authored May 10, 2023
1 parent 07db0dc commit f6764fe
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 12 deletions.
3 changes: 2 additions & 1 deletion beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ func (b *BeaconNode) fetchBuilderService() *builder.Service {

func (b *BeaconNode) registerAttestationPool() error {
s, err := attestations.NewService(b.ctx, &attestations.Config{
Pool: b.attestationPool,
Pool: b.attestationPool,
InitialSyncComplete: b.initialSyncComplete,
})
if err != nil {
return errors.Wrap(err, "could not register atts pool service")
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
deps = [
"//beacon-chain/operations/attestations/kv:go_default_library",
"//cache/lru:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
Expand Down
24 changes: 18 additions & 6 deletions beacon-chain/operations/attestations/prepare_forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,39 @@ import (
"time"

"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v4/config/features"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"go.opencensus.io/trace"
)

// Prepare attestations for fork choice three times per slot.
var prepareForkChoiceAttsPeriod = slots.DivideSlotBy(3 /* times-per-slot */)

// This prepares fork choice attestations by running batchForkChoiceAtts
// every prepareForkChoiceAttsPeriod.
func (s *Service) prepareForkChoiceAtts() {
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
defer ticker.Stop()
intervals := features.Get().AggregateIntervals
slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
// Adjust intervals for networks with a lower slot duration (Hive, e2e, etc)
for {
if intervals[len(intervals)-1] >= slotDuration {
for i, offset := range intervals {
intervals[i] = offset / 2
}
} else {
break
}
}
ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals)
for {
select {
case <-ticker.C:
case <-ticker.C():
t := time.Now()
if err := s.batchForkChoiceAtts(s.ctx); err != nil {
log.WithError(err).Error("Could not prepare attestations for fork choice")
}
log.WithField("latency", time.Since(t).Milliseconds()).Debug("batched forkchoice attestations")
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
return
Expand Down
20 changes: 18 additions & 2 deletions beacon-chain/operations/attestations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package attestations

import (
"context"
"errors"
"time"

lru "github.com/hashicorp/golang-lru"
Expand All @@ -26,8 +27,9 @@ type Service struct {

// Config options for the service.
type Config struct {
Pool Pool
pruneInterval time.Duration
Pool Pool
pruneInterval time.Duration
InitialSyncComplete chan struct{}
}

// NewService instantiates a new attestation pool service instance that will
Expand All @@ -51,10 +53,24 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {

// Start an attestation pool service's main event loop.
func (s *Service) Start() {
if err := s.waitForSync(s.cfg.InitialSyncComplete); err != nil {
log.WithError(err).Error("failed to wait for initial sync")
return
}
go s.prepareForkChoiceAtts()
go s.pruneAttsPool()
}

// waitForSync waits until the beacon node is synced to the latest head.
func (s *Service) waitForSync(syncChan chan struct{}) error {
select {
case <-syncChan:
return nil
case <-s.ctx.Done():
return errors.New("context closed, exiting goroutine")
}
}

// Stop the beacon block attestation pool service's main event loop
// and associated goroutines.
func (s *Service) Stop() error {
Expand Down
4 changes: 4 additions & 0 deletions config/features/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Flags struct {
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration

// AggregateIntervals specifies the time durations at which we aggregate attestations preparing for forkchoice.
AggregateIntervals []time.Duration
}

var featureConfig *Flags
Expand Down Expand Up @@ -218,6 +221,7 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(buildBlockParallel)
cfg.BuildBlockParallel = true
}
cfg.AggregateIntervals = []time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)
return nil
}
Expand Down
21 changes: 21 additions & 0 deletions config/features/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ var (
Usage: "(Danger): Writes the wallet password to the wallet directory on completing Prysm web onboarding. " +
"We recommend against this flag unless you are an advanced user.",
}
aggregateFirstInterval = &cli.DurationFlag{
Name: "aggregate-first-interval",
Usage: "(Advanced): Specifies the first interval in which attestations are aggregated in the slot (typically unnaggregated attestations are aggregated in this interval)",
Value: 7 * time.Second,
Hidden: true,
}
aggregateSecondInterval = &cli.DurationFlag{
Name: "aggregate-second-interval",
Usage: "(Advanced): Specifies the second interval in which attestations are aggregated in the slot",
Value: 9 * time.Second,
Hidden: true,
}
aggregateThirdInterval = &cli.DurationFlag{
Name: "aggregate-third-interval",
Usage: "(Advanced): Specifies the third interval in which attestations are aggregated in the slot",
Value: 11 * time.Second,
Hidden: true,
}
dynamicKeyReloadDebounceInterval = &cli.DurationFlag{
Name: "dynamic-key-reload-debounce-interval",
Usage: "(Advanced): Specifies the time duration the validator waits to reload new keys if they have " +
Expand Down Expand Up @@ -168,6 +186,9 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
enableVerboseSigVerification,
enableOptionalEngineMethods,
prepareAllPayloads,
aggregateFirstInterval,
aggregateSecondInterval,
aggregateThirdInterval,
buildBlockParallel,
}...)...)

Expand Down
1 change: 1 addition & 0 deletions time/slots/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ go_test(
"//time:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
],
)
63 changes: 63 additions & 0 deletions time/slots/slotticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package slots
import (
"time"

"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
)
Expand Down Expand Up @@ -104,3 +105,65 @@ func (s *SlotTicker) start(
}
}()
}

// startWithIntervals starts a ticker that emits a tick every slot at the
// prescribed intervals. The caller is responsible to make these intervals increasing and
// less than secondsPerSlot
func (s *SlotTicker) startWithIntervals(
genesisTime time.Time,
until func(time.Time) time.Duration,
after func(time.Duration) <-chan time.Time,
intervals []time.Duration) {
go func() {
slot := Since(genesisTime)
slot++
interval := 0
nextTickTime := startFromTime(genesisTime, slot).Add(intervals[0])

for {
waitTime := until(nextTickTime)
select {
case <-after(waitTime):
s.c <- slot
interval++
if interval == len(intervals) {
interval = 0
slot++
}
nextTickTime = startFromTime(genesisTime, slot).Add(intervals[interval])
case <-s.done:
return
}
}
}()
}

// NewSlotTickerWithIntervals starts and returns a SlotTicker instance that allows
// several offsets of time from genesis,
// Caller is responsible to input the intervals in increasing order and none bigger or equal than
// SecondsPerSlot
func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotTicker {
if genesisTime.Unix() == 0 {
panic("zero genesis time")
}
if len(intervals) == 0 {
panic("at least one interval has to be entered")
}
slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
lastOffset := time.Duration(0)
for _, offset := range intervals {
if offset < lastOffset {
panic("invalid decreasing offsets")
}
if offset >= slotDuration {
panic("invalid ticker offset")
}
lastOffset = offset
}
ticker := &SlotTicker{
c: make(chan primitives.Slot),
done: make(chan struct{}),
}
ticker.startWithIntervals(genesisTime, prysmTime.Until, time.After, intervals)
return ticker
}
48 changes: 48 additions & 0 deletions time/slots/slotticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"testing"
"time"

"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/stretchr/testify/require"
)

var _ Ticker = (*SlotTicker)(nil)
Expand Down Expand Up @@ -136,3 +138,49 @@ func TestGetSlotTickerWithOffset_OK(t *testing.T) {
}
}
}

func TestGetSlotTickerWitIntervals(t *testing.T) {
genesisTime := time.Now()
offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3
intervals := []time.Duration{offset, 2 * offset}

intervalTicker := NewSlotTickerWithIntervals(genesisTime, intervals)
normalTicker := NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot)

firstTicked := 0
for {
select {
case <-intervalTicker.C():
// interval ticks starts in second slot
if firstTicked < 2 {
t.Fatal("Expected other ticker to tick first")
}
return
case <-normalTicker.C():
if firstTicked > 1 {
t.Fatal("Expected normal ticker to tick first")
}
firstTicked++
}
}
}

func TestSlotTickerWithIntervalsInputValidation(t *testing.T) {
var genesisTime time.Time
offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3
intervals := make([]time.Duration, 0)
panicCall := func() {
NewSlotTickerWithIntervals(genesisTime, intervals)
}
require.Panics(t, panicCall, "zero genesis time")
genesisTime = time.Now()
require.Panics(t, panicCall, "at least one interval has to be entered")
intervals = []time.Duration{2 * offset, offset}
require.Panics(t, panicCall, "invalid decreasing offsets")
intervals = []time.Duration{offset, 4 * offset}
require.Panics(t, panicCall, "invalid ticker offset")
intervals = []time.Duration{4 * offset, offset}
require.Panics(t, panicCall, "invalid ticker offset")
intervals = []time.Duration{offset, 2 * offset}
require.NotPanics(t, panicCall)
}
11 changes: 8 additions & 3 deletions time/slots/slottime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ import (
// incoming objects. (24 mins with mainnet spec)
const MaxSlotBuffer = uint64(1 << 7)

// startFromTime returns the slot start in terms of genesis time.Time
func startFromTime(genesis time.Time, slot primitives.Slot) time.Time {
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
return genesis.Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
}

// StartTime returns the start time in terms of its unix epoch
// value.
func StartTime(genesis uint64, slot primitives.Slot) time.Time {
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
startTime := time.Unix(int64(genesis), 0).Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
return startTime
genesisTime := time.Unix(int64(genesis), 0) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
return startFromTime(genesisTime, slot)
}

// SinceGenesis returns the number of slots since
Expand Down

0 comments on commit f6764fe

Please sign in to comment.