Faster groupWithin implementation #2327
Conversation
@DGolubets Good evening, Dmitry! Thanks for the PR, we are looking through it. 🙂 Could you add the benchmarks you used to get your results to the PR? It would help us confirm the results, and FS2 would also benefit from more benchmark coverage.
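For reference, a minimal JMH sketch of the kind of benchmark being asked for here, assuming fs2 3.x on cats-effect 3. The class name, parameters, and numbers are illustrative, not the ones behind the reported results:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.openjdk.jmh.annotations._
import scala.concurrent.duration._

import fs2.Stream

@State(Scope.Thread)
class GroupWithinBenchmark {

  @Param(Array("100000"))
  var totalElements: Int = _

  @Param(Array("100"))
  var chunkSize: Int = _

  @Benchmark
  def groupWithin(): Unit =
    Stream
      .range(0, totalElements)
      .covary[IO]
      .groupWithin(chunkSize, 1.second) // timeout rarely fires here, so this measures the size-limited path
      .compile
      .drain
      .unsafeRunSync()
}
```

Varying the chunk size and the timeout would also show how the combinator behaves when batches close by count versus by time.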
If it's not too much of a problem, could this be added as a separate PR? I think it has independent value (for example, I want to see if Queue vs Channel makes a difference to the current implementation).
A few small code comments.
Quick warning: it's going to take me a long time to review this PR due to the complexity of the combinator :)
By the way, I think you have hit a deadlock in two of the CI runs. I'm going to try restarting them to see if it's an accident, but unfortunately I don't think so, because it's happened on two separate runs (2.12, 3.0RC1).
Thanks for the feedback guys! I'll try to address it tomorrow.
I noticed a problem with one test under the JS target.
It seems that under the JS target the cancellation of […]. So the workaround is to always cancel what I acquire in the race (essentially it's like "waitForAvailableN", if there were such a method):
But I need to make sure […]
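As a rough illustration of the "waitForAvailableN" idea described above (a hypothetical helper, assuming a cats-effect 3 Semaphore; not the code in this PR): race the acquisition against a timeout and hand any acquired permits straight back, so the race never leaves permits held.

```scala
import cats.effect.Temporal
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper (not part of cats-effect): wait until n permits are
// available, but never keep them.
def waitForAvailableN[F[_]](s: Semaphore[F], n: Long, timeout: FiniteDuration)(implicit
    F: Temporal[F]
): F[Boolean] =
  F.race(
    s.acquireN(n) *> s.releaseN(n), // take the permits, then hand them straight back
    F.sleep(timeout)
  ).map(_.isLeft) // Left means the permits became available before the timeout
```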
}

fs2.Stream
  .eval(enqueueAsync)
if this gets cancelled, you won't interrupt the producer stream
Won't that work: endDemand(ending) *> upstream.cancel?
If the stream gets canceled during the eval, you won't reach that point. The concurrently combinator covers this general use case for you. (There is a general meta point here about perf vs correctness: I'm finding it fairly hard to fully reason about the correctness of this implementation, even though I believe it's important enough to justify high performance.)
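For context, a small sketch of the pattern concurrently provides, assuming fs2 3.x (the queue-based producer here is illustrative, not this PR's implementation): the background stream is interrupted and finalized whenever the foreground stream ends, fails, or is cancelled, including cancellation that arrives before any manually attached finalizer runs.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream

// Illustrative only: `concurrently` interrupts and finalizes the producer when
// the consuming stream completes, fails, or is cancelled, even if the
// cancellation happens before any manually attached finalizer is reached.
def consumeWithProducer(source: Stream[IO, Int]): Stream[IO, Int] =
  Stream.eval(Queue.bounded[IO, Option[Int]](64)).flatMap { q =>
    val producer =
      source.evalMap(i => q.offer(Some(i))).onFinalize(q.offer(None)).drain

    Stream
      .fromQueueNoneTerminated(q)
      .concurrently(producer)
  }
```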
Perhaps you can move the onFinalize one level up, though.
bracketCase should do the trick, I think.
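A minimal illustration of that suggestion, assuming fs2 3.x: Stream.bracketCase registers the finalizer at acquisition time and passes the exit case to it, so cleanup runs on completion, error, or cancellation. The startProducer and stopProducer names below are placeholders, not identifiers from this PR.

```scala
import cats.effect.{IO, Resource}
import fs2.Stream

// Placeholder names: `startProducer` / `stopProducer` are not from this PR.
// The finalizer is registered at acquisition time and receives the exit case,
// so it runs whether the scope completes, fails, or is cancelled.
def producerScope(
    startProducer: IO[Unit],
    stopProducer: Resource.ExitCase => IO[Unit]
): Stream[IO, Unit] =
  Stream.bracketCase(startProducer)((_, exitCase) => stopProducer(exitCase))
```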
Is this ready for merge, @diesalbla @SystemFw?
I think it can be merged, although the logic is rather tricky, so if you wanted to review it @mpilquist, a further set of eyes could be useful. If you don't have time, I think we can merge, on balance.
The existing test coverage for […]
acquireSupplyUpToNWithin(n.toLong).flatMap { n =>
  buffer
    .modify(_.splitAt(n.toInt))
    .flatMap { buf =>
      demand.releaseN(buf.data.size.toLong).flatMap { _ =>
        buf.endOfSupply match {
          case Some(Left(error)) =>
            F.raiseError(error)
          case Some(Right(_)) if buf.data.isEmpty =>
            F.pure(None)
          case _ =>
            F.pure(Some(buf.data))
        }
      }
Perhaps a for comprehension here as well?
Suggested change:

for {
  n <- acquireSupplyUpToNWithin(n.toLong)
  buf <- buffer.modify(_.splitAt(n.toInt))
  _ <- demand.releaseN(buf.data.size.toLong)
  beos = buf.endOfSupply
  _ <- beos.traverse(F.fromEither) // Raise errors from the Some(Left(_)) case
} yield beos match {
  case Some(Right(_)) if buf.data.isEmpty => None
  case _ => Some(buf.data)
}
@DGolubets Not a blocker, merge if you like it as is.
@diesalbla I don't think I can: I only have an option to close or comment.
I've tried making groupWithin as fast as I could. I would appreciate a thorough review and any feedback, because it's my first time writing an fs2 stream operator and this particular one is quite complex.

The reason I want it to be faster is that I see groupWithin as one of the most important streaming operators. It allows me to choose an acceptable latency for a stream while still keeping high throughput by batching. E.g. if I have night-time spikes of data but very few to no records during the day, I don't want a couple of unlucky records stuck in the buffer, and groupWithin helps me with that.

I did simple benchmarks on my machine and I see this: the current groupWithin limits any stream to ~34k per second […]. This is a 6x improvement.

Maybe some edge cases are not handled, but I think it's worth looking at because the gain is huge.

Edit: the benchmarks I did were with 2.5.x. I tried v3 and the groupWithin implementation there is even slower.
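For readers who haven't used the combinator, a small usage sketch (assuming fs2 3.x): groupWithin emits a chunk as soon as either the size limit is reached or the timeout elapses, whichever comes first, which is exactly the latency/throughput trade-off described above.

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import scala.concurrent.duration._

// Emit a chunk once 1000 records have accumulated, or after 100ms, whichever
// comes first: latency stays bounded while throughput benefits from batching.
def batched(records: Stream[IO, String]): Stream[IO, Chunk[String]] =
  records.groupWithin(1000, 100.millis)
```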