From 54e1aee84c3c024cc76a146833e2bfd51552ea9e Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Thu, 23 Dec 2021 11:24:16 -0800 Subject: [PATCH 1/2] Add ApplicationService with subscribeToFields to java-client-session --- java-client/session-examples/build.gradle | 2 + .../client/examples/SubscribeToFields.java | 69 +++++++++++++++++++ .../client/impl/ApplicationService.java | 20 ++++++ .../io/deephaven/client/impl/Session.java | 2 +- .../io/deephaven/client/impl/SessionImpl.java | 49 +++++++++++++ 5 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 java-client/session-examples/src/main/java/io/deephaven/client/examples/SubscribeToFields.java create mode 100644 java-client/session/src/main/java/io/deephaven/client/impl/ApplicationService.java diff --git a/java-client/session-examples/build.gradle b/java-client/session-examples/build.gradle index cc1602ab359..995c1186d77 100644 --- a/java-client/session-examples/build.gradle +++ b/java-client/session-examples/build.gradle @@ -10,6 +10,7 @@ dependencies { Classpaths.inheritGrpcPlatform(project, 'implementation') implementation 'io.grpc:grpc-core' + implementation 'io.grpc:grpc-netty' Classpaths.inheritJUnitPlatform(project) Classpaths.inheritAssertJ(project) @@ -45,6 +46,7 @@ applicationDistribution.into('bin') { from(createApplication('table-manager', 'io.deephaven.client.examples.TableManagerExample')) from(createApplication('execute-script', 'io.deephaven.client.examples.ExecuteScript')) from(createApplication('execute-code', 'io.deephaven.client.examples.ExecuteCode')) + from(createApplication('subscribe-fields', 'io.deephaven.client.examples.SubscribeToFields')) fileMode = 0755 } diff --git a/java-client/session-examples/src/main/java/io/deephaven/client/examples/SubscribeToFields.java b/java-client/session-examples/src/main/java/io/deephaven/client/examples/SubscribeToFields.java new file mode 100644 index 00000000000..30eeed42327 --- /dev/null +++ b/java-client/session-examples/src/main/java/io/deephaven/client/examples/SubscribeToFields.java @@ -0,0 +1,69 @@ +package io.deephaven.client.examples; + +import io.deephaven.client.impl.ApplicationService.Cancel; +import io.deephaven.client.impl.ApplicationService.Listener; +import io.deephaven.client.impl.Session; +import io.deephaven.proto.backplane.grpc.FieldInfo; +import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate; +import io.grpc.Status.Code; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import picocli.CommandLine; +import picocli.CommandLine.Command; + +import java.util.concurrent.CountDownLatch; + +@Command(name = "subscribe-fields", mixinStandardHelpOptions = true, + description = "Subscribe to fields", version = "0.1.0") +public final class SubscribeToFields extends SingleSessionExampleBase { + @Override + protected void execute(Session session) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final Cancel cancel = session.subscribeToFields(new Listener() { + @Override + public void onNext(FieldsChangeUpdate fields) { + System.out.println("Created: " + fields.getCreatedCount()); + System.out.println("Updated: " + fields.getUpdatedCount()); + System.out.println("Removed: " + fields.getRemovedCount()); + for (FieldInfo fieldInfo : fields.getCreatedList()) { + System.out.println("Created: " + fieldInfo); + } + for (FieldInfo fieldInfo : fields.getUpdatedList()) { + System.out.println("Updated: " + fieldInfo); + } + for (FieldInfo fieldInfo : fields.getRemovedList()) { + System.out.println("Removed: " + fieldInfo); + } + } + + @Override + public void onError(Throwable t) { + if (!isCancelled(t)) { + t.printStackTrace(System.err); + } + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }); + Runtime.getRuntime().addShutdownHook(new Thread(cancel::cancel)); + latch.await(); + } + + private static boolean isCancelled(Throwable t) { + if (t instanceof StatusRuntimeException) { + return ((StatusRuntimeException) t).getStatus().getCode() == Code.CANCELLED; + } else if (t instanceof StatusException) { + return ((StatusException) t).getStatus().getCode() == Code.CANCELLED; + } + return false; + } + + public static void main(String[] args) { + int execute = new CommandLine(new SubscribeToFields()).execute(args); + System.exit(execute); + } +} diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/ApplicationService.java b/java-client/session/src/main/java/io/deephaven/client/impl/ApplicationService.java new file mode 100644 index 00000000000..e4612261aa9 --- /dev/null +++ b/java-client/session/src/main/java/io/deephaven/client/impl/ApplicationService.java @@ -0,0 +1,20 @@ +package io.deephaven.client.impl; + +import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate; + +public interface ApplicationService { + + interface Listener { + void onNext(FieldsChangeUpdate fields); + + void onError(Throwable t); + + void onCompleted(); + } + + interface Cancel { + void cancel(); + } + + Cancel subscribeToFields(Listener listener); +} diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/Session.java b/java-client/session/src/main/java/io/deephaven/client/impl/Session.java index c4a1ad0f58f..4022b3b4da8 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/Session.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/Session.java @@ -5,7 +5,7 @@ /** * A session represents a client-side connection to a Deephaven server. */ -public interface Session extends AutoCloseable, ConsoleService, InputTableService, TableService { +public interface Session extends AutoCloseable, ApplicationService, ConsoleService, InputTableService, TableService { // ---------------------------------------------------------- diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java index 772b606e3c4..b6439125ec9 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java @@ -5,12 +5,15 @@ import io.deephaven.client.impl.script.VariableDefinition; import io.deephaven.proto.backplane.grpc.AddTableRequest; import io.deephaven.proto.backplane.grpc.AddTableResponse; +import io.deephaven.proto.backplane.grpc.ApplicationServiceGrpc.ApplicationServiceStub; import io.deephaven.proto.backplane.grpc.CloseSessionResponse; import io.deephaven.proto.backplane.grpc.DeleteTableRequest; import io.deephaven.proto.backplane.grpc.DeleteTableResponse; +import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate; import io.deephaven.proto.backplane.grpc.HandshakeRequest; import io.deephaven.proto.backplane.grpc.HandshakeResponse; import io.deephaven.proto.backplane.grpc.InputTableServiceGrpc.InputTableServiceStub; +import io.deephaven.proto.backplane.grpc.ListFieldsRequest; import io.deephaven.proto.backplane.grpc.ReleaseRequest; import io.deephaven.proto.backplane.grpc.ReleaseResponse; import io.deephaven.proto.backplane.grpc.SessionServiceGrpc.SessionServiceStub; @@ -178,6 +181,7 @@ public void onCompleted() { private final SessionServiceStub sessionService; private final ConsoleServiceStub consoleService; private final InputTableServiceStub inputTableService; + private final ApplicationServiceStub applicationServiceStub; private final Handler handler; private final ExportTicketCreator exportTicketCreator; private final ExportStates states; @@ -200,6 +204,7 @@ private SessionImpl(SessionImplConfig config, Handler handler, AuthenticationInf this.sessionService = config.channel().session().withCallCredentials(credentials); this.consoleService = config.channel().console().withCallCredentials(credentials); this.inputTableService = config.channel().inputTable().withCallCredentials(credentials); + this.applicationServiceStub = config.channel().application().withCallCredentials(credentials); this.exportTicketCreator = new ExportTicketCreator(); this.states = new ExportStates(this, sessionService, config.channel().table().withCallCredentials(credentials), exportTicketCreator); @@ -322,6 +327,14 @@ public CompletableFuture deleteFromInputTable(HasTicketId destination, Has return observer.future; } + @Override + public Cancel subscribeToFields(Listener listener) { + final ListFieldsRequest request = ListFieldsRequest.newBuilder().build(); + final ListFieldsObserver observer = new ListFieldsObserver(listener); + applicationServiceStub.listFields(request, observer); + return observer; + } + public long batchCount() { return states.batchCount(); } @@ -707,4 +720,40 @@ public void onCompleted() { } } } + + private static class ListFieldsObserver + implements Cancel, ClientResponseObserver { + + private final Listener listener; + private ClientCallStreamObserver stream; + + public ListFieldsObserver(Listener listener) { + this.listener = Objects.requireNonNull(listener); + } + + @Override + public void cancel() { + stream.cancel("User cancelled", null); + } + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + stream = requestStream; + } + + @Override + public void onNext(FieldsChangeUpdate value) { + listener.onNext(value); + } + + @Override + public void onError(Throwable t) { + listener.onError(t); + } + + @Override + public void onCompleted() { + listener.onCompleted(); + } + } } From e3e7c7418c251cc110c1342a8392dac0475035d3 Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Thu, 23 Dec 2021 12:12:03 -0800 Subject: [PATCH 2/2] to runtime only --- java-client/session-examples/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-client/session-examples/build.gradle b/java-client/session-examples/build.gradle index 995c1186d77..bcdb9863487 100644 --- a/java-client/session-examples/build.gradle +++ b/java-client/session-examples/build.gradle @@ -10,7 +10,7 @@ dependencies { Classpaths.inheritGrpcPlatform(project, 'implementation') implementation 'io.grpc:grpc-core' - implementation 'io.grpc:grpc-netty' + runtimeOnly 'io.grpc:grpc-netty' Classpaths.inheritJUnitPlatform(project) Classpaths.inheritAssertJ(project)