Skip to content

Commit

Permalink
Expose BatchingMaxSize from ProducerOptions (apache#280)
Browse files Browse the repository at this point in the history
Previously, the producer maximum batch size was hard-coded to 128 KB.

Now, the produdcer maximum batch size is exposed via ProducerOptions
and defaults to 128 KB

Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
dferstay and Daniel Ferstay authored Jun 13, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 2a6c3e7 commit 62203d7
Showing 4 changed files with 24 additions and 11 deletions.
22 changes: 15 additions & 7 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
@@ -29,10 +29,9 @@ import (
)

const (
// MaxBatchSize will be the largest size for a batch sent from this particular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
MaxBatchSize = 128 * 1024
// DefaultMaxBatchSize init default for maximum number of bytes per batch
DefaultMaxBatchSize = 128 * 1024

// DefaultMaxMessagesPerBatch init default num of entries in per batch.
DefaultMaxMessagesPerBatch = 1000
)
@@ -47,6 +46,11 @@ type BatchBuilder struct {
// Max number of message allowed in the batch
maxMessages uint

// The largest size for a batch sent from this praticular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
maxBatchSize uint

producerName string
producerID uint64

@@ -58,15 +62,19 @@ type BatchBuilder struct {
}

// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
if maxBatchSize == 0 {
maxBatchSize = DefaultMaxBatchSize
}
bb := &BatchBuilder{
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
maxBatchSize: maxBatchSize,
producerName: producerName,
producerID: producerID,
cmdSend: baseCommand(pb.BaseCommand_SEND,
@@ -93,12 +101,12 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,

// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
func (bb *BatchBuilder) IsFull() bool {
return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize
return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > uint32(bb.maxBatchSize)
}

func (bb *BatchBuilder) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > uint32(bb.maxBatchSize)
}

// Add will add single message to batch.
7 changes: 6 additions & 1 deletion pulsar/producer.go
Original file line number Diff line number Diff line change
@@ -109,8 +109,13 @@ type ProducerOptions struct {

// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// batch interval has elapsed.
// BatchingMaxSize (see below) has been reached or the batch interval has elapsed.
BatchingMaxMessages uint

// BatchingMaxSize sets the maximum number of bytes permitted in a batch. (default 128 KB)
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
BatchingMaxSize uint
}

// Producer is used to publish messages on a topic
4 changes: 2 additions & 2 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
@@ -151,8 +151,8 @@ func (p *partitionProducer) grabCnx() error {

p.producerName = res.Response.ProducerSuccess.GetProducerName()
if p.batchBuilder == nil {
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName,
p.producerID, pb.CompressionType(p.options.CompressionType))
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType))
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pulsar/producer_test.go
Original file line number Diff line number Diff line change
@@ -661,7 +661,7 @@ func TestBatchMessageFlushing(t *testing.T) {
}
defer producer.Close()

maxBytes := internal.MaxBatchSize
maxBytes := internal.DefaultMaxBatchSize
genbytes := func(n int) []byte {
c := []byte("a")[0]
bytes := make([]byte, n)

0 comments on commit 62203d7

Please sign in to comment.