Skip to content

Commit

Permalink
[issue:40]Fix producer send protocol error (apache#42)
Browse files Browse the repository at this point in the history
* [issue:40]Fix producer send protocol error

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>

* fix comments and check code format

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
  • Loading branch information
wolfstudy authored Aug 6, 2019
1 parent 61c5fe4 commit d049757
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 31 deletions.
13 changes: 11 additions & 2 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestProducerConsumer(t *testing.T) {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Shared,
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
Expand All @@ -67,6 +67,10 @@ func TestProducerConsumer(t *testing.T) {
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
Expand All @@ -80,7 +84,12 @@ func TestProducerConsumer(t *testing.T) {
}

expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())

// ack message
if err := consumer.Ack(msg); err != nil {
Expand Down Expand Up @@ -213,7 +222,7 @@ func makeHTTPCall(t *testing.T, method string, urls string, body string) {
defer res.Body.Close()
}

func TestConsumerShared(t *testing.T) {
func TestConsumerKeyShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand Down
104 changes: 75 additions & 29 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
package internal

import (
`bytes`
`encoding/binary`
`fmt`
"github.com/golang/protobuf/proto"
`io`

"github.com/apache/pulsar-client-go/pkg/pb"
log "github.com/sirupsen/logrus"
"bytes"
"encoding/binary"
"fmt"
"github.com/golang/protobuf/proto"
"io"

"github.com/apache/pulsar-client-go/pkg/pb"
log "github.com/sirupsen/logrus"
)

const (
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
MaxFrameSize = 5 * 1024 * 1024
magicCrc32c uint16 = 0x0e01
MaxFrameSize = 5 * 1024 * 1024
magicCrc32c uint16 = 0x0e01
)

func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
Expand Down Expand Up @@ -82,6 +82,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
log.WithError(err).Fatal("Protobuf serialization error")
}

wb.WriteUint32(uint32(len(serialized)))
wb.Write(serialized)
wb.Write(payload)
}
Expand Down Expand Up @@ -150,7 +151,7 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
// guard against allocating large buffer
if metadataSize > MaxFrameSize {
return nil, nil, fmt.Errorf("frame metadata size (%d) "+
"cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
"cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
}

// Read protobuf encoded metadata
Expand All @@ -163,28 +164,30 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
return nil, nil, err
}

batchLen := make([]byte, 2)
if _, err = io.ReadFull(lr, batchLen); err != nil {
return nil, nil, err
}

// Anything left in the frame is considered
// the payload and can be any sequence of bytes.
if lr.N > 0 {
// guard against allocating large buffer
if lr.N > MaxFrameSize {
return nil, nil, fmt.Errorf("frame payload size (%d) "+
"cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
}
payload = make([]byte, lr.N)
if _, err = io.ReadFull(lr, payload); err != nil {
return nil, nil, err
}
// Anything left in the frame is considered
// the payload and can be any sequence of bytes.
payloads := make([]byte, lr.N)
if _, err = io.ReadFull(lr, payloads); err != nil {
return nil, nil, err
}

numMsg := msgMeta.GetNumMessagesInBatch()

singleMessages, err := decodeBatchPayload(payloads, numMsg)
if err != nil {
return nil, nil, err
}

for _, singleMsg := range singleMessages {
payload = singleMsg.SinglePayload
msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
msgMeta.Properties = singleMsg.SingleMeta.Properties
msgMeta.EventTime = singleMsg.SingleMeta.EventTime
}

if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
"not match given checksum (0x%X)", computed, expectedChksum)
"not match given checksum (0x%X)", computed, expectedChksum)
}

return msgMeta, payload, nil
Expand Down Expand Up @@ -237,6 +240,49 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
wb.PutUint32(checksum, checksumIdx)
}

// singleMessage represents one of the elements of the batch type payload
type singleMessage struct {
SingleMetaSize uint32
SingleMeta *pb.SingleMessageMetadata
SinglePayload []byte
}

// decodeBatchPayload parses the payload of the batch type
// If the producer uses the batch function, msg.Payload will be a singleMessage array structure.
func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
buf32 := make([]byte, 4)
rdBuf := bytes.NewReader(bp)
list := make([]*singleMessage, 0, batchNum)
for i := int32(0); i < batchNum; i++ {
// singleMetaSize
if _, err := io.ReadFull(rdBuf, buf32); err != nil {
return nil, err
}
singleMetaSize := binary.BigEndian.Uint32(buf32)

// singleMeta
singleMetaBuf := make([]byte, singleMetaSize)
if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
return nil, err
}
singleMeta := new(pb.SingleMessageMetadata)
if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
return nil, err
}
// payload
singlePayload := make([]byte, singleMeta.GetPayloadSize())
if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
return nil, err
}
d := &singleMessage{}
d.SingleMetaSize = singleMetaSize
d.SingleMeta = singleMeta
d.SinglePayload = singlePayload
list = append(list, d)
}
return list, nil
}

// ConvertFromStringMap convert a string map to a KeyValue []byte
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
list := make([]*pb.KeyValue, len(m))
Expand Down
17 changes: 17 additions & 0 deletions pulsar/internal/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,20 @@ func TestConvertStringMap(t *testing.T) {
assert.Equal(t, "1", m2["a"])
assert.Equal(t, "2", m2["b"])
}

func TestDecodeBatchPayload(t *testing.T) {
// singleMsg = singleMetaSize(4 bytes) + singleMeta(var length) + payload
singleMsg := []byte{0, 0, 0, 2, 24, 12, 104, 101, 108, 108, 111, 45, 112, 117, 108, 115, 97, 114}
list, err := decodeBatchPayload(singleMsg, 1)
if err != nil {
t.Fatal(err)
}
if get, want := len(list), 1; get != want {
t.Errorf("want %v, but get %v", get, want)
}

m := list[0]
if get, want := string(m.SinglePayload), "hello-pulsar"; get != want {
t.Errorf("want %v, but get %v", get, want)
}
}

0 comments on commit d049757

Please sign in to comment.