Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Blocking to use Sync#interruptible, not blocking #1126

Merged
merged 2 commits into from
Apr 2, 2024

Conversation

bastewart
Copy link
Contributor

@bastewart bastewart commented Dec 14, 2022

This is to avoid being unable to cancel code that is inside blocking section. At the moment it is possible for user-code to not cancel immediately, in which case it will block until an internal Kafka max-block period is reached (by default 1 minute).

Changing to use interruptible means the user-code returns immediately on cancellation, and the underlying operation in Kafka should be cancelled.

As a concrete example, at the moment producing to a topic which does not exist blocks until a minute has passed, even if the effect is cancelled. For example:

type K
type V

val producer: TransactionalKafkaProducer.WithoutOffsets[IO, Option[K], Option[V]] = ???

producer
  .produceWithoutOffsets(
    ProducerRecords.one(ProducerRecord(topic = "does_not_exist", key = None, value = None))
  )
  .timeout(1.second)
  .unsafeRunSync()

Changing to use interruptible means it completes after 1 second with the timeout exception.

This is to avoid being unable to cancel code that is inside blocking
section. At the moment it is possible for user-code to not cancel
immediately, in which case it will block until an internal Kafka
max-block period is reached (by default 1 minute).

Changing to use `interruptible` means the user-code returns immediately
on cancellation, and the underlying operation in Kafka should be
cancelled.

As a concrete example, at the moment producing to a topic which does not
exist blocks until a minute has passed, even if the effect is cancelled.
For example:

```scala
type K
type V

val producer: TransactionalKafkaProducer.WithoutOffsets[IO, Option[K], Option[V]] = ???

producer
  .produceWithoutOffsets(
    ProducerRecords.one(ProducerRecord(topic = "does_not_exist", key = None, value = None))
  )
  .timeout(1.second)
  .unsafeRunSync()
```

Changing to use `interruptible` means it completes after 1 second with
the timeout exception.
@bastewart
Copy link
Contributor Author

bastewart commented Dec 14, 2022

Full disclosure: I am not entirely sure that the underlying request to produce to Kafka is actually cancelled, or if it sits there waiting for the internal timeout still!

@bastewart
Copy link
Contributor Author

I also haven't run the test suite locally yet, so sorry if there is a failure 🤦

@bastewart
Copy link
Contributor Author

Full disclosure: I am not entirely sure that the underlying request to produce to Kafka is actually cancelled, or if it sits there waiting for the internal timeout still!

I'll see if I can manually verify if this is the case. Otherwise we could get a regression in #888

@bastewart
Copy link
Contributor Author

bastewart commented Dec 14, 2022

Yeah, Kafka does obey the thread interrupt.

I added some extra log lines in the below block to show our initial outcome and then also what the outcome of the operation was (committing transaction, or aborting transaction). As expected with this change we find we were cancelled, and the subsequent blocking(producer.abortTransaction()) call succeeds.

case (_, Outcome.Succeeded(_)) =>
blocking(producer.commitTransaction())
case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
blocking(producer.abortTransaction())

I also added some logging when the promise completes, and it never does. So the producer.send call is not completing (as we'd hope). I've chased the code in the underlying producer here and it looks to obey cancellation as a second, subjective, confirmation.

record =>
asJavaRecord(keySerializer, valueSerializer, record).flatMap { javaRecord =>
F.delay(Promise[(ProducerRecord[K, V], RecordMetadata)]()).flatMap { promise =>
blocking {
producer.send(
javaRecord, { (metadata, exception) =>
if (exception == null)
promise.success((record, metadata))
else promise.failure(exception)
}
)
}.as(F.fromFuture(F.delay(promise.future)))
}
}

I have noticed a possible ordering issue improvement in this file now as well, I'll open a second PR with an explanation.

@bplommer
Copy link
Member

Thanks for this! Sorry it's taken me three months to get to this, so thanks for your patience - this looks like a good improvement but do you think it's possible to add an automated test to verify what you've manually confirmed?

@bastewart
Copy link
Contributor Author

bastewart commented Mar 24, 2023

Thanks for this! Sorry it's taken me three months to get to this, so thanks for your patience - this looks like a good improvement but do you think it's possible to add an automated test to verify what you've manually confirmed?

No worries!

Today isn't the best day (bit busy!), but I'll see what I can do next week 👍

@aartigao
Copy link
Contributor

Thanks for this! Sorry it's taken me three months to get to this, so thanks for your patience - this looks like a good improvement but do you think it's possible to add an automated test to verify what you've manually confirmed?

No worries!

Today isn't the best day (bit busy!), but I'll see what I can do next week 👍

Hey @bastewart do you feel you have some time for targeting series/3.x and create that automated test? 🙏🏽

@aartigao aartigao changed the base branch from series/2.x to series/3.x February 17, 2024 05:50
@aartigao aartigao merged commit f6784a9 into fd4s:series/3.x Apr 2, 2024
atnoya added a commit to atnoya/fs2-kafka that referenced this pull request Apr 24, 2024
aartigao added a commit that referenced this pull request Apr 25, 2024
Co-authored-by: Alan Artigao Carreño <alanartigao@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants