Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: use finalized block as the chain freeze indicator #28683

Merged
merged 6 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {

func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme string) {
// It's hard to follow the test case, visualize the input
//log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump(true))

// Create a temporary persistent database
Expand Down Expand Up @@ -1830,10 +1830,14 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
// Force run a freeze cycle
type freezer interface {
Freeze(threshold uint64) error
Freeze() error
Ancients() (uint64, error)
}
db.(freezer).Freeze(tt.freezeThreshold)
if tt.freezeThreshold < uint64(tt.canonicalBlocks) {
final := uint64(tt.canonicalBlocks) - tt.freezeThreshold
chain.SetFinalized(canonblocks[int(final)-1].Header())
}
db.(freezer).Freeze()

// Set the simulated pivot block
if tt.pivotBlock != nil {
Expand Down
8 changes: 6 additions & 2 deletions core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2044,10 +2044,14 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme

// Force run a freeze cycle
type freezer interface {
Freeze(threshold uint64) error
Freeze() error
Ancients() (uint64, error)
}
db.(freezer).Freeze(tt.freezeThreshold)
if tt.freezeThreshold < uint64(tt.canonicalBlocks) {
final := uint64(tt.canonicalBlocks) - tt.freezeThreshold
chain.SetFinalized(canonblocks[int(final)-1].Header())
}
db.(freezer).Freeze()

// Set the simulated pivot block
if tt.pivotBlock != nil {
Expand Down
113 changes: 70 additions & 43 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package rawdb

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -43,8 +43,6 @@ const (
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
type chainFreezer struct {
threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

*Freezer
quit chan struct{}
wg sync.WaitGroup
Expand All @@ -57,13 +55,11 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre
if err != nil {
return nil, err
}
cf := chainFreezer{
return &chainFreezer{
Freezer: freezer,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}
cf.threshold.Store(params.FullImmutabilityThreshold)
return &cf, nil
}, nil
}

// Close closes the chain freezer instance and terminates the background thread.
Expand All @@ -77,6 +73,57 @@ func (f *chainFreezer) Close() error {
return f.Freezer.Close()
}

// readHeadNumber returns the number of chain head block. 0 is returned if the
// block is unknown or not available yet.
func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 {
hash := ReadHeadBlockHash(db)
if hash == (common.Hash{}) {
log.Error("Head block is not reachable")
return 0
}
number := ReadHeaderNumber(db, hash)
if number == nil {
log.Error("Number of head block is missing")
return 0
}
return *number
}

// readFinalizedNumber returns the number of finalized block. 0 is returned
// if the block is unknown or not available yet.
func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 {
hash := ReadFinalizedBlockHash(db)
if hash == (common.Hash{}) {
return 0
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
}
number := ReadHeaderNumber(db, hash)
if number == nil {
log.Error("Number of finalized block is missing")
return 0
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
}
return *number
}

// freezeThreshold returns the threshold for chain freezing. It's determined
// by formula: max(finality, HEAD-params.FullImmutabilityThreshold).
func (f *chainFreezer) freezeThreshold(db ethdb.KeyValueReader) (uint64, error) {
var (
head = f.readHeadNumber(db)
final = f.readFinalizedNumber(db)
headLimit uint64
)
if head > params.FullImmutabilityThreshold {
headLimit = head - params.FullImmutabilityThreshold
}
if final == 0 && headLimit == 0 {
return 0, errors.New("freezing threshold is not available")
}
if final > headLimit {
return final, nil
}
return headLimit, nil
}

// freeze is a background thread that periodically checks the blockchain for any
// import progress and moves ancient data from the fast database into the freezer.
//
Expand Down Expand Up @@ -114,60 +161,39 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
return
}
}
// Retrieve the freezing threshold.
hash := ReadHeadBlockHash(nfdb)
if hash == (common.Hash{}) {
log.Debug("Current full block hash unavailable") // new chain, empty database
threshold, err := f.freezeThreshold(nfdb)
if err != nil {
backoff = true
log.Debug("Current full block not old enough to freeze", "err", err)
continue
}
number := ReadHeaderNumber(nfdb, hash)
threshold := f.threshold.Load()
frozen := f.frozen.Load()
switch {
case number == nil:
log.Error("Current full block number unavailable", "hash", hash)
backoff = true
continue

case *number < threshold:
log.Debug("Current full block not old enough to freeze", "number", *number, "hash", hash, "delay", threshold)
backoff = true
continue

case *number-threshold <= frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen)
// Short circuit if the blocks below threshold are already frozen.
if frozen != 0 && frozen-1 >= threshold {
backoff = true
log.Debug("Ancient blocks frozen already", "threshold", threshold, "frozen", frozen)
continue
}
head := ReadHeader(nfdb, hash, *number)
if head == nil {
log.Error("Current full block unavailable", "number", *number, "hash", hash)
backoff = true
continue
}

// Seems we have data ready to be frozen, process in usable batches
var (
start = time.Now()
first, _ = f.Ancients()
limit = *number - threshold
start = time.Now()
first = frozen // the first block to freeze
last = threshold // the last block to freeze
)
if limit-first > freezerBatchLimit {
limit = first + freezerBatchLimit
if last-first+1 > freezerBatchLimit {
last = freezerBatchLimit + first - 1
}
ancients, err := f.freezeRange(nfdb, first, limit)
ancients, err := f.freezeRange(nfdb, first, last)
if err != nil {
log.Error("Error in block freeze operation", "err", err)
backoff = true
continue
}

// Batch of blocks have been frozen, flush them before wiping from leveldb
if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}

// Wipe out all data from the active database
batch := db.NewBatch()
for i := 0; i < len(ancients); i++ {
Expand Down Expand Up @@ -250,8 +276,11 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
}
}

// freezeRange moves a batch of chain segments from the fast database to the freezer.
// The parameters (number, limit) specify the relevant block range, both of which
// are included.
func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
hashes = make([]common.Hash, 0, limit-number)
hashes = make([]common.Hash, 0, limit-number+1)

_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for ; number <= limit; number++ {
Expand Down Expand Up @@ -293,11 +322,9 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil {
return fmt.Errorf("can't write td to Freezer: %v", err)
}

hashes = append(hashes, hash)
}
return nil
})

return hashes, err
}
8 changes: 1 addition & 7 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,10 @@ func (frdb *freezerdb) Close() error {
// Freeze is a helper method used for external testing to trigger and block until
// a freeze cycle completes, without having to sleep for a minute to trigger the
// automatic background run.
func (frdb *freezerdb) Freeze(threshold uint64) error {
func (frdb *freezerdb) Freeze() error {
if frdb.AncientStore.(*chainFreezer).readonly {
return errReadOnly
}
// Set the freezer threshold to a temporary value
defer func(old uint64) {
frdb.AncientStore.(*chainFreezer).threshold.Store(old)
}(frdb.AncientStore.(*chainFreezer).threshold.Load())
frdb.AncientStore.(*chainFreezer).threshold.Store(threshold)

// Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1)
frdb.AncientStore.(*chainFreezer).trigger <- trigger
Expand Down
Loading