Skip to content

Commit

Permalink
fix: Kafka as a Backend for JulieOps resiliency. Ensure state is not …
Browse files Browse the repository at this point in the history
…corrupted. (#396)

* strengh the kafka backend resilence model

* add example props for kafka as backend

* add call to create julieops internal topic

* update simplified topology file

* add kafka config topic as defult internal topic if using kafka backend

* Revert "add kafka config topic as defult internal topic if using kafka backend"

This reverts commit 2f84410.

* ammend tests
  • Loading branch information
purbon committed Nov 13, 2021
1 parent 466688c commit 93f50d5
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 22 deletions.
2 changes: 2 additions & 0 deletions docker/sasl/up
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ docker-compose up -d --build

docker-compose exec kafka kafka-acls --authorizer-properties zookeeper.connect=zookeeper:2181 --add --cluster --operation=All --allow-principal=User:kafka

docker exec kafka kafka-topics --bootstrap-server kafka:29092 \
--create --topic julieops --partitions 1 --replication-factor 1
2 changes: 2 additions & 0 deletions example/descriptor-only-topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
context: "oltp"
projects:
- name: "foo"
consumers:
- principal: "User:App0"
topics:
- name: "foo"
config:
Expand Down
8 changes: 7 additions & 1 deletion example/topology-builder-sasl-plain.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule require
platform.servers.connect.0=connect:http://localhost:18083
platform.servers.basic.auth.0=connect@user:pass

topology.state.cluster.enabled=false
topology.state.cluster.enabled=false
topology.state.topics.cluster.enabled=false

#julie.instance.id=julieops
#julie.kafka.config.topic=julieops
#topology.builder.state.processor.class=com.purbon.kafka.topology.backend.KafkaBackend
#kafka.internal.topic.prefixes.0=julieops
18 changes: 10 additions & 8 deletions src/main/java/com/purbon/kafka/topology/ExecutionPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,16 @@ public void run(boolean dryRun) throws IOException {
}
}

backendController.reset();
backendController.addBindings(new ArrayList<>(bindings));
backendController.addServiceAccounts(serviceAccounts);
backendController.addTopics(topics);
backendController.addConnectors(connectors);
backendController.addKSqlStreams(ksqlStreams);
backendController.addKSqlTables(ksqlTables);
backendController.flushAndClose();
if (!dryRun) {
backendController.reset();
backendController.addBindings(new ArrayList<>(bindings));
backendController.addServiceAccounts(serviceAccounts);
backendController.addTopics(topics);
backendController.addConnectors(connectors);
backendController.addKSqlStreams(ksqlStreams);
backendController.addKSqlTables(ksqlTables);
backendController.flushAndClose();
}
}

private void execute(Action action, boolean dryRun) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,11 @@ public void createTopic(Topic topic, String fullTopicName) throws IOException {
.configs(topic.getRawConfig());
try {
createAllTopics(Collections.singleton(newTopic));
} catch (TopicExistsException ex) {
LOGGER.info(ex);
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof TopicExistsException) {
LOGGER.info(e.getMessage());
return;
}
LOGGER.error(e);
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.purbon.kafka.topology.backend.kafka.KafkaBackendProducer;
import com.purbon.kafka.topology.backend.kafka.RecordReceivedCallback;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -22,11 +23,13 @@ public class KafkaBackend implements Backend, RecordReceivedCallback {
private KafkaBackendProducer producer;

private AtomicReference<BackendState> latest;
private AtomicBoolean shouldWaitForLoad;
private String instanceId;
private Thread thread;

public KafkaBackend() {
isCompleted = false;
shouldWaitForLoad = new AtomicBoolean(true);
}

private static class JulieKafkaConsumerThread implements Runnable {
Expand All @@ -53,7 +56,7 @@ public void run() {
public void configure(Configuration config) {
instanceId = config.getJulieInstanceId();
latest = new AtomicReference<>(new BackendState());

shouldWaitForLoad.set(true);
consumer = new KafkaBackendConsumer(config);
consumer.configure();

Expand Down Expand Up @@ -88,11 +91,19 @@ public void save(BackendState state) throws IOException {
producer.save(state);
}

@SneakyThrows
@Override
public BackendState load() throws IOException {
while (shouldWaitForLoad.get()) {
continue;
}
return latest == null ? new BackendState() : latest.get();
}

public void initialLoadFinish() {
shouldWaitForLoad.set(false);
}

@Override
public void close() {
consumer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;

public class KafkaBackendConsumer {
Expand All @@ -36,21 +37,24 @@ public void configure() {
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass());
var serde = new JsonDeserializer<>(BackendState.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serde.getClass());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

consumerProperties.put(GROUP_ID_CONFIG, config.getKafkaBackendConsumerGroupId());
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(config.getJulieKafkaConfigTopic()));

var topicPartition = new TopicPartition(config.getJulieKafkaConfigTopic(), 0);
var topicPartitions = Collections.singletonList(topicPartition);
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
}

public void retrieve(KafkaBackend callback) {

while (running.get()) {
ConsumerRecords<String, BackendState> records = consumer.poll(Duration.ofSeconds(1));
ConsumerRecords<String, BackendState> records = consumer.poll(Duration.ofSeconds(10));
callback.complete();
for (ConsumerRecord<String, BackendState> record : records) {
callback.apply(record);
}
if (records.count() > 0) callback.initialLoadFinish();
consumer.commitAsync();
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/com/purbon/kafka/topology/JulieOpsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static com.purbon.kafka.topology.Constants.*;
import static org.mockito.Mockito.*;

import com.purbon.kafka.topology.BackendController.Mode;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import com.purbon.kafka.topology.backend.BackendState;
import com.purbon.kafka.topology.backend.RedisBackend;
Expand Down Expand Up @@ -188,7 +187,6 @@ public void builderRunTestAsFromCLIWithARedisBackend() throws Exception {
builder.close();

verify(stateProcessor, times(1)).createOrOpen();
verify(stateProcessor, times(1)).createOrOpen(Mode.TRUNCATE);
verify(topicManager, times(1)).apply(any(Map.class), any(ExecutionPlan.class));
verify(accessControlManager, times(1)).apply(any(Map.class), any(ExecutionPlan.class));
}
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/purbon/kafka/topology/TopicManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,13 @@ public void topicDeleteWithConfiguredInternalTopicsTest() throws IOException {
public void topicDeleteWithConfiguredNoDelete() throws IOException {

Properties props = new Properties();
props.put(KAFKA_INTERNAL_TOPIC_PREFIXES, Arrays.asList("foo.", "_"));
props.put(KAFKA_INTERNAL_TOPIC_PREFIXES + ".0", "foo.");
props.put(KAFKA_INTERNAL_TOPIC_PREFIXES + ".1", "_");

HashMap<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");

Configuration config = new Configuration(cliOps, props);

TopicManager topicManager = new TopicManager(adminClient, schemaRegistryManager, config);

// Topology after delete action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -83,7 +82,7 @@ public void testExpectedFlow() throws IOException, InterruptedException {

HashMap<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, container.getBootstrapServers());
props.put(JULIE_KAFKA_CONSUMER_GROUP_ID, "julieops"+System.currentTimeMillis());
props.put(JULIE_KAFKA_CONSUMER_GROUP_ID, "julieops" + System.currentTimeMillis());
Configuration config = new Configuration(cliOps, props);
newBackend.configure(config);

Expand Down

0 comments on commit 93f50d5

Please sign in to comment.