diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf
index a8ea392735..a5bb1c4d04 100755
--- a/kafka-impl/conf/kop.conf
+++ b/kafka-impl/conf/kop.conf
@@ -71,6 +71,16 @@ offsetsTopicNumPartitions=8
 # Maximum number of entries that are read from cursor once per time
 maxReadEntriesNum=5
 
+# The format of an entry. The default value is pulsar.
+# Optional values: [pulsar]
+#
+# pulsar:
+#     When KoP receives messages from a Kafka producer, it serializes these messages to
+#     Pulsar's format so that a Pulsar consumer can read them directly.
+#     When KoP sends entries to a Kafka consumer, it treats each entry as Pulsar's
+#     format and deserializes it to Kafka's format.
+entry.format=pulsar
+
 ### --- KoP SSL configs--- ###
 
 # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol
diff --git a/kafka-impl/conf/kop_standalone.conf b/kafka-impl/conf/kop_standalone.conf
index b9b58ca777..21e7cfb9d9 100644
--- a/kafka-impl/conf/kop_standalone.conf
+++ b/kafka-impl/conf/kop_standalone.conf
@@ -71,6 +71,16 @@ offsetsTopicNumPartitions=8
 # Maximum number of entries that are read from cursor once per time
 maxReadEntriesNum=1
 
+# The format of an entry. The default value is pulsar.
+# Optional values: [pulsar]
+#
+# pulsar:
+#     When KoP receives messages from a Kafka producer, it serializes these messages to
+#     Pulsar's format so that a Pulsar consumer can read them directly.
+#     When KoP sends entries to a Kafka consumer, it treats each entry as Pulsar's
+#     format and deserializes it to Kafka's format.
+entry.format=pulsar
+
 ### --- KoP SSL configs--- ###
 
 # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index 2c40bc457f..2368322bad 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -29,6 +29,8 @@
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
 import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
 import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
 import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
@@ -157,6 +159,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
     private final int sslPort;
     private final int defaultNumPartitions;
     public final int maxReadEntriesNum;
+    @Getter
+    private final EntryFormatter entryFormatter;
 
     private final Map<TopicPartition, PendingProduceQueue> pendingProduceQueueMap = new ConcurrentHashMap<>();
 
@@ -184,6 +188,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
         this.topicManager = new KafkaTopicManager(this);
         this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
         this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
+        this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
     }
 
     @Override
@@ -600,8 +605,8 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
             MemoryRecords records = (MemoryRecords) entry.getValue();
             String fullPartitionName = KopTopic.toString(topicPartition);
-            PendingProduce pendingProduce =
-                new PendingProduce(partitionResponse, topicManager, fullPartitionName, records, executor);
+            PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName,
+                    entryFormatter, records, executor);
             PendingProduceQueue queue =
                 pendingProduceQueueMap.computeIfAbsent(topicPartition, ignored -> new PendingProduceQueue());
             queue.add(pendingProduce);
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
index 40bfbffe0f..ec561fbfb0 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -249,4 +249,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
     )
     private int maxReadEntriesNum = 5;
 
+    @FieldContext(
+        category = CATEGORY_KOP,
+        doc = "The format of an entry. Default: pulsar. Optional: [pulsar]"
+    )
+    private String entryFormat = "pulsar";
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
index 9ba41c5892..f65d8d8757 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
@@ -13,7 +13,6 @@
  */
 package io.streamnative.pulsar.handlers.kop;
 
-import static io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.entriesToRecords;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
 import com.google.common.collect.Lists;
@@ -322,9 +321,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
                     } else if (apiVersion <= 3) {
                         magic = RecordBatch.MAGIC_VALUE_V1;
                     }
-                    MemoryRecords records;
-                    // by default kafka is produced message in batched mode.
-                    records = entriesToRecords(entries, magic);
+                    final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic);
 
                     partitionData = new FetchResponse.PartitionData(
                         Errors.NONE,
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
index 068de28dfa..0b03d93814 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
@@ -14,14 +14,13 @@
 package io.streamnative.pulsar.handlers.kop;
 
 import io.netty.buffer.ByteBuf;
-import io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils;
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 
@@ -42,12 +41,13 @@ public class PendingProduce {
 
     public PendingProduce(CompletableFuture<PartitionResponse> responseFuture,
                           KafkaTopicManager topicManager,
                           String partitionName,
+                          EntryFormatter entryFormatter,
                           MemoryRecords memoryRecords,
                           ExecutorService executor) {
         this.responseFuture = responseFuture;
         this.topicManager = topicManager;
         this.partitionName = partitionName;
-        this.numMessages = parseNumMessages(memoryRecords);
+        this.numMessages = EntryFormatter.parseNumMessages(memoryRecords);
 
         this.topicFuture = topicManager.getTopic(partitionName).exceptionally(e -> {
             log.error("Failed to getTopic for partition '{}': {}", partitionName, e);
@@ -58,10 +58,7 @@ public PendingProduce(CompletableFuture<PartitionResponse> responseFuture,
             log.error("Failed to compute ByteBuf for partition '{}': {}", partitionName, e);
             return null;
         });
-        executor.execute(() -> {
-            ByteBuf byteBuf = MessageRecordUtils.recordsToByteBuf(memoryRecords, this.numMessages);
-            this.byteBufFuture.complete(byteBuf);
-        });
+        executor.execute(() -> byteBufFuture.complete(entryFormatter.encode(memoryRecords, numMessages)));
         this.offsetFuture = new CompletableFuture<>();
     }
 
@@ -119,12 +116,4 @@ public void publishMessages() {
             byteBuf.release();
         });
     }
-
-    private static int parseNumMessages(MemoryRecords records) {
-        int n = 0;
-        for (Record ignored : records.records()) {
-            n++;
-        }
-        return n;
-    }
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
new file mode 100644
index 0000000000..76b4afda7e
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.format;
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+
+
+/**
+ * The formatter for conversion between Kafka records and Bookie entries.
+ */
+public interface EntryFormatter {
+
+    /**
+     * Encode Kafka records to a ByteBuf.
+     *
+     * @param records messages with Kafka's format
+     * @param numMessages the number of messages
+     * @return the ByteBuf of an entry that is to be written to Bookie
+     */
+    ByteBuf encode(final MemoryRecords records, final int numMessages);
+
+    /**
+     * Decode a stream of entries to Kafka records.
+     *
+     * @param entries the list of entries
+     * @param magic the Kafka record batch's magic value
+     * @return the Kafka records
+     */
+    MemoryRecords decode(final List<Entry> entries, final byte magic);
+
+    /**
+     * Get the number of messages from MemoryRecords.
+     * Since MemoryRecords doesn't expose a message count, we have to iterate over its batches to compute one.
+     * This helper returns that count so that it can be passed to
+     * {@link EntryFormatter#encode(MemoryRecords, int)} and to metrics-related methods as well.
+     *
+     * @param records messages with Kafka's format
+     * @return the number of messages
+     */
+    static int parseNumMessages(final MemoryRecords records) {
+        int numMessages = 0;
+        for (MutableRecordBatch batch : records.batches()) {
+            numMessages += (batch.lastOffset() - batch.baseOffset() + 1);
+        }
+        return numMessages;
+    }
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
new file mode 100644
index 0000000000..4dba4b7999
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.format;
+
+/**
+ * Factory of EntryFormatter.
+ *
+ * @see EntryFormatter
+ */
+public class EntryFormatterFactory {
+
+    enum EntryFormat {
+        PULSAR
+    }
+
+    public static EntryFormatter create(final String format) {
+        try {
+            EntryFormat entryFormat = Enum.valueOf(EntryFormat.class, format.toUpperCase());
+            switch (entryFormat) {
+                case PULSAR:
+                    return new PulsarEntryFormatter();
+                default:
+                    throw new Exception("No EntryFormatter for " + entryFormat);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unsupported entry.format '" + format + "': " + e.getMessage());
+        }
+    }
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java
similarity index 50%
rename from kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java
rename to kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java
index cd4ec237ad..7a5117841b 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java
@@ -11,25 +11,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.streamnative.pulsar.handlers.kop.utils;
+package io.streamnative.pulsar.handlers.kop.format;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
+import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.time.Clock;
-import java.util.Base64;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
-import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -38,8 +35,6 @@
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
@@ -55,103 +50,18 @@
 
 
 /**
- * Pulsar Message and Kafka Record utils.
+ * The entry formatter that uses Pulsar's format.
  */
-@UtilityClass
 @Slf4j
-public final class MessageRecordUtils {
+public class PulsarEntryFormatter implements EntryFormatter {
     private static final int DEFAULT_FETCH_BUFFER_SIZE = 1024 * 1024;
     private static final int MAX_RECORDS_BUFFER_SIZE = 100 * 1024 * 1024;
-    private static final String FAKE_KOP_PRODUCER_NAME = "fake_kop_producer_name";
-
-    private static final Clock clock = Clock.systemDefaultZone();
-
-    // convert kafka Record to Pulsar Message.
-    // called when publish received Kafka Record into Pulsar.
-    public static MessageImpl<byte[]> recordToEntry(Record record) {
-        @SuppressWarnings("unchecked")
-        TypedMessageBuilderImpl<byte[]> builder = new TypedMessageBuilderImpl(null, Schema.BYTES);
-
-        // key
-        if (record.hasKey()) {
-            byte[] key = new byte[record.keySize()];
-            record.key().get(key);
-            builder.keyBytes(key);
-            // reuse ordering key to avoid converting string <-> bytes
-            builder.orderingKey(key);
-        }
-
-        // value
-        if (record.hasValue()) {
-            byte[] value = new byte[record.valueSize()];
-            record.value().get(value);
-            builder.value(value);
-        } else {
-            builder.value(new byte[0]);
-        }
-
-        // sequence
-        if (record.sequence() >= 0) {
-            builder.sequenceId(record.sequence());
-        }
-
-        // timestamp
-        if (record.timestamp() >= 0) {
-            builder.eventTime(record.timestamp());
-            builder.getMetadataBuilder().setPublishTime(record.timestamp());
-        } else {
-            builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis());
-        }
-
-        // header
-        for (Header h : record.headers()) {
-            builder.property(h.key(),
-                new String(h.value(), UTF_8));
-        }
-
-        return (MessageImpl<byte[]>) builder.getMessage();
-    }
-
-    // convert message to ByteBuf payload for ledger.addEntry.
-    // parameter message is converted from passed in Kafka record.
-    // called when publish received Kafka Record into Pulsar.
-    public static ByteBuf messageToByteBuf(Message<byte[]> message) {
-        checkArgument(message instanceof MessageImpl);
-
-        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) message;
-        MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder();
-        ByteBuf payload = msg.getDataBuffer();
-
-        // filled in required fields
-        if (!msgMetadataBuilder.hasSequenceId()) {
-            msgMetadataBuilder.setSequenceId(-1);
-        }
-        if (!msgMetadataBuilder.hasPublishTime()) {
-            msgMetadataBuilder.setPublishTime(clock.millis());
-        }
-        if (!msgMetadataBuilder.hasProducerName()) {
-            msgMetadataBuilder.setProducerName(FAKE_KOP_PRODUCER_NAME);
-        }
-
-        msgMetadataBuilder.setCompression(
-            CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE));
-        msgMetadataBuilder.setUncompressedSize(payload.readableBytes());
-        MessageMetadata msgMetadata = msgMetadataBuilder.build();
-
-        ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
-
-        msgMetadataBuilder.recycle();
-        msgMetadata.recycle();
-
-        return buf;
-    }
-
     //// for Batch messages
-    protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
-    protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
+    private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
+    private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
 
-    // If records stored in a batched way, turn MemoryRecords into a pulsar batched message.
-    public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) {
+    @Override
+    public ByteBuf encode(final MemoryRecords records, final int numMessages) {
         long currentBatchSizeBytes = 0;
         int numMessagesInBatch = 0;
 
@@ -160,8 +70,8 @@ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) {
         PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE;
 
         ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
-            .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES));
-        List<MessageImpl<byte[]>> messages = Lists.newArrayListWithExpectedSize(size);
+                .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES));
+        List<MessageImpl<byte[]>> messages = Lists.newArrayListWithExpectedSize(numMessages);
         MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder();
 
         StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> {
@@ -188,106 +98,6 @@ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) {
             msgBuilder.recycle();
         }
 
-//        Iterator<Record> iterator = records.records().iterator();
-//        while (iterator.hasNext()) {
-//            MessageImpl<byte[]> message = recordToEntry(iterator.next());
-//            if (++numMessagesInBatch == 1) {
-//                sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder());
-//            }
-//            messages.add(message);
-//            currentBatchSizeBytes += message.getDataBuffer().readableBytes();
-//
-//            if (log.isDebugEnabled()) {
-//                log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ",
-//                    sequenceId, numMessagesInBatch, currentBatchSizeBytes);
-//            }
-//        }
-
-//        for (MessageImpl<byte[]> msg : messages) {
-//            PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
-//            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
-//                msg.getDataBuffer(), batchedMessageMetadataAndPayload);
-//            msgBuilder.recycle();
-//        }
-
-        int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-
-        if (PulsarApi.CompressionType.NONE != compressionType) {
-            messageMetaBuilder.setCompression(compressionType);
-            messageMetaBuilder.setUncompressedSize(uncompressedSize);
-        }
-
-        messageMetaBuilder.setNumMessagesInBatch(numMessagesInBatch);
-
-        MessageMetadata msgMetadata = messageMetaBuilder.build();
-
-        ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c,
-            msgMetadata,
-            batchedMessageMetadataAndPayload);
-
-        messageMetaBuilder.recycle();
-        msgMetadata.recycle();
-        batchedMessageMetadataAndPayload.release();
-
-        return buf;
-    }
-
-    public static void recordsToByteBuf(MemoryRecords records, int size,
-                                        CompletableFuture<ByteBuf> transFuture) {
-        long currentBatchSizeBytes = 0;
-        int numMessagesInBatch = 0;
-
-        long sequenceId = -1;
-
-        // TODO: handle different compression type
-        PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE;
-
-        ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
-            .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES));
-        List<MessageImpl<byte[]>> messages = Lists.newArrayListWithExpectedSize(size);
-        MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder();
-
-        StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> {
-            MessageImpl<byte[]> message = recordToEntry(record);
-            messages.add(message);
-        });
-
-        for (MessageImpl<byte[]> message : messages) {
-            if (++numMessagesInBatch == 1) {
-                sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder());
-            }
-            currentBatchSizeBytes += message.getDataBuffer().readableBytes();
-            if (log.isDebugEnabled()) {
-                log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ",
-                    sequenceId, numMessagesInBatch, currentBatchSizeBytes);
-            }
-
-            PulsarApi.MessageMetadata.Builder msgBuilder = message.getMessageBuilder();
-            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
-                message.getDataBuffer(), batchedMessageMetadataAndPayload);
-            msgBuilder.recycle();
-        }
-
-//        Iterator<Record> iterator = records.records().iterator();
-//        while (iterator.hasNext()) {
-//            MessageImpl<byte[]> message = recordToEntry(iterator.next());
-//            if (++numMessagesInBatch == 1) {
-//                sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder());
-//            }
-//            messages.add(message);
-//            currentBatchSizeBytes += message.getDataBuffer().readableBytes();
-//
-//            if (log.isDebugEnabled()) {
-//                log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ",
-//                    sequenceId, numMessagesInBatch, currentBatchSizeBytes);
-//            }
-//        }
-
-//        for (MessageImpl<byte[]> msg : messages) {
-//            PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
-//            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
-//                msg.getDataBuffer(), batchedMessageMetadataAndPayload);
-//            msgBuilder.recycle();
-//        }
         int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
 
         if (PulsarApi.CompressionType.NONE != compressionType) {
@@ -307,47 +117,24 @@ public static void recordsToByteBuf(MemoryRecords records, int size,
         msgMetadata.recycle();
         batchedMessageMetadataAndPayload.release();
 
-        transFuture.complete(buf);
-    }
-
-    private static Header[] getHeadersFromMetadata(List<KeyValue> properties) {
-        Header[] headers = new Header[properties.size()];
-
-        if (log.isDebugEnabled()) {
-            log.debug("getHeadersFromMetadata. Header size: {}",
-                properties.size());
-        }
-
-        int index = 0;
-        for (KeyValue kv: properties) {
-            headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8));
-
-            if (log.isDebugEnabled()) {
-                log.debug("index: {} kv.getKey: {}. kv.getValue: {}",
-                    index, kv.getKey(), kv.getValue());
-            }
-            index++;
-        }
-
-        return headers;
+        return buf;
     }
 
-    // Convert entries read from BookKeeper into Kafka Records
-    // Entries can be batched messages, may need un-batch.
-    public static MemoryRecords entriesToRecords(List<Entry> entries, byte magic) {
+    @Override
+    public MemoryRecords decode(final List<Entry> entries, final byte magic) {
         try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) {
             MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic,
-                org.apache.kafka.common.record.CompressionType.NONE,
-                TimestampType.CREATE_TIME,
-                // using the first entry, index 0 as base offset
-                MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId(), 0),
-                RecordBatch.NO_TIMESTAMP,
-                RecordBatch.NO_PRODUCER_ID,
-                RecordBatch.NO_PRODUCER_EPOCH,
-                RecordBatch.NO_SEQUENCE,
-                false, false,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH,
-                MAX_RECORDS_BUFFER_SIZE);
+                    org.apache.kafka.common.record.CompressionType.NONE,
+                    TimestampType.CREATE_TIME,
+                    // using the first entry, index 0 as base offset
+                    MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId(), 0),
+                    RecordBatch.NO_TIMESTAMP,
+                    RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH,
+                    RecordBatch.NO_SEQUENCE,
+                    false, false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                    MAX_RECORDS_BUFFER_SIZE);
 
             entries.parallelStream().forEachOrdered(entry -> {
                 // each entry is a batched message
@@ -369,9 +156,9 @@ public static MemoryRecords entriesToRecords(List<Entry> entries, byte magic) {
                         msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(),
-                        getKeyByteBuffer(singleMessageMetadata),
-                        getNioBuffer(singleMessagePayload),
+                        ByteBufUtils.getKeyByteBuffer(singleMessageMetadata),
+                        ByteBufUtils.getNioBuffer(singleMessagePayload),
                         headers);
                     singleMessagePayload.release();
                     singleMessageMetadataBuilder.recycle();
@@ -409,11 +196,11 @@ public static MemoryRecords entriesToRecords(List<Entry> entries, byte magic) {
                 builder.appendWithOffset(
-                    MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()),
-                    msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(),
-                    getKeyByteBuffer(msgMetadata),
-                    getNioBuffer(payload),
-                    headers);
+                    MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()),
+                    msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(),
+                    ByteBufUtils.getKeyByteBuffer(msgMetadata),
+                    ByteBufUtils.getNioBuffer(payload),
+                    headers);
             }
 
             msgMetadata.recycle();
@@ -430,41 +217,74 @@ public static MemoryRecords entriesToRecords(List<Entry> entries, byte magic) {
         }
     }
 
-    private static ByteBuffer getKeyByteBuffer(MessageMetadata messageMetadata) {
-        if (messageMetadata.hasOrderingKey()) {
-            return messageMetadata.getOrderingKey().asReadOnlyByteBuffer();
+    private MessageImpl<byte[]> recordToEntry(Record record) {
+        @SuppressWarnings("unchecked")
+        TypedMessageBuilderImpl<byte[]> builder = new TypedMessageBuilderImpl(null, Schema.BYTES);
+
+        // key
+        if (record.hasKey()) {
+            byte[] key = new byte[record.keySize()];
+            record.key().get(key);
+            builder.keyBytes(key);
+            // reuse ordering key to avoid converting string <-> bytes
+            builder.orderingKey(key);
         }
-        String key = messageMetadata.getPartitionKey();
-        if (messageMetadata.hasPartitionKeyB64Encoded()) {
-            return ByteBuffer.wrap(Base64.getDecoder().decode(key));
+        // value
+        if (record.hasValue()) {
+            byte[] value = new byte[record.valueSize()];
+            record.value().get(value);
+            builder.value(value);
         } else {
-            // for Base64 not encoded string, convert to UTF_8 chars
-            return ByteBuffer.wrap(key.getBytes(UTF_8));
+            builder.value(new byte[0]);
        }
-    }
 
-    private static ByteBuffer getKeyByteBuffer(SingleMessageMetadata messageMetadata) {
-        if (messageMetadata.hasOrderingKey()) {
-            return messageMetadata.getOrderingKey().asReadOnlyByteBuffer();
+        // sequence
+        if (record.sequence() >= 0) {
+            builder.sequenceId(record.sequence());
        }
-        String key = messageMetadata.getPartitionKey();
-        if (messageMetadata.hasPartitionKeyB64Encoded()) {
-            return ByteBuffer.wrap(Base64.getDecoder().decode(key));
+        // timestamp
+        if (record.timestamp() >= 0) {
+            builder.eventTime(record.timestamp());
+            builder.getMetadataBuilder().setPublishTime(record.timestamp());
         } else {
-            // for Base64 not encoded string, convert to UTF_8 chars
-            return ByteBuffer.wrap(key.getBytes(UTF_8));
+            builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis());
+        }
+
+        // header
+        for (Header h : record.headers()) {
+            builder.property(h.key(),
+                new String(h.value(), UTF_8));
         }
+
+        return (MessageImpl<byte[]>) builder.getMessage();
     }
 
-    public static ByteBuffer getNioBuffer(ByteBuf buffer) {
-        if (buffer.isDirect()) {
-            return buffer.nioBuffer();
+    private Header[] getHeadersFromMetadata(List<KeyValue> properties) {
+        Header[] headers = new Header[properties.size()];
+
+        if (log.isDebugEnabled()) {
+            log.debug("getHeadersFromMetadata. Header size: {}",
+                properties.size());
         }
-        final byte[] bytes = new byte[buffer.readableBytes()];
-        buffer.getBytes(buffer.readerIndex(), bytes);
-        return ByteBuffer.wrap(bytes);
+
+        int index = 0;
+        for (KeyValue kv: properties) {
+            headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8));
+
+            if (log.isDebugEnabled()) {
+                log.debug("index: {} kv.getKey: {}. kv.getValue: {}",
+                    index, kv.getKey(), kv.getValue());
+            }
+            index++;
+        }
+
+        return headers;
     }
+
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java
new file mode 100644
index 0000000000..16e3703426
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java
@@ -0,0 +1,14 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.format;
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java
new file mode 100644
index 0000000000..0450ba8963
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.utils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+
+/**
+ * Utils for ByteBuf operations.
+ */
+public class ByteBufUtils {
+
+    public static ByteBuffer getKeyByteBuffer(PulsarApi.SingleMessageMetadata messageMetadata) {
+        if (messageMetadata.hasOrderingKey()) {
+            return messageMetadata.getOrderingKey().asReadOnlyByteBuffer();
+        }
+
+        String key = messageMetadata.getPartitionKey();
+        if (messageMetadata.hasPartitionKeyB64Encoded()) {
+            return ByteBuffer.wrap(Base64.getDecoder().decode(key));
+        } else {
+            // for Base64 not encoded string, convert to UTF_8 chars
+            return ByteBuffer.wrap(key.getBytes(UTF_8));
+        }
+    }
+
+    public static ByteBuffer getKeyByteBuffer(PulsarApi.MessageMetadata messageMetadata) {
+        if (messageMetadata.hasOrderingKey()) {
+            return messageMetadata.getOrderingKey().asReadOnlyByteBuffer();
+        }
+
+        String key = messageMetadata.getPartitionKey();
+        if (messageMetadata.hasPartitionKeyB64Encoded()) {
+            return ByteBuffer.wrap(Base64.getDecoder().decode(key));
+        } else {
+            // for Base64 not encoded string, convert to UTF_8 chars
+            return ByteBuffer.wrap(key.getBytes(UTF_8));
+        }
+    }
+
+    public static ByteBuffer getNioBuffer(ByteBuf buffer) {
+        if (buffer.isDirect()) {
+            return buffer.nioBuffer();
+        }
+        final byte[] bytes = new byte[buffer.readableBytes()];
+        buffer.getBytes(buffer.readerIndex(), bytes);
+        return ByteBuffer.wrap(bytes);
+    }
+}
diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java
new file mode 100644
index 0000000000..e6714ce89d
--- /dev/null
+++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop;
+
+import static org.testng.Assert.assertEquals;
+
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for EntryFormatter.
+ */
+public class EntryFormatterTest {
+
+    @DataProvider(name = "compressionTypes")
+    Object[] allCompressionTypes() {
+        return Arrays.stream(CompressionType.values()).map(x -> (Object) x).toArray();
+    }
+
+    @Test(dataProvider = "compressionTypes")
+    public void testParseNumMessages(CompressionType compressionType) {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int[] batchSizes = {3, 2, 5};
+
+        int baseOffset = 0;
+        for (int batchSize : batchSizes) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+                    compressionType, TimestampType.LOG_APPEND_TIME, baseOffset);
+            for (int i = 0; i < batchSize; i++) {
+                builder.append(0L, "a".getBytes(), "1".getBytes());
+            }
+            baseOffset += batchSize;
+            // Normally the offsets of batches are continuous; here we add an extra interval just for robustness.
+            baseOffset += 1;
+            builder.close();
+        }
+
+        buffer.flip();
+        final MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        assertEquals(EntryFormatter.parseNumMessages(records), IntStream.of(batchSizes).sum());
+    }
+}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java
index 081856bee0..0c9ca18013 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java
@@ -493,7 +493,7 @@ private void produceData(KafkaProducer<String, String> producer,
     @Test(timeOut = 20000)
     public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception {
         String topicName = "kopBrokerRespectsPartitionsOrderAndSizeLimits";
-        int numberTopics = 5;
+        int numberTopics = 8;
         int numberPartitions = 6;
         int messagesPerPartition = 9;
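Review note (not part of the patch): a minimal sketch of how the new pieces fit together on the produce and fetch paths. The class and method names below (EntryFormatterUsageSketch, encodeForBookie, decodeForConsumer) are illustrative only; the EntryFormatter and EntryFormatterFactory calls mirror what KafkaRequestHandler, PendingProduce, and MessageFetchContext do in the diff above.

package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public class EntryFormatterUsageSketch {

    // Produce path (mirrors PendingProduce): count the Kafka messages once, then
    // encode the whole batch into the single ByteBuf that is written to BookKeeper
    // as one entry.
    static ByteBuf encodeForBookie(MemoryRecords memoryRecords) {
        // "pulsar" comes from the entry.format config (see KafkaServiceConfiguration).
        EntryFormatter formatter = EntryFormatterFactory.create("pulsar");
        int numMessages = EntryFormatter.parseNumMessages(memoryRecords);
        return formatter.encode(memoryRecords, numMessages);
    }

    // Fetch path (mirrors MessageFetchContext): decode the entries read from the
    // managed cursor back into MemoryRecords, using the magic value that matches
    // the client's fetch API version.
    static MemoryRecords decodeForConsumer(List<Entry> entries) {
        EntryFormatter formatter = EntryFormatterFactory.create("pulsar");
        return formatter.decode(entries, RecordBatch.MAGIC_VALUE_V2);
    }
}

Because parseNumMessages sums offset ranges per batch instead of iterating every record, it stays cheap even for compressed batches — which is what EntryFormatterTest exercises across all CompressionType values.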