Skip to content

Commit

Permalink
http2: allocate on every chunk send
Browse files Browse the repository at this point in the history
Previously, we were using a shared stack allocated buffer to hold
the serialized outbound data but that runs into issues if the
outgoing stream does not write or copy immediately. Instead,
allocate a buffer each time. Slight additional overhead here,
but necessary.

Later on, once we've analyzed this more, we might be able to
switch to a stack allocated ring or slab buffer but that's a
bit more complicated than what we strictly need right now.

PR-URL: #16669
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Khaidi Chu <i@2333.moe>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jasnell committed Nov 3, 2017
1 parent de24602 commit 4db1bc8
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 34 deletions.
37 changes: 19 additions & 18 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -853,32 +853,33 @@ int Http2Session::DoWrite(WriteWrap* req_wrap,
return 0;
}

void Http2Session::AllocateSend(uv_buf_t* buf) {
buf->base = stream_alloc();
buf->len = kAllocBufferSize;
WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate());
auto AfterWrite = [](WriteWrap* req, int status) {
req->Dispose();
};
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
// Base the amount allocated on the remote peers max frame size
uint32_t size =
nghttp2_session_get_remote_settings(
session(),
NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
// Max frame size + 9 bytes for the header
return WriteWrap::New(env(), obj, this, AfterWrite, size + 9);
}

void Http2Session::Send(uv_buf_t* buf, size_t length) {
void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {
DEBUG_HTTP2("Http2Session: Attempting to send data\n");
if (stream_ == nullptr || !stream_->IsAlive() || stream_->IsClosing()) {
return;
}
HandleScope scope(env()->isolate());
auto AfterWrite = [](WriteWrap* req_wrap, int status) {
req_wrap->Dispose();
};
Local<Object> req_wrap_obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
WriteWrap* write_req = WriteWrap::New(env(),
req_wrap_obj,
this,
AfterWrite);

chunks_sent_since_last_write_++;
uv_buf_t actual = uv_buf_init(buf->base, length);
if (stream_->DoWrite(write_req, &actual, 1, nullptr)) {
write_req->Dispose();
uv_buf_t actual = uv_buf_init(buf, length);
if (stream_->DoWrite(req, &actual, 1, nullptr)) {
req->Dispose();
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ class Http2Session : public AsyncWrap,
nghttp2_headers_category cat,
uint8_t flags) override;
void OnStreamClose(int32_t id, uint32_t code) override;
void Send(uv_buf_t* bufs, size_t total) override;
void OnDataChunk(Nghttp2Stream* stream, uv_buf_t* chunk) override;
void OnSettings(bool ack) override;
void OnPriority(int32_t stream,
Expand All @@ -430,7 +429,9 @@ class Http2Session : public AsyncWrap,
void OnFrameError(int32_t id, uint8_t type, int error_code) override;
void OnTrailers(Nghttp2Stream* stream,
const SubmitTrailers& submit_trailers) override;
void AllocateSend(uv_buf_t* buf) override;

void Send(WriteWrap* req, char* buf, size_t length) override;
WriteWrap* AllocateSend();

int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count,
uv_stream_t* send_handle) override;
Expand Down
23 changes: 15 additions & 8 deletions src/node_http2_core-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -490,17 +490,22 @@ inline void Nghttp2Session::SendPendingData() {
if (IsDestroying())
return;

uv_buf_t dest;
AllocateSend(&dest);
WriteWrap* req = nullptr;
char* dest = nullptr;
size_t destRemaining = 0;
size_t destLength = 0; // amount of data stored in dest
size_t destRemaining = dest.len; // amount space remaining in dest
size_t destOffset = 0; // current write offset of dest

const uint8_t* src; // pointer to the serialized data
ssize_t srcLength = 0; // length of serialized data chunk

// While srcLength is greater than zero
while ((srcLength = nghttp2_session_mem_send(session_, &src)) > 0) {
if (req == nullptr) {
req = AllocateSend();
destRemaining = req->self_size();
dest = req->Extra();
}
DEBUG_HTTP2("Nghttp2Session %s: nghttp2 has %d bytes to send\n",
TypeName(), srcLength);
size_t srcRemaining = srcLength;
Expand All @@ -512,18 +517,20 @@ inline void Nghttp2Session::SendPendingData() {
while (srcRemaining > destRemaining) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
TypeName(), destLength + destRemaining);
memcpy(dest.base + destOffset, src + srcOffset, destRemaining);
memcpy(dest + destOffset, src + srcOffset, destRemaining);
destLength += destRemaining;
Send(&dest, destLength);
Send(req, dest, destLength);
destOffset = 0;
destLength = 0;
srcRemaining -= destRemaining;
srcOffset += destRemaining;
destRemaining = dest.len;
req = AllocateSend();
destRemaining = req->self_size();
dest = req->Extra();
}

if (srcRemaining > 0) {
memcpy(dest.base + destOffset, src + srcOffset, srcRemaining);
memcpy(dest + destOffset, src + srcOffset, srcRemaining);
destLength += srcRemaining;
destOffset += srcRemaining;
destRemaining -= srcRemaining;
Expand All @@ -535,7 +542,7 @@ inline void Nghttp2Session::SendPendingData() {
if (destLength > 0) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
TypeName(), destLength);
Send(&dest, destLength);
Send(req, dest, destLength);
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/node_http2_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "stream_base.h"
#include "util-inl.h"
#include "uv.h"
#include "nghttp2/nghttp2.h"
Expand Down Expand Up @@ -153,7 +154,6 @@ class Nghttp2Session {
// Removes a stream instance from this session
inline void RemoveStream(int32_t id);

virtual void Send(uv_buf_t* buf, size_t length) {}
virtual void OnHeaders(
Nghttp2Stream* stream,
std::queue<nghttp2_header>* headers,
Expand All @@ -176,7 +176,10 @@ class Nghttp2Session {
int error_code) {}
virtual ssize_t GetPadding(size_t frameLength,
size_t maxFrameLength) { return 0; }
virtual void AllocateSend(uv_buf_t* buf) = 0;

inline void SendPendingData();
virtual void Send(WriteWrap* req, char* buf, size_t length) = 0;
virtual WriteWrap* AllocateSend() = 0;

virtual bool HasGetPaddingCallback() { return false; }

Expand All @@ -199,8 +202,6 @@ class Nghttp2Session {
virtual void OnTrailers(Nghttp2Stream* stream,
const SubmitTrailers& submit_trailers) {}

inline void SendPendingData();

virtual uv_loop_t* event_loop() const = 0;

virtual void Close();
Expand Down
2 changes: 0 additions & 2 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,5 @@ test-npm-install: PASS,FLAKY
[$system==solaris] # Also applies to SmartOS

[$system==freebsd]
test-http2-compat-serverrequest-pipe: PASS,FLAKY
test-http2-pipe: PASS,FLAKY

[$system==aix]

0 comments on commit 4db1bc8

Please sign in to comment.