Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

- DryRun enhacement for topics creation/update #221

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/config-values.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,44 @@ You can also create your own custom validations. The validations must implement
- com.purbon.kafka.topology.validation.TopologyValidation
- com.purbon.kafka.topology.validation.TopicValidation

Topic overwrite / Update Option and dryrun output
-----------
Regularly when julie applies changes to existing topics it will always overwrite all* the configuration keys with the existing ones. Also, the output of the dryrun will print all (un)changed properties.
With this option, this can be configured. Only the changed topics and values will be shown in the -dryrun log with the following format. Apart from that, it will also only change the keys which were updated via the KafkaAdminClient.

::
{
"Operation" : "com.purbon.kafka.topology.actions.topics.SyncTopicAction",
"Topic" : "project.test-new-topic.a0",
"Action" : "create",
"Changes" : {
"cleanup.policy" : "[Set] delete",
"min.insync.replicas" : "[Set] 3"
}
}
{
"Operation" : "com.purbon.kafka.topology.actions.topics.SyncTopicAction",
"Topic" : "project.test-existing-topic.a0",
"Action" : "update",
"Changes" : {
"cleanup.policy" : "[Update] delete -> cleanup",
"min.insync.replicas" : "[Update] 2 -> 3",
"retention.ms" : "[Unset] 2592000000 -> "
}
}

Legend
[Set] config entry is new
[Unset] config entry is removed
[Update] config entry is updated

Note: num.partitions, replication.factor will never be part of the update as this needs to be applied differently.

An example configuration to only update changed configuration and output it in the dryrun.
::
topology.topic.sync.overwrite: false
**Default value**: true

Prevent ACL for topic creation for connector principal
-----------

Expand Down
1 change: 1 addition & 0 deletions docs/how-to-run-it.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ If you are using the CLI tool, you an use the *--help* command to list the diffe
--help Prints usage information.
--quiet Print minimum status update
--topology <arg> Topology config file.
--plan <arg> Plan config file.
--version Prints useful version information.

The most important ones are:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void run(String[] args) throws Exception {

processTopology(
cmd.getOptionValue(TOPOLOGY_OPTION), cmd.getOptionValue(PLANS_OPTION, "default"), config);
System.out.println("Kafka Topology updated");
System.out.println("Kafka Topology finished successfully");
}

private Map<String, String> parseConfig(CommandLine cmd) {
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class Configuration {
private static final String CONFLUENT_COMMAND_TOPIC_CONFIG = "confluent.command.topic";
private static final String CONFLUENT_METRICS_TOPIC_CONFIG = "confluent.metrics.topic";
static final String TOPIC_PREFIX_FORMAT_CONFIG = "topology.topic.prefix.format";
static final String TOPIC_SYNC_OVERWRITE_CONFIG = "topology.topic.sync.overwrite";
schnaker85 marked this conversation as resolved.
Show resolved Hide resolved

static final String PROJECT_PREFIX_FORMAT_CONFIG = "topology.project.prefix.format";
static final String TOPIC_PREFIX_SEPARATOR_CONFIG = "topology.topic.prefix.separator";
static final String TOPOLOGY_VALIDATIONS_CONFIG = "topology.validations";
Expand Down Expand Up @@ -305,6 +307,10 @@ public Boolean shouldOptimizeAcls() {
return config.getBoolean(OPTIMIZED_ACLS_CONFIG);
}

public Boolean shouldOverwriteTopicsInSync() {
return config.getBoolean(TOPIC_SYNC_OVERWRITE_CONFIG);
}

public String getConfluentCloudEnv() {
return config.getString(CCLOUD_ENV_CONFIG);
}
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/purbon/kafka/topology/ExecutionPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -81,9 +82,12 @@ public void run(boolean dryRun) throws IOException {
}

private void execute(Action action, boolean dryRun) throws IOException {
LOGGER.debug(String.format("Execution action %s (dryRun=%s)", action, dryRun));
LOGGER.debug("Execution action {} (dryRun={}})", action, dryRun);
if (dryRun) {
outputStream.println(action);
String actions = action.toString();
if (StringUtils.isNotBlank(actions)) {
schnaker85 marked this conversation as resolved.
Show resolved Hide resolved
outputStream.println(actions);
}
} else {
action.run();
if (action instanceof SyncTopicAction) {
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/com/purbon/kafka/topology/TopicManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ public void apply(Topology topology, ExecutionPlan plan) throws IOException {
(topicName, topic) -> {
plan.add(
new SyncTopicAction(
adminClient, schemaRegistryManager, topic, topicName, listOfTopics));
adminClient,
schemaRegistryManager,
config,
topic,
topicName,
listOfTopics.contains(topicName)));
});

if (config.allowDelete() || config.isAllowDeleteTopics()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ public abstract class BaseAction implements Action {
@Override
public String toString() {
try {
return JSON.asPrettyString(props());
Map<String, Object> props = props();
if (props != null) {
return JSON.asPrettyString(props);
}
return "";
} catch (JsonProcessingException e) {
return "";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.purbon.kafka.topology.actions.topics;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.actions.BaseAction;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.TopicSchemas;
import com.purbon.kafka.topology.model.schema.Subject;
import com.purbon.kafka.topology.schemas.SchemaRegistryManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -20,21 +20,24 @@ public class SyncTopicAction extends BaseAction {

private final Topic topic;
private final String fullTopicName;
private final Set<String> listOfTopics;
private final TopologyBuilderAdminClient adminClient;
private final SchemaRegistryManager schemaRegistryManager;
private final Configuration config;
private final boolean topicExists;

public SyncTopicAction(
TopologyBuilderAdminClient adminClient,
SchemaRegistryManager schemaRegistryManager,
Configuration config,
Topic topic,
String fullTopicName,
Set<String> listOfTopics) {
boolean topicExists) {
this.topic = topic;
this.fullTopicName = fullTopicName;
this.listOfTopics = listOfTopics;
this.adminClient = adminClient;
this.schemaRegistryManager = schemaRegistryManager;
this.topicExists = topicExists;
this.config = config;
}

public String getTopic() {
Expand All @@ -43,20 +46,27 @@ public String getTopic() {

@Override
public void run() throws IOException {
syncTopic(topic, fullTopicName, listOfTopics);
syncTopic(topic, fullTopicName);
}

private void syncTopic(Topic topic, String fullTopicName, Set<String> listOfTopics)
throws IOException {
LOGGER.debug(String.format("Sync topic %s", fullTopicName));
if (existTopic(fullTopicName, listOfTopics)) {
private boolean topicExists() {
return topicExists;
}

private void syncTopic(Topic topic, String fullTopicName) throws IOException {
LOGGER.debug("Sync topic {}", fullTopicName);
if (topicExists()) {
if (topic.partitionsCount() > adminClient.getPartitionCount(fullTopicName)) {
LOGGER.debug(String.format("Update partition count of topic %s", fullTopicName));
LOGGER.debug("Update partition count of topic {}", fullTopicName);
adminClient.updatePartitionCount(topic, fullTopicName);
} else {
LOGGER.warn(
schnaker85 marked this conversation as resolved.
Show resolved Hide resolved
"Skipping topic sync for topic {} due to limitation to decrease partition count of existing topic configuration",
fullTopicName);
}
adminClient.updateTopicConfig(topic, fullTopicName);
adminClient.updateTopicConfig(topic, fullTopicName, false);
schnaker85 marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.debug(String.format("Create new topic with name %s", fullTopicName));
LOGGER.debug("Create new topic with name {}", fullTopicName);
adminClient.createTopic(topic, fullTopicName);
}

Expand All @@ -83,17 +93,29 @@ private void setCompatibility(String subjectName, Optional<String> compatibility
compatibility -> schemaRegistryManager.setCompatibility(subjectName, compatibility));
}

private boolean existTopic(String topic, Set<String> listOfTopics) {
return listOfTopics.contains(topic);
}

@Override
protected Map<String, Object> props() {
Map<String, Object> map = new HashMap<>();
Map<String, Object> map = new LinkedHashMap<>();
map.put("Operation", getClass().getName());
map.put("Topic", fullTopicName);
String actionName = existTopic(fullTopicName, listOfTopics) ? "update" : "create";
map.put("Action", actionName);
String actionName = topicExists() ? "update" : "create";
Map<String, String> changes = resolveChanges();
if (!changes.isEmpty()) {
map.put("Action", actionName);
map.put("Changes", changes);
return map;
} else if (!config.shouldOverwriteTopicsInSync()) {
return null;
}
return map;
}

private Map<String, String> resolveChanges() {
try {
return this.adminClient.updateTopicConfig(this.topic, this.fullTopicName, true);
} catch (IOException e) {
LOGGER.error(e);
throw new RuntimeException(e);
}
}
}
Loading