 #include <windows.h>
 #endif
 
+// for pthread thread_local emulation
+#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
+# include <pthread.h>
+#endif
+
 namespace async {
 namespace detail {
 
@@ -65,14 +70,76 @@ struct threadpool_data {
 #endif
 };
 
+// This wrapper encapsulates both the owning_threadpool pointer and the thread id.
+// This is done to improve performance of the emulated thread_local by reducing
+// the number of calls to pthread_getspecific.
+struct threadpool_data_wrapper {
+	threadpool_data* owning_threadpool;
+	std::size_t thread_id;
+
+	threadpool_data_wrapper(threadpool_data* owning_threadpool, std::size_t thread_id):
+		owning_threadpool(owning_threadpool), thread_id(thread_id) {}
+};
+
+#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
+struct pthread_emulation_threadpool_data_initializer {
+	pthread_key_t key;
+
+	pthread_emulation_threadpool_data_initializer()
+	{
+		pthread_key_create(&key, [](void* wrapper_ptr) {
+			threadpool_data_wrapper* wrapper = static_cast<threadpool_data_wrapper*>(wrapper_ptr);
+			delete wrapper;
+		});
+	}
+
+	~pthread_emulation_threadpool_data_initializer()
+	{
+		pthread_key_delete(key);
+	}
+};
+
+static pthread_key_t get_local_threadpool_data_key()
+{
+	static pthread_emulation_threadpool_data_initializer initializer;
+	return initializer.key;
+}
+
+#else
 // Thread pool this thread belongs to, or null if not in pool
 static THREAD_LOCAL threadpool_data* owning_threadpool = nullptr;
 
 // Current thread's index in the pool
 static THREAD_LOCAL std::size_t thread_id;
+#endif
+
+static void create_threadpool_data(threadpool_data* owning_threadpool_, std::size_t thread_id_)
+{
+#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
+	// The memory allocated here is freed by the destructor lambda registered at key creation
+	pthread_setspecific(get_local_threadpool_data_key(), new threadpool_data_wrapper(owning_threadpool_, thread_id_));
+#else
+	owning_threadpool = owning_threadpool_;
+	thread_id = thread_id_;
+#endif
+}
+
+static threadpool_data_wrapper get_threadpool_data_wrapper()
+{
+#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
+	threadpool_data_wrapper* wrapper = static_cast<threadpool_data_wrapper*>(pthread_getspecific(get_local_threadpool_data_key()));
+	if (wrapper == nullptr) {
+		// If, for some reason, the wrapper was never set, return a null wrapper instead of crashing
+		return threadpool_data_wrapper(nullptr, 0);
+	}
+	return *wrapper;
+#else
+	return threadpool_data_wrapper(owning_threadpool, thread_id);
+#endif
+}
 
 // Try to steal a task from another thread's queue
-static task_run_handle steal_task(threadpool_data* impl)
+static task_run_handle steal_task(threadpool_data* impl, std::size_t thread_id)
 {
 	// Make a list of victim thread ids and shuffle it
 	std::vector<std::size_t> victims(impl->thread_data.size());
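
The block added above follows the standard POSIX recipe for emulating thread_local storage: a process-wide pthread_key_t, created once, whose destructor callback frees each thread's heap-allocated wrapper when that thread exits. Readers pay one pthread_getspecific call per lookup, which is why both fields are bundled into a single threadpool_data_wrapper. The commit creates the key lazily through a function-local static (get_local_threadpool_data_key); the sketch below uses pthread_once for the same purpose. It is illustrative only, with hypothetical names, and is not part of this commit; compile with -pthread.

// Illustrative sketch of the pthread_key_create/setspecific/getspecific
// pattern used by the emulation above; names here are hypothetical.
#include <pthread.h>
#include <cstdint>
#include <cstdio>

struct per_thread_state { int id; };

static pthread_key_t state_key;
static pthread_once_t state_key_once = PTHREAD_ONCE_INIT;

static void make_key()
{
	// The destructor runs at thread exit for every thread that set a value,
	// mirroring the delete in the commit's key-creation lambda.
	pthread_key_create(&state_key, [](void* p) {
		delete static_cast<per_thread_state*>(p);
	});
}

static void* worker(void* arg)
{
	pthread_once(&state_key_once, make_key);

	// One heap allocation per thread; freed automatically by the key's destructor.
	pthread_setspecific(state_key, new per_thread_state{static_cast<int>(reinterpret_cast<std::intptr_t>(arg))});

	// A single pthread_getspecific retrieves everything stored for this thread.
	auto* state = static_cast<per_thread_state*>(pthread_getspecific(state_key));
	std::printf("worker sees id %d\n", state->id);
	return nullptr;
}

int main()
{
	pthread_t threads[2];
	for (std::intptr_t i = 0; i < 2; ++i)
		pthread_create(&threads[i], nullptr, worker, reinterpret_cast<void*>(i));
	for (pthread_t& t : threads)
		pthread_join(t, nullptr);
	return 0;
}
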
@@ -97,10 +164,10 @@ static task_run_handle steal_task(threadpool_data* impl)
 
 // Main task stealing loop which is used by worker threads when they have
 // nothing to do.
-static void thread_task_loop(threadpool_data* impl, task_wait_handle wait_task)
+static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_wait_handle wait_task)
 {
 	// Get our thread's data
-	thread_data_t& current_thread = owning_threadpool->thread_data[thread_id];
+	thread_data_t& current_thread = impl->thread_data[thread_id];
 
 	// Flag indicating if we have added a continuation to the task
 	bool added_continuation = false;
@@ -121,7 +188,7 @@ static void thread_task_loop(threadpool_data* impl, task_wait_handle wait_task)
 		// Stealing loop
 		while (true) {
 			// Try to steal a task
-			if (task_run_handle t = steal_task(impl)) {
+			if (task_run_handle t = steal_task(impl, thread_id)) {
 				t.run();
 				break;
 			}
@@ -189,26 +256,26 @@ static void thread_task_loop(threadpool_data* impl, task_wait_handle wait_task)
 // Wait for a task to complete (for worker threads inside thread pool)
 static void threadpool_wait_handler(task_wait_handle wait_task)
 {
-	thread_task_loop(owning_threadpool, wait_task);
+	threadpool_data_wrapper wrapper = get_threadpool_data_wrapper();
+	thread_task_loop(wrapper.owning_threadpool, wrapper.thread_id, wait_task);
 }
 
 // Worker thread main loop
-static void worker_thread(threadpool_data* impl, std::size_t id)
+static void worker_thread(threadpool_data* owning_threadpool, std::size_t thread_id)
 {
-	// Save the thread id and owning threadpool
-	owning_threadpool = impl;
-	thread_id = id;
+	// Store the owning threadpool and thread id in the thread-local data
+	create_threadpool_data(owning_threadpool, thread_id);
 
 	// Set the wait handler so threads from the pool do useful work while
 	// waiting for another task to finish.
 	set_thread_wait_handler(threadpool_wait_handler);
 
 	// Seed the random number generator with our id. This gives each thread a
 	// different steal order.
-	impl->thread_data[thread_id].rng.seed(static_cast<std::minstd_rand::result_type>(thread_id));
+	owning_threadpool->thread_data[thread_id].rng.seed(static_cast<std::minstd_rand::result_type>(thread_id));
 
 	// Main loop, runs until the shutdown signal is received
-	thread_task_loop(impl, task_wait_handle());
+	thread_task_loop(owning_threadpool, thread_id, task_wait_handle());
 }
 
 // Recursive function to spawn all worker threads in parallel
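
The seeding above gives each worker its own std::minstd_rand stream, so the victim list built in steal_task is shuffled differently per thread and steal attempts spread across the pool. A standalone illustration of that effect, with hypothetical values and not code from this commit:

#include <algorithm>
#include <cstdio>
#include <numeric>
#include <random>
#include <vector>

int main()
{
	// Each "worker" seeds its engine with its own id, as worker_thread does,
	// and shuffles the same victim list; the resulting steal orders typically differ.
	// Ids start at 1 here because std::minstd_rand maps seeds 0 and 1 to the same state.
	for (std::size_t thread_id = 1; thread_id <= 3; ++thread_id) {
		std::minstd_rand rng(static_cast<std::minstd_rand::result_type>(thread_id));
		std::vector<std::size_t> victims(4);
		std::iota(victims.begin(), victims.end(), std::size_t(0));
		std::shuffle(victims.begin(), victims.end(), rng);

		std::printf("thread %zu would try victims in order:", thread_id);
		for (std::size_t v : victims)
			std::printf(" %zu", v);
		std::printf("\n");
	}
}
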
@@ -296,10 +363,12 @@ threadpool_scheduler::~threadpool_scheduler()
 // Schedule a task on the thread pool
 void threadpool_scheduler::schedule(task_run_handle t)
 {
+	detail::threadpool_data_wrapper wrapper = detail::get_threadpool_data_wrapper();
+
 	// Check if we are in the thread pool
-	if (detail::owning_threadpool == impl.get()) {
+	if (wrapper.owning_threadpool == impl.get()) {
 		// Push the task onto our task queue
-		impl->thread_data[detail::thread_id].queue.push(std::move(t));
+		impl->thread_data[wrapper.thread_id].queue.push(std::move(t));
 
 		// If there are no sleeping threads, just return. We check outside the
 		// lock to avoid locking overhead in the fast path.
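
The wrapper is fetched once per schedule() call, so on the emulated path the cost is a single pthread_getspecific rather than two separate thread-local reads, and the comparison wrapper.owning_threadpool == impl.get() decides between pushing onto the calling worker's own queue and going through the public queue. A rough caller-side sketch of how this code path is reached, assuming the async::spawn and task::then overloads that accept a scheduler as documented for Async++:

#include <async++.h>
#include <iostream>

int main()
{
	// A dedicated pool; threadpool_scheduler::schedule() runs whenever work
	// is spawned on it.
	async::threadpool_scheduler pool(4);

	// Spawned from main(): not a pool thread, so schedule() takes the slow path.
	auto t = async::spawn(pool, [] { return 6 * 7; })
		// The continuation is scheduled from a worker thread once the first
		// task finishes, so that call should hit the fast path and push onto
		// the worker's own queue.
		.then(pool, [](int x) { std::cout << "result: " << x << std::endl; });

	t.get();
	return 0;
}
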