Skip to content

Commit

Permalink
Add request limits (#224)
Browse files Browse the repository at this point in the history
* feat(requestmanager): add request limits

setup a general purpose task queue, refactor functioning of requestmanger -- dispatchign requests to
the queue, feeding request data when those queues resume

* refactor(taskqueue): convert to interface

* refactor(requestmanager): use state var

use a state var to track operational model

* fix(requestmanager): correct cancel request sending

make sure not to send cancel when other peer has already sent terminal status

* fix(executor): clean up comments + code

* Update requestmanager/requestmanager_test.go

Co-authored-by: Will <will.scott@protocol.ai>

* fix(requestmanager): check for context cancel

add precautionary check to avoid send on close channel

* fix(style): add information to test fail

Co-authored-by: Will <will.scott@protocol.ai>
  • Loading branch information
hannahhoward and willscott authored Sep 28, 2021
1 parent 9c1504a commit c691887
Show file tree
Hide file tree
Showing 10 changed files with 650 additions and 562 deletions.
53 changes: 36 additions & 17 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/requestmanager/executor"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager"
responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
"github.com/ipfs/go-graphsync/selectorvalidator"
"github.com/ipfs/go-graphsync/taskqueue"
)

var log = logging.Logger("graphsync")
Expand All @@ -40,6 +42,8 @@ type GraphSync struct {
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
requestQueue taskqueue.TaskQueue
requestExecutor *executor.Executor
responseAssembler *responseassembler.ResponseAssembler
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
Expand All @@ -63,12 +67,13 @@ type GraphSync struct {
}

type graphsyncConfigOptions struct {
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressRequests uint64
registerDefaultValidator bool
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressIncomingRequests uint64
maxInProgressOutgoingRequests uint64
registerDefaultValidator bool
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -115,11 +120,19 @@ func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
}
}

// MaxInProgressRequests changes the maximum number of
// MaxInProgressIncomingRequests changes the maximum number of
// graphsync requests that are processed in parallel (default 6)
func MaxInProgressRequests(maxInProgressRequests uint64) Option {
func MaxInProgressIncomingRequests(maxInProgressIncomingRequests uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxInProgressRequests = maxInProgressRequests
gs.maxInProgressIncomingRequests = maxInProgressIncomingRequests
}
}

// MaxInProgressOutgoingRequests changes the maximum number of
// graphsync requests that are processed in parallel (default 6)
func MaxInProgressOutgoingRequests(maxInProgressOutgoingRequests uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxInProgressOutgoingRequests = maxInProgressOutgoingRequests
}
}

Expand All @@ -130,12 +143,13 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx, cancel := context.WithCancel(parent)

gsConfig := &graphsyncConfigOptions{
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressIncomingRequests: defaultMaxInProgressRequests,
maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
}
for _, option := range options {
option(gsConfig)
Expand Down Expand Up @@ -164,16 +178,20 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
requestQueue := taskqueue.NewTaskQueue(ctx)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests)
graphSync := &GraphSync{
network: network,
linkSystem: linkSystem,
requestManager: requestManager,
responseManager: responseManager,
asyncLoader: asyncLoader,
requestQueue: requestQueue,
requestExecutor: requestExecutor,
responseAssembler: responseAssembler,
peerTaskQueue: peerTaskQueue,
peerManager: peerManager,
Expand All @@ -198,6 +216,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,

requestManager.SetDelegate(peerManager)
requestManager.Startup()
requestQueue.Startup(gsConfig.maxInProgressOutgoingRequests, requestExecutor)
responseManager.Startup()
network.SetDelegate((*graphSyncReceiver)(graphSync))
return graphSync
Expand Down
4 changes: 2 additions & 2 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,15 @@ func TestPauseResumeRequest(t *testing.T) {

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint-1)
blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint)
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestID := <-requestIDChan
err := requestor.UnpauseRequest(requestID, td.extensionUpdate)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint-1)
blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
}
Expand Down
76 changes: 40 additions & 36 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,25 @@ import (

"github.com/hannahhoward/go-pubsub"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/messagequeue"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/requestmanager/executor"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipfs/go-graphsync/taskqueue"
)

// The code in this file implements the public interface of the request manager.
Expand All @@ -35,17 +41,30 @@ const (
defaultPriority = graphsync.Priority(0)
)

type state uint64

const (
queued state = iota
running
paused
)

type inProgressRequestStatus struct {
ctx context.Context
startTime time.Time
cancelFn func()
p peer.ID
terminalError chan error
resumeMessages chan []graphsync.ExtensionData
pauseMessages chan struct{}
paused bool
lastResponse atomic.Value
onTerminated []chan<- error
ctx context.Context
startTime time.Time
cancelFn func()
p peer.ID
terminalError error
pauseMessages chan struct{}
state state
lastResponse atomic.Value
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
doNotSendCids *cid.Set
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
traverser ipldutil.Traverser
}

// PeerHandler is an interface that can send requests to peers
Expand Down Expand Up @@ -81,8 +100,8 @@ type RequestManager struct {
inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
requestHooks RequestHooks
responseHooks ResponseHooks
blockHooks BlockHooks
networkErrorListeners *listeners.NetworkErrorListeners
requestQueue taskqueue.TaskQueue
}

type requestManagerMessage interface {
Expand All @@ -99,19 +118,14 @@ type ResponseHooks interface {
ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult
}

// BlockHooks run for each block loaded
type BlockHooks interface {
ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult
}

// New generates a new request manager from a context, network, and selectorQuerier
func New(ctx context.Context,
asyncLoader AsyncLoader,
linkSystem ipld.LinkSystem,
requestHooks RequestHooks,
responseHooks ResponseHooks,
blockHooks BlockHooks,
networkErrorListeners *listeners.NetworkErrorListeners,
requestQueue taskqueue.TaskQueue,
) *RequestManager {
ctx, cancel := context.WithCancel(ctx)
return &RequestManager{
Expand All @@ -125,8 +139,8 @@ func New(ctx context.Context,
inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
requestHooks: requestHooks,
responseHooks: responseHooks,
blockHooks: blockHooks,
networkErrorListeners: networkErrorListeners,
requestQueue: requestQueue,
}
}

Expand Down Expand Up @@ -227,7 +241,7 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
cancelMessageChannel := rm.messages
for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
select {
case cancelMessageChannel <- &cancelRequestMessage{requestID, false, nil, nil}:
case cancelMessageChannel <- &cancelRequestMessage{requestID, nil, nil}:
cancelMessageChannel = nil
// clear out any remaining responses, in case and "incoming reponse"
// messages get processed before our cancel message
Expand All @@ -248,7 +262,7 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
// CancelRequest cancels the given request ID and waits for the request to terminate
func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
terminated := make(chan error, 1)
rm.send(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand Down Expand Up @@ -289,24 +303,14 @@ func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
}
}

// ProcessBlockHooks processes block hooks for the given response & block and cancels
// the request as needed
func (rm *RequestManager) ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error {
result := rm.blockHooks.ProcessBlockHooks(p, response, block)
if len(result.Extensions) > 0 {
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
rm.SendRequest(p, updateRequest)
}
if result.Err != nil {
_, isPause := result.Err.(hooks.ErrPaused)
rm.send(&cancelRequestMessage{response.RequestID(), isPause, nil, nil}, nil)
}
return result.Err
// GetRequestTask gets data for the given task in the request queue
func (rm *RequestManager) GetRequestTask(p peer.ID, task *peertask.Task, requestExecutionChan chan executor.RequestTask) {
rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
}

// TerminateRequest marks a request done
func (rm *RequestManager) TerminateRequest(requestID graphsync.RequestID) {
rm.send(&terminateRequestMessage{requestID}, nil)
// ReleaseRequestTask releases a task request the requestQueue
func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err error) {
rm.send(&releaseRequestTaskMessage{p, task, err}, nil)
}

// SendRequest sends a request to the message queue
Expand Down
Loading

0 comments on commit c691887

Please sign in to comment.