Stuck partitions - question about the pause/resume logic #1230

Open
simonpetty opened this issue Aug 10, 2023 · 23 comments

Comments

@simonpetty

simonpetty commented Aug 10, 2023

Hey.

We have a topic with 50 partitions, with consumers scaled out to a relatively small number of hosts (3). Sporadically, we get a situation where one particular partition gets stuck and does not commit any new offsets. As soon as we restart the hosts, it kicks back into life and immediately catches up.

We see this in the logs very frequently after it becomes stuck:

Skipping fetching records for assigned partition <mypartition> because it is paused

The last couple of FS2 Kafka logs mentioning the stuck partition, before it was stuck, say this:

Completed fetches with records for partitions [ <mypartition> -> { first: 45677489, last: 45677495 }, .... ]

Followed by logs that never mention the partition again:

Current state [State(fetches = Map(... a number of other partitions, but not <mypartition> ... ), ...)]

In trying to figure out what's going on, I ended up looking at this bit of the code:

def pollConsumer(state: State[F, K, V]): F[ConsumerRecords] =
      withConsumer
        .blocking { consumer =>
          val assigned = consumer.assignment.toSet
          val requested = state.fetches.keySetStrict
          val available = state.records.keySetStrict

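          // resume: partitions with an outstanding fetch request (requested) that are
          //         still assigned and have no records already buffered in state
          // pause:  every other assigned partition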
          val resume = (requested intersect assigned) diff available
          val pause = assigned diff resume

          if (pause.nonEmpty)
            consumer.pause(pause.asJava)

          if (resume.nonEmpty)
            consumer.resume(resume.asJava)

          consumer.poll(pollTimeout)
        }
        .flatMap(records)

Filling in all the possible combinations, I think we get these possible outcomes:

[Screenshot: table of the possible pause/resume outcomes for each combination of assigned, requested, and available]

I've highlighted the state I think we're in, and given that the Java Kafka client says it will skip fetching for paused partitions, I'm a little confused about how the partition is then expected to resume.

If I pull down the fs2-kafka codebase and replace the intersect with union, the KafkaConsumerSpec tests still pass, and it would cause our highlighted scenario to be resumed (I think), but I don't know what the consequences of that would be!
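
For concreteness, the change I tried amounts to something like this (a sketch against the snippet above, not a tested fix - note it would also try to resume requested partitions that are no longer assigned, which the underlying Java consumer may not accept):

    // Variant with union: resume any requested or assigned partition that has no
    // buffered records, instead of only those that are both requested and assigned.
    val resume = (requested union assigned) diff available
    val pause  = assigned diff resume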

It feels like this should be a more widespread issue given how central this code is and how long it has been this way, so I bet I'm missing something.

Thanks

@jarrodcodes

Just curious because we are having a similar issue, which version of fs2-kafka are you on? We are on 2.6.1, wanted to know if this was also happening with 3.x

@tpalmer99

Just curious because we are having a similar issue, which version of fs2-kafka are you on? We are on 2.6.1, wanted to know if this was also happening with 3.x

Just witnessed the issue on v3.0.1

@jarrodcodes

Just curious because we are having a similar issue, which version of fs2-kafka are you on? We are on 2.6.1, wanted to know if this was also happening with 3.x

Just witnessed the issue on v3.0.1

Thanks for the reply. That's unfortunate. I was hoping upgrading would be sufficient.

@biochimia
Contributor

We operate an internal service that runs a few thousand streams, and we have also experienced the "lost partition" issue. It seems to be a race condition and happens reliably for topics with a higher volume of traffic: we see a couple of "partition lost" events on those topics every day. We haven't yet identified the source of this race condition.

After spending some time trying to identify the root cause, we settled on a workaround where we monitor consumer offset lag for each stream, and recreate/restart consumers that get stuck for a few minutes. This worked out well in practice, and allowed us to ignore this issue for a while.

Mainly prompted by this issue being reported, while I was gathering some data to share and getting ready to dive back into the search for the race condition, a colleague pointed me to some comments (I don't have a reference handy) that relate the lost-partition issue to the use of fs2's Stream.groupWithin method.

Are you using groupWithin in your stream as a mechanism to chunk and aggregate messages based on time?

We ran some experiments in our staging environment, removing groupWithin from a stream that would usually trigger the lost-partition issue, and we have not seen it happen after a couple of days, so it looks like a good candidate for the source of the race condition. I still need to spend some time reading the groupWithin code more closely before I can say this more definitively and/or propose a fix.

On our side, we already have a stream that mixes clock ticks and message chunks, so we're looking at replacing groupWithin altogether with a simpler implementation based on the ticks. For our use case, a shared tick stream (which we already have) is cheaper than making individual timeout decisions per chunk.
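
To illustrate the tick-merge idea, here is a minimal sketch (the TickOrChunk and Records names are ours, not fs2-kafka's; assumes fs2 3.x and cats-effect 3):

    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.{Chunk, Stream}

    // One element per clock tick or per chunk of records, merged into a single
    // stream, so time-based flushes no longer depend on groupWithin.
    sealed trait TickOrChunk[+A]
    case object Tick extends TickOrChunk[Nothing]
    final case class Records[A](chunk: Chunk[A]) extends TickOrChunk[A]

    def withTicks[A](records: Stream[IO, Chunk[A]], every: FiniteDuration): Stream[IO, TickOrChunk[A]] =
      records.map(c => Records(c): TickOrChunk[A]).merge(Stream.awakeEvery[IO](every).as(Tick))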

@jarrodcodes

Thank you for the reply. Unfortunately no use of groupWithin here.

@biochimia
Contributor

Do you use commitBatchWithin? It is also based on groupWithin.
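
For context, commitBatchWithin is essentially groupWithin followed by a batched commit; a rough paraphrase (not the actual fs2-kafka source) looks something like this:

    import scala.concurrent.duration.FiniteDuration
    import cats.effect.Temporal
    import fs2.Pipe
    import fs2.kafka.{CommittableOffset, CommittableOffsetBatch}

    // Rough paraphrase: group offsets by count or time with groupWithin,
    // then commit each group as a single batch.
    def commitBatchWithinSketch[F[_]: Temporal](n: Int, d: FiniteDuration): Pipe[F, CommittableOffset[F], Unit] =
      _.groupWithin(n, d).evalMap { chunk =>
        chunk.foldLeft(CommittableOffsetBatch.empty[F])(_ updated _).commit
      }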

@jarrodcodes

Yes! I will look into this, thank you for the lead

@simonpetty
Author

simonpetty commented Aug 30, 2023

Thanks for the hint @biochimia.

Since we seem to have encountered this issue when moving from fs2-kafka 2 to 3, I've been doing some sleuthing. This change was made to groupWithin a couple of years ago. Before that, there was this issue, which has an interesting comment from @SystemFw:

groupWithin is push based with back pressure of 1, so very close to being purely pull based but not quite

I'm not convinced I understand how behaviour around commits would prevent fetching new records, though. My naive model has them as entirely separate "threads", and committing offsets just minimizes the number of records you re-process when starting a stream up again; in theory you could run a stream for hours or days without making any commits.

Anyway… I was thinking about a similar workaround that involves these steps:

  1. Detect a partition stream that is idle (this post provides an interesting solution: https://medium.com/@ivovk/fs2-streams-detecting-stream-idleness-aec12af06936)
  2. See if there is lag on that partition (we can use kafkaConsumer.committed, but this also requires the broker offset - via the Admin client?)
  3. If there isn't any lag, keep going and essentially restart the timer (I'm thinking this could be fairly long, e.g. 30 minutes)
  4. Otherwise, restart the stream - at the moment I'm guessing the simplest thing would be to restart the parent stream (and so restart all partitions on this host).

@biochimia
Contributor

Regarding fs2.Stream.groupWithin:

Tangentially, the release notes for cats-effect 3.5.0 mention an interface/behaviour change where the legacy behaviour could "result in an effect which could be canceled unconditionally, without the invocation of any finalizer". There's no smoking gun or indication that this addresses the issues we have seen, but somehow it seems like it could be related. We're looking into testing this too, but for us it involves updates of various dependencies across various components. https://github.com/typelevel/cats-effect/releases/tag/v3.5.0

@biochimia
Contributor

  1. About detecting idleness, our approach has been to merge a stream of clock ticks, so in the end we have a stream of TickOrChunk and we can alternate time-based actions and message processing in the same stream. This also gives us a point to flush intermediate buffers—not entirely unlike groupWithin.

I don't think we tried the stream.pull.timed approach, but it also looks interesting.

  2. To check for lag, we use consumer.endOffsets directly; this is also based on our observation that the consumer remains operational when we experience lost partitions (e.g. no rebalances are triggered, and other consumers do not take over the partition). We then compare the end offsets against our last processed offsets and a timestamp associated with that processing. You can get fancy with the rules here, for example checking that the consumer has failed to make progress for a while before deciding to restart it. A rough sketch of such a lag check follows this list.
  3. Our checks run every 15 minutes, so they might only restart a stuck partition after 30 minutes or so.
  4. On restarting the stream, we take the approach of recreating everything from the consumer up through the stream. We do this by raising an error that is only caught at the point where we instantiate the stream in the first place. We have some machinery around this to give us a graceful exit path when we do want to shut down the application.
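
For anyone building a similar watchdog, here is a minimal sketch of the lag check against fs2-kafka's KafkaConsumer (the helper name is ours, and for simplicity it compares against committed offsets rather than our application-side "last processed" offsets; check the method signatures against your fs2-kafka version):

    import cats.effect.IO
    import fs2.kafka.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    // Sketch: per-partition lag as end offset minus committed offset for the
    // currently assigned partitions. A watchdog can restart the consumer when
    // lag stays non-zero and committed offsets stop moving for too long.
    def partitionLag[K, V](consumer: KafkaConsumer[IO, K, V]): IO[Map[TopicPartition, Long]] =
      for {
        assigned  <- consumer.assignment
        end       <- consumer.endOffsets(assigned)
        committed <- consumer.committed(assigned)
      } yield assigned.toList.map { tp =>
        val endOffset       = end.getOrElse(tp, 0L)
        val committedOffset = committed.get(tp).flatMap(Option(_)).map(_.offset).getOrElse(0L)
        tp -> (endOffset - committedOffset)
      }.toMap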

@biochimia
Contributor

Unfortunately, our current workaround is getting entangled in the problem as we are seeing some errored streams getting stuck and not being able to recover. This may be more related to the cancelation/finaliser changes in cats-effect 3.5, and changes being made by various libraries to adapt to it...

Anyway, at this point I'm happy to share the information we have in case it helps someone make a connection.

@simonpetty
Author

2. consumer.endOffsets

Thanks. Wasn't aware of this one.

@biochimia
Contributor

An update from our side:

  • we have updated our application to the latest versions of the cats-effect stack: cats 2.10.0, cats-effect 3.5.1, fs2 3.8.0, and fs2-kafka 3.0.1, among other packages (I'm also aware fs2 just released a couple of updates; at least their release notes don't suggest an impact here)
  • we have removed our use of groupWithin in favour of our custom implementation based on a tick stream (which we also use for other purposes)

Unfortunately, we still see dropped partitions with the updated stack. The only good news is that we seem to be reliably restarting those again, so the updated dependencies seem to have addressed the issue where our restart mechanism would get itself stuck. This could be related to the fix in typelevel/cats-effect#3444.

I can't rule out that there are other bugs in our code and in the stack, but perhaps the main trigger here could be CPU starvation or a similar bad behaviour by our application such as "expensive" tasks that work against the cats-effect scheduler.

At this moment, I no longer have an indication about a specific construct being the trigger for this issue.

@simonpetty
Author

Update from us. We tried implementing the timed pull approach (described here: https://medium.com/@ivovk/fs2-streams-detecting-stream-idleness-aec12af06936).

Unfortunately, this didn't work. It appears that when the partition becomes "stuck", the stream stops being pulled (our timer never fires again).

@simonpetty
Author

Hey. Just as an update, we've not seen stuck partitions for a month now. We have since updated various libraries (Cats, FS2, FS2-Kafka), so our assumption is that one of these fixed the issue - or that it was due to some issue on the broker side that has since been resolved.

@jarrodcodes

That's awesome to hear. Just curious, which version of Kafka is your broker on? Would love to be able to compare once we do some upgrades.

@simonpetty
Author

Just to say, we've had recurrences of this. Luckily they're rare, and mostly in non-prod environments, but still. @jarrodcodes our Kafka broker version is 3.4 (soon to be 3.5, apparently, so I'll be interested to see if that changes anything)

@kareblak

kareblak commented Jan 7, 2024

I'm having the same issue. Chunks pulled from Kafka report as size 0 after some time. It is not occurring in our test env with 8 partitions / 4 consumers, but it happens quite frequently in production with 24 partitions / 4 consumer replicas. Metrics are reported downstream in forked threads (based on partitionsMapStream), but the pulled chunks are, as I said, empty. We're on broker 3.3 and fs2-kafka 3.2.0.

@aartigao
Contributor

I've read the whole thread and I must say this is really unfortunate 😞 I mean the fact that you folks have been working around this by restarting the consumers...

I'd like to investigate this bug in depth. Just to be sure: these stuck partitions happen without a rebalance, am I right? If that's the case, it will be easier to trace the issue 🙏🏽

@simonpetty
Author

simonpetty commented Apr 19, 2024

Thanks @aartigao, in our case this mostly happens in test environments at some point after a deployment (we've actually moved to using the sticky partition assignor to mitigate other issues, but that doesn't seem to prevent this one).

What we see is that some time after a deployment, when messages finally start arriving, the lag starts going up - and never comes down. Usually it's only one of the partitions (we're consuming from one topic with 50 partitions), sometimes more.

As described above, if we simply restart the services it comes back to life.

@mihaisoloi

I've been experiencing this issue and tried everything described in this thread, but nothing helped - though in my case the consumer was subscribed to 27 topics with 18 partitions each, so 486 partitions in total. The consumer would stop consuming within a few hours of a restart. I even set up a KEDA autoscaler to restart the services, but that gets ridiculous when you have many of them. Fortunately, I kept reading through the fs2-kafka codebase and came across https://github.com/fd4s/fs2-kafka/blob/series/3.x/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumeChunk.scala; using it, I've not experienced a paused partition for the past week 🤞

I know it's not going to be applicable in all cases, but it really works, and it seems to have increased our consumption rate by at least 5x: we're consuming 100k events per minute versus around 20k prior to using this API.
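
In case it helps anyone else, here is a minimal sketch of wiring up consumeChunk (topic, group id and bootstrap servers are placeholders; double-check the consumeChunk signature against your fs2-kafka version):

    import cats.effect.{IO, IOApp}
    import fs2.Chunk
    import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings, KafkaConsumer}
    import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow

    object ConsumeChunkExample extends IOApp.Simple {

      // Placeholder settings; adjust for your environment.
      val settings: ConsumerSettings[IO, String, String] =
        ConsumerSettings[IO, String, String]
          .withBootstrapServers("localhost:9092")
          .withGroupId("my-group")
          .withAutoOffsetReset(AutoOffsetReset.Earliest)

      // Process one chunk of records; the chunk's offsets are committed after this completes.
      def process(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] =
        IO.println(s"processing ${records.size} records").as(CommitNow)

      val run: IO[Unit] =
        KafkaConsumer
          .stream(settings)
          .evalTap(_.subscribeTo("my-topic"))
          .evalMap(_.consumeChunk(process))
          .compile
          .drain
    }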

@mkharytonau

@mihaisoloi hi, thanks for posting this. Did you previously use .commitBatchWithin (which uses .groupWithin)?

@mihaisoloi

yes, we were using .commitBatchWithin
