Skip to content

Commit

Permalink
[Issue apache#123] Ensure message is sent if no room in current batch. (
Browse files Browse the repository at this point in the history
apache#124)

* [Issue apache#123] Ensure message is sent if no room in current batch.

* Add issue to test comment.

* Increase test wait timeout.

* Add timeout log for test.

* Add log when a single message fails to be added to batch.
  • Loading branch information
cckellogg authored and wolfstudy committed Dec 17, 2019
1 parent eb9b69e commit 85bbf45
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 8 deletions.
3 changes: 2 additions & 1 deletion pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,8 @@ func TestConsumerMetadata(t *testing.T) {
t.Fatal(err)
}
subs := stats["subscriptions"].(map[string]interface{})
meta := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
cons := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})
meta := cons["metadata"].(map[string]interface{})
assert.Equal(t, len(props), len(meta))
for k, v := range props {
mv := meta[k].(string)
Expand Down
8 changes: 4 additions & 4 deletions pulsar/negative_acks_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
const testNackDelay = 300 * time.Millisecond

type nackMockedConsumer struct {
ch chan messageID
ch chan messageID
closed bool
lock sync.Mutex
lock sync.Mutex
msgIds []messageID
}

Expand All @@ -42,7 +42,7 @@ func newNackMockedConsumer() *nackMockedConsumer {
go func() {
// since the client ticks at an interval of delay / 3
// wait another interval to ensure we get all messages
time.Sleep(testNackDelay + 101 * time.Millisecond)
time.Sleep(testNackDelay + 101*time.Millisecond)
t.lock.Lock()
defer t.lock.Unlock()
t.closed = true
Expand All @@ -69,7 +69,7 @@ func sortMessageIds(msgIds []messageID) []messageID {
return msgIds
}

func (nmc *nackMockedConsumer) Wait() <- chan messageID {
func (nmc *nackMockedConsumer) Wait() <-chan messageID {
return nmc.ch
}

Expand Down
19 changes: 16 additions & 3 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,27 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}

if sendAsBatch {
ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
if ok == false {
added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
if !added {
// The current batch is full.. flush it and retry
p.internalFlushCurrentBatch()

// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !ok {
p.log.WithField("size", len(msg.Payload)).
WithField("sequenceID", sequenceID).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
}
}
} else {
// Send individually
p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !added {
p.log.WithField("size", len(msg.Payload)).
WithField("sequenceID", sequenceID).
WithField("properties", msg.Properties).
Error("unable to send single message")
}
p.internalFlushCurrentBatch()
}

Expand Down
63 changes: 63 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,66 @@ func TestProducerMetadata(t *testing.T) {
assert.Equal(t, v, mv)
}
}

// test for issues #76, #114 and #123
func TestBatchMessageFlushing(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
if err != nil {
t.Fatal(err)
}
defer producer.Close()

maxBytes := internal.MaxBatchSize
genbytes := func(n int) []byte {
c := []byte("a")[0]
bytes := make([]byte, n)
for i := 0; i < n; i++ {
bytes[i] = c
}
return bytes
}

msgs := [][]byte{
genbytes(maxBytes - 10),
genbytes(11),
}

ch := make(chan struct{}, 2)
ctx := context.Background()
for _, msg := range msgs {
msg := &ProducerMessage{
Payload: msg,
}
producer.SendAsync(ctx, msg, func(id MessageID, producerMessage *ProducerMessage, err error) {
ch <- struct{}{}
})
}

published := 0
keepGoing := true
for keepGoing {
select {
case <-ch:
published++
if published == 2 {
keepGoing = false
}
case <-time.After(defaultBatchingMaxPublishDelay * 10):
fmt.Println("TestBatchMessageFlushing timeout waiting to publish messages")
keepGoing = false
}
}

assert.Equal(t, 2, published, "expected to publish two messages")
}

0 comments on commit 85bbf45

Please sign in to comment.