diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java index c49d3c2c946e..af495308727b 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java @@ -22,8 +22,8 @@ import java.io.InputStream; import java.net.Socket; import java.nio.ByteBuffer; -import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -44,7 +44,6 @@ import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint; -import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.NetworkTrafficServerConnector; @@ -507,16 +506,9 @@ private NetworkTrafficHttpClient(AtomicReference listene super(new HttpClientTransportOverHTTP(new ClientConnector() { @Override - protected SelectorManager newSelectorManager() + protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) { - return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors()) - { - @Override - protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) - { - return new NetworkTrafficSocketChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout().toMillis(), listener.get()); - } - }; + return new NetworkTrafficSocketChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout().toMillis(), listener.get()); } })); this.listener = listener; diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java index c961343c3c51..f575938cd4c4 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java @@ -50,10 +50,10 @@ import org.eclipse.jetty.client.ssl.SslBytesTest.TLSRecord.Type; import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http.HttpParser; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConnection; @@ -189,11 +189,11 @@ public boolean flush(ByteBuffer... appOuts) throws IOException ServerConnector connector = new ServerConnector(server, null, null, null, 1, 1, sslFactory, httpFactory) { @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { - ChannelEndPoint endp = super.newEndPoint(channel, selectSet, key); - serverEndPoint.set(endp); - return endp; + SocketChannelEndPoint endPoint = super.newEndPoint(channel, selectSet, key); + serverEndPoint.set(endPoint); + return endPoint; } }; connector.setIdleTimeout(idleTimeout); diff --git a/jetty-client/src/test/resources/jetty-logging.properties b/jetty-client/src/test/resources/jetty-logging.properties index e6bbf9a6ca01..2bbfa1a3add2 100644 --- a/jetty-client/src/test/resources/jetty-logging.properties +++ b/jetty-client/src/test/resources/jetty-logging.properties @@ -1,6 +1,6 @@ # Jetty Logging using jetty-slf4j-impl #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.client.LEVEL=DEBUG -#org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG +#org.eclipse.jetty.io.SocketChannelEndPoint.LEVEL=DEBUG #org.eclipse.jetty.io.ssl.LEVEL=DEBUG #org.eclipse.jetty.http.LEVEL=DEBUG diff --git a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java index 3d67bed20477..ecb818614bbb 100644 --- a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java +++ b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; -import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -57,7 +56,6 @@ import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.parser.Parser; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.logging.StacklessLogging; @@ -118,7 +116,7 @@ public void testRequestResponseNoContent() throws Exception startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + protected void service(HttpServletRequest req, HttpServletResponse resp) { latch.countDown(); } @@ -175,7 +173,7 @@ public void testRequestResponseContent() throws Exception startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException { latch.countDown(); resp.getOutputStream().write(content); @@ -321,7 +319,7 @@ public void testCommitFailure() throws Exception startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { try { @@ -340,7 +338,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) ServerConnector connector2 = new ServerConnector(server, new HTTP2ServerConnectionFactory(new HttpConfiguration())) { @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { return new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 7cd08ad45426..b4483f703d27 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -469,11 +469,11 @@ public String toEndPointString() name = c.getSimpleName(); } - return String.format("%s@%h{%s<->%s,%s,fill=%s,flush=%s,to=%d/%d}", + return String.format("%s@%h{l=%s,r=%s,%s,fill=%s,flush=%s,to=%d/%d}", name, this, - getRemoteAddress(), getLocalAddress(), + getRemoteAddress(), _state.get(), _fillInterest.toStateString(), _writeFlusher.toStateString(), diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java deleted file mode 100644 index 2c89a1737536..000000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ /dev/null @@ -1,429 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 -// -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.io; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.GatheringByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; - -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.Scheduler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Channel End Point. - *

Holds the channel and socket for an NIO endpoint. - */ -public abstract class ChannelEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable -{ - private static final Logger LOG = LoggerFactory.getLogger(ChannelEndPoint.class); - - private final ByteChannel _channel; - private final GatheringByteChannel _gather; - protected final ManagedSelector _selector; - protected final SelectionKey _key; - private boolean _updatePending; - - /** - * The current value for {@link SelectionKey#interestOps()}. - */ - protected int _currentInterestOps; - - /** - * The desired value for {@link SelectionKey#interestOps()}. - */ - protected int _desiredInterestOps; - - private abstract class RunnableTask implements Runnable, Invocable - { - final String _operation; - - protected RunnableTask(String op) - { - _operation = op; - } - - @Override - public String toString() - { - return String.format("CEP:%s:%s:%s", ChannelEndPoint.this, _operation, getInvocationType()); - } - } - - private abstract class RunnableCloseable extends RunnableTask implements Closeable - { - protected RunnableCloseable(String op) - { - super(op); - } - - @Override - public void close() - { - try - { - ChannelEndPoint.this.close(); - } - catch (Throwable x) - { - LOG.warn("Unable to close ChannelEndPoint", x); - } - } - } - - private final ManagedSelector.SelectorUpdate _updateKeyAction = new ManagedSelector.SelectorUpdate() - { - @Override - public void update(Selector selector) - { - updateKey(); - } - }; - - private final Runnable _runFillable = new RunnableCloseable("runFillable") - { - @Override - public InvocationType getInvocationType() - { - return getFillInterest().getCallbackInvocationType(); - } - - @Override - public void run() - { - getFillInterest().fillable(); - } - }; - - private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite") - { - @Override - public InvocationType getInvocationType() - { - return getWriteFlusher().getCallbackInvocationType(); - } - - @Override - public void run() - { - getWriteFlusher().completeWrite(); - } - - @Override - public String toString() - { - return String.format("CEP:%s:%s:%s->%s", ChannelEndPoint.this, _operation, getInvocationType(), getWriteFlusher()); - } - }; - - private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable") - { - @Override - public InvocationType getInvocationType() - { - InvocationType fillT = getFillInterest().getCallbackInvocationType(); - InvocationType flushT = getWriteFlusher().getCallbackInvocationType(); - if (fillT == flushT) - return fillT; - - if (fillT == InvocationType.EITHER && flushT == InvocationType.NON_BLOCKING) - return InvocationType.EITHER; - - if (fillT == InvocationType.NON_BLOCKING && flushT == InvocationType.EITHER) - return InvocationType.EITHER; - - return InvocationType.BLOCKING; - } - - @Override - public void run() - { - getWriteFlusher().completeWrite(); - getFillInterest().fillable(); - } - }; - - public ChannelEndPoint(ByteChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) - { - super(scheduler); - _channel = channel; - _selector = selector; - _key = key; - _gather = (channel instanceof GatheringByteChannel) ? (GatheringByteChannel)channel : null; - } - - @Override - public boolean isOpen() - { - return _channel.isOpen(); - } - - @Override - public void doClose() - { - if (LOG.isDebugEnabled()) - LOG.debug("doClose {}", this); - try - { - _channel.close(); - } - catch (IOException e) - { - LOG.debug("Unable to close channel", e); - } - finally - { - super.doClose(); - } - } - - @Override - public void onClose(Throwable cause) - { - try - { - super.onClose(cause); - } - finally - { - if (_selector != null) - _selector.destroyEndPoint(this, cause); - } - } - - @Override - public int fill(ByteBuffer buffer) throws IOException - { - if (isInputShutdown()) - return -1; - - int pos = BufferUtil.flipToFill(buffer); - int filled; - try - { - filled = _channel.read(buffer); - if (filled > 0) - notIdle(); - else if (filled == -1) - shutdownInput(); - } - catch (IOException e) - { - LOG.debug("Unable to shutdown output", e); - shutdownInput(); - filled = -1; - } - finally - { - BufferUtil.flipToFlush(buffer, pos); - } - if (LOG.isDebugEnabled()) - LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer)); - return filled; - } - - @Override - public boolean flush(ByteBuffer... buffers) throws IOException - { - long flushed = 0; - try - { - if (buffers.length == 1) - flushed = _channel.write(buffers[0]); - else if (_gather != null && buffers.length > 1) - flushed = _gather.write(buffers, 0, buffers.length); - else - { - for (ByteBuffer b : buffers) - { - if (b.hasRemaining()) - { - int l = _channel.write(b); - if (l > 0) - flushed += l; - if (b.hasRemaining()) - break; - } - } - } - if (LOG.isDebugEnabled()) - LOG.debug("flushed {} {}", flushed, this); - } - catch (IOException e) - { - throw new EofException(e); - } - - if (flushed > 0) - notIdle(); - - for (ByteBuffer b : buffers) - { - if (!BufferUtil.isEmpty(b)) - return false; - } - - return true; - } - - public ByteChannel getChannel() - { - return _channel; - } - - @Override - public Object getTransport() - { - return _channel; - } - - @Override - protected void needsFillInterest() - { - changeInterests(SelectionKey.OP_READ); - } - - @Override - protected void onIncompleteFlush() - { - changeInterests(SelectionKey.OP_WRITE); - } - - @Override - public Runnable onSelected() - { - /** - * This method may run concurrently with {@link #changeInterests(int)}. - */ - - int readyOps = _key.readyOps(); - int oldInterestOps; - int newInterestOps; - synchronized (this) - { - _updatePending = true; - // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). - oldInterestOps = _desiredInterestOps; - newInterestOps = oldInterestOps & ~readyOps; - _desiredInterestOps = newInterestOps; - } - - boolean fillable = (readyOps & SelectionKey.OP_READ) != 0; - boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0; - - if (LOG.isDebugEnabled()) - LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this); - - // return task to complete the job - Runnable task = fillable - ? (flushable - ? _runCompleteWriteFillable - : _runFillable) - : (flushable - ? _runCompleteWrite - : null); - - if (LOG.isDebugEnabled()) - LOG.debug("task {}", task); - return task; - } - - @Override - public void updateKey() - { - /** - * This method may run concurrently with {@link #changeInterests(int)}. - */ - - try - { - int oldInterestOps; - int newInterestOps; - synchronized (this) - { - _updatePending = false; - oldInterestOps = _currentInterestOps; - newInterestOps = _desiredInterestOps; - if (oldInterestOps != newInterestOps) - { - _currentInterestOps = newInterestOps; - _key.interestOps(newInterestOps); - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this); - } - catch (CancelledKeyException x) - { - LOG.debug("Ignoring key update for concurrently closed channel {}", this); - close(); - } - catch (Throwable x) - { - LOG.warn("Ignoring key update for " + this, x); - close(); - } - } - - private void changeInterests(int operation) - { - /** - * This method may run concurrently with - * {@link #updateKey()} and {@link #onSelected()}. - */ - - int oldInterestOps; - int newInterestOps; - boolean pending; - synchronized (this) - { - pending = _updatePending; - oldInterestOps = _desiredInterestOps; - newInterestOps = oldInterestOps | operation; - if (newInterestOps != oldInterestOps) - _desiredInterestOps = newInterestOps; - } - - if (LOG.isDebugEnabled()) - LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); - - if (!pending && _selector != null) - _selector.submit(_updateKeyAction); - } - - @Override - public String toEndPointString() - { - // We do a best effort to print the right toString() and that's it. - return String.format("%s{io=%d/%d,kio=%d,kro=%d}", - super.toEndPointString(), - _currentInterestOps, - _desiredInterestOps, - ManagedSelector.safeInterestOps(_key), - ManagedSelector.safeReadyOps(_key)); - } -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java index a997de630017..e5d8730b10c8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.io; -import java.io.Closeable; import java.io.IOException; import java.net.SocketAddress; import java.net.SocketException; @@ -253,7 +252,7 @@ public void connect(SocketAddress address, Map context) // exception is being thrown, so we attempt to provide a better error message. if (x.getClass() == SocketException.class) x = new SocketException("Could not connect to " + address).initCause(x); - safeClose(channel); + IO.close(channel); connectFailed(x, context); } } @@ -273,21 +272,21 @@ public void accept(SocketChannel channel, Map context) { if (LOG.isDebugEnabled()) LOG.debug("Could not accept {}", channel); - safeClose(channel); + IO.close(channel); Promise promise = (Promise)context.get(CONNECTION_PROMISE_CONTEXT_KEY); if (promise != null) promise.failed(failure); } } - protected void safeClose(Closeable closeable) + protected void configure(SocketChannel channel) throws IOException { - IO.close(closeable); + channel.socket().setTcpNoDelay(true); } - protected void configure(SocketChannel channel) throws IOException + protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) { - channel.socket().setTcpNoDelay(true); + return new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler()); } protected void connectFailed(Throwable failure, Map context) @@ -309,7 +308,7 @@ public ClientSelectorManager(Executor executor, Scheduler scheduler, int selecto @Override protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) { - SocketChannelEndPoint endPoint = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler()); + EndPoint endPoint = ClientConnector.this.newEndPoint((SocketChannel)channel, selector, selectionKey); endPoint.setIdleTimeout(getIdleTimeout().toMillis()); return endPoint; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index fbdd71ad77b2..47651f816a03 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -80,7 +80,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } private final AtomicBoolean _started = new AtomicBoolean(false); - private boolean _selecting = false; + private boolean _selecting; private final SelectorManager _selectorManager; private final int _id; private final ExecutionStrategy _strategy; @@ -123,22 +123,6 @@ protected void doStart() throws Exception start._started.await(); } - protected void onSelectFailed(Throwable cause) - { - // override to change behavior - } - - public int size() - { - Selector s = _selector; - if (s == null) - return 0; - Set keys = s.keys(); - if (keys == null) - return 0; - return keys.size(); - } - @Override protected void doStop() throws Exception { @@ -160,22 +144,119 @@ protected void doStop() throws Exception super.doStop(); } + protected int nioSelect(Selector selector, boolean now) throws IOException + { + return now ? selector.selectNow() : selector.select(); + } + + protected int select(Selector selector) throws IOException + { + try + { + int selected = nioSelect(selector, false); + if (selected == 0) + { + if (LOG.isDebugEnabled()) + LOG.debug("Selector {} woken with none selected", selector); + + if (Thread.interrupted() && !isRunning()) + throw new ClosedSelectorException(); + + if (FORCE_SELECT_NOW) + selected = nioSelect(selector, true); + } + return selected; + } + catch (ClosedSelectorException x) + { + throw x; + } + catch (Throwable x) + { + handleSelectFailure(selector, x); + return 0; + } + } + + protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException + { + LOG.info("Caught select() failure, trying to recover: {}", failure.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("", failure); + + Selector newSelector = _selectorManager.newSelector(); + for (SelectionKey oldKey : selector.keys()) + { + SelectableChannel channel = oldKey.channel(); + int interestOps = safeInterestOps(oldKey); + if (interestOps >= 0) + { + try + { + Object attachment = oldKey.attachment(); + SelectionKey newKey = channel.register(newSelector, interestOps, attachment); + if (attachment instanceof Selectable) + ((Selectable)attachment).replaceKey(newKey); + oldKey.cancel(); + if (LOG.isDebugEnabled()) + LOG.debug("Transferred {} iOps={} att={}", channel, interestOps, attachment); + } + catch (Throwable t) + { + if (LOG.isDebugEnabled()) + LOG.debug("Could not transfer {}", channel, t); + IO.close(channel); + } + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Invalid interestOps for {}", channel); + IO.close(channel); + } + } + + IO.close(selector); + _selector = newSelector; + } + + protected void onSelectFailed(Throwable cause) + { + // override to change behavior + } + + public int size() + { + Selector s = _selector; + if (s == null) + return 0; + Set keys = s.keys(); + if (keys == null) + return 0; + return keys.size(); + } + /** * Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()} * * @param update The selector update to apply at next wakeup */ public void submit(SelectorUpdate update) + { + submit(update, false); + } + + private void submit(SelectorUpdate update, boolean lazy) { if (LOG.isDebugEnabled()) - LOG.debug("Queued change {} on {}", update, this); + LOG.debug("Queued change lazy={} {} on {}", lazy, update, this); Selector selector = null; synchronized (ManagedSelector.this) { _updates.offer(update); - if (_selecting) + if (_selecting && !lazy) { selector = _selector; // To avoid the extra select wakeup. @@ -223,7 +304,7 @@ private void execute(Runnable task) } } - private void processConnect(SelectionKey key, final Connect connect) + private void processConnect(SelectionKey key, Connect connect) { SelectableChannel channel = key.channel(); try @@ -271,7 +352,18 @@ private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey Object context = selectionKey.attachment(); Connection connection = _selectorManager.newConnection(channel, endPoint, context); endPoint.setConnection(connection); - selectionKey.attach(endPoint); + submit(selector -> + { + SelectionKey key = selectionKey; + if (key.selector() != selector) + { + key = channel.keyFor(selector); + if (key != null && endPoint instanceof Selectable) + ((Selectable)endPoint).replaceKey(key); + } + if (key != null) + key.attach(endPoint); + }, true); endPoint.onOpen(); endPointOpened(endPoint); _selectorManager.connectionOpened(connection, context); @@ -279,7 +371,7 @@ private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey LOG.debug("Created {}", endPoint); } - public void destroyEndPoint(final EndPoint endPoint, Throwable cause) + void destroyEndPoint(EndPoint endPoint, Throwable cause) { // Waking up the selector is necessary to clean the // cancelled-key set and tell the TCP stack that the @@ -330,8 +422,8 @@ public void dump(Appendable out, String indent) throws IOException Selector selector = _selector; if (selector != null && selector.isOpen()) { - final DumpKeys dump = new DumpKeys(); - final String updatesAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()); + DumpKeys dump = new DumpKeys(); + String updatesAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()); synchronized (ManagedSelector.this) { updates = new ArrayList<>(_updates); @@ -387,6 +479,14 @@ public interface Selectable * {@link ManagedSelector} for this endpoint have been processed. */ void updateKey(); + + /** + * Callback method invoked when the SelectionKey is replaced + * because the channel has been moved to a new selector. + * + * @param newKey the new SelectionKey + */ + void replaceKey(SelectionKey newKey); } private class SelectorProducer implements ExecutionStrategy.Producer @@ -434,9 +534,9 @@ private void processUpdates() LOG.debug("update {}", update); update.update(_selector); } - catch (Throwable th) + catch (Throwable x) { - LOG.warn("Cannot update selector {}", _selector, th); + LOG.warn("Cannot update selector {}", ManagedSelector.this, x); } } _updateable.clear(); @@ -466,39 +566,33 @@ private boolean select() try { Selector selector = _selector; - if (selector != null && selector.isOpen()) + if (selector != null) { if (LOG.isDebugEnabled()) LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size()); - int selected = selector.select(); - if (selected == 0) + int selected = ManagedSelector.this.select(selector); + // The selector may have been recreated. + selector = _selector; + if (selector != null) { if (LOG.isDebugEnabled()) - LOG.debug("Selector {} woken with none selected", selector); + LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size()); - if (Thread.interrupted() && !isRunning()) - throw new ClosedSelectorException(); + int updates; + synchronized (ManagedSelector.this) + { + // finished selecting + _selecting = false; + updates = _updates.size(); + } - if (FORCE_SELECT_NOW) - selected = selector.selectNow(); - } - if (LOG.isDebugEnabled()) - LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size()); + _keys = selector.selectedKeys(); + _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator(); + if (LOG.isDebugEnabled()) + LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates); - int updates; - synchronized (ManagedSelector.this) - { - // finished selecting - _selecting = false; - updates = _updates.size(); + return true; } - - _keys = selector.selectedKeys(); - _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator(); - if (LOG.isDebugEnabled()) - LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates); - - return true; } } catch (Throwable x) @@ -514,7 +608,8 @@ private boolean select() else { LOG.warn(x.toString()); - LOG.debug("select() failure", x); + if (LOG.isDebugEnabled()) + LOG.debug("select() failure", x); } } return false; @@ -525,9 +620,10 @@ private Runnable processSelected() while (_cursor.hasNext()) { SelectionKey key = _cursor.next(); + Object attachment = key.attachment(); + SelectableChannel channel = key.channel(); if (key.isValid()) { - Object attachment = key.attachment(); if (LOG.isDebugEnabled()) LOG.debug("selected {} {} {} ", safeReadyOps(key), key, attachment); try @@ -550,24 +646,21 @@ else if (key.isConnectable()) } catch (CancelledKeyException x) { - LOG.debug("Ignoring cancelled key for channel {}", key.channel()); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring cancelled key for channel {}", channel); + IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel); } catch (Throwable x) { - LOG.warn("Could not process key for channel " + key.channel(), x); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); + LOG.warn("Could not process key for channel {}", channel, x); + IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel); } } else { if (LOG.isDebugEnabled()) - LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); - Object attachment = key.attachment(); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); + LOG.debug("Selector loop ignoring invalid key for channel {}", channel); + IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel); } } return null; @@ -616,7 +709,7 @@ public void update(Selector selector) private static class DumpKeys implements SelectorUpdate { - private CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch = new CountDownLatch(1); private List keys; @Override @@ -652,9 +745,9 @@ class Acceptor implements SelectorUpdate, Selectable, Closeable private final SelectableChannel _channel; private SelectionKey _key; - public Acceptor(SelectableChannel channel) + Acceptor(SelectableChannel channel) { - this._channel = channel; + _channel = channel; } @Override @@ -662,31 +755,26 @@ public void update(Selector selector) { try { - if (_key == null) - { - _key = _channel.register(selector, SelectionKey.OP_ACCEPT, this); - } - + _key = _channel.register(selector, SelectionKey.OP_ACCEPT, this); if (LOG.isDebugEnabled()) - LOG.debug("{} acceptor={}", this, _key); + LOG.debug("{} acceptor={}", this, _channel); } catch (Throwable x) { IO.close(_channel); - LOG.warn("Unable to register OP_ACCEPT on selector", x); + LOG.warn("Unable to register OP_ACCEPT on selector for {}", _channel, x); } } @Override public Runnable onSelected() { - SelectableChannel server = _key.channel(); SelectableChannel channel = null; try { while (true) { - channel = _selectorManager.doAccept(server); + channel = _selectorManager.doAccept(_channel); if (channel == null) break; _selectorManager.accepted(channel); @@ -694,10 +782,9 @@ public Runnable onSelected() } catch (Throwable x) { + LOG.warn("Accept failed for channel {}", channel, x); IO.close(channel); - LOG.warn("Accept failed for channel " + channel, x); } - return null; } @@ -706,13 +793,18 @@ public void updateKey() { } + @Override + public void replaceKey(SelectionKey newKey) + { + _key = newKey; + } + @Override public void close() throws IOException { - SelectionKey key = _key; - _key = null; - if (key != null && key.isValid()) - key.cancel(); + // May be called from any thread. + // Implements AbstractConnector.setAccepting(boolean). + submit(selector -> _key.cancel()); } } @@ -732,7 +824,8 @@ class Accept implements SelectorUpdate, Runnable, Closeable @Override public void close() { - LOG.debug("closed accept of {}", channel); + if (LOG.isDebugEnabled()) + LOG.debug("closed accept of {}", channel); IO.close(channel); } @@ -748,7 +841,8 @@ public void update(Selector selector) { IO.close(channel); _selectorManager.onAcceptFailed(channel, x); - LOG.debug("Unable to register update for accept", x); + if (LOG.isDebugEnabled()) + LOG.debug("Could not register channel after accept {}", channel, x); } } @@ -762,7 +856,6 @@ public void run() } catch (Throwable x) { - LOG.debug("Unable to accept", x); failed(x); } } @@ -770,10 +863,17 @@ public void run() protected void failed(Throwable failure) { IO.close(channel); - LOG.warn("ManagedSelector#Accept failure : {}", Objects.toString(failure)); - LOG.debug("ManagedSelector#Accept failure", failure); + LOG.warn("Could not accept {}: {}", channel, String.valueOf(failure)); + if (LOG.isDebugEnabled()) + LOG.debug("", failure); _selectorManager.onAcceptFailed(channel, failure); } + + @Override + public String toString() + { + return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), channel); + } } class Connect implements SelectorUpdate, Runnable @@ -833,16 +933,15 @@ public String toString() private class CloseConnections implements SelectorUpdate { - final Set _closed; - final CountDownLatch _noEndPoints = new CountDownLatch(1); - final CountDownLatch _complete = new CountDownLatch(1); + private final Set _closed; + private final CountDownLatch _complete = new CountDownLatch(1); - public CloseConnections() + private CloseConnections() { this(null); } - public CloseConnections(Set closed) + private CloseConnections(Set closed) { _closed = closed; } @@ -852,7 +951,6 @@ public void update(Selector selector) { if (LOG.isDebugEnabled()) LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this); - boolean zero = true; for (SelectionKey key : selector.keys()) { if (key != null && key.isValid()) @@ -861,14 +959,9 @@ public void update(Selector selector) Object attachment = key.attachment(); if (attachment instanceof EndPoint) { - EndPoint endp = (EndPoint)attachment; - if (!endp.isOutputShutdown()) - zero = false; - Connection connection = endp.getConnection(); - if (connection != null) - closeable = connection; - else - closeable = endp; + EndPoint endPoint = (EndPoint)attachment; + Connection connection = endPoint.getConnection(); + closeable = Objects.requireNonNullElse(connection, endPoint); } if (closeable != null) @@ -885,30 +978,26 @@ else if (!_closed.contains(closeable)) } } } - - if (zero) - _noEndPoints.countDown(); _complete.countDown(); } } private class StopSelector implements SelectorUpdate { - CountDownLatch _stopped = new CountDownLatch(1); + private final CountDownLatch _stopped = new CountDownLatch(1); @Override public void update(Selector selector) { for (SelectionKey key : selector.keys()) { - if (key != null && key.isValid()) - { - Object attachment = key.attachment(); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); - } + // Key may be null when using the UnixSocket selector. + if (key == null) + continue; + Object attachment = key.attachment(); + if (attachment instanceof Closeable) + IO.close((Closeable)attachment); } - _selector = null; IO.close(selector); _stopped.countDown(); @@ -936,8 +1025,9 @@ public void run() catch (Throwable failure) { IO.close(_connect.channel); - LOG.warn("ManagedSelector#CreateEndpoint failure : {}", Objects.toString(failure)); - LOG.debug("ManagedSelector#CreateEndpoint failure", failure); + LOG.warn("Could not create EndPoint {}: {}", _connect.channel, String.valueOf(failure)); + if (LOG.isDebugEnabled()) + LOG.debug("", failure); _connect.failed(failure); } } @@ -945,7 +1035,7 @@ public void run() @Override public String toString() { - return String.format("CreateEndPoint@%x{%s,%s}", hashCode(), _connect, _key); + return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _connect); } } @@ -954,7 +1044,7 @@ private class DestroyEndPoint implements Runnable, Closeable private final EndPoint endPoint; private final Throwable cause; - public DestroyEndPoint(EndPoint endPoint, Throwable cause) + private DestroyEndPoint(EndPoint endPoint, Throwable cause) { this.endPoint = endPoint; this.cause = cause; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSocketChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSocketChannelEndPoint.java index 81fe1c04c88e..272c0ecc0aca 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSocketChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSocketChannelEndPoint.java @@ -20,8 +20,8 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; @@ -36,7 +36,7 @@ public class NetworkTrafficSocketChannelEndPoint extends SocketChannelEndPoint private final NetworkTrafficListener listener; - public NetworkTrafficSocketChannelEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, NetworkTrafficListener listener) + public NetworkTrafficSocketChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, NetworkTrafficListener listener) { super(channel, selectSet, key, scheduler); setIdleTimeout(idleTimeout); @@ -80,7 +80,7 @@ public void onOpen() { try { - listener.opened(getSocket()); + listener.opened(getChannel().socket()); } catch (Throwable x) { @@ -97,7 +97,7 @@ public void onClose(Throwable failure) { try { - listener.closed(getSocket()); + listener.closed(getChannel().socket()); } catch (Throwable x) { @@ -113,7 +113,7 @@ public void notifyIncoming(ByteBuffer buffer, int read) try { ByteBuffer view = buffer.asReadOnlyBuffer(); - listener.incoming(getSocket(), view); + listener.incoming(getChannel().socket(), view); } catch (Throwable x) { @@ -128,7 +128,7 @@ public void notifyOutgoing(ByteBuffer view) { try { - listener.outgoing(getSocket(), view); + listener.outgoing(getChannel().socket(), view); } catch (Throwable x) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 1bcd0080ed87..81b034f4a2d3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -195,7 +195,7 @@ public void accept(SelectableChannel channel) */ public void accept(SelectableChannel channel, Object attachment) { - final ManagedSelector selector = chooseSelector(); + ManagedSelector selector = chooseSelector(); selector.submit(selector.new Accept(channel, attachment)); } @@ -210,7 +210,7 @@ public void accept(SelectableChannel channel, Object attachment) */ public Closeable acceptor(SelectableChannel server) { - final ManagedSelector selector = chooseSelector(); + ManagedSelector selector = chooseSelector(); ManagedSelector.Acceptor acceptor = selector.new Acceptor(server); selector.submit(acceptor); return acceptor; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java index 23809302b8ca..e662c104b588 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java @@ -18,53 +18,165 @@ package org.eclipse.jetty.io; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; -import java.nio.channels.SelectableChannel; +import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SocketChannelEndPoint extends ChannelEndPoint +/** + * Channel End Point. + *

Holds the channel and socket for an NIO endpoint. + */ +public class SocketChannelEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable { private static final Logger LOG = LoggerFactory.getLogger(SocketChannelEndPoint.class); - private final Socket _socket; - private final InetSocketAddress _local; - private final InetSocketAddress _remote; - public SocketChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) + private final SocketChannel _channel; + private final ManagedSelector _selector; + private SelectionKey _key; + private boolean _updatePending; + // The current value for interestOps. + private int _currentInterestOps; + // The desired value for interestOps. + private int _desiredInterestOps; + + private abstract class RunnableTask implements Runnable, Invocable { - this((SocketChannel)channel, selector, key, scheduler); + final String _operation; + + protected RunnableTask(String op) + { + _operation = op; + } + + @Override + public String toString() + { + return String.format("%s:%s:%s", SocketChannelEndPoint.this, _operation, getInvocationType()); + } } - public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) + private abstract class RunnableCloseable extends RunnableTask implements Closeable { - super(channel, selector, key, scheduler); + protected RunnableCloseable(String op) + { + super(op); + } - _socket = channel.socket(); - _local = (InetSocketAddress)_socket.getLocalSocketAddress(); - _remote = (InetSocketAddress)_socket.getRemoteSocketAddress(); + @Override + public void close() + { + try + { + SocketChannelEndPoint.this.close(); + } + catch (Throwable x) + { + LOG.warn("Unable to close {}", SocketChannelEndPoint.this, x); + } + } } - public Socket getSocket() + private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction; + + private final Runnable _runFillable = new RunnableCloseable("runFillable") + { + @Override + public InvocationType getInvocationType() + { + return getFillInterest().getCallbackInvocationType(); + } + + @Override + public void run() + { + getFillInterest().fillable(); + } + }; + + private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite") + { + @Override + public InvocationType getInvocationType() + { + return getWriteFlusher().getCallbackInvocationType(); + } + + @Override + public void run() + { + getWriteFlusher().completeWrite(); + } + + @Override + public String toString() + { + return String.format("%s:%s:%s->%s", SocketChannelEndPoint.this, _operation, getInvocationType(), getWriteFlusher()); + } + }; + + private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable") + { + @Override + public InvocationType getInvocationType() + { + InvocationType fillT = getFillInterest().getCallbackInvocationType(); + InvocationType flushT = getWriteFlusher().getCallbackInvocationType(); + if (fillT == flushT) + return fillT; + + if (fillT == InvocationType.EITHER && flushT == InvocationType.NON_BLOCKING) + return InvocationType.EITHER; + + if (fillT == InvocationType.NON_BLOCKING && flushT == InvocationType.EITHER) + return InvocationType.EITHER; + + return InvocationType.BLOCKING; + } + + @Override + public void run() + { + getWriteFlusher().completeWrite(); + getFillInterest().fillable(); + } + }; + + public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { - return _socket; + super(scheduler); + _channel = channel; + _selector = selector; + _key = key; } @Override public InetSocketAddress getLocalAddress() { - return _local; + return (InetSocketAddress)_channel.socket().getLocalSocketAddress(); } @Override public InetSocketAddress getRemoteAddress() { - return _remote; + return (InetSocketAddress)_channel.socket().getRemoteSocketAddress(); + } + + @Override + public boolean isOpen() + { + return _channel.isOpen(); } @Override @@ -72,12 +184,250 @@ protected void doShutdownOutput() { try { - if (!_socket.isOutputShutdown()) - _socket.shutdownOutput(); + Socket socket = _channel.socket(); + if (!socket.isOutputShutdown()) + socket.shutdownOutput(); + } + catch (IOException e) + { + LOG.debug("Could not shutdown output for {}", _channel, e); + } + } + + @Override + public void doClose() + { + if (LOG.isDebugEnabled()) + LOG.debug("doClose {}", this); + try + { + _channel.close(); + } + catch (IOException e) + { + LOG.debug("Unable to close channel", e); + } + finally + { + super.doClose(); + } + } + + @Override + public void onClose(Throwable cause) + { + try + { + super.onClose(cause); + } + finally + { + if (_selector != null) + _selector.destroyEndPoint(this, cause); + } + } + + @Override + public int fill(ByteBuffer buffer) throws IOException + { + if (isInputShutdown()) + return -1; + + int pos = BufferUtil.flipToFill(buffer); + int filled; + try + { + filled = _channel.read(buffer); + if (filled > 0) + notIdle(); + else if (filled == -1) + shutdownInput(); } catch (IOException e) { LOG.debug("Unable to shutdown output", e); + shutdownInput(); + filled = -1; + } + finally + { + BufferUtil.flipToFlush(buffer, pos); + } + if (LOG.isDebugEnabled()) + LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer)); + return filled; + } + + @Override + public boolean flush(ByteBuffer... buffers) throws IOException + { + long flushed; + try + { + flushed = _channel.write(buffers); + if (LOG.isDebugEnabled()) + LOG.debug("flushed {} {}", flushed, this); + } + catch (IOException e) + { + throw new EofException(e); + } + + if (flushed > 0) + notIdle(); + + for (ByteBuffer b : buffers) + { + if (!BufferUtil.isEmpty(b)) + return false; + } + + return true; + } + + public SocketChannel getChannel() + { + return _channel; + } + + @Override + public Object getTransport() + { + return _channel; + } + + @Override + protected void needsFillInterest() + { + changeInterests(SelectionKey.OP_READ); + } + + @Override + protected void onIncompleteFlush() + { + changeInterests(SelectionKey.OP_WRITE); + } + + @Override + public Runnable onSelected() + { + // This method runs from the selector thread, + // possibly concurrently with changeInterests(int). + + int readyOps = _key.readyOps(); + int oldInterestOps; + int newInterestOps; + synchronized (this) + { + _updatePending = true; + // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). + oldInterestOps = _desiredInterestOps; + newInterestOps = oldInterestOps & ~readyOps; + _desiredInterestOps = newInterestOps; + } + + boolean fillable = (readyOps & SelectionKey.OP_READ) != 0; + boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0; + + if (LOG.isDebugEnabled()) + LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this); + + // return task to complete the job + Runnable task = fillable + ? (flushable + ? _runCompleteWriteFillable + : _runFillable) + : (flushable + ? _runCompleteWrite + : null); + + if (LOG.isDebugEnabled()) + LOG.debug("task {}", task); + return task; + } + + private void updateKeyAction(Selector selector) + { + updateKey(); + } + + @Override + public void updateKey() + { + // This method runs from the selector thread, + // possibly concurrently with changeInterests(int). + + try + { + int oldInterestOps; + int newInterestOps; + synchronized (this) + { + _updatePending = false; + oldInterestOps = _currentInterestOps; + newInterestOps = _desiredInterestOps; + if (oldInterestOps != newInterestOps) + { + _currentInterestOps = newInterestOps; + _key.interestOps(newInterestOps); + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this); + } + catch (CancelledKeyException x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring key update for cancelled key {}", this, x); + close(); + } + catch (Throwable x) + { + LOG.warn("Ignoring key update for {}", this, x); + close(); } } + + @Override + public void replaceKey(SelectionKey newKey) + { + _key = newKey; + } + + private void changeInterests(int operation) + { + // This method runs from any thread, possibly + // concurrently with updateKey() and onSelected(). + + int oldInterestOps; + int newInterestOps; + boolean pending; + synchronized (this) + { + pending = _updatePending; + oldInterestOps = _desiredInterestOps; + newInterestOps = oldInterestOps | operation; + if (newInterestOps != oldInterestOps) + _desiredInterestOps = newInterestOps; + } + + if (LOG.isDebugEnabled()) + LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); + + if (!pending && _selector != null) + _selector.submit(_updateKeyAction); + } + + @Override + public String toEndPointString() + { + // We do a best effort to print the right toString() and that's it. + return String.format("%s{io=%d/%d,kio=%d,kro=%d}", + super.toEndPointString(), + _currentInterestOps, + _desiredInterestOps, + ManagedSelector.safeInterestOps(_key), + ManagedSelector.safeReadyOps(_key)); + } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java index 5d3ca7e2772f..9890de59d178 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java @@ -72,11 +72,11 @@ public void testConnectTimeoutBeforeSuccessfulConnect() throws Exception SelectorManager selectorManager = new SelectorManager(executor, scheduler) { @Override - protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { - SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); - endp.setIdleTimeout(connectTimeout / 2); - return endp; + SocketChannelEndPoint endPoint = new SocketChannelEndPoint((SocketChannel)channel, selector, key, getScheduler()); + endPoint.setIdleTimeout(connectTimeout / 2); + return endPoint; } @Override @@ -96,7 +96,7 @@ protected boolean doFinishConnect(SelectableChannel channel) throws IOException } @Override - public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) { ((Callback)attachment).succeeded(); return new AbstractConnection(endpoint, executor) diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java index 3ee35a3f1609..94f79c12d454 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java @@ -69,7 +69,7 @@ public void init(final Interested interested) throws Exception @Override protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { - SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()) + SocketChannelEndPoint endp = new SocketChannelEndPoint((SocketChannel)channel, selector, key, getScheduler()) { @Override protected void onIncompleteFlush() diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java index 537174b8625e..595ddb521154 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java @@ -465,10 +465,10 @@ public void testRejectedExecution(Scenario scenario) throws Exception @Override protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) { - SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler()); - _lastEndPoint = endp; + SocketChannelEndPoint endPoint = new SocketChannelEndPoint((SocketChannel)channel, selector, selectionKey, getScheduler()); + _lastEndPoint = endPoint; _lastEndPointLatch.countDown(); - return endp; + return endPoint; } @Override @@ -580,11 +580,11 @@ protected ScenarioSelectorManager(Executor executor, Scheduler scheduler) protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { - SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); - endp.setIdleTimeout(60000); - _lastEndPoint = endp; + SocketChannelEndPoint endPoint = new SocketChannelEndPoint((SocketChannel)channel, selector, key, getScheduler()); + endPoint.setIdleTimeout(60000); + _lastEndPoint = endPoint; _lastEndPointLatch.countDown(); - return endp; + return endPoint; } @Override @@ -743,7 +743,7 @@ public void onFillable() return; } - EndPoint endp = getEndPoint(); + EndPoint endPoint = getEndPoint(); try { _last = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -756,17 +756,17 @@ public void onFillable() BufferUtil.compact(_in); if (BufferUtil.isFull(_in)) throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in)); - int filled = endp.fill(_in); + int filled = endPoint.fill(_in); if (filled > 0) progress = true; // If the tests wants to block, then block - while (_blockAt.get() > 0 && endp.isOpen() && _in.remaining() < _blockAt.get()) + while (_blockAt.get() > 0 && endPoint.isOpen() && _in.remaining() < _blockAt.get()) { FutureCallback future = _blockingRead = new FutureCallback(); fillInterested(); future.get(); - filled = endp.fill(_in); + filled = endPoint.fill(_in); progress |= filled > 0; } @@ -782,18 +782,18 @@ public void onFillable() for (int i = 0; i < _writeCount.get(); i++) { FutureCallback blockingWrite = new FutureCallback(); - endp.write(blockingWrite, out.asReadOnlyBuffer()); + endPoint.write(blockingWrite, out.asReadOnlyBuffer()); blockingWrite.get(); } progress = true; } // are we done? - if (endp.isInputShutdown()) - endp.shutdownOutput(); + if (endPoint.isInputShutdown()) + endPoint.shutdownOutput(); } - if (endp.isOpen()) + if (endPoint.isOpen()) fillInterested(); } catch (ExecutionException e) @@ -802,9 +802,9 @@ public void onFillable() try { FutureCallback blockingWrite = new FutureCallback(); - endp.write(blockingWrite, BufferUtil.toBuffer("EE: " + BufferUtil.toString(_in))); + endPoint.write(blockingWrite, BufferUtil.toBuffer("EE: " + BufferUtil.toString(_in))); blockingWrite.get(); - endp.shutdownOutput(); + endPoint.shutdownOutput(); } catch (Exception e2) { diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java index 9cd10814def8..29aba54a82ce 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java @@ -506,7 +506,7 @@ protected ConnectManager(Executor executor, Scheduler scheduler, int selectors) @Override protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { - SocketChannelEndPoint endPoint = new SocketChannelEndPoint(channel, selector, key, getScheduler()); + SocketChannelEndPoint endPoint = new SocketChannelEndPoint((SocketChannel)channel, selector, key, getScheduler()); endPoint.setIdleTimeout(getIdleTimeout()); return endPoint; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java index fef244df7fd3..27ca81906a77 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java @@ -23,10 +23,10 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Scheduler; @@ -81,7 +81,7 @@ public NetworkTrafficListener getNetworkTrafficListener() } @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { return new NetworkTrafficSocketChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), getNetworkTrafficListener()); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 5baaa6988ffd..23eb3d440df7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; @@ -424,7 +423,7 @@ public int getLocalPort() return _localPort; } - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { SocketChannelEndPoint endpoint = new SocketChannelEndPoint(channel, selectSet, key, getScheduler()); endpoint.setIdleTimeout(getIdleTimeout()); @@ -511,9 +510,9 @@ protected void accepted(SelectableChannel channel) throws IOException } @Override - protected ChannelEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + protected SocketChannelEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException { - return ServerConnector.this.newEndPoint((SocketChannel)channel, selectSet, selectionKey); + return ServerConnector.this.newEndPoint((SocketChannel)channel, selector, selectionKey); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java b/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java index 39f788440282..b9d6c1486a63 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java @@ -59,18 +59,18 @@ public SocketCustomizationListener(boolean ssl) @Override public void onOpened(Connection connection) { - EndPoint endp = connection.getEndPoint(); + EndPoint endPoint = connection.getEndPoint(); boolean ssl = false; - if (_ssl && endp instanceof DecryptedEndPoint) + if (_ssl && endPoint instanceof DecryptedEndPoint) { - endp = ((DecryptedEndPoint)endp).getSslConnection().getEndPoint(); + endPoint = ((DecryptedEndPoint)endPoint).getSslConnection().getEndPoint(); ssl = true; } - if (endp instanceof SocketChannelEndPoint) + if (endPoint instanceof SocketChannelEndPoint) { - Socket socket = ((SocketChannelEndPoint)endp).getSocket(); + Socket socket = ((SocketChannelEndPoint)endPoint).getChannel().socket(); customize(socket, connection.getClass(), ssl); } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index 602917393021..7cde76cdf404 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -45,7 +45,6 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.tools.HttpTester; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; @@ -132,7 +131,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint) }) { @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { return new ExtendedEndPoint(channel, selectSet, key, getScheduler()); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java index 9afea19e7b56..d96a48b9ddae 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java @@ -30,7 +30,6 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; @@ -61,7 +60,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint) }) { @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { return new ExtendedEndPoint(channel, selectSet, key, getScheduler()); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java index 3fe424c20728..2f8bf9b95461 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java @@ -71,7 +71,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques EndPoint endPoint = baseRequest.getHttpChannel().getEndPoint(); assertThat("Endpoint", endPoint, instanceOf(SocketChannelEndPoint.class)); SocketChannelEndPoint channelEndPoint = (SocketChannelEndPoint)endPoint; - Socket socket = channelEndPoint.getSocket(); + Socket socket = channelEndPoint.getChannel().socket(); ServerConnector connector = (ServerConnector)baseRequest.getHttpChannel().getConnector(); PrintWriter out = response.getWriter(); @@ -214,7 +214,7 @@ public void testReuseAddressFalse() throws Exception } @Test - public void testAddFirstConnectionFactory() throws Exception + public void testAddFirstConnectionFactory() { Server server = new Server(); ServerConnector connector = new ServerConnector(server); @@ -236,7 +236,7 @@ public void testAddFirstConnectionFactory() throws Exception public void testExceptionWhileAccepting() throws Exception { Server server = new Server(); - try (StacklessLogging stackless = new StacklessLogging(AbstractConnector.class)) + try (StacklessLogging ignored = new StacklessLogging(AbstractConnector.class)) { AtomicLong spins = new AtomicLong(); ServerConnector connector = new ServerConnector(server, 1, 1) diff --git a/jetty-servlet/src/test/resources/jetty-logging.properties b/jetty-servlet/src/test/resources/jetty-logging.properties index 821cf3d2f35d..bd3b391a3dc3 100644 --- a/jetty-servlet/src/test/resources/jetty-logging.properties +++ b/jetty-servlet/src/test/resources/jetty-logging.properties @@ -1,8 +1,7 @@ # Jetty Logging using jetty-slf4j-impl -org.eclipse.jetty.LEVEL=INFO #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.server.LEVEL=DEBUG #org.eclipse.jetty.servlet.LEVEL=DEBUG -#org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG +#org.eclipse.jetty.io.SocketChannelEndPoint.LEVEL=DEBUG #org.eclipse.jetty.server.DebugListener.LEVEL=DEBUG -#org.eclipse.jetty.server.HttpChannelState.LEVEL=DEBUG \ No newline at end of file +#org.eclipse.jetty.server.HttpChannelState.LEVEL=DEBUG diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java index dd5db390cea0..b97d52c9814b 100644 --- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java @@ -45,7 +45,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.logging.StacklessLogging; @@ -106,7 +105,7 @@ public void testDefaultServletSuccess() throws Exception ServerConnector connector = new ServerConnector(_server, 0, 1) { @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { return new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) { @@ -258,7 +257,7 @@ public void testFailureStarvation() throws Exception ServerConnector connector = new ServerConnector(_server, acceptors, selectors) { @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { return new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) { diff --git a/jetty-unixsocket/jetty-unixsocket-client/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java b/jetty-unixsocket/jetty-unixsocket-client/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java index b4b20bd61cc3..dad2b625e378 100644 --- a/jetty-unixsocket/jetty-unixsocket-client/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java +++ b/jetty-unixsocket/jetty-unixsocket-client/src/main/java/org/eclipse/jetty/unixsocket/client/HttpClientTransportOverUnixSockets.java @@ -47,6 +47,7 @@ import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.unixsocket.common.UnixSocketEndPoint; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +129,7 @@ public void connect(SocketAddress address, Map context) } catch (Throwable x) { - safeClose(channel); + IO.close(channel); connectFailed(x, context); } } diff --git a/jetty-unixsocket/jetty-unixsocket-client/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java b/jetty-unixsocket/jetty-unixsocket-client/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java index 8add2ca82b30..a8f10b390018 100644 --- a/jetty-unixsocket/jetty-unixsocket-client/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java +++ b/jetty-unixsocket/jetty-unixsocket-client/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java @@ -40,6 +40,7 @@ import org.eclipse.jetty.util.StringUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledOnOs; import org.slf4j.Logger; @@ -146,6 +147,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques assertThat(contentResponse.getContentAsString(), containsString("Hello World")); } + @Tag("external") @Test public void testNotLocal() throws Exception { diff --git a/jetty-unixsocket/jetty-unixsocket-common/src/main/java/org/eclipse/jetty/unixsocket/common/UnixSocketEndPoint.java b/jetty-unixsocket/jetty-unixsocket-common/src/main/java/org/eclipse/jetty/unixsocket/common/UnixSocketEndPoint.java index 29c3c11d5e95..ed4e42486051 100644 --- a/jetty-unixsocket/jetty-unixsocket-common/src/main/java/org/eclipse/jetty/unixsocket/common/UnixSocketEndPoint.java +++ b/jetty-unixsocket/jetty-unixsocket-common/src/main/java/org/eclipse/jetty/unixsocket/common/UnixSocketEndPoint.java @@ -23,22 +23,25 @@ import java.nio.channels.SelectionKey; import jnr.unixsocket.UnixSocketChannel; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class UnixSocketEndPoint extends ChannelEndPoint +public class UnixSocketEndPoint extends SocketChannelEndPoint { private static final Logger LOG = LoggerFactory.getLogger(UnixSocketEndPoint.class); - private final UnixSocketChannel _channel; - public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { super(channel, selector, key, scheduler); - _channel = channel; + } + + @Override + public UnixSocketChannel getChannel() + { + return (UnixSocketChannel)super.getChannel(); } @Override @@ -56,11 +59,9 @@ public InetSocketAddress getRemoteAddress() @Override protected void doShutdownOutput() { - if (LOG.isDebugEnabled()) - LOG.debug("oshut {}", this); try { - _channel.shutdownOutput(); + getChannel().shutdownOutput(); super.doShutdownOutput(); } catch (IOException e) diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index 0504ded0f85d..38eabbc25e0b 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -74,6 +74,7 @@ import org.eclipse.jetty.util.FuturePromise; import org.hamcrest.Matchers; import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -397,6 +398,7 @@ public void onError(Throwable t) @ParameterizedTest @ArgumentsSource(TransportProvider.class) @Tag("Unstable") + @Disabled public void testAsyncWriteClosed(Transport transport) throws Exception { init(transport); diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RecoverFailedSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RecoverFailedSelectorTest.java new file mode 100644 index 000000000000..9cbbc89c56f5 --- /dev/null +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RecoverFailedSelectorTest.java @@ -0,0 +1,379 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.test; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.tools.HttpTester; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.SocketChannelEndPoint; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.thread.Scheduler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RecoverFailedSelectorTest +{ + private Server server; + private ServerConnector connector; + + private void start(Function consumer) throws Exception + { + server = new Server(); + connector = consumer.apply(server); + server.addConnector(connector); + server.start(); + } + + @AfterEach + public void dispose() throws Exception + { + server.stop(); + } + + @Test + public void testSelectFailureBetweenReads() throws Exception + { + // There will be 3 calls to select(): one at start(), + // one to accept, and one to set read interest. + CountDownLatch selectLatch = new CountDownLatch(3); + CountDownLatch failureLatch = new CountDownLatch(1); + AtomicBoolean fail = new AtomicBoolean(); + start(server -> new ServerConnector(server, 1, 1) + { + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new ManagedSelector(this, id) + { + @Override + protected int nioSelect(Selector selector, boolean now) throws IOException + { + selectLatch.countDown(); + if (fail.getAndSet(false)) + throw new IOException("explicit select() failure"); + return super.nioSelect(selector, now); + } + + @Override + protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException + { + super.handleSelectFailure(selector, failure); + failureLatch.countDown(); + } + }; + } + }; + } + }); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + assertTrue(selectLatch.await(5, TimeUnit.SECONDS)); + + String request = "GET / HTTP/1.0\r\n\r\n"; + int split = request.length() / 2; + ByteBuffer chunk1 = StandardCharsets.UTF_8.encode(request.substring(0, split)); + ByteBuffer chunk2 = StandardCharsets.UTF_8.encode(request.substring(split)); + + // Wake up the selector and fail it. + fail.set(true); + client.write(chunk1); + + // Wait for the failure handling to be completed. + assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + + // Write the rest of the request, the + // server should be able to continue. + client.write(chunk2); + + HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client)); + assertNotNull(response); + assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus()); + } + } + + @Test + public void testAcceptDuringSelectFailure() throws Exception + { + // There will be 3 calls to select(): one at start(), + // one to accept, and one to set read interest. + CountDownLatch selectLatch = new CountDownLatch(3); + CountDownLatch failureLatch = new CountDownLatch(1); + AtomicBoolean fail = new AtomicBoolean(); + AtomicReference socketRef = new AtomicReference<>(); + start(server -> new ServerConnector(server, 1, 1) + { + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new ManagedSelector(this, id) + { + @Override + protected int nioSelect(Selector selector, boolean now) throws IOException + { + selectLatch.countDown(); + if (fail.getAndSet(false)) + throw new IOException("explicit select() failure"); + return super.nioSelect(selector, now); + } + + @Override + protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException + { + // Before handling the failure, connect with another socket. + SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())); + socketRef.set(socket); + super.handleSelectFailure(selector, failure); + failureLatch.countDown(); + } + }; + } + }; + } + }); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + assertTrue(selectLatch.await(5, TimeUnit.SECONDS)); + + String request = "GET / HTTP/1.0\r\n\r\n"; + ByteBuffer buffer = StandardCharsets.UTF_8.encode(request); + + // Wake up the selector and fail it. + fail.set(true); + client.write(buffer); + + // Wait for the failure handling to be completed. + assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + + HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client)); + assertNotNull(response); + assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus()); + + // Verify that the newly created socket works well. + SocketChannel socket = socketRef.get(); + buffer.flip(); + socket.write(buffer); + response = HttpTester.parseResponse(HttpTester.from(socket)); + assertNotNull(response); + assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus()); + } + } + + @Test + public void testSelectFailureDuringEndPointCreation() throws Exception + { + // There will be 2 calls to select(): one at start(), one to accept. + CountDownLatch selectLatch = new CountDownLatch(2); + CountDownLatch failureLatch = new CountDownLatch(1); + AtomicBoolean fail = new AtomicBoolean(); + CountDownLatch endPointLatch1 = new CountDownLatch(1); + CountDownLatch endPointLatch2 = new CountDownLatch(1); + start(server -> new ServerConnector(server, 1, 1) + { + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new ManagedSelector(this, id) + { + @Override + protected int nioSelect(Selector selector, boolean now) throws IOException + { + selectLatch.countDown(); + if (fail.getAndSet(false)) + throw new IOException("explicit select() failure"); + return super.nioSelect(selector, now); + } + + @Override + protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException + { + super.handleSelectFailure(selector, failure); + failureLatch.countDown(); + } + }; + } + + @Override + protected SocketChannelEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + { + try + { + SocketChannelEndPoint endPoint = super.newEndPoint(channel, selector, selectionKey); + endPointLatch1.countDown(); + assertTrue(endPointLatch2.await(5, TimeUnit.SECONDS)); + return endPoint; + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }; + } + }); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + assertTrue(selectLatch.await(5, TimeUnit.SECONDS)); + + // Wait until the server EndPoint instance is created. + assertTrue(endPointLatch1.await(5, TimeUnit.SECONDS)); + + // Wake up the selector and fail it. + fail.set(true); + SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())).close(); + + // Wait until the selector is replaced. + assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + + // Continue the EndPoint creation. + endPointLatch2.countDown(); + + String request = "GET / HTTP/1.0\r\n\r\n"; + ByteBuffer buffer = StandardCharsets.UTF_8.encode(request); + client.write(buffer); + + HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client)); + assertNotNull(response); + assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus()); + } + } + + @Test + public void testSelectFailureDuringEndPointCreatedThenClosed() throws Exception + { + // There will be 2 calls to select(): one at start(), one to accept. + CountDownLatch selectLatch = new CountDownLatch(2); + CountDownLatch failureLatch = new CountDownLatch(1); + AtomicBoolean fail = new AtomicBoolean(); + CountDownLatch connectionLatch1 = new CountDownLatch(1); + CountDownLatch connectionLatch2 = new CountDownLatch(1); + start(server -> new ServerConnector(server, 1, 1) + { + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new ManagedSelector(this, id) + { + @Override + protected int nioSelect(Selector selector, boolean now) throws IOException + { + selectLatch.countDown(); + if (fail.getAndSet(false)) + throw new IOException("explicit select() failure"); + return super.nioSelect(selector, now); + } + + @Override + protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException + { + super.handleSelectFailure(selector, failure); + failureLatch.countDown(); + } + }; + } + + @Override + public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException + { + try + { + Connection connection = super.newConnection(channel, endPoint, attachment); + endPoint.close(); + connectionLatch1.countDown(); + assertTrue(connectionLatch2.await(5, TimeUnit.SECONDS)); + return connection; + } + catch (InterruptedException e) + { + throw new InterruptedIOException(); + } + } + }; + } + }); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + assertTrue(selectLatch.await(5, TimeUnit.SECONDS)); + + // Wait until the server EndPoint is closed. + assertTrue(connectionLatch1.await(5, TimeUnit.SECONDS)); + + // Wake up the selector and fail it. + fail.set(true); + SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())).close(); + + // Wait until the selector is replaced. + assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + + // Continue the server processing. + connectionLatch2.countDown(); + + // The channel has been closed on the server. + int read = client.read(ByteBuffer.allocate(1)); + assertTrue(read < 0); + } + } +} diff --git a/tests/test-integration/src/test/resources/jetty-logging.properties b/tests/test-integration/src/test/resources/jetty-logging.properties index c3b261ea39e8..ba4789f545fa 100644 --- a/tests/test-integration/src/test/resources/jetty-logging.properties +++ b/tests/test-integration/src/test/resources/jetty-logging.properties @@ -1,5 +1,3 @@ # Jetty Logging using jetty-slf4j-impl -## Jetty Logging using jetty-slf4j-impl -org.eclipse.jetty.LEVEL=WARN #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.websocket.LEVEL=DEBUG