From 85bbf458b960be81d236bd58d424f189d8934d71 Mon Sep 17 00:00:00 2001
From: cckellogg
Date: Mon, 16 Dec 2019 20:35:53 -0800
Subject: [PATCH] [Issue #123] Ensure message is sent if no room in current
 batch. (#124)

* [Issue #123] Ensure message is sent if no room in current batch.

* Add issue to test comment.

* Increase test wait timeout.

* Add timeout log for test.

* Add log when a single message fails to be added to batch.
---
 pulsar/consumer_test.go              |  3 +-
 pulsar/negative_acks_tracker_test.go |  8 ++--
 pulsar/producer_partition.go         | 19 +++++++--
 pulsar/producer_test.go              | 63 ++++++++++++++++++++++++++++
 4 files changed, 85 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 5ff734c2dc09a..c2a2073611123 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -787,7 +787,8 @@ func TestConsumerMetadata(t *testing.T) {
 		t.Fatal(err)
 	}
 	subs := stats["subscriptions"].(map[string]interface{})
-	meta := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+	cons := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})
+	meta := cons["metadata"].(map[string]interface{})
 	assert.Equal(t, len(props), len(meta))
 	for k, v := range props {
 		mv := meta[k].(string)
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 4dac66e2cd928..93c1af2763aef 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -29,9 +29,9 @@ import (
 const testNackDelay = 300 * time.Millisecond
 
 type nackMockedConsumer struct {
-	ch      chan messageID
+	ch     chan messageID
 	closed bool
-	lock    sync.Mutex
+	lock   sync.Mutex
 	msgIds []messageID
 }
 
@@ -42,7 +42,7 @@ func newNackMockedConsumer() *nackMockedConsumer {
 	go func() {
 		// since the client ticks at an interval of delay / 3
 		// wait another interval to ensure we get all messages
-		time.Sleep(testNackDelay + 101 * time.Millisecond)
+		time.Sleep(testNackDelay + 101*time.Millisecond)
 		t.lock.Lock()
 		defer t.lock.Unlock()
 		t.closed = true
@@ -69,7 +69,7 @@ func sortMessageIds(msgIds []messageID) []messageID {
 	return msgIds
 }
 
-func (nmc *nackMockedConsumer) Wait() <- chan messageID {
+func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 	return nmc.ch
 }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c5345cb4f0f11..b3b89370fda7e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -256,14 +256,27 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 
 	if sendAsBatch {
-		ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
-		if ok == false {
+		added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+		if !added {
 			// The current batch is full: flush it and retry
 			p.internalFlushCurrentBatch()
+
+			// after flushing, try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !ok {
+				p.log.WithField("size", len(msg.Payload)).
+					WithField("sequenceID", sequenceID).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+			}
 		}
 	} else {
 		// Send individually
-		p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+		if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !added {
+			p.log.WithField("size", len(msg.Payload)).
+				WithField("sequenceID", sequenceID).
+				WithField("properties", msg.Properties).
+				Error("unable to send single message")
+		}
 		p.internalFlushCurrentBatch()
 	}
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bc3f8ea8de8e6..4fcc011a7fc8b 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -558,3 +558,66 @@ func TestProducerMetadata(t *testing.T) {
 		assert.Equal(t, v, mv)
 	}
 }
+
+// test for issues #76, #114 and #123
+func TestBatchMessageFlushing(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topic := newTopicName()
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	maxBytes := internal.MaxBatchSize
+	genbytes := func(n int) []byte {
+		c := []byte("a")[0]
+		bytes := make([]byte, n)
+		for i := 0; i < n; i++ {
+			bytes[i] = c
+		}
+		return bytes
+	}
+
+	msgs := [][]byte{
+		genbytes(maxBytes - 10),
+		genbytes(11),
+	}
+
+	ch := make(chan struct{}, 2)
+	ctx := context.Background()
+	for _, msg := range msgs {
+		msg := &ProducerMessage{
+			Payload: msg,
+		}
+		producer.SendAsync(ctx, msg, func(id MessageID, producerMessage *ProducerMessage, err error) {
+			ch <- struct{}{}
+		})
+	}
+
+	published := 0
+	keepGoing := true
+	for keepGoing {
+		select {
+		case <-ch:
+			published++
+			if published == 2 {
+				keepGoing = false
+			}
+		case <-time.After(defaultBatchingMaxPublishDelay * 10):
+			fmt.Println("TestBatchMessageFlushing timeout waiting to publish messages")
+			keepGoing = false
+		}
+	}
+
+	assert.Equal(t, 2, published, "expected to publish two messages")
+}
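
For readers skimming the diff, here is a minimal standalone sketch of the flush-and-retry pattern the internalSend change introduces. It is illustrative only, not the pulsar-client-go API: the batch type, its limit field, and the payload sizes are hypothetical stand-ins for the real batch builder and internal.MaxBatchSize.

package main

import "fmt"

// batch is a hypothetical stand-in for the producer's batch builder;
// add reports false when the payload does not fit in the space left.
type batch struct {
	buf   []byte
	limit int
}

func (b *batch) add(payload []byte) bool {
	if len(b.buf)+len(payload) > b.limit {
		return false
	}
	b.buf = append(b.buf, payload...)
	return true
}

func (b *batch) flush() {
	fmt.Printf("flushing %d bytes\n", len(b.buf))
	b.buf = b.buf[:0]
}

func main() {
	b := &batch{limit: 16}
	payloads := [][]byte{
		make([]byte, 10), // fits in the empty batch
		make([]byte, 10), // no room next to the first payload
	}
	for _, payload := range payloads {
		if b.add(payload) {
			continue
		}
		// The batch is full: flush it and retry the same payload,
		// mirroring the new logic in internalSend.
		b.flush()
		if !b.add(payload) {
			// A payload larger than the limit still fails; the patch
			// now logs this case instead of dropping it silently.
			fmt.Printf("unable to add %d-byte payload\n", len(payload))
		}
	}
	b.flush() // drain the remainder
}

Before the fix, the second add would simply return false and the payload would never be sent, which is the failure mode TestBatchMessageFlushing guards against by publishing one near-limit message followed by one that overflows the current batch.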