Skip to content

Commit

Permalink
Add some test cases for KafkaRequestHandler (apache#40)
Browse files Browse the repository at this point in the history
- Add some test cases for KafkaRequestHandler. fix error in the tests.
- change handleListOffsetRequest to get topic offset from PersistentTopic directly instead of calling  admin command to get topic stats.
- In topicManager add a Map to cache PersistentTopic.
  • Loading branch information
jiazhai authored and sijie committed Sep 17, 2019
1 parent 2ebd3ac commit 2c4ef03
Show file tree
Hide file tree
Showing 17 changed files with 1,453 additions and 80 deletions.
234 changes: 199 additions & 35 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java

Large diffs are not rendered by default.

11 changes: 4 additions & 7 deletions src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
import static com.google.common.base.Preconditions.checkArgument;

import io.streamnative.kop.utils.MessageIdUtils;
import io.streamnative.kop.utils.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImplWrapper;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.commons.codec.digest.DigestUtils;
Expand Down Expand Up @@ -72,11 +71,9 @@ public CompletableFuture<Pair<ManagedCursor, Long>> remove(long offset) {

try {
// get previous position, because NonDurableCursor is read from next position.
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
Method getPreviousPosition = ReflectionUtils
.setMethodAccessible(ledger, "getPreviousPosition", PositionImpl.class);
PositionImpl previous = (PositionImpl) getPreviousPosition.invoke(ledger, position);

ManagedLedgerImplWrapper ledger =
new ManagedLedgerImplWrapper((ManagedLedgerImpl) topic.getManagedLedger());
PositionImpl previous = ledger.getPreviousPosition(position);
if (log.isDebugEnabled()) {
log.debug("Create cursor {} for offset: {}. position: {}, previousPosition: {}",
cursorName, offset, position, previous);
Expand Down
27 changes: 24 additions & 3 deletions src/main/java/io/streamnative/kop/KafkaTopicManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@
public class KafkaTopicManager {

private final BrokerService service;

// consumerTopics for consumers cache.
@Getter
private final ConcurrentOpenHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> topics;
private final ConcurrentOpenHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopics;

// cache for topics
private final ConcurrentOpenHashMap<String, PersistentTopic> topics;

KafkaTopicManager(BrokerService service) {
this.service = service;
consumerTopics = new ConcurrentOpenHashMap<>();
topics = new ConcurrentOpenHashMap<>();
}


// topicName is in pulsar format. e.g. persistent://public/default/topic-partition-0
public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(String topicName) {
return topics.computeIfAbsent(
return consumerTopics.computeIfAbsent(
topicName,
t -> service
.getTopic(topicName, true)
Expand All @@ -48,6 +53,7 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
log.debug("Call getTopicConsumerManager for {}, and create KafkaTopicConsumerManager.",
topicName);
}
topics.putIfAbsent(topicName, (PersistentTopic) t2.get());
return new KafkaTopicConsumerManager((PersistentTopic) t2.get());
})
.exceptionally(ex -> {
Expand All @@ -57,4 +63,19 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
})
);
}

// whether topic exists or not
public boolean topicExists(String topicName) {
return topics.containsKey(topicName);
}

public PersistentTopic addTopic(String topicName, PersistentTopic persistentTopic) {
return topics.putIfAbsent(topicName, persistentTopic);
}

public PersistentTopic getTopic(String topicName) {
return topics.get(topicName);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -721,35 +721,37 @@ public CompletableFuture<Map<TopicPartition, Errors>> handleCommitOffsets(
int generationId,
Map<TopicPartition, OffsetAndMetadata> offsetMetadata
) {
return validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT).map(error ->
CompletableFuture.completedFuture(
CoreUtils.mapValue(
offsetMetadata,
ignored -> error
)
)
).orElseGet(() -> {
return groupManager.getGroup(groupId).map(group ->
doCommitOffsets(
group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
offsetMetadata
return validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT)
.map(error ->
CompletableFuture.completedFuture(
CoreUtils.mapValue(
offsetMetadata,
ignored -> error
)
)
).orElseGet(() -> {
if (generationId < 0) {
// the group is not relying on Kafka for group management, so allow the commit
GroupMetadata group = groupManager.addGroup(new GroupMetadata(groupId, Empty));
return doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
offsetMetadata);
} else {
return CompletableFuture.completedFuture(
CoreUtils.mapValue(
offsetMetadata,
ignored -> Errors.ILLEGAL_GENERATION
return groupManager.getGroup(groupId)
.map(group ->
doCommitOffsets(
group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
offsetMetadata
)
);
}
).orElseGet(() -> {
if (generationId < 0) {
// the group is not relying on Kafka for group management, so allow the commit
GroupMetadata group = groupManager.addGroup(new GroupMetadata(groupId, Empty));
return doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
offsetMetadata);
} else {
return CompletableFuture.completedFuture(
CoreUtils.mapValue(
offsetMetadata,
ignored -> Errors.ILLEGAL_GENERATION
)
);
}
});
});
});
}

public Future<?> scheduleHandleTxnCompletion(
Expand Down Expand Up @@ -896,11 +898,14 @@ private Optional<Errors> validateGroupStatus(String groupId,
return Optional.of(Errors.COORDINATOR_NOT_AVAILABLE);
} else if (groupManager.isGroupLoading(groupId)) {
return Optional.of(Errors.COORDINATOR_LOAD_IN_PROGRESS);
} else if (!groupManager.isGroupLocal(groupId)
&& api != ApiKeys.JOIN_GROUP // first time join, group may not persisted.
&& api != ApiKeys.SYNC_GROUP
&& api != ApiKeys.OFFSET_FETCH) {
return Optional.of(Errors.NOT_COORDINATOR);
// TODO: make group coordinator running in distributed mode.
// https://github.com/streamnative/kop/issues/32
// } else if (!groupManager.isGroupLocal(groupId)
// && api != ApiKeys.JOIN_GROUP // first time join, group may not persisted.
// && api != ApiKeys.SYNC_GROUP
// && api != ApiKeys.OFFSET_FETCH) {
// return Optional.of(Errors.NOT_COORDINATOR);
// }
} else {
return Optional.empty();
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/streamnative/kop/utils/CoreUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,14 @@ public static <K, V1, V2> Map<K, V2> mapValue(Map<K, V1> map,
));
}

public static <K, V1, V2> Map<K, V2> mapKeyValue(Map<K, V1> map,
Function<Map.Entry<K, V1>, V2> func) {
return map.entrySet()
.stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> func.apply(e)
));
}

}
9 changes: 9 additions & 0 deletions src/main/java/io/streamnative/kop/utils/MessageIdUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.streamnative.kop.utils;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand All @@ -26,12 +28,17 @@ public static final long getOffset(long ledgerId, long entryId) {
// Combine ledger id and entry id to form offset
// Use less than 32 bits to represent entry id since it will get
// rolled over way before overflowing the max int range
checkArgument(ledgerId > 0, "Expected ledgerId > 0, but get " + ledgerId);
checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId);

long offset = (ledgerId << 28) | entryId;
return offset;
}

public static final MessageId getMessageId(long offset) {
// De-multiplex ledgerId and entryId from offset
checkArgument(offset > 0, "Expected Offset > 0, but get " + offset);

long ledgerId = offset >>> 28;
long entryId = offset & 0x0F_FF_FF_FFL;

Expand All @@ -40,6 +47,8 @@ public static final MessageId getMessageId(long offset) {

public static final PositionImpl getPosition(long offset) {
// De-multiplex ledgerId and entryId from offset
checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset);

long ledgerId = offset >>> 28;
long entryId = offset & 0x0F_FF_FF_FFL;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import lombok.Getter;

/**
* A wrapper to make ManagedLedgerImpl accessible.
*/
public class ManagedLedgerImplWrapper {
@Getter
private final ManagedLedgerImpl managedLedger;

public ManagedLedgerImplWrapper(ManagedLedgerImpl managedLedger) {
this.managedLedger = managedLedger;
}

public PositionImpl getNextValidPosition(final PositionImpl position) {
return managedLedger.getNextValidPosition(position);
}

// return PositionImpl(firstLedgerId, -1)
public PositionImpl getFirstPosition() {
return managedLedger.getFirstPosition();
}

// combine getFirstPosition and getNextValidPosition together.
public PositionImpl getFirstValidPosition() {
PositionImpl firstPosition = managedLedger.getFirstPosition();
if (firstPosition == null) {
return null;
} else {
return getNextValidPosition(firstPosition);
}
}

public PositionImpl getPreviousPosition(PositionImpl position) {
return managedLedger.getPreviousPosition(position);
}

public PositionImpl getLastConfirmedEntry() {
return (PositionImpl) managedLedger.getLastConfirmedEntry();
}

}
Loading

0 comments on commit 2c4ef03

Please sign in to comment.