Skip to content

Commit

Permalink
Refactor bindings building (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
sverrehu authored Sep 7, 2021
1 parent bd9bb68 commit 3f96dfa
Show file tree
Hide file tree
Showing 33 changed files with 435 additions and 532 deletions.
183 changes: 97 additions & 86 deletions src/main/java/com/purbon/kafka/topology/AccessControlManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import com.purbon.kafka.topology.model.users.platform.KsqlServerInstance;
import com.purbon.kafka.topology.model.users.platform.SchemaRegistryInstance;
import com.purbon.kafka.topology.roles.TopologyAclBinding;
import com.purbon.kafka.topology.utils.Either;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -69,9 +67,10 @@ public AccessControlManager(
*/
@Override
public void updatePlan(ExecutionPlan plan, final Topology topology) throws IOException {
List<Action> actions = buildProjectActions(topology);
actions.addAll(buildPlatformLevelActions(topology));
buildUpdateBindingsActions(actions, loadActualClusterStateIfAvailable(plan)).forEach(plan::add);
List<AclBindingsResult> aclBindingsResults = buildProjectAclBindings(topology);
aclBindingsResults.addAll(buildPlatformLevelActions(topology));
buildUpdateBindingsActions(aclBindingsResults, loadActualClusterStateIfAvailable(plan))
.forEach(plan::add);
}

private Set<TopologyAclBinding> loadActualClusterStateIfAvailable(ExecutionPlan plan) {
Expand Down Expand Up @@ -100,63 +99,69 @@ private Set<TopologyAclBinding> providerBindings() {
* @param topology A topology file
* @return List<Action> A list of actions required based on the parameters
*/
private List<Action> buildProjectActions(Topology topology) {
List<Action> actions = new ArrayList<>();
private List<AclBindingsResult> buildProjectAclBindings(Topology topology) {
List<AclBindingsResult> aclBindingsResults = new ArrayList<>();

for (Project project : topology.getProjects()) {
if (config.shouldOptimizeAcls()) {
actions.addAll(buildOptimizeConsumerAndProducerAcls(project));
aclBindingsResults.addAll(buildOptimizeConsumerAndProducerAcls(project));
} else {
actions.addAll(buildDetailedConsumerAndProducerAcls(project));
aclBindingsResults.addAll(buildDetailedConsumerAndProducerAcls(project));
}
// Setup global Kafka Stream Access control lists
String topicPrefix = project.namePrefix();
for (KStream app : project.getStreams()) {
syncApplicationAcls(app, topicPrefix).ifPresent(actions::add);
syncApplicationAcls(app, topicPrefix).ifPresent(aclBindingsResults::add);
}
for (KSqlApp kSqlApp : project.getKSqls()) {
syncApplicationAcls(kSqlApp, topicPrefix).ifPresent(actions::add);
syncApplicationAcls(kSqlApp, topicPrefix).ifPresent(aclBindingsResults::add);
}
for (Connector connector : project.getConnectors()) {
syncApplicationAcls(connector, topicPrefix).ifPresent(actions::add);
syncApplicationAcls(connector, topicPrefix).ifPresent(aclBindingsResults::add);
connector
.getConnectors()
.ifPresent(
(list) -> {
actions.add(
new BuildBindingsForConnectorAuthorization(bindingsBuilder, connector));
aclBindingsResults.add(
new ConnectorAuthorizationAclBindingsBuilder(bindingsBuilder, connector)
.getAclBindings());
});
}

for (Schemas schemaAuthorization : project.getSchemas()) {
actions.add(new BuildBindingsForSchemaAuthorization(bindingsBuilder, schemaAuthorization));
aclBindingsResults.add(
new SchemaAuthorizationAclBindingsBuilder(bindingsBuilder, schemaAuthorization)
.getAclBindings());
}

syncRbacRawRoles(project.getRbacRawRoles(), topicPrefix, actions);
syncRbacRawRoles(project.getRbacRawRoles(), topicPrefix, aclBindingsResults);
}
return actions;
return aclBindingsResults;
}

private List<Action> buildOptimizeConsumerAndProducerAcls(Project project) {
List<Action> actions = new ArrayList<>();
actions.add(
new BuildBindingsForConsumer(
bindingsBuilder, project.getConsumers(), project.namePrefix(), true));
actions.add(
new BuildBindingsForProducer(
bindingsBuilder, project.getProducers(), project.namePrefix(), true));
private List<AclBindingsResult> buildOptimizeConsumerAndProducerAcls(Project project) {
List<AclBindingsResult> aclBindingsResults = new ArrayList<>();
aclBindingsResults.add(
new ConsumerAclBindingsBuilder(
bindingsBuilder, project.getConsumers(), project.namePrefix(), true)
.getAclBindings());
aclBindingsResults.add(
new ProducerAclBindingsBuilder(
bindingsBuilder, project.getProducers(), project.namePrefix(), true)
.getAclBindings());

// When optimised, still need to add any topic level specific.
actions.addAll(buildBasicUsersAcls(project, false));
return actions;
aclBindingsResults.addAll(buildBasicUsersAcls(project, false));
return aclBindingsResults;
}

private List<Action> buildDetailedConsumerAndProducerAcls(Project project) {
private List<AclBindingsResult> buildDetailedConsumerAndProducerAcls(Project project) {
return buildBasicUsersAcls(project, true);
}

private List<Action> buildBasicUsersAcls(Project project, boolean includeProjectLevel) {
List<Action> actions = new ArrayList<>();
private List<AclBindingsResult> buildBasicUsersAcls(
Project project, boolean includeProjectLevel) {
List<AclBindingsResult> aclBindingsResults = new ArrayList<>();
project
.getTopics()
.forEach(
Expand All @@ -167,66 +172,55 @@ private List<Action> buildBasicUsersAcls(Project project, boolean includeProject
consumers.addAll(project.getConsumers());
}
if (!consumers.isEmpty()) {
Action action =
new BuildBindingsForConsumer(
bindingsBuilder, new ArrayList<>(consumers), fullTopicName, false);
actions.add(action);
AclBindingsResult aclBindingsResult =
new ConsumerAclBindingsBuilder(
bindingsBuilder, new ArrayList<>(consumers), fullTopicName, false)
.getAclBindings();
aclBindingsResults.add(aclBindingsResult);
}
Set<Producer> producers = new HashSet(topic.getProducers());
if (includeProjectLevel) {
producers.addAll(project.getProducers());
}
if (!producers.isEmpty()) {
Action action =
new BuildBindingsForProducer(
bindingsBuilder, new ArrayList<>(producers), fullTopicName, false);
actions.add(action);
AclBindingsResult aclBindingsResult =
new ProducerAclBindingsBuilder(
bindingsBuilder, new ArrayList<>(producers), fullTopicName, false)
.getAclBindings();
aclBindingsResults.add(aclBindingsResult);
}
});
return actions;
return aclBindingsResults;
}

/**
* Build a list of actions required to create or delete necessary bindings
*
* @param actions List of pre computed actions based on a topology
* @param aclBindingsResults List of pre computed actions based on a topology
* @param bindings List of current bindings available in the cluster
* @return List<Action> list of actions necessary to update the cluster
*/
private List<Action> buildUpdateBindingsActions(
List<Action> actions, Set<TopologyAclBinding> bindings) throws IOException {
List<AclBindingsResult> aclBindingsResults, Set<TopologyAclBinding> bindings)
throws IOException {

List<Action> updateActions = new ArrayList<>();

List<Either> eitherStreamOrError =
actions.stream()
.map(
action -> {
try {
action.run();
return Either.Left(action.getBindings().stream());
} catch (IOException e) {
return Either.Right(e);
}
})
final List<String> errorMessages =
aclBindingsResults.stream()
.filter(AclBindingsResult::isError)
.map(AclBindingsResult::getErrorMessage)
.collect(Collectors.toList());

List<IOException> errors =
eitherStreamOrError.stream()
.filter(Either::isRight)
.map(e -> (IOException) e.getRight().get())
.collect(Collectors.toList());
if (!errors.isEmpty()) {
for (IOException err : errors) {
LOGGER.error(err);
if (!errorMessages.isEmpty()) {
for (String errorMessage : errorMessages) {
LOGGER.error(errorMessage);
}
throw errors.get(0);
throw new IOException(errorMessages.get(0));
}

Set<TopologyAclBinding> allFinalBindings =
eitherStreamOrError.stream()
.filter(Either::isLeft)
.flatMap(e -> (Stream<TopologyAclBinding>) e.getLeft().get())
aclBindingsResults.stream()
.flatMap(aboe -> aboe.getAclBindings().stream())
.collect(Collectors.toSet());

Set<TopologyAclBinding> bindingsToBeCreated =
Expand Down Expand Up @@ -295,63 +289,80 @@ private boolean matchesPrefix(List<String> prefixes, String item, String type) {
}

// Sync platform relevant Access Control List.
private List<Action> buildPlatformLevelActions(final Topology topology) {
List<Action> actions = new ArrayList<>();
private List<AclBindingsResult> buildPlatformLevelActions(final Topology topology) {
List<AclBindingsResult> aclBindingsResults = new ArrayList<>();
Platform platform = topology.getPlatform();

// Set cluster level ACLs
syncClusterLevelRbac(platform.getKafka().getRbac(), KAFKA, actions);
syncClusterLevelRbac(platform.getKafkaConnect().getRbac(), KAFKA_CONNECT, actions);
syncClusterLevelRbac(platform.getSchemaRegistry().getRbac(), SCHEMA_REGISTRY, actions);
syncClusterLevelRbac(platform.getKafka().getRbac(), KAFKA, aclBindingsResults);
syncClusterLevelRbac(platform.getKafkaConnect().getRbac(), KAFKA_CONNECT, aclBindingsResults);
syncClusterLevelRbac(
platform.getSchemaRegistry().getRbac(), SCHEMA_REGISTRY, aclBindingsResults);

// Set component level ACLs
for (SchemaRegistryInstance schemaRegistry : platform.getSchemaRegistry().getInstances()) {
actions.add(new BuildBindingsForSchemaRegistry(bindingsBuilder, schemaRegistry));
aclBindingsResults.add(
new SchemaRegistryAclBindingsBuilder(bindingsBuilder, schemaRegistry).getAclBindings());
}
for (ControlCenterInstance controlCenter : platform.getControlCenter().getInstances()) {
actions.add(new BuildBindingsForControlCenter(bindingsBuilder, controlCenter));
aclBindingsResults.add(
new ControlCenterAclBindingsBuilder(bindingsBuilder, controlCenter).getAclBindings());
}

for (KsqlServerInstance ksqlServer : platform.getKsqlServer().getInstances()) {
actions.add(new BuildBindingsForKSqlServer(bindingsBuilder, ksqlServer));
aclBindingsResults.add(
new KSqlServerAclBindingsBuilder(bindingsBuilder, ksqlServer).getAclBindings());
}

return actions;
return aclBindingsResults;
}

private void syncClusterLevelRbac(
Optional<Map<String, List<User>>> rbac, Component cmp, List<Action> actions) {
Optional<Map<String, List<User>>> rbac,
Component cmp,
List<AclBindingsResult> aclBindingsResults) {
if (rbac.isPresent()) {
Map<String, List<User>> roles = rbac.get();
for (String role : roles.keySet()) {
for (User user : roles.get(role)) {
actions.add(new BuildClusterLevelBinding(bindingsBuilder, role, user, cmp));
aclBindingsResults.add(
new ClusterLevelAclBindingsBuilder(bindingsBuilder, role, user, cmp)
.getAclBindings());
}
}
}
}

private void syncRbacRawRoles(
Map<String, List<String>> rbacRawRoles, String topicPrefix, List<Action> actions) {
Map<String, List<String>> rbacRawRoles,
String topicPrefix,
List<AclBindingsResult> aclBindingsResults) {
rbacRawRoles.forEach(
(predefinedRole, principals) ->
principals.forEach(
principal ->
actions.add(
new BuildPredefinedBinding(
bindingsBuilder, principal, predefinedRole, topicPrefix))));
aclBindingsResults.add(
new PredefinedAclBindingsBuilder(
bindingsBuilder, principal, predefinedRole, topicPrefix)
.getAclBindings())));
}

private Optional<Action> syncApplicationAcls(DynamicUser app, String topicPrefix) {
Action action = null;
private Optional<AclBindingsResult> syncApplicationAcls(DynamicUser app, String topicPrefix) {
AclBindingsResult aclBindingsResult = null;
if (app instanceof KStream) {
action = new BuildBindingsForKStreams(bindingsBuilder, (KStream) app, topicPrefix);
aclBindingsResult =
new KStreamsAclBindingsBuilder(bindingsBuilder, (KStream) app, topicPrefix)
.getAclBindings();
} else if (app instanceof Connector) {
action = new BuildBindingsForKConnect(bindingsBuilder, (Connector) app, topicPrefix);
aclBindingsResult =
new KConnectAclBindingsBuilder(bindingsBuilder, (Connector) app, topicPrefix)
.getAclBindings();
} else if (app instanceof KSqlApp) {
action = new BuildBindingsForKSqlApp(bindingsBuilder, (KSqlApp) app, topicPrefix);
aclBindingsResult =
new KSqlAppAclBindingsBuilder(bindingsBuilder, (KSqlApp) app, topicPrefix)
.getAclBindings();
}
return Optional.ofNullable(action);
return Optional.ofNullable(aclBindingsResult);
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/purbon/kafka/topology/ExecutionPlan.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.purbon.kafka.topology;

import com.purbon.kafka.topology.actions.Action;
import com.purbon.kafka.topology.actions.BaseAccountsAction;
import com.purbon.kafka.topology.actions.CreateArtefactAction;
import com.purbon.kafka.topology.actions.DeleteArtefactAction;
import com.purbon.kafka.topology.actions.*;
import com.purbon.kafka.topology.actions.access.ClearBindings;
import com.purbon.kafka.topology.actions.accounts.ClearAccounts;
import com.purbon.kafka.topology.actions.accounts.CreateAccounts;
Expand Down Expand Up @@ -120,13 +117,16 @@ private void execute(Action action, boolean dryRun) throws IOException {
new StreamUtils<>(topics.stream())
.filterAsSet(topic -> !topicsToBeDeleted.contains(topic));
}
if (!action.getBindings().isEmpty()) {
if (action instanceof BaseAccessControlAction
&& !((BaseAccessControlAction) action).getAclBindings().isEmpty()) {
if (action instanceof ClearBindings) {
bindings =
new StreamUtils<>(bindings.stream())
.filterAsSet(binding -> !action.getBindings().contains(binding));
.filterAsSet(
binding ->
!((BaseAccessControlAction) action).getAclBindings().contains(binding));
} else {
bindings.addAll(action.getBindings());
bindings.addAll(((BaseAccessControlAction) action).getAclBindings());
}
}
if (action instanceof BaseAccountsAction) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

7 changes: 0 additions & 7 deletions src/main/java/com/purbon/kafka/topology/actions/Action.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package com.purbon.kafka.topology.actions;

import com.purbon.kafka.topology.roles.TopologyAclBinding;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

public interface Action {

void run() throws IOException;

default List<TopologyAclBinding> getBindings() {
return Collections.emptyList();
}
}
Loading

0 comments on commit 3f96dfa

Please sign in to comment.