Skip to content

Commit

Permalink
GH-461: Fix heartbeats with wantReply=true
Browse files Browse the repository at this point in the history
Switch from a timeout model to the OpenSSH model: fail if there
are more than a certain number of heartbeats for which no reply
was received yet.

Bug: #461
  • Loading branch information
tomaswolf committed May 25, 2024
1 parent 5a78e6d commit 14a9aca
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 32 deletions.
30 changes: 29 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

* [GH-427](https://github.com/apache/mina-sshd/issues/427) SCP client: fix `DefaultScpClient.upload(InputStream, ...)`
* [GH-455](https://github.com/apache/mina-sshd/issues/455) Fix `BaseCipher`: make sure all bytes are processed
* [GH-461](https://github.com/apache/mina-sshd/issues/461) Fix heartbeats with `wantReply=true`
* [GH-470](https://github.com/apache/mina-sshd/issues/470) MontgomeryCurve: synchronize access to KeyPairGenerator
* [GH-489](https://github.com/apache/mina-sshd/issues/489) SFTP v3 client: better file type determination
* [GH-493](https://github.com/apache/mina-sshd/issues/493) Fix arcfour128 and arcfour256 ciphers
Expand All @@ -58,7 +59,34 @@ NTRU Prime sntrup761 and X25519 with SHA-512: sntrup761x25519-sha512](https://ww

## Behavioral changes and enhancements

* [GH-468](https://github.com/apache/mina-sshd/issues/468) SFTP: validate length of data received: must not be more than requested
### [GH-461](https://github.com/apache/mina-sshd/issues/461) Fix heartbeats with `wantReply=true`

The client-side heartbeat mechanism has been changed. Such heartbeats are configured with an interval
(`CoreModuleProperties.HEARTBEAT_INTERVAL`) > 0. Previously they could also be configured with a timeout
(`CoreModuleProperties.HEARTBEAT_REPLY_WAIT`). If the timeout was <= 0, the client would just send a
heartbeat request, but not expect any answer. However, with a timeout > 0, it would send the request
with a flag telling the server to send back a reply, and then wait for the given duration for the
reply to arrive, and terminate the connection if no reply was forthcoming.

That could cause trouble if the timeout was fairly long, and the server was slow to respond. If the
timeout was longer than the interval it could also delay subsequent heartbeats.

Newly, `CoreModuleProperties.HEARTBEAT_REPLY_WAIT` is _deprecated_. There is a new configuration property
`CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX` instead. It defines a limit for how many heartbeats without
reply a session survives. If the value is <= 0, the client still sends heartbeats without expecting
any answer. If > 0, it will request a reply from the server for each heartbeat message, and it will
terminate the connection once there are `CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX` previous heartbeats
for which no reply was received yet.

This new way to configure heartbeats corresponds to the configuration in OpenSSH via the
`ServerAliveInterval` and `ServerAliveCountMax` options.

For compatibility with old configurations that do define `CoreModuleProperties.HEARTBEAT_REPLY_WAIT` > 0,
the new code maps this to the new configuration (but only if `CoreModuleProperties.HEARTBEAT_INTERVAL` > 0
and the new property `CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX` has _not_ been set) by setting
`CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX = (CoreModuleProperties.HEARTBEAT_REPLY_WAIT / CoreModuleProperties.HEARTBEAT_INTERVAL) + 1`.

### [GH-468](https://github.com/apache/mina-sshd/issues/468) SFTP: validate length of data received: must not be more than requested

SFTP read operations now check the amount of data they get back. If it's more than
requested an exception is thrown. SFTP servers must never return more data than the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.common.future;

/**
* Something that may have a failure exception.
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public interface HasException {

/**
* Returns the cause of the failure.
*
* @return the {@link Throwable} of the failure, or {@code null} if not failed (yet).
*/
Throwable getException();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,7 @@
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public interface WithException {

/**
* Returns the cause of the failure.
*
* @return the {@link Throwable} of the failure, or {@code null} if not failed (yet).
*/
Throwable getException();
public interface WithException extends HasException {

/**
* Sets the exception that caused the operation to fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@
*/
package org.apache.sshd.common.io;

import org.apache.sshd.common.future.HasException;
import org.apache.sshd.common.future.SshFuture;
import org.apache.sshd.common.future.VerifiableFuture;

public interface IoWriteFuture extends SshFuture<IoWriteFuture>, VerifiableFuture<IoWriteFuture> {
public interface IoWriteFuture extends HasException, SshFuture<IoWriteFuture>, VerifiableFuture<IoWriteFuture> {
/**
* @return <tt>true</tt> if the write operation is finished successfully.
*/
boolean isWritten();

/**
* @return the cause of the write failure if and only if the write operation has failed due to an {@link Exception}.
* Otherwise, {@code null} is returned (use {@link #isDone()} to distinguish between the two.
*/
Throwable getException();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.sshd.agent.common.AgentForwardSupport;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.GlobalRequestFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.kex.KexState;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.AbstractConnectionService;
import org.apache.sshd.common.util.GenericUtils;
Expand All @@ -46,7 +49,10 @@ public class ClientConnectionService
implements ClientSessionHolder {
protected final String heartbeatRequest;
protected final Duration heartbeatInterval;
protected final Duration heartbeatReplyMaxWait;
protected final int heartbeatMaxNoReply;

protected final AtomicInteger outstandingHeartbeats = new AtomicInteger();

/** Non-null only if using the &quot;keep-alive&quot; request mechanism */
protected ScheduledFuture<?> clientHeartbeat;

Expand All @@ -55,7 +61,40 @@ public ClientConnectionService(AbstractClientSession s) throws SshException {

heartbeatRequest = CoreModuleProperties.HEARTBEAT_REQUEST.getRequired(this);
heartbeatInterval = CoreModuleProperties.HEARTBEAT_INTERVAL.getRequired(this);
heartbeatReplyMaxWait = CoreModuleProperties.HEARTBEAT_REPLY_WAIT.getRequired(this);
heartbeatMaxNoReply = configureMaxNoReply();
}

protected int configureMaxNoReply() {
@SuppressWarnings("deprecation")
Duration timeout = CoreModuleProperties.HEARTBEAT_REPLY_WAIT.getOrNull(this);
if (timeout == null || GenericUtils.isNegativeOrNull(heartbeatInterval) || GenericUtils.isEmpty(heartbeatRequest)) {
return CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getRequired(this).intValue();
}
// The deprecated timeout is configured explicitly. If the new no reply max is _not_ explicitly configured,
// set it from the timeout.
Integer noReplyValue = CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getOrNull(this);
if (noReplyValue != null) {
return noReplyValue.intValue();
}
if (timeout.compareTo(heartbeatInterval) >= 0) {
// Timeout is longer than the interval. With the old system, that would have killed the session when the
// timeout was reached. A slow server that managed to return the reply just before the timeout expired would
// have delayed subsequent heartbeats. The new system will keep sending heartbeats with the given interval.
// Thus we can have timeout / interval heartbeats without reply if we want to approximate the old system.
double timeoutSec = timeout.getSeconds() + (timeout.getNano() / 1_000_000_000.0);
double intervalSec = heartbeatInterval.getSeconds() + (heartbeatInterval.getNano() / 1_000_000_000.0);
double multiple = timeoutSec / intervalSec;
if (multiple >= Integer.MAX_VALUE - 1) {
return Integer.MAX_VALUE;
} else {
return (int) multiple + 1;
}
}
// Timeout is smaller than the interval. We want to have every heartbeat replied to.
return 1;
// This is an approximation. If no reply is forthcoming, the session will newly be killed after the interval. In
// the old system it would have been killed after the timeout. We _could_ code something to schedule a task that
// kills the session after the timeout and cancel that if we get a reply, but it seems a bit pointless.
}

@Override
Expand Down Expand Up @@ -117,26 +156,35 @@ protected boolean sendHeartBeat() {
}

Session session = getSession();
if (session.getKexState() != KexState.DONE) {
// During KEX, global requests are delayed until after the key exchange is over. Don't count during KEX,
// otherwise a slow KEX might cause us to kill the session prematurely.
return false;
}
try {
boolean withReply = !GenericUtils.isNegativeOrNull(heartbeatReplyMaxWait);
heartbeatCount.incrementAndGet();
boolean withReply = heartbeatMaxNoReply > 0;
int outstanding = outstandingHeartbeats.incrementAndGet();
if (withReply && heartbeatMaxNoReply < outstanding) {
throw new SshException("Got " + (outstanding - 1) + " heartbeat requests without reply");
}
Buffer buf = session.createBuffer(
SshConstants.SSH_MSG_GLOBAL_REQUEST, heartbeatRequest.length() + Byte.SIZE);
buf.putString(heartbeatRequest);
buf.putBoolean(withReply);

// Even if we want a reply, we don't wait.
if (withReply) {
Buffer reply = session.request(heartbeatRequest, buf, heartbeatReplyMaxWait);
if (reply != null) {
if (log.isTraceEnabled()) {
log.trace("sendHeartBeat({}) received reply size={} for request={}",
session, reply.available(), heartbeatRequest);
}
}
GlobalRequestFuture future = session.request(buf, heartbeatRequest, (cmd, buffer) -> {
// We got something back. Don't care about success or failure. (In particular we may get here in
// case the server responds SSH_MSG_UNIMPLEMENTED.)
outstandingHeartbeats.set(0);
});
future.addListener(this::futureDone);
} else {
IoWriteFuture future = session.writePacket(buf);
future.addListener(this::futureDone);
}
heartbeatCount.incrementAndGet();
return true;
} catch (IOException | RuntimeException | Error e) {
session.exceptionCaught(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @see org.apache.sshd.common.session.Session#request(Buffer, String, ReplyHandler)
*/
public class GlobalRequestFuture extends DefaultSshFuture<GlobalRequestFuture>
implements SshFutureListener<IoWriteFuture> {
implements HasException, SshFutureListener<IoWriteFuture> {

/**
* A {@code ReplyHandler} is invoked asynchronously when the reply for a request with {@code want-reply = true} is
Expand Down Expand Up @@ -137,6 +137,7 @@ public Buffer getBuffer() {
*
* @return a failure reason, or {@code null} if there isn't one or if the request did not fail
*/
@Override
public Throwable getException() {
Object value = getValue();
if (value instanceof Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.sshd.common.forward.ForwarderFactory;
import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.future.HasException;
import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.kex.KexState;
Expand Down Expand Up @@ -263,7 +264,7 @@ protected boolean sendHeartBeat() {
}
}

protected void futureDone(IoWriteFuture future) {
protected void futureDone(HasException future) {
Throwable t = future.getException();
if (t != null) {
Session session = getSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,21 @@ public final class CoreModuleProperties {
/**
* Key used to indicate that the heartbeat request is also expecting a reply - time in <U>milliseconds</U> to wait
* for the reply. If non-positive then no reply is expected (nor requested).
*
* @deprecated since 2.13.0, use {@link #HEARTBEAT_NO_REPLY_MAX} instead
*/
@Deprecated
public static final Property<Duration> HEARTBEAT_REPLY_WAIT
= Property.durationSec("heartbeat-reply-wait", Duration.ofMinutes(5));

/**
* Key to set the maximum number of heartbeat messages to send without having received a reply. If &gt; 0, heartbeat
* messages are sent with a flag that requires the peer to reply. The session will be killed if
* {@code HEARTBEAT_NO_REPLY_MAX} heartbeats have been sent without having received a reply. If &lt;= 0, heartbeat
* messages will be sent, but no reply is requested or expected, and the client will not kill the session.
*/
public static final Property<Integer> HEARTBEAT_NO_REPLY_MAX = Property.integer("heartbeat-no-reply-max", 0);

/**
* Whether to ignore invalid identities files when pre-initializing the client session
*
Expand Down
9 changes: 5 additions & 4 deletions sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void tearDown() {
// Restore default value
CoreModuleProperties.IDLE_TIMEOUT.remove(sshd);
CoreModuleProperties.HEARTBEAT_INTERVAL.remove(client);
CoreModuleProperties.HEARTBEAT_REPLY_WAIT.remove(client);
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.remove(client);
}

@Test
Expand Down Expand Up @@ -198,7 +198,7 @@ public Result process(
}
}));
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, HEARTBEAT);
CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(5L));
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.set(client, 1);
try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
.verify(7L, TimeUnit.SECONDS)
.getSession()) {
Expand Down Expand Up @@ -229,8 +229,9 @@ public Result process(ConnectionService connectionService, String request, boole
return Result.Replied;
}
}));
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, HEARTBEAT);
CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(1));
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, Duration.ofSeconds(1));
// CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(1));
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.set(client, 1);
try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(CONNECT_TIMEOUT)
.getSession()) {
session.addPasswordIdentity(getCurrentTestName());
Expand Down

0 comments on commit 14a9aca

Please sign in to comment.