From 6b5e9d254f6abe1e467cb3137c7b1a8f35e8cd4d Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 27 Mar 2024 15:34:28 -0500 Subject: [PATCH 1/5] Feat: add GetChainIDsByStatus() to db service; fetch pending txs by chain --- ethergo/submitter/db/mocks/service.go | 30 +++++++++++++++++++++ ethergo/submitter/db/service.go | 5 +++- ethergo/submitter/db/txdb/store.go | 31 ++++++++++++++++++++- ethergo/submitter/queue.go | 39 ++++++++++++++++++--------- 4 files changed, 90 insertions(+), 15 deletions(-) diff --git a/ethergo/submitter/db/mocks/service.go b/ethergo/submitter/db/mocks/service.go index 2d7b773239..b38708e44b 100644 --- a/ethergo/submitter/db/mocks/service.go +++ b/ethergo/submitter/db/mocks/service.go @@ -62,6 +62,36 @@ func (_m *Service) GetAllTXAttemptByStatus(ctx context.Context, fromAddress comm return r0, r1 } +// GetChainIDsByStatus provides a mock function with given fields: ctx, fromAddress, matchStatuses +func (_m *Service) GetChainIDsByStatus(ctx context.Context, fromAddress common.Address, matchStatuses ...db.Status) ([]*big.Int, error) { + _va := make([]interface{}, len(matchStatuses)) + for _i := range matchStatuses { + _va[_i] = matchStatuses[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, fromAddress) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 []*big.Int + if rf, ok := ret.Get(0).(func(context.Context, common.Address, ...db.Status) []*big.Int); ok { + r0 = rf(ctx, fromAddress, matchStatuses...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*big.Int) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, common.Address, ...db.Status) error); ok { + r1 = rf(ctx, fromAddress, matchStatuses...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetNonceAttemptsByStatus provides a mock function with given fields: ctx, fromAddress, chainID, nonce, matchStatuses func (_m *Service) GetNonceAttemptsByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64, matchStatuses ...db.Status) ([]db.TX, error) { _va := make([]interface{}, len(matchStatuses)) diff --git a/ethergo/submitter/db/service.go b/ethergo/submitter/db/service.go index 6d212f529a..0a7fa3ab0b 100644 --- a/ethergo/submitter/db/service.go +++ b/ethergo/submitter/db/service.go @@ -5,10 +5,11 @@ import ( "database/sql/driver" "errors" "fmt" + "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/synapsecns/sanguine/core/dbcommon" "golang.org/x/exp/slices" - "math/big" ) // Service is the interface for the tx queue database. @@ -35,6 +36,8 @@ type Service interface { GetNonceStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64) (status Status, err error) // GetNonceAttemptsByStatus gets all txs for a given address and chain id with a given status and nonce. GetNonceAttemptsByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64, matchStatuses ...Status) (txs []TX, err error) + // GetChainIDsByStatus gets the distinct chain ids for a given address and status. + GetChainIDsByStatus(ctx context.Context, fromAddress common.Address, matchStatuses ...Status) (chainIDs []*big.Int, err error) } // TransactionFunc is a function that can be passed to DBTransaction. diff --git a/ethergo/submitter/db/txdb/store.go b/ethergo/submitter/db/txdb/store.go index adcf461b11..1e85f6e2a7 100644 --- a/ethergo/submitter/db/txdb/store.go +++ b/ethergo/submitter/db/txdb/store.go @@ -5,6 +5,8 @@ import ( "database/sql" "errors" "fmt" + "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/imkira/go-interpol" @@ -15,7 +17,6 @@ import ( "github.com/synapsecns/sanguine/ethergo/util" "gorm.io/gorm" "gorm.io/gorm/clause" - "math/big" ) // NewTXStore creates a new transaction store. @@ -353,6 +354,34 @@ func (s *Store) GetNonceAttemptsByStatus(ctx context.Context, fromAddress common return txs, nil } +// GetChainIDsByStatus returns the distinct chain ids for a given address and status. +func (s *Store) GetChainIDsByStatus(ctx context.Context, fromAddress common.Address, matchStatuses ...db.Status) (chainIDs []*big.Int, err error) { + chainIDs64 := []uint64{} + + inArgs := statusToArgs(matchStatuses...) + + query := ETHTX{ + From: fromAddress.String(), + } + + tx := s.DB().WithContext(ctx). + Model(ÐTX{}). + Select(chainIDFieldName). + Distinct(). + Where(query). + Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs). + Find(&chainIDs64) + if tx.Error != nil { + return nil, fmt.Errorf("could not get chain ids: %w", tx.Error) + } + + for _, chainID64 := range chainIDs64 { + chainIDs = append(chainIDs, new(big.Int).SetUint64(chainID64)) + } + + return chainIDs, nil +} + // DB gets the database. func (s Store) DB() *gorm.DB { return s.db diff --git a/ethergo/submitter/queue.go b/ethergo/submitter/queue.go index 06144105ab..7cfc46f243 100644 --- a/ethergo/submitter/queue.go +++ b/ethergo/submitter/queue.go @@ -3,6 +3,10 @@ package submitter import ( "context" "fmt" + "math/big" + "sync" + "time" + "github.com/ethereum/go-ethereum/core/types" "github.com/lmittmann/w3" "github.com/lmittmann/w3/module/eth" @@ -12,9 +16,6 @@ import ( "github.com/synapsecns/sanguine/ethergo/submitter/db" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "math/big" - "sync" - "time" ) // runSelector runs the selector start loop. @@ -60,24 +61,36 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { } }() - // get all the pendingTxes in the queue - pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), nil, db.Stored, db.Pending, db.FailedSubmit, db.Submitted) + pendingChainIDs, err := t.db.GetChainIDsByStatus(ctx, t.signer.Address(), db.Stored, db.Pending, db.FailedSubmit, db.Submitted) if err != nil { - return fmt.Errorf("could not get pendingTxes: %w", err) + return fmt.Errorf("could not get pendingChainIDs: %w", err) } - // fetch txes into a map by chainid. - sortedTXsByChainID := sortTxesByChainID(pendingTxes) + pendingChainIDs64 := make([]int64, len(pendingChainIDs)) + for i, chainID := range pendingChainIDs { + pendingChainIDs64[i] = chainID.Int64() + } + span.SetAttributes(attribute.Int64Slice("pending_chain_ids", pendingChainIDs64)) - wg.Add(len(sortedTXsByChainID)) + wg.Add(len(pendingChainIDs)) - for chainID := range sortedTXsByChainID { - go func(chainID uint64) { + for _, chainID := range pendingChainIDs { + go func(chainID *big.Int) { defer wg.Done() - err := t.chainPendingQueue(ctx, new(big.Int).SetUint64(chainID), sortedTXsByChainID[chainID]) + + // get all the pendingTxes in the queue + pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), chainID, db.Stored, db.Pending, db.FailedSubmit, db.Submitted) + if err != nil { + span.AddEvent("could not get pendingTxes", trace.WithAttributes( + attribute.String("error", err.Error()), attribute.Int64("chainID", chainID.Int64()), + )) + return + } + + err = t.chainPendingQueue(ctx, chainID, pendingTxes) if err != nil { span.AddEvent("chainPendingQueue error", trace.WithAttributes( - attribute.String("error", err.Error()), attribute.Int64("chainID", int64(chainID)))) + attribute.String("error", err.Error()), attribute.Int64("chainID", chainID.Int64()))) } }(chainID) } From 97c0e6f8adaa092c5610498eb9d623cf9e018077 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 27 Mar 2024 15:46:53 -0500 Subject: [PATCH 2/5] Feat: add TestGetChainIDsByStatus --- ethergo/submitter/db_test.go | 46 ++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/ethergo/submitter/db_test.go b/ethergo/submitter/db_test.go index e60809935d..ba047c10e5 100644 --- a/ethergo/submitter/db_test.go +++ b/ethergo/submitter/db_test.go @@ -159,3 +159,49 @@ func (t *TXSubmitterDBSuite) TestGetNonceStatus() { } }) } + +func (t *TXSubmitterDBSuite) TestGetChainIDsByStatus() { + t.RunOnAllDBs(func(testDB db.Service) { + chainIDToStatus := map[int64]db.Status{ + 1: db.Pending, + 3: db.Stored, + 4: db.FailedSubmit, + } + expectedPendingChainIDs := []int64{1} + + for _, mockAccount := range t.mockAccounts { + for _, backend := range t.testBackends { + manager := t.managers[backend.GetChainID()] + + // create some test transactions + var txs []*types.Transaction + for i := 0; i < 50; i++ { + legacyTx := &types.LegacyTx{ + To: &mockAccount.Address, + Value: big.NewInt(0), + Nonce: uint64(i), + } + tx, err := manager.SignTx(types.NewTx(legacyTx), backend.Signer(), mockAccount.PrivateKey) + t.Require().NoError(err) + txs = append(txs, tx) + } + + // put the transactions in the database + for _, tx := range txs { + err := testDB.PutTXS(t.GetTestContext(), db.NewTX(tx, chainIDToStatus[backend.GetBigChainID().Int64()], uuid.New().String())) + t.Require().NoError(err) + } + } + + // check which chainIDs are stored with pending status + result, err := testDB.GetChainIDsByStatus(t.GetTestContext(), mockAccount.Address, db.Pending) + t.Require().NoError(err) + + resultInt64 := make([]int64, len(result)) + for i, chainID := range result { + resultInt64[i] = chainID.Int64() + } + t.Equal(expectedPendingChainIDs, resultInt64) + } + }) +} From 43cfbb822664ae8254feb85922aea3b7eef1c3cf Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 27 Mar 2024 17:35:47 -0500 Subject: [PATCH 3/5] [goreleaser] From 1e2a2309701264bbeab60ce09e4f2730d3776e82 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 28 Mar 2024 13:14:44 -0500 Subject: [PATCH 4/5] Fix: mark ReplacedOrConfirmed on Pending / Stored statuses --- ethergo/submitter/db/txdb/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethergo/submitter/db/txdb/store.go b/ethergo/submitter/db/txdb/store.go index 1e85f6e2a7..07ac502387 100644 --- a/ethergo/submitter/db/txdb/store.go +++ b/ethergo/submitter/db/txdb/store.go @@ -54,7 +54,7 @@ func (s *Store) MarkAllBeforeOrAtNonceReplacedOrConfirmed(ctx context.Context, s Where(fmt.Sprintf("%s <= ?", nonceFieldName), nonce). Where(fmt.Sprintf("`%s` = ?", fromFieldName), signer.String()). // just in case we're updating a tx already marked as confirmed - Where(fmt.Sprintf("%s IN ?", statusFieldName), []int{int(db.Submitted.Int()), int(db.FailedSubmit.Int())}). + Where(fmt.Sprintf("%s IN ?", statusFieldName), []int{int(db.Pending.Int()), int(db.Stored.Int()), int(db.Submitted.Int()), int(db.FailedSubmit.Int())}). Updates(map[string]interface{}{statusFieldName: db.ReplacedOrConfirmed.Int()}) if dbTX.Error != nil { From cbe85eaecfc34702ec542b25e20eb3b93e157673 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 28 Mar 2024 13:14:46 -0500 Subject: [PATCH 5/5] [goreleaser]