Skip to content

Commit

Permalink
test(responsemanager): add more tests for update behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Apr 20, 2020
1 parent 2e8e556 commit 30b6697
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 59 deletions.
2 changes: 2 additions & 0 deletions responsemanager/responsemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync
}
if result.Err != nil {
peerResponseSender.FinishWithError(key.requestID, graphsync.RequestFailedUnknown)
delete(rm.inProgressResponses, key)
response.cancelFn()
return
}
if result.Unpause {
Expand Down
209 changes: 150 additions & 59 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,74 +537,165 @@ func TestValidationAndExtensions(t *testing.T) {
})

t.Run("can send extension data", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
blkIndex := 0
blockCount := 3
wait := make(chan struct{})
sent := make(chan struct{})
td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
blkIndex++
if blkIndex == blockCount {
close(sent)
<-wait
}
t.Run("when unpaused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
blkIndex := 0
blockCount := 3
wait := make(chan struct{})
sent := make(chan struct{})
td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
blkIndex++
if blkIndex == blockCount {
close(sent)
<-wait
}
})
td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
if _, found := updateData.Extension(td.extensionName); found {
hookActions.SendExtensionData(td.extensionResponse)
}
})
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.synchronize()
close(wait)
var lastRequest completedRequest
testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
var receivedExtension sentExtension
testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
})
td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
if _, found := updateData.Extension(td.extensionName); found {
hookActions.SendExtensionData(td.extensionResponse)

t.Run("when paused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
blkIndex := 0
blockCount := 3
td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
blkIndex++
if blkIndex == blockCount {
hookActions.PauseResponse()
}
})
td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
if _, found := updateData.Extension(td.extensionName); found {
hookActions.SendExtensionData(td.extensionResponse)
}
})
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
var sentResponses []sentResponse
for i := 0; i < blockCount; i++ {
testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
}
testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
var pausedRequest pausedRequest
testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
require.LessOrEqual(t, len(sentResponses), blockCount)

// send update
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)

// receive data
var receivedExtension sentExtension
testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")

// should still be paused
timer := time.NewTimer(500 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
})
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.synchronize()
close(wait)
var lastRequest completedRequest
testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
var receivedExtension sentExtension
testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
})

t.Run("can send errors", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
t.Run("when unpaused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
blkIndex := 0
blockCount := 3
wait := make(chan struct{})
sent := make(chan struct{})
td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
blkIndex++
if blkIndex == blockCount {
close(sent)
<-wait
}
})
td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
if _, found := updateData.Extension(td.extensionName); found {
hookActions.TerminateWithError(errors.New("something went wrong"))
}
})
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.synchronize()
close(wait)
var lastRequest completedRequest
testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")
})
blkIndex := 0
blockCount := 3
wait := make(chan struct{})
sent := make(chan struct{})
td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
blkIndex++
if blkIndex == blockCount {
close(sent)
<-wait
}
})
td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
if _, found := updateData.Extension(td.extensionName); found {
hookActions.TerminateWithError(errors.New("something went wrong"))

t.Run("when paused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
blkIndex := 0
blockCount := 3
td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
blkIndex++
if blkIndex == blockCount {
hookActions.PauseResponse()
}
})
td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
if _, found := updateData.Extension(td.extensionName); found {
hookActions.TerminateWithError(errors.New("something went wrong"))
}
})
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
var sentResponses []sentResponse
for i := 0; i < blockCount; i++ {
testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
}
testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
var pausedRequest pausedRequest
testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
require.LessOrEqual(t, len(sentResponses), blockCount)

// send update
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)

// should terminate
var lastRequest completedRequest
testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")

// cannot unpause
err := responseManager.UnpauseResponse(td.p, td.requestID)
require.Error(t, err)
})
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.synchronize()
close(wait)
var lastRequest completedRequest
testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")
})

})
Expand Down

0 comments on commit 30b6697

Please sign in to comment.