Skip to content

Commit

Permalink
#74: added last test
Browse files Browse the repository at this point in the history
  • Loading branch information
AnastasiiaSergienko committed Mar 19, 2020
1 parent 2534577 commit a1cb868
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 245 deletions.
4 changes: 2 additions & 2 deletions scripts/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ run_cleaning () {
echo "# Cleaning #"
echo "# #"
echo "############################################"
./sbtx ++$TRAVIS_SCALA_VERSION clean
./sbtx ++$TRAVIS_SCALA_VERSION clean assembly
}

run_unit_tests () {
Expand All @@ -49,7 +49,7 @@ run_integration_tests () {
echo "# Integration testing #"
echo "# #"
echo "############################################"
./sbtx ++$TRAVIS_SCALA_VERSION assembly coverage it:test
./sbtx ++$TRAVIS_SCALA_VERSION coverage it:test
}

run_coverage_report () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,25 @@ import org.scalatest.funsuite.AnyFunSuite
import org.testcontainers.containers.localstack.LocalStackContainer

trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
final val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-"
final val DOCKER_IP_ADDRESS = "172.17.0.1"
final val TEST_STREAM_NAME = "Test_stream"
final val TEST_SCHEMA_NAME = "kinesis_schema"
val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-"
val DOCKER_IP_ADDRESS = "172.17.0.1"
val TEST_STREAM_NAME = "Test_stream"
val TEST_SCHEMA_NAME = "kinesis_schema"
var assembledJarName: String = _

val exasolContainer = new ExasolContainer(
ExasolContainerConstants.EXASOL_DOCKER_IMAGE_REFERENCE
)
val kinesisLocalStack =
val kinesisLocalStack: LocalStackContainer =
new LocalStackContainer().withServices(LocalStackContainer.Service.KINESIS)

var connection: java.sql.Connection = _
var statement: java.sql.Statement = _
var kinesisClient: AmazonKinesis = _

private[kinesis] def setupExasol(): Unit = {
assembledJarName = findAssembledJarName()
exasolContainer.start();
val assembledJarName = findAssembledJarName()
uploadJarToBucket(assembledJarName)
connection = exasolContainer.createConnectionForUser(
exasolContainer.getUsername,
Expand Down Expand Up @@ -77,6 +78,30 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
}
}

private[kinesis] def createKinesisMetadataScript(): Unit = {
val _ = statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...)
|EMITS (KINESIS_SHARD_ID VARCHAR(130), SHARD_SEQUENCE_NUMBER VARCHAR(2000)) AS
| %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
| %scriptclass com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader;
| %jar /buckets/bfsdefault/default/$assembledJarName;
|/
|""".stripMargin
)
}

private[kinesis] def createKinesisImportScript(emits: String): Unit = {
val _ = statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_IMPORT (...)
|EMITS ($emits) AS
| %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
| %scriptclass com.exasol.cloudetl.kinesis.KinesisShardDataImporter;
| %jar /buckets/bfsdefault/default/$assembledJarName;
|/
|""".stripMargin
)
}

override final def afterAll(): Unit = {
connection.close()
statement.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.exasol.cloudetl.kinesis

import java.nio.ByteBuffer

import com.exasol.cloudetl.kinesis.KinesisConstants.{
KINESIS_SHARD_ID_COLUMN_NAME,
SHARD_SEQUENCE_NUMBER_COLUMN_NAME
}

import org.testcontainers.containers.localstack.LocalStackContainer

class KinesisImportQueryGeneratorIT extends KinesisAbstractIntegrationTest {
final val TEST_TABLE_NAME = "kinesis_table"

override final def beforeAll(): Unit = {
createKinesisStream(2)
setupExasol()
putRecordsIntoStream()
createKinesisMetadataScript()
createKinesisImportScript("...")
val columns =
"""sensorId DECIMAL(18,0),
|currentTemperature DECIMAL(18,0),
|status VARCHAR(100),
|kinesis_shard_id VARCHAR(2000),
|shard_sequence_number VARCHAR(2000)"""
statement.execute(
s"""CREATE OR REPLACE TABLE $TEST_TABLE_NAME
| ($columns)
|""".stripMargin
)
val assembledJarName = findAssembledJarName()
val _ = statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_PATH (...)
|EMITS (...) AS
| %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
| %scriptclass com.exasol.cloudetl.kinesis.KinesisImportQueryGenerator;
| %jar /buckets/bfsdefault/default/$assembledJarName;
|/
|""".stripMargin
)
}

private[this] def putRecordsIntoStream(): Unit = {
putRecordIntoStream("{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}")
putRecordIntoStream("{\"sensorId\": 20,\"currentTemperature\": 15,\"status\": \"OK\"}")
}

private[this] def putRecordIntoStream(recordData: String): Unit = {
val data = ByteBuffer.wrap(recordData.getBytes())
val _ = kinesisClient.putRecord(TEST_STREAM_NAME, data, "partitionKey-1")
}

test("returns data from a shard") {
executeKinesisMetadataScript()
val resultSet = statement.executeQuery(s"SELECT * FROM $TEST_TABLE_NAME")
assert(resultSet.next() === true)
assert(resultSet.getInt("sensorId") === 17)
assert(resultSet.getInt("currentTemperature") === 147)
assert(resultSet.getString("status") === "WARN")
assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
assert(resultSet.next() === true)
assert(resultSet.getInt("sensorId") === 20)
assert(resultSet.getInt("currentTemperature") === 15)
assert(resultSet.getString("status") === "OK")
assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
assert(resultSet.next() === false)
}

private[this] def executeKinesisMetadataScript(): Unit = {
val endpointConfiguration =
kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
val endpointInsideDocker =
endpointConfiguration.getServiceEndpoint.replaceAll("127.0.0.1", DOCKER_IP_ADDRESS)
val credentials = kinesisLocalStack.getDefaultCredentialsProvider.getCredentials
val _ = statement.execute(
s"""IMPORT INTO $TEST_TABLE_NAME
|FROM SCRIPT KINESIS_PATH WITH
| TABLE_NAME = '$TEST_TABLE_NAME'
| AWS_ACCESS_KEY = '${credentials.getAWSAccessKeyId}'
| AWS_SECRET_KEY = '${credentials.getAWSSecretKey}'
| STREAM_NAME = '$TEST_STREAM_NAME'
| REGION = '${endpointConfiguration.getSigningRegion}'
| AWS_SERVICE_ENDPOINT = '$endpointInsideDocker'
""".stripMargin
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import org.testcontainers.containers.localstack.LocalStackContainer

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {

override final def beforeAll(): Unit = {
createKinesisStream(1)
setupExasol()
Expand All @@ -20,21 +19,7 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
|status VARCHAR(100),
|kinesis_shard_id VARCHAR(2000),
|shard_sequence_number VARCHAR(2000)"""
statement.execute(
s"""CREATE OR REPLACE TABLE kinesis_table
| ($columns)
|""".stripMargin
)
val assembledJarName = findAssembledJarName()
val _ = statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_IMPORT (...)
|EMITS ($columns) AS
| %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
| %scriptclass com.exasol.cloudetl.kinesis.KinesisShardDataImporter;
| %jar /buckets/bfsdefault/default/$assembledJarName;
|/
|""".stripMargin
)
createKinesisImportScript(columns)
}

private[this] def putRecordsIntoStream(): Unit = {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,11 @@ import java.sql.ResultSet

import org.testcontainers.containers.localstack.LocalStackContainer

class KinesisMetadataReaderIT extends KinesisAbstractIntegrationTest {
class KinesisShardsMetadataReaderIT extends KinesisAbstractIntegrationTest {
override final def beforeAll(): Unit = {
createKinesisStream(3)
setupExasol()
val assembledJarName = findAssembledJarName()
val _ = statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...)
|EMITS (KINESIS_SHARD_ID VARCHAR(130), SHARD_SEQUENCE_NUMBER VARCHAR(2000)) AS
| %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
| %scriptclass com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader;
| %jar /buckets/bfsdefault/default/$assembledJarName;
|/
|""".stripMargin
)
createKinesisMetadataScript()
}

test("returns shards from a stream") {
Expand Down
Loading

0 comments on commit a1cb868

Please sign in to comment.