Skip to content

Commit

Permalink
Fix the map of pendingReqs concurrent issue (apache#48)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Fix the map of pendingReqs concurrent issue.
  • Loading branch information
wolfstudy authored and jiazhai committed Aug 9, 2019
1 parent 7698eb9 commit a7c1a04
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ type connection struct {

incomingRequests chan *request
writeRequests chan []byte
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener
connWrapper *ConnWrapper

mapMutex sync.RWMutex
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener
connWrapper *ConnWrapper

tlsOptions *TLSOptions
auth auth.Provider
Expand Down Expand Up @@ -255,7 +257,9 @@ func (c *connection) run() {
if req == nil {
return
}
c.mapMutex.Lock()
c.pendingReqs[req.id] = req
c.mapMutex.Unlock()
c.writeCommand(req.cmd)

case data := <-c.writeRequests:
Expand Down Expand Up @@ -369,12 +373,16 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback
}

func (c *connection) internalSendRequest(req *request) {
c.mapMutex.Lock()
c.pendingReqs[req.id] = req
c.mapMutex.Unlock()
c.writeCommand(req.cmd)
}

func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
c.mapMutex.RUnlock()
if !ok {
c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
return
Expand Down

0 comments on commit a7c1a04

Please sign in to comment.