From 62948083eb1f6bce5f746e72051ea2b976438ba7 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 10 Nov 2021 13:31:58 -0500 Subject: [PATCH 1/3] feat: add exactly-once to kstreams and create transactionalid permissions --- .../topology/BindingsBuilderProvider.java | 6 ++- .../builders/BuildBindingsForKStreams.java | 6 ++- .../kafka/topology/model/users/KStream.java | 24 +++++++++- .../roles/acls/AclsBindingsBuilder.java | 23 ++++++++-- .../roles/rbac/RBACBindingsBuilder.java | 12 ++++- .../serdes/TopologyCustomDeserializer.java | 44 ++++++++++++------- .../topology/AccessControlManagerTest.java | 6 ++- .../topology/AclsBindingsBuilderTest.java | 36 ++++++++++++++- .../kafka/topology/TopologySerdesTest.java | 4 +- .../BuildBindingsForKStreamsTest.java | 8 ++-- 10 files changed, 135 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java b/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java index 69b17deb7..1d0f90c17 100644 --- a/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java +++ b/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java @@ -21,7 +21,11 @@ public interface BindingsBuilderProvider { List buildBindingsForConnect(Connector connector, String topicPrefix); List buildBindingsForStreamsApp( - String principal, String topicPrefix, List readTopics, List writeTopics); + String principal, + String topicPrefix, + List readTopics, + List writeTopics, + boolean eos); List buildBindingsForConsumers( Collection consumers, String resource, boolean prefixed); diff --git a/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java b/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java index a630b363d..f0dcd42e9 100644 --- a/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java +++ b/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java @@ -35,7 +35,11 @@ protected void execute() throws IOException { bindings = builderProvider.buildBindingsForStreamsApp( - app.getPrincipal(), prefix, readTopics, writeTopics); + app.getPrincipal(), + prefix, + readTopics, + writeTopics, + app.getExactlyOnce().orElse(false)); } @Override diff --git a/src/main/java/com/purbon/kafka/topology/model/users/KStream.java b/src/main/java/com/purbon/kafka/topology/model/users/KStream.java index 7604d8e2b..fce287528 100644 --- a/src/main/java/com/purbon/kafka/topology/model/users/KStream.java +++ b/src/main/java/com/purbon/kafka/topology/model/users/KStream.java @@ -5,6 +5,7 @@ import com.purbon.kafka.topology.model.DynamicUser; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; public class KStream extends DynamicUser { @@ -12,25 +13,44 @@ public class KStream extends DynamicUser { @JsonInclude(Include.NON_EMPTY) private Optional applicationId; + private Optional exactlyOnce; + public KStream() { this("", new HashMap<>()); } public KStream( - String principal, HashMap> topics, Optional applicationId) { + String principal, + Map> topics, + Optional applicationId, + Optional exactlyOnce) { super(principal, topics); this.applicationId = applicationId; + this.exactlyOnce = exactlyOnce; + } + + public KStream( + String principal, HashMap> topics, Optional applicationId) { + this(principal, topics, applicationId, Optional.empty()); } public KStream(String principal, HashMap> topics) { - this(principal, topics, Optional.empty()); + this(principal, topics, Optional.empty(), Optional.empty()); } public Optional getApplicationId() { return applicationId; } + public Optional getExactlyOnce() { + return exactlyOnce; + } + public void setApplicationId(Optional applicationId) { this.applicationId = applicationId; } + + public void setExactlyOnce(Optional exactlyOnce) { + this.exactlyOnce = exactlyOnce; + } } diff --git a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java index 87a2f5f85..8ca645b7b 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java @@ -96,8 +96,13 @@ public List buildBindingsForConnect( @Override public List buildBindingsForStreamsApp( - String principal, String topicPrefix, List readTopics, List writeTopics) { - return toList(streamsAppStream(translate(principal), topicPrefix, readTopics, writeTopics)); + String principal, + String topicPrefix, + List readTopics, + List writeTopics, + boolean eos) { + return toList( + streamsAppStream(translate(principal), topicPrefix, readTopics, writeTopics, eos)); } @Override @@ -229,7 +234,11 @@ private Stream consumerAclsStream(Consumer consumer, String topic, b } private Stream streamsAppStream( - String principal, String prefix, List readTopics, List writeTopics) { + String principal, + String prefix, + List readTopics, + List writeTopics, + boolean eos) { List acls = new ArrayList<>(); @@ -246,6 +255,14 @@ private Stream streamsAppStream( acls.add(buildGroupLevelAcl(principal, prefix, PatternType.PREFIXED, AclOperation.READ)); + if (eos) { + acls.add( + buildTransactionIdLevelAcl(principal, prefix, PatternType.PREFIXED, AclOperation.WRITE)); + acls.add( + buildTransactionIdLevelAcl( + principal, prefix, PatternType.PREFIXED, AclOperation.DESCRIBE)); + } + return acls.stream(); } diff --git a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java index 2e7d1dda0..e52ab6336 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java @@ -99,7 +99,11 @@ public List buildBindingsForConnect(Connector connector, Str @Override public List buildBindingsForStreamsApp( - String principal, String topicPrefix, List readTopics, List writeTopics) { + String principal, + String topicPrefix, + List readTopics, + List writeTopics, + boolean eos) { List bindings = new ArrayList<>(); TopologyAclBinding binding = apiClient.bind(principal, DEVELOPER_READ, topicPrefix, PREFIX); @@ -118,6 +122,10 @@ public List buildBindingsForStreamsApp( bindings.add(writeBinding); }); + if (eos) { + bindings.add(apiClient.bind(principal, DEVELOPER_WRITE, topicPrefix, "TransactionalId", PREFIX)); + } + binding = apiClient.bind(principal, RESOURCE_OWNER, topicPrefix, PREFIX); bindings.add(binding); binding = apiClient.bind(principal, RESOURCE_OWNER, topicPrefix, "Group", PREFIX); @@ -318,7 +326,7 @@ public Collection buildBindingsForKSqlApp(KSqlApp app, Strin // schema access List subjects = readTopics.stream() - .flatMap((Function, Stream>) topics -> topics.stream()) + .flatMap((Function, Stream>) Collection::stream) .map(topicName -> String.format("%s-value", topicName)) .collect(Collectors.toList()); diff --git a/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java b/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java index dff3176a8..9ef5001fb 100644 --- a/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java +++ b/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java @@ -379,28 +379,40 @@ private Optional doKSqlElements(JsonParser parser, JsonNode node } private Optional doStreamsElements(JsonParser parser, JsonNode node) - throws IOException { + throws IOException { List streams = - new JsonSerdesUtils().parseApplicationUser(parser, node, KStream.class) - .stream() - .map(ks -> { - ks.getTopics().putIfAbsent(KStream.READ_TOPICS, Collections.emptyList()); - ks.getTopics().putIfAbsent(KStream.WRITE_TOPICS, Collections.emptyList()); - return ks; - }).collect(Collectors.toList()); - - for(KStream ks : streams) { + new JsonSerdesUtils() + .parseApplicationUser(parser, node, KStream.class).stream() + .map( + ks -> { + ks.getTopics().putIfAbsent(KStream.READ_TOPICS, Collections.emptyList()); + ks.getTopics().putIfAbsent(KStream.WRITE_TOPICS, Collections.emptyList()); + return ks; + }) + .collect(Collectors.toList()); + + for (KStream ks : streams) { var topics = ks.getTopics(); if (topics.get(KStream.READ_TOPICS).isEmpty() || topics.get(KStream.WRITE_TOPICS).isEmpty()) { - LOGGER.warn("A Kafka Streams application with Id ("+ks.getApplicationId()+") and Principal ("+ks.getPrincipal()+")" + - " might require both read and write topics as per its " + - "nature it is always reading and writing into Apache Kafka, be aware if you notice problems."); + LOGGER.warn( + "A Kafka Streams application with Id (" + + ks.getApplicationId() + + ") and Principal (" + + ks.getPrincipal() + + ")" + + " might require both read and write topics as per its " + + "nature it is always reading and writing into Apache Kafka, be aware if you notice problems."); } if (topics.get(KStream.READ_TOPICS).isEmpty()) { - // should have at minimum read topics defined as we could think of write topics as internal topics being + // should have at minimum read topics defined as we could think of write topics as internal + // topics being // auto created. - throw new IOException("Kafka Streams application with Id "+ks.getApplicationId()+" and principal "+ks.getPrincipal()+ - " have missing read topics. This field is required."); + throw new IOException( + "Kafka Streams application with Id " + + ks.getApplicationId() + + " and principal " + + ks.getPrincipal() + + " have missing read topics. This field is required."); } } return Optional.of(new PlatformSystem(streams)); diff --git a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java index feed7811c..09e8b7fa2 100644 --- a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java +++ b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java @@ -246,13 +246,15 @@ public void newKafkaStreamsAppACLsCreation() throws IOException { "User:App0", project.namePrefix(), topics.get(KStream.READ_TOPICS), - topics.get(KStream.WRITE_TOPICS)); + topics.get(KStream.WRITE_TOPICS), + false); verify(aclsBuilder, times(1)) .buildBindingsForStreamsApp( eq("User:App0"), eq(project.namePrefix()), eq(topics.get(KStream.READ_TOPICS)), - eq(topics.get(KStream.WRITE_TOPICS))); + eq(topics.get(KStream.WRITE_TOPICS)), + eq(false)); } @Test diff --git a/src/test/java/com/purbon/kafka/topology/AclsBindingsBuilderTest.java b/src/test/java/com/purbon/kafka/topology/AclsBindingsBuilderTest.java index b540c66dd..8794fca8f 100644 --- a/src/test/java/com/purbon/kafka/topology/AclsBindingsBuilderTest.java +++ b/src/test/java/com/purbon/kafka/topology/AclsBindingsBuilderTest.java @@ -16,6 +16,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.Properties; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; @@ -85,6 +86,39 @@ public void testProducerAclsBuilder() { buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.DESCRIBE)); } + @Test + public void testStreamsWithTxIdAclsBuilder() { + KStream producer = + new KStream( + "User:foo", + singletonMap("read", singletonList("bar")), + Optional.of("app1"), + Optional.of(true)); + List aclBindings = + builder.buildBindingsForStreamsApp( + "User:foo", "app1", singletonList("bar"), emptyList(), true); + assertThat(aclBindings.size()).isEqualTo(5); + + assertThat(aclBindings) + .contains(buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.READ)); + + assertThat(aclBindings) + .contains( + buildTransactionIdLevelAcl( + producer.getPrincipal(), + producer.getApplicationId().get(), + PatternType.PREFIXED, + AclOperation.DESCRIBE)); + + assertThat(aclBindings) + .contains( + buildTransactionIdLevelAcl( + producer.getPrincipal(), + producer.getApplicationId().get(), + PatternType.PREFIXED, + AclOperation.WRITE)); + } + @Test public void testProducerWithTxIdAclsBuilder() { Producer producer = new Producer("User:foo", "1234", true); @@ -282,7 +316,7 @@ public void testStreamsAclsBuilder() { List bindings = builder.buildBindingsForStreamsApp( - stream.getPrincipal(), "prefix", readTopics, writeTopics); + stream.getPrincipal(), "prefix", readTopics, writeTopics, false); assertThat(bindings.size()).isEqualTo(4); assertThat(bindings) diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java index f74f81e47..4e54be59f 100644 --- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java @@ -73,11 +73,11 @@ public void testMetadata() { @Test public void testStreamsParsingOnlyReadTopicsShouldNotParseAsNull() { Topology topology = - parser.deserialise(TestUtils.getResourceFile("/descriptor-streams-only-read.yaml")); + parser.deserialise(TestUtils.getResourceFile("/descriptor-streams-only-read.yaml")); Project p = topology.getProjects().get(0); - for(KStream s : p.getStreams()) { + for (KStream s : p.getStreams()) { assertThat(s.getTopics().get(KStream.READ_TOPICS)).isNotNull(); assertThat(s.getTopics().get(KStream.READ_TOPICS)).isInstanceOf(List.class); assertThat(s.getTopics().get(KStream.READ_TOPICS)).contains("topicA", "topicB"); diff --git a/src/test/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreamsTest.java b/src/test/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreamsTest.java index bdab6fde5..0d4eb500c 100644 --- a/src/test/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreamsTest.java +++ b/src/test/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreamsTest.java @@ -37,12 +37,12 @@ public void testStreamsWithoutApplicationId() throws IOException { assertThat(action.getBindings()) .anyMatch( b -> - b.getResourceType() == ResourceType.TOPIC.name() + b.getResourceType().equals(ResourceType.TOPIC.name()) && b.getResourceName().equals(topicPrefix)); assertThat(action.getBindings()) .anyMatch( b -> - b.getResourceType() == ResourceType.GROUP.name() + b.getResourceType().equals(ResourceType.GROUP.name()) && b.getResourceName().equals(topicPrefix)); } @@ -59,12 +59,12 @@ public void testStreamWithApplicationId() throws IOException { assertThat(action.getBindings()) .anyMatch( b -> - b.getResourceType() == ResourceType.TOPIC.name() + b.getResourceType().equals(ResourceType.TOPIC.name()) && b.getResourceName().equals(applicationId)); assertThat(action.getBindings()) .anyMatch( b -> - b.getResourceType() == ResourceType.GROUP.name() + b.getResourceType().equals(ResourceType.GROUP.name()) && b.getResourceName().equals(applicationId)); } } From 6a690b874be19ef79bd5d8988570256362e69057 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 10 Nov 2021 13:54:12 -0500 Subject: [PATCH 2/3] reformat --- .../java/com/purbon/kafka/topology/model/users/KStream.java | 1 + .../purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/purbon/kafka/topology/model/users/KStream.java b/src/main/java/com/purbon/kafka/topology/model/users/KStream.java index fce287528..7fb39dd36 100644 --- a/src/main/java/com/purbon/kafka/topology/model/users/KStream.java +++ b/src/main/java/com/purbon/kafka/topology/model/users/KStream.java @@ -13,6 +13,7 @@ public class KStream extends DynamicUser { @JsonInclude(Include.NON_EMPTY) private Optional applicationId; + @JsonInclude(Include.NON_EMPTY) private Optional exactlyOnce; public KStream() { diff --git a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java index e52ab6336..e9be10bd8 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java @@ -123,7 +123,8 @@ public List buildBindingsForStreamsApp( }); if (eos) { - bindings.add(apiClient.bind(principal, DEVELOPER_WRITE, topicPrefix, "TransactionalId", PREFIX)); + bindings.add( + apiClient.bind(principal, DEVELOPER_WRITE, topicPrefix, "TransactionalId", PREFIX)); } binding = apiClient.bind(principal, RESOURCE_OWNER, topicPrefix, PREFIX); From 1f467837b61b4fa4562d39502a39f0f4f9ea9c5a Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 11 Nov 2021 08:02:47 -0500 Subject: [PATCH 3/3] refactor: initialize eos --- .../actions/access/builders/BuildBindingsForKStreams.java | 2 +- .../java/com/purbon/kafka/topology/model/users/KStream.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java b/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java index f0dcd42e9..9d7b105a6 100644 --- a/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java +++ b/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForKStreams.java @@ -39,7 +39,7 @@ protected void execute() throws IOException { prefix, readTopics, writeTopics, - app.getExactlyOnce().orElse(false)); + app.getExactlyOnce().get()); } @Override diff --git a/src/main/java/com/purbon/kafka/topology/model/users/KStream.java b/src/main/java/com/purbon/kafka/topology/model/users/KStream.java index 7fb39dd36..bb68c6a96 100644 --- a/src/main/java/com/purbon/kafka/topology/model/users/KStream.java +++ b/src/main/java/com/purbon/kafka/topology/model/users/KStream.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.zookeeper.Op; public class KStream extends DynamicUser { @@ -32,11 +33,11 @@ public KStream( public KStream( String principal, HashMap> topics, Optional applicationId) { - this(principal, topics, applicationId, Optional.empty()); + this(principal, topics, applicationId, Optional.of(Boolean.FALSE)); } public KStream(String principal, HashMap> topics) { - this(principal, topics, Optional.empty(), Optional.empty()); + this(principal, topics, Optional.empty(), Optional.of(Boolean.FALSE)); } public Optional getApplicationId() {