From 1a8ce1d41350ddd8917ef4f8f616f8ce5c63749a Mon Sep 17 00:00:00 2001 From: Anastasiia Sergienko Date: Mon, 9 Mar 2020 13:56:01 +0100 Subject: [PATCH] #74: applied review findings --- .../kinesis/KinesisArgumentsReader.scala | 2 +- .../kinesis/KinesisPropertiesReader.scala | 34 +++++----------- .../kinesis/KinesisShardDataImporter.scala | 17 ++++---- .../cloudetl/util/JsonDeserializer.scala | 14 +++++++ .../kinesis/KinesisPropertiesReaderTest.scala | 40 +++++++++---------- .../cloudetl/util/JsonDeserializerTest.scala | 13 ++++++ 6 files changed, 67 insertions(+), 53 deletions(-) create mode 100644 src/main/scala/com/exasol/cloudetl/util/JsonDeserializer.scala create mode 100644 src/test/scala/com/exasol/cloudetl/util/JsonDeserializerTest.scala diff --git a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisArgumentsReader.scala b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisArgumentsReader.scala index b6c10e71..f53b20af 100644 --- a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisArgumentsReader.scala +++ b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisArgumentsReader.scala @@ -16,7 +16,7 @@ class KinesisArgumentsReader(val exaIterator: ExaIterator) { final def getAwsAccessKeyArgument: String = getStringProperty(KinesisArgumentsReader.AWS_ACCESS_KEY_ARGUMENT) - private def getStringProperty(columnNumber: Int): String = + private[this] def getStringProperty(columnNumber: Int): String = try this.exaIterator.getString(columnNumber) catch { case exception @ (_: ExaIterationException | _: ExaDataTypeException) => diff --git a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReader.scala b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReader.scala index 76d1786a..d172ec11 100644 --- a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReader.scala +++ b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReader.scala @@ -1,5 +1,7 @@ package com.exasol.cloudetl.kinesis +import com.exasol.cloudetl.common.AbstractProperties + object KinesisPropertiesReader { val AWS_ACCESS_KEY_PROPERTY: String = "AWS_ACCESS_KEY" val AWS_SECRET_KEY_PROPERTY: String = "AWS_SECRET_KEY" @@ -9,39 +11,23 @@ object KinesisPropertiesReader { val TABLE_NAME_PROPERTY: String = "TABLE_NAME" } -class KinesisPropertiesReader(val parametersMap: Map[String, String]) { +class KinesisPropertiesReader(val parametersMap: Map[String, String]) + extends AbstractProperties(parametersMap) { final def getAwsAccessKeyProperty: String = - getPropertyByName(KinesisPropertiesReader.AWS_ACCESS_KEY_PROPERTY) - - private def getPropertyByName(propertyName: String): String = - if (parametersMap != null) { - this.parametersMap - .get(propertyName) - .fold { - throw new KinesisConnectorException( - s"A mandatory property $propertyName is missing.", - null - ) - }(identity) - } else { - throw new KinesisConnectorException( - "A map can not be null. Please, provide a map with properties.", - null - ) - } + getString(KinesisPropertiesReader.AWS_ACCESS_KEY_PROPERTY) final def getAwsSecretKeyProperty: String = - getPropertyByName(KinesisPropertiesReader.AWS_SECRET_KEY_PROPERTY) + getString(KinesisPropertiesReader.AWS_SECRET_KEY_PROPERTY) final def getAwsSessionTokenProperty: String = - getPropertyByName(KinesisPropertiesReader.AWS_SESSION_TOKEN_PROPERTY) + getString(KinesisPropertiesReader.AWS_SESSION_TOKEN_PROPERTY) final def getStreamNameProperty: String = - getPropertyByName(KinesisPropertiesReader.STREAM_NAME_PROPERTY) + getString(KinesisPropertiesReader.STREAM_NAME_PROPERTY) final def getRegionProperty: String = - getPropertyByName(KinesisPropertiesReader.REGION_PROPERTY) + getString(KinesisPropertiesReader.REGION_PROPERTY) final def getTableNameProperty: String = - getPropertyByName(KinesisPropertiesReader.TABLE_NAME_PROPERTY) + getString(KinesisPropertiesReader.TABLE_NAME_PROPERTY) } diff --git a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporter.scala b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporter.scala index 56be3665..6f746a8e 100644 --- a/src/main/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporter.scala +++ b/src/main/scala/com/exasol/cloudetl/kinesis/KinesisShardDataImporter.scala @@ -2,12 +2,12 @@ package com.exasol.cloudetl.kinesis import scala.collection.JavaConverters._ +import com.exasol.cloudetl.util.JsonDeserializer + import com.amazonaws.auth._ import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder} import com.amazonaws.services.kinesis.model._ import com.exasol._ -import com.fasterxml.jackson.databind.{MapperFeature, ObjectMapper} -import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper} import java.util object KinesisShardDataImporter { @@ -71,16 +71,17 @@ object KinesisShardDataImporter { getRecordsResult.getRecords.asScala.toList } - @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) private[kinesis] def createTableValuesListFromRecord( record: Record, shardId: String ): Seq[AnyRef] = { val data = record.getData - val mapper = new ObjectMapper with ScalaObjectMapper - mapper.registerModule(DefaultScalaModule) - mapper.disable(MapperFeature.ALLOW_COERCION_OF_SCALARS) - val values = mapper.readValue[util.LinkedHashMap[String, AnyRef]](new String(data.array())) - values.values().stream().toArray().toSeq ++ Seq(shardId, record.getSequenceNumber) + val parsedValuesMap = JsonDeserializer + .parseJson[util.LinkedHashMap[String, AnyRef]](new String(data.array())) + .values() + .stream() + .toArray + .toSeq + parsedValuesMap ++ Seq(shardId, record.getSequenceNumber) } } diff --git a/src/main/scala/com/exasol/cloudetl/util/JsonDeserializer.scala b/src/main/scala/com/exasol/cloudetl/util/JsonDeserializer.scala new file mode 100644 index 00000000..9cdae991 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/util/JsonDeserializer.scala @@ -0,0 +1,14 @@ +package com.exasol.cloudetl.util + +import com.fasterxml.jackson.databind.{MapperFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper} + +@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) +object JsonDeserializer { + lazy val mapper = new ObjectMapper with ScalaObjectMapper + mapper.registerModule(DefaultScalaModule) + mapper.disable(MapperFeature.ALLOW_COERCION_OF_SCALARS) + + def parseJson[T: Manifest](jsonString: String): T = + mapper.readValue[T](jsonString) +} diff --git a/src/test/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReaderTest.scala b/src/test/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReaderTest.scala index a18be00d..2ab5bdaa 100644 --- a/src/test/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReaderTest.scala +++ b/src/test/scala/com/exasol/cloudetl/kinesis/KinesisPropertiesReaderTest.scala @@ -48,57 +48,57 @@ class KinesisPropertiesReaderTest extends AnyFunSuite with BeforeAndAfterAll { test("getAwsAccessKeyProperty throws an exception as the property is missing") { val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String]) - val thrown = intercept[KinesisConnectorException] { + val thrown = intercept[IllegalArgumentException] { kinesisPropertiesReader.getAwsAccessKeyProperty } - assert(thrown.getMessage === s"A mandatory property $AWS_ACCESS_KEY_PROPERTY is missing.") + assert( + thrown.getMessage === s"Please provide a value for the $AWS_ACCESS_KEY_PROPERTY property!" + ) } test("getAwsSecretKeyProperty throws an exception as the property is missing") { val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String]) - val thrown = intercept[KinesisConnectorException] { + val thrown = intercept[IllegalArgumentException] { kinesisPropertiesReader.getAwsSecretKeyProperty } - assert(thrown.getMessage === s"A mandatory property $AWS_SECRET_KEY_PROPERTY is missing.") + assert( + thrown.getMessage === s"Please provide a value for the $AWS_SECRET_KEY_PROPERTY property!" + ) } test("getAwsSessionTokenProperty throws an exception as the property is missing") { val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String]) - val thrown = intercept[KinesisConnectorException] { + val thrown = intercept[IllegalArgumentException] { kinesisPropertiesReader.getAwsSessionTokenProperty } - assert(thrown.getMessage === s"A mandatory property $AWS_SESSION_TOKEN_PROPERTY is missing.") + assert( + thrown.getMessage === s"Please provide a value for the $AWS_SESSION_TOKEN_PROPERTY property!" + ) } test("getStreamNameProperty throws an exception as the property is missing") { val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String]) - val thrown = intercept[KinesisConnectorException] { + val thrown = intercept[IllegalArgumentException] { kinesisPropertiesReader.getStreamNameProperty } - assert(thrown.getMessage === s"A mandatory property $STREAM_NAME_PROPERTY is missing.") + assert( + thrown.getMessage === s"Please provide a value for the $STREAM_NAME_PROPERTY property!" + ) } test("getRegionProperty throws an exception as the property is missing") { val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String]) - val thrown = intercept[KinesisConnectorException] { + val thrown = intercept[IllegalArgumentException] { kinesisPropertiesReader.getRegionProperty } - assert(thrown.getMessage === s"A mandatory property $REGION_PROPERTY is missing.") + assert(thrown.getMessage === s"Please provide a value for the $REGION_PROPERTY property!") } test("getTableNameProperty throws an exception as the property is missing") { val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String]) - val thrown = intercept[KinesisConnectorException] { + val thrown = intercept[IllegalArgumentException] { kinesisPropertiesReader.getTableNameProperty } - assert(thrown.getMessage === s"A mandatory property $TABLE_NAME_PROPERTY is missing.") - } - - test("getTableNameProperty throws an exception as Map is null") { - val kinesisPropertiesReader = new KinesisPropertiesReader(null) - val thrown = intercept[KinesisConnectorException] { - kinesisPropertiesReader.getTableNameProperty - } - assert(thrown.getMessage === "A map can not be null. Please, provide a map with properties.") + assert(thrown.getMessage === s"Please provide a value for the $TABLE_NAME_PROPERTY property!") } } diff --git a/src/test/scala/com/exasol/cloudetl/util/JsonDeserializerTest.scala b/src/test/scala/com/exasol/cloudetl/util/JsonDeserializerTest.scala new file mode 100644 index 00000000..d2ac08d0 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/util/JsonDeserializerTest.scala @@ -0,0 +1,13 @@ +package com.exasol.cloudetl.util + +import scala.collection.immutable.HashMap + +import org.scalatest.funsuite.AnyFunSuite + +class JsonDeserializerTest extends AnyFunSuite { + test("parseJson parses a String") { + val jsonString = "{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}" + val values = JsonDeserializer.parseJson[HashMap[String, Object]](jsonString) + assert(values === HashMap(("sensorId", 17), ("currentTemperature", 147), ("status", "WARN"))) + } +}