diff --git a/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java b/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java index bb22bd031..0de1e61ef 100644 --- a/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java +++ b/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java @@ -212,7 +212,10 @@ private String patternBasedProjectPrefix() { private String namePrefix(String topologyPrefix) { StringBuilder sb = new StringBuilder(); - sb.append(topologyPrefix).append(config.getTopicPrefixSeparator()).append(name); + sb.append(topologyPrefix) + .append(config.getTopicPrefixSeparator()) + .append(name) + .append(config.getTopicPrefixSeparator()); return sb.toString(); } diff --git a/src/main/java/com/purbon/kafka/topology/model/Topic.java b/src/main/java/com/purbon/kafka/topology/model/Topic.java index a272705f9..2b621fa75 100644 --- a/src/main/java/com/purbon/kafka/topology/model/Topic.java +++ b/src/main/java/com/purbon/kafka/topology/model/Topic.java @@ -161,7 +161,7 @@ private String patternBasedTopicNameStructureString() { private String defaultTopicStructureString(String projectPrefix) { StringBuilder sb = new StringBuilder(); if (projectPrefix != null && !projectPrefix.isBlank()) { - sb.append(projectPrefix).append(appConfig.getTopicPrefixSeparator()); + sb.append(projectPrefix); } sb.append(getName()); diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidation.java index 65c5c96fb..808fc1501 100644 --- a/src/main/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidation.java +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidation.java @@ -10,8 +10,7 @@ public class MinInSyncReplicasValidation implements TopicValidation { @Override public void valid(Topic topic) throws ValidationException { - if (topic.replicationFactor().isPresent() - && !validateMinInsyncReplicas(topic)) { + if (topic.replicationFactor().isPresent() && !validateMinInsyncReplicas(topic)) { String msg = String.format( "Topic %s has an unexpected min.insync.replicas config vs it's replication factor: %s value", @@ -21,9 +20,9 @@ public void valid(Topic topic) throws ValidationException { } private boolean validateMinInsyncReplicas(Topic topic) { - short replicationFactor = topic.replicationFactor().orElse((short)-1); + short replicationFactor = topic.replicationFactor().orElse((short) -1); String minInSyncReplicas = topic.getConfig().get(MIN_INSYNC_REPLICAS); - return minInSyncReplicas == null || (Integer.parseInt(minInSyncReplicas) <= replicationFactor - 1); + return minInSyncReplicas == null + || (Integer.parseInt(minInSyncReplicas) <= replicationFactor - 1); } - } diff --git a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java index 17fc0d526..6c5c7d0ad 100644 --- a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java +++ b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java @@ -101,7 +101,7 @@ public void newConsumerOptimisedACLsCreation() throws IOException { .buildBindingsForConsumers(users, builder.getProject().namePrefix(), true); accessControlManager.updatePlan(builder.buildTopology(), plan); verify(aclsBuilder, times(1)) - .buildBindingsForConsumers(eq(users), eq(builder.getProject().namePrefix()), eq(true)); + .buildBindingsForConsumers(eq(users), eq("ctx.project."), eq(true)); } @Test @@ -177,7 +177,7 @@ public void newProducerOptimizedACLsCreation() throws IOException { .buildBindingsForProducers(producers, builder.getProject().namePrefix(), true); accessControlManager.updatePlan(builder.buildTopology(), plan); verify(aclsBuilder, times(1)) - .buildBindingsForProducers(eq(producers), eq(builder.getProject().namePrefix()), eq(true)); + .buildBindingsForProducers(eq(producers), eq("ctx.project."), eq(true)); } @Test @@ -273,7 +273,7 @@ public void newKSqlApplicationCreation() throws IOException { .when(aclsBuilder) .buildBindingsForKSqlApp(any(KSqlApp.class), anyString()); - verify(aclsBuilder, times(1)).buildBindingsForKSqlApp(app, "default.default"); + verify(aclsBuilder, times(1)).buildBindingsForKSqlApp(app, "default.default."); } @Test(expected = IOException.class) diff --git a/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java b/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java index 9bc7f20c4..334311e8f 100644 --- a/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java +++ b/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java @@ -288,7 +288,7 @@ public void shouldAddStreamsProjectPrefixAsInternalTopics() { var topology = TestTopologyBuilder.createProject().addKStream("foo").buildTopology(); var internals = config.getKafkaInternalTopicPrefixes(Collections.singletonList(topology)); - assertThat(internals).contains("ctx.project"); + assertThat(internals).contains("ctx.project."); assertThat(internals).contains("_"); } diff --git a/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java b/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java index 01ee8fa3a..43bc514fc 100644 --- a/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java @@ -61,10 +61,10 @@ public void buildOutOfMultipleTopos() throws IOException { var projects = Arrays.asList( - "context2.source2.foo.bar.projectFoo", - "context2.source2.foo.bar.projectBar", - "context2.source2.foo.bar.projectZet", - "context2.source2.foo.bar.projectBear"); + "context2.source2.foo.bar.projectFoo.", + "context2.source2.foo.bar.projectBar.", + "context2.source2.foo.bar.projectZet.", + "context2.source2.foo.bar.projectBear."); assertThat(context.getProjects()).hasSize(4); for (Project proj : context.getProjects()) { assertThat(projects).contains(proj.namePrefix()); diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java index c813de4a5..b86c39c35 100644 --- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java @@ -101,7 +101,7 @@ public void testDynamicFirstLevelAttributes() { Topology anotherTopology = parser.deserialise(TestUtils.getResourceFile("/descriptor.yaml")); Project anotherProject = anotherTopology.getProjects().get(0); - assertEquals("contextOrg.source.foo", anotherProject.namePrefix()); + assertEquals("contextOrg.source.foo.", anotherProject.namePrefix()); } @Test diff --git a/src/test/java/com/purbon/kafka/topology/actions/topics/CreateTopicActionTest.java b/src/test/java/com/purbon/kafka/topology/actions/topics/CreateTopicActionTest.java index 3d155be24..66adc7be2 100644 --- a/src/test/java/com/purbon/kafka/topology/actions/topics/CreateTopicActionTest.java +++ b/src/test/java/com/purbon/kafka/topology/actions/topics/CreateTopicActionTest.java @@ -2,8 +2,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.purbon.kafka.topology.TestTopologyBuilder; import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient; import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.model.Topology; import java.util.Collections; import org.junit.Rule; import org.junit.Test; @@ -19,17 +21,22 @@ public class CreateTopicActionTest { @Test public void shouldComposeDetailedViewOfProperties() { - Topic topic = new Topic("foo"); - topic.setProjectPrefix("bar"); - topic.setConfig(Collections.singletonMap("foo", "bar")); + Topic t = new Topic("foo"); + t.setConfig(Collections.singletonMap("foo", "bar")); + + TestTopologyBuilder builder = TestTopologyBuilder.createProject().addTopic(t); + + Topology topology = builder.buildTopology(); + var topic = topology.getProjects().get(0).getTopics().get(0); + var action = new CreateTopicAction(adminClient, topic, topic.toString()); var refs = action.refs(); assertThat(refs).hasSize(1); var ref = refs.get(0); assertThat(ref) .contains( - "\"resource_name\" : \"rn://create.topic/com.purbon.kafka.topology.actions.topics.CreateTopicAction/bar.foo\""); + "\"resource_name\" : \"rn://create.topic/com.purbon.kafka.topology.actions.topics.CreateTopicAction/ctx.project.foo\""); assertThat(ref).contains("\"foo\" : \"bar\""); - assertThat(ref).contains("\"topic\" : \"bar.foo\","); + assertThat(ref).contains("\"topic\" : \"ctx.project.foo\","); } } diff --git a/src/test/java/com/purbon/kafka/topology/actions/topics/DeleteTopicsActionTest.java b/src/test/java/com/purbon/kafka/topology/actions/topics/DeleteTopicsActionTest.java index 6044b672c..b2cc85722 100644 --- a/src/test/java/com/purbon/kafka/topology/actions/topics/DeleteTopicsActionTest.java +++ b/src/test/java/com/purbon/kafka/topology/actions/topics/DeleteTopicsActionTest.java @@ -2,8 +2,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.purbon.kafka.topology.TestTopologyBuilder; import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient; import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.model.Topology; import java.util.Collections; import org.junit.Rule; import org.junit.Test; @@ -19,9 +21,14 @@ public class DeleteTopicsActionTest { @Test public void shouldComposeDetailedViewOfProperties() { - Topic topic = new Topic("foo"); - topic.setProjectPrefix("bar"); - topic.setConfig(Collections.singletonMap("foo", "bar")); + Topic t = new Topic("foo"); + t.setConfig(Collections.singletonMap("foo", "bar")); + + TestTopologyBuilder builder = TestTopologyBuilder.createProject().addTopic(t); + + Topology topology = builder.buildTopology(); + var topic = topology.getProjects().get(0).getTopics().get(0); + var action = new DeleteTopics(adminClient, Collections.singletonList(topic.toString())); var refs = action.refs(); @@ -29,7 +36,7 @@ public void shouldComposeDetailedViewOfProperties() { var ref = refs.get(0); assertThat(ref) .contains( - "\"resource_name\" : \"rn://delete.topic/com.purbon.kafka.topology.actions.topics.DeleteTopics$1/bar.foo\""); - assertThat(ref).contains("\"topic\" : \"bar.foo\","); + "\"resource_name\" : \"rn://delete.topic/com.purbon.kafka.topology.actions.topics.DeleteTopics$1/ctx.project.foo\""); + assertThat(ref).contains("\"topic\" : \"ctx.project.foo\","); } } diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidationTest.java index d57fd0d48..01490b948 100644 --- a/src/test/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidationTest.java +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidationTest.java @@ -3,57 +3,54 @@ import com.purbon.kafka.topology.exceptions.ConfigurationException; import com.purbon.kafka.topology.exceptions.ValidationException; import com.purbon.kafka.topology.model.Topic; -import org.junit.Test; - import java.util.HashMap; import java.util.Map; +import org.junit.Test; public class MinInSyncReplicasValidationTest { - @Test(expected = ValidationException.class) - public void shouldCheckKoValuesSuccessfully() - throws ValidationException, ConfigurationException { - Map config = new HashMap<>(); - config.put("replication.factor", "3"); - config.put("min.insync.replicas", "3"); - - Topic topic = new Topic("topic", config); - MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); - validation.valid(topic); - } - - @Test - public void shouldCheckMinimalValuesSuccessfully() - throws ValidationException, ConfigurationException { - Map config = new HashMap<>(); - config.put("replication.factor", "3"); - config.put("min.insync.replicas", "1"); - - Topic topic = new Topic("topic", config); - MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); - validation.valid(topic); - } - - @Test - public void shouldCheckOkValuesSuccessfully() - throws ValidationException, ConfigurationException { - Map config = new HashMap<>(); - config.put("replication.factor", "3"); - config.put("min.insync.replicas", "2"); - - Topic topic = new Topic("topic", config); - MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); - validation.valid(topic); - } - - @Test - public void shouldCheckMissingMinInSyncValuesSuccessfully() - throws ValidationException, ConfigurationException { - Map config = new HashMap<>(); - config.put("replication.factor", "3"); - - Topic topic = new Topic("topic", config); - MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); - validation.valid(topic); - } + @Test(expected = ValidationException.class) + public void shouldCheckKoValuesSuccessfully() throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + config.put("min.insync.replicas", "3"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } + + @Test + public void shouldCheckMinimalValuesSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + config.put("min.insync.replicas", "1"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } + + @Test + public void shouldCheckOkValuesSuccessfully() throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + config.put("min.insync.replicas", "2"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } + + @Test + public void shouldCheckMissingMinInSyncValuesSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } }