Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved execution log for topics and schemas #383

Merged
merged 34 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9131d08
Refactor SyncTopicAction
akselh Jun 17, 2021
150036b
Only change topic configs that are actually changed
akselh Jun 17, 2021
f170302
run -> buildAndExecutePlan
sverrehu Jun 15, 2021
e0366d7
applyCreate -> updatePlanWithPrincipalsCreation
sverrehu Jun 15, 2021
bbdbc63
apply -> updatePlan
sverrehu Jun 15, 2021
49fcc0d
applyDelete -> updatePlanWithPrincipalsDeletion
sverrehu Jun 15, 2021
d72accc
Auto-format
sverrehu Jun 15, 2021
f428d3b
Spelling
sverrehu Jun 15, 2021
5325dba
bindings -> aclBindings
sverrehu Jun 15, 2021
c797b9b
Make methods private. Add @Override annotations. Implement ManagerOfT…
sverrehu Jun 15, 2021
5fb4ba4
ManagerOfThings -> ExecutionPlanUpdater
sverrehu Jun 15, 2021
1df12bc
Make PrincipalManager an ExecutionPlanUpdater and simplify the plan u…
sverrehu Jun 15, 2021
d5f1c2c
Revert "bindings -> aclBindings"
sverrehu Jun 17, 2021
b4a2fc7
Explicit method calls per manager
sverrehu Jun 18, 2021
621eb7c
buildAndExecutePlan -> run, as per review request.
sverrehu Jun 18, 2021
8bf5623
Split PrincipalManager in two, as per review request.
sverrehu Jun 18, 2021
9cbf64c
Make method private
sverrehu Jun 18, 2021
07a4078
Remove unnecessary throws clause
sverrehu Jun 18, 2021
3f237ce
Merge pull request #3 from sverrehu/rename-methods
sverrehu Jun 18, 2021
67caf4d
dryRun: Only log actual topic config changes
akselh Jun 22, 2021
a6d684f
Renamed and fixed test for partition count update.
akselh Jun 23, 2021
543d66d
Remove comments
akselh Jun 23, 2021
239f1e8
Test TopicConfigUpdatePlanBuilder and fix a bug (#12)
sverrehu Sep 7, 2021
bd9bb68
Rewrite deprecated code (#11)
sverrehu Sep 7, 2021
3f96dfa
Refactor bindings building (#6)
sverrehu Sep 7, 2021
d1e135a
Merge upstream master and fix issues
akselh Nov 9, 2021
3f54286
Add both localhost and host.docker.internal to the cert, to make it u…
akselh Nov 9, 2021
c123b8a
Improve execution log output for topics. Includes refactor/cleanup th…
akselh Nov 9, 2021
59ec5a7
Code style reformatting
akselh Nov 9, 2021
2eef351
Improve RegisterSchemaAction output
akselh Nov 9, 2021
011d70a
Print output for both dryrun and apply phases
akselh Nov 9, 2021
0bfff69
Make log output format consistent for create and delete topic actions
akselh Nov 9, 2021
7b2e52e
Merge with upstream julieops/master
akselh Dec 9, 2021
e930f05
Merge with julieops/master
akselh Dec 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
package com.purbon.kafka.topology;

import com.purbon.kafka.topology.actions.accounts.ClearAccounts;
import com.purbon.kafka.topology.actions.accounts.CreateAccounts;
import com.purbon.kafka.topology.model.*;
import com.purbon.kafka.topology.model.Platform;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.User;
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import com.purbon.kafka.topology.serviceAccounts.VoidPrincipalProvider;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PrincipalManager {
abstract class AbstractPrincipalManager implements ExecutionPlanUpdater {

private static final Logger LOGGER = LogManager.getLogger(PrincipalManager.class);
private static final Logger LOGGER = LogManager.getLogger(AbstractPrincipalManager.class);
private final List<String> managedPrefixes;
protected PrincipalProvider provider;
protected Configuration config;

private PrincipalProvider provider;

private Configuration config;

public PrincipalManager(PrincipalProvider provider, Configuration config) {
public AbstractPrincipalManager(PrincipalProvider provider, Configuration config) {
this.provider = provider;
this.config = config;
this.managedPrefixes = config.getServiceAccountManagedPrefixes();
}

public void applyCreate(Topology topology, ExecutionPlan plan) throws IOException {
@Override
public final void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies)
throws IOException {
if (!config.enabledExperimental()) {
LOGGER.debug("Not running the PrincipalsManager as this is an experimental feature.");
return;
Expand All @@ -38,52 +42,21 @@ public void applyCreate(Topology topology, ExecutionPlan plan) throws IOExceptio
// This means the management of principals is either not possible or has not been configured
return;
}

provider.configure();

List<String> principals = parseListOfPrincipals(topology);
Map<String, ServiceAccount> accounts = loadActualClusterStateIfAvailable(plan);

// build set of principals to be created.
Set<ServiceAccount> principalsToBeCreated =
principals.stream()
.filter(wishPrincipal -> !accounts.containsKey(wishPrincipal))
.map(principal -> new ServiceAccount(-1, principal, "Managed by KTB"))
.collect(Collectors.toSet());

if (!principalsToBeCreated.isEmpty()) {
plan.add(new CreateAccounts(provider, principalsToBeCreated));
}
}

public void applyDelete(Topology topology, ExecutionPlan plan) throws IOException {
if (!config.enabledExperimental()) {
LOGGER.debug("Not running the PrincipalsManager as this is an experimental feature.");
return;
}
if (provider instanceof VoidPrincipalProvider) {
// Do Nothing if the provider is the void one.
// This means the management of principals is either not possible or has not been configured
return;
}

if (config.isAllowDeletePrincipals()) {
provider.configure();

for (Topology topology : topologies.values()) {
List<String> principals = parseListOfPrincipals(topology);
Map<String, ServiceAccount> accounts = loadActualClusterStateIfAvailable(plan);

// build list of principals to be deleted.
List<ServiceAccount> principalsToBeDeleted =
accounts.values().stream()
.filter(currentPrincipal -> !principals.contains(currentPrincipal.getName()))
.collect(Collectors.toList());
if (!principalsToBeDeleted.isEmpty()) {
plan.add(new ClearAccounts(provider, principalsToBeDeleted));
}
doUpdatePlan(plan, topology, principals, accounts);
}
}

protected abstract void doUpdatePlan(
ExecutionPlan plan,
Topology topology,
final List<String> principals,
final Map<String, ServiceAccount> accounts);

private Map<String, ServiceAccount> loadActualClusterStateIfAvailable(ExecutionPlan plan)
throws IOException {
Set<ServiceAccount> accounts =
Expand Down Expand Up @@ -132,7 +105,8 @@ private List<String> parseListOfPrincipals(Topology topology) {
.collect(Collectors.toList());
}

public void printCurrentState(PrintStream out) throws IOException {
@Override
public final void printCurrentState(PrintStream out) throws IOException {
out.println("List of Principles: ");
provider.listServiceAccounts().forEach(out::println);
}
Expand Down
Loading