Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add final method #12828

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Writable](#stream_class_stream_writable)</p>
</td>
<td>
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
<code>[_final][stream-_final]</code></p>
</td>
</tr>
<tr>
Expand All @@ -1208,7 +1209,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Duplex](#stream_class_stream_duplex)</p>
</td>
<td>
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
<code>[_final][stream-_final]</code></p>
</td>
</tr>
<tr>
Expand All @@ -1219,7 +1221,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Transform](#stream_class_stream_transform)</p>
</td>
<td>
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code></p>
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code>,
<code>[_final][stream-_final]</code></p>
</td>
</tr>
</table>
Expand Down Expand Up @@ -1278,6 +1281,8 @@ constructor and implement the `writable._write()` method. The
[`stream._writev()`][stream-_writev] method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.

For example:

Expand Down Expand Up @@ -1396,6 +1401,22 @@ added: REPLACEME
* `callback` {Function} A callback function that takes an optional error argument
which is invoked when the writable is destroyed.

#### writable.\_final(callback)
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when you are done writing any remaining data.

Note: `_final()` **must not** be called directly. It MAY be implemented
by child classes, and if so, will be called by the internal Writable
class methods only.

This optional function will be called before the stream closes, delaying the
`finish` event until `callback` is called. This is useful to close resources
or write buffered data before a stream ends.

#### Errors While Writing

It is recommended that errors occurring during the processing of the
Expand Down Expand Up @@ -2109,6 +2130,7 @@ readable buffer so there is nothing for a user to consume.
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
[stream-_writev]: #stream_writable_writev_chunks_callback
[stream-_final]: #stream_writable_final_callback
[stream-end]: #stream_writable_end_chunk_encoding_callback
[stream-pause]: #stream_readable_pause
[stream-push]: #stream_readable_push_chunk_encoding
Expand Down
35 changes: 29 additions & 6 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ function WritableState(options, stream) {
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);

// if _final has been called
this.finalCalled = false;

// if _final has been called
this.finalCalled = false;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you write it twice? Hmm.

Copy link
Member

@mcollina mcollina Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very likely a mistake. This has already been fixed.
https://github.com/nodejs/node/blob/master/lib/_stream_writable.js#L61-L63


// drain event flag.
this.needDrain = false;
// at the start of calling end()
Expand Down Expand Up @@ -199,6 +205,9 @@ function Writable(options) {

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.final === 'function')
this._final = options.final;
}

Stream.call(this);
Expand Down Expand Up @@ -520,23 +529,37 @@ function needFinish(state) {
!state.finished &&
!state.writing);
}

function prefinish(stream, state) {
if (!state.prefinished) {
function callFinal(stream, state) {
stream._final((err) => {
state.pendingcb--;
if (err) {
stream.emit('error', err);
}
state.prefinished = true;
stream.emit('prefinish');
finishMaybe(stream, state);
});
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function') {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Is process.nextTick really needed here? Can someone confirm/deny?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szmarczak This looks like a classic case where process.nextTick() is called for-- it's a case where the final function may be sync or async. This post explains more about it. https://nodejs.org/en/docs/guides/event-loop-timers-and-nexttick/#process-nexttick

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know what process.nextTick does :P Well, in my case, _final() is sync and process.nextTick() is useless. I haven't checked if that's required by some other Node code, so I'm just asking :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I found some other cases that require moving it to next tick. 👍

} else {
state.prefinished = true;
stream.emit('prefinish');
}
}
}

function finishMaybe(stream, state) {
var need = needFinish(state);
if (need) {
prefinish(stream, state);
if (state.pendingcb === 0) {
prefinish(stream, state);
state.finished = true;
stream.emit('finish');
} else {
prefinish(stream, state);
}
}
return need;
Expand Down
14 changes: 3 additions & 11 deletions test/parallel/test-stream-readable-constructor-set-methods.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
'use strict';
require('../common');
const assert = require('assert');
const common = require('../common');

const Readable = require('stream').Readable;

let _readCalled = false;
function _read(n) {
_readCalled = true;
const _read = common.mustCall(function _read(n) {
this.push(null);
}
});

const r = new Readable({ read: _read });
r.resume();

process.on('exit', function() {
assert.strictEqual(r._read, _read);
assert(_readCalled);
});
24 changes: 12 additions & 12 deletions test/parallel/test-stream-transform-constructor-set-methods.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const Transform = require('stream').Transform;

let _transformCalled = false;
function _transform(d, e, n) {
_transformCalled = true;
const _transform = common.mustCall(function _transform(d, e, n) {
n();
}
});

let _flushCalled = false;
function _flush(n) {
_flushCalled = true;
const _final = common.mustCall(function _final(n) {
n();
}
});

const _flush = common.mustCall(function _flush(n) {
n();
});

const t = new Transform({
transform: _transform,
flush: _flush
flush: _flush,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use common.mustCall() on these as well, which should also allow us to avoid the unnecessary boolean checks on exit.

final: _final
});

const t2 = new Transform({});
Expand All @@ -34,6 +35,5 @@ assert.throws(() => {
process.on('exit', () => {
assert.strictEqual(t._transform, _transform);
assert.strictEqual(t._flush, _flush);
assert.strictEqual(_transformCalled, true);
assert.strictEqual(_flushCalled, true);
assert.strictEqual(t._final, _final);
});
100 changes: 100 additions & 0 deletions test/parallel/test-stream-transform-final-sync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
'use strict';
const common = require('../common');
const assert = require('assert');

const stream = require('stream');
let state = 0;

/*
What you do
var stream = new tream.Transform({
transform: function transformCallback(chunk, _, next) {
// part 1
this.push(chunk);
//part 2
next();
},
final: function endCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
},
flush: function flushCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
}
});
t.on('data', dataListener);
t.on('end', endListener);
t.on('finish', finishListener);
t.write(1);
t.write(4);
t.end(7, endMethodCallback);

The order things are called

1. transformCallback part 1
2. dataListener
3. transformCallback part 2
4. transformCallback part 1
5. dataListener
6. transformCallback part 2
7. transformCallback part 1
8. dataListener
9. transformCallback part 2
10. finalCallback part 1
11. finalCallback part 2
12. flushCallback part 1
13. finishListener
14. endMethodCallback
15. flushCallback part 2
16. endListener
*/

const t = new stream.Transform({
objectMode: true,
transform: common.mustCall(function(chunk, _, next) {
assert.strictEqual(++state, chunk, 'transformCallback part 1');
this.push(state);
assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
process.nextTick(next);
}, 3),
final: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 10, 'finalCallback part 1');
state++;
assert.strictEqual(state, 11, 'finalCallback part 2');
done();
}, 1),
flush: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 12, 'flushCallback part 1');
process.nextTick(function() {
state++;
assert.strictEqual(state, 15, 'flushCallback part 2');
done();
});
}, 1)
});
t.on('finish', common.mustCall(function() {
state++;
assert.strictEqual(state, 13, 'finishListener');
}, 1));
t.on('end', common.mustCall(function() {
state++;
assert.strictEqual(state, 16, 'end event');
}, 1));
t.on('data', common.mustCall(function(d) {
assert.strictEqual(++state, d + 1, 'dataListener');
}, 3));
t.write(1);
t.write(4);
t.end(7, common.mustCall(function() {
state++;
assert.strictEqual(state, 14, 'endMethodCallback');
}, 1));
Loading