From 9d5a50c2a6d6e92c76f4f4e867da6681dd07c7ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Preu=C3=9F?= Date: Tue, 20 Dec 2022 10:07:09 +0100 Subject: [PATCH] [improve][io] Debezium sources: Support loading config from secrets --- .../pulsar/io/debezium/DebeziumSource.java | 16 ++++++++++++++++ .../debezium/mongodb/DebeziumMongoDbSource.java | 8 ++++++++ 2 files changed, 24 insertions(+) diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java index 4cdd363dbe9a9..6c422c4f036a9 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java @@ -20,12 +20,14 @@ import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.connect.KafkaConnectSource; import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig; +@Slf4j public abstract class DebeziumSource extends KafkaConnectSource { private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"; private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"; @@ -60,11 +62,25 @@ public static String topicNamespace(SourceContext sourceContext) { + (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace); } + public static void tryLoadingConfigSecret(String secretName, Map config, SourceContext context) { + try { + String secret = context.getSecret(secretName); + if (secret != null) { + config.put(secretName, secret); + log.info("Config key {} set from secret.", secretName); + } + } catch (Exception e) { + log.warn("Failed to read secret {}.", secretName, e); + } + } + public abstract void setDbConnectorTask(Map config) throws Exception; @Override public void open(Map config, SourceContext sourceContext) throws Exception { setDbConnectorTask(config); + tryLoadingConfigSecret("database.user", config, sourceContext); + tryLoadingConfigSecret("database.password", config, sourceContext); // key.converter setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); diff --git a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java index a78664301be8e..88acbc61c7732 100644 --- a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java +++ b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.debezium.DebeziumSource; /** @@ -32,4 +33,11 @@ public class DebeziumMongoDbSource extends DebeziumSource { public void setDbConnectorTask(Map config) throws Exception { throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); } + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + tryLoadingConfigSecret("mongodb.user", config, sourceContext); + tryLoadingConfigSecret("mongodb.password", config, sourceContext); + super.open(config, sourceContext); + } }