mirror of
https://github.com/nodejs/node.git
synced 2025-12-28 07:50:41 +00:00
src: split out callback queue implementation from Environment
This isn’t conceptually tied to anything Node.js-specific at all. PR-URL: https://github.com/nodejs/node/pull/33272 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
parent
2c3c9f5a3f
commit
fea01c1179
2
node.gyp
2
node.gyp
@ -642,6 +642,8 @@
|
||||
'src/base_object.h',
|
||||
'src/base_object-inl.h',
|
||||
'src/base64.h',
|
||||
'src/callback_queue.h',
|
||||
'src/callback_queue-inl.h',
|
||||
'src/connect_wrap.h',
|
||||
'src/connection_wrap.h',
|
||||
'src/debug_utils.h',
|
||||
|
||||
97
src/callback_queue-inl.h
Normal file
97
src/callback_queue-inl.h
Normal file
@ -0,0 +1,97 @@
|
||||
#ifndef SRC_CALLBACK_QUEUE_INL_H_
|
||||
#define SRC_CALLBACK_QUEUE_INL_H_
|
||||
|
||||
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
#include "callback_queue.h"
|
||||
|
||||
namespace node {
|
||||
|
||||
template <typename R, typename... Args>
|
||||
template <typename Fn>
|
||||
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
|
||||
CallbackQueue<R, Args...>::CreateCallback(Fn&& fn, bool refed) {
|
||||
return std::make_unique<CallbackImpl<Fn>>(std::move(fn), refed);
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
|
||||
CallbackQueue<R, Args...>::Shift() {
|
||||
std::unique_ptr<Callback> ret = std::move(head_);
|
||||
if (ret) {
|
||||
head_ = ret->get_next();
|
||||
if (!head_)
|
||||
tail_ = nullptr; // The queue is now empty.
|
||||
}
|
||||
size_--;
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
void CallbackQueue<R, Args...>::Push(std::unique_ptr<Callback> cb) {
|
||||
Callback* prev_tail = tail_;
|
||||
|
||||
size_++;
|
||||
tail_ = cb.get();
|
||||
if (prev_tail != nullptr)
|
||||
prev_tail->set_next(std::move(cb));
|
||||
else
|
||||
head_ = std::move(cb);
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
void CallbackQueue<R, Args...>::ConcatMove(CallbackQueue<R, Args...>&& other) {
|
||||
size_ += other.size_;
|
||||
if (tail_ != nullptr)
|
||||
tail_->set_next(std::move(other.head_));
|
||||
else
|
||||
head_ = std::move(other.head_);
|
||||
tail_ = other.tail_;
|
||||
other.tail_ = nullptr;
|
||||
other.size_ = 0;
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
size_t CallbackQueue<R, Args...>::size() const {
|
||||
return size_.load();
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
CallbackQueue<R, Args...>::Callback::Callback(bool refed)
|
||||
: refed_(refed) {}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
bool CallbackQueue<R, Args...>::Callback::is_refed() const {
|
||||
return refed_;
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
|
||||
CallbackQueue<R, Args...>::Callback::get_next() {
|
||||
return std::move(next_);
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
void CallbackQueue<R, Args...>::Callback::set_next(
|
||||
std::unique_ptr<Callback> next) {
|
||||
next_ = std::move(next);
|
||||
}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
template <typename Fn>
|
||||
CallbackQueue<R, Args...>::CallbackImpl<Fn>::CallbackImpl(
|
||||
Fn&& callback, bool refed)
|
||||
: Callback(refed),
|
||||
callback_(std::move(callback)) {}
|
||||
|
||||
template <typename R, typename... Args>
|
||||
template <typename Fn>
|
||||
R CallbackQueue<R, Args...>::CallbackImpl<Fn>::Call(Args... args) {
|
||||
return callback_(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
} // namespace node
|
||||
|
||||
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
#endif // SRC_CALLBACK_QUEUE_INL_H_
|
||||
70
src/callback_queue.h
Normal file
70
src/callback_queue.h
Normal file
@ -0,0 +1,70 @@
|
||||
#ifndef SRC_CALLBACK_QUEUE_H_
|
||||
#define SRC_CALLBACK_QUEUE_H_
|
||||
|
||||
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
#include <atomic>
|
||||
|
||||
namespace node {
|
||||
|
||||
// A queue of C++ functions that take Args... as arguments and return R
|
||||
// (this is similar to the signature of std::function).
|
||||
// New entries are added using `CreateCallback()`/`Push()`, and removed using
|
||||
// `Shift()`.
|
||||
// The `refed` flag is left for easier use in situations in which some of these
|
||||
// should be run even if nothing else is keeping the event loop alive.
|
||||
template <typename R, typename... Args>
|
||||
class CallbackQueue {
|
||||
public:
|
||||
class Callback {
|
||||
public:
|
||||
explicit inline Callback(bool refed);
|
||||
|
||||
virtual ~Callback() = default;
|
||||
virtual R Call(Args... args) = 0;
|
||||
|
||||
inline bool is_refed() const;
|
||||
|
||||
private:
|
||||
inline std::unique_ptr<Callback> get_next();
|
||||
inline void set_next(std::unique_ptr<Callback> next);
|
||||
|
||||
bool refed_;
|
||||
std::unique_ptr<Callback> next_;
|
||||
|
||||
friend class CallbackQueue;
|
||||
};
|
||||
|
||||
template <typename Fn>
|
||||
inline std::unique_ptr<Callback> CreateCallback(Fn&& fn, bool refed);
|
||||
|
||||
inline std::unique_ptr<Callback> Shift();
|
||||
inline void Push(std::unique_ptr<Callback> cb);
|
||||
// ConcatMove adds elements from 'other' to the end of this list, and clears
|
||||
// 'other' afterwards.
|
||||
inline void ConcatMove(CallbackQueue&& other);
|
||||
|
||||
// size() is atomic and may be called from any thread.
|
||||
inline size_t size() const;
|
||||
|
||||
private:
|
||||
template <typename Fn>
|
||||
class CallbackImpl final : public Callback {
|
||||
public:
|
||||
CallbackImpl(Fn&& callback, bool refed);
|
||||
R Call(Args... args) override;
|
||||
|
||||
private:
|
||||
Fn callback_;
|
||||
};
|
||||
|
||||
std::atomic<size_t> size_ {0};
|
||||
std::unique_ptr<Callback> head_;
|
||||
Callback* tail_ = nullptr;
|
||||
};
|
||||
|
||||
} // namespace node
|
||||
|
||||
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
#endif // SRC_CALLBACK_QUEUE_H_
|
||||
@ -25,6 +25,7 @@
|
||||
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
#include "aliased_buffer.h"
|
||||
#include "callback_queue-inl.h"
|
||||
#include "env.h"
|
||||
#include "node.h"
|
||||
#include "util-inl.h"
|
||||
@ -705,50 +706,9 @@ inline void IsolateData::set_options(
|
||||
options_ = std::move(options);
|
||||
}
|
||||
|
||||
std::unique_ptr<Environment::NativeImmediateCallback>
|
||||
Environment::NativeImmediateQueue::Shift() {
|
||||
std::unique_ptr<Environment::NativeImmediateCallback> ret = std::move(head_);
|
||||
if (ret) {
|
||||
head_ = ret->get_next();
|
||||
if (!head_)
|
||||
tail_ = nullptr; // The queue is now empty.
|
||||
}
|
||||
size_--;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void Environment::NativeImmediateQueue::Push(
|
||||
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
|
||||
NativeImmediateCallback* prev_tail = tail_;
|
||||
|
||||
size_++;
|
||||
tail_ = cb.get();
|
||||
if (prev_tail != nullptr)
|
||||
prev_tail->set_next(std::move(cb));
|
||||
else
|
||||
head_ = std::move(cb);
|
||||
}
|
||||
|
||||
void Environment::NativeImmediateQueue::ConcatMove(
|
||||
NativeImmediateQueue&& other) {
|
||||
size_ += other.size_;
|
||||
if (tail_ != nullptr)
|
||||
tail_->set_next(std::move(other.head_));
|
||||
else
|
||||
head_ = std::move(other.head_);
|
||||
tail_ = other.tail_;
|
||||
other.tail_ = nullptr;
|
||||
other.size_ = 0;
|
||||
}
|
||||
|
||||
size_t Environment::NativeImmediateQueue::size() const {
|
||||
return size_.load();
|
||||
}
|
||||
|
||||
template <typename Fn>
|
||||
void Environment::CreateImmediate(Fn&& cb, bool ref) {
|
||||
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
|
||||
std::move(cb), ref);
|
||||
auto callback = native_immediates_.CreateCallback(std::move(cb), ref);
|
||||
native_immediates_.Push(std::move(callback));
|
||||
}
|
||||
|
||||
@ -768,8 +728,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
|
||||
|
||||
template <typename Fn>
|
||||
void Environment::SetImmediateThreadsafe(Fn&& cb) {
|
||||
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
|
||||
std::move(cb), false);
|
||||
auto callback =
|
||||
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
|
||||
{
|
||||
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
|
||||
native_immediates_threadsafe_.Push(std::move(callback));
|
||||
@ -780,8 +740,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
|
||||
|
||||
template <typename Fn>
|
||||
void Environment::RequestInterrupt(Fn&& cb) {
|
||||
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
|
||||
std::move(cb), false);
|
||||
auto callback =
|
||||
native_immediates_interrupts_.CreateCallback(std::move(cb), false);
|
||||
{
|
||||
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
|
||||
native_immediates_interrupts_.Push(std::move(callback));
|
||||
@ -791,34 +751,6 @@ void Environment::RequestInterrupt(Fn&& cb) {
|
||||
RequestInterruptFromV8();
|
||||
}
|
||||
|
||||
Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
|
||||
: refed_(refed) {}
|
||||
|
||||
bool Environment::NativeImmediateCallback::is_refed() const {
|
||||
return refed_;
|
||||
}
|
||||
|
||||
std::unique_ptr<Environment::NativeImmediateCallback>
|
||||
Environment::NativeImmediateCallback::get_next() {
|
||||
return std::move(next_);
|
||||
}
|
||||
|
||||
void Environment::NativeImmediateCallback::set_next(
|
||||
std::unique_ptr<NativeImmediateCallback> next) {
|
||||
next_ = std::move(next);
|
||||
}
|
||||
|
||||
template <typename Fn>
|
||||
Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
|
||||
Fn&& callback, bool refed)
|
||||
: NativeImmediateCallback(refed),
|
||||
callback_(std::move(callback)) {}
|
||||
|
||||
template <typename Fn>
|
||||
void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
|
||||
callback_(env);
|
||||
}
|
||||
|
||||
inline bool Environment::can_call_into_js() const {
|
||||
return can_call_into_js_ && !is_stopping();
|
||||
}
|
||||
|
||||
@ -729,7 +729,7 @@ void Environment::RunAndClearInterrupts() {
|
||||
}
|
||||
DebugSealHandleScope seal_handle_scope(isolate());
|
||||
|
||||
while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
|
||||
while (auto head = queue.Shift())
|
||||
head->Call(this);
|
||||
}
|
||||
}
|
||||
@ -755,8 +755,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
|
||||
auto drain_list = [&]() {
|
||||
TryCatchScope try_catch(this);
|
||||
DebugSealHandleScope seal_handle_scope(isolate());
|
||||
while (std::unique_ptr<NativeImmediateCallback> head =
|
||||
native_immediates_.Shift()) {
|
||||
while (auto head = native_immediates_.Shift()) {
|
||||
if (head->is_refed())
|
||||
ref_count++;
|
||||
|
||||
|
||||
45
src/env.h
45
src/env.h
@ -29,6 +29,7 @@
|
||||
#include "inspector_agent.h"
|
||||
#include "inspector_profiler.h"
|
||||
#endif
|
||||
#include "callback_queue.h"
|
||||
#include "debug_utils.h"
|
||||
#include "handle_wrap.h"
|
||||
#include "node.h"
|
||||
@ -1368,49 +1369,7 @@ class Environment : public MemoryRetainer {
|
||||
|
||||
std::list<ExitCallback> at_exit_functions_;
|
||||
|
||||
class NativeImmediateCallback {
|
||||
public:
|
||||
explicit inline NativeImmediateCallback(bool refed);
|
||||
|
||||
virtual ~NativeImmediateCallback() = default;
|
||||
virtual void Call(Environment* env) = 0;
|
||||
|
||||
inline bool is_refed() const;
|
||||
inline std::unique_ptr<NativeImmediateCallback> get_next();
|
||||
inline void set_next(std::unique_ptr<NativeImmediateCallback> next);
|
||||
|
||||
private:
|
||||
bool refed_;
|
||||
std::unique_ptr<NativeImmediateCallback> next_;
|
||||
};
|
||||
|
||||
template <typename Fn>
|
||||
class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
|
||||
public:
|
||||
NativeImmediateCallbackImpl(Fn&& callback, bool refed);
|
||||
void Call(Environment* env) override;
|
||||
|
||||
private:
|
||||
Fn callback_;
|
||||
};
|
||||
|
||||
class NativeImmediateQueue {
|
||||
public:
|
||||
inline std::unique_ptr<NativeImmediateCallback> Shift();
|
||||
inline void Push(std::unique_ptr<NativeImmediateCallback> cb);
|
||||
// ConcatMove adds elements from 'other' to the end of this list, and clears
|
||||
// 'other' afterwards.
|
||||
inline void ConcatMove(NativeImmediateQueue&& other);
|
||||
|
||||
// size() is atomic and may be called from any thread.
|
||||
inline size_t size() const;
|
||||
|
||||
private:
|
||||
std::atomic<size_t> size_ {0};
|
||||
std::unique_ptr<NativeImmediateCallback> head_;
|
||||
NativeImmediateCallback* tail_ = nullptr;
|
||||
};
|
||||
|
||||
typedef CallbackQueue<void, Environment*> NativeImmediateQueue;
|
||||
NativeImmediateQueue native_immediates_;
|
||||
Mutex native_immediates_threadsafe_mutex_;
|
||||
NativeImmediateQueue native_immediates_threadsafe_;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user