100.00% Lines (128/128) 100.00% Functions (25/25)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/boostorg/capy 8   // Official repository: https://github.com/boostorg/capy
9   // 9   //
10   10  
11   #include <boost/capy/ex/thread_pool.hpp> 11   #include <boost/capy/ex/thread_pool.hpp>
12 - #include <boost/capy/detail/thread_local_ptr.hpp>  
13   #include <boost/capy/continuation.hpp> 12   #include <boost/capy/continuation.hpp>
14   #include <boost/capy/ex/frame_allocator.hpp> 13   #include <boost/capy/ex/frame_allocator.hpp>
15   #include <boost/capy/test/thread_name.hpp> 14   #include <boost/capy/test/thread_name.hpp>
16   #include <algorithm> 15   #include <algorithm>
17   #include <atomic> 16   #include <atomic>
18   #include <condition_variable> 17   #include <condition_variable>
19   #include <cstdio> 18   #include <cstdio>
20   #include <mutex> 19   #include <mutex>
21   #include <thread> 20   #include <thread>
22   #include <vector> 21   #include <vector>
23   22  
24   /* 23   /*
25   Thread pool implementation using a shared work queue. 24   Thread pool implementation using a shared work queue.
26   25  
27   Work items are continuations linked via their intrusive next pointer, 26   Work items are continuations linked via their intrusive next pointer,
28   stored in a single queue protected by a mutex. No per-post heap 27   stored in a single queue protected by a mutex. No per-post heap
29   allocation: the continuation is owned by the caller and linked 28   allocation: the continuation is owned by the caller and linked
30   directly. Worker threads wait on a condition_variable until work 29   directly. Worker threads wait on a condition_variable until work
31   is available or stop is requested. 30   is available or stop is requested.
32   31  
33   Threads are started lazily on first post() via std::call_once to avoid 32   Threads are started lazily on first post() via std::call_once to avoid
34   spawning threads for pools that are constructed but never used. Each 33   spawning threads for pools that are constructed but never used. Each
35   thread is named with a configurable prefix plus index for debugger 34   thread is named with a configurable prefix plus index for debugger
36   visibility. 35   visibility.
37   36  
38   Work tracking: on_work_started/on_work_finished maintain an atomic 37   Work tracking: on_work_started/on_work_finished maintain an atomic
39   outstanding_work_ counter. join() blocks until this counter reaches 38   outstanding_work_ counter. join() blocks until this counter reaches
40   zero, then signals workers to stop and joins threads. 39   zero, then signals workers to stop and joins threads.
41   40  
42   Two shutdown paths: 41   Two shutdown paths:
43   - join(): waits for outstanding work to drain, then stops workers. 42   - join(): waits for outstanding work to drain, then stops workers.
44   - stop(): immediately signals workers to exit; queued work is abandoned. 43   - stop(): immediately signals workers to exit; queued work is abandoned.
45   - Destructor: stop() then join() (abandon + wait for threads). 44   - Destructor: stop() then join() (abandon + wait for threads).
46   */ 45   */
47   46  
48   namespace boost { 47   namespace boost {
49   namespace capy { 48   namespace capy {
50   49  
51   //------------------------------------------------------------------------------ 50   //------------------------------------------------------------------------------
52   51  
53   class thread_pool::impl 52   class thread_pool::impl
54 - // Identifies the pool owning the current worker thread, or  
55 - // nullptr if the calling thread is not a pool worker. Checked  
56 - // by dispatch() to decide between symmetric transfer (inline  
57 - // resume) and post.  
58 - static inline detail::thread_local_ptr<impl const> current_;  
59 -  
60   { 53   {
61   // Intrusive queue of continuations via continuation::next. 54   // Intrusive queue of continuations via continuation::next.
62   // No per-post allocation: the continuation is owned by the caller. 55   // No per-post allocation: the continuation is owned by the caller.
63   continuation* head_ = nullptr; 56   continuation* head_ = nullptr;
64   continuation* tail_ = nullptr; 57   continuation* tail_ = nullptr;
65   58  
HITCBC 66   821 void push(continuation* c) noexcept 59   19817 void push(continuation* c) noexcept
67   { 60   {
HITCBC 68   821 c->next = nullptr; 61   19817 c->next = nullptr;
HITCBC 69   821 if(tail_) 62   19817 if(tail_)
HITCBC 70   599 tail_->next = c; 63   1704 tail_->next = c;
71   else 64   else
HITCBC 72   222 head_ = c; 65   18113 head_ = c;
HITCBC 73   821 tail_ = c; 66   19817 tail_ = c;
HITCBC 74   821 } 67   19817 }
75   68  
HITCBC 76   982 continuation* pop() noexcept 69   19980 continuation* pop() noexcept
77   { 70   {
HITCBC 78   982 if(!head_) 71   19980 if(!head_)
HITCBC 79   161 return nullptr; 72   163 return nullptr;
HITCBC 80   821 continuation* c = head_; 73   19817 continuation* c = head_;
HITCBC 81   821 head_ = head_->next; 74   19817 head_ = head_->next;
HITCBC 82   821 if(!head_) 75   19817 if(!head_)
HITCBC 83   222 tail_ = nullptr; 76   18113 tail_ = nullptr;
HITCBC 84   821 return c; 77   19817 return c;
85   } 78   }
86   79  
HITCBC 87   1012 bool empty() const noexcept 80   38553 bool empty() const noexcept
88   { 81   {
HITCBC 89   1012 return head_ == nullptr; 82   38553 return head_ == nullptr;
90   } 83   }
91   84  
92   std::mutex mutex_; 85   std::mutex mutex_;
93   std::condition_variable work_cv_; 86   std::condition_variable work_cv_;
94   std::condition_variable done_cv_; 87   std::condition_variable done_cv_;
95   std::vector<std::thread> threads_; 88   std::vector<std::thread> threads_;
96   std::atomic<std::size_t> outstanding_work_{0}; 89   std::atomic<std::size_t> outstanding_work_{0};
97   bool stop_{false}; 90   bool stop_{false};
98   bool joined_{false}; 91   bool joined_{false};
99   std::size_t num_threads_; 92   std::size_t num_threads_;
100   char thread_name_prefix_[13]{}; // 12 chars max + null terminator 93   char thread_name_prefix_[13]{}; // 12 chars max + null terminator
101   std::once_flag start_flag_; 94   std::once_flag start_flag_;
102   95  
103   public: 96   public:
HITCBC 104   161 ~impl() = default; 97   163 ~impl() = default;
105 - bool  
106 - running_in_this_thread() const noexcept  
DCB 107 - 355 {  
108 - return current_.get() == this;  
DCB 109 - 355 }  
110 -  
111   98  
112   // Destroy abandoned coroutine frames. Must be called 99   // Destroy abandoned coroutine frames. Must be called
113   // before execution_context::shutdown()/destroy() so 100   // before execution_context::shutdown()/destroy() so
114   // that suspended-frame destructors (e.g. delay_awaitable 101   // that suspended-frame destructors (e.g. delay_awaitable
115   // calling timer_service::cancel()) run while services 102   // calling timer_service::cancel()) run while services
116   // are still valid. 103   // are still valid.
117   void 104   void
HITCBC 118   161 drain_abandoned() noexcept 105   163 drain_abandoned() noexcept
119   { 106   {
HITCBC 120   372 while(auto* c = pop()) 107   389 while(auto* c = pop())
121   { 108   {
HITCBC 122   211 auto h = c->h; 109   226 auto h = c->h;
HITCBC 123   211 if(h && h != std::noop_coroutine()) 110   226 if(h && h != std::noop_coroutine())
HITCBC 124   160 h.destroy(); 111   174 h.destroy();
HITCBC 125   211 } 112   226 }
HITCBC 126   161 } 113   163 }
127   114  
HITCBC 128   161 impl(std::size_t num_threads, std::string_view thread_name_prefix) 115   163 impl(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 129   161 : num_threads_(num_threads) 116   163 : num_threads_(num_threads)
130   { 117   {
HITCBC 131   161 if(num_threads_ == 0) 118   163 if(num_threads_ == 0)
HITCBC 132   4 num_threads_ = std::max( 119   4 num_threads_ = std::max(
HITCBC 133   2 std::thread::hardware_concurrency(), 1u); 120   2 std::thread::hardware_concurrency(), 1u);
134   121  
135   // Truncate prefix to 12 chars, leaving room for up to 3-digit index. 122   // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
HITCBC 136   161 auto n = thread_name_prefix.copy(thread_name_prefix_, 12); 123   163 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
HITCBC 137   161 thread_name_prefix_[n] = '\0'; 124   163 thread_name_prefix_[n] = '\0';
HITCBC 138   161 } 125   163 }
139   126  
140   void 127   void
HITCBC 141   821 post(continuation& c) 128   19817 post(continuation& c)
142   { 129   {
HITCBC 143   821 ensure_started(); 130   19817 ensure_started();
144   { 131   {
HITCBC 145   821 std::lock_guard<std::mutex> lock(mutex_); 132   19817 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 146   821 push(&c); 133   19817 push(&c);
HITCBC 147   821 } 134   19817 }
HITCBC 148   821 work_cv_.notify_one(); 135   19817 work_cv_.notify_one();
HITCBC 149   821 } 136   19817 }
150   137  
151   void 138   void
HITCBC 152   347 on_work_started() noexcept 139   345 on_work_started() noexcept
153   { 140   {
HITCBC 154   347 outstanding_work_.fetch_add(1, std::memory_order_acq_rel); 141   345 outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
HITCBC 155   347 } 142   345 }
156   143  
157   void 144   void
HITCBC 158   347 on_work_finished() noexcept 145   345 on_work_finished() noexcept
159   { 146   {
HITCBC 160   347 if(outstanding_work_.fetch_sub( 147   345 if(outstanding_work_.fetch_sub(
HITCBC 161   347 1, std::memory_order_acq_rel) == 1) 148   345 1, std::memory_order_acq_rel) == 1)
162   { 149   {
HITCBC 163   87 std::lock_guard<std::mutex> lock(mutex_); 150   85 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 164   87 if(joined_ && !stop_) 151   85 if(joined_ && !stop_)
HITCBC 165   4 stop_ = true; 152   4 stop_ = true;
HITCBC 166   87 done_cv_.notify_all(); 153   85 done_cv_.notify_all();
HITCBC 167   87 work_cv_.notify_all(); 154   85 work_cv_.notify_all();
HITCBC 168   87 } 155   85 }
HITCBC 169   347 } 156   345 }
170   157  
171   void 158   void
HITCBC 172   173 join() noexcept 159   175 join() noexcept
173   { 160   {
174   { 161   {
HITCBC 175   173 std::unique_lock<std::mutex> lock(mutex_); 162   175 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 176   173 if(joined_) 163   175 if(joined_)
HITCBC 177   12 return; 164   12 return;
HITCBC 178   161 joined_ = true; 165   163 joined_ = true;
179   166  
HITCBC 180   161 if(outstanding_work_.load( 167   163 if(outstanding_work_.load(
HITCBC 181   161 std::memory_order_acquire) == 0) 168   163 std::memory_order_acquire) == 0)
182   { 169   {
HITCBC 183   104 stop_ = true; 170   106 stop_ = true;
HITCBC 184   104 work_cv_.notify_all(); 171   106 work_cv_.notify_all();
185   } 172   }
186   else 173   else
187   { 174   {
HITCBC 188   57 done_cv_.wait(lock, [this]{ 175   57 done_cv_.wait(lock, [this]{
HITCBC 189   62 return stop_; 176   62 return stop_;
190   }); 177   });
191   } 178   }
HITCBC 192   173 } 179   175 }
193   180  
HITCBC 194   345 for(auto& t : threads_) 181   353 for(auto& t : threads_)
HITCBC 195   184 if(t.joinable()) 182   190 if(t.joinable())
HITCBC 196   184 t.join(); 183   190 t.join();
197   } 184   }
198   185  
199   void 186   void
HITCBC 200   163 stop() noexcept 187   165 stop() noexcept
201   { 188   {
202   { 189   {
HITCBC 203   163 std::lock_guard<std::mutex> lock(mutex_); 190   165 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 204   163 stop_ = true; 191   165 stop_ = true;
HITCBC 205   163 } 192   165 }
HITCBC 206   163 work_cv_.notify_all(); 193   165 work_cv_.notify_all();
HITCBC 207   163 done_cv_.notify_all(); 194   165 done_cv_.notify_all();
HITCBC 208   163 } 195   165 }
209   196  
210   private: 197   private:
211   void 198   void
HITCBC 212   821 ensure_started() 199   19817 ensure_started()
213   { 200   {
HITCBC 214   821 std::call_once(start_flag_, [this]{ 201   19817 std::call_once(start_flag_, [this]{
HITCBC 215   105 threads_.reserve(num_threads_); 202   106 threads_.reserve(num_threads_);
HITCBC 216   289 for(std::size_t i = 0; i < num_threads_; ++i) 203   296 for(std::size_t i = 0; i < num_threads_; ++i)
HITCBC 217   368 threads_.emplace_back([this, i]{ run(i); }); 204   380 threads_.emplace_back([this, i]{ run(i); });
HITCBC 218   105 }); 205   106 });
HITCBC 219   821 } 206   19817 }
220   207  
221   void 208   void
HITCBC 222   184 run(std::size_t index) 209   190 run(std::size_t index)
223   { 210   {
224   // Build name; set_current_thread_name truncates to platform limits. 211   // Build name; set_current_thread_name truncates to platform limits.
225   char name[16]; 212   char name[16];
HITCBC 226   184 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index); 213   190 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
HITCBC 227   184 set_current_thread_name(name); 214   190 set_current_thread_name(name);
228 - // Mark this thread as a worker of this pool so dispatch()  
229 - // can symmetric-transfer when called from within pool work.  
230 - struct scoped_pool  
231 - {  
232 - scoped_pool(impl const* p) noexcept { current_.set(p); }  
DCB 233 - 184 ~scoped_pool() noexcept { current_.set(nullptr); }  
DCB 234 - 184 } guard(this);  
DCB 235 - 184  
236   215  
237   for(;;) 216   for(;;)
238   { 217   {
HITCBC 239   794 continuation* c = nullptr; 218   19781 continuation* c = nullptr;
240   { 219   {
HITCBC 241   794 std::unique_lock<std::mutex> lock(mutex_); 220   19781 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 242   794 work_cv_.wait(lock, [this]{ 221   19781 work_cv_.wait(lock, [this]{
HITCBC 243   1313 return !empty() || 222   57415 return !empty() ||
HITCBC 244   1313 stop_; 223   57415 stop_;
245   }); 224   });
HITCBC 246   794 if(stop_) 225   19781 if(stop_)
HITCBC 247   368 return; 226   380 return;
HITCBC 248   610 c = pop(); 227   19591 c = pop();
HITCBC 249   794 } 228   19781 }
HITCBC 250   610 if(c) 229   19591 if(c)
HITCBC 251   610 safe_resume(c->h); 230   19591 safe_resume(c->h);
HITCBC 252   610 } 231   19591 }
ECB 253   184 } 232   }
254   }; 233   };
255   234  
256   //------------------------------------------------------------------------------ 235   //------------------------------------------------------------------------------
257   236  
HITCBC 258   161 thread_pool:: 237   163 thread_pool::
259   ~thread_pool() 238   ~thread_pool()
260   { 239   {
HITCBC 261   161 impl_->stop(); 240   163 impl_->stop();
HITCBC 262   161 impl_->join(); 241   163 impl_->join();
HITCBC 263   161 impl_->drain_abandoned(); 242   163 impl_->drain_abandoned();
HITCBC 264   161 shutdown(); 243   163 shutdown();
HITCBC 265   161 destroy(); 244   163 destroy();
HITCBC 266   161 delete impl_; 245   163 delete impl_;
HITCBC 267   161 } 246   163 }
268   247  
HITCBC 269   161 thread_pool:: 248   163 thread_pool::
HITCBC 270   161 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix) 249   163 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 271   161 : impl_(new impl(num_threads, thread_name_prefix)) 250   163 : impl_(new impl(num_threads, thread_name_prefix))
272   { 251   {
HITCBC 273   161 this->set_frame_allocator(std::allocator<void>{}); 252   163 this->set_frame_allocator(std::allocator<void>{});
HITCBC 274   161 } 253   163 }
275   254  
276   void 255   void
HITCBC 277   12 thread_pool:: 256   12 thread_pool::
278   join() noexcept 257   join() noexcept
279   { 258   {
HITCBC 280   12 impl_->join(); 259   12 impl_->join();
HITCBC 281   12 } 260   12 }
282   261  
283   void 262   void
HITCBC 284   2 thread_pool:: 263   2 thread_pool::
285   stop() noexcept 264   stop() noexcept
286   { 265   {
HITCBC 287   2 impl_->stop(); 266   2 impl_->stop();
HITCBC 288   2 } 267   2 }
289   268  
290   //------------------------------------------------------------------------------ 269   //------------------------------------------------------------------------------
291   270  
292   thread_pool::executor_type 271   thread_pool::executor_type
HITCBC 293   167 thread_pool:: 272   11577 thread_pool::
294   get_executor() const noexcept 273   get_executor() const noexcept
295   { 274   {
HITCBC 296   167 return executor_type( 275   11577 return executor_type(
HITCBC 297   167 const_cast<thread_pool&>(*this)); 276   11577 const_cast<thread_pool&>(*this));
298   } 277   }
299   278  
300   void 279   void
HITCBC 301   347 thread_pool::executor_type:: 280   345 thread_pool::executor_type::
302   on_work_started() const noexcept 281   on_work_started() const noexcept
303   { 282   {
HITCBC 304   347 pool_->impl_->on_work_started(); 283   345 pool_->impl_->on_work_started();
HITCBC 305   347 } 284   345 }
306   285  
307   void 286   void
HITCBC 308   347 thread_pool::executor_type:: 287   345 thread_pool::executor_type::
309   on_work_finished() const noexcept 288   on_work_finished() const noexcept
310   { 289   {
HITCBC 311   347 pool_->impl_->on_work_finished(); 290   345 pool_->impl_->on_work_finished();
HITCBC 312   347 } 291   345 }
313   292  
314   void 293   void
HITCBC 315   479 thread_pool::executor_type:: 294   19817 thread_pool::executor_type::
316   post(continuation& c) const 295   post(continuation& c) const
317   { 296   {
DCB 318 - 479 }  
DCB 319 - 479  
320 - std::coroutine_handle<>  
321 - thread_pool::executor_type::  
DCB 322 - 355 dispatch(continuation& c) const  
323 - {  
324 - if(pool_->impl_->running_in_this_thread())  
DCB 325 - 355 return c.h;  
DCB 326 - 13 pool_->impl_->post(c);  
DCB 327 - 342 return std::noop_coroutine();  
HITCBC 328   342 pool_->impl_->post(c); 297   19817 pool_->impl_->post(c);
HITGIC 329   } 298   19817 }
330   299  
331   } // capy 300   } // capy
332   } // boost 301   } // boost