stream: finished callback for closed streams

Previously finished(stream, cb) would not invoke the callback
for streams that have already finished, ended or errored
before being passed to finished(stream, cb).

PR-URL: https://github.com/nodejs/node/pull/31509
Refs: https://github.com/nodejs/node/pull/31508
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Robert Nagy 2020-01-25 15:35:38 +01:00
parent e559842188
commit d016b9d708
6 changed files with 158 additions and 14 deletions

View File

@ -148,6 +148,9 @@ function ReadableState(options, stream, isDuplex) {
// Indicates whether the stream has errored.
this.errored = false;
// Indicates whether the stream has finished destroying.
this.closed = false;
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.

View File

@ -175,6 +175,9 @@ function WritableState(options, stream, isDuplex) {
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;
// Indicates whether the stream has finished destroying.
this.closed = false;
// Count buffered requests
this.bufferedRequestCount = 0;

View File

@ -67,16 +67,6 @@ function finish(self, err) {
return new Promise((resolve, reject) => {
const stream = self[kStream];
// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}
finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);

View File

@ -48,6 +48,13 @@ function destroy(err, cb) {
}
}
if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}
if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
@ -101,6 +108,7 @@ function undestroy() {
const w = this._writableState;
if (r) {
r.closed = false;
r.destroyed = false;
r.errored = false;
r.reading = false;
@ -110,6 +118,7 @@ function undestroy() {
}
if (w) {
w.closed = false;
w.destroyed = false;
w.errored = false;
w.ended = false;

View File

@ -32,6 +32,8 @@ function isWritableFinished(stream) {
return wState.finished || (wState.ended && wState.length === 0);
}
function nop() {}
function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
@ -52,12 +54,15 @@ function eos(stream, opts, callback) {
let writable = opts.writable ||
(opts.writable !== false && isWritable(stream));
const wState = stream._writableState;
const rState = stream._readableState;
const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};
let writableFinished = stream.writableFinished ||
(stream._writableState && stream._writableState.finished);
(rState && rState.finished);
const onfinish = () => {
writable = false;
writableFinished = true;
@ -65,7 +70,7 @@ function eos(stream, opts, callback) {
};
let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
(rState && rState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
@ -79,7 +84,7 @@ function eos(stream, opts, callback) {
const onclose = () => {
let err;
if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended)
if (!rState || !rState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
@ -99,7 +104,7 @@ function eos(stream, opts, callback) {
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !stream._writableState) { // legacy streams
} else if (writable && !wState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}
@ -114,7 +119,24 @@ function eos(stream, opts, callback) {
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);
const closed = (wState && wState.closed) || (rState && rState.closed) ||
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
(wState && wState.finished) || (rState && rState.endEmitted) ||
(rState && stream.req && stream.aborted);
if (closed) {
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
process.nextTick(() => {
callback();
});
}
return function() {
callback = nop;
stream.removeListener('aborted', onclose);
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);

View File

@ -215,3 +215,120 @@ const { promisify } = require('util');
w.end('asd');
w.destroy();
}
function testClosed(factory) {
{
// If already destroyed but finished is cancelled in same tick
// don't invoke the callback,
const s = factory();
s.destroy();
const dispose = finished(s, common.mustNotCall());
dispose();
}
{
// If already destroyed invoked callback.
const s = factory();
s.destroy();
finished(s, common.mustCall());
}
{
// Don't invoke until destroy has completed.
let destroyed = false;
const s = factory({
destroy(err, cb) {
setImmediate(() => {
destroyed = true;
cb();
});
}
});
s.destroy();
finished(s, common.mustCall(() => {
assert.strictEqual(destroyed, true);
}));
}
{
// Invoke callback even if close is inhibited.
const s = factory({
emitClose: false,
destroy(err, cb) {
cb();
finished(s, common.mustCall());
}
});
s.destroy();
}
{
// Invoke with deep async.
const s = factory({
destroy(err, cb) {
setImmediate(() => {
cb();
setImmediate(() => {
finished(s, common.mustCall());
});
});
}
});
s.destroy();
}
}
testClosed((opts) => new Readable({ ...opts }));
testClosed((opts) => new Writable({ write() {}, ...opts }));
{
const w = new Writable({
write(chunk, encoding, cb) {
cb();
},
autoDestroy: false
});
w.end('asd');
process.nextTick(() => {
finished(w, common.mustCall());
});
}
{
const w = new Writable({
write(chunk, encoding, cb) {
cb(new Error());
},
autoDestroy: false
});
w.write('asd');
w.on('error', common.mustCall(() => {
finished(w, common.mustCall());
}));
}
{
const r = new Readable({
autoDestroy: false
});
r.push(null);
r.resume();
r.on('end', common.mustCall(() => {
finished(r, common.mustCall());
}));
}
{
const rs = fs.createReadStream(__filename, { autoClose: false });
rs.resume();
rs.on('close', common.mustNotCall());
rs.on('end', common.mustCall(() => {
finished(rs, common.mustCall());
}));
}