Skip to content

Commit

Permalink
Merge branch 'fix/serving-bundle-lag' into release/serving-bundle-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Feb 20, 2024
2 parents bc7b281 + c17d72d commit 7523028
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ bin
.vscode
config.yaml
config2.yaml
__debug_bin
__debug_bin*
dist/
checkpointz
25 changes: 13 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ module github.com/ethpandaops/checkpointz
go 1.17

require (
github.com/attestantio/go-eth2-client v0.18.2
github.com/attestantio/go-eth2-client v0.19.9
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9
github.com/creasty/defaults v1.6.0
github.com/ethpandaops/beacon v0.28.0
github.com/ethpandaops/beacon v0.35.0
github.com/ethpandaops/ethwallclock v0.2.0
github.com/go-co-op/gocron v1.18.0
github.com/julienschmidt/httprouter v1.3.0
github.com/nanmu42/gzip v1.2.0
Expand All @@ -20,8 +21,7 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/ethpandaops/ethwallclock v0.2.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/ferranbt/fastssz v0.1.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.7.4 // indirect
Expand All @@ -32,16 +32,17 @@ require (
github.com/go-playground/validator/v10 v10.9.0 // indirect
github.com/goccy/go-yaml v1.9.5 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/holiman/uint256 v1.2.2 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/huandu/go-clone v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/klauspost/cpuid/v2 v2.1.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -59,12 +60,12 @@ require (
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
)
70 changes: 27 additions & 43 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *Handler) wrappedHandler(handler func(ctx context.Context, r *http.Reque
"path": r.URL.Path,
"content_type": contentType,
"accept": r.Header.Get("Accept"),
}).Debug("Handling request")
}).Trace("Handling request")

h.metrics.ObserveRequest(r.Method, registeredPath)

Expand Down
123 changes: 106 additions & 17 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/chuckpreslar/emission"
"github.com/ethpandaops/beacon/pkg/beacon"
"github.com/ethpandaops/beacon/pkg/beacon/api/types"
"github.com/ethpandaops/beacon/pkg/beacon/state"
"github.com/ethpandaops/checkpointz/pkg/beacon/checkpoints"
"github.com/ethpandaops/checkpointz/pkg/beacon/node"
"github.com/ethpandaops/checkpointz/pkg/beacon/store"
"github.com/ethpandaops/checkpointz/pkg/eth"
"github.com/ethpandaops/ethwallclock"
"github.com/go-co-op/gocron"
perrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand All @@ -40,6 +44,10 @@ type Default struct {

historicalSlotFailures map[phase0.Slot]int

servingMutex sync.Mutex
historicalMutex sync.Mutex
majorityMutex sync.Mutex

metrics *Metrics
}

Expand Down Expand Up @@ -72,6 +80,10 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C
states: store.NewBeaconState(log, config.Caches.States, namespace),
depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace),

servingMutex: sync.Mutex{},
historicalMutex: sync.Mutex{},
majorityMutex: sync.Mutex{},

metrics: NewMetrics(namespace + "_beacon"),
}
}
Expand Down Expand Up @@ -104,13 +116,58 @@ func (d *Default) Start(ctx context.Context) error {
}
}()

// Subscribe to the nodes' finality updates.
for _, node := range d.nodes {
n := node

logCtx := d.log.WithFields(logrus.Fields{
"node": n.Config.Name,
})

n.Beacon.OnFinalityCheckpointUpdated(ctx, func(ctx context.Context, event *beacon.FinalityCheckpointUpdated) error {
logCtx.WithFields(logrus.Fields{
"epoch": event.Finality.Finalized.Epoch,
"root": fmt.Sprintf("%#x", event.Finality.Finalized.Root),
}).Info("Node has a new finalized checkpoint")

// Check if we have a new majority finality.
if err := d.checkFinality(ctx); err != nil {
logCtx.WithError(err).Error("Failed to check finality")

return err
}

return d.checkForNewServingCheckpoint(ctx)
})

n.Beacon.OnReady(ctx, func(ctx context.Context, _ *beacon.ReadyEvent) error {
n.Beacon.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
time.Sleep(time.Second * 5)

if _, err := node.Beacon.FetchFinality(ctx, "head"); err != nil {
logCtx.WithError(err).Error("Failed to fetch finality after epoch transition")
}

if err := d.checkFinality(ctx); err != nil {
logCtx.WithError(err).Error("Failed to check finality")
}

if err := d.checkForNewServingCheckpoint(ctx); err != nil {
logCtx.WithError(err).Error("Failed to check for new serving checkpoint after epoch change")
}
})

return nil
})
}

return nil
}

func (d *Default) startCrons(ctx context.Context) error {
s := gocron.NewScheduler(time.Local)

if _, err := s.Every("5s").Do(func() {
if _, err := s.Every("30s").Do(func() {
if err := d.checkFinality(ctx); err != nil {
d.log.WithError(err).Error("Failed to check finality")
}
Expand Down Expand Up @@ -210,13 +267,17 @@ func (d *Default) startHistoricalLoop(ctx context.Context) error {
}

func (d *Default) startServingLoop(ctx context.Context) error {
if err := d.checkForNewServingCheckpoint(ctx); err != nil {
d.log.WithError(err).Error("Failed to check for serving checkpoint")
}

for {
select {
case <-time.After(time.Second * 1):
case <-time.After(time.Second * 5):
if err := d.checkForNewServingCheckpoint(ctx); err != nil {
d.log.WithError(err).Error("Failed to check for new serving checkpoint")

time.Sleep(time.Second * 30)
time.Sleep(time.Second * 15)
}
case <-ctx.Done():
return ctx.Err()
Expand All @@ -225,22 +286,44 @@ func (d *Default) startServingLoop(ctx context.Context) error {
}

func (d *Default) checkForNewServingCheckpoint(ctx context.Context) error {
d.servingMutex.Lock()
defer d.servingMutex.Unlock()

// Don't bother checking if we don't know the head yet.
if d.head == nil {
return nil
return errors.New("head finality is unknown")
}

if d.head.Finalized == nil {
return nil
return errors.New("head finalized checkpoint is unknown")
}

// If head == serving, we're done.
if d.servingBundle != nil && d.servingBundle.Finalized != nil && d.servingBundle.Finalized.Epoch == d.head.Finalized.Epoch {
return nil
logCtx := d.log.WithFields(logrus.Fields{
"head_epoch": d.head.Finalized.Epoch,
"head_root": fmt.Sprintf("%#x", d.head.Finalized.Root),
})

// If we don't have a serving bundle already, download one.
if d.servingBundle == nil {
logCtx.Info("No serving bundle available, downloading")

return d.downloadServingCheckpoint(ctx, d.head)
}

if err := d.downloadServingCheckpoint(ctx, d.head); err != nil {
return err
if d.servingBundle.Finalized == nil {
logCtx.Info("Serving bundle is unknown, downloading")

return d.downloadServingCheckpoint(ctx, d.head)
}

// If the head has moved on, download a new serving bundle.
if d.servingBundle.Finalized.Epoch != d.head.Finalized.Epoch {
logCtx.
WithField("serving_epoch", d.servingBundle.Finalized.Epoch).
WithField("serving_root", fmt.Sprintf("%#x", d.servingBundle.Finalized.Root)).
Info("Head finality has advanced, downloading new serving bundle")

return d.downloadServingCheckpoint(ctx, d.head)
}

return nil
Expand Down Expand Up @@ -336,6 +419,9 @@ func (d *Default) shouldDownloadStates() bool {
}

func (d *Default) checkFinality(ctx context.Context) error {
d.majorityMutex.Lock()
defer d.majorityMutex.Unlock()

aggFinality := []*v1.Finality{}
readyNodes := d.nodes.Ready(ctx)

Expand All @@ -350,19 +436,22 @@ func (d *Default) checkFinality(ctx context.Context) error {
aggFinality = append(aggFinality, finality)
}

Default, err := checkpoints.NewMajorityDecider().Decide(aggFinality)
majority, err := checkpoints.NewMajorityDecider().Decide(aggFinality)
if err != nil {
return err
return perrors.Wrap(err, "failed to decide majority finality")
}

if d.head == nil || d.head.Finalized == nil || d.head.Finalized.Root != Default.Finalized.Root {
d.head = Default
if d.head == nil || d.head.Finalized == nil || d.head.Finalized.Root != majority.Finalized.Root {
d.head = majority

d.publishFinalityCheckpointHeadUpdated(ctx, Default)
d.publishFinalityCheckpointHeadUpdated(ctx, majority)

d.log.WithField("epoch", Default.Finalized.Epoch).WithField("root", fmt.Sprintf("%#x", Default.Finalized.Root)).Info("New finalized head checkpoint")
d.log.
WithField("epoch", majority.Finalized.Epoch).
WithField("root", fmt.Sprintf("%#x", majority.Finalized.Root)).
Info("New finalized head checkpoint")

d.metrics.ObserveHeadEpoch(Default.Finalized.Epoch)
d.metrics.ObserveHeadEpoch(majority.Finalized.Epoch)
}

return nil
Expand Down
22 changes: 19 additions & 3 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,31 @@ import (
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/checkpointz/pkg/eth"
perrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func (d *Default) downloadServingCheckpoint(ctx context.Context, checkpoint *v1.Finality) error {
if checkpoint == nil {
return errors.New("checkpoint is nil")
}

if checkpoint.Finalized == nil {
return errors.New("finalized checkpoint is nil")
}

upstream, err := d.nodes.
Ready(ctx).
DataProviders(ctx).
PastFinalizedCheckpoint(ctx, checkpoint). // Ensure we attempt to fetch the bundle from a node that knows about the checkpoint.
RandomNode(ctx)
if err != nil {
return err
return perrors.Wrap(err, "no data provider node available")
}

block, err := d.fetchBundle(ctx, checkpoint.Finalized.Root, upstream)
if err != nil {
return err
return perrors.Wrap(err, "failed to fetch bundle")
}

// Validate that everything is ok to serve.
Expand Down Expand Up @@ -117,6 +126,9 @@ func (d *Default) checkGenesis(ctx context.Context) error {
}

func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1.Finality) error {
d.historicalMutex.Lock()
defer d.historicalMutex.Unlock()

if d.spec == nil {
return errors.New("beacon spec unavailable")
}
Expand Down Expand Up @@ -245,6 +257,7 @@ func (d *Default) downloadBlock(ctx context.Context, slot phase0.Slot, upstream
"slot": slot,
"root": eth.RootAsString(root),
"state_root": eth.RootAsString(stateRoot),
"node": upstream.Config.Name,
}).
Infof("Downloaded and stored block for slot %d", slot)

Expand Down Expand Up @@ -365,7 +378,10 @@ func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch pha
}

d.log.
WithFields(logrus.Fields{"epoch": epoch}).
WithFields(logrus.Fields{
"epoch": epoch,
"node": node.Config.Name,
}).
Infof("Downloaded and stored deposit snapshot for epoch %d", epoch)

return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/beacon/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"math/rand"
"strings"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
Expand All @@ -25,7 +26,7 @@ func NewNodesFromConfig(log logrus.FieldLogger, configs []node.Config, namespace
for i, config := range configs {
sconfig := &sbeacon.Config{
Name: config.Name,
Addr: config.Address,
Addr: strings.TrimRight(config.Address, "/"),
Headers: config.Headers,
}

Expand Down
1 change: 0 additions & 1 deletion pkg/beacon/store/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func NewBlock(log logrus.FieldLogger, config Config, namespace string) *Block {
block, ok := value.(*spec.VersionedSignedBeaconBlock)
if !ok {
c.log.WithField("block_root", key).Error("Invalid block type when cleaning up block cache")

return
}

Expand Down

0 comments on commit 7523028

Please sign in to comment.