From a7c1a0458bba622e3146688cd8d14f04fe669eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Fri, 9 Aug 2019 18:31:08 +0800 Subject: [PATCH] Fix the map of pendingReqs concurrent issue (#48) Signed-off-by: xiaolong.ran Fix the map of pendingReqs concurrent issue. --- pulsar/internal/connection.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 11337ec41ce23..d83f30be86909 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -105,9 +105,11 @@ type connection struct { incomingRequests chan *request writeRequests chan []byte - pendingReqs map[uint64]*request - listeners map[uint64]ConnectionListener - connWrapper *ConnWrapper + + mapMutex sync.RWMutex + pendingReqs map[uint64]*request + listeners map[uint64]ConnectionListener + connWrapper *ConnWrapper tlsOptions *TLSOptions auth auth.Provider @@ -255,7 +257,9 @@ func (c *connection) run() { if req == nil { return } + c.mapMutex.Lock() c.pendingReqs[req.id] = req + c.mapMutex.Unlock() c.writeCommand(req.cmd) case data := <-c.writeRequests: @@ -369,12 +373,16 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback } func (c *connection) internalSendRequest(req *request) { + c.mapMutex.Lock() c.pendingReqs[req.id] = req + c.mapMutex.Unlock() c.writeCommand(req.cmd) } func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) { + c.mapMutex.RLock() request, ok := c.pendingReqs[requestID] + c.mapMutex.RUnlock() if !ok { c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type) return