Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Support env variable substitution in topology configuration files #532

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@
<log4j.version>2.17.1</log4j.version>
<zookeeper.version>3.5.7</zookeeper.version>
<commons.version>1.4</commons.version>
<commons-text.version>1.10.0</commons-text.version>
<mockito.version>3.6.0</mockito.version>
<junit.version>4.13.1</junit.version>
<testcontainers.version>1.15.3</testcontainers.version>
Expand Down Expand Up @@ -735,7 +736,11 @@
<version>2.33.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>${commons-text.version}</version>
</dependency>
</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private ListServiceAccountResponse getListServiceAccounts(String url, int page_s
throws IOException {
String requestUrl = url;
if (!url.contains("page_token")) {
requestUrl = String.format("%s?page_size=%d", url, page_size);
requestUrl = String.format("%s?page_size=%d", url, page_size);
}
Response r = ccloudApiHttpClient.doGet(requestUrl);
return (ListServiceAccountResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,13 @@ public List<TopologyAclBinding> setSchemaAuthorization(
Boolean shouldOptimizeAcls,
String namePrefix) {
if (shouldOptimizeAcls) {
return setDetailedSchemaAuthorization(principal, role, namePrefix);
return setOptimizedSchemaAuthorization(principal, role, namePrefix);
} else {
return setOptimizedSchemaAuthorization(principal, subjects, role, prefixed);
return setDetailedSchemaAuthorization(principal, subjects, role, prefixed);
}
}

private List<TopologyAclBinding> setDetailedSchemaAuthorization(
private List<TopologyAclBinding> setOptimizedSchemaAuthorization(
String principal, String role, String namePrefix) {
return List.of(
apiClient
Expand All @@ -467,7 +467,7 @@ private List<TopologyAclBinding> setDetailedSchemaAuthorization(
.apply("SUBJECT", namePrefix, PatternType.PREFIXED.name()));
}

private List<TopologyAclBinding> setOptimizedSchemaAuthorization(
private List<TopologyAclBinding> setDetailedSchemaAuthorization(
String principal, List<String> subjects, String role, boolean prefixed) {

String patternType = prefixed ? PatternType.PREFIXED.name() : PatternType.LITERAL.name();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.purbon.kafka.topology.serdes;

import com.purbon.kafka.topology.exceptions.TopologyParsingException;

import org.apache.commons.text.StringSubstitutor;
import org.apache.commons.text.lookup.StringLookupFactory;

/**
* A simple substitutor which substitutes system properties in the topology file. If a property is
* not found, it throws a {@link com.purbon.kafka.topology.exceptions.TopologyParsingException}.
*/
public class SystemPropertySubstitutor {
public SystemPropertySubstitutor() {
}

public String replace(String original) {
try {
return new StringSubstitutor(StringLookupFactory.INSTANCE.systemPropertyStringLookup())
.setEnableUndefinedVariableException(true)
.replace(original);
} catch (IllegalArgumentException ex) {
throw new TopologyParsingException("A variable was not resolved: ", ex);
}
}
}
14 changes: 10 additions & 4 deletions src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TopologySerdes {

private ObjectMapper mapper;
private final ObjectMapper mapper;
private final SystemPropertySubstitutor systemPropertySubstitutor;

public enum FileType {
JSON,
Expand All @@ -32,11 +35,13 @@ public TopologySerdes(Configuration config, PlanMap plans) {

public TopologySerdes(Configuration config, FileType type, PlanMap plans) {
mapper = ObjectMapperFactory.build(type, config, plans);
systemPropertySubstitutor = new SystemPropertySubstitutor();
}

public Topology deserialise(File file) {
try {
return mapper.readValue(file, Topology.class);
try (FileInputStream inputStream = new FileInputStream(file)) {
String content = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
return deserialise(content);
} catch (IOException e) {
throw new TopologyParsingException(
"Failed to deserialize topology from " + file.getPath(), e);
Expand All @@ -45,7 +50,8 @@ public Topology deserialise(File file) {

public Topology deserialise(String content) {
try {
return mapper.readValue(content, Topology.class);
String substitutedContent = systemPropertySubstitutor.replace(content);
return mapper.readValue(substitutedContent, Topology.class);
} catch (IOException e) {
throw new TopologyParsingException("Failed to deserialize topology from " + content, e);
}
Expand Down
38 changes: 38 additions & 0 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.DEVELOPER_READ;
import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -35,12 +36,20 @@
import java.util.*;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TopologySerdesTest {

private TopologySerdes parser;

@BeforeClass
public static void beforeClass() {
System.setProperty("env", "staging");
System.setProperty("source", "partner");
System.setProperty("project", "my_project");
}

@Before
public void setup() {
Properties props = new Properties();
Expand Down Expand Up @@ -70,6 +79,35 @@ public void testMetadata() {
.containsKey("system");
}

@Test
public void testSystemPropertySubstitution() {
// Given
// System properties already set in the setup fonction

// When
Topology topology =
parser.deserialise(TestUtils.getResourceFile("/descriptor-with-system-properties.yaml"));
Project project = topology.getProjects().get(0);

// Then
assertThat(project.getName()).isEqualTo("my_project");
assertThat(project.namePrefix()).isEqualTo("staging.partner.my_project.");
}

@Test
public void testInvalidSystemPropertySubstitution() {
// Given
// System properties already set in the setup fonction

// When
// Then
assertThatThrownBy(
() ->
parser.deserialise(
TestUtils.getResourceFile("/descriptor-with-unknown-system-properties.yaml")))
.isExactlyInstanceOf(TopologyParsingException.class);
}

@Test
public void testStreamsParsingOnlyReadTopicsShouldNotParseAsNull() {
Topology topology =
Expand Down
117 changes: 58 additions & 59 deletions src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import com.purbon.kafka.topology.roles.TopologyAclBinding;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -130,70 +128,71 @@ public void testDeleteServiceAccount() throws IOException {
@Test
public void listServiceAccountsShouldAcceptPage() throws IOException {

String body01 = "{\n" +
" \"api_version\": \"iam/v2\",\n" +
" \"kind\": \"ServiceAccountList\",\n" +
" \"metadata\": {\n" +
" \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" +
" \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" +
" \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" +
" \"next\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb\",\n" +
" \"total_size\": 123\n" +
" },\n" +
" \"data\": [\n" +
" {\n" +
" \"api_version\": \"iam/v2\",\n" +
" \"kind\": \"ServiceAccount\",\n" +
" \"id\": \"dlz-f3a90de\",\n" +
" \"metadata\": {\n" +
" \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" +
" \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" +
" \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" +
" \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" +
" \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" +
" },\n" +
" \"display_name\": \"DeLorean_auto_repair\",\n" +
" \"description\": \"Doc's repair bot for the DeLorean\"\n" +
" }\n" +
" ]\n" +
"}";
String body01 =
"{\n"
+ " \"api_version\": \"iam/v2\",\n"
+ " \"kind\": \"ServiceAccountList\",\n"
+ " \"metadata\": {\n"
+ " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n"
+ " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n"
+ " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n"
+ " \"next\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb\",\n"
+ " \"total_size\": 123\n"
+ " },\n"
+ " \"data\": [\n"
+ " {\n"
+ " \"api_version\": \"iam/v2\",\n"
+ " \"kind\": \"ServiceAccount\",\n"
+ " \"id\": \"dlz-f3a90de\",\n"
+ " \"metadata\": {\n"
+ " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n"
+ " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n"
+ " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n"
+ " },\n"
+ " \"display_name\": \"DeLorean_auto_repair\",\n"
+ " \"description\": \"Doc's repair bot for the DeLorean\"\n"
+ " }\n"
+ " ]\n"
+ "}";

Response response01 = new Response(null, 200, body01);

when(httpClient.doGet(String.format("%s?page_size=%d", V2_IAM_SERVICE_ACCOUNTS_URL, 1)))
.thenReturn(response01);

String body02 = "{\n" +
" \"api_version\": \"iam/v2\",\n" +
" \"kind\": \"ServiceAccountList\",\n" +
" \"metadata\": {\n" +
" \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" +
" \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" +
" \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" +
" \"total_size\": 123\n" +
" },\n" +
" \"data\": [\n" +
" {\n" +
" \"api_version\": \"iam/v2\",\n" +
" \"kind\": \"ServiceAccount\",\n" +
" \"id\": \"abc-f3a90de\",\n" +
" \"metadata\": {\n" +
" \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" +
" \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" +
" \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" +
" \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" +
" \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" +
" },\n" +
" \"display_name\": \"MacFly\",\n" +
" \"description\": \"Doc's repair bot for the MacFly\"\n" +
" }\n" +
" ]\n" +
"}";
Response response02 = new Response(null, 200, body02);
.thenReturn(response01);

String body02 =
"{\n"
+ " \"api_version\": \"iam/v2\",\n"
+ " \"kind\": \"ServiceAccountList\",\n"
+ " \"metadata\": {\n"
+ " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n"
+ " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n"
+ " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n"
+ " \"total_size\": 123\n"
+ " },\n"
+ " \"data\": [\n"
+ " {\n"
+ " \"api_version\": \"iam/v2\",\n"
+ " \"kind\": \"ServiceAccount\",\n"
+ " \"id\": \"abc-f3a90de\",\n"
+ " \"metadata\": {\n"
+ " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n"
+ " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n"
+ " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n"
+ " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n"
+ " },\n"
+ " \"display_name\": \"MacFly\",\n"
+ " \"description\": \"Doc's repair bot for the MacFly\"\n"
+ " }\n"
+ " ]\n"
+ "}";
Response response02 = new Response(null, 200, body02);

when(httpClient.doGet("/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb"))
.thenReturn(response02);
.thenReturn(response02);

Set<ServiceAccount> accounts = apiClient.listServiceAccounts();
assertThat(accounts).hasSize(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.purbon.kafka.topology.serdes;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.purbon.kafka.topology.exceptions.TopologyParsingException;
import org.junit.Test;

public class SystemPropertySubstitutorTest {
@Test
public void standardReplacement() {
// Given
System.getProperties().setProperty("env", "staging");
System.getProperties().setProperty("project", "my_project");
SystemPropertySubstitutor systemPropertySubstitutor = new SystemPropertySubstitutor();

// When
String result = systemPropertySubstitutor.replace("context: ${env}\n" + "project: ${project}");

// Then
assertThat(result).isEqualTo("context: staging\n" + "project: my_project");
}

@Test
public void notFoundKeyReplacement() {
// Given
SystemPropertySubstitutor systemPropertySubstitutor = new SystemPropertySubstitutor();

// When
// Then
assertThatThrownBy(() -> systemPropertySubstitutor.replace("${not_found_key}"))
.isExactlyInstanceOf(TopologyParsingException.class);
}
}
26 changes: 26 additions & 0 deletions src/test/resources/descriptor-with-system-properties.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
context: ${env}
source: ${source}
projects:
- name: ${project}
consumers:
- principal: "User:App0"
metadata:
system: "OwnerSystem0"
producers:
- principal: "User:App1"
metadata:
contactInfo: "app1@company.com"
topics:
- name: "topicA"
consumers:
- principal: "User:App4"
metadata:
system: "OwnerSystem4"
config:
replication.factor: "1"
num.partitions: "1"
- name: "topicB"
dataType: "avro"
schemas:
value.schema.file: "schemas/bar-value.avsc"
Loading