codec(cdc): fix encoder max-message-bytes (pingcap#4074) (pingcap#4077)
ti-chi-bot authored Dec 27, 2021
1 parent 1ac656b commit d94cf7c
Showing 7 changed files with 324 additions and 148 deletions.
42 changes: 21 additions & 21 deletions cdc/sink/codec/json.go
@@ -35,8 +35,6 @@ import (
const (
// BatchVersion1 represents the version of batch format
BatchVersion1 uint64 = 1
// DefaultMaxMessageBytes sets the default value for max-message-bytes
DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
// DefaultMaxBatchSize sets the default value for max-batch-size
DefaultMaxBatchSize int = 16
)
@@ -317,13 +315,13 @@ type JSONEventBatchEncoder struct {
messageBuf []*MQMessage
curBatchSize int
// configs
maxKafkaMessageSize int
maxBatchSize int
maxMessageBytes int
maxBatchSize int
}

// GetMaxKafkaMessageSize is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxKafkaMessageSize() int {
return d.maxKafkaMessageSize
// GetMaxMessageBytes is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int {
return d.maxMessageBytes
}

// GetMaxBatchSize is only for unit testing.
@@ -402,15 +400,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
// for single message that longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
if length > d.maxKafkaMessageSize {
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}

if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {

versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)
@@ -428,10 +426,10 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table

if message.Length() > d.maxKafkaMessageSize {
if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
}
d.curBatchSize++
}
@@ -549,17 +547,19 @@ func (d *JSONEventBatchEncoder) Reset() {
// SetParams reads relevant parameters for Open Protocol
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrKafkaInvalidConfig.Wrap(err)
}
} else {
d.maxKafkaMessageSize = DefaultMaxMessageBytes

maxMessageBytes, ok := params["max-message-bytes"]
if !ok {
return cerror.ErrKafkaInvalidConfig.GenWithStack("max-message-bytes not found")
}

d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

if d.maxKafkaMessageSize <= 0 {
return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize))
if d.maxMessageBytes <= 0 {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
}

if maxBatchSize, ok := params["max-batch-size"]; ok {
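For context, here is a minimal standalone sketch of how a caller sees the new SetParams contract; the parameter values below are illustrative and not taken from this commit:

	package main

	import (
		"fmt"

		"github.com/pingcap/tiflow/cdc/sink/codec"
	)

	func main() {
		encoder := codec.NewJSONEventBatchEncoder()

		// max-message-bytes is now required; an empty parameter map is
		// rejected with ErrKafkaInvalidConfig ("max-message-bytes not found").
		if err := encoder.SetParams(map[string]string{}); err != nil {
			fmt.Println("rejected:", err)
		}

		// With an explicit limit (here 1 MiB) the call succeeds, and the
		// encoder uses that limit instead of a hard-coded default.
		if err := encoder.SetParams(map[string]string{"max-message-bytes": "1048576"}); err != nil {
			panic(err)
		}
	}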
53 changes: 41 additions & 12 deletions cdc/sink/codec/json_test.go
@@ -19,8 +19,10 @@ import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

@@ -224,10 +226,10 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
err := encoder.SetParams(map[string]string{})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
@@ -238,28 +240,50 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32)

err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "-1"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
}

func (s *batchSuite) TestSetParams(c *check.C) {
defer testleak.AfterTest(c)()

opts := make(map[string]string)
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(opts)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*max-message-bytes not found.*",
)

opts["max-message-bytes"] = "1"
encoder = NewJSONEventBatchEncoder()
err = encoder.SetParams(opts)
c.Assert(err, check.IsNil)
c.Assert(encoder, check.NotNil)
jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
c.Assert(ok, check.IsTrue)
c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1)
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
@@ -307,8 +331,13 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
func (s *batchSuite) TestMaxBatchSize(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)
err := encoder.SetParams(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"})
c.Assert(encoder, check.NotNil)
c.Assert(err, check.IsNil)

jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
c.Assert(ok, check.IsTrue)
c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1048576)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
10 changes: 7 additions & 3 deletions cdc/sink/mq.go
@@ -413,7 +413,11 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
if err != nil {
return nil, errors.Trace(err)
}
@@ -443,8 +447,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
opts["max-batch-size"] = s
}
// For now, it's a place holder. Avro format have to make connection to Schema Registery,
// and it may needs credential.
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
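To illustrate the topic check added to newKafkaSaramaSink, here is a small standalone sketch; topicFromSinkURI is a hypothetical helper, and the URIs, host, and query parameters are examples only, not values mandated by this commit:

	package main

	import (
		"fmt"
		"net/url"
		"strings"
	)

	// topicFromSinkURI mirrors the added check: the Kafka topic is taken from
	// the sink-uri path, and an empty path is now rejected instead of being
	// passed on to the producer.
	func topicFromSinkURI(raw string) (string, error) {
		sinkURI, err := url.Parse(raw)
		if err != nil {
			return "", err
		}
		topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool { return r == '/' })
		if topic == "" {
			return "", fmt.Errorf("no topic is specified in sink-uri")
		}
		return topic, nil
	}

	func main() {
		// A topic in the path is accepted.
		fmt.Println(topicFromSinkURI("kafka://127.0.0.1:9092/cdc-test-topic?max-message-bytes=1048576"))
		// An empty path is rejected.
		fmt.Println(topicFromSinkURI("kafka://127.0.0.1:9092/?max-message-bytes=1048576"))
	}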
16 changes: 14 additions & 2 deletions cdc/sink/mq_test.go
@@ -62,13 +62,19 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

encoder := sink.newEncoder()
c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
@@ -159,6 +165,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

@@ -216,5 +228,5 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
encoder := sink.newEncoder()
c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)
}