Skip to content

Commit

Permalink
enable the use of patterns within the DLQ allow and deny list (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
purbon committed Dec 9, 2021
1 parent 709ff1f commit eba377f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,20 @@ private Project parseProject(
+ project.getName()
+ ", this might be a required field, be aware.");
} else {
var allowList =
config.getDlqTopicsAllowList().stream()
.map(Pattern::compile)
.collect(Collectors.toList());
var denyList =
config.getDlqTopicsDenyList().stream().map(Pattern::compile).collect(Collectors.toList());
new JsonSerdesUtils<Topic>()
.parseApplicationUser(parser, topicsNode, Topic.class)
.forEach(
topic -> {
project.addTopic(topic); // add normal topic and evaluate
if (config.shouldGenerateDlqTopics()) {
String name = topic.toString();
if (shouldGenerateDlqTopic().apply(name)) {
if (shouldGenerateDlqTopic(allowList, denyList).apply(name)) {
Topic dlqTopic = topic.clone();
dlqTopic.setDlqPrefix(config.getDlqTopicLabel());
dlqTopic.setTopicNamePattern(config.getDlqTopicPrefixFormat());
Expand All @@ -269,14 +275,15 @@ private Project parseProject(
return project;
}

private Function<String, Boolean> shouldGenerateDlqTopic() {
private Function<String, Boolean> shouldGenerateDlqTopic(
List<Pattern> allowList, List<Pattern> denyList) {
return name -> {
var allowList = config.getDlqTopicsAllowList();
var denyList = config.getDlqTopicsDenyList();
boolean isAllowedOrEmpty =
allowList.isEmpty() || (!allowList.isEmpty() && allowList.contains(name));
boolean isNotDeniedOrEmpty =
denyList.isEmpty() || (!denyList.isEmpty() && !denyList.contains(name));
var foundInAllowList =
allowList.stream().map(e -> e.matcher(name).matches()).anyMatch(p -> p);
var foundInDenyList = denyList.stream().map(e -> e.matcher(name).matches()).anyMatch(p -> p);

var isAllowedOrEmpty = allowList.isEmpty() || foundInAllowList;
var isNotDeniedOrEmpty = denyList.isEmpty() || !foundInDenyList;
return isAllowedOrEmpty && isNotDeniedOrEmpty;
};
}
Expand Down
47 changes: 47 additions & 0 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,53 @@ public void testTopicsWithDLQWithDenyList() {
assertEquals("contextOrg.source.foo.bar.avro", p.getTopics().get(2).toString());
}

@Test
public void testTopicsWithDLQWithDenyListAndPattern() {
Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

Properties props = new Properties();
props.put(TOPOLOGY_DLQ_TOPICS_GENERATE, "true");
props.put(TOPOLOGY_DQL_TOPICS_DENY_LIST + ".0", "^.*source.foo.foo$");

Configuration config = new Configuration(cliOps, props);

TopologySerdes parser = new TopologySerdes(config, new PlanMap());

Topology topology =
parser.deserialise(TestUtils.getResourceFile("/descriptor-only-topics.yaml"));
Project p = topology.getProjects().get(0);

assertThat(p.getTopics()).hasSize(3);
assertEquals("contextOrg.source.foo.foo", p.getTopics().get(0).toString());
assertEquals("contextOrg.source.foo.bar.avro", p.getTopics().get(1).toString());
assertEquals("contextOrg.source.foo.bar.avro.dlq", p.getTopics().get(2).toString());
}

@Test
public void testTopicsWithDLQWithDenyListAndPatternFullDeny() {
Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

Properties props = new Properties();
props.put(TOPOLOGY_DLQ_TOPICS_GENERATE, "true");
props.put(TOPOLOGY_DQL_TOPICS_DENY_LIST + ".0", "^.*source.foo.*$");

Configuration config = new Configuration(cliOps, props);

TopologySerdes parser = new TopologySerdes(config, new PlanMap());

Topology topology =
parser.deserialise(TestUtils.getResourceFile("/descriptor-only-topics.yaml"));
Project p = topology.getProjects().get(0);

assertThat(p.getTopics()).hasSize(2);
assertEquals("contextOrg.source.foo.foo", p.getTopics().get(0).toString());
assertEquals("contextOrg.source.foo.bar.avro", p.getTopics().get(1).toString());
}

@Test
public void testTopicsWithDLQWithTopicPattern() {
Map<String, String> cliOps = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -83,7 +82,7 @@ public void testExpectedFlow() throws IOException, InterruptedException {

HashMap<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, container.getBootstrapServers());
props.put(JULIE_KAFKA_CONSUMER_GROUP_ID, "julieops"+System.currentTimeMillis());
props.put(JULIE_KAFKA_CONSUMER_GROUP_ID, "julieops" + System.currentTimeMillis());
Configuration config = new Configuration(cliOps, props);
newBackend.configure(config);

Expand Down

0 comments on commit eba377f

Please sign in to comment.