Skip to content

Commit

Permalink
feat(requestmanager): report inProgressRequestCount on OutgoingReques…
Browse files Browse the repository at this point in the history
…ts event
  • Loading branch information
rvagg committed Oct 19, 2021
1 parent c30ea9d commit c96b3b2
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
2 changes: 1 addition & 1 deletion graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ type OnResponseCompletedListener func(p peer.ID, request RequestData, status Res

// OnOutgoingRequestProcessingListener is called when a request actually begins processing (reaches
// the top of the outgoing request queue)
type OnOutgoingRequestProcessingListener func(p peer.ID, request RequestData)
type OnOutgoingRequestProcessingListener func(p peer.ID, request RequestData, inProgressRequestCount int)

// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)
Expand Down
11 changes: 6 additions & 5 deletions listeners/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ type OutgoingRequestProcessingListeners struct {
}

type internalOutgoingRequestProcessingEvent struct {
p peer.ID
request graphsync.RequestData
p peer.ID
request graphsync.RequestData
inProgressRequestCount int
}

func outgoingRequestProcessingDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalOutgoingRequestProcessingEvent)
listener := subscriberFn.(graphsync.OnOutgoingRequestProcessingListener)
listener(ie.p, ie.request)
listener(ie.p, ie.request, ie.inProgressRequestCount)
return nil
}

Expand All @@ -100,8 +101,8 @@ func (bsl *OutgoingRequestProcessingListeners) Register(listener graphsync.OnOut
}

// NotifyOutgoingRequestProcessingListeners notifies all listeners that a requestor cancelled a response
func (bsl *OutgoingRequestProcessingListeners) NotifyOutgoingRequestProcessingListeners(p peer.ID, request graphsync.RequestData) {
_ = bsl.pubSub.Publish(internalOutgoingRequestProcessingEvent{p, request})
func (bsl *OutgoingRequestProcessingListeners) NotifyOutgoingRequestProcessingListeners(p peer.ID, request graphsync.RequestData, inProgressRequestCount int) {
_ = bsl.pubSub.Publish(internalOutgoingRequestProcessingEvent{p, request, inProgressRequestCount})
}

// BlockSentListeners is a set of listeners for when requestors cancel
Expand Down
16 changes: 9 additions & 7 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,9 @@ func TestOutgoingRequestHooks(t *testing.T) {
}

type outgoingRequestProcessingEvent struct {
p peer.ID
request graphsync.RequestData
p peer.ID
request graphsync.RequestData
inProgressRequestCount int
}

func TestOutgoingRequestListeners(t *testing.T) {
Expand All @@ -767,8 +768,8 @@ func TestOutgoingRequestListeners(t *testing.T) {

// Listen for outgoing request starts
outgoingRequests := make(chan outgoingRequestProcessingEvent, 1)
td.outgoingRequestProcessingListeners.Register(func(p peer.ID, request graphsync.RequestData) {
outgoingRequests <- outgoingRequestProcessingEvent{p, request}
td.outgoingRequestProcessingListeners.Register(func(p peer.ID, request graphsync.RequestData, inProgressRequestCount int) {
outgoingRequests <- outgoingRequestProcessingEvent{p, request, inProgressRequestCount}
})

returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Expand All @@ -778,9 +779,10 @@ func TestOutgoingRequestListeners(t *testing.T) {
// Should have fired by now
select {
case or := <-outgoingRequests:
require.Equal(t, or.p, peers[0])
require.Equal(t, or.request.Selector(), td.blockChain.Selector())
require.Equal(t, cidlink.Link{Cid: or.request.Root()}, td.blockChain.TipLink)
require.Equal(t, peers[0], or.p)
require.Equal(t, td.blockChain.Selector(), or.request.Selector())
require.Equal(t, td.blockChain.TipLink, cidlink.Link{Cid: or.request.Root()})
require.Equal(t, 1, or.inProgressRequestCount)
default:
t.Fatal("should fire outgoing request listener")
}
Expand Down
3 changes: 2 additions & 1 deletion requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
Budget: budget,
}.Start(ctx)

rm.outgoingRequestProcessingListeners.NotifyOutgoingRequestProcessingListeners(ipr.p, ipr.request)
inProgressCount := len(rm.inProgressRequestStatuses)
rm.outgoingRequestProcessingListeners.NotifyOutgoingRequestProcessingListeners(ipr.p, ipr.request, inProgressCount)
}

ipr.state = running
Expand Down

0 comments on commit c96b3b2

Please sign in to comment.