Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][io] Improve loading sensitive fields for many connectors #54

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pulsar-io/aerospike/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
aerospikeSinkConfig = AerospikeSinkConfig.load(config);
aerospikeSinkConfig = AerospikeSinkConfig.load(config, sinkContext);
if (aerospikeSinkConfig.getSeedHosts() == null
|| aerospikeSinkConfig.getKeyspace() == null
|| aerospikeSinkConfig.getColumnName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.aerospike;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
Expand All @@ -26,6 +27,9 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
@Accessors(chain = true)
Expand All @@ -38,21 +42,50 @@ public class AerospikeSinkConfig implements Serializable {
private String columnName;

// Optional
@FieldDoc(
required = false,
defaultValue = "",
sensitive = true,
help = "The username for authentication."
)
private String userName;
@FieldDoc(
required = false,
defaultValue = "",
sensitive = true,
help = "The password for authentication."
)
private String password;
private String keySet;
private int maxConcurrentRequests = 100;
private int timeoutMs = 100;
private int retries = 1;


/**
* @deprecated Use {@link #load(String, SinkContext)} instead.
*/
@Deprecated
public static AerospikeSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
}

public static AerospikeSinkConfig load(String yamlFile, SinkContext context) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return load(mapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {}), context);
}

/**
* @deprecated Use {@link #load(Map, SinkContext)} instead.
*/
@Deprecated
public static AerospikeSinkConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), AerospikeSinkConfig.class);
}

public static AerospikeSinkConfig load(Map<String, Object> map, SinkContext context) {
return IOConfigUtils.loadWithSecrets(map, AerospikeSinkConfig.class, context);
}
}
5 changes: 5 additions & 0 deletions pulsar-io/canal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class CanalAbstractSource<V> extends PushSource<V> {

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
canalSourceConfig = CanalSourceConfig.load(config);
canalSourceConfig = CanalSourceConfig.load(config, sourceContext);
if (canalSourceConfig.getCluster()) {
connector = CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(),
canalSourceConfig.getDestination(), canalSourceConfig.getUsername(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.canal;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
Expand All @@ -26,6 +27,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;


Expand Down Expand Up @@ -80,14 +83,31 @@ public class CanalSourceConfig implements Serializable{
help = "The batch size to fetch from canal.")
private int batchSize = 1000;


/**
* @deprecated Use {@link #load(String, SourceContext)} instead.
*/
@Deprecated
public static CanalSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), CanalSourceConfig.class);
}

public static CanalSourceConfig load(String yamlFile, SourceContext context) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return load(mapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {}), context);
}

/**
* @deprecated Use {@link #load(Map, SourceContext)} instead.
*/
@Deprecated
public static CanalSourceConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), CanalSourceConfig.class);
}

public static CanalSourceConfig load(Map<String, Object> map, SourceContext context) {
return IOConfigUtils.loadWithSecrets(map, CanalSourceConfig.class, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz,
}
}
configs.computeIfAbsent(field.getName(), key -> {
if (fieldDoc.required()) {
throw new IllegalArgumentException(field.getName() + " cannot be null");
}
String value = fieldDoc.defaultValue();
if (value != null && !value.isEmpty()) {
return value;
}
if (fieldDoc.required()) {
throw new IllegalArgumentException(field.getName() + " cannot be null");
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ static class TestDefaultConfig {
)
protected String testRequired;

@FieldDoc(
required = true,
defaultValue = "defaultStr",
sensitive = true,
help = "requiredWithDefault"
)
protected String requiredWithDefault;

@FieldDoc(
required = false,
defaultValue = "defaultStr",
Expand Down Expand Up @@ -300,6 +308,8 @@ public void testDefaultValue() {
TestDefaultConfig testDefaultConfig =
IOConfigUtils.loadWithSecrets(configMap, TestDefaultConfig.class, new TestSinkContext());
Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr");
Assert.assertEquals(testDefaultConfig.getTestRequired(), "test");
Assert.assertEquals(testDefaultConfig.getRequiredWithDefault(), "defaultStr");
Assert.assertEquals(testDefaultConfig.isDefaultBool(), true);
Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100);
Assert.assertEquals(testDefaultConfig.getDefaultLong(), 100);
Expand Down
5 changes: 5 additions & 0 deletions pulsar-io/influxdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-instance</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class InfluxDBGenericRecordSink implements Sink<GenericRecord> {
@Override
public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
try {
val configV2 = InfluxDBSinkConfig.load(map);
val configV2 = InfluxDBSinkConfig.load(map, sinkContext);
configV2.validate();
sink = new InfluxDBSink();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.influxdb.v1;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
Expand All @@ -27,6 +28,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
Expand Down Expand Up @@ -94,7 +97,7 @@ public class InfluxDBSinkConfig implements Serializable {

@FieldDoc(
required = false,
defaultValue = "1000L",
defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000L;

Expand All @@ -105,16 +108,34 @@ public class InfluxDBSinkConfig implements Serializable {
)
private int batchSize = 200;


/**
* @deprecated Use {@link #load(String, SinkContext)} instead.
*/
@Deprecated
public static InfluxDBSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
}

public static InfluxDBSinkConfig load(String yamlFile, SinkContext context) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return load(mapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {}), context);
}

/**
* @deprecated Use {@link #load(Map, SinkContext)} instead.
*/
@Deprecated
public static InfluxDBSinkConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class);
}

public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext context) {
return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, context);
}

public void validate() {
Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set.");
Preconditions.checkNotNull(database, "database property not set.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class InfluxDBSink extends BatchSink<Point, GenericRecord> {

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config);
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext);
influxDBSinkConfig.validate();
super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.influxdb.v2;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
Expand All @@ -27,6 +28,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
Expand Down Expand Up @@ -87,7 +90,7 @@ public class InfluxDBSinkConfig implements Serializable {

@FieldDoc(
required = false,
defaultValue = "1000L",
defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000;

Expand All @@ -98,16 +101,33 @@ public class InfluxDBSinkConfig implements Serializable {
)
private int batchSize = 200;

/**
* @deprecated Use {@link #load(String, SinkContext)} instead.
*/
@Deprecated
public static InfluxDBSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
}

public static InfluxDBSinkConfig load(String yamlFile, SinkContext context) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return load(mapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {}), context);
}

/**
* @deprecated Use {@link #load(Map, SinkContext)} instead.
*/
@Deprecated
public static InfluxDBSinkConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class);
}

public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext context) {
return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, context);
}

public void validate() {
Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set.");
Preconditions.checkNotNull(token, "token property not set.");
Expand Down
Loading
Loading