
Commit d12ee6e

src: improve thread safety of TaskQueue
PR-URL: #57910
Reviewed-By: Ben Noordhuis <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Joyee Cheung <[email protected]>
1 parent a365da6 commit d12ee6e

File tree

2 files changed: +130 -102 lines changed

src/node_platform.cc
src/node_platform.h
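The commit changes TaskQueue<T> so that its operations are no longer individually self-locking methods; instead they live on a Locked handle returned by TaskQueue::Lock(), which owns the mutex for its whole lifetime. Callers either keep the handle alive across several related operations or use it as a temporary for a single call. A minimal sketch of the pattern, using standard C++ types rather than the Node.js Mutex/ConditionVariable wrappers (the names below are illustrative, not the actual implementation):

#include <memory>
#include <mutex>
#include <queue>

template <class T>
class LockedQueue {
 public:
  class Locked {
   public:
    void Push(std::unique_ptr<T> item) {
      queue_->items_.push(std::move(item));
    }
    std::unique_ptr<T> Pop() {
      if (queue_->items_.empty()) return nullptr;
      std::unique_ptr<T> result = std::move(queue_->items_.front());
      queue_->items_.pop();
      return result;
    }

   private:
    friend class LockedQueue;
    explicit Locked(LockedQueue* queue)
        : queue_(queue), lock_(queue->mutex_) {}

    LockedQueue* queue_;
    std::unique_lock<std::mutex> lock_;  // held for the handle's lifetime
  };

  // The only way to touch the queue is through a handle that owns the lock.
  Locked Lock() { return Locked(this); }

 private:
  std::mutex mutex_;
  std::queue<std::unique_ptr<T>> items_;
};

// Usage sketch:
//   LockedQueue<int> q;
//   q.Lock().Push(std::make_unique<int>(1));  // lock held for one call
//   auto locked = q.Lock();                   // lock held for a whole scope
//   std::unique_ptr<int> v = locked.Pop();

Both styles appear in the diff below: one-shot temporaries such as pending_worker_tasks_.Lock().Push(...) and named handles such as auto locked = foreground_tasks_.Lock(); that stay alive while several steps run inside the same critical section.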

src/node_platform.cc

+106 -94
@@ -40,9 +40,10 @@ static void PlatformWorkerThread(void* data) {
     worker_data->platform_workers_ready->Signal(lock);
   }
 
-  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
+  while (std::unique_ptr<Task> task =
+             pending_worker_tasks->Lock().BlockingPop()) {
     task->Run();
-    pending_worker_tasks->NotifyOfCompletion();
+    pending_worker_tasks->Lock().NotifyOfCompletion();
   }
 }
 
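In the loop above, pending_worker_tasks->Lock() creates a temporary Locked handle whose Mutex::ScopedLock is released at the end of the full expression, so the worker holds the queue mutex only while it waits inside BlockingPop(), and again briefly for NotifyOfCompletion(), never while task->Run() is executing. A stand-alone illustration of that lifetime rule (plain std::mutex, C++17, not the Node.js classes):

#include <cstdio>
#include <mutex>

struct Handle {
  explicit Handle(std::mutex& m) : lock(m) { std::puts("locked"); }
  ~Handle() { std::puts("unlocked"); }
  int Pop() { return 42; }
  std::unique_lock<std::mutex> lock;  // released when the handle is destroyed
};

struct Queue {
  std::mutex m;
  Handle Lock() { return Handle(m); }  // C++17 guaranteed elision, no copy
};

int main() {
  Queue q;
  int task = q.Lock().Pop();  // prints "locked" then "unlocked" on this line
  std::printf("running task %d with the mutex already released\n", task);
}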

@@ -73,13 +74,15 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   }
 
   void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
-    tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
-                                               delay_in_seconds));
+    auto locked = tasks_.Lock();
+    locked.Push(std::make_unique<ScheduleTask>(
+        this, std::move(task), delay_in_seconds));
     uv_async_send(&flush_tasks_);
   }
 
   void Stop() {
-    tasks_.Push(std::make_unique<StopTask>(this));
+    auto locked = tasks_.Lock();
+    locked.Push(std::make_unique<StopTask>(this));
     uv_async_send(&flush_tasks_);
   }
 
@@ -100,8 +103,14 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   static void FlushTasks(uv_async_t* flush_tasks) {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
-    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
+
+    std::queue<std::unique_ptr<Task>> tasks_to_run =
+        scheduler->tasks_.Lock().PopAll();
+    while (!tasks_to_run.empty()) {
+      std::unique_ptr<Task> task = std::move(tasks_to_run.front());
+      tasks_to_run.pop();
       task->Run();
+    }
   }
 
   class StopTask : public Task {
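FlushTasks() now drains the whole queue with a single PopAll() while the mutex is held by a temporary handle, and only then runs the tasks, after the handle has released the lock. That keeps arbitrary task code from executing under the queue lock; a task is free to post more work without contending with, or deadlocking on, the mutex that guards the queue. A sketch of that "drain under the lock, run after releasing it" shape with standard library types (illustrative, not the Node.js TaskQueue):

#include <functional>
#include <memory>
#include <mutex>
#include <queue>

using Task = std::function<void()>;

std::mutex queue_mutex;
std::queue<std::unique_ptr<Task>> task_queue;

void Post(std::unique_ptr<Task> task) {
  std::lock_guard<std::mutex> lock(queue_mutex);
  task_queue.push(std::move(task));
}

void Flush() {
  std::queue<std::unique_ptr<Task>> to_run;
  {
    std::lock_guard<std::mutex> lock(queue_mutex);
    to_run.swap(task_queue);  // the moral equivalent of Lock().PopAll()
  }                           // queue_mutex released here
  while (!to_run.empty()) {
    std::unique_ptr<Task> task = std::move(to_run.front());
    to_run.pop();
    (*task)();  // may call Post() again without blocking on queue_mutex
  }
}

int main() {
  Post(std::make_unique<Task>([] { Post(std::make_unique<Task>([] {})); }));
  Flush();  // runs the first task; the task it posts waits for the next Flush()
}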
@@ -149,7 +158,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   static void RunTask(uv_timer_t* timer) {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
-    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
+    scheduler->pending_worker_tasks_->Lock().Push(
+        scheduler->TakeTimerTask(timer));
   }
 
   std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
@@ -203,7 +213,7 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
 }
 
 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
-  pending_worker_tasks_.Push(std::move(task));
+  pending_worker_tasks_.Lock().Push(std::move(task));
 }
 
 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
@@ -212,11 +222,11 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
 }
 
 void WorkerThreadsTaskRunner::BlockingDrain() {
-  pending_worker_tasks_.BlockingDrain();
+  pending_worker_tasks_.Lock().BlockingDrain();
 }
 
 void WorkerThreadsTaskRunner::Shutdown() {
-  pending_worker_tasks_.Stop();
+  pending_worker_tasks_.Lock().Stop();
   delayed_task_scheduler_->Stop();
   for (size_t i = 0; i < threads_.size(); i++) {
     CHECK_EQ(0, uv_thread_join(threads_[i].get()));
@@ -253,29 +263,27 @@ void PerIsolatePlatformData::PostIdleTaskImpl(
 
 void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task,
                                           const v8::SourceLocation& location) {
-  if (flush_tasks_ == nullptr) {
-    // V8 may post tasks during Isolate disposal. In that case, the only
-    // sensible path forward is to discard the task.
-    return;
-  }
-  foreground_tasks_.Push(std::move(task));
+  // The task can be posted from any V8 background worker thread, even when
+  // the foreground task runner is being cleaned up by Shutdown(). In that
+  // case, make sure we wait until the shutdown is completed (which leads
+  // to flush_tasks_ == nullptr, and the task will be discarded).
+  auto locked = foreground_tasks_.Lock();
+  if (flush_tasks_ == nullptr) return;
+  locked.Push(std::move(task));
   uv_async_send(flush_tasks_);
 }
 
 void PerIsolatePlatformData::PostDelayedTaskImpl(
     std::unique_ptr<Task> task,
    double delay_in_seconds,
     const v8::SourceLocation& location) {
-  if (flush_tasks_ == nullptr) {
-    // V8 may post tasks during Isolate disposal. In that case, the only
-    // sensible path forward is to discard the task.
-    return;
-  }
+  auto locked = foreground_delayed_tasks_.Lock();
+  if (flush_tasks_ == nullptr) return;
   std::unique_ptr<DelayedTask> delayed(new DelayedTask());
   delayed->task = std::move(task);
   delayed->platform_data = shared_from_this();
   delayed->timeout = delay_in_seconds;
-  foreground_delayed_tasks_.Push(std::move(delayed));
+  locked.Push(std::move(delayed));
   uv_async_send(flush_tasks_);
 }
 
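PostTaskImpl() and PostDelayedTaskImpl() now take the queue lock before checking flush_tasks_, so the "is this platform still accepting tasks?" test and the push are atomic with respect to Shutdown(), which holds the same locks while it drains the queues and clears flush_tasks_. An illustrative sketch of the race being closed, with a plain bool standing in for flush_tasks_ (standard types, not the Node.js classes):

#include <memory>
#include <mutex>
#include <queue>

class Poster {
 public:
  void Post(std::unique_ptr<int> task) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!accepting_) return;  // shutdown already completed: discard the task
    tasks_.push(std::move(task));
  }

  void Shutdown() {
    std::lock_guard<std::mutex> lock(mutex_);
    while (!tasks_.empty()) tasks_.pop();  // drop leftover tasks
    accepting_ = false;                    // later Post() calls will discard
  }

 private:
  std::mutex mutex_;
  bool accepting_ = true;  // plays the role of flush_tasks_ != nullptr
  std::queue<std::unique_ptr<int>> tasks_;
};

int main() {
  Poster poster;
  poster.Post(std::make_unique<int>(1));  // accepted
  poster.Shutdown();
  poster.Post(std::make_unique<int>(2));  // discarded under the same lock
}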

@@ -301,32 +309,30 @@ void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
 }
 
 void PerIsolatePlatformData::Shutdown() {
-  if (flush_tasks_ == nullptr)
-    return;
+  auto foreground_tasks_locked = foreground_tasks_.Lock();
+  auto foreground_delayed_tasks_locked = foreground_delayed_tasks_.Lock();
 
-  // While there should be no V8 tasks in the queues at this point, it is
-  // possible that Node.js-internal tasks from e.g. the inspector are still
-  // lying around. We clear these queues and ignore the return value,
-  // effectively deleting the tasks instead of running them.
-  foreground_delayed_tasks_.PopAll();
-  foreground_tasks_.PopAll();
+  foreground_delayed_tasks_locked.PopAll();
+  foreground_tasks_locked.PopAll();
   scheduled_delayed_tasks_.clear();
 
-  // Both destroying the scheduled_delayed_tasks_ lists and closing
-  // flush_tasks_ handle add tasks to the event loop. We keep a count of all
-  // non-closed handles, and when that reaches zero, we inform any shutdown
-  // callbacks that the platform is done as far as this Isolate is concerned.
-  self_reference_ = shared_from_this();
-  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
-           [](uv_handle_t* handle) {
-    std::unique_ptr<uv_async_t> flush_tasks {
-      reinterpret_cast<uv_async_t*>(handle) };
-    PerIsolatePlatformData* platform_data =
-        static_cast<PerIsolatePlatformData*>(flush_tasks->data);
-    platform_data->DecreaseHandleCount();
-    platform_data->self_reference_.reset();
-  });
-  flush_tasks_ = nullptr;
+  if (flush_tasks_ != nullptr) {
+    // Both destroying the scheduled_delayed_tasks_ lists and closing
+    // flush_tasks_ handle add tasks to the event loop. We keep a count of all
+    // non-closed handles, and when that reaches zero, we inform any shutdown
+    // callbacks that the platform is done as far as this Isolate is concerned.
+    self_reference_ = shared_from_this();
+    uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
+             [](uv_handle_t* handle) {
+               std::unique_ptr<uv_async_t> flush_tasks{
+                   reinterpret_cast<uv_async_t*>(handle)};
+               PerIsolatePlatformData* platform_data =
+                   static_cast<PerIsolatePlatformData*>(flush_tasks->data);
+               platform_data->DecreaseHandleCount();
+               platform_data->self_reference_.reset();
+             });
+    flush_tasks_ = nullptr;
+  }
 }
 
 void PerIsolatePlatformData::DecreaseHandleCount() {
@@ -472,39 +478,48 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
   bool did_work = false;
 
-  while (std::unique_ptr<DelayedTask> delayed =
-      foreground_delayed_tasks_.Pop()) {
+  std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule =
+      foreground_delayed_tasks_.Lock().PopAll();
+  while (!delayed_tasks_to_schedule.empty()) {
+    std::unique_ptr<DelayedTask> delayed =
+        std::move(delayed_tasks_to_schedule.front());
+    delayed_tasks_to_schedule.pop();
+
     did_work = true;
     uint64_t delay_millis = llround(delayed->timeout * 1000);
 
     delayed->timer.data = static_cast<void*>(delayed.get());
     uv_timer_init(loop_, &delayed->timer);
-    // Timers may not guarantee queue ordering of events with the same delay if
-    // the delay is non-zero. This should not be a problem in practice.
+    // Timers may not guarantee queue ordering of events with the same delay
+    // if the delay is non-zero. This should not be a problem in practice.
     uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
     uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
     uv_handle_count_++;
 
-    scheduled_delayed_tasks_.emplace_back(delayed.release(),
-                                          [](DelayedTask* delayed) {
-      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
-               [](uv_handle_t* handle) {
-        std::unique_ptr<DelayedTask> task {
-            static_cast<DelayedTask*>(handle->data) };
-        task->platform_data->DecreaseHandleCount();
-      });
-    });
+    scheduled_delayed_tasks_.emplace_back(
+        delayed.release(), [](DelayedTask* delayed) {
+          uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
+                   [](uv_handle_t* handle) {
+                     std::unique_ptr<DelayedTask> task{
+                         static_cast<DelayedTask*>(handle->data)};
+                     task->platform_data->DecreaseHandleCount();
+                   });
+        });
+  }
+
+  std::queue<std::unique_ptr<Task>> tasks;
+  {
+    auto locked = foreground_tasks_.Lock();
+    tasks = locked.PopAll();
   }
-  // Move all foreground tasks into a separate queue and flush that queue.
-  // This way tasks that are posted while flushing the queue will be run on the
-  // next call of FlushForegroundTasksInternal.
-  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
+
   while (!tasks.empty()) {
     std::unique_ptr<Task> task = std::move(tasks.front());
     tasks.pop();
     did_work = true;
     RunForegroundTask(std::move(task));
   }
+
   return did_work;
 }
 
@@ -594,66 +609,63 @@ TaskQueue<T>::TaskQueue()
     outstanding_tasks_(0), stopped_(false), task_queue_() { }
 
 template <class T>
-void TaskQueue<T>::Push(std::unique_ptr<T> task) {
-  Mutex::ScopedLock scoped_lock(lock_);
-  outstanding_tasks_++;
-  task_queue_.push(std::move(task));
-  tasks_available_.Signal(scoped_lock);
+TaskQueue<T>::Locked::Locked(TaskQueue* queue)
+    : queue_(queue), lock_(queue->lock_) {}
+
+template <class T>
+void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task) {
+  queue_->outstanding_tasks_++;
+  queue_->task_queue_.push(std::move(task));
+  queue_->tasks_available_.Signal(lock_);
 }
 
 template <class T>
-std::unique_ptr<T> TaskQueue<T>::Pop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  if (task_queue_.empty()) {
+std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
+  if (queue_->task_queue_.empty()) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(task_queue_.front());
-  task_queue_.pop();
+  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  queue_->task_queue_.pop();
   return result;
 }
 
 template <class T>
-std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  while (task_queue_.empty() && !stopped_) {
-    tasks_available_.Wait(scoped_lock);
+std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
+  while (queue_->task_queue_.empty() && !queue_->stopped_) {
+    queue_->tasks_available_.Wait(lock_);
   }
-  if (stopped_) {
+  if (queue_->stopped_) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(task_queue_.front());
-  task_queue_.pop();
+  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  queue_->task_queue_.pop();
   return result;
 }
 
 template <class T>
-void TaskQueue<T>::NotifyOfCompletion() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  if (--outstanding_tasks_ == 0) {
-    tasks_drained_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::NotifyOfCompletion() {
+  if (--queue_->outstanding_tasks_ == 0) {
+    queue_->tasks_drained_.Broadcast(lock_);
   }
 }
 
 template <class T>
-void TaskQueue<T>::BlockingDrain() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  while (outstanding_tasks_ > 0) {
-    tasks_drained_.Wait(scoped_lock);
+void TaskQueue<T>::Locked::BlockingDrain() {
+  while (queue_->outstanding_tasks_ > 0) {
+    queue_->tasks_drained_.Wait(lock_);
   }
 }
 
 template <class T>
-void TaskQueue<T>::Stop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  stopped_ = true;
-  tasks_available_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::Stop() {
+  queue_->stopped_ = true;
+  queue_->tasks_available_.Broadcast(lock_);
 }
 
 template <class T>
-std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
-  Mutex::ScopedLock scoped_lock(lock_);
+std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() {
   std::queue<std::unique_ptr<T>> result;
-  result.swap(task_queue_);
+  result.swap(queue_->task_queue_);
   return result;
 }
 
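The Locked constructor acquires the Mutex::ScopedLock in its initializer list, so the mutex is held from Lock() until the handle is destroyed. Inside BlockingPop(), ConditionVariable::Wait(lock_) releases that mutex while the thread sleeps and re-acquires it before returning, which is why a producer can still Push() while a worker is blocked. A stand-alone sketch of the same mechanics with std::mutex and std::condition_variable (illustrative, not the Node.js wrappers):

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

std::mutex m;
std::condition_variable tasks_available;
std::queue<std::unique_ptr<int>> tasks;

std::unique_ptr<int> BlockingPop() {
  std::unique_lock<std::mutex> lock(m);  // counterpart of Locked's ScopedLock
  // wait() releases m while sleeping and re-locks it before returning.
  tasks_available.wait(lock, [] { return !tasks.empty(); });
  std::unique_ptr<int> result = std::move(tasks.front());
  tasks.pop();
  return result;
}

void Push(std::unique_ptr<int> task) {
  {
    std::lock_guard<std::mutex> lock(m);
    tasks.push(std::move(task));
  }
  tasks_available.notify_one();
}

int main() {
  std::thread worker([] { BlockingPop(); });  // may park until Push() below
  Push(std::make_unique<int>(42));
  worker.join();
}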

src/node_platform.h

+24 -8
@@ -22,16 +22,28 @@ class PerIsolatePlatformData;
 template <class T>
 class TaskQueue {
  public:
+  class Locked {
+   public:
+    void Push(std::unique_ptr<T> task);
+    std::unique_ptr<T> Pop();
+    std::unique_ptr<T> BlockingPop();
+    void NotifyOfCompletion();
+    void BlockingDrain();
+    void Stop();
+    std::queue<std::unique_ptr<T>> PopAll();
+
+   private:
+    friend class TaskQueue;
+    explicit Locked(TaskQueue* queue);
+
+    TaskQueue* queue_;
+    Mutex::ScopedLock lock_;
+  };
+
   TaskQueue();
   ~TaskQueue() = default;
 
-  void Push(std::unique_ptr<T> task);
-  std::unique_ptr<T> Pop();
-  std::unique_ptr<T> BlockingPop();
-  std::queue<std::unique_ptr<T>> PopAll();
-  void NotifyOfCompletion();
-  void BlockingDrain();
-  void Stop();
+  Locked Lock() { return Locked(this); }
 
  private:
   Mutex lock_;
@@ -98,6 +110,8 @@ class PerIsolatePlatformData
   void RunForegroundTask(std::unique_ptr<v8::Task> task);
   static void RunForegroundTask(uv_timer_t* timer);
 
+  uv_async_t* flush_tasks_ = nullptr;
+
   struct ShutdownCallback {
     void (*cb)(void*);
     void* data;
@@ -110,7 +124,9 @@ class PerIsolatePlatformData
 
   v8::Isolate* const isolate_;
   uv_loop_t* const loop_;
-  uv_async_t* flush_tasks_ = nullptr;
+
+  // When acquiring locks for both task queues, lock foreground_tasks_
+  // first then foreground_delayed_tasks_ to avoid deadlocks.
   TaskQueue<v8::Task> foreground_tasks_;
   TaskQueue<DelayedTask> foreground_delayed_tasks_;
 
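The comment added above documents a fixed acquisition order for the two queue locks, which is the order PerIsolatePlatformData::Shutdown() follows when it takes both handles. Agreeing on a global order is the classic way to rule out deadlock when two paths each need both mutexes. A minimal sketch of the convention (plain std::mutex, illustrative only, not the Node.js types):

#include <mutex>

std::mutex foreground_lock;          // always taken first
std::mutex foreground_delayed_lock;  // always taken second

void ShutdownLike() {
  // Same order as documented above; every path that needs both must agree.
  std::lock_guard<std::mutex> a(foreground_lock);
  std::lock_guard<std::mutex> b(foreground_delayed_lock);
  // ... drain both queues while holding both locks ...
}

int main() {
  ShutdownLike();
}

Standard C++ also offers std::scoped_lock(a, b), which locks several mutexes at once with a built-in deadlock-avoidance algorithm; the patch instead documents and relies on a fixed order, which fits the Locked-handle design where each lock is taken by constructing one handle at a time.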
