From dba48fc41058e2c2125768811c2c932beaa4d1f7 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 31 Mar 2022 17:16:54 +0100 Subject: [PATCH 1/7] Fix auto-registration of union types --- build.sbt | 7 ++- .../fs2/kafka/vulcan/AvroSerializer.scala | 23 ++++--- .../scala/fs2/kafka/vulcan/AvroSettings.scala | 60 ++++++++++++++++--- .../fs2/kafka/vulcan/AvroSerializerSpec.scala | 20 +++++++ .../fs2/kafka/vulcan/AvroSettingsSpec.scala | 6 +- .../scala/fs2/kafka/vulcan/PackageSpec.scala | 8 +-- 6 files changed, 99 insertions(+), 25 deletions(-) diff --git a/build.sbt b/build.sbt index 42634c290..ecfc3efbc 100644 --- a/build.sbt +++ b/build.sbt @@ -284,7 +284,12 @@ lazy val mimaSettings = Seq( // sealed ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers") + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"), + ProblemFilters.exclude[FinalMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"), + + // private + ProblemFilters.exclude[Problem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.*") ) // format: on } diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala index 568e1c251..6fe067937 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala +++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala @@ -17,18 +17,23 @@ final class AvroSerializer[A] private[vulcan] ( def using[F[_]]( settings: AvroSettings[F] )(implicit F: Sync[F]): RecordSerializer[F, A] = { - val createSerializer: Boolean => F[Serializer[F, A]] = - settings.createAvroSerializer(_).map { - case (serializer, _) => - Serializer.instance { (topic, _, a) => - F.defer { - codec.encode(a) match { - case Right(value) => F.pure(serializer.serialize(topic, value)) - case Left(error) => F.raiseError(error.throwable) + val createSerializer: Boolean => F[Serializer[F, A]] = isKey => { + codec.schema match { + case Left(e) => F.pure(Serializer.fail(e.throwable)) + case Right(writerSchema) => + settings.createAvroSerializer(isKey, Some(writerSchema)).map { + case (serializer, _) => + Serializer.instance { (topic, _, a) => + F.defer { + codec.encode(a) match { + case Right(value) => F.pure(serializer.serialize(topic, value)) + case Left(error) => F.raiseError(error.throwable) + } + } } - } } } + } RecordSerializer.instance( forKey = createSerializer(true), diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala index d6e645c4c..fbcc3a484 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala +++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala @@ -11,6 +11,7 @@ import cats.syntax.all._ import fs2.kafka.internal.converters.collection._ import fs2.kafka.internal.syntax._ import io.confluent.kafka.schemaregistry.avro.AvroSchema +import org.apache.avro.Schema import vulcan.Codec /** @@ -103,7 +104,16 @@ sealed abstract class AvroSettings[F[_]] { * specified `isKey` flag, denoting whether a record key or * value is being serialized. */ - def createAvroSerializer(isKey: Boolean): F[(KafkaAvroSerializer, SchemaRegistryClient)] + def createAvroSerializer( + isKey: Boolean, + writerSchema: Option[Schema] + ): F[(KafkaAvroSerializer, SchemaRegistryClient)] + + @deprecated("use the overload that takes an optional writer schema", "2.5.0-M3") + final def createAvroSerializer( + isKey: Boolean + ): F[(KafkaAvroSerializer, SchemaRegistryClient)] = + createAvroSerializer(isKey, writerSchema = None) /** * Creates a new [[AvroSettings]] instance with the specified @@ -125,10 +135,20 @@ sealed abstract class AvroSettings[F[_]] { */ def withCreateAvroSerializer( // format: off - createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] + createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] // format: on ): AvroSettings[F] + @deprecated("use the overload that has an `Option[Schema]` argument", "2.5.0-M3") + final def withCreateAvroSerializer( + // format: off + createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] + // format: on + ): AvroSettings[F] = + withCreateAvroSerializer( + (client, isKey, _, properties) => createAvroSerializerWith(client, isKey, properties) + ) + /** * Creates a new [[AvroSettings]] instance with the specified * function for registering schemas from settings. @@ -145,7 +165,7 @@ object AvroSettings { override val properties: Map[String, String], // format: off val createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)], - val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)], + val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)], val registerSchemaWith: (F[SchemaRegistryClient], String, Codec[_]) => F[Int] // format: on ) extends AvroSettings[F] { @@ -177,9 +197,10 @@ object AvroSettings { createAvroDeserializerWith(schemaRegistryClient, isKey, properties) override def createAvroSerializer( - isKey: Boolean + isKey: Boolean, + writerSchema: Option[Schema] ): F[(KafkaAvroSerializer, SchemaRegistryClient)] = - createAvroSerializerWith(schemaRegistryClient, isKey, properties) + createAvroSerializerWith(schemaRegistryClient, isKey, writerSchema, properties) override def registerSchema[A](subject: String)(implicit codec: Codec[A]): F[Int] = registerSchemaWith(schemaRegistryClient, subject, codec) @@ -193,7 +214,7 @@ object AvroSettings { override def withCreateAvroSerializer( // format: off - createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] + createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)] // format: on ): AvroSettings[F] = copy(createAvroSerializerWith = createAvroSerializerWith) @@ -224,10 +245,33 @@ object AvroSettings { (deserializer, schemaRegistryClient) } }, - createAvroSerializerWith = (schemaRegistryClient, isKey, properties) => + createAvroSerializerWith = (schemaRegistryClient, isKey, schema, properties) => schemaRegistryClient.flatMap { schemaRegistryClient => F.delay { - val serializer = new KafkaAvroSerializer(schemaRegistryClient) + val serializer = schema match { + case None => new KafkaAvroSerializer(schemaRegistryClient) + case Some(schema) => + new KafkaAvroSerializer(schemaRegistryClient) { + // Overrides the default auto-registration behaviour, which attempts to guess the + // writer schema based on the encoded representation used by the Java Avro SDK. + // This works for types such as Records, which contain a reference to the exact schema + // that was used to write them, but doesn't work so well for unions (where + // the default behaviour is to register just the schema for the alternative + // being produced) or logical types such as timestamp-millis (where the logical + // type is lost). + val parsedSchema = new AvroSchema(schema.toString) + override def serialize(topic: String, record: Any): Array[Byte] = { + if (record == null) { + return null + } + serializeImpl( + getSubjectName(topic, isKey, record, parsedSchema), + record, + parsedSchema + ) + } + } + } serializer.configure(withDefaults(properties), isKey) (serializer, schemaRegistryClient) } diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala index ca91ed7a0..cf2ff288d 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSerializerSpec.scala @@ -2,6 +2,7 @@ package fs2.kafka.vulcan import cats.effect.IO import cats.effect.unsafe.implicits.global +import fs2.kafka.Headers import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import org.scalatest.funspec.AnyFunSpec import vulcan.{AvroError, Codec} @@ -16,6 +17,25 @@ final class AvroSerializerSpec extends AnyFunSpec { assert(serializer.forValue.attempt.unsafeRunSync().isRight) } + it("auto-registers union schemas") { + (avroSerializer[Either[Int, Boolean]] + .using(avroSettings) + .forValue + .flatMap( + _.serialize( + "test-union-topic", + Headers.empty, + Right(true) + ) + )) + .unsafeRunSync() + assert( + schemaRegistryClient + .getLatestSchemaMetadata("test-union-topic-value") + .getSchema === """["int","boolean"]""" + ) + } + it("raises schema errors") { val codec: Codec[Int] = Codec.instance( diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala index 401faee2f..35d041a45 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroSettingsSpec.scala @@ -89,10 +89,10 @@ final class AvroSettingsSpec extends AnyFunSpec with ScalaCheckPropertyChecks { it("should provide withCreateAvroSerializer") { assert { settings - .withCreateAvroSerializer { - case _ => IO.raiseError(new RuntimeException) + .withCreateAvroSerializer { (_, _, _, _) => + IO.raiseError(new RuntimeException) } - .createAvroSerializer(isKey = false) + .createAvroSerializer(isKey = false, null) .attempt .unsafeRunSync() .isLeft diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala index a790a18d6..e8a9bcd8a 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala @@ -27,12 +27,12 @@ final class PackageSpec extends AnyFunSpec { describe("avroSerializer/avroDeserializer") { it("should be able to do roundtrip serialization") { (for { - serializer <- avroSerializer[Test].using(avroSettings).forValue + serializer <- avroSerializer[Either[Test, Int]].using(avroSettings).forValue test = Test("test") - serialized <- serializer.serialize("topic", Headers.empty, test) - deserializer <- avroDeserializer[Test].using(avroSettings).forValue + serialized <- serializer.serialize("topic", Headers.empty, Left(test)) + deserializer <- avroDeserializer[Either[Test, Int]].using(avroSettings).forValue deserialized <- deserializer.deserialize("topic", Headers.empty, serialized) - } yield assert(deserialized == test)).unsafeRunSync() + } yield assert(deserialized == Left(test))).unsafeRunSync() } it("should be able to do roundtrip serialization using compatible schemas") { From 3beabda577fb802454cc1e2f66ed39aa4eb3f2dc Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 31 Mar 2022 17:29:12 +0100 Subject: [PATCH 2/7] Fix scala 2.12 compilation --- .../vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala index fbcc3a484..b3990d04d 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala +++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala @@ -260,7 +260,7 @@ object AvroSettings { // being produced) or logical types such as timestamp-millis (where the logical // type is lost). val parsedSchema = new AvroSchema(schema.toString) - override def serialize(topic: String, record: Any): Array[Byte] = { + override def serialize(topic: String, record: AnyRef): Array[Byte] = { if (record == null) { return null } From c2ecb85f5512c097707461a91adcc01a9dcbafbf Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 31 Mar 2022 17:41:04 +0100 Subject: [PATCH 3/7] Scala 3.1.2-RC3 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index ecfc3efbc..7a044d9e3 100644 --- a/build.sbt +++ b/build.sbt @@ -16,7 +16,7 @@ val scala212 = "2.12.15" val scala213 = "2.13.8" -val scala3 = "3.1.1" +val scala3 = "3.1.2-RC3" lazy val `fs2-kafka` = project .in(file(".")) From c15a8fe55b13115c2ddc05f4f2f55ce7c53738d1 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 31 Mar 2022 17:43:13 +0100 Subject: [PATCH 4/7] Update githubworkflow --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 44dc5a69a..d37004598 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [2.12.15, 2.13.8, 3.1.1] + scala: [2.12.15, 2.13.8, 3.1.2-RC3] java: [adopt@1.8] runs-on: ${{ matrix.os }} steps: From 6d268b4020deaef162db3637645de470e147e788 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 31 Mar 2022 18:18:40 +0100 Subject: [PATCH 5/7] Scala 3 bug workaround --- build.sbt | 2 +- .../src/main/scala-3/kafka.util/VerifiableProperties.scala | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala diff --git a/build.sbt b/build.sbt index 7a044d9e3..ecfc3efbc 100644 --- a/build.sbt +++ b/build.sbt @@ -16,7 +16,7 @@ val scala212 = "2.12.15" val scala213 = "2.13.8" -val scala3 = "3.1.2-RC3" +val scala3 = "3.1.1" lazy val `fs2-kafka` = project .in(file(".")) diff --git a/modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala b/modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala new file mode 100644 index 000000000..1da5a6997 --- /dev/null +++ b/modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala @@ -0,0 +1,5 @@ +package kafka.utils + +/* Workaround for https://github.com/lampepfl/dotty/issues/13523 and + https://github.com/confluentinc/schema-registry/issues/553 */ +private class VerifiableProperties From aa66c3f6f749d11c3e81300a6bf7a99a21c203a5 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 31 Mar 2022 18:20:11 +0100 Subject: [PATCH 6/7] Revert workflow changes --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d37004598..44dc5a69a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [2.12.15, 2.13.8, 3.1.2-RC3] + scala: [2.12.15, 2.13.8, 3.1.1] java: [adopt@1.8] runs-on: ${{ matrix.os }} steps: From 9b63123d437d71a41d20c8d780b8ec827fb24ca8 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 31 Mar 2022 18:37:49 +0100 Subject: [PATCH 7/7] Add header --- .../main/scala-3/kafka.util/VerifiableProperties.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala b/modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala index 1da5a6997..6754b9b04 100644 --- a/modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala +++ b/modules/vulcan/src/main/scala-3/kafka.util/VerifiableProperties.scala @@ -1,5 +1,10 @@ +/* + * Copyright 2018-2022 OVO Energy Limited + * + * SPDX-License-Identifier: Apache-2.0 + */ + package kafka.utils -/* Workaround for https://github.com/lampepfl/dotty/issues/13523 and - https://github.com/confluentinc/schema-registry/issues/553 */ +// Workaround for https://github.com/lampepfl/dotty/issues/13523 and https://github.com/confluentinc/schema-registry/issues/553 private class VerifiableProperties