op-supervisor: store block-checkpoints per block, implement block iteration #11989

Merged: 5 commits, merged Sep 20, 2024
3 changes: 2 additions & 1 deletion op-e2e/interop/supersystem.go
@@ -416,7 +416,8 @@ func (s *interopE2ESystem) newL2(id string, l2Out *interopgen.L2Output) l2Set {

// prepareSupervisor creates a new supervisor for the system
func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
logger := s.logger.New("role", "supervisor")
// Be verbose with op-supervisor, it's in early test phase
logger := testlog.Logger(s.t, log.LevelDebug).New("role", "supervisor")
cfg := supervisorConfig.Config{
MetricsConfig: metrics.CLIConfig{
Enabled: false,
47 changes: 28 additions & 19 deletions op-supervisor/supervisor/backend/backend.go
@@ -9,8 +9,13 @@ import (
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
@@ -19,9 +24,6 @@ import (
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)

type SupervisorBackend struct {
@@ -94,7 +96,7 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger,
if err != nil {
return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
}
logDB, err := logs.NewFromFile(logger, cm, path)
logDB, err := logs.NewFromFile(logger, cm, path, true)
if err != nil {
return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
}
@@ -133,8 +135,9 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
if !su.started.CompareAndSwap(false, true) {
return errors.New("already started")
}
// initiate "Resume" on the chains db, which rewinds the database to the last block that is guaranteed to have been fully recorded
if err := su.db.Resume(); err != nil {
// initiate "ResumeFromLastSealedBlock" on the chains db,
// which rewinds the database to the last block that is guaranteed to have been fully recorded
if err := su.db.ResumeFromLastSealedBlock(); err != nil {
return fmt.Errorf("failed to resume chains db: %w", err)
}
// start chain monitors
@@ -144,8 +147,8 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
}
}
// start db maintenance loop
maintinenceCtx, cancel := context.WithCancel(context.Background())
su.db.StartCrossHeadMaintenance(maintinenceCtx)
maintenanceCtx, cancel := context.WithCancel(context.Background())
su.db.StartCrossHeadMaintenance(maintenanceCtx)
su.maintenanceCancel = cancel
return nil
}
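The start sequence above combines a start-once guard with a cancellable background context for the maintenance loop. A minimal, self-contained sketch of that pattern under the same assumptions (the `service` type and `maintenanceLoop` body are illustrative, not code from this PR):

```go
package example

import (
	"context"
	"errors"
	"sync/atomic"
)

// service is a hypothetical stand-in for SupervisorBackend, showing the
// start-once guard plus cancellable background-work pattern used above.
type service struct {
	started atomic.Bool
	cancel  context.CancelFunc
}

func (s *service) Start() error {
	if !s.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel // kept so a later Stop can end the maintenance loop
	go maintenanceLoop(ctx)
	return nil
}

func maintenanceLoop(ctx context.Context) {
	<-ctx.Done() // placeholder body: the real loop does periodic cross-head updates
}
```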
@@ -188,13 +191,16 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
chainID := identifier.ChainID
blockNum := identifier.BlockNumber
logIdx := identifier.LogIndex
ok, i, err := su.db.Check(chainID, blockNum, uint32(logIdx), backendTypes.TruncateHash(payloadHash))
if err != nil {
return types.Invalid, fmt.Errorf("failed to check log: %w", err)
i, err := su.db.Check(chainID, blockNum, uint32(logIdx), backendTypes.TruncateHash(payloadHash))
if errors.Is(err, logs.ErrFuture) {
return types.Unsafe, nil
}
if !ok {
if errors.Is(err, logs.ErrConflict) {
return types.Invalid, nil
}
if err != nil {
return types.Invalid, fmt.Errorf("failed to check log: %w", err)
}
safest := types.CrossUnsafe
// at this point we have the log entry, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{
@@ -231,16 +237,19 @@ func (su *SupervisorBackend) CheckMessages(
// The block is considered safe if all logs in the block are safe
// this is decided by finding the last log in the block and checking it against the safety checkers
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
// TODO(#11612): this function ignores blockHash and assumes that the block in the db is the one we are looking for
// In order to check block hash, the database must *always* insert a block hash checkpoint, which is not currently done
safest := types.CrossUnsafe
// find the last log index in the block
i, err := su.db.LastLogInBlock(types.ChainID(*chainID), uint64(blockNumber))
// TODO(#11836) checking for EOF as a non-error case is a bit of a code smell
// and could potentially be incorrect if the given block number is intentionally far in the future
if err != nil && !errors.Is(err, io.EOF) {
id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)}
i, err := su.db.FindSealedBlock(types.ChainID(*chainID), id)
if errors.Is(err, logs.ErrFuture) {
return types.Unsafe, nil
}
if errors.Is(err, logs.ErrConflict) {
return types.Invalid, nil
}
if err != nil {
su.logger.Error("failed to scan block", "err", err)
return types.Invalid, fmt.Errorf("failed to scan block: %w", err)
return "", err
}
// at this point we have the extent of the block, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{
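Both CheckMessage and CheckBlock now map the logs-DB error sentinels onto safety levels instead of the old boolean return. A hedged sketch of how a caller might centralize that mapping (the `classify` helper is illustrative, not part of this PR):

```go
package example

import (
	"errors"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

// classify mirrors the branches in CheckMessage and CheckBlock above:
// ErrFuture means the data is not yet in the DB (unsafe, not an error),
// ErrConflict means it contradicts the canonical chain (invalid),
// and any other error is surfaced to the caller.
func classify(err error) (types.SafetyLevel, error) {
	switch {
	case err == nil:
		return types.CrossUnsafe, nil // found; stricter levels are checked separately
	case errors.Is(err, logs.ErrFuture):
		return types.Unsafe, nil
	case errors.Is(err, logs.ErrConflict):
		return types.Invalid, nil
	default:
		return types.Invalid, err
	}
}
```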
135 changes: 71 additions & 64 deletions op-supervisor/supervisor/backend/db/db.go
@@ -7,13 +7,15 @@ import (
"io"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
)

var (
@@ -22,16 +24,32 @@ var (

type LogStorage interface {
io.Closer
AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error

AddLog(logHash backendTypes.TruncatedHash, parentBlock eth.BlockID,
logIdx uint32, execMsg *backendTypes.ExecutingMessage) error

SealBlock(parentHash common.Hash, block eth.BlockID, timestamp uint64) error

Rewind(newHeadBlockNum uint64) error
LatestBlockNum() uint64
ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
ClosestBlockIterator(blockNum uint64) (logs.Iterator, error)
Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error)
LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error)
NextExecutingMessage(logs.Iterator) (backendTypes.ExecutingMessage, error)

LatestSealedBlockNum() (n uint64, ok bool)

// FindSealedBlock finds the requested block, to check if it exists,
// returning the next index after it where things continue from.
// returns ErrFuture if the block is too new to be able to tell
// returns ErrDifferent if the known block does not match
FindSealedBlock(block eth.BlockID) (nextEntry entrydb.EntryIdx, err error)

IteratorStartingAt(i entrydb.EntryIdx) (logs.Iterator, error)

// returns ErrConflict if the log does not match the canonical chain.
// returns ErrFuture if the log is out of reach.
// returns nil if the log is known and matches the canonical chain.
Contains(blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) (nextIndex entrydb.EntryIdx, err error)
}
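FindSealedBlock returning the next entry index pairs naturally with IteratorStartingAt: resolve a sealed block to its position, then iterate from there. A hedged sketch of that read path, using a restated subset of the interface so it stays self-contained (the helper name is illustrative):

```go
package example

import (
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
)

// reader is the slice of LogStorage needed for the lookup-then-iterate pattern.
type reader interface {
	FindSealedBlock(block eth.BlockID) (nextEntry entrydb.EntryIdx, err error)
	IteratorStartingAt(i entrydb.EntryIdx) (logs.Iterator, error)
}

// iterateAfter resolves a sealed block to the entry index right after it and
// returns an iterator positioned there. Sketch only: handling of the
// logs.ErrFuture / logs.ErrConflict sentinels is left to the caller.
func iterateAfter(r reader, block eth.BlockID) (logs.Iterator, error) {
	next, err := r.FindSealedBlock(block)
	if err != nil {
		return nil, err
	}
	return r.IteratorStartingAt(next)
}
```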

var _ LogStorage = (*logs.DB)(nil)

type HeadsStorage interface {
Current() *heads.Heads
Apply(op heads.Operation) error
@@ -62,14 +80,20 @@ func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) {
db.logDBs[chain] = logDB
}

// Resume prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database
// ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database,
// to ensure it can resume recording from the first log of the next block.
// TODO(#11793): we can rename this to something more descriptive like "PrepareWithRollback"
func (db *ChainsDB) Resume() error {
func (db *ChainsDB) ResumeFromLastSealedBlock() error {
for chain, logStore := range db.logDBs {
if err := Resume(logStore); err != nil {
return fmt.Errorf("failed to resume chain %v: %w", chain, err)
headNum, ok := logStore.LatestSealedBlockNum()
if !ok {
// db must be empty, nothing to rewind to
db.logger.Info("Resuming, but found no DB contents", "chain", chain)
continue
}
db.logger.Info("Resuming, starting from last sealed block", "head", headNum)
if err := logStore.Rewind(headNum); err != nil {
return fmt.Errorf("failed to rewind chain %s to sealed block %d", chain, headNum)
}
}
return nil
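The per-chain resume logic reduces to: skip empty databases, otherwise rewind to the last sealed block so recording restarts at the first log of the next block. A self-contained sketch of that branch against a minimal stand-in interface (the `sealedStore` type and `resumeOne` helper are illustrative):

```go
package example

import "fmt"

// sealedStore is the minimal slice of LogStorage that resuming needs.
type sealedStore interface {
	LatestSealedBlockNum() (n uint64, ok bool)
	Rewind(newHeadBlockNum uint64) error
}

// resumeOne mirrors the per-chain branch above: an empty DB has nothing to
// rewind to, otherwise truncate back to the last sealed block so recording
// can resume cleanly.
func resumeOne(store sealedStore) error {
	headNum, ok := store.LatestSealedBlockNum()
	if !ok {
		return nil // empty DB: nothing recorded yet
	}
	if err := store.Rewind(headNum); err != nil {
		return fmt.Errorf("failed to rewind to sealed block %d: %w", headNum, err)
	}
	return nil
}
```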
@@ -101,10 +125,10 @@ func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) {
}

// Check calls the underlying logDB to determine if the given log entry is safe with respect to the checker's criteria.
func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error) {
func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) (entrydb.EntryIdx, error) {
logDB, ok := db.logDBs[chain]
if !ok {
return false, 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
return 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.Contains(blockNum, logIdx, logHash)
}
@@ -146,7 +170,7 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
// advance as far as the local head
localHead := checker.LocalHeadForChain(chainID)
// get an iterator for the last checkpoint behind the x-head
i, err := db.logDBs[chainID].LastCheckpointBehind(xHead)
iter, err := db.logDBs[chainID].IteratorStartingAt(xHead)
if err != nil {
return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
}
@@ -158,16 +182,21 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
// - when we reach a message that is not safe
// - if an error occurs
for {
exec, err := db.logDBs[chainID].NextExecutingMessage(i)
if err == io.EOF {
if err := iter.NextExecMsg(); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err)
}
// if we are now beyond the local head, stop
if i.Index() > localHead {
// if we would exceed the local head, then abort
if iter.NextIndex() > localHead {
updated = localHead != xHead // note whether clipping actually moves the head
xHead = localHead            // clip to local head
break
}
exec := iter.ExecMessage()
if exec == nil {
panic("expected executing message after traversing to one without error")
}
// use the checker to determine if this message is safe
safe := checker.Check(
types.ChainIDFromUInt64(uint64(exec.Chain)),
@@ -178,10 +207,9 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
break
}
// if all is well, prepare the x-head update to this point
xHead = i.Index()
xHead = iter.NextIndex()
updated = true
}

// have the checker create an update to the x-head in question, and apply that update
err = db.heads.Apply(checker.Update(chainID, xHead))
if err != nil {
@@ -191,8 +219,10 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
// this allows for the maintenance loop to handle cascading updates
// instead of waiting for the next scheduled update
if updated {
db.logger.Debug("heads were updated, requesting maintenance")
db.logger.Info("Promoting cross-head", "head", xHead, "safety-level", checker.SafetyLevel())
db.RequestMaintenance()
} else {
db.logger.Info("No cross-head update", "head", xHead, "safety-level", checker.SafetyLevel())
}
return nil
}
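The promotion loop now drives the new iterator API directly: NextExecMsg advances to the next executing message, ExecMessage exposes it, and NextIndex bounds how far the cross-head may move. A hedged sketch of that traversal against a restated slice of the iterator interface (the standalone `promote` function and `safe` predicate are illustrative, not code from the PR):

```go
package example

import (
	"errors"
	"io"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
)

// execIterator is the slice of logs.Iterator the promotion loop relies on.
type execIterator interface {
	NextExecMsg() error
	ExecMessage() *backendTypes.ExecutingMessage
	NextIndex() entrydb.EntryIdx
}

// promote walks executing messages and returns the furthest index, up to
// localHead, whose messages all pass the safe predicate.
func promote(iter execIterator, start, localHead entrydb.EntryIdx, safe func(*backendTypes.ExecutingMessage) bool) entrydb.EntryIdx {
	xHead := start
	for {
		if err := iter.NextExecMsg(); errors.Is(err, io.EOF) {
			break // no more executing messages in the DB
		} else if err != nil {
			break // the real loop returns the error instead
		}
		if iter.NextIndex() > localHead {
			break // never promote the cross-head past the local head
		}
		if !safe(iter.ExecMessage()) {
			break // first unsafe dependency stops promotion
		}
		xHead = iter.NextIndex()
	}
	return xHead
}
```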
@@ -210,62 +240,39 @@ func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
return nil
}

// LastLogInBlock scans through the logs of the given chain starting from the given block number,
// and returns the index of the last log entry in that block.
func (db *ChainsDB) LastLogInBlock(chain types.ChainID, blockNum uint64) (entrydb.EntryIdx, error) {
func (db *ChainsDB) FindSealedBlock(chain types.ChainID, block eth.BlockID) (nextEntry entrydb.EntryIdx, err error) {
logDB, ok := db.logDBs[chain]
if !ok {
return 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
iter, err := logDB.ClosestBlockIterator(blockNum)
if err != nil {
return 0, fmt.Errorf("failed to get block iterator for chain %v: %w", chain, err)
}
ret := entrydb.EntryIdx(0)
// scan through using the iterator until the block number exceeds the target
for {
bn, index, _, err := iter.NextLog()
// if we have reached the end of the database, stop
if err == io.EOF {
break
}
// all other errors are fatal
if err != nil {
return 0, fmt.Errorf("failed to read next log entry for chain %v: %w", chain, err)
}
// if we are now beyond the target block, stop without updating the return value
if bn > blockNum {
break
}
// only update the return value if the block number is the same
// it is possible the iterator started before the target block, or that the target block is not in the db
if bn == blockNum {
ret = entrydb.EntryIdx(index)
}
}
// if we never found the block, return an error
if ret == 0 {
return 0, fmt.Errorf("block %v not found in chain %v", blockNum, chain)
}
return ret, nil
return logDB.FindSealedBlock(block)
}

// LatestBlockNum returns the latest block number that has been recorded to the logs db
// LatestBlockNum returns the latest fully-sealed block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 {
// The block number might not be available (empty database, or non-existent chain).
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) {
logDB, knownChain := db.logDBs[chain]
if !knownChain {
return 0, false
}
return logDB.LatestSealedBlockNum()
}

func (db *ChainsDB) SealBlock(chain types.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error {
logDB, ok := db.logDBs[chain]
if !ok {
return 0
return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.LatestBlockNum()
return logDB.SealBlock(parentHash, block, timestamp)
}

func (db *ChainsDB) AddLog(chain types.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error {
func (db *ChainsDB) AddLog(chain types.ChainID, logHash backendTypes.TruncatedHash, parentBlock eth.BlockID, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error {
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.AddLog(logHash, block, timestamp, logIdx, execMsg)
return logDB.AddLog(logHash, parentBlock, logIdx, execMsg)
}
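With the timestamp moved from AddLog to SealBlock, ingesting a block through ChainsDB becomes: append each log against the parent block, then seal the block with its own hash and timestamp. A sketch of that per-chain flow under the PR's signatures (the `chainWriter` subset, `ingestBlock` helper, and its loop are illustrative):

```go
package example

import (
	"github.com/ethereum/go-ethereum/common"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

// chainWriter is the slice of *ChainsDB used below, restated for self-containment.
type chainWriter interface {
	AddLog(chain types.ChainID, logHash backendTypes.TruncatedHash, parentBlock eth.BlockID, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error
	SealBlock(chain types.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error
}

// ingestBlock records each log of a block against the parent block, then seals
// the block; only the two method calls come from this PR.
func ingestBlock(db chainWriter, chain types.ChainID, parent, block eth.BlockID, timestamp uint64, logHashes []backendTypes.TruncatedHash) error {
	for i, h := range logHashes {
		// non-executing logs pass a nil ExecutingMessage
		if err := db.AddLog(chain, h, parent, uint32(i), nil); err != nil {
			return err
		}
	}
	return db.SealBlock(chain, parent.Hash, block, timestamp)
}
```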

func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error {