Skip to content

Commit

Permalink
feat(SPV-1026): broadcast cron job removed
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Sep 5, 2024
1 parent a134b1a commit 4413d57
Show file tree
Hide file tree
Showing 7 changed files with 0 additions and 371 deletions.
5 changes: 0 additions & 5 deletions engine/cron_job_declarations.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func (c *Client) cronJobs() taskmanager.CronJobs {
60*time.Second,
taskCleanupDraftTransactions,
)
addJob(
CronJobNameSyncTransactionBroadcast,
2*time.Minute,
taskBroadcastTransactions,
)
addJob(
CronJobNameSyncTransactionSync,
5*time.Minute,
Expand Down
11 changes: 0 additions & 11 deletions engine/cron_job_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,6 @@ func taskCleanupDraftTransactions(ctx context.Context, client *Client) error {
return nil
}

// taskBroadcastTransactions will broadcast any transactions
func taskBroadcastTransactions(ctx context.Context, client *Client) error {
client.Logger().Info().Msg("running broadcast transaction(s) task...")

err := processBroadcastTransactions(ctx, 1000, WithClient(client))
if err == nil || errors.Is(err, datastore.ErrNoResults) {
return nil
}
return err
}

// taskSyncTransactions will sync any transactions
func taskSyncTransactions(ctx context.Context, client *Client) error {
logClient := client.Logger()
Expand Down
85 changes: 0 additions & 85 deletions engine/model_sync_transactions_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -24,90 +23,6 @@ func TestSyncTransaction_GetModelName(t *testing.T) {
})
}

func Test_areParentsBroadcast(t *testing.T) {
ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup())
defer deferMe()

opts := []ModelOps{WithClient(client)}

tx, err := txFromHex(testTxHex, append(opts, New())...)
require.NoError(t, err)

txErr := tx.Save(ctx)
require.NoError(t, txErr)

tx2, err := txFromHex(testTx2Hex, append(opts, New())...)
require.NoError(t, err)

txErr = tx2.Save(ctx)
require.NoError(t, txErr)

tx3, err := txFromHex(testTx3Hex, append(opts, New())...)
require.NoError(t, err)

txErr = tx3.Save(ctx)
require.NoError(t, txErr)

// input of testTxID
syncTx := newSyncTransaction("65bb8d2733298b2d3b441a871868d6323c5392facf0d3eced3a6c6a17dc84c10", &SyncConfig{SyncOnChain: false, Broadcast: false}, append(opts, New())...)
syncTx.BroadcastStatus = SyncStatusComplete
txErr = syncTx.Save(ctx)
require.NoError(t, txErr)

// input of testTxInID
syncTx = newSyncTransaction("89fbccca3a5e2bfc8a161bf7f54e8cb5898e296ae8c23b620b89ed570711f931", &SyncConfig{SyncOnChain: false, Broadcast: false}, append(opts, New())...)
txErr = syncTx.Save(ctx)
require.NoError(t, txErr)

type args struct {
tx *Transaction
opts []ModelOps
}
tests := []struct {
name string
args args
want bool
wantErr assert.ErrorAssertionFunc
}{
{
name: "no parents",
args: args{
tx: tx3,
opts: opts,
},
want: true,
wantErr: assert.NoError,
},
{
name: "parent not broadcast",
args: args{
tx: tx2,
opts: opts,
},
want: false,
wantErr: assert.NoError,
},
{
name: "parent broadcast",
args: args{
tx: tx,
opts: opts,
},
want: true,
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := _areParentsBroadcasted(ctx, tt.args.tx, tt.args.opts...)
if !tt.wantErr(t, err, fmt.Sprintf("areParentsBroadcast(%v, %v, %v)", ctx, tt.args.tx, tt.args.opts)) {
return
}
assert.Equalf(t, tt.want, got, "areParentsBroadcast(%v, %v, %v)", ctx, tt.args.tx, tt.args.opts)
})
}
}

func TestSyncTransaction_SaveHook(t *testing.T) {
t.Parallel()

Expand Down
1 change: 0 additions & 1 deletion engine/model_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ var (
testTxID = "1b52eac9d1eb0adf3ce6a56dee1c4768780b8126e288aca65dd1db32f173b853"
testTxID2 = "104cc87da1c6a6d3ce3e0dcffa92533c32d66818871a443b2d8b2933278dbb65"
testTx2Hex = "020000000189fbccca3a5e2bfc8a161bf7f54e8cb5898e296ae8c23b620b89ed570711f931000000006a47304402204e94380ae4d27f8bb9b40dd9944b4fea532d5fe12cf62c1994a6a495c81490f202204aab42f8f1b15259a032e58a3810fbbfd691771b92317f8a12a0da84761a400641210382229c0295e4d63ee54c541eba40be2963f0e80489b7da34e022d513a723181fffffffff0259970400000000001976a914e069bd2e2fe3ea702c40d5e65b491b734c01686788ac00000000000000000c006a09446f7457616c6c657400000000"
testTx3Hex = "01000000012c1466b3f92c703033fd1d21c1ff3a8b4ab8ceb32debfa9f7c3b2eb21b97dabf010000006a47304402204212fbb123f339c75eb28d3d6254af4ff1b9aa7c163e8f6cbb187ed49e12aaa5022010713e5b4bac82aea2232529370c2b41d3f6d5bfae0e5fb5689d74a7d29b3e48412103f26186a9f5ee7efaaf614de6451f2ad67712e728d4e1ac705cc73550546817e7feffffff01d8aa191e000000001976a914010af176de3faac864f148461340be6a7bb9eff488ac00000000"
testTxInID = "9b0495704e23e4b3bef3682c6a5c40abccc32a3e6b7b01ae3295e93a9d3a0482"
testTxInScriptPubKey = "76a914e069bd2e2fe3ea702c40d5e65b491b734c01686788ac"
testTxScriptPubKey1 = "76a91413473d21dc9e1fb392f05a028b447b165a052d4d88ac"
Expand Down
76 changes: 0 additions & 76 deletions engine/sync_tx_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package engine

import (
"context"
"encoding/hex"
"errors"

"github.com/bitcoin-sv/spv-wallet/engine/datastore"
"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
"github.com/libsv/go-bt/v2"
)

/*** exported funcs ***/
Expand Down Expand Up @@ -54,54 +51,6 @@ func GetSyncTransactionByTxID(ctx context.Context, txID string, opts ...ModelOps

/*** public unexported funcs ***/

// getTransactionsToBroadcast will get the sync transactions to broadcast
func getTransactionsToBroadcast(ctx context.Context, queryParams *datastore.QueryParams,
opts ...ModelOps,
) ([]*SyncTransaction, error) {
// Get the records by status
scTxs, err := _getSyncTransactionsByConditions(
ctx,
map[string]interface{}{
broadcastStatusField: SyncStatusReady.String(),
},
queryParams, opts...,
)
if err != nil {
return nil, err
} else if len(scTxs) == 0 {
return nil, nil
}

// hydrate and see if it's ready to sync
res := make([]*SyncTransaction, 0, len(scTxs))

for _, sTx := range scTxs {
// hydrate
sTx.transaction, err = getTransactionByID(
ctx, "", sTx.ID, opts...,
)
if err != nil {
return nil, err
} else if sTx.transaction == nil {
return nil, spverrors.ErrCouldNotFindTransaction
}

parentsBroadcast, err := _areParentsBroadcasted(ctx, sTx.transaction, opts...)
if err != nil {
return nil, err
}

if !parentsBroadcast {
// if all parents are not broadcast, then we cannot broadcast this tx
continue
}

res = append(res, sTx)
}

return res, nil
}

// getTransactionsToSync will get the sync transactions to sync
func getTransactionsToSync(ctx context.Context, queryParams *datastore.QueryParams,
opts ...ModelOps,
Expand Down Expand Up @@ -157,28 +106,3 @@ func _getSyncTransactionsByConditions(ctx context.Context, conditions map[string

return txs, nil
}

func _areParentsBroadcasted(ctx context.Context, tx *Transaction, opts ...ModelOps) (bool, error) {
// get the sync transaction of all inputs
btTx, err := bt.NewTxFromString(tx.Hex)
if err != nil {
return false, spverrors.Wrapf(err, "could not parse transaction hex")
}

// check that all inputs we handled have been broadcast, or are not handled by SPV Wallet Engine
parentsBroadcasted := true
for _, input := range btTx.Inputs {
var parentTx *SyncTransaction
previousTxID := hex.EncodeToString(bt.ReverseBytes(input.PreviousTxID()))
parentTx, err = GetSyncTransactionByID(ctx, previousTxID, opts...)
if err != nil {
return false, err
}
// if we have a sync transaction, and it is not complete, then we cannot broadcast
if parentTx != nil && parentTx.BroadcastStatus != SyncStatusComplete {
parentsBroadcasted = false
}
}

return parentsBroadcasted, nil
}
72 changes: 0 additions & 72 deletions engine/sync_tx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"sync"
"time"

"github.com/bitcoin-sv/go-paymail"
Expand Down Expand Up @@ -45,55 +43,6 @@ func processSyncTransactions(ctx context.Context, maxTransactions int, opts ...M
return nil
}

// processBroadcastTransactions will process sync transaction records
func processBroadcastTransactions(ctx context.Context, maxTransactions int, opts ...ModelOps) error {
queryParams := &datastore.QueryParams{
Page: 1,
PageSize: maxTransactions,
OrderByField: createdAtField,
SortDirection: datastore.SortAsc,
}

// Get maxTransactions records, grouped by xpub
snTxs, err := getTransactionsToBroadcast(ctx, queryParams, opts...)
if err != nil {
return err
} else if len(snTxs) == 0 {
return nil
}

// Process the transactions per xpub, in parallel
txsByXpub := _groupByXpub(snTxs)

// we limit the number of concurrent broadcasts to the number of cpus*2, since there is lots of IO wait
limit := make(chan bool, runtime.NumCPU()*2)
wg := new(sync.WaitGroup)

for xPubID := range txsByXpub {
limit <- true // limit the number of routines running at the same time
wg.Add(1)
go func(xPubID string) {
defer wg.Done()
defer func() { <-limit }()

for _, tx := range txsByXpub[xPubID] {
if err = broadcastSyncTransaction(
ctx, tx,
); err != nil {
tx.Client().Logger().Error().
Str("txID", tx.ID).
Str("xpubID", xPubID).
Msgf("error running broadcast tx: %s", err.Error())
return // stop processing transactions for this xpub if we found an error
}
}
}(xPubID)
}
wg.Wait()

return nil
}

// broadcastSyncTransaction will broadcast transaction related to syncTx record
func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) error {
// Successfully capture any panics, convert to readable string and log the error
Expand Down Expand Up @@ -362,27 +311,6 @@ func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]*

// utils

func _groupByXpub(scTxs []*SyncTransaction) map[string][]*SyncTransaction {
txsByXpub := make(map[string][]*SyncTransaction)

// group transactions by xpub and return including the tx itself
for _, tx := range scTxs {
xPubID := "" // fallback if we have no input xpubs
if len(tx.transaction.XpubInIDs) > 0 {
// use the first xpub for the grouping
// in most cases when we are broadcasting, there should be only 1 xpub in
xPubID = tx.transaction.XpubInIDs[0]
}

if txsByXpub[xPubID] == nil {
txsByXpub[xPubID] = make([]*SyncTransaction, 0)
}
txsByXpub[xPubID] = append(txsByXpub[xPubID], tx)
}

return txsByXpub
}

// _addSyncResult will save the error message for a sync tx
func _addSyncResult(ctx context.Context, syncTx *SyncTransaction,
action, provider, message string,
Expand Down
Loading

0 comments on commit 4413d57

Please sign in to comment.