From 9702affefad7219b55cc6fe9503310ce0a512a9a Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 14 May 2020 02:29:31 +0800 Subject: [PATCH 1/3] Call pending callbacks of connection after run loop stopped (#239) --- pulsar/internal/connection.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index d02dedad6b..6facde3c15 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -147,6 +147,7 @@ type connection struct { incomingRequestsCh chan *request incomingCmdCh chan *incomingCmd closeCh chan interface{} + runLoopStoppedCh chan interface{} writeRequestsCh chan []byte pendingReqs map[uint64]*request @@ -309,10 +310,15 @@ func (c *connection) run() { go c.reader.readFromConnection() go c.runPingCheck() + c.runLoopStoppedCh = make(chan interface{}) + defer func() { + close(c.runLoopStoppedCh) + c.Close() + }() + for { select { case <-c.closeCh: - c.Close() return case req := <-c.incomingRequestsCh: @@ -361,7 +367,7 @@ func (c *connection) internalWriteData(data []byte) { c.log.Debug("Write data: ", len(data)) if _, err := c.cnx.Write(data); err != nil { c.log.WithError(err).Warn("Failed to write on connection") - c.Close() + c.TriggerClose() } } @@ -450,7 +456,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl default: c.log.Errorf("Received invalid command type: %s", cmd.Type) - c.Close() + c.TriggerClose() } } @@ -568,7 +574,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge) authData, err := c.auth.GetData() if err != nil { c.log.WithError(err).Warn("Failed to load auth credentials") - c.Close() + c.TriggerClose() return } @@ -656,9 +662,7 @@ func (c *connection) Close() { c.log.Info("Connection closed") c.state = connectionClosed - if c.cnx != nil { - c.cnx.Close() - } + c.TriggerClose() c.pingTicker.Stop() c.pingCheckTicker.Stop() @@ -666,8 +670,12 @@ func (c *connection) Close() { listener.ConnectionClosed() } - for _, req := range c.pendingReqs { + if c.runLoopStoppedCh != nil { + <-c.runLoopStoppedCh + } + for id, req := range c.pendingReqs { req.callback(nil, errors.New("connection closed")) + delete(c.pendingReqs, id) } consumerHandlers := make(map[uint64]ConsumerHandler) From 682abb43b7adf1e6b830a44e6f043b6006f73624 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sat, 16 May 2020 12:28:32 +0800 Subject: [PATCH 2/3] Fixed race condition in TriggerClose with sync.Once --- pulsar/internal/connection.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 6facde3c15..0c9c7c2843 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -126,6 +126,7 @@ type connection struct { cond *sync.Cond state connectionState connectionTimeout time.Duration + closeOnce sync.Once logicalAddr *url.URL physicalAddr *url.URL @@ -636,18 +637,14 @@ func (c *connection) UnregisterListener(id uint64) { // Triggers the connection close by forcing the socket to close and // broadcasting the notification on the close channel func (c *connection) TriggerClose() { - cnx := c.cnx - if cnx != nil { - cnx.Close() - } + c.closeOnce.Do(func() { + cnx := c.cnx + if cnx != nil { + cnx.Close() + } - select { - case <-c.closeCh: - return - default: close(c.closeCh) - } - + }) } func (c *connection) Close() { From 533441bd8d373face03b77c90b7fc11fd8ff8b03 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 28 May 2020 00:36:59 +0800 Subject: [PATCH 3/3] Move cleanup of connection.pendingReqs to the end of the run() --- pulsar/internal/connection.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 0c9c7c2843..744099aec0 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -148,7 +148,6 @@ type connection struct { incomingRequestsCh chan *request incomingCmdCh chan *incomingCmd closeCh chan interface{} - runLoopStoppedCh chan interface{} writeRequestsCh chan []byte pendingReqs map[uint64]*request @@ -311,9 +310,13 @@ func (c *connection) run() { go c.reader.readFromConnection() go c.runPingCheck() - c.runLoopStoppedCh = make(chan interface{}) defer func() { - close(c.runLoopStoppedCh) + // all the accesses to the pendingReqs should be happened in this run loop thread, + // including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239 + for id, req := range c.pendingReqs { + req.callback(nil, errors.New("connection closed")) + delete(c.pendingReqs, id) + } c.Close() }() @@ -667,14 +670,6 @@ func (c *connection) Close() { listener.ConnectionClosed() } - if c.runLoopStoppedCh != nil { - <-c.runLoopStoppedCh - } - for id, req := range c.pendingReqs { - req.callback(nil, errors.New("connection closed")) - delete(c.pendingReqs, id) - } - consumerHandlers := make(map[uint64]ConsumerHandler) c.consumerHandlersLock.RLock() for id, handler := range c.consumerHandlers {