# Confluent Spark Avro

Spark UDFs to deserialize Avro messages with schemas stored in Schema Registry. More details about Schema Registry are available on the official website.

## Usage

We expect you to use these UDFs together with the native Spark Kafka reader.

```scala
import org.apache.spark.sql.functions.col

val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load()

val utils = ConfluentSparkAvroUtils("http://schema-registry.my-company.com:8081")
val keyDeserializer = utils.deserializerForSubject("topic1-key")
val valueDeserializer = utils.deserializerForSubject("topic1-value")

df.select(
    keyDeserializer(col("key")).alias("key"),
    valueDeserializer(col("value")).alias("value")
).show(10)
```

## Data decryption

The same sample code above can also read data encrypted with AES-256 keys wrapped by KMS, except that it expects the encrypted payload to use a specific format: `[magic byte (value 2 or 3) | encrypted AES-256 key | encrypted Avro data]`.
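For illustration only, here is a minimal sketch of how such a payload could be split into its three parts. `splitEncryptedRecord` and the fixed 256-byte wrapped-key length are assumptions for the example, not part of this library's API; the real key length depends on the KMS wrapping scheme:

```scala
// Hypothetical helper, not part of this library's API.
// Assumes the KMS-wrapped AES-256 key has a fixed length (256 bytes here);
// adjust encryptedKeyLength to match the actual wrapping scheme.
def splitEncryptedRecord(
    payload: Array[Byte],
    encryptedKeyLength: Int = 256
): (Byte, Array[Byte], Array[Byte]) = {
  // Layout: [magic byte | encrypted AES-256 key | encrypted Avro data]
  val magicByte = payload(0)
  require(magicByte == 2 || magicByte == 3, s"Unexpected magic byte: $magicByte")
  val encryptedKey  = payload.slice(1, 1 + encryptedKeyLength)
  val encryptedAvro = payload.drop(1 + encryptedKeyLength)
  (magicByte, encryptedKey, encryptedAvro)
}
```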

## Build

The tool is designed to be used with Spark >= 2.0.2.
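As a rough sketch (the exact build definition may differ from the project's actual `build.sbt`), this implies declaring Spark as a provided dependency so it is not bundled into the assembly:

```scala
// build.sbt (sketch) — Spark is provided by the cluster at runtime,
// so it is excluded from the assembled jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"           % "2.0.2" % Provided,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.0.2" % Provided
)
```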

```sh
sbt assembly
ls -l target/scala-2.11/confluent-spark-avro-assembly-1.0.jar
```
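One way to make the UDFs available in an interactive session (an example, not the only option; adjust the path to your build output) is to put the assembled jar on the Spark classpath:

```sh
spark-shell --jars target/scala-2.11/confluent-spark-avro-assembly-1.0.jar
```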

## Testing

We haven't added unit tests yet, but you can test the UDFs with the following command:

sbt "project confluent-spark-avro" "run kafka.host:9092 http://schema-registry.host:8081 kafka.topic"

## TODO

- [ ] Spark UDFs to serialize messages.

## License

The project is licensed under the Apache 2 license.