Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.x] 100 continue triggered by content request #5714

Merged
merged 4 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/config/io_helidon_webserver_SocketConfiguration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ Type: link:{javadoc-base-url}/io.helidon.webserver/io/helidon/webserver/SocketCo
server socket.

If `0` then use implementation default.

|`continue-immediately`|boolean |`false` |When true answers to expect continue with 100 continue immediately, not waiting for user to actually request the data.
Default is `false`
|`requested-uri-discovery.enabled` |boolean |`true if 'types' or 'trusted-proxies' is set; false otherwise` |Sets whether requested URI discovery is enabled for the socket.
|`requested-uri-discovery.trusted-proxies` |xref:{rootdir}/config/io_helidon_common_configurable_AllowList.adoc[AllowList] |{nbsp} |Assigns the settings governing the acceptance and rejection of forwarded headers from incoming requests to this socket.
This setting automatically enables discovery for the socket.
Expand Down
3 changes: 3 additions & 0 deletions docs/config/io_helidon_webserver_WebServer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ This is a standalone configuration type, prefix from configuration root: `server
server socket.

If `0` then use implementation default.

|`continue-immediately`|boolean |`false` |When true WebServer answers to expect continue with 100 continue immediately, not waiting for user to actually request the data.
Default is `false`
|`requested-uri-discovery.enabled` |boolean |`true if 'types' or 'trusted-proxies' is set; false otherwise` |Sets whether requested URI discovery is enabled for the socket.
|`requested-uri-discovery.trusted-proxies` |xref:{rootdir}/config/io_helidon_common_configurable_AllowList.adoc[AllowList] |{nbsp} |Assigns the settings governing the acceptance and rejection of forwarded headers from incoming requests to this socket.
This setting automatically enables discovery for the socket.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
* Copyright (c) 2017, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@ class BareResponseImpl implements BareResponse {
private final AtomicBoolean internallyClosed = new AtomicBoolean(false);
private final CompletableFuture<BareResponse> responseFuture;
private final CompletableFuture<BareResponse> headersFuture;
private final CompletableFuture<Boolean> entityRequested;
private final RequestContext requestContext;
private final long requestId;
private final String http2StreamId;
Expand Down Expand Up @@ -98,21 +99,22 @@ class BareResponseImpl implements BareResponse {
* @param requestId the correlation ID that is added to the log statements
*/
BareResponseImpl(ChannelHandlerContext ctx,
CompletableFuture<Boolean> entityRequested,
HttpRequest request,
RequestContext requestContext,
CompletableFuture<?> prevRequestChunk,
CompletableFuture<ChannelFutureListener> requestEntityAnalyzed,
long backpressureBufferSize,
BackpressureStrategy backpressureStrategy,
SocketConfiguration soConfig,
long requestId) {
this.entityRequested = entityRequested;
this.requestContext = requestContext;
this.originalEntityAnalyzed = requestEntityAnalyzed;
this.requestEntityAnalyzed = requestEntityAnalyzed;
this.backpressureStrategy = backpressureStrategy;
this.backpressureBufferSize = backpressureBufferSize;
this.backpressureStrategy = soConfig.backpressureStrategy();
this.backpressureBufferSize = soConfig.backpressureBufferSize();
this.responseFuture = new CompletableFuture<>();
this.headersFuture = new CompletableFuture<>();
this.channel = new NettyChannel(ctx.channel());
this.channel = new NettyChannel(ctx);
this.requestId = requestId;
this.keepAlive = HttpUtil.isKeepAlive(request);
this.requestHeaders = request.headers();
Expand Down Expand Up @@ -168,6 +170,14 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
throw new IllegalStateException("Status and headers were already sent");
}

if (!requestContext.socketConfiguration().continueImmediately()
&& HttpUtil.is100ContinueExpected(requestContext.request())
&& !requestContext.isDataRequested()) {
channel.expectationFailed();
entityRequested.complete(false);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE_ON_FAILURE);
}

HttpResponseStatus nettyStatus;
if (status instanceof Http.Status || status.reasonPhrase() == null) {
// default reason phrase
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
* Copyright (c) 2017, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -301,7 +301,7 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
// Context, publisher and DataChunk queue for this request/response
DataChunkHoldingQueue queue = new DataChunkHoldingQueue();
HttpRequestScopedPublisher publisher = new HttpRequestScopedPublisher(queue);
requestContext = new RequestContext(publisher, request, requestScope);
requestContext = new RequestContext(publisher, request, requestScope, soConfig);

// Closure local variables that cache mutable instance variables
RequestContext requestContextRef = requestContext;
Expand All @@ -313,8 +313,11 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
IndirectReference<HttpRequestScopedPublisher, DataChunkHoldingQueue> publisherRef =
new IndirectReference<>(publisher, queues, queue);

CompletableFuture<Boolean> entityRequested = new CompletableFuture<>();

// Set up read strategy for channel based on consumer demand
publisher.onRequest((n, demand) -> {
entityRequested.complete(true);
if (publisher.isUnbounded()) {
LOGGER.finest(() -> formatMsg("Netty autoread: true", ctx));
ctx.channel().config().setAutoRead(true);
Expand Down Expand Up @@ -399,20 +402,20 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
// Create response and handler for its completion
BareResponseImpl bareResponse =
new BareResponseImpl(ctx,
entityRequested,
request,
requestContext,
prevRequestFuture,
requestEntityAnalyzed,
soConfig.backpressureBufferSize(),
soConfig.backpressureStrategy(),
soConfig,
requestId);
prevRequestFuture = new CompletableFuture<>();
CompletableFuture<?> thisResp = prevRequestFuture;
bareResponse.whenCompleted()
.thenRun(() -> {
// Mark response completed in context
requestContextRef.responseCompleted(true);

entityRequested.complete(false);
// Consume and release any buffers in publisher
publisher.clearAndRelease();

Expand All @@ -430,12 +433,19 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
LOGGER.fine(formatMsg("Response complete: %s", ctx, System.identityHashCode(msg)));
}
});
/*
TODO we should only send continue in case the entity is request (e.g. we found a route and user started reading it)
This would solve connection close for 404 for requests with entity
*/
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx, request);


if (soConfig.continueImmediately()) {
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx, request);
}
} else {
// Send 100 continue only when entity is actually requested
entityRequested.thenAccept(requestedByUser -> {
if (requestedByUser && HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx, request);
}
});
}

// If a problem during routing, return 400 response
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, 2022 Oracle and/or its affiliates.
* Copyright (c) 2021, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,9 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http.HttpExpectationFailedEvent;
import io.netty.util.concurrent.Future;

/**
Expand All @@ -40,10 +42,12 @@
*/
class NettyChannel {
private final Channel channel;
private final ChannelHandlerContext ctx;
private CompletionStage<ChannelFuture> writeFuture = CompletableFuture.completedFuture(null);

NettyChannel(Channel channel) {
this.channel = channel;
NettyChannel(ChannelHandlerContext ctx) {
this.ctx = ctx;
this.channel = ctx.channel();
}

/**
Expand Down Expand Up @@ -133,6 +137,14 @@ static <T> void completeFuture(Future<? super Void> nettyFuture, CompletableFutu
}
}


/**
* Reset HttpObjectDecoder to not expect data.
*/
void expectationFailed(){
ctx.pipeline().fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
}

@Override
public String toString() {
return "NettyChannel{"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
* Copyright (c) 2017, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,13 +36,18 @@ class RequestContext {
private final HttpRequestScopedPublisher publisher;
private final HttpRequest request;
private final Context scope;
private final SocketConfiguration socketConfiguration;
private volatile boolean responseCompleted;
private volatile boolean emitted;

RequestContext(HttpRequestScopedPublisher publisher, HttpRequest request, Context scope) {
RequestContext(HttpRequestScopedPublisher publisher,
HttpRequest request,
Context scope,
SocketConfiguration socketConfiguration) {
this.publisher = publisher;
this.request = request;
this.scope = scope;
this.socketConfiguration = socketConfiguration;
}

Multi<DataChunk> publisher() {
Expand Down Expand Up @@ -124,4 +129,8 @@ void responseCompleted(boolean responseCompleted) {
boolean responseCompleted() {
return responseCompleted;
}

SocketConfiguration socketConfiguration() {
return socketConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ static class SocketConfig implements SocketConfiguration {
private final long maxPayloadSize;
private final long backpressureBufferSize;
private final BackpressureStrategy backpressureStrategy;
private final boolean continueImmediately;
private final int maxUpgradeContentLength;
private final List<RequestedUriDiscoveryType> requestedUriDiscoveryTypes;
private final AllowList trustedProxies;
Expand All @@ -248,6 +249,7 @@ static class SocketConfig implements SocketConfiguration {
this.maxPayloadSize = builder.maxPayloadSize();
this.backpressureBufferSize = builder.backpressureBufferSize();
this.backpressureStrategy = builder.backpressureStrategy();
this.continueImmediately = builder.continueImmediately();
this.maxUpgradeContentLength = builder.maxUpgradeContentLength();
WebServerTls webServerTls = builder.tlsConfig();
this.webServerTls = webServerTls.enabled() ? webServerTls : null;
Expand Down Expand Up @@ -366,6 +368,11 @@ public BackpressureStrategy backpressureStrategy() {
return backpressureStrategy;
}

@Override
public boolean continueImmediately() {
return continueImmediately;
}

@Override
public List<RequestedUriDiscoveryType> requestedUriDiscoveryTypes() {
return requestedUriDiscoveryTypes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,21 @@ public Builder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
return this;
}

/**
* When true WebServer answers to expect continue with 100 continue immediately,
* not waiting for user to actually request the data.
* <p>
* Default is {@code false}
*
* @param continueImmediately , answer with 100 continue immediately after expect continue, default is false
* @return this builder
*/
@Override
public Builder continueImmediately(boolean continueImmediately) {
defaultSocketBuilder().continueImmediately(continueImmediately);
return this;
}

/**
* Set a maximum length of the content of an upgrade request.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,17 @@ default BackpressureStrategy backpressureStrategy() {
return BackpressureStrategy.LINEAR;
}

/**
* When true WebServer answers with 100 continue immediately,
* not waiting for user to actually request the data.
* False is default value.
*
* @return strategy identifier for applying backpressure
*/
default boolean continueImmediately() {
return false;
}

/**
* Initial size of the buffer used to parse HTTP line and headers.
*
Expand Down Expand Up @@ -505,6 +516,18 @@ default B tls(Supplier<WebServerTls> tlsConfig) {
@ConfiguredOption("LINEAR")
B backpressureStrategy(BackpressureStrategy backpressureStrategy);

/**
* When true WebServer answers to expect continue with 100 continue immediately,
* not waiting for user to actually request the data.
* <p>
* Default is {@code false}
*
* @param continueImmediately , answer with 100 continue immediately after expect continue, default is false
* @return this builder
*/
@ConfiguredOption("false")
B continueImmediately(boolean continueImmediately);

/**
* Set a maximum length of the content of an upgrade request.
* <p>
Expand Down Expand Up @@ -653,6 +676,7 @@ final class Builder implements SocketConfigurationBuilder<Builder>, io.helidon.c
private boolean enableCompression = false;
private long maxPayloadSize = -1;
private BackpressureStrategy backpressureStrategy = BackpressureStrategy.LINEAR;
private boolean continueImmediately = false;
private int maxUpgradeContentLength = 64 * 1024;
private long maxBufferSize = 5 * 1024 * 1024;
private final List<RequestedUriDiscoveryType> requestedUriDiscoveryTypes = new ArrayList<>();
Expand Down Expand Up @@ -844,6 +868,12 @@ public Builder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
return this;
}

@Override
public Builder continueImmediately(boolean continueImmediately) {
this.continueImmediately = continueImmediately;
return this;
}

@Override
public Builder maxUpgradeContentLength(int size) {
this.maxUpgradeContentLength = size;
Expand Down Expand Up @@ -1037,6 +1067,10 @@ BackpressureStrategy backpressureStrategy() {
return backpressureStrategy;
}

boolean continueImmediately() {
return continueImmediately;
}

int maxUpgradeContentLength() {
return maxUpgradeContentLength;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,12 @@ public Builder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
return this;
}

@Override
public Builder continueImmediately(boolean continueImmediately) {
configurationBuilder.continueImmediately(continueImmediately);
return this;
}

@Override
public Builder maxUpgradeContentLength(int size) {
configurationBuilder.maxUpgradeContentLength(size);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, 2022 Oracle and/or its affiliates.
* Copyright (c) 2021, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,13 +51,15 @@ public Flow.Subscriber<DataChunk> createFlowSubscriber(WhiteboxSubscriberProbe<D
Mockito.when(ctx.channel()).thenReturn(channel);
Mockito.when(channel.closeFuture()).thenReturn(channelFuture);

SocketConfiguration socketConfiguration = SocketConfiguration.create("@default");

return new BareResponseImpl(ctx,
CompletableFuture.completedFuture(false),
httpRequest,
requestContext,
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
100 * 1024,
BackpressureStrategy.LINEAR,
socketConfiguration,
0L) {
@Override
public void onSubscribe(Flow.Subscription subscription) {
Expand Down
Loading