diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 000000000..d913b40dc
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,11 @@
+# To get started with Dependabot version updates, you'll need to specify which
+# package ecosystems to update and where the package manifests are located.
+# Please see the documentation for all configuration options:
+# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
+
+version: 2
+updates:
+  - package-ecosystem: "maven" # See documentation for possible values
+    directory: "/" # Location of package manifests
+    schedule:
+      interval: "weekly"
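Dependabot is not limited to one ecosystem per file; for instance (a hypothetical extension, not part of this change), a second entry in the same `updates:` list could keep the GitHub Actions workflows current as well:

```yaml
  # Hypothetical additional entry for the same updates: list.
  - package-ecosystem: "github-actions"
    directory: "/" # Dependabot finds .github/workflows from the repository root
    schedule:
      interval: "weekly"
```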
diff --git a/.github/workflows/ci-integration-test-main.yml b/.github/workflows/ci-integration-test-main.yml
index 6ac106be2..060c4e8bc 100644
--- a/.github/workflows/ci-integration-test-main.yml
+++ b/.github/workflows/ci-integration-test-main.yml
@@ -15,7 +15,7 @@ jobs:
matrix:
os: [ubuntu-latest]
java: [11.0.x]
- cpversion: [7.0.0]
+ cpversion: [7.5.0]
runs-on: ${{ matrix.os }}
diff --git a/.gitignore b/.gitignore
index 040a954b3..9b1bf6d71 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@ logs/*
.cluster-state
*.iml
_build
+*~
dependency-reduced-pom.xml
server-api/logs/
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index f37c6334b..b2b90ab32 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -4,7 +4,7 @@
Have a problem you want the Kafka health API to solve for you?
-* File a ticket on [GitHub](https://github.com/purbon/kafka-topology-builder/issues).
+* File a ticket on [GitHub](https://github.com/purbon/kafka-topology-builder/issues).
## Something Not Working? Found a Bug? or a Security Issue?
@@ -15,7 +15,7 @@ If you think you found a bug, it probably is a bug.
# Contributing Documentation and Code Changes
If you have a bugfix or new feature that you would like to contribute, and you think it will take
-more than a few minutes to produce the fix (ie; write code), it is worth discussing the change.
+more than a few minutes to produce the fix (i.e., write code), it is worth discussing the change.
You can reach us via [GitHub](https://github.com/purbon/kafka-topology-builder/issues).
Please note that Pull Requests without tests and documentation may not be merged. If you would like to contribute but do not have
@@ -91,11 +91,11 @@ Example:
request](https://help.github.com/articles/using-pull-requests). In the pull
request, describe what your changes do and mention any bugs/issues related
to the pull request.
-
+
# Pull Request Guidelines
The following exists as a way to set expectations for yourself and for the review process. We *want* to merge fixes and features, so let's describe how we can achieve this:
-
+
## Goals
* To constantly make forward progress on PRs
diff --git a/README.md b/README.md
index c616b6eb8..a11d71c18 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
I'm grateful for how many people the JulieOps project has helped during its existence; it is totally mind-blowing to get more than 300 stars for a humble human like me, thanks everyone!!
-Sadly this days, between my workload and personal arrangements, the project has been lacking proper mantainance and care, what honestly makes me very sad as I would love to see it grow and provide more and more people with such features, I'm a big beliver of self service and automation.
+Sadly, these days, between my workload and personal arrangements, the project has been lacking proper maintenance and care, which honestly makes me very sad, as I would love to see it grow and provide more and more people with such features; I'm a big believer in self-service and automation.
So, until further notice, or something changes, you should take the project with care, as currently it is mostly in a long winter hibernation :-) I'm sorry for this, but I can't do more as a mostly sole maintainer.
@@ -13,7 +13,7 @@ Thanks again to everyone who was, is or will be involved with the project life.
-
+
-- Pere
### README
@@ -23,21 +23,21 @@ Thanks again to everyone who was, is or will be involved with the project life.
![CI tests](https://github.com/kafka-ops/kafka-topology-builder/workflows/CI%20tests/badge.svg?branch=master) [![Gitter](https://badges.gitter.im/kafka-topology-builder/community.svg)](https://gitter.im/kafka-topology-builder/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge) [![Documentation Status](https://readthedocs.org/projects/julieops/badge/?version=latest)](https://julieops.readthedocs.io/?badge=latest)
JulieOps helps you automate the management of your things within Apache Kafka, from Topics,
-Configuration to Metadata but as well Access Control, Schemas.
-More items are plan, check [here](https://github.com/kafka-ops/julie/issues) for details.
+Configuration to Metadata, as well as Access Control and Schemas.
+More items are planned; check [here](https://github.com/kafka-ops/julie/issues) for details.
-## The motivation
+## The motivation
-One of the typical questions while building an Apache Kafka infrastructure is how to handle topics,
+One of the typical questions while building an Apache Kafka infrastructure is how to handle topics,
configurations and the required permissions to use them (Access Control List).
-The JulieOps cli, in close collaboration with git and Jenkins (CI/CD) is here to help you setup an
+The JulieOps CLI, in close collaboration with git and Jenkins (CI/CD), is here to help you set up an
organised and automated way of managing your Kafka Cluster.
-
+
## Where's the docs?
We recommend taking time to [read the docs](https://julieops.readthedocs.io/en/latest/).
-There's quite a bit of detailed information about GitOps, Apache Kafka and how this project can help you automate
+There's quite a bit of detailed information about GitOps, Apache Kafka and how this project can help you automate
the common operational tasks.
## Automating Management with CI/CD and GitOps
@@ -63,9 +63,9 @@ You might be wondering what is the usual workflow to implement this approach:
Considerations:
-* Using webhooks, the git server (github, gitlab or bitbucket) will inform the CI/CD system changes had happened
+* Using webhooks, the git server (GitHub, GitLab or Bitbucket) will inform the CI/CD system that changes have happened
and the need to apply them to the cluster.
-* All changes (git push) to master branch are disabled directly.
+* Direct changes (git push) to the master branch are disabled.
Changes can only happen through a pull request, providing a change management mechanism that fits into your org procedures; a sketch of such a CI job follows below.
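To make that concrete, here is a minimal sketch of such a CI job (hypothetical; the workflow, secret names and file paths are assumptions, only the CLI flags come from the usage text further down):

```yaml
# Hypothetical GitHub Actions job: apply the topology after a merge to master.
name: apply-topology
on:
  push:
    branches: [master]
jobs:
  apply:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Apply topology with JulieOps
        run: |
          julie-ops-cli.sh --topology topology.yml \
            --brokers "${{ secrets.KAFKA_BROKERS }}" \
            --clientConfig julie.properties
```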
## Help??
@@ -85,7 +85,7 @@ What can you achieve with this tool:
* Automatically set access control rules for:
* Kafka Consumers
* Kafka Producers
- * Kafka Connect
+ * Kafka Connect
* Kafka Streams applications ( microservices )
* KSQL applications
* Schema Registry instances
@@ -102,13 +102,13 @@ What can you achieve with this tool:
* Manage your cluster schemas.
- Support for Confluent Schema Registry
-Out of the box support for Confluent Cloud and other clouds that enable you to use the AdminClient API.
+Out-of-the-box support for Confluent Cloud and other clouds that enable you to use the AdminClient API.
### How can I run JulieOps directly?
This tool is available in multiple formats:
-- As a Docker image, available from [docker hub](https://hub.docker.com/r/purbon/kafka-topology-builder)
+- As a Docker image, available from [Docker Hub](https://hub.docker.com/r/purbon/kafka-topology-builder)
- As an RPM package, for RedHat-like distributions
- As a DEB package, for Debian-based distros
- Directly as a fat jar (zip/tar.gz)
@@ -120,7 +120,7 @@ The latest version are available from the [releases](https://github.com/kafka-op
This is how you can run the tool directly as a docker image:
-```bash
+```bash
docker run purbon/kafka-topology-builder:latest julie-ops-cli.sh --help
Parsing failed cause of Missing required options: topology, brokers, clientConfig
usage: cli
@@ -142,7 +142,7 @@ usage: cli
If you install the tool as an RPM, you will have _julie-ops-cli.sh_ available in your $PATH.
You can run this script with the same options shown earlier; however, you will need to be running as, or be in the group of,
-for the user julie-kafka.
+the user julie-kafka.
#### An example topology
@@ -193,10 +193,10 @@ projects:
num.partitions: "3"
```
-more examples can be found at the [example/](example/) directory.
+More examples can be found in the [example/](example/) directory.
Also, please check the documentation in [the docs](https://julieops.readthedocs.io/) for extra information and
-examples on managing ACLs, RBAC, Principales, Schemas and many others.
+examples on managing ACLs, RBAC, Principals, Schemas and many others.
## Troubleshooting guides
@@ -213,7 +213,7 @@ Check our [contributing](CONTRIBUTING.md) doc for guidance.
## Building JulieOps from scratch (source code)
The project is built using Java and Maven, so both are required if you aim to build the tool from scratch.
-The minimum version of Java supported is Java 8, note it soon will be deprecated here, it is only keep as supported
+The minimum version of Java supported is Java 8; note it will soon be deprecated here and is only kept supported
for very legacy environments.
It is recommended to run JulieOps with Java 11 and an open JDK version.
diff --git a/changelog.md b/changelog.md
index 99338180d..e5f73e6fa 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,18 +1,150 @@
+v4.4.1 / 2022-10-10
+===================
+
+ * Adds support for KSqlDB session variables (#544)
+ * Add sync actions for kafka generic artefacts, Connect and kSQL for now (#542)
+
+v4.3.0 / 2022-08-22
+===================
+* Implements optimized ACLs for subjects #528
+* Add Subject prefix filtering support (#525)
+* Add a config test to validate when people use empty managed prefixes as mistake (#522)
+* Amend: introduce dedicated resource filters for access control rules and extend their logic to be more precise (#521)
+* [neat] Extensions and small collateral updates for testing variables with config as JSON (#520)
+* Verify the schema deletion process succeeds, complementing the related test (#516)
+* Extend the project name format to include by default the separator, this is useful to narrow down optimised acls to be more secure (#515)
+* Add autocomplete for bash (#500)
+
+v4.2.9 / 2022-08-22
+===================
+
+* Fix pagination issue when listing service accounts for confluent cloud (#531)
+* Don't print empty operations (#517)
+
+v4.2.8 / 2022-07-28
+===================
+
+* [fix] remove condition of failing if min.insync.validation == 1 and other bigger (#509)
+* [neat] Add extra validations for topics (#508)
+
+v4.2.6 / 2022-07-27
+===================
+
+* [bug] Force special topics to be using the name topic naming convention in order
+ to avoid cases when the prefix formats break the flow (#507)
+* [testing] Extend example for rbac tls for testing (#498)
+
+v4.2.5 / 2022-05-02
+===================
+
+* Fix the acl creation in the hybrid ccloud provider (#494)
+
+v4.2.4 / 2022-04-28
+===================
+
+* [neat] make hybrid ccloud provider list acls use admin client
+
+v4.2.3 / 2022-04-27
+===================
+
+* [bug] fix a stupid missed thing when loading the hybrid provider, this code really needs to be done cleaner
+
+v4.2.2 / 2022-04-27
+===================
+
+* [Feature] Introduce the concept of a hybrid ccloud provider, to set acls via admin client and translation via api (#492)
+
+v4.2.1 / 2022-04-26
+===================
+
+* [Feature] Add feature flag to make the remote state verification backwards compatible again (#491)
+* [Feature] Allow setting log level as debug in the code (#490)
+* [Feature] allow insecure https connection when using mds, default to false
+* [Neat] add a method to simple sanitize a string that could contain empty values
+* [Feature] add a pre-flight check for valid clusterIds in your platform
+
+v4.2.0 / 2022-04-13
+===================
+
+* [Big] Fix Confluent Cloud Translation mechanism when the Service Account does not have a type prefix (default user) (#485)
+* [Feature] Introduce the concept of an AuditLog for JulieOps (#484)
+* [Feature] Add support for out-of-the-box topics, a special topics list managed by JulieOps (#482)
+* [Feature] Add Kafka Streams applicationId as internal topics, if available (#481)
+
+v4.1.3 / 2022-04-08
+===================
+
+* Detect divergences between local state and the remote cluster current status (#478)
+* Add validators backwards compatibility (#480)
+
+v4.1.2 / 2022-04-06
+===================
+
+* [Security] Fix CWE-787 CVE-2020-36518 for jackson-databind (#476)
+* Add support for deploying packaged released to Maven Central (#473) (#475) (#477)
+* Add a JSON schema description of the topology/descriptor file syntax (#471) (#472)
+* [Test] Refactor and add tests related to Confluent Cloud service account translation feature (#468)
+* Allowing to configure the redis bucket used by JulieOps (#465)
+* [Test] Clarify S3 Backend IT test (#464)
+* [Bug] Fix RedisBackend bootstrap, NullPointerException (#462)
+* [Bug] Issue fix (456) for resolution of service account mapping (Translation of principals) (#459)
+
+v4.1.1 / 2022-02-05
+===================
+
+* Fix Confluent Cloud ACL(s) API usage, so ACL(s) are finally created properly (#444)
+* Fix config passing for topology validator for regular expressions (#443)
+* Bump log4j-api from 2.17.0 to 2.17.1 (#436)
+
+v4.1.0 / 2021-12-30
+===================
+
+* [ksqlDB] when using ACLs configure all internal topics with ALL permissions for the ksql server user (#433)
+* Bring Principal Management for Confluent Cloud out of Experimental into Production ready feature (#435)
+* Use Confluent Cloud API when integrating with the Confluent fully managed service (#431)
+* Throw an exception when an invalid plan is used (#426)
+* Add docker to the SAN to make it run in our in-house Gitlab (#421)
+* Improved execution log for topics and schemas (#383)
+
+v4.0.1 / 2021-12-20
+==================
+
+* Bump log4j to 2.17.0, prevent latest Log4j CVE, ref https://logging.apache.org/log4j/2.x/security.html (#427)
+
+v4.0.0 / 2021-12-10
+===================
+
+* neat: Adapt CI jobs and other actions to latest versions (#419)
+* port the main changelog to master
+fix&feature: establish service accounts prefix filter as a primary criteria when available to filter the list of access control rules (#418)
+* Bump log4j-api from 2.13.3 to 2.15.0 (#416)
+* fix: issue with connector and subject permissions being ignored when more than one (#415)
+* Add producer Idempotence permissions to Confluent RBAC (#414)
+* Fix: Topic config values should take precedence over plan config values (#410)
+* Quotas implementation, only quotas based in user principal (#376)
+* fix request scope to hold proper pattern type (literal, prefix) in rbac
+* add proper clusterIds for RBAC clear bindings operations, not only kafka one
+* add support in the custom roles for subject and connect attributes (#406)
+* fix incorrect rbac resource name for subjects and connectors (#405)
+* amend order for delete in ksql to be reverse from creation (#400)
+* add Parameter for initial Kafka Backend consumer retry
+* raise initial load as done if tried at least five times
+
v4.0.0 / 2021-12-10
==================
- * Fix/feature: establish service accounts prefix filter a primary criteria when available to filter the list of access control rules (#418)
- * Security fix: Bump log4j-api from 2.13.3 to 2.15.0 (#416)
- * fix: issue with connector and subject permissions being ignored when more than one (#415)
- * feature: Add producer Idempotence permissions to Confluent RBAC (#414)
- * fix: Topic config values should take precedence over plan config values (#410)
- * feature: Quotas implementation, only quotas based in user principal (#376)
- * feature: add support in the custom roles for subject and connect attributes (#406)
- * fix: incorrect rbac resource name for subjects and connectors (#405)
- * fix: ammend order for delete in ksql to be reverse from creation (#400)
- * fix: add Parameter for initial Kafka Backend consumer retry
- * fix: Raise initial load as done if tried at least five times for the Kafka Backend
+* Fix/feature: establish service accounts prefix filter as a primary criteria when available to filter the list of access control rules (#418)
+* Security fix: Bump log4j-api from 2.13.3 to 2.15.0 (#416)
+* fix: issue with connector and subject permissions being ignored when more than one (#415)
+* feature: Add producer Idempotence permissions to Confluent RBAC (#414)
+* fix: Topic config values should take precedence over plan config values (#410)
+* feature: Quotas implementation, only quotas based in user principal (#376)
+* feature: add support in the custom roles for subject and connect attributes (#406)
+* fix: incorrect rbac resource name for subjects and connectors (#405)
+* fix: amend order for delete in ksql to be reverse from creation (#400)
+* fix: add Parameter for initial Kafka Backend consumer retry
+* fix: Raise initial load as done if tried at least five times for the Kafka Backend
v3.3.3 / 2021-11-23
===================
diff --git a/pom.xml b/pom.xml
index 44274b1c8..9e702a93b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-failsafe-plugin</artifactId>
         <version>${maven-failsafe-plugin.version}</version>
@@ -76,6 +77,7 @@
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>${maven-surefire-plugin.version}</version>
@@ -112,6 +114,7 @@
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>${maven-surefire-plugin.version}</version>
@@ -148,9 +151,9 @@
${project.artifactId}
-        <groupId>com.coveo</groupId>
+        <groupId>com.spotify.fmt</groupId>
         <artifactId>fmt-maven-plugin</artifactId>
-        <version>2.9</version>
+        <version>2.20</version>
@@ -166,7 +169,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
-        <version>3.2.4</version>
+        <version>3.5.0</version>
@@ -186,6 +189,17 @@
               <goal>shade</goal>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>**/Log4j2Plugins.dat</exclude>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
               <mainClass>com.purbon.kafka.topology.CommandLineInterface</mainClass>
@@ -210,7 +224,9 @@
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
+        <version>3.6.0</version>
           <descriptor>assembly.xml</descriptor>
@@ -358,7 +374,7 @@
         <groupId>org.vafer</groupId>
         <artifactId>jdeb</artifactId>
-        <version>1.8</version>
+        <version>1.10</version>
             <phase>package</phase>
@@ -425,6 +441,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-source-plugin</artifactId>
+        <version>3.3.0</version>
             <id>attach-sources</id>
@@ -438,7 +455,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-javadoc-plugin</artifactId>
-        <version>3.4.0</version>
+        <version>3.5.0</version>
             <id>attach-javadocs</id>
@@ -458,7 +475,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-gpg-plugin</artifactId>
-        <version>3.0.1</version>
+        <version>3.1.0</version>
             <id>sign-artifacts</id>
@@ -517,37 +534,37 @@
UTF-8
- 3.12.0
- 3.1.12.2
- 3.9.0
- 3.0.0
- 0.8.6
- 2.2.0
- 3.7.0
- 2.22.2
- 2.22.2
+ 0.8.10
+ 3.11.0
+ 3.1.2
+ 3.21.0
+ 3.4.5
+ 3.12.1
+ 3.1.2
+ 2.3.0
+ 4.7.3.5
- 2.13.4
- 2.17.1
- 3.8.0
- 1.4
- 3.6.0
- 4.13.1
- 1.15.3
- 3.2.0
- 6.1.0
- 6.1.0-ce
- 1.9.2
- 2.22.2
+ 3.24.2
+ 1.11.2
+ 2.20.144
+ 1.5.0
+ 7.5.0-ce
+ 7.5.0
+ 26.22.0
2.2
- 3.15.0
- 2.5.4
- 2.16.31
- 19.2.1
- 0.27.1
+ 2.15.2
+ 4.4.4
+ 3.1.3
+ 2.7.1
+ 4.13.2
7.0.0
- 1.4.0
- 1.18.22
+ 0.27.1
+ 2.20.0
+ 1.18.28
+ 5.5.0
+ 1.19.0
+ 1.4.2
+ 3.9.0
@@ -607,6 +624,17 @@
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>${zookeeper.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+      </exclusions>
       <groupId>org.apache.kafka</groupId>
@@ -677,10 +705,12 @@
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>s3</artifactId>
+      <version>${aws.java.sdk.version}</version>
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-storage</artifactId>
+      <version>2.26.1</version>
       <groupId>org.hamcrest</groupId>
@@ -721,7 +751,7 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
-      <version>4.0.3</version>
+      <version>4.2.0</version>
       <scope>test</scope>
@@ -733,7 +763,7 @@
       <groupId>com.github.tomakehurst</groupId>
       <artifactId>wiremock-jre8</artifactId>
-      <version>2.33.1</version>
+      <version>2.35.1</version>
       <scope>test</scope>
diff --git a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java
index 9427a5221..c8f2d7bbd 100644
--- a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java
+++ b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java
@@ -97,6 +97,10 @@ private Set<TopologyAclBinding> loadActualClusterStateIfAvailable(ExecutionPlan
private void detectDivergencesInTheRemoteCluster(ExecutionPlan plan)
throws RemoteValidationException {
+ if (!config.isAllowDeleteTopics()) {
+ /* Assume topics are cleaned up by mechanisms outside JulieOps, and do not fail. */
+ return;
+ }
var remoteAcls = providerBindings();
var delta =
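The same early-return guard is added to ArtefactManager and TopicManager below; all three key off the existing allow-delete setting. Assuming the documented property name (only the ALLOW_DELETE_TOPICS constant is visible in this diff), the behaviour maps to client configuration roughly like this:

```properties
# Assumed key name, shown for illustration. With deletes disallowed (the
# default), the remote divergence check now returns early instead of failing.
allow.delete.topics=false
```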
diff --git a/src/main/java/com/purbon/kafka/topology/ArtefactManager.java b/src/main/java/com/purbon/kafka/topology/ArtefactManager.java
index d8ac320a0..eb395b0f5 100644
--- a/src/main/java/com/purbon/kafka/topology/ArtefactManager.java
+++ b/src/main/java/com/purbon/kafka/topology/ArtefactManager.java
@@ -38,10 +38,10 @@ public ArtefactManager(
this.topologyFileOrDir = topologyFileOrDir;
}
- private boolean findKsqlVarsArtefact(Artefact artefact){
+ private boolean findKsqlVarsArtefact(Artefact artefact) {
return Optional.ofNullable(artefact.getClass().getAnnotation(TypeArtefact.class))
- .map(x -> x.name().equals("VARS"))
- .orElse(false);
+ .map(x -> x.name().equals("VARS"))
+ .orElse(false);
}
@Override
@@ -54,11 +54,9 @@ public void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies) thr
     Set<? extends Artefact> entryArtefacts = parseNewArtefacts(topology);
final var kSqlVarsArtefact =
-        ((Optional<KsqlVarsArtefact>)
-                entryArtefacts.stream()
-                    .filter(this::findKsqlVarsArtefact)
-                    .findFirst())
-            .orElseGet(()->new KsqlVarsArtefact(Collections.emptyMap()));
+        ((Optional<KsqlVarsArtefact>)
+                entryArtefacts.stream().filter(this::findKsqlVarsArtefact).findFirst())
+            .orElseGet(() -> new KsqlVarsArtefact(Collections.emptyMap()));
entryArtefacts.removeIf(this::findKsqlVarsArtefact);
for (Artefact artefact : entryArtefacts) {
@@ -143,6 +141,10 @@ protected Collection<? extends Artefact> loadActualClusterStateIfAvailable(Execu
}
private void detectDivergencesInTheRemoteCluster(ExecutionPlan plan) throws IOException {
+ if (!config.isAllowDeleteTopics()) {
+ /* Assume topics are cleaned up by mechanisms outside JulieOps, and do not fail. */
+ return;
+ }
var remoteArtefacts = getClustersState();
var delta =
diff --git a/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java b/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java
index 1992c3c7d..2933dff5f 100644
--- a/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java
+++ b/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java
@@ -33,6 +33,10 @@ public class CommandLineInterface {
public static final String DRY_RUN_OPTION = "dryRun";
public static final String DRY_RUN_DESC = "Print the execution plan without altering anything.";
+ public static final String RECURSIVE_OPTION = "recursive";
+ public static final String RECURSIVE_DESC =
+ "Recursively look for topology files below the given directory.";
+
public static final String QUIET_OPTION = "quiet";
public static final String QUIET_DESC = "Print minimum status update";
@@ -97,6 +101,14 @@ private Options buildOptions() {
.required(false)
.build();
+ final Option recursiveOption =
+ Option.builder()
+ .longOpt(RECURSIVE_OPTION)
+ .hasArg(false)
+ .desc(RECURSIVE_DESC)
+ .required(false)
+ .build();
+
final Option quietOption =
Option.builder()
.longOpt(QUIET_OPTION)
@@ -133,6 +145,7 @@ private Options buildOptions() {
options.addOption(overridingAdminClientConfigFileOption);
options.addOption(dryRunOption);
+ options.addOption(recursiveOption);
options.addOption(quietOption);
options.addOption(validateOption);
options.addOption(versionOption);
@@ -171,6 +184,7 @@ private Map<String, String> parseConfig(CommandLine cmd) {
config.put(BROKERS_OPTION, cmd.getOptionValue(BROKERS_OPTION));
}
config.put(DRY_RUN_OPTION, String.valueOf(cmd.hasOption(DRY_RUN_OPTION)));
+ config.put(RECURSIVE_OPTION, String.valueOf(cmd.hasOption(RECURSIVE_OPTION)));
config.put(QUIET_OPTION, String.valueOf(cmd.hasOption(QUIET_OPTION)));
config.put(VALIDATE_OPTION, String.valueOf(cmd.hasOption(VALIDATE_OPTION)));
config.put(
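Putting the new flag together with the options from the usage text earlier, an invocation could look like this (a sketch; only --recursive and --dryRun are defined by the constants above, the remaining values are placeholders):

```bash
# Hypothetical invocation: load every topology file under ./topologies,
# descending into subdirectories, and only print the plan (--dryRun).
julie-ops-cli.sh \
  --topology ./topologies \
  --brokers localhost:9092 \
  --clientConfig topology-builder.properties \
  --recursive \
  --dryRun
```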
diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java
index 2cba90175..61f116c38 100644
--- a/src/main/java/com/purbon/kafka/topology/Configuration.java
+++ b/src/main/java/com/purbon/kafka/topology/Configuration.java
@@ -364,6 +364,10 @@ public boolean isDryRun() {
return Boolean.parseBoolean(cliParams.getOrDefault(DRY_RUN_OPTION, "false"));
}
+ public boolean isRecursive() {
+ return Boolean.parseBoolean(cliParams.getOrDefault(RECURSIVE_OPTION, "false"));
+ }
+
public FileType getTopologyFileType() {
return config.getEnum(FileType.class, TOPOLOGY_FILE_TYPE);
}
diff --git a/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java b/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java
index 697cc2c08..944c475df 100644
--- a/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java
+++ b/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java
@@ -16,14 +16,13 @@
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import com.purbon.kafka.topology.roles.TopologyAclBinding;
import com.purbon.kafka.topology.utils.StreamUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class ExecutionPlan {
diff --git a/src/main/java/com/purbon/kafka/topology/TopicManager.java b/src/main/java/com/purbon/kafka/topology/TopicManager.java
index 9b8172efe..900da3487 100644
--- a/src/main/java/com/purbon/kafka/topology/TopicManager.java
+++ b/src/main/java/com/purbon/kafka/topology/TopicManager.java
@@ -151,6 +151,10 @@ private Set<String> loadActualClusterStateIfAvailable(ExecutionPlan plan) throws
}
private void detectDivergencesInTheRemoteCluster(ExecutionPlan plan) throws IOException {
+ if (!config.isAllowDeleteTopics()) {
+ /* Assume topics are cleaned up by mechanisms outside JulieOps, and do not fail. */
+ return;
+ }
     Set<String> remoteTopics = adminClient.listApplicationTopics();
     List<String> delta =
plan.getTopics().stream()
diff --git a/src/main/java/com/purbon/kafka/topology/TopologyObjectBuilder.java b/src/main/java/com/purbon/kafka/topology/TopologyObjectBuilder.java
index 8a8fb0071..3d9e43ced 100644
--- a/src/main/java/com/purbon/kafka/topology/TopologyObjectBuilder.java
+++ b/src/main/java/com/purbon/kafka/topology/TopologyObjectBuilder.java
@@ -8,6 +8,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
@@ -87,17 +88,34 @@ private static List<Topology> parseListOfTopologies(
String fileOrDir, Configuration config, PlanMap plans) throws IOException {
TopologySerdes parser = new TopologySerdes(config, plans);
     List<Topology> topologies = new ArrayList<>();
- boolean isDir = Files.isDirectory(Paths.get(fileOrDir));
- if (isDir) {
- Files.list(Paths.get(fileOrDir))
+ final Path path = Paths.get(fileOrDir);
+ if (Files.isDirectory(path)) {
+ loadFromDirectory(path, config.isRecursive(), parser, topologies);
+ } else {
+ topologies.add(parser.deserialise(new File(fileOrDir)));
+ }
+ return topologies;
+ }
+
+ private static void loadFromDirectory(
+ final Path directory,
+ final boolean recursive,
+ final TopologySerdes parser,
+      final List<Topology> topologies) {
+ try {
+ Files.list(directory)
.sorted()
.filter(p -> !Files.isDirectory(p))
.map(path -> parser.deserialise(path.toFile()))
- .forEach(subTopology -> topologies.add(subTopology));
- } else {
- Topology firstTopology = parser.deserialise(new File(fileOrDir));
- topologies.add(firstTopology);
+ .forEach(topologies::add);
+ if (recursive) {
+ Files.list(directory)
+ .sorted()
+ .filter(Files::isDirectory)
+ .forEach(p -> loadFromDirectory(p, recursive, parser, topologies));
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
}
- return topologies;
}
}
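For comparison, a single-pass alternative with Files.walk() would also have worked; the sketch below (illustrative only, not the PR's code) highlights the trade-off: a global sort over full paths interleaves files with subdirectory contents, whereas the recursion above lists each directory's files first, sorted, before descending.

```java
// Illustrative sketch, not part of this PR: collect topology files in one
// pass with Files.walk(). Ordering differs from the recursive version,
// because sorting full paths mixes files and subdirectory entries.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WalkAllTopologies {
  public static List<Path> topologyFiles(String rootDir) throws IOException {
    try (Stream<Path> paths = Files.walk(Paths.get(rootDir))) {
      return paths
          .filter(Files::isRegularFile)
          .sorted()
          .collect(Collectors.toList());
    }
  }
}
```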
diff --git a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java
index c6a464611..2331cf010 100644
--- a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java
+++ b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java
@@ -114,7 +114,7 @@ public void updateTopicConfig(TopicConfigUpdatePlan configUpdatePlan) {
public int getPartitionCount(String topic) throws IOException {
try {
       Map<String, TopicDescription> results =
- adminClient.describeTopics(Collections.singletonList(topic)).all().get();
+ adminClient.describeTopics(Collections.singletonList(topic)).allTopicNames().get();
return results.get(topic).partitions().size();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error(e);
diff --git a/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlArtefacts.java b/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlArtefacts.java
index cbba99d28..995d83647 100644
--- a/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlArtefacts.java
+++ b/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlArtefacts.java
@@ -1,14 +1,13 @@
package com.purbon.kafka.topology.model.artefact;
import com.purbon.kafka.topology.model.Artefacts;
-
import java.util.*;
public class KsqlArtefacts implements Artefacts {
   private List<KsqlStreamArtefact> streams;
   private List<KsqlTableArtefact> tables;
- final private KsqlVarsArtefact vars;
+ private final KsqlVarsArtefact vars;
public KsqlArtefacts() {
this(new ArrayList<>(), new ArrayList<>(), new KsqlVarsArtefact(Collections.emptyMap()));
diff --git a/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlVarsArtefact.java b/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlVarsArtefact.java
index 7d6bff4de..a2831f60a 100644
--- a/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlVarsArtefact.java
+++ b/src/main/java/com/purbon/kafka/topology/model/artefact/KsqlVarsArtefact.java
@@ -1,6 +1,5 @@
package com.purbon.kafka.topology.model.artefact;
-
import java.util.Map;
@TypeArtefact(name = "VARS")
diff --git a/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java b/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java
index 96c89b479..d9f388639 100644
--- a/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java
+++ b/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java
@@ -399,7 +399,7 @@ private Optional doKSqlElements(JsonParser parser, JsonNode node
if (artefactsNode.has(VARS_NODE)) {
artefactsNode.get(VARS_NODE);
varsArtefacts.setSessionVars(
- parser.getCodec().treeToValue(artefactsNode.get(VARS_NODE), Map.class));
+ parser.getCodec().treeToValue(artefactsNode.get(VARS_NODE), Map.class));
}
}
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 000000000..709a8b0ea
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
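The file uses plain log4j 1.x properties syntax, so the usual per-logger overrides apply; for instance (an illustrative override, not shipped in this PR), keeping the root at INFO while debugging only the JulieOps packages:

```properties
# Hypothetical override: root stays at INFO, JulieOps classes log at DEBUG.
log4j.rootLogger=INFO, stdout
log4j.logger.com.purbon.kafka.topology=DEBUG
```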
diff --git a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java
index 6c5c7d0ad..66bc7fa7f 100644
--- a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java
+++ b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java
@@ -15,7 +15,6 @@
import com.purbon.kafka.topology.model.*;
import com.purbon.kafka.topology.model.Impl.ProjectImpl;
import com.purbon.kafka.topology.model.Impl.TopologyImpl;
-import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.users.*;
import com.purbon.kafka.topology.model.users.platform.*;
import com.purbon.kafka.topology.roles.SimpleAclsProvider;
diff --git a/src/test/java/com/purbon/kafka/topology/CLITest.java b/src/test/java/com/purbon/kafka/topology/CLITest.java
index 0f72f2fd9..a19018dee 100644
--- a/src/test/java/com/purbon/kafka/topology/CLITest.java
+++ b/src/test/java/com/purbon/kafka/topology/CLITest.java
@@ -41,6 +41,7 @@ public void testParamPassing() throws Exception {
     Map<String, String> config = new HashMap<>();
config.put(BROKERS_OPTION, "localhost:9092");
config.put(DRY_RUN_OPTION, "false");
+ config.put(RECURSIVE_OPTION, "false");
config.put(QUIET_OPTION, "false");
config.put(VALIDATE_OPTION, "false");
config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
@@ -65,6 +66,7 @@ public void testDryRun() throws Exception {
     Map<String, String> config = new HashMap<>();
config.put(BROKERS_OPTION, "localhost:9092");
config.put(DRY_RUN_OPTION, "true");
+ config.put(RECURSIVE_OPTION, "false");
config.put(QUIET_OPTION, "false");
config.put(VALIDATE_OPTION, "false");
config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
diff --git a/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java b/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java
index 43bc514fc..f0db0f50c 100644
--- a/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java
+++ b/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java
@@ -1,6 +1,7 @@
package com.purbon.kafka.topology;
import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION;
+import static com.purbon.kafka.topology.CommandLineInterface.RECURSIVE_OPTION;
import static com.purbon.kafka.topology.Constants.JULIE_ENABLE_MULTIPLE_CONTEXT_PER_DIR;
import static com.purbon.kafka.topology.Constants.PLATFORM_SERVERS_CONNECT;
import static org.assertj.core.api.Assertions.assertThat;
@@ -165,4 +166,19 @@ public void testInvalidTopologyFromDir() throws IOException {
String dirPath = TestUtils.getResourceFilename("/errors_dir");
TopologyObjectBuilder.build(dirPath);
}
+
+ @Test
+ public void shouldReadFilesRecursively() throws IOException {
+    Map<String, String> cliOps = new HashMap<>();
+ cliOps.put(BROKERS_OPTION, "");
+ cliOps.put(RECURSIVE_OPTION, "true");
+ var props = new Properties();
+ Configuration config = new Configuration(cliOps, props);
+
+ String fileOrDirPath = TestUtils.getResourceFilename("/dir_recursive");
+ var map = TopologyObjectBuilder.build(fileOrDirPath, config);
+ assertThat(map).hasSize(1);
+ final Topology topology = map.values().iterator().next();
+ assertThat(topology.getProjects()).hasSize(4);
+ }
}
diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
index e6ee27568..abdb72f82 100644
--- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
+++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
@@ -14,7 +14,6 @@
import com.purbon.kafka.topology.model.*;
import com.purbon.kafka.topology.model.Impl.ProjectImpl;
import com.purbon.kafka.topology.model.Impl.TopologyImpl;
-import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlArtefacts;
import com.purbon.kafka.topology.model.artefact.KsqlStreamArtefact;
@@ -695,7 +694,7 @@ public void testTopicsWithDLQConfigVerify() {
assertEquals("contextOrg.source.foo.bar.avro", mainTopic.toString());
assertEquals("contextOrg.source.foo.bar.avro.dlq", dlqTopic.toString());
- assertThat(mainTopic.getConfig()).isEqualToComparingFieldByField(dlqTopic.getConfig());
+ assertThat(mainTopic.getConfig()).usingRecursiveComparison().isEqualTo(dlqTopic.getConfig());
}
@Test
diff --git a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
index b216ded28..398dd5a6c 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
@@ -20,7 +20,6 @@
import com.purbon.kafka.topology.model.*;
import com.purbon.kafka.topology.model.Impl.ProjectImpl;
import com.purbon.kafka.topology.model.Impl.TopologyImpl;
-import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.users.*;
import com.purbon.kafka.topology.model.users.platform.*;
import com.purbon.kafka.topology.roles.SimpleAclsProvider;
@@ -201,6 +200,7 @@ public void shouldDetectChangesInTheRemoteClusterBetweenRuns() throws IOExceptio
Properties props = new Properties();
props.put(ALLOW_DELETE_BINDINGS, true);
props.put(JULIE_VERIFY_STATE_SYNC, true);
+ props.put(ALLOW_DELETE_TOPICS, true);
     HashMap<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
@@ -212,7 +212,7 @@ public void shouldDetectChangesInTheRemoteClusterBetweenRuns() throws IOExceptio
TopologyBuilderAdminClient adminClient = new TopologyBuilderAdminClient(kafkaAdminClient);
var topology =
- TestTopologyBuilder.createProject()
+ TestTopologyBuilder.createProject(config)
.addTopic("topic1")
.addTopic("topic2")
.addConsumer("User:foo")
diff --git a/src/test/java/com/purbon/kafka/topology/integration/ConnectApiClientIT.java b/src/test/java/com/purbon/kafka/topology/integration/ConnectApiClientIT.java
index b3c515234..83638c338 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/ConnectApiClientIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/ConnectApiClientIT.java
@@ -1,5 +1,8 @@
package com.purbon.kafka.topology.integration;
+import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.purbon.kafka.topology.Configuration;
@@ -9,19 +12,15 @@
import com.purbon.kafka.topology.integration.containerutils.SaslPlaintextKafkaContainer;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-
-import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION;
-import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
public class ConnectApiClientIT {
diff --git a/src/test/java/com/purbon/kafka/topology/integration/ConnectorManagerIT.java b/src/test/java/com/purbon/kafka/topology/integration/ConnectorManagerIT.java
index 81a6dc895..bba0f200e 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/ConnectorManagerIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/ConnectorManagerIT.java
@@ -92,6 +92,7 @@ public void shouldDetectChangesInTheRemoteClusterBetweenRuns()
props.put(TOPOLOGY_TOPIC_STATE_FROM_CLUSTER, "false");
props.put(ALLOW_DELETE_CONNECT_ARTEFACTS, "true");
props.put(JULIE_VERIFY_STATE_SYNC, true);
+ props.put(ALLOW_DELETE_TOPICS, "true");
props.put(PLATFORM_SERVERS_CONNECT + ".0", "connector0:" + connectContainer.getHttpsUrl());
File file = TestUtils.getResourceFile("/descriptor-connector.yaml");
diff --git a/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java b/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java
index c4243a322..5aeaf3fa3 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java
@@ -89,6 +89,7 @@ public void shouldDetectChangesInTheRemoteClusterBetweenRuns() throws IOExceptio
props.put(TOPOLOGY_TOPIC_STATE_FROM_CLUSTER, "false");
props.put(ALLOW_DELETE_KSQL_ARTEFACTS, "true");
props.put(JULIE_VERIFY_STATE_SYNC, true);
+ props.put(ALLOW_DELETE_TOPICS, "true");
props.put(PLATFORM_SERVER_KSQL_URL, "http://" + client.getServer());
File file = TestUtils.getResourceFile("/descriptor-ksql.yaml");
diff --git a/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java b/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java
index 431dc2b3c..9bfb28acc 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java
@@ -17,7 +17,9 @@ public void beforeEach() throws IOException, InterruptedException {
}
protected String getKafkaClusterID() {
-
+ /* TODO: This method is the only reason JulieOps depends on zookeeper,
+ * a component that is about to be retired. Figure out a more modern way
+ * to get the cluster id, and remove the zookeeper stuff from the code. */
try {
String nodeData = zkClient.getNodeData("/cluster/id");
return JSON.toMap(nodeData).get("id").toString();
diff --git a/src/test/java/com/purbon/kafka/topology/integration/StreamsAclIT.java b/src/test/java/com/purbon/kafka/topology/integration/StreamsAclIT.java
index fc7a85b37..b57b2c0a3 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/StreamsAclIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/StreamsAclIT.java
@@ -59,6 +59,7 @@ public void shouldNotProduceWithoutPermission() {
final TestStreams streams =
TestStreams.create(container, STREAMS_USERNAME, STREAMS_APP_ID, builder.build());
+
streams.start();
await()
diff --git a/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java b/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java
index 70ca95448..03e489cee 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java
@@ -76,7 +76,7 @@ public void before() throws IOException {
final SchemaRegistryManager schemaRegistryManager =
new SchemaRegistryManager(schemaRegistryClient, System.getProperty("user.dir"));
- this.jedis = new Jedis(redis.getContainerIpAddress(), redis.getFirstMappedPort());
+ this.jedis = new Jedis(redis.getHost(), redis.getFirstMappedPort());
var backend = new RedisBackend(jedis, bucket);
this.plan = ExecutionPlan.init(new BackendController(backend), System.out);
@@ -86,7 +86,7 @@ public void before() throws IOException {
props.put(ALLOW_DELETE_TOPICS, true);
props.put(
STATE_PROCESSOR_IMPLEMENTATION_CLASS, "com.purbon.kafka.topology.backend.RedisBackend");
- props.put(REDIS_HOST_CONFIG, redis.getContainerIpAddress());
+ props.put(REDIS_HOST_CONFIG, redis.getHost());
props.put(REDIS_PORT_CONFIG, redis.getFirstMappedPort());
     HashMap<String, String> cliOps = new HashMap<>();
@@ -100,7 +100,7 @@ public void before() throws IOException {
@Test
public void testStoreAndFetch() throws IOException {
- String host = redis.getContainerIpAddress();
+ String host = redis.getHost();
int port = redis.getFirstMappedPort();
RedisBackend rsp = new RedisBackend(host, port, bucket);
rsp.load();
diff --git a/src/test/java/com/purbon/kafka/topology/integration/containerutils/AlternativeKafkaContainer.java b/src/test/java/com/purbon/kafka/topology/integration/containerutils/AlternativeKafkaContainer.java
index 2c9b3879f..dd4d8e056 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/containerutils/AlternativeKafkaContainer.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/containerutils/AlternativeKafkaContainer.java
@@ -118,7 +118,7 @@ private void createStartupScript(final String zookeeperConnect) {
listeners
.replaceAll(":" + KAFKA_PORT, ":" + getMappedPort(KAFKA_PORT))
.replaceAll("OTHER://0\\.0\\.0\\.0", "OTHER://kafka")
- .replaceAll("0\\.0\\.0\\.0", getContainerIpAddress()));
+ .replaceAll("0\\.0\\.0\\.0", getHost()));
final String startupScript =
overrideStartupScript(
diff --git a/src/test/java/com/purbon/kafka/topology/integration/containerutils/ConnectContainer.java b/src/test/java/com/purbon/kafka/topology/integration/containerutils/ConnectContainer.java
index e5efb29a8..342612d70 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/containerutils/ConnectContainer.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/containerutils/ConnectContainer.java
@@ -8,7 +8,7 @@
public class ConnectContainer extends GenericContainer<ConnectContainer> {
private static final DockerImageName DEFAULT_IMAGE =
- DockerImageName.parse("confluentinc/cp-kafka-connect").withTag("6.0.2");
+ DockerImageName.parse("confluentinc/cp-kafka-connect").withTag("7.5.0");
private static int CONNECT_PORT = 8083;
private static int CONNECT_SSL_PORT = 8084;
@@ -39,7 +39,7 @@ public ConnectContainer(
withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1");
withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1");
withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1");
- withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java");
+ withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/usr/share/filestream-connectors");
withEnv(
"CONNECT_LISTENERS",
"http://0.0.0.0:" + CONNECT_PORT + ", https://0.0.0.0:" + CONNECT_SSL_PORT);
diff --git a/src/test/java/com/purbon/kafka/topology/integration/containerutils/ContainerTestUtils.java b/src/test/java/com/purbon/kafka/topology/integration/containerutils/ContainerTestUtils.java
index 52a3f7ca6..fcae8f0dd 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/containerutils/ContainerTestUtils.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/containerutils/ContainerTestUtils.java
@@ -17,7 +17,7 @@
public final class ContainerTestUtils {
- static final String DEFAULT_CP_KAFKA_VERSION = "6.1.0";
+ static final String DEFAULT_CP_KAFKA_VERSION = "7.5.0";
private ContainerTestUtils() {}
diff --git a/src/test/java/com/purbon/kafka/topology/integration/containerutils/KsqlContainer.java b/src/test/java/com/purbon/kafka/topology/integration/containerutils/KsqlContainer.java
index 1cdfd2afb..5526090da 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/containerutils/KsqlContainer.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/containerutils/KsqlContainer.java
@@ -7,7 +7,7 @@
public class KsqlContainer extends GenericContainer<KsqlContainer> {
private static final DockerImageName DEFAULT_IMAGE =
- DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.21.0");
+ DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.26.0");
public static final int KSQL_PORT = 8088;
diff --git a/src/test/java/com/purbon/kafka/topology/integration/containerutils/SchemaRegistryContainer.java b/src/test/java/com/purbon/kafka/topology/integration/containerutils/SchemaRegistryContainer.java
index 0981095c8..389f2bbf0 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/containerutils/SchemaRegistryContainer.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/containerutils/SchemaRegistryContainer.java
@@ -6,7 +6,7 @@
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
private static final DockerImageName DEFAULT_IMAGE =
- DockerImageName.parse("confluentinc/cp-schema-registry").withTag("6.0.2");
+ DockerImageName.parse("confluentinc/cp-schema-registry").withTag("7.5.0");
public static final int SR_PORT = 8081;
diff --git a/src/test/java/com/purbon/kafka/topology/integration/containerutils/TestStreams.java b/src/test/java/com/purbon/kafka/topology/integration/containerutils/TestStreams.java
index e7ca98473..0263c5c15 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/containerutils/TestStreams.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/containerutils/TestStreams.java
@@ -1,7 +1,8 @@
package com.purbon.kafka.topology.integration.containerutils;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
import java.io.Closeable;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -10,6 +11,7 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
public final class TestStreams implements Closeable {
@@ -49,9 +51,11 @@ private static TestStreams create(
public void start() {
setUncaughtExceptionHandler(
- (t, e) ->
- topicAuthorizationExceptionThrown =
- ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) > 0);
+ t -> {
+ topicAuthorizationExceptionThrown =
+ ExceptionUtils.indexOfType(t, TopicAuthorizationException.class) > 0;
+ return SHUTDOWN_CLIENT;
+ });
streams.start();
}
@@ -60,7 +64,7 @@ public void close() {
streams.close();
}
- public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
+ public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh) {
streams.setUncaughtExceptionHandler(eh);
}
diff --git a/src/main/java/com/purbon/kafka/topology/utils/ZKClient.java b/src/test/java/com/purbon/kafka/topology/utils/ZKClient.java
similarity index 100%
rename from src/main/java/com/purbon/kafka/topology/utils/ZKClient.java
rename to src/test/java/com/purbon/kafka/topology/utils/ZKClient.java
diff --git a/src/main/java/com/purbon/kafka/topology/utils/ZKConnection.java b/src/test/java/com/purbon/kafka/topology/utils/ZKConnection.java
similarity index 100%
rename from src/main/java/com/purbon/kafka/topology/utils/ZKConnection.java
rename to src/test/java/com/purbon/kafka/topology/utils/ZKConnection.java
diff --git a/src/test/resources/dir_recursive/middle/bottom/bottom1.yml b/src/test/resources/dir_recursive/middle/bottom/bottom1.yml
new file mode 100644
index 000000000..d42b9bccb
--- /dev/null
+++ b/src/test/resources/dir_recursive/middle/bottom/bottom1.yml
@@ -0,0 +1,6 @@
+---
+context: "context"
+projects:
+ - name: "bottom-project"
+ topics:
+ - name: "bottom1"
diff --git a/src/test/resources/dir_recursive/middle/middle1.yml b/src/test/resources/dir_recursive/middle/middle1.yml
new file mode 100644
index 000000000..ede87406a
--- /dev/null
+++ b/src/test/resources/dir_recursive/middle/middle1.yml
@@ -0,0 +1,6 @@
+---
+context: "context"
+projects:
+ - name: "middle-project1"
+ topics:
+ - name: "middle1"
diff --git a/src/test/resources/dir_recursive/middle/middle2.yml b/src/test/resources/dir_recursive/middle/middle2.yml
new file mode 100644
index 000000000..f212652c0
--- /dev/null
+++ b/src/test/resources/dir_recursive/middle/middle2.yml
@@ -0,0 +1,6 @@
+---
+context: "context"
+projects:
+ - name: "middle-project2"
+ topics:
+ - name: "middle2"
diff --git a/src/test/resources/dir_recursive/top1.yml b/src/test/resources/dir_recursive/top1.yml
new file mode 100644
index 000000000..492fe6a19
--- /dev/null
+++ b/src/test/resources/dir_recursive/top1.yml
@@ -0,0 +1,6 @@
+---
+context: "context"
+projects:
+ - name: "top-project"
+ topics:
+ - name: "top1"
diff --git a/troubleshooting.md b/troubleshooting.md
index f33e1e1c7..3c876c2cb 100644
--- a/troubleshooting.md
+++ b/troubleshooting.md
@@ -1,10 +1,10 @@
# Troubleshooting JulieOps
-## State management
+## State management
By default, JulieOps creates a hidden file named `.cluster-state` in the directory you run julie-ops from;
this file captures the state of all actions performed by JulieOps.
-*Note*: This is the necessary state for JulieOps to operate, something like the state in terraform.
-The state could be managed as well using other mechanisms like a kafka topic, an S3 bucket, etc.
-Look what your system has configured
\ No newline at end of file
+*Note*: This is the state JulieOps needs in order to operate, similar to the state in Terraform.
+The state can also be managed using other mechanisms, such as a Kafka topic or an S3 bucket.
+Check what your system has configured; a sketch follows below.
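For example, a minimal sketch mirroring the RedisBackendIT setup from this change (the constants are the ones that test imports; host, port and broker values are placeholders):

```java
// Sketch mirroring the RedisBackendIT setup above; values are placeholders.
Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "localhost:9092");

Properties props = new Properties();
props.put(
    STATE_PROCESSOR_IMPLEMENTATION_CLASS, "com.purbon.kafka.topology.backend.RedisBackend");
props.put(REDIS_HOST_CONFIG, "localhost");
props.put(REDIS_PORT_CONFIG, 6379);

Configuration config = new Configuration(cliOps, props);
```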