Add KafkaEntryFormatter (apache#288)
This is a follow-up PR to [apache#280](streamnative/kop#280). It adds a new optional value, `kafka`, to the `entry.format` config. With this value almost no conversion is needed between `MemoryRecords` and the entry's `ByteBuf`, which improves performance; the cost is that Kafka clients are no longer compatible with Pulsar clients.

Because the `kafka` entry format still prepends a `MessageMetadata` header that records the `entry.format`, it will be possible for the Pulsar broker or client to decode these messages later.
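
To make the header-then-payload layout concrete, here is a minimal sketch that uses only `java.nio`. It is not KoP's actual wire format: the 4-byte length prefix, the class `HeaderFramingSketch`, and its methods are hypothetical stand-ins for the roles that `Commands.serializeMetadataAndPayload` and `Commands.skipMessageMetadata` play in this PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration only: a length-prefixed header followed by untouched payload bytes. */
final class HeaderFramingSketch {

    /** Writes [4-byte header length][header bytes][payload bytes] without re-encoding the payload. */
    static ByteBuffer encode(byte[] header, byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + header.length + payload.length);
        out.putInt(header.length);
        out.put(header);
        out.put(payload);
        out.flip();
        return out;
    }

    /** Skips the header and returns the remaining bytes, which are the original payload. */
    static byte[] decode(ByteBuffer entry) {
        ByteBuffer view = entry.duplicate(); // leave the caller's position untouched
        int headerLength = view.getInt();
        view.position(view.position() + headerLength);
        byte[] payload = new byte[view.remaining()];
        view.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] header = "entry.format=kafka".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "kafka-record-batch-bytes".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = decode(encode(header, payload));
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is that the payload is never parsed on either path, which is where the `kafka` format saves time compared with converting every record.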

This PR adds a `KafkaEntryFormatter` class associated with `entry.format=kafka` and applies this config to the existing tests that involve Kafka clients but not Pulsar clients. The exception is `SaslPlainTest`: it fails when multiple test instances are created because `LOOKUP_CACHE` is a static variable, so the `findBroker` calls of other test instances may be affected. It is therefore split into two separate derived test classes.
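
The static-cache problem can be reproduced in isolation. The sketch below is a simplified, hypothetical model: `StaticCacheSketch`, its `findBroker` method, and the addresses are made up for illustration and do not mirror KoP's real `LOOKUP_CACHE` or handler code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a handler that caches broker lookups in a static map. */
final class StaticCacheSketch {

    // Shared by every instance in the JVM, which is what makes test instances interfere.
    private static final Map<String, String> LOOKUP_CACHE = new ConcurrentHashMap<>();

    private final String brokerAddress;

    StaticCacheSketch(String brokerAddress) {
        this.brokerAddress = brokerAddress;
    }

    /** Returns the cached address for the topic, caching this instance's broker on first use. */
    String findBroker(String topic) {
        return LOOKUP_CACHE.computeIfAbsent(topic, t -> brokerAddress);
    }

    /** Clears the shared cache. */
    static void clearCache() {
        LOOKUP_CACHE.clear();
    }

    public static void main(String[] args) {
        StaticCacheSketch first = new StaticCacheSketch("localhost:9092");
        StaticCacheSketch second = new StaticCacheSketch("localhost:9093");
        System.out.println(first.findBroker("my-topic"));
        // The second instance gets the entry cached by the first one, not its own broker.
        System.out.println(second.findBroker("my-topic"));
    }
}
```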

Finally, this PR adds a performance test that measures the encode time of the different `EntryFormatter`s. A sample run gives:

```
--- fixed records for 1000 times ---
PulsarEntryFormatter encode time: 7287 ms
KafkaEntryFormatter encode time: 79 ms
NoHeaderKafkaEntryFormatter encode time: 0 ms
--- random records for 1000 times ---
PulsarEntryFormatter encode time: 6947 ms
KafkaEntryFormatter encode time: 78 ms
NoHeaderKafkaEntryFormatter encode time: 1 ms
```

The default batch is 2048 messages of 1024 bytes each. `KafkaEntryFormatter` shows some overhead from adding the header and copying the original bytes from `MemoryRecords`, but it is not significant. `PulsarEntryFormatter`, however, takes about 7 ms per batch, which may not be acceptable.
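
The per-batch figure follows directly from the run output. A small worked calculation, with all inputs copied from the numbers above and `EncodeCostArithmetic` being a hypothetical helper name:

```java
/** Worked arithmetic for the figures quoted above; all inputs come from the sample run. */
final class EncodeCostArithmetic {

    /** Average encode time per batch in milliseconds. */
    static double millisPerBatch(long totalMillis, int batches) {
        return (double) totalMillis / batches;
    }

    /** Value bytes in one batch (keys and per-record overhead are not counted). */
    static long batchValueBytes(int messages, int bytesPerMessage) {
        return (long) messages * bytesPerMessage;
    }

    public static void main(String[] args) {
        System.out.println("pulsar: " + millisPerBatch(7287, 1000) + " ms per batch");
        System.out.println("kafka:  " + millisPerBatch(79, 1000) + " ms per batch");
        System.out.println("batch:  " + batchValueBytes(2048, 1024) + " value bytes");
    }
}
```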

Another performance test comparing `PulsarEntryFormatter` and `KafkaEntryFormatter` was done by running KoP with Pulsar 2.8.0-SNAPSHOT (commit 11b9359) standalone on my laptop.

```bash
$ ./bin/kafka-producer-perf-test.sh --topic my-topic --producer-props bootstrap.servers=127.0.0.1:9092 \
 --num-records 100000000 --record-size 128 --throughput -1
```

The results below are averages over 5 lines sampled from the output after the throughput and latency had stabilized:

| entryFormat | Throughput (MB/sec) | Avg. Latency (ms) | Max Latency (ms) |
| - | - | - | - |
| pulsar | 39.768 | 740.74 | 802.8 |
| kafka | 44.212 | 667.98 | 756.6 |
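
To read the table as relative differences, the sketch below computes the percentage change from the `pulsar` row to the `kafka` row. The inputs are the table values; `PerfTableComparison` and `percentChange` are hypothetical names introduced only for this calculation.

```java
/** Relative comparison of the two table rows; inputs are copied from the table above. */
final class PerfTableComparison {

    /** Percentage change going from baseline to candidate (negative means a decrease). */
    static double percentChange(double baseline, double candidate) {
        return (candidate - baseline) / baseline * 100.0;
    }

    public static void main(String[] args) {
        System.out.printf("throughput:  %+.2f%%%n", percentChange(39.768, 44.212));
        System.out.printf("avg latency: %+.2f%%%n", percentChange(740.74, 667.98));
        System.out.printf("max latency: %+.2f%%%n", percentChange(802.8, 756.6));
    }
}
```

On these numbers the `kafka` format gives roughly 11% more throughput and roughly 10% lower average latency in this single-laptop run.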
BewareMyPower authored Dec 27, 2020
1 parent 5ca7687 commit fdb3adb
Showing 25 changed files with 567 additions and 67 deletions.
12 changes: 10 additions & 2 deletions kafka-impl/conf/kop.conf
@@ -72,14 +72,22 @@ offsetsTopicNumPartitions=8
maxReadEntriesNum=5

# The format of an entry. The default value is pulsar.
-# Optional values: [pulsar]
+# Optional values: [pulsar, kafka]
#
# pulsar:
# When KoP receives messages from kafka producer, it will serialize these messages to
# the format so that pulsar consumer can read directly.
# When KoP sends entries to kafka consumer, it will treat each entry as pulsar's
# format and deserialize each entry to kafka's format.
-entry.format=pulsar
+#
+# kafka:
+# When KoP receives messages from kafka producer, add a header which is PulsarApi.Metadata
+# before the messages' bytes, and then write to BK directly.
+# When KoP sends entries to kafka consumer, it will treat each entry as kafka's format and
+# just discard the pulsar header and send left bytes to Kafka consumer.
+# This mode means that current pulsar clients cannot interact with kafka clients, but
+# kafka producer works well with kafka consumer.
+entryFormat=pulsar

### --- KoP SSL configs--- ###

10 changes: 9 additions & 1 deletion kafka-impl/conf/kop_standalone.conf
@@ -79,7 +79,15 @@ maxReadEntriesNum=1
# the format so that pulsar consumer can read directly.
# When KoP sends entries to kafka consumer, it will treat each entry as pulsar's
# format and deserialize each entry to kafka's format.
-entry.format=pulsar
+#
+# kafka:
+# When KoP receives messages from kafka producer, add a header which is PulsarApi.Metadata
+# before the messages' bytes, and then write to BK directly.
+# When KoP sends entries to kafka consumer, it will treat each entry as kafka's format and
+# just discard the pulsar header and send left bytes to Kafka consumer.
+# This mode means that current pulsar clients cannot interact with kafka clients, but
+# kafka producer works well with kafka consumer.
+entryFormat=pulsar

### --- KoP SSL configs--- ###

@@ -251,7 +251,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {

    @FieldContext(
            category = CATEGORY_KOP,
-            doc = "The format of an entry. Default: pulsar. Optional: [pulsar]"
+            doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]"
    )
    private String entryFormat = "pulsar";
}
@@ -37,6 +37,7 @@ public interface EntryFormatter {

    /**
     * Decode a stream of entries to Kafka records.
+     * It should be noted that this method is responsible for releasing the entries.
     *
     * @param entries the list of entries
     * @param magic the Kafka record batch's magic value
@@ -21,7 +21,8 @@
public class EntryFormatterFactory {

    enum EntryFormat {
-        PULSAR
+        PULSAR,
+        KAFKA
    }

    public static EntryFormatter create(final String format) {
@@ -30,6 +31,8 @@ public static EntryFormatter create(final String format) {
            switch (entryFormat) {
                case PULSAR:
                    return new PulsarEntryFormatter();
+                case KAFKA:
+                    return new KafkaEntryFormatter();
                default:
                    throw new Exception("No EntryFormatter for " + entryFormat);
            }
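
The hunk above only shows part of `create`, so the parsing of the config string is not visible here. As a self-contained illustration of the same enum-backed factory pattern, the following sketch assumes a case-insensitive lookup; `FormatFactorySketch`, its `Formatter` interface, and the error messages are hypothetical and are not taken from KoP.

```java
import java.util.Locale;

/** Hypothetical, simplified sketch of mapping a config string to an enum-backed factory. */
final class FormatFactorySketch {

    enum EntryFormat { PULSAR, KAFKA }

    /** Minimal stand-in for the formatter interface; the real one encodes and decodes records. */
    interface Formatter {
        String name();
    }

    /** Parses the configured value case-insensitively and returns the matching formatter. */
    static Formatter create(String format) {
        EntryFormat entryFormat;
        try {
            entryFormat = EntryFormat.valueOf(format.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported entry format: " + format, e);
        }
        switch (entryFormat) {
            case PULSAR:
                return () -> "pulsar";
            case KAFKA:
                return () -> "kafka";
            default:
                throw new IllegalStateException("No formatter for " + entryFormat);
        }
    }

    public static void main(String[] args) {
        System.out.println(create("kafka").name());
        System.out.println(create("PULSAR").name());
    }
}
```

Keeping the supported values in one enum means a new format needs one constant and one `case`, which matches the shape of the change in this hunk.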
@@ -0,0 +1,71 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.pulsar.common.protocol.Commands;


/**
 * The entry formatter that uses Kafka's format.
 */
public class KafkaEntryFormatter implements EntryFormatter {
    private final KafkaEntryFormatterHeader header = new KafkaEntryFormatterHeader();

    @Override
    public ByteBuf encode(MemoryRecords records, int numMessages) {
        return Commands.serializeMetadataAndPayload(
                Commands.ChecksumType.None,
                header.getMessageMetadata(),
                Unpooled.wrappedBuffer(records.buffer())
        );
    }

    @Override
    public MemoryRecords decode(List<Entry> entries, byte magic) {
        int size = 0;
        for (Entry entry : entries) {
            size += entry.getLength();
        }
        final MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(size),
                magic,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId()));
        entries.forEach(entry -> {
            final ByteBuf byteBuf = entry.getDataBuffer();
            Commands.skipMessageMetadata(byteBuf);
            final MemoryRecords records = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
            long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId());
            for (Record record : records.records()) {
                builder.appendWithOffset(offset, record);
                offset++;
            }
            entry.release();
        });
        return builder.build();
    }
}
@@ -0,0 +1,55 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.format;

import org.apache.pulsar.common.api.proto.PulsarApi;


/**
 * The header of KafkaEntryFormatter.
 */
public class KafkaEntryFormatterHeader {

    private static volatile PulsarApi.MessageMetadata messageMetadata = null;

    public PulsarApi.MessageMetadata getMessageMetadata() {
        if (messageMetadata == null) {
            synchronized (KafkaEntryFormatterHeader.class) {
                if (messageMetadata == null) {
                    messageMetadata = createMessageMetadata();
                }
            }
        }
        return messageMetadata;
    }

    private static PulsarApi.MessageMetadata createMessageMetadata() {
        final PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();

        // TODO: Pulsar broker may add a field that represents entry.format to MessageMetadata in future. After that we
        //  should set that field instead of adding a key-value property.
        builder.addProperties(PulsarApi.KeyValue.newBuilder()
                .setKey("entry.format")
                .setValue(EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase())
                .build());

        // Following fields are meaningless because the metadata is already contained in MemoryRecords. Here we set
        // them just because they're required fields.
        builder.setProducerName("");
        builder.setSequenceId(0L);
        builder.setPublishTime(0L);

        return builder.build();
    }
}
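
The class above builds the shared metadata lazily with double-checked locking. A commonly used alternative in Java is the initialization-on-demand holder idiom, sketched below with a plain `String` standing in for `PulsarApi.MessageMetadata`. This is not what the commit does; `LazyHeaderSketch`, `Holder`, and `BUILD_COUNT` are hypothetical names used only to show the idiom.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of lazy, thread-safe, one-time initialization via a holder class. */
final class LazyHeaderSketch {

    // Counts how often the header is built, only so the behaviour can be observed.
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    /** The JVM initializes this nested class once, on first access to HEADER. */
    private static final class Holder {
        static final String HEADER = buildHeader();
    }

    private static String buildHeader() {
        BUILD_COUNT.incrementAndGet();
        return "entry.format=kafka";
    }

    /** Returns the shared header, building it on the first call. */
    static String getHeader() {
        return Holder.HEADER;
    }

    public static void main(String[] args) {
        System.out.println(getHeader());
        System.out.println(getHeader());
        System.out.println("built " + BUILD_COUNT.get() + " time(s)");
    }
}
```

The idiom relies on the class-initialization guarantees of the JVM instead of an explicit `volatile` field and `synchronized` block.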
@@ -0,0 +1,109 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.format;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;


/**
 * The performance test for {@link EntryFormatter#encode(MemoryRecords, int)}.
 */
public class EncodePerformanceTest {

    private static final int NUM_MESSAGES = 2048;
    private static final int MESSAGE_SIZE = 1024;

    public static void main(String[] args) {
        // The first time to run PulsarEntryFormatter a warn log will be printed that could take a lot of time.
        runSingleTest(prepareFixedRecords(), "fixed records", 1);

        runSingleTest(prepareFixedRecords(), "fixed records", 100);
        runSingleTest(prepareRandomRecords(), "random records", 100);

        runSingleTest(prepareFixedRecords(), "fixed records", 1000);
        runSingleTest(prepareRandomRecords(), "random records", 1000);
    }

    private static void runSingleTest(final MemoryRecords records, final String description, final int repeatTimes) {
        final EntryFormatter pulsarFormatter = EntryFormatterFactory.create("pulsar");
        final EntryFormatter kafkaFormatter = EntryFormatterFactory.create("kafka");
        // Here we also add a comparison with NoHeaderKafkaEntryFormatter to measure the overhead of adding a header
        // and copy the ByteBuffer of MemoryRecords that are done by KafkaEntryFormatter.
        final EntryFormatter noHeaderKafkaFormatter = new NoHeaderKafkaEntryFormatter();

        System.out.println("--- " + description + " for " + repeatTimes + " times ---");

        long t1 = System.currentTimeMillis();
        for (int i = 0; i < repeatTimes; i++) {
            pulsarFormatter.encode(records, NUM_MESSAGES).release();
        }
        long t2 = System.currentTimeMillis();
        System.out.println("PulsarEntryFormatter encode time: " + (t2 - t1) + " ms");

        t1 = System.currentTimeMillis();
        for (int i = 0; i < repeatTimes; i++) {
            kafkaFormatter.encode(records, NUM_MESSAGES).release();
        }
        t2 = System.currentTimeMillis();
        System.out.println("KafkaEntryFormatter encode time: " + (t2 - t1) + " ms");

        t1 = System.currentTimeMillis();
        for (int i = 0; i < repeatTimes; i++) {
            noHeaderKafkaFormatter.encode(records, NUM_MESSAGES).release();
        }
        t2 = System.currentTimeMillis();
        System.out.println("NoHeaderKafkaEntryFormatter encode time: " + (t2 - t1) + " ms");
    }

    private static MemoryRecordsBuilder newMemoryRecordsBuilder() {
        return MemoryRecords.builder(
                ByteBuffer.allocate(1024 * 1024 * 5),
                RecordBatch.CURRENT_MAGIC_VALUE,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                0L);
    }

    private static MemoryRecords prepareFixedRecords() {
        final MemoryRecordsBuilder builder = newMemoryRecordsBuilder();
        for (int i = 0; i < NUM_MESSAGES; i++) {
            final byte[] value = new byte[MESSAGE_SIZE];
            Arrays.fill(value, (byte) 'a');
            builder.append(new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), value));
        }
        return builder.build();
    }

    private static MemoryRecords prepareRandomRecords() {
        final MemoryRecordsBuilder builder = newMemoryRecordsBuilder();
        final Random random = new Random();
        for (int i = 0; i < NUM_MESSAGES; i++) {
            final ByteBuffer buffer = ByteBuffer.allocate(MESSAGE_SIZE);
            for (int j = 0; j < MESSAGE_SIZE / 4; j++) {
                buffer.putInt(random.nextInt());
            }
            builder.append(new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), buffer.array()));
        }
        return builder.build();
    }
}
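
`System.currentTimeMillis()` has millisecond resolution, so very fast operations can show up as 0 ms or 1 ms, as the `NoHeaderKafkaEntryFormatter` lines in the sample output do. Below is a minimal sketch of a timing helper with a warm-up phase based on `System.nanoTime()`. `TimingSketch` and `averageNanos` are hypothetical names, the array copy is only a placeholder workload, and a dedicated harness such as JMH would be a better fit for rigorous numbers.

```java
/** Hypothetical timing helper: untimed warm-up runs followed by timed runs. */
final class TimingSketch {

    /** Runs the task warmUp times untimed, then returns the mean nanoseconds over the timed runs. */
    static double averageNanos(Runnable task, int warmUp, int runs) {
        if (runs <= 0) {
            throw new IllegalArgumentException("runs must be positive");
        }
        for (int i = 0; i < warmUp; i++) {
            task.run();
        }
        long start = System.nanoTime();
        for (int i = 0; i < runs; i++) {
            task.run();
        }
        return (System.nanoTime() - start) / (double) runs;
    }

    public static void main(String[] args) {
        byte[] source = new byte[2048 * 1024];
        // The array copy is only a placeholder workload standing in for an encode call.
        double nanos = averageNanos(() -> source.clone(), 100, 1000);
        System.out.printf("average: %.1f us per run%n", nanos / 1_000.0);
    }
}
```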
@@ -0,0 +1,38 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.MemoryRecords;

/**
 * The entry formatter that uses Kafka's format but has no header.
 */
public class NoHeaderKafkaEntryFormatter implements EntryFormatter {

    @Override
    public ByteBuf encode(MemoryRecords records, int numMessages) {
        // The difference from KafkaEntryFormatter is here we don't add the header
        return Unpooled.wrappedBuffer(records.buffer());
    }

    @Override
    public MemoryRecords decode(List<Entry> entries, byte magic) {
        // Do nothing
        return null;
    }
}
1 change: 1 addition & 0 deletions kafka-impl/src/test/resources/log4j2.xml
@@ -26,6 +26,7 @@
</Root>
<Logger name="org.eclipse.jetty" level="info"/>
<Logger name="io.streamnative" level="trace"/>
+<Logger name="io.streamnative.pulsar.handlers.kop.format" level="error"/>
<Logger name="org.apache.pulsar" level="info"/>
<Logger name="org.apache.bookkeeper" level="info"/>
<Logger name="org.apache.kafka" level="debug"/>