Skip to content

Commit

Permalink
New request body API (#10781)
Browse files Browse the repository at this point in the history
This PR adds a new experimental public API for accessing the bytes of a request in the HTTP server. While it shares some naming with the ByteBody API we used before, it is a very different design. It's designed to be public API, not netty-specific, but still powerful enough to allow for the ByteBody optimizations we had before.

At the moment, the new api is only used for requests and only on the server. I'd like to expand it to the response and to the client if possible. But there are some challenges, so it won't make it into this PR:

ByteBody must be closed. Only NettyHttpRequest has the wiring to make that possible at the moment, the other HttpReq/Resp implementations are much more loose about resource management.
The server request is the only point where filters are actually executed when the body is in byte form, at the moment. In client messages and in the server response, the body is in object form when filters are executed.
Another missing piece is a non-netty implementation of ByteBody. I am particularly interested in a servlet implementation based on InputStream. I will implement that as a separate PR.

Some pieces of the old netty-only body api remain (the ObjectBody impls) to keep changes down. Also AbstractHttpContentProcessor is finally removed, all fields are folded into FormDataHttpContentProcessor.
  • Loading branch information
yawkat authored May 27, 2024
1 parent d91c160 commit 51fe0b4
Show file tree
Hide file tree
Showing 71 changed files with 3,254 additions and 1,739 deletions.
14 changes: 13 additions & 1 deletion config/accepted-api-changes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,18 @@
"type": "io.micronaut.http.client.tck.tests.$Person$IntrospectionRef",
"member": "Implemented interface io.micronaut.core.beans.BeanIntrospectionReference",
"reason": "Introspection changes"
},
{
"type": "io.micronaut.http.filter.BaseFilterProcessor$RequiresRequestBodyBinder",
"member": "Implemented interface io.micronaut.core.bind.ArgumentBinder",
"reason": "Internal class"
},
{
"type": "io.micronaut.http.server.netty.$DefaultHttpContentProcessorResolver$Definition",
"reason": "Internal class removed"
},
{
"type": "io.micronaut.http.server.netty.jackson.$JsonHttpContentSubscriberFactory$Definition",
"reason": "Internal class removed"
}

]
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.scheduler.NonBlocking;

import java.io.Closeable;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -153,6 +154,9 @@ public T take() throws InterruptedException {
return null;
}
if (demanded) {
if (Thread.currentThread() instanceof NonBlocking) {
throw new IllegalStateException("Attempted to do blocking operation on a thread marked as NonBlocking. (Maybe the netty event loop?) Please only run blocking operations on IO or virtual threads, for example by marking your controller with @ExecuteOn(TaskExecutors.BLOCKING).");
}
newDataCondition.await();
}
subscription = this.subscription;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.ByteBufHolder;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.HttpDataFactory;
Expand All @@ -31,10 +32,12 @@
import io.netty.handler.codec.http.multipart.HttpPostStandardRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.InterfaceHttpPostRequestDecoder;
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

/**
* <p>Decodes {@link MediaType#MULTIPART_FORM_DATA} in a non-blocking manner.</p>
Expand All @@ -45,10 +48,14 @@
* @since 1.0
*/
@Internal
public class FormDataHttpContentProcessor extends AbstractHttpContentProcessor {
public final class FormDataHttpContentProcessor {

protected final NettyHttpRequest<?> nettyHttpRequest;
protected final long advertisedLength;
protected final long requestMaxSize;
protected final AtomicLong receivedLength = new AtomicLong();
protected final HttpServerConfiguration configuration;
private final InterfaceHttpPostRequestDecoder decoder;
private final boolean enabled;
private final long partMaxSize;

/**
Expand All @@ -63,13 +70,20 @@ public class FormDataHttpContentProcessor extends AbstractHttpContentProcessor {
* {@code true} if the decoder has been destroyed or will be destroyed in the near future.
*/
private boolean destroyed = false;
/**
* Whether we received a LastHttpContent.
*/
private boolean receivedLast = false;

/**
* @param nettyHttpRequest The {@link NettyHttpRequest}
* @param configuration The {@link NettyHttpServerConfiguration}
*/
public FormDataHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpServerConfiguration configuration) {
super(nettyHttpRequest, configuration);
this.nettyHttpRequest = nettyHttpRequest;
this.advertisedLength = nettyHttpRequest.getContentLength();
this.requestMaxSize = configuration.getMaxRequestSize();
this.configuration = configuration;
Charset characterEncoding = nettyHttpRequest.getCharacterEncoding();
HttpServerConfiguration.MultipartConfiguration multipart = configuration.getMultipart();
HttpDataFactory factory = new MicronautHttpData.Factory(multipart, characterEncoding);
Expand All @@ -80,18 +94,10 @@ public FormDataHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpSe
} else {
this.decoder = new HttpPostStandardRequestDecoder(factory, nativeRequest, characterEncoding);
}
this.enabled = nettyHttpRequest.getContentType().map(type -> type.equals(MediaType.APPLICATION_FORM_URLENCODED_TYPE)).orElse(false) ||
multipart.isEnabled();
this.partMaxSize = multipart.getMaxFileSize();
}

@Override
public boolean isEnabled() {
return enabled;
}

@Override
protected void onData(ByteBufHolder message, Collection<Object> out) {
protected void onData(ByteBufHolder message, Collection<? super InterfaceHttpData> out) {
boolean skip;
synchronized (this) {
if (destroyed) {
Expand Down Expand Up @@ -163,22 +169,32 @@ protected void onData(ByteBufHolder message, Collection<Object> out) {
}
}

@Override
public void add(ByteBufHolder message, Collection<Object> out) throws Throwable {
public void add(ByteBufHolder message, Collection<? super InterfaceHttpData> out) throws Throwable {
try {
super.add(message, out);
receivedLast |= message instanceof LastHttpContent;
long receivedLength1 = this.receivedLength.addAndGet(message.content().readableBytes());

ReferenceCountUtil.touch(message);
if (advertisedLength > requestMaxSize) {
fireExceedsLength(advertisedLength, requestMaxSize, message);
} else if (receivedLength1 > requestMaxSize) {
fireExceedsLength(receivedLength1, requestMaxSize, message);
} else {
onData(message, out);
}
} catch (Throwable e) {
cancel();
throw e;
}
}

@Override
public void complete(Collection<Object> out) {
public void complete(Collection<? super InterfaceHttpData> out) throws Throwable {
if (!receivedLast) {
add(LastHttpContent.EMPTY_LAST_CONTENT, out);
}
cancel();
}

@Override
public void cancel() {
pleaseDestroy = true;
destroyIfRequested();
Expand All @@ -199,4 +215,13 @@ private void destroyIfRequested() {
}
}

/**
* @param receivedLength The length of the content received
* @param expected The expected length of the content
* @param message The message to release
*/
protected void fireExceedsLength(long receivedLength, long expected, ByteBufHolder message) {
message.release();
throw new ContentLengthExceededException(expected, receivedLength);
}
}
Loading

0 comments on commit 51fe0b4

Please sign in to comment.