Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#921 Allow reading body multiple times #923

Merged
merged 24 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c6ec72e
Update common files (#912)
micronaut-build Jun 8, 2024
829f754
Add the Server TCK for Oracle Cloud HTTP Function
andriy-dmytruk Jun 11, 2024
a966a4a
Map the Fn-Http-H- header prefix to normal headers for testing
andriy-dmytruk Jun 11, 2024
32652f1
Make the FnServletRequest mutable to support request filters
andriy-dmytruk Jun 11, 2024
75f62da
Add support for parsing form data
andriy-dmytruk Jun 12, 2024
809c007
Fix for empty values in the form data
andriy-dmytruk Jun 12, 2024
eb87c27
Fix header testing by canonizing the header keys
andriy-dmytruk Jun 12, 2024
a9157c0
Support single values in JSON for decoding publishers
andriy-dmytruk Jun 12, 2024
a902b36
Support binding JSON body parts for body
andriy-dmytruk Jun 12, 2024
ec586fe
Parse cookies from client header
andriy-dmytruk Jun 13, 2024
412d7e0
Use message body writer to write the input message if it is supplied …
andriy-dmytruk Jun 13, 2024
920adf7
Minor refactor
andriy-dmytruk Jun 13, 2024
44d30d8
Revert the HttpFunction change and configure the embedded server for …
andriy-dmytruk Jun 13, 2024
c37c84e
Refactor failed binding result into a common class
andriy-dmytruk Jun 13, 2024
1726ef4
Attempt to allow reading body multiple times
andriy-dmytruk Jun 13, 2024
ab4601e
Improve the implementation using InputStreamByteBody
andriy-dmytruk Jun 13, 2024
fd99aaa
fix byteBody implementation
yawkat Jun 18, 2024
e3d3b87
Attempt to allow reading body multiple times
andriy-dmytruk Jun 13, 2024
cd53a7a
Improve the implementation using InputStreamByteBody
andriy-dmytruk Jun 13, 2024
fec7e83
fix byteBody implementation
yawkat Jun 18, 2024
bbd3235
Disable binary compatability
andriy-dmytruk Jun 18, 2024
efe5c9d
Merge remote-tracking branch 'origin/andriy/function-http-tck-multire…
yawkat Jun 19, 2024
8121669
enable byteBody test
yawkat Jun 19, 2024
2aec7dd
Revert "enable byteBody test"
yawkat Jun 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ micronaut-micrometer = "5.6.0"
micronaut-reactor = "3.3.0"
micronaut-rxjava2 = "2.3.0"
micronaut-serde = "2.9.0"
micronaut-servlet = "4.7.0"
micronaut-servlet = "4.9.1"
micronaut-sql = "5.6.0"
micronaut-test = "4.3.0"
micronaut-logging = "1.3.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import io.micronaut.core.io.IOUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpHeaders;
import io.micronaut.http.MutableHttpParameters;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.ServerHttpRequest;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.ByteBody.SplitBackpressureMode;
import io.micronaut.http.codec.MediaTypeCodec;
Expand All @@ -48,16 +48,13 @@
import io.micronaut.servlet.http.ServletExchange;
import io.micronaut.servlet.http.ServletHttpRequest;
import io.micronaut.servlet.http.ServletHttpResponse;
import io.micronaut.servlet.http.body.InputStreamByteBody;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -67,11 +64,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand All @@ -82,7 +76,7 @@
* @param <B> The body type
*/
@Internal
final class FnServletRequest<B> implements ServletHttpRequest<InputEvent, B>, ServletExchange<InputEvent, OutputEvent>, MutableHttpRequest<B> {
final class FnServletRequest<B> implements ServletHttpRequest<InputEvent, B>, ServletExchange<InputEvent, OutputEvent>, MutableHttpRequest<B>, ServerHttpRequest<B> {

private static final String COOKIE_HEADER = "Cookie";

Expand All @@ -96,46 +90,33 @@ final class FnServletRequest<B> implements ServletHttpRequest<InputEvent, B>, Se
private MutableConvertibleValues<Object> attributes;
private Cookies cookies;
private final MediaTypeCodecRegistry codecRegistry;
private final ReentrantReadWriteLock byteBodyLock = new ReentrantReadWriteLock();
private ByteBody byteBody;
private final ByteBody byteBody;
private URI uri;

public FnServletRequest(
ByteBody byteBody,
InputEvent inputEvent,
FnServletResponse<Object> response,
HTTPGatewayContext gatewayContext,
ConversionService conversionService,
MediaTypeCodecRegistry codecRegistry
) {
this.byteBody = byteBody;
this.inputEvent = inputEvent;
this.response = response;
this.gatewayContext = gatewayContext;
this.conversionService = conversionService;
this.codecRegistry = codecRegistry;
}

private static Supplier<byte[]> createByteBodySupplier(InputEvent inputEvent) {
return SupplierUtil.memoized(() ->
inputEvent.consumeBody(inputStream -> {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
inputStream.transferTo(outputStream);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return outputStream.toByteArray();
})
);
}

@Override
public boolean isAsyncSupported() {
return false;
}

@Override
public InputStream getInputStream() {
throw new UnsupportedOperationException("Calling getInputStream() is not supported. If you need an InputSteam define a parameter of type InputEvent and use the consumeBody method");
return byteBody.split(SplitBackpressureMode.FASTEST).toInputStream();
}

/**
Expand All @@ -145,29 +126,12 @@ public InputStream getInputStream() {
* @param <T> The function return value
*/
public <T> T consumeBody(Function<InputStream, T> consumer) {
byteBodyLock.readLock().lock();
if (byteBody != null) {
byteBodyLock.readLock().unlock();
return consumer.apply(byteBody.split(SplitBackpressureMode.FASTEST).toInputStream());
}
byteBodyLock.readLock().unlock();
byteBodyLock.writeLock().lock();
if (byteBody != null) {
byteBodyLock.writeLock().unlock();
return consumer.apply(byteBody.split(SplitBackpressureMode.FASTEST).toInputStream());
}
return inputEvent.consumeBody(inputStream -> {
// Store the stream in byte body, so that it is cached and can be reused
byteBody = InputStreamByteBody.create(inputStream, OptionalLong.empty(), null);
byteBodyLock.writeLock().unlock();
// The input event will close the stream, so we must consume it now
return consumer.apply(byteBody.split(SplitBackpressureMode.FASTEST).toInputStream());
});
return consumer.apply(byteBody.split(SplitBackpressureMode.FASTEST).toInputStream());
}

@Override
public BufferedReader getReader() {
throw new UnsupportedOperationException("Calling getReader() is not supported. If you need an InputSteam define a parameter of type InputEvent and use the consumeBody method");
return new BufferedReader(new InputStreamReader(getInputStream(), getCharacterEncoding()));
}

@NonNull
Expand Down Expand Up @@ -362,18 +326,23 @@ private ConvertibleMultiValues<CharSequence> parseFormData(String body) {
@Override
public MutableHttpRequest<B> mutate() {
FnServletRequest<B> request = new FnServletRequest<>(
byteBody,
inputEvent,
response,
gatewayContext,
conversionService,
codecRegistry
);
request.byteBody = byteBody;
request.cookies = cookies;
request.attributes = attributes;
return request;
}

@Override
public @NonNull ByteBody byteBody() {
return byteBody;
}

/**
* The fn parameters.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,29 @@
import com.fnproject.fn.api.RuntimeContext;
import com.fnproject.fn.api.httpgateway.HTTPGatewayContext;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.ReflectiveAccess;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.convert.DefaultMutableConversionService;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.context.ServerHttpRequestContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.oraclecloud.function.OciFunction;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.servlet.http.DefaultServletExchange;
import io.micronaut.servlet.http.ServletExchange;
import io.micronaut.servlet.http.ServletHttpHandler;
import io.micronaut.servlet.http.body.InputStreamByteBody;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.OptionalLong;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
* A parent HttpFunction for authoring Project.fn gateway functions.
*
Expand All @@ -46,6 +55,7 @@ public class HttpFunction extends OciFunction {
private ServletHttpHandler<InputEvent, OutputEvent> httpHandler;

private final ConversionService conversionService;
private final Supplier<Executor> ioExecutor = SupplierUtil.memoized(() -> getApplicationContext().getBean(Executor.class, Qualifiers.byName(TaskExecutors.BLOCKING)));

/**
* Default constructor.
Expand Down Expand Up @@ -101,19 +111,48 @@ protected void setupGateway(@NonNull RuntimeContext ctx) {
@ReflectiveAccess
public OutputEvent handleRequest(HTTPGatewayContext gatewayContext, InputEvent inputEvent) {
FnServletResponse<Object> response = new FnServletResponse<>(gatewayContext, conversionService);
FnServletRequest<Object> servletRequest = new FnServletRequest<>(
inputEvent, response, gatewayContext, conversionService,
httpHandler.getMediaTypeCodecRegistry()
);
DefaultServletExchange<InputEvent, OutputEvent> exchange = new DefaultServletExchange<>(
servletRequest,
response
);
try (PropagatedContext.Scope ignore = PropagatedContext.getOrEmpty().plus(new ServerHttpRequestContext(servletRequest)).propagate()) {
this.httpHandler.service(
exchange
);
return response.getNativeResponse();
}
/*
This is a bit tricky. fnproject only allows access to the body InputStream through this
consumeBody method, which can only be called once and, after the lambda finishes, closes
the stream. This is incompatible with the ByteBody architecture.

To work around this limitation, we do a single big consumeBody around the entire request
processing. This hopefully encompasses most users of the body stream. Any read operations
inside this block are simply forwarded upstream.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, this is great!


In case there *is* a downstream consumer that has not called allowDiscard and has not
consumed all data yet, we call bufferIfNecessary. This call will buffer any remaining data
so that this downstream consumer can continue reading.

To prevent unnecessary buffering when there are no downstream consumers that require the
(full) body, we take advantage of the allowDiscard mechanism. If InputStreamByteBody is
never split, or all splits are closed or allowDiscarded, then closing the original
InputStreamByteBody will also close the InputStream it wraps. This sets a flag in
OptionalBufferingInputStream that disables buffering on bufferIfNecessary.
*/

return inputEvent.consumeBody(stream -> {
OptionalBufferingInputStream optionalBufferingInputStream = new OptionalBufferingInputStream(stream);
OptionalLong contentLength = inputEvent.getHeaders().get(HttpHeaders.CONTENT_LENGTH).map(Long::parseLong).map(OptionalLong::of).orElse(OptionalLong.empty());
try (CloseableByteBody body = InputStreamByteBody.create(optionalBufferingInputStream, contentLength, ioExecutor.get())) {

FnServletRequest<Object> servletRequest = new FnServletRequest<>(
body, inputEvent, response, gatewayContext, conversionService,
httpHandler.getMediaTypeCodecRegistry()
);
DefaultServletExchange<InputEvent, OutputEvent> exchange = new DefaultServletExchange<>(
servletRequest,
response
);
try (PropagatedContext.Scope ignore = PropagatedContext.getOrEmpty().plus(new ServerHttpRequestContext(servletRequest)).propagate()) {
this.httpHandler.service(
exchange
);
}
}
OutputEvent nativeResponse = response.getNativeResponse();
optionalBufferingInputStream.bufferIfNecessary();
return nativeResponse;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.oraclecloud.function.http;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* This stream class wraps another upstream {@link InputStream}. Normally, read actions are simply
* forwarded upstream. However, there is an additional action {@link #bufferIfNecessary()} that, if
* this stream has not been closed yet, buffers all remaining data from the upstream. Downstream
* consumers can then continue reading data independent of the upstream.
* <p>This class is necessary because fnproject closes the upstream at some point, and we might not
* have read all data yet. So we {@link #bufferIfNecessary()} if the downstream users still need
* access to the data after the close.
*/
final class OptionalBufferingInputStream extends InputStream {
private final Lock lock = new ReentrantLock();
private final InputStream upstream;
private byte[] buffered;
private int bufferedIndex;
private boolean closed;

OptionalBufferingInputStream(InputStream upstream) {
this.upstream = upstream;
}

@Override
public int read() throws IOException {
byte[] arr1 = new byte[1];
int n = read(arr1);
if (n == -1) {
return -1;
} else if (n == 0) {
throw new IllegalStateException("Read 0 bytes");
} else {
return arr1[0] & 0xff;
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
lock.lock();
try {
if (buffered == null) {
return upstream.read(b, off, len);
} else {
if (bufferedIndex >= buffered.length) {
return -1;
} else {
int n = Math.min(len, buffered.length - bufferedIndex);
System.arraycopy(buffered, bufferedIndex, b, off, n);
bufferedIndex += n;
return n;
}
}
} finally {
lock.unlock();
}
}

@Override
public void close() throws IOException {
lock.lock();
try {
closed = true;
upstream.close();
} finally {
lock.unlock();
}
}

void bufferIfNecessary() {
lock.lock();
try {
if (!closed) {
try {
buffered = upstream.readAllBytes();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
} finally {
lock.unlock();
}
}
}
Loading