Merge pull request #3 from sverrehu/rename-methods
Rename methods and ManagerOfThings
sverrehu authored Jun 18, 2021
2 parents 150036b + 07a4078 commit 3f237ce
Showing 21 changed files with 261 additions and 216 deletions.
@@ -1,34 +1,37 @@
package com.purbon.kafka.topology;

import com.purbon.kafka.topology.actions.accounts.ClearAccounts;
import com.purbon.kafka.topology.actions.accounts.CreateAccounts;
import com.purbon.kafka.topology.model.*;
import com.purbon.kafka.topology.model.Platform;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.User;
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import com.purbon.kafka.topology.serviceAccounts.VoidPrincipalProvider;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PrincipalManager {
abstract class AbstractPrincipalManager implements ExecutionPlanUpdater {

private static final Logger LOGGER = LogManager.getLogger(PrincipalManager.class);
private static final Logger LOGGER = LogManager.getLogger(AbstractPrincipalManager.class);
private final List<String> managedPrefixes;
protected PrincipalProvider provider;
protected Configuration config;

private PrincipalProvider provider;

private Configuration config;

public PrincipalManager(PrincipalProvider provider, Configuration config) {
public AbstractPrincipalManager(PrincipalProvider provider, Configuration config) {
this.provider = provider;
this.config = config;
this.managedPrefixes = config.getServiceAccountManagedPrefixes();
}

public void applyCreate(Topology topology, ExecutionPlan plan) throws IOException {
@Override
public final void updatePlan(ExecutionPlan plan, Topology topology) throws IOException {
if (!config.enabledExperimental()) {
LOGGER.debug("Not running the PrincipalsManager as this is an experimental feature.");
return;
@@ -38,51 +41,18 @@ public void applyCreate(Topology topology, ExecutionPlan plan) throws IOExceptio
// This means the management of principals is either not possible or has not been configured
return;
}

provider.configure();

List<String> principals = parseListOfPrincipals(topology);
Map<String, ServiceAccount> accounts = loadActualClusterStateIfAvailable(plan);

// build set of principals to be created.
Set<ServiceAccount> principalsToBeCreated =
principals.stream()
.filter(wishPrincipal -> !accounts.containsKey(wishPrincipal))
.map(principal -> new ServiceAccount(-1, principal, "Managed by KTB"))
.collect(Collectors.toSet());

if (!principalsToBeCreated.isEmpty()) {
plan.add(new CreateAccounts(provider, principalsToBeCreated));
}
doUpdatePlan(plan, topology, principals, accounts);
}

public void applyDelete(Topology topology, ExecutionPlan plan) throws IOException {
if (!config.enabledExperimental()) {
LOGGER.debug("Not running the PrincipalsManager as this is an experimental feature.");
return;
}
if (provider instanceof VoidPrincipalProvider) {
// Do Nothing if the provider is the void one.
// This means the management of principals is either not possible or has not been configured
return;
}

if (config.isAllowDeletePrincipals()) {
provider.configure();

List<String> principals = parseListOfPrincipals(topology);
Map<String, ServiceAccount> accounts = loadActualClusterStateIfAvailable(plan);

// build list of principals to be deleted.
List<ServiceAccount> principalsToBeDeleted =
accounts.values().stream()
.filter(currentPrincipal -> !principals.contains(currentPrincipal.getName()))
.collect(Collectors.toList());
if (!principalsToBeDeleted.isEmpty()) {
plan.add(new ClearAccounts(provider, principalsToBeDeleted));
}
}
}
protected abstract void doUpdatePlan(
ExecutionPlan plan,
Topology topology,
final List<String> principals,
final Map<String, ServiceAccount> accounts)
throws IOException;

private Map<String, ServiceAccount> loadActualClusterStateIfAvailable(ExecutionPlan plan)
throws IOException {
@@ -132,7 +102,8 @@ private List<String> parseListOfPrincipals(Topology topology) {
.collect(Collectors.toList());
}

public void printCurrentState(PrintStream out) throws IOException {
@Override
public final void printCurrentState(PrintStream out) throws IOException {
out.println("List of Principles: ");
provider.listServiceAccounts().forEach(out::println);
}
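
The renamed class is now a template: the final updatePlan keeps the shared pre-flight work (experimental-feature flag, VoidPrincipalProvider short-circuit, provider.configure(), parsing principals and loading cluster state), and subclasses only implement doUpdatePlan. A minimal sketch of a hypothetical subclass, purely to show the extension point; the class name and body below are illustrative and not part of this commit:

```java
package com.purbon.kafka.topology;

import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import java.util.List;
import java.util.Map;

// Hypothetical subclass, not part of this commit: it only illustrates the
// template-method shape. By the time doUpdatePlan runs, the base class has
// already verified the experimental flag, skipped the VoidPrincipalProvider
// and configured the provider.
public class ReportOnlyPrincipalManager extends AbstractPrincipalManager {

  public ReportOnlyPrincipalManager(PrincipalProvider provider, Configuration config) {
    super(provider, config);
  }

  @Override
  protected void doUpdatePlan(
      ExecutionPlan plan,
      Topology topology,
      List<String> principals,
      Map<String, ServiceAccount> accounts) {
    // Adds no actions to the plan; just reports declared vs. existing principals.
    principals.forEach(principal -> System.out.println("declared: " + principal));
    accounts.keySet().forEach(existing -> System.out.println("existing: " + existing));
  }
}
```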
@@ -32,7 +32,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AccessControlManager {
public class AccessControlManager implements ExecutionPlanUpdater {

private static final Logger LOGGER = LogManager.getLogger(AccessControlManager.class);

@@ -64,10 +64,11 @@ public AccessControlManager(
* Main apply method, append to the execution plan the necessary bindings to update the access
* control
*
* @param topology A topology file descriptor
* @param plan An Execution plan
* @param topology A topology file descriptor
*/
public void apply(final Topology topology, ExecutionPlan plan) throws IOException {
@Override
public void updatePlan(ExecutionPlan plan, final Topology topology) throws IOException {
List<Action> actions = buildProjectActions(topology);
actions.addAll(buildPlatformLevelActions(topology));
buildUpdateBindingsActions(actions, loadActualClusterStateIfAvailable(plan)).forEach(plan::add);
@@ -99,7 +100,7 @@ private Set<TopologyAclBinding> providerBindings() {
* @param topology A topology file
* @return List<Action> A list of actions required based on the parameters
*/
public List<Action> buildProjectActions(Topology topology) {
private List<Action> buildProjectActions(Topology topology) {
List<Action> actions = new ArrayList<>();

for (Project project : topology.getProjects()) {
@@ -192,7 +193,7 @@ private List<Action> buildBasicUsersAcls(Project project, boolean includeProject
* @param bindings List of current bindings available in the cluster
* @return List<Action> list of actions necessary to update the cluster
*/
public List<Action> buildUpdateBindingsActions(
private List<Action> buildUpdateBindingsActions(
List<Action> actions, Set<TopologyAclBinding> bindings) throws IOException {

List<Action> updateActions = new ArrayList<>();
@@ -294,7 +295,7 @@ private boolean matchesPrefix(List<String> prefixes, String item, String type) {
}

// Sync platform relevant Access Control List.
public List<Action> buildPlatformLevelActions(final Topology topology) {
private List<Action> buildPlatformLevelActions(final Topology topology) {
List<Action> actions = new ArrayList<>();
Platform platform = topology.getPlatform();

@@ -353,6 +354,7 @@ private Optional<Action> syncApplicationAcls(DynamicUser app, String topicPrefix
return Optional.ofNullable(action);
}

@Override
public void printCurrentState(PrintStream out) {
out.println("List of ACLs: ");
controlProvider
4 changes: 2 additions & 2 deletions src/main/java/com/purbon/kafka/topology/ArtefactManager.java
@@ -13,7 +13,7 @@
import org.apache.logging.log4j.Logger;

/** Manages Artefacts as defined within the context of the filter class */
public abstract class ArtefactManager implements ManagerOfThings {
public abstract class ArtefactManager implements ExecutionPlanUpdater {

private static final Logger LOGGER = LogManager.getLogger(ArtefactManager.class);

@@ -35,7 +35,7 @@ public ArtefactManager(
}

@Override
public void apply(Topology topology, ExecutionPlan plan) throws IOException {
public void updatePlan(ExecutionPlan plan, Topology topology) throws IOException {

Collection<? extends Artefact> currentArtefacts = loadActualClusterStateIfAvailable(plan);

@@ -4,9 +4,9 @@
import java.io.IOException;
import java.io.PrintStream;

public interface ManagerOfThings {
public interface ExecutionPlanUpdater {

void apply(Topology topology, ExecutionPlan plan) throws IOException;
void updatePlan(ExecutionPlan plan, Topology topology) throws IOException;

void printCurrentState(PrintStream out) throws IOException;
}
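
The rename from ManagerOfThings to ExecutionPlanUpdater also flips the argument order so the ExecutionPlan comes first in a uniform contract. As a hedged illustration of what that uniformity allows (the helper below is only a sketch, not part of this commit), callers can drive any ordered collection of updaters against a single plan:

```java
package com.purbon.kafka.topology;

import com.purbon.kafka.topology.model.Topology;
import java.io.IOException;
import java.util.List;

// Illustrative helper, not part of this commit: with one updatePlan signature,
// an ordered list of managers can be applied to the same plan in sequence.
final class UpdaterChain {

  private UpdaterChain() {}

  static void updateAll(ExecutionPlan plan, Topology topology, List<ExecutionPlanUpdater> updaters)
      throws IOException {
    for (ExecutionPlanUpdater updater : updaters) {
      updater.updatePlan(plan, topology);
    }
  }
}
```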
48 changes: 24 additions & 24 deletions src/main/java/com/purbon/kafka/topology/JulieOps.java
@@ -35,7 +35,8 @@ public class JulieOps implements AutoCloseable {
private static final Logger LOGGER = LogManager.getLogger(JulieOps.class);

private TopicManager topicManager;
private final PrincipalManager principalManager;
private final PrincipalUpdateManager principalUpdateManager;
private final PrincipalDeleteManager principalDeleteManager;
private AccessControlManager accessControlManager;
private KafkaConnectArtefactManager connectorManager;
private KSqlArtefactManager kSqlArtefactManager;
@@ -48,14 +49,16 @@ private JulieOps(
Configuration config,
TopicManager topicManager,
AccessControlManager accessControlManager,
PrincipalManager principalManager,
PrincipalUpdateManager principalUpdateManager,
PrincipalDeleteManager principalDeleteManager,
KafkaConnectArtefactManager connectorManager,
KSqlArtefactManager kSqlArtefactManager) {
this.topology = topology;
this.config = config;
this.topicManager = topicManager;
this.accessControlManager = accessControlManager;
this.principalManager = principalManager;
this.principalUpdateManager = principalUpdateManager;
this.principalDeleteManager = principalDeleteManager;
this.connectorManager = connectorManager;
this.kSqlArtefactManager = kSqlArtefactManager;
this.outputStream = System.out;
@@ -154,7 +157,10 @@ public static JulieOps build(

TopicManager topicManager = new TopicManager(adminClient, schemaRegistryManager, config);

PrincipalManager principalManager = new PrincipalManager(principalProvider, config);
PrincipalUpdateManager principalUpdateManager =
new PrincipalUpdateManager(principalProvider, config);
PrincipalDeleteManager principalDeleteManager =
new PrincipalDeleteManager(principalProvider, config);

KafkaConnectArtefactManager connectorManager =
configureKConnectArtefactManager(config, topologyFileOrDir);
@@ -167,7 +173,8 @@
config,
topicManager,
accessControlManager,
principalManager,
principalUpdateManager,
principalDeleteManager,
connectorManager,
kSqlArtefactManager);
}
@@ -219,32 +226,27 @@ static void verifyRequiredParameters(String topologyFile, Map<String, String> co
}
}

void run(ExecutionPlan plan) throws IOException {
void run(BackendController backendController, PrintStream printStream) throws IOException {
ExecutionPlan plan = ExecutionPlan.init(backendController, printStream);
LOGGER.debug(
String.format(
"Running topology builder with TopicManager=[%s], accessControlManager=[%s], dryRun=[%s], isQuite=[%s]",
"Running topology builder with topicManager=[%s], accessControlManager=[%s], dryRun=[%s], isQuiet=[%s]",
topicManager, accessControlManager, config.isDryRun(), config.isQuiet()));

// Create users should always be first, so user exists when making acl link
principalManager.applyCreate(topology, plan);

topicManager.apply(topology, plan);
accessControlManager.apply(topology, plan);

connectorManager.apply(topology, plan);
kSqlArtefactManager.apply(topology, plan);

// Delete users should always be last,
// avoids any unlinked acls, e.g. if acl delete or something errors then there is a link still
// from the account, and can be re-run or manually fixed more easily
principalManager.applyDelete(topology, plan);
principalUpdateManager.updatePlan(plan, topology);
topicManager.updatePlan(plan, topology);
accessControlManager.updatePlan(plan, topology);
connectorManager.updatePlan(plan, topology);
kSqlArtefactManager.updatePlan(plan, topology);
principalDeleteManager.updatePlan(plan, topology); // Must be last

plan.run(config.isDryRun());

if (!config.isQuiet() && !config.isDryRun()) {
topicManager.printCurrentState(System.out);
accessControlManager.printCurrentState(System.out);
principalManager.printCurrentState(System.out);
principalUpdateManager.printCurrentState(System.out);
principalDeleteManager.printCurrentState(System.out);
connectorManager.printCurrentState(System.out);
kSqlArtefactManager.printCurrentState(System.out);
}
@@ -254,9 +256,7 @@ public void run() throws IOException {
if (config.doValidate()) {
return;
}
BackendController cs = buildBackendController(config);
ExecutionPlan plan = ExecutionPlan.init(cs, outputStream);
run(plan);
run(buildBackendController(config), outputStream);
}

public void close() {
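
The reordered run() above loses the inline comments that explained the sequencing, but the constraint is unchanged: principals must be created before ACL bindings are planned against them, and deleted only after every other updater has run, so a failed ACL change never leaves an orphaned account. A hedged restatement of the new sequence with that rationale reattached as comments (the method name is hypothetical; the fields and calls are the ones shown in this diff):

```java
// Sketch only: restates the call order from run(BackendController, PrintStream)
// together with the ordering rationale from the removed comments.
void runUpdaters(ExecutionPlan plan, Topology topology) throws IOException {
  principalUpdateManager.updatePlan(plan, topology); // create accounts first, so ACLs can link to them
  topicManager.updatePlan(plan, topology);
  accessControlManager.updatePlan(plan, topology);
  connectorManager.updatePlan(plan, topology);
  kSqlArtefactManager.updatePlan(plan, topology);
  principalDeleteManager.updatePlan(plan, topology); // delete last, so a failed ACL removal never leaves an unlinked account
  plan.run(config.isDryRun());
}
```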
@@ -0,0 +1,34 @@
package com.purbon.kafka.topology;

import com.purbon.kafka.topology.actions.accounts.ClearAccounts;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.cluster.ServiceAccount;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PrincipalDeleteManager extends AbstractPrincipalManager {

public PrincipalDeleteManager(PrincipalProvider provider, Configuration config) {
super(provider, config);
}

@Override
protected void doUpdatePlan(
ExecutionPlan plan,
Topology topology,
List<String> principals,
Map<String, ServiceAccount> accounts) {
if (config.isAllowDeletePrincipals()) {
// build list of principals to be deleted.
List<ServiceAccount> principalsToBeDeleted =
accounts.values().stream()
.filter(currentPrincipal -> !principals.contains(currentPrincipal.getName()))
.collect(Collectors.toList());
if (!principalsToBeDeleted.isEmpty()) {
plan.add(new ClearAccounts(provider, principalsToBeDeleted));
}
}
}
}
@@ -0,0 +1,35 @@
package com.purbon.kafka.topology;

import com.purbon.kafka.topology.actions.accounts.CreateAccounts;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.cluster.ServiceAccount;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PrincipalUpdateManager extends AbstractPrincipalManager {

public PrincipalUpdateManager(PrincipalProvider provider, Configuration config) {
super(provider, config);
}

@Override
protected void doUpdatePlan(
ExecutionPlan plan,
Topology topology,
List<String> principals,
Map<String, ServiceAccount> accounts) {
// build set of principals to be created.
Set<ServiceAccount> principalsToBeCreated =
principals.stream()
.filter(wishPrincipal -> !accounts.containsKey(wishPrincipal))
.map(principal -> new ServiceAccount(-1, principal, "Managed by KTB"))
.collect(Collectors.toSet());

if (!principalsToBeCreated.isEmpty()) {
plan.add(new CreateAccounts(provider, principalsToBeCreated));
}
}
}