Skip to content

Commit

Permalink
Fix deadlock on notifications (#242)
Browse files Browse the repository at this point in the history
* fix(notifications): fix deadlock on notifications

The notification queue is extremely vulnerable to deadlocks if one of
the subscribers takes a long time to process a notification. This change
makes calls to Subscribe and Publish non-blocking by storing internal
commands in an unbounded queue, rather than sending them over a blocking
channel.

We also clean up the subscriber for the response manager. Ultimately, it
may make sense to move these subscribers directly into the message queue
and remove the notification system entirely, but in the meantime, it
makes sense for the subscriber not to have access to private vars in the
response manager.

* fix(notifications): add logging
  • Loading branch information
hannahhoward authored Oct 7, 2021
1 parent a15abd7 commit 49f490d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 25 deletions.
55 changes: 43 additions & 12 deletions notifications/publisher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package notifications

import "sync"
import (
"sync"

logging "github.com/ipfs/go-log/v2"
)

// log is the package-level logger, created under the "gs-notifications" name.
var log = logging.Logger("gs-notifications")

type operation int

Expand All @@ -21,16 +27,17 @@ type cmd struct {

// publisher is a publisher of events for the subscribers of each topic
type publisher struct {
lk sync.RWMutex
closed chan struct{}
cmdChan chan cmd
lk sync.RWMutex
closed chan struct{}
cmds []cmd
cmdsLk *sync.Cond
}

// NewPublisher returns a new message event publisher
func NewPublisher() Publisher {
ps := &publisher{
cmdChan: make(chan cmd),
closed: make(chan struct{}),
cmdsLk: sync.NewCond(&sync.Mutex{}),
closed: make(chan struct{}),
}
return ps
}
Expand All @@ -49,7 +56,7 @@ func (ps *publisher) Publish(topic Topic, event Event) {
default:
}

ps.cmdChan <- cmd{op: pub, topics: []Topic{topic}, msg: event}
ps.queue(cmd{op: pub, topics: []Topic{topic}, msg: event})
}

// Shutdown shuts down all events and subscriptions
Expand All @@ -62,7 +69,7 @@ func (ps *publisher) Shutdown() {
default:
}
close(ps.closed)
ps.cmdChan <- cmd{op: shutdown}
ps.queue(cmd{op: shutdown})
}

func (ps *publisher) Close(id Topic) {
Expand All @@ -73,7 +80,7 @@ func (ps *publisher) Close(id Topic) {
return
default:
}
ps.cmdChan <- cmd{op: closeTopic, topics: []Topic{id}}
ps.queue(cmd{op: closeTopic, topics: []Topic{id}})
}

func (ps *publisher) Subscribe(topic Topic, sub Subscriber) bool {
Expand All @@ -86,7 +93,7 @@ func (ps *publisher) Subscribe(topic Topic, sub Subscriber) bool {
default:
}

ps.cmdChan <- cmd{op: subscribe, topics: []Topic{topic}, sub: sub}
ps.queue(cmd{op: subscribe, topics: []Topic{topic}, sub: sub})
return true
}

Expand All @@ -100,7 +107,7 @@ func (ps *publisher) Unsubscribe(sub Subscriber) bool {
default:
}

ps.cmdChan <- cmd{op: unsubAll, sub: sub}
ps.queue(cmd{op: unsubAll, sub: sub})
return true
}

Expand All @@ -111,7 +118,8 @@ func (ps *publisher) start() {
}

loop:
for cmd := range ps.cmdChan {
for {
cmd := ps.dequeue()
if cmd.topics == nil {
switch cmd.op {
case unsubAll:
Expand Down Expand Up @@ -202,3 +210,26 @@ func (reg *subscriberRegistry) remove(topic Topic, sub Subscriber) {

sub.OnClose(topic)
}

// queue appends cmd to the end of the pending command list and then signals
// the condition variable so a goroutine parked in dequeue can pick it up.
// The lock is held only for the append itself, so the caller does not wait
// for the command to be processed.
func (ps *publisher) queue(cmd cmd) {
	mu := ps.cmdsLk.L

	mu.Lock()
	ps.cmds = append(ps.cmds, cmd)
	pending := len(ps.cmds)
	mu.Unlock()

	// Logging and signalling happen outside the critical section.
	log.Debugw("added notification command", "cmd", cmd, "queue len", pending)
	ps.cmdsLk.Signal()
}

// dequeue blocks until at least one command is pending, then removes and
// returns the oldest one, so commands come out in the order they were queued.
func (ps *publisher) dequeue() cmd {
	ps.cmdsLk.L.Lock()
	// Wait releases the lock while parked and re-acquires it before returning;
	// the emptiness check is repeated in a loop because a wake-up does not
	// guarantee the queue is still non-empty.
	for len(ps.cmds) == 0 {
		ps.cmdsLk.Wait()
	}

	next := ps.cmds[0]
	// Zero the vacated slot before re-slicing. Otherwise the backing array
	// keeps the command's subscriber, event and topics reachable until the
	// slice is next reallocated.
	ps.cmds[0] = cmd{}
	ps.cmds = ps.cmds[1:]
	remaining := len(ps.cmds)
	ps.cmdsLk.L.Unlock()

	log.Debugw("processing notification command", "cmd", next, "remaining in queue", remaining)
	return next
}
5 changes: 5 additions & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
rm.send(&finishTaskRequest{task, err}, nil)
}

// CloseWithNetworkError asks the response manager to close the request
// identified by requestID for peer p, reporting errNetworkError as the cause.
// The message is handed to rm.send with a nil done channel.
func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) {
	// Capacity-1 channel that is not read here; presumably the reply slot of
	// errorRequestMessage, buffered so the handler never blocks on it
	// (TODO confirm against the errorRequestMessage definition).
	reply := make(chan error, 1)
	msg := &errorRequestMessage{p, requestID, errNetworkError, reply}
	rm.send(msg, nil)
}

func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
select {
case <-rm.ctx.Done():
Expand Down
3 changes: 1 addition & 2 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
sub := notifications.NewTopicDataSubscriber(&subscriber{
p: key.p,
request: request,
ctx: rm.ctx,
messages: rm.messages,
requestCloser: rm,
blockSentListeners: rm.blockSentListeners,
completedListeners: rm.completedListeners,
networkErrorListeners: rm.networkErrorListeners,
Expand Down
18 changes: 7 additions & 11 deletions responsemanager/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package responsemanager

import (
"context"
"errors"

"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -15,11 +14,14 @@ import (

var errNetworkError = errors.New("network error")

// RequestCloser is the narrow capability the subscriber needs from its owner:
// the ability to close a request after a network error.
type RequestCloser interface {
// CloseWithNetworkError closes the request identified by requestID for
// peer p because of a network error.
CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID)
}

type subscriber struct {
p peer.ID
request gsmsg.GraphSyncRequest
ctx context.Context
messages chan responseManagerMessage
requestCloser RequestCloser
blockSentListeners BlockSentListeners
networkErrorListeners NetworkErrorListeners
completedListeners CompletedListeners
Expand All @@ -36,10 +38,7 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event
switch responseEvent.Name {
case messagequeue.Error:
s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err)
select {
case s.messages <- &errorRequestMessage{s.p, s.request.ID(), errNetworkError, make(chan error, 1)}:
case <-s.ctx.Done():
}
s.requestCloser.CloseWithNetworkError(s.p, s.request.ID())
case messagequeue.Sent:
s.blockSentListeners.NotifyBlockSentListeners(s.p, s.request, blockData)
}
Expand All @@ -51,10 +50,7 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event
switch responseEvent.Name {
case messagequeue.Error:
s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err)
select {
case s.messages <- &errorRequestMessage{s.p, s.request.ID(), errNetworkError, make(chan error, 1)}:
case <-s.ctx.Done():
}
s.requestCloser.CloseWithNetworkError(s.p, s.request.ID())
case messagequeue.Sent:
s.completedListeners.NotifyCompletedListeners(s.p, s.request, status)
}
Expand Down

0 comments on commit 49f490d

Please sign in to comment.