#74: added first integration test
AnastasiiaSergienko committed Mar 16, 2020
1 parent 76b2340 commit 0f83709
Showing 10 changed files with 268 additions and 29 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
@@ -19,16 +19,16 @@ before_install:

matrix:
include:
- jdk: openjdk8
- jdk: openjdk11
scala: 2.11.12

- jdk: oraclejdk8
- jdk: oraclejdk11
scala: 2.11.12

- jdk: openjdk8
- jdk: openjdk11
scala: 2.12.10

- jdk: oraclejdk8
- jdk: oraclejdk11
scala: 2.12.10
env: RELEASE=true

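Note: the matrix change above swaps the JDK 8 entries for JDK 11 ones while keeping both Scala versions. The bump matches the new integration test introduced in this commit, which relies on Java 11 APIs such as Path.of. A minimal sketch of that API difference (standard library only, not taken from the commit):

import java.nio.file.{Path, Paths}

object PathSketch {
  val java11Style: Path = Path.of("target", "scala-2.12")  // Path.of was added in Java 11
  val java8Style: Path = Paths.get("target", "scala-2.12") // Paths.get is the pre-Java-11 equivalent
}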
11 changes: 8 additions & 3 deletions project/Dependencies.scala
@@ -1,6 +1,6 @@
package com.exasol.cloudetl.sbt

import sbt._
import sbt.{ExclusionRule, _}
import sbt.librarymanagement.InclExclRule

/** A list of required dependencies */
@@ -85,7 +85,11 @@ object Dependencies {
"org.scalatest" %% "scalatest" % "3.1.0",
"org.scalatestplus" %% "scalatestplus-mockito" % "1.0.0-M2",
"org.mockito" % "mockito-core" % "3.2.4",
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % "5.4.0"
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % "5.4.0",
"org.junit.jupiter" % "junit-jupiter-engine" % "5.6.0",
"org.testcontainers" % "junit-jupiter" % "1.13.0",
"com.exasol" % "exasol-testcontainers" % "1.6.1",
"org.testcontainers" % "localstack" % "1.13.0"
).map(_ % Test)

lazy val ExcludedDependencies: Seq[InclExclRule] = Seq(
@@ -95,7 +99,8 @@
ExclusionRule("com.sun.jersey", "jersey-server"),
ExclusionRule("com.sun.jersey", "jersey-json"),
ExclusionRule("javax.servlet", "servlet-api"),
ExclusionRule("javax.servlet.jsp", "jsp-api")
ExclusionRule("javax.servlet.jsp", "jsp-api"),
ExclusionRule("org.openjfx", "javafx.base")
)

/** The list of all dependencies for the connector */
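Note: the new entries are all test-scoped via .map(_ % Test): the JUnit 5 engine and the Testcontainers JUnit 5 module run the new Java integration test, exasol-testcontainers provides a disposable Exasol instance, and the Testcontainers localstack module supplies a local Kinesis endpoint. The extra exclusion presumably keeps a transitive org.openjfx artifact off the JDK 11 classpath. As a hedged sketch of how such lists are typically consumed by the build (the actual build.sbt is outside this diff, and the aggregate value name below is hypothetical):

// build.sbt (sketch only, not part of this commit)
lazy val root = (project in file("."))
  .settings(
    libraryDependencies ++= Dependencies.AllDependencies,      // hypothetical aggregate val
    excludeDependencies ++= Dependencies.ExcludedDependencies  // the list extended above
  )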
4 changes: 2 additions & 2 deletions src/it/scala/com/exasol/cloudetl/KafkaIntegrationTest.scala
@@ -47,10 +47,10 @@ trait KafkaIntegrationTest
val bHead :: bTail = Seq.fill(partitions.size - 1)(true) ++ Seq(false)
when(mockedIterator.next()).thenReturn(bHead, bTail: _*)

val pHead :: pTail = partitions.map(new java.lang.Integer(_))
val pHead :: pTail = partitions.map(Integer.valueOf)
when(mockedIterator.getInteger(1)).thenReturn(pHead, pTail: _*)

val oHead :: oTail = offsets.map(new java.lang.Long(_))
val oHead :: oTail = offsets.map(java.lang.Long.valueOf)
when(mockedIterator.getLong(2)).thenReturn(oHead, oTail: _*)

mockedIterator
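Note: the change above (and the matching ones in KafkaImportIT and KafkaMetadataIT further down) replaces the boxed-primitive constructors, deprecated since Java 9, with the caching valueOf factories so the tests compile cleanly under JDK 11. A minimal sketch of the same idea; the JInt/JLong names used in the Kafka tests are assumed to be aliases for java.lang.Integer and java.lang.Long:

import java.lang.{Integer => JInt, Long => JLong}

object BoxingSketch {
  val partition: JInt = JInt.valueOf(0)  // may return a cached instance
  val offset: JLong = JLong.valueOf(-1L) // replaces new JLong(-1), whose constructor is deprecated
}

The all-additions hunk that follows is the new Java integration test, KinesisShardsMetadataReaderIT.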
@@ -0,0 +1,160 @@
package com.exasol.cloudetl.kinesis;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.exasol.bucketfs.Bucket;
import com.exasol.bucketfs.BucketAccessException;
import com.exasol.containers.ExasolContainer;
import com.exasol.containers.ExasolContainerConstants;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.*;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY;
import static org.junit.jupiter.api.Assertions.*;

@Testcontainers
class KinesisShardsMetadataReaderIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(KinesisShardsMetadataReaderIT.class);
private static final String JAR_NAME_PATTERN = "cloud-storage-etl-udfs-";
public static final String DOCKER_IP_ADDRESS = "172.17.0.1";
public static final String TEST_STREAM_NAME = "Test_stream";
@Container
private static final ExasolContainer<? extends ExasolContainer<?>> container =
new ExasolContainer<>(
ExasolContainerConstants.EXASOL_DOCKER_IMAGE_REFERENCE) //
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
@Container
public static LocalStackContainer kinesisLocalStack = new LocalStackContainer()
.withServices(LocalStackContainer.Service.KINESIS);
private static AmazonKinesis amazonKinesis;
private static String cloudStorageEltUdfsJarName;
private static Statement statement;
private static Connection connection;

@BeforeAll
static void beforeAll()
throws SQLException, BucketAccessException, InterruptedException, TimeoutException,
IOException {
cloudStorageEltUdfsJarName = findAssembledJarName();
uploadJarToBucket();
createStream();
connection =
container.createConnectionForUser(container.getUsername(), container.getPassword());
statement = connection.createStatement();
statement.execute("CREATE SCHEMA kinesis_schema");
statement.execute("OPEN SCHEMA kinesis_schema");
statement.execute(
"CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...) EMITS (kinesis_shard_id VARCHAR(130), shard_sequence_number VARCHAR(2000)) AS\n" +
" %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;\n" +
" %scriptclass com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader;\n" +
" %jar /buckets/bfsdefault/default/" + cloudStorageEltUdfsJarName + ";\n" +
"/\n");
// We have to wait until the stream is ready to be accessed.
Thread.sleep(40 * 1000);
}

private static void uploadJarToBucket()
throws InterruptedException, BucketAccessException, TimeoutException {
final Bucket bucket = container.getDefaultBucket();
final Path pathToRls = Path.of("target", "scala-2.12", cloudStorageEltUdfsJarName);
bucket.uploadFile(pathToRls, cloudStorageEltUdfsJarName);
}

private static void createStream() {
amazonKinesis = AmazonKinesisClientBuilder.standard()
.withEndpointConfiguration(
kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS))
.withCredentials(kinesisLocalStack.getDefaultCredentialsProvider()).build();
System.setProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
amazonKinesis.createStream(TEST_STREAM_NAME, 3);
}

@AfterAll
static void releaseResources() throws SQLException {
connection.close();
statement.close();
amazonKinesis.shutdown();
}

private static String findAssembledJarName() throws IOException {
final List<String> jars =
Files.list(Path.of("target", "scala-2.12")).map(path -> path.getFileName().toString())
.filter(fileName -> fileName.contains(JAR_NAME_PATTERN)).collect(Collectors.toList());
if (!jars.isEmpty()) {
return jars.get(0);
} else {
throw new IllegalArgumentException("Cannot find a jar for uploading to the bucket.");
}
}

@Test
void testMetadataReaderGetsShardsFromStream() throws SQLException {
final ResultSet resultSet = this.executeKinesisMetadataScript("VALUES (('0', '0'))");
assertAll(
() -> assertTrue(resultSet.next()),
() -> assertEquals("shardId-000000000000", resultSet.getString("KINESIS_SHARD_ID")),
() -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")),
() -> assertTrue(resultSet.next()),
() -> assertEquals("shardId-000000000001", resultSet.getString("KINESIS_SHARD_ID")),
() -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")),
() -> assertTrue(resultSet.next()),
() -> assertEquals("shardId-000000000002", resultSet.getString("KINESIS_SHARD_ID")),
() -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")),
() -> assertFalse(resultSet.next()));
}

@Test
void testMetadataReaderCombineShardsMetadataWithExistingTable()
throws SQLException {
final ResultSet resultSet = this.executeKinesisMetadataScript(
"VALUES (('shardId-000000000000', '1234'), ('shardId-000000000001', '5678'), ('shardId-000000000004', '9012')) ");
assertAll(
() -> assertTrue(resultSet.next()),
() -> assertEquals("shardId-000000000000", resultSet.getString("KINESIS_SHARD_ID")),
() -> assertEquals("1234", resultSet.getString("SHARD_SEQUENCE_NUMBER")),
() -> assertTrue(resultSet.next()),
() -> assertEquals("shardId-000000000001", resultSet.getString("KINESIS_SHARD_ID")),
() -> assertEquals("5678", resultSet.getString("SHARD_SEQUENCE_NUMBER")),
() -> assertTrue(resultSet.next()),
() -> assertEquals("shardId-000000000002", resultSet.getString("KINESIS_SHARD_ID")),
() -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")),
() -> assertFalse(resultSet.next()));
}

private ResultSet executeKinesisMetadataScript(final String tableImitatingValues)
throws SQLException {
final AWSCredentials credentials =
kinesisLocalStack.getDefaultCredentialsProvider().getCredentials();
final AwsClientBuilder.EndpointConfiguration endpointConfiguration =
kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS);
System.out.println(endpointConfiguration.getServiceEndpoint());
final String endpointInsideDocker =
endpointConfiguration.getServiceEndpoint().replaceAll("127.0.0.1", DOCKER_IP_ADDRESS);
final String sql =
"SELECT KINESIS_METADATA('AWS_ACCESS_KEY -> " + credentials.getAWSAccessKeyId() +
";AWS_SECRET_KEY -> " + credentials.getAWSSecretKey() +
";REGION -> " + endpointConfiguration.getSigningRegion() +
";STREAM_NAME -> " + TEST_STREAM_NAME +
";AWS_SERVICE_ENDPOINT -> " + endpointInsideDocker +
"', KINESIS_SHARD_ID, SHARD_SEQUENCE_NUMBER) \n" +
"FROM (" + tableImitatingValues +
" AS t(KINESIS_SHARD_ID, SHARD_SEQUENCE_NUMBER)) ORDER BY KINESIS_SHARD_ID";
return statement.executeQuery(sql);
}
}
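Note: beforeAll above waits for the Kinesis stream with a fixed Thread.sleep(40 * 1000). A hedged alternative, sketched here and not part of the commit, is to poll the stream status through the same AWS SDK v1 client until it reports ACTIVE:

import com.amazonaws.services.kinesis.AmazonKinesis

object StreamReadiness {
  // Poll DescribeStream until the stream reports ACTIVE instead of sleeping a fixed 40 seconds.
  def waitUntilStreamIsActive(kinesis: AmazonKinesis, streamName: String, maxAttempts: Int = 60): Unit = {
    var attempts = 0
    var status = kinesis.describeStream(streamName).getStreamDescription.getStreamStatus
    while (status != "ACTIVE" && attempts < maxAttempts) {
      Thread.sleep(1000)
      status = kinesis.describeStream(streamName).getStreamDescription.getStreamStatus
      attempts += 1
    }
    require(status == "ACTIVE", s"Stream $streamName did not become ACTIVE in time")
  }
}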
32 changes: 28 additions & 4 deletions src/it/scala/com/exasol/cloudetl/scriptclasses/KafkaImportIT.scala
@@ -28,8 +28,20 @@ class KafkaImportIT extends KafkaIntegrationTest {
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
verify(iter, times(1)).emit(new JInt(0), new JLong(0), "abc", new JInt(3), new JLong(13))
verify(iter, times(1)).emit(new JInt(0), new JLong(1), "hello", new JInt(4), new JLong(14))
verify(iter, times(1)).emit(
JInt.valueOf(0),
JLong.valueOf(0),
"abc",
JInt.valueOf(3),
JLong.valueOf(13)
)
verify(iter, times(1)).emit(
JInt.valueOf(0),
JLong.valueOf(1),
"hello",
JInt.valueOf(4),
JLong.valueOf(14)
)
}

test("run emits records starting from provided offset") {
@@ -51,8 +63,20 @@
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
verify(iter, times(1)).emit(new JInt(0), new JLong(2), "def", new JInt(7), new JLong(17))
verify(iter, times(1)).emit(new JInt(0), new JLong(3), "xyz", new JInt(13), new JLong(23))
verify(iter, times(1)).emit(
JInt.valueOf(0),
JLong.valueOf(2),
"def",
JInt.valueOf(7),
JLong.valueOf(17)
)
verify(iter, times(1)).emit(
JInt.valueOf(0),
JLong.valueOf(3),
"xyz",
JInt.valueOf(13),
JLong.valueOf(23)
)
}

test("run emits records within min / max records per run") {
@@ -20,7 +20,7 @@ class KafkaMetadataIT extends KafkaIntegrationTest {
val iter = mockExasolIterator(properties, Seq(0), Seq(-1))
KafkaMetadata.run(mock[ExaMetadata], iter)
verify(iter, times(1)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
verify(iter, times(1)).emit(new JInt(0), new JLong(-1))
verify(iter, times(1)).emit(JInt.valueOf(0), JLong.valueOf(-1))
}

// Default case where Exasol table is empty.
@@ -30,7 +30,7 @@
KafkaMetadata.run(mock[ExaMetadata], iter)
verify(iter, times(3)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
Seq(0, 1, 2).foreach { partitionId =>
verify(iter, times(1)).emit(new JInt(partitionId), new JLong(-1))
verify(iter, times(1)).emit(JInt.valueOf(partitionId), JLong.valueOf(-1))
}
}

@@ -44,9 +44,9 @@ class KafkaMetadataIT extends KafkaIntegrationTest {
verify(iter, times(3)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
partitions.zip(offsets).foreach {
case (partitionId, maxOffset) =>
verify(iter, times(1)).emit(new JInt(partitionId), new JLong(maxOffset))
verify(iter, times(1)).emit(JInt.valueOf(partitionId), JLong.valueOf(maxOffset))
}
verify(iter, times(1)).emit(new JInt(2), new JLong(-1))
verify(iter, times(1)).emit(JInt.valueOf(2), JLong.valueOf(-1))
}

// Do not emit partitionId maxOffset pairs if partitionId is not
@@ -57,8 +57,8 @@ class KafkaMetadataIT extends KafkaIntegrationTest {
KafkaMetadata.run(mock[ExaMetadata], iter)

verify(iter, times(2)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
verify(iter, times(1)).emit(new JInt(0), new JLong(-1))
verify(iter, times(1)).emit(new JInt(1), new JLong(7))
verify(iter, times(1)).emit(JInt.valueOf(0), JLong.valueOf(-1))
verify(iter, times(1)).emit(JInt.valueOf(1), JLong.valueOf(7))
}

test("run throws if it cannot create KafkConsumer") {
@@ -1,6 +1,12 @@
package com.exasol.cloudetl.kinesis

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicSessionCredentials}
import com.amazonaws.auth.{
AWSCredentials,
AWSStaticCredentialsProvider,
BasicAWSCredentials,
BasicSessionCredentials
}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}

/**
@@ -15,16 +21,33 @@ object KinesisClientFactory {
* @param kinesisUserProperties An instance of [[KinesisUserProperties]] class
* with user properties.
*/
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
def createKinesisClient(kinesisUserProperties: KinesisUserProperties): AmazonKinesis = {
val region = kinesisUserProperties.getRegion
val awsCredentials = createAwsCredentials(kinesisUserProperties)
val kinesisClientBuilder = AmazonKinesisClientBuilder.standard
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
if (kinesisUserProperties.containsAwsServiceEndpoint) {
kinesisClientBuilder.withEndpointConfiguration(
new EndpointConfiguration(
kinesisUserProperties.getAwsServiceEndpoint,
region
)
)
} else { kinesisClientBuilder.setRegion(region) }
kinesisClientBuilder.build
}

private[this] def createAwsCredentials(
kinesisUserProperties: KinesisUserProperties
): AWSCredentials = {
val awsAccessKeyId = kinesisUserProperties.getAwsAccessKey
val awsSecretAccessKey = kinesisUserProperties.getAwsSecretKey
val awsSessionToken = kinesisUserProperties.getAwsSessionToken
val region = kinesisUserProperties.getRegion
val awsCredentials =
if (kinesisUserProperties.containsAwsSessionToken) {
val awsSessionToken = kinesisUserProperties.getAwsSessionToken
new BasicSessionCredentials(awsAccessKeyId, awsSecretAccessKey, awsSessionToken)
AmazonKinesisClientBuilder.standard
.withRegion(region)
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.build
} else {
new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey)
}
}
}
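Note: the refactoring above splits credential creation into createAwsCredentials, which falls back to BasicAWSCredentials when no session token is configured, and adds an optional AWS_SERVICE_ENDPOINT so the client can target a non-AWS endpoint such as the LocalStack container used in the integration test. A hedged usage sketch; the values are placeholders, while the keys are the constants declared in KinesisUserProperties (shown next):

import com.exasol.cloudetl.kinesis.{KinesisClientFactory, KinesisUserProperties}

object KinesisClientFactorySketch {
  val properties = new KinesisUserProperties(Map(
    "AWS_ACCESS_KEY" -> "test-access-key",
    "AWS_SECRET_KEY" -> "test-secret-key",
    "REGION" -> "eu-central-1",
    "STREAM_NAME" -> "Test_stream",
    // Optional: route the client to a local endpoint (for example LocalStack) instead of AWS.
    "AWS_SERVICE_ENDPOINT" -> "http://172.17.0.1:4566" // placeholder host and port
  ))
  val kinesis = KinesisClientFactory.createKinesisClient(properties)
}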
@@ -11,6 +11,7 @@ object KinesisUserProperties extends CommonProperties {
val AWS_ACCESS_KEY_PROPERTY: String = "AWS_ACCESS_KEY"
val AWS_SECRET_KEY_PROPERTY: String = "AWS_SECRET_KEY"
val AWS_SESSION_TOKEN_PROPERTY: String = "AWS_SESSION_TOKEN"
val AWS_SERVICE_ENDPOINT_PROPERTY: String = "AWS_SERVICE_ENDPOINT"
val STREAM_NAME_PROPERTY: String = "STREAM_NAME"
val REGION_PROPERTY: String = "REGION"
val TABLE_NAME_PROPERTY: String = "TABLE_NAME"
@@ -46,6 +47,15 @@ class KinesisUserProperties(val propertiesMap: Map[String, String])
final def getAwsSessionToken: String =
getString(AWS_SESSION_TOKEN_PROPERTY)

final def containsAwsSessionToken: Boolean =
containsKey(AWS_SESSION_TOKEN_PROPERTY)

final def getAwsServiceEndpoint: String =
getString(AWS_SERVICE_ENDPOINT_PROPERTY)

final def containsAwsServiceEndpoint: Boolean =
containsKey(AWS_SERVICE_ENDPOINT_PROPERTY)

final def getStreamName: String =
getString(STREAM_NAME_PROPERTY)

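Note: as the integration test above shows, the UDF receives these settings as a single string of KEY -> value pairs separated by semicolons. A minimal sketch of assembling such a string; how CommonProperties parses it back is not visible in this diff:

object PropertyStringSketch {
  val propertyString: String = Map(
    "REGION" -> "eu-central-1",
    "STREAM_NAME" -> "Test_stream",
    "AWS_SERVICE_ENDPOINT" -> "http://172.17.0.1:4566" // placeholder
  ).map { case (key, value) => s"$key -> $value" }.mkString(";")
}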