-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
StreamRefs #3321
StreamRefs #3321
Conversation
@Horusiath anything in particular ? |
@marcpiechura I think that there were quite big changes in a way how stage actors are created. From what I've seen, it's no longer |
@Horusiath I think it’s this PR akka/akka#19487 but it’s quite old, not sure why we haven’t implemented it already |
8873fd0
to
5493130
Compare
If this CI will pass, this is ready for review and merge. I haven't implemented exception stack trace propagation over the wire, but it can be added even after the release. |
@Horusiath looks like you forgot to set the config parameter in the reference configuration for the tests ?!
|
@marcpiechura for some reason, when no config is provided, a |
@Horusiath One spec is still failing StreamRefsSpec.SinkRef_must_receive_hundreds_of_elements_via_remoting |
It's racy behavior, working on it. |
docs/articles/streams/streamrefs.md
Outdated
|
||
The process of preparing and running a `ISourceRef<T>` powered distributed stream is shown by the animation below: | ||
|
||
![]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing link to the animations
docs/articles/streams/streamrefs.md
Outdated
|
||
The process of preparing and running a `ISinkRef<>` powered distributed stream is shown by the animation below: | ||
|
||
![]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
{ | ||
public CumulativeDemand(long seqNr) | ||
{ | ||
if (seqNr < 0) throw ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be <=
private readonly EnumerableActorName sourceRefStageNames = new EnumerableActorNameImpl("SourceRef", new AtomicCounterLong(0L)); | ||
private readonly EnumerableActorName sinkRefStageNames = new EnumerableActorNameImpl("SinkRef", new AtomicCounterLong(0L)); | ||
|
||
public StreamRefsMaster(ExtendedActorSystem system) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the parameter be removed since it’s not stored ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just want to keep the convention for extensions here.
internal sealed class SourceRefImpl<T> : SourceRefImpl, ISourceRef<T> | ||
{ | ||
public SourceRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { } | ||
public override Type EventType => typeof(T); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn’t be a single assignment better ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used only for deserialization (usually once per instance per node). I don't want to allocate underlying field for it, as Hyperion/Json.NET are able to provide generic type param directly from SourceRefImpl<T>
's T
parameter, so the payload will be smaller.
internal sealed class SinkRefImpl<T> : SinkRefImpl, ISinkRef<T> | ||
{ | ||
public SinkRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { } | ||
public override Type EventType => typeof(T); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
this.SetHandler(_stage.Inlet, | ||
onPush: OnPush, | ||
onUpstreamFinish: OnUpstreamFinish, | ||
onUpstreamFailure: OnUpstreamFailure); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement IInHandler and use SetHandler(_stage.Inlet, this)
_promise = promise; | ||
_inheritedAttributes = inheritedAttributes; | ||
|
||
SetHandler(_stage.Outlet, onPull: OnPull); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement IOutHandler
|
||
public void OnDownstreamFinish() | ||
{ | ||
/* IOutHandler impl */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to call CompleteStage
This is work in progress over porting StreamRefs feature to .NET.
TODOs:
This PR may require akka.streams 2.5 to be merged first (cc: @marcpiechura).