Skip to content

Commit

Permalink
[Issue 240] Add check for max message size (apache#263)
Browse files Browse the repository at this point in the history
* Add check for max message size

1. When creating a connection, try to get maxMessageSize from handshake
response command. If it's not set, then use the default maxMessageSize
value defined in the client side.
2. When sending a message, check whether the size of payload exceeds
maxMessageSize. If so, return error immediately without adding this
meesage into sending queue.
3. To implement these, I made some tiny modifications in Connection
interface and added a field in its implementation struct.

* Add testing for max message size

* Fix error log
  • Loading branch information
izackwu authored May 28, 2020
1 parent e31d474 commit bf248fd
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 6 deletions.
3 changes: 3 additions & 0 deletions integration-tests/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ statusFilePath=/usr/local/apache/htdocs
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

# Set maxMessageSize to 1MB rather than the default value 5MB for testing
maxMessageSize=1048576

### --- Authentication --- ###

# Enable TLS
Expand Down
2 changes: 0 additions & 2 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
)

const (
// MaxMessageSize limit message size for transfer
MaxMessageSize = 5 * 1024 * 1024
// MaxBatchSize will be the largest size for a batch sent from this particular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
Expand Down
8 changes: 6 additions & 2 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ import (

const (
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
MaxFrameSize = 5 * 1024 * 1024
magicCrc32c uint16 = 0x0e01
MaxFrameSize = 5 * 1024 * 1024
// MessageFramePadding is for metadata and other frame headers
MessageFramePadding = 10 * 1024
// MaxMessageSize limit message size for transfer
MaxMessageSize = MaxFrameSize - MessageFramePadding
magicCrc32c uint16 = 0x0e01
)

// ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
Expand Down
15 changes: 14 additions & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Connection interface {
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
ID() string
GetMaxMessageSize() int32
Close()
}

Expand Down Expand Up @@ -157,6 +158,8 @@ type connection struct {

tlsOptions *TLSOptions
auth auth.Provider

maxMessageSize int32
}

func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
Expand Down Expand Up @@ -282,7 +285,13 @@ func (c *connection) doHandshake() bool {
cmd.Type)
return false
}

if cmd.Connected.MaxMessageSize != nil {
c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize)
c.maxMessageSize = *cmd.Connected.MaxMessageSize
} else {
c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize)
c.maxMessageSize = MaxMessageSize
}
c.log.Info("Connection is ready")
c.changeState(connectionReady)
return true
Expand Down Expand Up @@ -749,3 +758,7 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) {
func (c *connection) ID() string {
return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
}

func (c *connection) GetMaxMessageSize() int32 {
return c.maxMessageSize
}
15 changes: 14 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ const (
producerClosed
)

var errFailAddBatch = errors.New("message send failed")
var (
errFailAddBatch = errors.New("message send failed")
errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
)

type partitionProducer struct {
state int32
Expand Down Expand Up @@ -236,6 +239,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

msg := request.msg

// if msg is too large
if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithField("size", len(msg.Payload)).
WithField("properties", msg.Properties).
WithError(errMessageTooLarge).Error()
return
}

deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
Expand Down
27 changes: 27 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,3 +790,30 @@ func TestDelayAbsolute(t *testing.T) {
assert.NotNil(t, msg)
canc()
}

func TestMaxMessageSize(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
})
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
serverMaxMessageSize := 1024 * 1024
for bias := -1; bias <= 1; bias++ {
payload := make([]byte, serverMaxMessageSize+bias)
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: payload,
})
if bias <= 0 {
assert.NoError(t, err)
assert.NotNil(t, ID)
} else {
assert.Equal(t, errMessageTooLarge, err)
}
}
}

0 comments on commit bf248fd

Please sign in to comment.