diff --git a/pom.xml b/pom.xml
index 7273aa7ee..2e461c08b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -531,6 +531,7 @@
2.17.1
3.5.7
1.4
+ 1.10.0
3.6.0
4.13.1
1.15.3
@@ -735,7 +736,11 @@
2.33.1
test
-
+
+ org.apache.commons
+ commons-text
+ ${commons-text.version}
+
diff --git a/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java b/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java
index 0cad01271..8461f6d81 100644
--- a/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java
+++ b/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java
@@ -159,7 +159,7 @@ private ListServiceAccountResponse getListServiceAccounts(String url, int page_s
throws IOException {
String requestUrl = url;
if (!url.contains("page_token")) {
- requestUrl = String.format("%s?page_size=%d", url, page_size);
+ requestUrl = String.format("%s?page_size=%d", url, page_size);
}
Response r = ccloudApiHttpClient.doGet(requestUrl);
return (ListServiceAccountResponse)
diff --git a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java
index a1fa4abce..1276cff98 100644
--- a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java
+++ b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java
@@ -452,13 +452,13 @@ public List setSchemaAuthorization(
Boolean shouldOptimizeAcls,
String namePrefix) {
if (shouldOptimizeAcls) {
- return setDetailedSchemaAuthorization(principal, role, namePrefix);
+ return setOptimizedSchemaAuthorization(principal, role, namePrefix);
} else {
- return setOptimizedSchemaAuthorization(principal, subjects, role, prefixed);
+ return setDetailedSchemaAuthorization(principal, subjects, role, prefixed);
}
}
- private List setDetailedSchemaAuthorization(
+ private List setOptimizedSchemaAuthorization(
String principal, String role, String namePrefix) {
return List.of(
apiClient
@@ -467,7 +467,7 @@ private List setDetailedSchemaAuthorization(
.apply("SUBJECT", namePrefix, PatternType.PREFIXED.name()));
}
- private List setOptimizedSchemaAuthorization(
+ private List setDetailedSchemaAuthorization(
String principal, List subjects, String role, boolean prefixed) {
String patternType = prefixed ? PatternType.PREFIXED.name() : PatternType.LITERAL.name();
diff --git a/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java
new file mode 100644
index 000000000..94e5dbb80
--- /dev/null
+++ b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java
@@ -0,0 +1,25 @@
+package com.purbon.kafka.topology.serdes;
+
+import com.purbon.kafka.topology.exceptions.TopologyParsingException;
+
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.commons.text.lookup.StringLookupFactory;
+
+/**
+ * A simple substitutor which substitutes system properties in the topology file. If a property is
+ * not found, it throws a {@link com.purbon.kafka.topology.exceptions.TopologyParsingException}.
+ */
+public class SystemPropertySubstitutor {
+ public SystemPropertySubstitutor() {
+ }
+
+ public String replace(String original) {
+ try {
+ return new StringSubstitutor(StringLookupFactory.INSTANCE.systemPropertyStringLookup())
+ .setEnableUndefinedVariableException(true)
+ .replace(original);
+ } catch (IllegalArgumentException ex) {
+ throw new TopologyParsingException("A variable was not resolved: ", ex);
+ }
+ }
+}
diff --git a/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java b/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java
index 3c9e0b4cf..7c12899f9 100644
--- a/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java
+++ b/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java
@@ -11,11 +11,14 @@
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
public class TopologySerdes {
- private ObjectMapper mapper;
+ private final ObjectMapper mapper;
+ private final SystemPropertySubstitutor systemPropertySubstitutor;
public enum FileType {
JSON,
@@ -32,11 +35,13 @@ public TopologySerdes(Configuration config, PlanMap plans) {
public TopologySerdes(Configuration config, FileType type, PlanMap plans) {
mapper = ObjectMapperFactory.build(type, config, plans);
+ systemPropertySubstitutor = new SystemPropertySubstitutor();
}
public Topology deserialise(File file) {
- try {
- return mapper.readValue(file, Topology.class);
+ try (FileInputStream inputStream = new FileInputStream(file)) {
+ String content = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
+ return deserialise(content);
} catch (IOException e) {
throw new TopologyParsingException(
"Failed to deserialize topology from " + file.getPath(), e);
@@ -45,7 +50,8 @@ public Topology deserialise(File file) {
public Topology deserialise(String content) {
try {
- return mapper.readValue(content, Topology.class);
+ String substitutedContent = systemPropertySubstitutor.replace(content);
+ return mapper.readValue(substitutedContent, Topology.class);
} catch (IOException e) {
throw new TopologyParsingException("Failed to deserialize topology from " + content, e);
}
diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
index b86c39c35..18d84f0f1 100644
--- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
+++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
@@ -7,6 +7,7 @@
import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.DEVELOPER_READ;
import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -35,12 +36,20 @@
import java.util.*;
import java.util.stream.Collectors;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TopologySerdesTest {
private TopologySerdes parser;
+ @BeforeClass
+ public static void beforeClass() {
+ System.setProperty("env", "staging");
+ System.setProperty("source", "partner");
+ System.setProperty("project", "my_project");
+ }
+
@Before
public void setup() {
Properties props = new Properties();
@@ -70,6 +79,35 @@ public void testMetadata() {
.containsKey("system");
}
+ @Test
+ public void testSystemPropertySubstitution() {
+ // Given
+ // System properties already set in the setup fonction
+
+ // When
+ Topology topology =
+ parser.deserialise(TestUtils.getResourceFile("/descriptor-with-system-properties.yaml"));
+ Project project = topology.getProjects().get(0);
+
+ // Then
+ assertThat(project.getName()).isEqualTo("my_project");
+ assertThat(project.namePrefix()).isEqualTo("staging.partner.my_project.");
+ }
+
+ @Test
+ public void testInvalidSystemPropertySubstitution() {
+ // Given
+ // System properties already set in the setup fonction
+
+ // When
+ // Then
+ assertThatThrownBy(
+ () ->
+ parser.deserialise(
+ TestUtils.getResourceFile("/descriptor-with-unknown-system-properties.yaml")))
+ .isExactlyInstanceOf(TopologyParsingException.class);
+ }
+
@Test
public void testStreamsParsingOnlyReadTopicsShouldNotParseAsNull() {
Topology topology =
diff --git a/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java b/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java
index ec4d0c3eb..fa10f2a3c 100644
--- a/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java
+++ b/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java
@@ -14,13 +14,11 @@
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import com.purbon.kafka.topology.roles.TopologyAclBinding;
import java.io.IOException;
-import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
-
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -130,70 +128,71 @@ public void testDeleteServiceAccount() throws IOException {
@Test
public void listServiceAccountsShouldAcceptPage() throws IOException {
- String body01 = "{\n" +
- " \"api_version\": \"iam/v2\",\n" +
- " \"kind\": \"ServiceAccountList\",\n" +
- " \"metadata\": {\n" +
- " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" +
- " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" +
- " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" +
- " \"next\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb\",\n" +
- " \"total_size\": 123\n" +
- " },\n" +
- " \"data\": [\n" +
- " {\n" +
- " \"api_version\": \"iam/v2\",\n" +
- " \"kind\": \"ServiceAccount\",\n" +
- " \"id\": \"dlz-f3a90de\",\n" +
- " \"metadata\": {\n" +
- " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" +
- " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" +
- " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" +
- " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" +
- " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" +
- " },\n" +
- " \"display_name\": \"DeLorean_auto_repair\",\n" +
- " \"description\": \"Doc's repair bot for the DeLorean\"\n" +
- " }\n" +
- " ]\n" +
- "}";
+ String body01 =
+ "{\n"
+ + " \"api_version\": \"iam/v2\",\n"
+ + " \"kind\": \"ServiceAccountList\",\n"
+ + " \"metadata\": {\n"
+ + " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n"
+ + " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n"
+ + " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n"
+ + " \"next\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb\",\n"
+ + " \"total_size\": 123\n"
+ + " },\n"
+ + " \"data\": [\n"
+ + " {\n"
+ + " \"api_version\": \"iam/v2\",\n"
+ + " \"kind\": \"ServiceAccount\",\n"
+ + " \"id\": \"dlz-f3a90de\",\n"
+ + " \"metadata\": {\n"
+ + " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n"
+ + " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n"
+ + " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ + " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ + " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n"
+ + " },\n"
+ + " \"display_name\": \"DeLorean_auto_repair\",\n"
+ + " \"description\": \"Doc's repair bot for the DeLorean\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
Response response01 = new Response(null, 200, body01);
when(httpClient.doGet(String.format("%s?page_size=%d", V2_IAM_SERVICE_ACCOUNTS_URL, 1)))
- .thenReturn(response01);
-
- String body02 = "{\n" +
- " \"api_version\": \"iam/v2\",\n" +
- " \"kind\": \"ServiceAccountList\",\n" +
- " \"metadata\": {\n" +
- " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" +
- " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" +
- " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" +
- " \"total_size\": 123\n" +
- " },\n" +
- " \"data\": [\n" +
- " {\n" +
- " \"api_version\": \"iam/v2\",\n" +
- " \"kind\": \"ServiceAccount\",\n" +
- " \"id\": \"abc-f3a90de\",\n" +
- " \"metadata\": {\n" +
- " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" +
- " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" +
- " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" +
- " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" +
- " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" +
- " },\n" +
- " \"display_name\": \"MacFly\",\n" +
- " \"description\": \"Doc's repair bot for the MacFly\"\n" +
- " }\n" +
- " ]\n" +
- "}";
- Response response02 = new Response(null, 200, body02);
+ .thenReturn(response01);
+ String body02 =
+ "{\n"
+ + " \"api_version\": \"iam/v2\",\n"
+ + " \"kind\": \"ServiceAccountList\",\n"
+ + " \"metadata\": {\n"
+ + " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n"
+ + " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n"
+ + " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n"
+ + " \"total_size\": 123\n"
+ + " },\n"
+ + " \"data\": [\n"
+ + " {\n"
+ + " \"api_version\": \"iam/v2\",\n"
+ + " \"kind\": \"ServiceAccount\",\n"
+ + " \"id\": \"abc-f3a90de\",\n"
+ + " \"metadata\": {\n"
+ + " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n"
+ + " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n"
+ + " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ + " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ + " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n"
+ + " },\n"
+ + " \"display_name\": \"MacFly\",\n"
+ + " \"description\": \"Doc's repair bot for the MacFly\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ Response response02 = new Response(null, 200, body02);
when(httpClient.doGet("/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb"))
- .thenReturn(response02);
+ .thenReturn(response02);
Set accounts = apiClient.listServiceAccounts();
assertThat(accounts).hasSize(2);
diff --git a/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java b/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java
new file mode 100644
index 000000000..1f061ee21
--- /dev/null
+++ b/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java
@@ -0,0 +1,34 @@
+package com.purbon.kafka.topology.serdes;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.purbon.kafka.topology.exceptions.TopologyParsingException;
+import org.junit.Test;
+
+public class SystemPropertySubstitutorTest {
+ @Test
+ public void standardReplacement() {
+ // Given
+ System.getProperties().setProperty("env", "staging");
+ System.getProperties().setProperty("project", "my_project");
+ SystemPropertySubstitutor systemPropertySubstitutor = new SystemPropertySubstitutor();
+
+ // When
+ String result = systemPropertySubstitutor.replace("context: ${env}\n" + "project: ${project}");
+
+ // Then
+ assertThat(result).isEqualTo("context: staging\n" + "project: my_project");
+ }
+
+ @Test
+ public void notFoundKeyReplacement() {
+ // Given
+ SystemPropertySubstitutor systemPropertySubstitutor = new SystemPropertySubstitutor();
+
+ // When
+ // Then
+ assertThatThrownBy(() -> systemPropertySubstitutor.replace("${not_found_key}"))
+ .isExactlyInstanceOf(TopologyParsingException.class);
+ }
+}
diff --git a/src/test/resources/descriptor-with-system-properties.yaml b/src/test/resources/descriptor-with-system-properties.yaml
new file mode 100644
index 000000000..c501a84e5
--- /dev/null
+++ b/src/test/resources/descriptor-with-system-properties.yaml
@@ -0,0 +1,26 @@
+---
+context: ${env}
+source: ${source}
+projects:
+ - name: ${project}
+ consumers:
+ - principal: "User:App0"
+ metadata:
+ system: "OwnerSystem0"
+ producers:
+ - principal: "User:App1"
+ metadata:
+ contactInfo: "app1@company.com"
+ topics:
+ - name: "topicA"
+ consumers:
+ - principal: "User:App4"
+ metadata:
+ system: "OwnerSystem4"
+ config:
+ replication.factor: "1"
+ num.partitions: "1"
+ - name: "topicB"
+ dataType: "avro"
+ schemas:
+ value.schema.file: "schemas/bar-value.avsc"
diff --git a/src/test/resources/descriptor-with-unknown-system-properties.yaml b/src/test/resources/descriptor-with-unknown-system-properties.yaml
new file mode 100644
index 000000000..5683c09d5
--- /dev/null
+++ b/src/test/resources/descriptor-with-unknown-system-properties.yaml
@@ -0,0 +1,26 @@
+---
+context: ${unknown-env}
+source: ${source}
+projects:
+ - name: ${project}
+ consumers:
+ - principal: "User:App0"
+ metadata:
+ system: "OwnerSystem0"
+ producers:
+ - principal: "User:App1"
+ metadata:
+ contactInfo: "app1@company.com"
+ topics:
+ - name: "topicA"
+ consumers:
+ - principal: "User:App4"
+ metadata:
+ system: "OwnerSystem4"
+ config:
+ replication.factor: "1"
+ num.partitions: "1"
+ - name: "topicB"
+ dataType: "avro"
+ schemas:
+ value.schema.file: "schemas/bar-value.avsc"