Skip to content

Commit

Permalink
Merge pull request #4964 from halseth/gossip-local-no-batch
Browse files Browse the repository at this point in the history
channeldb+routing+gossiper: add local updates to graph immediately
  • Loading branch information
Roasbeef authored Feb 11, 2021
2 parents 1ee5eb9 + c9afc93 commit 355b6a2
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 30 deletions.
19 changes: 18 additions & 1 deletion batch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,31 @@ type Request struct {
//
// NOTE: This field is optional.
OnCommit func(commitErr error) error

// lazy should be true if we don't have to immediately execute this
// request when it comes in. This means that it can be scheduled later,
// allowing larger batches.
lazy bool
}

// SchedulerOption is a type that can be used to supply options to a scheduled
// request.
type SchedulerOption func(r *Request)

// LazyAdd will make the request be executed lazily, added to the next batch to
// reduce db contention.
func LazyAdd() SchedulerOption {
return func(r *Request) {
r.lazy = true
}
}

// Scheduler abstracts a generic batching engine that accumulates an incoming
// set of Requests, executes them, and returns the error from the operation.
type Scheduler interface {
// Execute schedules a Request for execution with the next available
// batch. This method blocks until the the underlying closure has been
// run against the databse. The resulting error is returned to the
// run against the database. The resulting error is returned to the
// caller.
Execute(req *Request) error
}
6 changes: 6 additions & 0 deletions batch/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func (s *TimeScheduler) Execute(r *Request) error {
time.AfterFunc(s.duration, s.b.trigger)
}
s.b.reqs = append(s.b.reqs, &req)

// If this is a non-lazy request, we'll execute the batch immediately.
if !r.lazy {
go s.b.trigger()
}

s.mu.Unlock()

// Wait for the batch to process the request. If the batch didn't
Expand Down
43 changes: 34 additions & 9 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,20 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
// channel update.
//
// TODO(roasbeef): also need sig of announcement
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
return c.nodeScheduler.Execute(&batch.Request{
func (c *ChannelGraph) AddLightningNode(node *LightningNode,
op ...batch.SchedulerOption) error {

r := &batch.Request{
Update: func(tx kvdb.RwTx) error {
return addLightningNode(tx, node)
},
})
}

for _, f := range op {
f(r)
}

return c.nodeScheduler.Execute(r)
}

func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
Expand Down Expand Up @@ -588,9 +596,11 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
// involved in creation of the channel, and the set of features that the channel
// supports. The chanPoint and chanID are used to uniquely identify the edge
// globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

var alreadyExists bool
return c.chanScheduler.Execute(&batch.Request{
r := &batch.Request{
Reset: func() {
alreadyExists = false
},
Expand Down Expand Up @@ -618,7 +628,13 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return nil
}
},
})
}

for _, f := range op {
f(r)
}

return c.chanScheduler.Execute(r)
}

// addChannelEdge is the private form of AddChannelEdge that allows callers to
Expand Down Expand Up @@ -1994,12 +2010,15 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
// updated, otherwise it's the second node's information. The node ordering is
// determined by the lexicographical ordering of the identity public keys of the
// nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy,
op ...batch.SchedulerOption) error {

var (
isUpdate1 bool
edgeNotFound bool
)
return c.chanScheduler.Execute(&batch.Request{

r := &batch.Request{
Reset: func() {
isUpdate1 = false
edgeNotFound = false
Expand Down Expand Up @@ -2028,7 +2047,13 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return nil
}
},
})
}

for _, f := range op {
f(r)
}

return c.chanScheduler.Execute(r)
}

func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) {
Expand Down
20 changes: 15 additions & 5 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
Expand Down Expand Up @@ -1468,7 +1469,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge(

// addNode processes the given node announcement, and adds it to our channel
// graph.
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
op ...batch.SchedulerOption) error {

if err := routing.ValidateNodeAnn(msg); err != nil {
return fmt.Errorf("unable to validate node announcement: %v",
err)
Expand All @@ -1488,7 +1491,7 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
ExtraOpaqueData: msg.ExtraOpaqueData,
}

return d.cfg.Router.AddNode(node)
return d.cfg.Router.AddNode(node, op...)
}

// processNetworkAnnouncement processes a new network relate authenticated
Expand All @@ -1505,6 +1508,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return chanID.BlockHeight+delta > d.bestHeight
}

// If this is a remote update, we set the scheduler option to lazily
// add it to the graph.
var schedulerOp []batch.SchedulerOption
if nMsg.isRemote {
schedulerOp = append(schedulerOp, batch.LazyAdd())
}

var announcements []networkMsg

switch msg := nMsg.msg.(type) {
Expand All @@ -1523,7 +1533,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}

if err := d.addNode(msg); err != nil {
if err := d.addNode(msg, schedulerOp...); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {

Expand Down Expand Up @@ -1681,7 +1691,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// writes to the DB.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
if err := d.cfg.Router.AddEdge(edge); err != nil {
if err := d.cfg.Router.AddEdge(edge, schedulerOp...); err != nil {
// If the edge was rejected due to already being known,
// then it may be that case that this new message has a
// fresh channel proof, so we'll check.
Expand Down Expand Up @@ -2031,7 +2041,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
ExtraOpaqueData: msg.ExtraOpaqueData,
}

if err := d.cfg.Router.UpdateEdge(update); err != nil {
if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
log.Debug(err)
Expand Down
13 changes: 10 additions & 3 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
Expand Down Expand Up @@ -115,15 +116,19 @@ func newMockRouter(height uint32) *mockGraphSource {

var _ routing.ChannelGraphSource = (*mockGraphSource)(nil)

func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error {
func (r *mockGraphSource) AddNode(node *channeldb.LightningNode,
_ ...batch.SchedulerOption) error {

r.mu.Lock()
defer r.mu.Unlock()

r.nodes = append(r.nodes, *node)
return nil
}

func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error {
func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo,
_ ...batch.SchedulerOption) error {

r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -135,7 +140,9 @@ func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error {
return nil
}

func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error {
func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy,
_ ...batch.SchedulerOption) error {

r.mu.Lock()
defer r.mu.Unlock()

Expand Down
37 changes: 25 additions & 12 deletions routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-errors/errors"

sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/clock"
Expand Down Expand Up @@ -82,20 +83,20 @@ type ChannelGraphSource interface {
// AddNode is used to add information about a node to the router
// database. If the node with this pubkey is not present in an existing
// channel, it will be ignored.
AddNode(node *channeldb.LightningNode) error
AddNode(node *channeldb.LightningNode, op ...batch.SchedulerOption) error

// AddEdge is used to add edge/channel to the topology of the router,
// after all information about channel will be gathered this
// edge/channel might be used in construction of payment path.
AddEdge(edge *channeldb.ChannelEdgeInfo) error
AddEdge(edge *channeldb.ChannelEdgeInfo, op ...batch.SchedulerOption) error

// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error

// UpdateEdge is used to update edge information, without this message
// edge considered as not fully constructed.
UpdateEdge(policy *channeldb.ChannelEdgePolicy) error
UpdateEdge(policy *channeldb.ChannelEdgePolicy, op ...batch.SchedulerOption) error

// IsStaleNode returns true if the graph source has a node announcement
// for the target node with a more recent timestamp. This method will
Expand Down Expand Up @@ -957,7 +958,7 @@ func (r *ChannelRouter) networkHandler() {
// this is either a new update from our PoV or
// an update to a prior vertex/edge we
// previously accepted.
err = r.processUpdate(update.msg)
err = r.processUpdate(update.msg, update.op...)
update.err <- err

// If this message had any dependencies, then
Expand Down Expand Up @@ -1176,7 +1177,9 @@ func (r *ChannelRouter) assertNodeAnnFreshness(node route.Vertex,
// channel/edge update network update. If the update didn't affect the internal
// state of the draft due to either being out of date, invalid, or redundant,
// then error is returned.
func (r *ChannelRouter) processUpdate(msg interface{}) error {
func (r *ChannelRouter) processUpdate(msg interface{},
op ...batch.SchedulerOption) error {

switch msg := msg.(type) {
case *channeldb.LightningNode:
// Before we add the node to the database, we'll check to see
Expand All @@ -1187,7 +1190,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
return err
}

if err := r.cfg.Graph.AddLightningNode(msg); err != nil {
if err := r.cfg.Graph.AddLightningNode(msg, op...); err != nil {
return errors.Errorf("unable to add node %v to the "+
"graph: %v", msg.PubKeyBytes, err)
}
Expand Down Expand Up @@ -1219,7 +1222,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// short-circuit our path straight to adding the edge to our
// graph.
if r.cfg.AssumeChannelValid {
if err := r.cfg.Graph.AddChannelEdge(msg); err != nil {
if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
return fmt.Errorf("unable to add edge: %v", err)
}
log.Tracef("New channel discovered! Link "+
Expand Down Expand Up @@ -1291,7 +1294,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// after commitment fees are dynamic.
msg.Capacity = btcutil.Amount(chanUtxo.Value)
msg.ChannelPoint = *fundingPoint
if err := r.cfg.Graph.AddChannelEdge(msg); err != nil {
if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
return errors.Errorf("unable to add edge: %v", err)
}

Expand Down Expand Up @@ -1390,7 +1393,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// Now that we know this isn't a stale update, we'll apply the
// new edge policy to the proper directional edge within the
// channel graph.
if err = r.cfg.Graph.UpdateEdgePolicy(msg); err != nil {
if err = r.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil {
err := errors.Errorf("unable to add channel: %v", err)
log.Error(err)
return err
Expand Down Expand Up @@ -1444,6 +1447,7 @@ func (r *ChannelRouter) fetchFundingTx(
// error channel.
type routingMsg struct {
msg interface{}
op []batch.SchedulerOption
err chan error
}

Expand Down Expand Up @@ -2140,9 +2144,12 @@ func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate,
// be ignored.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error {
func (r *ChannelRouter) AddNode(node *channeldb.LightningNode,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: node,
op: op,
err: make(chan error, 1),
}

Expand All @@ -2164,9 +2171,12 @@ func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error {
// in construction of payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: edge,
op: op,
err: make(chan error, 1),
}

Expand All @@ -2187,9 +2197,12 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
// considered as not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy) error {
func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: update,
op: op,
err: make(chan error, 1),
}

Expand Down

0 comments on commit 355b6a2

Please sign in to comment.