Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment for aborting callbacks #11876

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
eb1c9a1
Experiment for aborting callbacks
gregw Jun 4, 2024
860fbd0
Experiment for aborting callbacks
gregw Jun 5, 2024
f4955be
Experiment for aborting callbacks
gregw Jun 5, 2024
0ec0052
add test for legacy wrapping callbacks
lorban Jun 5, 2024
dd4fa07
add test fix in comment
lorban Jun 5, 2024
4661e3c
Experiment for aborting callbacks
gregw Jun 6, 2024
abcc33c
Experiment for aborting callbacks
gregw Jun 6, 2024
bcd888f
Experiment for aborting callbacks
gregw Jun 6, 2024
a80fbc8
Experiment for aborting callbacks
gregw Jun 6, 2024
ad20143
Experiment for aborting callbacks
gregw Jun 6, 2024
46c6b2b
minimal changes to fix IllegalArgumentException in SocketChannel.write()
lorban Jun 6, 2024
81b4227
Experiment for aborting callbacks
gregw Jun 7, 2024
7b54c74
updates from review
gregw Jun 11, 2024
7857422
updates from review
gregw Jun 13, 2024
31fd432
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
gregw Jun 13, 2024
01846b9
updates from review
gregw Jun 13, 2024
304aee1
WIP updates from review
gregw Jun 15, 2024
58c3e30
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
gregw Jun 15, 2024
dce7956
WIP updates from review
gregw Jun 17, 2024
7099526
WIP updates from review
gregw Jun 17, 2024
10a7a8a
WIP updates from review
gregw Jun 17, 2024
c0beb52
WIP updates from review
gregw Jun 17, 2024
6964fe6
WIP updates from review
gregw Jun 18, 2024
879c341
Calling super.onCompleted() in Callback.Nested subclasses.
sbordet Jun 18, 2024
7798d69
Added TODOs in relevant places where abort needs to be handled.
sbordet Jun 18, 2024
881a450
WIP updates from review
gregw Jun 18, 2024
5da31e7
Merge branch 'jetty-12.0.x' into experiment/jetty-12.0.x/11854/abortC…
gregw Jun 18, 2024
841fefb
WIP updates from review
gregw Jun 18, 2024
c6e3f09
fixed from tests
gregw Jun 18, 2024
4b5d449
fixed tests
gregw Jun 19, 2024
51d6984
fixed Http2Flusher
gregw Jun 19, 2024
1f72dcb
fixed Http2Flusher
gregw Jun 19, 2024
73f1177
WIP
gregw Jun 19, 2024
f1f396c
Temporary fixes for aborting HttpSender.ContentSender
gregw Jun 19, 2024
ca4356e
Better javadoc in ICB
gregw Jun 19, 2024
5cb8f8b
Better javadoc in ICB
gregw Jun 19, 2024
961393e
Fixed ConnectHandler ICB usage
gregw Jun 19, 2024
86b1989
merged from 12.0.x
gregw Jun 24, 2024
bfb91ac
Merge branch 'jetty-12.0.x' into experiment/jetty-12.0.x/11854/abortC…
gregw Jun 25, 2024
1810573
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
gregw Jun 27, 2024
0ffd645
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
gregw Jun 27, 2024
6b979fd
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
gregw Jul 15, 2024
c16059b
Fix ISE handling
gregw Jul 15, 2024
e546948
added onFailure
gregw Jul 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,10 @@ protected Action process() throws Throwable
@Override
protected void onSuccess()
{
// TODO This logic should be moved to process() and/or onCompleteSuccess()
// Anything executed here is not mutually excluded from other threads in process() and/or onCompleteSuccess() and/or onCompleteFailure()
// nor is there a memory barrier.
// So, for example, two threads might try to release and null the chunk field at the same time.
boolean proceed = true;
if (committed)
{
Expand Down Expand Up @@ -590,6 +594,17 @@ else if (expect100)
}
}

@Override
protected void onAborted(Throwable cause)
{
// TODO Review this
internalAbort(exchange, cause);
Promise<Boolean> promise = abort;
abort = null;
if (promise != null)
promise.succeeded(true);
}

@Override
protected void onCompleteFailure(Throwable x)
{
Expand All @@ -603,6 +618,7 @@ protected void onCompleteFailure(Throwable x)
internalAbort(exchange, x);

Promise<Boolean> promise = abort;
abort = null;
if (promise != null)
promise.succeeded(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,7 @@ public String toString()
);
}

public abstract static class Entry extends Callback.Nested
public abstract static class Entry extends Callback.Wrapper
{
protected final Frame frame;
protected final HTTP2Stream stream;
Expand Down Expand Up @@ -2009,7 +2009,14 @@ private void onWriteFailure(Throwable x)

private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame)
{
sendGoAway(frame, Callback.from(() -> terminate(eventFrame)));
sendGoAway(frame, Callback.from(
t ->
{
terminate(eventFrame);
return true;
},
() -> terminate(eventFrame),
t -> terminate(eventFrame)));
}

private void sendGoAway(GoAwayFrame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,27 @@ protected void onCompleteSuccess()
throw new IllegalStateException();
}

@Override
protected void onAborted(Throwable cause)
{
Set<HTTP2Session.Entry> abortEntries;
Set<HTTP2Session.Entry> failEntries;
try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("aborted {} {}", cause, this);

// We can fail non-pending entries, as they are not being written yet
failEntries = new HashSet<>(entries);
entries.clear();

// We can only abort pending entries
abortEntries = new HashSet<>(pendingEntries);
}
abortEntries.forEach(entry -> entry.abort(cause));
failEntries.forEach(entry -> entry.failed(cause));
}

@Override
protected void onCompleteFailure(Throwable x)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public void failed(Throwable x)
_stream.reset(new ResetFrame(_stream.getId(), errorCode.code), Callback.NOOP);
}

private class SendTrailers extends Callback.Nested
private class SendTrailers extends Callback.Wrapper
{
private final HttpFields trailers;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ else if (BufferUtil.hasContent(currentBuffer))
{
if (LOG.isDebugEnabled())
LOG.debug("flushing aggregated buffer {}", aggregatedBuffer);
_flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
_flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Wrapper(Callback.from(aggregatedBuffer::release))
{
@Override
public void succeeded()
Expand Down Expand Up @@ -200,7 +200,7 @@ else if (flush)

if (BufferUtil.hasContent(currentBuffer))
{
_flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
_flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Wrapper(Callback.from(aggregatedBuffer::release))
{
@Override
public void succeeded()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ public void succeeded()
getWrapped().succeeded();
}

@Override
public boolean abort(Throwable cause)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
return getWrapped().abort(cause);
}

@Override
public void failed(Throwable x)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,11 @@ private class ProxyIteratingCallback extends IteratingCallback
@Override
protected Action process()
{
buffer = bufferPool.acquire(getInputBufferSize(), true);
if (buffer == null)
buffer = bufferPool.acquire(getInputBufferSize(), true);
else
buffer.clear();

try
{
ByteBuffer byteBuffer = buffer.getByteBuffer();
Expand All @@ -736,18 +740,18 @@ protected Action process()
write(connection.getEndPoint(), byteBuffer, this);
return Action.SCHEDULED;
}
else if (filled == 0)

buffer.release();
buffer = null;

if (filled == 0)
{
buffer.release();
fillInterested();
return Action.IDLE;
}
else
{
buffer.release();
connection.getEndPoint().shutdownOutput();
return Action.SUCCEEDED;
}

connection.getEndPoint().shutdownOutput();
return Action.SUCCEEDED;
}
catch (IOException x)
{
Expand All @@ -768,12 +772,19 @@ protected void onSuccess()
}

@Override
protected void onCompleteFailure(Throwable x)
protected void onAborted(Throwable cause)
{
disconnect(cause);
}

@Override
protected void onCompleteFailure(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("Failed to write {} bytes {}", filled, TunnelConnection.this, x);
LOG.debug("Failed to write {} bytes {}", filled, TunnelConnection.this, cause);
buffer.release();
disconnect(x);
buffer = null;
disconnect(cause);
}

private void disconnect(Throwable x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;

public class ContextResponse extends Response.Wrapper
{
Expand All @@ -33,24 +32,24 @@ public ContextResponse(ContextHandler.ScopedContext context, Request request, Re
@Override
public void write(boolean last, ByteBuffer content, Callback callback)
{
Callback contextCallback = new Callback()
Callback contextCallback = new Callback.Wrapper(callback)
{
@Override
public void succeeded()
public boolean abort(Throwable cause)
{
_context.run(callback::succeeded, getRequest());
return _context.test(callback::abort, cause, getRequest());
}

@Override
public void failed(Throwable x)
public void failed(Throwable cause)
{
_context.accept(callback::failed, x, getRequest());
_context.accept(callback::failed, cause, getRequest());
}

@Override
public InvocationType getInvocationType()
public void succeeded()
{
return Invocable.getInvocationType(callback);
_context.run(callback::succeeded, getRequest());;
}
};
super.write(last, content, contextCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.Graceful;
import org.slf4j.Logger;
Expand Down Expand Up @@ -100,7 +99,11 @@ public boolean handle(Request request, Response response, Callback callback) thr
{
boolean handled = super.handle(request, response, shutdownCallback);
if (!handled)
shutdownCallback.completed();
{
_requests.decrement();
if (isShutdown())
_shutdown.check();
}
return handled;
}
catch (Throwable t)
Expand All @@ -123,25 +126,26 @@ public CompletableFuture<Void> shutdown()
return _shutdown.shutdown();
}

private class ShutdownTrackingCallback extends CountingCallback
private class ShutdownTrackingCallback extends Callback.Nested
{
final Request request;
final Response response;

public ShutdownTrackingCallback(Request request, Response response, Callback callback)
{
super(callback, 1);
super(callback);
this.request = request;
this.response = response;
_requests.increment();
}

@Override
public void completed()
protected void onCompleted(Throwable cause)
{
_requests.decrement();
if (isShutdown())
_shutdown.check();
super.onCompleted(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ public void dump(Appendable out, String indent) throws IOException
}
}

private class HandlerCallback extends Callback.Nested implements Runnable, Dumpable
private class HandlerCallback extends Callback.Wrapper implements Runnable, Dumpable
{
private final AtomicBoolean completed = new AtomicBoolean();
private final Request request;
Expand Down Expand Up @@ -803,7 +803,7 @@ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
}
}

private class WriteCallback extends Callback.Nested implements Dumpable
private class WriteCallback extends Callback.Wrapper implements Dumpable
{
private final AtomicBoolean callbackCompleted = new AtomicBoolean();
private final Thread writeThread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ private Runnable lockedFailWrite(Throwable x)
_writeFailure = x;
else
ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x);
return () -> HttpChannelState.failed(writeCallback, x);
return () -> ExceptionUtil.call(x, writeCallback::abort);
}

public long getContentBytesWritten()
Expand Down Expand Up @@ -1275,7 +1275,7 @@ else if (last && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod())
if (writeFailure != null)
{
Throwable failure = writeFailure;
httpChannelState._writeInvoker.run(() -> HttpChannelState.failed(callback, failure));
httpChannelState._writeInvoker.run(() -> ExceptionUtil.call(failure, callback::failed));
return;
}

Expand Down Expand Up @@ -1343,7 +1343,7 @@ public void failed(Throwable x)
httpChannel.lockedStreamSendCompleted(false);
}
if (callback != null)
httpChannel._writeInvoker.run(() -> HttpChannelState.failed(callback, x));
httpChannel._writeInvoker.run(() -> ExceptionUtil.call(x, callback::failed));
}

@Override
Expand Down Expand Up @@ -1460,7 +1460,7 @@ public String toString()
}
}

private static class ChannelCallback implements Callback
private static class ChannelCallback extends Callback.Abstract
{
private final ChannelRequest _request;
private Throwable _completedBy;
Expand All @@ -1474,7 +1474,7 @@ private ChannelCallback(ChannelRequest request)
* Called when the {@link Handler} (or it's delegates) succeeds the request handling.
*/
@Override
public void succeeded()
public void onSucceeded()
{
// Called when the request/response cycle is completing successfully.
HttpStream stream;
Expand Down Expand Up @@ -1556,7 +1556,7 @@ else if (LOG.isDebugEnabled())
* @param failure The reason for the failure.
*/
@Override
public void failed(Throwable failure)
public void onFailed(Throwable failure)
{
try
{
Expand Down Expand Up @@ -1736,7 +1736,7 @@ public void succeeded()
}
else
{
HttpChannelState.failed(httpChannelState._handlerInvoker, failure);
ExceptionUtil.call(failure, ((Callback)httpChannelState._handlerInvoker)::failed);
}
}

Expand All @@ -1759,7 +1759,7 @@ public void failed(Throwable x)
httpChannelState._response._status = _errorResponse._status;
}
ExceptionUtil.addSuppressedIfNotAssociated(failure, x);
HttpChannelState.failed(httpChannelState._handlerInvoker, failure);
ExceptionUtil.call(failure, ((Callback)httpChannelState._handlerInvoker)::failed);
}

@Override
Expand Down Expand Up @@ -1928,25 +1928,4 @@ public void onComplianceViolation(ComplianceViolation.Event event)
}
}
}

/**
* Invoke a callback failure, handling any {@link Throwable} thrown
* by adding the passed {@code failure} as a suppressed with
* {@link ExceptionUtil#addSuppressedIfNotAssociated(Throwable, Throwable)}.
* @param callback The callback to fail
* @param failure The failure
* @throws RuntimeException If thrown, will have the {@code failure} added as a suppressed.
*/
private static void failed(Callback callback, Throwable failure)
{
try
{
callback.failed(failure);
}
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
throw t;
}
}
}
Loading
Loading