Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

- DryRun enhacement for topics creation/update #221

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/config-values.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,39 @@ You can also create your own custom validations. The validations must implement
- com.purbon.kafka.topology.validation.TopologyValidation
- com.purbon.kafka.topology.validation.TopicValidation

Dryrun output for topics
-----------
When the --dryRun option is specified then only the changed topics and values will be shown in the log for the topics with the following format.

::
{
"Operation" : "com.purbon.kafka.topology.actions.topics.SyncTopicAction",
"Topic" : "project.test-new-topic.a0",
"Action" : "create",
"Changes" : {
"cleanup.policy" : "[Set] delete",
"min.insync.replicas" : "[Set] 3"
}
}
{
"Operation" : "com.purbon.kafka.topology.actions.topics.SyncTopicAction",
"Topic" : "project.test-existing-topic.a0",
"Action" : "update",
"Changes" : {
"cleanup.policy" : "[Update] delete -> cleanup",
"min.insync.replicas" : "[Update] 2 -> 3",
"retention.ms" : "[Unset] 2592000000 -> "
}
}

Legend
[Set] config entry is new
[Unset] config entry is removed
[Update] config entry is updated

Note: num.partitions, replication.factor will never be part of the update as this needs to be applied differently.


Prevent ACL for topic creation for connector principal
-----------

Expand Down
1 change: 1 addition & 0 deletions docs/how-to-run-it.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ If you are using the CLI tool, you an use the *--help* command to list the diffe
--help Prints usage information.
--quiet Print minimum status update
--topology <arg> Topology config file.
--plan <arg> Plan config file.
--version Prints useful version information.

The most important ones are:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void run(String[] args) throws Exception {

processTopology(
cmd.getOptionValue(TOPOLOGY_OPTION), cmd.getOptionValue(PLANS_OPTION, "default"), config);
System.out.println("Kafka Topology updated");
System.out.println("Kafka Topology finished successfully");
}

private Map<String, String> parseConfig(CommandLine cmd) {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class Configuration {
private static final String CONFLUENT_COMMAND_TOPIC_CONFIG = "confluent.command.topic";
private static final String CONFLUENT_METRICS_TOPIC_CONFIG = "confluent.metrics.topic";
static final String TOPIC_PREFIX_FORMAT_CONFIG = "topology.topic.prefix.format";

static final String PROJECT_PREFIX_FORMAT_CONFIG = "topology.project.prefix.format";
static final String TOPIC_PREFIX_SEPARATOR_CONFIG = "topology.topic.prefix.separator";
static final String TOPOLOGY_VALIDATIONS_CONFIG = "topology.validations";
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/purbon/kafka/topology/ExecutionPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void run(boolean dryRun) throws IOException {
}

private void execute(Action action, boolean dryRun) throws IOException {
LOGGER.debug(String.format("Execution action %s (dryRun=%s)", action, dryRun));
LOGGER.debug("Execution action {} (dryRun={}})", action, dryRun);
if (dryRun) {
outputStream.println(action);
} else {
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/com/purbon/kafka/topology/TopicManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ public void apply(Topology topology, ExecutionPlan plan) throws IOException {
(topicName, topic) -> {
plan.add(
new SyncTopicAction(
adminClient, schemaRegistryManager, topic, topicName, listOfTopics));
adminClient,
schemaRegistryManager,
config,
topic,
topicName,
listOfTopics.contains(topicName)));
});

if (config.allowDelete() || config.isAllowDeleteTopics()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ public abstract class BaseAction implements Action {
@Override
public String toString() {
try {
return JSON.asPrettyString(props());
Map<String, Object> props = props();
if (props != null) {
return JSON.asPrettyString(props);
}
return "";
} catch (JsonProcessingException e) {
return "";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.purbon.kafka.topology.actions.topics;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.actions.BaseAction;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.TopicSchemas;
import com.purbon.kafka.topology.model.schema.Subject;
import com.purbon.kafka.topology.schemas.SchemaRegistryManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -20,21 +20,24 @@ public class SyncTopicAction extends BaseAction {

private final Topic topic;
private final String fullTopicName;
private final Set<String> listOfTopics;
private final TopologyBuilderAdminClient adminClient;
private final SchemaRegistryManager schemaRegistryManager;
private final Configuration config;
private final boolean topicExists;

public SyncTopicAction(
TopologyBuilderAdminClient adminClient,
SchemaRegistryManager schemaRegistryManager,
Configuration config,
Topic topic,
String fullTopicName,
Set<String> listOfTopics) {
boolean topicExists) {
this.topic = topic;
this.fullTopicName = fullTopicName;
this.listOfTopics = listOfTopics;
this.adminClient = adminClient;
this.schemaRegistryManager = schemaRegistryManager;
this.topicExists = topicExists;
this.config = config;
}

public String getTopic() {
Expand All @@ -43,20 +46,27 @@ public String getTopic() {

@Override
public void run() throws IOException {
syncTopic(topic, fullTopicName, listOfTopics);
syncTopic(topic, fullTopicName);
}

private void syncTopic(Topic topic, String fullTopicName, Set<String> listOfTopics)
throws IOException {
LOGGER.debug(String.format("Sync topic %s", fullTopicName));
if (existTopic(fullTopicName, listOfTopics)) {
private boolean topicExists() {
return topicExists;
}

private void syncTopic(Topic topic, String fullTopicName) throws IOException {
LOGGER.debug("Sync topic {}", fullTopicName);
if (topicExists()) {
if (topic.partitionsCount() > adminClient.getPartitionCount(fullTopicName)) {
LOGGER.debug(String.format("Update partition count of topic %s", fullTopicName));
LOGGER.debug("Update partition count of topic {}", fullTopicName);
adminClient.updatePartitionCount(topic, fullTopicName);
} else {
LOGGER.info(
"Skipping topic sync for topic {} due to limitation to decrease partition count of existing topic configuration",
fullTopicName);
}
adminClient.updateTopicConfig(topic, fullTopicName);
adminClient.updateTopicConfig(topic, fullTopicName, config.isDryRun());
} else {
LOGGER.debug(String.format("Create new topic with name %s", fullTopicName));
LOGGER.debug("Create new topic with name {}", fullTopicName);
adminClient.createTopic(topic, fullTopicName);
}

Expand All @@ -83,17 +93,28 @@ private void setCompatibility(String subjectName, Optional<String> compatibility
compatibility -> schemaRegistryManager.setCompatibility(subjectName, compatibility));
}

private boolean existTopic(String topic, Set<String> listOfTopics) {
return listOfTopics.contains(topic);
}

@Override
protected Map<String, Object> props() {
Map<String, Object> map = new HashMap<>();
Map<String, Object> map = new LinkedHashMap<>();
map.put("Operation", getClass().getName());
map.put("Topic", fullTopicName);
String actionName = existTopic(fullTopicName, listOfTopics) ? "update" : "create";
map.put("Action", actionName);
String actionName = topicExists() ? "update" : "create";
Map<String, String> changes = resolveChanges();
if (!changes.isEmpty()) {
map.put("Action", actionName);
map.put("Changes", changes);
} else {
return null;
}
return map;
}

private Map<String, String> resolveChanges() {
try {
return this.adminClient.updateTopicConfig(this.topic, this.fullTopicName, true);
} catch (IOException e) {
LOGGER.error(e);
throw new RuntimeException(e);
}
}
}
Loading