From e675b173ece66c41039a1be5d8d9f31d120cedf6 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Thu, 24 Dec 2020 03:11:10 -0600
Subject: [PATCH] Add entry formatter to do conversions between MemoryRecords
 and ByteBuf (#280)

Currently, KoP has significant performance overhead from the conversion
between Kafka's records and Pulsar's messages. For pure Kafka users, i.e.
when no Pulsar client is involved, the conversion is unnecessary. This PR
therefore provides a config `entry.format` to specify whether the entry's
format must be readable by Pulsar clients, and an `EntryFormatter` now
performs the format conversion instead of the static methods of
`MessageRecordUtils`.

In the future, we'll implement a new `EntryFormatter` that avoids this
conversion entirely, which should improve performance significantly. After
that, users can choose whether KoP should support Pulsar clients.
---
 kafka-impl/conf/kop.conf                      |  10 +
 kafka-impl/conf/kop_standalone.conf           |  10 +
 .../handlers/kop/KafkaRequestHandler.java     |   9 +-
 .../kop/KafkaServiceConfiguration.java        |   5 +
 .../handlers/kop/MessageFetchContext.java     |   5 +-
 .../pulsar/handlers/kop/PendingProduce.java   |  19 +-
 .../handlers/kop/format/EntryFormatter.java   |  63 +++
 .../kop/format/EntryFormatterFactory.java     |  40 ++
 .../PulsarEntryFormatter.java}                | 364 +++++-------------
 .../handlers/kop/format/package-info.java     |  14 +
 .../handlers/kop/utils/ByteBufUtils.java      |  65 ++++
 .../handlers/kop/EntryFormatterTest.java      |  62 +++
 .../pulsar/handlers/kop/KafkaApisTest.java    |   2 +-
 13 files changed, 374 insertions(+), 294 deletions(-)
 create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
 create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
 rename kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/{utils/MessageRecordUtils.java => format/PulsarEntryFormatter.java} (50%)
 create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java
 create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java
 create mode 100644 kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java

diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf
index a8ea392735..a5bb1c4d04 100755
--- a/kafka-impl/conf/kop.conf
+++ b/kafka-impl/conf/kop.conf
@@ -71,6 +71,16 @@ offsetsTopicNumPartitions=8
 # Maximum number of entries that are read from cursor once per time
 maxReadEntriesNum=5
 
+# The format of an entry. The default value is pulsar.
+# Optional values: [pulsar]
+#
+# pulsar:
+#   When KoP receives messages from a Kafka producer, it serializes them into
+#   a format that Pulsar consumers can read directly.
+#   When KoP sends entries to a Kafka consumer, it treats each entry as
+#   Pulsar's format and deserializes it into Kafka's format.
+entry.format=pulsar
+
 ### --- KoP SSL configs--- ###
 
 # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol
diff --git a/kafka-impl/conf/kop_standalone.conf b/kafka-impl/conf/kop_standalone.conf
index b9b58ca777..21e7cfb9d9 100644
--- a/kafka-impl/conf/kop_standalone.conf
+++ b/kafka-impl/conf/kop_standalone.conf
@@ -71,6 +71,16 @@ offsetsTopicNumPartitions=8
 # Maximum number of entries that are read from cursor once per time
 maxReadEntriesNum=1
 
+# The format of an entry. The default value is pulsar.
+# Optional values: [pulsar]
+#
+# pulsar:
+#   When KoP receives messages from a Kafka producer, it serializes them into
+#   a format that Pulsar consumers can read directly.
+#   When KoP sends entries to a Kafka consumer, it treats each entry as
+#   Pulsar's format and deserializes it into Kafka's format.
+entry.format=pulsar
+
 ### --- KoP SSL configs--- ###
 
 # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol
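For reference, a minimal sketch (not part of this patch) of how the new
`entry.format` value is resolved at runtime. It uses only the
`EntryFormatterFactory` and `EntryFormatter` classes added later in this
patch; the example class name and the invalid value "kafka" are illustrative:

    import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
    import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;

    public class EntryFormatSelectionExample {
        public static void main(String[] args) {
            // "pulsar" is currently the only listed value of entry.format.
            EntryFormatter formatter = EntryFormatterFactory.create("pulsar");
            System.out.println(formatter.getClass().getSimpleName()); // PulsarEntryFormatter

            // Any unsupported value fails fast with IllegalArgumentException,
            // i.e. when the request handler is constructed.
            try {
                EntryFormatterFactory.create("kafka");
            } catch (IllegalArgumentException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }
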
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index 2c40bc457f..2368322bad 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -29,6 +29,8 @@
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
 import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
 import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
 import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
@@ -157,6 +159,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
     private final int sslPort;
     private final int defaultNumPartitions;
     public final int maxReadEntriesNum;
+    @Getter
+    private final EntryFormatter entryFormatter;
 
     private final Map<TopicPartition, PendingProduceQueue> pendingProduceQueueMap = new ConcurrentHashMap<>();
 
@@ -184,6 +188,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
         this.topicManager = new KafkaTopicManager(this);
         this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
         this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
+        this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
     }
 
     @Override
@@ -600,8 +605,8 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
             MemoryRecords records = (MemoryRecords) entry.getValue();
             String fullPartitionName = KopTopic.toString(topicPartition);
-            PendingProduce pendingProduce =
-                new PendingProduce(partitionResponse, topicManager, fullPartitionName, records, executor);
+            PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName,
+                    entryFormatter, records, executor);
             PendingProduceQueue queue =
                 pendingProduceQueueMap.computeIfAbsent(topicPartition, ignored -> new PendingProduceQueue());
             queue.add(pendingProduce);
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
index 40bfbffe0f..ec561fbfb0 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -249,4 +249,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
     )
     private int maxReadEntriesNum = 5;
 
+    @FieldContext(
+        category = CATEGORY_KOP,
+        doc = "The format of an entry. Default: pulsar. Optional: [pulsar]"
+    )
+    private String entryFormat = "pulsar";
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
index 9ba41c5892..f65d8d8757 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
@@ -13,7 +13,6 @@
  */
 package io.streamnative.pulsar.handlers.kop;
 
-import static io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.entriesToRecords;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
 import com.google.common.collect.Lists;
@@ -322,9 +321,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
                     } else if (apiVersion <= 3) {
                         magic = RecordBatch.MAGIC_VALUE_V1;
                     }
-                    MemoryRecords records;
-                    // by default kafka is produced message in batched mode.
-                    records = entriesToRecords(entries, magic);
+                    final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic);
 
                     partitionData = new FetchResponse.PartitionData(
                         Errors.NONE,
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
index 068de28dfa..0b03d93814 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
@@ -14,14 +14,13 @@
 package io.streamnative.pulsar.handlers.kop;
 
 import io.netty.buffer.ByteBuf;
-import io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils;
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 
@@ -42,12 +41,13 @@ public class PendingProduce {
 
     public PendingProduce(CompletableFuture<PartitionResponse> responseFuture,
                           KafkaTopicManager topicManager,
                           String partitionName,
+                          EntryFormatter entryFormatter,
                           MemoryRecords memoryRecords,
                           ExecutorService executor) {
         this.responseFuture = responseFuture;
         this.topicManager = topicManager;
         this.partitionName = partitionName;
-        this.numMessages = parseNumMessages(memoryRecords);
+        this.numMessages = EntryFormatter.parseNumMessages(memoryRecords);
 
         this.topicFuture = topicManager.getTopic(partitionName).exceptionally(e -> {
             log.error("Failed to getTopic for partition '{}': {}", partitionName, e);
@@ -58,10 +58,7 @@ public PendingProduce(CompletableFuture<PartitionResponse> responseFuture,
             log.error("Failed to compute ByteBuf for partition '{}': {}", partitionName, e);
             return null;
         });
-        executor.execute(() -> {
-            ByteBuf byteBuf = MessageRecordUtils.recordsToByteBuf(memoryRecords, this.numMessages);
-            this.byteBufFuture.complete(byteBuf);
-        });
+        executor.execute(() -> byteBufFuture.complete(entryFormatter.encode(memoryRecords, numMessages)));
         this.offsetFuture = new CompletableFuture<>();
     }
 
@@ -119,12 +116,4 @@ public void publishMessages() {
             byteBuf.release();
         });
     }
-
-    private static int parseNumMessages(MemoryRecords records) {
-        int n = 0;
-        for (Record ignored : records.records()) {
-            n++;
-        }
-        return n;
-    }
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
new file mode 100644
index 0000000000..76b4afda7e
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.format;
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+
+
+/**
+ * The formatter for conversion between Kafka records and Bookie entries.
+ */
+public interface EntryFormatter {
+
+    /**
+     * Encode Kafka records to a ByteBuf.
+     *
+     * @param records messages with Kafka's format
+     * @param numMessages the number of messages
+     * @return the ByteBuf of an entry that is to be written to Bookie
+     */
+    ByteBuf encode(final MemoryRecords records, final int numMessages);
+
+    /**
+     * Decode a stream of entries to Kafka records.
+     *
+     * @param entries the list of entries
+     * @param magic the Kafka record batch's magic value
+     * @return the Kafka records
+     */
+    MemoryRecords decode(final List<Entry> entries, final byte magic);
+
+    /**
+     * Get the number of messages from MemoryRecords.
+     * Since MemoryRecords doesn't expose the number of messages, we need to iterate over the whole
+     * MemoryRecords object. This helper returns a count that can be passed to
+     * {@link EntryFormatter#encode(MemoryRecords, int)} and to metrics-related methods as well.
+     *
+     * @param records messages with Kafka's format
+     * @return the number of messages
+     */
+    static int parseNumMessages(final MemoryRecords records) {
+        int numMessages = 0;
+        for (MutableRecordBatch batch : records.batches()) {
+            numMessages += (batch.lastOffset() - batch.baseOffset() + 1);
+        }
+        return numMessages;
+    }
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
new file mode 100644
index 0000000000..4dba4b7999
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; + +/** + * Factory of EntryFormatter. + * + * @see EntryFormatter + */ +public class EntryFormatterFactory { + + enum EntryFormat { + PULSAR + } + + public static EntryFormatter create(final String format) { + try { + EntryFormat entryFormat = Enum.valueOf(EntryFormat.class, format.toUpperCase()); + switch (entryFormat) { + case PULSAR: + return new PulsarEntryFormatter(); + default: + throw new Exception("No EntryFormatter for " + entryFormat); + } + } catch (Exception e) { + throw new IllegalArgumentException("Unsupported entry.format '" + format + "': " + e.getMessage()); + } + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java similarity index 50% rename from kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java rename to kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index cd4ec237ad..7a5117841b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -11,25 +11,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.kop.utils; +package io.streamnative.pulsar.handlers.kop.format; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; +import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils; +import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import java.io.IOException; import java.io.UncheckedIOException; -import java.nio.ByteBuffer; -import java.time.Clock; -import java.util.Base64; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; import java.util.stream.StreamSupport; -import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.MemoryRecords; @@ -38,8 +35,6 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; @@ -55,103 +50,18 @@ /** - * Pulsar Message and Kafka Record utils. + * The entry formatter that uses Pulsar's format. 
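+ * With this format, KoP encodes a whole Kafka batch into a single Pulsar
+ * batched message, so Pulsar consumers can read the resulting entries directly.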
*/ -@UtilityClass @Slf4j -public final class MessageRecordUtils { +public class PulsarEntryFormatter implements EntryFormatter { private static final int DEFAULT_FETCH_BUFFER_SIZE = 1024 * 1024; private static final int MAX_RECORDS_BUFFER_SIZE = 100 * 1024 * 1024; - private static final String FAKE_KOP_PRODUCER_NAME = "fake_kop_producer_name"; - - private static final Clock clock = Clock.systemDefaultZone(); - - // convert kafka Record to Pulsar Message. - // called when publish received Kafka Record into Pulsar. - public static MessageImpl recordToEntry(Record record) { - @SuppressWarnings("unchecked") - TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); - - // key - if (record.hasKey()) { - byte[] key = new byte[record.keySize()]; - record.key().get(key); - builder.keyBytes(key); - // reuse ordering key to avoid converting string < > bytes - builder.orderingKey(key); - } - - // value - if (record.hasValue()) { - byte[] value = new byte[record.valueSize()]; - record.value().get(value); - builder.value(value); - } else { - builder.value(new byte[0]); - } - - // sequence - if (record.sequence() >= 0) { - builder.sequenceId(record.sequence()); - } - - // timestamp - if (record.timestamp() >= 0) { - builder.eventTime(record.timestamp()); - builder.getMetadataBuilder().setPublishTime(record.timestamp()); - } else { - builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis()); - } - - // header - for (Header h : record.headers()) { - builder.property(h.key(), - new String(h.value(), UTF_8)); - } - - return (MessageImpl) builder.getMessage(); - } - - // convert message to ByteBuf payload for ledger.addEntry. - // parameter message is converted from passed in Kafka record. - // called when publish received Kafka Record into Pulsar. - public static ByteBuf messageToByteBuf(Message message) { - checkArgument(message instanceof MessageImpl); - - MessageImpl msg = (MessageImpl) message; - MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder(); - ByteBuf payload = msg.getDataBuffer(); - - // filled in required fields - if (!msgMetadataBuilder.hasSequenceId()) { - msgMetadataBuilder.setSequenceId(-1); - } - if (!msgMetadataBuilder.hasPublishTime()) { - msgMetadataBuilder.setPublishTime(clock.millis()); - } - if (!msgMetadataBuilder.hasProducerName()) { - msgMetadataBuilder.setProducerName(FAKE_KOP_PRODUCER_NAME); - } - - msgMetadataBuilder.setCompression( - CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE)); - msgMetadataBuilder.setUncompressedSize(payload.readableBytes()); - MessageMetadata msgMetadata = msgMetadataBuilder.build(); - - ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload); - - msgMetadataBuilder.recycle(); - msgMetadata.recycle(); - - return buf; - } - //// for Batch messages - protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024; - protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; + private static final int INITIAL_BATCH_BUFFER_SIZE = 1024; + private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; - // If records stored in a batched way, turn MemoryRecords into a pulsar batched message. 
- public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) { + @Override + public ByteBuf encode(final MemoryRecords records, final int numMessages) { long currentBatchSizeBytes = 0; int numMessagesInBatch = 0; @@ -160,8 +70,8 @@ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) { PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE; ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT - .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); - List> messages = Lists.newArrayListWithExpectedSize(size); + .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); + List> messages = Lists.newArrayListWithExpectedSize(numMessages); MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder(); StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> { @@ -188,106 +98,6 @@ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) { msgBuilder.recycle(); } -// Iterator iterator = records.records().iterator(); -// while (iterator.hasNext()) { -// MessageImpl message = recordToEntry(iterator.next()); -// if (++numMessagesInBatch == 1) { -// sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder()); -// } -// messages.add(message); -// currentBatchSizeBytes += message.getDataBuffer().readableBytes(); -// -// if (log.isDebugEnabled()) { -// log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", -// sequenceId, numMessagesInBatch, currentBatchSizeBytes); -// } -// } - -// for (MessageImpl msg : messages) { -// PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder(); -// batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, -// msg.getDataBuffer(), batchedMessageMetadataAndPayload); -// msgBuilder.recycle(); -// } - int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); - - if (PulsarApi.CompressionType.NONE != compressionType) { - messageMetaBuilder.setCompression(compressionType); - messageMetaBuilder.setUncompressedSize(uncompressedSize); - } - - messageMetaBuilder.setNumMessagesInBatch(numMessagesInBatch); - - MessageMetadata msgMetadata = messageMetaBuilder.build(); - - ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, - msgMetadata, - batchedMessageMetadataAndPayload); - - messageMetaBuilder.recycle(); - msgMetadata.recycle(); - batchedMessageMetadataAndPayload.release(); - - return buf; - } - - public static void recordsToByteBuf(MemoryRecords records, int size, - CompletableFuture transFuture) { - long currentBatchSizeBytes = 0; - int numMessagesInBatch = 0; - - long sequenceId = -1; - - // TODO: handle different compression type - PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE; - - ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT - .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); - List> messages = Lists.newArrayListWithExpectedSize(size); - MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder(); - - StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> { - MessageImpl message = recordToEntry(record); - messages.add(message); - }); - - for (MessageImpl message : messages) { - if (++numMessagesInBatch == 1) { - sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder()); - } - 
currentBatchSizeBytes += message.getDataBuffer().readableBytes(); - if (log.isDebugEnabled()) { - log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", - sequenceId, numMessagesInBatch, currentBatchSizeBytes); - } - - PulsarApi.MessageMetadata.Builder msgBuilder = message.getMessageBuilder(); - batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, - message.getDataBuffer(), batchedMessageMetadataAndPayload); - msgBuilder.recycle(); - } - -// Iterator iterator = records.records().iterator(); -// while (iterator.hasNext()) { -// MessageImpl message = recordToEntry(iterator.next()); -// if (++numMessagesInBatch == 1) { -// sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder()); -// } -// messages.add(message); -// currentBatchSizeBytes += message.getDataBuffer().readableBytes(); -// -// if (log.isDebugEnabled()) { -// log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", -// sequenceId, numMessagesInBatch, currentBatchSizeBytes); -// } -// } - -// for (MessageImpl msg : messages) { -// PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder(); -// batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, -// msg.getDataBuffer(), batchedMessageMetadataAndPayload); -// msgBuilder.recycle(); -// } int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); if (PulsarApi.CompressionType.NONE != compressionType) { @@ -307,47 +117,24 @@ public static void recordsToByteBuf(MemoryRecords records, int size, msgMetadata.recycle(); batchedMessageMetadataAndPayload.release(); - transFuture.complete(buf); - } - - private static Header[] getHeadersFromMetadata(List properties) { - Header[] headers = new Header[properties.size()]; - - if (log.isDebugEnabled()) { - log.debug("getHeadersFromMetadata. Header size: {}", - properties.size()); - } - - int index = 0; - for (KeyValue kv: properties) { - headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8)); - - if (log.isDebugEnabled()) { - log.debug("index: {} kv.getKey: {}. kv.getValue: {}", - index, kv.getKey(), kv.getValue()); - } - index++; - } - - return headers; + return buf; } - // Convert entries read from BookKeeper into Kafka Records - // Entries can be batched messages, may need un-batch. 
- public static MemoryRecords entriesToRecords(List entries, byte magic) { + @Override + public MemoryRecords decode(final List entries, final byte magic) { try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic, - org.apache.kafka.common.record.CompressionType.NONE, - TimestampType.CREATE_TIME, - // using the first entry, index 0 as base offset - MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId(), 0), - RecordBatch.NO_TIMESTAMP, - RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, - false, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - MAX_RECORDS_BUFFER_SIZE); + org.apache.kafka.common.record.CompressionType.NONE, + TimestampType.CREATE_TIME, + // using the first entry, index 0 as base offset + MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId(), 0), + RecordBatch.NO_TIMESTAMP, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + false, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + MAX_RECORDS_BUFFER_SIZE); entries.parallelStream().forEachOrdered(entry -> { // each entry is a batched message @@ -369,9 +156,9 @@ public static MemoryRecords entriesToRecords(List 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), - getKeyByteBuffer(singleMessageMetadata), - getNioBuffer(singleMessagePayload), + ByteBufUtils.getKeyByteBuffer(singleMessageMetadata), + ByteBufUtils.getNioBuffer(singleMessagePayload), headers); singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); @@ -409,11 +196,11 @@ public static MemoryRecords entriesToRecords(List 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), - getKeyByteBuffer(msgMetadata), - getNioBuffer(payload), - headers); + MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()), + msgMetadata.getEventTime() > 0 ? 
msgMetadata.getEventTime() : msgMetadata.getPublishTime(), + ByteBufUtils.getKeyByteBuffer(msgMetadata), + ByteBufUtils.getNioBuffer(payload), + headers); } msgMetadata.recycle(); @@ -430,41 +217,74 @@ public static MemoryRecords entriesToRecords(List recordToEntry(Record record) { + @SuppressWarnings("unchecked") + TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); + + // key + if (record.hasKey()) { + byte[] key = new byte[record.keySize()]; + record.key().get(key); + builder.keyBytes(key); + // reuse ordering key to avoid converting string < > bytes + builder.orderingKey(key); } - String key = messageMetadata.getPartitionKey(); - if (messageMetadata.hasPartitionKeyB64Encoded()) { - return ByteBuffer.wrap(Base64.getDecoder().decode(key)); + // value + if (record.hasValue()) { + byte[] value = new byte[record.valueSize()]; + record.value().get(value); + builder.value(value); } else { - // for Base64 not encoded string, convert to UTF_8 chars - return ByteBuffer.wrap(key.getBytes(UTF_8)); + builder.value(new byte[0]); } - } - private static ByteBuffer getKeyByteBuffer(SingleMessageMetadata messageMetadata) { - if (messageMetadata.hasOrderingKey()) { - return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); + // sequence + if (record.sequence() >= 0) { + builder.sequenceId(record.sequence()); } - String key = messageMetadata.getPartitionKey(); - if (messageMetadata.hasPartitionKeyB64Encoded()) { - return ByteBuffer.wrap(Base64.getDecoder().decode(key)); + // timestamp + if (record.timestamp() >= 0) { + builder.eventTime(record.timestamp()); + builder.getMetadataBuilder().setPublishTime(record.timestamp()); } else { - // for Base64 not encoded string, convert to UTF_8 chars - return ByteBuffer.wrap(key.getBytes(UTF_8)); + builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis()); + } + + // header + for (Header h : record.headers()) { + builder.property(h.key(), + new String(h.value(), UTF_8)); } + + return (MessageImpl) builder.getMessage(); } - public static ByteBuffer getNioBuffer(ByteBuf buffer) { - if (buffer.isDirect()) { - return buffer.nioBuffer(); + private Header[] getHeadersFromMetadata(List properties) { + Header[] headers = new Header[properties.size()]; + + if (log.isDebugEnabled()) { + log.debug("getHeadersFromMetadata. Header size: {}", + properties.size()); } - final byte[] bytes = new byte[buffer.readableBytes()]; - buffer.getBytes(buffer.readerIndex(), bytes); - return ByteBuffer.wrap(bytes); + + int index = 0; + for (KeyValue kv: properties) { + headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8)); + + if (log.isDebugEnabled()) { + log.debug("index: {} kv.getKey: {}. kv.getValue: {}", + index, kv.getKey(), kv.getValue()); + } + index++; + } + + return headers; } + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java new file mode 100644 index 0000000000..16e3703426 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java @@ -0,0 +1,14 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java new file mode 100644 index 0000000000..0450ba8963 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java @@ -0,0 +1,65 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.utils; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import io.netty.buffer.ByteBuf; +import java.nio.ByteBuffer; +import java.util.Base64; +import org.apache.pulsar.common.api.proto.PulsarApi; + + +/** + * Utils for ByteBuf operations. + */ +public class ByteBufUtils { + + public static ByteBuffer getKeyByteBuffer(PulsarApi.SingleMessageMetadata messageMetadata) { + if (messageMetadata.hasOrderingKey()) { + return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); + } + + String key = messageMetadata.getPartitionKey(); + if (messageMetadata.hasPartitionKeyB64Encoded()) { + return ByteBuffer.wrap(Base64.getDecoder().decode(key)); + } else { + // for Base64 not encoded string, convert to UTF_8 chars + return ByteBuffer.wrap(key.getBytes(UTF_8)); + } + } + + public static ByteBuffer getKeyByteBuffer(PulsarApi.MessageMetadata messageMetadata) { + if (messageMetadata.hasOrderingKey()) { + return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); + } + + String key = messageMetadata.getPartitionKey(); + if (messageMetadata.hasPartitionKeyB64Encoded()) { + return ByteBuffer.wrap(Base64.getDecoder().decode(key)); + } else { + // for Base64 not encoded string, convert to UTF_8 chars + return ByteBuffer.wrap(key.getBytes(UTF_8)); + } + } + + public static ByteBuffer getNioBuffer(ByteBuf buffer) { + if (buffer.isDirect()) { + return buffer.nioBuffer(); + } + final byte[] bytes = new byte[buffer.readableBytes()]; + buffer.getBytes(buffer.readerIndex(), bytes); + return ByteBuffer.wrap(bytes); + } +} diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java new file mode 100644 index 0000000000..e6714ce89d --- /dev/null +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import static org.testng.Assert.assertEquals; + +import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.stream.IntStream; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Tests for EntryFormatter. + */ +public class EntryFormatterTest { + + @DataProvider(name = "compressionTypes") + Object[] allCompressionTypes() { + return Arrays.stream(CompressionType.values()).map(x -> (Object) x).toArray(); + } + + @Test(dataProvider = "compressionTypes") + public void testParseNumMessages(CompressionType compressionType) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + int[] batchSizes = {3, 2, 5}; + + int baseOffset = 0; + for (int batchSize : batchSizes) { + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, + compressionType, TimestampType.LOG_APPEND_TIME, baseOffset); + for (int i = 0; i < batchSize; i++) { + builder.append(0L, "a".getBytes(), "1".getBytes()); + } + baseOffset += batchSize; + // Normally the offsets of batches are continuous, here we add an extra interval just for robustness. + baseOffset += 1; + builder.close(); + } + + buffer.flip(); + final MemoryRecords records = MemoryRecords.readableRecords(buffer); + assertEquals(EntryFormatter.parseNumMessages(records), IntStream.of(batchSizes).sum()); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index 081856bee0..0c9ca18013 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -493,7 +493,7 @@ private void produceData(KafkaProducer producer, @Test(timeOut = 20000) public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { String topicName = "kopBrokerRespectsPartitionsOrderAndSizeLimits"; - int numberTopics = 5; + int numberTopics = 8; int numberPartitions = 6; int messagesPerPartition = 9;
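
To see the new API end to end, here is a hedged round-trip sketch (test-style
code, not part of this patch): it encodes Kafka records with the `pulsar`
formatter, wraps the resulting buffer as a managed-ledger entry at a made-up
(ledgerId=0, entryId=0) position via BookKeeper's `EntryImpl` helper, and
decodes it back. The class name and ledger position are illustrative:

    import io.netty.buffer.ByteBuf;
    import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
    import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import org.apache.bookkeeper.mledger.Entry;
    import org.apache.bookkeeper.mledger.impl.EntryImpl;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class EntryFormatterRoundTripExample {
        public static void main(String[] args) {
            // Build a single-record Kafka batch.
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                    RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
                    TimestampType.CREATE_TIME, 0L);
            builder.append(System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
            MemoryRecords records = builder.build();

            // Encode to the entry format that would be written to Bookie.
            EntryFormatter formatter = EntryFormatterFactory.create("pulsar");
            ByteBuf payload = formatter.encode(records, EntryFormatter.parseNumMessages(records));

            // Wrap the buffer as an entry at a fake ledger position and decode it back.
            Entry entry = EntryImpl.create(0L, 0L, payload);
            payload.release();
            MemoryRecords decoded = formatter.decode(Collections.singletonList(entry),
                    RecordBatch.CURRENT_MAGIC_VALUE);
            decoded.records().forEach(record -> System.out.println(record.timestamp()));
            entry.release();
        }
    }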