Skip to content

Commit

Permalink
[improve][io] Improve kinesis connector config. (#21004)
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Aug 22, 2023
1 parent ee91edc commit 4634311
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 262 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public abstract class BaseKinesisConfig implements Serializable {
)
private String awsEndpoint = "";

@FieldDoc(
required = false,
defaultValue = "",
help = "Cloudwatch end-point url. It can be found at "
+ "https://docs.aws.amazon.com/general/latest/gr/rande.html"
)
private String cloudwatchEndpoint = "";

@FieldDoc(
required = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.io.kinesis;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -49,7 +48,6 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
Expand Down Expand Up @@ -155,17 +153,16 @@ public void close() {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
kinesisSinkConfig = KinesisSinkConfig.load(config, sinkContext);
this.sinkContext = sinkContext;

checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint())
|| isNotBlank(kinesisSinkConfig.getAwsRegion()),
"Either the aws-end-point or aws-region must be set");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsCredentialPluginParam()), "empty aws-credential param");

KinesisProducerConfiguration kinesisConfig = new KinesisProducerConfiguration();
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
if (isNotBlank(kinesisSinkConfig.getAwsEndpoint())) {
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
}
if (isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())) {
kinesisConfig.setCloudwatchEndpoint(kinesisSinkConfig.getCloudwatchEndpoint());
}
if (kinesisSinkConfig.getAwsEndpointPort() != null) {
kinesisConfig.setKinesisPort(kinesisSinkConfig.getAwsEndpointPort());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*/
package org.apache.pulsar.io.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
Expand Down Expand Up @@ -103,9 +104,12 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
help = "The maximum delay(in milliseconds) between retries.")
private long retryMaxDelayInMillis = 60000;

public static KinesisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);
public static KinesisSinkConfig load(Map<String, Object> config, SinkContext sinkContext) {
KinesisSinkConfig kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
checkArgument(isNotBlank(kinesisSinkConfig.getAwsRegion())
|| (isNotBlank(kinesisSinkConfig.getAwsEndpoint()) && isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())),
"Either \"awsRegion\" must be set OR all of [\"awsEndpoint\", \"cloudwatchEndpoint\"] must be set.");
return kinesisSinkConfig;
}

public enum MessageFormat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@
*/
package org.apache.pulsar.io.kinesis;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
Expand Down Expand Up @@ -68,18 +64,7 @@ public void close() throws Exception {

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, KinesisSourceConfig.class, sourceContext);

checkArgument(isNotBlank(kinesisSourceConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSourceConfig.getAwsEndpoint())
|| isNotBlank(kinesisSourceConfig.getAwsRegion()),
"Either the aws-end-point or aws-region must be set");
checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");

if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
checkArgument((kinesisSourceConfig.getStartAtTime() != null), "Timestamp must be specified");
}

this.kinesisSourceConfig = KinesisSourceConfig.load(config, sourceContext);
queue = new LinkedBlockingQueue<>(kinesisSourceConfig.getReceiveQueueSize());
workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
*/
package org.apache.pulsar.io.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Serializable;
import java.net.URI;
import java.util.Date;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab

@FieldDoc(
required = false,
defaultValue = "Apache Pulsar IO Connector",
defaultValue = "pulsar-kinesis",
help = "Name of the Amazon Kinesis application. By default the application name is included "
+ "in the user agent string used to make AWS requests. This can assist with troubleshooting "
+ "(e.g. distinguish requests made by separate connectors instances)."
Expand Down Expand Up @@ -122,24 +123,27 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
)
private String dynamoEndpoint = "";

@FieldDoc(
required = false,
defaultValue = "",
help = "Cloudwatch end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html"
)
private String cloudwatchEndpoint = "";

@FieldDoc(
required = false,
defaultValue = "true",
help = "When true, uses Kinesis enhanced fan-out, when false, uses polling"
)
private boolean useEnhancedFanOut = true;


public static KinesisSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), KinesisSourceConfig.class);
public static KinesisSourceConfig load(Map<String, Object> config, SourceContext sourceContext) {
KinesisSourceConfig kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config,
KinesisSourceConfig.class, sourceContext);
boolean isNotBlankEndpoint = isNotBlank(kinesisSourceConfig.getAwsEndpoint())
&& isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint())
&& isNotBlank(kinesisSourceConfig.getDynamoEndpoint());
checkArgument(isNotBlank(kinesisSourceConfig.getAwsRegion()) || isNotBlankEndpoint,
"Either \"awsRegion\" must be set OR all of "
+ "[ \"awsEndpoint\", \"cloudwatchEndpoint\", and \"dynamoEndpoint\" ] must be set.");
if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
checkArgument((kinesisSourceConfig.getStartAtTime() != null),
"When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified");
}
return kinesisSourceConfig;
}

public KinesisAsyncClient buildKinesisAsyncClient(AwsCredentialProviderPlugin credPlugin) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,16 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
import org.mockito.Mockito;
import org.testng.annotations.Test;

public class KinesisSinkConfigTests {

@Test
public final void loadFromYamlFileTest() throws IOException {
File yamlFile = getFile("sinkConfig.yaml");
KinesisSinkConfig config = KinesisSinkConfig.load(yamlFile.getAbsolutePath());

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
assertEquals(config.getAwsRegion(), "us-east-1");
assertEquals(config.getAwsKinesisStreamName(), "my-stream");
assertEquals(config.getAwsCredentialPluginParam(),
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
assertEquals(config.getMessageFormat(), MessageFormat.ONLY_RAW_PAYLOAD);
assertEquals(true, config.isRetainOrdering());
}

@Test
public final void loadFromMapTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
Expand All @@ -58,7 +40,7 @@ public final void loadFromMapTest() throws IOException {
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");

SinkContext sinkContext = Mockito.mock(SinkContext.class);
KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext);
KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
Expand All @@ -78,7 +60,7 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
Mockito.when(sinkContext.getSecret("awsCredentialPluginParam"))
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext);
KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
Expand All @@ -88,8 +70,13 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException {
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
}

private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
@Test(expectedExceptions = IllegalArgumentException.class)
public final void missCloudWatchEndpointTest() {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("awsEndpoint", "https://some.endpoint.aws");
map.put("awsKinesisStreamName", "my-stream");
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
SinkContext sinkContext = Mockito.mock(SinkContext.class);
KinesisSinkConfig.load(map, sinkContext);
}
}
Loading

0 comments on commit 4634311

Please sign in to comment.