From cb1b8733f241b84092fda558f3475871cba7aeed Mon Sep 17 00:00:00 2001 From: Barry Date: Thu, 19 Dec 2024 17:57:59 +0800 Subject: [PATCH] update --- aggregator/aggregator.go | 2 +- aggregator/config.go | 7 +++++-- cmd/run.go | 4 ++-- config/default.go | 6 ++++-- rpc/batch.go | 34 ++++++++++++++++++++++++++------ rpc/batch_test.go | 6 +++--- sequencesender/config.go | 7 +++++-- sequencesender/sequencesender.go | 3 ++- 8 files changed, 50 insertions(+), 19 deletions(-) diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 9b89d557..01e48b54 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -158,7 +158,7 @@ func New( aggLayerClient: aggLayerClient, sequencerPrivateKey: sequencerPrivateKey, witnessRetrievalChan: make(chan state.DBBatch), - rpcClient: rpc.NewBatchEndpoints(cfg.RPCURL), + rpcClient: rpc.NewBatchEndpoints(strings.Split(cfg.RPCURLS, ","), cfg.RPCTimeout.Duration), } if a.ctx == nil { diff --git a/aggregator/config.go b/aggregator/config.go index e17d68af..8e4d1750 100644 --- a/aggregator/config.go +++ b/aggregator/config.go @@ -99,8 +99,11 @@ type Config struct { // final gas: 1100 GasOffset uint64 `mapstructure:"GasOffset"` - // RPCURL is the URL of the RPC server - RPCURL string `mapstructure:"RPCURL"` + // RPCURLS is the URL of the RPC server + RPCURLS string `mapstructure:"RPCURLS"` + + // RPCTimeout is the timeout for the L2 RPC calls + RPCTimeout types.Duration `mapstructure:"RPCTimeout"` // WitnessURL is the URL of the witness server WitnessURL string `mapstructure:"WitnessURL"` diff --git a/cmd/run.go b/cmd/run.go index 365e2407..155ce703 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -234,8 +234,8 @@ func createSequenceSender( logger := log.WithFields("module", cdkcommon.SEQUENCE_SENDER) // Check config - if cfg.SequenceSender.RPCURL == "" { - logger.Fatal("Required field RPCURL is empty in sequence sender config") + if cfg.SequenceSender.RPCURLS == "" { + logger.Fatal("Required field RPCURLS is empty in sequence sender config") } ethman, err := etherman.NewClient(ethermanconfig.Config{ diff --git a/config/default.go b/config/default.go index 6a505b88..03a5346f 100644 --- a/config/default.go +++ b/config/default.go @@ -90,7 +90,8 @@ WaitPeriodPurgeTxFile = "15m" MaxPendingTx = 1 MaxBatchesForL1 = 300 BlockFinality = "FinalizedBlock" -RPCURL = "{{L2URL}}" +RPCURLS = "{{L2URL}}" +RPCTimeout = "60s" GetBatchWaitInterval = "10s" [SequenceSender.EthTxManager] FrequencyToMonitorTxs = "1s" @@ -130,7 +131,8 @@ SenderAddress = "{{SenderProofToL1Addr}}" CleanupLockedProofsInterval = "2m" GeneratingProofCleanupThreshold = "10m" GasOffset = 0 -RPCURL = "{{L2URL}}" +RPCURLS = "{{L2URL}}" +RPCTimeout = "60s" WitnessURL = "{{WitnessURL}}" UseFullWitness = false SettlementBackend = "l1" diff --git a/rpc/batch.go b/rpc/batch.go index 59e10b20..fdc2e340 100644 --- a/rpc/batch.go +++ b/rpc/batch.go @@ -1,10 +1,12 @@ package rpc import ( + "context" "encoding/json" "errors" "fmt" "math/big" + "time" "github.com/0xPolygon/cdk-rpc/rpc" "github.com/0xPolygon/cdk/log" @@ -21,11 +23,16 @@ var ( const busyResponse = "busy" type BatchEndpoints struct { - url string + urlList []string + index int + readTimeout time.Duration } -func NewBatchEndpoints(url string) *BatchEndpoints { - return &BatchEndpoints{url: url} +func NewBatchEndpoints(urlList []string, readTimeout time.Duration) *BatchEndpoints { + return &BatchEndpoints{ + urlList: urlList, + readTimeout: readTimeout, + } } func (b *BatchEndpoints) GetBatch(batchNumber uint64) (*types.RPCBatch, error) { @@ -45,7 +52,9 @@ func (b *BatchEndpoints) GetBatch(batchNumber uint64) (*types.RPCBatch, error) { log.Infof("Getting batch %d from RPC", batchNumber) - response, err := rpc.JSONRPCCall(b.url, "zkevm_getBatchByNumber", batchNumber) + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() + response, err := rpc.JSONRPCCallWithContext(ctx, b.getNextURL(), "zkevm_getBatchByNumber", batchNumber) if err != nil { return nil, err } @@ -92,7 +101,9 @@ func (b *BatchEndpoints) GetL2BlockTimestamp(blockHash string) (uint64, error) { log.Infof("Getting l2 block timestamp from RPC. Block hash: %s", blockHash) - response, err := rpc.JSONRPCCall(b.url, "eth_getBlockByHash", blockHash, false) + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() + response, err := rpc.JSONRPCCallWithContext(ctx, b.getNextURL(), "eth_getBlockByHash", blockHash, false) if err != nil { return 0, err } @@ -126,7 +137,9 @@ func (b *BatchEndpoints) GetWitness(batchNumber uint64, fullWitness bool) ([]byt log.Infof("Requesting witness for batch %d of type %s", batchNumber, witnessType) - response, err = rpc.JSONRPCCall(b.url, "zkevm_getBatchWitness", batchNumber, witnessType) + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() + response, err = rpc.JSONRPCCallWithContext(ctx, b.getNextURL(), "zkevm_getBatchWitness", batchNumber, witnessType) if err != nil { return nil, err } @@ -147,3 +160,12 @@ func (b *BatchEndpoints) GetWitness(batchNumber uint64, fullWitness bool) ([]byt return common.FromHex(witness), nil } + +func (b *BatchEndpoints) getNextURL() string { + b.index++ + if b.index >= len(b.urlList) { + b.index = 0 + } + log.Infof("Request url: %s", b.urlList[b.index]) + return b.urlList[b.index] +} diff --git a/rpc/batch_test.go b/rpc/batch_test.go index d6940bf3..8f13b7ff 100644 --- a/rpc/batch_test.go +++ b/rpc/batch_test.go @@ -103,7 +103,7 @@ func Test_getBatchFromRPC(t *testing.T) { })) defer srv.Close() - rcpBatchClient := NewBatchEndpoints(srv.URL) + rcpBatchClient := NewBatchEndpoints([]string{srv.URL}, 0) rpcBatch, err := rcpBatchClient.GetBatch(tt.batch) if rpcBatch != nil { copiedrpcBatch := rpcBatch.DeepCopy() @@ -187,7 +187,7 @@ func Test_getBatchWitnessRPC(t *testing.T) { })) defer srv.Close() - rcpBatchClient := NewBatchEndpoints(srv.URL) + rcpBatchClient := NewBatchEndpoints([]string{srv.URL}, 0) witness, err := rcpBatchClient.GetWitness(tt.batch, false) if tt.expectErr != nil { require.Equal(t, tt.expectErr.Error(), err.Error()) @@ -252,7 +252,7 @@ func Test_getGetL2BlockTimestamp(t *testing.T) { })) defer srv.Close() - rcpBatchClient := NewBatchEndpoints(srv.URL) + rcpBatchClient := NewBatchEndpoints([]string{srv.URL}, 0) timestamp, err := rcpBatchClient.GetL2BlockTimestamp(string(tt.blockHash)) if tt.expectErr != nil { require.Equal(t, tt.expectErr.Error(), err.Error()) diff --git a/sequencesender/config.go b/sequencesender/config.go index 4f77500b..9a65cfff 100644 --- a/sequencesender/config.go +++ b/sequencesender/config.go @@ -65,8 +65,11 @@ type Config struct { // BlockFinality indicates the status of the blocks that will be queried in order to sync BlockFinality string `jsonschema:"enum=LatestBlock, enum=SafeBlock, enum=PendingBlock, enum=FinalizedBlock, enum=EarliestBlock" mapstructure:"BlockFinality"` //nolint:lll - // RPCURL is the URL of the RPC server - RPCURL string `mapstructure:"RPCURL"` + // RPCURLS is the URL of the RPC server + RPCURLS string `mapstructure:"RPCURLS"` + + // RPCTimeout is the timeout for the L2 RPC calls + RPCTimeout types.Duration `mapstructure:"RPCTimeout"` // GetBatchWaitInterval is the time to wait to query for a new batch when there are no more batches available GetBatchWaitInterval types.Duration `mapstructure:"GetBatchWaitInterval"` diff --git a/sequencesender/sequencesender.go b/sequencesender/sequencesender.go index 9bede87f..1ca747fd 100644 --- a/sequencesender/sequencesender.go +++ b/sequencesender/sequencesender.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "os" + "strings" "sync" "sync/atomic" "time" @@ -98,7 +99,7 @@ func New(cfg Config, logger *log.Logger, sequenceData: make(map[uint64]*sequenceData), validStream: false, TxBuilder: txBuilder, - rpcClient: rpc.NewBatchEndpoints(cfg.RPCURL), + rpcClient: rpc.NewBatchEndpoints(strings.Split(cfg.RPCURLS, ","), cfg.RPCTimeout.Duration), } logger.Infof("TxBuilder configuration: %s", txBuilder.String())