From d8ef5005cff2f863c7c5701b6a9671c93fea768a Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Mon, 30 Sep 2024 11:20:48 +0200 Subject: [PATCH] feat: prepare txns before routines --- ethtxmanager/ethtxmanager.go | 271 +++++++++-------------------------- ethtxmanager/monitoredtx.go | 68 +++++++++ 2 files changed, 139 insertions(+), 200 deletions(-) diff --git a/ethtxmanager/ethtxmanager.go b/ethtxmanager/ethtxmanager.go index 0124944..de7a312 100644 --- a/ethtxmanager/ethtxmanager.go +++ b/ethtxmanager/ethtxmanager.go @@ -44,12 +44,10 @@ type Client struct { ctx context.Context cancel context.CancelFunc - cfg Config - etherman ethermanInterface - storage storageInterface - from common.Address - nonceMutex sync.Mutex - addTxMutex sync.Mutex + cfg Config + etherman ethermanInterface + storage storageInterface + from common.Address } type pending struct { @@ -154,47 +152,6 @@ func pendingL1Txs(URL string, from common.Address, httpHeaders map[string]string return mTxs, nil } -// getTxNonce get the nonce for the given account -func (c *Client) getTxNonce(ctx context.Context, from common.Address, checkCreatedTxs bool) (uint64, error) { - c.nonceMutex.Lock() - defer c.nonceMutex.Unlock() - - var ( - localNonce, pendingNonce uint64 - err error - ) - - if checkCreatedTxs { - // Get created transactions from the database for the given account - createdTxs, err := c.storage.GetByStatus(ctx, []MonitoredTxStatus{MonitoredTxStatusCreated}) - if err != nil { - return 0, fmt.Errorf("failed to get monitored txs in getTxNonce: %w", err) - } - - if len(createdTxs) > 0 { - // if there are pending txs, we adjust the nonce accordingly - for _, createdTx := range createdTxs { - if createdTx.Nonce > localNonce { - localNonce = createdTx.Nonce - } - } - - localNonce++ - } - } - - // if there are no pending txs, we get the pending nonce from the etherman - if pendingNonce, err = c.etherman.PendingNonce(ctx, from); err != nil { - return 0, fmt.Errorf("failed to get pending nonce: %w", err) - } - - if localNonce > pendingNonce { - return localNonce, nil - } - - return pendingNonce, nil -} - // Add a transaction to be sent and monitored func (c *Client) Add(ctx context.Context, to *common.Address, forcedNonce *uint64, value *big.Int, data []byte, gasOffset uint64, sidecar *types.BlobTxSidecar) (common.Hash, error) { return c.add(ctx, to, forcedNonce, value, data, gasOffset, sidecar, 0) @@ -209,25 +166,12 @@ func (c *Client) add(ctx context.Context, to *common.Address, forcedNonce *uint6 var nonce uint64 var err error - c.addTxMutex.Lock() - defer c.addTxMutex.Unlock() - - if forcedNonce == nil { - // get next nonce - nonce, err = c.getTxNonce(ctx, c.from, true) - if err != nil { - err := fmt.Errorf("failed to get current nonce: %w", err) - log.Errorf(err.Error()) - return common.Hash{}, err - } - } else { + if forcedNonce != nil { + // we should review this forced nonce feature, because now it doesn't make sense to have it + // since we are always updating the nonce before sending the tx nonce = *forcedNonce } - // Avoid nonces to be assigned while a new tx is being created - c.nonceMutex.Lock() - defer c.nonceMutex.Unlock() - // get gas price gasPrice, err := c.suggestedGasPrice(ctx) if err != nil { @@ -495,26 +439,25 @@ func (c *Client) Stop() { // monitorTxs processes all pending monitored txs func (c *Client) monitorTxs(ctx context.Context) error { - statusesFilter := []MonitoredTxStatus{MonitoredTxStatusCreated, MonitoredTxStatusSent} - mTxs, err := c.storage.GetByStatus(ctx, statusesFilter) + iterations, err := c.getMonitoredTxnIteration(ctx) if err != nil { - return fmt.Errorf("failed to get created monitored txs: %v", err) + return fmt.Errorf("failed to get monitored txs: %v", err) } - log.Debugf("found %v monitored tx to process", len(mTxs)) + log.Debugf("found %v monitored tx to process", len(iterations)) wg := sync.WaitGroup{} - wg.Add(len(mTxs)) - for _, mTx := range mTxs { + wg.Add(len(iterations)) + for _, mTx := range iterations { mTx := mTx // force variable shadowing to avoid pointer conflicts - go func(c *Client, mTx monitoredTx) { - mTxLogger := createMonitoredTxLogger(mTx) - defer func(mTx monitoredTx, mTxLogger *log.Logger) { + go func(c *Client, mTx *monitoredTxnIteration) { + mTxLogger := createMonitoredTxLogger(*mTx.monitoredTx) + defer func(mTxLogger *log.Logger) { if err := recover(); err != nil { mTxLogger.Errorf("monitoring recovered from this err: %v", err) } wg.Done() - }(mTx, mTxLogger) + }(mTxLogger) c.monitorTx(ctx, mTx, mTxLogger) }(c, mTx) } @@ -609,82 +552,19 @@ func (c *Client) waitSafeTxToBeFinalized(ctx context.Context) error { } // monitorTx does all the monitoring steps to the monitored tx -func (c *Client) monitorTx(ctx context.Context, mTx monitoredTx, logger *log.Logger) { +func (c *Client) monitorTx(ctx context.Context, mTx *monitoredTxnIteration, logger *log.Logger) { var err error logger.Info("processing") - // check if any of the txs in the history was confirmed - var lastReceiptChecked types.Receipt - // monitored tx is confirmed until we find a successful receipt - confirmed := false - // monitored tx doesn't have a failed receipt until we find a failed receipt for any - // tx in the monitored tx history - hasFailedReceipts := false - // all history txs are considered mined until we can't find a receipt for any - // tx in the monitored tx history - allHistoryTxsWereMined := true - for txHash := range mTx.History { - mined, receipt, err := c.etherman.CheckTxWasMined(ctx, txHash) - if err != nil { - logger.Errorf("failed to check if tx %v was mined: %v", txHash.String(), err) - continue - } - - // if the tx is not mined yet, check that not all the tx were mined and go to the next - if !mined { - allHistoryTxsWereMined = false - continue - } - - lastReceiptChecked = *receipt - - // if the tx was mined successfully we can set it as confirmed and break the loop - if lastReceiptChecked.Status == types.ReceiptStatusSuccessful { - confirmed = true - break - } - - // if the tx was mined but failed, we continue to consider it was not confirmed - // and set that we have found a failed receipt. This info will be used later - // to check if nonce needs to be reviewed - confirmed = false - hasFailedReceipts = true - } - - // we need to check if we need to review the nonce carefully, to avoid sending - // duplicated data to the roll-up and causing an unnecessary trusted state reorg. - // - // if we have failed receipts, this means at least one of the generated txs was mined, - // in this case maybe the current nonce was already consumed(if this is the first iteration - // of this cycle, next iteration might have the nonce already updated by the preivous one), - // then we need to check if there are tx that were not mined yet, if so, we just need to wait - // because maybe one of them will get mined successfully - // - // in case of the monitored tx is not confirmed yet, all tx were mined and none of them were - // mined successfully, we need to review the nonce - if !confirmed && hasFailedReceipts && allHistoryTxsWereMined { - logger.Infof("nonce needs to be updated") - - err := c.reviewMonitoredTxNonce(ctx, mTx, logger) - if err != nil { - logger.Errorf("failed to review monitored tx nonce: %v", err) - return - } - } var signedTx *types.Transaction - if !confirmed { + if !mTx.confirmed { // review tx and increase gas and gas price if needed if mTx.Status == MonitoredTxStatusSent { - err := c.reviewMonitoredTx(ctx, &mTx, logger) + err := c.reviewMonitoredTxGas(ctx, mTx, logger) if err != nil { logger.Errorf("failed to review monitored tx: %v", err) return } - err = c.storage.Update(ctx, mTx) - if err != nil { - logger.Errorf("failed to update monitored tx review change: %v", err) - return - } } // rebuild transaction @@ -708,7 +588,7 @@ func (c *Client) monitorTx(ctx context.Context, mTx monitoredTx, logger *log.Log return } else { // update monitored tx changes into storage - err = c.storage.Update(ctx, mTx) + err = c.storage.Update(ctx, *mTx.monitoredTx) if err != nil { logger.Errorf("failed to update monitored tx: %v", err) return @@ -732,7 +612,7 @@ func (c *Client) monitorTx(ctx context.Context, mTx monitoredTx, logger *log.Log mTx.Status = MonitoredTxStatusSent logger.Debugf("status changed to %v", string(mTx.Status)) // update monitored tx changes into storage - err = c.storage.Update(ctx, mTx) + err = c.storage.Update(ctx, *mTx.monitoredTx) if err != nil { logger.Errorf("failed to update monitored tx changes: %v", err) return @@ -745,7 +625,7 @@ func (c *Client) monitorTx(ctx context.Context, mTx monitoredTx, logger *log.Log log.Infof("waiting signedTx to be mined...") // wait tx to get mined - confirmed, err = c.etherman.WaitTxToBeMined(ctx, signedTx, c.cfg.WaitTxToBeMined.Duration) + confirmed, err := c.etherman.WaitTxToBeMined(ctx, signedTx, c.cfg.WaitTxToBeMined.Duration) if err != nil { logger.Warnf("failed to wait tx to be mined: %v", err) return @@ -772,28 +652,29 @@ func (c *Client) monitorTx(ctx context.Context, mTx monitoredTx, logger *log.Log } } - lastReceiptChecked = *txReceipt + mTx.lastReceipt = txReceipt + mTx.confirmed = confirmed } // if mined, check receipt and mark as Failed or Confirmed - if lastReceiptChecked.Status == types.ReceiptStatusSuccessful { + if mTx.lastReceipt.Status == types.ReceiptStatusSuccessful { mTx.Status = MonitoredTxStatusMined - mTx.BlockNumber = lastReceiptChecked.BlockNumber + mTx.BlockNumber = mTx.lastReceipt.BlockNumber logger.Info("mined") } else { // if we should continue to monitor, we move to the next one and this will // be reviewed in the next monitoring cycle - if c.shouldContinueToMonitorThisTx(ctx, lastReceiptChecked) { + if c.shouldContinueToMonitorThisTx(ctx, mTx.lastReceipt) { return } // otherwise we understand this monitored tx has failed mTx.Status = MonitoredTxStatusFailed - mTx.BlockNumber = lastReceiptChecked.BlockNumber + mTx.BlockNumber = mTx.lastReceipt.BlockNumber logger.Info("failed") } // update monitored tx changes into storage - err = c.storage.Update(ctx, mTx) + err = c.storage.Update(ctx, *mTx.monitoredTx) if err != nil { logger.Errorf("failed to update monitored tx: %v", err) return @@ -802,7 +683,7 @@ func (c *Client) monitorTx(ctx context.Context, mTx monitoredTx, logger *log.Log // shouldContinueToMonitorThisTx checks the the tx receipt and decides if it should // continue or not to monitor the monitored tx related to the tx from this receipt -func (c *Client) shouldContinueToMonitorThisTx(ctx context.Context, receipt types.Receipt) bool { +func (c *Client) shouldContinueToMonitorThisTx(ctx context.Context, receipt *types.Receipt) bool { // if the receipt has a is successful result, stop monitoring if receipt.Status == types.ReceiptStatusSuccessful { return false @@ -826,14 +707,16 @@ func (c *Client) shouldContinueToMonitorThisTx(ctx context.Context, receipt type return false } -// reviewMonitoredTx checks if some field needs to be updated +// reviewMonitoredTxGas checks if gas fields needs to be updated // accordingly to the current information stored and the current // state of the blockchain -func (c *Client) reviewMonitoredTx(ctx context.Context, mTx *monitoredTx, mTxLogger *log.Logger) error { +func (c *Client) reviewMonitoredTxGas(ctx context.Context, mTx *monitoredTxnIteration, mTxLogger *log.Logger) error { mTxLogger.Debug("reviewing") - isBlobTx := mTx.BlobSidecar != nil - var err error - var gas uint64 + var ( + isBlobTx = mTx.BlobSidecar != nil + err error + gas uint64 + ) // get gas price gasPrice, err := c.suggestedGasPrice(ctx) @@ -912,66 +795,54 @@ func (c *Client) reviewMonitoredTx(ctx context.Context, mTx *monitoredTx, mTxLog mTxLogger.Infof("monitored tx (blob? %t) Gas updated from %v to %v", isBlobTx, mTx.Gas, gas) mTx.Gas = gas } - return nil -} -// reviewMonitoredTxNonce checks if the nonce needs to be updated accordingly to -// the current nonce of the sender account. -// -// IMPORTANT: Nonce is reviewed apart from the other fields because it is a very -// sensible information and can make duplicated data to be sent to the blockchain, -// causing possible side effects and wasting resources. -func (c *Client) reviewMonitoredTxNonce(ctx context.Context, mTx monitoredTx, mTxLogger *log.Logger) error { - // Avoid txs being added while we are reviewing the nonce - c.addTxMutex.Lock() - defer c.addTxMutex.Unlock() - - prevNonce := mTx.Nonce - - mTxLogger.Debug("reviewing nonce") - currentNonce, err := c.getTxNonce(ctx, mTx.From, false) + err = c.storage.Update(ctx, *mTx.monitoredTx) if err != nil { - err := fmt.Errorf("failed to load current nonce for acc %v: %w", mTx.From.String(), err) - mTxLogger.Errorf(err.Error()) - return err + return fmt.Errorf("failed to update monitored tx changes: %w", err) } - if currentNonce > mTx.Nonce { - mTxLogger.Infof("monitored tx nonce updated from %v to %v", mTx.Nonce, currentNonce) - mTx.Nonce = currentNonce - - err = c.storage.Update(ctx, mTx) - if err != nil { - mTxLogger.Errorf("failed to update monitored tx nonce change: %v", err) - return err - } - } + return nil +} - // we need to update the rest of pending txs nonces - // to avoid nonce conflicts - createdTxs, err := c.storage.GetByStatus(ctx, []MonitoredTxStatus{MonitoredTxStatusCreated}) +func (c *Client) getMonitoredTxnIteration(ctx context.Context) ([]*monitoredTxnIteration, error) { + txsToUpdate, err := c.storage.GetByStatus(ctx, []MonitoredTxStatus{MonitoredTxStatusCreated, MonitoredTxStatusSent}) if err != nil { - mTxLogger.Errorf("failed to get created monitored txs: %v", err) - return err + return nil, fmt.Errorf("failed to get txs to update nonces: %w", err) } - for _, cTx := range createdTxs { - // Avoid memory aliasing - createdTx := cTx - if createdTx.Nonce > prevNonce && createdTx.Nonce < currentNonce { - currentNonce++ - mTxLogger.Infof("monitored tx nonce updated from %v to %v", createdTx.Nonce, currentNonce) - createdTx.Nonce = currentNonce + iterations := make([]*monitoredTxnIteration, 0, len(txsToUpdate)) + senderNonces := make(map[common.Address]uint64) + + for _, tx := range txsToUpdate { + iteration := &monitoredTxnIteration{monitoredTx: &tx} + iterations = append(iterations, iteration) - err = c.storage.Update(ctx, createdTx) + updateNonce := iteration.shouldUpdateNonce(ctx, c.etherman) + if !updateNonce { + continue + } + + nonce, ok := senderNonces[tx.From] + if !ok { + // if there are no pending txs, we get the pending nonce from the etherman + nonce, err = c.etherman.PendingNonce(ctx, tx.From) if err != nil { - mTxLogger.Errorf("failed to update monitored tx nonce change: %v", err) - return err + return nil, fmt.Errorf("failed to get pending nonce for sender: %s. Error: %w", tx.From, err) } + + senderNonces[tx.From] = nonce } + + iteration.Nonce = nonce + err = c.storage.Update(ctx, tx) + if err != nil { + return nil, fmt.Errorf("failed to update nonce for tx %v: %w", tx.ID.String(), err) + } + + senderNonces[tx.From]++ } - return nil + return iterations, nil } func (c *Client) suggestedGasPrice(ctx context.Context) (*big.Int, error) { diff --git a/ethtxmanager/monitoredtx.go b/ethtxmanager/monitoredtx.go index 17e3c00..585e9bd 100644 --- a/ethtxmanager/monitoredtx.go +++ b/ethtxmanager/monitoredtx.go @@ -1,6 +1,7 @@ package ethtxmanager import ( + "context" "math/big" "time" @@ -171,3 +172,70 @@ type TxResult struct { Receipt *types.Receipt RevertMessage string } + +type monitoredTxnIteration struct { + *monitoredTx + confirmed bool + lastReceipt *types.Receipt +} + +func (m *monitoredTxnIteration) shouldUpdateNonce(ctx context.Context, etherman ethermanInterface) bool { + if m.Status == MonitoredTxStatusCreated { + // transaction was not sent, so no need to check if it was mined + // we need to update the nonce in this case + return true + } + + // check if any of the txs in the history was confirmed + var lastReceiptChecked *types.Receipt + // monitored tx is confirmed until we find a successful receipt + confirmed := false + // monitored tx doesn't have a failed receipt until we find a failed receipt for any + // tx in the monitored tx history + hasFailedReceipts := false + // all history txs are considered mined until we can't find a receipt for any + // tx in the monitored tx history + allHistoryTxsWereMined := true + for txHash := range m.History { + mined, receipt, err := etherman.CheckTxWasMined(ctx, txHash) + if err != nil { + continue + } + + // if the tx is not mined yet, check that not all the tx were mined and go to the next + if !mined { + allHistoryTxsWereMined = false + continue + } + + lastReceiptChecked = receipt + + // if the tx was mined successfully we can set it as confirmed and break the loop + if lastReceiptChecked.Status == types.ReceiptStatusSuccessful { + confirmed = true + break + } + + // if the tx was mined but failed, we continue to consider it was not confirmed + // and set that we have found a failed receipt. This info will be used later + // to check if nonce needs to be reviewed + confirmed = false + hasFailedReceipts = true + } + + m.confirmed = confirmed + m.lastReceipt = lastReceiptChecked + + // we need to check if we need to review the nonce carefully, to avoid sending + // duplicated data to the roll-up and causing an unnecessary trusted state reorg. + // + // if we have failed receipts, this means at least one of the generated txs was mined, + // in this case maybe the current nonce was already consumed(if this is the first iteration + // of this cycle, next iteration might have the nonce already updated by the preivous one), + // then we need to check if there are tx that were not mined yet, if so, we just need to wait + // because maybe one of them will get mined successfully + // + // in case of the monitored tx is not confirmed yet, all tx were mined and none of them were + // mined successfully, we need to review the nonce + return !confirmed && hasFailedReceipts && allHistoryTxsWereMined +}