Support DescribeConfigs request (apache#251)
Master issue: apache#241

This PR adds support for the Kafka DescribeConfigs request. The request retrieves topic- or broker-related configs from Kafka, but Kafka's configuration model is very different from KoP's. We therefore only support requests for topic configs, and we return the default config values, which is enough for Confluent Schema Registry to run with KoP (see the usage sketch after the change list below).


* Handle DescribeConfig request

* Add check for topic existence

* Fix NPE when DescribeConfigsRequest has no config names

* Check for non-existent topics

* Add unit test for DescribeConfigsRequest
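
For context, a client exercises this request roughly as follows. This is a hedged sketch using the standard Kafka AdminClient API; the broker address and topic name are placeholders, not values from this PR:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeConfigsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder address; point this at a KoP broker's Kafka listener.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // KoP answers with the static defaults from KafkaLogConfig,
            // not per-topic values stored in Pulsar.
            Config config = admin.describeConfigs(Collections.singletonList(resource))
                    .all().get().get(resource);
            config.entries().forEach(entry ->
                    System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}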
BewareMyPower authored Dec 7, 2020
1 parent 9f324b0 commit 08a8c07
Showing 5 changed files with 177 additions and 0 deletions.
@@ -21,16 +21,22 @@
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.pulsar.client.admin.PulsarAdmin;

@Slf4j
@@ -111,4 +117,58 @@ CompletableFuture<Map<String, ApiError>> createTopicsAsync(Map<String, TopicDeta

return resultFuture;
}

CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> describeConfigsAsync(
Map<ConfigResource, Optional<Set<String>>> resourceToConfigNames) {
        // Since Kafka's storage and policies are very different from Pulsar's, we just return a default config
        // here so that Kafka-based systems that send DescribeConfigs requests, like Confluent Schema Registry,
        // can work.
final DescribeConfigsResponse.Config defaultTopicConfig = new DescribeConfigsResponse.Config(ApiError.NONE,
KafkaLogConfig.getEntries().entrySet().stream().map(entry ->
new DescribeConfigsResponse.ConfigEntry(entry.getKey(), entry.getValue(),
DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, false,
Collections.emptyList())
).collect(Collectors.toList()));

Map<ConfigResource, CompletableFuture<DescribeConfigsResponse.Config>> futureMap =
resourceToConfigNames.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
ConfigResource resource = entry.getKey();
try {
CompletableFuture<DescribeConfigsResponse.Config> future = new CompletableFuture<>();
switch (resource.type()) {
case TOPIC:
KopTopic kopTopic = new KopTopic(resource.name());
admin.topics().getPartitionedTopicMetadataAsync(kopTopic.getFullName())
.whenComplete((metadata, e) -> {
if (e != null) {
future.complete(new DescribeConfigsResponse.Config(
ApiError.fromThrowable(e), Collections.emptyList()));
} else if (metadata.partitions > 0) {
future.complete(defaultTopicConfig);
} else {
final ApiError error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
"Topic " + kopTopic.getOriginalName() + " doesn't exist");
future.complete(new DescribeConfigsResponse.Config(
error, Collections.emptyList()));
}
});
break;
case BROKER:
throw new RuntimeException("KoP doesn't support resource type: " + resource.type());
default:
throw new InvalidRequestException("Unsupported resource type: " + resource.type());
}
return future;
} catch (Exception e) {
return CompletableFuture.completedFuture(
new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList()));
}
}));
CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> resultFuture = new CompletableFuture<>();
CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).whenComplete((ignored, e) -> {
resultFuture.complete(futureMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getNow(null))
));
});
return resultFuture;
}
}
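
As an aside, the fan-out/fan-in pattern used in describeConfigsAsync above — one CompletableFuture per resource, combined with allOf and drained with getNow — can be distilled into a standalone sketch. The class and method names here are illustrative, not code from this PR:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public final class Futures {

    // Turn a map of independent futures into a single future of a map.
    // getNow(null) is safe here because allOf only completes after every
    // member future has completed. Note that if any member completes
    // exceptionally, the combined future fails too; the PR avoids this by
    // always completing each member normally, encoding errors as ApiError
    // payloads inside the value.
    public static <K, V> CompletableFuture<Map<K, V>> allOfMap(
            Map<K, CompletableFuture<V>> futureMap) {
        return CompletableFuture
                .allOf(futureMap.values().toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futureMap.entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey,
                                entry -> entry.getValue().getNow(null))));
    }
}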
@@ -232,6 +232,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case CREATE_TOPICS:
handleCreateTopics(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_CONFIGS:
handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture);
break;
default:
handleError(kafkaHeaderAndRequest, responseFuture);
}
@@ -366,6 +369,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) {
protected abstract void
handleCreateTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

static class KafkaHeaderAndRequest implements Closeable {

private static final String DEFAULT_CLIENT_HOST = "";
@@ -0,0 +1,62 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.Records;

/**
* KafkaLogConfig is ported from kafka.log.LogConfig.
*/
public class KafkaLogConfig {
private static final Map<String, String> entries = defaultEntries();

public static Map<String, String> getEntries() {
return entries;
}

private static Map<String, String> defaultEntries() {
return Collections.unmodifiableMap(new HashMap<String, String>(){{
put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(1024 * 1024 * 1024));
put(TopicConfig.SEGMENT_MS_CONFIG, Long.toString(24 * 7 * 60 * 60 * 1000L)); // 7 days
put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, "0");
put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, Integer.toString(10 * 1024 * 1024));
put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(Long.MAX_VALUE));
put(TopicConfig.FLUSH_MS_CONFIG, Long.toString(Long.MAX_VALUE));
put(TopicConfig.RETENTION_BYTES_CONFIG, Long.toString(-1L));
put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(24 * 7 * 60 * 60 * 1000L)); // 7 days
put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, Integer.toString(1000000 + Records.LOG_OVERHEAD));
put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "4096");
put(TopicConfig.DELETE_RETENTION_MS_CONFIG, Long.toString(24 * 60 * 60 * 1000L));
put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, Long.toString(0L));
put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "60000");
put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5");
// Kafka's default value of cleanup.policy is "delete", but here we set it to "compact" because
// Confluent Schema Registry requires this config value.
put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact");
put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1");
put(TopicConfig.COMPRESSION_TYPE_CONFIG, "producer");
put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
put(TopicConfig.PREALLOCATE_CONFIG, "false");
put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "2.0");
put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.toString(Long.MAX_VALUE));
put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "true");
}});
}
}
@@ -81,6 +81,8 @@
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMember;
@@ -1182,6 +1184,22 @@ protected void handleCreateTopics(KafkaHeaderAndRequest createTopics,
}
}

protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(describeConfigs.getRequest() instanceof DescribeConfigsRequest);
DescribeConfigsRequest request = (DescribeConfigsRequest) describeConfigs.getRequest();

adminManager.describeConfigsAsync(new ArrayList<>(request.resources()).stream()
.collect(Collectors.toMap(
resource -> resource,
resource -> Optional.ofNullable(request.configNames(resource)).map(HashSet::new)
))
).thenApply(configResourceConfigMap -> {
resultFuture.complete(new DescribeConfigsResponse(0, configResourceConfigMap));
return null;
});
}

private SaslHandshakeResponse checkSaslMechanism(String mechanism) {
if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) {
return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms());
@@ -34,6 +34,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -54,7 +55,9 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiVersionsRequest;
@@ -362,6 +365,34 @@ public void testCreateInvalidTopics() {
}
}

@Test(timeOut = 10000)
public void testDescribeConfigs() throws Exception {
final String topic = "testDescribeConfigs";
admin.topics().createPartitionedTopic(topic, 1);

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

@Cleanup
AdminClient kafkaAdmin = AdminClient.create(props);
final Map<String, String> entries = KafkaLogConfig.getEntries();

kafkaAdmin.describeConfigs(Collections.singletonList(new ConfigResource(ConfigResource.Type.TOPIC, topic)))
.all().get().forEach((resource, config) -> {
assertEquals(resource.name(), topic);
config.entries().forEach(entry -> assertEquals(entry.value(), entries.get(entry.name())));
});

final String invalidTopic = "invalid-topic";
try {
kafkaAdmin.describeConfigs(Collections.singletonList(
        new ConfigResource(ConfigResource.Type.TOPIC, invalidTopic))).all().get();
// Without this, the test would silently pass if no exception were thrown.
throw new AssertionError("describeConfigs should fail for a non-existent topic");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof UnknownTopicOrPartitionException);
assertTrue(e.getMessage().contains("Topic " + invalidTopic + " doesn't exist"));
}
}

@Test(timeOut = 10000)
public void testProduceCallback() throws Exception {
final String topic = "test-produce-callback";