Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: HTTP/2 Client 100 continue #7604

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ public interface Http2Stream {
*
* @param header frame header
* @param data frame data
* @param endOfStream whether this is the last data that would be received
*/
void data(Http2FrameHeader header, BufferData data);
void data(Http2FrameHeader header, BufferData data, boolean endOfStream);

/**
* Priority.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ protected WebClientServiceResponse doProceed(WebClientServiceRequest serviceRequ
stream.flowControl().inbound().incrementWindowSize(clientRequest().requestPrefetch());
whenSent.complete(serviceRequest);

stream.waitFor100Continue();

if (entityBytes.length != 0) {
stream.writeData(BufferData.create(entityBytes), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class Http2ClientConnection {
private final DataReader reader;
private final DataWriter dataWriter;
private final Semaphore pingPongSemaphore = new Semaphore(0);
private final Http2ClientConfig clientConfig;
private volatile int lastStreamId;

private Http2Settings serverSettings = Http2Settings.builder()
Expand All @@ -87,13 +88,14 @@ class Http2ClientConnection {

private volatile boolean closed = false;

Http2ClientConnection(Http2ClientProtocolConfig protocolConfig, ClientConnection connection) {
Http2ClientConnection(Http2ClientImpl http2Client, ClientConnection connection) {
this.protocolConfig = http2Client.protocolConfig();
this.clientConfig = http2Client.clientConfig();
this.connectionFlowControl = ConnectionFlowControl.clientBuilder(this::writeWindowsUpdate)
.maxFrameSize(protocolConfig.maxFrameSize())
.initialWindowSize(protocolConfig.initialWindowSize())
.blockTimeout(protocolConfig.flowControlBlockTimeout())
.build();
this.protocolConfig = protocolConfig;
this.connection = connection;
this.ctx = connection.helidonSocket();
this.dataWriter = connection.writer();
Expand All @@ -105,7 +107,7 @@ static Http2ClientConnection create(Http2ClientImpl http2Client,
ClientConnection connection,
boolean sendSettings) {

Http2ClientConnection h2conn = new Http2ClientConnection(http2Client.protocolConfig(), connection);
Http2ClientConnection h2conn = new Http2ClientConnection(http2Client, connection);
h2conn.start(http2Client.protocolConfig(), http2Client.webClient().executor(), sendSettings);

return h2conn;
Expand Down Expand Up @@ -139,7 +141,8 @@ Http2ClientStream createStream(Http2StreamConfig config) {
Http2ClientStream stream = new Http2ClientStream(this,
serverSettings,
ctx,
config.timeout(),
config,
clientConfig,
streamIdSeq);
return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

import io.helidon.common.buffers.BufferData;
import io.helidon.common.socket.SocketContext;
import io.helidon.http.HeaderValues;
import io.helidon.http.Headers;
import io.helidon.http.WritableHeaders;
import io.helidon.http.http2.Http2ErrorCode;
import io.helidon.http.http2.Http2Exception;
import io.helidon.http.http2.Http2Flag;
Expand Down Expand Up @@ -60,14 +62,15 @@ class Http2ClientStream implements Http2Stream, ReleasableResource {
private final Http2Settings serverSettings;
private final SocketContext ctx;
private final Duration timeout;
private final Http2ClientConfig http2ClientConfig;
private final LockingStreamIdSequence streamIdSeq;
private final Http2FrameListener sendListener = new Http2LoggingFrameListener("cl-send");
private final Http2FrameListener recvListener = new Http2LoggingFrameListener("cl-recv");
private final Http2Settings settings = Http2Settings.create();
private final List<Http2FrameData> continuationData = new ArrayList<>();

private Http2StreamState state = Http2StreamState.IDLE;
private ReadState readState = ReadState.HEADERS;
private ReadState readState = ReadState.INIT;
private Http2Headers currentHeaders;
// accessed from stream thread an connection thread
private volatile StreamFlowControl flowControl;
Expand All @@ -81,12 +84,14 @@ class Http2ClientStream implements Http2Stream, ReleasableResource {
Http2ClientStream(Http2ClientConnection connection,
Http2Settings serverSettings,
SocketContext ctx,
Duration timeout,
Http2StreamConfig http2StreamConfig,
Http2ClientConfig http2ClientConfig,
LockingStreamIdSequence streamIdSeq) {
this.connection = connection;
this.serverSettings = serverSettings;
this.ctx = ctx;
this.timeout = timeout;
this.timeout = http2StreamConfig.timeout();
this.http2ClientConfig = http2ClientConfig;
this.streamIdSeq = streamIdSeq;
}

Expand All @@ -102,15 +107,15 @@ public Http2StreamState streamState() {

@Override
public void headers(Http2Headers headers, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
readState = readState.check(endOfStream ? ReadState.END : ReadState.DATA);
this.currentHeaders = headers;
this.hasEntity = !endOfStream;
}

@Override
public void trailers(Http2Headers headers, boolean endOfStream) {
// Doesn't really matter of we received endOfStream
// end of the trailers is end of the exchange
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
readState = readState.check(ReadState.END);
this.trailers.complete(headers.httpHeaders());
}
Expand Down Expand Up @@ -158,8 +163,9 @@ public void windowUpdate(Http2WindowUpdate windowUpdate) {
}

@Override
public void data(Http2FrameHeader header, BufferData data) {
readState = readState.check(ReadState.DATA);
public void data(Http2FrameHeader header, BufferData data, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, header.type(), false, endOfStream, false);
readState = readState.check(endOfStream ? ReadState.END : ReadState.DATA);
flowControl.inbound().incrementWindowSize(header.length());
}

Expand Down Expand Up @@ -225,16 +231,34 @@ BufferData read(int i) {

BufferData read() {
while (state == Http2StreamState.HALF_CLOSED_LOCAL && readState != ReadState.END && hasEntity) {
Http2FrameData frameData = readOne();
Http2FrameData frameData = readOne(timeout);
if (frameData != null) {
return frameData.data();
}
}
return BufferData.empty();
}

void waitFor100Continue() {
Duration readContinueTimeout = http2ClientConfig.readContinueTimeout();
try {
while (readState == ReadState.CONTINUE_100_HEADERS) {
readOne(readContinueTimeout);
}
} catch (StreamTimeoutException ignored) {
// Timeout, continue as if it was received
readState = readState.check(ReadState.HEADERS);
LOGGER.log(DEBUG, "Server didn't respond within 100 Continue timeout in "
+ readContinueTimeout
+ ", sending data.");
}
}

void write(Http2Headers http2Headers, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, true, endOfStream, true);
this.readState = readState.check(http2Headers.httpHeaders().contains(HeaderValues.EXPECT_100)
? ReadState.CONTINUE_100_HEADERS
: ReadState.HEADERS);
Http2Flag.HeaderFlags flags;
if (endOfStream) {
flags = Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM);
Expand All @@ -248,7 +272,7 @@ void write(Http2Headers http2Headers, boolean endOfStream) {
// greater than all streams that the initiating endpoint has opened or reserved.
this.streamId = streamIdSeq.lockAndNext();
this.connection.updateLastStreamId(streamId);
this.buffer = new StreamBuffer(streamId, timeout);
this.buffer = new StreamBuffer(streamId);

// fixme Configurable initial win size, max frame size
this.flowControl = connection.flowControl().createStreamFlowControl(
Expand Down Expand Up @@ -278,8 +302,8 @@ void writeData(BufferData entityBytes, boolean endOfStream) {
}

Http2Headers readHeaders() {
while (currentHeaders == null) {
Http2FrameData frameData = readOne();
while (readState == ReadState.HEADERS) {
Http2FrameData frameData = readOne(timeout);
if (frameData != null) {
throw new IllegalStateException("Unexpected frame type " + frameData.header() + ", HEADERS are expected.");
}
Expand All @@ -291,8 +315,8 @@ ClientOutputStream outputStream() {
return new ClientOutputStream();
}

private Http2FrameData readOne() {
Http2FrameData frameData = buffer.poll();
private Http2FrameData readOne(Duration pollTimeout) {
Http2FrameData frameData = buffer.poll(pollTimeout);

if (frameData != null) {

Expand All @@ -303,35 +327,68 @@ private Http2FrameData readOne() {
boolean endOfStream = (flags & Http2Flag.END_OF_STREAM) == Http2Flag.END_OF_STREAM;
boolean endOfHeaders = (flags & Http2Flag.END_OF_HEADERS) == Http2Flag.END_OF_HEADERS;

this.state = Http2StreamState.checkAndGetState(this.state,
frameData.header().type(),
false,
endOfStream,
endOfHeaders);

switch (frameData.header().type()) {
case DATA:
data(frameData.header(), frameData.data());
data(frameData.header(), frameData.data(), endOfStream);
return frameData;

case HEADERS, CONTINUATION:
continuationData.add(frameData);

// (HEADERS[100-continue] CONTINUATION*)*
// ^------- endOfHeaders
// HEADERS+
// CONTINUATION*
// ^------- endOfHeaders
// DATA*
// (HEADERS[trailers] CONTINUATION*)+
// ^------- endOfHeaders
if (endOfHeaders) {
var requestHuffman = Http2HuffmanDecoder.create();

Http2Headers http2Headers = Http2Headers.create(this,
connection.getInboundDynamicTable(),
requestHuffman,
continuationData.toArray(new Http2FrameData[0]));

recvListener.headers(ctx, streamId, http2Headers);

if (readState == ReadState.HEADERS) {
// HTTP/1.1 100 Continue HEADERS
// Extension-Field: bar ==> - END_STREAM
// + END_HEADERS
// :status = 100
// extension-field = bar
//
// HTTP/1.1 200 OK HEADERS
// Content-Type: image/jpeg ==> - END_STREAM
// Transfer-Encoding: chunked + END_HEADERS
// Trailer: Foo :status = 200
// content-type = image/jpeg
// 123 trailer = Foo
// {binary data}
// 0 DATA
// Foo: bar - END_STREAM
// {binary data}
//
// HEADERS
// + END_STREAM
// + END_HEADERS
// foo = bar
switch (readState) {
case CONTINUE_100_HEADERS -> {
Http2Headers http2Headers = readHeaders(requestHuffman, false);
// Clear out for headers
continuationData.clear();
this.continue100(http2Headers, endOfStream);
}
case HEADERS -> {
// Add extension headers from 100 Continue
Http2Headers http2Headers = readHeaders(requestHuffman, true);
// Clear out for trailers
continuationData.clear();
this.headers(http2Headers, endOfStream);
} else {
}
case DATA, TRAILERS -> {
Http2Headers http2Headers = readHeaders(requestHuffman, false);
this.trailers(http2Headers, endOfStream);
}
default -> {
throw new IllegalStateException("Client is in wrong read state " + readState.name());
}
}
}
break;
default:
Expand All @@ -341,6 +398,25 @@ private Http2FrameData readOne() {
return null;
}

private void continue100(Http2Headers headers, boolean endOfStream) {
// no stream state check as 100 continues are an exception
this.currentHeaders = headers;
readState = readState.check(ReadState.HEADERS);
this.hasEntity = !endOfStream;
}

private Http2Headers readHeaders(Http2HuffmanDecoder decoder, boolean mergeWithPrevious) {
Http2Headers http2Headers = Http2Headers.create(this,
connection.getInboundDynamicTable(),
decoder,
mergeWithPrevious && currentHeaders != null
? currentHeaders
: Http2Headers.create(WritableHeaders.create()),
continuationData.toArray(new Http2FrameData[0]));
recvListener.headers(ctx, streamId, http2Headers);
return http2Headers;
}

private void splitAndWrite(Http2FrameData frameData) {
int maxFrameSize = this.serverSettings.value(Http2Setting.MAX_FRAME_SIZE).intValue();

Expand Down Expand Up @@ -395,7 +471,9 @@ private enum ReadState {
END,
TRAILERS(END),
DATA(TRAILERS, END),
HEADERS(DATA, TRAILERS);
HEADERS(DATA, TRAILERS),
CONTINUE_100_HEADERS(HEADERS),
INIT(CONTINUE_100_HEADERS, HEADERS);

private final Set<ReadState> allowedTransitions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ class StreamBuffer {
private final Semaphore dequeSemaphore = new Semaphore(1);
private final Queue<Http2FrameData> buffer = new ArrayDeque<>();
private final int streamId;
private final Duration timeout;

StreamBuffer(int streamId, Duration timeout) {
StreamBuffer(int streamId) {
this.streamId = streamId;
this.timeout = timeout;
}

Http2FrameData poll() {
Http2FrameData poll(Duration timeout) {
try {
// Block deque thread when queue is empty
// avoid CPU burning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public void testInvalidTextContentTypeStrict() {
Headers h = res.headers();
// Raw protocol data value
assertThat(res.headers(), hasHeader(INVALID_CONTENT_TYPE_TEXT));
Header rawContentType = h.get(HeaderNames.CONTENT_TYPE);
assertThat(rawContentType.get(), is(INVALID_CONTENT_TYPE_TEXT.get()));
// Media type parsed value is invalid, IllegalArgumentException shall be thrown
var ex = assertThrows(IllegalArgumentException.class, h::contentType);
assertThat(ex.getMessage(), is("Cannot parse media type: text"));
Expand Down
Loading
Loading