diff --git a/go.mod b/go.mod index 6148f70b865..18d17b45c80 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/pressly/goose/v3 v3.7.0 github.com/prometheus/client_golang v1.14.0 github.com/sirupsen/logrus v1.9.0 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.8.4 github.com/testcontainers/testcontainers-go v0.15.0 golang.org/x/sync v0.1.0 gopkg.in/go-playground/assert.v1 v1.2.1 @@ -86,6 +86,7 @@ require ( github.com/prometheus/common v0.39.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect diff --git a/go.sum b/go.sum index 14078b2b373..e298e030acd 100644 --- a/go.sum +++ b/go.sum @@ -861,6 +861,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -871,6 +873,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= diff --git a/packages/eventindexer/event.go b/packages/eventindexer/event.go index 8ad022c0230..66fd1a68ad4 100644 --- a/packages/eventindexer/event.go +++ b/packages/eventindexer/event.go @@ -2,6 +2,7 @@ package eventindexer import ( "context" + "database/sql" "math/big" "gorm.io/datatypes" @@ -23,6 +24,7 @@ type Event struct { ChainID int64 `json:"chainID"` Event string `json:"event"` Address string `json:"address"` + BlockID sql.NullInt64 `json:"blockID"` } // SaveEventOpts @@ -32,6 +34,7 @@ type SaveEventOpts struct { ChainID *big.Int Event string Address string + BlockID *int64 } type UniqueProversResponse struct { @@ -53,5 +56,13 @@ type EventRepository interface { FindUniqueProposers( ctx context.Context, ) ([]UniqueProposersResponse, error) + FindByEventTypeAndBlockID( + ctx context.Context, + eventType string, + blockID int64) (*Event, error) + Delete( + ctx context.Context, + id int, + ) error GetCountByAddressAndEventName(ctx context.Context, address string, event string) (int, error) } diff --git a/packages/eventindexer/indexer/detect_and_handle_reorg.go b/packages/eventindexer/indexer/detect_and_handle_reorg.go new file mode 100644 index 00000000000..bc61ee7807f --- /dev/null +++ b/packages/eventindexer/indexer/detect_and_handle_reorg.go @@ -0,0 +1,24 @@ +package indexer + +import ( + "context" + + "github.com/pkg/errors" +) + +func (svc *Service) detectAndHandleReorg(ctx context.Context, event string, blockID int64) error { + existingEvent, err := svc.eventRepo.FindByEventTypeAndBlockID(ctx, event, blockID) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.FindByEventTypeAndBlockID") + } + + if existingEvent != nil { + // reorg detected + err := svc.eventRepo.Delete(ctx, existingEvent.ID) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Delete") + } + } + + return nil +} diff --git a/packages/eventindexer/indexer/save_block_proposed_event.go b/packages/eventindexer/indexer/save_block_proposed_event.go index ecf37d1814d..32c0bf46727 100644 --- a/packages/eventindexer/indexer/save_block_proposed_event.go +++ b/packages/eventindexer/indexer/save_block_proposed_event.go @@ -25,8 +25,8 @@ func (svc *Service) saveBlockProposedEvents( for { event := events.Event - if event.Raw.Removed { - continue + if err := svc.detectAndHandleReorg(ctx, eventindexer.EventNameBlockProposed, event.Id.Int64()); err != nil { + return errors.Wrap(err, "svc.detectAndHandleReorg") } tx, _, err := svc.ethClient.TransactionByHash(ctx, event.Raw.TxHash) @@ -66,12 +66,15 @@ func (svc *Service) saveBlockProposedEvent( return errors.Wrap(err, "json.Marshal(event)") } + blockID := event.Id.Int64() + _, err = svc.eventRepo.Save(ctx, eventindexer.SaveEventOpts{ Name: eventindexer.EventNameBlockProposed, Data: string(marshaled), ChainID: chainID, Event: eventindexer.EventNameBlockProposed, Address: sender.Hex(), + BlockID: &blockID, }) if err != nil { return errors.Wrap(err, "svc.eventRepo.Save") diff --git a/packages/eventindexer/indexer/save_block_proven_event.go b/packages/eventindexer/indexer/save_block_proven_event.go index f76ee379de9..5e5b78d50ac 100644 --- a/packages/eventindexer/indexer/save_block_proven_event.go +++ b/packages/eventindexer/indexer/save_block_proven_event.go @@ -30,8 +30,8 @@ func (svc *Service) saveBlockProvenEvents( for { event := events.Event - if event.Raw.Removed { - continue + if err := svc.detectAndHandleReorg(ctx, eventindexer.EventNameBlockProven, event.Id.Int64()); err != nil { + return errors.Wrap(err, "svc.detectAndHandleReorg") } log.Infof("blockProven by: %v", event.Prover.Hex()) @@ -60,12 +60,15 @@ func (svc *Service) saveBlockProvenEvent( return errors.Wrap(err, "json.Marshal(event)") } + blockID := event.Id.Int64() + _, err = svc.eventRepo.Save(ctx, eventindexer.SaveEventOpts{ Name: eventindexer.EventNameBlockProven, Data: string(marshaled), ChainID: chainID, Event: eventindexer.EventNameBlockProven, Address: event.Prover.Hex(), + BlockID: &blockID, }) if err != nil { return errors.Wrap(err, "svc.eventRepo.Save") diff --git a/packages/eventindexer/indexer/save_block_verified_event.go b/packages/eventindexer/indexer/save_block_verified_event.go index b56eff36a3f..cc930013f89 100644 --- a/packages/eventindexer/indexer/save_block_verified_event.go +++ b/packages/eventindexer/indexer/save_block_verified_event.go @@ -26,8 +26,8 @@ func (svc *Service) saveBlockVerifiedEvents( log.Infof("new blockVerified event, blockId: %v", event.Id) - if event.Raw.Removed { - continue + if err := svc.detectAndHandleReorg(ctx, eventindexer.EventNameBlockVerified, event.Id.Int64()); err != nil { + return errors.Wrap(err, "svc.detectAndHandleReorg") } if err := svc.saveBlockVerifiedEvent(ctx, chainID, event); err != nil { @@ -52,12 +52,15 @@ func (svc *Service) saveBlockVerifiedEvent( return errors.Wrap(err, "json.Marshal(event)") } + blockID := event.Id.Int64() + _, err = svc.eventRepo.Save(ctx, eventindexer.SaveEventOpts{ Name: eventindexer.EventNameBlockVerified, Data: string(marshaled), ChainID: chainID, Event: eventindexer.EventNameBlockVerified, Address: "", + BlockID: &blockID, }) if err != nil { return errors.Wrap(err, "svc.eventRepo.Save") diff --git a/packages/eventindexer/migrations/1666650599_create_events_table.sql b/packages/eventindexer/migrations/1666650599_create_events_table.sql index 36aec4c6e47..99af8917801 100644 --- a/packages/eventindexer/migrations/1666650599_create_events_table.sql +++ b/packages/eventindexer/migrations/1666650599_create_events_table.sql @@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS events ( chain_id int NOT NULL, data JSON NOT NULL, address VARCHAR(255) NOT NULL DEFAULT "", + block_id int DEFAULT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP , updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); diff --git a/packages/eventindexer/mock/event_repository.go b/packages/eventindexer/mock/event_repository.go index 9c9fe9a34c9..6e274228637 100644 --- a/packages/eventindexer/mock/event_repository.go +++ b/packages/eventindexer/mock/event_repository.go @@ -6,6 +6,7 @@ import ( "github.com/taikoxyz/taiko-mono/packages/eventindexer" "gorm.io/datatypes" + "gorm.io/gorm" ) type EventRepository struct { @@ -57,3 +58,29 @@ func (r *EventRepository) GetCountByAddressAndEventName( return count, nil } + +func (r *EventRepository) FindByEventTypeAndBlockID( + ctx context.Context, + eventType string, + blockID int64) (*eventindexer.Event, error) { + for _, e := range r.events { + if e.Event == eventType && e.BlockID.Int64 == blockID { + return e, nil + } + } + + return nil, gorm.ErrRecordNotFound +} + +func (r *EventRepository) Delete( + ctx context.Context, + id int, +) error { + for i, e := range r.events { + if e.ID == id { + r.events = append(r.events[:i], r.events[i+1:]...) + } + } + + return nil +} diff --git a/packages/eventindexer/repo/event.go b/packages/eventindexer/repo/event.go index fd877465fdd..2ce163a4d54 100644 --- a/packages/eventindexer/repo/event.go +++ b/packages/eventindexer/repo/event.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/eventindexer" "gorm.io/datatypes" + "gorm.io/gorm" ) type EventRepository struct { @@ -38,6 +39,34 @@ func (r *EventRepository) Save(ctx context.Context, opts eventindexer.SaveEventO return e, nil } +func (r *EventRepository) FindByEventTypeAndBlockID( + ctx context.Context, + eventType string, + blockID int64) (*eventindexer.Event, error) { + e := &eventindexer.Event{} + + if err := r.db.GormDB(). + Where("event = ?", eventType). + Where("block_id = ?", blockID).First(e).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return nil, nil + } + + return nil, err + } + + return e, nil +} + +func (r *EventRepository) Delete( + ctx context.Context, + id int, +) error { + e := &eventindexer.Event{} + + return r.db.GormDB().Delete(e, id).Error +} + func (r *EventRepository) FindUniqueProvers( ctx context.Context, ) ([]eventindexer.UniqueProversResponse, error) { diff --git a/packages/eventindexer/repo/event_test.go b/packages/eventindexer/repo/event_test.go index 1bdfa17de8a..62b69010a94 100644 --- a/packages/eventindexer/repo/event_test.go +++ b/packages/eventindexer/repo/event_test.go @@ -6,17 +6,19 @@ import ( "testing" "github.com/davecgh/go-spew/spew" + "github.com/stretchr/testify/assert" "github.com/taikoxyz/taiko-mono/packages/eventindexer" - "gotest.tools/assert" ) var ( - dummyProveEventOpts = eventindexer.SaveEventOpts{ + blockID int64 = 1 + dummyProveEventOpts = eventindexer.SaveEventOpts{ Name: eventindexer.EventNameBlockProven, Address: "0x123", Data: "{\"data\":\"something\"}", Event: eventindexer.EventNameBlockProven, ChainID: big.NewInt(1), + BlockID: &blockID, } dummyProposeEventOpts = eventindexer.SaveEventOpts{ Name: eventindexer.EventNameBlockProposed, @@ -24,6 +26,7 @@ var ( Data: "{\"data\":\"something\"}", Event: eventindexer.EventNameBlockProposed, ChainID: big.NewInt(1), + BlockID: &blockID, } ) @@ -209,3 +212,48 @@ func TestIntegration_Event_GetCountByAddressAndEventName(t *testing.T) { }) } } + +func TestIntegration_Event_Delete(t *testing.T) { + db, close, err := testMysql(t) + assert.Equal(t, nil, err) + + defer close() + + eventRepo, err := NewEventRepository(db) + assert.Equal(t, nil, err) + + event, err := eventRepo.Save(context.Background(), dummyProveEventOpts) + + assert.Equal(t, nil, err) + + tests := []struct { + name string + id int + wantErr error + }{ + { + "success", + event.ID, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := eventRepo.Delete( + context.Background(), + tt.id, + ) + assert.Equal(t, tt.wantErr, err) + + foundEvent, err := eventRepo.FindByEventTypeAndBlockID( + context.Background(), + event.Event, + event.BlockID.Int64, + ) + + assert.Equal(t, nil, err) + assert.Nil(t, foundEvent) + }) + } +} diff --git a/packages/relayer/event.go b/packages/relayer/event.go index 8f6653cd32c..f50e7380bd7 100644 --- a/packages/relayer/event.go +++ b/packages/relayer/event.go @@ -101,4 +101,5 @@ type EventRepository interface { ctx context.Context, msgHash string, ) ([]*Event, error) + Delete(ctx context.Context, id int) error } diff --git a/packages/relayer/indexer/detect_and_handle_reorg.go b/packages/relayer/indexer/detect_and_handle_reorg.go new file mode 100644 index 00000000000..b354c20f6c8 --- /dev/null +++ b/packages/relayer/indexer/detect_and_handle_reorg.go @@ -0,0 +1,38 @@ +package indexer + +import ( + "context" + + "github.com/pkg/errors" + "github.com/taikoxyz/taiko-mono/packages/relayer" +) + +func (svc *Service) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error { + events, err := svc.eventRepo.FindAllByMsgHash(ctx, msgHash) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash") + } + + if events == nil { + return nil + } + + var existingEvent *relayer.Event + + for _, e := range events { + if e.Event == eventType && e.MsgHash == msgHash { + existingEvent = e + break + } + } + + if existingEvent != nil { + // reorg detected + err := svc.eventRepo.Delete(ctx, existingEvent.ID) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Delete") + } + } + + return nil +} diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index a0f447db988..aad5552363d 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -18,15 +18,10 @@ func (svc *Service) handleEvent( chainID *big.Int, event *bridge.BridgeMessageSent, ) error { - raw := event.Raw - log.Infof("event found for msgHash: %v, txHash: %v", common.Hash(event.MsgHash).Hex(), event.Raw.TxHash.Hex()) - // handle chain re-org by checking Removed property, no need to - // return error, just continue and do not process. - if raw.Removed { - log.Warnf("event msgHash was removed: %v", common.Hash(event.MsgHash).Hex()) - return nil + if err := svc.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil { + return errors.Wrap(err, "svc.detectAndHandleReorg") } if event.MsgHash == relayer.ZeroHash { diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/save_message_status_changed_events.go index 49ec9687445..ba74f596913 100644 --- a/packages/relayer/indexer/save_message_status_changed_events.go +++ b/packages/relayer/indexer/save_message_status_changed_events.go @@ -24,8 +24,17 @@ func (svc *Service) saveMessageStatusChangedEvents( for { event := events.Event + log.Infof("messageStatusChanged: %v", common.Hash(event.MsgHash).Hex()) + if err := svc.detectAndHandleReorg( + ctx, + relayer.EventNameMessageStatusChanged, + common.Hash(event.MsgHash).Hex(), + ); err != nil { + return errors.Wrap(err, "svc.detectAndHandleReorg") + } + if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil { return errors.Wrap(err, "svc.saveMessageStatusChangedEvent") } diff --git a/packages/relayer/mock/event_repository.go b/packages/relayer/mock/event_repository.go index dd3317198fe..53208a11ad8 100644 --- a/packages/relayer/mock/event_repository.go +++ b/packages/relayer/mock/event_repository.go @@ -107,3 +107,16 @@ func (r *EventRepository) FindAllByMsgHash( return events, nil } + +func (r *EventRepository) Delete( + ctx context.Context, + id int, +) error { + for i, e := range r.events { + if e.ID == id { + r.events = append(r.events[:i], r.events[i+1:]...) + } + } + + return nil +} diff --git a/packages/relayer/repo/event.go b/packages/relayer/repo/event.go index a5884bba762..9d413aa673e 100644 --- a/packages/relayer/repo/event.go +++ b/packages/relayer/repo/event.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/relayer" "gorm.io/datatypes" + "gorm.io/gorm" ) type EventRepository struct { @@ -72,6 +73,10 @@ func (r *EventRepository) FindAllByMsgHash( // find all message sent events if err := r.db.GormDB().Where("msg_hash = ?", msgHash). Find(&e).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return e, nil + } + return nil, errors.Wrap(err, "r.db.Find") } @@ -112,3 +117,10 @@ func (r *EventRepository) FindAllByAddress( return page, nil } + +func (r *EventRepository) Delete( + ctx context.Context, + id int, +) error { + return r.db.GormDB().Delete(relayer.Event{}, id).Error +}