diff --git a/.github/workflows/relayer.yml b/.github/workflows/relayer.yml index 29716a41ab6..47d340b27e9 100644 --- a/.github/workflows/relayer.yml +++ b/.github/workflows/relayer.yml @@ -2,7 +2,7 @@ name: Relayer on: push: - branches: [main] + branches: [main, alpha-2] paths: - "packages/relayer/**" pull_request: diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 875496b379e..86cdbd7747d 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -27,6 +27,8 @@ func (svc *Service) FilterThenSubscribe( return errors.Wrap(err, "svc.ethClient.ChainID()") } + go scanBlocks(ctx, svc.ethClient, chainID) + // if subscribing to new events, skip filtering and subscribe if watchMode == relayer.SubscribeWatchMode { return svc.subscribe(ctx, chainID) diff --git a/packages/relayer/indexer/scan_blocks.go b/packages/relayer/indexer/scan_blocks.go new file mode 100644 index 00000000000..a3024f498c1 --- /dev/null +++ b/packages/relayer/indexer/scan_blocks.go @@ -0,0 +1,31 @@ +package indexer + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/taikoxyz/taiko-mono/packages/relayer" +) + +func scanBlocks(ctx context.Context, ethClient ethClient, chainID *big.Int) { + headers := make(chan *types.Header) + + sub, err := ethClient.SubscribeNewHead(ctx, headers) + if err != nil { + panic(err) + } + + for { + select { + case <-sub.Err(): + relayer.BlocksScannedError.Inc() + + scanBlocks(ctx, ethClient, chainID) + + return + case <-headers: + relayer.BlocksScanned.Inc() + } + } +} diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index 9d64482aaed..77af3af3653 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -7,6 +7,7 @@ import ( "time" "github.com/cyberhorsey/errors" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -27,6 +28,7 @@ var ( type ethClient interface { ChainID(ctx context.Context) (*big.Int, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) } type Service struct { diff --git a/packages/relayer/mock/eth_client.go b/packages/relayer/mock/eth_client.go index c8ec2e135ed..dcfdc993a37 100644 --- a/packages/relayer/mock/eth_client.go +++ b/packages/relayer/mock/eth_client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math/big" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -71,3 +72,24 @@ func (c *EthClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types. return Header, nil } + +func (c *EthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + go func() { + t := time.NewTicker(time.Second * 1) + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + ch <- &types.Header{} + } + } + }() + + s := &Subscription{ + errChan: make(chan error), + } + + return s, nil +} diff --git a/packages/relayer/prometheus.go b/packages/relayer/prometheus.go index b2df6d3c558..7bd34c0d285 100644 --- a/packages/relayer/prometheus.go +++ b/packages/relayer/prometheus.go @@ -14,6 +14,14 @@ var ( Name: "blocks_processed_ops_total", Help: "The total number of processed blocks", }) + BlocksScanned = promauto.NewCounter(prometheus.CounterOpts{ + Name: "blocks_scanned_ops_total", + Help: "The total number of scanned blocks. Acts as heartbeat metric.", + }) + BlocksScannedError = promauto.NewCounter(prometheus.CounterOpts{ + Name: "blocks_scanned_error_ops_total", + Help: "The total number of scanned block errors.", + }) RetriableEvents = promauto.NewCounter(prometheus.CounterOpts{ Name: "events_processed_retriable_status_ops_total", Help: "The total number of processed events that ended up in Retriable status",