Skip to content

Commit

Permalink
feat(graphsync): add link limits
Browse files Browse the repository at this point in the history
add options to configure the maximum number of allowed links to traverse on the requestor and the
responder
  • Loading branch information
hannahhoward committed Sep 30, 2021
1 parent 57df640 commit 91a6ceb
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.0
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.13.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZ
github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSatMGe8=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.12.0 h1:JapyKWTsJgmhrPI7hfx4V798c/RClr85sXfBZnH1VIw=
github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc=
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e h1:HPLQ9V/OFHKjfbFio8vQV+EW7lpQPj+iPl93VcwSTYM=
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down Expand Up @@ -539,6 +539,8 @@ github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multicodec v0.3.0 h1:tstDwfIjiHbnIjeM5Lp+pMrSeN+LCMsEwOrkPmWm03A=
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
Expand Down
22 changes: 20 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type graphsyncConfigOptions struct {
maxInProgressIncomingRequests uint64
maxInProgressOutgoingRequests uint64
registerDefaultValidator bool
maxLinksPerOutgoingRequest uint64
maxLinksPerIncomingRequest uint64
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -136,6 +138,22 @@ func MaxInProgressOutgoingRequests(maxInProgressOutgoingRequests uint64) Option
}
}

// MaxLinksPerOutgoingRequests changes the allowed number of links an outgoing
// request can traverse before failing
func MaxLinksPerOutgoingRequests(maxLinksPerOutgoingRequest uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxLinksPerOutgoingRequest = maxLinksPerOutgoingRequest
}
}

// MaxLinksPerIncomingRequests changes the allowed number of links an incoming
// request can traverse before failing
func MaxLinksPerIncomingRequests(maxLinksPerIncomingRequest uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxLinksPerIncomingRequest = maxLinksPerIncomingRequest
}
}

// New creates a new GraphSync Exchange on the given network,
// and the given link loader+storer.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
Expand Down Expand Up @@ -179,11 +197,11 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
requestQueue := taskqueue.NewTaskQueue(ctx)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue, network.ConnectionManager())
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests, network.ConnectionManager())
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests, network.ConnectionManager(), gsConfig.maxLinksPerIncomingRequest)
graphSync := &GraphSync{
network: network,
linkSystem: linkSystem,
Expand Down
52 changes: 52 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,58 @@ func TestRejectRequestsByDefault(t *testing.T) {
testutil.VerifySingleTerminalError(ctx, t, errChan)
}

func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var linksToTraverse uint64 = 5
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1(MaxLinksPerOutgoingRequests(linksToTraverse))

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
td.GraphSyncHost2()

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

// response budgets don't include the root block, so total links traverse with be one more than expected
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")
}

func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var linksToTraverse uint64 = 5
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
td.GraphSyncHost2(MaxLinksPerIncomingRequests(linksToTraverse))

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

// response budgets don't include the root block, so total links traverse with be one more than expected
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")
}

func TestGraphsyncRoundTrip(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
10 changes: 10 additions & 0 deletions ipldutil/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type TraversalBuilder struct {
Visitor traversal.AdvVisitFn
LinkSystem ipld.LinkSystem
Chooser traversal.LinkTargetNodePrototypeChooser
Budget *traversal.Budget
}

// Traverser is an interface for performing a selector traversal that operates iteratively --
Expand Down Expand Up @@ -81,6 +82,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
visitor: defaultVisitor,
chooser: defaultChooser,
linkSystem: tb.LinkSystem,
budget: tb.Budget,
awaitRequest: make(chan struct{}, 1),
stateChan: make(chan state, 1),
responses: make(chan nextResponse),
Expand Down Expand Up @@ -120,6 +122,7 @@ type traverser struct {
chooser traversal.LinkTargetNodePrototypeChooser
currentLink ipld.Link
currentContext ipld.LinkContext
budget *traversal.Budget
isDone bool
completionErr error
awaitRequest chan struct{}
Expand Down Expand Up @@ -184,6 +187,12 @@ func (t *traverser) start() {
t.writeDone(err)
return
}
if t.budget != nil {
t.budget.LinkBudget--
if t.budget.LinkBudget <= 0 {
t.writeDone(&traversal.ErrBudgetExceeded{BudgetKind: "link", Link: t.root})
}
}
nd, err := t.linkSystem.Load(ipld.LinkContext{Ctx: t.ctx}, t.root, ns)
if err != nil {
t.writeDone(err)
Expand All @@ -201,6 +210,7 @@ func (t *traverser) start() {
LinkSystem: t.linkSystem,
LinkTargetNodePrototypeChooser: t.chooser,
},
Budget: t.budget,
}.WalkAdv(nd, sel, t.visitor)
t.writeDone(err)
}()
Expand Down
58 changes: 54 additions & 4 deletions ipldutil/traverser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ipldutil
import (
"bytes"
"context"
"math"
"testing"
"time"

Expand Down Expand Up @@ -56,7 +57,7 @@ func TestTraverser(t *testing.T) {
testdata.MiddleMapBlock,
testdata.LeafAlphaBlock,
testdata.LeafAlphaBlock,
})
}, nil)
})

t.Run("traverses correctly, blockchain", func(t *testing.T) {
Expand Down Expand Up @@ -86,13 +87,58 @@ func TestTraverser(t *testing.T) {
blockChain.VerifyWholeChainWithTypes(ctx, inProgressChan)
close(done)
}()
checkTraverseSequence(ctx, t, traverser, blockChain.AllBlocks())
checkTraverseSequence(ctx, t, traverser, blockChain.AllBlocks(), nil)
close(inProgressChan)
testutil.AssertDoesReceive(ctx, t, done, "should have completed verification but did not")
})

t.Run("errors correctly, with budget", func(t *testing.T) {
store := make(map[ipld.Link][]byte)
persistence := testutil.NewTestStore(store)
blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10)
traverser := TraversalBuilder{
Root: blockChain.TipLink,
Selector: blockChain.Selector(),
Chooser: blockChain.Chooser,
LinkSystem: persistence,
Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error {
return nil
},
Budget: &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: 6,
},
}.Start(ctx)
var path ipld.Path
for i := 0; i < 6; i++ {
path = path.AppendSegment(ipld.PathSegmentOfString("Parents"))
path = path.AppendSegment(ipld.PathSegmentOfInt(0))
}
checkTraverseSequence(ctx, t, traverser, blockChain.Blocks(0, 6), &traversal.ErrBudgetExceeded{BudgetKind: "link", Path: path, Link: blockChain.LinkTipIndex(6)})
})

t.Run("errors correctly, no budget", func(t *testing.T) {
store := make(map[ipld.Link][]byte)
persistence := testutil.NewTestStore(store)
blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10)
traverser := TraversalBuilder{
Root: blockChain.TipLink,
Selector: blockChain.Selector(),
Chooser: blockChain.Chooser,
LinkSystem: persistence,
Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error {
return nil
},
Budget: &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: 0,
},
}.Start(ctx)
checkTraverseSequence(ctx, t, traverser, []blocks.Block{}, &traversal.ErrBudgetExceeded{BudgetKind: "link", Link: blockChain.TipLink})
})
}

func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block) {
func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block, finalErr error) {
for _, blk := range expectedBlks {
isComplete, err := traverser.IsComplete()
require.False(t, isComplete)
Expand All @@ -104,5 +150,9 @@ func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverse
}
isComplete, err := traverser.IsComplete()
require.True(t, isComplete)
require.NoError(t, err)
if finalErr == nil {
require.NoError(t, err)
} else {
require.EqualError(t, err, finalErr.Error())
}
}
21 changes: 12 additions & 9 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ type AsyncLoader interface {
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
disconnectNotif *pubsub.PubSub
linkSystem ipld.LinkSystem
connManager network.ConnManager
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
disconnectNotif *pubsub.PubSub
linkSystem ipld.LinkSystem
connManager network.ConnManager
maxLinksPerRequest uint64

// dont touch out side of run loop
nextRequestID graphsync.RequestID
Expand Down Expand Up @@ -129,6 +130,7 @@ func New(ctx context.Context,
networkErrorListeners *listeners.NetworkErrorListeners,
requestQueue taskqueue.TaskQueue,
connManager network.ConnManager,
maxLinksPerRequest uint64,
) *RequestManager {
ctx, cancel := context.WithCancel(ctx)
return &RequestManager{
Expand All @@ -145,6 +147,7 @@ func New(ctx context.Context,
networkErrorListeners: networkErrorListeners,
requestQueue: requestQueue,
connManager: connManager,
maxLinksPerRequest: maxLinksPerRequest,
}
}

Expand Down
2 changes: 1 addition & 1 deletion requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func newTestData(ctx context.Context, t *testing.T) *testData {
td.networkErrorListeners = listeners.NewNetworkErrorListeners()
td.taskqueue = taskqueue.NewTaskQueue(ctx)
lsys := cidlink.DefaultLinkSystem()
td.requestManager = New(ctx, td.fal, lsys, td.requestHooks, td.responseHooks, td.networkErrorListeners, td.taskqueue, td.tcm)
td.requestManager = New(ctx, td.fal, lsys, td.requestHooks, td.responseHooks, td.networkErrorListeners, td.taskqueue, td.tcm, 0)
td.executor = executor.NewExecutor(td.requestManager, td.blockHooks, td.fal.AsyncLoad)
td.requestManager.SetDelegate(td.fph)
td.requestManager.Startup()
Expand Down
8 changes: 8 additions & 0 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
var initialRequest bool
if ipr.traverser == nil {
initialRequest = true
var budget *traversal.Budget
if rm.maxLinksPerRequest > 0 {
budget = &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: int64(rm.maxLinksPerRequest),
}
}
ipr.traverser = ipldutil.TraversalBuilder{
Root: cidlink.Link{Cid: ipr.request.Root()},
Selector: ipr.request.Selector(),
Expand All @@ -118,6 +125,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
},
Chooser: ipr.nodeStyleChooser,
LinkSystem: rm.linkSystem,
Budget: budget,
}.Start(ipr.ctx)
}

Expand Down
3 changes: 3 additions & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type ResponseManager struct {
inProgressResponses map[responseKey]*inProgressResponseStatus
maxInProcessRequests uint64
connManager network.ConnManager
maxLinksPerRequest uint64
}

// New creates a new response manager for responding to requests
Expand All @@ -172,6 +173,7 @@ func New(ctx context.Context,
networkErrorListeners NetworkErrorListeners,
maxInProcessRequests uint64,
connManager network.ConnManager,
maxLinksPerRequest uint64,
) *ResponseManager {
ctx, cancelFn := context.WithCancel(ctx)
messages := make(chan responseManagerMessage, 16)
Expand All @@ -194,6 +196,7 @@ func New(ctx context.Context,
inProgressResponses: make(map[responseKey]*inProgressResponseStatus),
maxInProcessRequests: maxInProcessRequests,
connManager: connManager,
maxLinksPerRequest: maxLinksPerRequest,
}
rm.qe = &queryExecutor{
blockHooks: blockHooks,
Expand Down
Loading

0 comments on commit 91a6ceb

Please sign in to comment.