diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index acfbca638cb..42027d9550f 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -428,7 +428,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi s = sinkURI.Query().Get("max-batch-size") if s != "" { - opts["max-message-bytes"] = s + opts["max-batch-size"] = s } s = sinkURI.Query().Get("compression") @@ -498,7 +498,7 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, s = sinkURI.Query().Get("max-batch-size") if s != "" { - opts["max-message-bytes"] = s + opts["max-batch-size"] = s } // For now, it's a place holder. Avro format have to make connection to Schema Registery, // and it may needs credential.