Skip to content

Commit

Permalink
client: monitor wallet peers
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed Jan 25, 2022
1 parent 9f14d65 commit 0b49f8d
Show file tree
Hide file tree
Showing 20 changed files with 309 additions and 55 deletions.
44 changes: 38 additions & 6 deletions client/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ import (
const (
version = 0

// Use RawRequest to get the verbose block header for a blockhash.
methodGetBlockHeader = "getblockheader"
// Use RawRequest to get the verbose block with verbose txs, as the btcd
// rpcclient.Client's GetBlockVerboseTx appears to be busted.
methodGetNetworkInfo = "getnetworkinfo"
methodGetBlockchainInfo = "getblockchaininfo"
// BipID is the BIP-0044 asset ID.
BipID = 0

Expand Down Expand Up @@ -81,6 +75,7 @@ const (
var (
// blockTicker is the delay between calls to check for new blocks.
blockTicker = time.Second
peerCountTicker = 5 * time.Second
walletBlockAllowance = time.Second * 10
conventionalConversionFactor = float64(dexbtc.UnitInfo.Conventional.ConversionFactor)
rpcOpts = []*asset.ConfigOption{
Expand Down Expand Up @@ -513,6 +508,8 @@ type ExchangeWallet struct {
log dex.Logger
symbol string
tipChange func(error)
lastPeerCount uint32
peersChange func(uint32)
minNetworkVersion uint64
fallbackFeeRate uint64
feeRateLimit uint64
Expand Down Expand Up @@ -721,6 +718,7 @@ func newUnconnectedWallet(cfg *BTCCloneCFG, walletCfg *WalletConfig) (*ExchangeW
chainParams: cfg.ChainParams,
log: cfg.Logger,
tipChange: cfg.WalletCFG.TipChange,
peersChange: cfg.WalletCFG.PeersChange,
fundingCoins: make(map[outPoint]*utxo),
findRedemptionQueue: make(map[outPoint]*findRedemptionReq),
minNetworkVersion: cfg.MinNetworkVersion,
Expand Down Expand Up @@ -816,6 +814,11 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
btc.watchBlocks(ctx)
btc.shutdown()
}()
wg.Add(1)
go func() {
defer wg.Done()
btc.monitorPeers(ctx)
}()
return &wg, nil
}

Expand Down Expand Up @@ -2685,6 +2688,35 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes)
return uint32(tx.Confirmations), nil
}

func (btc *ExchangeWallet) checkPeers() {
numPeers, err := btc.node.peerCount()
if err != nil { // e.g. dcrd passthrough fail in non-SPV mode
prevPeer := atomic.SwapUint32(&btc.lastPeerCount, 0)
if prevPeer != 0 {
btc.log.Errorf("Failed to get peer count: %v", err)
btc.peersChange(0)
}
return
}
prevPeer := atomic.SwapUint32(&btc.lastPeerCount, numPeers)
if prevPeer != numPeers {
btc.peersChange(numPeers)
}
}

func (btc *ExchangeWallet) monitorPeers(ctx context.Context) {
ticker := time.NewTicker(peerCountTicker)
defer ticker.Stop()
for {
select {
case <-ticker.C:
btc.checkPeers()
case <-ctx.Done():
return
}
}
}

// watchBlocks pings for new blocks and runs the tipChange callback function
// when the block changes.
func (btc *ExchangeWallet) watchBlocks(ctx context.Context) {
Expand Down
3 changes: 3 additions & 0 deletions client/asset/btc/btc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,9 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun
default:
}
},
PeersChange: func(num uint32) {
fmt.Println("peer count: ", num)
},
}
walletCtx, shutdown := context.WithCancel(tCtx)
cfg := &BTCCloneCFG{
Expand Down
3 changes: 3 additions & 0 deletions client/asset/btc/livetest/livetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func tBackend(ctx context.Context, t *testing.T, cfg *Config, node, name string,
TipChange: func(err error) {
blkFunc(reportName, err)
},
PeersChange: func(num uint32) {
fmt.Println("peer count: ", num)
},
}

w, err := cfg.NewWallet(walletCfg, logger, dex.Regtest)
Expand Down
20 changes: 20 additions & 0 deletions client/asset/btc/rpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const (
methodGetBestBlockHash = "getbestblockhash"
methodGetRawMempool = "getrawmempool"
methodGetRawTransaction = "getrawtransaction"
methodGetBlockHeader = "getblockheader"
methodGetNetworkInfo = "getnetworkinfo"
methodGetBlockchainInfo = "getblockchaininfo"
)

// RawRequester defines decred's rpcclient RawRequest func where all RPC
Expand Down Expand Up @@ -532,6 +535,23 @@ func (wc *rpcClient) getBlockHeight(blockHash *chainhash.Hash) (int32, error) {
return int32(hdr.Height), nil
}

func (wc *rpcClient) peerCount() (uint32, error) {
// var peerInfo []btcjson.GetPeerInfoResult
// err := wc.call(methodGetPeerInfo, nil, &peerInfo)
// if err != nil {
// return 0, err
// }
// return uint32(len(peerInfo)), nil
var r struct {
Connections uint32 `json:"connections"`
}
err := wc.call(methodGetNetworkInfo, nil, &r)
if err != nil {
return 0, err
}
return r.Connections, nil
}

// getBlockchainInfo sends the getblockchaininfo request and returns the result.
func (wc *rpcClient) getBlockchainInfo() (*getBlockchainInfoResult, error) {
chainInfo := new(getBlockchainInfoResult)
Expand Down
4 changes: 4 additions & 0 deletions client/asset/btc/spv.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,10 @@ func (w *spvWallet) getChainHeight() (int32, error) {
return blk.Height, err
}

func (w *spvWallet) peerCount() (uint32, error) {
return uint32(len(w.cl.Peers())), nil
}

// syncHeight is the best known sync height among peers.
func (w *spvWallet) syncHeight() int32 {
var maxHeight int32
Expand Down
1 change: 1 addition & 0 deletions client/asset/btc/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Wallet interface {
sendToAddress(address string, value, feeRate uint64, subtract bool) (*chainhash.Hash, error)
locked() bool
syncStatus() (*syncStatus, error)
peerCount() (uint32, error)
swapConfirmations(txHash *chainhash.Hash, vout uint32, contract []byte, startTime time.Time) (confs uint32, spent bool, err error)
getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error)
ownsAddress(addr btcutil.Address) (bool, error)
Expand Down
41 changes: 41 additions & 0 deletions client/asset/dcr/dcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"decred.org/dcrdex/client/asset"
Expand Down Expand Up @@ -69,6 +70,7 @@ const (
var (
// blockTicker is the delay between calls to check for new blocks.
blockTicker = time.Second
peerCountTicker = 5 * time.Second
conventionalConversionFactor = float64(dexdcr.UnitInfo.Conventional.ConversionFactor)
configOpts = []*asset.ConfigOption{
{
Expand Down Expand Up @@ -386,6 +388,8 @@ type ExchangeWallet struct {
log dex.Logger
acct string
tipChange func(error)
lastPeerCount uint32
peersChange func(uint32)
fallbackFeeRate uint64
feeRateLimit uint64
redeemConfTarget uint64
Expand Down Expand Up @@ -501,6 +505,7 @@ func unconnectedWallet(cfg *asset.WalletConfig, dcrCfg *Config, chainParams *cha
chainParams: chainParams,
acct: dcrCfg.Account,
tipChange: cfg.TipChange,
peersChange: cfg.PeersChange,
fundingCoins: make(map[outPoint]*fundingCoin),
findRedemptionQueue: make(map[outPoint]*findRedemptionReq),
externalTxCache: make(map[chainhash.Hash]*externalTx),
Expand Down Expand Up @@ -583,6 +588,11 @@ func (dcr *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
}
dcr.shutdown()
}()
wg.Add(1)
go func() {
defer wg.Done()
dcr.monitorPeers(ctx)
}()
return &wg, nil
}

Expand Down Expand Up @@ -3067,6 +3077,37 @@ func (dcr *ExchangeWallet) getKeys(addr stdaddr.Address) (*secp256k1.PrivateKey,
return priv, priv.PubKey(), nil
}

func (dcr *ExchangeWallet) checkPeers() {
ctx, cancel := context.WithTimeout(dcr.ctx, 2*time.Second)
defer cancel()
numPeers, err := dcr.wallet.PeerCount(ctx)
if err != nil { // e.g. dcrd passthrough fail in non-SPV mode
prevPeer := atomic.SwapUint32(&dcr.lastPeerCount, 0)
if prevPeer != 0 {
dcr.log.Errorf("Failed to get peer count: %v", err)
dcr.peersChange(0)
}
return
}
prevPeer := atomic.SwapUint32(&dcr.lastPeerCount, numPeers)
if prevPeer != numPeers {
dcr.peersChange(numPeers)
}
}

func (dcr *ExchangeWallet) monitorPeers(ctx context.Context) {
ticker := time.NewTicker(peerCountTicker)
defer ticker.Stop()
for {
select {
case <-ticker.C:
dcr.checkPeers()
case <-ctx.Done():
return
}
}
}

// monitorBlocks pings for new blocks and runs the tipChange callback function
// when the block changes. New blocks are also scanned for potential contract
// redeems.
Expand Down
9 changes: 8 additions & 1 deletion client/asset/dcr/dcr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func newTxOutResult(script []byte, value uint64, confs int64) *chainjson.GetTxOu
func tNewWallet() (*ExchangeWallet, *tRPCClient, func(), error) {
client := newTRPCClient()
walletCfg := &asset.WalletConfig{
TipChange: func(error) {},
TipChange: func(error) {},
PeersChange: func(uint32) {},
}
walletCtx, shutdown := context.WithCancel(tCtx)
wallet, err := unconnectedWallet(walletCfg, &Config{Account: "default"}, tChainParams, tLogger)
Expand Down Expand Up @@ -530,6 +531,12 @@ func (c *tRPCClient) RawRequest(_ context.Context, method string, params []json.
}

switch method {
case methodGetPeerInfo:
return json.Marshal([]*walletjson.GetPeerInfoResult{
{
Addr: "127.0.0.1",
},
})
case methodGetCFilterV2:
if len(params) != 1 {
return nil, fmt.Errorf("getcfilterv2 requires 1 param, got %d", len(params))
Expand Down
19 changes: 18 additions & 1 deletion client/asset/dcr/rpcwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
methodListLockUnspent = "listlockunspent"
methodSignRawTransaction = "signrawtransaction"
methodSyncStatus = "syncstatus"
methodGetPeerInfo = "getpeerinfo"
)

// rpcWallet implements Wallet functionality using an rpc client to communicate
Expand Down Expand Up @@ -436,6 +437,7 @@ func (w *rpcWallet) GetNewAddressGapPolicy(ctx context.Context, account string,
// SignRawTransaction signs the provided transaction using rpc RawRequest.
// Part of the Wallet interface.
func (w *rpcWallet) SignRawTransaction(ctx context.Context, txHex string) (*walletjson.SignRawTransactionResult, error) {
// w.rpcClient.SignRawTransaction exists
var res walletjson.SignRawTransactionResult
err := w.rpcClientRawRequest(ctx, methodSignRawTransaction, anylist{txHex}, &res)
return &res, err
Expand Down Expand Up @@ -574,7 +576,22 @@ func (w *rpcWallet) SyncStatus(ctx context.Context) (bool, float32, error) {
return false, 0, fmt.Errorf("rawrequest error: %w", err)
}
ready := syncStatus.Synced && !syncStatus.InitialBlockDownload
return ready, syncStatus.HeadersFetchProgress, nil
if !ready {
return ready, syncStatus.HeadersFetchProgress, nil
}
// It looks like we are ready based on syncstatus, but that may just be
// comparing wallet height to known chain height. Now check peers.
numPeers, err := w.PeerCount(ctx)
if err != nil {
return false, 0, err
}
return ready && numPeers > 0, syncStatus.HeadersFetchProgress, nil
}

func (w *rpcWallet) PeerCount(ctx context.Context) (uint32, error) {
var peerInfo []*walletjson.GetPeerInfoResult
err := w.rpcClientRawRequest(ctx, methodGetPeerInfo, nil, &peerInfo)
return uint32(len(peerInfo)), err
}

// AddressPrivKey fetches the privkey for the specified address.
Expand Down
3 changes: 3 additions & 0 deletions client/asset/dcr/simnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func tBackend(t *testing.T, name string, blkFunc func(string, error)) (*Exchange
TipChange: func(err error) {
blkFunc(name, err)
},
PeersChange: func(num uint32) {
t.Log("peer count: ", num)
},
}
var backend asset.Wallet
backend, err = NewWallet(walletCfg, tLogger, dex.Simnet)
Expand Down
1 change: 1 addition & 0 deletions client/asset/dcr/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type Wallet interface {
UnlockAccount(ctx context.Context, account, passphrase string) error
// SyncStatus returns the wallet's sync status.
SyncStatus(ctx context.Context) (bool, float32, error)
PeerCount(ctx context.Context) (uint32, error)
// AddressPrivKey fetches the privkey for the specified address.
AddressPrivKey(ctx context.Context, address stdaddr.Address) (*dcrutil.WIF, error)
}
Expand Down
Loading

0 comments on commit 0b49f8d

Please sign in to comment.