routing: dial back max concurrent block fetches #5043

Merged

routing/router.go (23 additions, 1 deletion)

@@ -3,6 +3,7 @@ package routing
 import (
 	"bytes"
 	"fmt"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -914,7 +915,28 @@ func (r *ChannelRouter) networkHandler() {
 
 	// We'll use this validation barrier to ensure that we process all jobs
 	// in the proper order during parallel validation.
-	validationBarrier := NewValidationBarrier(1000, r.quit)
+	//
+	// NOTE: For AssumeChannelValid, we bump up the maximum number of
+	// concurrent validation requests since there are no blocks being
+	// fetched. This significantly increases the performance of IGD for
+	// neutrino nodes.
+	//
+	// However, we dial back to a multiple of the number of cores when
+	// fully validating, to avoid fetching up to 1000 blocks from the
+	// backend. On bitcoind, this will empirically cause massive latency
+	// spikes when executing this many concurrent RPC calls. Critical
+	// subsystems or basic RPC calls that rely on calls such as
+	// GetBestBlock will hang due to excessive load.
+	//
+	// See https://github.com/lightningnetwork/lnd/issues/4892.
+	var validationBarrier *ValidationBarrier
+	if r.cfg.AssumeChannelValid {
+		validationBarrier = NewValidationBarrier(1000, r.quit)
+	} else {
+		validationBarrier = NewValidationBarrier(
+			4*runtime.NumCPU(), r.quit,
+		)
+	}
 
 	for {
 
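For context, the barrier sized here behaves like a counting semaphore: a gossip job must claim one of the N slots before it may trigger block fetches, so at most N backend calls are ever in flight. The following is a minimal, self-contained sketch of that throttling idea using a buffered channel as the semaphore. It is an illustration only, not lnd's actual ValidationBarrier; the job loop and the fetch placeholder are hypothetical.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// Cap concurrent "block fetches" at a small multiple of the core
	// count, mirroring the 4*runtime.NumCPU() limit in this PR.
	maxConcurrent := 4 * runtime.NumCPU()
	sem := make(chan struct{}, maxConcurrent)

	var wg sync.WaitGroup
	for job := 0; job < 1000; job++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			sem <- struct{}{}        // acquire a slot; blocks when all are taken
			defer func() { <-sem }() // release the slot when done

			// Stand-in for the expensive backend call, e.g. a block
			// fetch RPC against bitcoind.
			fmt.Printf("job %d: fetching block\n", id)
		}(job)
	}
	wg.Wait()
}

With a buffered channel of capacity N, the 1000 goroutines still get created, but only N of them execute the expensive section at a time, which is exactly the load-shedding effect the change above is after.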

routing/router_test.go (39 additions, 18 deletions)

@@ -72,6 +72,14 @@ func (c *testCtx) RestartRouter() error {
 func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGraphInstance) (
 	*testCtx, func(), error) {
 
+	return createTestCtxFromGraphInstanceAssumeValid(
+		startingHeight, graphInstance, false,
+	)
+}
+
+func createTestCtxFromGraphInstanceAssumeValid(startingHeight uint32,
+	graphInstance *testGraphInstance, assumeValid bool) (*testCtx, func(), error) {
+
 	// We'll initialize an instance of the channel router with mock
 	// versions of the chain and channel notifier. As we don't need to test
 	// any p2p functionality, the peer send and switch send messages won't
@@ -126,8 +134,9 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGraphInstance) (
 			next := atomic.AddUint64(&uniquePaymentID, 1)
 			return next, nil
 		},
-		PathFindingConfig: pathFindingConfig,
-		Clock:             clock.NewTestClock(time.Unix(1, 0)),
+		PathFindingConfig:  pathFindingConfig,
+		Clock:              clock.NewTestClock(time.Unix(1, 0)),
+		AssumeChannelValid: assumeValid,
 	})
 	if err != nil {
 		return nil, nil, fmt.Errorf("unable to create router %v", err)
@@ -2034,6 +2043,15 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) {
 func TestPruneChannelGraphDoubleDisabled(t *testing.T) {
 	t.Parallel()
 
+	t.Run("no_assumechannelvalid", func(t *testing.T) {
+		testPruneChannelGraphDoubleDisabled(t, false)
+	})
+	t.Run("assumechannelvalid", func(t *testing.T) {
+		testPruneChannelGraphDoubleDisabled(t, true)
+	})
+}
+
+func testPruneChannelGraphDoubleDisabled(t *testing.T, assumeValid bool) {
 	// We'll create the following test graph so that only the last channel
 	// is pruned. We'll use a fresh timestamp to ensure they're not pruned
 	// according to that heuristic.
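A side benefit of splitting the cases into named t.Run subtests, as done above, is that each configuration can be run in isolation: go's -run flag matches subtests by Parent/Sub name. For example, assuming the usual lnd repository layout where this file lives under ./routing:

	go test ./routing -run 'TestPruneChannelGraphDoubleDisabled/assumechannelvalid' -v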
@@ -2125,34 +2143,37 @@ func TestPruneChannelGraphDoubleDisabled(t *testing.T) {
 	defer testGraph.cleanUp()
 
 	const startingHeight = 100
-	ctx, cleanUp, err := createTestCtxFromGraphInstance(
-		startingHeight, testGraph,
+	ctx, cleanUp, err := createTestCtxFromGraphInstanceAssumeValid(
+		startingHeight, testGraph, assumeValid,
 	)
 	if err != nil {
 		t.Fatalf("unable to create test context: %v", err)
 	}
 	defer cleanUp()
 
-	// All the channels should exist within the graph before pruning them.
-	assertChannelsPruned(t, ctx.graph, testChannels)
-
-	// If we attempt to prune them without AssumeChannelValid being set,
-	// none should be pruned.
+	// All the channels should exist within the graph before pruning them
+	// when not using AssumeChannelValid, otherwise we should have pruned
+	// the last channel on startup.
+	if !assumeValid {
+		assertChannelsPruned(t, ctx.graph, testChannels)
+	} else {
+		prunedChannel := testChannels[len(testChannels)-1].ChannelID
+		assertChannelsPruned(t, ctx.graph, testChannels, prunedChannel)
+	}
+
 	if err := ctx.router.pruneZombieChans(); err != nil {
 		t.Fatalf("unable to prune zombie channels: %v", err)
 	}
 
-	assertChannelsPruned(t, ctx.graph, testChannels)
-
-	// Now that AssumeChannelValid is set, we'll prune the graph again and
-	// the last channel should be the only one pruned.
-	ctx.router.cfg.AssumeChannelValid = true
-	if err := ctx.router.pruneZombieChans(); err != nil {
-		t.Fatalf("unable to prune zombie channels: %v", err)
-	}
-
-	prunedChannel := testChannels[len(testChannels)-1].ChannelID
-	assertChannelsPruned(t, ctx.graph, testChannels, prunedChannel)
+	// If we attempted to prune them without AssumeChannelValid being set,
+	// none should be pruned. Otherwise the last channel should still be
+	// pruned.
+	if !assumeValid {
+		assertChannelsPruned(t, ctx.graph, testChannels)
+	} else {
+		prunedChannel := testChannels[len(testChannels)-1].ChannelID
+		assertChannelsPruned(t, ctx.graph, testChannels, prunedChannel)
+	}
 }
 
 // TestFindPathFeeWeighting tests that the findPath method will properly prefer