Skip to content

Commit

Permalink
Fixed RpcClient.RequestOnCnxNoWait() to not require a request id (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Nov 12, 2019
1 parent 71bec03 commit 215023a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 50 deletions.
13 changes: 4 additions & 9 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
}
}

requestID := internal.RequestIDNoResponse
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID,
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
ConsumerId: proto.Uint64(pc.consumerID),
MessageIds: msgIdDataList,
Expand Down Expand Up @@ -203,14 +202,14 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
LedgerId: proto.Uint64(uint64(msgId.ledgerID)),
EntryId: proto.Uint64(uint64(msgId.entryID)),
}
requestID := internal.RequestIDNoResponse

cmdAck := &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: messageIDs,
AckType: pb.CommandAck_Individual.Enum(),
}

pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck)
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_ACK, cmdAck)
}

func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
Expand Down Expand Up @@ -281,15 +280,11 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
return fmt.Errorf("invalid number of permits requested: %d", permits)
}

requestID := internal.RequestIDNoResponse
cmdFlow := &pb.CommandFlow{
ConsumerId: proto.Uint64(pc.consumerID),
MessagePermits: proto.Uint32(permits),
}
_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_FLOW, cmdFlow)
if err != nil {
return err
}
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_FLOW, cmdFlow)

return nil
}
Expand Down
4 changes: 0 additions & 4 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type MessageReader struct {
batched bool
}


// ReadChecksum
func (r *MessageReader) readChecksum() (uint32, error) {
if r.buffer.ReadableBytes() < 6 {
Expand All @@ -83,7 +82,6 @@ func (r *MessageReader) readChecksum() (uint32, error) {
return checksum, nil
}


func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
// Wire format
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA]
Expand Down Expand Up @@ -145,7 +143,6 @@ func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte,
return &meta, r.buffer.Read(uint32(meta.GetPayloadSize())), nil
}


func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
cmd := &pb.BaseCommand{
Type: &cmdType,
Expand Down Expand Up @@ -246,7 +243,6 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
wb.PutUint32(checksum, checksumIdx)
}


// ConvertFromStringMap convert a string map to a KeyValue []byte
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
list := make([]*pb.KeyValue, len(m))
Expand Down
11 changes: 3 additions & 8 deletions pulsar/internal/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func TestConvertStringMap(t *testing.T) {
assert.Equal(t, "2", m2["b"])
}


func TestReadMessageMetadata(t *testing.T) {
// read old style message (not batched)
reader := NewMessageReaderFromArray(rawCompatSingleMessage)
Expand Down Expand Up @@ -71,7 +70,6 @@ func TestReadMessageMetadata(t *testing.T) {
assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
}


func TestReadMessageOldFormat(t *testing.T) {
reader := NewMessageReaderFromArray(rawCompatSingleMessage)
_, err := reader.ReadMessageMetadata()
Expand All @@ -87,11 +85,10 @@ func TestReadMessageOldFormat(t *testing.T) {
assert.Equal(t, true, ssm == nil)
assert.Equal(t, "hello", string(payload))

_ , _, err = reader.ReadMessage()
_, _, err = reader.ReadMessage()
assert.Equal(t, ErrEOM, err)
}


func TestReadMessagesBatchSize1(t *testing.T) {
reader := NewMessageReaderFromArray(rawBatchMessage1)
meta, err := reader.ReadMessageMetadata()
Expand All @@ -109,11 +106,10 @@ func TestReadMessagesBatchSize1(t *testing.T) {
assert.Equal(t, "hello", string(payload))
}

_ , _, err = reader.ReadMessage()
_, _, err = reader.ReadMessage()
assert.Equal(t, ErrEOM, err)
}


func TestReadMessagesBatchSize10(t *testing.T) {
reader := NewMessageReaderFromArray(rawBatchMessage10)
meta, err := reader.ReadMessageMetadata()
Expand All @@ -131,11 +127,10 @@ func TestReadMessagesBatchSize10(t *testing.T) {
assert.Equal(t, "hello", string(payload))
}

_ , _, err = reader.ReadMessage()
_, _, err = reader.ReadMessage()
assert.Equal(t, ErrEOM, err)
}


// Raw single message in old format
// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
Expand Down
29 changes: 18 additions & 11 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ConnectionListener interface {
// Connection is a interface of client cnx.
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand)
WriteData(data []byte)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
Expand Down Expand Up @@ -101,7 +102,7 @@ func (s connectionState) String() string {
const keepAliveInterval = 30 * time.Second

type request struct {
id uint64
id *uint64
cmd *pb.BaseCommand
callback func(command *pb.BaseCommand, err error)
}
Expand Down Expand Up @@ -133,7 +134,7 @@ type connection struct {
requestIDGenerator uint64

incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
incomingCmdCh chan *incomingCmd
writeRequestsCh chan []byte

pendingReqs map[uint64]*request
Expand Down Expand Up @@ -285,13 +286,9 @@ func (c *connection) run() {
if req == nil {
return
}
// does this request expect a response?
if req.id != RequestIDNoResponse {
c.pendingReqs[req.id] = req
}
c.writeCommand(req.cmd)
c.internalSendRequest(req)

case cmd := <- c.incomingCmdCh:
case cmd := <-c.incomingCmdCh:
c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)

case data := <-c.writeRequestsCh:
Expand Down Expand Up @@ -412,14 +409,24 @@ func (c *connection) Write(data []byte) {

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) {
c.incomingRequestsCh <- &request{
id: requestID,
id: &requestID,
cmd: req,
callback: callback,
}
}

func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
c.incomingRequestsCh <- &request{
id: nil,
cmd: req,
callback: nil,
}
}

func (c *connection) internalSendRequest(req *request) {
c.pendingReqs[req.id] = req
if req.id != nil {
c.pendingReqs[*req.id] = req
}
c.writeCommand(req.cmd)
}

Expand Down Expand Up @@ -477,7 +484,7 @@ func (c *connection) lastDataReceived() time.Time {
c.lastDataReceivedLock.Lock()
defer c.lastDataReceivedLock.Unlock()
t := c.lastDataReceivedTime
return t;
return t
}

func (c *connection) setLastDataReceived(t time.Time) {
Expand Down
4 changes: 1 addition & 3 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,8 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType
return nil, nil
}

func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
assert.Fail(c.t, "Shouldn't be called")
return nil, nil
}

func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType {
Expand Down
18 changes: 3 additions & 15 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ type RPCResult struct {
Cnx Connection
}

// RequestID for a request when there is no expected response
const RequestIDNoResponse = uint64(0)

type RPCClient interface {
// Create a new unique request id
NewRequestID() uint64
Expand All @@ -48,7 +45,7 @@ type RPCClient interface {
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)

RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
}
Expand Down Expand Up @@ -120,17 +117,8 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
return rpcResult, rpcErr
}

func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
rpcResult := &RPCResult{
Cnx: cnx,
}

cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
rpcResult.Response = response
})

return rpcResult, nil
func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
cnx.SendRequestNoWait(baseCommand(cmdType, message))
}

func (c *rpcClient) NewRequestID() uint64 {
Expand Down

0 comments on commit 215023a

Please sign in to comment.