Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean callbacks of connection after run loop stopped (#239) #248

Merged
merged 3 commits into from
Aug 17, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type connection struct {
cond *sync.Cond
state connectionState
connectionTimeout time.Duration
closeOnce sync.Once

logicalAddr *url.URL
physicalAddr *url.URL
Expand Down Expand Up @@ -309,10 +310,19 @@ func (c *connection) run() {
go c.reader.readFromConnection()
go c.runPingCheck()

defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
for id, req := range c.pendingReqs {
req.callback(nil, errors.New("connection closed"))
delete(c.pendingReqs, id)
}
c.Close()
}()

for {
select {
case <-c.closeCh:
c.Close()
return

case req := <-c.incomingRequestsCh:
Expand Down Expand Up @@ -361,7 +371,7 @@ func (c *connection) internalWriteData(data []byte) {
c.log.Debug("Write data: ", len(data))
if _, err := c.cnx.Write(data); err != nil {
c.log.WithError(err).Warn("Failed to write on connection")
c.Close()
c.TriggerClose()
}
}

Expand Down Expand Up @@ -450,7 +460,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl

default:
c.log.Errorf("Received invalid command type: %s", cmd.Type)
c.Close()
c.TriggerClose()
}
}

Expand Down Expand Up @@ -568,7 +578,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
authData, err := c.auth.GetData()
if err != nil {
c.log.WithError(err).Warn("Failed to load auth credentials")
c.Close()
c.TriggerClose()
return
}

Expand Down Expand Up @@ -630,18 +640,14 @@ func (c *connection) UnregisterListener(id uint64) {
// Triggers the connection close by forcing the socket to close and
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
cnx := c.cnx
if cnx != nil {
cnx.Close()
}
c.closeOnce.Do(func() {
cnx := c.cnx
if cnx != nil {
cnx.Close()
}

select {
case <-c.closeCh:
return
default:
close(c.closeCh)
}

})
}

func (c *connection) Close() {
Expand All @@ -656,20 +662,14 @@ func (c *connection) Close() {

c.log.Info("Connection closed")
c.state = connectionClosed
if c.cnx != nil {
c.cnx.Close()
}
c.TriggerClose()
Copy link
Contributor

@cckellogg cckellogg Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a client calls this won't it trigger another call to Close() if the run loop is still going. This would cause all the listeners to be triggered again. Are there potential issues with that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is wrapped with a mutex and a c.state == connectionClosed check.
I think another call to Close() will have no effect.

c.pingTicker.Stop()
c.pingCheckTicker.Stop()

for _, listener := range c.listeners {
listener.ConnectionClosed()
}

for _, req := range c.pendingReqs {
req.callback(nil, errors.New("connection closed"))
}

consumerHandlers := make(map[uint64]ConsumerHandler)
c.consumerHandlersLock.RLock()
for id, handler := range c.consumerHandlers {
Expand Down