Skip to content

Commit

Permalink
feat(relayer): MessageStatusChanged events (#13272)
Browse files Browse the repository at this point in the history
Co-authored-by: David <david@taiko.xyz>
  • Loading branch information
cyberhorsey and davidtaikocha authored Mar 13, 2023
1 parent cc5f5c4 commit f5f4fc4
Show file tree
Hide file tree
Showing 16 changed files with 415 additions and 28 deletions.
6 changes: 3 additions & 3 deletions packages/relayer/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ linters:

linters-settings:
funlen:
lines: 130
statements: 52
lines: 132
statements: 54
gocognit:
min-complexity: 40
min-complexity: 41

issues:
exclude-rules:
Expand Down
9 changes: 9 additions & 0 deletions packages/relayer/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,13 @@ type Bridge interface {
GetMessageStatus(opts *bind.CallOpts, msgHash [32]byte) (uint8, error)
ProcessMessage(opts *bind.TransactOpts, message bridge.IBridgeMessage, proof []byte) (*types.Transaction, error)
IsMessageReceived(opts *bind.CallOpts, msgHash [32]byte, srcChainId *big.Int, proof []byte) (bool, error) // nolint
FilterMessageStatusChanged(
opts *bind.FilterOpts,
msgHash [][32]byte,
) (*bridge.BridgeMessageStatusChangedIterator, error)
WatchMessageStatusChanged(
opts *bind.WatchOpts,
sink chan<- *bridge.BridgeMessageStatusChanged,
msgHash [][32]byte,
) (event.Subscription, error)
}
11 changes: 10 additions & 1 deletion packages/relayer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
)

var (
EventNameMessageSent = "MessageSent"
EventNameMessageSent = "MessageSent"
EventNameMessageStatusChanged = "MessageStatusChanged"
)

// EventStatus is used to indicate whether processing has been attempted
Expand Down Expand Up @@ -54,6 +55,8 @@ type Event struct {
CanonicalTokenName string `json:"canonicalTokenName"`
CanonicalTokenDecimals uint8 `json:"canonicalTokenDecimals"`
Amount string `json:"amount"`
MsgHash string `json:"msgHash"`
MessageOwner string `json:"messageOwner"`
}

// SaveEventOpts
Expand All @@ -68,6 +71,8 @@ type SaveEventOpts struct {
CanonicalTokenName string
CanonicalTokenDecimals uint8
Amount string
MsgHash string
MessageOwner string
}

// EventRepository is used to interact with events in the store
Expand All @@ -83,4 +88,8 @@ type EventRepository interface {
ctx context.Context,
address common.Address,
) ([]*Event, error)
FindAllByMsgHash(
ctx context.Context,
msgHash string,
) ([]*Event, error)
}
25 changes: 20 additions & 5 deletions packages/relayer/indexer/filter_then_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,31 @@ func (svc *Service) FilterThenSubscribe(
end = header.Number.Uint64()
}

events, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{
filterOpts := &bind.FilterOpts{
Start: svc.processingBlockHeight,
End: &end,
Context: ctx,
}, nil)
}

messageStatusChangedEvents, err := svc.bridge.FilterMessageStatusChanged(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "bridge.FilterMessageStatusChanged")
}

// we dont need to do anything with msgStatus events except save them to the DB.
// we dont need to process them. they are for exposing via the API.

err = svc.saveMessageStatusChangedEvents(ctx, chainID, messageStatusChangedEvents)
if err != nil {
return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents")
}

messageSentEvents, err := svc.bridge.FilterMessageSent(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "bridge.FilterMessageSent")
}

if !events.Next() || events.Event == nil {
if !messageSentEvents.Next() || messageSentEvents.Event == nil {
if err := svc.handleNoEventsInBatch(ctx, chainID, int64(end)); err != nil {
return errors.Wrap(err, "svc.handleNoEventsInBatch")
}
Expand All @@ -83,7 +98,7 @@ func (svc *Service) FilterThenSubscribe(
group.SetLimit(svc.numGoroutines)

for {
event := events.Event
event := messageSentEvents.Event

group.Go(func() error {
err := svc.handleEvent(groupCtx, chainID, event)
Expand All @@ -97,7 +112,7 @@ func (svc *Service) FilterThenSubscribe(
})

// if there are no more events
if !events.Next() {
if !messageSentEvents.Next() {
// wait for the last of the goroutines to finish
if err := group.Wait(); err != nil {
return errors.Wrap(err, "group.Wait")
Expand Down
9 changes: 6 additions & 3 deletions packages/relayer/indexer/filter_then_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func Test_FilterThenSubscribe(t *testing.T) {
<-time.After(6 * time.Second)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, b.MessageStatusesChanged, 1)
assert.Equal(t, b.ErrorsSent, 2)
}

func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) {
Expand All @@ -45,7 +46,8 @@ func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) {
<-time.After(6 * time.Second)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, b.MessageStatusesChanged, 1)
assert.Equal(t, b.ErrorsSent, 2)
}

func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) {
Expand All @@ -65,5 +67,6 @@ func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) {
<-time.After(6 * time.Second)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, b.MessageStatusesChanged, 1)
assert.Equal(t, b.ErrorsSent, 2)
}
4 changes: 3 additions & 1 deletion packages/relayer/indexer/handle_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (svc *Service) handleEvent(
}

e, err := svc.eventRepo.Save(ctx, relayer.SaveEventOpts{
Name: eventName,
Name: relayer.EventNameMessageSent,
Data: string(marshaled),
ChainID: chainID,
Status: eventStatus,
Expand All @@ -62,6 +62,8 @@ func (svc *Service) handleEvent(
CanonicalTokenName: canonicalToken.Name,
CanonicalTokenDecimals: canonicalToken.Decimals,
Amount: amount.String(),
MsgHash: common.Hash(event.MsgHash).Hex(),
MessageOwner: event.Message.Owner.Hex(),
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
Expand Down
74 changes: 74 additions & 0 deletions packages/relayer/indexer/save_message_status_changed_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package indexer

import (
"context"
"encoding/json"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/relayer"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts/bridge"
)

func (svc *Service) saveMessageStatusChangedEvents(
ctx context.Context,
chainID *big.Int,
events *bridge.BridgeMessageStatusChangedIterator,
) error {
if !events.Next() {
log.Infof("no messageStatusChanged events")
return nil
}

for {
event := events.Event
log.Infof("messageStatusChanged: %v", common.Hash(event.MsgHash).Hex())

if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil {
return errors.Wrap(err, "svc.saveMessageStatusChangedEvent")
}

if !events.Next() {
return nil
}
}
}

func (svc *Service) saveMessageStatusChangedEvent(
ctx context.Context,
chainID *big.Int,
event *bridge.BridgeMessageStatusChanged,
) error {
marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
}

// get the previous MessageSent event or other message status changed events,
// so we can find out the previous owner of this msg hash,
// to save to the db.
previousEvents, err := svc.eventRepo.FindAllByMsgHash(ctx, common.Hash(event.MsgHash).Hex())
if err != nil {
return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash")
}

if len(previousEvents) == 0 {
return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash")
}

_, err = svc.eventRepo.Save(ctx, relayer.SaveEventOpts{
Name: relayer.EventNameMessageStatusChanged,
Data: string(marshaled),
ChainID: chainID,
Status: relayer.EventStatus(event.Status),
MessageOwner: previousEvents[0].MessageOwner,
MsgHash: common.Hash(event.MsgHash).Hex(),
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
}

return nil
}
59 changes: 58 additions & 1 deletion packages/relayer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand All @@ -16,6 +17,25 @@ import (
func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
log.Info("subscribing to new events")

errChan := make(chan error)

go svc.subscribeMessageSent(ctx, chainID, errChan)

go svc.subscribeMessageStatusChanged(ctx, chainID, errChan)

// nolint: gosimple
for {
select {
case <-ctx.Done():
log.Info("context finished")
return nil
case err := <-errChan:
return errors.Wrap(err, "errChan")
}
}
}

func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int, errChan chan error) {
sink := make(chan *bridge.BridgeMessageSent)

sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
Expand All @@ -32,11 +52,16 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {

for {
select {
case <-ctx.Done():
log.Info("context finished")
return
case err := <-sub.Err():
return errors.Wrap(err, "sub.Err()")
errChan <- errors.Wrap(err, "sub.Err()")
case event := <-sink:
go func() {
log.Infof("new message sent event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String())
err := svc.handleEvent(ctx, chainID, event)

if err != nil {
log.Errorf("svc.subscribe, svc.handleEvent: %v", err)
}
Expand All @@ -63,3 +88,35 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
}
}
}

func (svc *Service) subscribeMessageStatusChanged(ctx context.Context, chainID *big.Int, errChan chan error) {
sink := make(chan *bridge.BridgeMessageStatusChanged)

sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
log.Errorf("svc.bridge.WatchMessageStatusChanged: %v", err)
}

return svc.bridge.WatchMessageStatusChanged(&bind.WatchOpts{
Context: ctx,
}, sink, nil)
})

defer sub.Unsubscribe()

for {
select {
case <-ctx.Done():
log.Info("context finished")
return
case err := <-sub.Err():
errChan <- errors.Wrap(err, "sub.Err()")
case event := <-sink:
log.Infof("new message status changed event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String())

if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil {
log.Errorf("svc.subscribe, svc.saveMessageStatusChangedEvent: %v", err)
}
}
}
}
5 changes: 3 additions & 2 deletions packages/relayer/indexer/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func Test_subscribe(t *testing.T) {

b := bridge.(*mock.Bridge)

assert.Equal(t, b.MessagesSent, 1)
assert.Equal(t, b.ErrorsSent, 1)
assert.Equal(t, 1, b.MessagesSent)
assert.Equal(t, 1, b.MessageStatusesChanged)
assert.Equal(t, 2, b.ErrorsSent)
}
Loading

0 comments on commit f5f4fc4

Please sign in to comment.