Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new slot ticker and use it on attestation aggregation #12377

Merged
merged 13 commits into from
May 10, 2023
1 change: 1 addition & 0 deletions beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
deps = [
"//beacon-chain/operations/attestations/kv:go_default_library",
"//cache/lru:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
Expand Down
10 changes: 4 additions & 6 deletions beacon-chain/operations/attestations/prepare_forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,22 @@ import (
"time"

"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v4/config/features"
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"go.opencensus.io/trace"
)

// Prepare attestations for fork choice three times per slot.
var prepareForkChoiceAttsPeriod = slots.DivideSlotBy(3 /* times-per-slot */)

// This prepares fork choice attestations by running batchForkChoiceAtts
// every prepareForkChoiceAttsPeriod.
func (s *Service) prepareForkChoiceAtts() {
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
defer ticker.Stop()
intervals := features.Get().AggregateIntervals
ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals)
for {
select {
case <-ticker.C:
case <-ticker.C():
if err := s.batchForkChoiceAtts(s.ctx); err != nil {
log.WithError(err).Error("Could not prepare attestations for fork choice")
}
Expand Down
4 changes: 4 additions & 0 deletions config/features/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Flags struct {
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration

// AggregateIntervals specifies the time durations at which we aggregate attestations preparing for forkchoice.
AggregateIntervals []time.Duration
}

var featureConfig *Flags
Expand Down Expand Up @@ -257,6 +260,7 @@ func ConfigureValidator(ctx *cli.Context) error {
cfg.EnableBeaconRESTApi = true
}
cfg.KeystoreImportDebounceInterval = ctx.Duration(dynamicKeyReloadDebounceInterval.Name)
cfg.AggregateIntervals = []time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)
return nil
}
Expand Down
18 changes: 18 additions & 0 deletions config/features/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ var (
Usage: "(Danger): Writes the wallet password to the wallet directory on completing Prysm web onboarding. " +
"We recommend against this flag unless you are an advanced user.",
}
aggregateFirstInterval = &cli.DurationFlag{
Name: "aggregate-first-interval",
Usage: "(Advanced): Specifies the first interval in which attestations are aggregated in the slot (typically unnaggregated attestations are aggregated in this interval)",
Value: 7 * time.Second,
Hidden: true,
}
aggregateSecondInterval = &cli.DurationFlag{
Name: "aggregate-second-interval",
Usage: "(Advanced): Specifies the second interval in which attestations are aggregated in the slot",
Value: 9 * time.Second,
Hidden: true,
}
aggregateThirdInterval = &cli.DurationFlag{
Name: "aggregate-third-interval",
Usage: "(Advanced): Specifies the third interval in which attestations are aggregated in the slot",
Value: 11 * time.Second,
Hidden: true,
}
dynamicKeyReloadDebounceInterval = &cli.DurationFlag{
Name: "dynamic-key-reload-debounce-interval",
Usage: "(Advanced): Specifies the time duration the validator waits to reload new keys if they have " +
Expand Down
63 changes: 63 additions & 0 deletions time/slots/slotticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package slots
import (
"time"

"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
)
Expand Down Expand Up @@ -104,3 +105,65 @@ func (s *SlotTicker) start(
}
}()
}

// startWithIntervals starts a ticker that emits a tick every slot at the
// prescribed intervals. The caller is responsible to make these intervals increasing and
// less than secondsPerSlot
func (s *SlotTicker) startWithIntervals(
genesisTime time.Time,
since, until func(time.Time) time.Duration,
after func(time.Duration) <-chan time.Time,
intervals []time.Duration) {

go func() {
slot := Since(genesisTime)
slot++
interval := 0
nextTickTime := startTime(genesisTime, slot).Add(intervals[0])

for {
waitTime := until(nextTickTime)
select {
case <-after(waitTime):
s.c <- slot
interval++
if interval == len(intervals) {
interval = 0
slot++
}
nextTickTime = startTime(genesisTime, slot).Add(intervals[interval])
case <-s.done:
return
}
}
}()
}

// NewSlotTickerWithIntervals starts and returns a SlotTicker instance that allows
// several offsets of time from genesis,
// Caller is responsible to input the intervals in increasing order and none bigger or equal than
// SecondsPerSlot
func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotTicker {
if genesisTime.Unix() == 0 {
panic("zero genesis time")
}
if len(intervals) == 0 {
panic("at least one interval has to be entered")
}
slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
lastOffset := time.Duration(0)
for _, offset := range intervals {
if offset < lastOffset {
panic("invalid decreasing offsets")
}
if offset >= slotDuration {
panic("invalid ticker offset")
}
}
ticker := &SlotTicker{
c: make(chan primitives.Slot),
done: make(chan struct{}),
}
ticker.startWithIntervals(genesisTime, prysmTime.Since, prysmTime.Until, time.After, intervals)
return ticker
}
27 changes: 27 additions & 0 deletions time/slots/slotticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
)

Expand Down Expand Up @@ -136,3 +137,29 @@ func TestGetSlotTickerWithOffset_OK(t *testing.T) {
}
}
}

func TestGetSlotTickerWitIntervals(t *testing.T) {
genesisTime := time.Now()
offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3
intervals := []time.Duration{offset, 2 * offset}

intervalTicker := NewSlotTickerWithIntervals(genesisTime, intervals)
normalTicker := NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot)

firstTicked := 0
for {
select {
case <-intervalTicker.C():
// interval ticks starts in second slot
if firstTicked < 2 {
t.Fatal("Expected other ticker to tick first")
}
return
case <-normalTicker.C():
if firstTicked > 1 {
t.Fatal("Expected normal ticker to tick first")
}
firstTicked++
}
}
}
11 changes: 8 additions & 3 deletions time/slots/slottime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ import (
// incoming objects. (24 mins with mainnet spec)
const MaxSlotBuffer = uint64(1 << 7)

// startTime returns the slot start in terms of genesis time.Time
func startTime(genesis time.Time, slot primitives.Slot) time.Time {
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
return genesis.Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
}

// StartTime returns the start time in terms of its unix epoch
// value.
func StartTime(genesis uint64, slot primitives.Slot) time.Time {
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
startTime := time.Unix(int64(genesis), 0).Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
return startTime
genesisTime := time.Unix(int64(genesis), 0) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
return startTime(genesisTime, slot)
}

// SinceGenesis returns the number of slots since
Expand Down