Skip to content

Commit

Permalink
[EVM-703]: Saving events in boltdb should have retry mechanism (#1652)
Browse files Browse the repository at this point in the history
* Initial changes

* Exit events update

* Checkpoint manager change

* Event tracker error

* Lint fix

* Comments fix

* Comments fix

* Lint fix

* Comments fix

* Comments fix

* Comments fix
  • Loading branch information
goran-ethernal authored Jul 7, 2023
1 parent 2d35db6 commit ac9b449
Show file tree
Hide file tree
Showing 24 changed files with 462 additions and 227 deletions.
4 changes: 2 additions & 2 deletions command/bridge/exit/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func run(cmd *cobra.Command, _ []string) {
}

outputter.SetCommandResult(&exitResult{
ID: strconv.FormatUint(exitEvent.ID, 10),
ID: strconv.FormatUint(exitEvent.ID.Uint64(), 10),
Sender: exitEvent.Sender.String(),
Receiver: exitEvent.Receiver.String(),
})
Expand All @@ -186,7 +186,7 @@ func createExitTxn(sender ethgo.Address, proof types.Proof) (*ethgo.Transaction,

var exitEventAPI contractsapi.L2StateSyncedEvent

exitEventEncoded, err := exitEventAPI.Encode(exitEvent)
exitEventEncoded, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent)
if err != nil {
return nil, nil, fmt.Errorf("failed to encode exit event: %w", err)
}
Expand Down
105 changes: 52 additions & 53 deletions consensus/polybft/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,23 @@ type checkpointManager struct {
logger hclog.Logger
// state boltDb instance
state *State
// eventGetter gets exit events (missed or current) from blocks
eventGetter *eventsGetter[*ExitEvent]
}

// newCheckpointManager creates a new instance of checkpointManager
func newCheckpointManager(key ethgo.Key, checkpointOffset uint64,
checkpointManagerSC types.Address, txRelayer txrelayer.TxRelayer,
blockchain blockchainBackend, backend polybftBackend, logger hclog.Logger,
state *State) *checkpointManager {
retry := &eventsGetter[*ExitEvent]{
blockchain: blockchain,
isValidLogFn: func(l *types.Log) bool {
return l.Address == contracts.L2StateSenderContract
},
parseEventFn: parseExitEvent,
}

return &checkpointManager{
key: key,
blockchain: blockchain,
Expand All @@ -84,6 +94,7 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64,
checkpointManagerAddr: checkpointManagerSC,
logger: logger,
state: state,
eventGetter: retry,
}
}

Expand Down Expand Up @@ -273,30 +284,28 @@ func (c *checkpointManager) isCheckpointBlock(blockNumber uint64, isEpochEndingB
// PostBlock is called on every insert of finalized block (either from consensus or syncer)
// It will read any exit event that happened in block and insert it to state boltDb
func (c *checkpointManager) PostBlock(req *PostBlockRequest) error {
var (
epoch = req.Epoch
block = req.FullBlock.Block.Number()
)
block := req.FullBlock.Block.Number()

if req.IsEpochEndingBlock {
// exit events that happened in epoch ending blocks,
// should be added to the tree of the next epoch
epoch++
block++
lastBlock, err := c.state.CheckpointStore.getLastSaved()
if err != nil {
return fmt.Errorf("could not get last processed block for exit events. Error: %w", err)
}

// commit exit events only when we finalize a block
events, err := getExitEventsFromReceipts(epoch, block, req.FullBlock.Receipts)
exitEvents, err := c.eventGetter.getFromBlocks(lastBlock, req.FullBlock)
if err != nil {
return err
}

if len(events) > 0 {
c.logger.Debug("Gotten exit events from logs on block",
"eventsNum", len(events), "block", req.FullBlock.Block.Number())
sort.Slice(exitEvents, func(i, j int) bool {
// keep events in sequential order
return exitEvents[i].ID.Cmp(exitEvents[j].ID) < 0
})

if err := c.state.CheckpointStore.insertExitEvents(exitEvents); err != nil {
return err
}

if err := c.state.CheckpointStore.insertExitEvents(events); err != nil {
if err := c.state.CheckpointStore.updateLastSaved(block); err != nil {
return err
}

Expand Down Expand Up @@ -399,7 +408,7 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error

var exitEventAPI contractsapi.L2StateSyncedEvent

e, err := exitEventAPI.Encode(exitEvent)
e, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent)
if err != nil {
return types.Proof{}, err
}
Expand Down Expand Up @@ -436,50 +445,14 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error
}, nil
}

// getExitEventsFromReceipts parses logs from receipts to find exit events
func getExitEventsFromReceipts(epoch, block uint64, receipts []*types.Receipt) ([]*ExitEvent, error) {
events := make([]*ExitEvent, 0)

for i := 0; i < len(receipts); i++ {
if receipts[i].Status == nil || *receipts[i].Status != types.ReceiptSuccess {
continue
}

for _, log := range receipts[i].Logs {
if log.Address != contracts.L2StateSenderContract {
continue
}

event, err := decodeExitEvent(convertLog(log), epoch, block)
if err != nil {
return nil, err
}

if event == nil {
// valid case, not an exit event
continue
}

events = append(events, event)
}
}

// enforce sequential order
sort.Slice(events, func(i, j int) bool {
return events[i].ID < events[j].ID
})

return events, nil
}

// createExitTree creates an exit event merkle tree from provided exit events
func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) {
numOfEvents := len(exitEvents)
data := make([][]byte, numOfEvents)

var exitEventAPI contractsapi.L2StateSyncedEvent
for i := 0; i < numOfEvents; i++ {
b, err := exitEventAPI.Encode(exitEvents[i])
b, err := exitEventAPI.Encode(exitEvents[i].L2StateSyncedEvent)
if err != nil {
return nil, err
}
Expand All @@ -489,3 +462,29 @@ func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) {

return merkle.NewMerkleTree(data)
}

// parseExitEvent parses exit event from provided log
func parseExitEvent(h *types.Header, l *ethgo.Log) (*ExitEvent, bool, error) {
extra, err := GetIbftExtra(h.ExtraData)
if err != nil {
return nil, false,
fmt.Errorf("could not get header extra on exit event parsing. Error: %w", err)
}

epoch := extra.Checkpoint.EpochNumber
block := h.Number

if extra.Validators != nil {
// exit events that happened in epoch ending blocks,
// should be added to the tree of the next epoch
epoch++
block++
}

event, err := decodeExitEvent(l, epoch, block)
if err != nil {
return nil, false, err
}

return event, true, nil
}
95 changes: 83 additions & 12 deletions consensus/polybft/checkpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,46 +296,117 @@ func TestCheckpointManager_PostBlock(t *testing.T) {

state := newTestState(t)

receipts := make([]*types.Receipt, numOfReceipts)
for i := 0; i < numOfReceipts; i++ {
receipts[i] = &types.Receipt{Logs: []*types.Log{
createTestLogForExitEvent(t, uint64(i)),
}}
receipts[i].SetStatus(types.ReceiptSuccess)
createReceipts := func(startID, endID uint64) []*types.Receipt {
receipts := make([]*types.Receipt, endID-startID)
for i := startID; i < endID; i++ {
receipts[i-startID] = &types.Receipt{Logs: []*types.Log{
createTestLogForExitEvent(t, i),
}}
receipts[i-startID].SetStatus(types.ReceiptSuccess)
}

return receipts
}

req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}, Receipts: receipts},
extra := &Extra{
Checkpoint: &CheckpointData{
EpochNumber: epoch,
},
}

req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}},
Epoch: epoch}

req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil)

blockchain := new(blockchainMock)
checkpointManager := newCheckpointManager(wallet.NewEcdsaSigner(createTestKey(t)), 5, types.ZeroAddress,
nil, nil, nil, hclog.NewNullLogger(), state)
nil, blockchain, nil, hclog.NewNullLogger(), state)

t.Run("PostBlock - not epoch ending block", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block-1)) // we got everything till the current block
req.IsEpochEndingBlock = false
req.FullBlock.Receipts = createReceipts(0, 5)
require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block
})

require.NoError(t, err)
require.Len(t, exitEvents, numOfReceipts)
require.Len(t, exitEvents, 5)
require.Equal(t, uint64(epoch), exitEvents[0].EpochNumber)
})

t.Run("PostBlock - epoch ending block (exit events are saved to the next epoch)", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we got everything till the current block
req.IsEpochEndingBlock = true
req.FullBlock.Receipts = createReceipts(5, 10)
extra.Validators = &validator.ValidatorSetDelta{}
req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil)
req.FullBlock.Block.Header.Number = block + 1

require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+1
return exitEvent.BlockNumber == block+2 // they should be saved in the next epoch and its first block
})

require.NoError(t, err)
require.Len(t, exitEvents, numOfReceipts)
require.Equal(t, uint64(block+1), exitEvents[0].BlockNumber)
require.Len(t, exitEvents, 5)
require.Equal(t, uint64(block+2), exitEvents[0].BlockNumber)
require.Equal(t, uint64(epoch+1), exitEvents[0].EpochNumber)
})

t.Run("PostBlock - there are missing events", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we are missing one block

missedReceipts := createReceipts(10, 13)
newReceipts := createReceipts(13, 15)

extra := &Extra{
Checkpoint: &CheckpointData{
EpochNumber: epoch + 1,
},
}

blockchain.On("GetHeaderByNumber", uint64(block+1)).Return(&types.Header{
Number: block + 1,
ExtraData: extra.MarshalRLPTo(nil),
Hash: types.BytesToHash([]byte{0, 1, 2, 3}),
}, true)
blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{0, 1, 2, 3})).Return([]*types.Receipt{}, nil)
blockchain.On("GetHeaderByNumber", uint64(block+2)).Return(&types.Header{
Number: block + 2,
ExtraData: extra.MarshalRLPTo(nil),
Hash: types.BytesToHash([]byte{4, 5, 6, 7}),
}, true)
blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{4, 5, 6, 7})).Return(missedReceipts, nil)

req.IsEpochEndingBlock = false
req.FullBlock.Block.Header.Number = block + 3 // new block
req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) // same epoch
req.FullBlock.Receipts = newReceipts
require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+2
})

require.NoError(t, err)
// receipts from missed block + events from previous test case that were saved in the next epoch
// since they were in epoch ending block
require.Len(t, exitEvents, len(missedReceipts)+5)
require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber)

exitEvents, err = state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+3
})

require.NoError(t, err)
require.Len(t, exitEvents, len(newReceipts))
require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber)
})
}

func TestCheckpointManager_BuildEventRoot(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/polybft/consensus_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ func encodeExitEvents(t *testing.T, exitEvents []*ExitEvent) [][]byte {

var exitEventAPI contractsapi.L2StateSyncedEvent
for i, e := range exitEvents {
encodedEvent, err := exitEventAPI.Encode(e)
encodedEvent, err := exitEventAPI.Encode(e.L2StateSyncedEvent)
require.NoError(t, err)

encodedEvents[i] = encodedEvent
Expand Down
11 changes: 11 additions & 0 deletions consensus/polybft/contractsapi/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package contractsapi

import (
"github.com/0xPolygon/polygon-edge/types"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/abi"
)

Expand All @@ -13,6 +14,16 @@ type StateTransactionInput interface {
DecodeAbi(b []byte) error
}

// EventAbi is an interface representing an event generated in contractsapi
type EventAbi interface {
// Sig returns the event ABI signature or ID (which is unique for all event types)
Sig() ethgo.Hash
// Encode does abi encoding of given event
Encode(inputs interface{}) ([]byte, error)
// ParseLog parses the provided receipt log to given event type
ParseLog(log *ethgo.Log) (bool, error)
}

var (
// stateSyncABIType is a specific case where we need to encode state sync event as a tuple of tuple
stateSyncABIType = abi.MustNewType(
Expand Down
4 changes: 0 additions & 4 deletions consensus/polybft/extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,5 @@ func GetIbftExtra(extraRaw []byte) (*Extra, error) {
return nil, err
}

if extra.Validators == nil {
extra.Validators = &validator.ValidatorSetDelta{}
}

return extra, nil
}
13 changes: 9 additions & 4 deletions consensus/polybft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ var (
"allowed in an epoch ending block")
errProposalDontMatch = errors.New("failed to insert proposal, because the validated proposal " +
"is either nil or it does not match the received one")
errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch")
errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block")
errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch")
errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block")
errValidatorDeltaNilInEpochEndingBlock = errors.New("validator set delta is nil in epoch ending block")
)

type fsm struct {
Expand Down Expand Up @@ -337,11 +338,15 @@ func (f *fsm) Validate(proposal []byte) error {

// validate validators delta
if f.isEndOfEpoch {
if extra.Validators == nil {
return errValidatorDeltaNilInEpochEndingBlock
}

if !extra.Validators.Equals(f.newValidatorsDelta) {
return errValidatorSetDeltaMismatch
}
} else if !extra.Validators.IsEmpty() {
// delta should be empty in non epoch ending blocks
} else if extra.Validators != nil {
// delta should be nil in non epoch ending blocks
return errValidatorsUpdateInNonEpochEnding
}

Expand Down
Loading

0 comments on commit ac9b449

Please sign in to comment.