Skip to content

Commit

Permalink
feat: add exactly-once to kstreams and create transactionalid ACLs (#388
Browse files Browse the repository at this point in the history
)

* feat: add exactly-once to kstreams and create transactionalid permissions

* reformat

* refactor: initialize eos
  • Loading branch information
jeqo authored Nov 12, 2021
1 parent 0bd4e81 commit 2db3160
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ public interface BindingsBuilderProvider {
List<TopologyAclBinding> buildBindingsForConnect(Connector connector, String topicPrefix);

List<TopologyAclBinding> buildBindingsForStreamsApp(
String principal, String topicPrefix, List<String> readTopics, List<String> writeTopics);
String principal,
String topicPrefix,
List<String> readTopics,
List<String> writeTopics,
boolean eos);

List<TopologyAclBinding> buildBindingsForConsumers(
Collection<Consumer> consumers, String resource, boolean prefixed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ protected void execute() throws IOException {

bindings =
builderProvider.buildBindingsForStreamsApp(
app.getPrincipal(), prefix, readTopics, writeTopics);
app.getPrincipal(),
prefix,
readTopics,
writeTopics,
app.getExactlyOnce().get());
}

@Override
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/com/purbon/kafka/topology/model/users/KStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,54 @@
import com.purbon.kafka.topology.model.DynamicUser;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.zookeeper.Op;

public class KStream extends DynamicUser {

@JsonInclude(Include.NON_EMPTY)
private Optional<String> applicationId;

@JsonInclude(Include.NON_EMPTY)
private Optional<Boolean> exactlyOnce;

public KStream() {
this("", new HashMap<>());
}

public KStream(
String principal, HashMap<String, List<String>> topics, Optional<String> applicationId) {
String principal,
Map<String, List<String>> topics,
Optional<String> applicationId,
Optional<Boolean> exactlyOnce) {
super(principal, topics);
this.applicationId = applicationId;
this.exactlyOnce = exactlyOnce;
}

public KStream(
String principal, HashMap<String, List<String>> topics, Optional<String> applicationId) {
this(principal, topics, applicationId, Optional.of(Boolean.FALSE));
}

public KStream(String principal, HashMap<String, List<String>> topics) {
this(principal, topics, Optional.empty());
this(principal, topics, Optional.empty(), Optional.of(Boolean.FALSE));
}

public Optional<String> getApplicationId() {
return applicationId;
}

public Optional<Boolean> getExactlyOnce() {
return exactlyOnce;
}

public void setApplicationId(Optional<String> applicationId) {
this.applicationId = applicationId;
}

public void setExactlyOnce(Optional<Boolean> exactlyOnce) {
this.exactlyOnce = exactlyOnce;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,13 @@ public List<TopologyAclBinding> buildBindingsForConnect(

@Override
public List<TopologyAclBinding> buildBindingsForStreamsApp(
String principal, String topicPrefix, List<String> readTopics, List<String> writeTopics) {
return toList(streamsAppStream(translate(principal), topicPrefix, readTopics, writeTopics));
String principal,
String topicPrefix,
List<String> readTopics,
List<String> writeTopics,
boolean eos) {
return toList(
streamsAppStream(translate(principal), topicPrefix, readTopics, writeTopics, eos));
}

@Override
Expand Down Expand Up @@ -229,7 +234,11 @@ private Stream<AclBinding> consumerAclsStream(Consumer consumer, String topic, b
}

private Stream<AclBinding> streamsAppStream(
String principal, String prefix, List<String> readTopics, List<String> writeTopics) {
String principal,
String prefix,
List<String> readTopics,
List<String> writeTopics,
boolean eos) {

List<AclBinding> acls = new ArrayList<>();

Expand All @@ -246,6 +255,14 @@ private Stream<AclBinding> streamsAppStream(

acls.add(buildGroupLevelAcl(principal, prefix, PatternType.PREFIXED, AclOperation.READ));

if (eos) {
acls.add(
buildTransactionIdLevelAcl(principal, prefix, PatternType.PREFIXED, AclOperation.WRITE));
acls.add(
buildTransactionIdLevelAcl(
principal, prefix, PatternType.PREFIXED, AclOperation.DESCRIBE));
}

return acls.stream();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public List<TopologyAclBinding> buildBindingsForConnect(Connector connector, Str

@Override
public List<TopologyAclBinding> buildBindingsForStreamsApp(
String principal, String topicPrefix, List<String> readTopics, List<String> writeTopics) {
String principal,
String topicPrefix,
List<String> readTopics,
List<String> writeTopics,
boolean eos) {
List<TopologyAclBinding> bindings = new ArrayList<>();

TopologyAclBinding binding = apiClient.bind(principal, DEVELOPER_READ, topicPrefix, PREFIX);
Expand All @@ -118,6 +122,11 @@ public List<TopologyAclBinding> buildBindingsForStreamsApp(
bindings.add(writeBinding);
});

if (eos) {
bindings.add(
apiClient.bind(principal, DEVELOPER_WRITE, topicPrefix, "TransactionalId", PREFIX));
}

binding = apiClient.bind(principal, RESOURCE_OWNER, topicPrefix, PREFIX);
bindings.add(binding);
binding = apiClient.bind(principal, RESOURCE_OWNER, topicPrefix, "Group", PREFIX);
Expand Down Expand Up @@ -318,7 +327,7 @@ public Collection<TopologyAclBinding> buildBindingsForKSqlApp(KSqlApp app, Strin
// schema access
List<String> subjects =
readTopics.stream()
.flatMap((Function<List<String>, Stream<String>>) topics -> topics.stream())
.flatMap((Function<List<String>, Stream<String>>) Collection::stream)
.map(topicName -> String.format("%s-value", topicName))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,15 @@ public void newKafkaStreamsAppACLsCreation() throws IOException {
"User:App0",
project.namePrefix(),
topics.get(KStream.READ_TOPICS),
topics.get(KStream.WRITE_TOPICS));
topics.get(KStream.WRITE_TOPICS),
false);
verify(aclsBuilder, times(1))
.buildBindingsForStreamsApp(
eq("User:App0"),
eq(project.namePrefix()),
eq(topics.get(KStream.READ_TOPICS)),
eq(topics.get(KStream.WRITE_TOPICS)));
eq(topics.get(KStream.WRITE_TOPICS)),
eq(false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
Expand Down Expand Up @@ -85,6 +86,39 @@ public void testProducerAclsBuilder() {
buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.DESCRIBE));
}

@Test
public void testStreamsWithTxIdAclsBuilder() {
KStream producer =
new KStream(
"User:foo",
singletonMap("read", singletonList("bar")),
Optional.of("app1"),
Optional.of(true));
List<TopologyAclBinding> aclBindings =
builder.buildBindingsForStreamsApp(
"User:foo", "app1", singletonList("bar"), emptyList(), true);
assertThat(aclBindings.size()).isEqualTo(5);

assertThat(aclBindings)
.contains(buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.READ));

assertThat(aclBindings)
.contains(
buildTransactionIdLevelAcl(
producer.getPrincipal(),
producer.getApplicationId().get(),
PatternType.PREFIXED,
AclOperation.DESCRIBE));

assertThat(aclBindings)
.contains(
buildTransactionIdLevelAcl(
producer.getPrincipal(),
producer.getApplicationId().get(),
PatternType.PREFIXED,
AclOperation.WRITE));
}

@Test
public void testProducerWithTxIdAclsBuilder() {
Producer producer = new Producer("User:foo", "1234", true);
Expand Down Expand Up @@ -282,7 +316,7 @@ public void testStreamsAclsBuilder() {

List<TopologyAclBinding> bindings =
builder.buildBindingsForStreamsApp(
stream.getPrincipal(), "prefix", readTopics, writeTopics);
stream.getPrincipal(), "prefix", readTopics, writeTopics, false);

assertThat(bindings.size()).isEqualTo(4);
assertThat(bindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public void testStreamsWithoutApplicationId() throws IOException {
assertThat(action.getBindings())
.anyMatch(
b ->
b.getResourceType() == ResourceType.TOPIC.name()
b.getResourceType().equals(ResourceType.TOPIC.name())
&& b.getResourceName().equals(topicPrefix));
assertThat(action.getBindings())
.anyMatch(
b ->
b.getResourceType() == ResourceType.GROUP.name()
b.getResourceType().equals(ResourceType.GROUP.name())
&& b.getResourceName().equals(topicPrefix));
}

Expand All @@ -59,12 +59,12 @@ public void testStreamWithApplicationId() throws IOException {
assertThat(action.getBindings())
.anyMatch(
b ->
b.getResourceType() == ResourceType.TOPIC.name()
b.getResourceType().equals(ResourceType.TOPIC.name())
&& b.getResourceName().equals(applicationId));
assertThat(action.getBindings())
.anyMatch(
b ->
b.getResourceType() == ResourceType.GROUP.name()
b.getResourceType().equals(ResourceType.GROUP.name())
&& b.getResourceName().equals(applicationId));
}
}

0 comments on commit 2db3160

Please sign in to comment.