From 531de54454d76eaa4fc108c229d02e1d795036ee Mon Sep 17 00:00:00 2001 From: zhangkai Date: Wed, 24 Apr 2024 07:36:47 +0800 Subject: [PATCH 1/2] add pending stat --- docs/config-file/node-config-doc.html | 8 +- docs/config-file/node-config-doc.md | 97 +++++++++++ docs/config-file/node-config-schema.json | 38 +++++ jsonrpc/endpoints_eth_xlayer.go | 9 + jsonrpc/mocks/mock_pool_xlayer.go | 23 +++ jsonrpc/types/interfaces.go | 1 + pool/apollo_xlayer.go | 13 ++ pool/config.go | 3 + pool/interfaces.go | 6 + pool/pendingstat_xlayer.go | 190 +++++++++++++++++++++ pool/pendingstatcache_xlayer.go | 71 ++++++++ pool/pgpoolstorage/pgpoolstorage_xlayer.go | 114 +++++++++++++ pool/pool.go | 2 + pool/pool_xlayer.go | 5 + pool/trace/trace.go | 23 +++ 15 files changed, 602 insertions(+), 1 deletion(-) create mode 100644 pool/pendingstat_xlayer.go create mode 100644 pool/pendingstatcache_xlayer.go create mode 100644 pool/trace/trace.go diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 8561f49ee2..982fb7455b 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -14,7 +14,13 @@
"300ms"
 

Default: "15s"Type: string

PollMinAllowedGasPriceInterval is the interval to poll the suggested min gas price for a tx


Examples:

"1m"
 
"300ms"
-

Default: 64Type: integer

AccountQueue represents the maximum number of non-executable transaction slots permitted per account


Default: 1024Type: integer

GlobalQueue represents the maximum number of non-executable transaction slots for all accounts


EffectiveGasPrice is the config for the effective gas price calculation
Default: falseType: boolean

Enabled is a flag to enable/disable the effective gas price


Default: 0.25Type: number

L1GasPriceFactor is the percentage of the L1 gas price that will be used as the L2 min gas price


Default: 16Type: integer

ByteGasCost is the gas cost per byte that is not 0


Default: 4Type: integer

ZeroByteGasCost is the gas cost per byte that is 0


Default: 1Type: number

NetProfit is the profit margin to apply to the calculated breakEvenGasPrice


Default: 1.1Type: number

BreakEvenFactor is the factor to apply to the calculated breakevenGasPrice when comparing it with the gasPriceSigned of a tx


Default: 10Type: integer

FinalDeviationPct is the max allowed deviation percentage BreakEvenGasPrice on re-calculation


Default: 0Type: integer

EthTransferGasPrice is the fixed gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0Type: number

EthTransferL1GasPriceFactor is the percentage of L1 gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0.5Type: number

L2GasPriceSuggesterFactor is the factor to apply to L1 gas price to get the suggested L2 gas price used in the
calculations when the effective gas price is disabled (testing/metrics purposes)


Default: 0Type: integer

ForkID is the current fork ID of the chain


Default: ["0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"]Type: array of string

XLayer config
FreeGasAddress is the default free gas address

Each item of this array must be:


Default: 150000Type: integer

FreeClaimGasLimit is the max gas allowed use to do a free claim


Type: array of string

BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method

Each item of this array must be:


Configuration for RPC service. THis one offers a extended Ethereum JSON-RPC API interface to interact with the node
Default: "0.0.0.0"Type: string

Host defines the network adapter that will be used to serve the HTTP requests


Default: 8545Type: integer

Port defines the port to serve the endpoints via HTTP


Default: "1m0s"Type: string

ReadTimeout is the HTTP server read timeout
check net/http.server.ReadTimeout and net/http.server.ReadHeaderTimeout


Examples:

"1m"
+

Default: 64Type: integer

AccountQueue represents the maximum number of non-executable transaction slots permitted per account


Default: 1024Type: integer

GlobalQueue represents the maximum number of non-executable transaction slots for all accounts


EffectiveGasPrice is the config for the effective gas price calculation
Default: falseType: boolean

Enabled is a flag to enable/disable the effective gas price


Default: 0.25Type: number

L1GasPriceFactor is the percentage of the L1 gas price that will be used as the L2 min gas price


Default: 16Type: integer

ByteGasCost is the gas cost per byte that is not 0


Default: 4Type: integer

ZeroByteGasCost is the gas cost per byte that is 0


Default: 1Type: number

NetProfit is the profit margin to apply to the calculated breakEvenGasPrice


Default: 1.1Type: number

BreakEvenFactor is the factor to apply to the calculated breakevenGasPrice when comparing it with the gasPriceSigned of a tx


Default: 10Type: integer

FinalDeviationPct is the max allowed deviation percentage BreakEvenGasPrice on re-calculation


Default: 0Type: integer

EthTransferGasPrice is the fixed gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0Type: number

EthTransferL1GasPriceFactor is the percentage of L1 gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0.5Type: number

L2GasPriceSuggesterFactor is the factor to apply to L1 gas price to get the suggested L2 gas price used in the
calculations when the effective gas price is disabled (testing/metrics purposes)


Default: 0Type: integer

ForkID is the current fork ID of the chain


Default: ["0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"]Type: array of string

XLayer config
FreeGasAddress is the default free gas address

Each item of this array must be:


Default: 150000Type: integer

FreeClaimGasLimit is the max gas allowed use to do a free claim


Type: array of string

BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method

Each item of this array must be:


PendingStat is the configuration for the pending statistics
Default: falseType: boolean

Default: "0s"Type: string

Examples:

"1m"
+
"300ms"
+

Default: "0s"Type: string

Examples:

"1m"
+
"300ms"
+

Default: "0s"Type: string

Examples:

"1m"
+
"300ms"
+

Configuration for RPC service. THis one offers a extended Ethereum JSON-RPC API interface to interact with the node
Default: "0.0.0.0"Type: string

Host defines the network adapter that will be used to serve the HTTP requests


Default: 8545Type: integer

Port defines the port to serve the endpoints via HTTP


Default: "1m0s"Type: string

ReadTimeout is the HTTP server read timeout
check net/http.server.ReadTimeout and net/http.server.ReadHeaderTimeout


Examples:

"1m"
 
"300ms"
 

Default: "1m0s"Type: string

WriteTimeout is the HTTP server write timeout
check net/http.server.WriteTimeout


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index e2df90a6f2..d141187a2c 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -709,6 +709,7 @@ SecretKey=""
 | - [FreeGasAddress](#Pool_FreeGasAddress )                                       | No      | array of string | No         | -          | XLayer config
FreeGasAddress is the default free gas address | | - [FreeClaimGasLimit](#Pool_FreeClaimGasLimit ) | No | integer | No | - | FreeClaimGasLimit is the max gas allowed use to do a free claim | | - [BridgeClaimMethodSigs](#Pool_BridgeClaimMethodSigs ) | No | array of string | No | - | BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method | +| - [PendingStat](#Pool_PendingStat ) | No | object | No | - | PendingStat is the configuration for the pending statistics | ### 7.1. `Pool.IntervalToRefreshBlockedAddresses` @@ -1248,6 +1249,102 @@ FreeClaimGasLimit=150000 **Type:** : `array of string` **Description:** BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method +### 7.18. `[Pool.PendingStat]` + +**Type:** : `object` +**Description:** PendingStat is the configuration for the pending statistics + +| Property | Pattern | Type | Deprecated | Definition | Title/Description | +| --------------------------------------------------- | ------- | ------- | ---------- | ---------- | ----------------- | +| - [Enable](#Pool_PendingStat_Enable ) | No | boolean | No | - | - | +| - [Interval](#Pool_PendingStat_Interval ) | No | string | No | - | Duration | +| - [StaleInterval](#Pool_PendingStat_StaleInterval ) | No | string | No | - | Duration | +| - [CacheInternal](#Pool_PendingStat_CacheInternal ) | No | string | No | - | Duration | + +#### 7.18.1. `Pool.PendingStat.Enable` + +**Type:** : `boolean` + +**Default:** `false` + +**Example setting the default value** (false): +``` +[Pool.PendingStat] +Enable=false +``` + +#### 7.18.2. `Pool.PendingStat.Interval` + +**Title:** Duration + +**Type:** : `string` + +**Default:** `"0s"` + +**Examples:** + +```json +"1m" +``` + +```json +"300ms" +``` + +**Example setting the default value** ("0s"): +``` +[Pool.PendingStat] +Interval="0s" +``` + +#### 7.18.3. `Pool.PendingStat.StaleInterval` + +**Title:** Duration + +**Type:** : `string` + +**Default:** `"0s"` + +**Examples:** + +```json +"1m" +``` + +```json +"300ms" +``` + +**Example setting the default value** ("0s"): +``` +[Pool.PendingStat] +StaleInterval="0s" +``` + +#### 7.18.4. `Pool.PendingStat.CacheInternal` + +**Title:** Duration + +**Type:** : `string` + +**Default:** `"0s"` + +**Examples:** + +```json +"1m" +``` + +```json +"300ms" +``` + +**Example setting the default value** ("0s"): +``` +[Pool.PendingStat] +CacheInternal="0s" +``` + ## 8. `[RPC]` **Type:** : `object` diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json index 12d15f76de..5aebce0147 100644 --- a/docs/config-file/node-config-schema.json +++ b/docs/config-file/node-config-schema.json @@ -471,6 +471,44 @@ }, "type": "array", "description": "BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method" + }, + "PendingStat": { + "properties": { + "Enable": { + "type": "boolean", + "default": false + }, + "Interval": { + "type": "string", + "title": "Duration", + "default": "0s", + "examples": [ + "1m", + "300ms" + ] + }, + "StaleInterval": { + "type": "string", + "title": "Duration", + "default": "0s", + "examples": [ + "1m", + "300ms" + ] + }, + "CacheInternal": { + "type": "string", + "title": "Duration", + "default": "0s", + "examples": [ + "1m", + "300ms" + ] + } + }, + "additionalProperties": false, + "type": "object", + "description": "PendingStat is the configuration for the pending statistics" } }, "additionalProperties": false, diff --git a/jsonrpc/endpoints_eth_xlayer.go b/jsonrpc/endpoints_eth_xlayer.go index 9a9fcc4a16..bfde6a4356 100644 --- a/jsonrpc/endpoints_eth_xlayer.go +++ b/jsonrpc/endpoints_eth_xlayer.go @@ -16,6 +16,7 @@ import ( "github.com/0xPolygonHermez/zkevm-node/jsonrpc/metrics" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/pool" "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -397,3 +398,11 @@ func (e *EthEndpoints) getMinPriceFromSequencerNode() (interface{}, types.Error) } return gasPrice, nil } + +// GetPendingStat returns the pending stat +func (e *EthEndpoints) GetPendingStat() (interface{}, types.Error) { + if e.isDisabled("eth_getPendingStat") || (e.pool != nil && !e.pool.IsPendingStatEnabled(context.Background())) { + return RPCErrorResponse(types.DefaultErrorCode, "not supported yet", nil, true) + } + return pool.GetPendingStat(), nil +} diff --git a/jsonrpc/mocks/mock_pool_xlayer.go b/jsonrpc/mocks/mock_pool_xlayer.go index f2bca7c1d3..2b785d244f 100644 --- a/jsonrpc/mocks/mock_pool_xlayer.go +++ b/jsonrpc/mocks/mock_pool_xlayer.go @@ -84,3 +84,26 @@ func (_m *PoolMock) GetMinSuggestedGasPriceWithDelta(ctx context.Context, delta return r0, r1 } + +// IsPendingStatEnabled provides a mock function with given fields: ctx +func (_m *PoolMock) IsPendingStatEnabled(ctx context.Context) bool { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for IsPendingStatEnabled") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(bool) + } + } + + return r0 +} diff --git a/jsonrpc/types/interfaces.go b/jsonrpc/types/interfaces.go index 2a261b753f..07aef05a26 100644 --- a/jsonrpc/types/interfaces.go +++ b/jsonrpc/types/interfaces.go @@ -28,6 +28,7 @@ type PoolInterface interface { AddInnerTx(ctx context.Context, txHash common.Hash, innerTx []byte) error GetInnerTx(ctx context.Context, txHash common.Hash) (string, error) GetMinSuggestedGasPriceWithDelta(ctx context.Context, delta time.Duration) (uint64, error) + IsPendingStatEnabled(ctx context.Context) bool } // StateInterface gathers the methods required to interact with the state. diff --git a/pool/apollo_xlayer.go b/pool/apollo_xlayer.go index 7e8f1b370c..a5e885de0d 100644 --- a/pool/apollo_xlayer.go +++ b/pool/apollo_xlayer.go @@ -14,6 +14,7 @@ type apolloConfig struct { AccountQueue uint64 EnableWhitelist bool BridgeClaimMethods []string + EnablePendingStat bool sync.RWMutex } @@ -56,6 +57,7 @@ func (c *apolloConfig) setBridgeClaimMethods(bridgeClaimMethods []string) { // AccountQueue // FreeGasAddress // EnableWhitelist +// EnablePendingStat func UpdateConfig(apolloConfig Config) { getApolloConfig().Lock() getApolloConfig().EnableApollo = true @@ -64,6 +66,7 @@ func UpdateConfig(apolloConfig Config) { getApolloConfig().setFreeGasAddresses(apolloConfig.FreeGasAddress) getApolloConfig().EnableWhitelist = apolloConfig.EnableWhitelist getApolloConfig().setBridgeClaimMethods(apolloConfig.BridgeClaimMethodSigs) + getApolloConfig().EnablePendingStat = apolloConfig.PendingStat.Enable getApolloConfig().Unlock() } @@ -122,3 +125,13 @@ func getEnableWhitelist(enableWhitelist bool) bool { return enableWhitelist } + +func getEnablePendingStat(enablePendingStat bool) bool { + if getApolloConfig().enable() { + getApolloConfig().RLock() + defer getApolloConfig().RUnlock() + return getApolloConfig().EnablePendingStat + } + + return enablePendingStat +} diff --git a/pool/config.go b/pool/config.go index b6733f1a48..73372bf335 100644 --- a/pool/config.go +++ b/pool/config.go @@ -57,6 +57,9 @@ type Config struct { FreeClaimGasLimit uint64 `mapstructure:"FreeClaimGasLimit"` // BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method BridgeClaimMethodSigs []string `mapstructure:"BridgeClaimMethodSigs"` + + // PendingStat is the configuration for the pending statistics + PendingStat PendingStatCfg `mapstructure:"PendingStat"` } // EffectiveGasPriceCfg contains the configuration properties for the effective gas price diff --git a/pool/interfaces.go b/pool/interfaces.go index 76e8bebd78..45089e0a54 100644 --- a/pool/interfaces.go +++ b/pool/interfaces.go @@ -42,6 +42,12 @@ type storage interface { GetEarliestProcessedTx(ctx context.Context) (common.Hash, error) AddInnerTx(ctx context.Context, txHash common.Hash, innerTx []byte) error GetInnerTx(ctx context.Context, txHash common.Hash) (string, error) + GetPendingFromAndMinNonceBefore(ctx context.Context, timeDuration time.Duration) ([]common.Address, []uint64, error) + LockStat(ctx context.Context, timeDuration time.Duration) (bool, error) + UnLockStat(ctx context.Context) error + UpdateStatAndUnlock(ctx context.Context, totoal, skip, balanceIssue, nonceIssue uint64) error + GetStat(ctx context.Context) (uint64, uint64, uint64, uint64, error) + CountTransactionsByFromStatusAndNonce(ctx context.Context, from common.Address, nonce uint64, status ...TxStatus) (uint64, error) } type stateInterface interface { diff --git a/pool/pendingstat_xlayer.go b/pool/pendingstat_xlayer.go new file mode 100644 index 0000000000..2ccf3ffa23 --- /dev/null +++ b/pool/pendingstat_xlayer.go @@ -0,0 +1,190 @@ +package pool + +import ( + "context" + "fmt" + + "github.com/0xPolygonHermez/zkevm-node/config/types" + "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/pool/trace" + "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" +) + +// PendingStatCfg is the configuration for the pending stat +type PendingStatCfg struct { + Enable bool `mapstructure:"Enable"` + Interval types.Duration `mapstructure:"Interval"` + StaleInterval types.Duration `mapstructure:"StaleInterval"` + CacheInternal types.Duration `mapstructure:"CacheInternal"` +} + +func (p *Pool) startPendingStat() { + if p.cfg.PendingStat.Enable { + go state.InfiniteSafeRun(p.updatePendingStat, "error updating pending stat", p.cfg.PendingStat.Interval.Duration) + go state.InfiniteSafeRun(p.updatePendingStatCache, "error updating pending stat cache", p.cfg.PendingStat.CacheInternal.Duration) + } +} + +// updatePendingStat updates the pending statistics +// 1. find all pending transactions count +// 2. find all pending address and min nonce received before the stale interval +// 3. find all pending address that skip nonce +// 4. find all pending address that have balance issue +func (p *Pool) updatePendingStat() { + if !getEnablePendingStat(p.cfg.PendingStat.Enable) { + return + } + ctx := context.WithValue(context.Background(), trace.ID, uuid.New().String()) + locked, err := p.storage.LockStat(ctx, p.cfg.PendingStat.Interval.Duration) + if err != nil { + return + } + if !locked { + return + } + defer func() { + err = p.storage.UnLockStat(ctx) + if err != nil { + log.WithFields(trace.GetID(ctx)).Error("error unlocking stat", "err", err) + } + }() + + mLog := log.WithFields(trace.GetID(ctx)) + mLog.Infof("updating pending stat") + + totalCount, err := p.CountTransactionsByStatus(ctx, TxStatusPending) + if err != nil { + mLog.Error("error getting pending transactions count", "err", err) + return + } + mLog.Infof("total pending transactions %v", totalCount) + + address, nonces, err := p.storage.GetPendingFromAndMinNonceBefore(ctx, p.cfg.PendingStat.StaleInterval.Duration) + if err != nil { + mLog.Error("error getting pending address and min nonce before ", err) + return + } + mLog.Infof("pending address count %v before %v", len(address), p.cfg.PendingStat.StaleInterval.Duration) + + skipNonceAddress, continueNonceAddress, continueNonces, errNonceAddress, noncesNormal, err := p.filterAddress(ctx, address, nonces) + if err != nil { + mLog.Error("error filtering skip nonce address", "err", err) + return + } + mLog.Infof("skip nonce address count %v, continue nonce address count %v, err nonce address count %v", len(skipNonceAddress), len(continueNonceAddress), len(errNonceAddress)) + + totalSkipNonceTransactions, err := p.countSkipNonceTransactions(ctx, skipNonceAddress) + if err != nil { + mLog.Error("error counting skip nonce transactions", "err", err) + return + } + mLog.Infof("total skip nonce transactions %v", totalSkipNonceTransactions) + + totalBalanceIssueTransactions, err := p.countBalanceIssueTransactions(ctx, continueNonceAddress, continueNonces) + if err != nil { + mLog.Error("error counting balance issue transactions", "err", err) + return + } + mLog.Infof("total balance issue transactions %v", totalBalanceIssueTransactions) + + totalErrNonceTransactions, err := p.countErrNonceTransactions(ctx, errNonceAddress, noncesNormal) + if err != nil { + mLog.Error("error counting nonce issue transactions", "err", err) + return + } + mLog.Infof("total nonce issue transactions %v", totalErrNonceTransactions) + + err = p.storage.UpdateStatAndUnlock(ctx, totalCount, totalSkipNonceTransactions, totalBalanceIssueTransactions, totalErrNonceTransactions) + if err != nil { + mLog.Error("error updating stat and unlock", "err", err) + } else { + mLog.Infof("total %v, skip nonce %v, balance issue %v, nonce issue %v", totalCount, totalSkipNonceTransactions, totalBalanceIssueTransactions, totalErrNonceTransactions) + } +} + +func (p *Pool) filterAddress(ctx context.Context, addresses []common.Address, nonces []uint64) ([]common.Address, []common.Address, []uint64, []common.Address, []uint64, error) { + var skipNonceAddresses []common.Address + var continueNonceAddresses []common.Address + var continueNonces []uint64 + var errNonceAddresses []common.Address + var noncesNormal []uint64 + + lastL2Block, err := p.state.GetLastL2Block(ctx, nil) + if err != nil { + return nil, nil, nil, nil, nil, fmt.Errorf("failed to load last l2 block while adding tx to the pool: %w", err) + } + for i, addr := range addresses { + nonce, err := p.state.GetNonce(ctx, addr, lastL2Block.Root()) + if err != nil { + return nil, nil, nil, nil, nil, fmt.Errorf("failed to load nonce while adding tx to the pool: %w", err) + } + if nonces[i]-nonce > 0 { + skipNonceAddresses = append(skipNonceAddresses, addr) + } else if (nonces[i] - nonce) == 0 { + continueNonceAddresses = append(continueNonceAddresses, addr) + continueNonces = append(continueNonces, nonces[i]) + } else { + errNonceAddresses = append(errNonceAddresses, addr) + noncesNormal = append(noncesNormal, nonce) + } + } + return skipNonceAddresses, continueNonceAddresses, continueNonces, errNonceAddresses, noncesNormal, nil +} + +func (p *Pool) countSkipNonceTransactions(ctx context.Context, addresses []common.Address) (uint64, error) { + var totalSkipNonceTransactions uint64 + for _, addr := range addresses { + count, err := p.storage.CountTransactionsByFromAndStatus(ctx, addr, TxStatusPending) + if err != nil { + return 0, err + } + totalSkipNonceTransactions += count + } + return totalSkipNonceTransactions, nil +} + +func (p *Pool) countBalanceIssueTransactions(ctx context.Context, addresses []common.Address, nonces []uint64) (uint64, error) { + var totalBalanceIssueTransactions uint64 + mLog := log.WithFields(trace.GetID(ctx)) + + lastL2Block, err := p.state.GetLastL2Block(ctx, nil) + if err != nil { + return 0, fmt.Errorf("failed to load last l2 block while adding tx to the pool: %w", err) + } + + for i, addr := range addresses { + txs, err := p.storage.GetTxsByFromAndNonce(ctx, addr, nonces[i]) + if err != nil || len(txs) > 1 { + mLog.Warnf("error getting transactions by from %v and nonce %v tx count %v, err %v", addr, nonces[i], len(txs), err) + continue + } + balance, err := p.state.GetBalance(ctx, addr, lastL2Block.Root()) + if err != nil { + mLog.Warnf("error getting balance for address %v, l2block %v, err %v", addr, err, lastL2Block.Root()) + continue + } + if balance.Uint64() < txs[0].Value().Uint64() { + count, err := p.storage.CountTransactionsByFromAndStatus(ctx, addr, TxStatusPending) + if err != nil { + mLog.Warnf("error getting transactions count by from %v, err %v", addr, err) + } + totalBalanceIssueTransactions += count + } + } + + return totalBalanceIssueTransactions, nil +} + +func (p *Pool) countErrNonceTransactions(ctx context.Context, addresses []common.Address, nonces []uint64) (uint64, error) { + var totalErrNonceTransactions uint64 + for i, addr := range addresses { + count, err := p.storage.CountTransactionsByFromStatusAndNonce(ctx, addr, nonces[i], TxStatusPending) + if err != nil { + return 0, err + } + totalErrNonceTransactions += count + } + return totalErrNonceTransactions, nil +} diff --git a/pool/pendingstatcache_xlayer.go b/pool/pendingstatcache_xlayer.go new file mode 100644 index 0000000000..0ca6a2134d --- /dev/null +++ b/pool/pendingstatcache_xlayer.go @@ -0,0 +1,71 @@ +package pool + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/pool/trace" + "github.com/google/uuid" +) + +// PendingStat is the pending stat +type PendingStat struct { + // Total is the total number of pending transactions + Total uint64 `json:"total"` + + // SkipNonce is the number of transactions that skipped nonce + SkipNonce uint64 `json:"skipNonce"` + + // BalanceIssue is the number of transactions that have balance issue + BalanceIssue uint64 `json:"balanceIssue"` + + // ErrorNonce is the number of transactions that have nonce issue + ErrorNonce uint64 `json:"errorNonce"` +} + +var pendingStatInst *PendingStat +var pendingStatOnce sync.Once + +// GetPendingStat returns the singleton instance +func GetPendingStat() *PendingStat { + pendingStatOnce.Do(func() { + pendingStatInst = &PendingStat{} + }) + return pendingStatInst +} + +func (ps *PendingStat) setStat(stat PendingStat) { + atomic.StoreUint64(&ps.Total, stat.Total) + atomic.StoreUint64(&ps.SkipNonce, stat.SkipNonce) + atomic.StoreUint64(&ps.BalanceIssue, stat.BalanceIssue) +} + +// GetStat returns the pending stat +func (ps *PendingStat) GetStat() PendingStat { + return PendingStat{ + Total: atomic.LoadUint64(&ps.Total), + SkipNonce: atomic.LoadUint64(&ps.SkipNonce), + BalanceIssue: atomic.LoadUint64(&ps.BalanceIssue), + } +} + +func (p *Pool) updatePendingStatCache() { + if !getEnablePendingStat(p.cfg.PendingStat.Enable) { + return + } + ctx := context.WithValue(context.Background(), trace.ID, uuid.New().String()) + mLog := log.WithFields(trace.GetID(ctx)) + total, skip, balanceIssue, nonceIssue, err := p.storage.GetStat(ctx) + if err != nil { + mLog.Errorf("error getting stat: %v", err) + return + } + GetPendingStat().setStat(PendingStat{ + Total: total, + SkipNonce: skip, + BalanceIssue: balanceIssue, + ErrorNonce: nonceIssue, + }) +} diff --git a/pool/pgpoolstorage/pgpoolstorage_xlayer.go b/pool/pgpoolstorage/pgpoolstorage_xlayer.go index 633d3470ef..0d69cdebb0 100644 --- a/pool/pgpoolstorage/pgpoolstorage_xlayer.go +++ b/pool/pgpoolstorage/pgpoolstorage_xlayer.go @@ -3,7 +3,11 @@ package pgpoolstorage import ( "context" "errors" + "time" + "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/pool" + "github.com/0xPolygonHermez/zkevm-node/pool/trace" "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgx/v4" ) @@ -65,3 +69,113 @@ func (p *PostgresPoolStorage) GetInnerTx(ctx context.Context, txHash common.Hash return innerTx, nil } + +// GetPendingFromAndMinNonceBefore get pending from and min nonce before timeDuration +func (p *PostgresPoolStorage) GetPendingFromAndMinNonceBefore(ctx context.Context, timeDuration time.Duration) ([]common.Address, []uint64, error) { + sql := `SELECT from_address, MIN(nonce) FROM pool."transaction" where status='pending' and "received_at" < $1 GROUP BY from_address` + + mLog := log.WithFields(trace.GetID(ctx)) + timeStamp := time.Now().Add(-timeDuration) + rows, err := p.db.Query(ctx, sql, timeStamp) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + mLog.Infof("no pending transactions before %v", timeStamp) + return nil, nil, nil + } else { + return nil, nil, err + } + } + defer rows.Close() + + var addresses []common.Address + var nonces []uint64 + for rows.Next() { + var address string + var nonce uint64 + err := rows.Scan(&address, &nonce) + if err != nil { + return nil, nil, err + } + addresses = append(addresses, common.HexToAddress(address)) + nonces = append(nonces, nonce) + } + mLog.Infof("pending address count %v before %v", len(addresses), timeStamp) + + return addresses, nonces, nil +} + +// CREATE TABLE pool.stat ( +// id INT PRIMARY KEY NOT NULL, +// total INT, +// skip_nonce INT, +// balance_issue INT, +// nonce_issue INT, +// locked INT, // 1 for unlocked 2 for locked +// created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, +// updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +// ); +// insert into pool.stat(id, total, skip_nonce, balance_issue, nonce_issue, locked) values(1, 0, 0, 0, 0, 1); + +// LockStat lock stat +func (p *PostgresPoolStorage) LockStat(ctx context.Context, timeDuration time.Duration) (bool, error) { + timeStamp := time.Now().Add(-timeDuration) + sql := `UPDATE pool.stat SET locked = 2 WHERE locked = 1 and updated_at < $1 and id=1` + + stat, err := p.db.Exec(ctx, sql, timeStamp) + if err != nil { + return false, err + } + if stat.RowsAffected() > 0 { + return true, nil + } + + return false, nil +} + +// UnLockStat unlock stat +func (p *PostgresPoolStorage) UnLockStat(ctx context.Context) error { + sql := `UPDATE pool.stat SET locked = 1 WHERE locked = 2 and id=1` + + _, err := p.db.Exec(ctx, sql) + if err != nil { + return err + } + + return nil +} + +// UpdateStatAndUnlock update stat and unlock +func (p *PostgresPoolStorage) UpdateStatAndUnlock(ctx context.Context, totoal, skip, balanceIssue, nonceIssue uint64) error { + sql := `UPDATE pool.stat SET total = $1, skip_nonce = $2, balance_issue = $3, nonce_issue = $4, locked = 1, updated_at = CURRENT_TIMESTAMP WHERE id=1` + + _, err := p.db.Exec(ctx, sql, totoal, skip, balanceIssue, nonceIssue) + if err != nil { + return err + } + + return nil +} + +// GetStat get stat +func (p *PostgresPoolStorage) GetStat(ctx context.Context) (uint64, uint64, uint64, uint64, error) { + sql := `SELECT total, skip_nonce, balance_issue, nonce_issue FROM pool.stat WHERE id=1` + + var total, skip, balanceIssue, nonceIssue uint64 + err := p.db.QueryRow(ctx, sql).Scan(&total, &skip, &balanceIssue, &nonceIssue) + if err != nil { + return 0, 0, 0, 0, err + } + + return total, skip, balanceIssue, nonceIssue, nil +} + +// CountTransactionsByFromStatusAndNonce count transactions by from status and nonce +func (p *PostgresPoolStorage) CountTransactionsByFromStatusAndNonce(ctx context.Context, from common.Address, nonce uint64, status ...pool.TxStatus) (uint64, error) { + sql := "SELECT COUNT(*) FROM pool.transaction WHERE from_address = $1 AND nonce <= $2 AND status = ANY ($3)" + var counter uint64 + err := p.db.QueryRow(ctx, sql, from.String(), nonce, status).Scan(&counter) + if err != nil { + return 0, err + } + return counter, nil +} diff --git a/pool/pool.go b/pool/pool.go index 8ac72acce6..37cd2b5739 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -96,6 +96,8 @@ func NewPool(cfg Config, batchConstraintsCfg state.BatchConstraintsCfg, s storag } }(&cfg, p) + p.startPendingStat() + return p } diff --git a/pool/pool_xlayer.go b/pool/pool_xlayer.go index 6944dbcd14..a2bd2600fb 100644 --- a/pool/pool_xlayer.go +++ b/pool/pool_xlayer.go @@ -84,3 +84,8 @@ func (p *Pool) GetMinSuggestedGasPriceWithDelta(ctx context.Context, delta time. return p.storage.MinL2GasPriceSince(ctx, fromTimestamp) } + +// IsPendingStatEnabled checks if the pending stat is enabled +func (p *Pool) IsPendingStatEnabled(ctx context.Context) bool { + return getEnablePendingStat(p.cfg.PendingStat.Enable) +} diff --git a/pool/trace/trace.go b/pool/trace/trace.go new file mode 100644 index 0000000000..66f375e401 --- /dev/null +++ b/pool/trace/trace.go @@ -0,0 +1,23 @@ +package trace + +import "context" + +type contextKey string + +const ( + // ID is the key to store the trace ID in the context + ID contextKey = "traceID" +) + +// GetID returns the trace ID from the context +func GetID(ctx context.Context) (string, string) { + if ctx == nil || ctx.Value(ID) == nil { + return "", "" + } + return string(ID), ctx.Value(ID).(string) +} + +// String returns the string representation of the context key +func (key contextKey) String() string { + return string(key) +} From 8baae4368379438d19b59c20909c23231cb23c38 Mon Sep 17 00:00:00 2001 From: LeoGuo621 <335209779@qq.com> Date: Wed, 24 Apr 2024 17:32:40 +0800 Subject: [PATCH 2/2] modify dgp congestion condition --- jsonrpc/dynamic_gas_price_xlayer.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/jsonrpc/dynamic_gas_price_xlayer.go b/jsonrpc/dynamic_gas_price_xlayer.go index fc8aec9066..42b4ffe93d 100644 --- a/jsonrpc/dynamic_gas_price_xlayer.go +++ b/jsonrpc/dynamic_gas_price_xlayer.go @@ -10,6 +10,7 @@ import ( zktypes "github.com/0xPolygonHermez/zkevm-node/config/types" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/metrics" "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/pool" "github.com/ethereum/go-ethereum/core/types" ) @@ -212,9 +213,20 @@ func (e *EthEndpoints) getL2BatchTxsTips(ctx context.Context, l2BlockNumber uint } func (e *EthEndpoints) isCongested(ctx context.Context) (bool, error) { - txCount, err := e.pool.CountPendingTransactions(ctx) - if err != nil { - return false, err + var txCount uint64 + if e.pool != nil && e.pool.IsPendingStatEnabled(ctx) { + stat := pool.GetPendingStat() + if stat.Total < stat.SkipNonce+stat.BalanceIssue+stat.ErrorNonce { + txCount = 0 + } else { + txCount = stat.Total - stat.SkipNonce - stat.BalanceIssue - stat.ErrorNonce + } + } else { + cnt, err := e.pool.CountPendingTransactions(ctx) + if err != nil { + return false, err + } + txCount = cnt } if txCount >= e.cfg.DynamicGP.CongestionTxThreshold { return true, nil