Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(responsemanager): add listener for completed responses #64

Merged
merged 1 commit into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()

Expand All @@ -243,6 +246,9 @@ type GraphExchange interface {
// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
UnpauseResponse(peer.ID, RequestID) error
}
10 changes: 9 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type GraphSync struct {
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
persistenceOptions *persistenceoptions.PersistenceOptions
Expand Down Expand Up @@ -84,7 +85,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks)
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -97,6 +99,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
peerTaskQueue: peerTaskQueue,
Expand Down Expand Up @@ -161,6 +164,11 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH
return gs.requestUpdatedHooks.Register(hook)
}

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return gs.completedResponseListeners.Register(listener)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
Expand Down
12 changes: 12 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ func TestGraphsyncRoundTrip(t *testing.T) {
}
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
Expand All @@ -217,6 +224,11 @@ func TestGraphsyncRoundTrip(t *testing.T) {
// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
}

func TestPauseResume(t *testing.T) {
Expand Down
53 changes: 53 additions & 0 deletions responsemanager/hooks/completedlisteners.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package hooks

import (
"sync"

"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type completedListener struct {
key uint64
listener graphsync.OnResponseCompletedListener
}

// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
listenersLk sync.RWMutex
nextKey uint64
listeners []completedListener
}

// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{}
}

// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
crl.listenersLk.Lock()
cl := completedListener{crl.nextKey, listener}
crl.nextKey++
crl.listeners = append(crl.listeners, cl)
crl.listenersLk.Unlock()
return func() {
crl.listenersLk.Lock()
defer crl.listenersLk.Unlock()
for i, matchListener := range crl.listeners {
if cl.key == matchListener.key {
crl.listeners = append(crl.listeners[:i], crl.listeners[i+1:]...)
return
}
}
}
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
crl.listenersLk.RLock()
defer crl.listenersLk.RUnlock()
for _, listener := range crl.listeners {
listener.listener(p, request, status)
}
}
5 changes: 3 additions & 2 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type PeerResponseSender interface {
data []byte,
) graphsync.BlockData
SendExtensionData(graphsync.RequestID, graphsync.ExtensionData)
FinishRequest(requestID graphsync.RequestID)
FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode
FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode)
PauseRequest(requestID graphsync.RequestID)
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func (prm *peerResponseSender) SendResponse(
}

// FinishRequest marks the given requestID as having sent all responses
func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) {
func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode {
prm.linkTrackerLk.Lock()
isComplete := prm.linkTracker.FinishRequest(requestID)
prm.linkTrackerLk.Unlock()
Expand All @@ -158,6 +158,7 @@ func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) {
status = graphsync.RequestCompletedPartial
}
prm.finish(requestID, status)
return status
}

// FinishWithError marks the given requestID as having terminated with an error
Expand Down
37 changes: 23 additions & 14 deletions responsemanager/responsemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ type UpdateHooks interface {
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}

// CompletedListeners is an interface for notifying listeners that responses are complete
type CompletedListeners interface {
NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode)
}

// PeerManager is an interface that returns sender interfaces for peer responses.
type PeerManager interface {
SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender
Expand All @@ -96,6 +101,7 @@ type ResponseManager struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
completedListeners CompletedListeners
messages chan responseManagerMessage
workSignal chan struct{}
ticker *time.Ticker
Expand All @@ -110,7 +116,8 @@ func New(ctx context.Context,
queryQueue QueryQueue,
requestHooks RequestHooks,
blockHooks BlockHooks,
updateHooks UpdateHooks) *ResponseManager {
updateHooks UpdateHooks,
completedListeners CompletedListeners) *ResponseManager {
ctx, cancelFn := context.WithCancel(ctx)
return &ResponseManager{
ctx: ctx,
Expand All @@ -121,6 +128,7 @@ func New(ctx context.Context,
requestHooks: requestHooks,
blockHooks: blockHooks,
updateHooks: updateHooks,
completedListeners: completedListeners,
messages: make(chan responseManagerMessage, 16),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(thawSpeed),
Expand Down Expand Up @@ -187,8 +195,9 @@ type responseDataRequest struct {
}

type finishTaskRequest struct {
key responseKey
err error
key responseKey
status graphsync.ResponseStatusCode
err error
}

type setResponseDataRequest struct {
Expand Down Expand Up @@ -231,9 +240,9 @@ func (rm *ResponseManager) processQueriesWorker() {
case <-rm.ctx.Done():
return
}
err := rm.executeTask(key, taskData)
status, err := rm.executeTask(key, taskData)
select {
case rm.messages <- &finishTaskRequest{key, err}:
case rm.messages <- &finishTaskRequest{key, status, err}:
case <-rm.ctx.Done():
}
}
Expand All @@ -243,18 +252,18 @@ func (rm *ResponseManager) processQueriesWorker() {

}

func (rm *ResponseManager) executeTask(key responseKey, taskData *responseTaskData) error {
func (rm *ResponseManager) executeTask(key responseKey, taskData *responseTaskData) (graphsync.ResponseStatusCode, error) {
var err error
loader := taskData.loader
traverser := taskData.traverser
if loader == nil || traverser == nil {
loader, traverser, err = rm.prepareQuery(taskData.ctx, key.p, taskData.request)
if err != nil {
return err
return graphsync.RequestFailedUnknown, err
}
select {
case <-rm.ctx.Done():
return nil
return graphsync.RequestFailedUnknown, errors.New("context cancelled")
case rm.messages <- &setResponseDataRequest{key, loader, traverser}:
}
}
Expand Down Expand Up @@ -291,7 +300,7 @@ func (rm *ResponseManager) executeQuery(
request gsmsg.GraphSyncRequest,
loader ipld.Loader,
traverser ipldutil.Traverser,
updateSignal chan struct{}) error {
updateSignal chan struct{}) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
peerResponseSender := rm.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
Expand All @@ -314,13 +323,12 @@ func (rm *ResponseManager) executeQuery(
if err != nil {
if err != hooks.ErrPaused {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
} else {
peerResponseSender.PauseRequest(request.ID())
return graphsync.RequestFailedUnknown, err
}
return err
peerResponseSender.PauseRequest(request.ID())
return graphsync.RequestPaused, err
}
peerResponseSender.FinishRequest(request.ID())
return nil
return peerResponseSender.FinishRequest(request.ID()), nil
}

func (rm *ResponseManager) checkForUpdates(
Expand Down Expand Up @@ -492,6 +500,7 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) {
response.isPaused = true
return
}
rm.completedListeners.NotifyCompletedListeners(ftr.key.p, response.request, ftr.status)
if ftr.err != nil {
log.Infof("response failed: %w", ftr.err)
}
Expand Down
Loading