From 13cec87c716d93c5e9d1abfbc65eefeadfbaefdb Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Mon, 29 Apr 2024 17:38:43 -0700 Subject: [PATCH 1/4] fix(relayer): handle retry count manually since it's lost in translation from moving between exchanges (#16922) --- .github/_typos.toml | 5 +- .../healthchecker/healthchecker.go | 2 +- packages/relayer/cmd/flags/processor.go | 8 +++ .../indexer/handle_message_processed_event.go | 2 +- .../indexer/handle_message_sent_event.go | 2 +- packages/relayer/pkg/mock/queue.go | 8 ++- packages/relayer/pkg/queue/queue.go | 18 ++++-- packages/relayer/pkg/queue/rabbitmq/queue.go | 55 +++++-------------- packages/relayer/processor/config.go | 3 + packages/relayer/processor/process_message.go | 28 ++++++---- .../relayer/processor/process_message_test.go | 4 +- packages/relayer/processor/process_single.go | 2 +- packages/relayer/processor/processor.go | 11 +++- packages/relayer/processor/processor_test.go | 1 + 14 files changed, 81 insertions(+), 68 deletions(-) diff --git a/.github/_typos.toml b/.github/_typos.toml index 6234b2c783f..d7fc0095f57 100644 --- a/.github/_typos.toml +++ b/.github/_typos.toml @@ -1,4 +1,7 @@ [default.extend-words] TGE = "TGE" TKO = "TKO" -Ethereum = "Ethereum" \ No newline at end of file +Ethereum = "Ethereum" + +[files] +extend-exclude = ["packages/protocol/audit"] \ No newline at end of file diff --git a/packages/guardian-prover-health-check/healthchecker/healthchecker.go b/packages/guardian-prover-health-check/healthchecker/healthchecker.go index 461ad4a5b7e..dca9eaae8a8 100644 --- a/packages/guardian-prover-health-check/healthchecker/healthchecker.go +++ b/packages/guardian-prover-health-check/healthchecker/healthchecker.go @@ -114,7 +114,7 @@ func InitFromConfig(ctx context.Context, h *HealthChecker, cfg *Config) (err err guardianProvers = append(guardianProvers, guardianproverhealthcheck.GuardianProver{ Address: guardianAddress, - ID: guardianId, + ID: new(big.Int).Sub(guardianId, common.Big1), HealthCheckCounter: promauto.NewCounter(prometheus.CounterOpts{ Name: fmt.Sprintf("guardian_prover_%v_health_checks_ops_total", guardianId.Uint64()), Help: "The total number of health checks", diff --git a/packages/relayer/cmd/flags/processor.go b/packages/relayer/cmd/flags/processor.go index b55f9ca3a19..72ea928ac4c 100644 --- a/packages/relayer/cmd/flags/processor.go +++ b/packages/relayer/cmd/flags/processor.go @@ -129,6 +129,13 @@ var ( Required: false, EnvVars: []string{"UNPROFITABLE_MESSAGE_QUEUE_EXPIRATION"}, } + MaxMessageRetries = &cli.Uint64Flag{ + Name: "maxMessageRetries", + Usage: "How many times to retry a message due to unprofitability", + Category: processorCategory, + Value: 5, + EnvVars: []string{"MAX_MESSAGE_RETRIES"}, + } ) var ProcessorFlags = MergeFlags(CommonFlags, QueueFlags, TxmgrFlags, []cli.Flag{ @@ -151,4 +158,5 @@ var ProcessorFlags = MergeFlags(CommonFlags, QueueFlags, TxmgrFlags, []cli.Flag{ TargetTxHash, CacheOption, UnprofitableMessageQueueExpiration, + MaxMessageRetries, }) diff --git a/packages/relayer/indexer/handle_message_processed_event.go b/packages/relayer/indexer/handle_message_processed_event.go index 453ab7e837e..dd232374dcb 100644 --- a/packages/relayer/indexer/handle_message_processed_event.go +++ b/packages/relayer/indexer/handle_message_processed_event.go @@ -98,7 +98,7 @@ func (i *Indexer) handleMessageProcessedEvent( // we add it to the queue, so the processor can pick up and attempt to process // the message onchain. - if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil); err != nil { + if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil, nil); err != nil { return errors.Wrap(err, "i.queue.Publish") } diff --git a/packages/relayer/indexer/handle_message_sent_event.go b/packages/relayer/indexer/handle_message_sent_event.go index 8ced106e5eb..d7d541baf88 100644 --- a/packages/relayer/indexer/handle_message_sent_event.go +++ b/packages/relayer/indexer/handle_message_sent_event.go @@ -113,7 +113,7 @@ func (i *Indexer) handleMessageSentEvent( // we add it to the queue, so the processor can pick up and attempt to process // the message onchain. - if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil); err != nil { + if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil, nil); err != nil { return errors.Wrap(err, "i.queue.Publish") } diff --git a/packages/relayer/pkg/mock/queue.go b/packages/relayer/pkg/mock/queue.go index fccaa521f8a..bd23bca6326 100644 --- a/packages/relayer/pkg/mock/queue.go +++ b/packages/relayer/pkg/mock/queue.go @@ -22,7 +22,13 @@ func (r *Queue) Notify(ctx context.Context, wg *sync.WaitGroup) error { return nil } -func (r *Queue) Publish(ctx context.Context, queueName string, msg []byte, expiration *string) error { +func (r *Queue) Publish( + ctx context.Context, + queueName string, + msg []byte, + headers map[string]interface{}, + expiration *string, +) error { return nil } diff --git a/packages/relayer/pkg/queue/queue.go b/packages/relayer/pkg/queue/queue.go index 85a15cbe07d..9a436de3e0e 100644 --- a/packages/relayer/pkg/queue/queue.go +++ b/packages/relayer/pkg/queue/queue.go @@ -15,7 +15,13 @@ var ( type Queue interface { Start(ctx context.Context, queueName string) error Close(ctx context.Context) - Publish(ctx context.Context, queueName string, msg []byte, expiration *string) error + Publish( + ctx context.Context, + queueName string, + msg []byte, + headers map[string]interface{}, + expiration *string, + ) error Notify(ctx context.Context, wg *sync.WaitGroup) error Subscribe(ctx context.Context, msgs chan<- Message, wg *sync.WaitGroup) error Ack(ctx context.Context, msg Message) error @@ -23,8 +29,9 @@ type Queue interface { } type QueueMessageSentBody struct { - Event *bridge.BridgeMessageSent - ID int + Event *bridge.BridgeMessageSent + ID int + TimesRetried uint64 } type QueueMessageProcessedBody struct { @@ -33,9 +40,8 @@ type QueueMessageProcessedBody struct { } type Message struct { - Body []byte - Internal interface{} - TimesRetried int64 + Body []byte + Internal interface{} } type NewQueueOpts struct { diff --git a/packages/relayer/pkg/queue/rabbitmq/queue.go b/packages/relayer/pkg/queue/rabbitmq/queue.go index 1e5814a55ac..37cd868007a 100644 --- a/packages/relayer/pkg/queue/rabbitmq/queue.go +++ b/packages/relayer/pkg/queue/rabbitmq/queue.go @@ -242,7 +242,13 @@ func (r *RabbitMQ) Close(ctx context.Context) { slog.Info("closed rabbitmq connection") } -func (r *RabbitMQ) Publish(ctx context.Context, queueName string, msg []byte, expiration *string) error { +func (r *RabbitMQ) Publish( + ctx context.Context, + queueName string, + msg []byte, + headers map[string]interface{}, + expiration *string, +) error { slog.Info("publishing rabbitmq msg to queue", "queue", r.queue.Name) p := amqp.Publishing{ @@ -250,6 +256,7 @@ func (r *RabbitMQ) Publish(ctx context.Context, queueName string, msg []byte, ex Body: msg, MessageId: uuid.New().String(), DeliveryMode: 2, // persistent messages, saved to disk to survive server restart + Headers: headers, } if expiration != nil { @@ -274,7 +281,7 @@ func (r *RabbitMQ) Publish(ctx context.Context, queueName string, msg []byte, ex return err } - return r.Publish(ctx, queueName, msg, expiration) + return r.Publish(ctx, queueName, msg, headers, expiration) } else { return err } @@ -372,7 +379,7 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error { slog.Error("rabbitmq notify return", "id", returnMsg.MessageId, "err", returnMsg.ReplyText) slog.Info("rabbitmq attempting republish of returned msg", "id", returnMsg.MessageId) - if err := r.Publish(ctx, r.queue.Name, returnMsg.Body, &returnMsg.Expiration); err != nil { + if err := r.Publish(ctx, r.queue.Name, returnMsg.Body, returnMsg.Headers, &returnMsg.Expiration); err != nil { slog.Error("error publishing msg", "err", err.Error()) } } @@ -455,45 +462,9 @@ func (r *RabbitMQ) Subscribe(ctx context.Context, msgChan chan<- queue.Message, if d.Body != nil { slog.Info("rabbitmq message found", "msgId", d.MessageId) - - var timesRetried int64 = 0 - - var maxRetries int64 = 3 - - xDeath, exists := d.Headers["x-death"].([]interface{}) - - if exists { - // message was rejected before - c := xDeath[0].(amqp.Table)["count"].(int64) - - timesRetried = c - - if timesRetried > 0 { - relayer.MessageSentEventsRetries.Inc() - } - } - - if timesRetried > 0 { - slog.Info("rabbitmq message times retried", - "msgId", d.MessageId, - "timesRetried", timesRetried, - ) - } - - if timesRetried >= int64(maxRetries) { - slog.Info("msg has reached max retries", "id", d.MessageId) - - relayer.MessageSentEventsMaxRetriesReached.Inc() - - if err := d.Ack(false); err != nil { - slog.Error("error acking msg after max retries") - } - } else { - msgChan <- queue.Message{ - Body: d.Body, - Internal: d, - TimesRetried: timesRetried, - } + msgChan <- queue.Message{ + Body: d.Body, + Internal: d, } } else { slog.Info("nil body message, queue is closed") diff --git a/packages/relayer/processor/config.go b/packages/relayer/processor/config.go index 8ca912c36aa..ab116091f41 100644 --- a/packages/relayer/processor/config.go +++ b/packages/relayer/processor/config.go @@ -83,6 +83,8 @@ type Config struct { UnprofitableMessageQueueExpiration *string TxmgrConfigs *txmgr.CLIConfig + + MaxMessageRetries uint64 } // NewConfigFromCliContext creates a new config instance from command line flags. @@ -166,6 +168,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { processorPrivateKey, c, ), + MaxMessageRetries: c.Uint64(flags.MaxMessageRetries.Name), OpenDBFunc: func() (DB, error) { return db.OpenDBConnection(db.DBConnectionOpts{ Name: c.String(flags.DatabaseUsername.Name), diff --git a/packages/relayer/processor/process_message.go b/packages/relayer/processor/process_message.go index 832b965d0f1..2288470abb3 100644 --- a/packages/relayer/processor/process_message.go +++ b/packages/relayer/processor/process_message.go @@ -60,17 +60,23 @@ func (p *Processor) eventStatusFromMsgHash( func (p *Processor) processMessage( ctx context.Context, msg queue.Message, -) (bool, error) { +) (bool, uint64, error) { msgBody := &queue.QueueMessageSentBody{} if err := json.Unmarshal(msg.Body, msgBody); err != nil { - return false, errors.Wrap(err, "json.Unmarshal") + return false, 0, errors.Wrap(err, "json.Unmarshal") } slog.Info("message received", "srcTxHash", msgBody.Event.Raw.TxHash.Hex()) + if msgBody.TimesRetried >= p.maxMessageRetries { + slog.Warn("max retries reached", "timesRetried", msgBody.TimesRetried) + + return false, msgBody.TimesRetried, nil + } + eventStatus, err := p.eventStatusFromMsgHash(ctx, msgBody.Event.MsgHash) if err != nil { - return false, errors.Wrap(err, "p.eventStatusFromMsgHash") + return false, msgBody.TimesRetried, errors.Wrap(err, "p.eventStatusFromMsgHash") } if !canProcessMessage( @@ -80,32 +86,32 @@ func (p *Processor) processMessage( p.relayerAddr, uint64(msgBody.Event.Message.GasLimit), ) { - return false, nil + return false, msgBody.TimesRetried, nil } if err := p.waitForConfirmations(ctx, msgBody.Event.Raw.TxHash, msgBody.Event.Raw.BlockNumber); err != nil { - return false, err + return false, msgBody.TimesRetried, err } encodedSignalProof, err := p.generateEncodedSignalProof(ctx, msgBody.Event) if err != nil { - return false, err + return false, msgBody.TimesRetried, err } receipt, err := p.sendProcessMessageCall(ctx, msgBody.Event, encodedSignalProof) if err != nil { - return false, err + return false, msgBody.TimesRetried, err } if receipt.Status != types.ReceiptStatusSuccessful { - return false, err + return false, msgBody.TimesRetried, err } messageStatus, err := p.destBridge.MessageStatus(&bind.CallOpts{ Context: ctx, }, msgBody.Event.MsgHash) if err != nil { - return false, errors.Wrap(err, "p.destBridge.GetMessageStatus") + return false, msgBody.TimesRetried, errors.Wrap(err, "p.destBridge.GetMessageStatus") } slog.Info( @@ -125,11 +131,11 @@ func (p *Processor) processMessage( if msg.Internal != nil { // update message status if err := p.eventRepo.UpdateStatus(ctx, msgBody.ID, relayer.EventStatus(messageStatus)); err != nil { - return false, err + return false, msgBody.TimesRetried, err } } - return false, nil + return false, msgBody.TimesRetried, nil } // generateEncodedSignalproof takes a MessageSent event and calls a diff --git a/packages/relayer/processor/process_message_test.go b/packages/relayer/processor/process_message_test.go index e00b4145816..2b6c02eb217 100644 --- a/packages/relayer/processor/process_message_test.go +++ b/packages/relayer/processor/process_message_test.go @@ -73,7 +73,7 @@ func Test_ProcessMessage_messageUnprocessable(t *testing.T) { Body: marshalled, } - shouldRequeue, err := p.processMessage(context.Background(), msg) + shouldRequeue, _, err := p.processMessage(context.Background(), msg) assert.Nil(t, err) @@ -117,7 +117,7 @@ func Test_ProcessMessage_unprofitable(t *testing.T) { Body: marshalled, } - shouldRequeue, err := p.processMessage(context.Background(), msg) + shouldRequeue, _, err := p.processMessage(context.Background(), msg) assert.Equal( t, diff --git a/packages/relayer/processor/process_single.go b/packages/relayer/processor/process_single.go index 2e130d0f6ec..07e9ed71896 100644 --- a/packages/relayer/processor/process_single.go +++ b/packages/relayer/processor/process_single.go @@ -47,7 +47,7 @@ func (p *Processor) processSingle(ctx context.Context) error { return err } - if _, err := p.processMessage(ctx, queue.Message{ + if _, _, err := p.processMessage(ctx, queue.Message{ Body: marshalledMsg, }); err != nil { return err diff --git a/packages/relayer/processor/processor.go b/packages/relayer/processor/processor.go index 40cae8d531b..117cc6116ae 100644 --- a/packages/relayer/processor/processor.go +++ b/packages/relayer/processor/processor.go @@ -128,6 +128,8 @@ type Processor struct { cfg *Config txmgr txmgr.TxManager + + maxMessageRetries uint64 } // InitFromCli creates a new processor from a cli context @@ -349,6 +351,8 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error { p.targetTxHash = cfg.TargetTxHash + p.maxMessageRetries = cfg.MaxMessageRetries + return nil } @@ -432,7 +436,7 @@ func (p *Processor) eventLoop(ctx context.Context) { return case msg := <-p.msgCh: go func(m queue.Message) { - shouldRequeue, err := p.processMessage(ctx, m) + shouldRequeue, timesRetried, err := p.processMessage(ctx, m) if err != nil { switch { @@ -443,10 +447,15 @@ func (p *Processor) eventLoop(ctx context.Context) { case errors.Is(err, relayer.ErrUnprofitable): slog.Info("publishing to unprofitable queue") + headers := make(map[string]interface{}, 0) + + headers["retries"] = timesRetried + 1 + if err := p.queue.Publish( ctx, fmt.Sprintf("%v-unprofitable", p.queueName()), m.Body, + headers, p.cfg.UnprofitableMessageQueueExpiration, ); err != nil { slog.Error("error publishing to unprofitable queue", "error", err) diff --git a/packages/relayer/processor/processor_test.go b/packages/relayer/processor/processor_test.go index b458a56e9d8..4fc61cdc406 100644 --- a/packages/relayer/processor/processor_test.go +++ b/packages/relayer/processor/processor_test.go @@ -46,5 +46,6 @@ func newTestProcessor(profitableOnly bool) *Processor { cfg: &Config{ DestBridgeAddress: common.HexToAddress("0xC4279588B8dA563D264e286E2ee7CE8c244444d6"), }, + maxMessageRetries: 5, } } From a27fdbf60ef51f758c8bd64d37418266e8b22091 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 30 Apr 2024 10:15:45 +0800 Subject: [PATCH 2/4] fix(protocol): fix a workflow issue (#16921) --- packages/protocol/script/DeployOnL1.s.sol | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/protocol/script/DeployOnL1.s.sol b/packages/protocol/script/DeployOnL1.s.sol index 001dd3a634a..1ded6d6b069 100644 --- a/packages/protocol/script/DeployOnL1.s.sol +++ b/packages/protocol/script/DeployOnL1.s.sol @@ -329,8 +329,7 @@ contract DeployOnL1 is DeployCapability { name: "automata_dcap_attestation", impl: automateDcapV3AttestationImpl, data: abi.encodeCall( - AutomataDcapV3Attestation.init, - (timelock, address(sigVerifyLib), address(pemCertChainLib)) + AutomataDcapV3Attestation.init, (owner, address(sigVerifyLib), address(pemCertChainLib)) ), registerTo: rollupAddressManager }); From bfca520f5aaee7772032962047a844089428d392 Mon Sep 17 00:00:00 2001 From: Daniel Wang <99078276+dantaik@users.noreply.github.com> Date: Tue, 30 Apr 2024 13:41:36 +0800 Subject: [PATCH 3/4] docs(repo): update README.md to warn against airdrop PRs (#16924) --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 40ed82d51f7..fc31a296e8a 100644 --- a/README.md +++ b/README.md @@ -56,9 +56,9 @@ If you find a bug or have a feature request, please [open an issue](https://gith ## Contributing -Check out [CONTRIBUTING.md](./CONTRIBUTING.md) for details on how to contribute. +Check out [CONTRIBUTING.md](./CONTRIBUTING.md) for details on how to contribute. You can also check out our grants cycle at [grants.taiko.xyz](https://grants.taiko.xyz). -You can also check out our grants cycle at [grants.taiko.xyz](https://grants.taiko.xyz). +⚠️ Please refrain from submitting typo/comment-only pull requests with the expectation of receiving TKO airdrops. ## Getting support From 1045a55d3499f5295ffb8f041533639ba409ae4d Mon Sep 17 00:00:00 2001 From: David Date: Tue, 30 Apr 2024 15:36:19 +0800 Subject: [PATCH 4/4] feat(protocol): add `PAUSE_BRIDGE` env to `DeployOnL1` script (#16927) --- packages/protocol/script/DeployOnL1.s.sol | 10 ++++++++-- packages/protocol/script/test_deploy_on_l1.sh | 1 + 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/protocol/script/DeployOnL1.s.sol b/packages/protocol/script/DeployOnL1.s.sol index 1ded6d6b069..bd2b26ef86e 100644 --- a/packages/protocol/script/DeployOnL1.s.sol +++ b/packages/protocol/script/DeployOnL1.s.sol @@ -171,13 +171,19 @@ contract DeployOnL1 is DeployCapability { registerTo: sharedAddressManager }); - deployProxy({ + address brdige = deployProxy({ name: "bridge", impl: address(new Bridge()), - data: abi.encodeCall(Bridge.init, (owner, sharedAddressManager)), + data: abi.encodeCall(Bridge.init, (address(0), sharedAddressManager)), registerTo: sharedAddressManager }); + if (vm.envBool("PAUSE_BRIDGE")) { + Bridge(payable(brdige)).pause(); + } + + Bridge(payable(brdige)).transferOwnership(owner); + console2.log("------------------------------------------"); console2.log( "Warning - you need to register *all* counterparty bridges to enable multi-hop bridging:" diff --git a/packages/protocol/script/test_deploy_on_l1.sh b/packages/protocol/script/test_deploy_on_l1.sh index 7dce3a714f7..85703d3accc 100755 --- a/packages/protocol/script/test_deploy_on_l1.sh +++ b/packages/protocol/script/test_deploy_on_l1.sh @@ -17,6 +17,7 @@ TAIKO_TOKEN_SYMBOL=TTKOk \ SHARED_ADDRESS_MANAGER=0x0000000000000000000000000000000000000000 \ L2_GENESIS_HASH=0xee1950562d42f0da28bd4550d88886bc90894c77c9c9eaefef775d4c8223f259 \ PAUSE_TAIKO_L1=true \ +PAUSE_BRIDGE=true \ TIER_PROVIDER="devnet" \ forge script script/DeployOnL1.s.sol:DeployOnL1 \ --fork-url http://localhost:8545 \