From ad16fa296bdd6aec3090f9f4b682ba3510face2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Sun, 2 Feb 2020 16:22:50 +0800 Subject: [PATCH] Fix concurrent map read and map write (#179) Signed-off-by: xiaolong.ran ### Modifications Fix concurrent map read and map write --- pulsar/internal/connection.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 600790962e99d..b31e70f34b111 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -509,6 +509,10 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) { func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { producerID := response.GetProducerId() + + c.Lock() + defer c.Unlock() + if producer, ok := c.listeners[producerID]; ok { producer.ReceivedSendReceipt(response) } else { @@ -582,6 +586,10 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge) func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) { c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId()) consumerID := closeConsumer.GetConsumerId() + + c.Lock() + defer c.Unlock() + if consumer, ok := c.consumerHandler(consumerID); ok { consumer.ConnectionClosed() delete(c.listeners, consumerID) @@ -594,6 +602,8 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId()) producerID := closeProducer.GetProducerId() + c.Lock() + defer c.Unlock() if producer, ok := c.listeners[producerID]; ok { producer.ConnectionClosed() delete(c.listeners, producerID)