From cb4fd69b192cd57f41592cead95fc488c6b22d86 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 10:29:39 +0200 Subject: [PATCH 01/15] Remove schema registry support from core module --- .../templates/job.yaml | 2 +- charts/producer-app/templates/pod.yaml | 2 +- .../templates/job.yaml | 2 +- charts/streams-app/templates/deployment.yaml | 2 +- settings.gradle | 1 + .../com/bakdata/kafka/KafkaApplication.java | 11 +- .../bakdata/kafka/StringListConverter.java | 10 +- .../kafka/integration/RunProducerAppTest.java | 3 +- streams-bootstrap-core/build.gradle.kts | 5 +- .../bakdata/kafka/KafkaEndpointConfig.java | 10 +- .../bakdata/kafka/ProducerCleanUpRunner.java | 4 +- .../bakdata/kafka/ProducerTopicConfig.java | 8 +- .../bakdata/kafka/StreamsCleanUpRunner.java | 23 +-- .../kafka/StreamsExecutionOptions.java | 2 - .../com/bakdata/kafka/StreamsTopicConfig.java | 13 +- .../kafka/util/ImprovedAdminClient.java | 47 +---- .../bakdata/kafka/util/SchemaTopicClient.java | 147 -------------- .../com/bakdata/kafka/util/TopicClient.java | 5 +- .../kafka/ConfiguredProducerAppTest.java | 6 +- .../kafka/ConfiguredStreamsAppTest.java | 7 +- .../bakdata/kafka/integration/KafkaTest.java | 5 +- .../test_applications/AvroKeyProducer.java | 4 +- .../test_applications/AvroValueProducer.java | 4 +- .../ComplexTopologyApplication.java | 4 +- .../test_applications/MirrorKeyWithAvro.java | 4 +- .../MirrorValueWithAvro.java | 4 +- .../MirrorWithNonDefaultSerde.java | 4 +- .../kafka/util/SchemaTopicClientTest.java | 189 ------------------ .../build.gradle.kts | 9 + .../lombok.config | 3 + .../SchemaRegistryKafkaApplicationUtils.java | 120 +++++++++++ .../kafka/SchemaRegistryProducerApp.java | 52 +++++ .../kafka/SchemaRegistryStreamsApp.java | 52 +++++ .../StreamsBootstrapTopologyFactory.java | 5 +- 34 files changed, 315 insertions(+), 454 deletions(-) delete mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/SchemaTopicClient.java delete mode 100644 streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java create mode 100644 streams-bootstrap-schema-registry/build.gradle.kts create mode 100644 streams-bootstrap-schema-registry/lombok.config create mode 100644 streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java create mode 100644 streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java create mode 100644 streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java diff --git a/charts/producer-app-cleanup-job/templates/job.yaml b/charts/producer-app-cleanup-job/templates/job.yaml index a6a94422..af42098f 100644 --- a/charts/producer-app-cleanup-job/templates/job.yaml +++ b/charts/producer-app-cleanup-job/templates/job.yaml @@ -69,7 +69,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} diff --git a/charts/producer-app/templates/pod.yaml b/charts/producer-app/templates/pod.yaml index ba063e0b..92f4eb8e 100644 --- a/charts/producer-app/templates/pod.yaml +++ b/charts/producer-app/templates/pod.yaml @@ -53,7 +53,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ 
.Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index 458e58b5..47208534 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -73,7 +73,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }} diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 3da02578..7c6ba46b 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -120,7 +120,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }} diff --git a/settings.gradle b/settings.gradle index e9057df9..c196e869 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,5 +10,6 @@ include( ":streams-bootstrap-core", ":streams-bootstrap-test", ":streams-bootstrap-large-messages", + ":streams-bootstrap-schema-registry", ":streams-bootstrap-cli", ) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 0a00d3c1..21699146 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -26,7 +26,6 @@ import static java.util.Collections.emptyMap; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -54,7 +53,6 @@ *
<li>{@link #outputTopic}</li>
 *     <li>{@link #extraOutputTopics}</li>
 *     <li>{@link #brokers}</li>
- *     <li>{@link #schemaRegistryUrl}</li>
 *     <li>{@link #kafkaConfig}</li>
 * </ul>
  • * * To implement your Kafka application inherit from this class and add your custom options. Run it by calling @@ -89,8 +87,6 @@ public abstract class KafkaApplication extraOutputTopics = emptyMap(); @CommandLine.Option(names = "--brokers", required = true, description = "Broker addresses to connect to") private String brokers; - @CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry") - private String schemaRegistryUrl; @CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties") private Map kafkaConfig = emptyMap(); @@ -125,8 +121,10 @@ public static int startApplicationWithoutExit(final KafkaApplication environmentArguments = new EnvironmentArgumentsParser(ENV_PREFIX) .parseVariables(System.getenv()); final Collection allArgs = new ArrayList<>(environmentArguments); @@ -204,7 +202,6 @@ public void run() { public KafkaEndpointConfig getEndpointConfig() { return KafkaEndpointConfig.builder() .brokers(this.brokers) - .schemaRegistryUrl(this.schemaRegistryUrl) .build(); } diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java index a31657bc..7ca0fa4b 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java @@ -24,18 +24,22 @@ package com.bakdata.kafka; -import com.google.common.base.Splitter; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import picocli.CommandLine.ITypeConverter; /** * Converter for lists inside collection type parsed by PicoCLI. List members need to be separated by {@code ;} */ public class StringListConverter implements ITypeConverter> { - private static final Splitter SPLITTER = Splitter.on(";").omitEmptyStrings().trimResults(); @Override public List convert(final String value) { - return SPLITTER.splitToList(value); + final String[] split = value.split(";"); + return Arrays.stream(split) + .map(String::trim) + .filter(String::isEmpty) + .collect(Collectors.toList()); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index 67232632..571ba91b 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -91,9 +91,10 @@ public SerializerConfig defaultSerializationConfig() { } })) { app.setBrokers(this.kafkaCluster.getBrokerList()); - app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); app.setOutputTopic(output); app.setKafkaConfig(Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.schemaRegistryMockExtension.getUrl(), ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); app.run(); diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index 27675109..75aca556 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -10,9 +10,6 @@ dependencies { api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion) api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion) - val confluentVersion: String by project - implementation(group = "io.confluent", name = 
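Note on the Splitter replacement in StringListConverter above: Guava's omitEmptyStrings() drops empty segments, so the stream-based port needs the empty check negated; .filter(String::isEmpty) as written keeps only the empty segments. A corrected, dependency-free sketch of the same conversion:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;
    import picocli.CommandLine.ITypeConverter;

    // Splitter-equivalent: split on ";", trim each segment, then drop empty ones.
    public class StringListConverter implements ITypeConverter<List<String>> {
        @Override
        public List<String> convert(final String value) {
            return Arrays.stream(value.split(";"))
                    .map(String::trim)
                    .filter(Predicate.not(String::isEmpty)) // negated check = omitEmptyStrings()
                    .collect(Collectors.toList());
        }
    }
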
"kafka-schema-serializer", version = confluentVersion) - api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) api( group = "org.slf4j", name = "slf4j-api", @@ -33,6 +30,7 @@ dependencies { val fluentKafkaVersion: String by project testImplementation(project(":streams-bootstrap-test")) + testImplementation(project(":streams-bootstrap-schema-registry")) testImplementation( group = "com.bakdata.fluent-kafka-streams-tests", name = "schema-registry-mock-junit5", @@ -42,7 +40,6 @@ dependencies { testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) { exclude(group = "org.slf4j", module = "slf4j-log4j12") } - testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java index b9552ac4..7736ca2c 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -38,7 +37,8 @@ @Builder public class KafkaEndpointConfig { private final @NonNull String brokers; - private final String schemaRegistryUrl; + @Builder.Default + private final Map properties = new HashMap<>(); /** * Create Kafka properties to connect to infrastructure. 
@@ -50,11 +50,9 @@ public class KafkaEndpointConfig { * @return properties used for connecting to Kafka */ public Map createKafkaProperties() { - final Map kafkaConfig = new HashMap<>(); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.putAll(this.properties); kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokers); - if (this.schemaRegistryUrl != null) { - kafkaConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl); - } return Collections.unmodifiableMap(kafkaConfig); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java index bfddf6d1..c891e68f 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java @@ -100,8 +100,8 @@ private void deleteTopics() { } private void deleteTopic(final String topic) { - this.adminClient.getSchemaTopicClient() - .deleteTopicAndResetSchemaRegistry(topic); + this.adminClient.getTopicClient() + .deleteTopic(topic); ProducerCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java index 80d84c5a..78efc742 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java @@ -26,7 +26,6 @@ import static java.util.Collections.emptyMap; -import com.google.common.base.Preconditions; import java.util.Map; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -46,7 +45,8 @@ public class ProducerTopicConfig { * Extra output topics that are identified by a role */ @Builder.Default - @NonNull Map extraOutputTopics = emptyMap(); + @NonNull + Map extraOutputTopics = emptyMap(); /** * Get extra output topic for a specified role @@ -56,7 +56,9 @@ public class ProducerTopicConfig { */ public String getOutputTopic(final String role) { final String topic = this.extraOutputTopics.get(role); - Preconditions.checkNotNull(topic, "No output topic for role '%s' available", role); + if (topic == null) { + throw new IllegalArgumentException(String.format("No output topic for role '%s' available", role)); + } return topic; } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index d1a47beb..93de95b2 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -27,11 +27,11 @@ import com.bakdata.kafka.util.ConsumerGroupClient; import com.bakdata.kafka.util.ImprovedAdminClient; import com.bakdata.kafka.util.TopologyInformation; -import com.google.common.collect.ImmutableList; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -100,19 +100,20 @@ public static void runResetter(final Collection inputTopics, final Colle // StreamsResetter's internal AdminClient can only be configured with a properties file final String appId = streamsAppConfig.getAppId(); final File tempFile = 
createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties()); - final ImmutableList.Builder argList = ImmutableList.builder() - .add("--application-id", appId) - .add("--bootstrap-server", String.join(",", streamsAppConfig.getBoostrapServers())) - .add("--config-file", tempFile.toString()); + final Collection argList = new ArrayList<>(List.of( + "--application-id", appId, + "--bootstrap-server", String.join(",", streamsAppConfig.getBoostrapServers()), + "--config-file", tempFile.toString() + )); final Collection existingInputTopics = filterExistingTopics(inputTopics, allTopics); if (!existingInputTopics.isEmpty()) { - argList.add("--input-topics", String.join(",", existingInputTopics)); + argList.addAll(List.of("--input-topics", String.join(",", existingInputTopics))); } final Collection existingIntermediateTopics = filterExistingTopics(intermediateTopics, allTopics); if (!existingIntermediateTopics.isEmpty()) { - argList.add("--intermediate-topics", String.join(",", existingIntermediateTopics)); + argList.addAll(List.of("--intermediate-topics", String.join(",", existingIntermediateTopics))); } - final String[] args = argList.build().toArray(String[]::new); + final String[] args = argList.toArray(String[]::new); final StreamsResetter resetter = new StreamsResetter(); final int returnCode = resetter.execute(args); try { @@ -236,14 +237,12 @@ private void deleteTopics() { } private void resetInternalTopic(final String topic) { - this.adminClient.getSchemaTopicClient() - .resetSchemaRegistry(topic); StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } private void deleteTopic(final String topic) { - this.adminClient.getSchemaTopicClient() - .deleteTopicAndResetSchemaRegistry(topic); + this.adminClient.getTopicClient() + .deleteTopic(topic); StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java index 92e482d9..1c35ceba 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.util.Map; import java.util.function.Consumer; @@ -80,7 +79,6 @@ CloseOptions createCloseOptions(final StreamsConfig config) { return new CloseOptions().leaveGroup(leaveGroup).timeout(this.closeTimeout); } - @VisibleForTesting boolean shouldLeaveGroup(final Map originals) { final boolean staticMembershipDisabled = isStaticMembershipDisabled(originals); return staticMembershipDisabled || this.volatileGroupInstanceId; diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java index 4aac95fa..b48b2972 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java @@ -27,7 +27,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -73,7 +72,9 @@ public class StreamsTopicConfig { */ public List getInputTopics(final String role) { final List topics = 
this.extraInputTopics.get(role); - Preconditions.checkNotNull(topics, "No input topics for role '%s' available", role); + if (topics == null) { + throw new IllegalArgumentException(String.format("No input topics for role '%s' available", role)); + } return topics; } @@ -85,7 +86,9 @@ public List getInputTopics(final String role) { */ public Pattern getInputPattern(final String role) { final Pattern pattern = this.extraInputPatterns.get(role); - Preconditions.checkNotNull(pattern, "No input pattern for role '%s' available", role); + if (pattern == null) { + throw new IllegalArgumentException(String.format("No input pattern for role '%s' available", role)); + } return pattern; } @@ -97,7 +100,9 @@ public Pattern getInputPattern(final String role) { */ public String getOutputTopic(final String role) { final String topic = this.extraOutputTopics.get(role); - Preconditions.checkNotNull(topic, "No output topic for role '%s' available", role); + if (topic == null) { + throw new IllegalArgumentException(String.format("No output topic for role '%s' available", role)); + } return topic; } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java index 167d48b2..0ffe3186 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java @@ -24,17 +24,9 @@ package com.bakdata.kafka.util; -import static com.bakdata.kafka.util.SchemaTopicClient.createSchemaRegistryClient; - -import com.google.common.base.Preconditions; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.io.Closeable; -import java.io.IOException; -import java.io.UncheckedIOException; import java.time.Duration; import java.util.Map; -import java.util.Optional; import lombok.AccessLevel; import lombok.Builder; import lombok.NonNull; @@ -52,7 +44,6 @@ public final class ImprovedAdminClient implements Closeable { private static final Duration ADMIN_TIMEOUT = Duration.ofSeconds(10L); private final @NonNull Admin adminClient; - private final SchemaRegistryClient schemaRegistryClient; private final @NonNull Duration timeout; /** @@ -72,16 +63,13 @@ public static ImprovedAdminClient create(@NonNull final Map prop */ public static ImprovedAdminClient create(@NonNull final Map properties, @NonNull final Duration timeout) { - Preconditions.checkNotNull(properties.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), - "%s must be specified in properties", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); + if (!properties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalArgumentException( + String.format("%s must be specified in properties", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); + } final Admin adminClient = AdminClient.create(properties); - final String schemaRegistryUrl = - (String) properties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); - final SchemaRegistryClient schemaRegistryClient = - schemaRegistryUrl == null ? 
null : createSchemaRegistryClient(properties, schemaRegistryUrl); return builder() .adminClient(adminClient) - .schemaRegistryClient(schemaRegistryClient) .timeout(timeout) .build(); } @@ -90,15 +78,6 @@ public Admin getAdminClient() { return new PooledAdmin(this.adminClient); } - public Optional getSchemaRegistryClient() { - return Optional.ofNullable(this.schemaRegistryClient) - .map(PooledSchemaRegistryClient::new); - } - - public SchemaTopicClient getSchemaTopicClient() { - return new SchemaTopicClient(this.getTopicClient(), this.getSchemaRegistryClient().orElse(null)); - } - public TopicClient getTopicClient() { return new TopicClient(this.getAdminClient(), this.timeout); } @@ -110,13 +89,6 @@ public ConsumerGroupClient getConsumerGroupClient() { @Override public void close() { this.adminClient.close(); - if (this.schemaRegistryClient != null) { - try { - this.schemaRegistryClient.close(); - } catch (final IOException e) { - throw new UncheckedIOException("Error closing schema registry client", e); - } - } } @RequiredArgsConstructor @@ -134,15 +106,4 @@ public void close() { // do nothing } } - - @RequiredArgsConstructor - private static class PooledSchemaRegistryClient implements SchemaRegistryClient { - @Delegate(excludes = Closeable.class) - private final @NonNull SchemaRegistryClient schemaRegistryClient; - - @Override - public void close() { - // do nothing - } - } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/SchemaTopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/SchemaTopicClient.java deleted file mode 100644 index f7474442..00000000 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/SchemaTopicClient.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
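After this change, ImprovedAdminClient no longer carries a SchemaRegistryClient; topic deletion goes straight through TopicClient, and schema clean-up happens via hooks instead. A minimal usage sketch (broker address is a placeholder):

    import com.bakdata.kafka.util.ImprovedAdminClient;
    import java.util.Map;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    class DeleteTopicExample {
        static void deleteTopic(final String topic) {
            final Map<String, Object> properties = Map.of(
                    AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // deletion is purely a Kafka operation now; no implicit schema reset
            try (final ImprovedAdminClient adminClient = ImprovedAdminClient.create(properties)) {
                adminClient.getTopicClient().deleteTopic(topic);
            }
        }
    }
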
- */ - -package com.bakdata.kafka.util; - -import com.bakdata.kafka.CleanUpException; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import java.io.Closeable; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.AdminClient; - -/** - * Client to interact with Kafka topics and its associated schema registry subjects in a unified way - */ -@Slf4j -@RequiredArgsConstructor -public final class SchemaTopicClient implements Closeable { - private static final int CACHE_CAPACITY = 100; - private final @NonNull TopicClient topicClient; - private final SchemaRegistryClient schemaRegistryClient; - - /** - * Creates a new {@code SchemaTopicClient} using the specified configuration. - * - * @param configs properties passed to {@link AdminClient#create(Map)} - * @param schemaRegistryUrl URL of schema registry - * @param timeout timeout for waiting for Kafka admin calls - * @return {@code SchemaTopicClient} - */ - public static SchemaTopicClient create(final Map configs, final String schemaRegistryUrl, - final Duration timeout) { - final SchemaRegistryClient schemaRegistryClient = - createSchemaRegistryClient(configs, schemaRegistryUrl); - final TopicClient topicClient = TopicClient.create(configs, timeout); - return new SchemaTopicClient(topicClient, schemaRegistryClient); - } - - /** - * Creates a new {@code SchemaTopicClient} with no {@link SchemaRegistryClient} using the specified configuration. - * - * @param configs properties passed to {@link AdminClient#create(Map)} - * @param timeout timeout for waiting for Kafka admin calls - * @return {@code SchemaTopicClient} - */ - public static SchemaTopicClient create(final Map configs, final Duration timeout) { - final TopicClient topicClient = TopicClient.create(configs, timeout); - return new SchemaTopicClient(topicClient, null); - } - - /** - * Creates a new {@link SchemaRegistryClient} using the specified configuration. - * - * @param configs properties passed to - * {@link SchemaRegistryClientFactory#newClient(List, int, List, Map, Map)} - * @param schemaRegistryUrl URL of schema registry - * @return {@link SchemaRegistryClient} - */ - public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map configs, - @NonNull final String schemaRegistryUrl) { - return SchemaRegistryClientFactory.newClient(List.of(schemaRegistryUrl), CACHE_CAPACITY, null, configs, null); - } - - /** - * Delete a topic if it exists and reset the corresponding Schema Registry subjects. - * - * @param topic the topic name - */ - public void deleteTopicAndResetSchemaRegistry(final String topic) { - this.topicClient.deleteTopicIfExists(topic); - this.resetSchemaRegistry(topic); - } - - /** - * Delete key and value schemas associated with a topic from the schema registry. - * - * @param topic the topic name - */ - public void resetSchemaRegistry(final String topic) { - if (this.schemaRegistryClient == null) { - log.debug("No Schema Registry URL set. 
Skipping schema deletion for topic {}.", topic); - return; - } - log.info("Resetting Schema Registry for topic '{}'", topic); - try { - final Collection allSubjects = this.schemaRegistryClient.getAllSubjects(); - final String keySubject = topic + "-key"; - if (allSubjects.contains(keySubject)) { - this.schemaRegistryClient.deleteSubject(keySubject); - log.info("Cleaned key schema of topic {}", topic); - } else { - log.info("No key schema for topic {} available", topic); - } - final String valueSubject = topic + "-value"; - if (allSubjects.contains(valueSubject)) { - this.schemaRegistryClient.deleteSubject(valueSubject); - log.info("Cleaned value schema of topic {}", topic); - } else { - log.info("No value schema for topic {} available", topic); - } - } catch (final IOException | RestClientException e) { - throw new CleanUpException("Could not reset schema registry for topic " + topic, e); - } - } - - @Override - public void close() { - this.topicClient.close(); - if (this.schemaRegistryClient != null) { - try { - this.schemaRegistryClient.close(); - } catch (final IOException e) { - throw new UncheckedIOException("Error closing schema registry client", e); - } - } - } -} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 3173b337..378bf534 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -24,7 +24,6 @@ package com.bakdata.kafka.util; -import com.google.common.base.Verify; import java.io.Closeable; import java.time.Duration; import java.util.Collection; @@ -121,7 +120,9 @@ public void deleteTopic(final String topicName) { } catch (final ExecutionException | TimeoutException ex) { throw failedToDeleteTopic(topicName, ex); } - Verify.verify(!this.exists(topicName), "Deletion of topic %s failed", topicName); + if (this.exists(topicName)) { + throw new IllegalStateException(String.format("Deletion of topic %s failed", topicName)); + } } /** diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index 225ee0f2..0134138b 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -86,7 +86,6 @@ void shouldSetDefaultSerializer() { new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .containsEntry(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); @@ -101,7 +100,6 @@ void shouldThrowIfKeySerializerHasBeenConfiguredDifferently() { new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'key.serializer' should not be configured already"); @@ -116,7 +114,6 @@ void shouldThrowIfValueSerializerHasBeenConfiguredDifferently() { new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() 
.brokers("fake") - .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'value.serializer' should not be configured already"); @@ -131,7 +128,6 @@ void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() { new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'bootstrap.servers' should not be configured already"); @@ -146,7 +142,7 @@ void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() { new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") + .properties(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake")) .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'schema.registry.url' should not be configured already"); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index 46f020e4..e57ef702 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -86,7 +86,6 @@ void shouldSetDefaultSerde() { new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .containsEntry(DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class) .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, LongSerde.class); @@ -101,7 +100,6 @@ void shouldThrowIfKeySerdeHasBeenConfiguredDifferently() { new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'default.key.serde' should not be configured already"); @@ -116,7 +114,6 @@ void shouldThrowIfValueSerdeHasBeenConfiguredDifferently() { new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'default.value.serde' should not be configured already"); @@ -131,7 +128,6 @@ void shouldThrowIfAppIdHasBeenConfiguredDifferently() { new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'application.id' should not be configured already"); @@ -146,7 +142,6 @@ void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() { new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'bootstrap.servers' should not be configured already"); @@ -161,7 +156,7 @@ void 
shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() { new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") - .schemaRegistryUrl("fake") + .properties(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake")) .build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("'schema.registry.url' should not be configured already"); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index 1e75ef40..d3686076 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -27,6 +27,8 @@ import com.bakdata.kafka.KafkaEndpointConfig; import com.bakdata.kafka.TestUtil; import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import java.util.Map; import net.mguenther.kafka.junit.EmbeddedKafkaCluster; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -56,7 +58,8 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() .brokers(this.kafkaCluster.getBrokerList()) - .schemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()) + .properties(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.schemaRegistryMockExtension.getUrl())) .build(); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java index 739912f9..922339f2 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java @@ -24,9 +24,9 @@ package com.bakdata.kafka.test_applications; -import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SchemaRegistryProducerApp; import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.TestRecord; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; @@ -34,7 +34,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; -public class AvroKeyProducer implements ProducerApp { +public class AvroKeyProducer implements SchemaRegistryProducerApp { @Override public ProducerRunnable buildRunnable(final ProducerBuilder builder) { return () -> { diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java index 1b373284..1dcc8653 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java @@ -24,9 +24,9 @@ package com.bakdata.kafka.test_applications; -import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SchemaRegistryProducerApp; import 
com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.TestRecord; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; @@ -34,7 +34,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; -public class AvroValueProducer implements ProducerApp { +public class AvroValueProducer implements SchemaRegistryProducerApp { @Override public ProducerRunnable buildRunnable(final ProducerBuilder builder) { return () -> { diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java index 4f5b2680..a07fa898 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java @@ -24,8 +24,8 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SchemaRegistryStreamsApp; import com.bakdata.kafka.SerdeConfig; -import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -41,7 +41,7 @@ import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; -public class ComplexTopologyApplication implements StreamsApp { +public class ComplexTopologyApplication implements SchemaRegistryStreamsApp { public static final String THROUGH_TOPIC = "through-topic"; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java index 6f303f2e..8b633773 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java @@ -24,8 +24,8 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SchemaRegistryStreamsApp; import com.bakdata.kafka.SerdeConfig; -import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -35,7 +35,7 @@ import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor -public class MirrorKeyWithAvro implements StreamsApp { +public class MirrorKeyWithAvro implements SchemaRegistryStreamsApp { @Override public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java index fb8087d8..d633bc25 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java @@ -24,8 +24,8 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SchemaRegistryStreamsApp; import com.bakdata.kafka.SerdeConfig; -import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -35,7 +35,7 @@ import org.apache.kafka.streams.kstream.KStream; 
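As the mirror applications in this patch show, migrating an Avro-based app is a one-line change of the implemented interface. A hedged sketch of the full pattern (the getTopics() accessor and getUniqueAppId hook follow the surrounding streams-bootstrap API and are not part of this diff; class and app id are illustrative):

    import com.bakdata.kafka.SchemaRegistryStreamsApp;
    import com.bakdata.kafka.SerdeConfig;
    import com.bakdata.kafka.StreamsTopicConfig;
    import com.bakdata.kafka.TestRecord;
    import com.bakdata.kafka.TopologyBuilder;
    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import org.apache.kafka.common.serialization.Serdes.StringSerde;
    import org.apache.kafka.streams.kstream.KStream;

    // Mirror app on the new interface; SchemaRegistryStreamsApp#setupCleanUp
    // registers the schema clean-up hook automatically.
    public class MirrorAvro implements SchemaRegistryStreamsApp {
        @Override
        public void buildTopology(final TopologyBuilder builder) {
            final KStream<String, TestRecord> input = builder.streamInput();
            input.to(builder.getTopics().getOutputTopic()); // assumed accessor
        }

        @Override
        public String getUniqueAppId(final StreamsTopicConfig topics) {
            return "mirror-avro-" + topics.getOutputTopic(); // illustrative app id
        }

        @Override
        public SerdeConfig defaultSerializationConfig() {
            return new SerdeConfig(StringSerde.class, SpecificAvroSerde.class);
        }
    }
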
@NoArgsConstructor -public class MirrorValueWithAvro implements StreamsApp { +public class MirrorValueWithAvro implements SchemaRegistryStreamsApp { @Override public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java index 7406b484..0aa18e25 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java @@ -25,8 +25,8 @@ package com.bakdata.kafka.test_applications; import com.bakdata.kafka.Configurator; +import com.bakdata.kafka.SchemaRegistryStreamsApp; import com.bakdata.kafka.SerdeConfig; -import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.Produced; @NoArgsConstructor -public class MirrorWithNonDefaultSerde implements StreamsApp { +public class MirrorWithNonDefaultSerde implements SchemaRegistryStreamsApp { public static Serde newKeySerde() { return new SpecificAvroSerde<>(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java deleted file mode 100644 index 536854dd..00000000 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package com.bakdata.kafka.util; - - -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static net.mguenther.kafka.junit.Wait.delay; - -import com.bakdata.kafka.TestRecord; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; -import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import lombok.extern.slf4j.Slf4j; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.SendValuesTransactional; -import net.mguenther.kafka.junit.TopicConfig; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.assertj.core.api.SoftAssertions; -import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; -import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.RegisterExtension; - -@Slf4j -@ExtendWith(SoftAssertionsExtension.class) -class SchemaTopicClientTest { - private static final int TIMEOUT_SECONDS = 10; - private static final String TOPIC = "topic"; - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); - private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster(); - - @InjectSoftAssertions - SoftAssertions softly; - - @BeforeEach - void setup() { - this.kafkaCluster.start(); - } - - @AfterEach - void teardown() { - this.kafkaCluster.stop(); - } - - @Test - void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() - throws InterruptedException, IOException, RestClientException { - this.kafkaCluster.createTopic(TopicConfig.withName(TOPIC).useDefaults()); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .as("Topic is created") - .isTrue(); - - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(TOPIC, List.of(TestRecord.newBuilder().setContent("foo").build())) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) - .build(); - this.kafkaCluster.send(sendRequest); - - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - - try (final SchemaTopicClient schemaTopicClient = this.createClientWithSchemaRegistry()) { - schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); - } - - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - - this.softly.assertThat(client.getAllSubjects()) - .doesNotContain(TOPIC + "-value"); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .isFalse(); - } - - @Test - void shouldResetSchema() throws InterruptedException, IOException, RestClientException { - this.kafkaCluster.createTopic(TopicConfig.withName(TOPIC).useDefaults()); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .as("Topic is 
created") - .isTrue(); - - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(TOPIC, List.of(TestRecord.newBuilder().setContent("foo").build())) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) - .build(); - this.kafkaCluster.send(sendRequest); - - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - - try (final SchemaTopicClient schemaTopicClient = this.createClientWithSchemaRegistry()) { - schemaTopicClient.resetSchemaRegistry(TOPIC); - } - - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - - this.softly.assertThat(client.getAllSubjects()) - .doesNotContain(TOPIC + "-value"); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .isTrue(); - } - - @Test - void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws InterruptedException, RestClientException, - IOException { - this.kafkaCluster.createTopic(TopicConfig.withName(TOPIC).useDefaults()); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .as("Topic is created") - .isTrue(); - - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(TOPIC, List.of(TestRecord.newBuilder().setContent("foo").build())) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) - .build(); - this.kafkaCluster.send(sendRequest); - - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - - try (final SchemaTopicClient schemaTopicClient = this.createClientWithNoSchemaRegistry()) { - schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); - } - - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .isFalse(); - } - - private SchemaTopicClient createClientWithSchemaRegistry() { - final Map kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList() - ); - return SchemaTopicClient.create(kafkaProperties, this.schemaRegistryMockExtension.getUrl(), - Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS)); - } - - private SchemaTopicClient createClientWithNoSchemaRegistry() { - final Map kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList() - ); - return SchemaTopicClient.create(kafkaProperties, Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS)); - } - -} diff --git a/streams-bootstrap-schema-registry/build.gradle.kts b/streams-bootstrap-schema-registry/build.gradle.kts new file mode 100644 index 00000000..f38c06f0 --- /dev/null +++ b/streams-bootstrap-schema-registry/build.gradle.kts @@ -0,0 +1,9 @@ +description = "Utils for using Confluent Schema Registry with your Kafka Streams Application" + +dependencies { + api(project(":streams-bootstrap-core")) + val confluentVersion: String by project + api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) + implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion) + testImplementation(group = 
"io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) +} diff --git a/streams-bootstrap-schema-registry/lombok.config b/streams-bootstrap-schema-registry/lombok.config new file mode 100644 index 00000000..189c0bef --- /dev/null +++ b/streams-bootstrap-schema-registry/lombok.config @@ -0,0 +1,3 @@ +# This file is generated by the 'io.freefair.lombok' Gradle plugin +config.stopBubbling = true +lombok.addLombokGeneratedAnnotation = true diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java new file mode 100644 index 00000000..a74a0ac6 --- /dev/null +++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java @@ -0,0 +1,120 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.bakdata.kafka.HasTopicHooks.TopicHook; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.NonNull; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +/** + * Utility class that provides helpers for cleaning {@code LargeMessageSerde} artifacts + */ +@UtilityClass +@Slf4j +public class SchemaRegistryKafkaApplicationUtils { + private static final int CACHE_CAPACITY = 100; + + /** + * Creates a new {@link SchemaRegistryClient} using the specified configuration. + * + * @param configs properties passed to + * {@link SchemaRegistryClientFactory#newClient(List, int, List, Map, Map)} + * @param schemaRegistryUrl URL of schema registry + * @return {@link SchemaRegistryClient} + */ + public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map configs, + @NonNull final String schemaRegistryUrl) { + return SchemaRegistryClientFactory.newClient(List.of(schemaRegistryUrl), CACHE_CAPACITY, null, configs, null); + } + + /** + * Create a hook that cleans up schemas associated with a topic. 
It is expected that all necessary + * properties to create a {@link SchemaRegistryClient} are part of {@code kafkaProperties}. + * + * @param kafkaProperties Kafka properties to create hook from + * @return hook that cleans up schemas associated with a topic + * @see HasTopicHooks#registerTopicHook(TopicHook) + */ + public static TopicHook createSchemaRegistryCleanUpHook(final Map kafkaProperties) { + final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(kafkaProperties); //TODO close + return new TopicHook() { + @Override + public void deleted(final String topic) { + log.info("Resetting Schema Registry for topic '{}'", topic); + try { + final Collection allSubjects = schemaRegistryClient.getAllSubjects(); + final String keySubject = topic + "-key"; + if (allSubjects.contains(keySubject)) { + schemaRegistryClient.deleteSubject(keySubject); + log.info("Cleaned key schema of topic {}", topic); + } else { + log.info("No key schema for topic {} available", topic); + } + final String valueSubject = topic + "-value"; + if (allSubjects.contains(valueSubject)) { + schemaRegistryClient.deleteSubject(valueSubject); + log.info("Cleaned value schema of topic {}", topic); + } else { + log.info("No value schema for topic {} available", topic); + } + } catch (final IOException | RestClientException e) { + throw new CleanUpException("Could not reset schema registry for topic " + topic, e); + } + } + }; + } + + /** + * Create a hook that cleans up schemas associated with a topic. It is expected that all necessary + * properties to create a {@link SchemaRegistryClient} are part of + * {@link EffectiveAppConfiguration#getKafkaProperties()}. + * + * @param configuration Configuration to create hook from + * @return hook that cleans up schemas associated with a topic + * @see #createSchemaRegistryCleanUpHook(Map) + */ + public static TopicHook createSchemaRegistryCleanUpHook(final EffectiveAppConfiguration configuration) { + return createSchemaRegistryCleanUpHook(configuration.getKafkaProperties()); + } + + private static SchemaRegistryClient createSchemaRegistryClient(final Map kafkaProperties) { + final String schemaRegistryUrl = + (String) kafkaProperties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); + final Map properties = new HashMap<>(kafkaProperties); + properties.remove(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); + return createSchemaRegistryClient(properties, schemaRegistryUrl); + } + +} diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java new file mode 100644 index 00000000..d56c9a34 --- /dev/null +++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
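The hook factory replaces the deleted SchemaTopicClient reset logic and can also be wired up manually for apps that do not extend the new interfaces. A sketch (the URL is a placeholder; TopicHook comes from streams-bootstrap-core):

    import com.bakdata.kafka.HasTopicHooks.TopicHook;
    import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
    import java.util.Map;

    class SchemaCleanUpExample {
        static void deleteSubjects() {
            final Map<String, Object> kafkaProperties = Map.of(
                    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                    "http://localhost:8081"); // placeholder
            final TopicHook hook =
                    SchemaRegistryKafkaApplicationUtils.createSchemaRegistryCleanUpHook(kafkaProperties);
            // removes the <topic>-key and <topic>-value subjects if they exist
            hook.deleted("my-topic");
        }
    }
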
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +/** + * {@link ProducerApp} that automatically removes schemas when deleting topics + */ +public interface SchemaRegistryProducerApp extends ProducerApp { + + /** + * Register a hook that cleans up schemas associated with a topic + * @param cleanUpConfiguration Configuration to register hook on + * @param configuration Configuration to create hook from + * @return {@code ProducerCleanUpConfiguration} with registered topic hook + * @see SchemaRegistryKafkaApplicationUtils#createSchemaRegistryCleanUpHook(EffectiveAppConfiguration) + */ + static ProducerCleanUpConfiguration registerSchemaRegistryCleanUpHook( + final ProducerCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration configuration) { + return cleanUpConfiguration.registerTopicHook( + SchemaRegistryKafkaApplicationUtils.createSchemaRegistryCleanUpHook(configuration)); + } + + @Override + default ProducerCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { + final ProducerCleanUpConfiguration cleanUpConfiguration = ProducerApp.super.setupCleanUp(configuration); + return registerSchemaRegistryCleanUpHook(cleanUpConfiguration, configuration); + } + +} diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java new file mode 100644 index 00000000..5ed848e0 --- /dev/null +++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +/** + * {@link StreamsApp} that automatically removes schemas when deleting topics + */ +public interface SchemaRegistryStreamsApp extends StreamsApp { + + /** + * Register a hook that cleans up schemas associated with a topic + * @param cleanUpConfiguration Configuration to register hook on + * @param configuration Configuration to create hook from + * @return {@code StreamsCleanUpConfiguration} with registered topic hook + * @see SchemaRegistryKafkaApplicationUtils#createSchemaRegistryCleanUpHook(EffectiveAppConfiguration) + */ + static StreamsCleanUpConfiguration registerSchemaRegistryCleanUpHook( + final StreamsCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration configuration) { + return cleanUpConfiguration.registerTopicHook( + SchemaRegistryKafkaApplicationUtils.createSchemaRegistryCleanUpHook(configuration)); + } + + @Override + default StreamsCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { + final StreamsCleanUpConfiguration cleanUpConfiguration = StreamsApp.super.setupCleanUp(configuration); + return registerSchemaRegistryCleanUpHook(cleanUpConfiguration, configuration); + } + +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java index f4a2402e..e9d40d8c 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java @@ -27,6 +27,7 @@ import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; import com.bakdata.kafka.KafkaEndpointConfig.KafkaEndpointConfigBuilder; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.Map; import java.util.function.Function; import lombok.experimental.UtilityClass; @@ -114,7 +115,9 @@ public static Function> getKafkaPropertiesWithSchema final ConfiguredStreamsApp app) { return schemaRegistryUrl -> { final KafkaEndpointConfig endpointConfig = newEndpointConfig() - .schemaRegistryUrl(schemaRegistryUrl) + .properties(Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl + )) .build(); return app.getKafkaProperties(endpointConfig); }; From 95299380fb6373c65f5fed7ec598ecb47f2e3020 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 10:32:27 +0200 Subject: [PATCH 02/15] Remove schema registry support from core module --- README.md | 4 ---- .../src/test/java/com/bakdata/kafka/CliTest.java | 8 -------- 2 files changed, 12 deletions(-) diff --git a/README.md b/README.md index ea027e22..8367e700 100644 --- a/README.md +++ b/README.md @@ -109,8 +109,6 @@ The following configuration options are available: - `--brokers`: List of Kafka brokers (comma-separated) (**required**) -- `--schema-registry-url`: The URL of the Schema Registry - - `--kafka-config`: Kafka Streams configuration (`[,...]`) - `--input-topics`: List of input topics (comma-separated) @@ -192,8 +190,6 @@ The following configuration options are available: - `--brokers`: List of Kafka brokers (comma-separated) (**required**) -- `--schema-registry-url`: The URL of the Schema Registry - - `--kafka-config`: Kafka producer configuration (`[,...]`) - `--output-topic`: The output topic diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java 
b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 88dad30a..114f5e42 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -78,7 +78,6 @@ public void run() { } }, new String[]{ "--brokers", "localhost:9092", - "--schema-registry-url", "http://localhost:8081", "--input-topics", "input", "--output-topic", "output", }); @@ -104,7 +103,6 @@ public SerdeConfig defaultSerializationConfig() { } }), new String[]{ "--brokers", "localhost:9092", - "--schema-registry-url", "http://localhost:8081", "--input-topics", "input", "--output-topic", "output", }); @@ -140,7 +138,6 @@ public void clean() { } }, new String[]{ "--brokers", "localhost:9092", - "--schema-registry-url", "http://localhost:8081", "--input-topics", "input", "--output-topic", "output", "clean", @@ -176,7 +173,6 @@ public void run() { // do nothing } }, new String[]{ - "--schema-registry-url", "http://localhost:8081", "--input-topics", "input", "--output-topic", "output", }); @@ -211,7 +207,6 @@ public SerdeConfig defaultSerializationConfig() { runApp(app, "--brokers", kafkaCluster.getBrokerList(), - "--schema-registry-url", "http://localhost:8081", "--input-topics", input ); kafkaCluster.send(SendKeyValues.to(input, List.of(new KeyValue<>("foo", "bar")))); @@ -248,7 +243,6 @@ public SerdeConfig defaultSerializationConfig() { runApp(app, "--brokers", kafkaCluster.getBrokerList(), - "--schema-registry-url", "http://localhost:8081", "--input-topics", input, "--output-topic", output ); @@ -289,7 +283,6 @@ public SerdeConfig defaultSerializationConfig() { } }, new String[]{ "--brokers", "localhost:9092", - "--schema-registry-url", "http://localhost:8081", "--input-topics", "input", "--output-topic", "output", "clean", @@ -326,7 +319,6 @@ public void run() { }) { KafkaApplication.startApplicationWithoutExit(app, new String[]{ "--brokers", "brokers", - "--schema-registry-url", "schema-registry", "--input-topics", "input1,input2", "--extra-input-topics", "role1=input3,role2=input4;input5", "--input-pattern", ".*", From 030985ebd913b20cba50f901140db4d368a0e2ad Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 10:34:50 +0200 Subject: [PATCH 03/15] Remove schema registry support from core module --- streams-bootstrap-core/build.gradle.kts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index 75aca556..2cba0667 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -40,6 +40,8 @@ dependencies { testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) { exclude(group = "org.slf4j", module = "slf4j-log4j12") } + val confluentVersion: String by project + testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) } From cd0a9348ad53b831c6a3a684ae4401025ff4c356 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 10:48:28 +0200 Subject: [PATCH 04/15] Remove schema registry support from core module --- README.md | 4 ++++ charts/producer-app-cleanup-job/templates/job.yaml | 2 +- charts/producer-app/templates/pod.yaml | 2 +- charts/streams-app-cleanup-job/templates/job.yaml | 2 +- charts/streams-app/templates/deployment.yaml | 2 +- 
streams-bootstrap-cli/build.gradle.kts | 2 ++ .../main/java/com/bakdata/kafka/KafkaApplication.java | 11 +++++++++++ .../java/com/bakdata/kafka/StringListConverter.java | 2 +- .../bakdata/kafka/integration/RunProducerAppTest.java | 3 +-- .../java/com/bakdata/kafka/KafkaEndpointConfig.java | 7 +++---- .../java/com/bakdata/kafka/integration/KafkaTest.java | 6 ++++-- 11 files changed, 30 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 8367e700..ea027e22 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,8 @@ The following configuration options are available: - `--brokers`: List of Kafka brokers (comma-separated) (**required**) +- `--schema-registry-url`: The URL of the Schema Registry + - `--kafka-config`: Kafka Streams configuration (`[,...]`) - `--input-topics`: List of input topics (comma-separated) @@ -190,6 +192,8 @@ The following configuration options are available: - `--brokers`: List of Kafka brokers (comma-separated) (**required**) +- `--schema-registry-url`: The URL of the Schema Registry + - `--kafka-config`: Kafka producer configuration (`[,...]`) - `--output-topic`: The output topic diff --git a/charts/producer-app-cleanup-job/templates/job.yaml b/charts/producer-app-cleanup-job/templates/job.yaml index af42098f..a6a94422 100644 --- a/charts/producer-app-cleanup-job/templates/job.yaml +++ b/charts/producer-app-cleanup-job/templates/job.yaml @@ -69,7 +69,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" + - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} diff --git a/charts/producer-app/templates/pod.yaml b/charts/producer-app/templates/pod.yaml index 92f4eb8e..ba063e0b 100644 --- a/charts/producer-app/templates/pod.yaml +++ b/charts/producer-app/templates/pod.yaml @@ -53,7 +53,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" + - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index 47208534..458e58b5 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -73,7 +73,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" + - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }} diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 7c6ba46b..3da02578 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -120,7 +120,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" + - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") 
(.Values.kafka.inputTopics) }} diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index 15548118..113eeae0 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -7,6 +7,8 @@ plugins { dependencies { api(project(":streams-bootstrap-core")) api(group = "info.picocli", name = "picocli", version = "4.7.5") + val confluentVersion: String by project + implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion) val junitVersion: String by project testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 21699146..7f3bf14f 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -26,6 +26,7 @@ import static java.util.Collections.emptyMap; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -53,6 +54,7 @@ *
 * <li>{@link #outputTopic}</li>
 * <li>{@link #extraOutputTopics}</li>
 * <li>{@link #brokers}</li>
+ * <li>{@link #schemaRegistryUrl}</li>
 * <li>{@link #kafkaConfig}</li>
  • * * To implement your Kafka application inherit from this class and add your custom options. Run it by calling @@ -87,6 +89,8 @@ public abstract class KafkaApplication extraOutputTopics = emptyMap(); @CommandLine.Option(names = "--brokers", required = true, description = "Broker addresses to connect to") private String brokers; + @CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry") + private String schemaRegistryUrl; @CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties") private Map kafkaConfig = emptyMap(); @@ -202,6 +206,7 @@ public void run() { public KafkaEndpointConfig getEndpointConfig() { return KafkaEndpointConfig.builder() .brokers(this.brokers) + .properties(this.getEndpointProperties()) .build(); } @@ -297,6 +302,12 @@ protected void onApplicationStart() { // do nothing by default } + private Map getEndpointProperties() { + return this.schemaRegistryUrl == null ? emptyMap() : Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl + ); + } + private void startApplication() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); this.onApplicationStart(); diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java index 7ca0fa4b..bd40efb4 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/StringListConverter.java @@ -39,7 +39,7 @@ public List convert(final String value) { final String[] split = value.split(";"); return Arrays.stream(split) .map(String::trim) - .filter(String::isEmpty) + .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index 571ba91b..67232632 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -91,10 +91,9 @@ public SerializerConfig defaultSerializationConfig() { } })) { app.setBrokers(this.kafkaCluster.getBrokerList()); + app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); app.setOutputTopic(output); app.setKafkaConfig(Map.of( - AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl(), ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); app.run(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java index 7736ca2c..9622570c 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java @@ -32,7 +32,7 @@ import org.apache.kafka.streams.StreamsConfig; /** - * Configuration to connect to Kafka infrastructure, i.e., brokers and optionally schema registry. + * Configuration to connect to Kafka infrastructure, i.e., brokers and additional properties such as credentials. */ @Builder public class KafkaEndpointConfig { @@ -45,13 +45,12 @@ public class KafkaEndpointConfig { * The following properties are configured: *
 * <ul>
 *     <li>{@code bootstrap.servers}</li>
- *     <li>{@code schema.registry.url}</li>
+ *     <li>all properties specified via {@link #properties}</li>
 * </ul>
    * @return properties used for connecting to Kafka */ public Map createKafkaProperties() { - final Map kafkaConfig = new HashMap<>(); - kafkaConfig.putAll(this.properties); + final Map kafkaConfig = new HashMap<>(this.properties); kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokers); return Collections.unmodifiableMap(kafkaConfig); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index d3686076..8282c273 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -58,8 +58,10 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() .brokers(this.kafkaCluster.getBrokerList()) - .properties(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl())) + .properties(Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.schemaRegistryMockExtension.getUrl() + )) .build(); } } From 20cb80421d0488d0a1fd1b8e6e3df91307961b88 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 10:58:05 +0200 Subject: [PATCH 05/15] Remove schema registry support from core module --- .../SchemaRegistryKafkaApplicationUtils.java | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java index a74a0ac6..d86626b1 100644 --- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java +++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java @@ -47,12 +47,32 @@ public class SchemaRegistryKafkaApplicationUtils { private static final int CACHE_CAPACITY = 100; /** - * Creates a new {@link SchemaRegistryClient} using the specified configuration. + * Creates a new {@code SchemaRegistryClient} using the specified configuration. + * + * @param kafkaProperties properties for creating {@code SchemaRegistryClient}. Must include + * {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG}. + * @return {@code SchemaRegistryClient} + * @see #createSchemaRegistryClient(Map, String) + */ + public static SchemaRegistryClient createSchemaRegistryClient(final Map kafkaProperties) { + final String schemaRegistryUrl = + (String) kafkaProperties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); + if (schemaRegistryUrl == null) { + throw new IllegalArgumentException(String.format("%s must be specified in properties", + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)); + } + final Map properties = new HashMap<>(kafkaProperties); + properties.remove(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); + return createSchemaRegistryClient(properties, schemaRegistryUrl); + } + + /** + * Creates a new {@code SchemaRegistryClient} using the specified configuration. 
* * @param configs properties passed to * {@link SchemaRegistryClientFactory#newClient(List, int, List, Map, Map)} * @param schemaRegistryUrl URL of schema registry - * @return {@link SchemaRegistryClient} + * @return {@code SchemaRegistryClient} */ public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map configs, @NonNull final String schemaRegistryUrl) { @@ -109,12 +129,4 @@ public static TopicHook createSchemaRegistryCleanUpHook(final EffectiveAppConfig return createSchemaRegistryCleanUpHook(configuration.getKafkaProperties()); } - private static SchemaRegistryClient createSchemaRegistryClient(final Map kafkaProperties) { - final String schemaRegistryUrl = - (String) kafkaProperties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); - final Map properties = new HashMap<>(kafkaProperties); - properties.remove(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); - return createSchemaRegistryClient(properties, schemaRegistryUrl); - } - } From 7619a0c6192f06b9597a5fa211c17949284e590b Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 11:17:20 +0200 Subject: [PATCH 06/15] Remove schema registry support from core module --- .../src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java | 2 +- .../src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java index c891e68f..ab2895b4 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java @@ -101,7 +101,7 @@ private void deleteTopics() { private void deleteTopic(final String topic) { this.adminClient.getTopicClient() - .deleteTopic(topic); + .deleteTopicIfExists(topic); ProducerCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index 93de95b2..cd983a14 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -242,7 +242,7 @@ private void resetInternalTopic(final String topic) { private void deleteTopic(final String topic) { this.adminClient.getTopicClient() - .deleteTopic(topic); + .deleteTopicIfExists(topic); StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } From 76b89a474908d3e6a2a89ce32d96f4d5bd9da678 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 11:19:16 +0200 Subject: [PATCH 07/15] Remove schema registry CLI param --- README.md | 4 ---- charts/producer-app-cleanup-job/templates/job.yaml | 2 +- charts/producer-app/templates/pod.yaml | 2 +- charts/streams-app-cleanup-job/templates/job.yaml | 2 +- charts/streams-app/templates/deployment.yaml | 2 +- streams-bootstrap-cli/build.gradle.kts | 2 -- .../java/com/bakdata/kafka/KafkaApplication.java | 12 +----------- .../kafka/integration/RunProducerAppTest.java | 3 ++- 8 files changed, 7 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index ea027e22..8367e700 100644 --- a/README.md +++ b/README.md @@ -109,8 +109,6 @@ The following configuration options are available: - `--brokers`: List of Kafka brokers 
(comma-separated) (**required**) -- `--schema-registry-url`: The URL of the Schema Registry - - `--kafka-config`: Kafka Streams configuration (`[,...]`) - `--input-topics`: List of input topics (comma-separated) @@ -192,8 +190,6 @@ The following configuration options are available: - `--brokers`: List of Kafka brokers (comma-separated) (**required**) -- `--schema-registry-url`: The URL of the Schema Registry - - `--kafka-config`: Kafka producer configuration (`[,...]`) - `--output-topic`: The output topic diff --git a/charts/producer-app-cleanup-job/templates/job.yaml b/charts/producer-app-cleanup-job/templates/job.yaml index a6a94422..af42098f 100644 --- a/charts/producer-app-cleanup-job/templates/job.yaml +++ b/charts/producer-app-cleanup-job/templates/job.yaml @@ -69,7 +69,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} diff --git a/charts/producer-app/templates/pod.yaml b/charts/producer-app/templates/pod.yaml index ba063e0b..92f4eb8e 100644 --- a/charts/producer-app/templates/pod.yaml +++ b/charts/producer-app/templates/pod.yaml @@ -53,7 +53,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index 458e58b5..47208534 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -73,7 +73,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }} diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 3da02578..7c6ba46b 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -120,7 +120,7 @@ spec: value: {{ .Values.kafka.brokers | quote }} {{- end }} {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" + - name: "KAFKA_SCHEMA_REGISTRY_URL" value: {{ .Values.kafka.schemaRegistryUrl | quote }} {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }} diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index 113eeae0..15548118 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -7,8 +7,6 @@ plugins { dependencies { api(project(":streams-bootstrap-core")) api(group = "info.picocli", name = "picocli", version = "4.7.5") - val confluentVersion: String by project - implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion) val junitVersion: String by project testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", 
version = junitVersion) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 7f3bf14f..cac7b1f2 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -26,7 +26,6 @@ import static java.util.Collections.emptyMap; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -54,7 +53,6 @@ *
 * <li>{@link #outputTopic}</li>
 * <li>{@link #extraOutputTopics}</li>
 * <li>{@link #brokers}</li>
- * <li>{@link #schemaRegistryUrl}</li>
 * <li>{@link #kafkaConfig}</li>
  • * * To implement your Kafka application inherit from this class and add your custom options. Run it by calling @@ -89,8 +87,6 @@ public abstract class KafkaApplication extraOutputTopics = emptyMap(); @CommandLine.Option(names = "--brokers", required = true, description = "Broker addresses to connect to") private String brokers; - @CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry") - private String schemaRegistryUrl; @CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties") private Map kafkaConfig = emptyMap(); @@ -204,9 +200,9 @@ public void run() { } public KafkaEndpointConfig getEndpointConfig() { + // We do not specify endpoint properties, those are passed via kafkaConfig return KafkaEndpointConfig.builder() .brokers(this.brokers) - .properties(this.getEndpointProperties()) .build(); } @@ -302,12 +298,6 @@ protected void onApplicationStart() { // do nothing by default } - private Map getEndpointProperties() { - return this.schemaRegistryUrl == null ? emptyMap() : Map.of( - AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl - ); - } - private void startApplication() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); this.onApplicationStart(); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index 67232632..571ba91b 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -91,9 +91,10 @@ public SerializerConfig defaultSerializationConfig() { } })) { app.setBrokers(this.kafkaCluster.getBrokerList()); - app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); app.setOutputTopic(output); app.setKafkaConfig(Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.schemaRegistryMockExtension.getUrl(), ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); app.run(); From d6c54399dd8300e17df9eec94e085648e4d47996 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 11:40:31 +0200 Subject: [PATCH 08/15] Update --- .../com/bakdata/kafka/KafkaApplication.java | 1 + .../java/com/bakdata/kafka/CleanUpRunner.java | 7 +- .../java/com/bakdata/kafka/HasTopicHooks.java | 9 ++- .../kafka/ProducerCleanUpConfiguration.java | 8 ++- .../bakdata/kafka/ProducerCleanUpRunner.java | 5 ++ .../kafka/StreamsCleanUpConfiguration.java | 8 ++- .../bakdata/kafka/StreamsCleanUpRunner.java | 5 ++ .../SchemaRegistryKafkaApplicationUtils.java | 67 ++++++++++++------- 8 files changed, 79 insertions(+), 31 deletions(-) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 7f3bf14f..c72a9ad7 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -348,6 +348,7 @@ public void close() { */ @Override public void stop() { + this.cleanUpRunner.close(); this.app.close(); } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/CleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/CleanUpRunner.java index 40ef7cfa..c05716ab 100644 --- 
a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/CleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/CleanUpRunner.java @@ -27,8 +27,11 @@ /** * Cleans all resources associated with an application */ -@FunctionalInterface -public interface CleanUpRunner { +public interface CleanUpRunner extends AutoCloseable { + + @Override + void close(); + /** * Clean all resources associated with an application */ diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java index f3433e16..26b209d6 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import java.io.Closeable; + /** * Interface for performing actions on topics * @param self for chaining @@ -41,7 +43,7 @@ public interface HasTopicHooks { /** * Hook for performing actions on topics */ - interface TopicHook { + interface TopicHook extends Closeable { /** * Called when a topic is deleted * @param topic name of the topic @@ -49,5 +51,10 @@ interface TopicHook { default void deleted(final String topic) { // do nothing } + + @Override + default void close() { + // do nothing + } } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java index aa563091..f8f25d4b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import lombok.NonNull; @@ -32,7 +33,7 @@ * Provides configuration options for {@link ProducerCleanUpRunner} */ public class ProducerCleanUpConfiguration - implements HasTopicHooks, HasCleanHook { + implements HasTopicHooks, HasCleanHook, Closeable { private final @NonNull Collection topicHooks = new ArrayList<>(); private final @NonNull Collection cleanHooks = new ArrayList<>(); @@ -54,6 +55,11 @@ public ProducerCleanUpConfiguration registerCleanHook(final Runnable hook) { return this; } + @Override + public void close() { + this.topicHooks.forEach(TopicHook::close); + } + void runCleanHooks() { this.cleanHooks.forEach(Runnable::run); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java index ab2895b4..b60d1996 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java @@ -69,6 +69,11 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to return new ProducerCleanUpRunner(topics, kafkaProperties, configuration); } + @Override + public void close() { + this.cleanHooks.close(); + } + /** * Delete all output topics */ diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java index c9186936..bd01ba45 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java +++ 
b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import lombok.NonNull; @@ -32,7 +33,7 @@ * Provides configuration options for {@link StreamsCleanUpRunner} */ public class StreamsCleanUpConfiguration - implements HasTopicHooks, HasCleanHook { + implements HasTopicHooks, HasCleanHook, Closeable { private final @NonNull Collection topicHooks = new ArrayList<>(); private final @NonNull Collection cleanHooks = new ArrayList<>(); private final @NonNull Collection resetHooks = new ArrayList<>(); @@ -65,6 +66,11 @@ public StreamsCleanUpConfiguration registerResetHook(final Runnable hook) { return this; } + @Override + public void close() { + this.topicHooks.forEach(TopicHook::close); + } + void runCleanHooks() { this.cleanHooks.forEach(Runnable::run); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index cd983a14..d1ef62e0 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -159,6 +159,11 @@ private static Collection filterExistingTopics(final Collection .collect(Collectors.toList()); } + @Override + public void close() { + this.cleanHooks.close(); + } + /** * Clean up your Streams app by resetting the app and deleting the output topics * and consumer group. diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java index d86626b1..541ee43d 100644 --- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java +++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java @@ -30,11 +30,13 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -88,32 +90,8 @@ public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map * @see HasTopicHooks#registerTopicHook(TopicHook) */ public static TopicHook createSchemaRegistryCleanUpHook(final Map kafkaProperties) { - final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(kafkaProperties); //TODO close - return new TopicHook() { - @Override - public void deleted(final String topic) { - log.info("Resetting Schema Registry for topic '{}'", topic); - try { - final Collection allSubjects = schemaRegistryClient.getAllSubjects(); - final String keySubject = topic + "-key"; - if (allSubjects.contains(keySubject)) { - schemaRegistryClient.deleteSubject(keySubject); - log.info("Cleaned key schema of topic {}", topic); - } else { - log.info("No key schema for topic {} available", topic); - } - final String valueSubject = topic + "-value"; - if (allSubjects.contains(valueSubject)) { - schemaRegistryClient.deleteSubject(valueSubject); - 
log.info("Cleaned value schema of topic {}", topic); - } else { - log.info("No value schema for topic {} available", topic); - } - } catch (final IOException | RestClientException e) { - throw new CleanUpException("Could not reset schema registry for topic " + topic, e); - } - } - }; + final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(kafkaProperties); + return new SchemaRegistryTopicHook(schemaRegistryClient); } /** @@ -129,4 +107,41 @@ public static TopicHook createSchemaRegistryCleanUpHook(final EffectiveAppConfig return createSchemaRegistryCleanUpHook(configuration.getKafkaProperties()); } + @RequiredArgsConstructor + private static class SchemaRegistryTopicHook implements TopicHook { + private final @NonNull SchemaRegistryClient schemaRegistryClient; + + @Override + public void deleted(final String topic) { + log.info("Resetting Schema Registry for topic '{}'", topic); + try { + final Collection allSubjects = this.schemaRegistryClient.getAllSubjects(); + final String keySubject = topic + "-key"; + if (allSubjects.contains(keySubject)) { + this.schemaRegistryClient.deleteSubject(keySubject); + log.info("Cleaned key schema of topic {}", topic); + } else { + log.info("No key schema for topic {} available", topic); + } + final String valueSubject = topic + "-value"; + if (allSubjects.contains(valueSubject)) { + this.schemaRegistryClient.deleteSubject(valueSubject); + log.info("Cleaned value schema of topic {}", topic); + } else { + log.info("No value schema for topic {} available", topic); + } + } catch (final IOException | RestClientException e) { + throw new CleanUpException("Could not reset schema registry for topic " + topic, e); + } + } + + @Override + public void close() { + try { + this.schemaRegistryClient.close(); + } catch (final IOException e) { + throw new UncheckedIOException("Error closing schema registry client", e); + } + } + } } From 6e681494905884824d5cddd8b3f2ee1b91ade7f6 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 14:06:43 +0200 Subject: [PATCH 09/15] Rename to labeled --- charts/producer-app-cleanup-job/templates/job.yaml | 4 ---- charts/producer-app-cleanup-job/values.yaml | 1 - charts/producer-app/README.md | 1 - charts/producer-app/templates/pod.yaml | 4 ---- charts/producer-app/values.yaml | 1 - charts/streams-app-cleanup-job/templates/job.yaml | 4 ---- charts/streams-app-cleanup-job/values.yaml | 1 - charts/streams-app/README.md | 1 - charts/streams-app/templates/deployment.yaml | 4 ---- charts/streams-app/values.yaml | 1 - 10 files changed, 22 deletions(-) diff --git a/charts/producer-app-cleanup-job/templates/job.yaml b/charts/producer-app-cleanup-job/templates/job.yaml index af42098f..1d65fb96 100644 --- a/charts/producer-app-cleanup-job/templates/job.yaml +++ b/charts/producer-app-cleanup-job/templates/job.yaml @@ -68,10 +68,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_BROKERS" value: {{ .Values.kafka.brokers | quote }} {{- end }} - {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" - value: {{ .Values.kafka.schemaRegistryUrl | quote }} - {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} - name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC" value: {{ .Values.kafka.outputTopic | quote }} diff --git a/charts/producer-app-cleanup-job/values.yaml b/charts/producer-app-cleanup-job/values.yaml index c9aef620..76a4d33e 100644 --- a/charts/producer-app-cleanup-job/values.yaml +++ b/charts/producer-app-cleanup-job/values.yaml @@ -17,7 +17,6 @@ files: {} 
kafka: # brokers: "test:9092" -# schemaRegistryUrl: "url:1234" config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. diff --git a/charts/producer-app/README.md b/charts/producer-app/README.md index f0ebbab5..ffa49f76 100644 --- a/charts/producer-app/README.md +++ b/charts/producer-app/README.md @@ -49,7 +49,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p | Parameter | Description | Default | |---------------------------|------------------------------------------------------------------------------------------------------------|---------| | `kafka.brokers` | Comma separated list of Kafka brokers to connect to. | | -| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` | | `kafka.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` | | `kafka.outputTopic` | Output topic for your producer application. | | | `kafka.extraOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` | diff --git a/charts/producer-app/templates/pod.yaml b/charts/producer-app/templates/pod.yaml index 92f4eb8e..b64a891a 100644 --- a/charts/producer-app/templates/pod.yaml +++ b/charts/producer-app/templates/pod.yaml @@ -52,10 +52,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_BROKERS" value: {{ .Values.kafka.brokers | quote }} {{- end }} - {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" - value: {{ .Values.kafka.schemaRegistryUrl | quote }} - {{- end }} {{- if hasKey .Values.kafka "outputTopic" }} - name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC" value: {{ .Values.kafka.outputTopic | quote }} diff --git a/charts/producer-app/values.yaml b/charts/producer-app/values.yaml index 1bd2360c..1f6ce005 100644 --- a/charts/producer-app/values.yaml +++ b/charts/producer-app/values.yaml @@ -50,7 +50,6 @@ resources: kafka: # brokers: "test:9092" -# schemaRegistryUrl: "url:1234" config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index 47208534..d05d8cfb 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -72,10 +72,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_BROKERS" value: {{ .Values.kafka.brokers | quote }} {{- end }} - {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" - value: {{ .Values.kafka.schemaRegistryUrl | quote }} - {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }} - name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS" value: {{ .Values.kafka.inputTopics | join "," | quote }} diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index 1620a5cd..18c9277a 100644 --- a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -17,7 +17,6 @@ files: {} kafka: # brokers: "test:9092" -# schemaRegistryUrl: "url:1234" config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. 
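Note (illustrative, not part of the patch series): with the dedicated kafka.schemaRegistryUrl value removed from the charts, the URL would instead be supplied through kafka.config, which reaches the application as an ordinary Kafka property; the RunProducerAppTest change in PATCH 07 does the equivalent via setKafkaConfig. A minimal values.yaml sketch, with a placeholder registry address:

kafka:
  brokers: "test:9092"
  config:
    schema.registry.url: "http://my-schema-registry:8081"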
diff --git a/charts/streams-app/README.md b/charts/streams-app/README.md index b5938db3..b362c957 100644 --- a/charts/streams-app/README.md +++ b/charts/streams-app/README.md @@ -53,7 +53,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p | Parameter | Description | Default | |----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| | `kafka.brokers` | Comma separated list of Kafka brokers to connect to. | | -| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` | | `kafka.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` | | `kafka.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` | | `kafka.inputTopics` | List of input topics for your streams application. | `[]` | diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 7c6ba46b..b55b5e2c 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -119,10 +119,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_BROKERS" value: {{ .Values.kafka.brokers | quote }} {{- end }} - {{- if hasKey .Values.kafka "schemaRegistryUrl" }} - - name: "KAFKA_SCHEMA_REGISTRY_URL" - value: {{ .Values.kafka.schemaRegistryUrl | quote }} - {{- end }} {{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }} - name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS" value: {{ .Values.kafka.inputTopics | join "," | quote }} diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 0425122f..d9506987 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -25,7 +25,6 @@ resources: kafka: # brokers: "test:9092" - # schemaRegistryUrl: "url:1234" staticMembership: false config: {} # max.poll.records: 500 From a043cf07c4de8b1fe709e87823fbdb7514a349f5 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 19:10:15 +0200 Subject: [PATCH 10/15] Update --- .../src/main/java/com/bakdata/kafka/HasTopicHooks.java | 4 +--- .../java/com/bakdata/kafka/ProducerCleanUpConfiguration.java | 4 ++-- .../java/com/bakdata/kafka/StreamsCleanUpConfiguration.java | 4 ++-- .../main/java/com/bakdata/kafka/util/ConsumerGroupClient.java | 3 +-- .../main/java/com/bakdata/kafka/util/ImprovedAdminClient.java | 3 +-- .../src/main/java/com/bakdata/kafka/util/TopicClient.java | 3 +-- 6 files changed, 8 insertions(+), 13 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java index 26b209d6..661fa0f3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/HasTopicHooks.java @@ -24,8 +24,6 @@ package com.bakdata.kafka; -import java.io.Closeable; - /** * Interface for performing actions on topics * @param self for chaining @@ -43,7 +41,7 @@ public interface HasTopicHooks { /** * Hook for performing actions on topics */ - interface TopicHook extends Closeable { + interface TopicHook extends AutoCloseable { /** * Called when a topic is deleted * @param topic 
name of the topic diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java index f8f25d4b..63a8e965 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import lombok.NonNull; @@ -33,7 +32,8 @@ * Provides configuration options for {@link ProducerCleanUpRunner} */ public class ProducerCleanUpConfiguration - implements HasTopicHooks, HasCleanHook, Closeable { + implements HasTopicHooks, HasCleanHook, + AutoCloseable { private final @NonNull Collection topicHooks = new ArrayList<>(); private final @NonNull Collection cleanHooks = new ArrayList<>(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java index bd01ba45..73794625 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import lombok.NonNull; @@ -33,7 +32,8 @@ * Provides configuration options for {@link StreamsCleanUpRunner} */ public class StreamsCleanUpConfiguration - implements HasTopicHooks, HasCleanHook, Closeable { + implements HasTopicHooks, HasCleanHook, + AutoCloseable { private final @NonNull Collection topicHooks = new ArrayList<>(); private final @NonNull Collection cleanHooks = new ArrayList<>(); private final @NonNull Collection resetHooks = new ArrayList<>(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java index a813785e..41b38e42 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java @@ -24,7 +24,6 @@ package com.bakdata.kafka.util; -import java.io.Closeable; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -44,7 +43,7 @@ */ @RequiredArgsConstructor @Slf4j -public final class ConsumerGroupClient implements Closeable { +public final class ConsumerGroupClient implements AutoCloseable { private final @NonNull Admin adminClient; private final @NonNull Duration timeout; diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java index 0ffe3186..9a69236b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ImprovedAdminClient.java @@ -24,7 +24,6 @@ package com.bakdata.kafka.util; -import java.io.Closeable; import java.time.Duration; import java.util.Map; import lombok.AccessLevel; @@ -40,7 +39,7 @@ * Provide methods for common operations when performing administrative actions on a Kafka cluster */ @Builder(access = AccessLevel.PRIVATE) -public final class ImprovedAdminClient implements 
Closeable { +public final class ImprovedAdminClient implements AutoCloseable { private static final Duration ADMIN_TIMEOUT = Duration.ofSeconds(10L); private final @NonNull Admin adminClient; diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 378bf534..46b65bb6 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -24,7 +24,6 @@ package com.bakdata.kafka.util; -import java.io.Closeable; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -48,7 +47,7 @@ */ @RequiredArgsConstructor @Slf4j -public final class TopicClient implements Closeable { +public final class TopicClient implements AutoCloseable { private final @NonNull Admin adminClient; private final @NonNull Duration timeout; From caaf0a08139b2dc9eee4df31eb26d8bd48ed3e7c Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 29 Jul 2024 09:18:32 +0200 Subject: [PATCH 11/15] Update --- .../test/java/com/bakdata/kafka/CliTest.java | 1 - ...Utils.java => SchemaRegistryAppUtils.java} | 23 +++++++++++++++---- .../kafka/SchemaRegistryProducerApp.java | 15 +----------- .../kafka/SchemaRegistryStreamsApp.java | 15 +----------- 4 files changed, 20 insertions(+), 34 deletions(-) rename streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/{SchemaRegistryKafkaApplicationUtils.java => SchemaRegistryAppUtils.java} (86%) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index f54b15db..da75894d 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -360,7 +360,6 @@ public void run() { "--kafka-config", "foo=1,bar=2", }); assertThat(app.getBootstrapServers()).isEqualTo("bootstrap-servers"); - assertThat(app.getSchemaRegistryUrl()).isEqualTo("schema-registry"); assertThat(app.getInputTopics()).containsExactly("input1", "input2"); assertThat(app.getLabeledInputTopics()) .hasSize(2) diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java similarity index 86% rename from streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java rename to streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java index 541ee43d..17aa1fb7 100644 --- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryKafkaApplicationUtils.java +++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java @@ -45,7 +45,7 @@ */ @UtilityClass @Slf4j -public class SchemaRegistryKafkaApplicationUtils { +public class SchemaRegistryAppUtils { private static final int CACHE_CAPACITY = 100; /** @@ -89,7 +89,7 @@ public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map * @return hook that cleans up schemas associated with a topic * @see HasTopicHooks#registerTopicHook(TopicHook) */ - public static TopicHook createSchemaRegistryCleanUpHook(final Map kafkaProperties) { + public static TopicHook createTopicHook(final Map kafkaProperties) { final SchemaRegistryClient schemaRegistryClient = 
         return new SchemaRegistryTopicHook(schemaRegistryClient);
     }
@@ -101,10 +101,23 @@ public static TopicHook createSchemaRegistryCleanUpHook(final Map
      * @return hook that cleans up schemas associated with a topic
      * @see HasTopicHooks#registerTopicHook(TopicHook)
      */
-    public static TopicHook createSchemaRegistryCleanUpHook(final EffectiveAppConfiguration configuration) {
-        return createSchemaRegistryCleanUpHook(configuration.getKafkaProperties());
+    public static TopicHook createTopicHook(final EffectiveAppConfiguration configuration) {
+        return createTopicHook(configuration.getKafkaProperties());
+    }
+
+    /**
+     * Register a hook that cleans up schemas associated with a topic
+     * @param cleanUpConfiguration Configuration to register hook on
+     * @param configuration Configuration to create hook from
+     * @return {@code cleanUpConfiguration} with registered topic hook
+     * @see SchemaRegistryAppUtils#createTopicHook(EffectiveAppConfiguration)
+     */
+    public static T registerTopicHook(
+            final HasTopicHooks cleanUpConfiguration, final EffectiveAppConfiguration configuration) {
+        return cleanUpConfiguration.registerTopicHook(
+                createTopicHook(configuration));
     }
 
     @RequiredArgsConstructor
diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java
index d56c9a34..fb1950d8 100644
--- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java
+++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryProducerApp.java
@@ -29,24 +29,11 @@
  */
 public interface SchemaRegistryProducerApp extends ProducerApp {
 
-    /**
-     * Register a hook that cleans up schemas associated with a topic
-     * @param cleanUpConfiguration Configuration to register hook on
-     * @param configuration Configuration to create hook from
-     * @return {@code ProducerCleanUpConfiguration} with registered topic hook
-     * @see SchemaRegistryKafkaApplicationUtils#createSchemaRegistryCleanUpHook(EffectiveAppConfiguration)
-     */
-    static ProducerCleanUpConfiguration registerSchemaRegistryCleanUpHook(
-            final ProducerCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration configuration) {
-        return cleanUpConfiguration.registerTopicHook(
-                SchemaRegistryKafkaApplicationUtils.createSchemaRegistryCleanUpHook(configuration));
-    }
-
     @Override
     default ProducerCleanUpConfiguration setupCleanUp(
             final EffectiveAppConfiguration configuration) {
         final ProducerCleanUpConfiguration cleanUpConfiguration = ProducerApp.super.setupCleanUp(configuration);
-        return registerSchemaRegistryCleanUpHook(cleanUpConfiguration, configuration);
+        return SchemaRegistryAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
     }
 }
diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java
index 5ed848e0..1e3e1381 100644
--- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java
+++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryStreamsApp.java
@@ -29,24 +29,11 @@
  */
 public interface SchemaRegistryStreamsApp extends StreamsApp {
 
-    /**
-     * Register a hook that cleans up schemas associated with a topic
-     * @param cleanUpConfiguration Configuration to register hook on
-     * @param configuration Configuration to create hook from
-     * @return {@code StreamsCleanUpConfiguration} with registered topic hook
-     * @see SchemaRegistryKafkaApplicationUtils#createSchemaRegistryCleanUpHook(EffectiveAppConfiguration)
-     */
-    static StreamsCleanUpConfiguration registerSchemaRegistryCleanUpHook(
-            final StreamsCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration configuration) {
-        return cleanUpConfiguration.registerTopicHook(
-                SchemaRegistryKafkaApplicationUtils.createSchemaRegistryCleanUpHook(configuration));
-    }
-
     @Override
     default StreamsCleanUpConfiguration setupCleanUp(
             final EffectiveAppConfiguration configuration) {
         final StreamsCleanUpConfiguration cleanUpConfiguration = StreamsApp.super.setupCleanUp(configuration);
-        return registerSchemaRegistryCleanUpHook(cleanUpConfiguration, configuration);
+        return SchemaRegistryAppUtils.registerTopicHook(cleanUpConfiguration, configuration);
     }
 }

From e1ad94d43d7d0f12486ff909cb966a7a2dff2b4c Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Mon, 29 Jul 2024 09:49:00 +0200
Subject: [PATCH 12/15] Remove --schema-registry-url from CLI test arguments

---
 .../src/test/java/com/bakdata/kafka/CliTest.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
index da75894d..a514b934 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
+++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
@@ -203,7 +203,6 @@ public SerdeConfig defaultSerializationConfig() {
                     }
                 }, new String[]{
                         "--bootstrap-servers", "localhost:9092",
-                        "--schema-registry-url", "http://localhost:8081",
                         "--input-topics", "input",
                         "--output-topic", "output",
                         "--application-id", "my-other-id"

From 6c91a5e92dac030c23d89ac35bcc1415f29531eb Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Mon, 29 Jul 2024 09:54:26 +0200
Subject: [PATCH 13/15] Extract SchemaRegistryTopicHook into a separate class

---
 .../bakdata/kafka/SchemaRegistryAppUtils.java | 42 -----------
 .../kafka/SchemaRegistryTopicHook.java        | 74 +++++++++++++++++++
 2 files changed, 74 insertions(+), 42 deletions(-)
 create mode 100644 streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java

diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
index 17aa1fb7..6a155bd1 100644
--- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
+++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
@@ -27,16 +27,11 @@
 import com.bakdata.kafka.HasTopicHooks.TopicHook;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
@@ -120,41 +115,4 @@ public static T registerTopicHook(
                 createTopicHook(configuration));
     }
 
-    @RequiredArgsConstructor
-    private static class SchemaRegistryTopicHook implements TopicHook {
-        private final @NonNull SchemaRegistryClient schemaRegistryClient;
-
-        @Override
-        public void deleted(final String topic) {
-            log.info("Resetting Schema Registry for topic '{}'", topic);
log.info("Resetting Schema Registry for topic '{}'", topic); - try { - final Collection allSubjects = this.schemaRegistryClient.getAllSubjects(); - final String keySubject = topic + "-key"; - if (allSubjects.contains(keySubject)) { - this.schemaRegistryClient.deleteSubject(keySubject); - log.info("Cleaned key schema of topic {}", topic); - } else { - log.info("No key schema for topic {} available", topic); - } - final String valueSubject = topic + "-value"; - if (allSubjects.contains(valueSubject)) { - this.schemaRegistryClient.deleteSubject(valueSubject); - log.info("Cleaned value schema of topic {}", topic); - } else { - log.info("No value schema for topic {} available", topic); - } - } catch (final IOException | RestClientException e) { - throw new CleanUpException("Could not reset schema registry for topic " + topic, e); - } - } - - @Override - public void close() { - try { - this.schemaRegistryClient.close(); - } catch (final IOException e) { - throw new UncheckedIOException("Error closing schema registry client", e); - } - } - } } diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java new file mode 100644 index 00000000..02029299 --- /dev/null +++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java @@ -0,0 +1,74 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +import com.bakdata.kafka.HasTopicHooks.TopicHook; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +class SchemaRegistryTopicHook implements TopicHook { + private final @NonNull SchemaRegistryClient schemaRegistryClient; + + @Override + public void deleted(final String topic) { + log.info("Resetting Schema Registry for topic '{}'", topic); + try { + final Collection allSubjects = this.schemaRegistryClient.getAllSubjects(); + final String keySubject = topic + "-key"; + if (allSubjects.contains(keySubject)) { + this.schemaRegistryClient.deleteSubject(keySubject); + log.info("Cleaned key schema of topic {}", topic); + } else { + log.info("No key schema for topic {} available", topic); + } + final String valueSubject = topic + "-value"; + if (allSubjects.contains(valueSubject)) { + this.schemaRegistryClient.deleteSubject(valueSubject); + log.info("Cleaned value schema of topic {}", topic); + } else { + log.info("No value schema for topic {} available", topic); + } + } catch (final IOException | RestClientException e) { + throw new CleanUpException("Could not reset schema registry for topic " + topic, e); + } + } + + @Override + public void close() { + try { + this.schemaRegistryClient.close(); + } catch (final IOException e) { + throw new UncheckedIOException("Error closing schema registry client", e); + } + } +} From a180ea43b335d7c390614b0be1fea5b23d29440f Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 29 Jul 2024 10:35:18 +0200 Subject: [PATCH 14/15] Use topic hook for schema registry reset --- .../bakdata/kafka/ProducerCleanUpRunner.java | 5 - .../kafka/SchemaRegistryTopicHook.java | 91 ------------------- .../bakdata/kafka/StreamsCleanUpRunner.java | 5 - .../bakdata/kafka/SchemaRegistryAppUtils.java | 21 +---- .../kafka/SchemaRegistryTopicHook.java | 19 +++- 5 files changed, 20 insertions(+), 121 deletions(-) delete mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java index d87401b1..2423a5f9 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java @@ -66,11 +66,6 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig topics, @NonNull final Map kafkaProperties, @NonNull final ProducerCleanUpConfiguration configuration) { - try (final ImprovedAdminClient adminClient = ImprovedAdminClient.create(kafkaProperties)) { - adminClient.getSchemaRegistryClient() - .map(SchemaRegistryTopicHook::new) - .ifPresent(configuration::registerTopicHook); - } return new ProducerCleanUpRunner(topics, kafkaProperties, configuration); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java deleted file mode 100644 index b3214a15..00000000 --- 
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2024 bakdata
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.bakdata.kafka;
-
-import com.bakdata.kafka.HasTopicHooks.TopicHook;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-@RequiredArgsConstructor
-public class SchemaRegistryTopicHook implements TopicHook {
-    private static final int CACHE_CAPACITY = 100;
-    private final @NonNull SchemaRegistryClient schemaRegistryClient;
-
-    /**
-     * Creates a new {@link SchemaRegistryClient} using the specified configuration.
-     *
-     * @param configs properties passed to
-     *            {@link SchemaRegistryClientFactory#newClient(List, int, List, Map, Map)}
-     * @param schemaRegistryUrl URL of schema registry
-     * @return {@link SchemaRegistryClient}
-     */
-    public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map configs,
-            @NonNull final String schemaRegistryUrl) {
-        return SchemaRegistryClientFactory.newClient(List.of(schemaRegistryUrl), CACHE_CAPACITY, null, configs, null);
-    }
-
-    @Override
-    public void deleted(final String topic) {
-        log.info("Resetting Schema Registry for topic '{}'", topic);
-        try {
-            final Collection allSubjects = this.schemaRegistryClient.getAllSubjects();
-            final String keySubject = topic + "-key";
-            if (allSubjects.contains(keySubject)) {
-                this.schemaRegistryClient.deleteSubject(keySubject);
-                log.info("Cleaned key schema of topic {}", topic);
-            } else {
-                log.info("No key schema for topic {} available", topic);
-            }
-            final String valueSubject = topic + "-value";
-            if (allSubjects.contains(valueSubject)) {
-                this.schemaRegistryClient.deleteSubject(valueSubject);
-                log.info("Cleaned value schema of topic {}", topic);
-            } else {
-                log.info("No value schema for topic {} available", topic);
-            }
-        } catch (final IOException | RestClientException e) {
-            throw new CleanUpException("Could not reset schema registry for topic " + topic, e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            this.schemaRegistryClient.close();
-        } catch (final IOException e) {
-            throw new UncheckedIOException("Error closing schema registry client", e);
-        }
-    }
-}
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java
index 02b0eaad..d1ef62e0 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java
@@ -83,11 +83,6 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology,
             final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfiguration configuration) {
         final ImprovedStreamsConfig config = new ImprovedStreamsConfig(streamsConfig);
         final TopologyInformation topologyInformation = new TopologyInformation(topology, config.getAppId());
-        try (final ImprovedAdminClient adminClient = ImprovedAdminClient.create(config.getKafkaProperties())) {
-            adminClient.getSchemaRegistryClient()
-                    .map(SchemaRegistryTopicHook::new)
-                    .ifPresent(configuration::registerTopicHook);
-        }
         return new StreamsCleanUpRunner(topologyInformation, topology, config, configuration);
     }
 
diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
index 6a155bd1..f7a934a4 100644
--- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
+++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
@@ -26,12 +26,9 @@
 import com.bakdata.kafka.HasTopicHooks.TopicHook;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import lombok.NonNull;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
@@ -41,7 +38,6 @@
 @UtilityClass
 @Slf4j
 public class SchemaRegistryAppUtils {
-    private static final int CACHE_CAPACITY = 100;
 
     /**
      * Creates a new {@code SchemaRegistryClient} using the specified configuration.
@@ -49,7 +45,7 @@ public class SchemaRegistryAppUtils {
      * @param kafkaProperties properties for creating {@code SchemaRegistryClient}. Must include
      *         {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG}.
      * @return {@code SchemaRegistryClient}
-     * @see #createSchemaRegistryClient(Map, String)
+     * @see SchemaRegistryTopicHook#createSchemaRegistryClient(Map, String)
      */
     public static SchemaRegistryClient createSchemaRegistryClient(final Map kafkaProperties) {
         final String schemaRegistryUrl =
@@ -60,20 +56,7 @@ public static SchemaRegistryClient createSchemaRegistryClient(final Map
         final Map properties = new HashMap<>(kafkaProperties);
         properties.remove(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
-        return createSchemaRegistryClient(properties, schemaRegistryUrl);
-    }
-
-    /**
-     * Creates a new {@code SchemaRegistryClient} using the specified configuration.
-     *
-     * @param configs properties passed to
-     *            {@link SchemaRegistryClientFactory#newClient(List, int, List, Map, Map)}
-     * @param schemaRegistryUrl URL of schema registry
-     * @return {@code SchemaRegistryClient}
-     */
-    public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map configs,
-            @NonNull final String schemaRegistryUrl) {
-        return SchemaRegistryClientFactory.newClient(List.of(schemaRegistryUrl), CACHE_CAPACITY, null, configs, null);
+        return SchemaRegistryTopicHook.createSchemaRegistryClient(properties, schemaRegistryUrl);
     }
 
     /**
diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java
index 02029299..68005b8b 100644
--- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java
+++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java
@@ -26,19 +26,36 @@
 import com.bakdata.kafka.HasTopicHooks.TopicHook;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @RequiredArgsConstructor
-class SchemaRegistryTopicHook implements TopicHook {
+public class SchemaRegistryTopicHook implements TopicHook {
+    private static final int CACHE_CAPACITY = 100;
     private final @NonNull SchemaRegistryClient schemaRegistryClient;
 
+    /**
+     * Creates a new {@code SchemaRegistryClient} using the specified configuration.
+     *
+     * @param configs properties passed to
+     *            {@link SchemaRegistryClientFactory#newClient(List, int, List, Map, Map)}
+     * @param schemaRegistryUrl URL of schema registry
+     * @return {@code SchemaRegistryClient}
+     */
+    public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map configs,
+            @NonNull final String schemaRegistryUrl) {
+        return SchemaRegistryClientFactory.newClient(List.of(schemaRegistryUrl), CACHE_CAPACITY, null, configs, null);
+    }
+
     @Override
     public void deleted(final String topic) {
         log.info("Resetting Schema Registry for topic '{}'", topic);

From 7bb9bf5e853b1f88238a7c76e4e567dfa94de84e Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Mon, 29 Jul 2024 10:50:57 +0200
Subject: [PATCH 15/15] Use topic hook for schema registry reset

---
 .../bakdata/kafka/SchemaRegistryAppUtils.java | 25 ++-----------------
 .../kafka/SchemaRegistryTopicHook.java        | 17 ++++++-------
 2 files changed, 10 insertions(+), 32 deletions(-)

diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
index f7a934a4..5c24ee40 100644
--- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
+++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryAppUtils.java
@@ -26,8 +26,6 @@
 import com.bakdata.kafka.HasTopicHooks.TopicHook;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import java.util.HashMap;
 import java.util.Map;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
@@ -39,7 +37,6 @@
 @Slf4j
 public class SchemaRegistryAppUtils {
 
-    /**
-     * Creates a new {@code SchemaRegistryClient} using the specified configuration.
-     *
-     * @param kafkaProperties properties for creating {@code SchemaRegistryClient}. Must include
-     *         {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG}.
-     * @return {@code SchemaRegistryClient}
-     * @see SchemaRegistryTopicHook#createSchemaRegistryClient(Map, String)
-     */
-    public static SchemaRegistryClient createSchemaRegistryClient(final Map kafkaProperties) {
-        final String schemaRegistryUrl =
-                (String) kafkaProperties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
-        if (schemaRegistryUrl == null) {
-            throw new IllegalArgumentException(String.format("%s must be specified in properties",
-                    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG));
-        }
-        final Map properties = new HashMap<>(kafkaProperties);
-        properties.remove(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
-        return SchemaRegistryTopicHook.createSchemaRegistryClient(properties, schemaRegistryUrl);
-    }
-
     /**
      * Create a hook that cleans up schemas associated with a topic. It is expected that all necessary
      * properties to create a {@link SchemaRegistryClient} are part of {@code kafkaProperties}.
@@ -68,7 +46,8 @@ public static SchemaRegistryClient createSchemaRegistryClient(final Map kafkaProperties) {
      * @see HasTopicHooks#registerTopicHook(TopicHook)
      */
     public static TopicHook createTopicHook(final Map kafkaProperties) {
-        final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(kafkaProperties);
+        final SchemaRegistryClient schemaRegistryClient =
+                SchemaRegistryTopicHook.createSchemaRegistryClient(kafkaProperties);
         return new SchemaRegistryTopicHook(schemaRegistryClient);
     }
diff --git a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java
index db6e19db..75430adf 100644
--- a/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java
+++ b/streams-bootstrap-schema-registry/src/main/java/com/bakdata/kafka/SchemaRegistryTopicHook.java
@@ -35,7 +35,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -47,23 +46,23 @@ public class SchemaRegistryTopicHook implements TopicHook {
     private final @NonNull SchemaRegistryClient schemaRegistryClient;
 
     /**
-     * Creates a new {@code SchemaRegistryClient} using the specified configuration if
-     * {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG} is configured.
+     * Creates a new {@code SchemaRegistryClient} using the specified configuration.
      *
-     * @param kafkaProperties properties for creating {@code SchemaRegistryClient}
-     * @return {@code SchemaRegistryClient} if {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG} is
-     *         configured
+     * @param kafkaProperties properties for creating {@code SchemaRegistryClient}. Must include
+     *         {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG}.
+     * @return {@code SchemaRegistryClient}
      * @see SchemaRegistryTopicHook#createSchemaRegistryClient(Map, String)
      */
-    public static Optional createSchemaRegistryClient(final Map kafkaProperties) {
+    public static SchemaRegistryClient createSchemaRegistryClient(final Map kafkaProperties) {
         final String schemaRegistryUrl =
                 (String) kafkaProperties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
         if (schemaRegistryUrl == null) {
-            return Optional.empty();
+            throw new IllegalArgumentException(String.format("%s must be specified in properties",
+                    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG));
         }
         final Map properties = new HashMap<>(kafkaProperties);
         properties.remove(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
-        return Optional.of(createSchemaRegistryClient(properties, schemaRegistryUrl));
+        return createSchemaRegistryClient(properties, schemaRegistryUrl);
     }
 
     /**