Skip to content

Commit

Permalink
Add Client methods for checking and waiting for service availability (#…
Browse files Browse the repository at this point in the history
…85)

* Re-enable tests related to services

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Add Client methods for checking and waiting for service availability

These methods are very useful for allowing a client to wait for a service to be available before making requests.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Refactor ClientTest to avoid repeatedly sending requests

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
  • Loading branch information
jacobperron committed Nov 10, 2020
1 parent a387525 commit 0190b11
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 12 deletions.
10 changes: 5 additions & 5 deletions rcljava/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,14 @@ if(BUILD_TESTING)
"src/test/java/org/ros2/rcljava/RCLJavaTest.java"
"src/test/java/org/ros2/rcljava/SpinTest.java"
"src/test/java/org/ros2/rcljava/TimeTest.java"
# "src/test/java/org/ros2/rcljava/client/ClientTest.java"
"src/test/java/org/ros2/rcljava/client/ClientTest.java"
"src/test/java/org/ros2/rcljava/contexts/ContextTest.java"
"src/test/java/org/ros2/rcljava/node/NodeOptionsTest.java"
"src/test/java/org/ros2/rcljava/node/NodeParametersTest.java"
"src/test/java/org/ros2/rcljava/node/NodeUndeclaredParametersTest.java"
"src/test/java/org/ros2/rcljava/node/NodeTest.java"
# "src/test/java/org/ros2/rcljava/parameters/AsyncParametersClientTest.java"
# "src/test/java/org/ros2/rcljava/parameters/SyncParametersClientTest.java"
"src/test/java/org/ros2/rcljava/parameters/AsyncParametersClientTest.java"
"src/test/java/org/ros2/rcljava/parameters/SyncParametersClientTest.java"
"src/test/java/org/ros2/rcljava/publisher/PublisherTest.java"
"src/test/java/org/ros2/rcljava/qos/QoSProfileTest.java"
"src/test/java/org/ros2/rcljava/subscription/SubscriptionTest.java"
Expand All @@ -283,13 +283,13 @@ if(BUILD_TESTING)
"org.ros2.rcljava.RCLJavaTest"
"org.ros2.rcljava.SpinTest"
"org.ros2.rcljava.TimeTest"
# "org.ros2.rcljava.client.ClientTest"
"org.ros2.rcljava.client.ClientTest"
"org.ros2.rcljava.contexts.ContextTest"
"org.ros2.rcljava.node.NodeOptionsTest"
"org.ros2.rcljava.node.NodeParametersTest"
"org.ros2.rcljava.node.NodeUndeclaredParametersTest"
"org.ros2.rcljava.node.NodeTest"
# "org.ros2.rcljava.parameters.SyncParametersClientTest"
"org.ros2.rcljava.parameters.SyncParametersClientTest"
"org.ros2.rcljava.publisher.PublisherTest"
"org.ros2.rcljava.qos.QoSProfileTest"
"org.ros2.rcljava.subscription.SubscriptionTest"
Expand Down
10 changes: 10 additions & 0 deletions rcljava/include/org_ros2_rcljava_client_ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ JNICALL Java_org_ros2_rcljava_client_ClientImpl_nativeSendClientRequest(
JNIEXPORT void
JNICALL Java_org_ros2_rcljava_client_ClientImpl_nativeDispose(JNIEnv *, jclass, jlong, jlong);

/*
* Class: org_ros2_rcljava_client_ClientImpl
* Method: nativeIsServiceAvailable
* Signature: (JJ)Z
*/
JNIEXPORT jboolean
JNICALL Java_org_ros2_rcljava_client_ClientImpl_nativeIsServiceAvailable(
JNIEnv *, jclass, jlong, jlong);


#ifdef __cplusplus
}
#endif
Expand Down
27 changes: 27 additions & 0 deletions rcljava/src/main/cpp/org_ros2_rcljava_client_ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>

#include "rcl/error_handling.h"
#include "rcl/graph.h"
#include "rcl/node.h"
#include "rcl/rcl.h"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -96,3 +97,29 @@ Java_org_ros2_rcljava_client_ClientImpl_nativeDispose(
rcljava_throw_rclexception(env, ret, msg);
}
}

JNIEXPORT jboolean JNICALL
Java_org_ros2_rcljava_client_ClientImpl_nativeIsServiceAvailable(
JNIEnv * env, jclass, jlong node_handle, jlong client_handle)
{
rcl_node_t * node = reinterpret_cast<rcl_node_t *>(node_handle);
assert(node != NULL);
rcl_client_t * client = reinterpret_cast<rcl_client_t *>(client_handle);
assert(client != NULL);

bool is_ready;
rcl_ret_t ret = rcl_service_server_is_available(node, client, &is_ready);
if (RCL_RET_NODE_INVALID == ret) {
if (node && !rcl_context_is_valid(node->context)) {
// context is shutdown, do a soft failure
return false;
}
}
if (ret != RCL_RET_OK) {
std::string msg =
"Failed to check if service is available: " + std::string(rcl_get_error_string().str);
rcl_reset_error();
rcljava_throw_rclexception(env, ret, msg);
}
return is_ready;
}
30 changes: 30 additions & 0 deletions rcljava/src/main/java/org/ros2/rcljava/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.ros2.rcljava.client;

import java.time.Duration;
import java.util.concurrent.Future;

import org.ros2.rcljava.concurrent.RCLFuture;
Expand All @@ -37,5 +38,34 @@ <U extends MessageDefinition, V extends MessageDefinition> Future<V> asyncSendRe
<U extends MessageDefinition, V extends MessageDefinition> Future<V> asyncSendRequest(
final U request, final Consumer<Future<V>> callback);

/**
* Check if the service server is available.
*
* @return true if the client can talk to the service, false otherwise.
*/
boolean isServiceAvailable();

/**
* Wait for the service server to be available.
*
* Blocks until the service is available or the ROS context is invalidated.
*
* @return true if the service is available, false if the ROS context was shutdown.
*/
boolean waitForService();

/**
* Wait for the service server to be available.
*
* Blocks until the service is available or a timeout occurs.
* Also returns if the ROS context is invalidated.
*
* @param timeout Time to wait for the service to be available.
* A zero value causes this method to check if the service is available and return immediately.
* A negative value is treated as an infinite timeout.
* @return true if the service is available, false otherwise.
*/
boolean waitForService(Duration timeout);

String getServiceName();
}
56 changes: 56 additions & 0 deletions rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@

package org.ros2.rcljava.client;

import java.time.Duration;
import java.lang.ref.WeakReference;
import java.lang.InterruptedException;
import java.lang.Long;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.ros2.rcljava.RCLJava;
import org.ros2.rcljava.common.JNIUtils;
Expand Down Expand Up @@ -142,6 +146,58 @@ public final long getHandle() {
return this.handle;
}

private static native boolean nativeIsServiceAvailable(long nodeHandle, long handle);

/**
* {@inheritDoc}
*/
public boolean isServiceAvailable() {
Node node = this.nodeReference.get();
if (node == null) {
return false;
}
return nativeIsServiceAvailable(node.getHandle(), this.handle);
}

/**
* {@inheritDoc}
*/
public final boolean waitForService() {
return waitForService(Duration.ofNanos(-1));
}

/**
* {@inheritDoc}
*/
public final boolean waitForService(Duration timeout) {
long timeoutNano = timeout.toNanos();
if (0L == timeoutNano) {
return isServiceAvailable();
}
long startTime = System.nanoTime();
long timeToWait = (timeoutNano >= 0L) ? timeoutNano : Long.MAX_VALUE;
while (RCLJava.ok() && timeToWait > 0L) {
// TODO(jacobperron): Wake up whenever graph changes instead of sleeping for a fixed duration
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return false;
}

if (isServiceAvailable()) {
return true;
}

// If timeout is negative, timeToWait will always be greater than zero
if (timeoutNano > 0L) {
timeToWait = timeoutNano - (System.nanoTime() - startTime);
}
}

return false;
}

public String getServiceName() {
return this.serviceName;
}
Expand Down
24 changes: 17 additions & 7 deletions rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import org.junit.Test;

import java.lang.ref.WeakReference;
import java.time.Duration;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.ros2.rcljava.RCLJava;
import org.ros2.rcljava.concurrent.RCLFuture;
Expand Down Expand Up @@ -81,10 +84,10 @@ public static void tearDownOnce() {

@Test
public final void testAdd() throws Exception {
RCLFuture<rcljava.srv.AddTwoInts_Response> future =
RCLFuture<rcljava.srv.AddTwoInts_Response> consumerFuture =
new RCLFuture<rcljava.srv.AddTwoInts_Response>(new WeakReference<Node>(node));

TestClientConsumer clientConsumer = new TestClientConsumer(future);
TestClientConsumer clientConsumer = new TestClientConsumer(consumerFuture);

Service<rcljava.srv.AddTwoInts> service = node.<rcljava.srv.AddTwoInts>createService(
rcljava.srv.AddTwoInts.class, "add_two_ints", clientConsumer);
Expand All @@ -96,12 +99,19 @@ public final void testAdd() throws Exception {
Client<rcljava.srv.AddTwoInts> client =
node.<rcljava.srv.AddTwoInts>createClient(rcljava.srv.AddTwoInts.class, "add_two_ints");

while (RCLJava.ok() && !future.isDone()) {
client.asyncSendRequest(request);
RCLJava.spinOnce(node);
}
assertTrue(client.waitForService(Duration.ofSeconds(10)));

Future<rcljava.srv.AddTwoInts_Response> responseFuture = client.asyncSendRequest(request);

rcljava.srv.AddTwoInts_Response response = responseFuture.get(10, TimeUnit.SECONDS);

// Check that the message was received by the service
assertTrue(consumerFuture.isDone());

// Check the contents of the response
assertEquals(5, response.getSum());

assertEquals(5, future.get().getSum());
// Cleanup
client.dispose();
assertEquals(0, client.getHandle());
service.dispose();
Expand Down

0 comments on commit 0190b11

Please sign in to comment.