fix event time, change default listener in standalone config (apache#48)
In the Pulsar C++ client, the default event time is a negative value, which caused errors when a Kafka consumer consumed messages produced by the Pulsar C++/CGo clients.
With this change, if the event time is not set, the publish time is used instead.
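The fix itself is a one-line fallback wherever a Kafka record timestamp is taken from Pulsar message metadata. Below is a minimal sketch of the pattern; the helper class and method names are illustrative only, since the actual patch inlines the ternary at each builder.appendWithOffset(...) call site in MessageRecordUtils:

```java
// Hypothetical helper mirroring the patch (not part of the actual change,
// which inlines this ternary directly at each call site).
final class EventTimeFallback {
    // Pulsar metadata reports an unset event time as 0, and per the commit
    // message the C++/CGo clients could even send a negative value, so any
    // non-positive event time falls back to the broker-assigned publish time.
    static long resolve(long eventTime, long publishTime) {
        return eventTime > 0 ? eventTime : publishTime;
    }
}
```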
jiazhai authored and sijie committed Nov 19, 2019
1 parent 3424abc commit 58daa59
Showing 3 changed files with 24 additions and 14 deletions.
2 changes: 1 addition & 1 deletion conf/kop_standalone.conf
@@ -19,7 +19,7 @@ enableGroupCoordinator=true

messagingProtocols=kafka

-listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
+listeners=PLAINTEXT://localhost:9092
### --- General broker settings --- ###

# Zookeeper quorum connection string
22 changes: 11 additions & 11 deletions src/main/java/io/streamnative/kop/utils/MessageRecordUtils.java
@@ -239,30 +239,30 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
// each entry is a batched message
ByteBuf metadataAndPayload = entry.getDataBuffer();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload);
-int batchSize = msgMetadata.getNumMessagesInBatch();
-boolean isBatchMessage = msgMetadata.hasNumMessagesInBatch();
+int numMessages = msgMetadata.getNumMessagesInBatch();
+boolean notBatchMessage = (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch());
ByteBuf payload = metadataAndPayload.retain();

if (log.isDebugEnabled()) {
log.debug("entriesToRecords. NumMessagesInBatch {}: entries in list: {}. new entryId {}:{}",
batchSize, entries.size(), entry.getLedgerId(), entry.getEntryId());
log.debug("entriesToRecords. readerIndex:{} writerIndex:{}",
payload.readerIndex(), payload.writerIndex());
log.debug("entriesToRecords. NumMessagesInBatch: {}, isBatchMessage: {}, entries in list: {}."
+ " new entryId {}:{}, readerIndex: {}, writerIndex: {}",
numMessages, !notBatchMessage, entries.size(), entry.getLedgerId(),
entry.getEntryId(), payload.readerIndex(), payload.writerIndex());
}

// need handle encryption
checkState(msgMetadata.getEncryptionKeysCount() == 0);

-if (isBatchMessage) {
-for (int i = 0; i < batchSize; ++i) {
+if (!notBatchMessage) {
+for (int i = 0; i < numMessages; ++i) {
if (log.isDebugEnabled()) {
log.debug(" processing message num - {} in batch", i);
}

SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata
.newBuilder();
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(metadataAndPayload,
-    singleMessageMetadataBuilder, i, batchSize);
+    singleMessageMetadataBuilder, i, numMessages);

SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();

@@ -272,7 +272,7 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.

builder.appendWithOffset(
MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId(), i),
-    msgMetadata.getEventTime(),
+    msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(),
Base64.getDecoder().decode(singleMessageMetadata.getPartitionKey()),
data,
headers);
@@ -285,7 +285,7 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.

builder.appendWithOffset(
MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()),
-    msgMetadata.getEventTime(),
+    msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(),
Base64.getDecoder().decode(msgMetadata.getPartitionKey()),
data,
headers);
14 changes: 12 additions & 2 deletions src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java
@@ -23,6 +23,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.streamnative.kop.utils.MessageIdUtils;
import java.time.Duration;
@@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -53,6 +55,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -323,15 +326,22 @@ public void testPulsarProduceKafkaConsume(int partitionNumber, boolean isBatch)

@Cleanup
Producer<byte[]> producer = producerBuilder.create();
+List<CompletableFuture<MessageId>> sendResults = Lists.newArrayListWithExpectedSize(totalMsgs);
+CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < totalMsgs; i++) {
String message = messageStrPrefix + i;
-producer.newMessage()
+CompletableFuture<MessageId> id = producer.newMessage()
.keyBytes(kafkaIntSerialize(Integer.valueOf(i)))
.value(message.getBytes())
.property(key1 + i, value1 + i)
.property(key2 + i, value2 + i)
-    .send();
+    .sendAsync();
+sendResults.add(id);
}
+FutureUtil.waitForAll(sendResults).whenCompleteAsync((r, t) -> {
+    latch.countDown();
+});
+latch.await();

// 2. use kafka consumer to consume.
@Cleanup
