Skip to content

Commit

Permalink
Some new tests from JK and fixes to gRPC client code to handle: (1) e…
Browse files Browse the repository at this point in the history
…xceptions thrown in gRPC methods and (2) larger payloads.
  • Loading branch information
spericas committed May 2, 2024
1 parent 19562a9 commit 386bad9
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package io.helidon.webclient.grpc;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -88,8 +92,18 @@ public void halfClose() {
@Override
public void sendMessage(ReqT message) {
socket().log(LOGGER, DEBUG, "sendMessage called");
BufferData messageData = BufferData.growing(BUFFER_SIZE_BYTES);
messageData.readFrom(requestMarshaller().stream(message));

// serialize message using a marshaller
ByteArrayOutputStream baos = new ByteArrayOutputStream(BUFFER_SIZE_BYTES);
try (InputStream is = requestMarshaller().stream(message)) {
is.transferTo(baos);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
byte[] serialized = baos.toByteArray();

// queue data message and start writer
BufferData messageData = BufferData.createReadOnly(serialized, 0, serialized.length);
BufferData headerData = BufferData.create(5);
headerData.writeInt8(0); // no compression
headerData.writeUnsignedInt32(messageData.available()); // length prefixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ public void sendMessage(ReqT message) {
clientStream().writeData(BufferData.create(headerData, messageData), true);
requestSent = true;

// read response headers
clientStream().readHeaders();

while (isRemoteOpen()) {
// trailers received?
if (clientStream().trailers().isDone()) {
socket().log(LOGGER, DEBUG, "trailers received");
return;
// trailers or eos received?
if (clientStream().trailers().isDone() || !clientStream().hasEntity()) {
socket().log(LOGGER, DEBUG, "[Reading thread] trailers or eos received");
break;
}

// attempt to read and queue
Expand Down
2 changes: 2 additions & 0 deletions webclient/tests/grpc/src/main/proto/strings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ service StringService {
rpc Split (StringMessage) returns (stream StringMessage) {}
rpc Join (stream StringMessage) returns (StringMessage) {}
rpc Echo (stream StringMessage) returns (stream StringMessage) {}
rpc BadMethod (StringMessage) returns (StringMessage) {}
rpc NotImplementedMethod (StringMessage) returns (StringMessage) {}
}

message StringMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.helidon.common.Weight;
import io.helidon.common.configurable.Resource;
Expand Down Expand Up @@ -74,7 +75,11 @@ static void setUpRoute(GrpcRouting.Builder routing) {
.bidi(Strings.getDescriptor(),
"StringService",
"Echo",
GrpcStubTest::echo);
GrpcStubTest::echo)
.unary(Strings.getDescriptor(),
"StringService",
"BadMethod",
GrpcStubTest::badMethod);
}

@BeforeEach
Expand Down Expand Up @@ -154,6 +159,10 @@ public void onCompleted() {
};
}

static void badMethod(Strings.StringMessage req, StreamObserver<Strings.StringMessage> streamObserver) {
streamObserver.onError(Status.INTERNAL.asException());
}

Strings.StringMessage newStringMessage(String data) {
return Strings.StringMessage.newBuilder().setText(data).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@

package io.helidon.webclient.grpc.tests;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import io.helidon.common.configurable.Resource;
import io.helidon.common.tls.Tls;
import io.helidon.webclient.api.WebClient;
import io.helidon.webclient.grpc.GrpcClient;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.testing.junit5.ServerTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -144,4 +148,34 @@ void testBidirectionalEchoAsyncEmpty() throws ExecutionException, InterruptedExc
Iterator<Strings.StringMessage> res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.hasNext(), is(false));
}

@Test
void testBidirectionalEchoAsyncWithLargePayload() throws ExecutionException, InterruptedException, TimeoutException {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel());
CompletableFuture<Iterator<Strings.StringMessage>> future = new CompletableFuture<>();
StreamObserver<Strings.StringMessage> req = service.echo(multiStreamObserver(future));
byte[] array = new byte[2000];
new Random().nextBytes(array);
String largeString = new String(array, StandardCharsets.UTF_8);
req.onNext(newStringMessage(largeString));
req.onCompleted();
Iterator<Strings.StringMessage> res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.next().getText(), is(largeString));
assertThat(res.hasNext(), is(false));
}

@Test
void testReceiveServerException() {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
Assertions.assertThrows(Throwable.class, () -> service.badMethod(newStringMessage("hello")));
}

@Test
void testCallingNotImplementMethodThrowsException() {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
Assertions.assertThrows(Throwable.class, () -> service.notImplementedMethod(newStringMessage("hello")));
}
}
2 changes: 1 addition & 1 deletion webclient/tests/grpc/src/test/resources/logging.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ handlers=io.helidon.logging.jul.HelidonConsoleHandler
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n

.level=INFO
#io.helidon.webclient.grpc.level=FINEST
io.helidon.webclient.grpc.level=FINEST

0 comments on commit 386bad9

Please sign in to comment.