Consider supporting Reactive Streams and Reactive Sockets #110
Hi, thanks for the explanation. futures-rs already has a

I believe that there is a fundamental difference between how this library handles asynchronous data flow and how existing libraries do, one that makes it quite adept at handling backpressure issues. This library is built around a pull model rather than a push model: every node in the computation graph polls its dependencies when it is ready to process more data. This means that producers cannot overload downstream components, as they never provide more data than the consumer is ready to process.

I would be interested in your opinion on this model, as you probably have more experience using the reactive stream abstraction. (I've only used it at a surface level in the past, never for anything really extensive.)
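The pull model described above can be sketched in a few lines of Rust. This is a hypothetical illustration, not the actual futures-rs API: the `Poll` type, `PullStream` trait, and the combinator shown are invented for the example.

```rust
/// Result of asking a stream for its next item (illustrative, not the real API).
enum Poll<T> {
    Ready(Option<T>), // Some(item), or None when the stream is exhausted
    NotReady,         // nothing available yet; try again later
}

/// A pull-based stream: consumers call `poll`; producers never push.
trait PullStream {
    type Item;
    fn poll(&mut self) -> Poll<Self::Item>;
}

/// A source that counts up to a limit.
struct Counter { next: u32, limit: u32 }

impl PullStream for Counter {
    type Item = u32;
    fn poll(&mut self) -> Poll<u32> {
        if self.next < self.limit {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

/// `Map` pulls from its upstream only when it is itself polled, so the
/// producer can never run ahead of the consumer: backpressure for free.
struct Map<S, F> { upstream: S, f: F }

impl<S, F, U> PullStream for Map<S, F>
where S: PullStream, F: FnMut(S::Item) -> U {
    type Item = U;
    fn poll(&mut self) -> Poll<U> {
        match self.upstream.poll() {
            Poll::Ready(Some(x)) => Poll::Ready(Some((self.f)(x))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::NotReady => Poll::NotReady,
        }
    }
}

/// Drain a stream into a Vec by polling in a loop.
fn collect<S: PullStream>(mut s: S) -> Vec<S::Item> {
    let mut out = Vec::new();
    loop {
        match s.poll() {
            Poll::Ready(Some(x)) => out.push(x),
            Poll::Ready(None) => return out,
            Poll::NotReady => continue, // a real executor would park here
        }
    }
}

fn main() {
    let doubled = Map { upstream: Counter { next: 0, limit: 5 }, f: |x| x * 2 };
    println!("{:?}", collect(doubled)); // [0, 2, 4, 6, 8]
}
```

The key property is visible in `Map::poll`: no element is produced until the downstream asks for it.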
RS is more like push-pull. I will answer in more detail, but I don't yet know what your Stream looks like, plus I have to go now, but I will take a look at these.
Thanks for the report @drewhk! As @carllerche mentioned, the
I think you are referring to the Rx model, i.e. Observables and friends. What I refer to is called Reactive Streams (RS), and it is neither a push nor a pull model but rather an asynchronous push-pull model. In fact, I don't really like to think about it as push or pull: any backpressured system where concurrency is involved is basically a flow of advertisements of empty buffer space in one direction and a flow of elements in the other, possibly overlapping. I.e. send-then-wait-for-ack, send-and-ack-with-window, request-response, and request-batch-response-with-window are all the same pattern of a closed cycle, where the number of elements buffered plus in flight equals the advertised in-flight space plus the elements consumed since the advertisement.

RS is a permit-based model: consumers advertise the number of elements they are currently willing to accept, which is in turn answered with at most that number of elements. Rx only has the onNext signal, and if that passes through a thread boundary then backpressure is lost, as blocking the calling thread is no longer directly possible, nor is it desirable on a thread-pool based system. RS was built by users of Rx as a response to those limitations, so I recommend taking a look at it :) There are 3 major implementations already (RxJava 1-2 by Netflix, Reactor by Pivotal and Akka Streams by us, Lightbend) and the JDK will standardize the interfaces. The interfaces themselves are deceptively simple, so at first glance they seem a bit "meh": https://github.com/reactive-streams/reactive-streams-jvm/tree/master/api/src/main/java/org/reactivestreams

The rules there are the result of almost 2 years of refinement with feedback from various parties, so it is pretty solid. It is also backed by a rather extensive TCK: https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck

As for the linked

From someone who loves streaming and Rust :)
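The permit-based accounting described above can be sketched in Rust. The real Reactive Streams spec is a set of Java interfaces (Publisher, Subscriber, Subscription), so everything below, including the `PermitSource` type and its method names, is an invented analogy rather than the spec's API:

```rust
/// A source that may only emit as many elements as the consumer has
/// requested — an analogy for Reactive Streams' demand accounting.
struct PermitSource {
    demand: u64, // permits granted by the consumer, not yet used
    next: u64,   // next element to emit
}

impl PermitSource {
    /// Consumer side: advertise willingness to accept `n` more elements
    /// (roughly Subscription.request(n) in Reactive Streams terms).
    fn request(&mut self, n: u64) {
        self.demand += n;
    }

    /// Producer side: emit at most one element, and only if there is demand.
    fn try_emit(&mut self) -> Option<u64> {
        if self.demand > 0 {
            self.demand -= 1;
            self.next += 1;
            Some(self.next - 1)
        } else {
            None // no permits left: the producer cannot overload the consumer
        }
    }
}

fn main() {
    let mut s = PermitSource { demand: 0, next: 0 };
    s.request(2);
    // Drain whatever the permits allow: stops after two elements.
    let got: Vec<u64> = std::iter::from_fn(|| s.try_emit()).collect();
    println!("{:?}", got); // [0, 1]
}
```

The closed cycle described in the comment is visible here: elements flow one way, permits (`request`) flow the other, and the producer can never be more than `demand` elements ahead.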
A somewhat shameless plug: the combinators in Akka Streams: http://doc.akka.io/docs/akka/2.4/scala/stream/stages-overview.html While any API surface in streams land is highly opinionated, I think the above link is a good start nevertheless, as we are on the conservative side of adding operators, i.e. we tried to keep everything to the minimum and focus on simple extensibility instead. The only reason I linked that page is so you can roughly see what turned out to be our minimal "essential" set, to get a good initial feel, not because it is scripture in any way :) If you asked me what the single most important combinator is, apart from the usual suspects of map, filter, and fold, then my vote would go to
It has a brother
Yes, the consumer is expected to poll in a loop; however, the specific strategy for doing this is decoupled from the

The default scheduling strategy is a task-based park/unpark system: https://github.com/alexcrichton/futures-rs/blob/master/src/stream/channel.rs#L86-L89. The specifics are still being iterated on, but the general idea is that if a consumer calls

The exact combinators for this library are still being figured out / developed, but I believe that the equivalent to
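As a rough analogy for the park/unpark idea — using OS threads here, where futures-rs uses lightweight tasks — std's `thread::park` and `Thread::unpark` show the shape: a consumer polls, finds nothing, and parks until the producer wakes it. The slot-based handoff is invented for illustration:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // Shared one-element "slot" standing in for a stream's buffer.
    let slot: Arc<Mutex<Option<i32>>> = Arc::new(Mutex::new(None));

    let consumer_slot = Arc::clone(&slot);
    let consumer = thread::spawn(move || loop {
        // "poll": check whether data is available yet.
        if let Some(v) = consumer_slot.lock().unwrap().take() {
            return v;
        }
        // "NotReady": park this thread until the producer unparks it.
        // The park "token" makes this race-free: an unpark arriving before
        // we park makes the next park() return immediately.
        thread::park();
    });

    // Producer side: publish the value, then wake the consumer.
    thread::sleep(Duration::from_millis(50));
    *slot.lock().unwrap() = Some(42);
    consumer.thread().unpark();

    assert_eq!(consumer.join().unwrap(), 42);
    println!("consumer received 42");
}
```

In futures-rs the same dance "blocks" a task rather than an OS thread, so the worker thread is free to run other tasks in between.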
That is fine, this is true for Scala Futures as well, and even Akka Streams, so this I can totally relate to :)
Does this exclude cycles (directed or undirected) from being implemented? I guess not, just curious.
This basically means that you block the poller's thread, i.e. this is a blocking scheduler. My basic issue with poll (or its dual
Well, that is not the same. You cannot (so easily) separate the two steps. Unless I am mistaken and they are implemented differently here, Futures express results of computations that do not impede the progress of the caller's thread (i.e. concurrent computations/results). The only reason why mapAsync acts as a buffer is that it is able to launch multiple Future-returning functions in one go, e.g. calling a remote web service to enrich elements of a stream, making 4 requests in parallel. The buffer is needed because if the 4th element's Future finishes first, you need to put the result somewhere until elements 1, 2, and 3 complete and are emitted (otherwise ordering would be violated). What you describe would be equivalent in Akka to
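The ordering problem described above can be sketched as follows: four parallel "calls" finish in roughly reverse order, and a reordering buffer (a hypothetical `emit_in_order` helper, not Akka's actual implementation) holds completed results until all their predecessors have been emitted:

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Reordering buffer: consume (input index, result) pairs in completion
/// order and return the results in input order.
fn emit_in_order(completions: impl IntoIterator<Item = (u64, u64)>) -> Vec<u64> {
    let mut pending = BTreeMap::new();
    let mut next = 0u64;
    let mut out = Vec::new();
    for (i, r) in completions {
        pending.insert(i, r);
        // Emit every buffered result whose predecessors have all been emitted.
        while let Some(r) = pending.remove(&next) {
            out.push(r);
            next += 1;
        }
    }
    out
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Four parallel "async calls"; later inputs finish sooner, so
    // completions arrive in roughly reverse order.
    for i in 0u64..4 {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(40 - 10 * i));
            tx.send((i, i * i)).unwrap(); // (input index, result)
        });
    }
    drop(tx);
    println!("{:?}", emit_in_order(rx)); // [0, 1, 4, 9] despite reverse completion
}
```

Without the `pending` map, the result of element 3 would have to be emitted first, violating input order; with it, the output is deterministic regardless of completion order.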
I have a feeling that we are talking past each other, as I never implied that map needs a buffer :) Map can be implemented as a zero-cost combinator in a fully asynchronous push-pull model too, but that is beside the point. Anyway, I see that you have already come up with ideas and an architecture you like, so I don't want to sidetrack anything here; I just wanted to share my experiences :)
Note that futures-rs's
@dwrensha Yeah -- we haven't explored buffering combinators very thoroughly, and personally I'm interested in something that can apply to an arbitrary stream, assuming we can make sense of it for our
Ah ok, my bad, I assumed the behavior from the name without looking. Yeah, that basically makes
@drewhk note that we're still very interested to learn from any experiences you've had! There may be a bit of an impedance gap, as we may not understand Akka very thoroughly, but new kinds of combinators and ways to use streams are something we're always looking to explore! So far it seems like both libraries achieve very similar goals and you could conceptually transition between the two systems with ease, although I could be wrong!
This sounds like it could rather easily support
@alexcrichton if you're not too familiar with Akka but know Java, I'd strongly suggest looking at RxJava 2.0, which is also a Reactive Streams compliant library and touches on many of the principles discussed in this thread.
There is also Reactor from Pivotal, which is the other major implementation, also in Java. Anyway, I think the RS spec is the first place to look if there is interest.
Ok, if I have time today I will try to distill the major design junctures I am aware of (and try to keep the list free of my RS-specific experiences).
That's incorrect. What you block is a task, which is essentially a lightweight/green thread. More broadly, futures/streams are always executed in the context of some task, which is responsible for making progress by polling them. Tasks themselves are scheduled onto event loops and thread pools. So "blocking" a task just means that a worker thread is now free to work on a different task.
Ok, that is interesting. I just looked at
I see, though, that you have some other means of "suspending" the caller. On the JVM we have no such thing (ok, there are macros in Scala, or bytecode rewriting in general, to transform the sequence into continuations), and hence every RS implementation is built around callbacks, which, when they return, give the execution environment a chance to step in, take execution away from the called entity, and schedule something else. How do you do the suspension? In other words, when someone calls
For us there's always a "task" which is driving a future. This task is persistent for the entire lifetime of a future, and the internal state may transition between many different futures during that time. You can think of it roughly as one task per TCP connection (in a sense), where that task drives the entire request to completion -- reads the request, parses it, fires off database queries, renders the response, sends it. When a future decides that it needs to block, it calls the

So in that sense we don't literally suspend an OS thread or anything like that; we just suspend that particular future by returning up the stack. Later on we poll the future again, and the future's state machine handles getting back to the same point.
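The "suspend by returning up the stack" idea can be sketched as a hand-rolled state machine. The names here (`Async`, `SimpleFuture`, `run`) are invented for the example and are not the real futures-rs API:

```rust
/// Result of one poll of a future (illustrative).
enum Async<T> { Ready(T), NotReady }

/// A drastically simplified future: a state machine advanced by polling.
trait SimpleFuture {
    type Item;
    fn poll(&mut self) -> Async<Self::Item>;
}

/// Completes only on the `ready_at`-th poll, simulating e.g. a socket
/// that becomes readable only after a while. Until then, each poll
/// "suspends" the future simply by returning NotReady up the stack.
struct ReadyAfter { polls: u32, ready_at: u32 }

impl SimpleFuture for ReadyAfter {
    type Item = u32;
    fn poll(&mut self) -> Async<u32> {
        self.polls += 1;
        if self.polls >= self.ready_at {
            Async::Ready(self.polls)
        } else {
            Async::NotReady
        }
    }
}

/// The "task": drives a future to completion by polling it in a loop.
/// A real executor would park between polls instead of spinning.
fn run<F: SimpleFuture>(mut f: F) -> F::Item {
    loop {
        if let Async::Ready(v) = f.poll() {
            return v;
        }
    }
}

fn main() {
    println!("completed after {} polls", run(ReadyAfter { polls: 0, ready_at: 3 }));
}
```

No OS thread is ever suspended here: "blocking" is just `poll` returning `NotReady`, and resumption is the task polling again later.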
(I don't want to abuse your ticket system for discussions, so we can move this elsewhere, but I am really interested in what you do, so I would be happy to continue! Just tell me where it is more appropriate.) Ok, so I think I understand now. I'll try to summarize my understanding:
Is this correct?
Ah, no worries! Your summary is indeed correct! The exact specifics of when/where tasks are polled (e.g. by which thread) are largely left up to the user as well; none of it is baked into this library itself. Generally, though, a future is spawned into one "executor". For example the

In general, though, your summary is accurate!
Ok, so now I can explain some ideas in terms of your model, since we are on the same page :) (Disclaimer: I am obviously biased towards what I have done before (I have rewritten our execution engine 4 times in the past 2 years and have also seen other libraries), but I try not to bias you as much as possible, since the domains are different; when I refer to Akka it is just because that is what I have as an easy reference.)

First, all RS implementations share one property, namely that all of them allow you to separate your streaming pipeline into potentially asynchronous segments. This means that, in your terminology, different Tasks might host different segments of your pipeline. For example, a

As for the
Here, the events of interest are expressed in the
The main issue with this approach was (as the above example hints at) when you were interested in multiple events: since there was only one entry point, you ended up decoding into more fine-grained events all the time. The above example might feel forced, but once fan-in and fan-out stages come into the picture (multiplexing/demultiplexing from/to multiple streams, if you like), plus the various completion events from the different streams you serve, these patterns became the norm, not the exception. What we ended up with as a programming model, expressed in terms of your
(Please note that the above is just an analogy; it is not exactly how it works, and the above would likely be a horror of an API in practice. I just wanted to express the ideas in terms of

In general, I think the following questions are worth considering:
Lastly, depending on your answers to the above, the following basic set of streaming operations is what I usually try first when approaching something new, given that they exercise almost all the patterns I have encountered (not all of them make sense, depending on the answers above):
Sorry for the long post, but I hope it is helpful.
I think I have a working prototype of publish based on exactly what you described, @drewhk. I'll attempt to post it later today.
There is no need to rush to any conclusions; I don't want to propose any kind of API or a particular solution, I just wanted to share some experiences.
@drewhk Thanks so much for the detailed thoughts! One thing I did want to mention re: multiplexing and large fan-in is that we have a way to communicate to a task, on wakeup, what woke it up:

The example operations at the end of your comment are super useful, thanks for those! We've thought through or implemented many of them, but we should systematically work through the list.

I will say that, in general, I agree that you don't want a purely push or pull model. In our world, tasks provide the main source of "initiative" by always trying to make forward progress on their underlying future. In some cases, that might involve things like sending on a channel, which can "block" the task if the other side isn't ready for data. So I think we have the building blocks needed to express a wide range of backpressure patterns. I haven't thought about cyclic streams, however. Can you elaborate on that with a concrete example?
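The "sending on a channel can block if the other side isn't ready" behaviour has a close analogy in std's bounded channels, where the sender blocks a thread instead of a task. A sketch (the capacity and timings are illustrative):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    // Bounded channel with capacity 2: the sender blocks once 2 items
    // are buffered, so a fast producer cannot outrun a slow consumer.
    let (tx, rx) = sync_channel::<u32>(2);

    let producer = thread::spawn(move || {
        let start = Instant::now();
        for i in 0..5 {
            tx.send(i).unwrap(); // blocks while the buffer is full
        }
        start.elapsed() // how long producing all 5 items took
    });

    thread::sleep(Duration::from_millis(100)); // slow consumer
    let received: Vec<u32> = rx.iter().collect();

    let produced_in = producer.join().unwrap();
    assert_eq!(received, vec![0, 1, 2, 3, 4]);
    // The producer was throttled: it could not finish until the consumer
    // started draining, roughly 100ms in.
    assert!(produced_in >= Duration::from_millis(50));
    println!("producer throttled for {:?}", produced_in);
}
```

In futures-rs terms, a channel send that cannot proceed would make the task "block" (return up the stack and get woken later) rather than blocking the OS thread, but the backpressure shape is the same.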
I think what you say is similar to what I mentioned. I would not say that I love an "epoll-style" API, but I guess it does the job :)
I listed them not so much because of their usefulness (some of them are useful, others aren't) but because they host common implementation patterns, so implementing them is a really good exercise for an engine. Btw, where can I look at the code? I guess at this point my speculative approach is useless, and it is better to just look at the code and give applicable advice if there is any.
I probably need to wrap my head around your futures, because the above sentence makes little sense from my Scala background :)
I think so. I will try to look at the code in more detail. Where can I look for the currently implemented patterns/operators for streams?
On cycles, or on deadlocks? We have a section in our docs explaining a few deadlock scenarios: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks
Yeah, that's what I meant by useful :)
I hope the Rust community will join this party soon.
I am one of the developers of https://github.com/akka/akka/ and I just stumbled upon this nice library (I am mostly a Rust lurker). Futures are a very nice building block for asynchronous programs, but eventually one reaches a point where some kind of streaming abstraction is needed. The simplest streaming approach is the Rx-style chain of Observables; however, as we and others found out, with asynchronous call chains backpressure becomes an issue, as blocking is no longer available to throttle a producer. To solve this issue, the Reactive Streams (RS) standard was created: http://www.reactive-streams.org backed by various companies interested in the JVM landscape. This set of interoperability interfaces was designed by multiple teams together, and the standard is also on its way to becoming part of JDK 9. There is also an effort to expose its semantics as a wire-level protocol, http://reactivesocket.io, which nicely complements the RS standard (which mainly focuses on in-JVM asynchronous, ordered, backpressured communication).
Since I imagine that the need for asynchronous streams will eventually arise here too, I think these standards could be interesting for Rust as well. While RS may not be perfect, it was the result of a long design process and now has near-consensus in JVM land, so it would be nice to see a Rust implementation that is similar enough to be easily connectable to JVMs, perhaps via ReactiveSocket.
Sorry for the shameless plug :)