Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: add htlc stream subscription and routing dashboard #59

Merged
merged 5 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions collectors/chain_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package collectors

import (
"context"
"fmt"

"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightninglabs/lndclient"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -13,12 +14,18 @@ type ChainCollector struct {
bestBlockTimestamp *prometheus.Desc
syncedToChain *prometheus.Desc

lnd lnrpc.LightningClient
lnd lndclient.LightningClient

// errChan is a channel that we send any errors that we encounter into.
// This channel should be buffered so that it does not block sends.
errChan chan<- error
}

// NewChainCollector returns a new instance of the ChainCollector for the target
// lnd client.
func NewChainCollector(lnd lnrpc.LightningClient) *ChainCollector {
func NewChainCollector(lnd lndclient.LightningClient,
errChan chan<- error) *ChainCollector {

return &ChainCollector{
bestBlockHeight: prometheus.NewDesc(
"lnd_chain_block_height", "best block height from lnd",
Expand All @@ -34,7 +41,8 @@ func NewChainCollector(lnd lnrpc.LightningClient) *ChainCollector {
"lnd is synced to the chain",
nil, nil,
),
lnd: lnd,
lnd: lnd,
errChan: errChan,
}
}

Expand All @@ -53,9 +61,10 @@ func (c *ChainCollector) Describe(ch chan<- *prometheus.Desc) {
//
// NOTE: Part of the prometheus.Collector interface.
func (c *ChainCollector) Collect(ch chan<- prometheus.Metric) {
resp, err := c.lnd.GetInfo(context.Background(), &lnrpc.GetInfoRequest{})
resp, err := c.lnd.GetInfo(context.Background())
if err != nil {
chainLogger.Error(err)
c.errChan <- fmt.Errorf("ChainCollector GetInfo failed with: "+
"%v", err)
return
}

Expand All @@ -66,7 +75,7 @@ func (c *ChainCollector) Collect(ch chan<- prometheus.Metric) {

ch <- prometheus.MustNewConstMetric(
c.bestBlockTimestamp, prometheus.GaugeValue,
float64(resp.BestHeaderTimestamp),
float64(resp.BestHeaderTimeStamp.Unix()),
)

ch <- prometheus.MustNewConstMetric(
Expand Down
4 changes: 2 additions & 2 deletions collectors/channel_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"testing"

"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightninglabs/lndclient"
"github.com/stretchr/testify/require"
)

var (
remotePolicies = map[uint64]*lnrpc.RoutingPolicy{
remotePolicies = map[uint64]*lndclient.RoutingPolicy{
1: {
FeeBaseMsat: 20000,
FeeRateMilliMsat: 10000,
Expand Down
127 changes: 63 additions & 64 deletions collectors/channels_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"

"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightninglabs/lndclient"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -41,14 +41,18 @@ type ChannelsCollector struct {
// last hop towards this node.
inboundFee *prometheus.Desc

lnd lnrpc.LightningClient
lnd lndclient.LightningClient

primaryNode *route.Vertex

// errChan is a channel that we send any errors that we encounter into.
// This channel should be buffered so that it does not block sends.
errChan chan<- error
}

// NewChannelsCollector returns a new instance of the ChannelsCollector for the
// target lnd client.
func NewChannelsCollector(lnd lnrpc.LightningClient,
func NewChannelsCollector(lnd lndclient.LightningClient, errChan chan<- error,
cfg *MonitoringConfig) *ChannelsCollector {

// Our set of labels, status should either be active or inactive. The
Expand Down Expand Up @@ -155,6 +159,7 @@ func NewChannelsCollector(lnd lnrpc.LightningClient,

lnd: lnd,
primaryNode: cfg.PrimaryNode,
errChan: errChan,
}
}

Expand Down Expand Up @@ -199,11 +204,10 @@ func (c *ChannelsCollector) Describe(ch chan<- *prometheus.Desc) {
func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) {
// First, based on the channel balance, we'll export the total and
// pending channel balances.
chanBalResp, err := c.lnd.ChannelBalance(
context.Background(), &lnrpc.ChannelBalanceRequest{},
)
chanBalResp, err := c.lnd.ChannelBalance(context.Background())
if err != nil {
channelLogger.Error(err)
c.errChan <- fmt.Errorf("ChannelsCollector ChannelBalance "+
"failed with: %v", err)
return
}

Expand All @@ -213,45 +217,43 @@ func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
c.pendingChannelBalanceDesc, prometheus.GaugeValue,
float64(chanBalResp.PendingOpenBalance),
float64(chanBalResp.PendingBalance),
)

// Obtain information w.r.t the number of channels we
// have open.
getInfoResp, err := c.lnd.GetInfo(
context.Background(), &lnrpc.GetInfoRequest{},
)
getInfoResp, err := c.lnd.GetInfo(context.Background())
if err != nil {
channelLogger.Error(err)
c.errChan <- fmt.Errorf("ChannelsCollector GetInfo failed "+
"with: %v", err)
return
}

ch <- prometheus.MustNewConstMetric(
c.numActiveChansDesc, prometheus.GaugeValue,
float64(getInfoResp.NumActiveChannels),
float64(getInfoResp.ActiveChannels),
)
ch <- prometheus.MustNewConstMetric(
c.numInactiveChansDesc, prometheus.GaugeValue,
float64(getInfoResp.NumInactiveChannels),
float64(getInfoResp.InactiveChannels),
)
ch <- prometheus.MustNewConstMetric(
c.numPendingChansDesc, prometheus.GaugeValue,
float64(getInfoResp.NumPendingChannels),
float64(getInfoResp.PendingChannels),
)

// Next, for each channel we'll export the total sum of our balances,
// as well as the number of pending HTLCs.
listChannelsResp, err := c.lnd.ListChannels(
context.Background(), &lnrpc.ListChannelsRequest{},
)
listChannelsResp, err := c.lnd.ListChannels(context.Background())
if err != nil {
channelLogger.Error(err)
c.errChan <- fmt.Errorf("ChannelsCollector ListChannels "+
"failed with: %v", err)
return
}

// statusLabel is a small helper function returns the proper status
// label for a given channel.
statusLabel := func(c *lnrpc.Channel) string {
statusLabel := func(c lndclient.ChannelInfo) string {
if c.Active {
return "active"
}
Expand All @@ -261,7 +263,7 @@ func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) {

// initiatorLabel is a small helper function that returns the proper
// "initiator" label for a given channel.
initiatorLabel := func(c *lnrpc.Channel) string {
initiatorLabel := func(c lndclient.ChannelInfo) string {
if c.Initiator {
return "true"
}
Expand All @@ -270,91 +272,91 @@ func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) {
}

remoteBalances := make(map[uint64]btcutil.Amount)
for _, channel := range listChannelsResp.Channels {
for _, channel := range listChannelsResp {
status := statusLabel(channel)
initiator := initiatorLabel(channel)

chanIdStr := strconv.Itoa(int(channel.ChannelID))

primaryChannel := c.primaryNode != nil &&
channel.RemotePubkey == c.primaryNode.String()
channel.PubKeyBytes == *c.primaryNode

// Only record balances for channels that are usable and
// external.
if channel.Active && !primaryChannel {
remoteBalances[channel.ChanId] = btcutil.Amount(
channel.RemoteBalance,
)
remoteBalances[channel.ChannelID] = channel.RemoteBalance
}

ch <- prometheus.MustNewConstMetric(
c.incomingChanSatDesc, prometheus.GaugeValue,
float64(channel.RemoteBalance),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.RemoteBalance), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.outgoingChanSatDesc, prometheus.GaugeValue,
float64(channel.LocalBalance),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.LocalBalance), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.numPendingHTLCsDesc, prometheus.GaugeValue,
float64(len(channel.PendingHtlcs)),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.NumPendingHtlcs), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.satsSentDesc, prometheus.GaugeValue,
float64(channel.TotalSatoshisSent),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.TotalSent), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.satsRecvDesc, prometheus.GaugeValue,
float64(channel.TotalSatoshisReceived),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.TotalReceived), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.numUpdatesDesc, prometheus.GaugeValue,
float64(channel.NumUpdates),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.NumUpdates), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.csvDelayDesc, prometheus.GaugeValue,
float64(channel.CsvDelay),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.LocalConstraints.CsvDelay), chanIdStr,
status, initiator,
)
ch <- prometheus.MustNewConstMetric(
c.unsettledBalanceDesc, prometheus.GaugeValue,
float64(channel.UnsettledBalance),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.UnsettledBalance), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.feePerKwDesc, prometheus.GaugeValue,
float64(channel.FeePerKw),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.FeePerKw), chanIdStr, status, initiator,
)
ch <- prometheus.MustNewConstMetric(
c.commitWeightDesc, prometheus.GaugeValue,
float64(channel.CommitWeight),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.CommitWeight), chanIdStr, status,
initiator,
)
ch <- prometheus.MustNewConstMetric(
c.commitFeeDesc, prometheus.GaugeValue,
float64(channel.CommitFee),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.CommitFee), chanIdStr, status,
initiator,
)

// Only record uptime if the channel has been monitored.
if channel.Lifetime != 0 {
if channel.LifeTime != 0 {
ch <- prometheus.MustNewConstMetric(
c.channelUptimeDesc, prometheus.GaugeValue,
float64(channel.Uptime)/float64(channel.Lifetime),
strconv.Itoa(int(channel.ChanId)), status, initiator,
float64(channel.Uptime)/float64(channel.LifeTime),
chanIdStr, status, initiator,
)
}
}

// Get all remote policies
remotePolicies, err := c.getRemotePolicies(getInfoResp.IdentityPubkey)
if err != nil {
channelLogger.Error(err)
c.errChan <- fmt.Errorf("ChannelsCollector getRemotePolicies "+
"failed with: %v", err)
return
}

Expand Down Expand Up @@ -386,7 +388,7 @@ func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) {

// approximateInboundFee calculates to forward fee for a specific amount charged by the
// last hop before this node.
func approximateInboundFee(amt btcutil.Amount, remotePolicies map[uint64]*lnrpc.RoutingPolicy,
func approximateInboundFee(amt btcutil.Amount, remotePolicies map[uint64]*lndclient.RoutingPolicy,
remoteBalances map[uint64]btcutil.Amount) *btcutil.Amount {

var fee btcutil.Amount
Expand Down Expand Up @@ -468,27 +470,24 @@ func approximateInboundFee(amt btcutil.Amount, remotePolicies map[uint64]*lnrpc.

// getRemotePolicies gets all the remote policies for enabled channels of this
// node's peers.
func (c *ChannelsCollector) getRemotePolicies(pubkey string) (
map[uint64]*lnrpc.RoutingPolicy, error) {
func (c *ChannelsCollector) getRemotePolicies(pubkey route.Vertex) (
map[uint64]*lndclient.RoutingPolicy, error) {

nodeInfoResp, err := c.lnd.GetNodeInfo(
context.Background(), &lnrpc.NodeInfoRequest{
IncludeChannels: true,
PubKey: pubkey,
},
context.Background(), pubkey, true,
)
if err != nil {
return nil, err
}

policies := make(map[uint64]*lnrpc.RoutingPolicy)
policies := make(map[uint64]*lndclient.RoutingPolicy)
for _, i := range nodeInfoResp.Channels {
var policy *lnrpc.RoutingPolicy
var policy *lndclient.RoutingPolicy
switch {
case i.Node1Pub == pubkey:
policy = i.Node2Policy
case i.Node1 == pubkey:
policy = i.Node1Policy

case i.Node2Pub == pubkey:
case i.Node2 == pubkey:
policy = i.Node1Policy

default:
Expand Down
Loading