Skip to content

Commit

Permalink
fix: Add additional serving bundle checks
Browse files: browse the repository at this point in the history
  • Loading branch information
samcm committed Feb 20, 2024
1 parent f143029 commit 6139750
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ bin
.vscode
config.yaml
config2.yaml
__debug_bin
__debug_bin*
dist/
checkpointz
2 changes: 1 addition & 1 deletion pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *Handler) wrappedHandler(handler func(ctx context.Context, r *http.Reque
"path": r.URL.Path,
"content_type": contentType,
"accept": r.Header.Get("Accept"),
}).Debug("Handling request")
}).Trace("Handling request")

h.metrics.ObserveRequest(r.Method, registeredPath)

Expand Down
107 changes: 90 additions & 17 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/chuckpreslar/emission"
"github.com/ethpandaops/beacon/pkg/beacon"
"github.com/ethpandaops/beacon/pkg/beacon/api/types"
"github.com/ethpandaops/beacon/pkg/beacon/state"
"github.com/ethpandaops/checkpointz/pkg/beacon/checkpoints"
"github.com/ethpandaops/checkpointz/pkg/beacon/node"
"github.com/ethpandaops/checkpointz/pkg/beacon/store"
"github.com/ethpandaops/checkpointz/pkg/eth"
"github.com/ethpandaops/ethwallclock"
"github.com/go-co-op/gocron"
perrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand All @@ -40,6 +44,10 @@ type Default struct {

historicalSlotFailures map[phase0.Slot]int

servingMutex sync.Mutex
historicalMutex sync.Mutex
majorityMutex sync.Mutex

metrics *Metrics
}

Expand Down Expand Up @@ -72,6 +80,10 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C
states: store.NewBeaconState(log, config.Caches.States, namespace),
depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace),

servingMutex: sync.Mutex{},
historicalMutex: sync.Mutex{},
majorityMutex: sync.Mutex{},

metrics: NewMetrics(namespace + "_beacon"),
}
}
Expand Down Expand Up @@ -100,17 +112,46 @@ func (d *Default) Start(ctx context.Context) error {
d.log.WithError(err).Fatal("Failed to start crons")
}

// Check for new serving checkpoints after each epoch
node, err := d.nodes.Healthy(ctx).NotSyncing(ctx).RandomNode(ctx)
if err != nil {
d.log.WithError(err).Fatal("Failed to get a healthy, non-syncing node to subscribe to wallclock events")
}

node.Beacon.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
// Sleep for a bit to allow the beacon nodes to run their epoch transition.
time.Sleep(time.Second * 30)

if err := d.checkForNewServingCheckpoint(ctx); err != nil {
d.log.WithError(err).Error("Failed to check for new serving checkpoint after epoch change")
}
})

break
}
}()

// Subscribe to the nodes' finality updates.
for _, node := range d.nodes {
node.Beacon.OnFinalityCheckpointUpdated(ctx, func(ctx context.Context, checkpoint *beacon.FinalityCheckpointUpdated) error {
// Check if we have a new majority finality.
if err := d.checkFinality(ctx); err != nil {
d.log.WithError(err).Error("Failed to check finality")

return err
}

return d.checkForNewServingCheckpoint(ctx)
})
}

return nil
}

func (d *Default) startCrons(ctx context.Context) error {
s := gocron.NewScheduler(time.Local)

if _, err := s.Every("5s").Do(func() {
if _, err := s.Every("30s").Do(func() {
if err := d.checkFinality(ctx); err != nil {
d.log.WithError(err).Error("Failed to check finality")
}
Expand Down Expand Up @@ -210,13 +251,17 @@ func (d *Default) startHistoricalLoop(ctx context.Context) error {
}

func (d *Default) startServingLoop(ctx context.Context) error {
if err := d.checkForNewServingCheckpoint(ctx); err != nil {
d.log.WithError(err).Error("Failed to check for serving checkpoint")
}

for {
select {
case <-time.After(time.Second * 1):
case <-time.After(time.Second * 5):
if err := d.checkForNewServingCheckpoint(ctx); err != nil {
d.log.WithError(err).Error("Failed to check for new serving checkpoint")

time.Sleep(time.Second * 30)
time.Sleep(time.Second * 15)
}
case <-ctx.Done():
return ctx.Err()
Expand All @@ -225,22 +270,44 @@ func (d *Default) startServingLoop(ctx context.Context) error {
}

func (d *Default) checkForNewServingCheckpoint(ctx context.Context) error {
d.servingMutex.Lock()
defer d.servingMutex.Unlock()

// Don't bother checking if we don't know the head yet.
if d.head == nil {
return nil
return errors.New("head finality is unknown")
}

if d.head.Finalized == nil {
return nil
return errors.New("head finalized checkpoint is unknown")
}

// If head == serving, we're done.
if d.servingBundle != nil && d.servingBundle.Finalized != nil && d.servingBundle.Finalized.Epoch == d.head.Finalized.Epoch {
return nil
logCtx := d.log.WithFields(logrus.Fields{
"head_epoch": d.head.Finalized.Epoch,
"head_root": fmt.Sprintf("%#x", d.head.Finalized.Root),
})

// If we don't have a serving bundle already, download one.
if d.servingBundle == nil {
logCtx.Info("No serving bundle available, downloading")

return d.downloadServingCheckpoint(ctx, d.head)
}

if err := d.downloadServingCheckpoint(ctx, d.head); err != nil {
return err
if d.servingBundle.Finalized == nil {
logCtx.Info("Serving bundle is unknown, downloading")

return d.downloadServingCheckpoint(ctx, d.head)
}

// If the head has moved on, download a new serving bundle.
if d.servingBundle.Finalized.Epoch != d.head.Finalized.Epoch {
logCtx.
WithField("serving_epoch", d.servingBundle.Finalized.Epoch).
WithField("serving_root", fmt.Sprintf("%#x", d.servingBundle.Finalized.Root)).
Info("Head finality has advanced, downloading new serving bundle")

return d.downloadServingCheckpoint(ctx, d.head)
}

return nil
Expand Down Expand Up @@ -336,6 +403,9 @@ func (d *Default) shouldDownloadStates() bool {
}

func (d *Default) checkFinality(ctx context.Context) error {
d.majorityMutex.Lock()
defer d.majorityMutex.Unlock()

aggFinality := []*v1.Finality{}
readyNodes := d.nodes.Ready(ctx)

Expand All @@ -350,19 +420,22 @@ func (d *Default) checkFinality(ctx context.Context) error {
aggFinality = append(aggFinality, finality)
}

Default, err := checkpoints.NewMajorityDecider().Decide(aggFinality)
majority, err := checkpoints.NewMajorityDecider().Decide(aggFinality)
if err != nil {
return err
return perrors.Wrap(err, "failed to decide majority finality")
}

if d.head == nil || d.head.Finalized == nil || d.head.Finalized.Root != Default.Finalized.Root {
d.head = Default
if d.head == nil || d.head.Finalized == nil || d.head.Finalized.Root != majority.Finalized.Root {
d.head = majority

d.publishFinalityCheckpointHeadUpdated(ctx, Default)
d.publishFinalityCheckpointHeadUpdated(ctx, majority)

d.log.WithField("epoch", Default.Finalized.Epoch).WithField("root", fmt.Sprintf("%#x", Default.Finalized.Root)).Info("New finalized head checkpoint")
d.log.
WithField("epoch", majority.Finalized.Epoch).
WithField("root", fmt.Sprintf("%#x", majority.Finalized.Root)).
Info("New finalized head checkpoint")

d.metrics.ObserveHeadEpoch(Default.Finalized.Epoch)
d.metrics.ObserveHeadEpoch(majority.Finalized.Epoch)
}

return nil
Expand Down
16 changes: 14 additions & 2 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,31 @@ import (
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/checkpointz/pkg/eth"
perrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func (d *Default) downloadServingCheckpoint(ctx context.Context, checkpoint *v1.Finality) error {
if checkpoint == nil {
return errors.New("checkpoint is nil")
}

if checkpoint.Finalized == nil {
return errors.New("finalized checkpoint is nil")
}

upstream, err := d.nodes.
Ready(ctx).
DataProviders(ctx).
PastFinalizedCheckpoint(ctx, checkpoint). // Ensure we attempt to fetch the bundle from a node that knows about the checkpoint.
RandomNode(ctx)
if err != nil {
return err
return perrors.Wrap(err, "no data provider node available")
}

block, err := d.fetchBundle(ctx, checkpoint.Finalized.Root, upstream)
if err != nil {
return err
return perrors.Wrap(err, "failed to fetch bundle")
}

// Validate that everything is ok to serve.
Expand Down Expand Up @@ -117,6 +126,9 @@ func (d *Default) checkGenesis(ctx context.Context) error {
}

func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1.Finality) error {
d.historicalMutex.Lock()
defer d.historicalMutex.Unlock()

if d.spec == nil {
return errors.New("beacon spec unavailable")
}
Expand Down
1 change: 0 additions & 1 deletion pkg/beacon/store/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func NewBlock(log logrus.FieldLogger, config Config, namespace string) *Block {
block, ok := value.(*spec.VersionedSignedBeaconBlock)
if !ok {
c.log.WithField("block_root", key).Error("Invalid block type when cleaning up block cache")

return
}

Expand Down

0 comments on commit 6139750

Please sign in to comment.