Add request limits #224

Merged
merged 8 commits into from
Sep 28, 2021

Conversation

hannahhoward
Collaborator

Goals

implement #215

Implementation

This is not a super simple PR as adding request limits required some sizable lifting to the request manager operations.

Here are the substantive changes:

  • Use a PeerTaskQueue to handle rate limiting and request distribution
  • I find myself writing the worker function to consume a PeerTaskQueue over and over... so I wrote an abstraction to do it. And actually, this is useful cause we can make it an interface and implement other versions -- like an execute immediately, or maybe one that scales with demand, or handles multiple types of tasks, or distributes different requests to different queues... who knows!
  • Refactor the executor to simply execute a single request till it pauses or finishes. I've removed the "resume" code from the executor which makes it much simpler -- now when you resume you just put the request back in the queue and pick up where you left off
  • This means a bit more work is done in the request manager setting up the request and holding data about it.
  • At the same time, this simplifies who has control at any given time. Essentially, if a request is in the running state, that means the executor is in control, and if you examine the code carefully, the RequestManager doesn't modify anything about the request other than to relay pause messages to the executor, and to cancel the request context to cause the request to terminate. During this time there are no direct modifications to the inProgressRequestStatus. Once the request exits the running state, it either becomes paused or is removed because it's finished.
  • This means we have fairly universal behavior for parts of the code that need to abort the request -- if the request is running, they only cancel the context and let the executor finish before cleanup. If the request is not running, they perform full cleanup.
  • I unified the request cleanup process cause it was kind of all over the map :)
  • one thing that may be slightly confusing is the "terminalError" property (though it's less bad than it used to be). Essentially, this is used to attach one additional error to the error stream at the end of the request.
  • also obviously add startup options to configure the number of in progress requests
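As a rough illustration of the worker abstraction and the "resume by re-queueing" idea described in the list above, here is a minimal sketch. It is not the code from this PR: a buffered channel stands in for the PeerTaskQueue, and `Task`, `Executor`, `stepExecutor`, `RunAll`, and `runDemo` are hypothetical names chosen for the example. It assumes `maxInProgress` is at least 1.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Task is a hypothetical stand-in for a queued request.
type Task struct {
	ID        int
	StepsLeft int // executions remaining before the request finishes
}

// Executor runs a single task until it pauses or finishes.
// It reports done=true when the task has finished.
type Executor interface {
	ExecuteTask(ctx context.Context, t *Task) (done bool)
}

// stepExecutor "pauses" after every step so the re-queue path is exercised.
type stepExecutor struct{}

func (stepExecutor) ExecuteTask(_ context.Context, t *Task) bool {
	t.StepsLeft--
	return t.StepsLeft <= 0
}

// RunAll starts maxInProgress workers that pop tasks from a shared queue.
// A task that pauses is simply put back in the queue and picked up later.
// It returns the total number of executions and the peak number of tasks
// that were in progress at the same time.
func RunAll(ctx context.Context, exec Executor, tasks []*Task, maxInProgress int) (executions, peak int64) {
	// Capacity len(tasks) means a re-queue can never block: each task is
	// in the queue at most once.
	queue := make(chan *Task, len(tasks))
	var pending, workers sync.WaitGroup
	pending.Add(len(tasks))
	var running int64

	for i := 0; i < maxInProgress; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for t := range queue {
				n := atomic.AddInt64(&running, 1)
				for { // record the highest concurrency observed
					p := atomic.LoadInt64(&peak)
					if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
						break
					}
				}
				atomic.AddInt64(&executions, 1)
				done := exec.ExecuteTask(ctx, t)
				atomic.AddInt64(&running, -1)
				if done {
					pending.Done()
				} else {
					queue <- t // paused: resume later from the queue
				}
			}
		}()
	}

	for _, t := range tasks {
		queue <- t
	}
	pending.Wait() // every task has finished, so no more re-queues
	close(queue)
	workers.Wait()
	return executions, peak
}

// runDemo queues three tasks with a limit of two in progress.
func runDemo() (executions, peak int64) {
	tasks := []*Task{{ID: 1, StepsLeft: 1}, {ID: 2, StepsLeft: 3}, {ID: 3, StepsLeft: 2}}
	return RunAll(context.Background(), stepExecutor{}, tasks, 2)
}

func main() {
	executions, peak := runDemo()
	fmt.Printf("executions=%d peak in progress=%d\n", executions, peak)
}
```

Because `Executor` is an interface, an "execute immediately" variant or one that scales with demand could be swapped in without touching the worker loop. The sketch ignores context cancellation and per-peer fairness, both of which a real queue would need.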

For discussion

There are several obvious "next steps"/refactors that are NOT included in this PR to reduce the already complicated changeset.

  • The ResponseManager should be refactored to use the TaskQueue. I just haven't done this to reduce the change set
  • The OutgoingRequestHook is called at the point the request is QUEUED, rather than the moment it actually starts. Similar to the ResponseManager, we may need to add an event for "the request is started"

There is one very small breaking change to graphsync's little-known and extremely underused pause request feature -- see #160 for the suggestion this shouldn't even exist. Previously, if you paused a request, it would pause on the next block load, but it would NOT return the IPLD responses for the last block. Now it does return IPLD responses. This is better behavior anyway, and I'm pretty sure no one is using this feature.

While not breaking, requests not executing immediately is a potential source of unexpected behavior, and this PR, once merged to master, should be thoroughly vetted in go-data-transfer and lotus prior to merge into higher level products.

setup a general purpose task queue, refactor functioning of requestmanager -- dispatching requests to
the queue, feeding request data when those queues resume
use a state var to track operational model
make sure not to send cancel when other peer has already sent terminal status
@hannahhoward hannahhoward force-pushed the feat/request-limits-refactored branch from dcff717 to d5bad97 Compare September 24, 2021 04:10
@hannahhoward hannahhoward changed the base branch from refactor/cleanup-complex-components to main September 24, 2021 04:10
if !isContextErr(err) {
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
}
if !isContextErr(err) && !isPausedErr(err) {
Member

@rvagg rvagg Sep 24, 2021

could skip the isContextErr check here by nesting in the above block, but also are the logic and order of these two blocks correct here? The previous form in run doesn't check isPausedErr and it seems strange to skip passing in the error if that's the case.

Collaborator Author

The previous form would not end on IsPausedError -- it handled the pause/resume internally and ran the request all the way to the end.
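To make the two conditions under discussion easier to follow, here is a small sketch of the nested form suggested in the review. It only mirrors the two `if` statements quoted from the diff: the helper bodies, the `errPaused` sentinel, and the `outcome` type are assumptions for illustration, not the executor's actual implementation, and `err` is assumed to be non-nil.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errPaused is a hypothetical sentinel standing in for the pause error.
var errPaused = errors.New("request paused")

// isContextErr and isPausedErr are assumed implementations; the real
// helpers in the PR may detect these conditions differently.
func isContextErr(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

func isPausedErr(err error) bool { return errors.Is(err, errPaused) }

// outcome records what the executor would do when a traversal stops early.
type outcome struct {
	sendCancel  bool // tell the remote peer to stop sending
	reportError bool // surface the error to the caller
}

// onTraversalError expects a non-nil err and uses the nested form: the
// context check is made once, and the pause check only matters inside it.
func onTraversalError(err error) outcome {
	var o outcome
	if !isContextErr(err) {
		o.sendCancel = true
		if !isPausedErr(err) {
			o.reportError = true
		}
	}
	return o
}

// demo evaluates a cancelled context, a pause, and an ordinary failure.
func demo() []outcome {
	return []outcome{
		onTraversalError(context.Canceled),
		onTraversalError(errPaused),
		onTraversalError(fmt.Errorf("load block: %w", errors.New("not found"))),
	}
}

func main() {
	for _, o := range demo() {
		fmt.Printf("sendCancel=%t reportError=%t\n", o.sendCancel, o.reportError)
	}
}
```

Under these assumptions a pause still cancels the remote side but is not reported as an error, since the executor now returns on pause and the request is expected to be re-queued later.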

Member

@rvagg rvagg left a comment

My lgtm isn't worth much here but I don't see any obvious problems.
I like the TaskQueue code though, 👌, fwiw.

Comment on lines +53 to +67
ctx context.Context
startTime time.Time
cancelFn func()
p peer.ID
terminalError error
pauseMessages chan struct{}
state state
lastResponse atomic.Value
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
doNotSendCids *cid.Set
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
traverser ipldutil.Traverser
Collaborator

this is a lot of state for a request status.
can none of these be de-duped? e.g. can you not learn the peer.ID from the request?

Collaborator Author

nope, unfortunately you can't.

There are a lot of things to track, but also, I'm not totally worried: the block memory trumps this in terms of memory consumption.

)

var log = logging.Logger("gs_request_executor")

// Manager are the Executor uses to interact with the request manager
Collaborator

grammar

re.restartNeeded = true
return nil
func (e *Executor) advanceTraversal(rt RequestTask, result types.AsyncLoadResult) error {
if result.Err != nil {
Collaborator

could result.Err also be a traversal.SkipMe?

Collaborator Author

Probably not.

}
request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionDoNotSendCIDs, Data: cidsData}})
Collaborator

does this extension need to be tightly interlinked in this logic, rather than abstracted to a general 'extension' interface?

Collaborator Author

yes cause graphsync directly manages and updates it for pause/resume. BUT, it will still accept the version that is passed in with the call to graphsync.

requestmanager/requestmanager_test.go (outdated, resolved)
// that pop tasks and execute them
type WorkerTaskQueue struct {
ctx context.Context
cancelFn func()
Collaborator

context.CancelFunc?

Collaborator Author

see external ticket but I will address shortly.
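For reference, the suggested change amounts to something like the sketch below. `context.CancelFunc` is declared in the standard library as `type CancelFunc func()`, so switching the field type does not change behavior; it only documents where the function came from. The struct fields are taken from the diff above, while `newWorkerTaskQueue`, `shutdown`, and `demoShutdown` are hypothetical helpers added for the example.

```go
package main

import (
	"context"
	"fmt"
)

// WorkerTaskQueue keeps the two fields shown in the diff, with cancelFn
// typed as context.CancelFunc instead of a bare func().
type WorkerTaskQueue struct {
	ctx      context.Context
	cancelFn context.CancelFunc
}

// newWorkerTaskQueue is a hypothetical constructor: it derives a
// cancellable context from the parent and stores the cancel function.
func newWorkerTaskQueue(parent context.Context) *WorkerTaskQueue {
	ctx, cancel := context.WithCancel(parent)
	return &WorkerTaskQueue{ctx: ctx, cancelFn: cancel}
}

// shutdown cancels the queue's context, which workers would observe
// through ctx.Done().
func (q *WorkerTaskQueue) shutdown() { q.cancelFn() }

// demoShutdown reports the context error before and after shutdown.
func demoShutdown() (before, after error) {
	q := newWorkerTaskQueue(context.Background())
	before = q.ctx.Err()
	q.shutdown()
	after = q.ctx.Err()
	return before, after
}

func main() {
	before, after := demoShutdown()
	fmt.Println("before shutdown:", before)
	fmt.Println("after shutdown:", after)
}
```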

@hannahhoward hannahhoward force-pushed the feat/request-limits-refactored branch from c7ed81e to 5e9dd10 Compare September 27, 2021 21:59
add precautionary check to avoid send on close channel
@hannahhoward hannahhoward force-pushed the feat/request-limits-refactored branch 2 times, most recently from c892fe8 to 81adab5 Compare September 28, 2021 14:50
3 participants