Skip to content

Commit

Permalink
Merge branch 'zjg/agg-witness' into zhangkai/add-genesis-oom
Browse files Browse the repository at this point in the history
  • Loading branch information
zjg555543 committed Dec 20, 2024
2 parents 8851865 + cb1b873 commit 90f5197
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 19 deletions.
2 changes: 1 addition & 1 deletion aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func New(
aggLayerClient: aggLayerClient,
sequencerPrivateKey: sequencerPrivateKey,
witnessRetrievalChan: make(chan state.DBBatch),
rpcClient: rpc.NewBatchEndpoints(cfg.RPCURL),
rpcClient: rpc.NewBatchEndpoints(strings.Split(cfg.RPCURLS, ","), cfg.RPCTimeout.Duration),
}

if a.ctx == nil {
Expand Down
7 changes: 5 additions & 2 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ type Config struct {
// final gas: 1100
GasOffset uint64 `mapstructure:"GasOffset"`

// RPCURL is the URL of the RPC server
RPCURL string `mapstructure:"RPCURL"`
// RPCURLS is the URL of the RPC server
RPCURLS string `mapstructure:"RPCURLS"`

// RPCTimeout is the timeout for the L2 RPC calls
RPCTimeout types.Duration `mapstructure:"RPCTimeout"`

// WitnessURL is the URL of the witness server
WitnessURL string `mapstructure:"WitnessURL"`
Expand Down
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func createSequenceSender(
logger := log.WithFields("module", cdkcommon.SEQUENCE_SENDER)

// Check config
if cfg.SequenceSender.RPCURL == "" {
logger.Fatal("Required field RPCURL is empty in sequence sender config")
if cfg.SequenceSender.RPCURLS == "" {
logger.Fatal("Required field RPCURLS is empty in sequence sender config")
}

ethman, err := etherman.NewClient(ethermanconfig.Config{
Expand Down
6 changes: 4 additions & 2 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ WaitPeriodPurgeTxFile = "15m"
MaxPendingTx = 1
MaxBatchesForL1 = 300
BlockFinality = "FinalizedBlock"
RPCURL = "{{L2URL}}"
RPCURLS = "{{L2URL}}"
RPCTimeout = "60s"
GetBatchWaitInterval = "10s"
[SequenceSender.EthTxManager]
FrequencyToMonitorTxs = "1s"
Expand Down Expand Up @@ -130,7 +131,8 @@ SenderAddress = "{{SenderProofToL1Addr}}"
CleanupLockedProofsInterval = "2m"
GeneratingProofCleanupThreshold = "10m"
GasOffset = 0
RPCURL = "{{L2URL}}"
RPCURLS = "{{L2URL}}"
RPCTimeout = "60s"
WitnessURL = "{{WitnessURL}}"
UseFullWitness = false
SettlementBackend = "l1"
Expand Down
34 changes: 28 additions & 6 deletions rpc/batch.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package rpc

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"time"

"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/0xPolygon/cdk/log"
Expand All @@ -21,11 +23,16 @@ var (
const busyResponse = "busy"

type BatchEndpoints struct {
url string
urlList []string
index int
readTimeout time.Duration
}

func NewBatchEndpoints(url string) *BatchEndpoints {
return &BatchEndpoints{url: url}
func NewBatchEndpoints(urlList []string, readTimeout time.Duration) *BatchEndpoints {
return &BatchEndpoints{
urlList: urlList,
readTimeout: readTimeout,
}
}

func (b *BatchEndpoints) GetBatch(batchNumber uint64) (*types.RPCBatch, error) {
Expand All @@ -45,7 +52,9 @@ func (b *BatchEndpoints) GetBatch(batchNumber uint64) (*types.RPCBatch, error) {

log.Infof("Getting batch %d from RPC", batchNumber)

response, err := rpc.JSONRPCCall(b.url, "zkevm_getBatchByNumber", batchNumber)
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
response, err := rpc.JSONRPCCallWithContext(ctx, b.getNextURL(), "zkevm_getBatchByNumber", batchNumber)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -92,7 +101,9 @@ func (b *BatchEndpoints) GetL2BlockTimestamp(blockHash string) (uint64, error) {

log.Infof("Getting l2 block timestamp from RPC. Block hash: %s", blockHash)

response, err := rpc.JSONRPCCall(b.url, "eth_getBlockByHash", blockHash, false)
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
response, err := rpc.JSONRPCCallWithContext(ctx, b.getNextURL(), "eth_getBlockByHash", blockHash, false)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -126,7 +137,9 @@ func (b *BatchEndpoints) GetWitness(batchNumber uint64, fullWitness bool) ([]byt

log.Infof("Requesting witness for batch %d of type %s", batchNumber, witnessType)

response, err = rpc.JSONRPCCall(b.url, "zkevm_getBatchWitness", batchNumber, witnessType)
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
response, err = rpc.JSONRPCCallWithContext(ctx, b.getNextURL(), "zkevm_getBatchWitness", batchNumber, witnessType)
if err != nil {
return nil, err
}
Expand All @@ -147,3 +160,12 @@ func (b *BatchEndpoints) GetWitness(batchNumber uint64, fullWitness bool) ([]byt

return common.FromHex(witness), nil
}

func (b *BatchEndpoints) getNextURL() string {
b.index++
if b.index >= len(b.urlList) {
b.index = 0
}
log.Infof("Request url: %s", b.urlList[b.index])
return b.urlList[b.index]
}
6 changes: 3 additions & 3 deletions rpc/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func Test_getBatchFromRPC(t *testing.T) {
}))
defer srv.Close()

rcpBatchClient := NewBatchEndpoints(srv.URL)
rcpBatchClient := NewBatchEndpoints([]string{srv.URL}, 0)
rpcBatch, err := rcpBatchClient.GetBatch(tt.batch)
if rpcBatch != nil {
copiedrpcBatch := rpcBatch.DeepCopy()
Expand Down Expand Up @@ -187,7 +187,7 @@ func Test_getBatchWitnessRPC(t *testing.T) {
}))
defer srv.Close()

rcpBatchClient := NewBatchEndpoints(srv.URL)
rcpBatchClient := NewBatchEndpoints([]string{srv.URL}, 0)
witness, err := rcpBatchClient.GetWitness(tt.batch, false)
if tt.expectErr != nil {
require.Equal(t, tt.expectErr.Error(), err.Error())
Expand Down Expand Up @@ -252,7 +252,7 @@ func Test_getGetL2BlockTimestamp(t *testing.T) {
}))
defer srv.Close()

rcpBatchClient := NewBatchEndpoints(srv.URL)
rcpBatchClient := NewBatchEndpoints([]string{srv.URL}, 0)
timestamp, err := rcpBatchClient.GetL2BlockTimestamp(string(tt.blockHash))
if tt.expectErr != nil {
require.Equal(t, tt.expectErr.Error(), err.Error())
Expand Down
7 changes: 5 additions & 2 deletions sequencesender/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ type Config struct {
// BlockFinality indicates the status of the blocks that will be queried in order to sync
BlockFinality string `jsonschema:"enum=LatestBlock, enum=SafeBlock, enum=PendingBlock, enum=FinalizedBlock, enum=EarliestBlock" mapstructure:"BlockFinality"` //nolint:lll

// RPCURL is the URL of the RPC server
RPCURL string `mapstructure:"RPCURL"`
// RPCURLS is the URL of the RPC server
RPCURLS string `mapstructure:"RPCURLS"`

// RPCTimeout is the timeout for the L2 RPC calls
RPCTimeout types.Duration `mapstructure:"RPCTimeout"`

// GetBatchWaitInterval is the time to wait to query for a new batch when there are no more batches available
GetBatchWaitInterval types.Duration `mapstructure:"GetBatchWaitInterval"`
Expand Down
3 changes: 2 additions & 1 deletion sequencesender/sequencesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -99,7 +100,7 @@ func New(cfg Config, logger *log.Logger,
sequenceData: make(map[uint64]*sequenceData),
validStream: false,
TxBuilder: txBuilder,
rpcClient: rpc.NewBatchEndpoints(cfg.RPCURL),
rpcClient: rpc.NewBatchEndpoints(strings.Split(cfg.RPCURLS, ","), cfg.RPCTimeout.Duration),
}

logger.Infof("TxBuilder configuration: %s", txBuilder.String())
Expand Down

0 comments on commit 90f5197

Please sign in to comment.