diff --git a/scripts/ci.sh b/scripts/ci.sh index 5b085e7e..c2a85c25 100755 --- a/scripts/ci.sh +++ b/scripts/ci.sh @@ -31,7 +31,7 @@ run_cleaning () { echo "# Cleaning #" echo "# #" echo "############################################" - ./sbtx ++$TRAVIS_SCALA_VERSION clean + ./sbtx ++$TRAVIS_SCALA_VERSION clean assembly } run_unit_tests () { @@ -49,7 +49,7 @@ run_integration_tests () { echo "# Integration testing #" echo "# #" echo "############################################" - ./sbtx ++$TRAVIS_SCALA_VERSION assembly coverage it:test + ./sbtx ++$TRAVIS_SCALA_VERSION coverage it:test } run_coverage_report () { diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala index 97ba664a..71ce24a0 100644 --- a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala +++ b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisAbstractIntegrationTest.scala @@ -11,15 +11,16 @@ import org.scalatest.funsuite.AnyFunSuite import org.testcontainers.containers.localstack.LocalStackContainer trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll { - final val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-" - final val DOCKER_IP_ADDRESS = "172.17.0.1" - final val TEST_STREAM_NAME = "Test_stream" - final val TEST_SCHEMA_NAME = "kinesis_schema" + val JAR_NAME_PATTERN = "cloud-storage-etl-udfs-" + val DOCKER_IP_ADDRESS = "172.17.0.1" + val TEST_STREAM_NAME = "Test_stream" + val TEST_SCHEMA_NAME = "kinesis_schema" + var assembledJarName: String = _ val exasolContainer = new ExasolContainer( ExasolContainerConstants.EXASOL_DOCKER_IMAGE_REFERENCE ) - val kinesisLocalStack = + val kinesisLocalStack: LocalStackContainer = new LocalStackContainer().withServices(LocalStackContainer.Service.KINESIS) var connection: java.sql.Connection = _ @@ -27,8 +28,8 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll var kinesisClient: AmazonKinesis = _ private[kinesis] def setupExasol(): Unit = { + assembledJarName = findAssembledJarName() exasolContainer.start(); - val assembledJarName = findAssembledJarName() uploadJarToBucket(assembledJarName) connection = exasolContainer.createConnectionForUser( exasolContainer.getUsername, @@ -77,6 +78,30 @@ trait KinesisAbstractIntegrationTest extends AnyFunSuite with BeforeAndAfterAll } } + private[kinesis] def createKinesisMetadataScript(): Unit = { + val _ = statement.execute( + s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...) + |EMITS (KINESIS_SHARD_ID VARCHAR(130), SHARD_SEQUENCE_NUMBER VARCHAR(2000)) AS + | %jvmoption -Dcom.amazonaws.sdk.disableCbor=true; + | %scriptclass com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader; + | %jar /buckets/bfsdefault/default/$assembledJarName; + |/ + |""".stripMargin + ) + } + + private[kinesis] def createKinesisImportScript(emits: String): Unit = { + val _ = statement.execute( + s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_IMPORT (...) 
+ |EMITS ($emits) AS
+ | %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
+ | %scriptclass com.exasol.cloudetl.kinesis.KinesisShardDataImporter;
+ | %jar /buckets/bfsdefault/default/$assembledJarName;
+ |/
+ |""".stripMargin
+ )
+ }
+
 override final def afterAll(): Unit = {
 connection.close()
 statement.close()
diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGeneratorIT.scala b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGeneratorIT.scala
new file mode 100644
index 00000000..bfe02c12
--- /dev/null
+++ b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGeneratorIT.scala
@@ -0,0 +1,89 @@
+package com.exasol.cloudetl.kinesis
+
+import java.nio.ByteBuffer
+
+import com.exasol.cloudetl.kinesis.KinesisConstants.{
+ KINESIS_SHARD_ID_COLUMN_NAME,
+ SHARD_SEQUENCE_NUMBER_COLUMN_NAME
+}
+
+import org.testcontainers.containers.localstack.LocalStackContainer
+
+class KinesisImportQueryGeneratorIT extends KinesisAbstractIntegrationTest {
+ final val TEST_TABLE_NAME = "kinesis_table"
+
+ override final def beforeAll(): Unit = {
+ createKinesisStream(2)
+ setupExasol()
+ putRecordsIntoStream()
+ createKinesisMetadataScript()
+ createKinesisImportScript("...")
+ val columns =
+ """sensorId DECIMAL(18,0),
+ |currentTemperature DECIMAL(18,0),
+ |status VARCHAR(100),
+ |kinesis_shard_id VARCHAR(2000),
+ |shard_sequence_number VARCHAR(2000)"""
+ val _ = statement.execute(
+ s"""CREATE OR REPLACE TABLE $TEST_TABLE_NAME
+ | ($columns)
+ |""".stripMargin
+ )
+ val _ = statement.execute(
+ s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_PATH (...)
+ |EMITS (...) AS
+ | %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
+ | %scriptclass com.exasol.cloudetl.kinesis.KinesisImportQueryGenerator;
+ | %jar /buckets/bfsdefault/default/$assembledJarName;
+ |/
+ |""".stripMargin
+ )
+ }
+
+ private[this] def putRecordsIntoStream(): Unit = {
+ putRecordIntoStream("{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}")
+ putRecordIntoStream("{\"sensorId\": 20,\"currentTemperature\": 15,\"status\": \"OK\"}")
+ }
+
+ private[this] def putRecordIntoStream(recordData: String): Unit = {
+ val data = ByteBuffer.wrap(recordData.getBytes())
+ val _ = kinesisClient.putRecord(TEST_STREAM_NAME, data, "partitionKey-1")
+ }
+
+ test("generates a query that imports data from a stream") {
+ executeKinesisImportQueryGenerator()
+ val resultSet = statement.executeQuery(s"SELECT * FROM $TEST_TABLE_NAME")
+ assert(resultSet.next() === true)
+ assert(resultSet.getInt("sensorId") === 17)
+ assert(resultSet.getInt("currentTemperature") === 147)
+ assert(resultSet.getString("status") === "WARN")
+ assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
+ assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
+ assert(resultSet.next() === true)
+ assert(resultSet.getInt("sensorId") === 20)
+ assert(resultSet.getInt("currentTemperature") === 15)
+ assert(resultSet.getString("status") === "OK")
+ assert(resultSet.getString(KINESIS_SHARD_ID_COLUMN_NAME) === "shardId-000000000000")
+ assert(resultSet.getString(SHARD_SEQUENCE_NUMBER_COLUMN_NAME) !== null)
+ assert(resultSet.next() === false)
+ }
+
+ private[this] def executeKinesisImportQueryGenerator(): Unit = {
+ val endpointConfiguration =
+ kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+ val endpointInsideDocker =
+ endpointConfiguration.getServiceEndpoint.replaceAll("127.0.0.1", DOCKER_IP_ADDRESS)
+ val credentials =
kinesisLocalStack.getDefaultCredentialsProvider.getCredentials + val _ = statement.execute( + s"""IMPORT INTO $TEST_TABLE_NAME + |FROM SCRIPT KINESIS_PATH WITH + | TABLE_NAME = '$TEST_TABLE_NAME' + | AWS_ACCESS_KEY = '${credentials.getAWSAccessKeyId}' + | AWS_SECRET_KEY = '${credentials.getAWSSecretKey}' + | STREAM_NAME = '$TEST_STREAM_NAME' + | REGION = '${endpointConfiguration.getSigningRegion}' + | AWS_SERVICE_ENDPOINT = '$endpointInsideDocker' + """.stripMargin + ) + } +} diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala index 0b31e690..76dbd774 100644 --- a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala +++ b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporterIT.scala @@ -9,7 +9,6 @@ import org.testcontainers.containers.localstack.LocalStackContainer @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest { - override final def beforeAll(): Unit = { createKinesisStream(1) setupExasol() @@ -20,21 +19,7 @@ class KinesisShardDataImporterIT extends KinesisAbstractIntegrationTest { |status VARCHAR(100), |kinesis_shard_id VARCHAR(2000), |shard_sequence_number VARCHAR(2000)""" - statement.execute( - s"""CREATE OR REPLACE TABLE kinesis_table - | ($columns) - |""".stripMargin - ) - val assembledJarName = findAssembledJarName() - val _ = statement.execute( - s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_IMPORT (...) - |EMITS ($columns) AS - | %jvmoption -Dcom.amazonaws.sdk.disableCbor=true; - | %scriptclass com.exasol.cloudetl.kinesis.KinesisShardDataImporter; - | %jar /buckets/bfsdefault/default/$assembledJarName; - |/ - |""".stripMargin - ) + createKinesisImportScript(columns) } private[this] def putRecordsIntoStream(): Unit = { diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardsMetadataReaderIT.java b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardsMetadataReaderIT.java deleted file mode 100644 index 9e40c830..00000000 --- a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardsMetadataReaderIT.java +++ /dev/null @@ -1,159 +0,0 @@ -package com.exasol.cloudetl.kinesis; - -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; -import com.exasol.bucketfs.Bucket; -import com.exasol.bucketfs.BucketAccessException; -import com.exasol.containers.ExasolContainer; -import com.exasol.containers.ExasolContainerConstants; -import org.junit.jupiter.api.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.sql.*; -import java.util.List; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY; -import static org.junit.jupiter.api.Assertions.*; - -@Testcontainers -class KinesisShardsMetadataReaderIT { - private static final Logger LOGGER = - LoggerFactory.getLogger(KinesisShardsMetadataReaderIT.class); - private static final String 
JAR_NAME_PATTERN = "cloud-storage-etl-udfs-";
- public static final String DOCKER_IP_ADDRESS = "172.17.0.1";
- public static final String TEST_STREAM_NAME = "Test_stream";
- @Container
- private static final ExasolContainer<? extends ExasolContainer<?>> container =
- new ExasolContainer<>(
- ExasolContainerConstants.EXASOL_DOCKER_IMAGE_REFERENCE) //
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
- @Container
- public static LocalStackContainer kinesisLocalStack = new LocalStackContainer()
- .withServices(LocalStackContainer.Service.KINESIS);
- private static AmazonKinesis amazonKinesis;
- private static String cloudStorageEltUdfsJarName;
- private static Statement statement;
- private static Connection connection;
-
- @BeforeAll
- static void beforeAll()
- throws SQLException, BucketAccessException, InterruptedException, TimeoutException,
- IOException {
- createStream();
- cloudStorageEltUdfsJarName = findAssembledJarName();
- uploadJarToBucket();
- connection =
- container.createConnectionForUser(container.getUsername(), container.getPassword());
- statement = connection.createStatement();
- statement.execute("CREATE SCHEMA kinesis_schema");
- statement.execute("OPEN SCHEMA kinesis_schema");
- statement.execute(
- "CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...) EMITS (kinesis_shard_id VARCHAR(130), shard_sequence_number VARCHAR(2000)) AS\n" +
- " %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;\n" +
- " %scriptclass com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader;\n" +
- " %jar /buckets/bfsdefault/default/" + cloudStorageEltUdfsJarName + ";\n" +
- "/\n");
- // We have to wait until stream is ready to be accessed.
- Thread.sleep(30 * 1000);
- }
-
- private static void uploadJarToBucket()
- throws InterruptedException, BucketAccessException, TimeoutException {
- final Bucket bucket = container.getDefaultBucket();
- final Path pathToRls = Path.of("target", "scala-2.12", cloudStorageEltUdfsJarName);
- bucket.uploadFile(pathToRls, cloudStorageEltUdfsJarName);
- }
-
- private static void createStream() {
- amazonKinesis = AmazonKinesisClientBuilder.standard()
- .withEndpointConfiguration(
- kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS))
- .withCredentials(kinesisLocalStack.getDefaultCredentialsProvider()).build();
- System.setProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
- amazonKinesis.createStream(TEST_STREAM_NAME, 3);
- }
-
- @AfterAll
- static void releaseResources() throws SQLException {
- connection.close();
- statement.close();
- amazonKinesis.shutdown();
- }
-
- private static String findAssembledJarName() throws IOException {
- final List<String> jars =
- Files.list(Path.of("target", "scala-2.12")).map(path -> path.getFileName().toString())
- .filter(fileName -> fileName.contains(JAR_NAME_PATTERN)).collect(Collectors.toList());
- if (!jars.isEmpty()) {
- return jars.get(0);
- } else {
- throw new IllegalArgumentException("Cannot find a jar for uploading to the bucket.");
- }
- }
-
- @Test
- void testMetadataReaderGetsShardsFromStream() throws SQLException {
- final ResultSet resultSet = this.executeKinesisMetadataScript("VALUES (('0', '0'))");
- assertAll(
- () -> assertTrue(resultSet.next()),
- () -> assertEquals("shardId-000000000000", resultSet.getString("KINESIS_SHARD_ID")),
- () -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")),
- () -> assertTrue(resultSet.next()),
- () -> assertEquals("shardId-000000000001", resultSet.getString("KINESIS_SHARD_ID")),
- () -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")),
- () ->
assertTrue(resultSet.next()), - () -> assertEquals("shardId-000000000002", resultSet.getString("KINESIS_SHARD_ID")), - () -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")), - () -> assertFalse(resultSet.next())); - } - - @Test - void testMetadataReaderCombineShardsMetadataWithExistingTable() - throws SQLException { - final ResultSet resultSet = this.executeKinesisMetadataScript( - "VALUES (('shardId-000000000000', '1234'), ('shardId-000000000001', '5678'), ('shardId-000000000004', '9012')) "); - assertAll( - () -> assertTrue(resultSet.next()), - () -> assertEquals("shardId-000000000000", resultSet.getString("KINESIS_SHARD_ID")), - () -> assertEquals("1234", resultSet.getString("SHARD_SEQUENCE_NUMBER")), - () -> assertTrue(resultSet.next()), - () -> assertEquals("shardId-000000000001", resultSet.getString("KINESIS_SHARD_ID")), - () -> assertEquals("5678", resultSet.getString("SHARD_SEQUENCE_NUMBER")), - () -> assertTrue(resultSet.next()), - () -> assertEquals("shardId-000000000002", resultSet.getString("KINESIS_SHARD_ID")), - () -> assertNull(resultSet.getString("SHARD_SEQUENCE_NUMBER")), - () -> assertFalse(resultSet.next())); - } - - private ResultSet executeKinesisMetadataScript(final String tableImitatingValues) - throws SQLException { - final AWSCredentials credentials = - kinesisLocalStack.getDefaultCredentialsProvider().getCredentials(); - final AwsClientBuilder.EndpointConfiguration endpointConfiguration = - kinesisLocalStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS); - final String endpointInsideDocker = - endpointConfiguration.getServiceEndpoint().replaceAll("127.0.0.1", DOCKER_IP_ADDRESS); - final String sql = - "SELECT KINESIS_METADATA('AWS_ACCESS_KEY -> " + credentials.getAWSAccessKeyId() + - ";AWS_SECRET_KEY -> " + credentials.getAWSSecretKey() + - ";REGION -> " + endpointConfiguration.getSigningRegion() + - ";STREAM_NAME -> " + TEST_STREAM_NAME + - ";AWS_SERVICE_ENDPOINT -> " + endpointInsideDocker + - "', KINESIS_SHARD_ID, SHARD_SEQUENCE_NUMBER) \n" + - "FROM (" + tableImitatingValues + - " AS t(KINESIS_SHARD_ID, SHARD_SEQUENCE_NUMBER)) ORDER BY KINESIS_SHARD_ID"; - return statement.executeQuery(sql); - } -} \ No newline at end of file diff --git a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisMetadataReaderIT.scala b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardsMetadataReaderIT.scala similarity index 83% rename from src/it/scala/com/exasol/cloudetl/kinesis/KinesisMetadataReaderIT.scala rename to src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardsMetadataReaderIT.scala index 22212876..a20d4259 100644 --- a/src/it/scala/com/exasol/cloudetl/kinesis/KinesisMetadataReaderIT.scala +++ b/src/it/scala/com/exasol/cloudetl/kinesis/KinesisShardsMetadataReaderIT.scala @@ -4,20 +4,11 @@ import java.sql.ResultSet import org.testcontainers.containers.localstack.LocalStackContainer -class KinesisMetadataReaderIT extends KinesisAbstractIntegrationTest { +class KinesisShardsMetadataReaderIT extends KinesisAbstractIntegrationTest { override final def beforeAll(): Unit = { createKinesisStream(3) setupExasol() - val assembledJarName = findAssembledJarName() - val _ = statement.execute( - s"""CREATE OR REPLACE JAVA SET SCRIPT KINESIS_METADATA (...) 
- |EMITS (KINESIS_SHARD_ID VARCHAR(130), SHARD_SEQUENCE_NUMBER VARCHAR(2000)) AS
- | %jvmoption -Dcom.amazonaws.sdk.disableCbor=true;
- | %scriptclass com.exasol.cloudetl.kinesis.KinesisShardsMetadataReader;
- | %jar /buckets/bfsdefault/default/$assembledJarName;
- |/
- |""".stripMargin
- )
+ createKinesisMetadataScript()
 }
 
 test("returns shards from a stream") {
diff --git a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGenerator.scala b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGenerator.scala
index 4b60d9db..d23d16e0 100644
--- a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGenerator.scala
+++ b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGenerator.scala
@@ -23,22 +23,14 @@
 importSpecification.getParameters.asScala.toMap
 )
 val tableName = kinesisUserProperties.getTableName
- val innerUdfProperties =
- kinesisUserProperties.createSelectedPropertiesMap(
- KinesisUserProperties.AWS_ACCESS_KEY_PROPERTY,
- KinesisUserProperties.AWS_SECRET_KEY_PROPERTY,
- KinesisUserProperties.AWS_SESSION_TOKEN_PROPERTY,
- KinesisUserProperties.REGION_PROPERTY,
- KinesisUserProperties.STREAM_NAME_PROPERTY
- )
- val innerUdfPropertiesString = kinesisUserProperties.mkString(innerUdfProperties)
+ val propertiesString = kinesisUserProperties.mkString()
 s"""SELECT KINESIS_IMPORT(
- | '$innerUdfPropertiesString',
+ | '$propertiesString',
 | $KINESIS_SHARD_ID_COLUMN_NAME,
 | $SHARD_SEQUENCE_NUMBER_COLUMN_NAME
 |)
 |FROM (
- | SELECT KINESIS_METADATA('$innerUdfPropertiesString',
+ | SELECT KINESIS_METADATA('$propertiesString',
 | $KINESIS_SHARD_ID_COLUMN_NAME, $SHARD_SEQUENCE_NUMBER_COLUMN_NAME)
 | FROM (SELECT
 | $KINESIS_SHARD_ID_COLUMN_NAME,
diff --git a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisUserProperties.scala b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisUserProperties.scala
index e9420c87..64bf8eb6 100644
--- a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisUserProperties.scala
+++ b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisUserProperties.scala
@@ -68,27 +68,10 @@
 /**
 * Converts a properties map to a string.
 *
- * @param propertiesMap A map with properties.
+ * Uses the map that the user passed to the constructor.
 */
- final def mkString(propertiesMap: Map[String, String]): String =
+ final def mkString(): String =
 (SortedMap.empty[String, String] ++ propertiesMap)
 .map { case (k, v) => s"$k$KEY_VALUE_SEPARATOR$v" }
 .mkString(PROPERTY_SEPARATOR)
-
- /**
- * Converts a properties map to a string.
- *
- * Uses a map that user passed to a constructor.
- */
- final def mkString(): String =
- mkString(propertiesMap)
-
- /**
- * Creates a new [[scala.collection.Map]] with user-selected properties based on the existing
- * one.
- *
- * @param propertyNames Names of the properties to include.
- */ - final def createSelectedPropertiesMap(propertyNames: String*): Map[String, String] = - propertiesMap.filter { case (key, _) => propertyNames.contains(key) } } diff --git a/src/test/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGeneratorTest.scala b/src/test/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGeneratorTest.scala index 12f76279..c54dab87 100644 --- a/src/test/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGeneratorTest.scala +++ b/src/test/scala/com/exasol/cloudetl/kinesis/KinesisImportQueryGeneratorTest.scala @@ -37,7 +37,8 @@ class KinesisImportQueryGeneratorTest |AWS_SECRET_KEY -> MY_SECRET_KEY; |AWS_SESSION_TOKEN -> MY_SESSION_TOKEN; |REGION -> eu-west-1; - |STREAM_NAME -> Test_stream""".stripMargin.replace("\n", "") + |STREAM_NAME -> Test_stream; + |TABLE_NAME -> TEST_TABLE""".stripMargin.replace("\n", "") val expected = s"""SELECT KINESIS_IMPORT( | '$propertiesAsString', diff --git a/src/test/scala/com/exasol/cloudetl/kinesis/KinesisUserPropertiesTest.scala b/src/test/scala/com/exasol/cloudetl/kinesis/KinesisUserPropertiesTest.scala index 0223e879..8eb55796 100644 --- a/src/test/scala/com/exasol/cloudetl/kinesis/KinesisUserPropertiesTest.scala +++ b/src/test/scala/com/exasol/cloudetl/kinesis/KinesisUserPropertiesTest.scala @@ -115,7 +115,7 @@ class KinesisUserPropertiesTest extends AnyFunSuite { assert(thrown.getMessage === s"Please provide a value for the $TABLE_NAME_PROPERTY property!") } - test("makeString returns a properties string") { + test("mkString returns a properties string") { val kinesisUserProperties = new KinesisUserProperties(kinesisProperties) assert( kinesisUserProperties.mkString() === @@ -128,23 +128,4 @@ class KinesisUserPropertiesTest extends AnyFunSuite { |TABLE_NAME -> TEST_TABLE""".stripMargin.replace("\n", "") ) } - - test("makeString returns a properties string for a custom map") { - val propertiesMap = Map( - "TABLE_NAME" -> "TEST_TABLE", - "REGION" -> "eu-west-1" - ) - val kinesisUserProperties = new KinesisUserProperties(kinesisProperties) - assert( - kinesisUserProperties - .mkString(propertiesMap) === "REGION -> eu-west-1;TABLE_NAME -> TEST_TABLE" - ) - } - - test("createNewPropertiesMap creates a property map with selected properties") { - val kinesisUserProperties = new KinesisUserProperties(kinesisProperties) - val newProperties = - kinesisUserProperties.createSelectedPropertiesMap(TABLE_NAME_PROPERTY, STREAM_NAME_PROPERTY) - assert(newProperties === Map("TABLE_NAME" -> "TEST_TABLE", "STREAM_NAME" -> "Test_stream")) - } }