diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 11337ec41ce23..d83f30be86909 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -105,9 +105,11 @@ type connection struct { incomingRequests chan *request writeRequests chan []byte - pendingReqs map[uint64]*request - listeners map[uint64]ConnectionListener - connWrapper *ConnWrapper + + mapMutex sync.RWMutex + pendingReqs map[uint64]*request + listeners map[uint64]ConnectionListener + connWrapper *ConnWrapper tlsOptions *TLSOptions auth auth.Provider @@ -255,7 +257,9 @@ func (c *connection) run() { if req == nil { return } + c.mapMutex.Lock() c.pendingReqs[req.id] = req + c.mapMutex.Unlock() c.writeCommand(req.cmd) case data := <-c.writeRequests: @@ -369,12 +373,16 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback } func (c *connection) internalSendRequest(req *request) { + c.mapMutex.Lock() c.pendingReqs[req.id] = req + c.mapMutex.Unlock() c.writeCommand(req.cmd) } func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) { + c.mapMutex.RLock() request, ok := c.pendingReqs[requestID] + c.mapMutex.RUnlock() if !ok { c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type) return