LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 128 128
Test Date: 2026-04-29 13:49:04 Functions: 100.0 % 25 25

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/continuation.hpp>
      13                 : #include <boost/capy/ex/frame_allocator.hpp>
      14                 : #include <boost/capy/test/thread_name.hpp>
      15                 : #include <algorithm>
      16                 : #include <atomic>
      17                 : #include <condition_variable>
      18                 : #include <cstdio>
      19                 : #include <mutex>
      20                 : #include <thread>
      21                 : #include <vector>
      22                 : 
      23                 : /*
      24                 :     Thread pool implementation using a shared work queue.
      25                 : 
      26                 :     Work items are continuations linked via their intrusive next pointer,
      27                 :     stored in a single queue protected by a mutex. No per-post heap
      28                 :     allocation: the continuation is owned by the caller and linked
      29                 :     directly. Worker threads wait on a condition_variable until work
      30                 :     is available or stop is requested.
      31                 : 
      32                 :     Threads are started lazily on first post() via std::call_once to avoid
      33                 :     spawning threads for pools that are constructed but never used. Each
      34                 :     thread is named with a configurable prefix plus index for debugger
      35                 :     visibility.
      36                 : 
      37                 :     Work tracking: on_work_started/on_work_finished maintain an atomic
      38                 :     outstanding_work_ counter. join() blocks until this counter reaches
      39                 :     zero, then signals workers to stop and joins threads.
      40                 : 
      41                 :     Two shutdown paths:
      42                 :     - join(): waits for outstanding work to drain, then stops workers.
      43                 :     - stop(): immediately signals workers to exit; queued work is abandoned.
      44                 :     - Destructor: stop() then join() (abandon + wait for threads).
      45                 : */
      46                 : 
      47                 : namespace boost {
      48                 : namespace capy {
      49                 : 
      50                 : //------------------------------------------------------------------------------
      51                 : 
      52                 : class thread_pool::impl
      53                 : {
      54                 :     // Intrusive queue of continuations via continuation::next.
      55                 :     // No per-post allocation: the continuation is owned by the caller.
      56                 :     continuation* head_ = nullptr;
      57                 :     continuation* tail_ = nullptr;
      58                 : 
      59 HIT       19817 :     void push(continuation* c) noexcept
      60                 :     {
      61           19817 :         c->next = nullptr;
      62           19817 :         if(tail_)
      63            1704 :             tail_->next = c;
      64                 :         else
      65           18113 :             head_ = c;
      66           19817 :         tail_ = c;
      67           19817 :     }
      68                 : 
      69           19980 :     continuation* pop() noexcept
      70                 :     {
      71           19980 :         if(!head_)
      72             163 :             return nullptr;
      73           19817 :         continuation* c = head_;
      74           19817 :         head_ = head_->next;
      75           19817 :         if(!head_)
      76           18113 :             tail_ = nullptr;
      77           19817 :         return c;
      78                 :     }
      79                 : 
      80           38553 :     bool empty() const noexcept
      81                 :     {
      82           38553 :         return head_ == nullptr;
      83                 :     }
      84                 : 
      85                 :     std::mutex mutex_;
      86                 :     std::condition_variable work_cv_;
      87                 :     std::condition_variable done_cv_;
      88                 :     std::vector<std::thread> threads_;
      89                 :     std::atomic<std::size_t> outstanding_work_{0};
      90                 :     bool stop_{false};
      91                 :     bool joined_{false};
      92                 :     std::size_t num_threads_;
      93                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      94                 :     std::once_flag start_flag_;
      95                 : 
      96                 : public:
      97             163 :     ~impl() = default;
      98                 : 
      99                 :     // Destroy abandoned coroutine frames. Must be called
     100                 :     // before execution_context::shutdown()/destroy() so
     101                 :     // that suspended-frame destructors (e.g. delay_awaitable
     102                 :     // calling timer_service::cancel()) run while services
     103                 :     // are still valid.
     104                 :     void
     105             163 :     drain_abandoned() noexcept
     106                 :     {
     107             389 :         while(auto* c = pop())
     108                 :         {
     109             226 :             auto h = c->h;
     110             226 :             if(h && h != std::noop_coroutine())
     111             174 :                 h.destroy();
     112             226 :         }
     113             163 :     }
     114                 : 
     115             163 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
     116             163 :         : num_threads_(num_threads)
     117                 :     {
     118             163 :         if(num_threads_ == 0)
     119               4 :             num_threads_ = std::max(
     120               2 :                 std::thread::hardware_concurrency(), 1u);
     121                 : 
     122                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
     123             163 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
     124             163 :         thread_name_prefix_[n] = '\0';
     125             163 :     }
     126                 : 
     127                 :     void
     128           19817 :     post(continuation& c)
     129                 :     {
     130           19817 :         ensure_started();
     131                 :         {
     132           19817 :             std::lock_guard<std::mutex> lock(mutex_);
     133           19817 :             push(&c);
     134           19817 :         }
     135           19817 :         work_cv_.notify_one();
     136           19817 :     }
     137                 : 
     138                 :     void
     139             345 :     on_work_started() noexcept
     140                 :     {
     141             345 :         outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
     142             345 :     }
     143                 : 
     144                 :     void
     145             345 :     on_work_finished() noexcept
     146                 :     {
     147             345 :         if(outstanding_work_.fetch_sub(
     148             345 :             1, std::memory_order_acq_rel) == 1)
     149                 :         {
     150              85 :             std::lock_guard<std::mutex> lock(mutex_);
     151              85 :             if(joined_ && !stop_)
     152               4 :                 stop_ = true;
     153              85 :             done_cv_.notify_all();
     154              85 :             work_cv_.notify_all();
     155              85 :         }
     156             345 :     }
     157                 : 
     158                 :     void
     159             175 :     join() noexcept
     160                 :     {
     161                 :         {
     162             175 :             std::unique_lock<std::mutex> lock(mutex_);
     163             175 :             if(joined_)
     164              12 :                 return;
     165             163 :             joined_ = true;
     166                 : 
     167             163 :             if(outstanding_work_.load(
     168             163 :                 std::memory_order_acquire) == 0)
     169                 :             {
     170             106 :                 stop_ = true;
     171             106 :                 work_cv_.notify_all();
     172                 :             }
     173                 :             else
     174                 :             {
     175              57 :                 done_cv_.wait(lock, [this]{
     176              62 :                     return stop_;
     177                 :                 });
     178                 :             }
     179             175 :         }
     180                 : 
     181             353 :         for(auto& t : threads_)
     182             190 :             if(t.joinable())
     183             190 :                 t.join();
     184                 :     }
     185                 : 
     186                 :     void
     187             165 :     stop() noexcept
     188                 :     {
     189                 :         {
     190             165 :             std::lock_guard<std::mutex> lock(mutex_);
     191             165 :             stop_ = true;
     192             165 :         }
     193             165 :         work_cv_.notify_all();
     194             165 :         done_cv_.notify_all();
     195             165 :     }
     196                 : 
     197                 : private:
     198                 :     void
     199           19817 :     ensure_started()
     200                 :     {
     201           19817 :         std::call_once(start_flag_, [this]{
     202             106 :             threads_.reserve(num_threads_);
     203             296 :             for(std::size_t i = 0; i < num_threads_; ++i)
     204             380 :                 threads_.emplace_back([this, i]{ run(i); });
     205             106 :         });
     206           19817 :     }
     207                 : 
     208                 :     void
     209             190 :     run(std::size_t index)
     210                 :     {
     211                 :         // Build name; set_current_thread_name truncates to platform limits.
     212                 :         char name[16];
     213             190 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     214             190 :         set_current_thread_name(name);
     215                 : 
     216                 :         for(;;)
     217                 :         {
     218           19781 :             continuation* c = nullptr;
     219                 :             {
     220           19781 :                 std::unique_lock<std::mutex> lock(mutex_);
     221           19781 :                 work_cv_.wait(lock, [this]{
     222           57415 :                     return !empty() ||
     223           57415 :                         stop_;
     224                 :                 });
     225           19781 :                 if(stop_)
     226             380 :                     return;
     227           19591 :                 c = pop();
     228           19781 :             }
     229           19591 :             if(c)
     230           19591 :                 safe_resume(c->h);
     231           19591 :         }
     232                 :     }
     233                 : };
     234                 : 
     235                 : //------------------------------------------------------------------------------
     236                 : 
     237             163 : thread_pool::
     238                 : ~thread_pool()
     239                 : {
     240             163 :     impl_->stop();
     241             163 :     impl_->join();
     242             163 :     impl_->drain_abandoned();
     243             163 :     shutdown();
     244             163 :     destroy();
     245             163 :     delete impl_;
     246             163 : }
     247                 : 
     248             163 : thread_pool::
     249             163 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     250             163 :     : impl_(new impl(num_threads, thread_name_prefix))
     251                 : {
     252             163 :     this->set_frame_allocator(std::allocator<void>{});
     253             163 : }
     254                 : 
     255                 : void
     256              12 : thread_pool::
     257                 : join() noexcept
     258                 : {
     259              12 :     impl_->join();
     260              12 : }
     261                 : 
     262                 : void
     263               2 : thread_pool::
     264                 : stop() noexcept
     265                 : {
     266               2 :     impl_->stop();
     267               2 : }
     268                 : 
     269                 : //------------------------------------------------------------------------------
     270                 : 
     271                 : thread_pool::executor_type
     272           11577 : thread_pool::
     273                 : get_executor() const noexcept
     274                 : {
     275           11577 :     return executor_type(
     276           11577 :         const_cast<thread_pool&>(*this));
     277                 : }
     278                 : 
     279                 : void
     280             345 : thread_pool::executor_type::
     281                 : on_work_started() const noexcept
     282                 : {
     283             345 :     pool_->impl_->on_work_started();
     284             345 : }
     285                 : 
     286                 : void
     287             345 : thread_pool::executor_type::
     288                 : on_work_finished() const noexcept
     289                 : {
     290             345 :     pool_->impl_->on_work_finished();
     291             345 : }
     292                 : 
     293                 : void
     294           19817 : thread_pool::executor_type::
     295                 : post(continuation& c) const
     296                 : {
     297           19817 :     pool_->impl_->post(c);
     298           19817 : }
     299                 : 
     300                 : } // capy
     301                 : } // boost
        

Generated by: LCOV version 2.3