Skip to content

Commit

Permalink
fix: fix 1064
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleiphir2769 committed Jul 21, 2023
1 parent 3812c07 commit 0eafe04
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,9 @@ func (p *partitionProducer) internalFlushCurrentBatches() {

func (p *partitionProducer) internalFlush(fr *flushRequest) {

p.internalFlushCurrentBatch()
if !p.options.DisableBatching {
p.internalFlushCurrentBatch()
}

pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {
Expand Down
44 changes: 44 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,50 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Wait()
}

func TestProducerFlushDisableBatching(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
DisableBatching: true,
})

assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()

wg := sync.WaitGroup{}
wg.Add(10)
errors := internal.NewBlockingQueue(10)

for i := 0; i < 10; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
log.WithError(e).Error("Failed to publish")
errors.Put(e)
} else {
log.Info("Published message ", id)
}
wg.Done()
})

assert.NoError(t, err)
}

err = producer.Flush()
assert.Nil(t, err)

wg.Wait()

assert.Equal(t, 0, errors.Size())
}

func TestProducerCompression(t *testing.T) {
type testProvider struct {
name string
Expand Down

0 comments on commit 0eafe04

Please sign in to comment.