Revert "http: align with stream.Writable"

This reverts commit e2f5bb7574.

Reverted as it caused a significant performance regression.

See: https://github.com/nodejs/node/issues/37937

PR-URL: https://github.com/nodejs/node/pull/37963
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Beth Griggs <bgriggs@redhat.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
Matteo Collina 2021-03-29 11:20:57 +02:00
parent 82bc5c3d5c
commit 16920db55e
8 changed files with 236 additions and 249 deletions

View File

@ -58,7 +58,7 @@ const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToHttpOptions, searchParamsSymbol } = require('internal/url');
const { kOutHeaders } = require('internal/http');
const { kOutHeaders, kNeedDrain } = require('internal/http');
const { connResetException, codes } = require('internal/errors');
const {
ERR_HTTP_HEADERS_SENT,
@ -98,7 +98,7 @@ class HTTPClientAsyncResource {
}
function ClientRequest(input, options, cb) {
FunctionPrototypeCall(OutgoingMessage, this, { autoDestroy: false });
FunctionPrototypeCall(OutgoingMessage, this);
if (typeof input === 'string') {
const urlStr = input;
@ -304,7 +304,7 @@ function ClientRequest(input, options, cb) {
if (typeof optsWithoutSignal.createConnection === 'function') {
const oncreate = once((err, socket) => {
if (err) {
process.nextTick(() => emitError(this, err));
process.nextTick(() => this.emit('error', err));
} else {
this.onSocket(socket);
}
@ -373,8 +373,8 @@ function emitAbortNT(req) {
function ondrain() {
const msg = this._httpMessage;
if (msg && !msg.finished && msg._writableState.needDrain) {
msg._writableState.needDrain = false;
if (msg && !msg.finished && msg[kNeedDrain]) {
msg[kNeedDrain] = false;
msg.emit('drain');
}
}
@ -400,7 +400,8 @@ function socketCloseListener() {
if (!res.complete) {
res.destroy(connResetException('aborted'));
}
emitClose(req);
req._closed = true;
req.emit('close');
if (!res.aborted && res.readable) {
res.push(null);
}
@ -410,9 +411,10 @@ function socketCloseListener() {
// receive a response. The error needs to
// fire on the request.
req.socket._hadError = true;
emitError(req, connResetException('socket hang up'));
req.emit('error', connResetException('socket hang up'));
}
emitClose(req);
req._closed = true;
req.emit('close');
}
// Too bad. That output wasn't getting written.
@ -436,7 +438,7 @@ function socketErrorListener(err) {
// For Safety. Some additional errors might fire later on
// and we need to make sure we don't double-fire the error event.
req.socket._hadError = true;
emitError(req, err);
req.emit('error', err);
}
const parser = socket.parser;
@ -460,7 +462,7 @@ function socketOnEnd() {
// If we don't have a response then we know that the socket
// ended prematurely and we need to emit an error on the request.
req.socket._hadError = true;
emitError(req, connResetException('socket hang up'));
req.emit('error', connResetException('socket hang up'));
}
if (parser) {
parser.finish();
@ -483,7 +485,7 @@ function socketOnData(d) {
freeParser(parser, req, socket);
socket.destroy();
req.socket._hadError = true;
emitError(req, ret);
req.emit('error', ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade (if status code 101) or CONNECT
const bytesParsed = ret;
@ -515,7 +517,9 @@ function socketOnData(d) {
socket.readableFlowing = null;
req.emit(eventName, res, socket, bodyHead);
emitClose(req);
req.destroyed = true;
req._closed = true;
req.emit('close');
} else {
// Requested Upgrade or used CONNECT method, but have no handler.
socket.destroy();
@ -700,7 +704,8 @@ function requestOnPrefinish() {
}
function emitFreeNT(req) {
emitClose(req);
req._closed = true;
req.emit('close');
if (req.socket) {
req.socket.emit('free');
}
@ -781,10 +786,10 @@ function onSocketNT(req, socket, err) {
err = connResetException('socket hang up');
}
if (err) {
emitError(req, err);
req.emit('error', err);
}
req._closed = true;
emitClose(req);
req.emit('close');
}
if (socket) {
@ -864,23 +869,6 @@ function setSocketTimeout(sock, msecs) {
}
}
function emitError(req, err) {
req.destroyed = true;
req._writableState.errored = err;
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack; // eslint-disable-line no-unused-expressions
req._writableState.errorEmitted = true;
req.emit('error', err);
}
function emitClose(req) {
req.destroyed = true;
req._closed = true;
req._writableState.closed = true;
req._writableState.closeEmitted = true;
req.emit('close');
}
ClientRequest.prototype.setNoDelay = function setNoDelay(noDelay) {
this._deferToConnect('setNoDelay', [noDelay]);
};

View File

@ -28,7 +28,6 @@ const {
ArrayPrototypeJoin,
ArrayPrototypePush,
ArrayPrototypeUnshift,
Error,
FunctionPrototype,
FunctionPrototypeBind,
FunctionPrototypeCall,
@ -48,9 +47,9 @@ const {
const { getDefaultHighWaterMark } = require('internal/streams/state');
const assert = require('internal/assert');
const EE = require('events');
const { Stream, Writable } = require('stream');
const Stream = require('stream');
const internalUtil = require('internal/util');
const { kOutHeaders, utcDate } = require('internal/http');
const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
const { Buffer } = require('buffer');
const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
@ -79,12 +78,11 @@ const {
} = require('internal/errors');
const { validateString } = require('internal/validators');
const { isUint8Array } = require('internal/util/types');
const { construct, destroy } = require('internal/streams/destroy');
const HIGH_WATER_MARK = getDefaultHighWaterMark();
const { CRLF, debug } = common;
const kCorked = Symbol('kCorked');
const kCorked = Symbol('corked');
const nop = FunctionPrototype;
@ -98,7 +96,7 @@ function isCookieField(s) {
return s.length === 6 && StringPrototypeToLowerCase(s) === 'cookie';
}
function OutgoingMessage(opts) {
function OutgoingMessage() {
FunctionPrototypeCall(Stream, this);
// Queue that holds all currently pending data, until the response will be
@ -111,6 +109,9 @@ function OutgoingMessage(opts) {
// TCP socket and HTTP Parser and thus handle the backpressure.
this.outputSize = 0;
this.writable = true;
this.destroyed = false;
this._last = false;
this.chunkedEncoding = false;
this.shouldKeepAlive = true;
@ -124,9 +125,12 @@ function OutgoingMessage(opts) {
this._contentLength = null;
this._hasBody = true;
this._trailer = '';
this[kNeedDrain] = false;
this.finished = false;
this._headerSent = false;
this[kCorked] = 0;
this._closed = false;
this.socket = null;
this._header = null;
@ -135,58 +139,23 @@ function OutgoingMessage(opts) {
this._keepAliveTimeout = 0;
this._onPendingData = nop;
this._writableState = {
objectMode: false,
writable: true,
constructed: false,
corked: 0,
prefinished: false,
destroyed: false,
closed: false,
closeEmitted: false,
errored: null,
errorEmitted: false,
needDrain: false,
autoDestroy: opts?.autoDestroy == null ? true : false,
emitClose: true,
ended: false,
ending: false,
finished: false
};
construct(this, () => {
this._flush();
});
}
ObjectSetPrototypeOf(OutgoingMessage.prototype, Writable.prototype);
ObjectSetPrototypeOf(OutgoingMessage, Writable);
ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype);
ObjectSetPrototypeOf(OutgoingMessage, Stream);
ObjectDefineProperty(OutgoingMessage.prototype, 'finished', {
ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', {
get() {
return this._writableState.ended;
},
set(value) {
this._writableState.ended = value;
return (
this.finished &&
this.outputSize === 0 &&
(!this.socket || this.socket.writableLength === 0)
);
}
});
ObjectDefineProperty(OutgoingMessage.prototype, '_closed', {
ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', {
get() {
return this._writableState.closed;
},
set(value) {
this._writableState.closed = value;
}
});
ObjectDefineProperty(OutgoingMessage.prototype, 'writable', {
get() {
// Compat.
return this._writableState.writable;
},
set(value) {
this._writableState.writable = value;
return false;
}
});
@ -202,6 +171,13 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
}
});
ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', {
get() {
const corked = this.socket ? this.socket.writableCorked : 0;
return corked + this[kCorked];
}
});
ObjectDefineProperty(OutgoingMessage.prototype, '_headers', {
get: internalUtil.deprecate(function() {
return this.getHeaders();
@ -265,6 +241,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headerNames', {
}, 'OutgoingMessage.prototype._headerNames is deprecated', 'DEP0066')
});
OutgoingMessage.prototype._renderHeaders = function _renderHeaders() {
if (this._header) {
throw new ERR_HTTP_HEADERS_SENT('render');
@ -286,8 +263,6 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() {
};
OutgoingMessage.prototype.cork = function() {
this._writableState.corked++;
if (this.socket) {
this.socket.cork();
} else {
@ -296,10 +271,6 @@ OutgoingMessage.prototype.cork = function() {
};
OutgoingMessage.prototype.uncork = function() {
if (this._writableState.corked) {
this._writableState.corked--;
}
if (this.socket) {
this.socket.uncork();
} else if (this[kCorked]) {
@ -308,6 +279,7 @@ OutgoingMessage.prototype.uncork = function() {
};
OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
if (callback) {
this.on('timeout', callback);
}
@ -322,48 +294,22 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
return this;
};
OutgoingMessage.prototype._construct = function(callback) {
if (this.socket) {
for (let n = 0; n < this[kCorked]; ++n) {
this.socket.cork();
}
callback();
} else {
// TODO(ronag): What if never assigned socket?
this.once('socket', function(socket) {
for (let n = 0; n < this[kCorked]; ++n) {
socket.cork();
}
callback();
});
// It's possible that the socket will be destroyed, and removed from
// any messages, before ever calling this. In that case, just skip
// it, since something else is destroying this connection anyway.
OutgoingMessage.prototype.destroy = function destroy(error) {
if (this.destroyed) {
return this;
}
};
OutgoingMessage.prototype.destroy = destroy;
OutgoingMessage.prototype._destroy = function(err, callback) {
if (!this.writableEnded) {
this.aborted = true;
this.emit('aborted');
}
// TODO(ronag): Why is this needed?
const cb = (err) => {
const triggerAsyncId = this.socket ?
this.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId, callback, err);
};
this.destroyed = true;
if (this.socket) {
Stream.finished(this.socket.destroy(err), (er) => {
if (er && er.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
err = er;
}
cb(err);
});
this.socket.destroy(error);
} else {
cb(err);
this.once('socket', function socketDestroyOnConnect(socket) {
socket.destroy(error);
});
}
return this;
@ -740,6 +686,16 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'headersSent', {
get: function() { return !!this._header; }
});
ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', {
get: function() { return this.finished; }
});
ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', {
get: function() {
return !this.destroyed && !this.finished && this[kNeedDrain];
}
});
const crlf_buf = Buffer.from(CRLF);
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
if (typeof encoding === 'function') {
@ -747,15 +703,30 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
encoding = null;
}
const ret = write_(this, chunk, encoding, callback, false) === true;
const ret = write_(this, chunk, encoding, callback, false);
if (!ret)
this._writableState.needDrain = true;
this[kNeedDrain] = true;
return ret;
};
function write_(msg, chunk, encoding, callback, fromEnd) {
const state = msg._writableState;
function onError(msg, err, callback) {
const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId,
process.nextTick,
emitErrorNt,
msg,
err,
callback);
}
function emitErrorNt(msg, err, callback) {
callback(err);
if (typeof msg.emit === 'function' && !msg._closed) {
msg.emit('error', err);
}
}
function write_(msg, chunk, encoding, callback, fromEnd) {
if (typeof callback !== 'function')
callback = nop;
@ -772,16 +743,19 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}
let err;
if (state.ending) {
if (msg.finished) {
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
} else if (msg.destroyed) {
err = new ERR_STREAM_DESTROYED('write');
}
if (err) {
process.nextTick(callback, err);
msg.destroy(err);
return err;
if (!msg.destroyed) {
onError(msg, err, callback);
} else {
process.nextTick(callback, err);
}
return false;
}
if (!msg._header) {
@ -798,9 +772,9 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
return true;
}
if (!fromEnd && !state.corked) {
msg.cork();
process.nextTick(uncorkNT, msg);
if (!fromEnd && msg.socket && !msg.socket.writableCorked) {
msg.socket.cork();
process.nextTick(connectionCorkNT, msg.socket);
}
let ret;
@ -818,8 +792,8 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}
function uncorkNT(msg) {
msg.uncork();
function connectionCorkNT(conn) {
conn.uncork();
}
@ -850,24 +824,12 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) {
}
};
function onFinish(err) {
const state = this._writableState;
if (err || state.errored || state.finished) {
return;
}
state.finished = true;
this.emit('finish');
if (state.autoDestroy) {
this.destroy();
}
function onFinish(outmsg) {
if (outmsg && outmsg.socket && outmsg.socket._hadError) return;
outmsg.emit('finish');
}
OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
const state = this._writableState;
if (typeof chunk === 'function') {
callback = chunk;
chunk = null;
@ -877,90 +839,74 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
encoding = null;
}
this.cork();
let err;
if (chunk) {
const ret = write_(this, chunk, encoding, null, true);
if (ret instanceof Error) {
err = ret;
if (this.finished) {
onError(this,
new ERR_STREAM_WRITE_AFTER_END(),
typeof callback !== 'function' ? nop : callback);
return this;
}
if (this.socket) {
this.socket.cork();
}
write_(this, chunk, encoding, null, true);
} else if (this.finished) {
if (typeof callback === 'function') {
if (!this.writableFinished) {
this.on('finish', callback);
} else {
callback(new ERR_STREAM_ALREADY_FINISHED('end'));
}
}
return this;
} else if (!this._header) {
if (this.socket) {
this.socket.cork();
}
this._contentLength = 0;
this._implicitHeader();
}
if (err) {
// Do nothing...
} else if (!state.errored && !state.ending) {
state.ending = true;
if (typeof callback === 'function')
this.once('finish', callback);
if (!this._header) {
this._contentLength = 0;
this._implicitHeader();
}
const finish = FunctionPrototypeBind(onFinish, undefined, this);
const finish = FunctionPrototypeBind(onFinish, this);
state.finalCalled = true;
if (this._hasBody && this.chunkedEncoding) {
this._send('0' + CRLF + this._trailer + CRLF, 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
}
while (state.corked) {
// Fully uncork connection on end().
this.uncork();
}
state.ended = true;
// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
if (this.outputData.length === 0 && this.socket?._httpMessage === this) {
this._finish();
}
} else if (state.finished) {
err = new ERR_STREAM_ALREADY_FINISHED('end');
} else if (state.destroyed) {
err = new ERR_STREAM_DESTROYED('end');
if (this._hasBody && this.chunkedEncoding) {
this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
}
if (typeof callback === 'function') {
if (err || state.finished) {
process.nextTick(callback, err);
} else {
onFinished(this, callback);
}
if (this.socket) {
// Fully uncork connection on end().
this.socket._writableState.corked = 1;
this.socket.uncork();
}
this[kCorked] = 0;
this.uncork();
this.finished = true;
// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
if (this.outputData.length === 0 &&
this.socket &&
this.socket._httpMessage === this) {
this._finish();
}
return this;
};
function onFinished(msg, callback) {
const onDone = (err) => {
msg
.off('finish', onDone)
.off(EE.errorMonitor, onDone);
callback(err);
};
msg
.on('finish', onDone)
.on(EE.errorMonitor, onDone);
}
OutgoingMessage.prototype._finish = function _finish() {
const state = this._writableState;
if (!state.prefinished) {
assert(this.socket);
state.prefinished = true;
this.emit('prefinish');
}
assert(this.socket);
this.emit('prefinish');
};
@ -993,14 +939,19 @@ OutgoingMessage.prototype._flush = function _flush() {
if (this.finished) {
// This is a queue to the server or client to bring in the next this.
this._finish();
} else if (ret && this._writableState.needDrain) {
this._writableState.needDrain = false;
} else if (ret && this[kNeedDrain]) {
this[kNeedDrain] = false;
this.emit('drain');
}
}
};
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
while (this[kCorked]) {
this[kCorked]--;
socket.cork();
}
const outputLength = this.outputData.length;
if (outputLength <= 0)
return undefined;

View File

@ -59,6 +59,7 @@ const {
const { OutgoingMessage } = require('_http_outgoing');
const {
kOutHeaders,
kNeedDrain,
emitStatistics
} = require('internal/http');
const {
@ -226,7 +227,11 @@ function onServerResponseClose() {
// Ergo, we need to deal with stale 'close' events and handle the case
// where the ServerResponse object has already been deconstructed.
// Fortunately, that requires only a single if check. :-)
this._httpMessage?.destroy();
if (this._httpMessage) {
this._httpMessage.destroyed = true;
this._httpMessage._closed = true;
this._httpMessage.emit('close');
}
}
ServerResponse.prototype.assignSocket = function assignSocket(socket) {
@ -235,6 +240,7 @@ ServerResponse.prototype.assignSocket = function assignSocket(socket) {
socket.on('close', onServerResponseClose);
this.socket = socket;
this.emit('socket', socket);
this._flush();
};
ServerResponse.prototype.detachSocket = function detachSocket(socket) {
@ -550,8 +556,8 @@ function socketOnDrain(socket, state) {
}
const msg = socket._httpMessage;
if (msg && !msg.finished && msg._writableState.needDrain) {
msg._writableState.needDrain = false;
if (msg && !msg.finished && msg[kNeedDrain]) {
msg[kNeedDrain] = false;
msg.emit('drain');
}
}
@ -578,6 +584,7 @@ function abortIncoming(incoming) {
const req = ArrayPrototypeShift(incoming);
req.destroy(connResetException('aborted'));
}
// Abort socket._httpMessage ?
}
function socketOnEnd(server, socket, parser, state) {
@ -797,6 +804,7 @@ function resOnFinish(req, res, socket, state, server) {
res.detachSocket(socket);
clearIncoming(req);
process.nextTick(emitCloseNT, res);
if (res._last) {
if (typeof socket.destroySoon === 'function') {
@ -818,6 +826,12 @@ function resOnFinish(req, res, socket, state, server) {
}
}
function emitCloseNT(self) {
self.destroyed = true;
self._closed = true;
self.emit('close');
}
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.

View File

@ -44,6 +44,7 @@ function emitStatistics(statistics) {
module.exports = {
kOutHeaders: Symbol('kOutHeaders'),
kNeedDrain: Symbol('kNeedDrain'),
utcDate,
emitStatistics
};

View File

@ -105,20 +105,17 @@ function remoteClose() {
function remoteError() {
// Remote server will destroy the socket
const req = get('/error', common.mustNotCall());
req.on('socket', common.mustCall((socket) => {
socket.on('end', common.mustCall(() => {
assert.strictEqual(agent.sockets[name].length, 1);
assert.strictEqual(agent.freeSockets[name], undefined);
}));
}));
req.on('error', common.mustCall((err) => {
assert(err);
assert.strictEqual(err.message, 'socket hang up');
}));
req.on('close', common.mustCall((err) => {
assert.strictEqual(agent.sockets[name], undefined);
assert.strictEqual(agent.sockets[name].length, 1);
assert.strictEqual(agent.freeSockets[name], undefined);
server.close();
// Wait socket 'close' event emit
setTimeout(common.mustCall(() => {
assert.strictEqual(agent.sockets[name], undefined);
assert.strictEqual(agent.freeSockets[name], undefined);
server.close();
}), common.platformTimeout(1));
}));
}

View File

@ -3,21 +3,23 @@ const common = require('../common');
const assert = require('assert');
const http = require('http');
const onWriteAfterEndError = common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}, 2);
const server = http.createServer(common.mustCall(function(req, res) {
res.end('testing ended state', common.mustCall());
assert.strictEqual(res.writableCorked, 0);
res.end(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
res.end('end', common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}));
res.on('error', common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}));
res.on('close', common.mustCall(() => {
assert.strictEqual(res.writableCorked, 0);
res.end('end', onWriteAfterEndError);
assert.strictEqual(res.writableCorked, 0);
res.on('error', onWriteAfterEndError);
res.on('finish', common.mustCall(() => {
res.end(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
server.close();
}));
}));

View File

@ -14,7 +14,7 @@ events.captureRejections = true;
throw _err;
}));
res.on('error', common.mustCall((err) => {
res.socket.on('error', common.mustCall((err) => {
assert.strictEqual(err, _err);
}));

View File

@ -253,6 +253,40 @@ const net = require('net');
});
}
{
const server = http.createServer((req, res) => {
pipeline(req, res, common.mustSucceed());
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
let sent = 0;
const rs = new Readable({
read() {
if (sent++ > 10) {
return;
}
rs.push('hello');
}
});
pipeline(rs, req, common.mustCall(() => {
server.close();
}));
req.on('response', (res) => {
let cnt = 10;
res.on('data', () => {
cnt--;
if (cnt === 0) rs.destroy();
});
});
});
}
{
const makeTransform = () => {
const tr = new Transform({