stream: try to wait for flush to complete before 'finish'

Due to compat reasons Transform streams don't always wait
for flush to complete before finishing the stream.

Try to wait when possible, i.e. when the user does not
override _final.

Fixes: https://github.com/nodejs/node/issues/34274

PR-URL: https://github.com/nodejs/node/pull/34314
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Zeyu Yang <himself65@outlook.com>
This commit is contained in:
Robert Nagy 2020-07-11 23:50:09 +02:00
parent e44855d317
commit a65218f5e8
3 changed files with 49 additions and 2 deletions

View File

@ -106,11 +106,15 @@ function Transform(options) {
this.on('prefinish', prefinish);
}
function prefinish() {
function final(cb) {
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
if (er) {
this.destroy(er);
if (cb) {
cb(er);
} else {
this.destroy(er);
}
return;
}
@ -118,12 +122,26 @@ function prefinish() {
this.push(data);
}
this.push(null);
if (cb) {
cb();
}
});
} else {
this.push(null);
if (cb) {
cb();
}
}
}
function prefinish() {
if (this._final !== final) {
final.call(this);
}
}
Transform.prototype._final = final;
Transform.prototype._transform = function(chunk, encoding, callback) {
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
};

View File

@ -323,6 +323,11 @@ ZlibBase.prototype._flush = function(callback) {
this._transform(Buffer.alloc(0), '', callback);
};
// Force Transform compat behavior.
ZlibBase.prototype._final = function(callback) {
callback();
};
// If a flush is scheduled while another flush is still pending, a way to figure
// out which one is the "stronger" flush is needed.
// This is currently only used to figure out which flush flag to use for the

View File

@ -1231,3 +1231,27 @@ const net = require('net');
assert.strictEqual(res, 'helloworld');
}));
}
{
let flushed = false;
const makeStream = () =>
new Transform({
transform: (chunk, enc, cb) => cb(null, chunk),
flush: (cb) =>
setTimeout(() => {
flushed = true;
cb(null);
}, 1),
});
const input = new Readable();
input.push(null);
pipeline(
input,
makeStream(),
common.mustCall(() => {
assert.strictEqual(flushed, true);
}),
);
}