Skip to content

Commit

Permalink
6033 HTTP/2 Client 100 continue support
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Sep 18, 2023
1 parent ecdef53 commit 917e5ff
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ public interface Http2Stream {
*
* @param header frame header
* @param data frame data
* @param endOfStream whether this is the last data that would be received
*/
void data(Http2FrameHeader header, BufferData data);
void data(Http2FrameHeader header, BufferData data, boolean endOfStream);

/**
* Priority.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ protected WebClientServiceResponse doProceed(WebClientServiceRequest serviceRequ
stream.flowControl().inbound().incrementWindowSize(clientRequest().requestPrefetch());
whenSent.complete(serviceRequest);

stream.waitFor100Continue();

if (entityBytes.length != 0) {
stream.writeData(BufferData.create(entityBytes), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class Http2ClientConnection {
private final DataReader reader;
private final DataWriter dataWriter;
private final Semaphore pingPongSemaphore = new Semaphore(0);
private final Http2ClientConfig clientConfig;
private volatile int lastStreamId;

private Http2Settings serverSettings = Http2Settings.builder()
Expand All @@ -87,13 +88,14 @@ class Http2ClientConnection {

private volatile boolean closed = false;

Http2ClientConnection(Http2ClientProtocolConfig protocolConfig, ClientConnection connection) {
Http2ClientConnection(Http2ClientImpl http2Client, ClientConnection connection) {
this.protocolConfig = http2Client.protocolConfig();
this.clientConfig = http2Client.clientConfig();
this.connectionFlowControl = ConnectionFlowControl.clientBuilder(this::writeWindowsUpdate)
.maxFrameSize(protocolConfig.maxFrameSize())
.initialWindowSize(protocolConfig.initialWindowSize())
.blockTimeout(protocolConfig.flowControlBlockTimeout())
.build();
this.protocolConfig = protocolConfig;
this.connection = connection;
this.ctx = connection.helidonSocket();
this.dataWriter = connection.writer();
Expand All @@ -105,7 +107,7 @@ static Http2ClientConnection create(Http2ClientImpl http2Client,
ClientConnection connection,
boolean sendSettings) {

Http2ClientConnection h2conn = new Http2ClientConnection(http2Client.protocolConfig(), connection);
Http2ClientConnection h2conn = new Http2ClientConnection(http2Client, connection);
h2conn.start(http2Client.protocolConfig(), http2Client.webClient().executor(), sendSettings);

return h2conn;
Expand Down Expand Up @@ -139,7 +141,8 @@ Http2ClientStream createStream(Http2StreamConfig config) {
Http2ClientStream stream = new Http2ClientStream(this,
serverSettings,
ctx,
config.timeout(),
config,
clientConfig,
streamIdSeq);
return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

import io.helidon.common.buffers.BufferData;
import io.helidon.common.socket.SocketContext;
import io.helidon.http.HeaderValues;
import io.helidon.http.Headers;
import io.helidon.http.WritableHeaders;
import io.helidon.http.http2.Http2ErrorCode;
import io.helidon.http.http2.Http2Exception;
import io.helidon.http.http2.Http2Flag;
Expand Down Expand Up @@ -60,14 +62,15 @@ class Http2ClientStream implements Http2Stream, ReleasableResource {
private final Http2Settings serverSettings;
private final SocketContext ctx;
private final Duration timeout;
private final Http2ClientConfig http2ClientConfig;
private final LockingStreamIdSequence streamIdSeq;
private final Http2FrameListener sendListener = new Http2LoggingFrameListener("cl-send");
private final Http2FrameListener recvListener = new Http2LoggingFrameListener("cl-recv");
private final Http2Settings settings = Http2Settings.create();
private final List<Http2FrameData> continuationData = new ArrayList<>();

private Http2StreamState state = Http2StreamState.IDLE;
private ReadState readState = ReadState.HEADERS;
private ReadState readState = ReadState.INIT;
private Http2Headers currentHeaders;
// accessed from stream thread an connection thread
private volatile StreamFlowControl flowControl;
Expand All @@ -81,12 +84,14 @@ class Http2ClientStream implements Http2Stream, ReleasableResource {
Http2ClientStream(Http2ClientConnection connection,
Http2Settings serverSettings,
SocketContext ctx,
Duration timeout,
Http2StreamConfig http2StreamConfig,
Http2ClientConfig http2ClientConfig,
LockingStreamIdSequence streamIdSeq) {
this.connection = connection;
this.serverSettings = serverSettings;
this.ctx = ctx;
this.timeout = timeout;
this.timeout = http2StreamConfig.timeout();
this.http2ClientConfig = http2ClientConfig;
this.streamIdSeq = streamIdSeq;
}

Expand All @@ -102,15 +107,15 @@ public Http2StreamState streamState() {

@Override
public void headers(Http2Headers headers, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
readState = readState.check(endOfStream ? ReadState.END : ReadState.DATA);
this.currentHeaders = headers;
this.hasEntity = !endOfStream;
}

@Override
public void trailers(Http2Headers headers, boolean endOfStream) {
// Doesn't really matter of we received endOfStream
// end of the trailers is end of the exchange
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
readState = readState.check(ReadState.END);
this.trailers.complete(headers.httpHeaders());
}
Expand Down Expand Up @@ -158,8 +163,9 @@ public void windowUpdate(Http2WindowUpdate windowUpdate) {
}

@Override
public void data(Http2FrameHeader header, BufferData data) {
readState = readState.check(ReadState.DATA);
public void data(Http2FrameHeader header, BufferData data, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, header.type(), false, endOfStream, false);
readState = readState.check(endOfStream ? ReadState.END : ReadState.DATA);
flowControl.inbound().incrementWindowSize(header.length());
}

Expand Down Expand Up @@ -225,16 +231,34 @@ BufferData read(int i) {

BufferData read() {
while (state == Http2StreamState.HALF_CLOSED_LOCAL && readState != ReadState.END && hasEntity) {
Http2FrameData frameData = readOne();
Http2FrameData frameData = readOne(timeout);
if (frameData != null) {
return frameData.data();
}
}
return BufferData.empty();
}

void waitFor100Continue() {
Duration readContinueTimeout = http2ClientConfig.readContinueTimeout();
try {
while (readState == ReadState.CONTINUE_100_HEADERS) {
readOne(readContinueTimeout);
}
} catch (StreamTimeoutException ignored) {
// Timeout, continue as if it was received
readState = readState.check(ReadState.HEADERS);
LOGGER.log(DEBUG, "Server didn't respond within 100 Continue timeout in "
+ readContinueTimeout
+ ", sending data.");
}
}

void write(Http2Headers http2Headers, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, true, endOfStream, true);
this.readState = readState.check(http2Headers.httpHeaders().contains(HeaderValues.EXPECT_100)
? ReadState.CONTINUE_100_HEADERS
: ReadState.HEADERS);
Http2Flag.HeaderFlags flags;
if (endOfStream) {
flags = Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM);
Expand All @@ -248,7 +272,7 @@ void write(Http2Headers http2Headers, boolean endOfStream) {
// greater than all streams that the initiating endpoint has opened or reserved.
this.streamId = streamIdSeq.lockAndNext();
this.connection.updateLastStreamId(streamId);
this.buffer = new StreamBuffer(streamId, timeout);
this.buffer = new StreamBuffer(streamId);

// fixme Configurable initial win size, max frame size
this.flowControl = connection.flowControl().createStreamFlowControl(
Expand Down Expand Up @@ -278,8 +302,8 @@ void writeData(BufferData entityBytes, boolean endOfStream) {
}

Http2Headers readHeaders() {
while (currentHeaders == null) {
Http2FrameData frameData = readOne();
while (readState == ReadState.HEADERS) {
Http2FrameData frameData = readOne(timeout);
if (frameData != null) {
throw new IllegalStateException("Unexpected frame type " + frameData.header() + ", HEADERS are expected.");
}
Expand All @@ -291,8 +315,8 @@ ClientOutputStream outputStream() {
return new ClientOutputStream();
}

private Http2FrameData readOne() {
Http2FrameData frameData = buffer.poll();
private Http2FrameData readOne(Duration pollTimeout) {
Http2FrameData frameData = buffer.poll(pollTimeout);

if (frameData != null) {

Expand All @@ -303,35 +327,68 @@ private Http2FrameData readOne() {
boolean endOfStream = (flags & Http2Flag.END_OF_STREAM) == Http2Flag.END_OF_STREAM;
boolean endOfHeaders = (flags & Http2Flag.END_OF_HEADERS) == Http2Flag.END_OF_HEADERS;

this.state = Http2StreamState.checkAndGetState(this.state,
frameData.header().type(),
false,
endOfStream,
endOfHeaders);

switch (frameData.header().type()) {
case DATA:
data(frameData.header(), frameData.data());
data(frameData.header(), frameData.data(), endOfStream);
return frameData;

case HEADERS, CONTINUATION:
continuationData.add(frameData);

// (HEADERS[100-continue] CONTINUATION*)*
// ^------- endOfHeaders
// HEADERS+
// CONTINUATION*
// ^------- endOfHeaders
// DATA*
// (HEADERS[trailers] CONTINUATION*)+
// ^------- endOfHeaders
if (endOfHeaders) {
var requestHuffman = Http2HuffmanDecoder.create();

Http2Headers http2Headers = Http2Headers.create(this,
connection.getInboundDynamicTable(),
requestHuffman,
continuationData.toArray(new Http2FrameData[0]));

recvListener.headers(ctx, streamId, http2Headers);

if (readState == ReadState.HEADERS) {
// HTTP/1.1 100 Continue HEADERS
// Extension-Field: bar ==> - END_STREAM
// + END_HEADERS
// :status = 100
// extension-field = bar
//
// HTTP/1.1 200 OK HEADERS
// Content-Type: image/jpeg ==> - END_STREAM
// Transfer-Encoding: chunked + END_HEADERS
// Trailer: Foo :status = 200
// content-type = image/jpeg
// 123 trailer = Foo
// {binary data}
// 0 DATA
// Foo: bar - END_STREAM
// {binary data}
//
// HEADERS
// + END_STREAM
// + END_HEADERS
// foo = bar
switch (readState) {
case CONTINUE_100_HEADERS -> {
Http2Headers http2Headers = readHeaders(requestHuffman, false);
// Clear out for headers
continuationData.clear();
this.continue100(http2Headers, endOfStream);
}
case HEADERS -> {
// Add extension headers from 100 Continue
Http2Headers http2Headers = readHeaders(requestHuffman, true);
// Clear out for trailers
continuationData.clear();
this.headers(http2Headers, endOfStream);
} else {
}
case DATA, TRAILERS -> {
Http2Headers http2Headers = readHeaders(requestHuffman, false);
this.trailers(http2Headers, endOfStream);
}
default -> {
throw new IllegalStateException("Client is in wrong read state " + readState.name());
}
}
}
break;
default:
Expand All @@ -341,6 +398,25 @@ private Http2FrameData readOne() {
return null;
}

private void continue100(Http2Headers headers, boolean endOfStream) {
// no stream state check as 100 continues are an exception
this.currentHeaders = headers;
readState = readState.check(ReadState.HEADERS);
this.hasEntity = !endOfStream;
}

private Http2Headers readHeaders(Http2HuffmanDecoder decoder, boolean mergeWithPrevious) {
Http2Headers http2Headers = Http2Headers.create(this,
connection.getInboundDynamicTable(),
decoder,
mergeWithPrevious && currentHeaders != null
? currentHeaders
: Http2Headers.create(WritableHeaders.create()),
continuationData.toArray(new Http2FrameData[0]));
recvListener.headers(ctx, streamId, http2Headers);
return http2Headers;
}

private void splitAndWrite(Http2FrameData frameData) {
int maxFrameSize = this.serverSettings.value(Http2Setting.MAX_FRAME_SIZE).intValue();

Expand Down Expand Up @@ -395,7 +471,9 @@ private enum ReadState {
END,
TRAILERS(END),
DATA(TRAILERS, END),
HEADERS(DATA, TRAILERS);
HEADERS(DATA, TRAILERS),
CONTINUE_100_HEADERS(HEADERS),
INIT(CONTINUE_100_HEADERS, HEADERS);

private final Set<ReadState> allowedTransitions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ class StreamBuffer {
private final Semaphore dequeSemaphore = new Semaphore(1);
private final Queue<Http2FrameData> buffer = new ArrayDeque<>();
private final int streamId;
private final Duration timeout;

StreamBuffer(int streamId, Duration timeout) {
StreamBuffer(int streamId) {
this.streamId = streamId;
this.timeout = timeout;
}

Http2FrameData poll() {
Http2FrameData poll(Duration timeout) {
try {
// Block deque thread when queue is empty
// avoid CPU burning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public void testInvalidTextContentTypeStrict() {
Headers h = res.headers();
// Raw protocol data value
assertThat(res.headers(), hasHeader(INVALID_CONTENT_TYPE_TEXT));
Header rawContentType = h.get(HeaderNames.CONTENT_TYPE);
assertThat(rawContentType.get(), is(INVALID_CONTENT_TYPE_TEXT.get()));
// Media type parsed value is invalid, IllegalArgumentException shall be thrown
var ex = assertThrows(IllegalArgumentException.class, h::contentType);
assertThat(ex.getMessage(), is("Cannot parse media type: text"));
Expand Down
Loading

0 comments on commit 917e5ff

Please sign in to comment.