diff --git a/docker/sasl/up b/docker/sasl/up index c97a251e2..03d1b5344 100755 --- a/docker/sasl/up +++ b/docker/sasl/up @@ -10,3 +10,5 @@ docker-compose up -d --build docker-compose exec kafka kafka-acls --authorizer-properties zookeeper.connect=zookeeper:2181 --add --cluster --operation=All --allow-principal=User:kafka +docker exec kafka kafka-topics --bootstrap-server kafka:29092 \ + --create --topic julieops --partitions 1 --replication-factor 1 \ No newline at end of file diff --git a/example/descriptor-only-topics.yaml b/example/descriptor-only-topics.yaml index 57e3cb509..cd46f50b1 100644 --- a/example/descriptor-only-topics.yaml +++ b/example/descriptor-only-topics.yaml @@ -2,6 +2,8 @@ context: "oltp" projects: - name: "foo" + consumers: + - principal: "User:App0" topics: - name: "foo" config: diff --git a/example/topology-builder-sasl-plain.properties b/example/topology-builder-sasl-plain.properties index 78b4a6ef7..6a173e1ae 100644 --- a/example/topology-builder-sasl-plain.properties +++ b/example/topology-builder-sasl-plain.properties @@ -9,4 +9,10 @@ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule require platform.servers.connect.0=connect:http://localhost:18083 platform.servers.basic.auth.0=connect@user:pass -topology.state.cluster.enabled=false \ No newline at end of file +topology.state.cluster.enabled=false +topology.state.topics.cluster.enabled=false + +#julie.instance.id=julieops +#julie.kafka.config.topic=julieops +#topology.builder.state.processor.class=com.purbon.kafka.topology.backend.KafkaBackend +#kafka.internal.topic.prefixes.0=julieops \ No newline at end of file diff --git a/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java b/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java index 9c8086462..339d3fadf 100644 --- a/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java +++ b/src/main/java/com/purbon/kafka/topology/ExecutionPlan.java @@ -94,14 +94,16 @@ public void run(boolean dryRun) throws IOException { } } - backendController.reset(); - backendController.addBindings(new ArrayList<>(bindings)); - backendController.addServiceAccounts(serviceAccounts); - backendController.addTopics(topics); - backendController.addConnectors(connectors); - backendController.addKSqlStreams(ksqlStreams); - backendController.addKSqlTables(ksqlTables); - backendController.flushAndClose(); + if (!dryRun) { + backendController.reset(); + backendController.addBindings(new ArrayList<>(bindings)); + backendController.addServiceAccounts(serviceAccounts); + backendController.addTopics(topics); + backendController.addConnectors(connectors); + backendController.addKSqlStreams(ksqlStreams); + backendController.addKSqlTables(ksqlTables); + backendController.flushAndClose(); + } } private void execute(Action action, boolean dryRun) throws IOException { diff --git a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java index 7e2d6ac06..7e99333ae 100644 --- a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java +++ b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java @@ -189,9 +189,11 @@ public void createTopic(Topic topic, String fullTopicName) throws IOException { .configs(topic.getRawConfig()); try { createAllTopics(Collections.singleton(newTopic)); - } catch (TopicExistsException ex) { - LOGGER.info(ex); } catch (ExecutionException | InterruptedException e) { + if (e.getCause() instanceof TopicExistsException) { + LOGGER.info(e.getMessage()); + return; + } LOGGER.error(e); throw new IOException(e); } diff --git a/src/main/java/com/purbon/kafka/topology/backend/KafkaBackend.java b/src/main/java/com/purbon/kafka/topology/backend/KafkaBackend.java index c6c4736bf..2c3cb26b6 100644 --- a/src/main/java/com/purbon/kafka/topology/backend/KafkaBackend.java +++ b/src/main/java/com/purbon/kafka/topology/backend/KafkaBackend.java @@ -5,6 +5,7 @@ import com.purbon.kafka.topology.backend.kafka.KafkaBackendProducer; import com.purbon.kafka.topology.backend.kafka.RecordReceivedCallback; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import lombok.SneakyThrows; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -22,11 +23,13 @@ public class KafkaBackend implements Backend, RecordReceivedCallback { private KafkaBackendProducer producer; private AtomicReference latest; + private AtomicBoolean shouldWaitForLoad; private String instanceId; private Thread thread; public KafkaBackend() { isCompleted = false; + shouldWaitForLoad = new AtomicBoolean(true); } private static class JulieKafkaConsumerThread implements Runnable { @@ -53,7 +56,7 @@ public void run() { public void configure(Configuration config) { instanceId = config.getJulieInstanceId(); latest = new AtomicReference<>(new BackendState()); - + shouldWaitForLoad.set(true); consumer = new KafkaBackendConsumer(config); consumer.configure(); @@ -88,11 +91,19 @@ public void save(BackendState state) throws IOException { producer.save(state); } + @SneakyThrows @Override public BackendState load() throws IOException { + while (shouldWaitForLoad.get()) { + continue; + } return latest == null ? new BackendState() : latest.get(); } + public void initialLoadFinish() { + shouldWaitForLoad.set(false); + } + @Override public void close() { consumer.stop(); diff --git a/src/main/java/com/purbon/kafka/topology/backend/kafka/KafkaBackendConsumer.java b/src/main/java/com/purbon/kafka/topology/backend/kafka/KafkaBackendConsumer.java index 559a39564..97b33a2b4 100644 --- a/src/main/java/com/purbon/kafka/topology/backend/kafka/KafkaBackendConsumer.java +++ b/src/main/java/com/purbon/kafka/topology/backend/kafka/KafkaBackendConsumer.java @@ -16,6 +16,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; public class KafkaBackendConsumer { @@ -36,21 +37,24 @@ public void configure() { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass()); var serde = new JsonDeserializer<>(BackendState.class); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serde.getClass()); - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(GROUP_ID_CONFIG, config.getKafkaBackendConsumerGroupId()); consumer = new KafkaConsumer<>(consumerProperties); - consumer.subscribe(Collections.singletonList(config.getJulieKafkaConfigTopic())); + + var topicPartition = new TopicPartition(config.getJulieKafkaConfigTopic(), 0); + var topicPartitions = Collections.singletonList(topicPartition); + consumer.assign(topicPartitions); + consumer.seekToBeginning(topicPartitions); } public void retrieve(KafkaBackend callback) { - while (running.get()) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); callback.complete(); for (ConsumerRecord record : records) { callback.apply(record); } + if (records.count() > 0) callback.initialLoadFinish(); consumer.commitAsync(); } } diff --git a/src/test/java/com/purbon/kafka/topology/JulieOpsTest.java b/src/test/java/com/purbon/kafka/topology/JulieOpsTest.java index 43cb42aa0..29084ed8c 100644 --- a/src/test/java/com/purbon/kafka/topology/JulieOpsTest.java +++ b/src/test/java/com/purbon/kafka/topology/JulieOpsTest.java @@ -4,7 +4,6 @@ import static com.purbon.kafka.topology.Constants.*; import static org.mockito.Mockito.*; -import com.purbon.kafka.topology.BackendController.Mode; import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient; import com.purbon.kafka.topology.backend.BackendState; import com.purbon.kafka.topology.backend.RedisBackend; @@ -188,7 +187,6 @@ public void builderRunTestAsFromCLIWithARedisBackend() throws Exception { builder.close(); verify(stateProcessor, times(1)).createOrOpen(); - verify(stateProcessor, times(1)).createOrOpen(Mode.TRUNCATE); verify(topicManager, times(1)).apply(any(Map.class), any(ExecutionPlan.class)); verify(accessControlManager, times(1)).apply(any(Map.class), any(ExecutionPlan.class)); } diff --git a/src/test/java/com/purbon/kafka/topology/TopicManagerTest.java b/src/test/java/com/purbon/kafka/topology/TopicManagerTest.java index 73d48b00f..3c26188b4 100644 --- a/src/test/java/com/purbon/kafka/topology/TopicManagerTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopicManagerTest.java @@ -214,13 +214,13 @@ public void topicDeleteWithConfiguredInternalTopicsTest() throws IOException { public void topicDeleteWithConfiguredNoDelete() throws IOException { Properties props = new Properties(); - props.put(KAFKA_INTERNAL_TOPIC_PREFIXES, Arrays.asList("foo.", "_")); + props.put(KAFKA_INTERNAL_TOPIC_PREFIXES + ".0", "foo."); + props.put(KAFKA_INTERNAL_TOPIC_PREFIXES + ".1", "_"); HashMap cliOps = new HashMap<>(); cliOps.put(BROKERS_OPTION, ""); Configuration config = new Configuration(cliOps, props); - TopicManager topicManager = new TopicManager(adminClient, schemaRegistryManager, config); // Topology after delete action diff --git a/src/test/java/com/purbon/kafka/topology/integration/backend/KafkaBackendIT.java b/src/test/java/com/purbon/kafka/topology/integration/backend/KafkaBackendIT.java index dc7c0c911..2f3bff6e0 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/backend/KafkaBackendIT.java +++ b/src/test/java/com/purbon/kafka/topology/integration/backend/KafkaBackendIT.java @@ -19,7 +19,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.resource.ResourceType; import org.junit.After; import org.junit.Before; @@ -83,7 +82,7 @@ public void testExpectedFlow() throws IOException, InterruptedException { HashMap cliOps = new HashMap<>(); cliOps.put(BROKERS_OPTION, container.getBootstrapServers()); - props.put(JULIE_KAFKA_CONSUMER_GROUP_ID, "julieops"+System.currentTimeMillis()); + props.put(JULIE_KAFKA_CONSUMER_GROUP_ID, "julieops" + System.currentTimeMillis()); Configuration config = new Configuration(cliOps, props); newBackend.configure(config);