mirror of
https://github.com/nodejs/node.git
synced 2025-12-28 07:50:41 +00:00
stream: support async for stream impl functions
PR-URL: https://github.com/nodejs/node/pull/34416 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anto Aravinth <anto.aravinth.cse@gmail.com>
This commit is contained in:
parent
ca26eae966
commit
744a284ccc
@ -107,8 +107,10 @@ function Transform(options) {
|
||||
}
|
||||
|
||||
function final(cb) {
|
||||
let called = false;
|
||||
if (typeof this._flush === 'function' && !this.destroyed) {
|
||||
this._flush((er, data) => {
|
||||
const result = this._flush((er, data) => {
|
||||
called = true;
|
||||
if (er) {
|
||||
if (cb) {
|
||||
cb(er);
|
||||
@ -126,6 +128,33 @@ function final(cb) {
|
||||
cb();
|
||||
}
|
||||
});
|
||||
if (result !== undefined && result !== null) {
|
||||
try {
|
||||
const then = result.then;
|
||||
if (typeof then === 'function') {
|
||||
then.call(
|
||||
result,
|
||||
(data) => {
|
||||
if (called)
|
||||
return;
|
||||
if (data != null)
|
||||
this.push(data);
|
||||
this.push(null);
|
||||
if (cb)
|
||||
process.nextTick(cb);
|
||||
},
|
||||
(err) => {
|
||||
if (cb) {
|
||||
process.nextTick(cb, err);
|
||||
} else {
|
||||
process.nextTick(() => this.destroy(err));
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
process.nextTick(() => this.destroy(err));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
this.push(null);
|
||||
if (cb) {
|
||||
@ -151,7 +180,9 @@ Transform.prototype._write = function(chunk, encoding, callback) {
|
||||
const wState = this._writableState;
|
||||
const length = rState.length;
|
||||
|
||||
this._transform(chunk, encoding, (err, val) => {
|
||||
let called = false;
|
||||
const result = this._transform(chunk, encoding, (err, val) => {
|
||||
called = true;
|
||||
if (err) {
|
||||
callback(err);
|
||||
return;
|
||||
@ -172,6 +203,38 @@ Transform.prototype._write = function(chunk, encoding, callback) {
|
||||
this[kCallback] = callback;
|
||||
}
|
||||
});
|
||||
if (result !== undefined && result != null) {
|
||||
try {
|
||||
const then = result.then;
|
||||
if (typeof then === 'function') {
|
||||
then.call(
|
||||
result,
|
||||
(val) => {
|
||||
if (called)
|
||||
return;
|
||||
|
||||
if (val != null) {
|
||||
this.push(val);
|
||||
}
|
||||
|
||||
if (
|
||||
wState.ended ||
|
||||
length === rState.length ||
|
||||
rState.length < rState.highWaterMark ||
|
||||
rState.length === 0) {
|
||||
process.nextTick(callback);
|
||||
} else {
|
||||
this[kCallback] = callback;
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
process.nextTick(callback, err);
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
process.nextTick(callback, err);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Transform.prototype._read = function() {
|
||||
|
||||
@ -647,7 +647,7 @@ function needFinish(state) {
|
||||
function callFinal(stream, state) {
|
||||
state.sync = true;
|
||||
state.pendingcb++;
|
||||
stream._final((err) => {
|
||||
const result = stream._final((err) => {
|
||||
state.pendingcb--;
|
||||
if (err) {
|
||||
for (const callback of state[kOnFinished].splice(0)) {
|
||||
@ -664,6 +664,31 @@ function callFinal(stream, state) {
|
||||
process.nextTick(finish, stream, state);
|
||||
}
|
||||
});
|
||||
if (result !== undefined && result !== null) {
|
||||
try {
|
||||
const then = result.then;
|
||||
if (typeof then === 'function') {
|
||||
then.call(
|
||||
result,
|
||||
function() {
|
||||
if (state.prefinished)
|
||||
return;
|
||||
state.prefinish = true;
|
||||
process.nextTick(() => stream.emit('prefinish'));
|
||||
state.pendingcb++;
|
||||
process.nextTick(finish, stream, state);
|
||||
},
|
||||
function(err) {
|
||||
for (const callback of state[kOnFinished].splice(0)) {
|
||||
process.nextTick(callback, err);
|
||||
}
|
||||
process.nextTick(errorOrDestroy, stream, err, state.sync);
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
process.nextTick(errorOrDestroy, stream, err, state.sync);
|
||||
}
|
||||
}
|
||||
state.sync = false;
|
||||
}
|
||||
|
||||
|
||||
@ -59,10 +59,13 @@ function destroy(err, cb) {
|
||||
}
|
||||
|
||||
function _destroy(self, err, cb) {
|
||||
self._destroy(err || null, (err) => {
|
||||
let called = false;
|
||||
const result = self._destroy(err || null, (err) => {
|
||||
const r = self._readableState;
|
||||
const w = self._writableState;
|
||||
|
||||
called = true;
|
||||
|
||||
if (err) {
|
||||
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
|
||||
err.stack;
|
||||
@ -92,6 +95,64 @@ function _destroy(self, err, cb) {
|
||||
process.nextTick(emitCloseNT, self);
|
||||
}
|
||||
});
|
||||
if (result !== undefined && result !== null) {
|
||||
try {
|
||||
const then = result.then;
|
||||
if (typeof then === 'function') {
|
||||
then.call(
|
||||
result,
|
||||
function() {
|
||||
if (called)
|
||||
return;
|
||||
|
||||
const r = self._readableState;
|
||||
const w = self._writableState;
|
||||
|
||||
if (w) {
|
||||
w.closed = true;
|
||||
}
|
||||
if (r) {
|
||||
r.closed = true;
|
||||
}
|
||||
|
||||
if (typeof cb === 'function') {
|
||||
process.nextTick(cb);
|
||||
}
|
||||
|
||||
process.nextTick(emitCloseNT, self);
|
||||
},
|
||||
function(err) {
|
||||
const r = self._readableState;
|
||||
const w = self._writableState;
|
||||
err.stack;
|
||||
|
||||
called = true;
|
||||
|
||||
if (w && !w.errored) {
|
||||
w.errored = err;
|
||||
}
|
||||
if (r && !r.errored) {
|
||||
r.errored = err;
|
||||
}
|
||||
|
||||
if (w) {
|
||||
w.closed = true;
|
||||
}
|
||||
if (r) {
|
||||
r.closed = true;
|
||||
}
|
||||
|
||||
if (typeof cb === 'function') {
|
||||
process.nextTick(cb, err);
|
||||
}
|
||||
|
||||
process.nextTick(emitErrorCloseNT, self, err);
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
process.nextTick(emitErrorNT, self, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function emitErrorCloseNT(self, err) {
|
||||
@ -230,7 +291,7 @@ function constructNT(stream) {
|
||||
const s = w || r;
|
||||
|
||||
let called = false;
|
||||
stream._construct((err) => {
|
||||
const result = stream._construct((err) => {
|
||||
if (r) {
|
||||
r.constructed = true;
|
||||
}
|
||||
@ -252,6 +313,47 @@ function constructNT(stream) {
|
||||
process.nextTick(emitConstructNT, stream);
|
||||
}
|
||||
});
|
||||
if (result !== undefined && result !== null) {
|
||||
try {
|
||||
const then = result.then;
|
||||
if (typeof then === 'function') {
|
||||
then.call(
|
||||
result,
|
||||
function() {
|
||||
// If the callback was invoked, do nothing further.
|
||||
if (called)
|
||||
return;
|
||||
if (r) {
|
||||
r.constructed = true;
|
||||
}
|
||||
if (w) {
|
||||
w.constructed = true;
|
||||
}
|
||||
if (s.destroyed) {
|
||||
process.nextTick(() => stream.emit(kDestroy));
|
||||
} else {
|
||||
process.nextTick(emitConstructNT, stream);
|
||||
}
|
||||
},
|
||||
function(err) {
|
||||
if (r) {
|
||||
r.constructed = true;
|
||||
}
|
||||
if (w) {
|
||||
w.constructed = true;
|
||||
}
|
||||
called = true;
|
||||
if (s.destroyed) {
|
||||
process.nextTick(() => stream.emit(kDestroy, err));
|
||||
} else {
|
||||
process.nextTick(errorOrDestroy, stream, err);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
process.nextTick(emitErrorNT, stream, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function emitConstructNT(stream) {
|
||||
|
||||
258
test/parallel/test-stream-construct-async-error.js
Normal file
258
test/parallel/test-stream-construct-async-error.js
Normal file
@ -0,0 +1,258 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const {
|
||||
Duplex,
|
||||
Writable,
|
||||
Transform,
|
||||
} = require('stream');
|
||||
const { setTimeout } = require('timers/promises');
|
||||
const assert = require('assert');
|
||||
|
||||
{
|
||||
class Foo extends Duplex {
|
||||
async _construct(cb) {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
cb();
|
||||
throw new Error('boom');
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.on('error', common.expectsError({
|
||||
message: 'boom'
|
||||
}));
|
||||
foo.on('close', common.mustCall(() => {
|
||||
assert(foo._writableState.constructed);
|
||||
assert(foo._readableState.constructed);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Duplex {
|
||||
async _destroy(err, cb) {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
throw new Error('boom');
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.destroy();
|
||||
foo.on('error', common.expectsError({
|
||||
message: 'boom'
|
||||
}));
|
||||
foo.on('close', common.mustCall(() => {
|
||||
assert(foo.destroyed);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Duplex {
|
||||
async _destroy(err, cb) {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.destroy();
|
||||
foo.on('close', common.mustCall(() => {
|
||||
assert(foo.destroyed);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Duplex {
|
||||
async _construct() {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
}
|
||||
|
||||
_write = common.mustCall((chunk, encoding, cb) => {
|
||||
cb();
|
||||
})
|
||||
|
||||
_read() {}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.write('test', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Duplex {
|
||||
async _construct(callback) {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
callback();
|
||||
}
|
||||
|
||||
_write = common.mustCall((chunk, encoding, cb) => {
|
||||
cb();
|
||||
})
|
||||
|
||||
_read() {}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.write('test', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Writable {
|
||||
_write = common.mustCall((chunk, encoding, cb) => {
|
||||
cb();
|
||||
})
|
||||
|
||||
async _final() {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('finish', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Writable {
|
||||
_write = common.mustCall((chunk, encoding, cb) => {
|
||||
cb();
|
||||
})
|
||||
|
||||
async _final(callback) {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('finish', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Writable {
|
||||
_write = common.mustCall((chunk, encoding, cb) => {
|
||||
cb();
|
||||
})
|
||||
|
||||
async _final() {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
await setTimeout(common.platformTimeout(1));
|
||||
throw new Error('boom');
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('error', common.expectsError({
|
||||
message: 'boom'
|
||||
}));
|
||||
foo.on('close', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const expected = ['hello', 'world'];
|
||||
class Foo extends Transform {
|
||||
async _flush() {
|
||||
return 'world';
|
||||
}
|
||||
|
||||
_transform(chunk, encoding, callback) {
|
||||
callback(null, chunk);
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('data', common.mustCall((chunk) => {
|
||||
assert.strictEqual(chunk.toString(), expected.shift());
|
||||
}, 2));
|
||||
}
|
||||
|
||||
{
|
||||
const expected = ['hello', 'world'];
|
||||
class Foo extends Transform {
|
||||
async _flush(callback) {
|
||||
callback(null, 'world');
|
||||
}
|
||||
|
||||
_transform(chunk, encoding, callback) {
|
||||
callback(null, chunk);
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('data', common.mustCall((chunk) => {
|
||||
assert.strictEqual(chunk.toString(), expected.shift());
|
||||
}, 2));
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Transform {
|
||||
async _flush(callback) {
|
||||
throw new Error('boom');
|
||||
}
|
||||
|
||||
_transform(chunk, encoding, callback) {
|
||||
callback(null, chunk);
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('data', common.mustCall());
|
||||
foo.on('error', common.expectsError({
|
||||
message: 'boom'
|
||||
}));
|
||||
foo.on('close', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Transform {
|
||||
async _transform(chunk) {
|
||||
return chunk.toString().toUpperCase();
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('data', common.mustCall((chunk) => {
|
||||
assert.strictEqual(chunk.toString(), 'HELLO');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Transform {
|
||||
async _transform(chunk, _, callback) {
|
||||
callback(null, chunk.toString().toUpperCase());
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('data', common.mustCall((chunk) => {
|
||||
assert.strictEqual(chunk.toString(), 'HELLO');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
class Foo extends Transform {
|
||||
async _transform() {
|
||||
throw new Error('boom');
|
||||
}
|
||||
}
|
||||
|
||||
const foo = new Foo();
|
||||
foo.end('hello');
|
||||
foo.on('error', common.expectsError({
|
||||
message: 'boom'
|
||||
}));
|
||||
foo.on('close', common.mustCall());
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user