Skip to content

Commit

Permalink
Fix concurrent map read and map write (apache#179)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolong.ran <rxl@apache.org>

### Modifications

Fix concurrent map read and map write
  • Loading branch information
wolfstudy authored Feb 2, 2020
1 parent 6943327 commit ad16fa2
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {

func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
producerID := response.GetProducerId()

c.Lock()
defer c.Unlock()

if producer, ok := c.listeners[producerID]; ok {
producer.ReceivedSendReceipt(response)
} else {
Expand Down Expand Up @@ -582,6 +586,10 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
consumerID := closeConsumer.GetConsumerId()

c.Lock()
defer c.Unlock()

if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
delete(c.listeners, consumerID)
Expand All @@ -594,6 +602,8 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()

c.Lock()
defer c.Unlock()
if producer, ok := c.listeners[producerID]; ok {
producer.ConnectionClosed()
delete(c.listeners, producerID)
Expand Down

0 comments on commit ad16fa2

Please sign in to comment.