diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index bb2eab2bdec3a..b23116ba48638 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -53,10 +53,16 @@ import java.util.stream.IntStream; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImplWrapper; +import org.apache.bookkeeper.mledger.impl.OffsetFinder; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.Node; @@ -109,6 +115,7 @@ import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.Topic.PublishContext; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.CompressionType; @@ -122,7 +129,6 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; import org.apache.pulsar.common.schema.KeyValue; @@ -140,6 +146,7 @@ 
public class KafkaRequestHandler extends KafkaCommandDecoder { private final NamespaceName kafkaNamespace; private final ExecutorService executor; private final PulsarAdmin admin; + private final KafkaTopicManager topicManager; private static final Clock clock = Clock.systemDefaultZone(); private static final String FAKE_KOP_PRODUCER_NAME = "fake_kop_producer_name"; @@ -158,6 +165,7 @@ public KafkaRequestHandler(KafkaService kafkaService) throws Exception { kafkaService.getKafkaConfig().getKafkaNamespace()); this.executor = kafkaService.getExecutor(); this.admin = kafkaService.getAdminClient(); + this.topicManager = kafkaService.getKafkaTopicManager(); } protected CompletableFuture handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest) { @@ -445,6 +453,7 @@ protected CompletableFuture handleProduceRequest(KafkaHeader partitionResponse.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } else { if (topicOpt.isPresent()) { + topicManager.addTopic(topicName.toString(), (PersistentTopic) topicOpt.get()); publishMessages(entry.getValue(), topicOpt.get(), partitionResponse); } else { log.error("[{}] Request {}: getOrCreateTopic get empty topic for name {}", @@ -760,49 +769,180 @@ protected CompletableFuture handleOffsetFetchRequest(KafkaHe return resultFuture; } - protected CompletableFuture handleListOffsetRequest(KafkaHeaderAndRequest listOffset) { - checkArgument(listOffset.getRequest() instanceof ListOffsetRequest); - ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); + private CompletableFuture + fetchOffsetForTimestamp(PersistentTopic persistentTopic, Long timestamp) { + ManagedLedgerImplWrapper managedLedger = new ManagedLedgerImplWrapper( + (ManagedLedgerImpl) persistentTopic.getManagedLedger()); - CompletableFuture resultFuture = new CompletableFuture<>(); + CompletableFuture partitionData = new CompletableFuture<>(); - List>> statsList = - request.partitionTimestamps().entrySet().stream() - .map((topicPartition) -> 
Pair.of(topicPartition.getKey(), admin.topics() - .getInternalStatsAsync(pulsarTopicName(topicPartition.getKey()).toString()))) - .collect(Collectors.toList()); + try { + if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) { + PositionImpl position = managedLedger.getLastConfirmedEntry(); + if (log.isDebugEnabled()) { + log.debug("Get latest position for topic {} time {}. result: {}", + persistentTopic.getName(), timestamp, position); + } - CompletableFuture.allOf(statsList.stream().map(entry -> entry.getValue()).toArray(CompletableFuture[]::new)) - .whenComplete((ignore, ex) -> { - Map responses = - statsList.stream().map((v) -> { - TopicPartition key = v.getKey(); - try { - List ledgers = v.getValue().join().ledgers; - // return first ledger. - long offset = MessageIdUtils.getOffset(ledgers.get(0).ledgerId, 0); + // no entry in ledger, then entry id could be -1 + long entryId = position.getEntryId(); - if (log.isDebugEnabled()) { - log.debug("[{}] Request {}: topic {} Return offset: {}, ledgerId: {}.", - ctx.channel(), listOffset.getHeader(), key, offset, ledgers.get(0).ledgerId); + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + RecordBatch.NO_TIMESTAMP, + MessageIdUtils + .getOffset(position.getLedgerId(), entryId == -1 ? 0 : entryId))); + } else if (timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) { + PositionImpl position = managedLedger.getFirstValidPosition(); + + if (log.isDebugEnabled()) { + log.debug("Get earliest position for topic {} time {}. 
result: {}", + persistentTopic.getName(), timestamp, position); + } + + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + RecordBatch.NO_TIMESTAMP, + MessageIdUtils.getOffset(position.getLedgerId(), position.getEntryId()))); + } else { + // find with real wanted timestamp + OffsetFinder offsetFinder = new OffsetFinder(managedLedger.getManagedLedger()); + + offsetFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + PositionImpl finalPosition; + if (position == null) { + finalPosition = managedLedger.getFirstValidPosition(); + if (finalPosition == null) { + log.warn("Unable to find position for topic {} time {}. get NULL position", + persistentTopic.getName(), timestamp); + + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)); + return; } - return Pair.of(key, new ListOffsetResponse.PartitionData( - Errors.NONE, -1L, offset)); - } catch (Exception e) { - log.error("[{}] Request {}: topic {} meet error of getInternalStats.", - ctx.channel(), listOffset.getHeader(), key, e); - return Pair.of(key, new ListOffsetResponse.PartitionData( - Errors.UNKNOWN_TOPIC_OR_PARTITION, -1L, -1L)); + } else { + finalPosition = (PositionImpl) position; + } + + if (log.isDebugEnabled()) { + log.debug("Find position for topic {} time {}. position: {}", + persistentTopic.getName(), timestamp, finalPosition); } - }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + RecordBatch.NO_TIMESTAMP, + MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId()))); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + log.warn("Unable to find position for topic {} time {}. 
Exception:", + persistentTopic.getName(), timestamp, exception); + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)); + return; + } + }); + } + } catch (Exception e) { + log.error("Failed while get position for topic: {} ts: {}.", + persistentTopic.getName(), timestamp, e); + + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)); + } + + return partitionData; + } + + private CompletableFuture handleListOffsetRequestV1AndAbove(KafkaHeaderAndRequest listOffset) { + ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); + + CompletableFuture resultFuture = new CompletableFuture<>(); + Map> responseData = Maps.newHashMap(); + + request.partitionTimestamps().entrySet().stream().forEach(tms -> { + TopicPartition topic = tms.getKey(); + Long times = tms.getValue(); + String pulsarTopic = pulsarTopicName(topic).toString(); + CompletableFuture partitionData; + + + // topic not exist, return UNKNOWN_TOPIC_OR_PARTITION + if (!topicManager.topicExists(pulsarTopic)) { + partitionData = new CompletableFuture<>(); + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)); + } else { + PersistentTopic persistentTopic = topicManager.getTopic(pulsarTopic); + partitionData = fetchOffsetForTimestamp(persistentTopic, times); + } + + responseData.put(topic, partitionData); + }); + + CompletableFuture + .allOf(responseData.values().stream().toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + ListOffsetResponse response = + new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join())); resultFuture.complete(ResponseAndRequest - .of(new ListOffsetResponse(responses), 
listOffset)); + .of(response, listOffset)); }); return resultFuture; } + // get offset from underline managedLedger + protected CompletableFuture handleListOffsetRequest(KafkaHeaderAndRequest listOffset) { + checkArgument(listOffset.getRequest() instanceof ListOffsetRequest); + + // not support version 0 + if (listOffset.getHeader().apiVersion() == 0) { + CompletableFuture resultFuture = new CompletableFuture<>(); + ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); + + log.error("ListOffset not support V0 format request"); + + ListOffsetResponse response = new ListOffsetResponse(CoreUtils.mapValue(request.partitionTimestamps(), + ignored -> new ListOffsetResponse + .PartitionData(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Lists.newArrayList()))); + + resultFuture.complete(ResponseAndRequest + .of(response, listOffset)); + + return resultFuture; + } + + return handleListOffsetRequestV1AndAbove(listOffset); + } + + // For non exist topics handleOffsetCommitRequest return UNKNOWN_TOPIC_OR_PARTITION + private Map nonExistingTopicErrors(OffsetCommitRequest request) { + return request.offsetData().entrySet().stream() + .filter(entry -> + // filter not exist topics + !topicManager.topicExists(pulsarTopicName(entry.getKey()).toString())) + .collect(Collectors.toMap( + e -> e.getKey(), + e -> Errors.UNKNOWN_TOPIC_OR_PARTITION + )); + } + protected CompletableFuture handleOffsetCommitRequest(KafkaHeaderAndRequest offsetCommit) { checkArgument(offsetCommit.getRequest() instanceof OffsetCommitRequest); checkState(kafkaService.getGroupCoordinator() != null, @@ -811,6 +951,8 @@ protected CompletableFuture handleOffsetCommitRequest(KafkaH OffsetCommitRequest request = (OffsetCommitRequest) offsetCommit.getRequest(); CompletableFuture resultFuture = new CompletableFuture<>(); + Map nonExistingTopic = nonExistingTopicErrors(request); + kafkaService.getGroupCoordinator().handleCommitOffsets( request.groupId(), request.memberId(), @@ -821,6 +963,9 @@ protected 
CompletableFuture handleOffsetCommitRequest(KafkaH OffsetAndMetadata.apply(partitionData.offset, partitionData.metadata, partitionData.timestamp) ) ).thenAccept(offsetCommitResult -> { + if (nonExistingTopic != null) { + offsetCommitResult.putAll(nonExistingTopic); + } OffsetCommitResponse response = new OffsetCommitResponse(offsetCommitResult); resultFuture.complete(ResponseAndRequest.of(response, offsetCommit)); }); @@ -1003,7 +1148,7 @@ public void readEntriesComplete(List list, Object o) { entry.getLength(), keptOffset, offset); } - kafkaService.getKafkaTopicManager() + topicManager .getTopicConsumerManager(topicName.toString()) .thenAccept(cm -> cm.add(offset + 1, Pair.of(cursor, offset + 1))); } else { @@ -1013,7 +1158,7 @@ public void readEntriesComplete(List list, Object o) { keptOffset); } - kafkaService.getKafkaTopicManager() + topicManager .getTopicConsumerManager(topicName.toString()) .thenAccept(cm -> cm.add(keptOffset, Pair.of(cursor, keptOffset))); @@ -1069,8 +1214,7 @@ protected CompletableFuture handleFetchRequest(KafkaHeaderAn return Pair.of( entry.getKey(), - kafkaService.getKafkaTopicManager() - .getTopicConsumerManager(topicName.toString()) + topicManager.getTopicConsumerManager(topicName.toString()) .thenCompose(cm -> cm.remove(offset))); }) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); @@ -1295,6 +1439,26 @@ private CompletableFuture findBroker(KafkaService kafkaServic log.debug("Found broker: {} for topicName: {}", uri, topic); } + // auto create topic. + String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress( + kafkaService.getKafkaConfig().getAdvertisedAddress()); + if (!topicManager.topicExists(topic.toString()) && hostname.equals(uri.getHost())) { + kafkaService.getBrokerService().getTopic(topic.toString(), true) + .whenComplete((topicOpt, exception) -> { + if (exception != null) { + log.error("[{}] findBroker: Failed to getOrCreateTopic {}. 
exception:", + ctx.channel(), topic.toString(), exception); + } else { + if (topicOpt.isPresent()) { + topicManager.addTopic(topic.toString(), (PersistentTopic) topicOpt.get()); + } else { + log.error("[{}] findBroker: getOrCreateTopic get empty topic for name {}", + ctx.channel(), topic.toString()); + } + } + }); + } + Node node = newNode(new InetSocketAddress( uri.getHost(), kafkaService.getKafkaConfig().getKafkaServicePort().get())); diff --git a/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java b/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java index 2dfb90084c5f3..71733fc119598 100644 --- a/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java +++ b/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java @@ -16,14 +16,13 @@ import static com.google.common.base.Preconditions.checkArgument; import io.streamnative.kop.utils.MessageIdUtils; -import io.streamnative.kop.utils.ReflectionUtils; -import java.lang.reflect.Method; import java.util.UUID; import java.util.concurrent.CompletableFuture; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImplWrapper; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; import org.apache.commons.codec.digest.DigestUtils; @@ -72,11 +71,9 @@ public CompletableFuture> remove(long offset) { try { // get previous position, because NonDurableCursor is read from next position. 
- ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); - Method getPreviousPosition = ReflectionUtils - .setMethodAccessible(ledger, "getPreviousPosition", PositionImpl.class); - PositionImpl previous = (PositionImpl) getPreviousPosition.invoke(ledger, position); - + ManagedLedgerImplWrapper ledger = + new ManagedLedgerImplWrapper((ManagedLedgerImpl) topic.getManagedLedger()); + PositionImpl previous = ledger.getPreviousPosition(position); if (log.isDebugEnabled()) { log.debug("Create cursor {} for offset: {}. position: {}, previousPosition: {}", cursorName, offset, position, previous); diff --git a/src/main/java/io/streamnative/kop/KafkaTopicManager.java b/src/main/java/io/streamnative/kop/KafkaTopicManager.java index b369e4fedec21..8736a984da355 100644 --- a/src/main/java/io/streamnative/kop/KafkaTopicManager.java +++ b/src/main/java/io/streamnative/kop/KafkaTopicManager.java @@ -28,18 +28,23 @@ public class KafkaTopicManager { private final BrokerService service; + + // consumerTopics for consumers cache. @Getter - private final ConcurrentOpenHashMap> topics; + private final ConcurrentOpenHashMap> consumerTopics; + + // cache for topics + private final ConcurrentOpenHashMap topics; KafkaTopicManager(BrokerService service) { this.service = service; + consumerTopics = new ConcurrentOpenHashMap<>(); topics = new ConcurrentOpenHashMap<>(); } - // topicName is in pulsar format. e.g. 
persistent://public/default/topic-partition-0 public CompletableFuture getTopicConsumerManager(String topicName) { - return topics.computeIfAbsent( + return consumerTopics.computeIfAbsent( topicName, t -> service .getTopic(topicName, true) @@ -48,6 +53,7 @@ public CompletableFuture getTopicConsumerManager(Stri log.debug("Call getTopicConsumerManager for {}, and create KafkaTopicConsumerManager.", topicName); } + topics.putIfAbsent(topicName, (PersistentTopic) t2.get()); return new KafkaTopicConsumerManager((PersistentTopic) t2.get()); }) .exceptionally(ex -> { @@ -57,4 +63,19 @@ public CompletableFuture getTopicConsumerManager(Stri }) ); } + + // whether topic exists or not + public boolean topicExists(String topicName) { + return topics.containsKey(topicName); + } + + public PersistentTopic addTopic(String topicName, PersistentTopic persistentTopic) { + return topics.putIfAbsent(topicName, persistentTopic); + } + + public PersistentTopic getTopic(String topicName) { + return topics.get(topicName); + } + + } diff --git a/src/main/java/io/streamnative/kop/coordinator/group/GroupCoordinator.java b/src/main/java/io/streamnative/kop/coordinator/group/GroupCoordinator.java index 4d60cd2a2723f..49d8bda35aae1 100644 --- a/src/main/java/io/streamnative/kop/coordinator/group/GroupCoordinator.java +++ b/src/main/java/io/streamnative/kop/coordinator/group/GroupCoordinator.java @@ -721,35 +721,37 @@ public CompletableFuture> handleCommitOffsets( int generationId, Map offsetMetadata ) { - return validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT).map(error -> - CompletableFuture.completedFuture( - CoreUtils.mapValue( - offsetMetadata, - ignored -> error - ) - ) - ).orElseGet(() -> { - return groupManager.getGroup(groupId).map(group -> - doCommitOffsets( - group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH, - offsetMetadata + return validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) + .map(error -> + CompletableFuture.completedFuture( + CoreUtils.mapValue( + 
offsetMetadata, + ignored -> error + ) ) ).orElseGet(() -> { - if (generationId < 0) { - // the group is not relying on Kafka for group management, so allow the commit - GroupMetadata group = groupManager.addGroup(new GroupMetadata(groupId, Empty)); - return doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH, - offsetMetadata); - } else { - return CompletableFuture.completedFuture( - CoreUtils.mapValue( - offsetMetadata, - ignored -> Errors.ILLEGAL_GENERATION + return groupManager.getGroup(groupId) + .map(group -> + doCommitOffsets( + group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH, + offsetMetadata ) - ); - } + ).orElseGet(() -> { + if (generationId < 0) { + // the group is not relying on Kafka for group management, so allow the commit + GroupMetadata group = groupManager.addGroup(new GroupMetadata(groupId, Empty)); + return doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH, + offsetMetadata); + } else { + return CompletableFuture.completedFuture( + CoreUtils.mapValue( + offsetMetadata, + ignored -> Errors.ILLEGAL_GENERATION + ) + ); + } + }); }); - }); } public Future scheduleHandleTxnCompletion( @@ -896,11 +898,14 @@ private Optional validateGroupStatus(String groupId, return Optional.of(Errors.COORDINATOR_NOT_AVAILABLE); } else if (groupManager.isGroupLoading(groupId)) { return Optional.of(Errors.COORDINATOR_LOAD_IN_PROGRESS); - } else if (!groupManager.isGroupLocal(groupId) - && api != ApiKeys.JOIN_GROUP // first time join, group may not persisted. - && api != ApiKeys.SYNC_GROUP - && api != ApiKeys.OFFSET_FETCH) { - return Optional.of(Errors.NOT_COORDINATOR); + // TODO: make group coordinator running in distributed mode. + // https://github.com/streamnative/kop/issues/32 + // } else if (!groupManager.isGroupLocal(groupId) + // && api != ApiKeys.JOIN_GROUP // first time join, group may not persisted. 
+ // && api != ApiKeys.SYNC_GROUP + // && api != ApiKeys.OFFSET_FETCH) { + // return Optional.of(Errors.NOT_COORDINATOR); + // } } else { return Optional.empty(); } diff --git a/src/main/java/io/streamnative/kop/utils/CoreUtils.java b/src/main/java/io/streamnative/kop/utils/CoreUtils.java index fc42e011a85c7..7c12323b16a51 100644 --- a/src/main/java/io/streamnative/kop/utils/CoreUtils.java +++ b/src/main/java/io/streamnative/kop/utils/CoreUtils.java @@ -66,4 +66,14 @@ public static Map mapValue(Map map, )); } + public static Map mapKeyValue(Map map, + Function, V2> func) { + return map.entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> func.apply(e) + )); + } + } diff --git a/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java b/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java index 9d71b722dae5b..88757e2de56a2 100644 --- a/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java +++ b/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java @@ -13,6 +13,8 @@ */ package io.streamnative.kop.utils; +import static com.google.common.base.Preconditions.checkArgument; + import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -26,12 +28,17 @@ public static final long getOffset(long ledgerId, long entryId) { // Combine ledger id and entry id to form offset // Use less than 32 bits to represent entry id since it will get // rolled over way before overflowing the max int range + checkArgument(ledgerId > 0, "Expected ledgerId > 0, but get " + ledgerId); + checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId); + long offset = (ledgerId << 28) | entryId; return offset; } public static final MessageId getMessageId(long offset) { // De-multiplex ledgerId and entryId from offset + checkArgument(offset > 0, "Expected Offset > 0, but get " + offset); + long ledgerId = offset >>> 28; long entryId = offset & 
0x0F_FF_FF_FFL; @@ -40,6 +47,8 @@ public static final MessageId getMessageId(long offset) { public static final PositionImpl getPosition(long offset) { // De-multiplex ledgerId and entryId from offset + checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset); + long ledgerId = offset >>> 28; long entryId = offset & 0x0F_FF_FF_FFL; diff --git a/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImplWrapper.java b/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImplWrapper.java new file mode 100644 index 0000000000000..7bc183ac342fc --- /dev/null +++ b/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImplWrapper.java @@ -0,0 +1,56 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import lombok.Getter; + +/** + * A wrapper to make ManagedLedgerImpl accessible. + */ +public class ManagedLedgerImplWrapper { + @Getter + private final ManagedLedgerImpl managedLedger; + + public ManagedLedgerImplWrapper(ManagedLedgerImpl managedLedger) { + this.managedLedger = managedLedger; + } + + public PositionImpl getNextValidPosition(final PositionImpl position) { + return managedLedger.getNextValidPosition(position); + } + + // return PositionImpl(firstLedgerId, -1) + public PositionImpl getFirstPosition() { + return managedLedger.getFirstPosition(); + } + + // combine getFirstPosition and getNextValidPosition together. 
+ public PositionImpl getFirstValidPosition() { + PositionImpl firstPosition = managedLedger.getFirstPosition(); + if (firstPosition == null) { + return null; + } else { + return getNextValidPosition(firstPosition); + } + } + + public PositionImpl getPreviousPosition(PositionImpl position) { + return managedLedger.getPreviousPosition(position); + } + + public PositionImpl getLastConfirmedEntry() { + return (PositionImpl) managedLedger.getLastConfirmedEntry(); + } + +} diff --git a/src/main/java/org/apache/bookkeeper/mledger/impl/OffsetFinder.java b/src/main/java/org/apache/bookkeeper/mledger/impl/OffsetFinder.java new file mode 100644 index 0000000000000..a7f4028106c5e --- /dev/null +++ b/src/main/java/org/apache/bookkeeper/mledger/impl/OffsetFinder.java @@ -0,0 +1,129 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.mledger.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Predicate; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.client.impl.MessageImpl; + +/** + * given a timestamp find the first message (position) (published) at or before the timestamp. + * Most of the code is similar to Pulsar class `PersistentMessageFinder`. + */ +@Slf4j +public class OffsetFinder implements AsyncCallbacks.FindEntryCallback { + private final ManagedLedgerImpl managedLedger; + private long timestamp = 0; + + private static final int FALSE = 0; + private static final int TRUE = 1; + @SuppressWarnings("unused") + private volatile int messageFindInProgress = FALSE; + private static final AtomicIntegerFieldUpdater messageFindInProgressUpdater = + AtomicIntegerFieldUpdater.newUpdater(OffsetFinder.class, "messageFindInProgress"); + + public OffsetFinder(ManagedLedgerImpl managedLedger) { + this.managedLedger = managedLedger; + } + + public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) { + this.timestamp = timestamp; + if (messageFindInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Starting message position find at timestamp {}", timestamp); + } + + asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> { + MessageImpl msg = null; + 
try { + msg = MessageImpl.deserialize(entry.getDataBuffer()); + return msg.getPublishTime() <= timestamp; + } catch (Exception e) { + log.error("[{}][{}] Error deserializing message for message position find", e); + } finally { + entry.release(); + if (msg != null) { + msg.recycle(); + } + } + return false; + }, this, callback); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running"); + } + callback.findEntryFailed( + new ManagedLedgerException.ConcurrentFindCursorPositionException("last find is still running"), + null); + } + } + + @Override + public void findEntryComplete(Position position, Object ctx) { + checkArgument(ctx instanceof AsyncCallbacks.FindEntryCallback); + AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback) ctx; + if (position != null) { + log.info("[{}][{}] Found position {} closest to provided timestamp {}", position, + timestamp); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] No position found closest to provided timestamp {}", timestamp); + } + } + messageFindInProgress = FALSE; + callback.findEntryComplete(position, null); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + checkArgument(ctx instanceof AsyncCallbacks.FindEntryCallback); + AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback) ctx; + if (log.isDebugEnabled()) { + log.debug("[{}][{}] message position find operation failed for provided timestamp {}", + timestamp, exception); + } + messageFindInProgress = FALSE; + callback.findEntryFailed(exception, null); + } + + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + FindEntryCallback callback, Object ctx) { + checkState(constraint == FindPositionConstraint.SearchAllAvailableEntries); + + // return PositionImpl(firstLedgerId, -1) + PositionImpl startPosition = managedLedger.getFirstPosition(); + long 
max = managedLedger.getNumberOfEntries() - 1; + + if (startPosition == null) { + callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"), ctx); + return; + } else { + startPosition = managedLedger.getNextValidPosition(startPosition); + } + + OpFindNewestEntry op = new OpFindNewestEntry(managedLedger, startPosition, condition, max, callback, ctx); + op.find(); + } +} diff --git a/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewestEntry.java b/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewestEntry.java new file mode 100644 index 0000000000000..52135ffd8c220 --- /dev/null +++ b/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewestEntry.java @@ -0,0 +1,124 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.common.base.Predicate; +import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; + +/** + * Used to find Entry/Offset from ManagedLedger. + * Copy from org.apache.bookkeeper.mledger.impl.OpFindNewest. + * only different is here we passed in ManagedLedgerImpl directly. 
+ */ +class OpFindNewestEntry implements ReadEntryCallback { + private final ManagedLedgerImpl managedLedger; + private final PositionImpl startPosition; + private final FindEntryCallback callback; + private final Predicate condition; + private final Object ctx; + + enum State { + checkFirst, checkLast, searching + } + + PositionImpl searchPosition; + long min; + long max; + Position lastMatchedPosition = null; + State state; + + public OpFindNewestEntry(ManagedLedgerImpl managedLedger, PositionImpl startPosition, Predicate condition, + long numberOfEntries, FindEntryCallback callback, Object ctx) { + this.managedLedger = managedLedger; + this.startPosition = startPosition; + this.callback = callback; + this.condition = condition; + this.ctx = ctx; + + this.min = 0; + this.max = numberOfEntries; + + this.searchPosition = startPosition; + this.state = State.checkFirst; + } + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + final Position position = entry.getPosition(); + switch (state) { + case checkFirst: + if (!condition.apply(entry)) { + callback.findEntryComplete(startPosition, this.ctx); + return; + } else { + lastMatchedPosition = position; + + // check last entry + state = State.checkLast; + searchPosition = managedLedger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded); + find(); + } + break; + case checkLast: + if (condition.apply(entry)) { + callback.findEntryComplete(position, this.ctx); + return; + } else { + // start binary search + state = State.searching; + searchPosition = managedLedger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); + find(); + } + break; + case searching: + if (condition.apply(entry)) { + // mid - last + lastMatchedPosition = position; + min = mid(); + } else { + // start - mid + max = mid() - 1; + } + + if (max <= min) { + callback.findEntryComplete(lastMatchedPosition, this.ctx); + return; + } + searchPosition = managedLedger.getPositionAfterN(startPosition, mid(), 
PositionBound.startExcluded); + find(); + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + callback.findEntryFailed(exception, this.ctx); + } + + public void find() { + if (managedLedger.hasMoreEntries(searchPosition)) { + managedLedger.asyncReadEntry(searchPosition, this, null); + } else { + callback.findEntryComplete(lastMatchedPosition, this.ctx); + } + } + + private long mid() { + return min + Math.max((max - min) / 2, 1); + } +} diff --git a/src/main/java/org/apache/bookkeeper/mledger/impl/package-info.java b/src/main/java/org/apache/bookkeeper/mledger/impl/package-info.java new file mode 100644 index 0000000000000..25531f51907ed --- /dev/null +++ b/src/main/java/org/apache/bookkeeper/mledger/impl/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Offset finder classes extending the managed ledger implementation. + * + *

The classes under this package are ported from Kafka. + */ +package org.apache.bookkeeper.mledger.impl; diff --git a/src/test/java/io/streamnative/kop/FetchRequestTest.java b/src/test/java/io/streamnative/kop/FetchRequestTest.java new file mode 100644 index 0000000000000..e69de8eca4e83 --- /dev/null +++ b/src/test/java/io/streamnative/kop/FetchRequestTest.java @@ -0,0 +1,270 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop; + +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.streamnative.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; +import io.streamnative.kop.KafkaCommandDecoder.ResponseAndRequest; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; 
+import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.FetchResponse.PartitionData; +import org.apache.kafka.common.serialization.StringSerializer; +import org.testng.annotations.Test; + +/** + * Validate FetchRequest handling. + */ +@Slf4j +public class FetchRequestTest extends KafkaApisTest { + private void checkFetchResponse(List expectedPartitions, + FetchResponse fetchResponse, + int maxPartitionBytes, + int maxResponseBytes, + int numMessagesPerPartition) { + + assertEquals(expectedPartitions.size(), fetchResponse.responseData().size()); + expectedPartitions.forEach(tp -> assertTrue(fetchResponse.responseData().get(tp) != null)); + + final AtomicBoolean emptyResponseSeen = new AtomicBoolean(false); + AtomicInteger responseSize = new AtomicInteger(0); + AtomicInteger responseBufferSize = new AtomicInteger(0); + + expectedPartitions.forEach(tp -> { + PartitionData partitionData = fetchResponse.responseData().get(tp); + assertEquals(Errors.NONE, partitionData.error); + assertTrue(partitionData.highWatermark > 0); + + MemoryRecords records = (MemoryRecords) partitionData.records; + AtomicInteger batchesSize = new AtomicInteger(0); + responseBufferSize.addAndGet(records.sizeInBytes()); + List batches = Lists.newArrayList(); + records.batches().forEach(batch -> { + batches.add(batch); + batchesSize.addAndGet(batch.sizeInBytes()); + }); + assertTrue(batches.size() < numMessagesPerPartition); + responseSize.addAndGet(batchesSize.get()); + + if (batchesSize.get() == 0 && !emptyResponseSeen.get()) { + assertEquals(0, records.sizeInBytes()); + emptyResponseSeen.set(true); + } else if (batchesSize.get() != 0 && 
!emptyResponseSeen.get()) { + assertTrue(batchesSize.get() <= maxPartitionBytes); + assertTrue(maxPartitionBytes >= records.sizeInBytes()); + } else if (batchesSize.get() != 0 && emptyResponseSeen.get()) { + fail("Expected partition with size 0, but found " + tp + " with size " + batchesSize.get()); + } else if (records.sizeInBytes() != 0 && emptyResponseSeen.get()) { + fail("Expected partition buffer with size 0, but found " + + tp + " with size " + records.sizeInBytes()); + } + }); + + // In Kop implementation, fetch at least 1 item for each topicPartition in the request. + } + + private Map createPartitionMap(int maxPartitionBytes, + List topicPartitions, + Map offsetMap) { + return topicPartitions.stream() + .map(topic -> + Pair.of(topic, + new FetchRequest.PartitionData( + offsetMap.getOrDefault(topic, 0L), + 0L, + maxPartitionBytes))) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } + + private KafkaHeaderAndRequest createFetchRequest(int maxResponseBytes, + int maxPartitionBytes, + List topicPartitions, + Map offsetMap) { + + AbstractRequest.Builder builder = FetchRequest.Builder + .forConsumer(Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) + .setMaxBytes(maxResponseBytes); + + return buildRequest(builder); + } + + private List createTopics(String topicName, int numTopics, int numPartitions) throws Exception { + List result = Lists.newArrayListWithExpectedSize(numPartitions * numTopics); + + for (int topicIndex = 0; topicIndex < numTopics; topicIndex++) { + String tName = topicName + "_" + topicIndex; + kafkaService.getAdminClient().topics().createPartitionedTopic(tName, numPartitions); + + for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) { + kafkaService.getAdminClient().topics() + .createNonPartitionedTopic(tName + PARTITIONED_TOPIC_SUFFIX + partitionIndex); + result.add(new TopicPartition(tName, partitionIndex)); + } + } + + return result; + } + + private KafkaProducer 
createKafkaProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + getKafkaBrokerPort()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "FetchRequestTestProducer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + KafkaProducer producer = new KafkaProducer<>(props); + return producer; + } + + private void produceData(KafkaProducer producer, + List topicPartitions, + int numMessagesPerPartition) throws Exception{ + for (int index = 0; index < topicPartitions.size(); index++) { + TopicPartition tp = topicPartitions.get(index); + for (int messageIndex = 0; messageIndex < numMessagesPerPartition; messageIndex++) { + String suffix = tp.toString() + "-" + messageIndex; + producer + .send( + new ProducerRecord<>( + tp.topic(), + tp.partition(), + "key " + suffix, + "value " + suffix)) + .get(); + } + } + } + + @Test(timeOut = 20000) + public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { + String topicName = "kopBrokerRespectsPartitionsOrderAndSizeLimits"; + int numberTopics = 5; + int numberPartitions = 6; + + int messagesPerPartition = 9; + int maxResponseBytes = 800; + int maxPartitionBytes = 190; + + List topicPartitions = createTopics(topicName, numberTopics, numberPartitions); + + List partitionsWithLargeMessages = topicPartitions + .subList(topicPartitions.size() - 2, topicPartitions.size()); + TopicPartition partitionWithLargeMessage1 = partitionsWithLargeMessages.get(0); + TopicPartition partitionWithLargeMessage2 = partitionsWithLargeMessages.get(1); + List partitionsWithoutLargeMessages = topicPartitions + .subList(0, topicPartitions.size() - 2); + + @Cleanup + KafkaProducer kProducer = createKafkaProducer(); + produceData(kProducer, topicPartitions, messagesPerPartition); + + kProducer + .send( + new ProducerRecord<>( + 
partitionWithLargeMessage1.topic(), + partitionWithLargeMessage1.partition(), + "larger than partition limit", + new String(new byte[maxPartitionBytes + 1]))) + .get(); + + kProducer + .send( + new ProducerRecord<>( + partitionWithLargeMessage2.topic(), + partitionWithLargeMessage2.partition(), + "larger than partition limit", + new String(new byte[maxResponseBytes + 1]))) + .get(); + + // 1. Partitions with large messages at the end + Collections.shuffle(partitionsWithoutLargeMessages); + List shuffledTopicPartitions1 = Lists.newArrayListWithExpectedSize(topicPartitions.size()); + shuffledTopicPartitions1.addAll(partitionsWithoutLargeMessages); + shuffledTopicPartitions1.addAll(partitionsWithLargeMessages); + + KafkaHeaderAndRequest fetchRequest1 = createFetchRequest( + maxResponseBytes, + maxPartitionBytes, + shuffledTopicPartitions1, + Collections.EMPTY_MAP); + CompletableFuture responseFuture1 = kafkaRequestHandler.handleFetchRequest(fetchRequest1); + FetchResponse fetchResponse1 = + (FetchResponse) responseFuture1.get().getResponse(); + + checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, + maxPartitionBytes, maxResponseBytes, messagesPerPartition); + + // 2. Same as 1, but shuffled again + Collections.shuffle(partitionsWithoutLargeMessages); + List shuffledTopicPartitions2 = Lists.newArrayListWithExpectedSize(topicPartitions.size()); + shuffledTopicPartitions2.addAll(partitionsWithoutLargeMessages); + shuffledTopicPartitions2.addAll(partitionsWithLargeMessages); + + KafkaHeaderAndRequest fetchRequest2 = createFetchRequest( + maxResponseBytes, + maxPartitionBytes, + shuffledTopicPartitions2, + Collections.EMPTY_MAP); + CompletableFuture responseFuture2 = kafkaRequestHandler.handleFetchRequest(fetchRequest2); + FetchResponse fetchResponse2 = + (FetchResponse) responseFuture2.get().getResponse(); + + checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, + maxPartitionBytes, maxResponseBytes, messagesPerPartition); + + // 3. 
Partition with message larger than the partition limit at the start of the list + Collections.shuffle(partitionsWithoutLargeMessages); + List shuffledTopicPartitions3 = Lists.newArrayListWithExpectedSize(topicPartitions.size()); + shuffledTopicPartitions3.addAll(partitionsWithLargeMessages); + shuffledTopicPartitions3.addAll(partitionsWithoutLargeMessages); + + + Map offsetMaps = Maps.newHashMap(); + offsetMaps.put(partitionWithLargeMessage1, Long.valueOf(messagesPerPartition)); + KafkaHeaderAndRequest fetchRequest3 = createFetchRequest( + maxResponseBytes, + maxPartitionBytes, + shuffledTopicPartitions3, + offsetMaps); + CompletableFuture responseFuture3 = kafkaRequestHandler.handleFetchRequest(fetchRequest3); + FetchResponse fetchResponse3 = + (FetchResponse) responseFuture3.get().getResponse(); + + checkFetchResponse(shuffledTopicPartitions3, fetchResponse3, + maxPartitionBytes, maxResponseBytes, messagesPerPartition); + } +} diff --git a/src/test/java/io/streamnative/kop/KafkaApisTest.java b/src/test/java/io/streamnative/kop/KafkaApisTest.java new file mode 100644 index 0000000000000..987b197b31d92 --- /dev/null +++ b/src/test/java/io/streamnative/kop/KafkaApisTest.java @@ -0,0 +1,325 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.kop; + +import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.streamnative.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; +import io.streamnative.kop.KafkaCommandDecoder.ResponseAndRequest; +import io.streamnative.kop.utils.MessageIdUtils; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.requests.ListOffsetRequest; +import org.apache.kafka.common.requests.ListOffsetResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitRequest.PartitionData; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; 
+import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Validate KafkaApisTest. + */ +@Slf4j +public class KafkaApisTest extends MockKafkaServiceBaseTest { + + KafkaRequestHandler kafkaRequestHandler; + SocketAddress serviceAddress; + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + log.info("success internal setup"); + + if (!admin.clusters().getClusters().contains(configClusterName)) { + // so that clients can test short names + admin.clusters().createCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } else { + admin.clusters().updateCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } + + if (!admin.tenants().getTenants().contains("public")) { + admin.tenants().createTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } else { + admin.tenants().updateTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } + if (!admin.namespaces().getNamespaces("public").contains("public/default")) { + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/default", + new RetentionPolicies(60, 1000)); + } + if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) { + admin.namespaces().createNamespace("public/__kafka"); + admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/__kafka", + new RetentionPolicies(-1, -1)); + } + + log.info("created namespaces, 
init handler"); + + kafkaRequestHandler = new KafkaRequestHandler(kafkaService); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + Channel mockChannel = mock(Channel.class); + doReturn(mockChannel).when(mockCtx).channel(); + kafkaRequestHandler.ctx = mockCtx; + + serviceAddress = new InetSocketAddress(kafkaService.getBindAddress(), kafkaBrokerPort); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) { + AbstractRequest request = builder.build(); + builder.apiKey(); + + ByteBuffer serializedRequest = request + .serialize(new RequestHeader(builder.apiKey(), request.version(), "fake_client_id", 0)); + + ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest); + + RequestHeader header = RequestHeader.parse(serializedRequest); + + ApiKeys apiKey = header.apiKey(); + short apiVersion = header.apiVersion(); + Struct struct = apiKey.parseRequest(apiVersion, serializedRequest); + AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); + return new KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); + } + + CompletableFuture checkInvalidPartition(String topic, + int invalidPartitionId) { + TopicPartition invalidTopicPartition = new TopicPartition(topic, invalidPartitionId); + PartitionData partitionOffsetCommitData = new OffsetCommitRequest.PartitionData(15L, ""); + Map offsetData = Maps.newHashMap(); + offsetData.put(invalidTopicPartition, partitionOffsetCommitData); + KafkaHeaderAndRequest request = buildRequest(new OffsetCommitRequest.Builder("groupId", offsetData)); + return kafkaRequestHandler.handleOffsetCommitRequest(request); + } + + @Test(timeOut = 20000) + public void testOffsetCommitWithInvalidPartition() throws Exception { + String topicName = "kopOffsetCommitWithInvalidPartition"; + + // invalid partition id -1; + CompletableFuture invalidResponse1 = 
checkInvalidPartition(topicName, -1); + ResponseAndRequest response1 = invalidResponse1.get(); + assertEquals(response1.getRequest().getHeader().apiKey(), ApiKeys.OFFSET_COMMIT); + TopicPartition topicPartition1 = new TopicPartition(topicName, -1); + assertEquals(((OffsetCommitResponse) response1.getResponse()).responseData().get(topicPartition1), + Errors.UNKNOWN_TOPIC_OR_PARTITION); + + // invalid partition id 1. + CompletableFuture invalidResponse2 = checkInvalidPartition(topicName, 1); + TopicPartition topicPartition2 = new TopicPartition(topicName, 1); + ResponseAndRequest response2 = invalidResponse2.get(); + assertEquals(response2.getRequest().getHeader().apiKey(), ApiKeys.OFFSET_COMMIT); + assertEquals(((OffsetCommitResponse) response2.getResponse()).responseData().get(topicPartition2), + Errors.UNKNOWN_TOPIC_OR_PARTITION); + } + + // TODO: Add transaction support https://github.com/streamnative/kop/issues/39 + // testTxnOffsetCommitWithInvalidPartition + // testAddPartitionsToTxnWithInvalidPartition + // shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported + // shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported + // shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported + // shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported + // shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported + // shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired + // shouldRespondWithUnknownTopicWhenPartitionIsNotHosted + // shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition + // shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition + // shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion + + // these 2 test cases test 
HighWatermark and LastStableOffset. they are the same for Pulsar, + // so combine it in one test case. + // Test ListOffset for earliest get the earliest message in topic. + // testReadUncommittedConsumerListOffsetEarliestOffsetEqualsHighWatermark + // testReadCommittedConsumerListOffsetEarliestOffsetEqualsLastStableOffset + @Test(timeOut = 20000) + public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws Exception { + String topicName = "testReadUncommittedConsumerListOffsetEarliest"; + TopicPartition tp = new TopicPartition(topicName, 0); + + // use producer to create some message to get Limit Offset. + String pulsarTopicName = "persistent://public/default/" + topicName; + + // create partitioned topic. + kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, 1); + + // 1. prepare topic: + // use kafka producer to produce 10 messages. + // use pulsar consumer to get message offset. + @Cleanup + KProducer kProducer = new KProducer(topicName, false, getKafkaBrokerPort()); + int totalMsgs = 10; + String messageStrPrefix = topicName + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + kProducer.getProducer() + .send(new ProducerRecord<>( + topicName, + i, + messageStr)) + .get(); + log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr); + } + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(pulsarTopicName) + .subscriptionName(topicName + "_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNotNull(msg); + MessageIdImpl messageId = (MessageIdImpl) ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId(); + // first entry should be the limit offset. 
+ long limitOffset = MessageIdUtils.getOffset(messageId.getLedgerId(), 0); + log.info("After create {} messages, get messageId: {} expected earliest limit: {}", + totalMsgs, messageId, limitOffset); + + // 2. real test, for ListOffset request verify Earliest get earliest + Map targetTimes = Maps.newHashMap(); + targetTimes.put(tp, ListOffsetRequest.EARLIEST_TIMESTAMP); + + ListOffsetRequest.Builder builder = ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(targetTimes); + + KafkaHeaderAndRequest request = buildRequest(builder); + CompletableFuture responseFuture = kafkaRequestHandler + .handleListOffsetRequest(request); + + ResponseAndRequest response = responseFuture.get(); + ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response.getResponse(); + assertEquals(response.getRequest().getHeader().apiKey(), ApiKeys.LIST_OFFSETS); + assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.NONE); + assertEquals(listOffsetResponse.responseData().get(tp).offset, Long.valueOf(limitOffset)); + assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(NO_TIMESTAMP)); + } + + // these 2 test cases test Read Commit / UnCommit. they are the same for Pulsar, + // so combine it in one test case. + // Test ListOffset for latest get the earliest message in topic. + // testReadUncommittedConsumerListOffsetLatest + // testReadCommittedConsumerListOffsetLatest + @Test(timeOut = 20000) + public void testConsumerListOffsetLatest() throws Exception { + String topicName = "testConsumerListOffsetLatest"; + TopicPartition tp = new TopicPartition(topicName, 0); + + // use producer to create some message to get Limit Offset. + String pulsarTopicName = "persistent://public/default/" + topicName; + + // create partitioned topic. + kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, 1); + + // 1. prepare topic: + // use kafka producer to produce 10 messages. 
+ // use pulsar consumer to get message offset. + @Cleanup + KProducer kProducer = new KProducer(topicName, false, getKafkaBrokerPort()); + int totalMsgs = 10; + String messageStrPrefix = topicName + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + kProducer.getProducer() + .send(new ProducerRecord<>( + topicName, + i, + messageStr)) + .get(); + log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr); + } + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(pulsarTopicName) + .subscriptionName(topicName + "_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNotNull(msg); + MessageIdImpl messageId = (MessageIdImpl) ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId(); + // LAC entry should be the limit offset. + long limitOffset = MessageIdUtils.getOffset(messageId.getLedgerId(), totalMsgs - 1); + log.info("After create {} messages, get messageId: {} expected latest limit: {}", + totalMsgs, messageId, limitOffset); + + // 2. 
real test, for ListOffset request verify Latest gets the latest + Map targetTimes = Maps.newHashMap(); + targetTimes.put(tp, ListOffsetRequest.LATEST_TIMESTAMP); + + ListOffsetRequest.Builder builder = ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(targetTimes); + + KafkaHeaderAndRequest request = buildRequest(builder); + CompletableFuture responseFuture = kafkaRequestHandler + .handleListOffsetRequest(request); + + ResponseAndRequest response = responseFuture.get(); + ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response.getResponse(); + assertEquals(response.getRequest().getHeader().apiKey(), ApiKeys.LIST_OFFSETS); + assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.NONE); + assertEquals(listOffsetResponse.responseData().get(tp).offset, Long.valueOf(limitOffset)); + assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(NO_TIMESTAMP)); + } +} diff --git a/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java b/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java index 1b64ea1921d05..77069b64b4667 100644 --- a/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java +++ b/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java @@ -68,7 +68,7 @@ public void testGetTopicConsumerManager() throws Exception { KafkaTopicConsumerManager topicConsumerManager2 = tcm.get(); assertTrue(topicConsumerManager == topicConsumerManager2); - assertEquals(kafkaTopicManager.getTopics().size(), 1); + assertEquals(kafkaTopicManager.getConsumerTopics().size(), 1); // 2.
verify another get with different topic will return different tcm String topicName2 = "persistent://public/default/testGetTopicConsumerManager2"; @@ -76,7 +76,7 @@ public void testGetTopicConsumerManager() throws Exception { tcm = kafkaTopicManager.getTopicConsumerManager(topicName2); topicConsumerManager2 = tcm.get(); assertTrue(topicConsumerManager != topicConsumerManager2); - assertEquals(kafkaTopicManager.getTopics().size(), 2); + assertEquals(kafkaTopicManager.getConsumerTopics().size(), 2); } diff --git a/src/test/java/io/streamnative/kop/LogOffsetTest.java b/src/test/java/io/streamnative/kop/LogOffsetTest.java new file mode 100644 index 0000000000000..8f27c5db7d691 --- /dev/null +++ b/src/test/java/io/streamnative/kop/LogOffsetTest.java @@ -0,0 +1,60 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.kop; + +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.Maps; +import io.streamnative.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; +import io.streamnative.kop.KafkaCommandDecoder.ResponseAndRequest; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.requests.ListOffsetRequest; +import org.apache.kafka.common.requests.ListOffsetResponse; +import org.testng.annotations.Test; + +/** + * Validate LogOffset. + */ +@Slf4j +public class LogOffsetTest extends KafkaApisTest { + + @Test(timeOut = 20000) + public void testGetOffsetsForUnknownTopic() throws Exception { + String topicName = "kopTestGetOffsetsForUnknownTopic"; + + TopicPartition tp = new TopicPartition(topicName, 0); + Map targetTimes = Maps.newHashMap(); + targetTimes.put(tp, ListOffsetRequest.LATEST_TIMESTAMP); + + ListOffsetRequest.Builder builder = ListOffsetRequest.Builder + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(targetTimes); + + KafkaHeaderAndRequest request = buildRequest(builder); + CompletableFuture responseFuture = kafkaRequestHandler + .handleListOffsetRequest(request); + + ResponseAndRequest response = responseFuture.get(); + ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response.getResponse(); + assertEquals(response.getRequest().getHeader().apiKey(), ApiKeys.LIST_OFFSETS); + assertEquals(listOffsetResponse.responseData().get(tp).error, + Errors.UNKNOWN_TOPIC_OR_PARTITION); + } +} diff --git a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java index 179593ff442a2..f74cfc6e51fbe 100644 --- 
a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java +++ b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java @@ -386,6 +386,7 @@ public KConsumer(String topic, String host, int port, boolean autoCommit) { if (autoCommit) { props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } else { props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java b/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java index 2c1283c19a6d4..fa0075c935a60 100644 --- a/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java +++ b/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java @@ -389,7 +389,7 @@ public void testJoinGroupUnknownConsumerExistingGroup() throws Exception { @Test public void testHeartbeatWrongCoordinator() throws Exception { Errors error = groupCoordinator.handleHeartbeat(otherGroupId, memberId, -1).get(); - assertEquals(Errors.NOT_COORDINATOR, error); + assertEquals(Errors.UNKNOWN_MEMBER_ID, error); } @Test @@ -1678,7 +1678,7 @@ public void testLeaveGroupWrongCoordinator() throws Exception { Errors leaveGroupResult = groupCoordinator.handleLeaveGroup( otherGroupId, JoinGroupRequest.UNKNOWN_MEMBER_ID ).get(); - assertEquals(Errors.NOT_COORDINATOR, leaveGroupResult); + assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult); } @Test @@ -1764,7 +1764,7 @@ groupId, memberId, protocolType, newProtocols() @Test public void testDescribeGroupWrongCoordinator() { KeyValue describeGroupResult = groupCoordinator.handleDescribeGroup(otherGroupId); - assertEquals(Errors.NOT_COORDINATOR, describeGroupResult.getKey()); + assertEquals(Errors.NONE, describeGroupResult.getKey()); } @Test diff --git 
a/src/test/java/org/apache/bookkeeper/mledger/impl/OffsetFinderTest.java b/src/test/java/org/apache/bookkeeper/mledger/impl/OffsetFinderTest.java
new file mode 100644
index 0000000000000..52de6d29269c8
--- /dev/null
+++ b/src/test/java/org/apache/bookkeeper/mledger/impl/OffsetFinderTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.Test;
+
+/**
+ * Test for OffsetFinder.
+ */
+public class OffsetFinderTest extends MockedBookKeeperTestCase {
+
+    // NOTE(review): generic type parameters look stripped by extraction
+    // (e.g. List<Entry>, CompletableFuture<Void>); raw types kept byte-identical.
+
+    // Builds a Pulsar wire-format entry: [4-byte metadata length][MessageMetadata][payload],
+    // with publishTime set to "now" so OffsetFinder can locate entries by timestamp.
+    public static byte[] createMessageWrittenToLedger(String msg) throws Exception {
+        PulsarApi.MessageMetadata.Builder messageMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+        messageMetadataBuilder.setPublishTime(System.currentTimeMillis());
+        messageMetadataBuilder.setProducerName("createMessageWrittenToLedger");
+        messageMetadataBuilder.setSequenceId(1);
+        PulsarApi.MessageMetadata messageMetadata = messageMetadataBuilder.build();
+        ByteBuf data = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes());
+
+        int msgMetadataSize = messageMetadata.getSerializedSize();
+        int payloadSize = data.readableBytes();
+        int totalSize = 4 + msgMetadataSize + payloadSize;
+
+        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
+        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
+        headers.writeInt(msgMetadataSize);
+        messageMetadata.writeTo(outStream);
+        ByteBuf headersAndPayload = ByteBufPair.coalesce(ByteBufPair.get(headers, data));
+        // Copy out before releasing; heap buffers keep the backing array valid.
+        byte[] byteMessage = headersAndPayload.nioBuffer().array();
+        headersAndPayload.release();
+        return byteMessage;
+    }
+
+    // Mutable holder for the async find outcome: position on success, exception on failure.
+    class Result {
+        ManagedLedgerException exception = null;
+        Position position = null;
+
+        void reset() {
+            this.exception = null;
+            this.position = null;
+        }
+    }
+
+    // Bridges OffsetFinder's callback API to a CompletableFuture; the outcome is also
+    // recorded in `result` so assertions can inspect it after the future completes.
+    CompletableFuture findMessage(final Result result, final ManagedLedger c1, final long timestamp) {
+        OffsetFinder messageFinder = new OffsetFinder((ManagedLedgerImpl) c1);
+
+        final CompletableFuture future = new CompletableFuture<>();
+        messageFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                result.position = position;
+                future.complete(null);
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+                result.exception = exception;
+                future.completeExceptionally(exception);
+            }
+        });
+        return future;
+    }
+
+    @Test
+    void testOffsetFinder() throws Exception {
+        final String ledgerAndCursorName = "testOffsetFinder";
+        // Small ledgers force rollover, so the finder must search across ledgers.
+        int entriesPerLedger = 2;
+        long beginTimestamp = System.currentTimeMillis();
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+
+        ledger.addEntry(createMessageWrittenToLedger("retained1"));
+        // space apart message publish times
+        Thread.sleep(100);
+        ledger.addEntry(createMessageWrittenToLedger("retained2"));
+        Thread.sleep(100);
+        Position newPosition = ledger.addEntry(createMessageWrittenToLedger("retained3"));
+        Thread.sleep(100);
+        // `timestamp` falls after "retained3" and before the next entry.
+        long timestamp = System.currentTimeMillis();
+        Thread.sleep(10);
+
+        ledger.addEntry(createMessageWrittenToLedger("afterresetposition"));
+
+        Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("not-read"));
+        List entries = c1.readEntries(3);
+        c1.markDelete(entries.get(2).getPosition());
+        c1.close();
+        ledger.close();
+        entries.forEach(e -> e.release());
+        // give timed ledger trimming a chance to run
+        Thread.sleep(1000);
+
+        // Reopen: the finder must still work after the managed ledger is restarted.
+        ledger = factory.open(ledgerAndCursorName, config);
+        c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+        long endTimestamp = System.currentTimeMillis();
+
+        Result result = new Result();
+
+        // A timestamp taken just after "retained3" should resolve to its position.
+        CompletableFuture future = findMessage(result, ledger, timestamp);
+        future.get();
+        assertEquals(result.exception, null);
+        assertTrue(result.position != null);
+        assertEquals(result.position, newPosition);
+
+        // A timestamp before any entry resolves to the first position.
+        result.reset();
+        future = findMessage(result, ledger, beginTimestamp);
+        future.get();
+        assertEquals(result.exception, null);
+        assertEquals(result.position, c1.getFirstPosition());
+
+        // A timestamp after every entry resolves to the last written position.
+        result.reset();
+        future = findMessage(result, ledger, endTimestamp);
+        future.get();
+        assertEquals(result.exception, null);
+        assertNotEquals(result.position, null);
+        assertEquals(result.position, lastPosition);
+
+        // Failure path: an injected failure must reach the callback's findEntryFailed.
+        OffsetFinder messageFinder = new OffsetFinder((ManagedLedgerImpl) ledger);
+        final AtomicBoolean ex = new AtomicBoolean(false);
+        messageFinder.findEntryFailed(new ManagedLedgerException("failed"), new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+                ex.set(true);
+            }
+        });
+        assertTrue(ex.get());
+
+        // A concurrent-find failure must clear the expiry monitor's in-progress flag;
+        // the field is private, hence the reflective check.
+        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
+        monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null);
+        Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
+        field.setAccessible(true);
+        assertEquals(0, field.get(monitor));
+
+        result.reset();
+        c1.close();
+        ledger.close();
+        factory.shutdown();
+    }
+}