Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request limits #224

Merged
merged 8 commits into from
Sep 28, 2021
53 changes: 36 additions & 17 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/requestmanager/executor"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager"
responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
"github.com/ipfs/go-graphsync/selectorvalidator"
"github.com/ipfs/go-graphsync/taskqueue"
)

var log = logging.Logger("graphsync")
Expand All @@ -40,6 +42,8 @@ type GraphSync struct {
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
requestQueue taskqueue.TaskQueue
requestExecutor *executor.Executor
responseAssembler *responseassembler.ResponseAssembler
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
Expand All @@ -63,12 +67,13 @@ type GraphSync struct {
}

type graphsyncConfigOptions struct {
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressRequests uint64
registerDefaultValidator bool
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressIncomingRequests uint64
maxInProgressOutgoingRequests uint64
registerDefaultValidator bool
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -115,11 +120,19 @@ func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
}
}

// MaxInProgressRequests changes the maximum number of
// MaxInProgressIncomingRequests changes the maximum number of
// graphsync requests that are processed in parallel (default 6)
func MaxInProgressRequests(maxInProgressRequests uint64) Option {
func MaxInProgressIncomingRequests(maxInProgressIncomingRequests uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxInProgressRequests = maxInProgressRequests
gs.maxInProgressIncomingRequests = maxInProgressIncomingRequests
}
}

// MaxInProgressOutgoingRequests changes the maximum number of
// graphsync requests that are processed in parallel (default 6)
func MaxInProgressOutgoingRequests(maxInProgressOutgoingRequests uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxInProgressOutgoingRequests = maxInProgressOutgoingRequests
}
}

Expand All @@ -130,12 +143,13 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx, cancel := context.WithCancel(parent)

gsConfig := &graphsyncConfigOptions{
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressIncomingRequests: defaultMaxInProgressRequests,
maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
}
for _, option := range options {
option(gsConfig)
Expand Down Expand Up @@ -164,16 +178,20 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
requestQueue := taskqueue.NewTaskQueue(ctx)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests)
graphSync := &GraphSync{
network: network,
linkSystem: linkSystem,
requestManager: requestManager,
responseManager: responseManager,
asyncLoader: asyncLoader,
requestQueue: requestQueue,
requestExecutor: requestExecutor,
responseAssembler: responseAssembler,
peerTaskQueue: peerTaskQueue,
peerManager: peerManager,
Expand All @@ -198,6 +216,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,

requestManager.SetDelegate(peerManager)
requestManager.Startup()
requestQueue.Startup(gsConfig.maxInProgressOutgoingRequests, requestExecutor)
responseManager.Startup()
network.SetDelegate((*graphSyncReceiver)(graphSync))
return graphSync
Expand Down
4 changes: 2 additions & 2 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,15 @@ func TestPauseResumeRequest(t *testing.T) {

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint-1)
blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint)
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestID := <-requestIDChan
err := requestor.UnpauseRequest(requestID, td.extensionUpdate)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint-1)
blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
}
Expand Down
76 changes: 40 additions & 36 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,25 @@ import (

"github.com/hannahhoward/go-pubsub"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/messagequeue"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/requestmanager/executor"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipfs/go-graphsync/taskqueue"
)

// The code in this file implements the public interface of the request manager.
Expand All @@ -35,17 +41,30 @@ const (
defaultPriority = graphsync.Priority(0)
)

type state uint64

const (
queued state = iota
running
paused
)

type inProgressRequestStatus struct {
ctx context.Context
startTime time.Time
cancelFn func()
p peer.ID
terminalError chan error
resumeMessages chan []graphsync.ExtensionData
pauseMessages chan struct{}
paused bool
lastResponse atomic.Value
onTerminated []chan<- error
ctx context.Context
startTime time.Time
cancelFn func()
p peer.ID
terminalError error
pauseMessages chan struct{}
state state
lastResponse atomic.Value
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
doNotSendCids *cid.Set
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
traverser ipldutil.Traverser
Comment on lines +53 to +67
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a lot of state for a request status.
can none of these be de-duped? e.g. can you not learn the peer.ID from the request?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope unfortuantely you can't.

Thhere are a lot of things to track, but also, I'm totally worried. the block memory trumps this in terms of memory consumption.

}

// PeerHandler is an interface that can send requests to peers
Expand Down Expand Up @@ -81,8 +100,8 @@ type RequestManager struct {
inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
requestHooks RequestHooks
responseHooks ResponseHooks
blockHooks BlockHooks
networkErrorListeners *listeners.NetworkErrorListeners
requestQueue taskqueue.TaskQueue
}

type requestManagerMessage interface {
Expand All @@ -99,19 +118,14 @@ type ResponseHooks interface {
ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult
}

// BlockHooks run for each block loaded
type BlockHooks interface {
ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult
}

// New generates a new request manager from a context, network, and selectorQuerier
func New(ctx context.Context,
asyncLoader AsyncLoader,
linkSystem ipld.LinkSystem,
requestHooks RequestHooks,
responseHooks ResponseHooks,
blockHooks BlockHooks,
networkErrorListeners *listeners.NetworkErrorListeners,
requestQueue taskqueue.TaskQueue,
) *RequestManager {
ctx, cancel := context.WithCancel(ctx)
return &RequestManager{
Expand All @@ -125,8 +139,8 @@ func New(ctx context.Context,
inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
requestHooks: requestHooks,
responseHooks: responseHooks,
blockHooks: blockHooks,
networkErrorListeners: networkErrorListeners,
requestQueue: requestQueue,
}
}

Expand Down Expand Up @@ -227,7 +241,7 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
cancelMessageChannel := rm.messages
for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
select {
case cancelMessageChannel <- &cancelRequestMessage{requestID, false, nil, nil}:
case cancelMessageChannel <- &cancelRequestMessage{requestID, nil, nil}:
cancelMessageChannel = nil
// clear out any remaining responses, in case and "incoming reponse"
// messages get processed before our cancel message
Expand All @@ -248,7 +262,7 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
// CancelRequest cancels the given request ID and waits for the request to terminate
func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
terminated := make(chan error, 1)
rm.send(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand Down Expand Up @@ -289,24 +303,14 @@ func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
}
}

// ProcessBlockHooks processes block hooks for the given response & block and cancels
// the request as needed
func (rm *RequestManager) ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error {
result := rm.blockHooks.ProcessBlockHooks(p, response, block)
if len(result.Extensions) > 0 {
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
rm.SendRequest(p, updateRequest)
}
if result.Err != nil {
_, isPause := result.Err.(hooks.ErrPaused)
rm.send(&cancelRequestMessage{response.RequestID(), isPause, nil, nil}, nil)
}
return result.Err
// GetRequestTask gets data for the given task in the request queue
func (rm *RequestManager) GetRequestTask(p peer.ID, task *peertask.Task, requestExecutionChan chan executor.RequestTask) {
rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
}

// TerminateRequest marks a request done
func (rm *RequestManager) TerminateRequest(requestID graphsync.RequestID) {
rm.send(&terminateRequestMessage{requestID}, nil)
// ReleaseRequestTask releases a task request the requestQueue
func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err error) {
rm.send(&releaseRequestTaskMessage{p, task, err}, nil)
}

// SendRequest sends a request to the message queue
Expand Down
Loading