From 215023a2091c0f4425906e5964d84ea4e7cbb65f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Nov 2019 14:32:03 -0800 Subject: [PATCH] Fixed RpcClient.RequestOnCnxNoWait() to not require a request id (#95) --- pulsar/consumer_partition.go | 13 ++++-------- pulsar/internal/commands.go | 4 ---- pulsar/internal/commands_test.go | 11 +++------- pulsar/internal/connection.go | 29 ++++++++++++++++---------- pulsar/internal/lookup_service_test.go | 4 +--- pulsar/internal/rpc_client.go | 18 +++------------- 6 files changed, 29 insertions(+), 50 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index a8d9b6cf47116..67c04e5116622 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -174,8 +174,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { } } - requestID := internal.RequestIDNoResponse - pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, + pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{ ConsumerId: proto.Uint64(pc.consumerID), MessageIds: msgIdDataList, @@ -203,14 +202,14 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { LedgerId: proto.Uint64(uint64(msgId.ledgerID)), EntryId: proto.Uint64(uint64(msgId.entryID)), } - requestID := internal.RequestIDNoResponse + cmdAck := &pb.CommandAck{ ConsumerId: proto.Uint64(pc.consumerID), MessageId: messageIDs, AckType: pb.CommandAck_Individual.Enum(), } - pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck) + pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_ACK, cmdAck) } func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error { @@ -281,15 +280,11 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error { return fmt.Errorf("invalid number of permits requested: %d", permits) } - requestID := internal.RequestIDNoResponse cmdFlow := &pb.CommandFlow{ ConsumerId: proto.Uint64(pc.consumerID), MessagePermits: proto.Uint32(permits), } - _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_FLOW, cmdFlow) - if err != nil { - return err - } + pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_FLOW, cmdFlow) return nil } diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index 4e691f0260833..09d223fa76b46 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -68,7 +68,6 @@ type MessageReader struct { batched bool } - // ReadChecksum func (r *MessageReader) readChecksum() (uint32, error) { if r.buffer.ReadableBytes() < 6 { @@ -83,7 +82,6 @@ func (r *MessageReader) readChecksum() (uint32, error) { return checksum, nil } - func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { // Wire format // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] @@ -145,7 +143,6 @@ func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte, return &meta, r.buffer.Read(uint32(meta.GetPayloadSize())), nil } - func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand { cmd := &pb.BaseCommand{ Type: &cmdType, @@ -246,7 +243,6 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM wb.PutUint32(checksum, checksumIdx) } - // ConvertFromStringMap convert a string map to a KeyValue []byte func ConvertFromStringMap(m map[string]string) []*pb.KeyValue { list := make([]*pb.KeyValue, len(m)) diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go index 5c2e19ad60ca6..b43335a57301e 100644 --- a/pulsar/internal/commands_test.go +++ b/pulsar/internal/commands_test.go @@ -38,7 +38,6 @@ func TestConvertStringMap(t *testing.T) { assert.Equal(t, "2", m2["b"]) } - func TestReadMessageMetadata(t *testing.T) { // read old style message (not batched) reader := NewMessageReaderFromArray(rawCompatSingleMessage) @@ -71,7 +70,6 @@ func TestReadMessageMetadata(t *testing.T) { assert.Equal(t, 10, int(meta.GetNumMessagesInBatch())) } - func TestReadMessageOldFormat(t *testing.T) { reader := NewMessageReaderFromArray(rawCompatSingleMessage) _, err := reader.ReadMessageMetadata() @@ -87,11 +85,10 @@ func TestReadMessageOldFormat(t *testing.T) { assert.Equal(t, true, ssm == nil) assert.Equal(t, "hello", string(payload)) - _ , _, err = reader.ReadMessage() + _, _, err = reader.ReadMessage() assert.Equal(t, ErrEOM, err) } - func TestReadMessagesBatchSize1(t *testing.T) { reader := NewMessageReaderFromArray(rawBatchMessage1) meta, err := reader.ReadMessageMetadata() @@ -109,11 +106,10 @@ func TestReadMessagesBatchSize1(t *testing.T) { assert.Equal(t, "hello", string(payload)) } - _ , _, err = reader.ReadMessage() + _, _, err = reader.ReadMessage() assert.Equal(t, ErrEOM, err) } - func TestReadMessagesBatchSize10(t *testing.T) { reader := NewMessageReaderFromArray(rawBatchMessage10) meta, err := reader.ReadMessageMetadata() @@ -131,11 +127,10 @@ func TestReadMessagesBatchSize10(t *testing.T) { assert.Equal(t, "hello", string(payload)) } - _ , _, err = reader.ReadMessage() + _, _, err = reader.ReadMessage() assert.Equal(t, ErrEOM, err) } - // Raw single message in old format // metadata properties: properties: // payload = "hello" diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index b1d4faa18654e..ba4147acd6329 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -56,6 +56,7 @@ type ConnectionListener interface { // Connection is a interface of client cnx. type Connection interface { SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) + SendRequestNoWait(req *pb.BaseCommand) WriteData(data []byte) RegisterListener(id uint64, listener ConnectionListener) UnregisterListener(id uint64) @@ -101,7 +102,7 @@ func (s connectionState) String() string { const keepAliveInterval = 30 * time.Second type request struct { - id uint64 + id *uint64 cmd *pb.BaseCommand callback func(command *pb.BaseCommand, err error) } @@ -133,7 +134,7 @@ type connection struct { requestIDGenerator uint64 incomingRequestsCh chan *request - incomingCmdCh chan *incomingCmd + incomingCmdCh chan *incomingCmd writeRequestsCh chan []byte pendingReqs map[uint64]*request @@ -285,13 +286,9 @@ func (c *connection) run() { if req == nil { return } - // does this request expect a response? - if req.id != RequestIDNoResponse { - c.pendingReqs[req.id] = req - } - c.writeCommand(req.cmd) + c.internalSendRequest(req) - case cmd := <- c.incomingCmdCh: + case cmd := <-c.incomingCmdCh: c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload) case data := <-c.writeRequestsCh: @@ -412,14 +409,24 @@ func (c *connection) Write(data []byte) { func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) { c.incomingRequestsCh <- &request{ - id: requestID, + id: &requestID, cmd: req, callback: callback, } } +func (c *connection) SendRequestNoWait(req *pb.BaseCommand) { + c.incomingRequestsCh <- &request{ + id: nil, + cmd: req, + callback: nil, + } +} + func (c *connection) internalSendRequest(req *request) { - c.pendingReqs[req.id] = req + if req.id != nil { + c.pendingReqs[*req.id] = req + } c.writeCommand(req.cmd) } @@ -477,7 +484,7 @@ func (c *connection) lastDataReceived() time.Time { c.lastDataReceivedLock.Lock() defer c.lastDataReceivedLock.Unlock() t := c.lastDataReceivedTime - return t; + return t } func (c *connection) setLastDataReceived(t time.Time) { diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index 0c548bc79523a..576745e6966a8 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -95,10 +95,8 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType return nil, nil } -func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { +func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) { assert.Fail(c.t, "Shouldn't be called") - return nil, nil } func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType { diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 6a01fe0a2cd91..7e5bae30322b7 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -31,9 +31,6 @@ type RPCResult struct { Cnx Connection } -// RequestID for a request when there is no expected response -const RequestIDNoResponse = uint64(0) - type RPCClient interface { // Create a new unique request id NewRequestID() uint64 @@ -48,7 +45,7 @@ type RPCClient interface { Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) - RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) + RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) } @@ -120,17 +117,8 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba return rpcResult, rpcErr } -func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { - rpcResult := &RPCResult{ - Cnx: cnx, - } - - cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) { - rpcResult.Response = response - }) - - return rpcResult, nil +func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) { + cnx.SendRequestNoWait(baseCommand(cmdType, message)) } func (c *rpcClient) NewRequestID() uint64 {