
Commit

Revert the return of topic name from persistent://tent/ns/topicx to topicx (apache#50)

In PR apache#47, a change was introduced that returned topics to the Kafka client in Pulsar topic format. This change mainly reverts the returned topic name from `persistent://tent/ns/topicx` back to `topicx`, and adds a unit test for it.

Some minor changes besides this main change:
- fix log level when parsing the listeners port.
- change the default log4j2 log level from debug to info.
- fix the assert from ledgerId > 0 to ledgerId >= 0.
- add a test for message order.
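
For illustration, a minimal sketch of the mapping this revert restores; it mirrors the `TopicNameUtils` helper added later in this diff and assumes pulsar-common's `TopicName` is on the classpath:

```java
import org.apache.pulsar.common.naming.TopicName;

public class TopicNameRevertExample {
    public static void main(String[] args) {
        // The partition topic as the Pulsar broker stores it.
        TopicName pulsarName = TopicName.get("persistent://tent/ns/topicx-partition-0");
        // Strip the "-partition-N" suffix, then keep only the local name,
        // mirroring TopicNameUtils.getKafkaTopicNameFromPulsarTopicname.
        String kafkaName = TopicName.get(pulsarName.getPartitionedTopicName()).getLocalName();
        System.out.println(kafkaName); // prints "topicx", which Kafka clients now see again
    }
}
```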

========

* fix log level for listeners port lookup

* change default log level in log4j2

* fix assert from ledgerId > 0 to ledgerId >= 0

* add a test to check message order

* revert the return of topic name from persistent://tent/ns/topicx to topicx

* add test for metadata request handling

* fix checkstyle

* mv utils from request handler to file TopicNameUtils
jiazhai authored Dec 6, 2019
1 parent b43e04f commit 38a95aa
Showing 12 changed files with 360 additions and 94 deletions.
10 changes: 7 additions & 3 deletions conf/kop_standalone.conf
@@ -20,6 +20,13 @@ enableGroupCoordinator=true
messagingProtocols=kafka

listeners=PLAINTEXT://127.0.0.1:9092


### --- Changed for KoP --- ###

# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=false

### --- General broker settings --- ###

# Zookeeper quorum connection string
@@ -72,9 +79,6 @@ backlogQuotaDefaultLimitGB=10
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

4 changes: 2 additions & 2 deletions conf/log4j2.yaml
@@ -35,7 +35,7 @@ Configuration:

Logger:
- name: io.streamnative.kop
level: trace
level: info
additivity: false
AppenderRef:
- ref: Console
@@ -47,7 +47,7 @@ Configuration:
- ref: Console

- name: org.apache.pulsar.broker.service
level: debug
level: info
additivity: false
AppenderRef:
- ref: Console
4 changes: 2 additions & 2 deletions src/main/java/io/streamnative/kop/KafkaProtocolHandler.java
@@ -316,7 +316,7 @@ public static int getListenerPort(String listeners, ListenerType type) {
}
}

log.error("KafkaProtocolHandler listeners {} not contains type {}", listeners, type);
log.info("KafkaProtocolHandler listeners {} not contains type {}", listeners, type);
return -1;
}

@@ -332,7 +332,7 @@ public static String getBrokerUrl(String listeners, Boolean tlsEnabled) {
}
}

log.error("listener {} not contains a valid SSL or PLAINTEXT address", listeners);
log.info("listener {} not contains a valid SSL or PLAINTEXT address", listeners);
return null;
}
}
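
For context on the two methods above, a minimal standalone sketch of this kind of listener parsing. This is a hedged illustration under stated assumptions, not the actual KoP implementation, which uses a ListenerType enum and also handles SSL listeners:

```java
// Assumes listeners are a comma-separated list of PROTOCOL://host:port
// entries, as in kop_standalone.conf above.
public class ListenerPortExample {

    // Returns the port of the first listener whose protocol matches `type`,
    // or -1 when no such listener is configured (the path whose log level
    // this commit lowers from error to info).
    static int getListenerPort(String listeners, String type) {
        for (String listener : listeners.split(",")) {
            if (listener.startsWith(type + "://")) {
                return Integer.parseInt(listener.substring(listener.lastIndexOf(':') + 1));
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(getListenerPort("PLAINTEXT://127.0.0.1:9092", "PLAINTEXT")); // 9092
        System.out.println(getListenerPort("PLAINTEXT://127.0.0.1:9092", "SSL"));       // -1
    }
}
```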
91 changes: 50 additions & 41 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java
@@ -20,6 +20,7 @@
import static io.streamnative.kop.KafkaProtocolHandler.getBrokerUrl;
import static io.streamnative.kop.KafkaProtocolHandler.getListenerPort;
import static io.streamnative.kop.MessagePublishContext.publishMessages;
import static io.streamnative.kop.utils.TopicNameUtils.getKafkaTopicNameFromPulsarTopicname;
import static io.streamnative.kop.utils.TopicNameUtils.pulsarTopicName;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -168,6 +169,9 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.tlsEnabled = tlsEnabled;
this.plaintextPort = getListenerPort(kafkaConfig.getListeners(), PLAINTEXT);
this.sslPort = getListenerPort(kafkaConfig.getListeners(), SSL);
this.namespace = NamespaceName.get(
kafkaConfig.getKafkaTenant(),
kafkaConfig.getKafkaNamespace());
}

protected CompletableFuture<ResponseAndRequest> handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest) {
@@ -205,22 +209,24 @@ protected CompletableFuture<ResponseAndRequest> handleTopicMetadataRequest(Kafka
// topics in format : persistent://%s/%s/abc-partition-x, will be grouped as:
// Entry<abc, List[TopicName]>

if (null == namespace) {
log.warn("unknown namespace, setting it to default");
namespace = NamespaceName.get("public/default");
}

// A future for a map from <kafka topic> to <pulsarPartitionTopics>:
// e.g. <topic1, {persistent://public/default/topic1-partition-0,...}>
// 1. no topics provided, get all topics from namespace;
// 2. topics provided, get provided topics.
CompletableFuture<Map<String, List<TopicName>>> pulsarTopicsFuture =
(topics == null || topics.isEmpty())
? pulsarService.getNamespaceService()
.getListOfPersistentTopics(namespace)
.thenApply(list -> list.stream()
.map(topicString -> TopicName.get(topicString))
.collect(Collectors
.groupingBy(topicName ->
getPartitionedTopicNameWithoutPartitions(topicName), Collectors.toList()))
)
: new CompletableFuture<>();
?
pulsarService.getNamespaceService().getListOfPersistentTopics(namespace)
.thenApply(
list -> list.stream()
.map(topicString -> TopicName.get(topicString))
.collect(Collectors
.groupingBy(
topicName -> getKafkaTopicNameFromPulsarTopicname(topicName),
Collectors.toList()))
)
:
new CompletableFuture<>();

if (!(topics == null || topics.isEmpty())) {
Map<String, List<TopicName>> pulsarTopics = Maps.newHashMap();
@@ -231,7 +237,7 @@ protected CompletableFuture<ResponseAndRequest> handleTopicMetadataRequest(Kafka

requestTopics.stream()
.forEach(topic -> {
TopicName topicName = pulsarTopicName(topic);
TopicName pulsarTopicName = pulsarTopicName(topic, namespace);
AuthenticationDataSource authData =
null != authState ? authState.getAuthDataSource() : null;
// get partition numbers for each topic.
@@ -241,7 +247,7 @@ protected CompletableFuture<ResponseAndRequest> handleTopicMetadataRequest(Kafka
authRole,
null,
authData,
topicName)
pulsarTopicName)
.whenComplete((partitionedTopicMetadata, throwable) -> {
if (throwable != null) {
// Failed get partitions.
@@ -251,21 +257,21 @@ protected CompletableFuture<ResponseAndRequest> handleTopicMetadataRequest(Kafka
topic,
false,
Collections.emptyList()));
log.warn("[{}] Request {}: Failed to get partitioned topic {} metadata: {}",
ctx.channel(), metadataHar.getHeader(), topicName, throwable.getMessage());
log.warn("[{}] Request {}: Failed to get partitioned pulsar topic {} metadata: {}",
ctx.channel(), metadataHar.getHeader(), pulsarTopicName, throwable.getMessage());
} else {
List<TopicName> topicNames;
List<TopicName> pulsarTopicNames;
if (partitionedTopicMetadata.partitions > 0) {
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions",
topic, partitionedTopicMetadata.partitions);
}
topicNames = IntStream
pulsarTopicNames = IntStream
.range(0, partitionedTopicMetadata.partitions)
.mapToObj(i ->
TopicName.get(topicName.toString() + PARTITIONED_TOPIC_SUFFIX + i))
TopicName.get(pulsarTopicName.toString() + PARTITIONED_TOPIC_SUFFIX + i))
.collect(Collectors.toList());
pulsarTopics.put(topic, topicNames);
pulsarTopics.put(topic, pulsarTopicNames);
} else {
if (kafkaConfig.isAllowAutoTopicCreation()) {
try {
@@ -274,9 +280,9 @@ protected CompletableFuture<ResponseAndRequest> handleTopicMetadataRequest(Kafka
+ "auto create partitioned topic",
ctx.channel(), metadataHar.getHeader(), topic);
}
admin.topics().createPartitionedTopic(topicName.toString(), 1);
admin.topics().createPartitionedTopic(pulsarTopicName.toString(), 1);
final TopicName newTopic = TopicName
.get(topicName.toString() + PARTITIONED_TOPIC_SUFFIX + 0);
.get(pulsarTopicName.toString() + PARTITIONED_TOPIC_SUFFIX + 0);
pulsarTopics.put(topic, Lists.newArrayList(newTopic));
} catch (PulsarAdminException e) {
log.error("[{}] Request {}: createPartitionedTopic failed.",
@@ -381,15 +387,27 @@ protected CompletableFuture<ResponseAndRequest> handleTopicMetadataRequest(Kafka
if (finishedPartitions == partitionsNumber) {
// new TopicMetadata for this topic
allTopicMetadata.add(
new TopicMetadata(Errors.NONE, topic, false, partitionMetadatas));
new TopicMetadata(
Errors.NONE,
TopicName.get(topic).getLocalName(),
false,
partitionMetadatas));

// whether completed all the topics requests.
int finishedTopics = topicsCompleted.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Completed findBroker for topic {}, "
+ "partitions found/all: {}/{}",
+ "partitions found/all: {}/{}. \n dump All Metadata:",
ctx.channel(), metadataHar.getHeader(), topic,
finishedTopics, topicsNumber);

allTopicMetadata.stream()
.forEach(data -> {
log.debug("topicMetadata: {}", data.toString());
data.partitionMetadata()
.forEach(partitionData ->
                                        log.debug("  partitionMetadata: {}", partitionData.toString()));
});
}
if (finishedTopics == topicsNumber) {
// TODO: confirm right value for controller_id
@@ -443,7 +461,7 @@ protected CompletableFuture<ResponseAndRequest> handleProduceRequest(KafkaHeader
topicPartition.topic(), topicPartition.partition(), responsesSize);
}

TopicName topicName = pulsarTopicName(topicPartition);
TopicName topicName = pulsarTopicName(topicPartition, namespace);

pulsarService.getBrokerService().getTopic(topicName.toString(), true)
.whenComplete((topicOpt, exception) -> {
@@ -642,7 +660,7 @@ private CompletableFuture<ResponseAndRequest> handleListOffsetRequestV1AndAbove(

request.partitionTimestamps().entrySet().stream().forEach(tms -> {
TopicPartition topic = tms.getKey();
TopicName pulsarTopic = pulsarTopicName(topic);
TopicName pulsarTopic = pulsarTopicName(topic, namespace);
Long times = tms.getValue();
CompletableFuture<ListOffsetResponse.PartitionData> partitionData;

@@ -705,7 +723,7 @@ private Map<TopicPartition, Errors> nonExistingTopicErrors(OffsetCommitRequest r
return request.offsetData().entrySet().stream()
.filter(entry ->
// filter not exist topics
!topicManager.topicExists(pulsarTopicName(entry.getKey()).toString()))
!topicManager.topicExists(pulsarTopicName(entry.getKey(), namespace).toString()))
.collect(Collectors.toMap(
e -> e.getKey(),
e -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1081,8 +1099,8 @@ private CompletableFuture<PartitionMetadata> findBroker(PulsarService pulsarServ
String listeners1 = kafkaConfig.getListeners();

if (log.isDebugEnabled()) {
log.debug("Found broker: {} for topicName: {}, local address: {}, found brokerUri: {}",
listeners, topic, listeners1, uri);
log.debug("Found broker: {} for topicName: {}, local address: {}, found brokerUri: {}: {}:{}",
listeners, topic, listeners1, uri, uri.getHost(), uri.getPort());
}

if (!topicManager.topicExists(topic.toString()) && listeners1.contains(uri.getHost())) {
Expand Down Expand Up @@ -1123,7 +1141,7 @@ private CompletableFuture<PartitionMetadata> findBroker(PulsarService pulsarServ

static Node newNode(InetSocketAddress address) {
if (log.isDebugEnabled()) {
log.debug("Return Broker Node of {}", address);
log.debug("Return Broker Node of {}. {}:{}", address, address.getHostString(), address.getPort());
}
return new Node(
Murmur3_32Hash.getInstance().makeHash((address.getHostString() + address.getPort()).getBytes(UTF_8)),
@@ -1182,15 +1200,6 @@ static PartitionMetadata newFailedPartitionMetadata(TopicName topicName) {
);
}

static String getPartitionedTopicNameWithoutPartitions(TopicName topicName) {
String localName = topicName.getPartitionedTopicName();
if (localName.contains(PARTITIONED_TOPIC_SUFFIX)) {
return localName.substring(0, localName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX));
} else {
return localName;
}
}

static AbstractResponse failedResponse(KafkaHeaderAndRequest requestHar, Throwable e) {
if (log.isDebugEnabled()) {
log.debug("Request {} get failed response ", requestHar.getHeader().apiKey(), e);
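
The metadata handler above groups Pulsar partition topics by their Kafka-visible name before building each TopicMetadata. A minimal sketch of that grouping, assuming pulsar-common's `TopicName` and the same local-name extraction as the new `TopicNameUtils` helper:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pulsar.common.naming.TopicName;

public class MetadataGroupingExample {
    public static void main(String[] args) {
        // Partition topics as the namespace service would list them.
        List<String> persistentTopics = Arrays.asList(
            "persistent://public/default/topic1-partition-0",
            "persistent://public/default/topic1-partition-1",
            "persistent://public/default/topic2-partition-0");

        // Group by the Kafka-visible local name, as handleTopicMetadataRequest does:
        // {topic1=[...-partition-0, ...-partition-1], topic2=[...-partition-0]}
        Map<String, List<TopicName>> pulsarTopics = persistentTopics.stream()
            .map(TopicName::get)
            .collect(Collectors.groupingBy(
                t -> TopicName.get(t.getPartitionedTopicName()).getLocalName(),
                Collectors.toList()));

        pulsarTopics.forEach((kafkaTopic, partitions) ->
            System.out.println(kafkaTopic + " -> " + partitions));
    }
}
```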
6 changes: 3 additions & 3 deletions src/main/java/io/streamnative/kop/MessageFetchContext.java
@@ -92,7 +92,7 @@ public CompletableFuture<ResponseAndRequest> handleFetch(CompletableFuture<Respo
((FetchRequest) fetchRequest.getRequest())
.fetchData().entrySet().stream()
.map(entry -> {
TopicName topicName = pulsarTopicName(entry.getKey());
TopicName topicName = pulsarTopicName(entry.getKey(), requestHandler.getNamespace());
long offset = entry.getValue().fetchOffset;

if (log.isDebugEnabled()) {
@@ -282,7 +282,7 @@ private Map<TopicPartition, CompletableFuture<Entry>> readAllCursorOnce(
new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> list, Object o) {
TopicName topicName = pulsarTopicName(pair.getKey());
TopicName topicName = pulsarTopicName(pair.getKey(), requestHandler.getNamespace());

Entry entry = null;
if (!list.isEmpty()) {
@@ -324,7 +324,7 @@ public void readEntriesComplete(List<Entry> list, Object o) {
@Override
public void readEntriesFailed(ManagedLedgerException e, Object o) {
log.error("Error read entry for topic: {}",
pulsarTopicName(pair.getKey()));
pulsarTopicName(pair.getKey(), requestHandler.getNamespace()));
readFuture.completeExceptionally(e);
}
}, null);
6 changes: 3 additions & 3 deletions src/main/java/io/streamnative/kop/utils/MessageIdUtils.java
@@ -32,15 +32,15 @@ public class MessageIdUtils {

public static final long getOffset(long ledgerId, long entryId) {
// Combine ledger id and entry id to form offset
checkArgument(ledgerId > 0, "Expected ledgerId > 0, but get " + ledgerId);
checkArgument(ledgerId >= 0, "Expected ledgerId >= 0, but get " + ledgerId);
checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId);

long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS) | (entryId << BATCH_BITS));
return offset;
}

public static final long getOffset(long ledgerId, long entryId, int batchIndex) {
checkArgument(ledgerId > 0, "Expected ledgerId > 0, but get " + ledgerId);
checkArgument(ledgerId >= 0, "Expected ledgerId >= 0, but get " + ledgerId);
checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId);
checkArgument(batchIndex >= 0, "Expected batchIndex >= 0, but get " + batchIndex);
checkArgument(batchIndex < (1 << BATCH_BITS),
@@ -78,7 +78,7 @@ public static final int getBatchIndex(long offset) {
}

// get next offset that after batch Index.
// In TopicConsumereManager, next read offset is updated after each entry reads,
// In TopicConsumerManager, next read offset is updated after each entry reads,
// if it read a batched message previously, the next offset waiting read is next entry.
public static final long offsetAfterBatchIndex(long offset) {
// De-multiplex ledgerId and entryId from offset
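
The assertion change above allows ledgerId == 0 in getOffset. For illustration, a self-contained sketch of the ledger/entry/batch bit-packing; the ENTRY_BITS and BATCH_BITS widths and the batch-index OR in the three-argument form are assumptions for this sketch, since the diff does not show them:

```java
public class OffsetPackingExample {
    static final int ENTRY_BITS = 32; // assumed width for this sketch
    static final int BATCH_BITS = 12; // assumed width for this sketch

    static long getOffset(long ledgerId, long entryId, int batchIndex) {
        // ledgerId may now be 0 (the commit relaxed the assert from > 0 to >= 0).
        if (ledgerId < 0 || entryId < 0 || batchIndex < 0 || batchIndex >= (1 << BATCH_BITS)) {
            throw new IllegalArgumentException("invalid ledgerId/entryId/batchIndex");
        }
        return (ledgerId << (ENTRY_BITS + BATCH_BITS)) | (entryId << BATCH_BITS) | batchIndex;
    }

    public static void main(String[] args) {
        long offset = getOffset(0L, 5L, 2); // ledgerId 0 is accepted after this commit
        System.out.println(offset >> (ENTRY_BITS + BATCH_BITS));             // ledgerId: 0
        System.out.println((offset >> BATCH_BITS) & ((1L << ENTRY_BITS) - 1)); // entryId: 5
        System.out.println(offset & ((1 << BATCH_BITS) - 1));                // batchIndex: 2
    }
}
```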
17 changes: 17 additions & 0 deletions src/main/java/io/streamnative/kop/utils/TopicNameUtils.java
@@ -50,4 +50,21 @@ public static TopicName pulsarTopicName(String topic, int partitionIndex, Namesp
namespace,
topic + PARTITIONED_TOPIC_SUFFIX + partitionIndex);
}

public static String getPartitionedTopicNameWithoutPartitions(TopicName topicName) {
String localName = topicName.getPartitionedTopicName();
if (localName.contains(PARTITIONED_TOPIC_SUFFIX)) {
return localName.substring(0, localName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX));
} else {
return localName;
}
}

public static String getKafkaTopicNameFromPulsarTopicname(TopicName topicName) {
// remove partition part
String localName = topicName.getPartitionedTopicName();
// remove persistent://tenant/ns
return TopicName.get(localName).getLocalName();
}

}
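
The commit message mentions a unit test for the revert; a hypothetical sketch of such a test, assuming JUnit 4 (the actual test added in this commit may be structured differently):

```java
import static org.junit.Assert.assertEquals;

import io.streamnative.kop.utils.TopicNameUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.junit.Test;

public class TopicNameUtilsTest {

    @Test
    public void testKafkaTopicNameFromPulsarTopicName() {
        // A partitioned Pulsar topic maps back to the bare Kafka name.
        TopicName partitioned = TopicName.get("persistent://tent/ns/topicx-partition-2");
        assertEquals("topicx", TopicNameUtils.getKafkaTopicNameFromPulsarTopicname(partitioned));

        // A non-partitioned topic maps the same way.
        TopicName plain = TopicName.get("persistent://tent/ns/topicx");
        assertEquals("topicx", TopicNameUtils.getKafkaTopicNameFromPulsarTopicname(plain));
    }
}
```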