Skip to content

Commit

Permalink
Merge branch 'main' into taikoon-ui-setup
Browse files Browse the repository at this point in the history
  • Loading branch information
bearni95 committed Apr 30, 2024
2 parents 6c86d89 + 1045a55 commit 307ddb1
Show file tree
Hide file tree
Showing 17 changed files with 93 additions and 74 deletions.
5 changes: 4 additions & 1 deletion .github/_typos.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
[default.extend-words]
TGE = "TGE"
TKO = "TKO"
Ethereum = "Ethereum"
Ethereum = "Ethereum"

[files]
extend-exclude = ["packages/protocol/audit"]
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ If you find a bug or have a feature request, please [open an issue](https://gith

## Contributing

Check out [CONTRIBUTING.md](./CONTRIBUTING.md) for details on how to contribute.
Check out [CONTRIBUTING.md](./CONTRIBUTING.md) for details on how to contribute. You can also check out our grants cycle at [grants.taiko.xyz](https://grants.taiko.xyz).

You can also check out our grants cycle at [grants.taiko.xyz](https://grants.taiko.xyz).
⚠️ Please refrain from submitting typo/comment-only pull requests with the expectation of receiving TKO airdrops.

## Getting support

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func InitFromConfig(ctx context.Context, h *HealthChecker, cfg *Config) (err err

guardianProvers = append(guardianProvers, guardianproverhealthcheck.GuardianProver{
Address: guardianAddress,
ID: guardianId,
ID: new(big.Int).Sub(guardianId, common.Big1),
HealthCheckCounter: promauto.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("guardian_prover_%v_health_checks_ops_total", guardianId.Uint64()),
Help: "The total number of health checks",
Expand Down
13 changes: 9 additions & 4 deletions packages/protocol/script/DeployOnL1.s.sol
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,19 @@ contract DeployOnL1 is DeployCapability {
registerTo: sharedAddressManager
});

deployProxy({
address brdige = deployProxy({
name: "bridge",
impl: address(new Bridge()),
data: abi.encodeCall(Bridge.init, (owner, sharedAddressManager)),
data: abi.encodeCall(Bridge.init, (address(0), sharedAddressManager)),
registerTo: sharedAddressManager
});

if (vm.envBool("PAUSE_BRIDGE")) {
Bridge(payable(brdige)).pause();
}

Bridge(payable(brdige)).transferOwnership(owner);

console2.log("------------------------------------------");
console2.log(
"Warning - you need to register *all* counterparty bridges to enable multi-hop bridging:"
Expand Down Expand Up @@ -329,8 +335,7 @@ contract DeployOnL1 is DeployCapability {
name: "automata_dcap_attestation",
impl: automateDcapV3AttestationImpl,
data: abi.encodeCall(
AutomataDcapV3Attestation.init,
(timelock, address(sigVerifyLib), address(pemCertChainLib))
AutomataDcapV3Attestation.init, (owner, address(sigVerifyLib), address(pemCertChainLib))
),
registerTo: rollupAddressManager
});
Expand Down
1 change: 1 addition & 0 deletions packages/protocol/script/test_deploy_on_l1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ TAIKO_TOKEN_SYMBOL=TTKOk \
SHARED_ADDRESS_MANAGER=0x0000000000000000000000000000000000000000 \
L2_GENESIS_HASH=0xee1950562d42f0da28bd4550d88886bc90894c77c9c9eaefef775d4c8223f259 \
PAUSE_TAIKO_L1=true \
PAUSE_BRIDGE=true \
TIER_PROVIDER="devnet" \
forge script script/DeployOnL1.s.sol:DeployOnL1 \
--fork-url http://localhost:8545 \
Expand Down
8 changes: 8 additions & 0 deletions packages/relayer/cmd/flags/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ var (
Required: false,
EnvVars: []string{"UNPROFITABLE_MESSAGE_QUEUE_EXPIRATION"},
}
MaxMessageRetries = &cli.Uint64Flag{
Name: "maxMessageRetries",
Usage: "How many times to retry a message due to unprofitability",
Category: processorCategory,
Value: 5,
EnvVars: []string{"MAX_MESSAGE_RETRIES"},
}
)

var ProcessorFlags = MergeFlags(CommonFlags, QueueFlags, TxmgrFlags, []cli.Flag{
Expand All @@ -151,4 +158,5 @@ var ProcessorFlags = MergeFlags(CommonFlags, QueueFlags, TxmgrFlags, []cli.Flag{
TargetTxHash,
CacheOption,
UnprofitableMessageQueueExpiration,
MaxMessageRetries,
})
2 changes: 1 addition & 1 deletion packages/relayer/indexer/handle_message_processed_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (i *Indexer) handleMessageProcessedEvent(

// we add it to the queue, so the processor can pick up and attempt to process
// the message onchain.
if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil); err != nil {
if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil, nil); err != nil {
return errors.Wrap(err, "i.queue.Publish")
}

Expand Down
2 changes: 1 addition & 1 deletion packages/relayer/indexer/handle_message_sent_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (i *Indexer) handleMessageSentEvent(

// we add it to the queue, so the processor can pick up and attempt to process
// the message onchain.
if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil); err != nil {
if err := i.queue.Publish(ctx, i.queueName(), marshalledMsg, nil, nil); err != nil {
return errors.Wrap(err, "i.queue.Publish")
}

Expand Down
8 changes: 7 additions & 1 deletion packages/relayer/pkg/mock/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ func (r *Queue) Notify(ctx context.Context, wg *sync.WaitGroup) error {
return nil
}

func (r *Queue) Publish(ctx context.Context, queueName string, msg []byte, expiration *string) error {
func (r *Queue) Publish(
ctx context.Context,
queueName string,
msg []byte,
headers map[string]interface{},
expiration *string,
) error {
return nil
}

Expand Down
18 changes: 12 additions & 6 deletions packages/relayer/pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@ var (
type Queue interface {
Start(ctx context.Context, queueName string) error
Close(ctx context.Context)
Publish(ctx context.Context, queueName string, msg []byte, expiration *string) error
Publish(
ctx context.Context,
queueName string,
msg []byte,
headers map[string]interface{},
expiration *string,
) error
Notify(ctx context.Context, wg *sync.WaitGroup) error
Subscribe(ctx context.Context, msgs chan<- Message, wg *sync.WaitGroup) error
Ack(ctx context.Context, msg Message) error
Nack(ctx context.Context, msg Message, requeue bool) error
}

type QueueMessageSentBody struct {
Event *bridge.BridgeMessageSent
ID int
Event *bridge.BridgeMessageSent
ID int
TimesRetried uint64
}

type QueueMessageProcessedBody struct {
Expand All @@ -33,9 +40,8 @@ type QueueMessageProcessedBody struct {
}

type Message struct {
Body []byte
Internal interface{}
TimesRetried int64
Body []byte
Internal interface{}
}

type NewQueueOpts struct {
Expand Down
55 changes: 13 additions & 42 deletions packages/relayer/pkg/queue/rabbitmq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,21 @@ func (r *RabbitMQ) Close(ctx context.Context) {
slog.Info("closed rabbitmq connection")
}

func (r *RabbitMQ) Publish(ctx context.Context, queueName string, msg []byte, expiration *string) error {
func (r *RabbitMQ) Publish(
ctx context.Context,
queueName string,
msg []byte,
headers map[string]interface{},
expiration *string,
) error {
slog.Info("publishing rabbitmq msg to queue", "queue", r.queue.Name)

p := amqp.Publishing{
ContentType: "text/plain",
Body: msg,
MessageId: uuid.New().String(),
DeliveryMode: 2, // persistent messages, saved to disk to survive server restart
Headers: headers,
}

if expiration != nil {
Expand All @@ -274,7 +281,7 @@ func (r *RabbitMQ) Publish(ctx context.Context, queueName string, msg []byte, ex
return err
}

return r.Publish(ctx, queueName, msg, expiration)
return r.Publish(ctx, queueName, msg, headers, expiration)
} else {
return err
}
Expand Down Expand Up @@ -372,7 +379,7 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error {
slog.Error("rabbitmq notify return", "id", returnMsg.MessageId, "err", returnMsg.ReplyText)
slog.Info("rabbitmq attempting republish of returned msg", "id", returnMsg.MessageId)

if err := r.Publish(ctx, r.queue.Name, returnMsg.Body, &returnMsg.Expiration); err != nil {
if err := r.Publish(ctx, r.queue.Name, returnMsg.Body, returnMsg.Headers, &returnMsg.Expiration); err != nil {
slog.Error("error publishing msg", "err", err.Error())
}
}
Expand Down Expand Up @@ -455,45 +462,9 @@ func (r *RabbitMQ) Subscribe(ctx context.Context, msgChan chan<- queue.Message,

if d.Body != nil {
slog.Info("rabbitmq message found", "msgId", d.MessageId)

var timesRetried int64 = 0

var maxRetries int64 = 3

xDeath, exists := d.Headers["x-death"].([]interface{})

if exists {
// message was rejected before
c := xDeath[0].(amqp.Table)["count"].(int64)

timesRetried = c

if timesRetried > 0 {
relayer.MessageSentEventsRetries.Inc()
}
}

if timesRetried > 0 {
slog.Info("rabbitmq message times retried",
"msgId", d.MessageId,
"timesRetried", timesRetried,
)
}

if timesRetried >= int64(maxRetries) {
slog.Info("msg has reached max retries", "id", d.MessageId)

relayer.MessageSentEventsMaxRetriesReached.Inc()

if err := d.Ack(false); err != nil {
slog.Error("error acking msg after max retries")
}
} else {
msgChan <- queue.Message{
Body: d.Body,
Internal: d,
TimesRetried: timesRetried,
}
msgChan <- queue.Message{
Body: d.Body,
Internal: d,
}
} else {
slog.Info("nil body message, queue is closed")
Expand Down
3 changes: 3 additions & 0 deletions packages/relayer/processor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Config struct {
UnprofitableMessageQueueExpiration *string

TxmgrConfigs *txmgr.CLIConfig

MaxMessageRetries uint64
}

// NewConfigFromCliContext creates a new config instance from command line flags.
Expand Down Expand Up @@ -166,6 +168,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
processorPrivateKey,
c,
),
MaxMessageRetries: c.Uint64(flags.MaxMessageRetries.Name),
OpenDBFunc: func() (DB, error) {
return db.OpenDBConnection(db.DBConnectionOpts{
Name: c.String(flags.DatabaseUsername.Name),
Expand Down
28 changes: 17 additions & 11 deletions packages/relayer/processor/process_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,23 @@ func (p *Processor) eventStatusFromMsgHash(
func (p *Processor) processMessage(
ctx context.Context,
msg queue.Message,
) (bool, error) {
) (bool, uint64, error) {
msgBody := &queue.QueueMessageSentBody{}
if err := json.Unmarshal(msg.Body, msgBody); err != nil {
return false, errors.Wrap(err, "json.Unmarshal")
return false, 0, errors.Wrap(err, "json.Unmarshal")
}

slog.Info("message received", "srcTxHash", msgBody.Event.Raw.TxHash.Hex())

if msgBody.TimesRetried >= p.maxMessageRetries {
slog.Warn("max retries reached", "timesRetried", msgBody.TimesRetried)

return false, msgBody.TimesRetried, nil
}

eventStatus, err := p.eventStatusFromMsgHash(ctx, msgBody.Event.MsgHash)
if err != nil {
return false, errors.Wrap(err, "p.eventStatusFromMsgHash")
return false, msgBody.TimesRetried, errors.Wrap(err, "p.eventStatusFromMsgHash")
}

if !canProcessMessage(
Expand All @@ -80,32 +86,32 @@ func (p *Processor) processMessage(
p.relayerAddr,
uint64(msgBody.Event.Message.GasLimit),
) {
return false, nil
return false, msgBody.TimesRetried, nil
}

if err := p.waitForConfirmations(ctx, msgBody.Event.Raw.TxHash, msgBody.Event.Raw.BlockNumber); err != nil {
return false, err
return false, msgBody.TimesRetried, err
}

encodedSignalProof, err := p.generateEncodedSignalProof(ctx, msgBody.Event)
if err != nil {
return false, err
return false, msgBody.TimesRetried, err
}

receipt, err := p.sendProcessMessageCall(ctx, msgBody.Event, encodedSignalProof)
if err != nil {
return false, err
return false, msgBody.TimesRetried, err
}

if receipt.Status != types.ReceiptStatusSuccessful {
return false, err
return false, msgBody.TimesRetried, err
}

messageStatus, err := p.destBridge.MessageStatus(&bind.CallOpts{
Context: ctx,
}, msgBody.Event.MsgHash)
if err != nil {
return false, errors.Wrap(err, "p.destBridge.GetMessageStatus")
return false, msgBody.TimesRetried, errors.Wrap(err, "p.destBridge.GetMessageStatus")
}

slog.Info(
Expand All @@ -125,11 +131,11 @@ func (p *Processor) processMessage(
if msg.Internal != nil {
// update message status
if err := p.eventRepo.UpdateStatus(ctx, msgBody.ID, relayer.EventStatus(messageStatus)); err != nil {
return false, err
return false, msgBody.TimesRetried, err
}
}

return false, nil
return false, msgBody.TimesRetried, nil
}

// generateEncodedSignalproof takes a MessageSent event and calls a
Expand Down
4 changes: 2 additions & 2 deletions packages/relayer/processor/process_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func Test_ProcessMessage_messageUnprocessable(t *testing.T) {
Body: marshalled,
}

shouldRequeue, err := p.processMessage(context.Background(), msg)
shouldRequeue, _, err := p.processMessage(context.Background(), msg)

assert.Nil(t, err)

Expand Down Expand Up @@ -117,7 +117,7 @@ func Test_ProcessMessage_unprofitable(t *testing.T) {
Body: marshalled,
}

shouldRequeue, err := p.processMessage(context.Background(), msg)
shouldRequeue, _, err := p.processMessage(context.Background(), msg)

assert.Equal(
t,
Expand Down
2 changes: 1 addition & 1 deletion packages/relayer/processor/process_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (p *Processor) processSingle(ctx context.Context) error {
return err
}

if _, err := p.processMessage(ctx, queue.Message{
if _, _, err := p.processMessage(ctx, queue.Message{
Body: marshalledMsg,
}); err != nil {
return err
Expand Down
Loading

0 comments on commit 307ddb1

Please sign in to comment.