Merge pull request #39 from exasol/feature/kafka-import
Kafka Consumer as Exasol IMPORT UDF
morazow authored Oct 4, 2019
2 parents a2c3985 + fa37167 commit a9fd757
Showing 9 changed files with 431 additions and 26 deletions.
5 changes: 5 additions & 0 deletions project/Dependencies.scala
@@ -12,9 +12,12 @@ object Dependencies {
private val ParquetVersion = "1.8.1"
private val AzureStorageVersion = "2.2.0"
private val GoogleStorageVersion = "hadoop2-1.9.10"
private val KafkaClientsVersion = "2.3.0"
private val KafkaAvroSerializerVersion = "5.2.1"
private val TypesafeLoggingVersion = "3.9.0"

val Resolvers: Seq[Resolver] = Seq(
"Confluent Maven Repo" at "http://packages.confluent.io/maven/",
"Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases"
)

@@ -31,6 +34,8 @@ object Dependencies {
"org.apache.parquet" % "parquet-avro" % ParquetVersion,
"com.microsoft.azure" % "azure-storage" % AzureStorageVersion,
"com.google.cloud.bigdataoss" % "gcs-connector" % GoogleStorageVersion,
"org.apache.kafka" % "kafka-clients" % KafkaClientsVersion,
"io.confluent" % "kafka-avro-serializer" % KafkaAvroSerializerVersion,
"com.typesafe.scala-logging" %% "scala-logging" % TypesafeLoggingVersion
)

24 changes: 1 addition & 23 deletions src/main/scala/com/exasol/cloudetl/avro/AvroRowIterator.scala
@@ -4,7 +4,6 @@ import com.exasol.cloudetl.data.Row

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericRecord
- import org.apache.avro.util.Utf8

/**
* An object that creates a [[com.exasol.cloudetl.data.Row]] iterator
@@ -37,29 +36,8 @@ object AvroRowIterator {
throw new NoSuchElementException("Avro reader called next on an empty iterator!")
}
val record = reader.next()
- recordToRow(record)
+ Row.fromAvroGenericRecord(record)
}
}

- private[this] def recordToRow(record: GenericRecord): Row = {
- val size = record.getSchema.getFields.size
- val values = Array.ofDim[Any](size)
- for { index <- 0 until size } {
- values.update(index, convertRecordValue(record.get(index)))
- }
- Row(values.toSeq)
- }
-
- @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
- private[this] def convertRecordValue(value: Any): Any = value match {
- case _: GenericRecord =>
- throw new IllegalArgumentException("Avro nested record type is not supported yet!")
- case _: java.util.Collection[_] =>
- throw new IllegalArgumentException("Avro collection type is not supported yet!")
- case _: java.util.Map[_, _] =>
- throw new IllegalArgumentException("Avro map type is not supported yet!")
- case _: Utf8 => value.asInstanceOf[Utf8].toString
- case primitiveType => primitiveType
- }

}
6 changes: 3 additions & 3 deletions src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
Original file line number Diff line number Diff line change
@@ -166,10 +166,10 @@ object Bucket extends LazyLogging {

/**
* Checks if the provided key is available in the key value parameter
- * map. If it does not exist, throws an [[IllegalArgumentException]]
- * exception.
+ * map. If it does not exist, throws an
+ * [[java.lang.IllegalArgumentException]] exception.
*/
- private[bucket] def requiredParam(params: Map[String, String], key: String): String = {
+ def requiredParam(params: Map[String, String], key: String): String = {
val opt = params.get(key)
opt.fold {
throw new IllegalArgumentException(s"The required parameter $key is not defined!")
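For illustration (not part of the diff), the now-public helper behaves like this, with a hypothetical parameter map:

    // Returns "g1"; a missing key throws an IllegalArgumentException.
    Bucket.requiredParam(Map("GROUP_ID" -> "g1"), "GROUP_ID")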
68 changes: 68 additions & 0 deletions src/main/scala/com/exasol/cloudetl/data/Row.scala
@@ -1,5 +1,12 @@
package com.exasol.cloudetl.data

import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.generic.GenericFixed
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8

/**
* The internal class that holds column data in an array.
*/
@@ -25,3 +32,64 @@ final case class Row(protected[data] val values: Seq[Any]) {
values

}

/**
* A companion object to the internal [[Row]] data structure with helper
* functions.
*/
object Row {

/**
* Returns a [[Row]] from [[org.apache.avro.generic.GenericRecord]]
* data.
*/
def fromAvroGenericRecord(record: GenericRecord): Row = {
val size = record.getSchema.getFields.size
val values = Array.ofDim[Any](size)
record.getSchema.getFields.asScala.zipWithIndex.foreach {
case (field, index) =>
values.update(index, getAvroRecordValue(record.get(index), field.schema))
}
Row(values.toSeq)
}

@SuppressWarnings(
Array(
"org.wartremover.warts.AsInstanceOf",
"org.wartremover.warts.Null",
"org.wartremover.warts.Return",
"org.wartremover.warts.ToString"
)
)
def getAvroRecordValue(value: Any, field: Schema): Any = {
// scalastyle:off
if (value == null) {
return null
}
// scalastyle:on
field.getType match {
case Schema.Type.NULL => value
case Schema.Type.BOOLEAN => value
case Schema.Type.INT => value
case Schema.Type.LONG => value
case Schema.Type.FLOAT => value
case Schema.Type.DOUBLE => value
case Schema.Type.STRING => value.toString
case Schema.Type.ENUM => value.toString
case Schema.Type.UNION => getAvroUnionValue(value, field)
case Schema.Type.FIXED => value.asInstanceOf[GenericFixed].bytes().toString
case Schema.Type.BYTES => value.asInstanceOf[Utf8].toString
case field =>
throw new IllegalArgumentException(s"Avro ${field.getName} type is not supperted!")
}
}

def getAvroUnionValue(value: Any, field: Schema): Any = field.getTypes.asScala.toSeq match {
case Seq(f) => getAvroRecordValue(value, f)
case Seq(n, f) if n.getType == Schema.Type.NULL => getAvroRecordValue(value, f)
case Seq(f, n) if n.getType == Schema.Type.NULL => getAvroRecordValue(value, f)
case _ =>
throw new IllegalArgumentException("Avro UNION type should contain a primitive and null!")
}

}
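For illustration (not part of the commit), a minimal sketch of the new conversion; the User schema and its values are hypothetical:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData

    // Hypothetical schema: a STRING field plus a nullable INT (a UNION of null and int).
    val schema = new Schema.Parser().parse(
      """{"type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"},
        |  {"name": "age", "type": ["null", "int"], "default": null}
        |]}""".stripMargin
    )
    val record = new GenericData.Record(schema)
    record.put("name", "jane")
    record.put("age", 42)
    // STRING values go through toString, the UNION unwraps to its non-null branch.
    Row.fromAvroGenericRecord(record) // => Row(Seq("jane", 42))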
22 changes: 22 additions & 0 deletions src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerBuilder.scala
@@ -0,0 +1,22 @@
package com.exasol.cloudetl.kafka

import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer

/**
* A companion object to the
* [[org.apache.kafka.clients.consumer.KafkaConsumer]] class.
*/
object KafkaConsumerBuilder {

@SuppressWarnings(Array("org.wartremover.warts.Null"))
def apply(params: Map[String, String]): KafkaConsumer[String, GenericRecord] = {
val configs = KafkaConsumerProperties.fromImportParameters[GenericRecord](params)
new KafkaConsumer[String, GenericRecord](
configs.getProperties(),
configs.keyDeserializerOpt.orNull,
configs.valueDeserializerOpt.orNull
)
}

}
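A usage sketch (assuming a reachable Kafka broker and Schema Registry; the addresses below are placeholders):

    // These are the parameter names fromImportParameters requires; SSL_ENABLED
    // defaults to "false", so no SSL settings are needed for this sketch.
    val params = Map(
      "BROKER_ADDRESS" -> "localhost:9092",
      "GROUP_ID" -> "exasol-kafka-import",
      "SCHEMA_REGISTRY_URL" -> "http://localhost:8081"
    )
    val consumer = KafkaConsumerBuilder(params) // KafkaConsumer[String, GenericRecord]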
181 changes: 181 additions & 0 deletions src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerProperties.scala
@@ -0,0 +1,181 @@
package com.exasol.cloudetl.kafka

import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MMap}

import com.exasol.cloudetl.bucket.Bucket

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.StringDeserializer

/**
* A companion object to the [[KafkaConsumerProperties]] class.
*
* It provides helper functions such as {@code apply} and {@code create}
* for convenient properties construction.
*/
@SuppressWarnings(Array("org.wartremover.warts.Overloading", "org.wartremover.warts.Null"))
object KafkaConsumerProperties {

/**
* Creates Kafka consumer properties with optional key and value
* deserializers.
*/
def apply[K, V](
properties: Map[String, String],
keyDeserializer: Option[Deserializer[K]],
valueDeserializer: Option[Deserializer[V]]
): KafkaConsumerProperties[K, V] = {
require(
keyDeserializer != null &&
(keyDeserializer.isDefined || properties
.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)),
"Key deserializer should be defined or specified in properties!"
)
require(
valueDeserializer != null &&
(valueDeserializer.isDefined || properties
.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
"Value deserializer should be defined or specified in properties!"
)
new KafkaConsumerProperties[K, V](properties, keyDeserializer, valueDeserializer)
}

/**
* Creates Kafka consumer properties with explicitly provided key and
* value deserializers.
*/
def apply[K, V](
properties: Map[String, String],
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
): KafkaConsumerProperties[K, V] =
apply(properties, Option(keyDeserializer), Option(valueDeserializer))

@SuppressWarnings(
Array(
"org.wartremover.warts.AsInstanceOf",
"org.wartremover.warts.MutableDataStructures",
"org.wartremover.warts.NonUnitStatements"
)
)
def fromImportParameters[V](
importParams: Map[String, String]
): KafkaConsumerProperties[String, V] = {
val params = MMap.empty[String, String]
params.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
params.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Bucket.requiredParam(importParams, "BROKER_ADDRESS")
)
params.put(ConsumerConfig.GROUP_ID_CONFIG, Bucket.requiredParam(importParams, "GROUP_ID"))
val schemaRegistryUrl = Bucket.requiredParam(importParams, "SCHEMA_REGISTRY_URL")
params.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
val sslEnabled = Bucket.optionalParameter(importParams, "SSL_ENABLED", "false")
if (sslEnabled.equals("true")) {
params.put(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Bucket.requiredParam(importParams, "SECURITY_PROTOCOL")
)
params.put(
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
Bucket.requiredParam(importParams, "SSL_KEYSTORE_LOCATION")
)
params.put(
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
Bucket.requiredParam(importParams, "SSL_KEYSTORE_PASSWORD")
)
params.put(
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
Bucket.requiredParam(importParams, "SSL_KEY_PASSWORD")
)
params.put(
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
Bucket.requiredParam(importParams, "SSL_TRUSTSTORE_LOCATION")
)
params.put(
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
Bucket.requiredParam(importParams, "SSL_TRUSTSTORE_PASSWORD")
)
}

KafkaConsumerProperties(
params.toMap,
new StringDeserializer,
createAvroDeserializer(schemaRegistryUrl).asInstanceOf[Deserializer[V]]
)

}

def createAvroDeserializer(schemaRegistryUrl: String): KafkaAvroDeserializer = {
// The schema registry URL has to be provided here, since the one
// configured in the consumer properties is not applied to the deserializer.
val deserializerConfig = Map(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
)
val kafkaAvroDeserializer = new KafkaAvroDeserializer
kafkaAvroDeserializer.configure(deserializerConfig.asJava, false)
kafkaAvroDeserializer
}

}

/**
* A properties holder class for Kafka consumers.
*
* It is parameterized on key/value deserializer types.
*/
class KafkaConsumerProperties[K, V](
val properties: Map[String, String],
val keyDeserializerOpt: Option[Deserializer[K]],
val valueDeserializerOpt: Option[Deserializer[V]]
) {

/**
* A comma-separated list of host/port pairs used to connect to the
* Kafka brokers.
*/
final def withBootstrapServers(bootstrapServers: String): KafkaConsumerProperties[K, V] =
withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

/**
* A unique identifier for the consumer group this consumer belongs to.
*/
final def withGroupId(groupId: String): KafkaConsumerProperties[K, V] =
withProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)

/**
* A schema registry URL this consumer can use.
*/
final def withSchemaRegistryUrl(schemaRegistryUrl: String): KafkaConsumerProperties[K, V] =
withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)

/**
* Sets or updates a Kafka consumer property with the given key and value.
*/
final def withProperty(key: String, value: String): KafkaConsumerProperties[K, V] =
copy(properties = properties.updated(key, value))

/**
* Returns the Kafka consumer properties as a Java map.
*/
final def getProperties(): java.util.Map[String, AnyRef] =
properties.asInstanceOf[Map[String, AnyRef]].asJava

@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
private[this] def copy(
properties: Map[String, String],
keyDeserializer: Option[Deserializer[K]] = keyDeserializerOpt,
valueDeserializer: Option[Deserializer[V]] = valueDeserializerOpt
): KafkaConsumerProperties[K, V] =
new KafkaConsumerProperties[K, V](
properties,
keyDeserializer,
valueDeserializer
)
}
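For illustration, a sketch of building properties with the fluent setters; the deserializers mirror what fromImportParameters wires up, and the URLs are placeholders:

    import org.apache.kafka.common.serialization.StringDeserializer

    val registryUrl = "http://localhost:8081" // placeholder
    val props = KafkaConsumerProperties(
      Map.empty[String, String],
      new StringDeserializer,
      KafkaConsumerProperties.createAvroDeserializer(registryUrl)
    ).withBootstrapServers("localhost:9092")
      .withGroupId("exasol-kafka-import")
      .withSchemaRegistryUrl(registryUrl)

    // Each withX call returns a copy; getProperties() yields the Java map
    // expected by the KafkaConsumer constructor.
    props.getProperties()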
