Skip to content

Commit

Permalink
Experiment with IteratingCallback
Browse files Browse the repository at this point in the history
The previous semantic of `onCompleteFailure` has been renamed to `onFailure(Throwable)`, which is called immediately (but serialized) on either an abort or a failure.   A new `onCompleteFailure(Throwable)` method has been added that is called only after a `failed(throwable)` or a `abort(Throwable)` followed by `succeeded()` or `failed(Throwable)``

No usage has yet been made of the new `onCompleteFailure`, but the ICB implementation has been completely replaced by the one developed in #11876
  • Loading branch information
gregw committed Jul 16, 2024
1 parent 2b60a6d commit c2742cc
Showing 1 changed file with 91 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,57 @@ protected void onCompleted(Throwable causeOrNull)
onCompleteFailure(causeOrNull);
}

private void doOnSuccessProcessing()
{
ExceptionUtil.callAndThen(this::onSuccess, this::processing);
}

private void doCompleteSuccess()
{
onCompleted(null);
}

private void doCompleteFailure(Throwable cause)
private void doOnCompleted(Throwable cause)
{
ExceptionUtil.call(cause, this::onCompleted);
}

private void doOnFailureOnCompleted(Throwable cause)
{
ExceptionUtil.callAndThen(cause, this::onFailure, this::onCompleted);
}

private void doOnAbortedOnFailure(Throwable cause)
{
onCompleted(cause);
ExceptionUtil.callAndThen(cause, this::onAborted, this::onFailure);
}

private void doOnAbortedOnFailureOnCompleted(Throwable cause)
{
ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, this::onCompleted);
}

private void doOnAbortedOnFailureIfNotPendingDoCompleted(Throwable cause)
{
ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, this::ifNotPendingDoCompleted);
}

private void ifNotPendingDoCompleted()
{
Throwable completeFailure = null;
try (AutoLock ignored = _lock.lock())
{
_failure = _failure.getCause();

if (Objects.requireNonNull(_state) != State.PENDING)
{
// the callback completed, one way or another, so it is up to us to do the completion
completeFailure = _failure;
}
}

if (completeFailure != null)
doOnCompleted(completeFailure);
}

/**
Expand Down Expand Up @@ -298,9 +341,9 @@ private void processing()
// may happen concurrently, so state is not assumed.

boolean completeSuccess = false;
Throwable abortDoCompleteFailure = null;
Throwable completeFailure = null;
Throwable onAbort = null;
Throwable onAbortedOnFailureOnCompleted = null;
Throwable onFailureOnCompleted = null;
Throwable onAbortedOnFailureIfNotPendingDoCompleted = null;

// While we are processing
processing:
Expand Down Expand Up @@ -339,7 +382,7 @@ private void processing()
if (_aborted)
{
_state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE;
abortDoCompleteFailure = _failure;
onAbortedOnFailureOnCompleted = _failure;
break processing;
}

Expand All @@ -361,8 +404,8 @@ private void processing()
_state = State.PENDING;
if (_aborted)
{
onAbort = _failure;
_failure = new AbortingException(onAbort);
onAbortedOnFailureIfNotPendingDoCompleted = _failure;
_failure = new AbortingException(onAbortedOnFailureIfNotPendingDoCompleted);
}
break processing;
}
Expand All @@ -373,7 +416,7 @@ private void processing()
if (_aborted)
{
_state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE;
abortDoCompleteFailure = _failure;
onAbortedOnFailureOnCompleted = _failure;
}
else
{
Expand All @@ -395,24 +438,24 @@ private void processing()
if (action != Action.SCHEDULED && action != null)
{
_state = State.CLOSED;
abortDoCompleteFailure = new IllegalStateException("Action not scheduled");
onAbortedOnFailureOnCompleted = new IllegalStateException("Action not scheduled");
if (_failure == null)
{
_failure = abortDoCompleteFailure;
_failure = onAbortedOnFailureOnCompleted;
}
else
{
ExceptionUtil.addSuppressedIfNotAssociated(_failure, onAbort);
abortDoCompleteFailure = _failure;
ExceptionUtil.addSuppressedIfNotAssociated(_failure, onAbortedOnFailureIfNotPendingDoCompleted);
onAbortedOnFailureOnCompleted = _failure;
}
break processing;
}
if (_failure != null)
{
if (_aborted)
abortDoCompleteFailure = _failure;
onAbortedOnFailureOnCompleted = _failure;
else
completeFailure = _failure;
onFailureOnCompleted = _failure;
_state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE;
break processing;
}
Expand All @@ -431,14 +474,14 @@ private void processing()
onSuccess();
}
}
if (abortDoCompleteFailure != null)
ExceptionUtil.callAndThen(abortDoCompleteFailure, this::doOnAbortedOnFailure, this::doCompleteFailure);
if (onAbortedOnFailureOnCompleted != null)
doOnAbortedOnFailureOnCompleted(onAbortedOnFailureOnCompleted);
else if (completeSuccess)
doCompleteSuccess();
else if (completeFailure != null)
ExceptionUtil.callAndThen(completeFailure, this::onFailure, this::doCompleteFailure);
else if (onAbort != null)
ExceptionUtil.callAndThen(onAbort, this::doOnAbortedOnFailure, this::doAbortPendingCompletion);
else if (onFailureOnCompleted != null)
doOnFailureOnCompleted(onFailureOnCompleted);
else if (onAbortedOnFailureIfNotPendingDoCompleted != null)
doOnAbortedOnFailureIfNotPendingDoCompleted(onAbortedOnFailureIfNotPendingDoCompleted);
}

/**
Expand All @@ -457,8 +500,8 @@ else if (onAbort != null)
@Override
public final void succeeded()
{
boolean process = false;
Throwable completeFailure = null;
boolean onSuccessProcessing = false;
Throwable onCompleted = null;
try (AutoLock ignored = _lock.lock())
{
if (LOG.isDebugEnabled())
Expand All @@ -484,14 +527,14 @@ public final void succeeded()
{
// The onAborted call is complete, so we must do the completion
_state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE;
completeFailure = _failure;
onCompleted = _failure;
}
}
else
{
// No other thread is processing, so we will do the processing
_state = State.PROCESSING;
process = true;
onSuccessProcessing = true;
}
break;
}
Expand All @@ -506,13 +549,13 @@ public final void succeeded()
}
}
}
if (process)
if (onSuccessProcessing)
{
ExceptionUtil.callAndThen(this::onSuccess, this::processing);
doOnSuccessProcessing();
}
else if (completeFailure != null)
else if (onCompleted != null)
{
doCompleteFailure(completeFailure);
doOnCompleted(onCompleted);
}
}

Expand All @@ -538,8 +581,8 @@ public final void failed(Throwable cause)
{
cause = Objects.requireNonNullElseGet(cause, IOException::new);

Throwable completeFailure = null;
Throwable abortCompletion = null;
Throwable onFailureOnCompleted = null;
Throwable onCompleted = null;
try (AutoLock ignored = _lock.lock())
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -571,15 +614,15 @@ public final void failed(Throwable cause)
// The onAborted call is complete, so we must do the completion
ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause);
_state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE;
abortCompletion = _failure;
onCompleted = _failure;
}
}
else
{
// No other thread is processing, so we will do the processing
_state = State.COMPLETE;
_failure = cause;
completeFailure = _failure;
onFailureOnCompleted = _failure;
}
break;
}
Expand All @@ -595,10 +638,10 @@ public final void failed(Throwable cause)
}
}
}
if (completeFailure != null)
ExceptionUtil.callAndThen(completeFailure, this::onFailure, this::doCompleteFailure);
else if (abortCompletion != null)
doCompleteFailure(abortCompletion);
if (onFailureOnCompleted != null)
doOnFailureOnCompleted(onFailureOnCompleted);
else if (onCompleted != null)
doOnCompleted(onCompleted);
}

/**
Expand All @@ -612,8 +655,8 @@ else if (abortCompletion != null)
*/
public final void close()
{
Throwable onAbort = null;
Throwable onAbortDoCompleteFailure = null;
Throwable onAbortedOnFailureIfNotPendingDoCompleted = null;
Throwable onAbortOnFailureOnCompleted = null;

try (AutoLock ignored = _lock.lock())
{
Expand All @@ -626,7 +669,7 @@ public final void close()
// Nothing happening so we can abort and complete
_state = State.CLOSED;
_failure = new ClosedException();
onAbortDoCompleteFailure = _failure;
onAbortOnFailureOnCompleted = _failure;
}
case PROCESSING, PROCESSING_CALLED ->
{
Expand All @@ -645,8 +688,8 @@ public final void close()
case PENDING ->
{
// We are waiting for the callback, so we can only call onAbort and then keep waiting
onAbort = new ClosedException();
_failure = new AbortingException(onAbort);
onAbortedOnFailureIfNotPendingDoCompleted = new ClosedException();
_failure = new AbortingException(onAbortedOnFailureIfNotPendingDoCompleted);
_aborted = true;
}

Expand All @@ -663,10 +706,10 @@ public final void close()
}
}

if (onAbort != null)
ExceptionUtil.callAndThen(onAbort, this::doOnAbortedOnFailure, this::doAbortPendingCompletion);
else if (onAbortDoCompleteFailure != null)
ExceptionUtil.callAndThen(onAbortDoCompleteFailure, this::doOnAbortedOnFailure, this::doCompleteFailure);
if (onAbortedOnFailureIfNotPendingDoCompleted != null)
doOnAbortedOnFailureIfNotPendingDoCompleted(onAbortedOnFailureIfNotPendingDoCompleted);
else if (onAbortOnFailureOnCompleted != null)
doOnAbortedOnFailureOnCompleted(onAbortOnFailureOnCompleted);
}

/**
Expand Down Expand Up @@ -747,36 +790,13 @@ public final boolean abort(Throwable cause)
}

if (onAbortDoCompleteFailure)
ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, this::doCompleteFailure);
doOnAbortedOnFailureOnCompleted(cause);
else if (onAbort)
ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, this::doAbortPendingCompletion);
doOnAbortedOnFailureIfNotPendingDoCompleted(cause);

return true;
}

private void doOnAbortedOnFailure(Throwable cause)
{
ExceptionUtil.callAndThen(cause, this::onAborted, this::onFailure);
}

private void doAbortPendingCompletion()
{
Throwable doCompleteFailure = null;
try (AutoLock ignored = _lock.lock())
{
_failure = _failure.getCause();

if (Objects.requireNonNull(_state) != State.PENDING)
{
// the callback completed, one way or another, so it is up to use to do the completion
doCompleteFailure = _failure;
}
}

if (doCompleteFailure != null)
ExceptionUtil.call(doCompleteFailure, this::doCompleteFailure);
}

/**
* @return whether this callback is idle, and {@link #iterate()} needs to be called
*/
Expand Down

0 comments on commit c2742cc

Please sign in to comment.