Skip to content

Commit

Permalink
Add error listener on receiver (#136)
Browse files Browse the repository at this point in the history
* add receiver error listener (code complete)

* code complete, test passing, not sure if it works

* fix(graphsync): fix receive err test

* Update graphsync.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update graphsync.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update listeners/listeners.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

Co-authored-by: acruikshank <acruikshank@example.com>
Co-authored-by: dirkmc <dirkmdev@gmail.com>
  • Loading branch information
3 people authored Dec 22, 2020
1 parent 43d46f0 commit ae3cd24
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 11 deletions.
2 changes: 1 addition & 1 deletion benchmarks/testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
lam.f(ctx, p, incoming)
}

func (lam *lambdaImpl) ReceiveError(err error) {
func (lam *lambdaImpl) ReceiveError(_ peer.ID, _ error) {
// TODO log error
}

Expand Down
6 changes: 6 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ type OnBlockSentListener func(p peer.ID, request RequestData, block BlockData)
// OnNetworkErrorListener runs when queued data is not able to be sent
type OnNetworkErrorListener func(p peer.ID, request RequestData, err error)

// OnReceiverNetworkErrorListener runs when errors occur receiving data over the wire
type OnReceiverNetworkErrorListener func(p peer.ID, err error)

// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)

Expand Down Expand Up @@ -340,6 +343,9 @@ type GraphExchange interface {
// RegisterNetworkErrorListener adds a listener for when errors occur sending data over the wire
RegisterNetworkErrorListener(listener OnNetworkErrorListener) UnregisterHookFunc

// RegisterReceiverNetworkErrorListener adds a listener for when errors occur receiving data over the wire
RegisterReceiverNetworkErrorListener(listener OnReceiverNetworkErrorListener) UnregisterHookFunc

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
UnpauseRequest(RequestID, ...ExtensionData) error
Expand Down
15 changes: 11 additions & 4 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type GraphSync struct {
requestorCancelledListeners *listeners.RequestorCancelledListeners
blockSentListeners *listeners.BlockSentListeners
networkErrorListeners *listeners.NetworkErrorListeners
receiverErrorListeners *listeners.NetworkReceiverErrorListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
Expand Down Expand Up @@ -115,6 +116,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
outgoingRequestHooks := requestorhooks.NewRequestHooks()
incomingBlockHooks := requestorhooks.NewBlockHooks()
networkErrorListeners := listeners.NewNetworkErrorListeners()
receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
peerTaskQueue := peertaskqueue.New()

Expand All @@ -141,6 +143,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestorCancelledListeners: requestorCancelledListeners,
blockSentListeners: blockSentListeners,
networkErrorListeners: networkErrorListeners,
receiverErrorListeners: receiverErrorListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
Expand Down Expand Up @@ -251,6 +254,11 @@ func (gs *GraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkEr
return gs.networkErrorListeners.Register(listener)
}

// RegisterReceiverNetworkErrorListener adds a listener for when errors occur receiving data over the wire
func (gs *GraphSync) RegisterReceiverNetworkErrorListener(listener graphsync.OnReceiverNetworkErrorListener) graphsync.UnregisterHookFunc {
return gs.receiverErrorListeners.Register(listener)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
Expand Down Expand Up @@ -295,10 +303,9 @@ func (gsr *graphSyncReceiver) ReceiveMessage(

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network.
func (gsr *graphSyncReceiver) ReceiveError(err error) {
log.Infof("Graphsync ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
func (gsr *graphSyncReceiver) ReceiveError(p peer.ID, err error) {
log.Infof("Graphsync ReceiveError from %s: %s", p, err)
gsr.receiverErrorListeners.NotifyNetworkErrorListeners(p, err)
}

// Connected is part of the networks 's Receiver interface and handles peers connecting
Expand Down
11 changes: 10 additions & 1 deletion impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,13 @@ func TestNetworkDisconnect(t *testing.T) {
default:
}
})
receiverError := make(chan error, 1)
requestor.RegisterReceiverNetworkErrorListener(func(p peer.ID, err error) {
select {
case receiverError <- err:
default:
}
})
requestCtx, requestCancel := context.WithTimeout(ctx, 1*time.Second)
defer requestCancel()
progressChan, errChan := requestor.Request(requestCtx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
Expand All @@ -609,6 +616,7 @@ func TestNetworkDisconnect(t *testing.T) {
testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side")
}

func TestConnectFail(t *testing.T) {
Expand Down Expand Up @@ -1131,7 +1139,8 @@ func (r *receiver) ReceiveMessage(
}
}

func (r *receiver) ReceiveError(err error) {
func (r *receiver) ReceiveError(_ peer.ID, err error) {
fmt.Println("got receive err")
}

func (r *receiver) Connected(p peer.ID) {
Expand Down
32 changes: 32 additions & 0 deletions listeners/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,35 @@ func (nel *NetworkErrorListeners) Register(listener graphsync.OnNetworkErrorList
func (nel *NetworkErrorListeners) NotifyNetworkErrorListeners(p peer.ID, request graphsync.RequestData, err error) {
_ = nel.pubSub.Publish(internalNetworkErrorEvent{p, request, err})
}

// NetworkReceiverErrorListeners is a set of listeners for network errors on the receiving side
type NetworkReceiverErrorListeners struct {
pubSub *pubsub.PubSub
}

type receiverNetworkErrorEvent struct {
p peer.ID
err error
}

func receiverNetworkErrorDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(receiverNetworkErrorEvent)
listener := subscriberFn.(graphsync.OnReceiverNetworkErrorListener)
listener(ie.p, ie.err)
return nil
}

// NewReceiverNetworkErrorListeners returns a new list of listeners for receiving errors
func NewReceiverNetworkErrorListeners() *NetworkReceiverErrorListeners {
return &NetworkReceiverErrorListeners{pubSub: pubsub.New(receiverNetworkErrorDispatcher)}
}

// Register registers an listener for completed responses
func (nel *NetworkReceiverErrorListeners) Register(listener graphsync.OnReceiverNetworkErrorListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(nel.pubSub.Subscribe(listener))
}

// NotifyReceiverNetworkErrorListeners notifies all listeners that a receive connection failed
func (nel *NetworkReceiverErrorListeners) NotifyNetworkErrorListeners(p peer.ID, err error) {
_ = nel.pubSub.Publish(receiverNetworkErrorEvent{p, err})
}
4 changes: 2 additions & 2 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type GraphSyncNetwork interface {
peer.ID,
gsmsg.GraphSyncMessage) error

// SetDelegate registers the Reciver to handle messages received from the
// SetDelegate registers the Receiver to handle messages received from the
// network.
SetDelegate(Receiver)

Expand All @@ -47,7 +47,7 @@ type Receiver interface {
sender peer.ID,
incoming gsmsg.GraphSyncMessage)

ReceiveError(error)
ReceiveError(p peer.ID, err error)

Connected(p peer.ID)
Disconnected(p peer.ID)
Expand Down
5 changes: 3 additions & 2 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,17 @@ func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s network.Stream) {
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
for {
received, err := gsmsg.FromMsgReader(reader)
p := s.Conn().RemotePeer()

if err != nil {
if err != io.EOF {
_ = s.Reset()
go gsnet.receiver.ReceiveError(err)
go gsnet.receiver.ReceiveError(p, err)
log.Debugf("graphsync net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
}
return
}

p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer())
gsnet.receiver.ReceiveMessage(ctx, p, received)
Expand Down
2 changes: 1 addition & 1 deletion network/libp2p_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (r *receiver) ReceiveMessage(
}
}

func (r *receiver) ReceiveError(err error) {
func (r *receiver) ReceiveError(_ peer.ID, _ error) {
}

func (r *receiver) Connected(p peer.ID) {
Expand Down

0 comments on commit ae3cd24

Please sign in to comment.