diff --git a/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java b/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java index 35612cc5d9e4d..6a4d730b034b0 100644 --- a/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java +++ b/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java @@ -1,10 +1,13 @@ package io.quarkus.grpc.example.streaming; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; import io.grpc.examples.streaming.Empty; import io.grpc.examples.streaming.Item; import io.grpc.examples.streaming.MutinyStreamingGrpc; +import io.grpc.examples.streaming.StringReply; +import io.grpc.examples.streaming.StringRequest; import io.quarkus.grpc.GrpcService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -36,4 +39,25 @@ public Multi pipe(Multi request) { .onItem().scan(() -> 0L, Long::sum) .onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build()); } + + @Override + public Uni stringStream(Multi request) { + AtomicInteger atomicInteger = new AtomicInteger(0); + return request + // .call(() -> Uni.createFrom().failure(new RuntimeException("Any error"))) + .map(x -> { + if (atomicInteger.get() == 30) { + throw new RuntimeException("We reached 30, error here"); + } + return StringReply.newBuilder() + .setMessage(x.toString()) + .build(); + }) + .collect().asList() + .replaceWith(StringReply.newBuilder() + .setMessage("DONE") + .build()) + .onFailure() + .invoke(th -> System.err.println(th.getMessage())); + } } diff --git a/integration-tests/grpc-streaming/src/main/proto/streaming.proto b/integration-tests/grpc-streaming/src/main/proto/streaming.proto index 43b857c8dac4a..626a6285157f8 100644 --- a/integration-tests/grpc-streaming/src/main/proto/streaming.proto +++ b/integration-tests/grpc-streaming/src/main/proto/streaming.proto @@ -12,6 +12,7 @@ service Streaming { rpc Source(Empty) returns (stream Item) {} rpc Sink(stream Item) returns (Empty) {} rpc Pipe(stream Item) returns (stream Item) {} + rpc StringStream (stream StringRequest) returns (StringReply) {} } message Item { @@ -19,4 +20,12 @@ message Item { } message Empty { -} \ No newline at end of file +} + +message StringRequest { + string anyValue = 1; +} + +message StringReply { + string message = 1; +} diff --git a/integration-tests/grpc-streaming/src/main/resources/application.properties b/integration-tests/grpc-streaming/src/main/resources/application.properties index 8405096c74a29..011cd24e1144d 100644 --- a/integration-tests/grpc-streaming/src/main/resources/application.properties +++ b/integration-tests/grpc-streaming/src/main/resources/application.properties @@ -4,3 +4,12 @@ quarkus.grpc.clients.streaming.port=9001 %vertx.quarkus.grpc.clients.streaming.port=8081 %vertx.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true %vertx.quarkus.grpc.server.use-separate-server=false + +%n2o.quarkus.grpc.server.use-separate-server=true +%o2n.quarkus.grpc.server.use-separate-server=false + +%n2o.quarkus.grpc.clients.streaming.port=9001 +%n2o.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true + +%o2n.quarkus.grpc.clients.streaming.port=8081 +%o2n.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=false diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTest.java new file mode 100644 index 0000000000000..ba401722b2bb4 --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTest.java @@ -0,0 +1,7 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class LongStreamTest extends LongStreamTestBase { +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTestBase.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTestBase.java new file mode 100644 index 0000000000000..887c42f427f5e --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTestBase.java @@ -0,0 +1,37 @@ +package io.quarkus.grpc.example.streaming; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.examples.streaming.Streaming; +import io.grpc.examples.streaming.StringRequest; +import io.quarkus.grpc.GrpcClient; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +@SuppressWarnings("NewClassNamingConvention") +public class LongStreamTestBase { + private final Logger log = LoggerFactory.getLogger(getClass()); + + @GrpcClient("streaming") + Streaming streamSvc; + + @Test + public void testFailure() { + Multi multi = Multi.createFrom().range(1, 10000) + // delaying stream to make it a bit longer + .call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS))) + .map(x -> StringRequest.newBuilder() + .setAnyValue(x.toString()) + .build()) + .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue())); + + streamSvc.stringStream(multi) + .subscribe().with(ok -> log.info("All is ok: {}", ok), th -> log.error(th.getMessage(), th)); + } + +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/N2OLongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/N2OLongStreamTest.java new file mode 100644 index 0000000000000..f7598d1727953 --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/N2OLongStreamTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.grpc.test.utils.N2OGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(N2OGRPCTestProfile.class) +public class N2OLongStreamTest extends LongStreamTestBase { +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/O2NLongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/O2NLongStreamTest.java new file mode 100644 index 0000000000000..9278fa5cfad07 --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/O2NLongStreamTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.grpc.test.utils.O2NGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(O2NGRPCTestProfile.class) +public class O2NLongStreamTest extends LongStreamTestBase { +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/VertxLongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/VertxLongStreamTest.java new file mode 100644 index 0000000000000..c056a783c9a96 --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/VertxLongStreamTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(VertxGRPCTestProfile.class) +public class VertxLongStreamTest extends LongStreamTestBase { +}