Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(sender): fix bug in sender #606

Merged
merged 12 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() {
s.Equal(l1Head3.Hash(), l1Head1.Hash())

// Because of evm_revert operation, the nonce of the proposer need to be adjusted.
sender.AdjustNonce(nil)
s.Nil(sender.SetNonce(nil, true))
// Propose ten blocks on another fork
for i := 0; i < 10; i++ {
s.ProposeInvalidTxListBytes(s.p)
Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToLowerFork() {
s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64())
s.Equal(l1Head3.Hash(), l1Head1.Hash())

sender.AdjustNonce(nil)
s.Nil(sender.SetNonce(nil, true))
// Propose one blocks on another fork
s.ProposeInvalidTxListBytes(s.p)

Expand Down Expand Up @@ -283,7 +283,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToSameHeightFork() {
s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64())
s.Equal(l1Head3.Hash(), l1Head1.Hash())

sender.AdjustNonce(nil)
s.Nil(sender.SetNonce(nil, true))
// Propose two blocks on another fork
s.ProposeInvalidTxListBytes(s.p)
time.Sleep(3 * time.Second)
Expand Down
44 changes: 25 additions & 19 deletions internal/sender/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,33 @@ func (s *Sender) adjustGas(txData types.TxData) {
}
}

// AdjustNonce adjusts the nonce of the given transaction with the current nonce of the sender.
func (s *Sender) AdjustNonce(txData types.TxData) {
nonce, err := s.client.NonceAt(s.ctx, s.Opts.From, nil)
if err != nil {
log.Warn("Failed to get the nonce", "from", s.Opts.From, "err", err)
return
// SetNonce adjusts the nonce of the given transaction with the current nonce of the sender.
func (s *Sender) SetNonce(txData types.TxData, adjust bool) (err error) {
var nonce uint64
if adjust {
s.nonce, err = s.client.NonceAt(s.ctx, s.Opts.From, nil)
if err != nil {
log.Warn("Failed to get the nonce", "from", s.Opts.From, "err", err)
return err
}
}
s.Opts.Nonce = new(big.Int).SetUint64(nonce)

switch tx := txData.(type) {
case *types.DynamicFeeTx:
tx.Nonce = nonce
case *types.BlobTx:
tx.Nonce = nonce
case *types.LegacyTx:
tx.Nonce = nonce
case *types.AccessListTx:
tx.Nonce = nonce
default:
log.Debug("Unsupported transaction type when adjust nonce", "from", s.Opts.From)
nonce = s.nonce

if !utils.IsNil(txData) {
switch tx := txData.(type) {
case *types.DynamicFeeTx:
tx.Nonce = nonce
case *types.BlobTx:
tx.Nonce = nonce
case *types.LegacyTx:
tx.Nonce = nonce
case *types.AccessListTx:
tx.Nonce = nonce
default:
return fmt.Errorf("unsupported transaction type: %v", txData)
}
}
return
}

// updateGasTipGasFee updates the gas tip cap and gas fee cap of the sender with the given chain head info.
Expand Down
95 changes: 70 additions & 25 deletions internal/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type Sender struct {
head *types.Header
client *rpc.EthClient

Opts *bind.TransactOpts
nonce uint64
Opts *bind.TransactOpts

unconfirmedTxs cmap.ConcurrentMap[string, *TxToConfirm]
txToConfirmCh cmap.ConcurrentMap[string, chan *TxToConfirm]
Expand Down Expand Up @@ -110,6 +111,12 @@ func NewSender(ctx context.Context, cfg *Config, client *rpc.EthClient, priv *ec
}
}

// Get the nonce
nonce, err := client.NonceAt(ctx, opts.From, nil)
if err != nil {
return nil, err
}

// Get the chain ID
head, err := client.HeaderByNumber(ctx, nil)
if err != nil {
Expand All @@ -121,13 +128,12 @@ func NewSender(ctx context.Context, cfg *Config, client *rpc.EthClient, priv *ec
Config: cfg,
head: head,
client: client,
nonce: nonce,
Opts: opts,
unconfirmedTxs: cmap.New[*TxToConfirm](),
txToConfirmCh: cmap.New[chan *TxToConfirm](),
stopCh: make(chan struct{}),
}
// Initialize the nonce
sender.AdjustNonce(nil)

// Initialize the gas fee related fields
if err = sender.updateGasTipGasFee(head); err != nil {
Expand Down Expand Up @@ -177,6 +183,10 @@ func (s *Sender) GetUnconfirmedTx(txID string) *types.Transaction {

// SendRawTransaction sends a transaction to the given Ethereum node.
func (s *Sender) SendRawTransaction(nonce uint64, target *common.Address, value *big.Int, data []byte) (string, error) {
if s.unconfirmedTxs.Count() >= unconfirmedTxsCap {
return "", fmt.Errorf("too many pending transactions")
}

gasLimit := s.GasLimit
if gasLimit == 0 {
var err error
Expand All @@ -192,16 +202,36 @@ func (s *Sender) SendRawTransaction(nonce uint64, target *common.Address, value
return "", err
}
}
return s.SendTransaction(types.NewTx(&types.DynamicFeeTx{
ChainID: s.client.ChainID,
To: target,
Nonce: nonce,
GasFeeCap: s.Opts.GasFeeCap,
GasTipCap: s.Opts.GasTipCap,
Gas: gasLimit,
Value: value,
Data: data,
}))

txID := uuid.New()
txToConfirm := &TxToConfirm{
ID: txID,
originalTx: &types.DynamicFeeTx{
ChainID: s.client.ChainID,
To: target,
Nonce: nonce,
GasFeeCap: s.Opts.GasFeeCap,
GasTipCap: s.Opts.GasTipCap,
Gas: gasLimit,
Value: value,
Data: data,
},
}

if err := s.send(txToConfirm, false); err != nil && !strings.Contains(err.Error(), "replacement transaction") {
log.Error("Failed to send transaction",
"tx_id", txID,
"nonce", txToConfirm.CurrentTx.Nonce(),
"err", err,
)
return "", err
}

// Add the transaction to the unconfirmed transactions
s.unconfirmedTxs.Set(txID, txToConfirm)
s.txToConfirmCh.Set(txID, make(chan *TxToConfirm, 1))

return txID, nil
}

// SendTransaction sends a transaction to the given Ethereum node.
Expand All @@ -222,7 +252,7 @@ func (s *Sender) SendTransaction(tx *types.Transaction) (string, error) {
CurrentTx: tx,
}

if err := s.send(txToConfirm); err != nil && !strings.Contains(err.Error(), "replacement transaction") {
if err := s.send(txToConfirm, true); err != nil && !strings.Contains(err.Error(), "replacement transaction") {
log.Error("Failed to send transaction",
"tx_id", txID,
"nonce", txToConfirm.CurrentTx.Nonce(),
Expand All @@ -240,12 +270,19 @@ func (s *Sender) SendTransaction(tx *types.Transaction) (string, error) {
}

// send is the internal method to send the given transaction.
func (s *Sender) send(tx *TxToConfirm) error {
func (s *Sender) send(tx *TxToConfirm, resetNonce bool) error {
s.mu.Lock()
defer s.mu.Unlock()

originalTx := tx.originalTx

if resetNonce {
// Set the nonce of the transaction.
if err := s.SetNonce(originalTx, false); err != nil {
return err
}
}

for i := 0; i < nonceIncorrectRetrys; i++ {
// Retry when nonce is incorrect
rawTx, err := s.Opts.Signer(s.Opts.From, types.NewTx(originalTx))
Expand All @@ -258,13 +295,21 @@ func (s *Sender) send(tx *TxToConfirm) error {
// Check if the error is nonce too low
if err != nil {
if strings.Contains(err.Error(), "nonce too low") {
s.AdjustNonce(originalTx)
log.Warn("Nonce is incorrect, retry sending the transaction with new nonce",
"tx_id", tx.ID,
"nonce", tx.CurrentTx.Nonce(),
"hash", rawTx.Hash(),
"err", err,
)
if err := s.SetNonce(originalTx, true); err != nil {
log.Error("Failed to set nonce when appear nonce too low",
"tx_id", tx.ID,
"nonce", tx.CurrentTx.Nonce(),
"hash", rawTx.Hash(),
"err", err,
)
} else {
log.Warn("Nonce is incorrect, retry sending the transaction with new nonce",
"tx_id", tx.ID,
"nonce", tx.CurrentTx.Nonce(),
"hash", rawTx.Hash(),
"err", err,
)
}
continue
}
if strings.Contains(err.Error(), "replacement transaction underpriced") {
Expand All @@ -287,7 +332,7 @@ func (s *Sender) send(tx *TxToConfirm) error {
}
break
}
s.Opts.Nonce = new(big.Int).Add(s.Opts.Nonce, common.Big1)
s.nonce++
return nil
}

Expand Down Expand Up @@ -340,7 +385,7 @@ func (s *Sender) resendUnconfirmedTxs() {
s.releaseUnconfirmedTx(id)
continue
}
if err := s.send(unconfirmedTx); err != nil {
if err := s.send(unconfirmedTx, true); err != nil {
log.Warn(
"Failed to resend the transaction",
"tx_id", id,
Expand Down Expand Up @@ -390,7 +435,7 @@ func (s *Sender) checkPendingTransactionsConfirmation() {
}
pendingTx.Receipt = receipt
if receipt.Status != types.ReceiptStatusSuccessful {
pendingTx.Err = fmt.Errorf("transaction reverted, hash: %s", receipt.TxHash)
pendingTx.Err = fmt.Errorf("transaction status is failed, hash: %s", receipt.TxHash)
s.releaseUnconfirmedTx(id)
continue
}
Expand Down
42 changes: 40 additions & 2 deletions internal/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,50 @@ type SenderTestSuite struct {
sender *sender.Sender
}

func (s *SenderTestSuite) TestNormalSender() {
func (s *SenderTestSuite) TestSendTransaction() {
var (
opts = s.sender.Opts
client = s.RPCClient.L1
eg errgroup.Group
)
eg.SetLimit(runtime.NumCPU())
for i := 0; i < 8; i++ {
i := i
eg.Go(func() error {
to := common.BigToAddress(big.NewInt(int64(i)))
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: client.ChainID,
To: &to,
GasFeeCap: opts.GasFeeCap,
GasTipCap: opts.GasTipCap,
Gas: 21000000,
Value: big.NewInt(1),
Data: nil,
})

_, err := s.sender.SendTransaction(tx)
return err
})
}
s.Nil(eg.Wait())

for _, confirmCh := range s.sender.TxToConfirmChannels() {
confirm := <-confirmCh
s.Nil(confirm.Err)
}
}

func (s *SenderTestSuite) TestSendRawTransaction() {
nonce, err := s.RPCClient.L1.NonceAt(context.Background(), s.sender.Opts.From, nil)
s.Nil(err)

var eg errgroup.Group
eg.SetLimit(runtime.NumCPU())
for i := 0; i < 5; i++ {
i := i
eg.Go(func() error {
addr := common.BigToAddress(big.NewInt(int64(i)))
_, err := s.sender.SendRawTransaction(s.sender.Opts.Nonce.Uint64(), &addr, big.NewInt(1), nil)
_, err := s.sender.SendRawTransaction(nonce+uint64(i), &addr, big.NewInt(1), nil)
return err
})
}
Expand Down Expand Up @@ -121,6 +157,7 @@ func (s *SenderTestSuite) TestNonceTooLow() {

func (s *SenderTestSuite) SetupTest() {
s.ClientTestSuite.SetupTest()
s.SetL1Automine(true)

ctx := context.Background()
priv, err := crypto.ToECDSA(common.FromHex(os.Getenv("L1_PROPOSER_PRIVATE_KEY")))
Expand All @@ -137,6 +174,7 @@ func (s *SenderTestSuite) SetupTest() {
}

func (s *SenderTestSuite) TearDownTest() {
s.SetL1Automine(false)
s.sender.Close()
s.ClientTestSuite.TearDownTest()
}
Expand Down
8 changes: 8 additions & 0 deletions internal/testutils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (s *ClientTestSuite) ProposeInvalidTxListBytes(proposer Proposer) {
invalidTxListBytes := RandomBytes(256)

s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1))
for _, confirmCh := range proposer.GetSender().TxToConfirmChannels() {
confirm := <-confirmCh
s.Nil(confirm.Err)
}
}

func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks(
Expand All @@ -54,6 +58,10 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks(
s.Nil(err)

s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0))
for _, confirmCh := range proposer.GetSender().TxToConfirmChannels() {
confirm := <-confirmCh
s.Nil(confirm.Err)
}

s.ProposeInvalidTxListBytes(proposer)

Expand Down
Loading
Loading