diff --git a/scripts/ci.sh b/scripts/ci.sh
index 136a9a3d..5b085e7e 100755
--- a/scripts/ci.sh
+++ b/scripts/ci.sh
@@ -49,7 +49,7 @@ run_integration_tests () {
   echo "#            Integration testing           #"
   echo "#                                          #"
   echo "############################################"
-  ./sbtx ++$TRAVIS_SCALA_VERSION coverage it:test
+  ./sbtx ++$TRAVIS_SCALA_VERSION assembly coverage it:test
 }
 
 run_coverage_report () {
diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala
index 731d4907..97ba664a 100644
--- a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala
+++ b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala
@@ -11,9 +11,10 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.testcontainers.containers.localstack.LocalStackContainer
 
 trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
-  val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-"
-  val DOCKER_IP_ADDRESS = "172.17.0.1"
-  val TEST_STREAM_NAME = "Test_stream"
+  final val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-"
+  final val DOCKER_IP_ADDRESS = "172.17.0.1"
+  final val TEST_STREAM_NAME = "Test_stream"
+  final val TEST_SCHEMA_NAME = "kinesis_schema"
 
   val exasolContainer = new ExasolContainer(
     ExasolContainerConstants.EXASOL_DOCKER_IMAGE_REFERENCE
@@ -25,9 +26,7 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
   var statement: java.sql.Statement = _
   var kinesisClient: AmazonKinesis = _
 
-  def abstractBeforeAll(): Unit = {
-    kinesisLocalStack.start();
-    createKinesisStream()
+  private[kinesis] def setupExasol(): Unit = {
     exasolContainer.start();
     val assembledJarName = findAssembledJarName()
     uploadJarToBucket(assembledJarName)
@@ -36,10 +35,8 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
       exasolContainer.getPassword
     )
     statement = connection.createStatement()
-    statement.execute("CREATE SCHEMA kinesis_schema")
-    statement.execute("OPEN SCHEMA kinesis_schema")
-    // We have to wait until stream is ready to be accessed.
-    Thread.sleep(30 * 1000)
+    statement.execute(s"CREATE SCHEMA $TEST_SCHEMA_NAME")
+    val _ = statement.execute(s"OPEN SCHEMA $TEST_SCHEMA_NAME")
   }
 
   private[this] def uploadJarToBucket(assembledJarName: String): Unit = {
@@ -47,7 +44,8 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
     exasolContainer.getDefaultBucket.uploadFile(pathToRls, assembledJarName)
   }
 
-  private[this] def createKinesisStream(): Unit = {
+  private[kinesis] def createKinesisStream(shardsCounter: Integer): Unit = {
+    kinesisLocalStack.start();
     kinesisClient = AmazonKinesisClientBuilder.standard
       .withEndpointConfiguration(
         kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
@@ -55,7 +53,9 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
       .withCredentials(kinesisLocalStack.getDefaultCredentialsProvider)
       .build
     System.setProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
-    val _ = kinesisClient.createStream(TEST_STREAM_NAME, 3)
+    kinesisClient.createStream(TEST_STREAM_NAME, shardsCounter)
+    // We have to wait until stream is ready to be accessed.
+    Thread.sleep(50 * 1000)
   }
 
   def findAssembledJarName(): String = {
diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisMetadataReaderIT.scala b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisMetadataReaderIT.scala
index b64f37e6..22212876 100644
--- a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisMetadataReaderIT.scala
+++ b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisMetadataReaderIT.scala
@@ -6,7 +6,8 @@ import org.testcontainers.containers.localstack.LocalStackContainer
 
 class KinesisMetadataReaderIT extends KinesisAbstractIntegrationTest {
   override final def beforeAll(): Unit = {
-    abstractBeforeAll()
+    createKinesisStream(3)
+    setupExasol()
     val assembledJarName = findAssembledJarName()
     val _ = statement.execute(
       s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...)
diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala
index 3c148d0f..0b31e690 100644
--- a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala
+++ b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala
@@ -2,16 +2,17 @@ package com.exasol.cloudetl.kinesis
 
 import java.nio.ByteBuffer
 import java.sql.ResultSet
-import java.util
 
-import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry}
+import com.exasol.cloudetl.kinesis.KinesisConstants._
+
 import org.testcontainers.containers.localstack.LocalStackContainer
 
 @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
 class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
   override final def beforeAll(): Unit = {
-    abstractBeforeAll()
+    createKinesisStream(1)
+    setupExasol()
     putRecordsIntoStream()
 
     val columns =
       """sensorId DECIMAL(18,0),
@@ -36,27 +37,30 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
     )
   }
 
-  def putRecordsIntoStream(): Unit = {
-    val putRecordsRequest = new PutRecordsRequest
-    putRecordsRequest.setStreamName(TEST_STREAM_NAME)
-    val putRecordsRequestEntryList = new util.ArrayList[PutRecordsRequestEntry]
-    val putRecordsRequestEntry = new PutRecordsRequestEntry
-    putRecordsRequestEntry.setData(
-      ByteBuffer
-        .wrap("{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}".getBytes)
-    )
-    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-1"))
-    putRecordsRequestEntryList.add(putRecordsRequestEntry)
-    putRecordsRequest.setRecords(putRecordsRequestEntryList)
-    val _ = kinesisClient.putRecords(putRecordsRequest)
+  private[this] def putRecordsIntoStream(): Unit = {
+    putRecordIntoStream("{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}")
+    putRecordIntoStream("{\"sensorId\": 20,\"currentTemperature\": 15,\"status\": \"OK\"}")
+  }
+
+  private[this] def putRecordIntoStream(recordData: String): Unit = {
+    val data = ByteBuffer.wrap(recordData.getBytes())
+    val _ = kinesisClient.putRecord(TEST_STREAM_NAME, data, "partitionKey-1")
   }
 
   test("returns data from a shard") {
-    val resultSet = this.executeKinesisMetadataScript("VALUES (('shardId-000000000001', null))")
+    val resultSet = this.executeKinesisMetadataScript("VALUES (('shardId-000000000000', null))")
     assert(resultSet.next() === true)
     assert(resultSet.getInt("sensorId") === 17)
     assert(resultSet.getInt("currentTemperature") === 147)
     assert(resultSet.getString("status") === "WARN")
+    assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
+    assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
+    assert(resultSet.next() === true)
+    assert(resultSet.getInt("sensorId") === 20)
+    assert(resultSet.getInt("currentTemperature") === 15)
+    assert(resultSet.getString("status") === "OK")
+    assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
+    assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
     assert(resultSet.next() === false)
   }