Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-topic convenience method #359

Closed
wants to merge 1 commit into from

Conversation

keirlawson
Copy link
Contributor

The equivalent of Deserializer.topic for use with Vulcan codecs.

I found myself having to write this when creating a consumer that read from multiple topics, hopefully it might be useful to others.

@keirlawson keirlawson force-pushed the vulcan-multi-topic branch from 8dde0df to f6a80af Compare May 2, 2020 17:46
@codecov-io
Copy link

Codecov Report

Merging #359 into master will increase coverage by 0.04%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #359      +/-   ##
==========================================
+ Coverage   91.29%   91.34%   +0.04%     
==========================================
  Files          62       62              
  Lines        1447     1455       +8     
  Branches       54       54              
==========================================
+ Hits         1321     1329       +8     
  Misses        126      126              
Impacted Files Coverage Δ
...main/scala/fs2/kafka/vulcan/AvroDeserializer.scala 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 66b6d52...f6a80af. Read the comment docs.

@vlovgr
Copy link
Contributor

vlovgr commented May 5, 2020

Thanks @keirlawson! Is there anything that prevents you from creating a coproduct of the events from the different topics? If you define a union Codec for such a coproduct, then you can simply use that type to deserialize records from all topics, without having to rely on topic names.

@SystemFw
Copy link
Member

SystemFw commented May 5, 2020

Hey @vlovgr , I had at least a use case where this was still useful, since that was some information encoded in the topic names that had to be propagated the Vulcan codecs. I wrote my own solution, which is quite a lot uglier than this PR, judge by yourself:

  def multiTopicDeserializer[F[_]: Sync, A](
      codecs: PartialFunction[String, Codec[_ <: A]]
  ): AvroSettings[F] => RecordDeserializer[F, A] = {
    def toF[B](fb: Either[AvroError, B]): F[B] =
      Sync[F].fromEither(fb.leftMap(_.throwable))

    def getCodec(topic: String) =
      Sync[F].fromOption(
        codecs.lift.apply(topic),
        new KafkaException(s"fs2.kafka.UnexpectedTopicException: unexpected topic [$topic]")
      )

    def deserializer(
        native: KafkaAvroDeserializer,
        client: SchemaRegistryClient
    ): Deserializer[F, A] =
      Deserializer
        .instance { (topic, _, bytes) =>
          getCodec(topic).flatMap { codec =>
            toF(codec.schema).flatMap { schema =>
              Sync[F].suspend {
                val writerSchemaId = ByteBuffer.wrap(bytes).getInt(1) // skip magic byte
                val writerSchema = client.getById(writerSchemaId)

                toF(codec.decode(native.deserialize(topic, bytes, schema), writerSchema))
              }
            }
          }
        }

    settings =>
      RecordDeserializer.instance(
        forKey = settings.createAvroDeserializer(true).map((deserializer _).tupled),
        forValue = settings.createAvroDeserializer(false).map((deserializer _).tupled)
      )
  }

@keirlawson
Copy link
Contributor Author

@vlovgr happy to hear there's an easier way :-) I just tried using Codec.union to achieve this however and it doesn't seem to work, when I run I get vulcan.AvroException: Got unexpected schema type RECORD while decoding union, expected schema type UNION

@marcodippy
Copy link

marcodippy commented May 11, 2020

I just tried using Codec.union to achieve this however and it doesn't seem to work, when I run I get vulcan.AvroException: Got unexpected schema type RECORD while decoding union, expected schema type UNION

hey @vlovgr :) I just wanted to confirm that that trick doesn't work anymore with the latest fs2-kafka / vulcan

@vlovgr
Copy link
Contributor

vlovgr commented May 12, 2020

Yes, I believe that stopped working with #282. I've created a pull request (fd4s/vulcan#191) to change the union decoding to also accept schemas in the union (in addition to the union schema), which should fix the issue.

Sounds like the addition in this pull request should still be useful in special circumstances.

@mwz
Copy link

mwz commented Jul 23, 2020

I had at least a use case where this was still useful, since that was some information encoded in the topic names that had to be propagated the Vulcan codecs.

Hey @SystemFw, I'm quite curious about your use case, can you share any more details on why you need to do this? (feel free to DM me if you don't want to talk about it publicly)

@SystemFw
Copy link
Member

SystemFw commented Oct 2, 2020

Is there any reason why this never got merged?

@vlovgr
Copy link
Contributor

vlovgr commented Oct 2, 2020

I took a quick look now, and I think this assumption about forValue makes it only work for values.

@SystemFw
Copy link
Member

SystemFw commented Oct 2, 2020

does the uglier solution I posted above suffer from the same limitation?

@vlovgr
Copy link
Contributor

vlovgr commented Oct 2, 2020

No, that solution looks alright to me. 👍

@SystemFw
Copy link
Member

SystemFw commented Oct 2, 2020

I'll see if I can PR that then

@keirlawson keirlawson closed this Oct 2, 2020
@SystemFw SystemFw reopened this Oct 2, 2020
Base automatically changed from master to series/1.x January 22, 2021 14:40
@aartigao
Copy link
Contributor

I'll see if I can PR that then

Hey @SystemFw, I can't find your PR for that. Do you feel this is still needed after +3 years? Shall we re-discuss it for 3.x?

@keirlawson keirlawson closed this Dec 30, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

7 participants