Skip to content

Commit

Permalink
Add attempt method to RecordDeserializer
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer committed Mar 9, 2022
1 parent 8f9c1e9 commit 259ba4c
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

package fs2.kafka

import cats.Applicative
import cats.syntax.all._
import cats.{Applicative, Functor}

/**
* Deserializer which may vary depending on whether a record
Expand All @@ -17,6 +18,14 @@ sealed abstract class RecordDeserializer[F[_], A] {
def forKey: F[Deserializer[F, A]]

def forValue: F[Deserializer[F, A]]

/**
* Returns a new [[RecordDeserializer]] instance that will catch deserialization
* errors and return them as a value, allowing user code to handle them without
* causing the consumer to fail.
*/
final def attempt(implicit F: Functor[F]): RecordDeserializer[F, Either[Throwable, A]] =
RecordDeserializer.instance(forKey.map(_.attempt), forValue.map(_.attempt))
}

object RecordDeserializer {
Expand Down

0 comments on commit 259ba4c

Please sign in to comment.