Skip to content

Commit

Permalink
Merge branch 'eclipse:jetty-10.0.x' into jetty-10.0.x
Browse files Browse the repository at this point in the history
  • Loading branch information
strogiyotec committed Jun 24, 2023
2 parents ec3501e + 4e19fac commit df5738a
Show file tree
Hide file tree
Showing 23 changed files with 370 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,11 @@ public void setAttribute(String name, Object value)
@Override
public void setStreams(InputStream i, OutputStream o)
{
_is = i;
_os = o;
assert _is != null;
if (i != null)
_is = i;
if (o != null)
_os = o;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ wml=text/vnd.wap.wml
wmlc=application/vnd.wap.wmlc
wmls=text/vnd.wap.wmlscript
wmlsc=application/vnd.wap.wmlscriptc
woff=application/font-woff
woff=font/woff
woff2=font/woff2
wrl=model/vrml
wtls-ca-certificate=application/vnd.wap.wtls-ca-certificate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.eclipse.jetty.util.Callback;

/**
* This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
* <p>This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.</p>
*
* <p>This class is not thread safe and callers must do mutual exclusion.</p>
*/
public class ByteBufferCallbackAccumulator
{
Expand Down Expand Up @@ -89,11 +91,14 @@ public void writeTo(ByteBuffer buffer)

public void fail(Throwable t)
{
for (Entry entry : _entries)
// In some usages the callback recursively fails the accumulator.
// So we copy and clear to avoid double completing the callback.
ArrayList<Entry> entries = new ArrayList<>(_entries);
_entries.clear();
_length = 0;
for (Entry entry : entries)
{
entry.callback.failed(t);
}
_entries.clear();
_length = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
Expand Down Expand Up @@ -145,11 +146,13 @@ public void destroy()
{
try
{
_client.stop();
LifeCycle.stop(_client);
}
catch (Exception x)
{
if (_log.isDebugEnabled())
if (_log == null)
x.printStackTrace();
else if (_log.isDebugEnabled())
_log.debug("Failed to stop client", x);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.proxy;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.client.api.Response.CompleteListener;
import org.junit.jupiter.api.Test;

public class AbstractProxyServletTest
{

@Test
public void testNewDestroy() throws Exception
{
new AbstractProxyServlet()
{
private static final long serialVersionUID = 1L;

@Override
protected CompleteListener newProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
{
return null;
}
}.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.ajax.JSON;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -159,9 +160,15 @@ private void startClient() throws Exception
@AfterEach
public void dispose() throws Exception
{
client.stop();
proxy.stop();
server.stop();
LifeCycle.stop(client);
LifeCycle.stop(proxy);
LifeCycle.stop(proxy);
}

@Test
public void testNewDestroy() throws Exception
{
new AsyncMiddleManServlet().destroy();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ public void failed(Throwable x)
break;
case PENDING:
{
_state = State.FAILED;
failure = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
Expand Down Expand Up @@ -322,4 +323,44 @@ boolean waitForComplete() throws InterruptedException
return isSucceeded();
}
}

@Test
public void testMultipleFailures() throws Exception
{
AtomicInteger process = new AtomicInteger();
AtomicInteger failure = new AtomicInteger();
IteratingCallback icb = new IteratingCallback()
{
@Override
protected Action process() throws Throwable
{
process.incrementAndGet();
return Action.SCHEDULED;
}

@Override
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
failure.incrementAndGet();
}
};

icb.iterate();
assertEquals(1, process.get());
assertEquals(0, failure.get());

icb.failed(new Throwable("test1"));

assertEquals(1, process.get());
assertEquals(1, failure.get());

icb.succeeded();
assertEquals(1, process.get());
assertEquals(1, failure.get());

icb.failed(new Throwable("test2"));
assertEquals(1, process.get());
assertEquals(1, failure.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -464,16 +464,23 @@ private boolean inflate(Frame frame, Callback callback, boolean first) throws Da
chunk.setPayload(payload);
chunk.setFin(frame.isFin() && complete);

boolean succeedCallback = complete;
// If we are complete we return true, then DemandingFlusher.process() will null out the Frame and Callback.
// The application may decide to hold onto the buffer and delay completing the callback, so we need to capture
// references to these in the payloadCallback and not rely on state of the flusher which may have moved on.
// This flusher could be failed while the application already has the payloadCallback, so we need protection against
// the flusher failing and the application completing the callback, that's why we use the payload AtomicReference.
boolean completeCallback = complete;
AtomicReference<ByteBuffer> payloadRef = _payloadRef;
Callback payloadCallback = Callback.from(() ->
{
getBufferPool().release(payloadRef.getAndSet(null));
if (succeedCallback)
if (completeCallback)
callback.succeeded();
}, t ->
{
getBufferPool().release(payloadRef.getAndSet(null));
if (completeCallback)
callback.failed(t);
failFlusher(t);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public AbstractMessageSink(CoreSession session, MethodHandle methodHandle)
this.session = Objects.requireNonNull(session, "CoreSession");
this.methodHandle = Objects.requireNonNull(methodHandle, "MethodHandle");
}

@Override
public void fail(Throwable failure)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,11 @@ public void accept(Frame frame, Callback callback)
}
}
}

@Override
public void fail(Throwable failure)
{
if (out != null)
out.fail(failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,11 @@ public void accept(Frame frame, Callback callback)
}
}
}

@Override
public void fail(Throwable failure)
{
if (out != null)
out.fail(failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,11 @@ public void succeeded()

typeSink.accept(frame, frameCallback);
}

@Override
public void fail(Throwable failure)
{
if (typeSink != null)
typeSink.fail(failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.websocket.core.Frame;
import org.slf4j.Logger;
Expand Down Expand Up @@ -127,40 +128,6 @@ public int read(ByteBuffer buffer) throws IOException
return fillLen;
}

@Override
public void close() throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("close()");

ArrayList<Entry> entries = new ArrayList<>();
try (AutoLock l = lock.lock())
{
if (closed)
return;
closed = true;

if (currentEntry != null)
{
entries.add(currentEntry);
currentEntry = null;
}

// Clear queue and fail all entries.
entries.addAll(buffers);
buffers.clear();
buffers.offer(CLOSED);
}

// Succeed all entries as we don't need them anymore (failing would close the connection).
for (Entry e : entries)
{
e.callback.succeeded();
}

super.close();
}

public void setTimeout(long timeoutMs)
{
this.timeoutMs = timeoutMs;
Expand Down Expand Up @@ -218,6 +185,49 @@ private Entry getCurrentEntry() throws IOException
}
}

@Override
public void close() throws IOException
{
fail(null);
}

@Override
public void fail(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("close()");

ArrayList<Entry> entries = new ArrayList<>();
try (AutoLock l = lock.lock())
{
if (closed)
return;
closed = true;

if (currentEntry != null)
{
entries.add(currentEntry);
currentEntry = null;
}

// Clear queue and fail all entries.
entries.addAll(buffers);
buffers.clear();
buffers.offer(CLOSED);
}

// Succeed all entries as we don't need them anymore (failing would close the connection).
for (Entry e : entries)
{
if (failure == null)
e.callback.succeeded();
else
e.callback.failed(failure);
}

IO.close(super::close);
}

private static class Entry
{
public ByteBuffer buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public void close() throws IOException
stream.close();
}

@Override
public void fail(Throwable failure)
{
stream.fail(failure);
}

@Override
public void accept(Frame frame, Callback callback)
{
Expand Down
Loading

0 comments on commit df5738a

Please sign in to comment.