Refactor user properties handling #46

Merged 13 commits on Oct 28, 2019
3 changes: 2 additions & 1 deletion project/Compilation.scala
@@ -126,7 +126,8 @@ object Compilation {
val WartremoverTestFlags: Seq[Wart] = ExtraWartremoverFlags ++ Warts.allBut(
Wart.Any,
Wart.NonUnitStatements,
Wart.Null
Wart.Null,
Wart.Var
)

}
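
The only functional change here is allowing `Wart.Var` in test sources. The Avro test record added further down in this PR uses mutable `var` fields (as `SpecificRecordBase` needs settable fields), which would otherwise trip Wartremover. A minimal sketch of how such a flag set is typically applied through the sbt-wartremover plugin, with the key name and import path of that plugin assumed; the build's actual wiring is not shown in this diff:

```scala
// Sketch: apply the relaxed Wartremover flag set to test sources only.
// The import path follows sbt-wartremover's AutoPlugin layout and may differ by plugin version.
import sbt._
import sbt.Keys._
import wartremover.WartRemover.autoImport._

lazy val exampleWartremoverSettings: Seq[Setting[_]] = Seq(
  wartremoverErrors in (Test, compile) := Compilation.WartremoverTestFlags
)
```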
15 changes: 14 additions & 1 deletion project/Dependencies.scala
@@ -1,6 +1,7 @@
package com.exasol.cloudetl.sbt

import sbt._
import sbt.librarymanagement.InclExclRule

/** A list of required dependencies */
object Dependencies {
@@ -42,9 +43,21 @@ object Dependencies {
/** Test dependencies only required in `test` */
private val TestDependencies: Seq[ModuleID] = Seq(
"org.scalatest" %% "scalatest" % "3.0.5",
"org.mockito" % "mockito-core" % "2.23.4"
"org.mockito" % "mockito-core" % "2.23.4",
"org.apache.kafka" %% "kafka" % "2.3.0" exclude ("org.slf4j", "slf4j-log4j12") exclude ("org.apache.kafka", "kafka-clients"),
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % "5.3.0" exclude ("org.apacha.kafka", "kafka")
).map(_ % Test)

lazy val ExcludedDependencies: Seq[InclExclRule] = Seq(
ExclusionRule("org.ow2.asm", "asm"),
ExclusionRule("javax.ws.rs", "jsr311-api"),
ExclusionRule("com.sun.jersey", "jersey-core"),
ExclusionRule("com.sun.jersey", "jersey-server"),
ExclusionRule("com.sun.jersey", "jersey-json"),
ExclusionRule("javax.servlet", "servlet-api"),
ExclusionRule("javax.servlet.jsp", "jsp-api")
)

/** The list of all dependencies for the connector */
lazy val AllDependencies: Seq[ModuleID] = CoreDependencies ++ TestDependencies

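The new `ExcludedDependencies` rules are applied build-wide through `excludeDependencies` in the Settings.scala change below, so the listed transitive artifacts are dropped from every configuration. For comparison, a per-module exclusion, shown here only as a sketch against one of the test dependencies above, would scope the rule to a single artifact:

```scala
// Sketch: scope an exclusion to a single dependency instead of the whole build.
import sbt._

val embeddedKafkaExample: ModuleID =
  ("io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % "5.3.0")
    .excludeAll(ExclusionRule("javax.servlet", "servlet-api"))
```
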
3 changes: 2 additions & 1 deletion project/Settings.scala
@@ -30,7 +30,8 @@ object Settings {
compileOrder in Compile := CompileOrder.JavaThenScala,
// Dependency settings
resolvers ++= Dependencies.Resolvers,
libraryDependencies ++= Dependencies.AllDependencies
libraryDependencies ++= Dependencies.AllDependencies,
excludeDependencies ++= Dependencies.ExcludedDependencies
)

def miscSettings(): Seq[Setting[_]] = Seq(
98 changes: 98 additions & 0 deletions src/it/scala/com/exasol/cloudetl/KafkaIntegrationTest.scala
@@ -0,0 +1,98 @@
package com.exasol.cloudetl.kafka

import com.exasol.ExaIterator

import net.manub.embeddedkafka.schemaregistry.EmbeddedKafka
import org.apache.avro.AvroRuntimeException
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfterEach
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar

trait KafkaIntegrationTest
extends FunSuite
with BeforeAndAfterEach
with MockitoSugar
with EmbeddedKafka {

val topic = "exasol-kafka-topic"

val properties = Map(
"BOOTSTRAP_SERVERS" -> "localhost:6001",
"SCHEMA_REGISTRY_URL" -> "http://localhost:6002",
"TOPICS" -> topic,
"TABLE_NAME" -> "exasolTable"
)

override final def beforeEach(): Unit = {
EmbeddedKafka.start()
()
}

override final def afterEach(): Unit = {
EmbeddedKafka.stop()
()
}

final def mockExasolIterator(
params: Map[String, String],
partitions: Seq[Int],
offsets: Seq[Long]
): ExaIterator = {
val mockedIterator = mock[ExaIterator]
when(mockedIterator.getString(0)).thenReturn(KafkaConsumerProperties(params).mkString())

val bHead :: bTail = Seq.fill(partitions.size - 1)(true) ++ Seq(false)
when(mockedIterator.next()).thenReturn(bHead, bTail: _*)

val pHead :: pTail = partitions.map(new java.lang.Integer(_))
when(mockedIterator.getInteger(1)).thenReturn(pHead, pTail: _*)

val oHead :: oTail = offsets.map(new java.lang.Long(_))
when(mockedIterator.getLong(2)).thenReturn(oHead, oTail: _*)

mockedIterator
}

private[this] val avroRecordSchema =
new Schema.Parser().parse(s"""{
| "namespace": "com.exasol.cloudetl",
| "type": "record",
| "name": "AvroRecordSchemaForIT",
| "fields": [
| {"name": "col_str", "type": "string"},
| {"name": "col_int", "type": "int"},
| {"name": "col_long", "type": "long"}
| ]
|}""".stripMargin)

case class AvroRecord(var col_str: String, var col_int: Int, var col_long: Long)
extends SpecificRecordBase {
def this() = this("", 0, 0)

override def get(index: Int): AnyRef = index match {
case 0 => col_str
case 1 => col_int.asInstanceOf[AnyRef]
case 2 => col_long.asInstanceOf[AnyRef]
case _ => throw new AvroRuntimeException(s"Unknown index $index!")
}

override def put(index: Int, value: Any): Unit = index match {
case 0 =>
col_str = value match {
case (utf8: org.apache.avro.util.Utf8) => utf8.toString
case _ => value.asInstanceOf[String]
}
case 1 =>
col_int = value.asInstanceOf[Int]
case 2 =>
col_long = value.asInstanceOf[Long]
case _ => throw new AvroRuntimeException(s"Unknown index $index!")
}

override def getSchema(): Schema = avroRecordSchema
}

}
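
A note on the mocking above: Mockito's `thenReturn(head, tail: _*)` makes consecutive calls return consecutive values, so the mocked `ExaIterator` behaves like a cursor with one row per (partition, offset) pair and `next()` turning `false` after the last row. The usual consumption pattern against such an iterator, a sketch only since the actual script-class loop is not part of this file, looks roughly like this:

```scala
// Sketch (assumption): drain the (partition id, offset) pairs from an ExaIterator,
// reading the current row first and then advancing with next().
import com.exasol.ExaIterator

def readPartitionOffsets(iterator: ExaIterator): Seq[(Int, Long)] = {
  val builder = Seq.newBuilder[(Int, Long)]
  var hasNextRow = true
  while (hasNextRow) {
    builder += (iterator.getInteger(1).intValue() -> iterator.getLong(2).longValue())
    hasNextRow = iterator.next()
  }
  builder.result()
}
```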
@@ -0,0 +1,14 @@
package com.exasol.cloudetl.kafka

import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer

@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf"))
class KafkaConsumerPropertiesIT extends KafkaIntegrationTest {

test("build returns a KafkaConsumer[String, GenericRecord]") {
val kafkaConsumer = KafkaConsumerProperties(properties).build()
assert(kafkaConsumer.isInstanceOf[KafkaConsumer[String, GenericRecord]])
}

}
84 changes: 84 additions & 0 deletions src/it/scala/com/exasol/cloudetl/scriptclasses/KafkaImportIT.scala
@@ -0,0 +1,84 @@
package com.exasol.cloudetl.scriptclasses

import java.lang.{Integer => JInt}
import java.lang.{Long => JLong}

import com.exasol.ExaMetadata
import com.exasol.cloudetl.kafka.KafkaIntegrationTest

import org.mockito.ArgumentMatchers._
import org.mockito.Mockito.times
import org.mockito.Mockito.verify

class KafkaImportIT extends KafkaIntegrationTest {

test("run emits records from starting initial offset") {
createCustomTopic(topic)
publishToKafka(topic, AvroRecord("abc", 3, 13))
publishToKafka(topic, AvroRecord("hello", 4, 14))

val iter = mockExasolIterator(properties, Seq(0), Seq(-1))
KafkaImport.run(mock[ExaMetadata], iter)

verify(iter, times(2)).emit(Seq(any[Object]): _*)
verify(iter, times(2)).emit(
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong],
anyString(),
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
verify(iter, times(1)).emit(new JInt(0), new JLong(0), "abc", new JInt(3), new JLong(13))
verify(iter, times(1)).emit(new JInt(0), new JLong(1), "hello", new JInt(4), new JLong(14))
}

test("run emits records starting from provided offset") {
createCustomTopic(topic)
publishToKafka(topic, AvroRecord("abc", 3, 13))
publishToKafka(topic, AvroRecord("hello", 4, 14))
publishToKafka(topic, AvroRecord("def", 7, 17))
publishToKafka(topic, AvroRecord("xyz", 13, 23))

// records at 0, 1 are already read, committed
val iter = mockExasolIterator(properties, Seq(0), Seq(1))
KafkaImport.run(mock[ExaMetadata], iter)

verify(iter, times(2)).emit(Seq(any[Object]): _*)
verify(iter, times(2)).emit(
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong],
anyString(),
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
verify(iter, times(1)).emit(new JInt(0), new JLong(2), "def", new JInt(7), new JLong(17))
verify(iter, times(1)).emit(new JInt(0), new JLong(3), "xyz", new JInt(13), new JLong(23))
}

test("run emits records within min / max records per run") {
val newProperties = properties ++ Map(
"MAX_POLL_RECORDS" -> "2",
"MIN_RECORDS_PER_RUN" -> "2",
"MAX_RECORDS_PER_RUN" -> "4"
)
createCustomTopic(topic)
publishToKafka(topic, AvroRecord("abc", 3, 13))
publishToKafka(topic, AvroRecord("hello", 4, 14))
publishToKafka(topic, AvroRecord("def", 7, 17))
publishToKafka(topic, AvroRecord("xyz", 13, 23))

// consumed in two batches, each with 2 records
val iter = mockExasolIterator(newProperties, Seq(0), Seq(-1))
KafkaImport.run(mock[ExaMetadata], iter)

verify(iter, times(4)).emit(Seq(any[Object]): _*)
verify(iter, times(4)).emit(
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong],
anyString(),
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
}

}
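
The verifications above pin down the emitted row layout: Kafka partition id and record offset first, then the Avro record's fields in schema order (`col_str`, `col_int`, `col_long`). A hedged sketch of the corresponding emit call, with only the column order taken from the assertions and everything else assumed:

```scala
// Sketch (assumption): shape of one emitted row for a consumed Kafka record.
import java.lang.{Integer => JInt, Long => JLong}
import com.exasol.ExaIterator

def emitRecord(
  iterator: ExaIterator,
  partitionId: Int,
  recordOffset: Long,
  colStr: String,
  colInt: Int,
  colLong: Long
): Unit =
  iterator.emit(
    JInt.valueOf(partitionId),
    JLong.valueOf(recordOffset),
    colStr,
    JInt.valueOf(colInt),
    JLong.valueOf(colLong)
  )
```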
@@ -0,0 +1,74 @@
package com.exasol.cloudetl.scriptclasses

import java.lang.{Integer => JInt}
import java.lang.{Long => JLong}

import com.exasol.ExaMetadata
import com.exasol.cloudetl.kafka.KafkaIntegrationTest

import org.mockito.ArgumentMatchers.anyInt
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.Mockito._

@SuppressWarnings(
Array("org.wartremover.warts.AsInstanceOf", "org.wartremover.contrib.warts.SymbolicName")
)
class KafkaMetadataIT extends KafkaIntegrationTest {

// Default case where Exasol table is empty.
test("run emits default partitionId maxOffset pairs with single topic partition") {
val iter = mockExasolIterator(properties, Seq(0), Seq(-1))
KafkaMetadata.run(mock[ExaMetadata], iter)
verify(iter, times(1)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
verify(iter, times(1)).emit(new JInt(0), new JLong(-1))
}

// Default case where Exasol table is empty.
test("run emits default partitionId maxOffset pairs with more topic partitions") {
createCustomTopic(topic, partitions = 3)
val iter = mockExasolIterator(properties, Seq(0), Seq(-1))
KafkaMetadata.run(mock[ExaMetadata], iter)
verify(iter, times(3)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
Seq(0, 1, 2).foreach { partitionId =>
verify(iter, times(1)).emit(new JInt(partitionId), new JLong(-1))
}
}

test("run emits partitionId maxOffset pairs with additional topic partitions") {
createCustomTopic(topic, partitions = 3)
val partitions = Seq(0, 1)
val offsets = Seq(3L, 4L)
val iter = mockExasolIterator(properties, partitions, offsets)
KafkaMetadata.run(mock[ExaMetadata], iter)

verify(iter, times(3)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
partitions.zip(offsets).foreach {
case (partitionId, maxOffset) =>
verify(iter, times(1)).emit(new JInt(partitionId), new JLong(maxOffset))
}
verify(iter, times(1)).emit(new JInt(2), new JLong(-1))
}

// Do not emit partitionId maxOffset pairs if partitionId is not
// available in topic partitions
test("run emits partitionId maxOffset pairs with fewer topic partitions") {
createCustomTopic(topic, partitions = 2)
val iter = mockExasolIterator(properties, Seq(1, 3), Seq(7, 17))
KafkaMetadata.run(mock[ExaMetadata], iter)

verify(iter, times(2)).emit(anyInt().asInstanceOf[JInt], anyLong().asInstanceOf[JLong])
verify(iter, times(1)).emit(new JInt(0), new JLong(-1))
verify(iter, times(1)).emit(new JInt(1), new JLong(7))
}

test("run throws if it cannot create KafkConsumer") {
createCustomTopic(topic)
val newProperties = properties + ("BOOTSTRAP_SERVERS" -> "kafka01.internal:9092")
val iter = mockExasolIterator(newProperties, Seq(0), Seq(-1))
val thrown = intercept[org.apache.kafka.common.KafkaException] {
KafkaMetadata.run(mock[ExaMetadata], iter)
}
assert(thrown.getMessage === "Failed to construct kafka consumer")
}

}
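
Taken together, these tests document the metadata contract: each partition of the topic is paired with the highest offset already imported into the Exasol table, `-1` stands for a partition with no imported records yet, and partition ids that do not exist in the topic are dropped. A compact sketch of that pairing logic, with names assumed and only the behaviour taken from the assertions:

```scala
// Sketch (assumption): pair each topic partition with its last imported offset,
// defaulting to -1 when the Exasol table has not seen the partition yet.
def partitionOffsets(
  topicPartitions: Seq[Int],
  importedOffsets: Map[Int, Long]
): Seq[(Int, Long)] =
  topicPartitions.map(id => id -> importedOffsets.getOrElse(id, -1L))
```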
25 changes: 17 additions & 8 deletions src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala
@@ -1,19 +1,28 @@
package com.exasol.cloudetl.bucket

import com.exasol.cloudetl.storage.StorageProperties

import org.apache.hadoop.conf.Configuration

/** A [[Bucket]] implementation for the Azure Data Lake Storage */
final case class AzureAdlsBucket(path: String, params: Map[String, String]) extends Bucket {
final case class AzureAdlsBucket(path: String, params: StorageProperties) extends Bucket {

private[this] val AZURE_CLIENT_ID: String = "AZURE_CLIENT_ID"
private[this] val AZURE_CLIENT_SECRET: String = "AZURE_CLIENT_SECRET"
private[this] val AZURE_DIRECTORY_ID: String = "AZURE_DIRECTORY_ID"

/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override val properties: Map[String, String] = params
override val properties: StorageProperties = params

/** @inheritdoc */
override def validate(): Unit =
Bucket.validate(properties, Bucket.AZURE_ADLS_PARAMETERS)
/**
* Returns the list of required property keys for Azure Data Lake
* Storage.
*/
override def getRequiredProperties(): Seq[String] =
Seq(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_DIRECTORY_ID)

/**
* @inheritdoc
@@ -25,9 +34,9 @@ final case class AzureAdlsBucket(path: String, params: Map[String, String]) extends Bucket {
validate()

val conf = new Configuration()
val clientId = Bucket.requiredParam(params, "AZURE_CLIENT_ID")
val clientSecret = Bucket.requiredParam(params, "AZURE_CLIENT_SECRET")
val directoryId = Bucket.requiredParam(params, "AZURE_DIRECTORY_ID")
val clientId = properties.getString(AZURE_CLIENT_ID)
val clientSecret = properties.getString(AZURE_CLIENT_SECRET)
val directoryId = properties.getString(AZURE_DIRECTORY_ID)
val tokenEndpoint = s"https://login.microsoftonline.com/$directoryId/oauth2/token"
conf.set("fs.adl.impl", classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName)
conf.set("fs.AbstractFileSystem.adl.impl", classOf[org.apache.hadoop.fs.adl.Adl].getName)