Skip to content

Commit

Permalink
Clean callbacks of connection after run loop stopped (apache#239) (ap…
Browse files Browse the repository at this point in the history
…ache#248)

Fixes apache#239

### Motivation

As @wolfstudy pointed out here apache/pulsar-client-go#239 (comment)

There is a race on callbacks of `pendingReqs` when closing the connection while the run loop is still running, which will lead to calling a callback up to 2 times:
https://github.com/apache/pulsar-client-go/blob/e7f1673350f208b5063823282d14906d70d66904/pulsar/internal/connection.go#L669-L671

### Modifications

Introducing a `runLoopStoppedCh` to make sure that the run loop has already stopped when cleaning callbacks of `pendingReqs` in the `Close()`
  • Loading branch information
rueian authored Aug 17, 2020
1 parent 3b2d278 commit 5d57012
Showing 1 changed file with 21 additions and 21 deletions.
42 changes: 21 additions & 21 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type connection struct {
cond *sync.Cond
state int32
connectionTimeout time.Duration
closeOnce sync.Once

logicalAddr *url.URL
physicalAddr *url.URL
Expand Down Expand Up @@ -352,10 +353,19 @@ func (c *connection) run() {
go c.reader.readFromConnection()
go c.runPingCheck()

defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
for id, req := range c.pendingReqs {
req.callback(nil, errors.New("connection closed"))
delete(c.pendingReqs, id)
}
c.Close()
}()

for {
select {
case <-c.closeCh:
c.Close()
return

case req := <-c.incomingRequestsCh:
Expand Down Expand Up @@ -431,7 +441,7 @@ func (c *connection) internalWriteData(data Buffer) {
c.log.Debug("Write data: ", data.ReadableBytes())
if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
c.log.WithError(err).Warn("Failed to write on connection")
c.Close()
c.TriggerClose()
}
}

Expand Down Expand Up @@ -520,7 +530,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl

default:
c.log.Errorf("Received invalid command type: %s", cmd.Type)
c.Close()
c.TriggerClose()
}
}

Expand Down Expand Up @@ -638,7 +648,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
authData, err := c.auth.GetData()
if err != nil {
c.log.WithError(err).Warn("Failed to load auth credentials")
c.Close()
c.TriggerClose()
return
}

Expand Down Expand Up @@ -700,18 +710,14 @@ func (c *connection) UnregisterListener(id uint64) {
// Triggers the connection close by forcing the socket to close and
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
cnx := c.cnx
if cnx != nil {
cnx.Close()
}
c.closeOnce.Do(func() {
cnx := c.cnx
if cnx != nil {
cnx.Close()
}

select {
case <-c.closeCh:
return
default:
close(c.closeCh)
}

})
}

func (c *connection) Close() {
Expand All @@ -726,20 +732,14 @@ func (c *connection) Close() {

c.log.Info("Connection closed")
c.state = connectionClosed
if c.cnx != nil {
c.cnx.Close()
}
c.TriggerClose()
c.pingTicker.Stop()
c.pingCheckTicker.Stop()

for _, listener := range c.listeners {
listener.ConnectionClosed()
}

for _, req := range c.pendingReqs {
req.callback(nil, errors.New("connection closed"))
}

consumerHandlers := make(map[uint64]ConsumerHandler)
c.consumerHandlersLock.RLock()
for id, handler := range c.consumerHandlers {
Expand Down

0 comments on commit 5d57012

Please sign in to comment.