Implementation for new SMT ExtractTopicFromValueSchema and tests #93

Merged
28 changes: 28 additions & 0 deletions README.md
@@ -162,10 +162,38 @@ Either `field.value` or `field.value.pattern` must be defined to apply filter.

Only `string`, `numeric`, and `boolean` types are considered for matching purposes; other types are ignored.

### `ExtractTopicFromValueSchema`

This transformation checks the name of the record's value schema and, if it exists, uses it as the topic name.

- `io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name` - works on the value schema name.

By default (if neither `schema.name.topic-map` nor `schema.name.regex` is set), the transformation uses the value schema name of the record as the new topic name.
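
For example, with no optional settings a record whose value schema is named `com.acme.schema.SchemaNameToTopic` is routed to a topic with that same name (this is the case the new integration test exercises):

```properties
transforms=ExtractTopicFromValueSchema
transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
```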

The transformation defines the following optional configurations, which can be used to modify the schema name before it is used as the topic name:

- `schema.name.topic-map` - A map from schema name to the new topic name that should be used instead. The format is a comma-separated list of `key:value` pairs, for example `SchemaValue1:NewValue1,SchemaValue2:NewValue2`.
- `schema.name.regex` - A regular expression used to extract the desired topic name from the schema name. For example, `(?:[.]|^)([^.]*)$` captures the part of the name after the last dot.

Here is an example of this transformation configuration (using `schema.name.topic-map`):

```properties
transforms=ExtractTopicFromValueSchema
transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
transforms.ExtractTopicFromValueSchema.schema.name.topic-map=com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,com.acme.schema.SchemaNameToTopic2:TheNameToReplace2
```
And here is an example of this transformation configuration (using `schema.name.regex`); with this regex, a record whose value schema is named `com.acme.schema.SchemaNameToTopic` is routed to the topic `SchemaNameToTopic`:
```properties
transforms=ExtractTopicFromValueSchema
transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
transforms.ExtractTopicFromValueSchema.schema.name.regex=(?:[.]|^)([^.]*)$
```

## License

This project is licensed under the [Apache License, Version 2.0](LICENSE).

## Trademarks

Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.

IntegrationTest.java
@@ -56,6 +56,11 @@ final class IntegrationTest {
private final TopicPartition newTopicPartition0 =
new TopicPartition(TestSourceConnector.NEW_TOPIC, 0);

private final TopicPartition originalTopicValueFromSchema =
new TopicPartition(TopicFromValueSchemaConnector.TOPIC, 0);

private final TopicPartition newTopicValueFromSchema =
new TopicPartition(TopicFromValueSchemaConnector.NAME, 0);
private static File pluginsDir;

@Container
@@ -92,7 +97,9 @@ static void setUpAll() throws IOException, InterruptedException {
assert integrationTestClassesPath.exists();

final Class<?>[] testConnectorClasses = new Class[]{
TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class,
TopicFromValueSchemaConnector.class,
TopicFromValueSchemaConnector.TopicFromValueSchemaConnectorTask.class
};
for (final Class<?> clazz : testConnectorClasses) {
final String packageName = clazz.getPackage().getName();
@@ -127,7 +134,12 @@ void setUp() throws ExecutionException, InterruptedException {

final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
adminClient.createTopics(Arrays.asList(originalTopic, newTopic)).all().get();
final NewTopic originalTopicForExtractTopicFromValue =
new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
final NewTopic newTopicForExtractTopicFromValue =
new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue,
newTopicForExtractTopicFromValue)).all().get();

connectRunner = new ConnectRunner(pluginsDir, kafka.getBootstrapServers());
connectRunner.start();
@@ -159,19 +171,42 @@ final void testExtractTopic() throws ExecutionException, InterruptedException, IOException {
connectorConfig.put("tasks.max", "1");
connectRunner.createConnector(connectorConfig);

checkMessageTopics(originalTopicPartition0, newTopicPartition0);
}

@Test
@Timeout(10)
final void testExtractTopicFromValueSchemaName() throws ExecutionException, InterruptedException, IOException {
final Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("name", "test-source-connector");
connectorConfig.put("connector.class", TopicFromValueSchemaConnector.class.getName());
connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("value.converter.value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
connectorConfig.put("transforms",
"ExtractTopicFromValueSchema");
connectorConfig.put("transforms.ExtractTopicFromValueSchema.type",
"io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name");
connectorConfig.put("tasks.max", "1");
connectRunner.createConnector(connectorConfig);
checkMessageTopics(originalTopicValueFromSchema, newTopicValueFromSchema);

}

final void checkMessageTopics(final TopicPartition originalTopicPartition, final TopicPartition newTopicPartition)
throws InterruptedException {
waitForCondition(
() -> consumer
.endOffsets(Arrays.asList(originalTopicPartition0, newTopicPartition0))
.values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
.orElse(false),
5000, "Messages appear in any topic"
() -> consumer.endOffsets(Arrays.asList(originalTopicPartition, newTopicPartition))
.values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
.orElse(false), 5000, "Messages appear in any topic"
);
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(
Arrays.asList(originalTopicPartition0, newTopicPartition0));
Arrays.asList(originalTopicPartition, newTopicPartition));
// The original topic should be empty.
assertEquals(0, endOffsets.get(originalTopicPartition0));
assertEquals(0, endOffsets.get(originalTopicPartition));
// The new topic should be non-empty.
assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition0));
assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition));
}

private void waitForCondition(final Supplier<Boolean> conditionChecker,
TopicFromValueSchemaConnector.java
@@ -0,0 +1,121 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A connector needed for testing ExtractTopicFromValueSchema.
*
* <p>It just produces a fixed number of struct records with value schema name set.
*/
public class TopicFromValueSchemaConnector extends SourceConnector {
static final int MESSAGES_TO_PRODUCE = 10;

private static final Logger log = LoggerFactory.getLogger(TopicFromValueSchemaConnector.class);
static final String TOPIC = "topic-for-value-schema-connector-test";
static final String FIELD = "field-0";

static final String NAME = "com.acme.schema.SchemaNameToTopic";

@Override
public void start(final Map<String, String> props) {
}

@Override
public Class<? extends Task> taskClass() {
return TopicFromValueSchemaConnectorTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
return Collections.singletonList(Collections.emptyMap());
}

@Override
public void stop() {
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public String version() {
return null;
}

public static class TopicFromValueSchemaConnectorTask extends SourceTask {
private int counter = 0;

private final Schema valueSchema = SchemaBuilder.struct()
.field(FIELD, SchemaBuilder.STRING_SCHEMA)
.name(NAME)
.schema();
private final Struct value = new Struct(valueSchema).put(FIELD, "Data");

@Override
public void start(final Map<String, String> props) {
log.info("Started TopicFromValueSchemaConnector!!!");
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
if (counter >= MESSAGES_TO_PRODUCE) {
return null; // indicate pause
}

final Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("partition", "0");
final Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put("offset", Integer.toString(counter));

counter += 1;

return Collections.singletonList(
new SourceRecord(sourcePartition, sourceOffset,
TOPIC,
valueSchema, value)
);
}

@Override
public void stop() {
}

@Override
public String version() {
return null;
}
}
}
ExtractTopicFromValueSchema.java
@@ -0,0 +1,99 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ExtractTopicFromValueSchema<R extends ConnectRecord<R>> implements Transformation<R> {

private static final Logger log = LoggerFactory.getLogger(ExtractTopicFromValueSchema.class);

private ExtractTopicFromValueSchemaConfig config;
private Map<String, String> schemaNameToTopicMap;
private Pattern pattern;

@Override
public ConfigDef config() {
return ExtractTopicFromValueSchemaConfig.config();
}

@Override
public void configure(final Map<String, ?> configs) {
this.config = new ExtractTopicFromValueSchemaConfig(configs);
schemaNameToTopicMap = config.schemaNameToTopicMap();
final Optional<String> regex = config.regEx();
regex.ifPresent(s -> pattern = Pattern.compile(s));
}

@Override
public R apply(final R record) {

if (null == record.valueSchema() || null == record.valueSchema().name()) {
throw new DataException("Value schema name can't be null: " + record);
}
// First check schema value name -> desired topic name mapping and use that if it is set.
if (schemaNameToTopicMap.containsKey(record.valueSchema().name())) {
return createConnectRecord(record, schemaNameToTopicMap.get(record.valueSchema().name()));
}
// Second, check whether a regex for parsing the topic name from the schema name is configured and use that.
if (pattern != null) {
final Matcher matcher = pattern.matcher(record.valueSchema().name());
if (matcher.find() && matcher.groupCount() == 1) {
return createConnectRecord(record, matcher.group(1));
}
log.trace("No match with pattern {} from schema name {}", pattern.pattern(), record.valueSchema().name());
}
// If no other configurations are set use value schema name as new topic name.
return createConnectRecord(record, record.valueSchema().name());
}

private R createConnectRecord(final R record, final String newTopicName) {
return record.newRecord(
newTopicName,
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
record.value(),
record.timestamp(),
record.headers()
);
}

@Override
public void close() {
}

public static class Name<R extends ConnectRecord<R>> extends ExtractTopicFromValueSchema<R> {

@Override
public void close() {
}
}
}
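
As a side note for reviewers, here is a minimal standalone sketch (not part of this PR; the class name `ExtractTopicFromValueSchemaExample` and the schema name `com.acme.schema.Order` are made up for illustration) of how the transform behaves when only `schema.name.regex` is configured. It assumes `ExtractTopicFromValueSchemaConfig` falls back to an empty topic map when `schema.name.topic-map` is unset.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema;

public class ExtractTopicFromValueSchemaExample {
    public static void main(final String[] args) {
        // Build a record whose value schema carries the name the SMT reads.
        final Schema valueSchema = SchemaBuilder.struct()
                .name("com.acme.schema.Order") // hypothetical schema name
                .field("id", Schema.STRING_SCHEMA)
                .build();
        final Struct value = new Struct(valueSchema).put("id", "42");
        final SourceRecord record = new SourceRecord(
                null, null, "original-topic", valueSchema, value);

        // Configure only the regex; the topic map is left unset.
        final ExtractTopicFromValueSchema.Name<SourceRecord> transform =
                new ExtractTopicFromValueSchema.Name<>();
        final Map<String, Object> configs = new HashMap<>();
        configs.put("schema.name.regex", "(?:[.]|^)([^.]*)$");
        transform.configure(configs);

        // The regex captures the segment after the last dot, so this prints "Order".
        System.out.println(transform.apply(record).topic());
        transform.close();
    }
}
```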