Skip to content

Commit

Permalink
#74: applied review findings
Browse files Browse the repository at this point in the history
  • Loading branch information
AnastasiiaSergienko committed Mar 9, 2020
1 parent d115575 commit 1a8ce1d
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class KinesisArgumentsReader(val exaIterator: ExaIterator) {
final def getAwsAccessKeyArgument: String =
getStringProperty(KinesisArgumentsReader.AWS_ACCESS_KEY_ARGUMENT)

private def getStringProperty(columnNumber: Int): String =
private[this] def getStringProperty(columnNumber: Int): String =
try this.exaIterator.getString(columnNumber)
catch {
case exception @ (_: ExaIterationException | _: ExaDataTypeException) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.exasol.cloudetl.kinesis

import com.exasol.cloudetl.common.AbstractProperties

object KinesisPropertiesReader {
val AWS_ACCESS_KEY_PROPERTY: String = "AWS_ACCESS_KEY"
val AWS_SECRET_KEY_PROPERTY: String = "AWS_SECRET_KEY"
Expand All @@ -9,39 +11,23 @@ object KinesisPropertiesReader {
val TABLE_NAME_PROPERTY: String = "TABLE_NAME"
}

class KinesisPropertiesReader(val parametersMap: Map[String, String]) {
class KinesisPropertiesReader(val parametersMap: Map[String, String])
extends AbstractProperties(parametersMap) {
final def getAwsAccessKeyProperty: String =
getPropertyByName(KinesisPropertiesReader.AWS_ACCESS_KEY_PROPERTY)

private def getPropertyByName(propertyName: String): String =
if (parametersMap != null) {
this.parametersMap
.get(propertyName)
.fold {
throw new KinesisConnectorException(
s"A mandatory property $propertyName is missing.",
null
)
}(identity)
} else {
throw new KinesisConnectorException(
"A map can not be null. Please, provide a map with properties.",
null
)
}
getString(KinesisPropertiesReader.AWS_ACCESS_KEY_PROPERTY)

final def getAwsSecretKeyProperty: String =
getPropertyByName(KinesisPropertiesReader.AWS_SECRET_KEY_PROPERTY)
getString(KinesisPropertiesReader.AWS_SECRET_KEY_PROPERTY)

final def getAwsSessionTokenProperty: String =
getPropertyByName(KinesisPropertiesReader.AWS_SESSION_TOKEN_PROPERTY)
getString(KinesisPropertiesReader.AWS_SESSION_TOKEN_PROPERTY)

final def getStreamNameProperty: String =
getPropertyByName(KinesisPropertiesReader.STREAM_NAME_PROPERTY)
getString(KinesisPropertiesReader.STREAM_NAME_PROPERTY)

final def getRegionProperty: String =
getPropertyByName(KinesisPropertiesReader.REGION_PROPERTY)
getString(KinesisPropertiesReader.REGION_PROPERTY)

final def getTableNameProperty: String =
getPropertyByName(KinesisPropertiesReader.TABLE_NAME_PROPERTY)
getString(KinesisPropertiesReader.TABLE_NAME_PROPERTY)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package com.exasol.cloudetl.kinesis

import scala.collection.JavaConverters._

import com.exasol.cloudetl.util.JsonDeserializer

import com.amazonaws.auth._
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
import com.amazonaws.services.kinesis.model._
import com.exasol._
import com.fasterxml.jackson.databind.{MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import java.util

object KinesisShardDataImporter {
Expand Down Expand Up @@ -71,16 +71,17 @@ object KinesisShardDataImporter {
getRecordsResult.getRecords.asScala.toList
}

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
private[kinesis] def createTableValuesListFromRecord(
record: Record,
shardId: String
): Seq[AnyRef] = {
val data = record.getData
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.disable(MapperFeature.ALLOW_COERCION_OF_SCALARS)
val values = mapper.readValue[util.LinkedHashMap[String, AnyRef]](new String(data.array()))
values.values().stream().toArray().toSeq ++ Seq(shardId, record.getSequenceNumber)
val parsedValuesMap = JsonDeserializer
.parseJson[util.LinkedHashMap[String, AnyRef]](new String(data.array()))
.values()
.stream()
.toArray
.toSeq
parsedValuesMap ++ Seq(shardId, record.getSequenceNumber)
}
}
14 changes: 14 additions & 0 deletions src/main/scala/com/exasol/cloudetl/util/JsonDeserializer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.exasol.cloudetl.util

import com.fasterxml.jackson.databind.{MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
object JsonDeserializer {
lazy val mapper = new ObjectMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.disable(MapperFeature.ALLOW_COERCION_OF_SCALARS)

def parseJson[T: Manifest](jsonString: String): T =
mapper.readValue[T](jsonString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,57 +48,57 @@ class KinesisPropertiesReaderTest extends AnyFunSuite with BeforeAndAfterAll {

test("getAwsAccessKeyProperty throws an exception as the property is missing") {
val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String])
val thrown = intercept[KinesisConnectorException] {
val thrown = intercept[IllegalArgumentException] {
kinesisPropertiesReader.getAwsAccessKeyProperty
}
assert(thrown.getMessage === s"A mandatory property $AWS_ACCESS_KEY_PROPERTY is missing.")
assert(
thrown.getMessage === s"Please provide a value for the $AWS_ACCESS_KEY_PROPERTY property!"
)
}

test("getAwsSecretKeyProperty throws an exception as the property is missing") {
val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String])
val thrown = intercept[KinesisConnectorException] {
val thrown = intercept[IllegalArgumentException] {
kinesisPropertiesReader.getAwsSecretKeyProperty
}
assert(thrown.getMessage === s"A mandatory property $AWS_SECRET_KEY_PROPERTY is missing.")
assert(
thrown.getMessage === s"Please provide a value for the $AWS_SECRET_KEY_PROPERTY property!"
)
}

test("getAwsSessionTokenProperty throws an exception as the property is missing") {
val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String])
val thrown = intercept[KinesisConnectorException] {
val thrown = intercept[IllegalArgumentException] {
kinesisPropertiesReader.getAwsSessionTokenProperty
}
assert(thrown.getMessage === s"A mandatory property $AWS_SESSION_TOKEN_PROPERTY is missing.")
assert(
thrown.getMessage === s"Please provide a value for the $AWS_SESSION_TOKEN_PROPERTY property!"
)
}

test("getStreamNameProperty throws an exception as the property is missing") {
val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String])
val thrown = intercept[KinesisConnectorException] {
val thrown = intercept[IllegalArgumentException] {
kinesisPropertiesReader.getStreamNameProperty
}
assert(thrown.getMessage === s"A mandatory property $STREAM_NAME_PROPERTY is missing.")
assert(
thrown.getMessage === s"Please provide a value for the $STREAM_NAME_PROPERTY property!"
)
}

test("getRegionProperty throws an exception as the property is missing") {
val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String])
val thrown = intercept[KinesisConnectorException] {
val thrown = intercept[IllegalArgumentException] {
kinesisPropertiesReader.getRegionProperty
}
assert(thrown.getMessage === s"A mandatory property $REGION_PROPERTY is missing.")
assert(thrown.getMessage === s"Please provide a value for the $REGION_PROPERTY property!")
}

test("getTableNameProperty throws an exception as the property is missing") {
val kinesisPropertiesReader = new KinesisPropertiesReader(Map.empty[String, String])
val thrown = intercept[KinesisConnectorException] {
val thrown = intercept[IllegalArgumentException] {
kinesisPropertiesReader.getTableNameProperty
}
assert(thrown.getMessage === s"A mandatory property $TABLE_NAME_PROPERTY is missing.")
}

test("getTableNameProperty throws an exception as Map is null") {
val kinesisPropertiesReader = new KinesisPropertiesReader(null)
val thrown = intercept[KinesisConnectorException] {
kinesisPropertiesReader.getTableNameProperty
}
assert(thrown.getMessage === "A map can not be null. Please, provide a map with properties.")
assert(thrown.getMessage === s"Please provide a value for the $TABLE_NAME_PROPERTY property!")
}
}
13 changes: 13 additions & 0 deletions src/test/scala/com/exasol/cloudetl/util/JsonDeserializerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.exasol.cloudetl.util

import scala.collection.immutable.HashMap

import org.scalatest.funsuite.AnyFunSuite

class JsonDeserializerTest extends AnyFunSuite {
test("parseJson parses a String") {
val jsonString = "{\"sensorId\": 17,\"currentTemperature\": 147,\"status\": \"WARN\"}"
val values = JsonDeserializer.parseJson[HashMap[String, Object]](jsonString)
assert(values === HashMap(("sensorId", 17), ("currentTemperature", 147), ("status", "WARN")))
}
}

0 comments on commit 1a8ce1d

Please sign in to comment.