diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 0e13380114..8de1ad5986 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -152,6 +152,7 @@ type connection struct { cond *sync.Cond state int32 connectionTimeout time.Duration + closeOnce sync.Once logicalAddr *url.URL physicalAddr *url.URL @@ -352,10 +353,19 @@ func (c *connection) run() { go c.reader.readFromConnection() go c.runPingCheck() + defer func() { + // all the accesses to the pendingReqs should be happened in this run loop thread, + // including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239 + for id, req := range c.pendingReqs { + req.callback(nil, errors.New("connection closed")) + delete(c.pendingReqs, id) + } + c.Close() + }() + for { select { case <-c.closeCh: - c.Close() return case req := <-c.incomingRequestsCh: @@ -431,7 +441,7 @@ func (c *connection) internalWriteData(data Buffer) { c.log.Debug("Write data: ", data.ReadableBytes()) if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { c.log.WithError(err).Warn("Failed to write on connection") - c.Close() + c.TriggerClose() } } @@ -520,7 +530,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl default: c.log.Errorf("Received invalid command type: %s", cmd.Type) - c.Close() + c.TriggerClose() } } @@ -638,7 +648,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge) authData, err := c.auth.GetData() if err != nil { c.log.WithError(err).Warn("Failed to load auth credentials") - c.Close() + c.TriggerClose() return } @@ -700,18 +710,14 @@ func (c *connection) UnregisterListener(id uint64) { // Triggers the connection close by forcing the socket to close and // broadcasting the notification on the close channel func (c *connection) TriggerClose() { - cnx := c.cnx - if cnx != nil { - cnx.Close() - } + c.closeOnce.Do(func() { + cnx := c.cnx + if cnx != nil { + cnx.Close() + } - select { - case <-c.closeCh: - return - default: close(c.closeCh) - } - + }) } func (c *connection) Close() { @@ -726,9 +732,7 @@ func (c *connection) Close() { c.log.Info("Connection closed") c.state = connectionClosed - if c.cnx != nil { - c.cnx.Close() - } + c.TriggerClose() c.pingTicker.Stop() c.pingCheckTicker.Stop() @@ -736,10 +740,6 @@ func (c *connection) Close() { listener.ConnectionClosed() } - for _, req := range c.pendingReqs { - req.callback(nil, errors.New("connection closed")) - } - consumerHandlers := make(map[uint64]ConsumerHandler) c.consumerHandlersLock.RLock() for id, handler := range c.consumerHandlers {