Request Fetch implementation (#19)
Master Issue: #4 

This is the request Fetch implementation.
Basic logic: for each Fetch request on a partition, create and maintain a NonDurableCursor that reads from the backing PersistentTopic (a hedged sketch of this read path follows the changed-files summary below).

**Changes**
- Add basic code implementation
- Add unit tests.
jiazhai authored and sijie committed Jul 29, 2019
1 parent daa19e1 commit 4ec538d
Showing 13 changed files with 945 additions and 61 deletions.
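The KafkaRequestHandler diff below is not rendered, so here is a minimal sketch of the read path described above, built only from the classes this commit adds (KafkaTopicManager, KafkaTopicConsumerManager, MessageIdUtils). The method name readEntriesForPartition and the exact offset bookkeeping are illustrative assumptions, not the handler's real code.

package io.streamnative.kop;

import io.streamnative.kop.utils.MessageIdUtils;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;

public class FetchReadSketch {

    private final KafkaTopicManager topicManager;

    FetchReadSketch(KafkaTopicManager topicManager) {
        this.topicManager = topicManager;
    }

    // Read up to maxEntries of one partition, starting at the requested Kafka offset.
    CompletableFuture<List<Entry>> readEntriesForPartition(String pulsarTopicName,
                                                           long fetchOffset,
                                                           int maxEntries) {
        return topicManager.getTopicConsumerManager(pulsarTopicName)
            .thenCompose(consumerManager -> consumerManager.remove(fetchOffset)
                .thenApply(cursor -> {
                    try {
                        // Synchronous read, kept short for illustration; the entries still
                        // need to be converted to Kafka records and released by the caller.
                        List<Entry> entries = cursor.readEntries(maxEntries);
                        long nextOffset = fetchOffset;
                        if (!entries.isEmpty()) {
                            Entry last = entries.get(entries.size() - 1);
                            // Assumed bookkeeping: the next fetch will ask for the entry
                            // right after the last one read.
                            nextOffset = MessageIdUtils.getOffset(
                                last.getLedgerId(), last.getEntryId() + 1);
                        }
                        // Return the cursor to the cache under the offset of the next fetch.
                        consumerManager.add(nextOffset, CompletableFuture.completedFuture(cursor));
                        return entries;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }));
    }
}

Caching the cursor under the expected next offset is what lets a steady consumer reuse a single NonDurableCursor instead of creating a new one for every Fetch.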
2 changes: 1 addition & 1 deletion conf/kop_standalone.conf
@@ -25,7 +25,7 @@ zookeeperServers=
# Configuration Store connection string
configurationStoreServers=

-brokerServicePort=9092
+brokerServicePort=6650

# Port to use to serve HTTP requests
webServicePort=8080
429 changes: 398 additions & 31 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/main/java/io/streamnative/kop/KafkaService.java
@@ -41,6 +41,8 @@ public class KafkaService extends PulsarService {

@Getter
private final KafkaServiceConfiguration kafkaConfig;
@Getter
private KafkaTopicManager kafkaTopicManager;

public KafkaService(KafkaServiceConfiguration config) {
super(config);
@@ -167,6 +169,8 @@ public Boolean get() {
+ (kafkaConfig.getKafkaServicePortTls().isPresent()
? "broker url= " + kafkaConfig.getKafkaServicePortTls() : "");

kafkaTopicManager = new KafkaTopicManager(getBrokerService());

log.info("Kafka messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
kafkaConfig.getClusterName(), ReflectionToStringBuilder.toString(kafkaConfig));
} catch (Exception e) {
src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java
@@ -57,7 +57,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
doc = "The port for serving Kafka requests"
)

-private Optional<Integer> kafkaServicePort = Optional.of(9902);
+private Optional<Integer> kafkaServicePort = Optional.of(9092);

@FieldContext(
category = CATEGORY_KOP,
3 changes: 3 additions & 0 deletions src/main/java/io/streamnative/kop/KafkaStandalone.java
@@ -25,6 +25,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;

@@ -264,6 +265,8 @@ private void createDefaultNameSpace(URL webServiceUrl, String brokerServiceUrl,
Set<String> clusters = Sets.newHashSet(config.getKafkaClusterName());
admin.namespaces().createNamespace(defaultNamespace, clusters);
admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, clusters);
admin.namespaces().setRetention(defaultNamespace,
new RetentionPolicies(20, 100));
}
} catch (PulsarAdminException e) {
log.info("error while create default namespace: {}", e.getMessage());
99 changes: 99 additions & 0 deletions src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java
@@ -0,0 +1,99 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop;

import io.streamnative.kop.utils.MessageIdUtils;
import io.streamnative.kop.utils.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;

/**
* KafkaTopicConsumerManager manages a topic and its related offset cursor.
* Each cursor tracks the read position of a consumer client.
*/
@Slf4j
public class KafkaTopicConsumerManager {

private final PersistentTopic topic;
@Getter
private final ConcurrentLongHashMap<CompletableFuture<ManagedCursor>> consumers;

KafkaTopicConsumerManager(PersistentTopic topic) {
this.topic = topic;
this.consumers = new ConcurrentLongHashMap<>();
}

public CompletableFuture<ManagedCursor> remove(long offset) {
CompletableFuture<ManagedCursor> cursor = consumers.remove(offset);
if (cursor != null) {
if (log.isDebugEnabled()) {
log.debug("Get cursor for offset: {} in cache", offset);
}
return cursor;
}

// no cached cursor for this offset: create a new one.
cursor = new CompletableFuture<>();
CompletableFuture<ManagedCursor> oldCursor = consumers.putIfAbsent(offset, cursor);
if (oldCursor != null) {
// added by other thread while creating.
return remove(offset);
}

String cursorName = "kop-consumer-cursor-" + topic.getName() + "-" + offset + "-"
+ DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);

PositionImpl position = MessageIdUtils.getPosition(offset);

try {
// seek to the previous position, because a NonDurableCursor starts reading from the next position.
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
Method getPreviousPosition = ReflectionUtils
.setMethodAccessible(ledger, "getPreviousPosition", PositionImpl.class);
PositionImpl previous = (PositionImpl) getPreviousPosition.invoke(ledger, position);

if (log.isDebugEnabled()) {
log.debug("Create cursor {} for offset: {}. position: {}, previousPosition: {}",
cursorName, offset, position, previous);
}

cursor.complete(topic.getManagedLedger().newNonDurableCursor(previous, cursorName));
} catch (Exception e) {
log.error("Failed create nonDurable cursor for topic {} position: {}.", topic, position, e);
cursor.completeExceptionally(e);
}

return remove(offset);
}


// once an entry read completes, the caller adds the cursor back under the new offset.
public void add(long offset, CompletableFuture<ManagedCursor> cursor) {
CompletableFuture<ManagedCursor> oldCursor = consumers.putIfAbsent(offset, cursor);

if (log.isDebugEnabled()) {
log.debug("Add cursor {} for offset: {}. oldCursor: {}", cursor.join().getName(), offset, oldCursor);
}
}

}
58 changes: 58 additions & 0 deletions src/main/java/io/streamnative/kop/KafkaTopicManager.java
@@ -0,0 +1,58 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop;

import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;

/**
* KafkaTopicManager manages a Map of topic to KafkaTopicConsumerManager.
* Each KafkaTopicConsumerManager in turn manages one topic and its related offset cursors.
*/
@Slf4j
public class KafkaTopicManager {

private final BrokerService service;
@Getter
private final ConcurrentOpenHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> topics;

KafkaTopicManager(BrokerService service) {
this.service = service;
topics = new ConcurrentOpenHashMap<>();
}

public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(String topicName) {
return topics.computeIfAbsent(
topicName,
t -> service
.getTopic(topicName, true)
.thenApply(t2 -> {
if (log.isDebugEnabled()) {
log.debug("Call getTopicConsumerManager for {}, and create KafkaTopicConsumerManager.",
topicName);
}
return new KafkaTopicConsumerManager((PersistentTopic) t2.get());
})
.exceptionally(ex -> {
log.error("Failed to getTopicConsumerManager {}. exception:",
topicName, ex);
return null;
})
);
}
}
11 changes: 10 additions & 1 deletion src/main/java/io/streamnative/kop/utils/MessageIdUtils.java
@@ -13,6 +13,7 @@
*/
package io.streamnative.kop.utils;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;

@@ -30,10 +31,18 @@ public static final long getOffset(long ledgerId, long entryId) {
}

public static final MessageId getMessageId(long offset) {
-// Demultiplex ledgerId and entryId from offset
+// De-multiplex ledgerId and entryId from offset
long ledgerId = offset >>> 28;
long entryId = offset & 0x0F_FF_FF_FFL;

return new MessageIdImpl(ledgerId, entryId, -1);
}

public static final PositionImpl getPosition(long offset) {
// De-multiplex ledgerId and entryId from offset
long ledgerId = offset >>> 28;
long entryId = offset & 0x0F_FF_FF_FFL;

return new PositionImpl(ledgerId, entryId);
}
}
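A small round-trip check of the offset encoding, assuming getOffset packs the two ids as (ledgerId << 28) | entryId, which is consistent with the de-multiplexing above (entryId limited to 28 bits):

import io.streamnative.kop.utils.MessageIdUtils;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

public class OffsetRoundTrip {
    public static void main(String[] args) {
        long ledgerId = 5L;
        long entryId = 7L;

        // 5 << 28 = 1_342_177_280, so the packed offset is 1_342_177_287
        // under the assumed encoding.
        long offset = MessageIdUtils.getOffset(ledgerId, entryId);

        // getPosition() recovers the original pair.
        PositionImpl position = MessageIdUtils.getPosition(offset);
        System.out.println(position.getLedgerId()); // 5
        System.out.println(position.getEntryId());  // 7
    }
}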
25 changes: 21 additions & 4 deletions src/main/java/io/streamnative/kop/utils/ReflectionUtils.java
@@ -30,7 +30,7 @@ public final class ReflectionUtils {
* @return the value of the private field
*/
@SuppressWarnings("unchecked")
-public static <T> T getField(Object privateObject, String fieldName) {
+public static <T> T getSuperField(Object privateObject, String fieldName) {
try {
Field privateField = privateObject.getClass().getSuperclass().getDeclaredField(fieldName);
privateField.setAccessible(true);
@@ -49,7 +49,7 @@ public static <T> T getField(Object privateObject, String fieldName) {
* @throws IllegalAccessException
* @throws NoSuchFieldException
*/
-public static <T> void setField(Object privateObject,
+public static <T> void setSuperField(Object privateObject,
String fieldName,
T fieldValue)
throws IllegalAccessException, NoSuchFieldException {
@@ -59,12 +59,12 @@ public static <T> void setField(Object privateObject,
}

/**
-* Call the private method's super class.
+* Call the private method's super class method.
*
* @param privateObject the object
* @param methodName the private method name
*/
-public static void callNoArgVoidMethod(Object privateObject,
+public static void callSuperNoArgVoidMethod(Object privateObject,
String methodName) throws Exception {
try {
Method privateStringMethod = privateObject.getClass().getSuperclass()
@@ -83,6 +83,23 @@ public static void callNoArgVoidMethod(Object privateObject,
}
}

/**
* Make a private method accessible and return it.
*
* @param privateObject the object
* @param methodName the private method name
* @param parameterTypes the parameter types
*/
public static Method setMethodAccessible(Object privateObject,
String methodName,
Class<?>... parameterTypes) throws Exception {
Method privateStringMethod = privateObject.getClass().getDeclaredMethod(methodName, parameterTypes);

privateStringMethod.setAccessible(true);

return privateStringMethod;
}

private ReflectionUtils() {}

}
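The getPreviousPosition call in KafkaTopicConsumerManager above is the in-tree use of the new setMethodAccessible helper; a self-contained toy example (Greeter and greet are made up for illustration) looks like this:

import io.streamnative.kop.utils.ReflectionUtils;
import java.lang.reflect.Method;

public class ReflectionUtilsExample {

    // Stand-in class with a private method, purely for illustration.
    static class Greeter {
        private String greet(String name) {
            return "hello " + name;
        }
    }

    public static void main(String[] args) throws Exception {
        Greeter greeter = new Greeter();
        // Look up the private method on the object's own class and make it callable.
        Method greet = ReflectionUtils.setMethodAccessible(greeter, "greet", String.class);
        System.out.println(greet.invoke(greeter, "kop")); // prints "hello kop"
    }
}

Note that, unlike getSuperField and callSuperNoArgVoidMethod, this helper looks the method up on the object's own class rather than its superclass.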