Skip to content

Commit

Permalink
add extensions on complete (#76)
Browse files Browse the repository at this point in the history
* feat(hooks): add extensions on complete

add the ability to send extensions with a final message

* refactor(hooks): complete listener->hook rename
  • Loading branch information
hannahhoward authored Jul 15, 2020
1 parent caa872f commit 31cc0d5
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 96 deletions.
14 changes: 10 additions & 4 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ type RequestUpdatedHookActions interface {
UnpauseResponse()
}

// ResponseCompletedHookActions are actions that can be taken in response completed hook to add a
// final extension on a response
type ResponseCompletedHookActions interface {
SendExtensionData(ExtensionData)
}

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
Expand Down Expand Up @@ -272,8 +278,8 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)
// OnResponseCompletedHook provides a way to listen for when responder has finished serving a response
type OnResponseCompletedHook func(p peer.ID, request RequestData, status ResponseStatusCode, hookActions ResponseCompletedHookActions)

// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)
Expand Down Expand Up @@ -307,8 +313,8 @@ type GraphExchange interface {
// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc
// RegisterCompletedResponseHook adds a hook on the responder for completed responses
RegisterCompletedResponseHook(hook OnResponseCompletedHook) UnregisterHookFunc

// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
Expand Down
14 changes: 7 additions & 7 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type GraphSync struct {
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
completedResponseHooks *responderhooks.CompletedResponseHooks
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
Expand Down Expand Up @@ -88,9 +88,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
completedResponseHooks := responderhooks.NewCompletedResponseHooks()
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseHooks, requestorCancelledListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -103,7 +103,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
completedResponseHooks: completedResponseHooks,
requestorCancelledListeners: requestorCancelledListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
Expand Down Expand Up @@ -170,9 +170,9 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH
return gs.requestUpdatedHooks.Register(hook)
}

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return gs.completedResponseListeners.Register(listener)
// RegisterCompletedResponseHook adds a hook on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseHook(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc {
return gs.completedResponseHooks.Register(hook)
}

// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
Expand Down
25 changes: 17 additions & 8 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func TestGraphsyncRoundTrip(t *testing.T) {
// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

var receivedResponseData []byte
var receivedResponseData [][]byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
receivedResponseData = append(receivedResponseData, data)
}
})

Expand All @@ -213,7 +213,8 @@ func TestGraphsyncRoundTrip(t *testing.T) {
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
hookActions.SendExtensionData(td.extensionFinal)
select {
case finalResponseStatusChan <- status:
default:
Expand All @@ -227,9 +228,11 @@ func TestGraphsyncRoundTrip(t *testing.T) {

// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")
require.Len(t, receivedResponseData, 2)
require.Equal(t, td.extensionResponseData, receivedResponseData[0], "did not receive correct extension response data")
require.Equal(t, td.extensionFinalData, receivedResponseData[1], "did not receive correct extension response data")

// verify listener
// verify completed hook
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
Expand All @@ -256,7 +259,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
responder := td.GraphSyncHost2()

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
select {
case finalResponseStatusChan <- status:
default:
Expand All @@ -278,7 +281,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
require.Equal(t, tree.MiddleMapBlock.RawData(), td.blockStore1[tree.MiddleMapNodeLnk])
require.Equal(t, tree.RootBlock.RawData(), td.blockStore1[tree.RootNodeLnk])

// verify listener
// verify completed hook
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus)
Expand Down Expand Up @@ -820,6 +823,8 @@ type gsTestData struct {
extensionResponse graphsync.ExtensionData
extensionUpdateData []byte
extensionUpdate graphsync.ExtensionData
extensionFinalData []byte
extensionFinal graphsync.ExtensionData
}

func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Expand Down Expand Up @@ -857,7 +862,11 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Name: td.extensionName,
Data: td.extensionUpdateData,
}

td.extensionFinalData = testutil.RandomBytes(100)
td.extensionFinal = graphsync.ExtensionData{
Name: td.extensionName,
Data: td.extensionFinalData,
}
return td
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,59 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
// CompletedResponseHooks is a set of hooks for completed responses
type CompletedResponseHooks struct {
pubSub *pubsub.PubSub
}

type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
cha *completeHookActions
}

func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
hook := subscriberFn.(graphsync.OnResponseCompletedHook)
hook(ie.p, ie.request, ie.status, ie.cha)
return nil
}

// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)}
// NewCompletedResponseHooks returns a new list of completed response hooks
func NewCompletedResponseHooks() *CompletedResponseHooks {
return &CompletedResponseHooks{pubSub: pubsub.New(completedResponseDispatcher)}
}

// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener))
// Register registers an hook for completed responses
func (crl *CompletedResponseHooks) Register(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(hook))
}

// ProcessCompleteHooks runs notifies all completed hooks that a response has completed
func (crl *CompletedResponseHooks) ProcessCompleteHooks(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) CompleteResult {
ha := &completeHookActions{}
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status, ha})
return ha.result()
}

// CompleteResult is the outcome of running complete response hooks
type CompleteResult struct {
Extensions []graphsync.ExtensionData
}

type completeHookActions struct {
extensions []graphsync.ExtensionData
}

func (ha *completeHookActions) result() CompleteResult {
return CompleteResult{
Extensions: ha.extensions,
}
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
func (ha *completeHookActions) SendExtensionData(ext graphsync.ExtensionData) {
ha.extensions = append(ha.extensions, ext)
}

// RequestorCancelledListeners is a set of listeners for when requestors cancel
Expand Down
57 changes: 57 additions & 0 deletions responsemanager/hooks/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,3 +385,60 @@ func TestUpdateHookProcessing(t *testing.T) {
})
}
}

func TestCompleteHookProcessing(t *testing.T) {
extensionData := testutil.RandomBytes(100)
extensionName := graphsync.ExtensionName("AppleSauce/McGee")
extension := graphsync.ExtensionData{
Name: extensionName,
Data: extensionData,
}
extensionResponseData := testutil.RandomBytes(100)
extensionResponse := graphsync.ExtensionData{
Name: extensionName,
Data: extensionResponseData,
}

root := testutil.GenerateCids(1)[0]
requestID := graphsync.RequestID(rand.Int31())
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension)
status := graphsync.RequestCompletedFull
p := testutil.GeneratePeers(1)[0]
testCases := map[string]struct {
configure func(t *testing.T, completedHooks *hooks.CompletedResponseHooks)
assert func(t *testing.T, result hooks.CompleteResult)
}{
"no hooks": {
assert: func(t *testing.T, result hooks.CompleteResult) {
require.Empty(t, result.Extensions)
},
},
"send extension data": {
configure: func(t *testing.T, completedHooks *hooks.CompletedResponseHooks) {
completedHooks.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
_, found := requestData.Extension(extensionName)
if found {
hookActions.SendExtensionData(extensionResponse)
}
})
},
assert: func(t *testing.T, result hooks.CompleteResult) {
require.Len(t, result.Extensions, 1)
require.Contains(t, result.Extensions, extensionResponse)
},
},
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
completedHooks := hooks.NewCompletedResponseHooks()
if data.configure != nil {
data.configure(t, completedHooks)
}
result := completedHooks.ProcessCompleteHooks(p, request, status)
if data.assert != nil {
data.assert(t, result)
}
})
}
}
57 changes: 34 additions & 23 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
completedListeners CompletedListeners
completedHooks CompletedHooks
cancelledListeners CancelledListeners
peerManager PeerManager
loader ipld.Loader
Expand Down Expand Up @@ -71,13 +71,6 @@ func (qe *queryExecutor) processQueriesWorker() {
continue
}
status, err := qe.executeTask(key, taskData)
_, isPaused := err.(hooks.ErrPaused)
isCancelled := err != nil && isContextErr(err)
if isCancelled {
qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
} else if !isPaused {
qe.completedListeners.NotifyCompletedListeners(key.p, taskData.request, status)
}
select {
case qe.messages <- &finishTaskRequest{key, status, err}:
case <-qe.ctx.Done():
Expand Down Expand Up @@ -207,23 +200,41 @@ func (qe *queryExecutor) executeQuery(
})
return err
})
if err != nil {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
return graphsync.RequestPaused, err
}

var status graphsync.ResponseStatusCode
_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
status = qe.closeRequest(transaction, err)
if isContextErr(err) {
peerResponseSender.FinishWithCancel(request.ID())
return graphsync.RequestCancelled, err
}
if err == errCancelledByCommand {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestCancelled)
return graphsync.RequestCancelled, err
qe.cancelledListeners.NotifyCancelledListeners(p, request)
} else if status != graphsync.RequestPaused {
result := qe.completedHooks.ProcessCompleteHooks(p, request, status)
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
}
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown, err
return nil
})
return status, err
}

func (qe *queryExecutor) closeRequest(peerResponseSender peerresponsemanager.PeerResponseTransactionSender, err error) graphsync.ResponseStatusCode {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
return graphsync.RequestPaused
}
if isContextErr(err) {
peerResponseSender.FinishWithCancel()
return graphsync.RequestCancelled
}
if err == errCancelledByCommand {
peerResponseSender.FinishWithError(graphsync.RequestCancelled)
return graphsync.RequestCancelled
}
if err != nil {
peerResponseSender.FinishWithError(graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown
}
return peerResponseSender.FinishRequest(request.ID()), nil
return peerResponseSender.FinishRequest()
}

func (qe *queryExecutor) checkForUpdates(
Expand Down Expand Up @@ -268,5 +279,5 @@ func (qe *queryExecutor) checkForUpdates(

func isContextErr(err error) bool {
// TODO: Match with errors.Is when https://github.com/ipld/go-ipld-prime/issues/58 is resolved
return strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
return err != nil && strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
}
Loading

0 comments on commit 31cc0d5

Please sign in to comment.