Skip to content

Commit

Permalink
Fixes problems in unary calls with large payloads.
Browse files Browse the repository at this point in the history
  • Loading branch information
spericas committed May 2, 2024
1 parent 386bad9 commit 4283336
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package io.helidon.webclient.grpc;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -214,4 +217,14 @@ public int read() {
}
});
}

protected byte[] serializeMessage(ReqT message) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(BUFFER_SIZE_BYTES);
try (InputStream is = requestMarshaller().stream(message)) {
is.transferTo(baos);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return baos.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package io.helidon.webclient.grpc;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -93,16 +89,8 @@ public void halfClose() {
public void sendMessage(ReqT message) {
socket().log(LOGGER, DEBUG, "sendMessage called");

// serialize message using a marshaller
ByteArrayOutputStream baos = new ByteArrayOutputStream(BUFFER_SIZE_BYTES);
try (InputStream is = requestMarshaller().stream(message)) {
is.transferTo(baos);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
byte[] serialized = baos.toByteArray();

// queue data message and start writer
// serialize and queue message for writing
byte[] serialized = serializeMessage(message);
BufferData messageData = BufferData.createReadOnly(serialized, 0, serialized.length);
BufferData headerData = BufferData.create(5);
headerData.writeInt8(0); // no compression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ public void sendMessage(ReqT message) {
return;
}

BufferData messageData = BufferData.growing(BUFFER_SIZE_BYTES);
messageData.readFrom(requestMarshaller().stream(message));
// serialize and write message
byte[] serialized = serializeMessage(message);
BufferData messageData = BufferData.createReadOnly(serialized, 0, serialized.length);
BufferData headerData = BufferData.create(5);
headerData.writeInt8(0); // no compression
headerData.writeUnsignedInt32(messageData.available()); // length prefixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package io.helidon.webclient.grpc.tests;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import io.helidon.common.configurable.Resource;
import io.helidon.common.tls.Tls;
Expand Down Expand Up @@ -70,6 +70,15 @@ void testUnaryUpper() {
assertThat(res.getText(), is("HELLO"));
}

@Test
void tesUnaryUpperLongString() {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
String s = CharBuffer.allocate(2000).toString().replace('\0', 'a');
Strings.StringMessage res = service.upper(newStringMessage(s));
assertThat(res.getText(), is(s.toUpperCase()));
}

@Test
void testUnaryUpperAsync() throws ExecutionException, InterruptedException, TimeoutException {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
Expand Down

0 comments on commit 4283336

Please sign in to comment.