From 1364651ce6c48b78a872f022a05065bcf7e1dc19 Mon Sep 17 00:00:00 2001 From: Adrien Bestel Date: Fri, 12 Apr 2024 12:12:22 +0100 Subject: [PATCH] fix: memory leak in Cats Effects Some context to start: Cats Effects has been having memory leaks in CallbackStack since version 3.4.3. See for example: https://github.com/typelevel/cats-effect/issues/3935 I've been facing this memory leak in an application using fs2-kafka, and found that I'm not the only one (https://github.com/typelevel/cats-effect/pull/3973). Using a simple consumer > produce stream application, I monitored the size of the CallbackStack using the following command: ``` while sleep 1; do jcmd GC.class_histogram | grep 'cats.effect.CallbackStack$Node' ; done ``` I found that swapping the `F.race(shutdown, fetch)` for `fetch` stops the memory leak. This should not be an issue because the Stream is anyway interrupted on `.interruptWhen(F.race(shutdown, stopReqs.get).void.attempt)`, but I'm not 100% convinced of this. --- .../main/scala/fs2/kafka/KafkaConsumer.scala | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 313d618c3..57eb13ca9 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -170,7 +170,7 @@ object KafkaConsumer { stopReqs <- Deferred[F, Unit] } yield Stream .eval { - def fetchPartition: F[Unit] = F + val fetchPartition: F[Unit] = F .deferred[PartitionResult] .flatMap { deferred => val callback: PartitionResult => F[Unit] = @@ -202,19 +202,15 @@ object KafkaConsumer { assigned.ifM(storeFetch, completeRevoked) } >> deferred.get - F.race(shutdown, fetch) - .flatMap { - case Left(()) => - stopReqs.complete(()).void - case Right((chunk, reason)) => - val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty) + fetch.flatMap { case (chunk, reason) => + val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty) - val completeRevoked = - stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked) + val completeRevoked = + stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked) - enqueueChunk >> completeRevoked - } + enqueueChunk >> completeRevoked + } } Stream