From a448d46f9b6269c3f4f01b26af8d537e5106e8d6 Mon Sep 17 00:00:00 2001 From: bruwbird Date: Thu, 9 May 2024 16:39:29 +0900 Subject: [PATCH] txwather: wait for initial block header sub wait for initial block header subscription to handle potential stalls. remove goroutines for Tx watchers. These do not need to be goroutine on the caller's side. --- cmd/peerswap-plugin/main.go | 24 ++++++++++-------------- cmd/peerswaplnd/peerswapd/main.go | 12 +++++------- lwk/electrumtxwatcher.go | 24 +++++++++++++++++++++++- lwk/electrumtxwatcher_test.go | 23 +++++++++++++++++------ test/lwk_cln_test.go | 4 ++-- test/testcases.go | 8 ++++---- 6 files changed, 61 insertions(+), 34 deletions(-) diff --git a/cmd/peerswap-plugin/main.go b/cmd/peerswap-plugin/main.go index fcd9ba89..b065d6cb 100644 --- a/cmd/peerswap-plugin/main.go +++ b/cmd/peerswap-plugin/main.go @@ -350,23 +350,19 @@ func run(ctx context.Context, lightningPlugin *clightning.ClightningClient) erro swapService := swap.NewSwapService(swapServices) if liquidTxWatcher != nil && liquidEnabled { - go func() { - err := liquidTxWatcher.StartWatchingTxs() - if err != nil { - log.Infof("%v", err) - os.Exit(1) - } - }() + err := liquidTxWatcher.StartWatchingTxs() + if err != nil { + log.Infof("%v", err) + os.Exit(1) + } } if bitcoinTxWatcher != nil { - go func() { - err := bitcoinTxWatcher.StartWatchingTxs() - if err != nil { - log.Infof("%v", err) - os.Exit(1) - } - }() + err := bitcoinTxWatcher.StartWatchingTxs() + if err != nil { + log.Infof("%v", err) + os.Exit(1) + } } err = swapService.Start() diff --git a/cmd/peerswaplnd/peerswapd/main.go b/cmd/peerswaplnd/peerswapd/main.go index b13d22e5..45c9a784 100644 --- a/cmd/peerswaplnd/peerswapd/main.go +++ b/cmd/peerswaplnd/peerswapd/main.go @@ -340,13 +340,11 @@ func run() error { swapService := swap.NewSwapService(swapServices) if liquidTxWatcher != nil { - go func() { - err := liquidTxWatcher.StartWatchingTxs() - if err != nil { - log.Infof("%v", err) - os.Exit(1) - } - }() + err := liquidTxWatcher.StartWatchingTxs() + if err != nil { + log.Infof("%v", err) + os.Exit(1) + } } err = swapService.Start() diff --git a/lwk/electrumtxwatcher.go b/lwk/electrumtxwatcher.go index b4cf6f5b..cb1cd627 100644 --- a/lwk/electrumtxwatcher.go +++ b/lwk/electrumtxwatcher.go @@ -2,6 +2,7 @@ package lwk import ( "context" + "time" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/elementsproject/peerswap/electrum" @@ -9,6 +10,10 @@ import ( "github.com/elementsproject/peerswap/swap" ) +// initialBlockHeaderSubscriptionTimeout is +// the initial block header subscription timeout. +const initialBlockHeaderSubscriptionTimeout = 1000 * time.Second + type electrumTxWatcher struct { electrumClient electrum.RPC blockHeight electrum.BlocKHeight @@ -53,7 +58,24 @@ func (r *electrumTxWatcher) StartWatchingTxs() error { } } }() - return nil + return r.waitForInitialBlockHeaderSubscription(ctx) +} + +// waitForInitialBlockHeaderSubscription waits for the initial block header subscription to be confirmed. +func (r *electrumTxWatcher) waitForInitialBlockHeaderSubscription(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, initialBlockHeaderSubscriptionTimeout) + defer cancel() + for { + select { + case <-ctx.Done(): + log.Infof("Initial block header subscription timeout.") + return ctx.Err() + default: + if r.blockHeight.Confirmed() { + return nil + } + } + } } func (r *electrumTxWatcher) AddWaitForConfirmationTx(swapIDStr, txIDStr string, vout, startingHeight uint32, scriptpubkeyByte []byte) { diff --git a/lwk/electrumtxwatcher_test.go b/lwk/electrumtxwatcher_test.go index 44135b8f..29b99151 100644 --- a/lwk/electrumtxwatcher_test.go +++ b/lwk/electrumtxwatcher_test.go @@ -1,6 +1,7 @@ package lwk_test import ( + "sync" "testing" "github.com/checksum0/go-electrum/electrum" @@ -58,13 +59,18 @@ func TestElectrumTxWatcher_Callback(t *testing.T) { return nil }, ) - err = r.StartWatchingTxs() - assert.NoError(t, err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + err = r.StartWatchingTxs() + assert.NoError(t, err) + wg.Done() + }() r.AddWaitForConfirmationTx(wantSwapID, wantTxID, 0, 0, wantscriptpubkey) headerResultChan <- &electrum.SubscribeHeadersResult{ Height: onchain.LiquidConfs + targetTXHeight + 1, } - + wg.Wait() assert.Equal(t, <-callbackChan, wantSwapID) }) @@ -109,13 +115,18 @@ func TestElectrumTxWatcher_Callback(t *testing.T) { return nil }, ) - err = r.StartWatchingTxs() - assert.NoError(t, err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + err = r.StartWatchingTxs() + assert.NoError(t, err) + wg.Done() + }() r.AddWaitForCsvTx(wantSwapID, wantTxID, 0, 0, wantscriptpubkey) headerResultChan <- &electrum.SubscribeHeadersResult{ Height: onchain.LiquidCsv + targetTXHeight + 1, } - + wg.Wait() assert.Equal(t, <-callbackChan, wantSwapID) }) diff --git a/test/lwk_cln_test.go b/test/lwk_cln_test.go index 69f67abb..ea53290b 100644 --- a/test/lwk_cln_test.go +++ b/test/lwk_cln_test.go @@ -33,12 +33,12 @@ func Test_ClnCln_LWK_SwapIn(t *testing.T) { tailableProcess{ p: lightningds[0].DaemonProcess, filter: filter, - lines: defaultLines, + lines: 3000, }, tailableProcess{ p: lightningds[1].DaemonProcess, filter: filter, - lines: defaultLines, + lines: 3000, }, tailableProcess{ p: electrs.Process, diff --git a/test/testcases.go b/test/testcases.go index 4559d667..326b9478 100644 --- a/test/testcases.go +++ b/test/testcases.go @@ -113,7 +113,7 @@ func coopClaimTest(t *testing.T, params *testParams) { if err2 != nil { return false } - return b != params.origMakerWallet-commitFee-params.swapAmt + return b == params.origMakerWallet-commitFee-claimFee }, testframework.TIMEOUT)) // Check Wallet balance. @@ -199,7 +199,7 @@ func preimageClaimTest(t *testing.T, params *testParams) { if err2 != nil { return false } - return b != params.origTakerWallet + return b == params.origTakerWallet-claimFee+params.swapAmt }, testframework.TIMEOUT)) require.NoError( testframework.WaitFor(func() bool { @@ -207,7 +207,7 @@ func preimageClaimTest(t *testing.T, params *testParams) { if err2 != nil { return false } - return b != params.origMakerWallet + return b == params.origMakerWallet-commitFee-params.swapAmt }, testframework.TIMEOUT)) // Check Wallet balance. @@ -321,7 +321,7 @@ func csvClaimTest(t *testing.T, params *testParams) { if err2 != nil { return false } - return b != params.origMakerWallet-commitFee-params.swapAmt + return b == params.origMakerWallet-commitFee-claimFee }, testframework.TIMEOUT)) balance, err := params.makerNode.GetBtcBalanceSat()