src: update Blob implementation to use DataQueue / File-backed Blobs

Co-authored-by: flakey5 <73616808+flakey5@users.noreply.github.com>
PR-URL: https://github.com/nodejs/node/pull/45258
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
James M Snell 2022-12-17 13:08:20 -08:00
parent c8cc7e89e6
commit 950cec4c26
11 changed files with 995 additions and 331 deletions

View File

@@ -202,6 +202,15 @@ When operating on file handles, the mode cannot be changed from what it was set
to with [`fsPromises.open()`][]. Therefore, this is equivalent to
[`filehandle.writeFile()`][].
#### `filehandle.blob()`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
Returns a {Blob} whose data is backed by this file.
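A minimal usage sketch (the path here is illustrative, mirroring the test added in this change):

```mjs
import { open } from 'node:fs/promises';

const fh = await open('/path/to/file');
const blob = fh.blob();

// The Blob's data is read lazily from the file rather than
// copied up front.
console.log(await blob.text());
```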
#### `filehandle.chmod(mode)`
<!-- YAML

View File

@@ -6,9 +6,8 @@ const {
MathMin,
ObjectDefineProperties,
ObjectDefineProperty,
PromiseResolve,
PromiseReject,
SafePromisePrototypeFinally,
PromiseResolve,
ReflectConstruct,
RegExpPrototypeExec,
RegExpPrototypeSymbolReplace,
@@ -22,7 +21,8 @@ const {
const {
createBlob: _createBlob,
FixedSizeBlobCopyJob,
createBlobFromFileHandle: _createBlobFromFileHandle,
concat,
getDataObject,
} = internalBinding('blob');
@@ -52,13 +52,13 @@ const {
const { inspect } = require('internal/util/inspect');
const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_THIS,
ERR_BUFFER_TOO_LARGE,
}
},
errnoException,
} = require('internal/errors');
const {
@@ -67,13 +67,8 @@ const {
} = require('internal/validators');
const kHandle = Symbol('kHandle');
const kState = Symbol('kState');
const kIndex = Symbol('kIndex');
const kType = Symbol('kType');
const kLength = Symbol('kLength');
const kArrayBufferPromise = Symbol('kArrayBufferPromise');
const kMaxChunkSize = 65536;
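// Matches any character outside the printable ASCII range
// (U+0020-U+007E); a type containing such characters is treated
// as invalid when normalizing a Blob's content type.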
const disallowedTypeCharacters = /[^\u{0020}-\u{007E}]/u;
@@ -266,40 +261,28 @@ class Blob {
if (!isBlob(this))
return PromiseReject(new ERR_INVALID_THIS('Blob'));
// If there's already a promise in flight for the content,
// reuse it, but only while it's in flight. After the cached
// promise resolves it will be cleared, allowing it to be
// garbage collected as soon as possible.
if (this[kArrayBufferPromise])
return this[kArrayBufferPromise];
if (this.size === 0) {
return PromiseResolve(new ArrayBuffer(0));
}
const job = new FixedSizeBlobCopyJob(this[kHandle]);
const ret = job.run();
// If the job returns a value immediately, the ArrayBuffer
// was generated synchronously and should just be returned
// directly.
if (ret !== undefined)
return PromiseResolve(ret);
const {
promise,
resolve,
reject,
} = createDeferredPromise();
job.ondone = (err, ab) => {
if (err !== undefined)
return reject(new AbortError(undefined, { cause: err }));
resolve(ab);
const { promise, resolve } = createDeferredPromise();
const reader = this[kHandle].getReader();
const buffers = [];
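// A sketch of the reader's pull() contract as used below: the
// callback receives (status, buffer); status -1 signals
// end-of-stream (with buffer undefined), while a defined buffer
// carries the next chunk of data to accumulate.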
const readNext = () => {
reader.pull((status, buffer) => {
if (status === -1) {
// EOS, concat & resolve
// buffer should be undefined here
resolve(concat(buffers));
return;
}
if (buffer !== undefined)
buffers.push(buffer);
readNext();
});
};
this[kArrayBufferPromise] =
SafePromisePrototypeFinally(
promise,
() => this[kArrayBufferPromise] = undefined);
return this[kArrayBufferPromise];
readNext();
return promise;
}
/**
@@ -321,24 +304,57 @@ class Blob {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');
const self = this;
return new lazyReadableStream({
async start() {
this[kState] = await self.arrayBuffer();
this[kIndex] = 0;
},
if (this.size === 0) {
return new lazyReadableStream({
start(c) { c.close(); }
});
}
pull(controller) {
if (this[kState].byteLength - this[kIndex] <= kMaxChunkSize) {
controller.enqueue(new Uint8Array(this[kState], this[kIndex]));
controller.close();
this[kState] = undefined;
} else {
controller.enqueue(new Uint8Array(this[kState], this[kIndex], kMaxChunkSize));
this[kIndex] += kMaxChunkSize;
const reader = this[kHandle].getReader();
return new lazyReadableStream({
start(c) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
},
pull(c) {
const { promise, resolve, reject } = createDeferredPromise();
this.pendingPulls.push({resolve, reject});
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream must have been
// canceled, and we no longer care about the result, so we
// can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
const pending = this.pendingPulls.shift();
if (status === -1 || (status === 0 && buffer === undefined)) {
// EOS
c.close();
pending.resolve();
return;
} else if (status < 0) {
const error = errnoException(status, 'read');
c.error(error);
pending.reject(error);
return;
}
c.enqueue(new Uint8Array(buffer));
pending.resolve();
});
return promise;
},
cancel(reason) {
// Reject any currently pending pulls here.
for (const pending of this.pendingPulls) {
pending.reject(reason);
}
this.pendingPulls = [];
}
});
// We set the highWaterMark to 0 because we do not want the stream to
// start reading immediately on creation. We want it to wait until read
// is called.
}, new CountQueuingStrategy({ highWaterMark: 0 }));
}
}
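Taken together, a hedged sketch of how these rewritten methods behave from a caller's perspective (based on the file-backed Blob test added in this change; `fh` is assumed to be a FileHandle):

const blob = fh.blob();
// arrayBuffer() pulls every chunk from the underlying reader and
// concatenates them into a single ArrayBuffer.
const ab = await blob.arrayBuffer();
// stream() pulls on demand (highWaterMark 0), so file reads happen
// only as the consumer iterates.
const dec = new TextDecoder();
let text = '';
for await (const chunk of blob.stream())
  text += dec.decode(chunk, { stream: true });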
@@ -406,10 +422,16 @@ function resolveObjectURL(url) {
}
}
function createBlobFromFileHandle(handle) {
const [blob, length] = _createBlobFromFileHandle(handle);
return createBlob(blob, length);
}
module.exports = {
Blob,
ClonedBlob,
createBlob,
createBlobFromFileHandle,
isBlob,
kHandle,
resolveObjectURL,

View File

@@ -25,6 +25,8 @@ const {
S_IFREG
} = constants;
const { createBlobFromFileHandle } = require('internal/blob');
const binding = internalBinding('fs');
const { Buffer } = require('buffer');
@@ -310,6 +312,14 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return new WriteStream(undefined, { ...options, fd: this });
}
/**
* @typedef {import('../blob').Blob} Blob
* @returns {Blob}
*/
blob() {
return createBlobFromFileHandle(this[kHandle]);
}
[kTransfer]() {
if (this[kClosePromise] || this[kRefs] > 1) {
throw lazyDOMException('Cannot transfer FileHandle while in use',

View File

@@ -38,7 +38,7 @@ namespace node {
V(ELDHISTOGRAM) \
V(FILEHANDLE) \
V(FILEHANDLECLOSEREQ) \
V(FIXEDSIZEBLOBCOPY) \
V(BLOBREADER) \
V(FSEVENTWRAP) \
V(FSREQCALLBACK) \
V(FSREQPROMISE) \

View File

@@ -329,6 +329,7 @@
V(base_object_ctor_template, v8::FunctionTemplate) \
V(binding_data_ctor_template, v8::FunctionTemplate) \
V(blob_constructor_template, v8::FunctionTemplate) \
V(blob_reader_constructor_template, v8::FunctionTemplate) \
V(blocklist_constructor_template, v8::FunctionTemplate) \
V(contextify_global_template, v8::ObjectTemplate) \
V(contextify_wrapper_template, v8::ObjectTemplate) \
@@ -339,6 +340,7 @@
V(dir_instance_template, v8::ObjectTemplate) \
V(fd_constructor_template, v8::ObjectTemplate) \
V(fdclose_constructor_template, v8::ObjectTemplate) \
V(fdentry_constructor_template, v8::FunctionTemplate) \
V(filehandlereadwrap_template, v8::ObjectTemplate) \
V(fsreqpromise_constructor_template, v8::ObjectTemplate) \
V(handle_wrap_ctor_template, v8::FunctionTemplate) \
@@ -358,7 +360,10 @@
V(secure_context_constructor_template, v8::FunctionTemplate) \
V(shutdown_wrap_template, v8::ObjectTemplate) \
V(socketaddress_constructor_template, v8::FunctionTemplate) \
V(streambaseentry_ctor_template, v8::FunctionTemplate) \
V(streambaseoutputstream_constructor_template, v8::ObjectTemplate) \
V(streamentry_ctor_template, v8::FunctionTemplate) \
V(streamentry_opaque_ctor_template, v8::FunctionTemplate) \
V(qlogoutputstream_constructor_template, v8::ObjectTemplate) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
@@ -366,6 +371,7 @@
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)
#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \
V(async_hooks_after_function, v8::Function) \
V(async_hooks_before_function, v8::Function) \

View File

@@ -1,11 +1,13 @@
#include "node_blob.h"
#include "async_wrap-inl.h"
#include "base_object-inl.h"
#include "base_object.h"
#include "env-inl.h"
#include "memory_tracker-inl.h"
#include "node_bob-inl.h"
#include "node_errors.h"
#include "node_external_reference.h"
#include "threadpoolwork-inl.h"
#include "node_file.h"
#include "v8.h"
#include <algorithm>
@@ -21,7 +23,9 @@ using v8::EscapableHandleScope;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Global;
using v8::HandleScope;
using v8::Int32;
using v8::Isolate;
using v8::Local;
using v8::MaybeLocal;
@@ -32,6 +36,76 @@ using v8::Uint32;
using v8::Undefined;
using v8::Value;
namespace {
// Concatenate multiple ArrayBufferView/ArrayBuffers into a single ArrayBuffer.
// This method treats all ArrayBufferView types the same.
void Concat(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsArray());
Local<Array> array = args[0].As<Array>();
struct View {
std::shared_ptr<BackingStore> store;
size_t length;
size_t offset = 0;
};
std::vector<View> views;
size_t total = 0;
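// doConcat copies each collected view, in order, into a single
// newly allocated BackingStore of the accumulated total size.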
const auto doConcat = [&](View* view, size_t size) {
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(env->isolate(), total);
uint8_t* ptr = static_cast<uint8_t*>(store->Data());
for (size_t n = 0; n < size; n++) {
uint8_t* from = static_cast<uint8_t*>(view[n].store->Data()) + view[n].offset;
std::copy(from, from + view[n].length, ptr);
ptr += view[n].length;
}
return ArrayBuffer::New(env->isolate(), store);
};
for (uint32_t n = 0; n < array->Length(); n++) {
Local<Value> val;
if (!array->Get(env->context(), n).ToLocal(&val))
return;
if (val->IsArrayBuffer()) {
auto ab = val.As<ArrayBuffer>();
views.push_back(View { ab->GetBackingStore(), ab->ByteLength(), 0 });
total += ab->ByteLength();
} else {
CHECK(val->IsArrayBufferView());
auto view = val.As<ArrayBufferView>();
views.push_back(View {
view->Buffer()->GetBackingStore(),
view->ByteLength(),
view->ByteOffset()
});
total += view->ByteLength();
}
}
args.GetReturnValue().Set(doConcat(views.data(), views.size()));
}
void BlobFromFileHandle(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
fs::FileHandle* fileHandle;
ASSIGN_OR_RETURN_UNWRAP(&fileHandle, args[0]);
std::vector<std::unique_ptr<DataQueue::Entry>> entries;
entries.push_back(DataQueue::CreateFdEntry(BaseObjectPtr<fs::FileHandle>(fileHandle)));
auto blob = Blob::Create(env, DataQueue::CreateIdempotent(std::move(entries)));
// Bail out before dereferencing blob if creation failed.
if (!blob) return;
auto array = Array::New(env->isolate(), 2);
USE(array->Set(env->context(), 0, blob->object()));
USE(array->Set(env->context(), 1,
               Uint32::NewFromUnsigned(env->isolate(), blob->length())));
args.GetReturnValue().Set(array);
}
} // namespace
void Blob::Initialize(
Local<Object> target,
Local<Value> unused,
@@ -47,7 +121,8 @@ void Blob::Initialize(
SetMethod(context, target, "storeDataObject", StoreDataObject);
SetMethod(context, target, "getDataObject", GetDataObject);
SetMethod(context, target, "revokeDataObject", RevokeDataObject);
FixedSizeBlobCopyJob::Initialize(env, target);
SetMethod(context, target, "concat", Concat);
SetMethod(context, target, "createBlobFromFileHandle", BlobFromFileHandle);
}
Local<FunctionTemplate> Blob::GetConstructorTemplate(Environment* env) {
@@ -60,7 +135,7 @@ Local<FunctionTemplate> Blob::GetConstructorTemplate(Environment* env) {
tmpl->Inherit(BaseObject::GetConstructorTemplate(env));
tmpl->SetClassName(
FIXED_ONE_BYTE_STRING(env->isolate(), "Blob"));
SetProtoMethod(isolate, tmpl, "toArrayBuffer", ToArrayBuffer);
SetProtoMethod(isolate, tmpl, "getReader", GetReader);
SetProtoMethod(isolate, tmpl, "slice", ToSlice);
env->set_blob_constructor_template(tmpl);
}
@@ -71,9 +146,9 @@ bool Blob::HasInstance(Environment* env, v8::Local<v8::Value> object) {
return GetConstructorTemplate(env)->HasInstance(object);
}
BaseObjectPtr<Blob> Blob::Create(Environment* env,
const std::vector<BlobEntry>& store,
size_t length) {
BaseObjectPtr<Blob> Blob::Create(
Environment* env,
std::shared_ptr<DataQueue> data_queue) {
HandleScope scope(env->isolate());
Local<Function> ctor;
@@ -84,56 +159,82 @@ BaseObjectPtr<Blob> Blob::Create(Environment* env,
if (!ctor->NewInstance(env->context()).ToLocal(&obj))
return BaseObjectPtr<Blob>();
return MakeBaseObject<Blob>(env, obj, store, length);
return MakeBaseObject<Blob>(env, obj, data_queue);
}
void Blob::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsArray()); // sources
CHECK(args[1]->IsUint32()); // length
std::vector<BlobEntry> entries;
Local<Array> array = args[0].As<Array>();
std::vector<std::unique_ptr<DataQueue::Entry>> entries(array->Length());
size_t length = args[1].As<Uint32>()->Value();
size_t len = 0;
Local<Array> ary = args[0].As<Array>();
for (size_t n = 0; n < ary->Length(); n++) {
for (size_t i = 0; i < array->Length(); i++) {
Local<Value> entry;
if (!ary->Get(env->context(), n).ToLocal(&entry))
if (!array->Get(env->context(), i).ToLocal(&entry)) {
return;
CHECK(entry->IsArrayBufferView() || Blob::HasInstance(env, entry));
if (entry->IsArrayBufferView()) {
}
const auto entryFromArrayBuffer = [env](
v8::Local<v8::ArrayBuffer> buf,
size_t byte_length,
size_t byte_offset = 0) {
if (buf->IsDetachable()) {
std::shared_ptr<BackingStore> store = buf->GetBackingStore();
USE(buf->Detach(Local<Value>()));
return DataQueue::CreateInMemoryEntryFromBackingStore(
store, byte_offset, byte_length);
}
// If the ArrayBuffer is not detachable, we will copy from it instead.
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(env->isolate(), byte_length);
uint8_t* ptr = static_cast<uint8_t*>(buf->Data()) + byte_offset;
std::copy(ptr, ptr + byte_length, static_cast<uint8_t*>(store->Data()));
return DataQueue::CreateInMemoryEntryFromBackingStore(
store, 0, byte_length);
};
// Every entry should be either an ArrayBuffer, ArrayBufferView, or Blob.
// If the input to the Blob constructor in JavaScript was a string, then
// it will be decoded into an ArrayBufferView there before being passed
// in.
//
// Importantly, we also assume that the ArrayBuffer/ArrayBufferView
// will not be modified after this point, so we detach it. It is up
// to the JavaScript side to do the right thing with regard to
// copying and ensuring appropriate spec compliance.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> buf = entry.As<ArrayBuffer>();
entries[i] = entryFromArrayBuffer(buf, buf->ByteLength());
} else if (entry->IsArrayBufferView()) {
Local<ArrayBufferView> view = entry.As<ArrayBufferView>();
CHECK_EQ(view->ByteOffset(), 0);
std::shared_ptr<BackingStore> store = view->Buffer()->GetBackingStore();
size_t byte_length = view->ByteLength();
view->Buffer()
->Detach(Local<Value>())
.Check(); // The Blob will own the backing store now.
entries.emplace_back(BlobEntry{std::move(store), byte_length, 0});
len += byte_length;
} else {
entries[i] = entryFromArrayBuffer(
view->Buffer(),
view->ByteLength(),
view->ByteOffset());
} else if (Blob::HasInstance(env, entry)) {
Blob* blob;
ASSIGN_OR_RETURN_UNWRAP(&blob, entry);
auto source = blob->entries();
entries.insert(entries.end(), source.begin(), source.end());
len += blob->length();
entries[i] = DataQueue::CreateDataQueueEntry(blob->data_queue_);
} else {
UNREACHABLE("Incorrect Blob initialization type");
}
}
CHECK_EQ(length, len);
BaseObjectPtr<Blob> blob = Create(env, entries, length);
auto blob = Create(env, DataQueue::CreateIdempotent(std::move(entries)));
if (blob)
args.GetReturnValue().Set(blob->object());
}
void Blob::ToArrayBuffer(const FunctionCallbackInfo<Value>& args) {
void Blob::GetReader(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Blob* blob;
ASSIGN_OR_RETURN_UNWRAP(&blob, args.Holder());
Local<Value> ret;
if (blob->GetArrayBuffer(env).ToLocal(&ret))
args.GetReturnValue().Set(ret);
BaseObjectPtr<Blob::Reader> reader =
Blob::Reader::Create(env, BaseObjectPtr<Blob>(blob));
if (reader) args.GetReturnValue().Set(reader->object());
}
void Blob::ToSlice(const FunctionCallbackInfo<Value>& args) {
@@ -150,72 +251,134 @@ void Blob::ToSlice(const FunctionCallbackInfo<Value>& args) {
}
void Blob::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackFieldWithSize("store", length_);
}
MaybeLocal<Value> Blob::GetArrayBuffer(Environment* env) {
EscapableHandleScope scope(env->isolate());
size_t len = length();
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(env->isolate(), len);
if (len > 0) {
unsigned char* dest = static_cast<unsigned char*>(store->Data());
size_t total = 0;
for (const auto& entry : entries()) {
unsigned char* src = static_cast<unsigned char*>(entry.store->Data());
src += entry.offset;
memcpy(dest, src, entry.length);
dest += entry.length;
total += entry.length;
CHECK_LE(total, len);
}
}
return scope.Escape(ArrayBuffer::New(env->isolate(), store));
tracker->TrackField("data_queue_", data_queue_);
}
BaseObjectPtr<Blob> Blob::Slice(Environment* env, size_t start, size_t end) {
CHECK_LE(start, length());
CHECK_LE(end, length());
CHECK_LE(start, end);
std::vector<BlobEntry> slices;
size_t total = end - start;
size_t remaining = total;
if (total == 0) return Create(env, slices, 0);
for (const auto& entry : entries()) {
if (start + entry.offset > entry.store->ByteLength()) {
start -= entry.length;
continue;
}
size_t offset = entry.offset + start;
size_t len = std::min(remaining, entry.store->ByteLength() - offset);
slices.emplace_back(BlobEntry{entry.store, len, offset});
remaining -= len;
start = 0;
if (remaining == 0)
break;
}
return Create(env, slices, total);
return Create(env, this->data_queue_->slice(start, v8::Just(end)));
}
Blob::Blob(
Environment* env,
v8::Local<v8::Object> obj,
const std::vector<BlobEntry>& store,
size_t length)
std::shared_ptr<DataQueue> data_queue)
: BaseObject(env, obj),
store_(store),
length_(length) {
data_queue_(data_queue) {
MakeWeak();
}
Blob::Reader::Reader(
Environment* env,
v8::Local<v8::Object> obj,
BaseObjectPtr<Blob> strong_ptr)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_BLOBREADER),
inner_(strong_ptr->data_queue_->getReader()),
strong_ptr_(std::move(strong_ptr)) {
MakeWeak();
}
bool Blob::Reader::HasInstance(Environment* env, v8::Local<v8::Value> value) {
return GetConstructorTemplate(env)->HasInstance(value);
}
Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
Local<FunctionTemplate> tmpl = env->blob_reader_constructor_template();
if (tmpl.IsEmpty()) {
Isolate* isolate = env->isolate();
tmpl = NewFunctionTemplate(isolate, nullptr);
tmpl->InstanceTemplate()->SetInternalFieldCount(
BaseObject::kInternalFieldCount);
tmpl->Inherit(BaseObject::GetConstructorTemplate(env));
tmpl->SetClassName(
FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
env->set_blob_reader_constructor_template(tmpl);
}
return tmpl;
}
BaseObjectPtr<Blob::Reader> Blob::Reader::Create(
Environment* env,
BaseObjectPtr<Blob> blob) {
Local<Object> obj;
if (!GetConstructorTemplate(env)->InstanceTemplate()
->NewInstance(env->context()).ToLocal(&obj)) {
return BaseObjectPtr<Blob::Reader>();
}
return MakeBaseObject<Blob::Reader>(env, obj, std::move(blob));
}
void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Blob::Reader* reader;
ASSIGN_OR_RETURN_UNWRAP(&reader, args.Holder());
CHECK(args[0]->IsFunction());
Local<Function> fn = args[0].As<Function>();
CHECK(!fn->IsConstructor());
if (reader->eos_) {
Local<Value> arg = Int32::New(env->isolate(), bob::STATUS_EOS);
reader->MakeCallback(fn, 1, &arg);
return args.GetReturnValue().Set(bob::STATUS_EOS);
}
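// Impl keeps the reader and the JS callback alive until the pull
// completes; the lambda below takes ownership and releases it via
// the unique_ptr it constructs on entry.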
struct Impl {
BaseObjectPtr<Blob::Reader> reader;
Global<Function> callback;
Environment* env;
};
Impl* impl = new Impl();
impl->reader = BaseObjectPtr<Blob::Reader>(reader);
impl->callback.Reset(env->isolate(), fn);
impl->env = env;
auto next = [impl](
int status,
const DataQueue::Vec* vecs,
size_t count,
bob::Done doneCb) mutable {
auto dropMe = std::unique_ptr<Impl>(impl);
Environment* env = impl->env;
HandleScope handleScope(env->isolate());
Local<Function> fn = impl->callback.Get(env->isolate());
if (status == bob::STATUS_EOS) impl->reader->eos_ = true;
if (count > 0) {
// Copy the returned vectors into a single ArrayBuffer.
size_t total = 0;
for (size_t n = 0; n < count; n++) total += vecs[n].len;
std::shared_ptr<BackingStore> store =
v8::ArrayBuffer::NewBackingStore(env->isolate(), total);
auto ptr = static_cast<uint8_t*>(store->Data());
for (size_t n = 0; n < count; n++) {
std::copy(vecs[n].base, vecs[n].base + vecs[n].len, ptr);
ptr += vecs[n].len;
}
// Since we copied the data buffers, signal that we're done with them.
std::move(doneCb)(0);
Local<Value> argv[2] = {
Uint32::New(env->isolate(), status),
ArrayBuffer::New(env->isolate(), store)
};
impl->reader->MakeCallback(fn, arraysize(argv), argv);
return;
}
Local<Value> argv[2] = {
Int32::New(env->isolate(), status),
Undefined(env->isolate()),
};
impl->reader->MakeCallback(fn, arraysize(argv), argv);
};
args.GetReturnValue().Set(reader->inner_->Pull(
std::move(next), node::bob::OPTIONS_END, nullptr, 0));
}
BaseObjectPtr<BaseObject>
Blob::BlobTransferData::Deserialize(
Environment* env,
@@ -225,7 +388,7 @@ Blob::BlobTransferData::Deserialize(
THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
return {};
}
return Blob::Create(env, store_, length_);
return Blob::Create(env, data_queue);
}
BaseObject::TransferMode Blob::GetTransferMode() const {
@@ -233,7 +396,7 @@ BaseObject::TransferMode Blob::GetTransferMode() const {
}
std::unique_ptr<worker::TransferData> Blob::CloneForMessaging() const {
return std::make_unique<BlobTransferData>(store_, length_);
return std::make_unique<BlobTransferData>(data_queue_);
}
void Blob::StoreDataObject(const v8::FunctionCallbackInfo<v8::Value>& args) {
@@ -308,118 +471,9 @@ void Blob::GetDataObject(const v8::FunctionCallbackInfo<v8::Value>& args) {
}
}
FixedSizeBlobCopyJob::FixedSizeBlobCopyJob(Environment* env,
Local<Object> object,
Blob* blob,
FixedSizeBlobCopyJob::Mode mode)
: AsyncWrap(env, object, AsyncWrap::PROVIDER_FIXEDSIZEBLOBCOPY),
ThreadPoolWork(env, "blob"),
mode_(mode) {
if (mode == FixedSizeBlobCopyJob::Mode::SYNC) MakeWeak();
source_ = blob->entries();
length_ = blob->length();
}
void FixedSizeBlobCopyJob::AfterThreadPoolWork(int status) {
Environment* env = AsyncWrap::env();
CHECK_EQ(mode_, Mode::ASYNC);
CHECK(status == 0 || status == UV_ECANCELED);
std::unique_ptr<FixedSizeBlobCopyJob> ptr(this);
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> args[2];
if (status == UV_ECANCELED) {
args[0] = Number::New(env->isolate(), status),
args[1] = Undefined(env->isolate());
} else {
args[0] = Undefined(env->isolate());
args[1] = ArrayBuffer::New(env->isolate(), destination_);
}
ptr->MakeCallback(env->ondone_string(), arraysize(args), args);
}
void FixedSizeBlobCopyJob::DoThreadPoolWork() {
unsigned char* dest = static_cast<unsigned char*>(destination_->Data());
if (length_ > 0) {
size_t total = 0;
for (const auto& entry : source_) {
unsigned char* src = static_cast<unsigned char*>(entry.store->Data());
src += entry.offset;
memcpy(dest, src, entry.length);
dest += entry.length;
total += entry.length;
CHECK_LE(total, length_);
}
}
}
void FixedSizeBlobCopyJob::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackFieldWithSize("source", length_);
tracker->TrackFieldWithSize(
"destination",
destination_ ? destination_->ByteLength() : 0);
}
void FixedSizeBlobCopyJob::Initialize(Environment* env, Local<Object> target) {
Isolate* isolate = env->isolate();
v8::Local<v8::FunctionTemplate> job = NewFunctionTemplate(isolate, New);
job->Inherit(AsyncWrap::GetConstructorTemplate(env));
job->InstanceTemplate()->SetInternalFieldCount(
AsyncWrap::kInternalFieldCount);
SetProtoMethod(isolate, job, "run", Run);
SetConstructorFunction(env->context(), target, "FixedSizeBlobCopyJob", job);
}
void FixedSizeBlobCopyJob::New(const FunctionCallbackInfo<Value>& args) {
static constexpr size_t kMaxSyncLength = 4096;
static constexpr size_t kMaxEntryCount = 4;
Environment* env = Environment::GetCurrent(args);
CHECK(args.IsConstructCall());
CHECK(args[0]->IsObject());
CHECK(Blob::HasInstance(env, args[0]));
Blob* blob;
ASSIGN_OR_RETURN_UNWRAP(&blob, args[0]);
// This is a fairly arbitrary heuristic. We want to avoid deferring to
// the threadpool if the amount of data being copied is small and there
// aren't that many entries to copy.
FixedSizeBlobCopyJob::Mode mode =
(blob->length() < kMaxSyncLength &&
blob->entries().size() < kMaxEntryCount) ?
FixedSizeBlobCopyJob::Mode::SYNC :
FixedSizeBlobCopyJob::Mode::ASYNC;
new FixedSizeBlobCopyJob(env, args.This(), blob, mode);
}
void FixedSizeBlobCopyJob::Run(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
FixedSizeBlobCopyJob* job;
ASSIGN_OR_RETURN_UNWRAP(&job, args.Holder());
job->destination_ =
ArrayBuffer::NewBackingStore(env->isolate(), job->length_);
if (job->mode() == FixedSizeBlobCopyJob::Mode::ASYNC)
return job->ScheduleWork();
job->DoThreadPoolWork();
args.GetReturnValue().Set(
ArrayBuffer::New(env->isolate(), job->destination_));
}
void FixedSizeBlobCopyJob::RegisterExternalReferences(
ExternalReferenceRegistry* registry) {
registry->Register(New);
registry->Register(Run);
}
void BlobBindingData::StoredDataObject::MemoryInfo(
MemoryTracker* tracker) const {
tracker->TrackField("blob", blob);
tracker->TrackFieldWithSize("type", type.length());
}
BlobBindingData::StoredDataObject::StoredDataObject(
@@ -436,7 +490,7 @@ BlobBindingData::BlobBindingData(Environment* env, Local<Object> wrap)
}
void BlobBindingData::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("data_objects", data_objects_);
tracker->TrackField("data_objects_", data_objects_);
}
void BlobBindingData::store_data_object(
@@ -490,13 +544,14 @@ InternalFieldInfoBase* BlobBindingData::Serialize(int index) {
void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Blob::New);
registry->Register(Blob::ToArrayBuffer);
registry->Register(Blob::GetReader);
registry->Register(Blob::ToSlice);
registry->Register(Blob::StoreDataObject);
registry->Register(Blob::GetDataObject);
registry->Register(Blob::RevokeDataObject);
FixedSizeBlobCopyJob::RegisterExternalReferences(registry);
registry->Register(Blob::Reader::Pull);
registry->Register(Concat);
registry->Register(BlobFromFileHandle);
}
} // namespace node

View File

@@ -1,10 +1,13 @@
#ifndef SRC_NODE_BLOB_H_
#define SRC_NODE_BLOB_H_
#include "v8-function-callback.h"
#include "v8-template.h"
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "async_wrap.h"
#include "base_object.h"
#include "dataqueue/queue.h"
#include "env.h"
#include "memory_tracker.h"
#include "node_internals.h"
@@ -18,12 +21,6 @@
namespace node {
struct BlobEntry {
std::shared_ptr<v8::BackingStore> store;
size_t length;
size_t offset;
};
class Blob : public BaseObject {
public:
static void RegisterExternalReferences(
@@ -36,7 +33,7 @@ class Blob : public BaseObject {
void* priv);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ToArrayBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetReader(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ToSlice(const v8::FunctionCallbackInfo<v8::Value>& args);
static void StoreDataObject(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetDataObject(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -45,32 +42,22 @@ class Blob : public BaseObject {
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);
static BaseObjectPtr<Blob> Create(Environment* env,
const std::vector<BlobEntry>& store,
size_t length);
static BaseObjectPtr<Blob> Create(Environment* env, std::shared_ptr<DataQueue> data_queue);
static bool HasInstance(Environment* env, v8::Local<v8::Value> object);
const std::vector<BlobEntry>& entries() const { return store_; }
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(Blob)
SET_SELF_SIZE(Blob)
// Copies the contents of the Blob into an ArrayBuffer.
v8::MaybeLocal<v8::Value> GetArrayBuffer(Environment* env);
BaseObjectPtr<Blob> Slice(Environment* env, size_t start, size_t end);
inline size_t length() const { return length_; }
inline size_t length() const { return this->data_queue_->size().ToChecked(); }
class BlobTransferData : public worker::TransferData {
public:
explicit BlobTransferData(
const std::vector<BlobEntry>& store,
size_t length)
: store_(store),
length_(length) {}
explicit BlobTransferData(std::shared_ptr<DataQueue> data_queue)
: data_queue(data_queue) {}
BaseObjectPtr<BaseObject> Deserialize(
Environment* env,
@@ -82,61 +69,46 @@ class Blob : public BaseObject {
SET_NO_MEMORY_INFO()
private:
std::vector<BlobEntry> store_;
size_t length_ = 0;
std::shared_ptr<DataQueue> data_queue;
};
class Reader final : public AsyncWrap {
public:
static bool HasInstance(Environment* env, v8::Local<v8::Value> value);
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);
static BaseObjectPtr<Reader> Create(
Environment* env,
BaseObjectPtr<Blob> blob);
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
explicit Reader(
Environment* env,
v8::Local<v8::Object> obj,
BaseObjectPtr<Blob> strong_ptr);
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(Blob::Reader)
SET_SELF_SIZE(Reader)
private:
std::unique_ptr<DataQueue::Reader> inner_;
BaseObjectPtr<Blob> strong_ptr_;
bool eos_ = false;
};
BaseObject::TransferMode GetTransferMode() const override;
std::unique_ptr<worker::TransferData> CloneForMessaging() const override;
Blob(
Environment* env,
v8::Local<v8::Object> obj,
const std::vector<BlobEntry>& store,
size_t length);
private:
std::vector<BlobEntry> store_;
size_t length_ = 0;
};
class FixedSizeBlobCopyJob : public AsyncWrap, public ThreadPoolWork {
public:
enum class Mode {
SYNC,
ASYNC
};
static void RegisterExternalReferences(
ExternalReferenceRegistry* registry);
static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Run(const v8::FunctionCallbackInfo<v8::Value>& args);
bool IsNotIndicativeOfMemoryLeakAtExit() const override {
return true;
}
void DoThreadPoolWork() override;
void AfterThreadPoolWork(int status) override;
Mode mode() const { return mode_; }
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(FixedSizeBlobCopyJob)
SET_SELF_SIZE(FixedSizeBlobCopyJob)
private:
FixedSizeBlobCopyJob(
Environment* env,
v8::Local<v8::Object> object,
Blob* blob,
Mode mode = Mode::ASYNC);
v8::Local<v8::Object> obj,
std::shared_ptr<DataQueue> data_queue);
Mode mode_;
std::vector<BlobEntry> source_;
std::shared_ptr<v8::BackingStore> destination_;
size_t length_ = 0;
DataQueue& getDataQueue() const { return *data_queue_; }
private:
std::shared_ptr<DataQueue> data_queue_;
};
class BlobBindingData : public SnapshotableObject {

View File

@@ -0,0 +1,544 @@
#include "dataqueue/queue.h"
#include "node_bob-inl.h"
#include "node_bob.h"
#include "util-inl.h"
#include "gtest/gtest.h"
#include <v8.h>
#include <memory>
#include <vector>
using node::DataQueue;
using v8::ArrayBuffer;
using v8::BackingStore;
using v8::Just;
TEST(DataQueue, InMemoryEntry) {
char buffer[] = "hello world";
size_t len = strlen(buffer);
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(
&buffer, len, [](void*, size_t, void*) {}, nullptr);
// We can create an InMemoryEntry from a v8::BackingStore.
std::unique_ptr<DataQueue::Entry> entry =
DataQueue::CreateInMemoryEntryFromBackingStore(store, 0, len);
// The entry is idempotent.
CHECK(entry->isIdempotent());
// The size is known.
CHECK_EQ(entry->size().ToChecked(), len);
// We can slice it.
// slice: "llo world"
std::unique_ptr<DataQueue::Entry> slice1 = entry->slice(2);
// The slice is idempotent.
CHECK(slice1->isIdempotent());
// The slice size is known.
CHECK_EQ(slice1->size().ToChecked(), len - 2);
// We can slice the slice with a length.
// slice: "o w"
std::unique_ptr<DataQueue::Entry> slice2 = slice1->slice(2, Just(5UL));
// That slice is idempotent.
CHECK(slice2->isIdempotent());
// That slice size is known.
CHECK_EQ(slice2->size().ToChecked(), 3);
// The slice end can extend beyond the actual size and will be adjusted.
// slice: "orld"
std::unique_ptr<DataQueue::Entry> slice3 = slice1->slice(5, Just(100UL));
CHECK_NOT_NULL(slice3);
// The slice size is known.
CHECK_EQ(slice3->size().ToChecked(), 4);
// If the slice start is greater than the length, we get a zero-length slice.
std::unique_ptr<DataQueue::Entry> slice4 = entry->slice(100);
CHECK_NOT_NULL(slice4);
CHECK_EQ(slice4->size().ToChecked(), 0);
// If the slice end is less than the start, we get a zero-length slice.
std::unique_ptr<DataQueue::Entry> slice5 = entry->slice(2, Just(1UL));
CHECK_NOT_NULL(slice5);
CHECK_EQ(slice5->size().ToChecked(), 0);
// If the slice end is equal to the start, we get a zero-length slice.
std::unique_ptr<DataQueue::Entry> slice6 = entry->slice(2, Just(2UL));
CHECK_NOT_NULL(slice6);
CHECK_EQ(slice6->size().ToChecked(), 0);
// The shared_ptr for the BackingStore should show only 5 uses because
// the zero-length slices do not maintain a reference to it.
CHECK_EQ(store.use_count(), 5);
}
TEST(DataQueue, IdempotentDataQueue) {
char buffer1[] = "hello world";
char buffer2[] = "what fun this is";
char buffer3[] = "not added";
size_t len1 = strlen(buffer1);
size_t len2 = strlen(buffer2);
size_t len3 = strlen(buffer3);
std::shared_ptr<BackingStore> store1 =
ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 =
ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::vector<std::unique_ptr<DataQueue::Entry>> list;
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
// We can create an idempotent DataQueue from a list of entries.
std::shared_ptr<DataQueue> data_queue = DataQueue::CreateIdempotent(std::move(list));
CHECK_NOT_NULL(data_queue);
// The data_queue is idempotent.
CHECK(data_queue->isIdempotent());
// The data_queue is capped.
CHECK(data_queue->isCapped());
// maybeCapRemaining() returns zero.
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 0);
// Calling cap() is a no-op but doesn't crash or error.
data_queue->cap();
data_queue->cap(100);
// maybeCapRemaining() still returns zero.
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 0);
// The size is known to be the sum of the in-memory entries.
CHECK_EQ(data_queue->size().ToChecked(), len1 + len2);
std::shared_ptr<BackingStore> store3 =
ArrayBuffer::NewBackingStore(
&buffer3, len3, [](void*, size_t, void*) {}, nullptr);
// Trying to append a new entry does not crash, but returns v8::Nothing.
CHECK(data_queue->append(
DataQueue::CreateInMemoryEntryFromBackingStore(store3, 0, len3))
.IsNothing());
// The size has not changed after the append.
CHECK_EQ(data_queue->size().ToChecked(), len1 + len2);
// We can acquire multiple readers from the data_queue.
std::unique_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
CHECK_NOT_NULL(reader1);
CHECK_NOT_NULL(reader2);
const auto testRead = [&](auto& reader) {
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
bool waitingForPull = true;
// The first read produces buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
// The third read produces EOS
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
};
// Both reader1 and reader2 should pass identical tests.
testRead(reader1);
testRead(reader2);
// We can slice the data queue.
std::shared_ptr<DataQueue> slice1 = data_queue->slice(2);
CHECK_NOT_NULL(slice1);
// The slice is idempotent.
CHECK(slice1->isIdempotent());
// And capped.
CHECK(slice1->isCapped());
// The size is two-bytes less than the original.
CHECK_EQ(slice1->size().ToChecked(), data_queue->size().ToChecked() - 2);
const auto testSlice = [&](auto& reader) {
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
bool waitingForPull = true;
// The first read produces a slice of buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1 - 2);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 2, len1 - 2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
// The third read produces EOS
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
};
// We can read the expected slice data.
std::unique_ptr<DataQueue::Reader> reader3 = slice1->getReader();
testSlice(reader3);
// We can slice correctly across boundaries.
std::shared_ptr<DataQueue> slice2 = data_queue->slice(5, Just(20UL));
// The size is known.
CHECK_EQ(slice2->size().ToChecked(), 15);
const auto testSlice2 = [&](auto& reader) {
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
bool waitingForPull = true;
// The first read produces a slice of buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1 - 5);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 5, len1 - 5), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2 - 7);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2 - 7), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
// The third read produces EOS
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
};
// We can read the expected slice data.
std::unique_ptr<DataQueue::Reader> reader4 = slice2->getReader();
testSlice2(reader4);
}
TEST(DataQueue, NonIdempotentDataQueue) {
char buffer1[] = "hello world";
char buffer2[] = "what fun this is";
char buffer3[] = "not added";
size_t len1 = strlen(buffer1);
size_t len2 = strlen(buffer2);
size_t len3 = strlen(buffer3);
std::shared_ptr<BackingStore> store1 =
ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 =
ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store3 =
ArrayBuffer::NewBackingStore(
&buffer3, len3, [](void*, size_t, void*) {}, nullptr);
// We can create a non-idempotent DataQueue and append entries to it.
std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
CHECK(!data_queue->isIdempotent());
CHECK_EQ(data_queue->size().ToChecked(), 0);
data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
CHECK_EQ(data_queue->size().ToChecked(), len1);
data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
CHECK_EQ(data_queue->size().ToChecked(), len1 + len2);
CHECK(!data_queue->isCapped());
CHECK(data_queue->maybeCapRemaining().IsNothing());
data_queue->cap(100);
CHECK(data_queue->isCapped());
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 100 - (len1 + len2));
data_queue->cap(101);
CHECK(data_queue->isCapped());
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 100 - (len1 + len2));
data_queue->cap();
CHECK(data_queue->isCapped());
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 0);
// We can't add any more because the data queue is capped.
CHECK_EQ(data_queue->append(
DataQueue::CreateInMemoryEntryFromBackingStore(store3, 0, len3)).FromJust(), false);
// We cannot slice a non-idempotent data queue
std::shared_ptr<DataQueue> slice1 = data_queue->slice(2);
CHECK_NULL(slice1);
// We can acquire only a single reader for a non-idempotent data queue
std::unique_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
CHECK_NOT_NULL(reader1);
CHECK_NULL(reader2);
const auto testRead = [&](auto& reader) {
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
bool waitingForPull = true;
// The first read produces buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// We can read the expected data from reader. Because the entries are
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
// The third read produces EOS
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
};
// Reading produces the expected results.
testRead(reader1);
// We still cannot acquire another reader.
std::unique_ptr<DataQueue::Reader> reader3 = data_queue->getReader();
CHECK_NULL(reader3);
CHECK_NOT_NULL(data_queue);
}
TEST(DataQueue, DataQueueEntry) {
char buffer1[] = "hello world";
char buffer2[] = "what fun this is";
size_t len1 = strlen(buffer1);
size_t len2 = strlen(buffer2);
std::shared_ptr<BackingStore> store1 =
ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 =
ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::vector<std::unique_ptr<DataQueue::Entry>> list;
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
// We can create an idempotent DataQueue from a list of entries.
std::shared_ptr<DataQueue> data_queue = DataQueue::CreateIdempotent(std::move(list));
CHECK_NOT_NULL(data_queue);
// We can create an Entry from a data queue.
std::unique_ptr<DataQueue::Entry> entry =
DataQueue::CreateDataQueueEntry(data_queue);
// The entry should be idempotent since the data queue is idempotent.
CHECK(entry->isIdempotent());
// The entry size should match the data queue size.
CHECK_EQ(entry->size().ToChecked(), data_queue->size().ToChecked());
// We can slice it since it is idempotent.
std::unique_ptr<DataQueue::Entry> slice = entry->slice(5, Just(20UL));
// The slice has the expected length.
CHECK_EQ(slice->size().ToChecked(), 15);
// We can add it to another data queue, even if the new one is not
// idempotent.
std::shared_ptr<DataQueue> data_queue2 = DataQueue::Create();
CHECK(data_queue2->append(std::move(slice)).IsJust());
// Our original data queue should have a use count of 2.
CHECK_EQ(data_queue.use_count(), 2);
std::unique_ptr<DataQueue::Reader> reader = data_queue2->getReader();
bool pullIsPending = true;
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
pullIsPending = false;
CHECK_EQ(count, 1);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 5, len1 - 5), 0);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
}, node::bob::OPTIONS_SYNC, nullptr, 0);
// All of the actual entries are in-memory entries so reads should be sync.
CHECK(!pullIsPending);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// Read to completion...
while (status != node::bob::STATUS_EOS) {
status = reader->Pull([&](auto, auto, auto, auto) {},
node::bob::OPTIONS_SYNC, nullptr, 0);
}
// Because the original data queue is idempotent, we can still read from it,
// even though we have already consumed the non-idempotent data queue that
// contained it.
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
CHECK_NOT_NULL(reader2);
pullIsPending = true;
status = reader2->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
pullIsPending = false;
CHECK_EQ(count, 1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
}, node::bob::OPTIONS_SYNC, nullptr, 0);
// All of the actual entries are in-memory entries so reads should be sync.
CHECK(!pullIsPending);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
}

View File

@@ -0,0 +1,47 @@
'use strict';
const common = require('../common');
const { strictEqual } = require('assert');
const { open } = require('fs/promises');
const { TextDecoder } = require('util');
const fs = require('fs');
const path = require('path');
const tmpdir = require('../common/tmpdir');
const testfile = path.join(tmpdir.path, 'test.txt');
tmpdir.refresh();
const data = `${'a'.repeat(1000)}${'b'.repeat(2000)}`;
fs.writeFileSync(testfile, data);
(async () => {
const fh = await open(testfile);
const blob = fh.blob();
const ab = await blob.arrayBuffer();
const dec = new TextDecoder();
strictEqual(dec.decode(new Uint8Array(ab)), data);
strictEqual(await blob.text(), data);
let stream = blob.stream();
let check = '';
for await (const chunk of stream)
check = dec.decode(chunk);
strictEqual(check, data);
// If the file is modified, though, the stream errors.
fs.writeFileSync(testfile, data + 'abc');
stream = blob.stream();
try {
for await (const chunk of stream) {}
} catch (err) {
strictEqual(err.message, 'read EINVAL');
strictEqual(err.code, 'EINVAL');
}
})().then(common.mustCall());

View File

@@ -224,7 +224,6 @@ assert.throws(() => new Blob({}), {
// The Blob has to be over a specific size for the data to
// be copied asynchronously.
const b = new Blob(['hello', 'there'.repeat(820)]);
assert.strictEqual(b.arrayBuffer(), b.arrayBuffer());
b.arrayBuffer().then(common.mustCall());
}

View File

@@ -63,7 +63,7 @@ const { getSystemErrorName } = require('util');
delete providers.ELDHISTOGRAM;
delete providers.SIGINTWATCHDOG;
delete providers.WORKERHEAPSNAPSHOT;
delete providers.FIXEDSIZEBLOBCOPY;
delete providers.BLOBREADER;
delete providers.RANDOMPRIMEREQUEST;
delete providers.CHECKPRIMEREQUEST;