Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concat array enhancement #2508

Merged
merged 3 commits into from
Jan 25, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,9 @@

package io.helidon.common.reactive;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Relay items in order from subsequent Flow.Publishers as a single Multi source.
Expand All @@ -35,31 +36,101 @@ final class MultiConcatArray<T> implements Multi<T> {
public void subscribe(Flow.Subscriber<? super T> subscriber) {
ConcatArraySubscriber<T> parent = new ConcatArraySubscriber<>(subscriber, sources);
subscriber.onSubscribe(parent);
parent.nextSource();
parent.nextSource(parent.produced);
}

static final class ConcatArraySubscriber<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
protected static final class ConcatArraySubscriber<T>
implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;

private final Flow.Publisher<T>[] sources;

private final AtomicInteger wip;
private Flow.Subscription subscription;

private int index;

private long produced;
private long produced = INIT;

private volatile long requested = SEE_OTHER;
private volatile long pending = 0;
private volatile long oldRequested = 0;
private volatile Thread lastThreadCompleting;
private boolean redo;

static final long BAD = Long.MIN_VALUE;
static final long CANCEL = Long.MIN_VALUE + 1;
static final long SEE_OTHER = Long.MIN_VALUE + 2;
static final long INIT = Long.MIN_VALUE + 3;

static final VarHandle REQUESTED;
static final VarHandle PENDING;
static final VarHandle LASTTHREADCOMPLETING;

static {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
REQUESTED = lookup.findVarHandle(ConcatArraySubscriber.class, "requested", long.class);
PENDING = lookup.findVarHandle(ConcatArraySubscriber.class, "pending", long.class);
LASTTHREADCOMPLETING = lookup
.findVarHandle(ConcatArraySubscriber.class, "lastThreadCompleting", Thread.class);
} catch (Exception e) {
throw new Error("Expected lookup to succeed", e);
}
}

ConcatArraySubscriber(Flow.Subscriber<? super T> downstream, Flow.Publisher<T>[] sources) {
this.downstream = downstream;
this.sources = sources;
this.wip = new AtomicInteger();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
super.setSubscription(subscription);
produced++; // assert: matching request(1) has been done by nextSource()
this.subscription = subscription;
long oldProduced = produced;
long oldR = oldRequested;

long p0 = pending;
if (p0 < 0 && oldR != CANCEL) {
// not entirely necessary, since BAD and CANCEL must be observed only eventually, but
// the least surprising behaviour is:
// if pending is known to be BAD or CANCEL, make sure requested does not
// appear good even temporarily
oldR = p0;
}

// assert: requested == SEE_OTHER
requested = oldR; // assume non-conforming upstream Publisher may start delivering onNext or
// onComplete concurrently upon observing a concurrent request: only use
// values read before this assignment, or
// method-locals, or atomic updates competing with request() or cancel()

if (oldR == CANCEL) {
subscription.cancel();
return;
}

if (oldR != oldProduced) {
long req = unconsumed(oldR, oldProduced);
// assert: req != CANCEL
subscription.request(req); // assert: requesting necessarily from this subscription;
// if a concurrent onComplete is fired by a non-conforming
// Publisher before this request, the request is no-op, and onComplete
// will carry over req to the next Publisher - no double accounting
// occurs;
// but if there is no concurrent onComplete, need to request
// from this subscription
// (plus trivial arithmetical proof based on commutativity of
// addition - produced may change concurrently, too, but only by
// no more than concurrent requests, and the req can be seen to be
// preserved)
}

long p = claimPending();
if (p != 0) { // all concurrent onSubscribe and requests that observe requested >= INIT attempt this
updateRequest(p);
}
}

@Override
Expand All @@ -70,38 +141,215 @@ public void onNext(T item) {

@Override
public void onError(Throwable throwable) {
REQUESTED.setOpaque(this, CANCEL);
downstream.onError(throwable);
}

@Override
public void onComplete() {
long produced = this.produced;
if (produced != 0L) {
this.produced = 0L;
super.produced(produced);
Thread current = Thread.currentThread();
if (LASTTHREADCOMPLETING.getOpaque(this) == current) {
redo = true;
return;
}

LASTTHREADCOMPLETING.setOpaque(this, current);
VarHandle.storeStoreFence();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this again, strictly speaking this fence is not needed, because we have stronger fences between the stores targeted by this fence. But no need to change the code as the cost of this fence on the target platforms is zero.

boolean sameThread;
boolean again;
do {
redo = false;
long r = (long) REQUESTED.getAndSet(this, SEE_OTHER);
subscription = null;

nextSource(r);
again = redo;
VarHandle.loadLoadFence();
sameThread = LASTTHREADCOMPLETING.getOpaque(this) == current;
} while (again && sameThread);

if (sameThread) {
LASTTHREADCOMPLETING.compareAndSet(this, current, null);
}
nextSource();
}

public void nextSource() {
if (wip.getAndIncrement() == 0) {
do {
if (index == sources.length) {
downstream.onComplete();
} else {
sources[index++].subscribe(this);
}
} while (wip.decrementAndGet() != 0);
protected void nextSource(long r) {
// assert: requested == SEE_OTHER
if (r == CANCEL) {
return;
}

if (index == sources.length) {
// assert: no onSubscribe in the future, so no need to preserve oldRequested
downstream.onComplete();
return;
}

Flow.Publisher<T> nextPub = sources[index++];

oldRequested = r < INIT || r == Long.MAX_VALUE ? r : r + 1;

nextPub.subscribe(this);
}

protected static long unconsumed(long req, long produced) {
// assert: all invocations of unconsumed ensure req > produced, or
// req represents a final state, where produced does not matter -
// MAX_VALUE, BAD, CANCEL

if (req >= INIT && req < Long.MAX_VALUE) {
if (produced < 0 && Long.MAX_VALUE + produced < req) {
// assert: if produced < 0, then MAX_VALUE + produced does not overflow
req = Long.MAX_VALUE;
} else {
// assert: if produced >= 0, then req-produced does not overflow (req > produced)
req -= produced;
}

// assert: req > 0
}

return req;
}

@Override
public void request(long n) {
if (n <= 0) {
downstream.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
} else {
super.request(n);
updateRequest(n <= 0 ? BAD : n);
}

/*
* If requested is in a state where update is possible, and there is anything accumulated in
* the pending counter, try to claim it. If the caller observes a non-zero return value, they
* must updateRequest with that value.
*/
private long claimPending() {
long p;
do {
p = pending;
if (p == 0) {
return 0;

}

long r = requested;
if (r < INIT && !(r == BAD && p == CANCEL)) {
// assert: updating requested is needed:
//
// BAD | if p == CANCEL
// CANCEL | no
// SEE_OTHER | no
// >= INIT | if p != 0
return 0;
}
} while (!PENDING.compareAndSet(this, p, p < 0 ? p : 0));

return p;
}

private long updatePending(long n) {
long req;
long nextReq;
do {
req = pending;
if (req < 0 && !(req == BAD && n == CANCEL)) {
// assert: updating pending is needed:
//
// BAD | if n == CANCEL
// CANCEL | no
// >= 0 | yes
break;
}

nextReq = n < 0 ? n
// assert: n >= 0
: Long.MAX_VALUE - n <= req ? Long.MAX_VALUE : req + n;
} while (!PENDING.compareAndSet(this, req, nextReq));

return claimPending();
}

private void updateRequest(long n) {
Flow.Subscription sub;
long req;
long nextReq;
do {
req = requested;
while (req < INIT && !(req == BAD && n == CANCEL)) {
// assert: updating requested is needed:
//
// BAD | if n == CANCEL
// CANCEL | no - terminal state
// SEE_OTHER | no - keep track of n in pending
// >= INIT | yes

if (req != SEE_OTHER) {
return;
}
n = updatePending(n);
if (n == 0) { // assert: requested is in a terminal state, or there is a claimPending()
// now or in the future that will propagate pending to requested and
// the actual Publisher
return;
}

req = requested;
}

sub = subscription;
nextReq = n < INIT ? n
// assert: n >= 0
: Long.MAX_VALUE - n <= req ? Long.MAX_VALUE : req + n;
} while (!REQUESTED.compareAndSet(this, req, nextReq));

if (nextReq < INIT) {
// assert: good requests should be delivered once and only once to ensure
// no double-accounting happens - so we only
// attempt delivering to subscription seen before updating requested, and
// mutual exclusion between accesses to subscription.request() from
// request() and onSubscribe() is enforced.
// When MAX_VALUE is reached, good requests do not need delivering: concurrent
// request() may miss an update to subscription, and attempt to deliver to an
// old subscription, as it will not be
// able to observe new subscriptions (new values of requested), but good requests
// do not need delivering in this case

// assert: cancellations and bad requests can be delivered more than once - no
// double accounting
// occurs, and only one onError will be delivered by upstream Publisher. For
// this reason can read subscription as it appears after updating requested -
// this may result in both onSubscribe() and concurrent request() to call
// subscription.request, but this is ok for a bad request
// What we do not want to happen, is for bad request to be delivered to an old
// subscription, the update of which concurrent request() cannot detect after
// requested reaches MAX_VALUE - so, should read subscription after updating
// requested
sub = subscription;

// assert: subscription may be null, if requested was updated before it was set
// to SEE_OTHER by onComplete, but before subscription is set again by
// onSubscribe; consequently, if it is null, then there is onSubscribe in the
// future that will observe the update of requested and signal appropriately
if (sub != null) {
if (nextReq == CANCEL) {
sub.cancel();
} else {
sub.request(BAD);
}
}
return;
}

// assert: nextReq == req, if req == MAX_VALUE - nothing needs doing
if (nextReq != req) {
// assert: sub is not null, because when req != MAX_VALUE the change of subscription
// cannot be missed
sub.request(nextReq - req);
}
}

@Override
public void cancel() {
updateRequest(CANCEL);
}
}
}