This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Change EntryFormatter#encode API
BewareMyPower committed Dec 22, 2020
1 parent 74cdfe4 commit 0780706
Showing 4 changed files with 91 additions and 25 deletions.
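
In short: the commit replaces the private per-record counting loops duplicated in PendingProduce and PulsarEntryFormatter with a single static helper, EntryFormatter.parseNumMessages, which sums each batch's offset span, and threads the resulting count through the new encode(MemoryRecords, int) signature.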
PendingProduce.java
@@ -21,7 +21,6 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 
@@ -48,7 +47,7 @@ public PendingProduce(CompletableFuture<PartitionResponse> responseFuture,
         this.responseFuture = responseFuture;
         this.topicManager = topicManager;
         this.partitionName = partitionName;
-        this.numMessages = parseNumMessages(memoryRecords);
+        this.numMessages = EntryFormatter.parseNumMessages(memoryRecords);
 
         this.topicFuture = topicManager.getTopic(partitionName).exceptionally(e -> {
             log.error("Failed to getTopic for partition '{}': {}", partitionName, e);
@@ -59,7 +58,7 @@ public PendingProduce(CompletableFuture<PartitionResponse> responseFuture,
             log.error("Failed to compute ByteBuf for partition '{}': {}", partitionName, e);
             return null;
         });
-        executor.execute(() -> this.byteBufFuture.complete(entryFormatter.encode(memoryRecords)));
+        executor.execute(() -> byteBufFuture.complete(entryFormatter.encode(memoryRecords, numMessages)));
         this.offsetFuture = new CompletableFuture<>();
     }
 
@@ -117,12 +116,4 @@ public void publishMessages() {
             byteBuf.release();
         });
     }
-
-    private static int parseNumMessages(MemoryRecords records) {
-        int n = 0;
-        for (Record ignored : records.records()) {
-            n++;
-        }
-        return n;
-    }
 }
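
For context, the constructor now computes the message count once and hands it to encode() on the executor thread. Below is a minimal sketch of that flow; the class and method names are hypothetical, and the real constructor also wires up the topic and offset futures with error handling.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import org.apache.kafka.common.record.MemoryRecords;

// Hypothetical sketch of the encode path after this commit.
final class EncodeFlowSketch {

    static CompletableFuture<ByteBuf> encodeAsync(EntryFormatter entryFormatter,
                                                  MemoryRecords memoryRecords,
                                                  Executor executor) {
        // Count once, up front; the same value serves metrics and encode().
        final int numMessages = EntryFormatter.parseNumMessages(memoryRecords);
        final CompletableFuture<ByteBuf> byteBufFuture = new CompletableFuture<>();
        executor.execute(() -> byteBufFuture.complete(entryFormatter.encode(memoryRecords, numMessages)));
        return byteBufFuture;
    }
}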
EntryFormatter.java
@@ -15,8 +15,11 @@
 import io.netty.buffer.ByteBuf;
 import java.util.List;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
 
 
 /**
@@ -28,9 +31,10 @@ public interface EntryFormatter {
      * Encode Kafka records to a ByteBuf.
      *
      * @param records messages with Kafka's format
+     * @param numMessages the number of messages
      * @return the ByteBuf of an entry that is to be written to Bookie
      */
-    ByteBuf encode(MemoryRecords records);
+    ByteBuf encode(final MemoryRecords records, final int numMessages);
 
     /**
      * Decode a stream of entries to Kafka records.
@@ -39,5 +43,23 @@
      * @param magic the Kafka record batch's magic value
      * @return the Kafka records
      */
-    MemoryRecords decode(List<Entry> entries, byte magic);
+    MemoryRecords decode(final List<Entry> entries, final byte magic);
+
+    /**
+     * Get the number of messages from a MemoryRecords object.
+     *
+     * <p>MemoryRecords doesn't expose a message count directly, so this helper sums the offset span of each
+     * record batch rather than iterating over every record. The result can be passed to
+     * {@link EntryFormatter#encode(MemoryRecords, int)} and to metrics related methods as well.
+     *
+     * @param records messages with Kafka's format
+     * @return the number of messages
+     */
+    static int parseNumMessages(final MemoryRecords records) {
+        int numMessages = 0;
+        for (MutableRecordBatch batch : records.batches()) {
+            numMessages += (batch.lastOffset() - batch.baseOffset() + 1);
+        }
+        return numMessages;
+    }
 }
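
The offset arithmetic above reads only batch headers, so compressed batches are never decompressed just to be counted; it does assume offsets within a batch are dense, which holds for batches as producers create them. Here is a stand-alone sketch (the CountingDemo class is hypothetical, not part of this commit) comparing the old per-record loop with the new arithmetic:

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class CountingDemo {

    // Old approach: iterates every record, decompressing batches if necessary.
    static int countByIteration(MemoryRecords records) {
        int n = 0;
        for (Record ignored : records.records()) {
            n++;
        }
        return n;
    }

    // New approach: reads only each batch header's base and last offsets.
    static int countByOffsets(MemoryRecords records) {
        int n = 0;
        for (MutableRecordBatch batch : records.batches()) {
            n += (int) (batch.lastOffset() - batch.baseOffset() + 1);
        }
        return n;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
                CompressionType.GZIP, TimestampType.LOG_APPEND_TIME, 0L);
        for (int i = 0; i < 5; i++) {
            builder.append(0L, "key".getBytes(), "value".getBytes());
        }
        builder.close();
        buffer.flip();

        MemoryRecords records = MemoryRecords.readableRecords(buffer);
        // Both strategies agree: prints "5 == 5".
        System.out.println(countByIteration(records) + " == " + countByOffsets(records));
    }
}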
PulsarEntryFormatter.java
@@ -62,7 +62,7 @@ public class PulsarEntryFormatter implements EntryFormatter {
     private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
 
     @Override
-    public ByteBuf encode(MemoryRecords records) {
+    public ByteBuf encode(final MemoryRecords records, final int numMessages) {
         long currentBatchSizeBytes = 0;
         int numMessagesInBatch = 0;
 
@@ -72,8 +72,7 @@ public ByteBuf encode(MemoryRecords records) {
 
         ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
                 .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES));
-        final int size = getMemoryRecordsCount(records);
-        List<MessageImpl<byte[]>> messages = Lists.newArrayListWithExpectedSize(size);
+        List<MessageImpl<byte[]>> messages = Lists.newArrayListWithExpectedSize(numMessages);
         MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder();
 
         StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> {
@@ -123,7 +122,7 @@ public ByteBuf encode(MemoryRecords records) {
     }
 
     @Override
-    public MemoryRecords decode(List<Entry> entries, byte magic) {
+    public MemoryRecords decode(final List<Entry> entries, final byte magic) {
         try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) {
             MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic,
                     org.apache.kafka.common.record.CompressionType.NONE,
@@ -219,14 +218,6 @@ public MemoryRecords decode(List<Entry> entries, byte magic) {
         }
     }
-
-    private static int getMemoryRecordsCount(final MemoryRecords records) {
-        int n = 0;
-        for (Record ignored : records.records()) {
-            n++;
-        }
-        return n;
-    }
 
     // convert kafka Record to Pulsar Message.
     // Called when a received Kafka Record is published to Pulsar.
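
Passing numMessages into encode() lets PulsarEntryFormatter presize its messages list via Lists.newArrayListWithExpectedSize and drop the private getMemoryRecordsCount scan, so the records are no longer iterated once just to size the list and again to convert them.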
EntryFormatterTest.java (new file)
@@ -0,0 +1,62 @@
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.streamnative.pulsar.handlers.kop;

import static org.testng.Assert.assertEquals;

import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.stream.IntStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Tests for EntryFormatter.
 */
public class EntryFormatterTest {

    @DataProvider(name = "compressionTypes")
    Object[] allCompressionTypes() {
        return Arrays.stream(CompressionType.values()).map(x -> (Object) x).toArray();
    }

    @Test(dataProvider = "compressionTypes")
    public void testParseNumMessages(CompressionType compressionType) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int[] batchSizes = {3, 2, 5};

        int baseOffset = 0;
        for (int batchSize : batchSizes) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
                    compressionType, TimestampType.LOG_APPEND_TIME, baseOffset);
            for (int i = 0; i < batchSize; i++) {
                builder.append(0L, "a".getBytes(), "1".getBytes());
            }
            baseOffset += batchSize;
            // Normally the offsets of consecutive batches are contiguous; add an extra gap just for robustness.
            baseOffset += 1;
            builder.close();
        }

        buffer.flip();
        final MemoryRecords records = MemoryRecords.readableRecords(buffer);
        assertEquals(EntryFormatter.parseNumMessages(records), IntStream.of(batchSizes).sum());
    }
}
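
For the batch sizes {3, 2, 5} above, parseNumMessages should return 3 + 2 + 5 = 10 for every compression type: each batch contributes lastOffset - baseOffset + 1 from its own header, so the artificial one-offset gap inserted between batches never enters the sum.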
