From 4ec538da5bc086fce1655e770c630d729cb642cf Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Mon, 29 Jul 2019 16:26:19 +0800 Subject: [PATCH] Request Fetch implementation (#19) Master Issue: #4 This is the request Fetch implementation. Basic logic: For each Fetch request from a partition, create and maintain a NonDurableCursor to read from backed PersistentTopic. ** changes ** - Add basic code implementation - Add unit tests. --- conf/kop_standalone.conf | 2 +- .../streamnative/kop/KafkaRequestHandler.java | 429 ++++++++++++++++-- .../io/streamnative/kop/KafkaService.java | 4 + .../kop/KafkaServiceConfiguration.java | 2 +- .../io/streamnative/kop/KafkaStandalone.java | 3 + .../kop/KafkaTopicConsumerManager.java | 99 ++++ .../streamnative/kop/KafkaTopicManager.java | 58 +++ .../kop/utils/MessageIdUtils.java | 11 +- .../kop/utils/ReflectionUtils.java | 25 +- .../kop/KafkaRequestTypeTest.java | 144 +++++- .../kop/KafkaTopicConsumerManagerTest.java | 160 +++++++ .../kop/MockKafkaServiceBaseTest.java | 67 ++- src/test/resources/log4j2.xml | 2 +- 13 files changed, 945 insertions(+), 61 deletions(-) create mode 100644 src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java create mode 100644 src/main/java/io/streamnative/kop/KafkaTopicManager.java create mode 100644 src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java diff --git a/conf/kop_standalone.conf b/conf/kop_standalone.conf index 7f8b182d6edc8..5ce3c04d124de 100644 --- a/conf/kop_standalone.conf +++ b/conf/kop_standalone.conf @@ -25,7 +25,7 @@ zookeeperServers= # Configuration Store connection string configurationStoreServers= -brokerServicePort=9092 +brokerServicePort=6650 # Port to use to server HTTP request webServicePort=8080 diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index 6c866b6f7c5cb..c78dc18c9b348 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -24,39 +24,61 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.streamnative.kop.utils.MessageIdUtils; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.net.URI; import java.time.Clock; import java.util.Base64; import java.util.Collections; +import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; import 
org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.FetchResponse.PartitionData; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; +import org.apache.kafka.common.requests.OffsetFetchRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; +import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.Topic.PublishContext; @@ -72,6 +94,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; @@ -89,7 +112,10 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final PulsarAdmin admin; private static final Clock clock = Clock.systemDefaultZone(); - private static final String producerName = "fake_kop_producer_name"; + private static final String FAKE_KOP_PRODUCER_NAME = "fake_kop_producer_name"; + + private static final int DEFAULT_FETCH_BUFFER_SIZE = 1024 * 1024; + private static final int MAX_RECORDS_BUFFER_SIZE = 100 * 1024 * 1024; public KafkaRequestHandler(KafkaService kafkaService) throws Exception { @@ -140,13 +166,15 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { .groupingBy(topicName -> getLocalNameWithoutPartition(topicName), Collectors.toList())); if (log.isDebugEnabled()) { - log.debug("Get all topics in TopicMetadata request, will get {} topics", pulsarTopics.size()); + log.debug("[{}] Request {}: Get all topics, will get {} topics", + ctx.channel(), metadataHar.getHeader(), pulsarTopics.size()); } pulsarTopicsFuture.complete(pulsarTopics); } catch (Exception e) { // error when getListOfPersistentTopics - log.error("Failed to get all topics list", e); + log.error("[{}] Request {}: Failed to get all topics list", + ctx.channel(), metadataHar.getHeader(), e); pulsarTopicsFuture.completeExceptionally(e); } }); @@ -177,8 +205,8 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { topic, false, Collections.emptyList())); - log.warn("[{}] Failed to get partitioned topic metadata: {}", - topicName, throwable.getMessage()); + log.warn("[{}] Request {}: Failed to get partitioned topic {} metadata: {}", + ctx.channel(), metadataHar.getHeader(), topicName, throwable.getMessage()); } else { List topicNames; if 
(partitionedTopicMetadata.partitions > 1) { @@ -194,7 +222,8 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { } else { if (log.isDebugEnabled()) { - log.debug("Topic {} has 1 partitions", topic); + log.debug("[{}] Request {}: Topic {} has 1 partitions", + ctx.channel(), metadataHar.getHeader(), topic); } topicNames = Lists.newArrayList(topicName); } @@ -205,7 +234,8 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { int completedTopics = topicsCompleted.incrementAndGet(); if (completedTopics == topicsNumber) { if (log.isDebugEnabled()) { - log.debug("Completed get {} topic's partitions", topicsNumber); + log.debug("[{}] Request {}: Completed get {} topic's partitions", + ctx.channel(), metadataHar.getHeader(), topicsNumber); } pulsarTopicsFuture.complete(pulsarTopics); } @@ -217,7 +247,8 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { AtomicInteger topicsCompleted = new AtomicInteger(0); pulsarTopicsFuture.whenComplete((pulsarTopics, e) -> { if (e != null) { - log.warn("Exception fetching metadata, will return null Response", e); + log.warn("[{}] Request {}: Exception fetching metadata, will return null Response", + ctx.channel(), metadataHar.getHeader(), e); MetadataResponse finalResponse = new MetadataResponse( Collections.emptyList(), @@ -240,7 +271,8 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { findBroker(kafkaService, topicName) .whenComplete(((partitionMetadata, throwable) -> { if (throwable != null) { - log.warn("Exception while find Broker metadata", throwable); + log.warn("[{}] Request {}: Exception while find Broker metadata", + ctx.channel(), metadataHar.getHeader(), throwable); partitionMetadatas.add(newFailedPartitionMetadata(topicName)); } else { Node newNode = partitionMetadata.leader(); @@ -253,7 +285,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { // whether completed this topic's partitions list. int finishedPartitions = partitionsCompleted.incrementAndGet(); if (log.isDebugEnabled()) { - log.debug("FindBroker for {} partitions of topic {}, total partitions: {}", + log.debug("[{}] Request {}: FindBroker for topic {}, partitions found/all: {}/{}.", finishedPartitions, topic, partitionsNumber); } if (finishedPartitions == partitionsNumber) { @@ -264,9 +296,10 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) { // whether completed all the topics requests. 
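+ // Completion is tracked at two levels: partitionsCompleted counts the finished
+ // findBroker() calls for the current topic's partitions, while topicsCompleted
+ // (below) counts topics whose partition metadata is fully resolved. The
+ // MetadataResponse is only written once every requested topic has reported
+ // all of its partitions.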
int finishedTopics = topicsCompleted.incrementAndGet(); if (log.isDebugEnabled()) { - log.debug("Completed findBroker for all partitions of topic {}, partitions: {}; " - + "Finished Topics: {}, total topics: {}", - topic, partitionsNumber, finishedTopics, topicsNumber); + log.debug("[{}] Request {}: Completed findBroker for topic {}, " + + "partitions found/all: {}/{}", + ctx.channel(), metadataHar.getHeader(), topic, + finishedTopics, topicsNumber); } if (finishedTopics == topicsNumber) { // TODO: confirm right value for controller_id @@ -310,23 +343,25 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar) { responsesFutures.put(topicPartition, partitionResponse); if (log.isDebugEnabled()) { - log.debug("[{}] Produce messages for topic {} partition {}, request size: {} ", - ctx.channel(), topicPartition.topic(), topicPartition.partition(), responsesSize); + log.debug("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} ", + ctx.channel(), produceHar.getHeader(), + topicPartition.topic(), topicPartition.partition(), responsesSize); } - TopicName topicName = pulsarTopicName(topicPartition.topic(), topicPartition.partition()); + TopicName topicName = pulsarTopicName(topicPartition); kafkaService.getBrokerService().getTopic(topicName.toString(), true) .whenComplete((topicOpt, exception) -> { if (exception != null) { - log.error("[{}] Failed to getOrCreateTopic {}. exception:", - ctx.channel(), topicName, exception); + log.error("[{}] Request {}: Failed to getOrCreateTopic {}. exception:", + ctx.channel(), produceHar.getHeader(), topicName, exception); partitionResponse.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } else { if (topicOpt.isPresent()) { publishMessages(entry.getValue(), topicOpt.get(), partitionResponse); } else { - log.error("[{}] getOrCreateTopic get empty topic for name {}", ctx.channel(), topicName); + log.error("[{}] Request {}: getOrCreateTopic get empty topic for name {}", + ctx.channel(), produceHar.getHeader(), topicName); partitionResponse.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } } @@ -343,7 +378,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar) { } if (log.isDebugEnabled()) { - log.debug("[{}] Complete handle produce request {}.", + log.debug("[{}] Request {}: Complete handle produce.", ctx.channel(), produceHar.toString()); } ctx.writeAndFlush(responseToByteBuf(new ProduceResponse(responses), produceHar)); @@ -384,7 +419,7 @@ private void publishMessages(Records records, CompletableFuture.allOf(futures.toArray(new CompletableFuture[rec])).whenComplete((ignore, ex) -> { if (ex != null) { - log.debug("[{}] publishMessages for topic partition: {} failed when write.", + log.error("[{}] publishMessages for topic partition: {} failed when write.", ctx.channel(), topic.getName(), ex); future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } else { @@ -412,7 +447,7 @@ public long getSequenceId() { public void completed(Exception exception, long ledgerId, long entryId) { if (exception != null) { - log.error("Failed write entry: {}, entryId: {}, sequenceId: {}. and triggered send callback.", + log.error("Failed write entry: ledgerId: {}, entryId: {}, sequenceId: {}. 
triggered send callback.", ledgerId, entryId, sequenceId); offsetFuture.completeExceptionally(exception); } else { @@ -475,6 +510,7 @@ private Message recordToEntry(Record record) { // key if (record.hasKey()) { byte[] key = new byte[record.keySize()]; + record.key().get(key); builder.keyBytes(key); } @@ -522,8 +558,9 @@ private ByteBuf messageToByteBuf(Message message) { msgMetadataBuilder.setPublishTime(clock.millis()); } if (!msgMetadataBuilder.hasProducerName()) { - msgMetadataBuilder.setProducerName(producerName); + msgMetadataBuilder.setProducerName(FAKE_KOP_PRODUCER_NAME); } + msgMetadataBuilder.setCompression( CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE)); msgMetadataBuilder.setUncompressedSize(payload.readableBytes()); @@ -537,28 +574,354 @@ private ByteBuf messageToByteBuf(Message message) { return buf; } - protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator) { - throw new NotImplementedException("handleFindCoordinatorRequest"); + + // convert entries read from BookKeeper into Kafka Records + private static MemoryRecords entriesToRecords(List entries) { + try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) { + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, RecordBatch.CURRENT_MAGIC_VALUE, + org.apache.kafka.common.record.CompressionType.NONE, + TimestampType.CREATE_TIME, + 0, + RecordBatch.NO_TIMESTAMP, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + false, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + MAX_RECORDS_BUFFER_SIZE); + + for (Entry entry : entries) { + ByteBuf metadataAndPayload = entry.getDataBuffer(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); + ByteBuf payload = metadataAndPayload.retain(); + + byte[] data = new byte[payload.readableBytes()]; + payload.readBytes(data); + + builder.appendWithOffset( + MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()), + msgMetadata.getEventTime(), + Base64.getDecoder().decode(msgMetadata.getPartitionKey()), + data); + } + return builder.build(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } - protected void handleListOffsetRequest(KafkaHeaderAndRequest listOffset) { - throw new NotImplementedException("handleListOffsetRequest"); + // A simple implementation, returns this broker node. 
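+ // For CoordinatorType.GROUP the handler simply advertises this broker's own
+ // Kafka service address as the coordinator (group management itself is not yet
+ // implemented); CoordinatorType.TRANSACTION requests are still rejected with
+ // NotImplementedException.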
+ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator) { + checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest); + FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + + if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.GROUP) { + AbstractResponse response; + try { + URI uri = new URI(kafkaService.getBrokerServiceUrl()); + Node node = newNode( + new InetSocketAddress( + uri.getHost(), + kafkaService.getKafkaConfig().getKafkaServicePort().get())); + + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: Return current broker node as Coordinator: {}.", + ctx.channel(), findCoordinator.getHeader(), node); + } + + response = new FindCoordinatorResponse( + Errors.NONE, + node); + } catch (Exception e) { + log.error("[{}] Request {}: Error while find coordinator.", + ctx.channel(), findCoordinator.getHeader(), e); + response = new FindCoordinatorResponse( + Errors.COORDINATOR_NOT_AVAILABLE, + Node.noNode()); + } + + ctx.writeAndFlush(responseToByteBuf(response, findCoordinator)); + } else { + throw new NotImplementedException("FindCoordinatorRequest not support TRANSACTION type"); + } } protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch) { - throw new NotImplementedException("handleOffsetFetchRequest"); + checkArgument(offsetFetch.getRequest() instanceof OffsetFetchRequest); + OffsetFetchRequest request = (OffsetFetchRequest) offsetFetch.getRequest(); + + List>> statsList = + request.partitions().stream().map((topicPartition) -> Pair.of(topicPartition, admin.topics() + .getInternalStatsAsync(pulsarTopicName(topicPartition).toString()))) + .collect(Collectors.toList()); + + CompletableFuture.allOf(statsList.stream().map(entry -> entry.getValue()).toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + if (ex != null) { + log.error("[{}] Request {}: OffsetFetch {} meet error:", + ctx.channel(), offsetFetch.getHeader(), offsetFetch.getHeader(), ex); + ctx.writeAndFlush(responseToByteBuf( + new OffsetFetchResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), offsetFetch)); + return; + } + + // TODO: topics may not exist currently, need createTopic? + Map responses = + statsList.stream().map((v) -> { + TopicPartition key = v.getKey(); + try { + List ledgers = v.getValue().join().ledgers; + // return first ledger. 
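+ // Committed offsets are not tracked yet: each partition in the OffsetFetch
+ // response points at entry 0 of the first ledger reported by the topic's
+ // internal stats, so a consumer effectively resumes from the earliest retained
+ // entry. Failures for a partition are mapped to INVALID_OFFSET /
+ // UNKNOWN_SERVER_ERROR below.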
+ long offset = MessageIdUtils.getOffset(ledgers.get(0).ledgerId, 0); + + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: topic {} Return offset: {}, ledgerId: {}.", + ctx.channel(), offsetFetch.getHeader(), key, offset, ledgers.get(0).ledgerId); + } + return Pair.of(key, new OffsetFetchResponse.PartitionData( + offset, OffsetFetchResponse.NO_METADATA, Errors.NONE)); + } catch (Exception e) { + log.error("[{}] Request {}: topic {} meet error.", + ctx.channel(), offsetFetch.getHeader(), key, e); + return Pair.of(key, new OffsetFetchResponse.PartitionData( + OffsetFetchResponse.INVALID_OFFSET, + OffsetFetchResponse.NO_METADATA, + Errors.UNKNOWN_SERVER_ERROR)); + } + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + ctx.writeAndFlush(responseToByteBuf(new OffsetFetchResponse(Errors.NONE, responses), offsetFetch)); + }); + } + + protected void handleListOffsetRequest(KafkaHeaderAndRequest listOffset) { + throw new NotImplementedException("handleListOffsetRequest"); } protected void handleOffsetCommitRequest(KafkaHeaderAndRequest offsetCommit) { throw new NotImplementedException("handleOffsetCommitRequest"); } + private void readMessages(KafkaHeaderAndRequest fetch, + Map> cursors) { + AtomicInteger bytesRead = new AtomicInteger(0); + Map> responseValues = new ConcurrentHashMap<>(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: Read Messages for request.", + ctx.channel(), fetch.getHeader()); + } + + readMessagesInternal(fetch, cursors, bytesRead, responseValues); + } + + private void readMessagesInternal(KafkaHeaderAndRequest fetch, + Map> cursors, + AtomicInteger bytesRead, + Map> responseValues) { + + Map> readFutures = readAllCursorOnce(cursors); + CompletableFuture.allOf(readFutures.values().stream().toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + // keep entries since all read completed. + readFutures.forEach((topic, readEntry) -> { + try { + Entry entry = readEntry.join(); + if (entry != null) { + List entryList = responseValues.computeIfAbsent(topic, l -> Lists.newArrayList()); + entryList.add(entry); + bytesRead.addAndGet(entry.getLength()); + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: For topic {}, entries in list: {}. add new entry {}:{}", + ctx.channel(), fetch.getHeader(), topic.toString(), entryList.size(), + entry.getLedgerId(), entry.getEntryId()); + } + } + } catch (Exception e) { + // readEntry.join failed. ignore this partition + log.error("[{}] Request {}: Failed readEntry.join for topic: {}. ", + ctx.channel(), fetch.getHeader(), topic, e); + cursors.remove(topic); + responseValues.putIfAbsent(topic, Lists.newArrayList()); + } + }); + + FetchRequest request = (FetchRequest) fetch.getRequest(); + int maxBytes = request.maxBytes(); + int minBytes = request.minBytes(); + int waitTime = request.maxWait(); // in ms + // if endTime <= 0, then no time wait, wait for minBytes. + long endTime = waitTime > 0 ? 
System.currentTimeMillis() + waitTime : waitTime; + + int allSize = bytesRead.get(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: One round read {} entries, " + + "allSize/maxBytes/minBytes/endTime: {}/{}/{}/{}", + ctx.channel(), fetch.getHeader(), readFutures.size(), + allSize, maxBytes, minBytes, new Date(endTime)); + } + + // reach maxTime, return; + // reach minBytes if no endTime, return; + if ((endTime > 0 && endTime <= System.currentTimeMillis()) + || allSize > minBytes + || allSize > maxBytes){ + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: Complete read {} entries with size {}", + ctx.channel(), fetch.getHeader(), readFutures.size(), allSize); + } + + LinkedHashMap> responseData = new LinkedHashMap<>(); + + AtomicBoolean allPartitionsFailed = new AtomicBoolean(true); + responseValues.forEach((topicPartition, entries) -> { + FetchResponse.PartitionData partitionData; + if (entries.isEmpty()) { + partitionData = new FetchResponse.PartitionData( + Errors.OFFSET_OUT_OF_RANGE, + FetchResponse.INVALID_HIGHWATERMARK, + FetchResponse.INVALID_LAST_STABLE_OFFSET, + FetchResponse.INVALID_LOG_START_OFFSET, + null, + MemoryRecords.EMPTY); + } else { + allPartitionsFailed.set(false); + Entry entry = entries.get(entries.size() - 1); + long entryOffset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); + long highWatermark = entryOffset + cursors.get(topicPartition).join().getNumberOfEntries(); + + MemoryRecords records = entriesToRecords(entries); + partitionData = new FetchResponse.PartitionData( + Errors.NONE, + highWatermark, + highWatermark, + highWatermark, + null, + records); + } + + responseData.put(topicPartition, partitionData); + }); + + if (allPartitionsFailed.get()) { + log.error("[{}] Request {}: All partitions for request failed", + ctx.channel(), fetch.getHeader()); + ctx.writeAndFlush( + responseToByteBuf( + new FetchResponse(Errors.UNKNOWN_SERVER_ERROR, + responseData, + ((Integer) THROTTLE_TIME_MS.defaultValue), + ((FetchRequest) fetch.getRequest()).metadata().sessionId()), + fetch)); + } else { + ctx.writeAndFlush( + responseToByteBuf( + new FetchResponse( + Errors.NONE, + responseData, + ((Integer) THROTTLE_TIME_MS.defaultValue), + ((FetchRequest) fetch.getRequest()).metadata().sessionId()), + fetch)); + } + } else { + //need do another round read + readMessagesInternal(fetch, cursors, bytesRead, responseValues); + } + }); + } + + private Map> readAllCursorOnce( + Map> cursors) { + Map> readFutures = new ConcurrentHashMap<>(); + + cursors.entrySet().forEach(pair -> { + // non durable cursor create is a sync method. + ManagedCursor cursor; + CompletableFuture readFuture = new CompletableFuture<>(); + + try { + cursor = pair.getValue().join(); + + // only read 1 entry currently. could read more in a batch. 
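+ // Each fetch round therefore reads at most one entry per partition cursor;
+ // readMessagesInternal() keeps issuing rounds until more than minBytes has been
+ // read or the request's maxWait deadline passes. After a successful read the
+ // cursor is handed back to the KafkaTopicConsumerManager keyed by the next
+ // offset (offset + 1), so a follow-up fetch at that offset reuses it.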
+ cursor.asyncReadEntries(1, + new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List list, Object o) { + TopicName topicName = pulsarTopicName(pair.getKey()); + + Entry entry = null; + if (!list.isEmpty()) { + entry = list.get(0); + long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); + + if (log.isDebugEnabled()) { + log.debug("Topic {} success read entry: ledgerId: {}, entryId: {}, size: {}," + + " ConsumerManager add offset: {}", + topicName.toString(), entry.getLedgerId(), entry.getEntryId(), + entry.getLength(), offset + 1); + } + + kafkaService.getKafkaTopicManager() + .getTopicConsumerManager(topicName.toString()) + .thenAccept(cm -> cm.add(offset + 1, pair.getValue())); + } + + readFuture.complete(entry); + } + + @Override + public void readEntriesFailed(ManagedLedgerException e, Object o) { + log.error("Error read entry for topic: {}", pulsarTopicName(pair.getKey())); + readFuture.completeExceptionally(e); + } + }, null); + } catch (Exception e) { + log.error("Error for cursor to read entry for topic: {}. ", pulsarTopicName(pair.getKey()), e); + readFuture.completeExceptionally(e); + } + + readFutures.putIfAbsent(pair.getKey(), readFuture); + }); + + return readFutures; + } + + protected void handleFetchRequest(KafkaHeaderAndRequest fetch) { - throw new NotImplementedException("handleFetchRequest"); + checkArgument(fetch.getRequest() instanceof FetchRequest); + FetchRequest request = (FetchRequest) fetch.getRequest(); + + // Map of partition and related cursor + Map> topicsAndCursor = request + .fetchData().entrySet().stream() + .map(entry -> { + TopicName topicName = pulsarTopicName(entry.getKey()); + long offset = entry.getValue().fetchOffset; + + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: Fetch topic {}, remove cursor for fetch offset: {}.", + ctx.channel(), fetch.getHeader(), topicName, offset); + } + + return Pair.of( + entry.getKey(), + kafkaService.getKafkaTopicManager() + .getTopicConsumerManager(topicName.toString()) + .thenCompose(cm -> cm.remove(offset))); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + // wait to get all the cursor, then readMessages + CompletableFuture + .allOf(topicsAndCursor.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + readMessages(fetch, topicsAndCursor); + }); } protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup) { - throw new NotImplementedException("handleFetchRequest"); + throw new NotImplementedException("handleJoinGroupRequest"); } protected void handleSyncGroupRequest(KafkaHeaderAndRequest syncGroup) { @@ -597,7 +960,7 @@ private CompletableFuture findBroker(KafkaService kafkaServic URI uri = new URI(brokerUrl); if (log.isDebugEnabled()) { - log.debug("Find broker: {} for topicName: {}", uri, topic); + log.debug("Found broker: {} for topicName: {}", uri, topic); } Node node = newNode(new InetSocketAddress( @@ -622,6 +985,10 @@ private CompletableFuture findBroker(KafkaService kafkaServic } + private TopicName pulsarTopicName(TopicPartition topicPartition) { + return pulsarTopicName(topicPartition.topic(), topicPartition.partition()); + } + private TopicName pulsarTopicName(String topic) { return TopicName.get(TopicDomain.persistent.value(), kafkaNamespace, topic); } diff --git a/src/main/java/io/streamnative/kop/KafkaService.java b/src/main/java/io/streamnative/kop/KafkaService.java index 58859c0b3375a..ee822c0f4e600 100644 --- 
a/src/main/java/io/streamnative/kop/KafkaService.java +++ b/src/main/java/io/streamnative/kop/KafkaService.java @@ -41,6 +41,8 @@ public class KafkaService extends PulsarService { @Getter private final KafkaServiceConfiguration kafkaConfig; + @Getter + private KafkaTopicManager kafkaTopicManager; public KafkaService(KafkaServiceConfiguration config) { super(config); @@ -167,6 +169,8 @@ public Boolean get() { + (kafkaConfig.getKafkaServicePortTls().isPresent() ? "broker url= " + kafkaConfig.getKafkaServicePortTls() : ""); + kafkaTopicManager = new KafkaTopicManager(getBrokerService()); + log.info("Kafka messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage, kafkaConfig.getClusterName(), ReflectionToStringBuilder.toString(kafkaConfig)); } catch (Exception e) { diff --git a/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java b/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java index 883c78908e24a..2b76e6dab5a9c 100644 --- a/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java +++ b/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java @@ -57,7 +57,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { doc = "The port for serving Kafka requests" ) - private Optional kafkaServicePort = Optional.of(9902); + private Optional kafkaServicePort = Optional.of(9092); @FieldContext( category = CATEGORY_KOP, diff --git a/src/main/java/io/streamnative/kop/KafkaStandalone.java b/src/main/java/io/streamnative/kop/KafkaStandalone.java index e7dfe018b19fa..d75addecb9375 100644 --- a/src/main/java/io/streamnative/kop/KafkaStandalone.java +++ b/src/main/java/io/streamnative/kop/KafkaStandalone.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -264,6 +265,8 @@ private void createDefaultNameSpace(URL webServiceUrl, String brokerServiceUrl, Set clusters = Sets.newHashSet(config.getKafkaClusterName()); admin.namespaces().createNamespace(defaultNamespace, clusters); admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, clusters); + admin.namespaces().setRetention(defaultNamespace, + new RetentionPolicies(20, 100)); } } catch (PulsarAdminException e) { log.info("error while create default namespace: {}", e.getMessage()); diff --git a/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java b/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java new file mode 100644 index 0000000000000..955a559d0e7ac --- /dev/null +++ b/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java @@ -0,0 +1,99 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.kop; + +import io.streamnative.kop.utils.MessageIdUtils; +import io.streamnative.kop.utils.ReflectionUtils; +import java.lang.reflect.Method; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; + +/** + * KafkaTopicConsumerManager manages a topic and its related offset cursor. + * Each cursor is trying to track the read from a consumer client. + */ +@Slf4j +public class KafkaTopicConsumerManager { + + private final PersistentTopic topic; + @Getter + private final ConcurrentLongHashMap> consumers; + + KafkaTopicConsumerManager(PersistentTopic topic) { + this.topic = topic; + this.consumers = new ConcurrentLongHashMap<>(); + } + + public CompletableFuture remove(long offset) { + CompletableFuture cursor = consumers.remove(offset); + if (cursor != null) { + if (log.isDebugEnabled()) { + log.debug("Get cursor for offset: {} in cache", offset); + } + return cursor; + } + + // handle null remove. + cursor = new CompletableFuture<>(); + CompletableFuture oldCursor = consumers.putIfAbsent(offset, cursor); + if (oldCursor != null) { + // added by other thread while creating. + return remove(offset); + } + + String cursorName = "kop-consumer-cursor-" + topic.getName() + "-" + offset + "-" + + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10); + + PositionImpl position = MessageIdUtils.getPosition(offset); + + try { + // get previous position, because NonDurableCursor is read from next position. + ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); + Method getPreviousPosition = ReflectionUtils + .setMethodAccessible(ledger, "getPreviousPosition", PositionImpl.class); + PositionImpl previous = (PositionImpl) getPreviousPosition.invoke(ledger, position); + + if (log.isDebugEnabled()) { + log.debug("Create cursor {} for offset: {}. position: {}, previousPosition: {}", + cursorName, offset, position, previous); + } + + cursor.complete(topic.getManagedLedger().newNonDurableCursor(previous, cursorName)); + } catch (Exception e) { + log.error("Failed create nonDurable cursor for topic {} position: {}.", topic, position, e); + cursor.completeExceptionally(e); + } + + return remove(offset); + } + + + // once entry read complete, add new offset back. + public void add(long offset, CompletableFuture cursor) { + CompletableFuture oldCursor = consumers.putIfAbsent(offset, cursor); + + if (log.isDebugEnabled()) { + log.debug("Add cursor {} for offset: {}. oldCursor: {}", cursor.join().getName(), offset, oldCursor); + } + } + +} diff --git a/src/main/java/io/streamnative/kop/KafkaTopicManager.java b/src/main/java/io/streamnative/kop/KafkaTopicManager.java new file mode 100644 index 0000000000000..c1b8fade9b7ec --- /dev/null +++ b/src/main/java/io/streamnative/kop/KafkaTopicManager.java @@ -0,0 +1,58 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop; + +import java.util.concurrent.CompletableFuture; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; + +/** + * KafkaTopicManager manages a Map of topic to KafkaTopicConsumerManager. + * For each topic, there is a KafkaTopicConsumerManager, which manages a topic and its related offset cursor. + */ +@Slf4j +public class KafkaTopicManager { + + private final BrokerService service; + @Getter + private final ConcurrentOpenHashMap> topics; + + KafkaTopicManager(BrokerService service) { + this.service = service; + topics = new ConcurrentOpenHashMap<>(); + } + + public CompletableFuture getTopicConsumerManager(String topicName) { + return topics.computeIfAbsent( + topicName, + t -> service + .getTopic(topicName, true) + .thenApply(t2 -> { + if (log.isDebugEnabled()) { + log.debug("Call getTopicConsumerManager for {}, and create KafkaTopicConsumerManager.", + topicName); + } + return new KafkaTopicConsumerManager((PersistentTopic) t2.get()); + }) + .exceptionally(ex -> { + log.error("Failed to getTopicConsumerManager {}. exception:", + topicName, ex); + return null; + }) + ); + } +} diff --git a/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java b/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java index d2b4e31bbff4b..9d71b722dae5b 100644 --- a/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java +++ b/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java @@ -13,6 +13,7 @@ */ package io.streamnative.kop.utils; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -30,10 +31,18 @@ public static final long getOffset(long ledgerId, long entryId) { } public static final MessageId getMessageId(long offset) { - // Demultiplex ledgerId and entryId from offset + // De-multiplex ledgerId and entryId from offset long ledgerId = offset >>> 28; long entryId = offset & 0x0F_FF_FF_FFL; return new MessageIdImpl(ledgerId, entryId, -1); } + + public static final PositionImpl getPosition(long offset) { + // De-multiplex ledgerId and entryId from offset + long ledgerId = offset >>> 28; + long entryId = offset & 0x0F_FF_FF_FFL; + + return new PositionImpl(ledgerId, entryId); + } } diff --git a/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java b/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java index 1748e2a942711..243724834289b 100644 --- a/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java +++ b/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java @@ -30,7 +30,7 @@ public final class ReflectionUtils { * @return the value of the private field */ @SuppressWarnings("unchecked") - public static T getField(Object privateObject, String fieldName) { + public static T getSuperField(Object privateObject, String fieldName) { try { Field privateField = 
privateObject.getClass().getSuperclass().getDeclaredField(fieldName); privateField.setAccessible(true); @@ -49,7 +49,7 @@ public static T getField(Object privateObject, String fieldName) { * @throws IllegalAccessException * @throws NoSuchFieldException */ - public static void setField(Object privateObject, + public static void setSuperField(Object privateObject, String fieldName, T fieldValue) throws IllegalAccessException, NoSuchFieldException { @@ -59,12 +59,12 @@ public static void setField(Object privateObject, } /** - * Call the private method's super class. + * Call the private method's super class method. * * @param privateObject the object * @param methodName the private method name */ - public static void callNoArgVoidMethod(Object privateObject, + public static void callSuperNoArgVoidMethod(Object privateObject, String methodName) throws Exception { try { Method privateStringMethod = privateObject.getClass().getSuperclass() @@ -83,6 +83,23 @@ public static void callNoArgVoidMethod(Object privateObject, } } + /** + * set the private method's accessible. + * + * @param privateObject the object + * @param methodName the private method name + * @param parameterTypes the parameter types + */ + public static Method setMethodAccessible(Object privateObject, + String methodName, + Class... parameterTypes) throws Exception { + Method privateStringMethod = privateObject.getClass().getDeclaredMethod(methodName, parameterTypes); + + privateStringMethod.setAccessible(true); + + return privateStringMethod; + } + private ReflectionUtils() {} } diff --git a/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java b/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java index 1f6d77b90fdb4..38e07459fbd3f 100644 --- a/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java +++ b/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java @@ -19,12 +19,21 @@ import static org.testng.Assert.assertNull; import com.google.common.collect.Sets; +import java.time.Duration; +import java.util.Base64; +import java.util.Collections; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -47,6 +56,8 @@ protected void setup() throws Exception { new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); admin.namespaces().createNamespace("public/default"); admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/default", + new RetentionPolicies(20, 100)); } @AfterMethod @@ -55,10 +66,9 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test(timeOut = 20000) - public void testProduceRequest() throws Exception { - String topicName = "kopTopicProduce"; + public void testKafkaProducePulsarConsume() throws Exception { + String topicName = "kopKafkaProducePulsarConsume"; // create 
partitioned topic. kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, 1); @@ -67,35 +77,37 @@ public void testProduceRequest() throws Exception { .topic("persistent://public/default/" + topicName + PARTITIONED_TOPIC_SUFFIX + 0) .subscriptionName("test_producer_sub").subscribe(); - // 1. produce message with Kafka producer. - Producer producer = new Producer(topicName, false); + KProducer kProducer = new KProducer(topicName, false); - int messageNo = 0; int totalMsgs = 10; + String messageStrPrefix = "Message_Kop_KafkaProducePulsarConsume_"; - while (messageNo < totalMsgs) { - String messageStr = "Message_Kop_" + messageNo; - + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; try { - producer.getProducer().send(new ProducerRecord<>(topicName, - messageNo, - messageStr)).get(); - log.info("Sent message: (" + messageNo + ", " + messageStr + ")"); + kProducer.getProducer() + .send(new ProducerRecord<>( + topicName, + i, + messageStr)) + .get(); + log.debug("Sent message: ({}, {})", i, messageStr); } catch (Exception e) { e.printStackTrace(); } - ++messageNo; } - assertEquals(totalMsgs, messageNo); - + // 2. Consume messages use Pulsar client Consumer. verify content and key Message msg = null; - - // 2. Consume messages use Pulsar client Consumer. for (int i = 0; i < totalMsgs; i++) { msg = consumer.receive(100, TimeUnit.MILLISECONDS); - log.info("Pulsar consumer get message: {}", new String(msg.getData())); + + assertEquals(messageStrPrefix + i, new String(msg.getValue())); + assertEquals(Integer.valueOf(i), kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey()))); + + log.debug("Pulsar consumer get message: {}, key: {}", + new String(msg.getData()), kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString()); consumer.acknowledge(msg); } @@ -103,4 +115,98 @@ public void testProduceRequest() throws Exception { msg = consumer.receive(100, TimeUnit.MILLISECONDS); assertNull(msg); } + + @Test(timeOut = 20000) + public void testKafkaProduceKafkaConsume() throws Exception { + String topicName = "kopKafkaProduceKafkaConsume"; + + // create partitioned topic. + kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, 1); + + // 1. produce message with Kafka producer. + int totalMsgs = 10; + String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_"; + + KProducer producer = new KProducer(topicName, false); + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + try { + producer.getProducer() + .send(new ProducerRecord<>( + topicName, + i, + messageStr)) + .get(); + log.debug("Sent message: ({}, {})", i, messageStr); + } catch (Exception e) { + e.printStackTrace(); + } + } + + // 2. use kafka consumer to consume. 
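+ // The consumer is assigned (not subscribed) to partition 0, so no group
+ // coordination path is exercised; the loop relies on each poll returning
+ // records in publish order from the single partition produced above.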
+ KConsumer kConsumer = new KConsumer(topicName); + kConsumer.getConsumer().assign(Collections.singletonList(new TopicPartition(topicName, 0))); + + for (int i = 0; i < totalMsgs; i++) { + log.debug("start poll: {}", i); + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + assertEquals(messageStrPrefix + i, record.value()); + assertEquals(Integer.valueOf(i), record.key()); + + log.debug("Received message: {}, {} at offset {}", + record.key(), record.value(), record.offset()); + } + } + } + + @Test(timeOut = 20000) + public void testPulsarProduceKafkaConsume() throws Exception { + String topicName = "kopPulsarProduceKafkaConsume"; + String pulsarTopicName = "persistent://public/default/" + topicName + PARTITIONED_TOPIC_SUFFIX + 0; + + // create partitioned topic. + kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, 1); + + // create a consumer to retention the data? + Consumer consumer = pulsarClient.newConsumer() + .topic("persistent://public/default/" + topicName + PARTITIONED_TOPIC_SUFFIX + 0) + .subscriptionName("test_producer_sub").subscribe(); + + + // 1. use pulsar producer to produce. + int totalMsgs = 10; + String messageStrPrefix = "Message_Kop_PulsarProduceKafkaConsume_"; + + ProducerBuilder producerBuilder = pulsarClient.newProducer() + .topic(pulsarTopicName) + .enableBatching(false); + + Producer producer = producerBuilder.create(); + for (int i = 0; i < totalMsgs; i++) { + String message = messageStrPrefix + i; + producer.newMessage() + .keyBytes(kafkaIntSerialize(Integer.valueOf(i))) + .value(message.getBytes()) + .send(); + } + + // 2. use kafka consumer to consume. + KConsumer kConsumer = new KConsumer(topicName); + kConsumer.getConsumer().assign(Collections.singletonList(new TopicPartition(topicName, 0))); + + for (int i = 0; i < totalMsgs; i++) { + log.debug("start poll: {}", i); + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + assertEquals(messageStrPrefix + i, record.value()); + assertEquals(Integer.valueOf(i), record.key()); + + log.debug("Received message: {}, {} at offset {}", + record.key(), record.value(), record.offset()); + } + } + } + } diff --git a/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java b/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java new file mode 100644 index 0000000000000..c36bc8ab7c0bb --- /dev/null +++ b/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java @@ -0,0 +1,160 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.kop; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import io.streamnative.kop.utils.MessageIdUtils; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Pulsar service configuration object. + */ +public class KafkaTopicConsumerManagerTest extends MockKafkaServiceBaseTest { + + private KafkaTopicManager kafkaTopicManager; + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + // so that clients can test short names + admin.clusters().createCluster("test", + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + + admin.tenants().createTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/default", + new RetentionPolicies(20, 100)); + + kafkaTopicManager = new KafkaTopicManager(kafkaService.getBrokerService()); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testGetTopicConsumerManager() throws Exception { + String topicName = "persistent://public/default/testGetTopicConsumerManager"; + admin.lookups().lookupTopic(topicName); + CompletableFuture tcm = kafkaTopicManager.getTopicConsumerManager(topicName); + KafkaTopicConsumerManager topicConsumerManager = tcm.get(); + + // 1. verify another get with same topic will return same tcm + tcm = kafkaTopicManager.getTopicConsumerManager(topicName); + KafkaTopicConsumerManager topicConsumerManager2 = tcm.get(); + + assertTrue(topicConsumerManager == topicConsumerManager2); + assertEquals(kafkaTopicManager.getTopics().size(), 1); + + // 2. 
verify another get with different topic will return different tcm + String topicName2 = "persistent://public/default/testGetTopicConsumerManager2"; + admin.lookups().lookupTopic(topicName2); + tcm = kafkaTopicManager.getTopicConsumerManager(topicName2); + topicConsumerManager2 = tcm.get(); + assertTrue(topicConsumerManager != topicConsumerManager2); + assertEquals(kafkaTopicManager.getTopics().size(), 2); + } + + + @Test + public void testTopicConsumerManagerRemoveAndAdd() throws Exception { + String topicName = "persistent://public/default/testTopicConsumerManagerRemoveAndAdd"; + admin.lookups().lookupTopic(topicName); + + ProducerBuilder producerBuilder = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false); + + Producer producer = producerBuilder.create(); + MessageIdImpl messageId = null; + int i = 0; + String messagePrefix = "testTopicConsumerManagerRemoveAndAdd_message_"; + for (; i < 5; i++) { + String message = messagePrefix + i; + messageId = (MessageIdImpl) producer.newMessage() + .keyBytes(kafkaIntSerialize(Integer.valueOf(i))) + .value(message.getBytes()) + .send(); + } + + CompletableFuture tcm = kafkaTopicManager.getTopicConsumerManager(topicName); + KafkaTopicConsumerManager topicConsumerManager = tcm.get(); + + long offset = MessageIdUtils.getOffset(messageId.getLedgerId(), messageId.getEntryId()); + + // before a read, first get cursor of offset. + CompletableFuture cursorCompletableFuture = topicConsumerManager.remove(offset); + assertEquals(topicConsumerManager.getConsumers().size(), 0); + ManagedCursor cursor = cursorCompletableFuture.get(); + + // another write. + producer.newMessage() + .keyBytes(kafkaIntSerialize(Integer.valueOf(i))) + .value((messagePrefix + i).getBytes()) + .send(); + i++; + + // simulate a read complete; + offset++; + topicConsumerManager.add(offset, cursorCompletableFuture); + assertEquals(topicConsumerManager.getConsumers().size(), 1); + + // another read, cache hit. + cursorCompletableFuture = topicConsumerManager.remove(offset); + assertEquals(topicConsumerManager.getConsumers().size(), 0); + ManagedCursor cursor2 = cursorCompletableFuture.get(); + + assertTrue(cursor2 == cursor); + assertEquals(cursor2.getName(), cursor.getName()); + + // simulate a read complete, add back offset. 
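+ // The offset is advanced by one before add() to mirror what the fetch path does
+ // after a successful read (readAllCursorOnce calls cm.add(offset + 1, cursor)),
+ // i.e. the cached cursor is keyed by the next offset a consumer would fetch.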
+ offset++; + topicConsumerManager.add(offset, cursorCompletableFuture); + + // produce another 3 message + for (; i < 10; i++) { + String message = messagePrefix + i; + messageId = (MessageIdImpl) producer.newMessage() + .keyBytes(kafkaIntSerialize(Integer.valueOf(i))) + .value(message.getBytes()) + .send(); + } + + // try read last messages, so read not continuous + offset = MessageIdUtils.getOffset(messageId.getLedgerId(), messageId.getEntryId()); + cursorCompletableFuture = topicConsumerManager.remove(offset); + assertEquals(topicConsumerManager.getConsumers().size(), 1); + cursor2 = cursorCompletableFuture.get(); + assertNotEquals(cursor2.getName(), cursor.getName()); + } +} diff --git a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java index 8172f27b1c61a..9f7ba0291480f 100644 --- a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java +++ b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java @@ -39,10 +39,13 @@ import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.BookKeeperClientFactory; @@ -304,15 +307,15 @@ public static void setFieldValue(Class clazz, Object classObj, String fieldName, * A producer wrapper. */ @Getter - public class Producer { + public class KProducer { private final KafkaProducer producer; private final String topic; private final Boolean isAsync; - public Producer(String topic, Boolean isAsync) { + public KProducer(String topic, Boolean isAsync) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + kafkaBrokerPort); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); @@ -356,4 +359,62 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { } } } + + + /** + * A consumer wrapper. 
+ */ + @Getter + public class KConsumer { + private final KafkaConsumer consumer; + private final String topic; + + public KConsumer(String topic) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + kafkaBrokerPort); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoKafkaOnPulsarConsumer"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.IntegerDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + + this.consumer = new KafkaConsumer<>(props); + this.topic = topic; + } + } + + + public static Integer kafkaIntDeserialize(byte[] data) { + if (data == null) { + return null; + } + + if (data.length != 4) { + throw new SerializationException("Size of data received by IntegerDeserializer is not 4"); + } + + int value = 0; + for (byte b : data) { + value <<= 8; + value |= b & 0xFF; + } + return value; + } + + public static byte[] kafkaIntSerialize(Integer data) { + if (data == null) { + return null; + } + + return new byte[] { + (byte) (data >>> 24), + (byte) (data >>> 16), + (byte) (data >>> 8), + data.byteValue() + }; + } } diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 8e79ebf8d8157..4f9dc8da81eed 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -28,6 +28,6 @@ - +
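For reference, a minimal self-contained sketch of the offset packing that MessageIdUtils relies on throughout this patch, assuming getOffset() is the inverse of the getMessageId()/getPosition() mappings shown above (i.e. (ledgerId << 28) | entryId); the class and constant names below are illustrative only, not part of the patch:

public class OffsetPackingExample {
    // Low 28 bits hold the entryId, the remaining high bits hold the ledgerId,
    // matching the ">>> 28" / "& 0x0F_FF_FF_FFL" de-multiplexing in MessageIdUtils.
    static final int ENTRY_BITS = 28;
    static final long ENTRY_MASK = 0x0F_FF_FF_FFL;

    static long getOffset(long ledgerId, long entryId) {
        // Assumes entryId < 2^28, as implied by the mask used for the reverse mapping.
        return (ledgerId << ENTRY_BITS) | (entryId & ENTRY_MASK);
    }

    public static void main(String[] args) {
        long offset = getOffset(1234L, 56L);
        long ledgerId = offset >>> ENTRY_BITS;   // 1234
        long entryId = offset & ENTRY_MASK;      // 56
        System.out.println(offset + " -> (" + ledgerId + ", " + entryId + ")");
    }
}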