Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ApplicationService with subscribeToFields to java-client-session #1738

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions java-client/session-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {

Classpaths.inheritGrpcPlatform(project, 'implementation')
implementation 'io.grpc:grpc-core'
implementation 'io.grpc:grpc-netty'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems odd that this wasn't required before now, in that it might be a hole in the java-client-session api to not convey this, or this might be a more in-depth example than normally goes in this project?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I need to change this to runtimeOnly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During your BOM refactoring, the session-examples broke - I haven't dug too deep, but this is the quick fix to get it back working.


Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
Expand Down Expand Up @@ -45,6 +46,7 @@ applicationDistribution.into('bin') {
from(createApplication('table-manager', 'io.deephaven.client.examples.TableManagerExample'))
from(createApplication('execute-script', 'io.deephaven.client.examples.ExecuteScript'))
from(createApplication('execute-code', 'io.deephaven.client.examples.ExecuteCode'))
from(createApplication('subscribe-fields', 'io.deephaven.client.examples.SubscribeToFields'))
fileMode = 0755
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.deephaven.client.examples;

import io.deephaven.client.impl.ApplicationService.Cancel;
import io.deephaven.client.impl.ApplicationService.Listener;
import io.deephaven.client.impl.Session;
import io.deephaven.proto.backplane.grpc.FieldInfo;
import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.util.concurrent.CountDownLatch;

@Command(name = "subscribe-fields", mixinStandardHelpOptions = true,
description = "Subscribe to fields", version = "0.1.0")
public final class SubscribeToFields extends SingleSessionExampleBase {
@Override
protected void execute(Session session) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final Cancel cancel = session.subscribeToFields(new Listener() {
@Override
public void onNext(FieldsChangeUpdate fields) {
System.out.println("Created: " + fields.getCreatedCount());
System.out.println("Updated: " + fields.getUpdatedCount());
System.out.println("Removed: " + fields.getRemovedCount());
for (FieldInfo fieldInfo : fields.getCreatedList()) {
System.out.println("Created: " + fieldInfo);
}
for (FieldInfo fieldInfo : fields.getUpdatedList()) {
System.out.println("Updated: " + fieldInfo);
}
for (FieldInfo fieldInfo : fields.getRemovedList()) {
System.out.println("Removed: " + fieldInfo);
}
}

@Override
public void onError(Throwable t) {
if (!isCancelled(t)) {
t.printStackTrace(System.err);
}
latch.countDown();
}

@Override
public void onCompleted() {
latch.countDown();
}
});
Runtime.getRuntime().addShutdownHook(new Thread(cancel::cancel));
latch.await();
}

private static boolean isCancelled(Throwable t) {
if (t instanceof StatusRuntimeException) {
return ((StatusRuntimeException) t).getStatus().getCode() == Code.CANCELLED;
} else if (t instanceof StatusException) {
return ((StatusException) t).getStatus().getCode() == Code.CANCELLED;
}
return false;
}

public static void main(String[] args) {
int execute = new CommandLine(new SubscribeToFields()).execute(args);
System.exit(execute);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.deephaven.client.impl;

import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate;

public interface ApplicationService {

interface Listener {
void onNext(FieldsChangeUpdate fields);

void onError(Throwable t);

void onCompleted();
}

interface Cancel {
void cancel();
}

Cancel subscribeToFields(Listener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* A session represents a client-side connection to a Deephaven server.
*/
public interface Session extends AutoCloseable, ConsoleService, InputTableService, TableService {
public interface Session extends AutoCloseable, ApplicationService, ConsoleService, InputTableService, TableService {

// ----------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import io.deephaven.client.impl.script.VariableDefinition;
import io.deephaven.proto.backplane.grpc.AddTableRequest;
import io.deephaven.proto.backplane.grpc.AddTableResponse;
import io.deephaven.proto.backplane.grpc.ApplicationServiceGrpc.ApplicationServiceStub;
import io.deephaven.proto.backplane.grpc.CloseSessionResponse;
import io.deephaven.proto.backplane.grpc.DeleteTableRequest;
import io.deephaven.proto.backplane.grpc.DeleteTableResponse;
import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate;
import io.deephaven.proto.backplane.grpc.HandshakeRequest;
import io.deephaven.proto.backplane.grpc.HandshakeResponse;
import io.deephaven.proto.backplane.grpc.InputTableServiceGrpc.InputTableServiceStub;
import io.deephaven.proto.backplane.grpc.ListFieldsRequest;
import io.deephaven.proto.backplane.grpc.ReleaseRequest;
import io.deephaven.proto.backplane.grpc.ReleaseResponse;
import io.deephaven.proto.backplane.grpc.SessionServiceGrpc.SessionServiceStub;
Expand Down Expand Up @@ -178,6 +181,7 @@ public void onCompleted() {
private final SessionServiceStub sessionService;
private final ConsoleServiceStub consoleService;
private final InputTableServiceStub inputTableService;
private final ApplicationServiceStub applicationServiceStub;
private final Handler handler;
private final ExportTicketCreator exportTicketCreator;
private final ExportStates states;
Expand All @@ -200,6 +204,7 @@ private SessionImpl(SessionImplConfig config, Handler handler, AuthenticationInf
this.sessionService = config.channel().session().withCallCredentials(credentials);
this.consoleService = config.channel().console().withCallCredentials(credentials);
this.inputTableService = config.channel().inputTable().withCallCredentials(credentials);
this.applicationServiceStub = config.channel().application().withCallCredentials(credentials);
this.exportTicketCreator = new ExportTicketCreator();
this.states = new ExportStates(this, sessionService, config.channel().table().withCallCredentials(credentials),
exportTicketCreator);
Expand Down Expand Up @@ -322,6 +327,14 @@ public CompletableFuture<Void> deleteFromInputTable(HasTicketId destination, Has
return observer.future;
}

@Override
public Cancel subscribeToFields(Listener listener) {
final ListFieldsRequest request = ListFieldsRequest.newBuilder().build();
final ListFieldsObserver observer = new ListFieldsObserver(listener);
applicationServiceStub.listFields(request, observer);
return observer;
}

public long batchCount() {
return states.batchCount();
}
Expand Down Expand Up @@ -707,4 +720,40 @@ public void onCompleted() {
}
}
}

private static class ListFieldsObserver
implements Cancel, ClientResponseObserver<ListFieldsRequest, FieldsChangeUpdate> {

private final Listener listener;
private ClientCallStreamObserver<?> stream;

public ListFieldsObserver(Listener listener) {
this.listener = Objects.requireNonNull(listener);
}

@Override
public void cancel() {
stream.cancel("User cancelled", null);
}

@Override
public void beforeStart(ClientCallStreamObserver<ListFieldsRequest> requestStream) {
stream = requestStream;
}

@Override
public void onNext(FieldsChangeUpdate value) {
listener.onNext(value);
}

@Override
public void onError(Throwable t) {
listener.onError(t);
}

@Override
public void onCompleted() {
listener.onCompleted();
}
}
}