http2: prevent large writes from timing out
When writing a large chunk of data in http2, once the data is handed
off to C++, the JS session & stream lose all track of the write and
will time out if the write doesn't complete within the timeout window.

Fix this issue by tracking whether a write request is ongoing and
also tracking how many chunks have been sent since the most recent
write started. (Each write call resets the timer.)
apapirovski committed Oct 26, 2017
1 parent a051ccc commit 03d7e3f
Showing 6 changed files with 262 additions and 4 deletions.
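In outline, the fix works like this (a minimal sketch with simplified stand-in names such as `restartTimer`; the literal patch follows below): each write records how many bytes are in flight, the native session counts chunks flushed since the write began, and the timeout handler treats a still-progressing write as activity rather than firing 'timeout'.

function onTimeout(session) {
  // writeQueueSize > 0 means a write was handed off to C++ and has
  // not completed yet (see trackWriteState/afterDoStreamWrite below).
  if (session.writeQueueSize > 0) {
    const handle = session.handle;
    // updateChunksSent() refreshes the cached counter; a difference
    // from the previously cached value means chunks went out since
    // the last check, i.e. the write is still making progress.
    if (handle !== undefined &&
        handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) {
      restartTimer(session); // stand-in for _unrefActive(session)
      return;
    }
  }
  session.emit('timeout');
}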
64 changes: 60 additions & 4 deletions lib/internal/http2/core.js
@@ -746,7 +746,8 @@ class Http2Session extends EventEmitter {
shutdown: false,
shuttingDown: false,
pendingAck: 0,
maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10),
writeQueueSize: 0
};

this[kType] = type;
@@ -1080,6 +1081,20 @@ class Http2Session extends EventEmitter {
}

_onTimeout() {
// This checks whether a write is currently in progress and also whether
// that write is actually sending data across the wire. The
// `chunksSentSinceLastWrite` value stored on the kHandle is only updated
// when a timeout event happens, meaning that if a write is ongoing it
// should never equal the newly fetched, updated value.
if (this[kState].writeQueueSize > 0) {
const handle = this[kHandle];
if (handle !== undefined &&
handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) {
_unrefActive(this);
return;
}
}

process.nextTick(emit, this, 'timeout');
}
}
@@ -1199,12 +1214,26 @@ function createWriteReq(req, handle, data, encoding) {
}
}

function trackWriteState(stream, bytes) {
const session = stream[kSession];
stream[kState].writeQueueSize += bytes;
session[kState].writeQueueSize += bytes;
session[kHandle].chunksSentSinceLastWrite = 0;
}

function afterDoStreamWrite(status, handle, req) {
const session = handle[kOwner];
_unrefActive(session);

const state = session[kState];
const { bytes } = req;
state.writeQueueSize -= bytes;

const stream = state.streams.get(req.stream);
if (stream !== undefined) {
_unrefActive(stream);
stream[kState].writeQueueSize -= bytes;
}

if (typeof req.callback === 'function')
req.callback();
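Design note on the bookkeeping above: the byte counters are mirrored on the stream and its session so that either object's `_onTimeout` can cheaply test `writeQueueSize > 0`, and `chunksSentSinceLastWrite` is reset to zero whenever a new write starts so the next timeout check compares against a fresh baseline.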
@@ -1317,7 +1346,8 @@ class Http2Stream extends Duplex {
headersSent: false,
headRequest: false,
aborted: false,
closeHandler: onSessionClose.bind(this),
writeQueueSize: 0
};

this.once('ready', streamOnceReady);
@@ -1364,6 +1394,21 @@
}

_onTimeout() {
// This checks whether a write is currently in progress and also whether
// that write is actually sending data across the wire. The
// `chunksSentSinceLastWrite` value stored on the kHandle is only updated
// when a timeout event happens, meaning that if a write is ongoing it
// should never equal the newly fetched, updated value.
if (this[kState].writeQueueSize > 0) {
const handle = this[kSession][kHandle];
if (handle !== undefined &&
handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) {
_unrefActive(this);
_unrefActive(this[kSession]);
return;
}
}

process.nextTick(emit, this, 'timeout');
}

@@ -1416,6 +1461,7 @@ class Http2Stream extends Duplex {
const err = createWriteReq(req, handle, data, encoding);
if (err)
throw util._errnoException(err, 'write', req.error);
trackWriteState(this, req.bytes);
}

_writev(data, cb) {
@@ -1444,6 +1490,7 @@ class Http2Stream extends Duplex {
const err = handle.writev(req, chunks);
if (err)
throw util._errnoException(err, 'write', req.error);
trackWriteState(this, req.bytes);
}

_read(nread) {
@@ -1537,6 +1584,10 @@ class Http2Stream extends Duplex {
return;
}

const state = this[kState];
session[kState].writeQueueSize -= state.writeQueueSize;
state.writeQueueSize = 0;

const server = session[kServer];
if (server !== undefined && err) {
server.emit('streamError', err, this);
@@ -1631,7 +1682,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1,
if (ret < 0) {
err = new NghttpError(ret);
process.nextTick(emit, this, 'error', err);
break;
}
// exact length of the file doesn't matter here, since the
// stream is closing anyway - just use 1 to signify that
// a write does exist
trackWriteState(this, 1);
}
}

1 change: 1 addition & 0 deletions src/env.h
@@ -111,6 +111,7 @@ class ModuleWrap;
V(callback_string, "callback") \
V(change_string, "change") \
V(channel_string, "channel") \
V(chunks_sent_since_last_write_string, "chunksSentSinceLastWrite") \
V(constants_string, "constants") \
V(oncertcb_string, "oncertcb") \
V(onclose_string, "_onclose") \
24 changes: 24 additions & 0 deletions src/node_http2.cc
@@ -603,6 +603,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
}

session->chunks_sent_since_last_write_ = 0;

Headers list(isolate, context, headers);

args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
@@ -757,6 +759,23 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
stream->FlushDataChunks();
}

void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
Http2Session* session;
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());

HandleScope scope(isolate);

uint32_t length = session->chunks_sent_since_last_write_;

session->object()->Set(env->context(),
env->chunks_sent_since_last_write_string(),
Integer::NewFromUnsigned(isolate, length)).FromJust();

args.GetReturnValue().Set(length);
}

void Http2Session::SubmitPushPromise(const FunctionCallbackInfo<Value>& args) {
Http2Session* session;
Environment* env = Environment::GetCurrent(args);
@@ -811,6 +830,8 @@ int Http2Session::DoWrite(WriteWrap* req_wrap,
}
}

chunks_sent_since_last_write_ = 0;

nghttp2_stream_write_t* req = new nghttp2_stream_write_t;
req->data = req_wrap;

@@ -846,6 +867,7 @@ void Http2Session::Send(uv_buf_t* buf, size_t length) {
this,
AfterWrite);

chunks_sent_since_last_write_++;
uv_buf_t actual = uv_buf_init(buf->base, length);
if (stream_->DoWrite(write_req, &actual, 1, nullptr)) {
write_req->Dispose();
@@ -1255,6 +1277,8 @@ void Initialize(Local<Object> target,
Http2Session::DestroyStream);
env->SetProtoMethod(session, "flushData",
Http2Session::FlushData);
env->SetProtoMethod(session, "updateChunksSent",
Http2Session::UpdateChunksSent);
StreamBase::AddMethods<Http2Session>(env, session,
StreamBase::kFlagHasWritev |
StreamBase::kFlagNoShutdown);
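On the JS side, the `updateChunksSent` method registered above is used as a compare-and-refresh pair, mirroring the `_onTimeout` code in core.js (`handle` here is the session's native handle, as in core.js):

const previous = handle.chunksSentSinceLastWrite; // cached at the last refresh
const current = handle.updateChunksSent(); // re-reads the native counter and re-caches it
const writeStillProgressing = previous !== current;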
4 changes: 4 additions & 0 deletions src/node_http2.h
@@ -474,6 +474,7 @@ class Http2Session : public AsyncWrap,
static void SubmitGoaway(const FunctionCallbackInfo<Value>& args);
static void DestroyStream(const FunctionCallbackInfo<Value>& args);
static void FlushData(const FunctionCallbackInfo<Value>& args);
static void UpdateChunksSent(const FunctionCallbackInfo<Value>& args);

template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args);
@@ -492,6 +493,9 @@
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE;

// use this to allow timeout tracking during long-lasting writes
uint32_t chunks_sent_since_last_write_ = 0;

char stream_buf_[kAllocBufferSize];
};

89 changes: 89 additions & 0 deletions test/sequential/test-http2-timeout-large-write-file.js
@@ -0,0 +1,89 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const fixtures = require('../common/fixtures');
const fs = require('fs');
const http2 = require('http2');
const path = require('path');

common.refreshTmpDir();

// This test assesses whether long-running writes can complete
// or whether they time out because the session or stream is not
// aware that the backing stream is still writing.
// To simulate a slow client, we write a really large chunk and
// then proceed through the following cycle:
// 1) Receive first 'data' event and record currently written size
// 2) Once we've read up to currently written size recorded above,
// we pause the stream and wait longer than the server timeout
// 3) Socket.prototype._onTimeout triggers and should confirm
// that the backing stream is still active and writing
// 4) Our timer fires, we resume the socket and start at 1)
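// (Concretely: the first resume is scheduled serverTimeout +
// offsetTimeout ms after the first 'data' event, i.e. just past the
// server's timeout window; offsetTimeout is then zeroed so later
// resumes land right around the end of the window.)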

const writeSize = 3000000;
const minReadSize = 500000;
const serverTimeout = common.platformTimeout(500);
let offsetTimeout = common.platformTimeout(100);
let didReceiveData = false;

const content = Buffer.alloc(writeSize, 0x44);
const filepath = path.join(common.tmpDir, 'http2-large-write.tmp');
fs.writeFileSync(filepath, content, 'binary');
const fd = fs.openSync(filepath, 'r');

const server = http2.createSecureServer({
key: fixtures.readKey('agent1-key.pem'),
cert: fixtures.readKey('agent1-cert.pem')
});
server.on('stream', common.mustCall((stream) => {
stream.respondWithFD(fd, {
'Content-Type': 'application/octet-stream',
'Content-Length': content.length.toString(),
'Vary': 'Accept-Encoding'
});
stream.setTimeout(serverTimeout);
stream.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});
stream.end();
}));
server.setTimeout(serverTimeout);
server.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});

server.listen(0, common.mustCall(() => {
const client = http2.connect(`https://localhost:${server.address().port}`,
{ rejectUnauthorized: false });

const req = client.request({ ':path': '/' });
req.end();

const resume = () => req.resume();
let receivedBufferLength = 0;
let firstReceivedAt;
req.on('data', common.mustCallAtLeast((buf) => {
if (receivedBufferLength === 0) {
didReceiveData = false;
firstReceivedAt = Date.now();
}
receivedBufferLength += buf.length;
if (receivedBufferLength >= minReadSize &&
receivedBufferLength < writeSize) {
didReceiveData = true;
receivedBufferLength = 0;
req.pause();
setTimeout(
resume,
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
);
offsetTimeout = 0;
}
}, 1));
req.on('end', common.mustCall(() => {
client.destroy();
server.close();
}));
}));
84 changes: 84 additions & 0 deletions test/sequential/test-http2-timeout-large-write.js
@@ -0,0 +1,84 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const fixtures = require('../common/fixtures');
const http2 = require('http2');

// This test assesses whether long-running writes can complete
// or whether they time out because the session or stream is not
// aware that the backing stream is still writing.
// To simulate a slow client, we write a really large chunk and
// then proceed through the following cycle:
// 1) Receive first 'data' event and record currently written size
// 2) Once we've read up to currently written size recorded above,
// we pause the stream and wait longer than the server timeout
// 3) Socket.prototype._onTimeout triggers and should confirm
// that the backing stream is still active and writing
// 4) Our timer fires, we resume the socket and start at 1)

const writeSize = 3000000;
const minReadSize = 500000;
const serverTimeout = common.platformTimeout(500);
let offsetTimeout = common.platformTimeout(100);
let didReceiveData = false;

const server = http2.createSecureServer({
key: fixtures.readKey('agent1-key.pem'),
cert: fixtures.readKey('agent1-cert.pem')
});
server.on('stream', common.mustCall((stream) => {
const content = Buffer.alloc(writeSize, 0x44);

stream.respond({
'Content-Type': 'application/octet-stream',
'Content-Length': content.length.toString(),
'Vary': 'Accept-Encoding'
});

stream.write(content);
stream.setTimeout(serverTimeout);
stream.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});
stream.end();
}));
server.setTimeout(serverTimeout);
server.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});

server.listen(0, common.mustCall(() => {
const client = http2.connect(`https://localhost:${server.address().port}`,
{ rejectUnauthorized: false });

const req = client.request({ ':path': '/' });
req.end();

const resume = () => req.resume();
let receivedBufferLength = 0;
let firstReceivedAt;
req.on('data', common.mustCallAtLeast((buf) => {
if (receivedBufferLength === 0) {
didReceiveData = false;
firstReceivedAt = Date.now();
}
receivedBufferLength += buf.length;
if (receivedBufferLength >= minReadSize &&
receivedBufferLength < writeSize) {
didReceiveData = true;
receivedBufferLength = 0;
req.pause();
setTimeout(
resume,
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
);
offsetTimeout = 0;
}
}, 1));
req.on('end', common.mustCall(() => {
client.destroy();
server.close();
}));
}));
