Skip to content

Commit

Permalink
Add multi-topic convenience method
Browse files Browse the repository at this point in the history
  • Loading branch information
keirlawson committed May 2, 2020
1 parent 66b6d52 commit f6a80af
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,35 @@ final class AvroDeserializer[A] private[vulcan] (
object AvroDeserializer {
def apply[A](implicit codec: Codec[A]): AvroDeserializer[A] =
new AvroDeserializer(codec)

/**
* Creates a single RecordDeserializer that can handle multiple topics each with its own codec
*
* @param settings avro configuration such as for the schema registry
* @param pair the first topic, codec pair
* @param pairs subsequent topic, codec pairs
* @tparam F the type in which effects will be suspended
* @tparam A the type which this deserializer will produce
* @return a RecordDeserializer incorporating the supplied codecs
*/
def topics[F[_]: Sync, A](
settings: AvroSettings[F]
)(pair: (String, Codec[_ <: A]), pairs: (String, Codec[_ <: A])*): RecordDeserializer[F, A] = {
val all = pair :: pairs.toList
val desers = all.traverse {
case (topic, codec) =>
avroDeserializer(codec).using(settings).forValue.map(c => (topic, c.widen[A]))
}

val deserializer = desers.map { des =>
val fn = des.foldLeft(PartialFunction.empty[String, Deserializer[F, A]])((acc, p) => {
acc.orElse({
case p._1 => p._2
})
})
Deserializer.topic(fn)
}

RecordDeserializer.const(deserializer)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fs2.kafka.vulcan

import cats.effect.IO
import fs2.kafka.Headers
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.scalatest.funspec.AnyFunSpec
import vulcan.{AvroError, Codec}
Expand Down Expand Up @@ -35,8 +36,53 @@ final class AvroDeserializerSpec extends AnyFunSpec {
avroDeserializer[Int].toString() startsWith "AvroDeserializer$"
}
}

describe("topics") {
it("can create a deserializer from multiple codecs") {

val deserializer =
AvroDeserializer.topics(avroSettings)(
("someTopic", Codec[Int]),
("anotherTopic", Codec[Int])
)

assert(deserializer.forKey.attempt.unsafeRunSync().isRight)
assert(deserializer.forValue.attempt.unsafeRunSync().isRight)
}

it("can correctly differentiate between incoming topics") {

sealed trait SomeTrait
case class SomeCaseClass(someField: String) extends SomeTrait
case class AnotherCaseClass(anotherField: Int) extends SomeTrait

implicit val someCodec = Codec[String].imap(SomeCaseClass)(_.someField)
implicit val anotherCodec = Codec[Int].imap(AnotherCaseClass)(_.anotherField)

val deserializer =
AvroDeserializer.topics[IO, SomeTrait](avroSettings)(
("someTopic", someCodec),
("anotherTopic", anotherCodec)
).forValue.unsafeRunSync()

val someTopicResult = deserializer.deserialize("someTopic", Headers.empty, encode("someTopic", SomeCaseClass("someValue"))).unsafeRunSync()
val anotherTopicResult = deserializer.deserialize("anotherTopic", Headers.empty, encode("anotherTopic", AnotherCaseClass(123))).unsafeRunSync()

someTopicResult match {
case SomeCaseClass(s) => assert(s == "someValue")
case _ => fail()
}

anotherTopicResult match {
case AnotherCaseClass(i) => assert(i == 123)
case _ => fail()
}
}
}
}

def encode[A: Codec](topic: String, value: A) = AvroSerializer[A].using(avroSettings).forValue.flatMap(_.serialize(topic, Headers.empty, value)).unsafeRunSync()

val schemaRegistryClient: MockSchemaRegistryClient =
new MockSchemaRegistryClient()

Expand Down

0 comments on commit f6a80af

Please sign in to comment.