Skip to content

Commit

Permalink
Review issues 2
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Sep 22, 2023
1 parent 875ae82 commit 3c5eb1a
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected WebClientServiceResponse doProceed(WebClientServiceRequest serviceRequ

Http2Headers http2Headers = prepareHeaders(serviceRequest.method(), headers, uri);

stream.write(http2Headers, entityBytes.length == 0);
stream.writeHeaders(http2Headers, entityBytes.length == 0);
stream.flowControl().inbound().incrementWindowSize(clientRequest().requestPrefetch());
whenSent.complete(serviceRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected WebClientServiceResponse doProceed(WebClientServiceRequest serviceRequ
ClientUri uri = serviceRequest.uri();
Http2Headers http2Headers = prepareHeaders(serviceRequest.method(), headers, uri);

stream.write(http2Headers, false);
stream.writeHeaders(http2Headers, false);
whenSent.complete(serviceRequest);

stream.waitFor100Continue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Http2ClientStream implements Http2Stream, ReleasableResource {
private final Http2FrameListener recvListener = new Http2LoggingFrameListener("cl-recv");
private final Http2Settings settings = Http2Settings.create();
private final List<Http2FrameData> continuationData = new ArrayList<>();
private final CompletableFuture<Headers> trailers = new CompletableFuture<>();

private Http2StreamState state = Http2StreamState.IDLE;
private ReadState readState = ReadState.INIT;
Expand All @@ -80,7 +81,6 @@ class Http2ClientStream implements Http2Stream, ReleasableResource {
// streamId and buffer can only be created when we are locked in the stream id sequence
private int streamId;
private StreamBuffer buffer;
private final CompletableFuture<Headers> trailers = new CompletableFuture<>();

Http2ClientStream(Http2ClientConnection connection,
Http2Settings serverSettings,
Expand Down Expand Up @@ -253,7 +253,7 @@ void waitFor100Continue() {
}
}

void write(Http2Headers http2Headers, boolean endOfStream) {
void writeHeaders(Http2Headers http2Headers, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, true, endOfStream, true);
this.readState = readState.check(http2Headers.httpHeaders().contains(HeaderValues.EXPECT_100)
? ReadState.CONTINUE_100_HEADERS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,18 @@
import io.helidon.http.ServerResponseHeaders;
import io.helidon.http.ServerResponseTrailers;
import io.helidon.http.Status;
import io.helidon.http.http2.FlowControl;
import io.helidon.http.http2.Http2Flag;
import io.helidon.http.http2.Http2Flag.DataFlags;
import io.helidon.http.http2.Http2FrameData;
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2FrameTypes;
import io.helidon.http.http2.Http2Headers;
import io.helidon.http.http2.Http2StreamWriter;
import io.helidon.webserver.ConnectionContext;
import io.helidon.webserver.http.ServerResponseBase;

class Http2ServerResponse extends ServerResponseBase<Http2ServerResponse> {
private static final System.Logger LOGGER = System.getLogger(Http2ServerResponse.class.getName());

private final ConnectionContext ctx;
private final Http2StreamWriter writer;
private final int streamId;
private final ServerResponseHeaders headers;
private final ServerResponseTrailers trailers;
private final FlowControl.Outbound flowControl;
private final Http2ServerRequest request;
private final Http2ServerStream stream;

private boolean isSent;
private boolean streamingEntity;
Expand All @@ -59,17 +50,12 @@ class Http2ServerResponse extends ServerResponseBase<Http2ServerResponse> {
private UnaryOperator<OutputStream> outputStreamFilter;
private String streamResult = null;

Http2ServerResponse(ConnectionContext ctx,
Http2ServerRequest request,
Http2StreamWriter writer,
int streamId,
FlowControl.Outbound flowControl) {
super(ctx, request);
this.ctx = ctx;
Http2ServerResponse(Http2ServerStream stream,
Http2ServerRequest request) {
super(stream.connectionContext(), request);
this.ctx = stream.connectionContext();
this.request = request;
this.writer = writer;
this.streamId = streamId;
this.flowControl = flowControl;
this.stream = stream;
this.headers = ServerResponseHeaders.create();
this.trailers = ServerResponseTrailers.create();
}
Expand Down Expand Up @@ -119,26 +105,11 @@ public void send(byte[] entityBytes) {

boolean sendTrailers = request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER);

Http2FrameData frameData =
new Http2FrameData(Http2FrameHeader.create(bytes.length,
Http2FrameTypes.DATA,
DataFlags.create(sendTrailers ? 0 : Http2Flag.END_OF_STREAM),
streamId),
BufferData.create(bytes));

http2Headers.validateResponse();
bytesWritten = writer.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS),
frameData, flowControl);
bytesWritten += stream.writeHeadersWithData(http2Headers, bytes.length, BufferData.create(bytes), !sendTrailers);

if (sendTrailers) {
Http2Headers http2trailers = Http2Headers.create(trailers);
int written = writer.writeHeaders(http2trailers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
flowControl);
bytesWritten += written;
bytesWritten += stream.writeTrailers(Http2Headers.create(trailers));
}

afterSend();
Expand Down Expand Up @@ -244,12 +215,10 @@ private static class BlockingOutputStream extends OutputStream {
private final Http2ServerRequest request;
private final ServerResponseHeaders headers;
private final ServerResponseTrailers trailers;
private final Http2StreamWriter writer;
private final int streamId;
private final FlowControl.Outbound flowControl;
private final Status status;
private final Runnable responseCloseRunnable;
private final Http2ServerResponse response;
private final Http2ServerStream stream;

private BufferData firstBuffer;
private boolean closed;
Expand All @@ -263,9 +232,7 @@ private BlockingOutputStream(Http2ServerRequest request,
this.response = response;
this.headers = response.headers;
this.trailers = response.trailers;
this.writer = response.writer;
this.streamId = response.streamId;
this.flowControl = response.flowControl;
this.stream = response.stream;
this.status = response.status();
this.responseCloseRunnable = responseCloseRunnable;
}
Expand Down Expand Up @@ -309,7 +276,7 @@ void commit() {
} else if (sendTrailers) {
sendTrailers();
} else {
sendEndOfStream();
bytesWritten += stream.writeData(BufferData.empty(), true);
}
responseCloseRunnable.run();
try {
Expand All @@ -332,9 +299,9 @@ private void write(BufferData buffer) throws IOException {
if (firstByte) {
sendHeadersAndPrepare();
firstByte = false;
writeChunk(BufferData.create(firstBuffer, buffer));
bytesWritten += stream.writeData(BufferData.create(firstBuffer, buffer), false);
} else {
writeChunk(buffer);
bytesWritten += stream.writeData(buffer, false);
}
}

Expand All @@ -358,29 +325,9 @@ private void sendFirstChunkOnly(boolean sendTrailers) {

// at this moment, we must send headers
if (contentLength == 0) {
int written = writer.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(
sendTrailers
? Http2Flag.END_OF_HEADERS
: Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
flowControl);
bytesWritten += written;
bytesWritten += stream.writeHeaders(http2Headers, !sendTrailers);
} else {
Http2FrameData frameData =
new Http2FrameData(Http2FrameHeader.create(contentLength,
Http2FrameTypes.DATA,
DataFlags.create(sendTrailers
? 0
: Http2Flag.END_OF_STREAM),
streamId),
firstBuffer);
int written = writer.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS),
frameData, flowControl);

bytesWritten += written;
bytesWritten += stream.writeHeadersWithData(http2Headers, contentLength, firstBuffer, !sendTrailers);
}
}

Expand All @@ -390,36 +337,8 @@ private void sendHeadersAndPrepare() {
Http2Headers http2Headers = Http2Headers.create(headers);
http2Headers.status(status);
http2Headers.validateResponse();
int written = writer.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS),
flowControl);

bytesWritten += written;
}

private void writeChunk(BufferData buffer) {
Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(buffer.available(),
Http2FrameTypes.DATA,
DataFlags.create(0),
streamId),
buffer);
bytesWritten += frameData.header().length();
bytesWritten += Http2FrameHeader.LENGTH;

writer.writeData(frameData, flowControl);
}

private void sendEndOfStream() {
Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(0,
Http2FrameTypes.DATA,
DataFlags.create(Http2Flag.END_OF_STREAM),
streamId),
BufferData.empty());

bytesWritten += frameData.header().length();
bytesWritten += Http2FrameHeader.LENGTH;
writer.writeData(frameData, flowControl);
bytesWritten += stream.writeHeaders(http2Headers, false);
}

private void sendTrailers(){
Expand All @@ -429,12 +348,7 @@ private void sendTrailers(){
trailers.set(STREAM_STATUS_NAME, response.status().code());

Http2Headers http2Headers = Http2Headers.create(trailers);
int written = writer.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS
| Http2Flag.END_OF_STREAM),
flowControl);
bytesWritten += written;
bytesWritten += stream.writeTrailers(http2Headers);
}
}
}
Loading

0 comments on commit 3c5eb1a

Please sign in to comment.