Skip to content

Commit

Permalink
feat(eventindexer): handle reorg (#13841)
Browse files Browse the repository at this point in the history
Co-authored-by: Roger <50648015+RogerLamTd@users.noreply.github.com>
Co-authored-by: Daniel Wang <99078276+dantaik@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 1, 2023
1 parent 98017a2 commit 0a26ce5
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 16 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/pressly/goose/v3 v3.7.0
github.com/prometheus/client_golang v1.14.0
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.4
github.com/testcontainers/testcontainers-go v0.15.0
golang.org/x/sync v0.1.0
gopkg.in/go-playground/assert.v1 v1.2.1
Expand Down Expand Up @@ -86,6 +86,7 @@ require (
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand All @@ -871,6 +873,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down
11 changes: 11 additions & 0 deletions packages/eventindexer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventindexer

import (
"context"
"database/sql"
"math/big"

"gorm.io/datatypes"
Expand All @@ -23,6 +24,7 @@ type Event struct {
ChainID int64 `json:"chainID"`
Event string `json:"event"`
Address string `json:"address"`
BlockID sql.NullInt64 `json:"blockID"`
}

// SaveEventOpts
Expand All @@ -32,6 +34,7 @@ type SaveEventOpts struct {
ChainID *big.Int
Event string
Address string
BlockID *int64
}

type UniqueProversResponse struct {
Expand All @@ -53,5 +56,13 @@ type EventRepository interface {
FindUniqueProposers(
ctx context.Context,
) ([]UniqueProposersResponse, error)
FindByEventTypeAndBlockID(
ctx context.Context,
eventType string,
blockID int64) (*Event, error)
Delete(
ctx context.Context,
id int,
) error
GetCountByAddressAndEventName(ctx context.Context, address string, event string) (int, error)
}
24 changes: 24 additions & 0 deletions packages/eventindexer/indexer/detect_and_handle_reorg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package indexer

import (
"context"

"github.com/pkg/errors"
)

func (svc *Service) detectAndHandleReorg(ctx context.Context, event string, blockID int64) error {
existingEvent, err := svc.eventRepo.FindByEventTypeAndBlockID(ctx, event, blockID)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.FindByEventTypeAndBlockID")
}

if existingEvent != nil {
// reorg detected
err := svc.eventRepo.Delete(ctx, existingEvent.ID)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Delete")
}
}

return nil
}
7 changes: 5 additions & 2 deletions packages/eventindexer/indexer/save_block_proposed_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (svc *Service) saveBlockProposedEvents(
for {
event := events.Event

if event.Raw.Removed {
continue
if err := svc.detectAndHandleReorg(ctx, eventindexer.EventNameBlockProposed, event.Id.Int64()); err != nil {
return errors.Wrap(err, "svc.detectAndHandleReorg")
}

tx, _, err := svc.ethClient.TransactionByHash(ctx, event.Raw.TxHash)
Expand Down Expand Up @@ -66,12 +66,15 @@ func (svc *Service) saveBlockProposedEvent(
return errors.Wrap(err, "json.Marshal(event)")
}

blockID := event.Id.Int64()

_, err = svc.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockProposed,
Data: string(marshaled),
ChainID: chainID,
Event: eventindexer.EventNameBlockProposed,
Address: sender.Hex(),
BlockID: &blockID,
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
Expand Down
7 changes: 5 additions & 2 deletions packages/eventindexer/indexer/save_block_proven_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func (svc *Service) saveBlockProvenEvents(
for {
event := events.Event

if event.Raw.Removed {
continue
if err := svc.detectAndHandleReorg(ctx, eventindexer.EventNameBlockProven, event.Id.Int64()); err != nil {
return errors.Wrap(err, "svc.detectAndHandleReorg")
}

log.Infof("blockProven by: %v", event.Prover.Hex())
Expand Down Expand Up @@ -60,12 +60,15 @@ func (svc *Service) saveBlockProvenEvent(
return errors.Wrap(err, "json.Marshal(event)")
}

blockID := event.Id.Int64()

_, err = svc.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockProven,
Data: string(marshaled),
ChainID: chainID,
Event: eventindexer.EventNameBlockProven,
Address: event.Prover.Hex(),
BlockID: &blockID,
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
Expand Down
7 changes: 5 additions & 2 deletions packages/eventindexer/indexer/save_block_verified_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func (svc *Service) saveBlockVerifiedEvents(

log.Infof("new blockVerified event, blockId: %v", event.Id)

if event.Raw.Removed {
continue
if err := svc.detectAndHandleReorg(ctx, eventindexer.EventNameBlockVerified, event.Id.Int64()); err != nil {
return errors.Wrap(err, "svc.detectAndHandleReorg")
}

if err := svc.saveBlockVerifiedEvent(ctx, chainID, event); err != nil {
Expand All @@ -52,12 +52,15 @@ func (svc *Service) saveBlockVerifiedEvent(
return errors.Wrap(err, "json.Marshal(event)")
}

blockID := event.Id.Int64()

_, err = svc.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockVerified,
Data: string(marshaled),
ChainID: chainID,
Event: eventindexer.EventNameBlockVerified,
Address: "",
BlockID: &blockID,
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS events (
chain_id int NOT NULL,
data JSON NOT NULL,
address VARCHAR(255) NOT NULL DEFAULT "",
block_id int DEFAULT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
Expand Down
27 changes: 27 additions & 0 deletions packages/eventindexer/mock/event_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/taikoxyz/taiko-mono/packages/eventindexer"
"gorm.io/datatypes"
"gorm.io/gorm"
)

type EventRepository struct {
Expand Down Expand Up @@ -57,3 +58,29 @@ func (r *EventRepository) GetCountByAddressAndEventName(

return count, nil
}

func (r *EventRepository) FindByEventTypeAndBlockID(
ctx context.Context,
eventType string,
blockID int64) (*eventindexer.Event, error) {
for _, e := range r.events {
if e.Event == eventType && e.BlockID.Int64 == blockID {
return e, nil
}
}

return nil, gorm.ErrRecordNotFound
}

func (r *EventRepository) Delete(
ctx context.Context,
id int,
) error {
for i, e := range r.events {
if e.ID == id {
r.events = append(r.events[:i], r.events[i+1:]...)
}
}

return nil
}
29 changes: 29 additions & 0 deletions packages/eventindexer/repo/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/pkg/errors"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
"gorm.io/datatypes"
"gorm.io/gorm"
)

type EventRepository struct {
Expand Down Expand Up @@ -38,6 +39,34 @@ func (r *EventRepository) Save(ctx context.Context, opts eventindexer.SaveEventO
return e, nil
}

func (r *EventRepository) FindByEventTypeAndBlockID(
ctx context.Context,
eventType string,
blockID int64) (*eventindexer.Event, error) {
e := &eventindexer.Event{}

if err := r.db.GormDB().
Where("event = ?", eventType).
Where("block_id = ?", blockID).First(e).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}

return nil, err
}

return e, nil
}

func (r *EventRepository) Delete(
ctx context.Context,
id int,
) error {
e := &eventindexer.Event{}

return r.db.GormDB().Delete(e, id).Error
}

func (r *EventRepository) FindUniqueProvers(
ctx context.Context,
) ([]eventindexer.UniqueProversResponse, error) {
Expand Down
52 changes: 50 additions & 2 deletions packages/eventindexer/repo/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@ import (
"testing"

"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
"gotest.tools/assert"
)

var (
dummyProveEventOpts = eventindexer.SaveEventOpts{
blockID int64 = 1
dummyProveEventOpts = eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockProven,
Address: "0x123",
Data: "{\"data\":\"something\"}",
Event: eventindexer.EventNameBlockProven,
ChainID: big.NewInt(1),
BlockID: &blockID,
}
dummyProposeEventOpts = eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockProposed,
Address: "0x123",
Data: "{\"data\":\"something\"}",
Event: eventindexer.EventNameBlockProposed,
ChainID: big.NewInt(1),
BlockID: &blockID,
}
)

Expand Down Expand Up @@ -209,3 +212,48 @@ func TestIntegration_Event_GetCountByAddressAndEventName(t *testing.T) {
})
}
}

func TestIntegration_Event_Delete(t *testing.T) {
db, close, err := testMysql(t)
assert.Equal(t, nil, err)

defer close()

eventRepo, err := NewEventRepository(db)
assert.Equal(t, nil, err)

event, err := eventRepo.Save(context.Background(), dummyProveEventOpts)

assert.Equal(t, nil, err)

tests := []struct {
name string
id int
wantErr error
}{
{
"success",
event.ID,
nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := eventRepo.Delete(
context.Background(),
tt.id,
)
assert.Equal(t, tt.wantErr, err)

foundEvent, err := eventRepo.FindByEventTypeAndBlockID(
context.Background(),
event.Event,
event.BlockID.Int64,
)

assert.Equal(t, nil, err)
assert.Nil(t, foundEvent)
})
}
}
1 change: 1 addition & 0 deletions packages/relayer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,5 @@ type EventRepository interface {
ctx context.Context,
msgHash string,
) ([]*Event, error)
Delete(ctx context.Context, id int) error
}
38 changes: 38 additions & 0 deletions packages/relayer/indexer/detect_and_handle_reorg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package indexer

import (
"context"

"github.com/pkg/errors"
"github.com/taikoxyz/taiko-mono/packages/relayer"
)

func (svc *Service) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error {
events, err := svc.eventRepo.FindAllByMsgHash(ctx, msgHash)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash")
}

if events == nil {
return nil
}

var existingEvent *relayer.Event

for _, e := range events {
if e.Event == eventType && e.MsgHash == msgHash {
existingEvent = e
break
}
}

if existingEvent != nil {
// reorg detected
err := svc.eventRepo.Delete(ctx, existingEvent.ID)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Delete")
}
}

return nil
}
Loading

0 comments on commit 0a26ce5

Please sign in to comment.