Skip to content

Commit

Permalink
Merge branch 'main' into refactor/move-paymail-servant-to-package
Browse files Browse the repository at this point in the history
  • Loading branch information
dorzepowski authored Sep 12, 2024
2 parents 80122e1 + ba50ac3 commit eb0ab89
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 210 deletions.
7 changes: 0 additions & 7 deletions engine/model_sync_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ type SyncResults struct {
Results []*SyncResult `json:"results"` // Each result of a sync task
}

// Sync actions for syncing transactions
const (
syncActionBroadcast = "broadcast" // Broadcast a transaction into the mempool
syncActionP2P = "p2p" // Notify all paymail providers associated to the transaction
syncActionSync = "sync" // Get on-chain data about the transaction (IE: block hash, height, etc)
)

// SyncResult is the complete attempt/result to sync (multiple providers and strategies)
type SyncResult struct {
Action string `json:"action"` // type: broadcast, sync etc
Expand Down
35 changes: 0 additions & 35 deletions engine/model_sync_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package engine

import (
"context"
"time"

"github.com/bitcoin-sv/spv-wallet/engine/datastore"
"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
Expand Down Expand Up @@ -99,25 +98,6 @@ func (m *SyncTransaction) AfterCreated(_ context.Context) error {

// BeforeUpdating will fire before the model is being updated
func (m *SyncTransaction) BeforeUpdating(_ context.Context) error {
m.Client().Logger().Debug().
Str("txID", m.ID).
Msgf("starting: %s BeforeUpdate hook...", m.Name())

// Trim the results to the last 20
maxResultsLength := 20

ln := len(m.Results.Results)
if ln > maxResultsLength {
m.Client().Logger().Warn().
Str("txID", m.ID).
Msgf("trimming syncTx.Results")

m.Results.Results = m.Results.Results[ln-maxResultsLength:]
}

m.Client().Logger().Debug().
Str("txID", m.ID).
Msgf("end: %s BeforeUpdate hook", m.Name())
return nil
}

Expand All @@ -126,18 +106,3 @@ func (m *SyncTransaction) Migrate(client datastore.ClientInterface) error {
err := client.IndexMetadata(client.GetTableName(tableSyncTransactions), metadataField)
return spverrors.Wrapf(err, "failed to index metadata column on model %s", m.GetModelName())
}

func (m *SyncTransaction) addSyncResult(ctx context.Context, action, provider, message string) {
m.Results.Results = append(m.Results.Results, &SyncResult{
Action: action,
ExecutedAt: time.Now().UTC(),
Provider: provider,
StatusMessage: message,
})

if m.IsNew() {
return // do not save if new record! caller should decide if want to save new record
}

_ = m.Save(ctx)
}
27 changes: 0 additions & 27 deletions engine/model_sync_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,3 @@ func TestSyncTransaction_GetModelName(t *testing.T) {
require.Nil(t, syncTx)
})
}

func TestSyncTransaction_SaveHook(t *testing.T) {
t.Parallel()

t.Run("trim Results to last 20 messages", func(t *testing.T) {
// Given
ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup())
defer deferMe()

opts := []ModelOps{WithClient(client), New()}
syncTx := newSyncTransaction(testTxID, &SyncConfig{SyncOnChain: true, Broadcast: true}, opts...)

txErr := syncTx.Save(ctx)
require.NoError(t, txErr)

// When
for i := 0; i < 40; i++ {
syncTx.Results.Results = append(syncTx.Results.Results, &SyncResult{Action: "test", StatusMessage: "msg"})
}
txErr = syncTx.Save(ctx)
require.NoError(t, txErr)

// Then
resultsLen := len(syncTx.Results.Results)
require.Equal(t, 20, resultsLen)
})
}
10 changes: 5 additions & 5 deletions engine/paymail.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// finalizeP2PTransaction will notify the paymail provider about the transaction
func finalizeP2PTransaction(ctx context.Context, client paymail.ClientInterface, p4 *PaymailP4, transaction *Transaction) (*paymail.P2PTransactionPayload, error) {
func finalizeP2PTransaction(ctx context.Context, client paymail.ClientInterface, p4 *PaymailP4, transaction *Transaction) error {
if transaction.client != nil {
transaction.client.Logger().Info().
Str("txID", transaction.ID).
Expand All @@ -17,25 +17,25 @@ func finalizeP2PTransaction(ctx context.Context, client paymail.ClientInterface,

p2pTransaction, err := buildP2pTx(ctx, p4, transaction)
if err != nil {
return nil, err
return err
}

response, err := client.SendP2PTransaction(p4.ReceiveEndpoint, p4.Alias, p4.Domain, p2pTransaction)
_, err = client.SendP2PTransaction(p4.ReceiveEndpoint, p4.Alias, p4.Domain, p2pTransaction)
if err != nil {
if transaction.client != nil {
transaction.client.Logger().Info().
Str("txID", transaction.ID).
Msgf("finalizeerror %s, reason: %s", p4.Format, err.Error())
}
return nil, spverrors.Wrapf(err, "failed to send transaction via paymail")
return spverrors.Wrapf(err, "failed to send transaction via paymail")
}

if transaction.client != nil {
transaction.client.Logger().Info().
Str("txID", transaction.ID).
Msgf("successfully finished %s", p4.Format)
}
return &response.P2PTransactionPayload, nil
return nil
}

func buildP2pTx(ctx context.Context, p4 *PaymailP4, transaction *Transaction) (*paymail.P2PTransaction, error) {
Expand Down
33 changes: 5 additions & 28 deletions engine/process_p2p_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package engine
import (
"context"
"fmt"
"time"

"github.com/bitcoin-sv/go-paymail"
)

// processP2PTransaction will process the sync transaction record, or save the failure
Expand All @@ -25,30 +22,20 @@ func processP2PTransaction(ctx context.Context, tx *Transaction) error {

// No draft?
if len(tx.DraftID) == 0 {
syncTx.addSyncResult(ctx, syncActionP2P, "all", "no draft found, cannot complete p2p")

return nil // TODO: why nil here??
}

// Notify any P2P paymail providers associated to the transaction
var results []*SyncResult
if results, err = _notifyPaymailProviders(ctx, tx); err != nil {
syncTx.addSyncResult(ctx, syncActionP2P, "", err.Error())
if err = _notifyPaymailProviders(ctx, tx); err != nil {
return err
}

// Update if we have some results
if len(results) > 0 {
syncTx.Results.Results = append(syncTx.Results.Results, results...)
}

// Update sync status to be ready now
if syncTx.SyncStatus == SyncStatusPending {
syncTx.SyncStatus = SyncStatusReady
}

if err = syncTx.Save(ctx); err != nil {
syncTx.addSyncResult(ctx, syncActionP2P, "internal", err.Error())
return err
}

Expand All @@ -57,14 +44,11 @@ func processP2PTransaction(ctx context.Context, tx *Transaction) error {
}

// _notifyPaymailProviders will notify any associated Paymail providers
func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]*SyncResult, error) {
func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) error {
pm := transaction.Client().PaymailClient()
outputs := transaction.draftTransaction.Configuration.Outputs

notifiedReceivers := make([]string, 0)
results := make([]*SyncResult, len(outputs))

var payload *paymail.P2PTransactionPayload
var err error

for _, out := range outputs {
Expand All @@ -79,23 +63,16 @@ func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]*
continue // no need to send the same transaction to the same receiver second time
}

if payload, err = finalizeP2PTransaction(
if err = finalizeP2PTransaction(
ctx,
pm,
p4,
transaction,
); err != nil {
return nil, err
return err
}

notifiedReceivers = append(notifiedReceivers, receiver)
results = append(results, &SyncResult{
Action: syncActionP2P,
ExecutedAt: time.Now().UTC(),
Provider: p4.ReceiveEndpoint,
StatusMessage: "success: " + payload.TxID,
})

}
return results, nil
return nil
}
40 changes: 6 additions & 34 deletions engine/record_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package engine
import (
"context"
"fmt"
"time"

"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
"github.com/libsv/go-bt/v2"
)

Expand All @@ -29,15 +29,18 @@ func recordTransaction(ctx context.Context, c ClientInterface, strategy recordTx
}()
}

unlock := waitForRecordTxWriteLock(ctx, c, strategy.LockKey())
unlock, err := newWriteLock(ctx, fmt.Sprintf(lockKeyRecordTx, strategy.LockKey()), c.Cachestore())
defer unlock()
if err != nil {
return nil, spverrors.ErrInternal.Wrap(err)
}

logger := c.Logger()
logger.Debug().Str("strategy", strategy.Name()).Str("txID", strategy.TxID()).Msg("Start executing recordTx strategy.")

transaction, err = strategy.Execute(ctx, c, opts)
if err != nil {
logger.Warn().Str("strategy", strategy.Name()).Str("txID", strategy.TxID()).Err(err).Msg("Failed to execure recordTx strategy.")
logger.Warn().Str("strategy", strategy.Name()).Str("txID", strategy.TxID()).Err(err).Msg("Failed to execute recordTx strategy.")
}
return
}
Expand Down Expand Up @@ -81,34 +84,3 @@ func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, btTx *b

return rts, nil
}

func waitForRecordTxWriteLock(ctx context.Context, c ClientInterface, key string) func() {
var (
unlock func()
err error
)
// Create the lock and set the release for after the function completes
// Waits for the moment when the transaction is unlocked and creates a new lock
// Relevant for SPV Wallet to SPV Wallet transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming

lockKey := fmt.Sprintf(lockKeyRecordTx, key)

c.Logger().Debug().Msgf("try add write lock %s", lockKey)

for {

unlock, err = newWriteLock(
ctx, lockKey, c.Cachestore(),
)
if err == nil {
c.Logger().Debug().Msgf("added write lock %s", lockKey)
break
}
time.Sleep(time.Second * 1)
}

return func() {
c.Logger().Debug().Msgf("unlock %s", lockKey)
unlock()
}
}
33 changes: 6 additions & 27 deletions engine/sync_tx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/bitcoin-sv/spv-wallet/engine/chainstate"
"github.com/bitcoin-sv/spv-wallet/engine/datastore"
Expand Down Expand Up @@ -42,12 +41,11 @@ func processSyncTransactions(ctx context.Context, maxTransactions int, opts ...M
return nil
}

// broadcastTxAndUpdateSync will broadcast transaction and update Results and SyncStatus in syncTx
// broadcastTxAndUpdateSync will broadcast transaction and and SyncStatus in syncTx
// It most probably will be deleted after syncTX removal
func broadcastTxAndUpdateSync(ctx context.Context, tx *Transaction) error {
syncTx := tx.syncTransaction
syncResult, err := broadcastTransaction(ctx, tx)
syncTx.Results.Results = append(syncTx.Results.Results, syncResult)
err := broadcastTransaction(ctx, tx)
if err != nil {
return err
}
Expand All @@ -60,7 +58,7 @@ func broadcastTxAndUpdateSync(ctx context.Context, tx *Transaction) error {
return syncTx.Save(ctx)
}

func broadcastTransaction(ctx context.Context, tx *Transaction) (*SyncResult, error) {
func broadcastTransaction(ctx context.Context, tx *Transaction) error {
client := tx.Client()
chainstateSrv := client.Chainstate()

Expand All @@ -73,28 +71,18 @@ func broadcastTransaction(ctx context.Context, tx *Transaction) (*SyncResult, er
)
defer unlock()
if err != nil {
return nil, err
return err
}

// Broadcast
txHex, hexFormat := _getTxHexInFormat(ctx, tx, chainstateSrv.SupportedBroadcastFormats(), client)
br := chainstateSrv.Broadcast(ctx, tx.ID, txHex, hexFormat, defaultBroadcastTimeout)

if br.Failure != nil { // broadcast failed
return &SyncResult{
Action: syncActionBroadcast,
ExecutedAt: time.Now().UTC(),
Provider: br.Provider,
StatusMessage: br.Failure.Error.Error(),
}, br.Failure.Error
return br.Failure.Error
}

return &SyncResult{
Action: syncActionBroadcast,
ExecutedAt: time.Now().UTC(),
Provider: br.Provider,
StatusMessage: "broadcast success",
}, nil
return nil
}

// ///////////////
Expand Down Expand Up @@ -141,7 +129,6 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact
Msgf("Transaction not found on-chain, will try again later")

syncTx.SyncStatus = SyncStatusReady
syncTx.addSyncResult(ctx, syncActionSync, "all", "transaction not found on-chain")
return nil
}
return spverrors.Wrapf(err, "could not query transaction")
Expand Down Expand Up @@ -179,20 +166,12 @@ func processSyncTxSave(ctx context.Context, txInfo *chainstate.TransactionInfo,

transaction.setChainInfo(txInfo)
if err := transaction.Save(ctx); err != nil {
syncTx.addSyncResult(ctx, syncActionSync, "internal", err.Error())
return err
}

syncTx.SyncStatus = SyncStatusComplete
syncTx.Results.Results = append(syncTx.Results.Results, &SyncResult{
Action: syncActionSync,
ExecutedAt: time.Now().UTC(),
Provider: chainstate.ProviderBroadcastClient,
StatusMessage: "transaction was found on-chain by " + chainstate.ProviderBroadcastClient,
})

if err := syncTx.Save(ctx); err != nil {
syncTx.addSyncResult(ctx, syncActionSync, "internal", err.Error())
return err
}

Expand Down
Loading

0 comments on commit eb0ab89

Please sign in to comment.