Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix auto-registration of Avro union (etc) types #932

Merged
merged 7 commits into the base branch from the author's branch on
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,12 @@ lazy val mimaSettings = Seq(

// sealed
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers")
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
ProblemFilters.exclude[FinalMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),

// private
ProblemFilters.exclude[Problem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.*")
)
// format: on
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ final class AvroSerializer[A] private[vulcan] (
def using[F[_]](
settings: AvroSettings[F]
)(implicit F: Sync[F]): RecordSerializer[F, A] = {
val createSerializer: Boolean => F[Serializer[F, A]] =
settings.createAvroSerializer(_).map {
case (serializer, _) =>
Serializer.instance { (topic, _, a) =>
F.defer {
codec.encode(a) match {
case Right(value) => F.pure(serializer.serialize(topic, value))
case Left(error) => F.raiseError(error.throwable)
val createSerializer: Boolean => F[Serializer[F, A]] = isKey => {
codec.schema match {
case Left(e) => F.pure(Serializer.fail(e.throwable))
case Right(writerSchema) =>
settings.createAvroSerializer(isKey, Some(writerSchema)).map {
case (serializer, _) =>
Serializer.instance { (topic, _, a) =>
F.defer {
codec.encode(a) match {
case Right(value) => F.pure(serializer.serialize(topic, value))
case Left(error) => F.raiseError(error.throwable)
}
}
}
}
}
}
}

RecordSerializer.instance(
forKey = createSerializer(true),
Expand Down
60 changes: 52 additions & 8 deletions modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import cats.syntax.all._
import fs2.kafka.internal.converters.collection._
import fs2.kafka.internal.syntax._
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.avro.Schema
import vulcan.Codec

/**
Expand Down Expand Up @@ -103,7 +104,16 @@ sealed abstract class AvroSettings[F[_]] {
* specified `isKey` flag, denoting whether a record key or
* value is being serialized.
*/
def createAvroSerializer(isKey: Boolean): F[(KafkaAvroSerializer, SchemaRegistryClient)]
def createAvroSerializer(
isKey: Boolean,
writerSchema: Option[Schema]
): F[(KafkaAvroSerializer, SchemaRegistryClient)]

@deprecated("use the overload that takes an optional writer schema", "2.5.0-M3")
final def createAvroSerializer(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think before 3.0 we should refactor some of this to expose fewer internals.

isKey: Boolean
): F[(KafkaAvroSerializer, SchemaRegistryClient)] =
createAvroSerializer(isKey, writerSchema = None)

/**
* Creates a new [[AvroSettings]] instance with the specified
Expand All @@ -125,10 +135,20 @@ sealed abstract class AvroSettings[F[_]] {
*/
def withCreateAvroSerializer(
// format: off
createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
// format: on
): AvroSettings[F]

@deprecated("use the overload that has an `Option[Schema]` argument", "2.5.0-M3")
final def withCreateAvroSerializer(
// format: off
createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
// format: on
): AvroSettings[F] =
withCreateAvroSerializer(
(client, isKey, _, properties) => createAvroSerializerWith(client, isKey, properties)
)

/**
* Creates a new [[AvroSettings]] instance with the specified
* function for registering schemas from settings.
Expand All @@ -145,7 +165,7 @@ object AvroSettings {
override val properties: Map[String, String],
// format: off
val createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)],
val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)],
val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)],
val registerSchemaWith: (F[SchemaRegistryClient], String, Codec[_]) => F[Int]
// format: on
) extends AvroSettings[F] {
Expand Down Expand Up @@ -177,9 +197,10 @@ object AvroSettings {
createAvroDeserializerWith(schemaRegistryClient, isKey, properties)

override def createAvroSerializer(
isKey: Boolean
isKey: Boolean,
writerSchema: Option[Schema]
): F[(KafkaAvroSerializer, SchemaRegistryClient)] =
createAvroSerializerWith(schemaRegistryClient, isKey, properties)
createAvroSerializerWith(schemaRegistryClient, isKey, writerSchema, properties)

override def registerSchema[A](subject: String)(implicit codec: Codec[A]): F[Int] =
registerSchemaWith(schemaRegistryClient, subject, codec)
Expand All @@ -193,7 +214,7 @@ object AvroSettings {

override def withCreateAvroSerializer(
// format: off
createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
// format: on
): AvroSettings[F] =
copy(createAvroSerializerWith = createAvroSerializerWith)
Expand Down Expand Up @@ -224,10 +245,33 @@ object AvroSettings {
(deserializer, schemaRegistryClient)
}
},
createAvroSerializerWith = (schemaRegistryClient, isKey, properties) =>
createAvroSerializerWith = (schemaRegistryClient, isKey, schema, properties) =>
schemaRegistryClient.flatMap { schemaRegistryClient =>
F.delay {
val serializer = new KafkaAvroSerializer(schemaRegistryClient)
val serializer = schema match {
case None => new KafkaAvroSerializer(schemaRegistryClient)
case Some(schema) =>
new KafkaAvroSerializer(schemaRegistryClient) {
// Overrides the default auto-registration behaviour, which attempts to guess the
// writer schema based on the encoded representation used by the Java Avro SDK.
// This works for types such as Records, which contain a reference to the exact schema
// that was used to write them, but doesn't work so well for unions (where
// the default behaviour is to register just the schema for the alternative
// being produced) or logical types such as timestamp-millis (where the logical
// type is lost).
val parsedSchema = new AvroSchema(schema.toString)
override def serialize(topic: String, record: Any): Array[Byte] = {
if (record == null) {
return null
}
serializeImpl(
getSubjectName(topic, isKey, record, parsedSchema),
record,
parsedSchema
)
}
}
}
serializer.configure(withDefaults(properties), isKey)
(serializer, schemaRegistryClient)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fs2.kafka.vulcan

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.kafka.Headers
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.scalatest.funspec.AnyFunSpec
import vulcan.{AvroError, Codec}
Expand All @@ -16,6 +17,25 @@ final class AvroSerializerSpec extends AnyFunSpec {
assert(serializer.forValue.attempt.unsafeRunSync().isRight)
}

it("auto-registers union schemas") {
(avroSerializer[Either[Int, Boolean]]
.using(avroSettings)
.forValue
.flatMap(
_.serialize(
"test-union-topic",
Headers.empty,
Right(true)
)
))
.unsafeRunSync()
assert(
schemaRegistryClient
.getLatestSchemaMetadata("test-union-topic-value")
.getSchema === """["int","boolean"]"""
)
}

it("raises schema errors") {
val codec: Codec[Int] =
Codec.instance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ final class AvroSettingsSpec extends AnyFunSpec with ScalaCheckPropertyChecks {
it("should provide withCreateAvroSerializer") {
assert {
settings
.withCreateAvroSerializer {
case _ => IO.raiseError(new RuntimeException)
.withCreateAvroSerializer { (_, _, _, _) =>
IO.raiseError(new RuntimeException)
}
.createAvroSerializer(isKey = false)
.createAvroSerializer(isKey = false, null)
.attempt
.unsafeRunSync()
.isLeft
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ final class PackageSpec extends AnyFunSpec {
describe("avroSerializer/avroDeserializer") {
it("should be able to do roundtrip serialization") {
(for {
serializer <- avroSerializer[Test].using(avroSettings).forValue
serializer <- avroSerializer[Either[Test, Int]].using(avroSettings).forValue
test = Test("test")
serialized <- serializer.serialize("topic", Headers.empty, test)
deserializer <- avroDeserializer[Test].using(avroSettings).forValue
serialized <- serializer.serialize("topic", Headers.empty, Left(test))
deserializer <- avroDeserializer[Either[Test, Int]].using(avroSettings).forValue
deserialized <- deserializer.deserialize("topic", Headers.empty, serialized)
} yield assert(deserialized == test)).unsafeRunSync()
} yield assert(deserialized == Left(test))).unsafeRunSync()
}

it("should be able to do roundtrip serialization using compatible schemas") {
Expand Down