diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index 18819e5a3..f6f14ae95 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java @@ -29,7 +29,7 @@ import java.util.Map; import java.util.Objects; import java.util.Queue; -import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -65,7 +65,7 @@ public class Nio2Session extends AbstractCloseable implements IoSession { private final SocketAddress remoteAddress; private final SocketAddress acceptanceAddress; private final PropertyResolver propertyResolver; - private final Queue writes = new LinkedTransferQueue<>(); + private final Queue writes = new ConcurrentLinkedQueue<>(); private final AtomicReference currentWrite = new AtomicReference<>(); private final AtomicLong readCyclesCounter = new AtomicLong(); private final AtomicLong lastReadCycleStart = new AtomicLong(); @@ -516,7 +516,8 @@ protected void startWriting() { doWriteCycle(buffer, handler); } } catch (Throwable e) { - future.setWritten(); + future.setException(e); + finishWrite(future); if (e instanceof RuntimeException) { throw (RuntimeException) e; @@ -555,17 +556,22 @@ protected void handleCompletedWriteCycle( Nio2CompletionHandler completionHandler, Integer result, Object attachment) { if (buffer.hasRemaining()) { try { - socket.write(buffer, null, completionHandler); + if (log.isDebugEnabled()) { + log.debug("handleCompletedWriteCycle({}) incomplete write of writeLen={}. Written result={}, resume writing buffer.remaining()={} at cycle={} after {} nanos", + this, writeLen, result, buffer.remaining(), writeCyclesCounter.get(), System.nanoTime() - lastWriteCycleStart.get()); + } + + doWriteCycle(buffer, completionHandler); } catch (Throwable t) { - debug("handleCompletedWriteCycle({}) {} while writing to socket len={}: {}", - this, t.getClass().getSimpleName(), writeLen, t.getMessage(), t); - future.setWritten(); + warn("handleCompletedWriteCycle({}) {} while writing to socket len={}, result={}: {}", + this, t.getClass().getSimpleName(), writeLen, result, t.getMessage(), t); + future.setException(t); finishWrite(future); } } else { if (log.isTraceEnabled()) { log.trace("handleCompletedWriteCycle({}) finished writing len={} at cycle={} after {} nanos", - this, writeLen, writeCyclesCounter, System.nanoTime() - lastWriteCycleStart.get()); + this, writeLen, writeCyclesCounter.get(), System.nanoTime() - lastWriteCycleStart.get()); } // This should be called before future.setWritten() to avoid WriteAbortedException