Skip to content

Commit

Permalink
Fix SyntaxSpec for KafkaFuture.cancelable
Browse files Browse the repository at this point in the history
  • Loading branch information
aartigao committed Oct 16, 2023
1 parent d3de204 commit 1bf3e58
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package fs2.kafka.internal

import cats.effect.unsafe.implicits.global
import cats.effect.IO
import cats.effect.std.CountDownLatch
import fs2.kafka._
import fs2.kafka.BaseSpec
import fs2.kafka.internal.syntax._
Expand Down Expand Up @@ -55,27 +56,29 @@ final class SyntaxSpec extends BaseSpec {

describe("KafkaFuture.cancelable") {
it("should cancel future when fiber is cancelled") {
@volatile var isFutureCancelled = false

val test =
for {
started <- IO(new AtomicBoolean)
latch <- CountDownLatch[IO](1)
isFutureStarted <- IO(new AtomicBoolean)
isFutureCancelled <- IO(new AtomicBoolean)
futureIO: IO[KafkaFuture[Unit]] = IO {
started.set(true)
isFutureStarted.set(true)
// We need to return the original future after calling `whenComplete`, because the future returned by
// `whenComplete` doesn't propagate cancellation back to the original future.
val future = new KafkaFutureImpl[Unit]
future.whenComplete {
case (_, _: CancellationException) => isFutureCancelled = true
case _ => ()
case (_, _: CancellationException) =>
latch.release.as(isFutureCancelled.set(true)).unsafeRunSync()
case _ => ()
}
future
}
fiber <- futureIO.cancelable_.start
_ <- IO.cede.whileM_(IO(!started.get))
_ <- IO(assert(!isFutureCancelled))
_ <- IO.cede.whileM_(IO(!isFutureStarted.get))
_ <- IO(assert(!isFutureCancelled.get))
_ <- fiber.cancel
_ <- IO(assert(isFutureCancelled))
_ <- latch.await
_ <- IO(assert(isFutureCancelled.get))
} yield ()
test.unsafeRunSync()
}
Expand Down

0 comments on commit 1bf3e58

Please sign in to comment.