Skip to content

Commit

Permalink
#74: refactored tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AnastasiiaSergienko committed Mar 19, 2020
1 parent a08561b commit 2534577
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 31 deletions.
2 changes: 1 addition & 1 deletion scripts/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ run_integration_tests () {
echo "# Integration testing #"
echo "# #"
echo "############################################"
./sbtx ++$TRAVIS_SCALA_VERSION coverage it:test
./sbtx ++$TRAVIS_SCALA_VERSION assembly coverage it:test
}

run_coverage_report () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import org.scalatest.funsuite.AnyFunSuite
import org.testcontainers.containers.localstack.LocalStackContainer

trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-"
val DOCKER_IP_ADDRESS = "172.17.0.1"
val TEST_STREAM_NAME = "Test_stream"
final val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-"
final val DOCKER_IP_ADDRESS = "172.17.0.1"
final val TEST_STREAM_NAME = "Test_stream"
final val TEST_SCHEMA_NAME = "kinesis_schema"

val exasolContainer = new ExasolContainer(
ExasolContainerConstants.EXASOL_DOCKER_IMAGE_REFERENCE
Expand All @@ -25,9 +26,7 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
var statement: java.sql.Statement = _
var kinesisClient: AmazonKinesis = _

def abstractBeforeAll(): Unit = {
kinesisLocalStack.start();
createKinesisStream()
private[kinesis] def setupExasol(): Unit = {
exasolContainer.start();
val assembledJarName = findAssembledJarName()
uploadJarToBucket(assembledJarName)
Expand All @@ -36,26 +35,27 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll
exasolContainer.getPassword
)
statement = connection.createStatement()
statement.execute("CREATE SCHEMA kinesis_schema")
statement.execute("OPEN SCHEMA kinesis_schema")
// We have to wait until stream is ready to be accessed.
Thread.sleep(30 * 1000)
statement.execute(s"CREATE SCHEMA $TEST_SCHEMA_NAME")
val _ = statement.execute(s"OPEN SCHEMA $TEST_SCHEMA_NAME")
}

private[this] def uploadJarToBucket(assembledJarName: String): Unit = {
val pathToRls = Path.of("target", "scala-2.12", assembledJarName)
exasolContainer.getDefaultBucket.uploadFile(pathToRls, assembledJarName)
}

private[this] def createKinesisStream(): Unit = {
private[kinesis] def createKinesisStream(shardsCounter: Integer): Unit = {
kinesisLocalStack.start();
kinesisClient = AmazonKinesisClientBuilder.standard
.withEndpointConfiguration(
kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
)
.withCredentials(kinesisLocalStack.getDefaultCredentialsProvider)
.build
System.setProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
val _ = kinesisClient.createStream(TEST_STREAM_NAME, 3)
kinesisClient.createStream(TEST_STREAM_NAME, shardsCounter)
// We have to wait until stream is ready to be accessed.
Thread.sleep(50 * 1000)
}

def findAssembledJarName(): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import org.testcontainers.containers.localstack.LocalStackContainer

class KinesisMetadataReaderIT extends KinesisAbstractIntegrationTest {
override final def beforeAll(): Unit = {
abstractBeforeAll()
createKinesisStream(3)
setupExasol()
val assembledJarName = findAssembledJarName()
val _ = statement.execute(
s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package com.exasol.cloudetl.kinesis

import java.nio.ByteBuffer
import java.sql.ResultSet
import java.util

import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry}
import com.exasol.cloudetl.kinesis.KinesisConstants._

import org.testcontainers.containers.localstack.LocalStackContainer

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {

override final def beforeAll(): Unit = {
abstractBeforeAll()
createKinesisStream(1)
setupExasol()
putRecordsIntoStream()
val columns =
"""sensorId DECIMAL(18,0),
Expand All @@ -36,27 +37,30 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest {
)
}

def putRecordsIntoStream(): Unit = {
val putRecordsRequest = new PutRecordsRequest
putRecordsRequest.setStreamName(TEST_STREAM_NAME)
val putRecordsRequestEntryList = new util.ArrayList[PutRecordsRequestEntry]
val putRecordsRequestEntry = new PutRecordsRequestEntry
putRecordsRequestEntry.setData(
ByteBuffer
.wrap("{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}".getBytes)
)
putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-1"))
putRecordsRequestEntryList.add(putRecordsRequestEntry)
putRecordsRequest.setRecords(putRecordsRequestEntryList)
val _ = kinesisClient.putRecords(putRecordsRequest)
private[this] def putRecordsIntoStream(): Unit = {
putRecordIntoStream("{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}")
putRecordIntoStream("{\"sensorId\": 20,\"currentTemperature\": 15,\"status\": \"OK\"}")
}

private[this] def putRecordIntoStream(recordData: String): Unit = {
val data = ByteBuffer.wrap(recordData.getBytes())
val _ = kinesisClient.putRecord(TEST_STREAM_NAME, data, "partitionKey-1")
}

test("returns data from a shard") {
val resultSet = this.executeKinesisMetadataScript("VALUES (('shardId-000000000001', null))")
val resultSet = this.executeKinesisMetadataScript("VALUES (('shardId-000000000000', null))")
assert(resultSet.next() === true)
assert(resultSet.getInt("sensorId") === 17)
assert(resultSet.getInt("currentTemperature") === 147)
assert(resultSet.getString("status") === "WARN")
assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
assert(resultSet.next() === true)
assert(resultSet.getInt("sensorId") === 20)
assert(resultSet.getInt("currentTemperature") === 15)
assert(resultSet.getString("status") === "OK")
assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
assert(resultSet.next() === false)
}

Expand Down

0 comments on commit 2534577

Please sign in to comment.