Skip to content

Commit

Permalink
fix(requestmanager): correct cancel request sending
Browse files Browse the repository at this point in the history
make sure not to send cancel when other peer has already sent terminal status
  • Loading branch information
hannahhoward committed Sep 24, 2021
1 parent b80966d commit d5bad97
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
4 changes: 3 additions & 1 deletion requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.
log.Debugw("beginning request execution", "id", requestTask.Request.ID(), "peer", pid.String(), "root_cid", requestTask.Request.Root().String())
err := e.traverse(requestTask)
if err != nil {
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
if !isContextErr(err) {
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
}
if !isContextErr(err) && !isPausedErr(err) {
select {
case <-requestTask.Ctx.Done():
Expand Down
8 changes: 2 additions & 6 deletions requestmanager/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ func TestRequestExecutionBlockChain(t *testing.T) {
verifyResults: func(t *testing.T, tbc *testutil.TestBlockChain, ree *requestExecutionEnv, responses []graphsync.ResponseProgress, receivedErrors []error) {
tbc.VerifyResponseRangeSync(responses, 0, 6)
require.Empty(t, receivedErrors)
require.Len(t, ree.requestsSent, 2)
require.Equal(t, ree.request, ree.requestsSent[0].request)
require.True(t, ree.requestsSent[1].request.IsCancel())
require.Equal(t, []requestSent{{ree.p, ree.request}}, ree.requestsSent)
require.Len(t, ree.blookHooksCalled, 6)
require.EqualError(t, ree.terminalError, ipldutil.ContextCancelError{}.Error())
},
Expand Down Expand Up @@ -265,7 +263,6 @@ type pauseKey struct {
type requestExecutionEnv struct {
// params
ctx context.Context
cancelFn func()
request gsmsg.GraphSyncRequest
p peer.ID
blockHookResults map[blockHookKey]hooks.UpdateResult
Expand All @@ -276,7 +273,6 @@ type requestExecutionEnv struct {
traverser ipldutil.Traverser
inProgressErr chan error
initialRequest bool
empty bool

// results
requestsSent []requestSent
Expand Down Expand Up @@ -307,7 +303,7 @@ func (ree *requestExecutionEnv) GetRequestTask(_ peer.ID, _ *peertask.Task, requ
Traverser: ree.traverser,
P: ree.p,
InProgressErr: ree.inProgressErr,
Empty: ree.empty,
Empty: false,
InitialRequest: ree.initialRequest,
}
go func() {
Expand Down
4 changes: 3 additions & 1 deletion requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, onTermina
if onTerminated != nil {
inProgressRequestStatus.onTerminated = append(inProgressRequestStatus.onTerminated, onTerminated)
}
rm.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(requestID))
rm.cancelOnError(requestID, inProgressRequestStatus, terminalError)
}

Expand Down Expand Up @@ -263,6 +264,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg
if !ok {
return false
}
rm.SendRequest(requestStatus.p, gsmsg.CancelRequest(response.RequestID()))
rm.cancelOnError(response.RequestID(), requestStatus, result.Err)
return false
}
Expand Down Expand Up @@ -333,7 +335,7 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error {
if !ok {
return graphsync.RequestNotFoundErr{}
}
if inProgressRequestStatus.state != running {
if inProgressRequestStatus.state == paused {
return errors.New("request is already paused")
}
select {
Expand Down

0 comments on commit d5bad97

Please sign in to comment.