
eth/downloader: remove stale beacon headers as backfilling progresses #24670

Merged: 4 commits, Apr 13, 2022
15 changes: 12 additions & 3 deletions eth/downloader/beaconsync.go
@@ -36,6 +36,7 @@ type beaconBackfiller struct {
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
filled *types.Header // Last header filled by the last terminated sync loop
started chan struct{} // Notification channel whether the downloader inited
lock sync.Mutex // Mutex protecting the sync lock
}
@@ -48,16 +49,18 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
}
}

// suspend cancels any background downloader threads.
func (b *beaconBackfiller) suspend() {
// suspend cancels any background downloader threads and returns the last header
// that has been successfully backfilled.
func (b *beaconBackfiller) suspend() *types.Header {
// If no filling is running, don't waste cycles
b.lock.Lock()
filling := b.filling
filled := b.filled
started := b.started
b.lock.Unlock()

if !filling {
return
return filled // Return the filled header on the previous sync completion
}
// A previous filling should be running, though it may happen that it hasn't
// yet started (being done on a new goroutine). Many concurrent beacon head
@@ -69,6 +72,10 @@ func (b *beaconBackfiller) suspend() {
// Now that we're sure the downloader successfully started up, we can cancel
// it safely without running the risk of data races.
b.downloader.Cancel()

// Sync cycle was just terminated, retrieve and return the last filled header.
// Can't use `filled` as that contains a stale value from before cancellation.
return b.downloader.blockchain.CurrentFastBlock().Header()
}

// resume starts the downloader threads for backfilling state and chain data.
@@ -81,6 +88,7 @@ func (b *beaconBackfiller) resume() {
return
}
b.filling = true
b.filled = nil
b.started = make(chan struct{})
mode := b.syncMode
b.lock.Unlock()
@@ -92,6 +100,7 @@ func (b *beaconBackfiller) resume() {
defer func() {
b.lock.Lock()
b.filling = false
b.filled = b.downloader.blockchain.CurrentFastBlock().Header()
b.lock.Unlock()
}()
// If the downloader fails, report an error as in beacon chain mode there
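Pieced together from the hunks above, the suspend path after this change looks roughly as follows. This is a sketch of the changed flow, not the verbatim file: the wait on the start notification is condensed to a plain channel receive, and only the names that appear in the diff are kept.

```go
// suspend cancels any background downloader threads and returns the last header
// that has been successfully backfilled (nil if the backfiller never resumed).
func (b *beaconBackfiller) suspend() *types.Header {
	b.lock.Lock()
	filling, filled, started := b.filling, b.filled, b.started
	b.lock.Unlock()

	if !filling {
		// No fill in flight: b.filled was recorded by the previous fill
		// goroutine's deferred bookkeeping, so it is safe to return as-is.
		return filled
	}
	// A fill is (or is about to be) running; wait for the downloader to have
	// actually started before cancelling it, then cancel.
	<-started
	b.downloader.Cancel()

	// The cached b.filled is stale here (the fill goroutine's deferred update
	// has not run yet), so read the freshest header straight from the chain.
	return b.downloader.blockchain.CurrentFastBlock().Header()
}
```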
142 changes: 132 additions & 10 deletions eth/downloader/skeleton.go
@@ -19,6 +19,7 @@ package downloader
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"sort"
"time"
@@ -148,11 +149,15 @@ type backfiller interface {
// based on the skeleton chain as it might be invalid. The backfiller should
// gracefully handle multiple consecutive suspends without a resume, even
// on initial startup.
suspend()
//
// The method should return the last block header that has been successfully
// backfilled, or nil if the backfiller was not resumed.
suspend() *types.Header

// resume requests the backfiller to start running fill or snap sync based on
// the skeleton chain as it has successfully been linked. Appending new heads
// to the end of the chain will not result in suspend/resume cycles, leaking
// too much sync logic out to the filler.
resume()
}

@@ -358,8 +363,17 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
if linked {
s.filler.resume()
}
defer s.filler.suspend()

defer func() {
if filled := s.filler.suspend(); filled != nil {
// If something was filled, try to delete stale sync helpers. If
// unsuccessful, warn the user, but not much else we can do (it's
// a programming error, just let users report an issue and don't
// choke in the meantime).
if err := s.cleanStales(filled); err != nil {
log.Error("Failed to clean stale beacon headers", "err", err)
}
}
}()
// Create a set of unique channels for this sync cycle. We need these to be
// ephemeral so a data race doesn't accidentally deliver something stale on
// a persistent channel across syncs (yup, this happened)
@@ -582,8 +596,16 @@ func (s *skeleton) processNewHead(head *types.Header, force bool) bool {

lastchain := s.progress.Subchains[0]
if lastchain.Tail >= number {
// If the chain is down to a single beacon header, and it is re-announced
// once more, ignore it instead of tearing down sync for a noop.
if lastchain.Head == lastchain.Tail {
Member commented:

It's unclear to me: we have a lastchain whose head equals its tail (meaning we haven't fetched any skeleton headers yet in this cycle), we get a lower-point announcement, and the corresponding skeleton header already exists. Any particular reason to special-case this scenario?

Member Author replied:

It's a different scenario:

  • We start beacon sync and retrieve all the headers
  • We start backfilling and completely sync up
  • We delete all the beacon headers as they've already been used

At this point we should in theory delete all the headers, but the invariant is that there is always at least one beacon subchain with at least one header (there's no special casing for empty subchains or no subchains). This means that if we synced to, say, block 1000, we'll have deleted headers 0-999 but still keep the 1000th, even though we already backfilled it.

Now, it can happen that at this point the beacon client sends us a forkchoice update to this exact header (e.g. we received a newPayload with header 1000, processed it via sync, and then got the forkchoice update later).

Without this special casing, the sync would "error out" with a beacon chain reorg. It would spin back up and uselessly download a lot of headers. This check just ensures that setting the head to the current head does not cause a sync failure. (A concrete replay of this scenario follows the diff hunk below.)

Member replied:

Ah, I get it. So you want to special-case the situation where the newly announced header is the head of lastchain and the hash is the same. It makes sense.

Member Author replied:

Btw, yesterday I still managed to hit the reorg warnings somehow, so it might not be completely correct. But I couldn't repro it with more verbose logs the second time around.

if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
return false
}
}
// Not a noop / double head announce, abort with a reorg
if force {
log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number)
log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "head", lastchain.Head, "newHead", number)
}
return true
}
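To make the special case concrete, here is the scenario from the conversation above replayed against the new check. The block numbers are hypothetical; the two-line check itself is the one added in this hunk.

```go
// State after a fully backfilled cycle: one leftover header is kept so the
// "at least one subchain with at least one header" invariant holds, e.g.
//
//	progress.Subchains[0] = {Head: 1000, Tail: 1000}
//
// The beacon client then re-announces block 1000 via a forkchoice update.
// Since lastchain.Tail >= number, this used to be classified as a reorg and
// tore sync down; the new check recognises it as a noop instead:
if lastchain.Head == lastchain.Tail {
	if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
		return false // same lone header re-announced, keep the sync state
	}
}
// Anything else landing at or below the tail is still treated as a reorg.
```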
@@ -943,12 +965,44 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
// If the beacon chain was linked to the local chain, completely swap out
// all internal progress and abort header synchronization.
if linked {
// Note, linking into the local chain should also mean that there are
// no leftover subchains, but just in case there's some junk due to
// strange conditions or bugs, clean up all internal state.
if len(s.progress.Subchains) > 1 {
log.Error("Cleaning up leftovers after beacon link")
s.progress.Subchains = s.progress.Subchains[:1]
// Linking into the local chain should also mean that there are no
// leftover subchains, but in the case of importing the blocks via
// the engine API, we will not push the subchains forward. This will
// lead to a gap between an old sync cycle and a future one.
if subchains := len(s.progress.Subchains); subchains > 1 {
switch {
// If there are only 2 subchains - the current one and an older
// one - and the old one consists of a single block, then it's
// the expected new sync cycle after some propagated blocks. Log
// it for debugging purposes, explicitly clean and don't escalate.
case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
s.progress.Subchains = s.progress.Subchains[:1]

// If we have more than one header or more than one leftover chain,
// the syncer's internal state is corrupted. Do try to fix it, but
// be very vocal about the fault.
default:
var context []interface{}

for i := range s.progress.Subchains[1:] {
context = append(context, fmt.Sprintf("stale_head_%d", i+1))
context = append(context, s.progress.Subchains[i+1].Head)
context = append(context, fmt.Sprintf("stale_tail_%d", i+1))
context = append(context, s.progress.Subchains[i+1].Tail)
context = append(context, fmt.Sprintf("stale_next_%d", i+1))
context = append(context, s.progress.Subchains[i+1].Next)
}
log.Error("Cleaning spurious beacon sync leftovers", context...)
s.progress.Subchains = s.progress.Subchains[:1]

// Note, here we didn't actually delete the headers at all,
// just the metadata. We could implement a cleanup mechanism,
// but further modifying corrupted state is kind of asking
// for it. Unless there's a good enough reason to risk it,
// better to live with the small database junk.
}
}
break
}
@@ -1023,6 +1077,74 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
return linked, merged
}

// cleanStales removes previously synced beacon headers that have become stale
// due to the downloader backfilling past the tracked tail.
func (s *skeleton) cleanStales(filled *types.Header) error {
number := filled.Number.Uint64()
log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())

// If the filled header is below the linked subchain, something's
// corrupted internally. Report an error and refuse to do anything.
if number < s.progress.Subchains[0].Tail {
return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
}
// Subchain seems trimmable, push the tail forward up to the last
// filled header and delete everything before it - if available. In
// case we filled past the head, recreate the subchain with a new
// head to keep it consistent with the data on disk.
var (
start = s.progress.Subchains[0].Tail // start deleting from the first known header
end = number // delete until the requested threshold
)
s.progress.Subchains[0].Tail = number
s.progress.Subchains[0].Next = filled.ParentHash

if s.progress.Subchains[0].Head < number {
// If more headers were filled than available, push the entire
// subchain forward to keep tracking the node's block imports
end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head
s.progress.Subchains[0].Head = number // assign a new head (tail is already assigned to this)
}
// Execute the trimming and the potential rewiring of the progress
batch := s.db.NewBatch()

if end != number {
// The entire original skeleton chain was deleted and a new one
// defined. Make sure the new single-header chain gets pushed to
// disk to keep internal state consistent.
rawdb.WriteSkeletonHeader(batch, filled)
}
s.saveSyncStatus(batch)
for n := start; n < end; n++ {
// If the batch grew too big, flush it and continue with a new batch.
// The catch is that the sync metadata needs to reflect the actually
// flushed state, so temporarily change the subchain progress and
// revert after the flush.
if batch.ValueSize() >= ethdb.IdealBatchSize {
tmpTail := s.progress.Subchains[0].Tail
tmpNext := s.progress.Subchains[0].Next

s.progress.Subchains[0].Tail = n
s.progress.Subchains[0].Next = rawdb.ReadSkeletonHeader(s.db, n).ParentHash
s.saveSyncStatus(batch)

if err := batch.Write(); err != nil {
log.Crit("Failed to write beacon trim data", "err", err)
}
batch.Reset()

s.progress.Subchains[0].Tail = tmpTail
s.progress.Subchains[0].Next = tmpNext
s.saveSyncStatus(batch)
}
rawdb.DeleteSkeletonHeader(batch, n)
}
if err := batch.Write(); err != nil {
log.Crit("Failed to write beacon trim data", "err", err)
}
return nil
}

// Bounds retrieves the current head and tail tracked by the skeleton syncer.
// This method is used by the backfiller, whose life cycle is controlled by the
// skeleton syncer.
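As a sanity check on the trimming arithmetic in cleanStales, here is a small self-contained sketch with made-up block numbers. The subchain struct is reduced to the two fields the bounds computation touches, and trimBounds merely mirrors the start/end logic above; it is not part of the PR.

```go
package main

import "fmt"

// subchain is a reduced model of the skeleton subchain for illustration.
type subchain struct {
	Head, Tail uint64
}

// trimBounds mirrors cleanStales' bounds computation: headers in [start, end)
// get deleted and the tail (and possibly the head) moves up to `filled`.
func trimBounds(sc *subchain, filled uint64) (start, end uint64) {
	start, end = sc.Tail, filled
	sc.Tail = filled
	if sc.Head < filled {
		// Backfill ran past the tracked head (e.g. blocks imported via the
		// engine API): wipe the whole original range, head included, and
		// collapse the subchain onto the single freshly filled header.
		end = sc.Head + 1
		sc.Head = filled
	}
	return start, end
}

func main() {
	// Backfilled to 1000 while tracking headers 500..1500:
	sc := &subchain{Head: 1500, Tail: 500}
	start, end := trimBounds(sc, 1000)
	fmt.Println(start, end, *sc) // 500 1000 {1500 1000}: delete 500..999, keep 1000..1500

	// Backfilled past the tracked head (500..1200) up to 1250:
	sc = &subchain{Head: 1200, Tail: 500}
	start, end = trimBounds(sc, 1250)
	fmt.Println(start, end, *sc) // 500 1201 {1250 1250}: delete 500..1200, rewrite header 1250
}
```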
6 changes: 3 additions & 3 deletions eth/downloader/skeleton_test.go
@@ -55,10 +55,11 @@ func newHookedBackfiller() backfiller {
// based on the skeleton chain as it might be invalid. The backfiller should
// gracefully handle multiple consecutive suspends without a resume, even
// on initial startup.
func (hf *hookedBackfiller) suspend() {
func (hf *hookedBackfiller) suspend() *types.Header {
if hf.suspendHook != nil {
hf.suspendHook()
}
return nil // we don't really care about header cleanups for now
}

// resume requests the backfiller to start running fill or snap sync based on
@@ -426,7 +427,6 @@ func TestSkeletonSyncExtend(t *testing.T) {
newstate: []*subchain{
{Head: 49, Tail: 49},
},
err: errReorgDenied,
},
// Initialize a sync and try to extend it with a sibling block.
{
@@ -489,7 +489,7 @@

<-wait
if err := skeleton.Sync(tt.extend, false); err != tt.err {
t.Errorf("extension failure mismatch: have %v, want %v", err, tt.err)
t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
}
skeleton.Terminate()
