stream: simplify Transform stream implementation

Significantly simplified Transform stream implementation by
using mostly standard stream code.

PR-URL: https://github.com/nodejs/node/pull/32763
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
Robert Nagy 2020-04-10 17:17:20 +02:00
parent f22a9cac36
commit 0bd5595509
4 changed files with 54 additions and 112 deletions

View File

@ -65,45 +65,18 @@
const {
ObjectSetPrototypeOf,
Symbol
} = primordials;
module.exports = Transform;
const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);
function afterTransform(er, data) {
const ts = this._transformState;
ts.transforming = false;
const cb = ts.writecb;
if (cb === null) {
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
}
ts.writechunk = null;
ts.writecb = null;
if (data != null) // Single equals check for both `null` and `undefined`
this.push(data);
cb(er);
const rs = this._readableState;
rs.reading = false;
if (rs.needReadable || rs.length < rs.highWaterMark) {
this._read(rs.highWaterMark);
}
}
const kCallback = Symbol('kCallback');
function Transform(options) {
if (!(this instanceof Transform))
@ -111,20 +84,13 @@ function Transform(options) {
Duplex.call(this, options);
this._transformState = {
afterTransform: afterTransform.bind(this),
needTransform: false,
transforming: false,
writecb: null,
writechunk: null,
writeencoding: null
};
// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;
this[kCallback] = null;
if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
@ -133,89 +99,67 @@ function Transform(options) {
this._flush = options.flush;
}
// When the writable side finishes, then flush out anything remaining.
// TODO(ronag): Unfortunately _final is invoked asynchronously.
// Use `prefinish` hack. `prefinish` is emitted synchronously when
// and only when `_final` is not defined. Implementing `_final`
// to a Transform should be an error.
this.on('prefinish', prefinish);
}
function prefinish() {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
done(this, er, data);
if (er) {
this.destroy(er);
return;
}
if (data != null) {
this.push(data);
}
this.push(null);
});
} else {
done(this, null, null);
this.push(null);
}
}
Transform.prototype.push = function(chunk, encoding) {
this._transformState.needTransform = false;
return Duplex.prototype.push.call(this, chunk, encoding);
};
// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side. You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
Transform.prototype._transform = function(chunk, encoding, callback) {
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
};
Transform.prototype._write = function(chunk, encoding, cb) {
const ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
ts.writeencoding = encoding;
if (!ts.transforming) {
const rs = this._readableState;
if (ts.needTransform ||
rs.needReadable ||
rs.length < rs.highWaterMark)
this._read(rs.highWaterMark);
}
};
Transform.prototype._write = function(chunk, encoding, callback) {
const rState = this._readableState;
const wState = this._writableState;
const length = rState.length;
// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
const ts = this._transformState;
this._transform(chunk, encoding, (err, val) => {
if (err) {
callback(err);
return;
}
if (ts.writechunk !== null && !ts.transforming) {
ts.transforming = true;
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else {
// Mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
}
};
if (val != null) {
this.push(val);
}
Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
if (
wState.ended || // Backwards compat.
length === rState.length || // Backwards compat.
rState.length < rState.highWaterMark ||
rState.length === 0
) {
callback();
} else {
this[kCallback] = callback;
}
});
};
function done(stream, er, data) {
if (er)
return stream.emit('error', er);
if (data != null) // Single equals check for both `null` and `undefined`
stream.push(data);
// These two error cases are coherence checks that can likely not be tested.
if (stream._writableState.length)
throw new ERR_TRANSFORM_WITH_LENGTH_0();
if (stream._transformState.transforming)
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
return stream.push(null);
}
Transform.prototype._read = function() {
if (this[kCallback]) {
const callback = this[kCallback];
this[kCallback] = null;
callback();
}
};

View File

@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER',
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
'At least one category is required', TypeError);
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
'Calling transform done when still transforming', Error);
// This should probably be a `RangeError`.
E('ERR_TRANSFORM_WITH_LENGTH_0',
'Calling transform done when writableState.length != 0', Error);
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +

View File

@ -45,10 +45,9 @@ const Transform = require('_stream_transform');
assert.strictEqual(tx.readableLength, 10);
assert.strictEqual(transformed, 10);
assert.strictEqual(tx._transformState.writechunk.length, 5);
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);
}), [5, 6, 7, 8, 9, 10]);
}
{

View File

@ -28,6 +28,9 @@ const ws = deflater._writableState;
const beforeFlush = ws.needDrain;
let afterFlush = ws.needDrain;
deflater.on('data', () => {
});
deflater.flush(function(err) {
afterFlush = ws.needDrain;
});