From f6a80af4f404a200857d498749cf8ef514aeeca4 Mon Sep 17 00:00:00 2001 From: Keir Lawson Date: Sat, 2 May 2020 17:01:13 +0100 Subject: [PATCH] Add multi-topic convenience method --- .../fs2/kafka/vulcan/AvroDeserializer.scala | 31 +++++++++++++ .../kafka/vulcan/AvroDeserializerSpec.scala | 46 +++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala index 2cd663702..253ed383c 100644 --- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala +++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala @@ -55,4 +55,35 @@ final class AvroDeserializer[A] private[vulcan] ( object AvroDeserializer { def apply[A](implicit codec: Codec[A]): AvroDeserializer[A] = new AvroDeserializer(codec) + + /** + * Creates a single RecordDeserializer that can handle multiple topics each with its own codec + * + * @param settings avro configuration such as for the schema registry + * @param pair the first topic, codec pair + * @param pairs subsequent topic, codec pairs + * @tparam F the type in which effects will be suspended + * @tparam A the type which this deserializer will produce + * @return a RecordDeserializer incorporating the supplied codecs + */ + def topics[F[_]: Sync, A]( + settings: AvroSettings[F] + )(pair: (String, Codec[_ <: A]), pairs: (String, Codec[_ <: A])*): RecordDeserializer[F, A] = { + val all = pair :: pairs.toList + val desers = all.traverse { + case (topic, codec) => + avroDeserializer(codec).using(settings).forValue.map(c => (topic, c.widen[A])) + } + + val deserializer = desers.map { des => + val fn = des.foldLeft(PartialFunction.empty[String, Deserializer[F, A]])((acc, p) => { + acc.orElse({ + case p._1 => p._2 + }) + }) + Deserializer.topic(fn) + } + + RecordDeserializer.const(deserializer) + } } diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala index 126389230..b0a45facc 100644 --- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala +++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/AvroDeserializerSpec.scala @@ -1,6 +1,7 @@ package fs2.kafka.vulcan import cats.effect.IO +import fs2.kafka.Headers import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import org.scalatest.funspec.AnyFunSpec import vulcan.{AvroError, Codec} @@ -35,8 +36,53 @@ final class AvroDeserializerSpec extends AnyFunSpec { avroDeserializer[Int].toString() startsWith "AvroDeserializer$" } } + + describe("topics") { + it("can create a deserializer from multiple codecs") { + + val deserializer = + AvroDeserializer.topics(avroSettings)( + ("someTopic", Codec[Int]), + ("anotherTopic", Codec[Int]) + ) + + assert(deserializer.forKey.attempt.unsafeRunSync().isRight) + assert(deserializer.forValue.attempt.unsafeRunSync().isRight) + } + + it("can correctly differentiate between incoming topics") { + + sealed trait SomeTrait + case class SomeCaseClass(someField: String) extends SomeTrait + case class AnotherCaseClass(anotherField: Int) extends SomeTrait + + implicit val someCodec = Codec[String].imap(SomeCaseClass)(_.someField) + implicit val anotherCodec = Codec[Int].imap(AnotherCaseClass)(_.anotherField) + + val deserializer = + AvroDeserializer.topics[IO, SomeTrait](avroSettings)( + ("someTopic", someCodec), + ("anotherTopic", anotherCodec) + ).forValue.unsafeRunSync() + + val someTopicResult = deserializer.deserialize("someTopic", Headers.empty, encode("someTopic", SomeCaseClass("someValue"))).unsafeRunSync() + val anotherTopicResult = deserializer.deserialize("anotherTopic", Headers.empty, encode("anotherTopic", AnotherCaseClass(123))).unsafeRunSync() + + someTopicResult match { + case SomeCaseClass(s) => assert(s == "someValue") + case _ => fail() + } + + anotherTopicResult match { + case AnotherCaseClass(i) => assert(i == 123) + case _ => fail() + } + } + } } + def encode[A: Codec](topic: String, value: A) = AvroSerializer[A].using(avroSettings).forValue.flatMap(_.serialize(topic, Headers.empty, value)).unsafeRunSync() + val schemaRegistryClient: MockSchemaRegistryClient = new MockSchemaRegistryClient()