diff --git a/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpRequest.java b/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpRequest.java index 99875be3d..1f14d1239 100644 --- a/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpRequest.java +++ b/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpRequest.java @@ -36,6 +36,10 @@ import io.micronaut.http.MutableHttpHeaders; import io.micronaut.http.MutableHttpParameters; import io.micronaut.http.MutableHttpRequest; +import io.micronaut.http.ServerHttpRequest; +import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.CloseableAvailableByteBody; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.cookie.Cookie; import io.micronaut.http.cookie.Cookies; import io.micronaut.http.simple.SimpleHttpParameters; @@ -45,21 +49,27 @@ import io.micronaut.servlet.http.ServletExchange; import io.micronaut.servlet.http.ServletHttpRequest; import io.micronaut.servlet.http.ServletHttpResponse; +import io.micronaut.servlet.http.body.InputStreamByteBody; import io.netty.handler.codec.http.QueryStringDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.net.URI; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.function.Supplier; import static io.micronaut.servlet.http.BodyBuilder.isFormSubmission; @@ -75,6 +85,7 @@ final class GoogleFunctionHttpRequest implements ServletHttpRequest, ServletExchange, + ServerHttpRequest, FullHttpRequest, ParsedBodyHolder { @@ -85,6 +96,7 @@ final class GoogleFunctionHttpRequest implements private final HttpMethod method; private final GoogleFunctionHeaders headers; private final GoogleFunctionHttpResponse googleResponse; + private final Supplier byteBody; private MutableHttpParameters httpParameters; private MutableConvertibleValues attributes; private B parsedBody; @@ -93,8 +105,6 @@ final class GoogleFunctionHttpRequest implements private ConversionService conversionService; - private ByteArrayByteBuffer servletByteBuffer; - /** * Default constructor. * @@ -107,7 +117,8 @@ final class GoogleFunctionHttpRequest implements com.google.cloud.functions.HttpRequest googleRequest, GoogleFunctionHttpResponse googleResponse, ConversionService conversionService, - BodyBuilder bodyBuilder) { + BodyBuilder bodyBuilder, + Executor ioExecutor) { this.googleRequest = googleRequest; this.googleResponse = googleResponse; this.uri = URI.create(googleRequest.getUri()); @@ -125,20 +136,43 @@ final class GoogleFunctionHttpRequest implements B built = parsedBody != null ? parsedBody : (B) bodyBuilder.buildBody(this::getInputStream, this); return Optional.ofNullable(built); }); + this.byteBody = SupplierUtil.memoized(() -> { + try { + return InputStreamByteBody.create( + googleRequest.getInputStream(), + OptionalLong.of(googleRequest.getContentLength()), + ioExecutor + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); } public byte[] getBodyBytes() throws IOException { - return googleRequest.getInputStream().readAllBytes(); + try (CloseableByteBody streaming = byteBody().split(ByteBody.SplitBackpressureMode.FASTEST); + CloseableAvailableByteBody buffered = streaming.buffer().get()) { + return buffered.toByteArray(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException ioe) { + throw ioe; + } else { + throw new RuntimeException(e); + } + } } @Override public InputStream getInputStream() throws IOException { - return servletByteBuffer != null ? servletByteBuffer.toInputStream() : new ByteArrayInputStream(getBodyBytes()); + return byteBody().split(ByteBody.SplitBackpressureMode.FASTEST).toInputStream(); } @Override public BufferedReader getReader() throws IOException { - return googleRequest.getReader(); + return new BufferedReader(new InputStreamReader(getInputStream(), getCharacterEncoding())); } @Override @@ -263,10 +297,7 @@ public void setParsedBody(B body) { @Override public @Nullable ByteBuffer contents() { try { - if (servletByteBuffer == null) { - this.servletByteBuffer = new ByteArrayByteBuffer<>(getInputStream().readAllBytes()); - } - return servletByteBuffer; + return new ByteArrayByteBuffer<>(getBodyBytes()); } catch (IOException e) { throw new IllegalStateException("Error getting all body contents", e); } @@ -277,6 +308,11 @@ public void setParsedBody(B body) { return ExecutionFlow.just(contents()); } + @Override + public @NonNull ByteBody byteBody() { + return byteBody.get(); + } + /** * Models the headers. */ diff --git a/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpResponse.java b/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpResponse.java index 1038286f5..2ce5dabe5 100644 --- a/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpResponse.java +++ b/gcp-function-http/src/main/java/io/micronaut/gcp/function/http/GoogleFunctionHttpResponse.java @@ -27,10 +27,10 @@ import io.micronaut.http.HttpStatus; import io.micronaut.http.MutableHttpHeaders; import io.micronaut.http.MutableHttpResponse; -import io.micronaut.http.codec.MediaTypeCodecRegistry; import io.micronaut.http.cookie.Cookie; import io.micronaut.http.cookie.ServerCookieEncoder; import io.micronaut.servlet.http.ServletHttpResponse; + import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; @@ -47,7 +47,6 @@ final class GoogleFunctionHttpResponse implements ServletHttpResponse { private final HttpResponseWrapper response; - private final MediaTypeCodecRegistry mediaTypeCodecRegistry; private final ConversionService conversionService; private MutableConvertibleValues attributes; @@ -59,11 +58,9 @@ final class GoogleFunctionHttpResponse implements ServletHttpResponse httpHandler; private final ConversionService conversionService; + private final BodyBuilder bodyBuilder; + private final Executor executor; /** * Default constructor. @@ -67,12 +92,16 @@ public class HttpFunction extends FunctionInitializer implements com.google.clou public HttpFunction() { httpHandler = initializeHandler(); this.conversionService = applicationContext.getBean(ConversionService.class); + this.bodyBuilder = applicationContext.getBean(BodyBuilder.class); + this.executor = applicationContext.getBean(Executor.class, Qualifiers.byName(TaskExecutors.BLOCKING)); } public HttpFunction(ApplicationContext context) { super(context); httpHandler = initializeHandler(); this.conversionService = applicationContext.getBean(ConversionService.class); + this.bodyBuilder = applicationContext.getBean(BodyBuilder.class); + this.executor = applicationContext.getBean(Executor.class, Qualifiers.byName(TaskExecutors.BLOCKING)); } private ServletHttpHandler initializeHandler() { @@ -80,9 +109,9 @@ private ServletHttpHandler initializeHandler() { @Override protected ServletExchange createExchange(HttpRequest request, HttpResponse response) { final GoogleFunctionHttpResponse res = - new GoogleFunctionHttpResponse<>(response, getMediaTypeCodecRegistry(), conversionService); + new GoogleFunctionHttpResponse<>(response, conversionService); final GoogleFunctionHttpRequest req = - new GoogleFunctionHttpRequest<>(request, res, conversionService, applicationContext.getBean(BodyBuilder.class)); + new GoogleFunctionHttpRequest<>(request, res, conversionService, bodyBuilder, executor); return new DefaultServletExchange<>(req, res); } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 8f83c76cc..a1401c3a7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,6 +1,6 @@ [versions] micronaut-docs = "2.0.0" -micronaut = "4.4.0" +micronaut = "4.5.1" groovy = "4.0.14" spock = "2.3-groovy-4.0" @@ -30,13 +30,13 @@ micronaut-jackson-xml = "4.3.0" micronaut-logging = "1.3.0" micronaut-reactor = "3.3.0" micronaut-rxjava3 = "3.3.0" -micronaut-serde = "2.9.0" -micronaut-servlet = "4.7.0" -micronaut-tracing = "6.5.0" +micronaut-serde = "2.10.1" +micronaut-servlet = "4.9.1" +micronaut-tracing = "6.6.0" micronaut-test = "4.3.0" micronaut-discovery = "4.3.0" micronaut-test-resources="2.5.2" -micronaut-validation = "4.5.0" +micronaut-validation = "4.6.0" # Micronaut micronaut-gradle-plugin = "4.4.0"