Skip to content

Commit

Permalink
txwather: wait for initial block header sub
Browse files Browse the repository at this point in the history
wait for initial block header subscription to handle potential stalls.
remove goroutines for Tx watchers.
These do not need to be goroutine on the caller's side.
  • Loading branch information
YusukeShimizu committed May 9, 2024
1 parent 538b83c commit a448d46
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 34 deletions.
24 changes: 10 additions & 14 deletions cmd/peerswap-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,23 +350,19 @@ func run(ctx context.Context, lightningPlugin *clightning.ClightningClient) erro
swapService := swap.NewSwapService(swapServices)

if liquidTxWatcher != nil && liquidEnabled {
go func() {
err := liquidTxWatcher.StartWatchingTxs()
if err != nil {
log.Infof("%v", err)
os.Exit(1)
}
}()
err := liquidTxWatcher.StartWatchingTxs()
if err != nil {
log.Infof("%v", err)
os.Exit(1)
}
}

if bitcoinTxWatcher != nil {
go func() {
err := bitcoinTxWatcher.StartWatchingTxs()
if err != nil {
log.Infof("%v", err)
os.Exit(1)
}
}()
err := bitcoinTxWatcher.StartWatchingTxs()
if err != nil {
log.Infof("%v", err)
os.Exit(1)
}
}

err = swapService.Start()
Expand Down
12 changes: 5 additions & 7 deletions cmd/peerswaplnd/peerswapd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,11 @@ func run() error {
swapService := swap.NewSwapService(swapServices)

if liquidTxWatcher != nil {
go func() {
err := liquidTxWatcher.StartWatchingTxs()
if err != nil {
log.Infof("%v", err)
os.Exit(1)
}
}()
err := liquidTxWatcher.StartWatchingTxs()
if err != nil {
log.Infof("%v", err)
os.Exit(1)
}
}

err = swapService.Start()
Expand Down
24 changes: 23 additions & 1 deletion lwk/electrumtxwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package lwk

import (
"context"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/elementsproject/peerswap/electrum"
"github.com/elementsproject/peerswap/log"
"github.com/elementsproject/peerswap/swap"
)

// initialBlockHeaderSubscriptionTimeout is
// the initial block header subscription timeout.
const initialBlockHeaderSubscriptionTimeout = 1000 * time.Second

type electrumTxWatcher struct {
electrumClient electrum.RPC
blockHeight electrum.BlocKHeight
Expand Down Expand Up @@ -53,7 +58,24 @@ func (r *electrumTxWatcher) StartWatchingTxs() error {
}
}
}()
return nil
return r.waitForInitialBlockHeaderSubscription(ctx)
}

// waitForInitialBlockHeaderSubscription waits for the initial block header subscription to be confirmed.
func (r *electrumTxWatcher) waitForInitialBlockHeaderSubscription(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, initialBlockHeaderSubscriptionTimeout)
defer cancel()
for {
select {
case <-ctx.Done():
log.Infof("Initial block header subscription timeout.")
return ctx.Err()
default:
if r.blockHeight.Confirmed() {
return nil
}
}
}
}

func (r *electrumTxWatcher) AddWaitForConfirmationTx(swapIDStr, txIDStr string, vout, startingHeight uint32, scriptpubkeyByte []byte) {
Expand Down
23 changes: 17 additions & 6 deletions lwk/electrumtxwatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lwk_test

import (
"sync"
"testing"

"github.com/checksum0/go-electrum/electrum"
Expand Down Expand Up @@ -58,13 +59,18 @@ func TestElectrumTxWatcher_Callback(t *testing.T) {
return nil
},
)
err = r.StartWatchingTxs()
assert.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
err = r.StartWatchingTxs()
assert.NoError(t, err)
wg.Done()
}()
r.AddWaitForConfirmationTx(wantSwapID, wantTxID, 0, 0, wantscriptpubkey)
headerResultChan <- &electrum.SubscribeHeadersResult{
Height: onchain.LiquidConfs + targetTXHeight + 1,
}

wg.Wait()
assert.Equal(t, <-callbackChan, wantSwapID)
})

Expand Down Expand Up @@ -109,13 +115,18 @@ func TestElectrumTxWatcher_Callback(t *testing.T) {
return nil
},
)
err = r.StartWatchingTxs()
assert.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
err = r.StartWatchingTxs()
assert.NoError(t, err)
wg.Done()
}()
r.AddWaitForCsvTx(wantSwapID, wantTxID, 0, 0, wantscriptpubkey)
headerResultChan <- &electrum.SubscribeHeadersResult{
Height: onchain.LiquidCsv + targetTXHeight + 1,
}

wg.Wait()
assert.Equal(t, <-callbackChan, wantSwapID)
})

Expand Down
4 changes: 2 additions & 2 deletions test/lwk_cln_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func Test_ClnCln_LWK_SwapIn(t *testing.T) {
tailableProcess{
p: lightningds[0].DaemonProcess,
filter: filter,
lines: defaultLines,
lines: 3000,
},
tailableProcess{
p: lightningds[1].DaemonProcess,
filter: filter,
lines: defaultLines,
lines: 3000,
},
tailableProcess{
p: electrs.Process,
Expand Down
8 changes: 4 additions & 4 deletions test/testcases.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func coopClaimTest(t *testing.T, params *testParams) {
if err2 != nil {
return false
}
return b != params.origMakerWallet-commitFee-params.swapAmt
return b == params.origMakerWallet-commitFee-claimFee
}, testframework.TIMEOUT))

// Check Wallet balance.
Expand Down Expand Up @@ -199,15 +199,15 @@ func preimageClaimTest(t *testing.T, params *testParams) {
if err2 != nil {
return false
}
return b != params.origTakerWallet
return b == params.origTakerWallet-claimFee+params.swapAmt
}, testframework.TIMEOUT))
require.NoError(
testframework.WaitFor(func() bool {
b, err2 := params.makerNode.GetBtcBalanceSat()
if err2 != nil {
return false
}
return b != params.origMakerWallet
return b == params.origMakerWallet-commitFee-params.swapAmt
}, testframework.TIMEOUT))

// Check Wallet balance.
Expand Down Expand Up @@ -321,7 +321,7 @@ func csvClaimTest(t *testing.T, params *testParams) {
if err2 != nil {
return false
}
return b != params.origMakerWallet-commitFee-params.swapAmt
return b == params.origMakerWallet-commitFee-claimFee
}, testframework.TIMEOUT))

balance, err := params.makerNode.GetBtcBalanceSat()
Expand Down

0 comments on commit a448d46

Please sign in to comment.