Skip to content

Commit

Permalink
[improve][io] Debezium sources: Support loading config from secrets (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Preuß authored Dec 23, 2022
1 parent 513e671 commit 38053b5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;

@Slf4j
public abstract class DebeziumSource extends KafkaConnectSource {
private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
Expand Down Expand Up @@ -60,11 +62,25 @@ public static String topicNamespace(SourceContext sourceContext) {
+ (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace);
}

public static void tryLoadingConfigSecret(String secretName, Map<String, Object> config, SourceContext context) {
try {
String secret = context.getSecret(secretName);
if (secret != null) {
config.put(secretName, secret);
log.info("Config key {} set from secret.", secretName);
}
} catch (Exception e) {
log.warn("Failed to read secret {}.", secretName, e);
}
}

public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
setDbConnectorTask(config);
tryLoadingConfigSecret("database.user", config, sourceContext);
tryLoadingConfigSecret("database.password", config, sourceContext);

// key.converter
setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.debezium.DebeziumSource;

/**
Expand All @@ -32,4 +33,11 @@ public class DebeziumMongoDbSource extends DebeziumSource {
public void setDbConnectorTask(Map<String, Object> config) throws Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
}

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
tryLoadingConfigSecret("mongodb.user", config, sourceContext);
tryLoadingConfigSecret("mongodb.password", config, sourceContext);
super.open(config, sourceContext);
}
}

0 comments on commit 38053b5

Please sign in to comment.