Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channeldb+routing+gossiper: add local updates to graph immediately #4964

Merged
merged 4 commits into from
Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion batch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,31 @@ type Request struct {
//
// NOTE: This field is optional.
OnCommit func(commitErr error) error

// lazy should be true if we don't have to immediately execute this
// request when it comes in. This means that it can be scheduled later,
// allowing larger batches.
lazy bool
}

// SchedulerOption is a type that can be used to supply options to a scheduled
// request.
type SchedulerOption func(r *Request)

// LazyAdd will make the request be executed lazily, added to the next batch to
// reduce db contention.
func LazyAdd() SchedulerOption {
return func(r *Request) {
r.lazy = true
}
}

// Scheduler abstracts a generic batching engine that accumulates an incoming
// set of Requests, executes them, and returns the error from the operation.
type Scheduler interface {
// Execute schedules a Request for execution with the next available
// batch. This method blocks until the the underlying closure has been
// run against the databse. The resulting error is returned to the
// run against the database. The resulting error is returned to the
// caller.
Execute(req *Request) error
}
6 changes: 6 additions & 0 deletions batch/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func (s *TimeScheduler) Execute(r *Request) error {
time.AfterFunc(s.duration, s.b.trigger)
}
s.b.reqs = append(s.b.reqs, &req)

// If this is a non-lazy request, we'll execute the batch immediately.
if !r.lazy {
go s.b.trigger()
}

s.mu.Unlock()

// Wait for the batch to process the request. If the batch didn't
Expand Down
43 changes: 34 additions & 9 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,20 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
// channel update.
//
// TODO(roasbeef): also need sig of announcement
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
return c.nodeScheduler.Execute(&batch.Request{
func (c *ChannelGraph) AddLightningNode(node *LightningNode,
op ...batch.SchedulerOption) error {

r := &batch.Request{
Update: func(tx kvdb.RwTx) error {
return addLightningNode(tx, node)
},
})
}

for _, f := range op {
f(r)
}

return c.nodeScheduler.Execute(r)
}

func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
Expand Down Expand Up @@ -588,9 +596,11 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
// involved in creation of the channel, and the set of features that the channel
// supports. The chanPoint and chanID are used to uniquely identify the edge
// globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

var alreadyExists bool
return c.chanScheduler.Execute(&batch.Request{
r := &batch.Request{
Reset: func() {
alreadyExists = false
},
Expand Down Expand Up @@ -618,7 +628,13 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return nil
}
},
})
}

for _, f := range op {
f(r)
}

return c.chanScheduler.Execute(r)
}

// addChannelEdge is the private form of AddChannelEdge that allows callers to
Expand Down Expand Up @@ -1994,12 +2010,15 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
// updated, otherwise it's the second node's information. The node ordering is
// determined by the lexicographical ordering of the identity public keys of the
// nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy,
op ...batch.SchedulerOption) error {

var (
isUpdate1 bool
edgeNotFound bool
)
return c.chanScheduler.Execute(&batch.Request{

r := &batch.Request{
Reset: func() {
isUpdate1 = false
edgeNotFound = false
Expand Down Expand Up @@ -2028,7 +2047,13 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return nil
}
},
})
}

for _, f := range op {
f(r)
}

return c.chanScheduler.Execute(r)
}

func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) {
Expand Down
20 changes: 15 additions & 5 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
Expand Down Expand Up @@ -1449,7 +1450,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge(

// addNode processes the given node announcement, and adds it to our channel
// graph.
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
op ...batch.SchedulerOption) error {

if err := routing.ValidateNodeAnn(msg); err != nil {
return fmt.Errorf("unable to validate node announcement: %v",
err)
Expand All @@ -1469,7 +1472,7 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
ExtraOpaqueData: msg.ExtraOpaqueData,
}

return d.cfg.Router.AddNode(node)
return d.cfg.Router.AddNode(node, op...)
}

// processNetworkAnnouncement processes a new network relate authenticated
Expand All @@ -1486,6 +1489,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return chanID.BlockHeight+delta > d.bestHeight
}

// If this is a remote update, we set the scheduler option to lazily
// add it to the graph.
var schedulerOp []batch.SchedulerOption
if nMsg.isRemote {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

schedulerOp = append(schedulerOp, batch.LazyAdd())
}

var announcements []networkMsg

switch msg := nMsg.msg.(type) {
Expand All @@ -1504,7 +1514,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}

if err := d.addNode(msg); err != nil {
if err := d.addNode(msg, schedulerOp...); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {

Expand Down Expand Up @@ -1662,7 +1672,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// writes to the DB.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
if err := d.cfg.Router.AddEdge(edge); err != nil {
if err := d.cfg.Router.AddEdge(edge, schedulerOp...); err != nil {
// If the edge was rejected due to already being known,
// then it may be that case that this new message has a
// fresh channel proof, so we'll check.
Expand Down Expand Up @@ -2002,7 +2012,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
ExtraOpaqueData: msg.ExtraOpaqueData,
}

if err := d.cfg.Router.UpdateEdge(update); err != nil {
if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
log.Debug(err)
Expand Down
13 changes: 10 additions & 3 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
Expand Down Expand Up @@ -115,15 +116,19 @@ func newMockRouter(height uint32) *mockGraphSource {

var _ routing.ChannelGraphSource = (*mockGraphSource)(nil)

func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error {
func (r *mockGraphSource) AddNode(node *channeldb.LightningNode,
_ ...batch.SchedulerOption) error {

r.mu.Lock()
defer r.mu.Unlock()

r.nodes = append(r.nodes, *node)
return nil
}

func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error {
func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo,
_ ...batch.SchedulerOption) error {

r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -135,7 +140,9 @@ func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error {
return nil
}

func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error {
func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy,
_ ...batch.SchedulerOption) error {

r.mu.Lock()
defer r.mu.Unlock()

Expand Down
37 changes: 25 additions & 12 deletions routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-errors/errors"

sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/clock"
Expand Down Expand Up @@ -82,20 +83,20 @@ type ChannelGraphSource interface {
// AddNode is used to add information about a node to the router
// database. If the node with this pubkey is not present in an existing
// channel, it will be ignored.
AddNode(node *channeldb.LightningNode) error
AddNode(node *channeldb.LightningNode, op ...batch.SchedulerOption) error

// AddEdge is used to add edge/channel to the topology of the router,
// after all information about channel will be gathered this
// edge/channel might be used in construction of payment path.
AddEdge(edge *channeldb.ChannelEdgeInfo) error
AddEdge(edge *channeldb.ChannelEdgeInfo, op ...batch.SchedulerOption) error

// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error

// UpdateEdge is used to update edge information, without this message
// edge considered as not fully constructed.
UpdateEdge(policy *channeldb.ChannelEdgePolicy) error
UpdateEdge(policy *channeldb.ChannelEdgePolicy, op ...batch.SchedulerOption) error

// IsStaleNode returns true if the graph source has a node announcement
// for the target node with a more recent timestamp. This method will
Expand Down Expand Up @@ -957,7 +958,7 @@ func (r *ChannelRouter) networkHandler() {
// this is either a new update from our PoV or
// an update to a prior vertex/edge we
// previously accepted.
err = r.processUpdate(update.msg)
err = r.processUpdate(update.msg, update.op...)
update.err <- err

// If this message had any dependencies, then
Expand Down Expand Up @@ -1176,7 +1177,9 @@ func (r *ChannelRouter) assertNodeAnnFreshness(node route.Vertex,
// channel/edge update network update. If the update didn't affect the internal
// state of the draft due to either being out of date, invalid, or redundant,
// then error is returned.
func (r *ChannelRouter) processUpdate(msg interface{}) error {
func (r *ChannelRouter) processUpdate(msg interface{},
op ...batch.SchedulerOption) error {

switch msg := msg.(type) {
case *channeldb.LightningNode:
// Before we add the node to the database, we'll check to see
Expand All @@ -1187,7 +1190,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
return err
}

if err := r.cfg.Graph.AddLightningNode(msg); err != nil {
if err := r.cfg.Graph.AddLightningNode(msg, op...); err != nil {
return errors.Errorf("unable to add node %v to the "+
"graph: %v", msg.PubKeyBytes, err)
}
Expand Down Expand Up @@ -1219,7 +1222,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// short-circuit our path straight to adding the edge to our
// graph.
if r.cfg.AssumeChannelValid {
if err := r.cfg.Graph.AddChannelEdge(msg); err != nil {
if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
return fmt.Errorf("unable to add edge: %v", err)
}
log.Tracef("New channel discovered! Link "+
Expand Down Expand Up @@ -1291,7 +1294,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// after commitment fees are dynamic.
msg.Capacity = btcutil.Amount(chanUtxo.Value)
msg.ChannelPoint = *fundingPoint
if err := r.cfg.Graph.AddChannelEdge(msg); err != nil {
if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
return errors.Errorf("unable to add edge: %v", err)
}

Expand Down Expand Up @@ -1390,7 +1393,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// Now that we know this isn't a stale update, we'll apply the
// new edge policy to the proper directional edge within the
// channel graph.
if err = r.cfg.Graph.UpdateEdgePolicy(msg); err != nil {
if err = r.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil {
err := errors.Errorf("unable to add channel: %v", err)
log.Error(err)
return err
Expand Down Expand Up @@ -1444,6 +1447,7 @@ func (r *ChannelRouter) fetchFundingTx(
// error channel.
type routingMsg struct {
msg interface{}
op []batch.SchedulerOption
err chan error
}

Expand Down Expand Up @@ -2140,9 +2144,12 @@ func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate,
// be ignored.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error {
func (r *ChannelRouter) AddNode(node *channeldb.LightningNode,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: node,
op: op,
err: make(chan error, 1),
}

Expand All @@ -2164,9 +2171,12 @@ func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error {
// in construction of payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: edge,
op: op,
err: make(chan error, 1),
}

Expand All @@ -2187,9 +2197,12 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
// considered as not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy) error {
func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: update,
op: op,
err: make(chan error, 1),
}

Expand Down