mirror of
https://github.com/nodejs/node.git
synced 2025-12-28 07:50:41 +00:00
stream: fix code style
PR-URL: https://github.com/nodejs/node/pull/51168 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
89ddc98b95
commit
20c63134fc
@ -11,7 +11,6 @@ const {
|
||||
FunctionPrototypeCall,
|
||||
MathMin,
|
||||
NumberIsInteger,
|
||||
ObjectCreate,
|
||||
ObjectDefineProperties,
|
||||
ObjectSetPrototypeOf,
|
||||
Promise,
|
||||
@ -95,9 +94,9 @@ const {
|
||||
AsyncIterator,
|
||||
cloneAsUint8Array,
|
||||
copyArrayBuffer,
|
||||
createPromiseCallback,
|
||||
customInspect,
|
||||
dequeueValue,
|
||||
ensureIsPromise,
|
||||
enqueueValueWithSize,
|
||||
extractHighWaterMark,
|
||||
extractSizeAlgorithm,
|
||||
@ -251,19 +250,7 @@ class ReadableStream {
|
||||
markTransferMode(this, false, true);
|
||||
if (source === null)
|
||||
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
|
||||
this[kState] = {
|
||||
disturbed: false,
|
||||
reader: undefined,
|
||||
state: 'readable',
|
||||
storedError: undefined,
|
||||
stream: undefined,
|
||||
transfer: {
|
||||
writable: undefined,
|
||||
port1: undefined,
|
||||
port2: undefined,
|
||||
promise: undefined,
|
||||
},
|
||||
};
|
||||
this[kState] = createReadableStreamState();
|
||||
|
||||
this[kIsClosedPromise] = createDeferredPromise();
|
||||
this[kControllerErrorFunction] = () => {};
|
||||
@ -649,19 +636,7 @@ ObjectDefineProperties(ReadableStream, {
|
||||
function InternalTransferredReadableStream() {
|
||||
markTransferMode(this, false, true);
|
||||
this[kType] = 'ReadableStream';
|
||||
this[kState] = {
|
||||
disturbed: false,
|
||||
reader: undefined,
|
||||
state: 'readable',
|
||||
storedError: undefined,
|
||||
stream: undefined,
|
||||
transfer: {
|
||||
writable: undefined,
|
||||
port1: undefined,
|
||||
port2: undefined,
|
||||
promise: undefined,
|
||||
},
|
||||
};
|
||||
this[kState] = createReadableStreamState();
|
||||
|
||||
this[kIsClosedPromise] = createDeferredPromise();
|
||||
}
|
||||
@ -1230,41 +1205,58 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
|
||||
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
|
||||
});
|
||||
|
||||
function TeeReadableStream(start, pull, cancel) {
|
||||
function InternalReadableStream(start, pull, cancel, highWaterMark, size) {
|
||||
markTransferMode(this, false, true);
|
||||
this[kType] = 'ReadableStream';
|
||||
this[kState] = {
|
||||
disturbed: false,
|
||||
state: 'readable',
|
||||
storedError: undefined,
|
||||
stream: undefined,
|
||||
transfer: {
|
||||
writable: undefined,
|
||||
port: undefined,
|
||||
promise: undefined,
|
||||
},
|
||||
};
|
||||
this[kState] = createReadableStreamState();
|
||||
this[kIsClosedPromise] = createDeferredPromise();
|
||||
setupReadableStreamDefaultControllerFromSource(
|
||||
const controller = new ReadableStreamDefaultController(kSkipThrow);
|
||||
setupReadableStreamDefaultController(
|
||||
this,
|
||||
ObjectCreate(null, {
|
||||
start: { __proto__: null, value: start },
|
||||
pull: { __proto__: null, value: pull },
|
||||
cancel: { __proto__: null, value: cancel },
|
||||
}),
|
||||
1,
|
||||
() => 1);
|
||||
controller,
|
||||
start,
|
||||
pull,
|
||||
cancel,
|
||||
highWaterMark,
|
||||
size);
|
||||
}
|
||||
|
||||
ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype);
|
||||
ObjectSetPrototypeOf(TeeReadableStream, ReadableStream);
|
||||
ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype);
|
||||
ObjectSetPrototypeOf(InternalReadableStream, ReadableStream);
|
||||
|
||||
function createTeeReadableStream(start, pull, cancel) {
|
||||
const tee = new TeeReadableStream(start, pull, cancel);
|
||||
function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) {
|
||||
const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size);
|
||||
|
||||
// For spec compliance the Tee must be a ReadableStream
|
||||
tee.constructor = ReadableStream;
|
||||
return tee;
|
||||
// For spec compliance the InternalReadableStream must be a ReadableStream
|
||||
stream.constructor = ReadableStream;
|
||||
return stream;
|
||||
}
|
||||
|
||||
function InternalReadableByteStream(start, pull, cancel) {
|
||||
markTransferMode(this, false, true);
|
||||
this[kType] = 'ReadableStream';
|
||||
this[kState] = createReadableStreamState();
|
||||
this[kIsClosedPromise] = createDeferredPromise();
|
||||
const controller = new ReadableByteStreamController(kSkipThrow);
|
||||
setupReadableByteStreamController(
|
||||
this,
|
||||
controller,
|
||||
start,
|
||||
pull,
|
||||
cancel,
|
||||
0,
|
||||
undefined);
|
||||
}
|
||||
|
||||
ObjectSetPrototypeOf(InternalReadableByteStream.prototype, ReadableStream.prototype);
|
||||
ObjectSetPrototypeOf(InternalReadableByteStream, ReadableStream);
|
||||
|
||||
function createReadableByteStream(start, pull, cancel) {
|
||||
const stream = new InternalReadableByteStream(start, pull, cancel);
|
||||
|
||||
// For spec compliance the InternalReadableByteStream must be a ReadableStream
|
||||
stream.constructor = ReadableStream;
|
||||
return stream;
|
||||
}
|
||||
|
||||
const isReadableStream =
|
||||
@ -1280,6 +1272,23 @@ const isReadableStreamBYOBReader =
|
||||
|
||||
// ---- ReadableStream Implementation
|
||||
|
||||
function createReadableStreamState() {
|
||||
return {
|
||||
__proto__: null,
|
||||
disturbed: false,
|
||||
reader: undefined,
|
||||
state: 'readable',
|
||||
storedError: undefined,
|
||||
transfer: {
|
||||
__proto__: null,
|
||||
writable: undefined,
|
||||
port1: undefined,
|
||||
port2: undefined,
|
||||
promise: undefined,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function readableStreamFromIterable(iterable) {
|
||||
let stream;
|
||||
const iteratorRecord = getIterator(iterable, 'async');
|
||||
@ -1319,16 +1328,12 @@ function readableStreamFromIterable(iterable) {
|
||||
});
|
||||
}
|
||||
|
||||
stream = new ReadableStream({
|
||||
start: startAlgorithm,
|
||||
pull: pullAlgorithm,
|
||||
cancel: cancelAlgorithm,
|
||||
}, {
|
||||
size() {
|
||||
return 1;
|
||||
},
|
||||
highWaterMark: 0,
|
||||
});
|
||||
stream = createReadableStream(
|
||||
startAlgorithm,
|
||||
pullAlgorithm,
|
||||
cancelAlgorithm,
|
||||
0,
|
||||
);
|
||||
|
||||
return stream;
|
||||
}
|
||||
@ -1654,9 +1659,9 @@ function readableStreamDefaultTee(stream, cloneForBranch2) {
|
||||
}
|
||||
|
||||
branch1 =
|
||||
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
|
||||
createReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
|
||||
branch2 =
|
||||
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);
|
||||
createReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);
|
||||
|
||||
PromisePrototypeThen(
|
||||
reader[kState].close.promise,
|
||||
@ -1933,16 +1938,10 @@ function readableByteStreamTee(stream) {
|
||||
return cancelDeferred.promise;
|
||||
}
|
||||
|
||||
branch1 = new ReadableStream({
|
||||
type: 'bytes',
|
||||
pull: pull1Algorithm,
|
||||
cancel: cancel1Algorithm,
|
||||
});
|
||||
branch2 = new ReadableStream({
|
||||
type: 'bytes',
|
||||
pull: pull2Algorithm,
|
||||
cancel: cancel2Algorithm,
|
||||
});
|
||||
branch1 =
|
||||
createReadableByteStream(nonOpStart, pull1Algorithm, cancel1Algorithm);
|
||||
branch2 =
|
||||
createReadableByteStream(nonOpStart, pull2Algorithm, cancel2Algorithm);
|
||||
|
||||
forwardReaderError(reader);
|
||||
|
||||
@ -1993,10 +1992,7 @@ function readableStreamCancel(stream, reason) {
|
||||
}
|
||||
|
||||
return PromisePrototypeThen(
|
||||
ensureIsPromise(
|
||||
stream[kState].controller[kCancel],
|
||||
stream[kState].controller,
|
||||
reason),
|
||||
stream[kState].controller[kCancel](reason),
|
||||
() => {});
|
||||
}
|
||||
|
||||
@ -2361,7 +2357,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) {
|
||||
assert(!controller[kState].pullAgain);
|
||||
controller[kState].pulling = true;
|
||||
PromisePrototypeThen(
|
||||
ensureIsPromise(controller[kState].pullAlgorithm, controller),
|
||||
controller[kState].pullAlgorithm(controller),
|
||||
() => {
|
||||
controller[kState].pulling = false;
|
||||
if (controller[kState].pullAgain) {
|
||||
@ -2391,12 +2387,9 @@ function readableStreamDefaultControllerError(controller, error) {
|
||||
|
||||
function readableStreamDefaultControllerCancelSteps(controller, reason) {
|
||||
resetQueue(controller);
|
||||
try {
|
||||
const result = controller[kState].cancelAlgorithm(reason);
|
||||
return result;
|
||||
} finally {
|
||||
readableStreamDefaultControllerClearAlgorithms(controller);
|
||||
}
|
||||
const result = controller[kState].cancelAlgorithm(reason);
|
||||
readableStreamDefaultControllerClearAlgorithms(controller);
|
||||
return result;
|
||||
}
|
||||
|
||||
function readableStreamDefaultControllerPullSteps(controller, readRequest) {
|
||||
@ -2470,11 +2463,10 @@ function setupReadableStreamDefaultControllerFromSource(
|
||||
FunctionPrototypeBind(start, source, controller) :
|
||||
nonOpStart;
|
||||
const pullAlgorithm = pull ?
|
||||
FunctionPrototypeBind(pull, source, controller) :
|
||||
createPromiseCallback('source.pull', pull, source) :
|
||||
nonOpPull;
|
||||
|
||||
const cancelAlgorithm = cancel ?
|
||||
FunctionPrototypeBind(cancel, source) :
|
||||
createPromiseCallback('source.cancel', cancel, source) :
|
||||
nonOpCancel;
|
||||
|
||||
setupReadableStreamDefaultController(
|
||||
@ -3102,7 +3094,7 @@ function readableByteStreamControllerCallPullIfNeeded(controller) {
|
||||
assert(!controller[kState].pullAgain);
|
||||
controller[kState].pulling = true;
|
||||
PromisePrototypeThen(
|
||||
ensureIsPromise(controller[kState].pullAlgorithm, controller),
|
||||
controller[kState].pullAlgorithm(controller),
|
||||
() => {
|
||||
controller[kState].pulling = false;
|
||||
if (controller[kState].pullAgain) {
|
||||
@ -3269,10 +3261,10 @@ function setupReadableByteStreamControllerFromSource(
|
||||
FunctionPrototypeBind(start, source, controller) :
|
||||
nonOpStart;
|
||||
const pullAlgorithm = pull ?
|
||||
FunctionPrototypeBind(pull, source, controller) :
|
||||
createPromiseCallback('source.pull', pull, source, controller) :
|
||||
nonOpPull;
|
||||
const cancelAlgorithm = cancel ?
|
||||
FunctionPrototypeBind(cancel, source) :
|
||||
createPromiseCallback('source.cancel', cancel, source) :
|
||||
nonOpCancel;
|
||||
|
||||
if (autoAllocateChunkSize === 0) {
|
||||
@ -3369,4 +3361,6 @@ module.exports = {
|
||||
readableByteStreamControllerPullSteps,
|
||||
setupReadableByteStreamController,
|
||||
setupReadableByteStreamControllerFromSource,
|
||||
createReadableStream,
|
||||
createReadableByteStream,
|
||||
};
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
'use strict';
|
||||
|
||||
const {
|
||||
FunctionPrototypeBind,
|
||||
FunctionPrototypeCall,
|
||||
ObjectDefineProperties,
|
||||
ObjectSetPrototypeOf,
|
||||
@ -38,8 +37,8 @@ const {
|
||||
} = require('internal/worker/js_transferable');
|
||||
|
||||
const {
|
||||
createPromiseCallback,
|
||||
customInspect,
|
||||
ensureIsPromise,
|
||||
extractHighWaterMark,
|
||||
extractSizeAlgorithm,
|
||||
isBrandCheck,
|
||||
@ -50,7 +49,7 @@ const {
|
||||
} = require('internal/webstreams/util');
|
||||
|
||||
const {
|
||||
ReadableStream,
|
||||
createReadableStream,
|
||||
readableStreamDefaultControllerCanCloseOrEnqueue,
|
||||
readableStreamDefaultControllerClose,
|
||||
readableStreamDefaultControllerEnqueue,
|
||||
@ -60,7 +59,7 @@ const {
|
||||
} = require('internal/webstreams/readablestream');
|
||||
|
||||
const {
|
||||
WritableStream,
|
||||
createWritableStream,
|
||||
writableStreamDefaultControllerErrorIfNeeded,
|
||||
} = require('internal/webstreams/writablestream');
|
||||
|
||||
@ -251,10 +250,12 @@ function InternalTransferredTransformStream() {
|
||||
markTransferMode(this, false, true);
|
||||
this[kType] = 'TransformStream';
|
||||
this[kState] = {
|
||||
__proto__: null,
|
||||
readable: undefined,
|
||||
writable: undefined,
|
||||
backpressure: undefined,
|
||||
backpressureChange: {
|
||||
__proto__: null,
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
@ -360,43 +361,33 @@ function initializeTransformStream(
|
||||
readableHighWaterMark,
|
||||
readableSizeAlgorithm) {
|
||||
|
||||
const writable = new WritableStream({
|
||||
__proto__: null,
|
||||
start() { return startPromise.promise; },
|
||||
write(chunk) {
|
||||
return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
|
||||
},
|
||||
abort(reason) {
|
||||
return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
|
||||
},
|
||||
close() {
|
||||
return transformStreamDefaultSinkCloseAlgorithm(stream);
|
||||
},
|
||||
}, {
|
||||
highWaterMark: writableHighWaterMark,
|
||||
size: writableSizeAlgorithm,
|
||||
});
|
||||
const startAlgorithm = () => startPromise.promise;
|
||||
|
||||
const readable = new ReadableStream({
|
||||
__proto__: null,
|
||||
start() { return startPromise.promise; },
|
||||
pull() {
|
||||
return transformStreamDefaultSourcePullAlgorithm(stream);
|
||||
},
|
||||
cancel(reason) {
|
||||
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
|
||||
},
|
||||
}, {
|
||||
highWaterMark: readableHighWaterMark,
|
||||
size: readableSizeAlgorithm,
|
||||
});
|
||||
const writable = createWritableStream(
|
||||
startAlgorithm,
|
||||
(chunk) => transformStreamDefaultSinkWriteAlgorithm(stream, chunk),
|
||||
() => transformStreamDefaultSinkCloseAlgorithm(stream),
|
||||
(reason) => transformStreamDefaultSinkAbortAlgorithm(stream, reason),
|
||||
writableHighWaterMark,
|
||||
writableSizeAlgorithm,
|
||||
);
|
||||
|
||||
const readable = createReadableStream(
|
||||
startAlgorithm,
|
||||
() => transformStreamDefaultSourcePullAlgorithm(stream),
|
||||
(reason) => transformStreamDefaultSourceCancelAlgorithm(stream, reason),
|
||||
readableHighWaterMark,
|
||||
readableSizeAlgorithm,
|
||||
);
|
||||
|
||||
stream[kState] = {
|
||||
__proto__: null,
|
||||
readable,
|
||||
writable,
|
||||
controller: undefined,
|
||||
backpressure: undefined,
|
||||
backpressureChange: {
|
||||
__proto__: null,
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
@ -451,6 +442,7 @@ function setupTransformStreamDefaultController(
|
||||
assert(isTransformStream(stream));
|
||||
assert(stream[kState].controller === undefined);
|
||||
controller[kState] = {
|
||||
__proto__: null,
|
||||
stream,
|
||||
transformAlgorithm,
|
||||
flushAlgorithm,
|
||||
@ -463,15 +455,18 @@ function setupTransformStreamDefaultControllerFromTransformer(
|
||||
stream,
|
||||
transformer) {
|
||||
const controller = new TransformStreamDefaultController(kSkipThrow);
|
||||
const transform = transformer?.transform || defaultTransformAlgorithm;
|
||||
const flush = transformer?.flush || nonOpFlush;
|
||||
const cancel = transformer?.cancel || nonOpCancel;
|
||||
const transformAlgorithm =
|
||||
FunctionPrototypeBind(transform, transformer);
|
||||
const flushAlgorithm =
|
||||
FunctionPrototypeBind(flush, transformer);
|
||||
const cancelAlgorithm =
|
||||
FunctionPrototypeBind(cancel, transformer);
|
||||
const transform = transformer?.transform;
|
||||
const flush = transformer?.flush;
|
||||
const cancel = transformer?.cancel;
|
||||
const transformAlgorithm = transform ?
|
||||
createPromiseCallback('transformer.transform', transform, transformer) :
|
||||
defaultTransformAlgorithm;
|
||||
const flushAlgorithm = flush ?
|
||||
createPromiseCallback('transformer.flush', flush, transformer) :
|
||||
nonOpFlush;
|
||||
const cancelAlgorithm = cancel ?
|
||||
createPromiseCallback('transformer.cancel', cancel, transformer) :
|
||||
nonOpCancel;
|
||||
|
||||
setupTransformStreamDefaultController(
|
||||
stream,
|
||||
@ -519,11 +514,7 @@ function transformStreamDefaultControllerError(controller, error) {
|
||||
|
||||
async function transformStreamDefaultControllerPerformTransform(controller, chunk) {
|
||||
try {
|
||||
return await ensureIsPromise(
|
||||
controller[kState].transformAlgorithm,
|
||||
controller,
|
||||
chunk,
|
||||
controller);
|
||||
return await controller[kState].transformAlgorithm(chunk, controller);
|
||||
} catch (error) {
|
||||
transformStreamError(controller[kState].stream, error);
|
||||
throw error;
|
||||
@ -584,10 +575,7 @@ async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
|
||||
|
||||
const { promise, resolve, reject } = createDeferredPromise();
|
||||
controller[kState].finishPromise = promise;
|
||||
const cancelPromise = ensureIsPromise(
|
||||
controller[kState].cancelAlgorithm,
|
||||
controller,
|
||||
reason);
|
||||
const cancelPromise = controller[kState].cancelAlgorithm(reason);
|
||||
transformStreamDefaultControllerClearAlgorithms(controller);
|
||||
|
||||
PromisePrototypeThen(
|
||||
@ -620,11 +608,7 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
|
||||
}
|
||||
const { promise, resolve, reject } = createDeferredPromise();
|
||||
controller[kState].finishPromise = promise;
|
||||
const flushPromise =
|
||||
ensureIsPromise(
|
||||
controller[kState].flushAlgorithm,
|
||||
controller,
|
||||
controller);
|
||||
const flushPromise = controller[kState].flushAlgorithm(controller);
|
||||
transformStreamDefaultControllerClearAlgorithms(controller);
|
||||
PromisePrototypeThen(
|
||||
flushPromise,
|
||||
@ -662,31 +646,29 @@ function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
|
||||
|
||||
const { promise, resolve, reject } = createDeferredPromise();
|
||||
controller[kState].finishPromise = promise;
|
||||
const cancelPromise = ensureIsPromise(
|
||||
controller[kState].cancelAlgorithm,
|
||||
controller,
|
||||
reason);
|
||||
const cancelPromise = controller[kState].cancelAlgorithm(reason);
|
||||
transformStreamDefaultControllerClearAlgorithms(controller);
|
||||
|
||||
PromisePrototypeThen(cancelPromise,
|
||||
() => {
|
||||
if (writable[kState].state === 'errored')
|
||||
reject(writable[kState].storedError);
|
||||
else {
|
||||
writableStreamDefaultControllerErrorIfNeeded(
|
||||
writable[kState].controller,
|
||||
reason);
|
||||
transformStreamUnblockWrite(stream);
|
||||
resolve();
|
||||
}
|
||||
},
|
||||
(error) => {
|
||||
writableStreamDefaultControllerErrorIfNeeded(
|
||||
writable[kState].controller,
|
||||
error);
|
||||
transformStreamUnblockWrite(stream);
|
||||
reject(error);
|
||||
},
|
||||
PromisePrototypeThen(
|
||||
cancelPromise,
|
||||
() => {
|
||||
if (writable[kState].state === 'errored')
|
||||
reject(writable[kState].storedError);
|
||||
else {
|
||||
writableStreamDefaultControllerErrorIfNeeded(
|
||||
writable[kState].controller,
|
||||
reason);
|
||||
transformStreamUnblockWrite(stream);
|
||||
resolve();
|
||||
}
|
||||
},
|
||||
(error) => {
|
||||
writableStreamDefaultControllerErrorIfNeeded(
|
||||
writable[kState].controller,
|
||||
error);
|
||||
transformStreamUnblockWrite(stream);
|
||||
reject(error);
|
||||
},
|
||||
);
|
||||
|
||||
return controller[kState].finishPromise;
|
||||
|
||||
@ -9,8 +9,6 @@ const {
|
||||
MathMax,
|
||||
NumberIsNaN,
|
||||
PromisePrototypeThen,
|
||||
PromiseResolve,
|
||||
PromiseReject,
|
||||
ReflectGet,
|
||||
Symbol,
|
||||
SymbolAsyncIterator,
|
||||
@ -31,10 +29,6 @@ const {
|
||||
detachArrayBuffer,
|
||||
} = internalBinding('buffer');
|
||||
|
||||
const {
|
||||
isPromise,
|
||||
} = require('internal/util/types');
|
||||
|
||||
const {
|
||||
inspect,
|
||||
} = require('util');
|
||||
@ -180,13 +174,15 @@ function enqueueValueWithSize(controller, value, size) {
|
||||
controller[kState].queueTotalSize += size;
|
||||
}
|
||||
|
||||
function ensureIsPromise(fn, thisArg, ...args) {
|
||||
try {
|
||||
const value = FunctionPrototypeCall(fn, thisArg, ...args);
|
||||
return isPromise(value) ? value : PromiseResolve(value);
|
||||
} catch (error) {
|
||||
return PromiseReject(error);
|
||||
}
|
||||
// This implements "invoke a callback function type" for callback functions that return a promise.
|
||||
// See https://webidl.spec.whatwg.org/#es-invoking-callback-functions
|
||||
async function invokePromiseCallback(fn, thisArg, ...args) {
|
||||
return FunctionPrototypeCall(fn, thisArg, ...args);
|
||||
}
|
||||
|
||||
function createPromiseCallback(name, fn, thisArg) {
|
||||
validateFunction(fn, name);
|
||||
return (...args) => invokePromiseCallback(fn, thisArg, ...args);
|
||||
}
|
||||
|
||||
function isPromisePending(promise) {
|
||||
@ -273,15 +269,16 @@ module.exports = {
|
||||
ArrayBufferViewGetByteLength,
|
||||
ArrayBufferViewGetByteOffset,
|
||||
AsyncIterator,
|
||||
createPromiseCallback,
|
||||
cloneAsUint8Array,
|
||||
copyArrayBuffer,
|
||||
customInspect,
|
||||
dequeueValue,
|
||||
ensureIsPromise,
|
||||
enqueueValueWithSize,
|
||||
extractHighWaterMark,
|
||||
extractSizeAlgorithm,
|
||||
lazyTransfer,
|
||||
invokePromiseCallback,
|
||||
isBrandCheck,
|
||||
isPromisePending,
|
||||
isViewedArrayBufferDetached,
|
||||
|
||||
@ -49,9 +49,9 @@ const {
|
||||
} = require('internal/worker/js_transferable');
|
||||
|
||||
const {
|
||||
createPromiseCallback,
|
||||
customInspect,
|
||||
dequeueValue,
|
||||
ensureIsPromise,
|
||||
enqueueValueWithSize,
|
||||
extractHighWaterMark,
|
||||
extractSizeAlgorithm,
|
||||
@ -160,45 +160,7 @@ class WritableStream {
|
||||
if (type !== undefined)
|
||||
throw new ERR_INVALID_ARG_VALUE.RangeError('type', type);
|
||||
|
||||
this[kState] = {
|
||||
close: createDeferredPromise(),
|
||||
closeRequest: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
inFlightWriteRequest: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
inFlightCloseRequest: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
pendingAbortRequest: {
|
||||
abort: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
reason: undefined,
|
||||
wasAlreadyErroring: false,
|
||||
},
|
||||
backpressure: false,
|
||||
controller: undefined,
|
||||
state: 'writable',
|
||||
storedError: undefined,
|
||||
writeRequests: [],
|
||||
writer: undefined,
|
||||
transfer: {
|
||||
readable: undefined,
|
||||
port1: undefined,
|
||||
port2: undefined,
|
||||
promise: undefined,
|
||||
},
|
||||
};
|
||||
this[kState] = createWritableStreamState();
|
||||
|
||||
this[kIsClosedPromise] = createDeferredPromise();
|
||||
this[kControllerErrorFunction] = () => {};
|
||||
@ -332,45 +294,7 @@ ObjectDefineProperties(WritableStream.prototype, {
|
||||
function InternalTransferredWritableStream() {
|
||||
markTransferMode(this, false, true);
|
||||
this[kType] = 'WritableStream';
|
||||
this[kState] = {
|
||||
close: createDeferredPromise(),
|
||||
closeRequest: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
inFlightWriteRequest: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
inFlightCloseRequest: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
pendingAbortRequest: {
|
||||
abort: {
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
reason: undefined,
|
||||
wasAlreadyErroring: false,
|
||||
},
|
||||
backpressure: false,
|
||||
controller: undefined,
|
||||
state: 'writable',
|
||||
storedError: undefined,
|
||||
writeRequests: [],
|
||||
writer: undefined,
|
||||
transfer: {
|
||||
readable: undefined,
|
||||
port1: undefined,
|
||||
port2: undefined,
|
||||
promise: undefined,
|
||||
},
|
||||
};
|
||||
this[kState] = createWritableStreamState();
|
||||
|
||||
this[kIsClosedPromise] = createDeferredPromise();
|
||||
}
|
||||
@ -583,6 +507,36 @@ ObjectDefineProperties(WritableStreamDefaultController.prototype, {
|
||||
[SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultController.name),
|
||||
});
|
||||
|
||||
function InternalWritableStream(start, write, close, abort, highWaterMark, size) {
|
||||
markTransferMode(this, false, true);
|
||||
this[kType] = 'WritableStream';
|
||||
this[kState] = createWritableStreamState();
|
||||
this[kIsClosedPromise] = createDeferredPromise();
|
||||
|
||||
const controller = new WritableStreamDefaultController(kSkipThrow);
|
||||
setupWritableStreamDefaultController(
|
||||
this,
|
||||
controller,
|
||||
start,
|
||||
write,
|
||||
close,
|
||||
abort,
|
||||
highWaterMark,
|
||||
size,
|
||||
);
|
||||
}
|
||||
|
||||
ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype);
|
||||
ObjectSetPrototypeOf(InternalWritableStream, WritableStream);
|
||||
|
||||
function createWritableStream(start, write, close, abort, highWaterMark = 1, size = () => 1) {
|
||||
const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size);
|
||||
|
||||
// For spec compliance the InternalWritableStream must be a WritableStream
|
||||
stream.constructor = WritableStream;
|
||||
return stream;
|
||||
}
|
||||
|
||||
const isWritableStream =
|
||||
isBrandCheck('WritableStream');
|
||||
const isWritableStreamDefaultWriter =
|
||||
@ -590,6 +544,55 @@ const isWritableStreamDefaultWriter =
|
||||
const isWritableStreamDefaultController =
|
||||
isBrandCheck('WritableStreamDefaultController');
|
||||
|
||||
function createWritableStreamState() {
|
||||
return {
|
||||
__proto__: null,
|
||||
close: createDeferredPromise(),
|
||||
closeRequest: {
|
||||
__proto__: null,
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
inFlightWriteRequest: {
|
||||
__proto__: null,
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
inFlightCloseRequest: {
|
||||
__proto__: null,
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
pendingAbortRequest: {
|
||||
__proto__: null,
|
||||
abort: {
|
||||
__proto__: null,
|
||||
promise: undefined,
|
||||
resolve: undefined,
|
||||
reject: undefined,
|
||||
},
|
||||
reason: undefined,
|
||||
wasAlreadyErroring: false,
|
||||
},
|
||||
backpressure: false,
|
||||
controller: undefined,
|
||||
state: 'writable',
|
||||
storedError: undefined,
|
||||
writeRequests: [],
|
||||
writer: undefined,
|
||||
transfer: {
|
||||
__proto__: null,
|
||||
readable: undefined,
|
||||
port1: undefined,
|
||||
port2: undefined,
|
||||
promise: undefined,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function isWritableStreamLocked(stream) {
|
||||
return stream[kState].writer !== undefined;
|
||||
}
|
||||
@ -909,10 +912,7 @@ function writableStreamFinishErroring(stream) {
|
||||
return;
|
||||
}
|
||||
PromisePrototypeThen(
|
||||
ensureIsPromise(
|
||||
stream[kState].controller[kAbort],
|
||||
stream[kState].controller,
|
||||
abortRequest.reason),
|
||||
stream[kState].controller[kAbort](abortRequest.reason),
|
||||
() => {
|
||||
abortRequest.abort.resolve?.();
|
||||
writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
|
||||
@ -1116,7 +1116,7 @@ function writableStreamDefaultControllerProcessWrite(controller, chunk) {
|
||||
writableStreamMarkFirstWriteRequestInFlight(stream);
|
||||
|
||||
PromisePrototypeThen(
|
||||
ensureIsPromise(writeAlgorithm, controller, chunk, controller),
|
||||
writeAlgorithm(chunk, controller),
|
||||
() => {
|
||||
writableStreamFinishInFlightWrite(stream);
|
||||
const {
|
||||
@ -1149,7 +1149,7 @@ function writableStreamDefaultControllerProcessClose(controller) {
|
||||
writableStreamMarkCloseRequestInFlight(stream);
|
||||
dequeueValue(controller);
|
||||
assert(!queue.length);
|
||||
const sinkClosePromise = ensureIsPromise(closeAlgorithm, controller);
|
||||
const sinkClosePromise = closeAlgorithm();
|
||||
writableStreamDefaultControllerClearAlgorithms(controller);
|
||||
PromisePrototypeThen(
|
||||
sinkClosePromise,
|
||||
@ -1248,12 +1248,14 @@ function setupWritableStreamDefaultControllerFromSink(
|
||||
FunctionPrototypeBind(start, sink, controller) :
|
||||
nonOpStart;
|
||||
const writeAlgorithm = write ?
|
||||
FunctionPrototypeBind(write, sink) :
|
||||
createPromiseCallback('sink.write', write, sink) :
|
||||
nonOpWrite;
|
||||
const closeAlgorithm = close ?
|
||||
FunctionPrototypeBind(close, sink) : nonOpCancel;
|
||||
createPromiseCallback('sink.close', close, sink) :
|
||||
nonOpCancel;
|
||||
const abortAlgorithm = abort ?
|
||||
FunctionPrototypeBind(abort, sink) : nonOpCancel;
|
||||
createPromiseCallback('sink.abort', abort, sink) :
|
||||
nonOpCancel;
|
||||
setupWritableStreamDefaultController(
|
||||
stream,
|
||||
controller,
|
||||
@ -1363,4 +1365,5 @@ module.exports = {
|
||||
writableStreamDefaultControllerAdvanceQueueIfNeeded,
|
||||
setupWritableStreamDefaultControllerFromSink,
|
||||
setupWritableStreamDefaultController,
|
||||
createWritableStream,
|
||||
};
|
||||
|
||||
@ -59,12 +59,5 @@
|
||||
},
|
||||
"readable-streams/read-task-handling.window.js": {
|
||||
"skip": "Browser-specific test"
|
||||
},
|
||||
"transform-streams/cancel.any.js": {
|
||||
"fail": {
|
||||
"expected": [
|
||||
"readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user