From 3d9ece19e1d87e775f92d334cc689e80d30e6e20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 22 Mar 2022 09:26:18 +0100 Subject: [PATCH] [tests] Add Kinesis Sink integration tests (#42) * [tests] Add Kinesis connector integration tests * fixup --- pulsar-io/aws/pom.xml | 1 + pulsar-io/kinesis/pom.xml | 12 + .../pulsar/io/kinesis/BaseKinesisConfig.java | 15 ++ .../apache/pulsar/io/kinesis/KinesisSink.java | 7 + .../io/kinesis/KinesisSinkAuthTest.java | 151 +++++++++++ .../pulsar/io/kinesis/KinesisSinkTest.java | 246 ++++++++++-------- .../latest-version-image/Dockerfile | 1 + tests/integration/pom.xml | 19 ++ .../io/sinks/KinesisSinkTester.java | 132 ++++++++++ .../integration/io/sinks/PulsarSinksTest.java | 5 + .../integration/io/sinks/SinkTester.java | 1 + 11 files changed, 475 insertions(+), 115 deletions(-) create mode 100644 pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkAuthTest.java create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java diff --git a/pulsar-io/aws/pom.xml b/pulsar-io/aws/pom.xml index 33fb8fd470a76..9b0df600b32c7 100644 --- a/pulsar-io/aws/pom.xml +++ b/pulsar-io/aws/pom.xml @@ -53,6 +53,7 @@ sts 2.10.56 + diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml index 747b96abd44ad..7f961be2e9e68 100644 --- a/pulsar-io/kinesis/pom.xml +++ b/pulsar-io/kinesis/pom.xml @@ -108,6 +108,18 @@ 2.3.0 + + org.testcontainers + localstack + test + + + + org.awaitility + awaitility + test + + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java index cad2ac5e29aaa..d3fd5a2764c4d 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java @@ -38,6 +38,13 @@ public abstract class BaseKinesisConfig implements Serializable { ) private String awsEndpoint = ""; + @FieldDoc( + required = false, + defaultValue = "", + help = "Kinesis end-point port. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html" + ) + private Integer awsEndpointPort; + @FieldDoc( required = false, defaultValue = "", @@ -45,6 +52,14 @@ public abstract class BaseKinesisConfig implements Serializable { ) private String awsRegion = ""; + + @FieldDoc( + required = false, + defaultValue = "false", + help = "Tell to Kinesis Client to skip certificate validation. This is useful while performing local tests, it's recommended to always validate certificate in production environment." + ) + private Boolean skipCertificateValidation = false; + @FieldDoc( required = true, defaultValue = "", diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index ed7c70cd76d9f..47ba07d8d879f 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -164,10 +164,17 @@ public void open(Map config, SinkContext sinkContext) throws Exc KinesisProducerConfiguration kinesisConfig = new KinesisProducerConfiguration(); kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint()); + if (kinesisSinkConfig.getAwsEndpointPort() != null) { + kinesisConfig.setKinesisPort(kinesisSinkConfig.getAwsEndpointPort()); + } kinesisConfig.setRegion(kinesisSinkConfig.getAwsRegion()); kinesisConfig.setThreadingModel(ThreadingModel.POOLED); kinesisConfig.setThreadPoolSize(4); kinesisConfig.setCollectionMaxCount(1); + if (kinesisSinkConfig.getSkipCertificateValidation() != null && + kinesisSinkConfig.getSkipCertificateValidation()) { + kinesisConfig.setVerifyCertificate(false); + } AWSCredentialsProvider credentialsProvider = createCredentialProvider( kinesisSinkConfig.getAwsCredentialPluginName(), kinesisSinkConfig.getAwsCredentialPluginParam()) diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkAuthTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkAuthTest.java new file mode 100644 index 0000000000000..4284a37098b5b --- /dev/null +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkAuthTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.kinesis; + +import java.io.IOException; +import java.util.Map; + +import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; +import org.testng.Assert; +import org.testng.annotations.Test; +import org.testng.collections.Maps; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicSessionCredentials; +import com.google.gson.Gson; + +public class KinesisSinkAuthTest { + + @Test + public void testDefaultCredentialProvider() throws Exception { + KinesisSink sink = new KinesisSink(); + Map credentialParam = Maps.newHashMap(); + String awsCredentialPluginParam = new Gson().toJson(credentialParam); + try { + sink.defaultCredentialProvider(awsCredentialPluginParam); + Assert.fail("accessKey and SecretKey validation not applied"); + } catch (IllegalArgumentException ie) { + // Ok.. + } + + final String accesKey = "ak"; + final String secretKey = "sk"; + credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey); + credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey); + awsCredentialPluginParam = new Gson().toJson(credentialParam); + AWSCredentialsProvider credentialProvider = sink.defaultCredentialProvider(awsCredentialPluginParam) + .getCredentialProvider(); + Assert.assertNotNull(credentialProvider); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), accesKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), secretKey); + + sink.close(); + } + + @Test + public void testCredentialProvider() throws Exception { + KinesisSink sink = new KinesisSink(); + + final String accesKey = "ak"; + final String secretKey = "sk"; + Map credentialParam = Maps.newHashMap(); + credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey); + credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey); + String awsCredentialPluginParam = new Gson().toJson(credentialParam); + AWSCredentialsProvider credentialProvider = sink.createCredentialProvider(null, awsCredentialPluginParam) + .getCredentialProvider(); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), accesKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), secretKey); + + credentialProvider = sink.createCredentialProvider(AwsCredentialProviderPluginImpl.class.getName(), "{}") + .getCredentialProvider(); + Assert.assertNotNull(credentialProvider); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), + AwsCredentialProviderPluginImpl.accessKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), + AwsCredentialProviderPluginImpl.secretKey); + Assert.assertEquals(((BasicSessionCredentials) credentialProvider.getCredentials()).getSessionToken(), + AwsCredentialProviderPluginImpl.sessionToken); + + sink.close(); + } + + @Test + public void testCredentialProviderPlugin() throws Exception { + KinesisSink sink = new KinesisSink(); + + AWSCredentialsProvider credentialProvider = sink + .createCredentialProviderWithPlugin(AwsCredentialProviderPluginImpl.class.getName(), "{}") + .getCredentialProvider(); + Assert.assertNotNull(credentialProvider); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), + AwsCredentialProviderPluginImpl.accessKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), + AwsCredentialProviderPluginImpl.secretKey); + Assert.assertEquals(((BasicSessionCredentials) credentialProvider.getCredentials()).getSessionToken(), + AwsCredentialProviderPluginImpl.sessionToken); + + sink.close(); + } + + public static class AwsCredentialProviderPluginImpl implements AwsCredentialProviderPlugin { + + public final static String accessKey = "ak"; + public final static String secretKey = "sk"; + public final static String sessionToken = "st"; + + public void init(String param) { + // no-op + } + + @Override + public AWSCredentialsProvider getCredentialProvider() { + return new AWSCredentialsProvider() { + @Override + public AWSCredentials getCredentials() { + return new BasicSessionCredentials(accessKey, secretKey, sessionToken) { + + @Override + public String getAWSAccessKeyId() { + return accessKey; + } + @Override + public String getAWSSecretKey() { + return secretKey; + } + @Override + public String getSessionToken() { + return sessionToken; + } + }; + } + @Override + public void refresh() { + // TODO Auto-generated method stub + } + }; + } + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + } + } + +} diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java index 7172c04bd43cd..07332defacb88 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java @@ -18,137 +18,153 @@ */ package org.apache.pulsar.io.kinesis; -import java.io.IOException; +import lombok.SneakyThrows; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.awaitility.Awaitility; +import org.jetbrains.annotations.NotNull; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; -import org.testng.Assert; -import org.testng.annotations.Test; -import org.testng.collections.Maps; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.BasicSessionCredentials; -import com.google.gson.Gson; -/** - * Unit test of {@link KinesisSink}. - */ public class KinesisSinkTest { - @Test - public void testDefaultCredentialProvider() throws Exception { - KinesisSink sink = new KinesisSink(); - Map credentialParam = Maps.newHashMap(); - String awsCredentialPluginParam = new Gson().toJson(credentialParam); - try { - sink.defaultCredentialProvider(awsCredentialPluginParam); - Assert.fail("accessKey and SecretKey validation not applied"); - } catch (IllegalArgumentException ie) { - // Ok.. - } + public static final String STREAM_NAME = "my-stream-1"; + public static LocalStackContainer LOCALSTACK_CONTAINER = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest")) + .withServices(LocalStackContainer.Service.KINESIS); - final String accesKey = "ak"; - final String secretKey = "sk"; - credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey); - credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey); - awsCredentialPluginParam = new Gson().toJson(credentialParam); - AWSCredentialsProvider credentialProvider = sink.defaultCredentialProvider(awsCredentialPluginParam) - .getCredentialProvider(); - Assert.assertNotNull(credentialProvider); - Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), accesKey); - Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), secretKey); - - sink.close(); + @BeforeClass(alwaysRun = true) + public void beforeClass() throws Exception { + LOCALSTACK_CONTAINER.start(); + createClient().createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(1).build()).get(); } - @Test - public void testCredentialProvider() throws Exception { - KinesisSink sink = new KinesisSink(); - - final String accesKey = "ak"; - final String secretKey = "sk"; - Map credentialParam = Maps.newHashMap(); - credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey); - credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey); - String awsCredentialPluginParam = new Gson().toJson(credentialParam); - AWSCredentialsProvider credentialProvider = sink.createCredentialProvider(null, awsCredentialPluginParam) - .getCredentialProvider(); - Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), accesKey); - Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), secretKey); - - credentialProvider = sink.createCredentialProvider(AwsCredentialProviderPluginImpl.class.getName(), "{}") - .getCredentialProvider(); - Assert.assertNotNull(credentialProvider); - Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), - AwsCredentialProviderPluginImpl.accessKey); - Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), - AwsCredentialProviderPluginImpl.secretKey); - Assert.assertEquals(((BasicSessionCredentials) credentialProvider.getCredentials()).getSessionToken(), - AwsCredentialProviderPluginImpl.sessionToken); - - sink.close(); + @AfterClass(alwaysRun = true) + public void afterClass() throws Exception { + LOCALSTACK_CONTAINER.stop(); } @Test - public void testCredentialProviderPlugin() throws Exception { - KinesisSink sink = new KinesisSink(); - - AWSCredentialsProvider credentialProvider = sink - .createCredentialProviderWithPlugin(AwsCredentialProviderPluginImpl.class.getName(), "{}") - .getCredentialProvider(); - Assert.assertNotNull(credentialProvider); - Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), - AwsCredentialProviderPluginImpl.accessKey); - Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), - AwsCredentialProviderPluginImpl.secretKey); - Assert.assertEquals(((BasicSessionCredentials) credentialProvider.getCredentials()).getSessionToken(), - AwsCredentialProviderPluginImpl.sessionToken); - - sink.close(); - } + public void testWrite() throws Exception { + final Record mockRecord = mock(Record.class); + AtomicBoolean ackCalled = new AtomicBoolean(); + Mockito.doAnswer((Answer) invocation -> { + ackCalled.set(true); + return null; + }).when(mockRecord).ack(); + when(mockRecord.getKey()).thenAnswer(new Answer>() { + long sequenceCounter = 0; + public Optional answer(InvocationOnMock invocation) throws Throwable { + return Optional.of( "key-" + sequenceCounter++); + }}); + + when(mockRecord.getValue()).thenReturn("hello".getBytes(StandardCharsets.UTF_8)); + + try (final KinesisSink sink = new KinesisSink();) { + Map map = createConfig(); + SinkContext mockSinkContext = mock(SinkContext.class); + + sink.open(map, mockSinkContext); + for (int i = 0; i < 10; i++) { + sink.write(mockRecord); + } + Awaitility.await().untilAsserted(() -> { + assertTrue(ackCalled.get()); + }); + final GetRecordsResponse getRecords = getStreamRecords(); + assertEquals(getRecords.records().size(), 10); + + for (software.amazon.awssdk.services.kinesis.model.Record record : getRecords.records()) { + assertEquals(record.data().asString(StandardCharsets.UTF_8), "hello"); + } - public static class AwsCredentialProviderPluginImpl implements AwsCredentialProviderPlugin { + } + } - public final static String accessKey = "ak"; - public final static String secretKey = "sk"; - public final static String sessionToken = "st"; + @NotNull + private Map createConfig() { + final URI endpointOverride = LOCALSTACK_CONTAINER.getEndpointOverride(LocalStackContainer.Service.KINESIS); + Map map = new HashMap<>(); + map.put("awsEndpoint", endpointOverride.getHost()); + map.put("awsEndpointPort", endpointOverride.getPort()); + map.put("skipCertificateValidation", true); + map.put("awsKinesisStreamName", STREAM_NAME); + map.put("awsRegion", "us-east-1"); + map.put("awsCredentialPluginParam", "{\"accessKey\":\"access\",\"secretKey\":\"secret\"}"); + return map; + } - public void init(String param) { - // no-op - } + private KinesisAsyncClient createClient() { + final KinesisAsyncClient client = KinesisAsyncClient.builder() + .credentialsProvider(new AwsCredentialsProvider() { + @Override + public AwsCredentials resolveCredentials() { + return AwsBasicCredentials.create( + "access", + "secret"); + } + }) + .region(Region.US_EAST_1) + .endpointOverride(LOCALSTACK_CONTAINER.getEndpointOverride(LocalStackContainer.Service.KINESIS)) + .build(); + return client; + } - @Override - public AWSCredentialsProvider getCredentialProvider() { - return new AWSCredentialsProvider() { - @Override - public AWSCredentials getCredentials() { - return new BasicSessionCredentials(accessKey, secretKey, sessionToken) { - - @Override - public String getAWSAccessKeyId() { - return accessKey; - } - @Override - public String getAWSSecretKey() { - return secretKey; - } - @Override - public String getSessionToken() { - return sessionToken; - } - }; - } - @Override - public void refresh() { - // TODO Auto-generated method stub - } - }; - } - @Override - public void close() throws IOException { - // TODO Auto-generated method stub - } + @SneakyThrows + private GetRecordsResponse getStreamRecords() { + final KinesisAsyncClient client = createClient(); + final String shardId = client.listShards( + ListShardsRequest.builder() + .streamName(STREAM_NAME) + .build() + ).get() + .shards() + .get(0) + .shardId(); + + final String iterator = client.getShardIterator(GetShardIteratorRequest.builder() + .streamName(STREAM_NAME) + .shardId(shardId) + .shardIteratorType(ShardIteratorType.TRIM_HORIZON) + .build()) + .get() + .shardIterator(); + final GetRecordsResponse response = client.getRecords( + GetRecordsRequest + .builder() + .shardIterator(iterator) + .build()) + .get(); + return response; } } diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile index 70a47d3446eb4..bbbbe0a337a31 100644 --- a/tests/docker-images/latest-version-image/Dockerfile +++ b/tests/docker-images/latest-version-image/Dockerfile @@ -106,6 +106,7 @@ COPY --from=pulsar-all /pulsar/connectors/pulsar-io-debezium-*.nar /pulsar/conne COPY --from=pulsar-all /pulsar/connectors/pulsar-io-elastic-*.nar /pulsar/connectors/ COPY --from=pulsar-all /pulsar/connectors/pulsar-io-jdbc-postgres-*.nar /pulsar/connectors/ COPY --from=pulsar-all /pulsar/connectors/pulsar-io-kafka-*.nar /pulsar/connectors/ +COPY --from=pulsar-all /pulsar/connectors/pulsar-io-kinesis-*.nar /pulsar/connectors/ # download Oracle JDBC driver for Oracle Debezium Connector tests RUN mkdir -p META-INF/bundled-dependencies diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index fc7bf1d790d16..7d4cea22ce08d 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -195,6 +195,25 @@ awaitility test + + + + org.testcontainers + localstack + test + + + software.amazon.kinesis + amazon-kinesis-client + 2.2.8 + test + + + com.amazonaws + aws-java-sdk-core + test + + diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java new file mode 100644 index 0000000000000..968e19faaf7b8 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io.sinks; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.awaitility.Awaitility; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import static org.testng.Assert.assertTrue; + +@Slf4j +public class KinesisSinkTester extends SinkTester { + + private static final String NAME = "kinesis"; + public static final String STREAM_NAME = "my-stream-1"; + private KinesisAsyncClient client; + + public KinesisSinkTester() { + super(NAME, SinkType.KINESIS); + + sinkConfig.put("awsKinesisStreamName", STREAM_NAME); + sinkConfig.put("awsRegion", "us-east-1"); + sinkConfig.put("awsCredentialPluginParam", "{\"accessKey\":\"access\",\"secretKey\":\"secret\"}"); + } + + + @Override + public void prepareSink() throws Exception { + final LocalStackContainer localStackContainer = getServiceContainer(); + final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS); + sinkConfig.put("awsEndpoint", NAME); + sinkConfig.put("awsEndpointPort", endpointOverride.getPort()); + sinkConfig.put("skipCertificateValidation", true); + client = KinesisAsyncClient.builder().credentialsProvider(new AwsCredentialsProvider() { + @Override + public AwsCredentials resolveCredentials() { + return AwsBasicCredentials.create( + "access", + "secret"); + } + }) + .region(Region.US_EAST_1) + .endpointOverride(endpointOverride) + .build(); + log.info("prepareSink for kinesis: creating stream {}, endpoint {}", STREAM_NAME, endpointOverride); + client.createStream(CreateStreamRequest.builder() + .streamName(STREAM_NAME) + .shardCount(1) + .build()) + .get(); + log.info("prepareSink for kinesis: created stream {}", STREAM_NAME); + + + } + + @Override + protected LocalStackContainer createSinkService(PulsarCluster cluster) { + return new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest")) + .withServices(LocalStackContainer.Service.KINESIS); + } + + @Override + @SneakyThrows + public void validateSinkResult(Map kvs) { + Awaitility.await().untilAsserted(() -> validateSinkResult()); + } + + @SneakyThrows + private void validateSinkResult() { + final String shardId = client.listShards( + ListShardsRequest.builder() + .streamName(STREAM_NAME) + .build() + ).get() + .shards() + .get(0) + .shardId(); + + final String iterator = client.getShardIterator(GetShardIteratorRequest.builder() + .streamName(STREAM_NAME) + .shardId(shardId) + .shardIteratorType(ShardIteratorType.TRIM_HORIZON) + .build()) + .get() + .shardIterator(); + final GetRecordsResponse response = client.getRecords( + GetRecordsRequest + .builder() + .shardIterator(iterator) + .build()) + .get(); + assertTrue(response.hasRecords()); + for (Record record : response.records()) { + assertTrue(record.data().asString(StandardCharsets.UTF_8).startsWith("value-")); + } + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java index 8dbef586665b5..c47645ebe8ac3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java @@ -90,4 +90,9 @@ public void testRabbitMQSink() throws Exception { testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); } + @Test(groups = "sink") + public void testKinesis() throws Exception { + testSink(new KinesisSinkTester(), true); + } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java index 4037a8e926fb4..0d0a2ce87d02e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java @@ -42,6 +42,7 @@ public enum SinkType { UNDEFINED("undefined"), CASSANDRA("cassandra"), KAFKA("kafka"), + KINESIS("kinesis"), JDBC_POSTGRES("jdbc-postgres"), HDFS("hdfs"), ELASTIC_SEARCH("elastic_search"),