From 3a27664b7e18fe9a8b152c5ef8bfb38b33a5a92f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 7 Oct 2021 12:11:32 -0700 Subject: [PATCH 1/2] fix(notifications): fix deadlock on notifications The notification queue is extremely vulnerable to deadlocks if one of the subscribers takes a long time to process a notification. This change makes calls to Subscribe and Publish more non-blocking by treating internal commands as an unbuffered queue, rather than a blocking channel. We also cleanup the subscriber for the response manager. Ultimately, it may make sense to move these subscribers directly into the message queue and remove the notification system entirely. but in the meantime, it makes sense for subscriber not to have access to private vars in the repsonse manager --- notifications/publisher.go | 43 ++++++++++++++++++++++++++--------- responsemanager/client.go | 5 ++++ responsemanager/server.go | 3 +-- responsemanager/subscriber.go | 18 ++++++--------- 4 files changed, 45 insertions(+), 24 deletions(-) diff --git a/notifications/publisher.go b/notifications/publisher.go index 8e4bff84..0f4b6e91 100644 --- a/notifications/publisher.go +++ b/notifications/publisher.go @@ -21,16 +21,17 @@ type cmd struct { // publisher is a publisher of events for type publisher struct { - lk sync.RWMutex - closed chan struct{} - cmdChan chan cmd + lk sync.RWMutex + closed chan struct{} + cmds []cmd + cmdsLk *sync.Cond } // NewPublisher returns a new message event publisher func NewPublisher() Publisher { ps := &publisher{ - cmdChan: make(chan cmd), - closed: make(chan struct{}), + cmdsLk: sync.NewCond(&sync.Mutex{}), + closed: make(chan struct{}), } return ps } @@ -49,7 +50,7 @@ func (ps *publisher) Publish(topic Topic, event Event) { default: } - ps.cmdChan <- cmd{op: pub, topics: []Topic{topic}, msg: event} + ps.queue(cmd{op: pub, topics: []Topic{topic}, msg: event}) } // Shutdown shuts down all events and subscriptions @@ -62,7 +63,7 @@ func (ps *publisher) Shutdown() { default: } close(ps.closed) - ps.cmdChan <- cmd{op: shutdown} + ps.queue(cmd{op: shutdown}) } func (ps *publisher) Close(id Topic) { @@ -73,7 +74,7 @@ func (ps *publisher) Close(id Topic) { return default: } - ps.cmdChan <- cmd{op: closeTopic, topics: []Topic{id}} + ps.queue(cmd{op: closeTopic, topics: []Topic{id}}) } func (ps *publisher) Subscribe(topic Topic, sub Subscriber) bool { @@ -86,7 +87,7 @@ func (ps *publisher) Subscribe(topic Topic, sub Subscriber) bool { default: } - ps.cmdChan <- cmd{op: subscribe, topics: []Topic{topic}, sub: sub} + ps.queue(cmd{op: subscribe, topics: []Topic{topic}, sub: sub}) return true } @@ -100,7 +101,7 @@ func (ps *publisher) Unsubscribe(sub Subscriber) bool { default: } - ps.cmdChan <- cmd{op: unsubAll, sub: sub} + ps.queue(cmd{op: unsubAll, sub: sub}) return true } @@ -111,7 +112,8 @@ func (ps *publisher) start() { } loop: - for cmd := range ps.cmdChan { + for { + cmd := ps.dequeue() if cmd.topics == nil { switch cmd.op { case unsubAll: @@ -202,3 +204,22 @@ func (reg *subscriberRegistry) remove(topic Topic, sub Subscriber) { sub.OnClose(topic) } + +func (ps *publisher) queue(cmd cmd) { + ps.cmdsLk.L.Lock() + ps.cmds = append(ps.cmds, cmd) + ps.cmdsLk.L.Unlock() + ps.cmdsLk.Signal() +} + +func (ps *publisher) dequeue() cmd { + ps.cmdsLk.L.Lock() + for len(ps.cmds) == 0 { + ps.cmdsLk.Wait() + } + + cmd := ps.cmds[0] + ps.cmds = ps.cmds[1:] + ps.cmdsLk.L.Unlock() + return cmd +} diff --git a/responsemanager/client.go b/responsemanager/client.go index 95172551..5212e2ad 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -280,6 +280,11 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) { rm.send(&finishTaskRequest{task, err}, nil) } +// CloseWithNetworkError closes a request due to a network error +func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) { + rm.send(&errorRequestMessage{p, requestID, errNetworkError, make(chan error, 1)}, nil) +} + func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) { select { case <-rm.ctx.Done(): diff --git a/responsemanager/server.go b/responsemanager/server.go index c6d09ebf..b8a59118 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -158,8 +158,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync sub := notifications.NewTopicDataSubscriber(&subscriber{ p: key.p, request: request, - ctx: rm.ctx, - messages: rm.messages, + requestCloser: rm, blockSentListeners: rm.blockSentListeners, completedListeners: rm.completedListeners, networkErrorListeners: rm.networkErrorListeners, diff --git a/responsemanager/subscriber.go b/responsemanager/subscriber.go index 2afb7a2e..12960001 100644 --- a/responsemanager/subscriber.go +++ b/responsemanager/subscriber.go @@ -1,7 +1,6 @@ package responsemanager import ( - "context" "errors" "github.com/libp2p/go-libp2p-core/peer" @@ -15,11 +14,14 @@ import ( var errNetworkError = errors.New("network error") +type RequestCloser interface { + CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) +} + type subscriber struct { p peer.ID request gsmsg.GraphSyncRequest - ctx context.Context - messages chan responseManagerMessage + requestCloser RequestCloser blockSentListeners BlockSentListeners networkErrorListeners NetworkErrorListeners completedListeners CompletedListeners @@ -36,10 +38,7 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event switch responseEvent.Name { case messagequeue.Error: s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err) - select { - case s.messages <- &errorRequestMessage{s.p, s.request.ID(), errNetworkError, make(chan error, 1)}: - case <-s.ctx.Done(): - } + s.requestCloser.CloseWithNetworkError(s.p, s.request.ID()) case messagequeue.Sent: s.blockSentListeners.NotifyBlockSentListeners(s.p, s.request, blockData) } @@ -51,10 +50,7 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event switch responseEvent.Name { case messagequeue.Error: s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err) - select { - case s.messages <- &errorRequestMessage{s.p, s.request.ID(), errNetworkError, make(chan error, 1)}: - case <-s.ctx.Done(): - } + s.requestCloser.CloseWithNetworkError(s.p, s.request.ID()) case messagequeue.Sent: s.completedListeners.NotifyCompletedListeners(s.p, s.request, status) } From 8de141bb5405a4cd4cfb4b2eb52b40e80b268dc9 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 7 Oct 2021 14:15:53 -0700 Subject: [PATCH 2/2] fix(notifications): add logging --- notifications/publisher.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/notifications/publisher.go b/notifications/publisher.go index 0f4b6e91..1c43cd56 100644 --- a/notifications/publisher.go +++ b/notifications/publisher.go @@ -1,6 +1,12 @@ package notifications -import "sync" +import ( + "sync" + + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("gs-notifications") type operation int @@ -208,7 +214,9 @@ func (reg *subscriberRegistry) remove(topic Topic, sub Subscriber) { func (ps *publisher) queue(cmd cmd) { ps.cmdsLk.L.Lock() ps.cmds = append(ps.cmds, cmd) + cmdsLen := len(ps.cmds) ps.cmdsLk.L.Unlock() + log.Debugw("added notification command", "cmd", cmd, "queue len", cmdsLen) ps.cmdsLk.Signal() } @@ -220,6 +228,8 @@ func (ps *publisher) dequeue() cmd { cmd := ps.cmds[0] ps.cmds = ps.cmds[1:] + cmdsLen := len(ps.cmds) ps.cmdsLk.L.Unlock() + log.Debugw("processing notification command", "cmd", cmd, "remaining in queue", cmdsLen) return cmd }