-
Notifications
You must be signed in to change notification settings - Fork 30k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http2: fix several timeout related issues #16525
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -746,7 +746,8 @@ class Http2Session extends EventEmitter { | |
shutdown: false, | ||
shuttingDown: false, | ||
pendingAck: 0, | ||
maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10) | ||
maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10), | ||
writeQueueSize: 0 | ||
}; | ||
|
||
this[kType] = type; | ||
|
@@ -1080,6 +1081,20 @@ class Http2Session extends EventEmitter { | |
} | ||
|
||
_onTimeout() { | ||
// This checks whether a write is currently in progress and also whether | ||
// that write is actually sending data across the write. The kHandle | ||
// stored `chunksSentSinceLastWrite` is only updated when a timeout event | ||
// happens, meaning that if a write is ongoing it should never equal the | ||
// newly fetched, updated value. | ||
if (this[kState].writeQueueSize > 0) { | ||
const handle = this[kHandle]; | ||
if (handle !== undefined && | ||
handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) { | ||
_unrefActive(this); | ||
return; | ||
} | ||
} | ||
|
||
process.nextTick(emit, this, 'timeout'); | ||
} | ||
} | ||
|
@@ -1199,8 +1214,27 @@ function createWriteReq(req, handle, data, encoding) { | |
} | ||
} | ||
|
||
function trackWriteState(stream, bytes) { | ||
const session = stream[kSession]; | ||
stream[kState].writeQueueSize += bytes; | ||
session[kState].writeQueueSize += bytes; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any chance we could rename the property to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's currently the same name as net/tls but I can definitely rename if that's preferred. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nah, if it’s for consistency then this is fine |
||
session[kHandle].chunksSentSinceLastWrite = 0; | ||
} | ||
|
||
function afterDoStreamWrite(status, handle, req) { | ||
_unrefActive(handle[kOwner]); | ||
const session = handle[kOwner]; | ||
_unrefActive(session); | ||
|
||
const state = session[kState]; | ||
const { bytes } = req; | ||
state.writeQueueSize -= bytes; | ||
|
||
const stream = state.streams.get(req.stream); | ||
if (stream !== undefined) { | ||
_unrefActive(stream); | ||
stream[kState].writeQueueSize -= bytes; | ||
} | ||
|
||
if (typeof req.callback === 'function') | ||
req.callback(); | ||
this.handle = undefined; | ||
|
@@ -1312,7 +1346,8 @@ class Http2Stream extends Duplex { | |
headersSent: false, | ||
headRequest: false, | ||
aborted: false, | ||
closeHandler: onSessionClose.bind(this) | ||
closeHandler: onSessionClose.bind(this), | ||
writeQueueSize: 0 | ||
}; | ||
|
||
this.once('ready', streamOnceReady); | ||
|
@@ -1359,6 +1394,21 @@ class Http2Stream extends Duplex { | |
} | ||
|
||
_onTimeout() { | ||
// This checks whether a write is currently in progress and also whether | ||
// that write is actually sending data across the write. The kHandle | ||
// stored `chunksSentSinceLastWrite` is only updated when a timeout event | ||
// happens, meaning that if a write is ongoing it should never equal the | ||
// newly fetched, updated value. | ||
if (this[kState].writeQueueSize > 0) { | ||
const handle = this[kSession][kHandle]; | ||
if (handle !== undefined && | ||
handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) { | ||
_unrefActive(this); | ||
_unrefActive(this[kSession]); | ||
return; | ||
} | ||
} | ||
|
||
process.nextTick(emit, this, 'timeout'); | ||
} | ||
|
||
|
@@ -1396,10 +1446,11 @@ class Http2Stream extends Duplex { | |
this.once('ready', this._write.bind(this, data, encoding, cb)); | ||
return; | ||
} | ||
_unrefActive(this); | ||
if (!this[kState].headersSent) | ||
this[kProceed](); | ||
const session = this[kSession]; | ||
_unrefActive(this); | ||
_unrefActive(session); | ||
const handle = session[kHandle]; | ||
const req = new WriteWrap(); | ||
req.stream = this[kID]; | ||
|
@@ -1410,18 +1461,19 @@ class Http2Stream extends Duplex { | |
const err = createWriteReq(req, handle, data, encoding); | ||
if (err) | ||
throw util._errnoException(err, 'write', req.error); | ||
this._bytesDispatched += req.bytes; | ||
trackWriteState(this, req.bytes); | ||
} | ||
|
||
_writev(data, cb) { | ||
if (this[kID] === undefined) { | ||
this.once('ready', this._writev.bind(this, data, cb)); | ||
return; | ||
} | ||
_unrefActive(this); | ||
if (!this[kState].headersSent) | ||
this[kProceed](); | ||
const session = this[kSession]; | ||
_unrefActive(this); | ||
_unrefActive(session); | ||
const handle = session[kHandle]; | ||
const req = new WriteWrap(); | ||
req.stream = this[kID]; | ||
|
@@ -1438,6 +1490,7 @@ class Http2Stream extends Duplex { | |
const err = handle.writev(req, chunks); | ||
if (err) | ||
throw util._errnoException(err, 'write', req.error); | ||
trackWriteState(this, req.bytes); | ||
} | ||
|
||
_read(nread) { | ||
|
@@ -1531,6 +1584,10 @@ class Http2Stream extends Duplex { | |
return; | ||
} | ||
|
||
const state = this[kState]; | ||
session[kState].writeQueueSize -= state.writeQueueSize; | ||
state.writeQueueSize = 0; | ||
|
||
const server = session[kServer]; | ||
if (server !== undefined && err) { | ||
server.emit('streamError', err, this); | ||
|
@@ -1625,7 +1682,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1, | |
if (ret < 0) { | ||
err = new NghttpError(ret); | ||
process.nextTick(emit, this, 'error', err); | ||
break; | ||
} | ||
// exact length of the file doesn't matter here, since the | ||
// stream is closing anyway — just use 1 to signify that | ||
// a write does exist | ||
trackWriteState(this, 1); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -603,6 +603,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) { | |
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID); | ||
} | ||
|
||
session->chunks_sent_since_last_write_ = 0; | ||
|
||
Headers list(isolate, context, headers); | ||
|
||
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(), | ||
|
@@ -757,6 +759,23 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) { | |
stream->FlushDataChunks(); | ||
} | ||
|
||
void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) { | ||
Http2Session* session; | ||
Environment* env = Environment::GetCurrent(args); | ||
Isolate* isolate = env->isolate(); | ||
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); | ||
|
||
HandleScope scope(isolate); | ||
|
||
uint32_t length = session->chunks_sent_since_last_write_; | ||
|
||
session->object()->Set(env->context(), | ||
env->chunks_sent_since_last_write_string(), | ||
Integer::NewFromUnsigned(isolate, length)).FromJust(); | ||
|
||
args.GetReturnValue().Set(length); | ||
} | ||
|
||
void Http2Session::SubmitPushPromise(const FunctionCallbackInfo<Value>& args) { | ||
Http2Session* session; | ||
Environment* env = Environment::GetCurrent(args); | ||
|
@@ -811,6 +830,8 @@ int Http2Session::DoWrite(WriteWrap* req_wrap, | |
} | ||
} | ||
|
||
chunks_sent_since_last_write_ = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is my understanding correct that this and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, to be clear: it doesn't quite matter if they're the only parts that can trigger it because any trigger of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, okay. My question wasn’t relevant to this particular PR – it just was kind of my hope that this meant the # of places that could schedule a write would be overseeable so that it would be easy to turn the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. I think that loop is a bit more complicated since nghttp2 manages the flow of data. The only place that can call
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah – basically, the question was if these were the only places that would make nghttp2 want to write something to the socket ;) I guess I can answer that for myself with a ”no”, in the end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are lots of things that could trigger Ng too want to write. Responding to ping frames, for instance, is handled entirely by Ng without any intervention from our code. There are also automatic error handing situations that do not bubble up to our code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How do we know we need to write to the socket in those cases? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's currently why we attempt to send on every event loop tick. There is a want_write api, however that can be checked. It won't tell you how much data is pending to write, but it will answer true so long as there is data pending in the queue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, makes sense! |
||
|
||
nghttp2_stream_write_t* req = new nghttp2_stream_write_t; | ||
req->data = req_wrap; | ||
|
||
|
@@ -846,6 +867,7 @@ void Http2Session::Send(uv_buf_t* buf, size_t length) { | |
this, | ||
AfterWrite); | ||
|
||
chunks_sent_since_last_write_++; | ||
uv_buf_t actual = uv_buf_init(buf->base, length); | ||
if (stream_->DoWrite(write_req, &actual, 1, nullptr)) { | ||
write_req->Dispose(); | ||
|
@@ -1255,6 +1277,8 @@ void Initialize(Local<Object> target, | |
Http2Session::DestroyStream); | ||
env->SetProtoMethod(session, "flushData", | ||
Http2Session::FlushData); | ||
env->SetProtoMethod(session, "updateChunksSent", | ||
Http2Session::UpdateChunksSent); | ||
StreamBase::AddMethods<Http2Session>(env, session, | ||
StreamBase::kFlagHasWritev | | ||
StreamBase::kFlagNoShutdown); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
'use strict'; | ||
const common = require('../common'); | ||
if (!common.hasCrypto) | ||
common.skip('missing crypto'); | ||
const assert = require('assert'); | ||
const fixtures = require('../common/fixtures'); | ||
const fs = require('fs'); | ||
const http2 = require('http2'); | ||
const path = require('path'); | ||
|
||
common.refreshTmpDir(); | ||
|
||
// This test assesses whether long-running writes can complete | ||
// or timeout because the session or stream are not aware that the | ||
// backing stream is still writing. | ||
// To simulate a slow client, we write a really large chunk and | ||
// then proceed through the following cycle: | ||
// 1) Receive first 'data' event and record currently written size | ||
// 2) Once we've read up to currently written size recorded above, | ||
// we pause the stream and wait longer than the server timeout | ||
// 3) Socket.prototype._onTimeout triggers and should confirm | ||
// that the backing stream is still active and writing | ||
// 4) Our timer fires, we resume the socket and start at 1) | ||
|
||
const writeSize = 3000000; | ||
const minReadSize = 500000; | ||
const serverTimeout = common.platformTimeout(500); | ||
let offsetTimeout = common.platformTimeout(100); | ||
let didReceiveData = false; | ||
|
||
const content = Buffer.alloc(writeSize, 0x44); | ||
const filepath = path.join(common.tmpDir, 'http2-large-write.tmp'); | ||
fs.writeFileSync(filepath, content, 'binary'); | ||
const fd = fs.openSync(filepath, 'r'); | ||
|
||
const server = http2.createSecureServer({ | ||
key: fixtures.readKey('agent1-key.pem'), | ||
cert: fixtures.readKey('agent1-cert.pem') | ||
}); | ||
server.on('stream', common.mustCall((stream) => { | ||
stream.respondWithFD(fd, { | ||
'Content-Type': 'application/octet-stream', | ||
'Content-Length': content.length.toString(), | ||
'Vary': 'Accept-Encoding' | ||
}); | ||
stream.setTimeout(serverTimeout); | ||
stream.on('timeout', () => { | ||
assert.strictEqual(didReceiveData, false, 'Should not timeout'); | ||
}); | ||
stream.end(); | ||
})); | ||
server.setTimeout(serverTimeout); | ||
server.on('timeout', () => { | ||
assert.strictEqual(didReceiveData, false, 'Should not timeout'); | ||
}); | ||
|
||
server.listen(0, common.mustCall(() => { | ||
const client = http2.connect(`https://localhost:${server.address().port}`, | ||
{ rejectUnauthorized: false }); | ||
|
||
const req = client.request({ ':path': '/' }); | ||
req.end(); | ||
|
||
const resume = () => req.resume(); | ||
let receivedBufferLength = 0; | ||
let firstReceivedAt; | ||
req.on('data', common.mustCallAtLeast((buf) => { | ||
if (receivedBufferLength === 0) { | ||
didReceiveData = false; | ||
firstReceivedAt = Date.now(); | ||
} | ||
receivedBufferLength += buf.length; | ||
if (receivedBufferLength >= minReadSize && | ||
receivedBufferLength < writeSize) { | ||
didReceiveData = true; | ||
receivedBufferLength = 0; | ||
req.pause(); | ||
setTimeout( | ||
resume, | ||
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) | ||
); | ||
offsetTimeout = 0; | ||
} | ||
}, 1)); | ||
req.on('end', common.mustCall(() => { | ||
client.destroy(); | ||
server.close(); | ||
})); | ||
})); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The right side of this comparison modifies the left side, right? Could we maaaaybe store the left side in a variable? 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep.