Skip to content

Commit

Permalink
Fixed locking between connection_reader and connection (apache#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Nov 9, 2019
1 parent 94184ea commit 3378790
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
6 changes: 2 additions & 4 deletions pulsar/impl_partition_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,9 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
return nil
}

func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
pbMsgID := response.GetMessageId()

reader := internal.NewMessageReader(headersAndPayload)

reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
// TODO send discardCorruptedMessage
Expand Down
29 changes: 17 additions & 12 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Connection interface {
}

type ConsumerHandler interface {
MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error

// ConnectionClosed close the TCP connection.
ConnectionClosed()
Expand Down Expand Up @@ -107,6 +107,11 @@ type request struct {
callback func(command *pb.BaseCommand, err error)
}

type incomingCmd struct {
cmd *pb.BaseCommand
headersAndPayload Buffer
}

type connection struct {
sync.Mutex
cond *sync.Cond
Expand All @@ -129,9 +134,9 @@ type connection struct {
requestIDGenerator uint64

incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
writeRequestsCh chan []byte

mapMutex sync.RWMutex
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener

Expand All @@ -156,6 +161,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
auth: auth,

incomingRequestsCh: make(chan *request),
incomingCmdCh: make(chan *incomingCmd),
writeRequestsCh: make(chan []byte),
listeners: make(map[uint64]ConnectionListener),
consumerHandlers: make(map[uint64]ConsumerHandler),
Expand Down Expand Up @@ -280,11 +286,12 @@ func (c *connection) run() {
if req == nil {
return
}
c.mapMutex.Lock()
c.pendingReqs[req.id] = req
c.mapMutex.Unlock()
c.writeCommand(req.cmd)

case cmd := <- c.incomingCmdCh:
c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)

case data := <-c.writeRequestsCh:
if data == nil {
return
Expand Down Expand Up @@ -331,7 +338,11 @@ func (c *connection) writeCommand(cmd proto.Message) {
c.internalWriteData(data)
}

func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
c.incomingCmdCh <- &incomingCmd{cmd, headersAndPayload}
}

func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
c.setLastDataReceived(time.Now())
var err error
Expand Down Expand Up @@ -406,28 +417,23 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback
}

func (c *connection) internalSendRequest(req *request) {
c.mapMutex.Lock()
c.pendingReqs[req.id] = req
c.mapMutex.Unlock()
c.writeCommand(req.cmd)
}

func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
if !ok {
c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
return
}

delete(c.pendingReqs, requestID)
c.mapMutex.RUnlock()
request.callback(response, nil)
}

func (c *connection) handleResponseError(serverError *pb.CommandError) {
requestID := serverError.GetRequestId()
c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
if !ok {
c.log.Warnf("Received unexpected error response for request %d of type %s",
Expand All @@ -436,7 +442,6 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {
}

delete(c.pendingReqs, requestID)
c.mapMutex.RUnlock()

request.callback(nil,
errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())))
Expand All @@ -451,7 +456,7 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
}
}

func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) error {
c.log.Debug("Got Message: ", response)
consumerID := response.GetConsumerId()
if consumer, ok := c.consumerHandler(consumerID); ok {
Expand Down
6 changes: 3 additions & 3 deletions pulsar/internal/connection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *connectionReader) readFromConnection() {
}
}

func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload []byte, err error) {
func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) {
// First, we need to read the frame size
if r.buffer.ReadableBytes() < 4 {
if r.buffer.ReadableBytes() == 0 {
Expand Down Expand Up @@ -92,8 +92,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
// Also read the eventual payload
headersAndPayloadSize := frameSize - (cmdSize + 4)
if cmdSize+4 < frameSize {
headersAndPayload = make([]byte, headersAndPayloadSize)
copy(headersAndPayload, r.buffer.Read(headersAndPayloadSize))
headersAndPayload = NewBuffer(int(headersAndPayloadSize))
headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize))
}
return cmd, headersAndPayload, nil
}
Expand Down

0 comments on commit 3378790

Please sign in to comment.