Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove schema registry support from core module #245

Draft
wants to merge 21 commits into
base: feature/schema-registry-topic-hook
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
cb4fd69
Remove schema registry support from core module
philipp94831 Jul 26, 2024
9529938
Remove schema registry support from core module
philipp94831 Jul 26, 2024
030985e
Remove schema registry support from core module
philipp94831 Jul 26, 2024
cd0a934
Remove schema registry support from core module
philipp94831 Jul 26, 2024
20cb804
Remove schema registry support from core module
philipp94831 Jul 26, 2024
7619a0c
Remove schema registry support from core module
philipp94831 Jul 26, 2024
76b89a4
Remove schema registry CLI param
philipp94831 Jul 26, 2024
d6c5439
Update
philipp94831 Jul 26, 2024
88c1b8c
Merge branch 'refs/heads/feature/remove-schema-registry-2' into featu…
philipp94831 Jul 26, 2024
6e68149
Rename to labeled
philipp94831 Jul 26, 2024
97a3803
Merge remote-tracking branch 'origin/v3' into feature/remove-schema-r…
philipp94831 Jul 26, 2024
a043cf0
Update
philipp94831 Jul 26, 2024
aaf6a68
Merge remote-tracking branch 'origin/v3' into feature/remove-schema-r…
philipp94831 Jul 29, 2024
caaf0a0
Update
philipp94831 Jul 29, 2024
363e9c8
Merge remote-tracking branch 'origin/v3' into feature/remove-schema-r…
philipp94831 Jul 29, 2024
e1ad94d
Update
philipp94831 Jul 29, 2024
6c91a5e
Update
philipp94831 Jul 29, 2024
e007c1d
Merge branch 'refs/heads/feature/schema-registry-topic-hook' into fea…
philipp94831 Jul 29, 2024
a180ea4
Use topic hook for schema registry reset
philipp94831 Jul 29, 2024
362093c
Merge branch 'refs/heads/feature/schema-registry-topic-hook' into fea…
philipp94831 Jul 29, 2024
7bb9bf5
Use topic hook for schema registry reset
philipp94831 Jul 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ The following configuration options are available:

- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**)

- `--schema-registry-url`: The URL of the Schema Registry

- `--kafka-config`: Kafka Streams configuration (`<String=String>[,<String=String>...]`)

- `--input-topics`: List of input topics (comma-separated)
Expand Down Expand Up @@ -195,8 +193,6 @@ The following configuration options are available:

- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**)

- `--schema-registry-url`: The URL of the Schema Registry

- `--kafka-config`: Kafka producer configuration (`<String=String>[,<String=String>...]`)

- `--output-topic`: The output topic
Expand Down
4 changes: 0 additions & 4 deletions charts/producer-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values.kafka "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.kafka.outputTopic | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/producer-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ files: {}

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down
1 change: 0 additions & 1 deletion charts/producer-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| Parameter | Description | Default |
|-----------------------------|--------------------------------------------------------------------------------------------------------------|---------|
| `kafka.bootstrapServers` | Comma separated list of Kafka bootstrap servers to connect to. | |
| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `kafka.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` |
| `kafka.outputTopic` | Output topic for your producer application. | |
| `kafka.labeledOutputTopics` | Map of additional labeled output topics if you need to specify multiple topics with different message types. | `{}` |
Expand Down
4 changes: 0 additions & 4 deletions charts/producer-app/templates/pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values.kafka "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.kafka.outputTopic | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/producer-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ resources:

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down
4 changes: 0 additions & 4 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS"
value: {{ .Values.kafka.inputTopics | join "," | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ files: {}

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| Parameter | Description | Default |
|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `kafka.bootstrapServers` | Comma separated list of Kafka bootstrap servers to connect to. | |
| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `kafka.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` |
| `kafka.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` |
| `kafka.inputTopics` | List of input topics for your streams application. | `[]` |
Expand Down
4 changes: 0 additions & 4 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS"
value: {{ .Values.kafka.inputTopics | join "," | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ resources:

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
staticMembership: false
config: {}
# max.poll.records: 500
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ include(
":streams-bootstrap-core",
":streams-bootstrap-test",
":streams-bootstrap-large-messages",
":streams-bootstrap-schema-registry",
":streams-bootstrap-cli",
)
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
* <li>{@link #bootstrapServers}</li>
* <li>{@link #outputTopic}</li>
* <li>{@link #labeledOutputTopics}</li>
* <li>{@link #schemaRegistryUrl}</li>
* <li>{@link #kafkaConfig}</li>
* </ul>
* To implement your Kafka application inherit from this class and add your custom options. Run it by calling
Expand Down Expand Up @@ -89,8 +88,6 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunne
@CommandLine.Option(names = {"--bootstrap-servers", "--bootstrap-server"}, required = true,
description = "Kafka bootstrap servers to connect to")
private String bootstrapServers;
@CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry")
private String schemaRegistryUrl;
@CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties")
private Map<String, String> kafkaConfig = emptyMap();

Expand Down Expand Up @@ -204,9 +201,9 @@ public void run() {
}

public KafkaEndpointConfig getEndpointConfig() {
// We do not specify endpoint properties, those are passed via kafkaConfig
return KafkaEndpointConfig.builder()
.bootstrapServers(this.bootstrapServers)
.schemaRegistryUrl(this.schemaRegistryUrl)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ public SerdeConfig defaultSerializationConfig() {
}
}, new String[]{
"--bootstrap-servers", "localhost:9092",
"--schema-registry-url", "http://localhost:8081",
"--input-topics", "input",
"--output-topic", "output",
"--application-id", "my-other-id"
Expand Down Expand Up @@ -351,7 +350,6 @@ public void run() {
}) {
KafkaApplication.startApplicationWithoutExit(app, new String[]{
"--bootstrap-server", "bootstrap-servers",
"--schema-registry-url", "schema-registry",
"--input-topics", "input1,input2",
"--labeled-input-topics", "label1=input3,label2=input4;input5",
"--input-pattern", ".*",
Expand All @@ -361,7 +359,6 @@ public void run() {
"--kafka-config", "foo=1,bar=2",
});
assertThat(app.getBootstrapServers()).isEqualTo("bootstrap-servers");
assertThat(app.getSchemaRegistryUrl()).isEqualTo("schema-registry");
assertThat(app.getInputTopics()).containsExactly("input1", "input2");
assertThat(app.getLabeledInputTopics())
.hasSize(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ public SerializerConfig defaultSerializationConfig() {
}
})) {
app.setBootstrapServers(this.kafkaCluster.getBrokerList());
app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl());
app.setOutputTopic(output);
app.setKafkaConfig(Map.of(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl(),
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
app.run();
Expand Down
5 changes: 2 additions & 3 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ dependencies {

api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion)
val confluentVersion: String by project
implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion)
api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion)
api(
group = "org.slf4j",
name = "slf4j-api",
Expand All @@ -33,6 +30,7 @@ dependencies {

val fluentKafkaVersion: String by project
testImplementation(project(":streams-bootstrap-test"))
testImplementation(project(":streams-bootstrap-schema-registry"))
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
Expand All @@ -42,6 +40,7 @@ dependencies {
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
val confluentVersion: String by project
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -33,28 +32,26 @@
import org.apache.kafka.streams.StreamsConfig;

/**
* Configuration to connect to Kafka infrastructure, i.e., bootstrap servers and optionally schema registry.
* Configuration to connect to Kafka infrastructure, i.e., bootstrap servers and additional properties such as credentials.
*/
@Builder
public class KafkaEndpointConfig {
private final @NonNull String bootstrapServers;
private final String schemaRegistryUrl;
@Builder.Default
private final Map<String, Object> properties = new HashMap<>();

/**
* Create Kafka properties to connect to infrastructure.
* The following properties are configured:
* <ul>
* <li>{@code bootstrap.servers}</li>
* <li>{@code schema.registry.url}</li>
* <li>all properties specified via {@link #properties}</li>
* </ul>
* @return properties used for connecting to Kafka
*/
public Map<String, Object> createKafkaProperties() {
final Map<String, String> kafkaConfig = new HashMap<>();
final Map<String, Object> kafkaConfig = new HashMap<>(this.properties);
kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
if (this.schemaRegistryUrl != null) {
kafkaConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl);
}
return Collections.unmodifiableMap(kafkaConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to
public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig topics,
@NonNull final Map<String, Object> kafkaProperties,
@NonNull final ProducerCleanUpConfiguration configuration) {
SchemaRegistryTopicHook.createSchemaRegistryClient(kafkaProperties)
.map(SchemaRegistryTopicHook::new)
.ifPresent(configuration::registerTopicHook);
return new ProducerCleanUpRunner(topics, kafkaProperties, configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology,
final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfiguration configuration) {
final ImprovedStreamsConfig config = new ImprovedStreamsConfig(streamsConfig);
final TopologyInformation topologyInformation = new TopologyInformation(topology, config.getAppId());
SchemaRegistryTopicHook.createSchemaRegistryClient(config.getKafkaProperties())
.map(SchemaRegistryTopicHook::new)
.ifPresent(configuration::registerTopicHook);
return new StreamsCleanUpRunner(topologyInformation, topology, config, configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,8 @@

package com.bakdata.kafka.util;

import static com.bakdata.kafka.SchemaRegistryTopicHook.createSchemaRegistryClient;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.NonNull;
Expand All @@ -49,7 +43,6 @@ public final class ImprovedAdminClient implements AutoCloseable {

private static final Duration ADMIN_TIMEOUT = Duration.ofSeconds(10L);
private final @NonNull Admin adminClient;
private final SchemaRegistryClient schemaRegistryClient;
private final @NonNull Duration timeout;

/**
Expand All @@ -74,10 +67,8 @@ public static ImprovedAdminClient create(@NonNull final Map<String, Object> prop
String.format("%s must be specified in properties", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
}
final Admin adminClient = AdminClient.create(properties);
final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(properties).orElse(null);
return builder()
.adminClient(adminClient)
.schemaRegistryClient(schemaRegistryClient)
.timeout(timeout)
.build();
}
Expand All @@ -86,11 +77,6 @@ public Admin getAdminClient() {
return new PooledAdmin(this.adminClient);
}

public Optional<SchemaRegistryClient> getSchemaRegistryClient() {
return Optional.ofNullable(this.schemaRegistryClient)
.map(PooledSchemaRegistryClient::new);
}

public TopicClient getTopicClient() {
return new TopicClient(this.getAdminClient(), this.timeout);
}
Expand All @@ -102,13 +88,6 @@ public ConsumerGroupClient getConsumerGroupClient() {
@Override
public void close() {
this.adminClient.close();
if (this.schemaRegistryClient != null) {
try {
this.schemaRegistryClient.close();
} catch (final IOException e) {
throw new UncheckedIOException("Error closing schema registry client", e);
}
}
}

@RequiredArgsConstructor
Expand All @@ -126,15 +105,4 @@ public void close() {
// do nothing
}
}

@RequiredArgsConstructor
private static class PooledSchemaRegistryClient implements SchemaRegistryClient {
@Delegate(excludes = AutoCloseable.class)
private final @NonNull SchemaRegistryClient schemaRegistryClient;

@Override
public void close() {
// do nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() {
new ConfiguredProducerApp<>(new TestProducer(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.schemaRegistryUrl("fake")
.properties(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake"))
.build()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'schema.registry.url' should not be configured already");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() {
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.bootstrapServers("fake")
.schemaRegistryUrl("fake")
.properties(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake"))
.build()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'schema.registry.url' should not be configured already");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.TestUtil;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.util.Map;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -56,7 +58,10 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
KafkaEndpointConfig createEndpoint() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBrokerList())
.schemaRegistryUrl(this.schemaRegistryMockExtension.getUrl())
.properties(Map.of(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl()
))
.build();
}
}
Loading
Loading