diff --git a/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala b/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala index fdc16c523..cce06abeb 100644 --- a/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala +++ b/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala @@ -6,7 +6,8 @@ package fs2.kafka -import cats.Applicative +import cats.syntax.all._ +import cats.{Applicative, Functor} /** * Deserializer which may vary depending on whether a record @@ -17,6 +18,14 @@ sealed abstract class RecordDeserializer[F[_], A] { def forKey: F[Deserializer[F, A]] def forValue: F[Deserializer[F, A]] + + /** + * Returns a new [[RecordDeserializer]] instance that will catch deserialization + * errors and return them as a value, allowing user code to handle them without + * causing the consumer to fail. + */ + final def attempt(implicit F: Functor[F]): RecordDeserializer[F, Either[Throwable, A]] = + RecordDeserializer.instance(forKey.map(_.attempt), forValue.map(_.attempt)) } object RecordDeserializer {