95.61% Lines (109/114) 95.83% Functions (23/24)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10 - #include "src/ex/detail/strand_queue.hpp" 10 + #include "src/ex/detail/strand_impl.hpp"
11   #include <boost/capy/ex/detail/strand_service.hpp> 11   #include <boost/capy/ex/detail/strand_service.hpp>
12 - #include <atomic>  
13   #include <boost/capy/continuation.hpp> 12   #include <boost/capy/continuation.hpp>
14   #include <coroutine> 13   #include <coroutine>
15 - #include <mutex> 14 + #include <memory>
16 - #include <thread>  
17   #include <utility> 15   #include <utility>
18   16  
19   namespace boost { 17   namespace boost {
20   namespace capy { 18   namespace capy {
21   namespace detail { 19   namespace detail {
22   20  
23 - //---------------------------------------------------------- 21 + // Sentinel stored in invoker_frame_cache_ after shutdown to prevent
24 -  
25 - /** Implementation state for a strand.  
26 -  
27 - Each strand_impl provides serialization for coroutines  
28 - dispatched through strands that share it.  
29 - */  
30 - // Sentinel stored in cached_frame_ after shutdown to prevent  
31   // in-flight invokers from repopulating a freed cache slot. 22   // in-flight invokers from repopulating a freed cache slot.
32   inline void* const kCacheClosed = reinterpret_cast<void*>(1); 23   inline void* const kCacheClosed = reinterpret_cast<void*>(1);
33   24  
34 - struct strand_impl 25 + /** Concrete strand_service.
35 - {  
36 - std::mutex mutex_;  
37 - strand_queue pending_;  
38 - bool locked_ = false;  
39 - std::atomic<std::thread::id> dispatch_thread_{};  
40 - std::atomic<void*> cached_frame_{nullptr};  
41 - };  
42 -  
43 - //----------------------------------------------------------  
44 -  
45 - /** Invoker coroutine for strand dispatch.  
46 -  
47 - Uses custom allocator to recycle frame - one allocation  
48 - per strand_impl lifetime, stored in trailer for recovery.  
49 - */  
50 - struct strand_invoker  
51 - {  
52 - struct promise_type  
53 - {  
54 - // Used to post the invoker through the inner executor.  
55 - // Lives in the coroutine frame (heap-allocated), so has  
56 - // a stable address for the duration of the queue residency.  
57 - continuation self_;  
58 -  
DCB 59 - 16 void* operator new(std::size_t n, strand_impl& impl)  
60 - {  
DCB 61 - 16 constexpr auto A = alignof(strand_impl*);  
DCB 62 - 16 std::size_t padded = (n + A - 1) & ~(A - 1);  
DCB 63 - 16 std::size_t total = padded + sizeof(strand_impl*);  
64 -  
DCB 65 - 16 void* p = impl.cached_frame_.exchange(  
66 - nullptr, std::memory_order_acquire);  
DCB 67 - 16 if(!p || p == kCacheClosed)  
DCB 68 - 13 p = ::operator new(total);  
69 -  
70 - // Trailer lets delete recover impl  
DCB 71 - 16 *reinterpret_cast<strand_impl**>(  
DCB 72 - 16 static_cast<char*>(p) + padded) = &impl;  
DCB 73 - 16 return p;  
74 - }  
75 -  
DCB 76 - 16 void operator delete(void* p, std::size_t n) noexcept  
77 - {  
DCB 78 - 16 constexpr auto A = alignof(strand_impl*);  
DCB 79 - 16 std::size_t padded = (n + A - 1) & ~(A - 1);  
80 -  
DCB 81 - 16 auto* impl = *reinterpret_cast<strand_impl**>(  
82 - static_cast<char*>(p) + padded);  
83 -  
DCB 84 - 16 void* expected = nullptr;  
DCB 85 - 16 if(!impl->cached_frame_.compare_exchange_strong(  
86 - expected, p, std::memory_order_release))  
DUB 87 - ::operator delete(p);  
DCB 88 - 16 }  
89 -  
DCB 90 - 16 strand_invoker get_return_object() noexcept  
DCB 91 - 16 { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }  
92 -  
DCB 93 - 16 std::suspend_always initial_suspend() noexcept { return {}; }  
DCB 94 - 16 std::suspend_never final_suspend() noexcept { return {}; }  
DCB 95 - 16 void return_void() noexcept {}  
DUB 96 - void unhandled_exception() { std::terminate(); }  
97 - };  
98 -  
99 - std::coroutine_handle<promise_type> h_;  
100 - };  
101 -  
102 - //----------------------------------------------------------  
103   26  
104 - /** Concrete implementation of strand_service. 27 + Holds a shared mutex pool (193 entries), a linked list of live
  28 + impls (for shutdown traversal), and a single-slot invoker
  29 + coroutine frame cache shared across all strands of this service.
105   30  
106 - Holds the fixed pool of strand_impl objects. 31 + The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are
  32 + public so the namespace-scope `make_invoker` coroutine and the
  33 + `strand_service` static methods can call them without friendship.
107   */ 34   */
108   class strand_service_impl : public strand_service 35   class strand_service_impl : public strand_service
109   { 36   {
110 - static constexpr std::size_t num_impls = 211; 37 + public:
  38 + static constexpr std::size_t num_mutexes = 193;
111 - strand_impl impls_[num_impls];  
112 - std::size_t salt_ = 0;  
113   39  
114   std::mutex mutex_; 40   std::mutex mutex_;
  41 + std::size_t salt_ = 0;
  42 + std::shared_ptr<std::mutex> mutexes_[num_mutexes];
  43 + intrusive_list<strand_impl> impl_list_;
  44 + std::atomic<void*> invoker_frame_cache_{nullptr};
115 - public:  
116   45  
117   explicit 46   explicit
HITCBC 118   25 strand_service_impl(execution_context&) 47   30 strand_service_impl(execution_context&)
HITCBC 119   5300 { 48   30 {
HITCBC 120   25 } 49   30 }
121   50  
122 - strand_impl* 51 + std::shared_ptr<strand_impl>
HITCBC 123 - 29 get_implementation() override 52 + 11442 create_implementation() override
124   { 53   {
HITGNC   54 + 11442 auto new_impl = std::make_shared<strand_impl>();
  55 +
DCB 125 - 29 std::size_t index = salt_++;  
DCB 126 - 29 index = index % num_impls;  
DCB 127 - 29 return &impls_[index];  
DCB 128 - 29 }  
HITCBC 129   29 std::lock_guard<std::mutex> lock(mutex_); 56   11442 std::lock_guard<std::mutex> lock(mutex_);
130   57  
HITGIC 131 - protected: 58 + 11442 std::size_t s = salt_++;
HITGIC 132 - void 59 + 11442 std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get());
HITCBC 133 - 25 shutdown() override 60 + 11442 idx += idx >> 3;
HITGIC 134 - { 61 + 11442 idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2);
HITCBC 135 - 5300 for(std::size_t i = 0; i < num_impls; ++i) 62 + 11442 idx %= num_mutexes;
HITGIC 136 - { 63 + 11442 if(!mutexes_[idx])
HITCBC 137 - 5275 std::lock_guard<std::mutex> lock(impls_[i].mutex_); 64 + 666 mutexes_[idx] = std::make_shared<std::mutex>();
HITCBC 138 - 5275 impls_[i].locked_ = true; 65 + 11442 new_impl->mutex_ = mutexes_[idx].get();
139   66  
HITCBC 140 - 5275 void* p = impls_[i].cached_frame_.exchange( 67 + 11442 impl_list_.push_back(new_impl.get());
HITGIC 141 - kCacheClosed, std::memory_order_acquire); 68 + 11442 new_impl->service_.store(this, std::memory_order_release);
ECB 142 - 5275 if(p) 69 +
HITCBC 143 - 13 ::operator delete(p); 70 + 22884 return new_impl;
DCB 144 - 5275 }  
HITCBC 145   25 } 71   11442 }
146 - private:  
147   72  
148   static bool 73   static bool
HITCBC 149   335 enqueue(strand_impl& impl, std::coroutine_handle<> h) 74   30340 enqueue(strand_impl& impl, std::coroutine_handle<> h)
150   { 75   {
HITCBC 151 - 335 std::lock_guard<std::mutex> lock(impl.mutex_); 76 + 30340 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 152   335 impl.pending_.push(h); 77   30340 impl.pending_.push(h);
HITCBC 153   335 if(!impl.locked_) 78   30340 if(!impl.locked_)
154   { 79   {
HITCBC 155   16 impl.locked_ = true; 80   19002 impl.locked_ = true;
HITCBC 156   16 return true; 81   19002 return true;
157   } 82   }
HITCBC 158   319 return false; 83   11338 return false;
HITCBC 159   335 } 84   30340 }
160   85  
161   static void 86   static void
HITCBC 162   23 dispatch_pending(strand_impl& impl) 87   19855 dispatch_pending(strand_impl& impl)
163   { 88   {
HITCBC 164   23 strand_queue::taken_batch batch; 89   19855 strand_queue::taken_batch batch;
165   { 90   {
HITCBC 166 - 23 std::lock_guard<std::mutex> lock(impl.mutex_); 91 + 19855 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 167   23 batch = impl.pending_.take_all(); 92   19855 batch = impl.pending_.take_all();
HITCBC 168   23 } 93   19855 }
HITCBC 169   23 impl.pending_.dispatch_batch(batch); 94   19855 impl.pending_.dispatch_batch(batch);
HITCBC 170   23 } 95   19855 }
171   96  
172   static bool 97   static bool
HITCBC 173   23 try_unlock(strand_impl& impl) 98   19855 try_unlock(strand_impl& impl)
174   { 99   {
HITCBC 175 - 23 std::lock_guard<std::mutex> lock(impl.mutex_); 100 + 19855 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 176   23 if(impl.pending_.empty()) 101   19855 if(impl.pending_.empty())
177   { 102   {
HITCBC 178   16 impl.locked_ = false; 103   19002 impl.locked_ = false;
HITCBC 179   16 return true; 104   19002 return true;
180   } 105   }
HITCBC 181   7 return false; 106   853 return false;
HITCBC 182   23 } 107   19855 }
183   108  
184   static void 109   static void
HITCBC 185   23 set_dispatch_thread(strand_impl& impl) noexcept 110   19855 set_dispatch_thread(strand_impl& impl) noexcept
186   { 111   {
HITCBC 187   23 impl.dispatch_thread_.store(std::this_thread::get_id()); 112   19855 impl.dispatch_thread_.store(std::this_thread::get_id());
HITCBC 188   23 } 113   19855 }
189   114  
190   static void 115   static void
HITCBC 191   16 clear_dispatch_thread(strand_impl& impl) noexcept 116   19002 clear_dispatch_thread(strand_impl& impl) noexcept
192   { 117   {
HITCBC 193   16 impl.dispatch_thread_.store(std::thread::id{}); 118   19002 impl.dispatch_thread_.store(std::thread::id{});
HITCBC 194   16 } 119   19002 }
195   120  
196 - // Loops until queue empty (aggressive). Alternative: per-batch fairness 121 + // Defined below; needs strand_invoker complete.
197 - // (repost after each batch to let other work run) - explore if starvation observed. 122 + static void
198 - static strand_invoker 123 + post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex);
ECB 199 - 16 make_invoker(strand_impl& impl) 124 +
  125 + protected:
  126 + void
HITGNC   127 + 30 shutdown() override
200   { 128   {
HITGIC 201 - strand_impl* p = &impl; 129 + 30 std::lock_guard<std::mutex> lock(mutex_);
HITGIC 202 - for(;;) 130 + 30 while(auto* p = impl_list_.pop_front())
203   { 131   {
MISUIC 204 - set_dispatch_thread(*p); 132 + std::lock_guard<std::mutex> impl_lock(*p->mutex_);
MISUIC 205 - dispatch_pending(*p); 133 + p->locked_ = true;
MISUIC 206 - if(try_unlock(*p)) 134 + p->service_.store(nullptr, std::memory_order_release);
207 - {  
208 - clear_dispatch_thread(*p);  
209 - co_return;  
210 - }  
MISUIC 211   } 135   }
  136 +
HITGNC   137 + 30 void* fp = invoker_frame_cache_.exchange(
  138 + kCacheClosed, std::memory_order_acq_rel);
HITGNC   139 + 30 if(fp) ::operator delete(fp);
HITCBC 212   32 } 140   30 }
  141 + };
213   142  
214 - static void 143 + /** Invoker coroutine that drains a strand's pending queue.
ECB 215 - 16 post_invoker(strand_impl& impl, executor_ref ex) 144 +
  145 + Runs once the strand transitions from unlocked to locked. Holds
  146 + the impl alive via the coroutine parameter (a shared_ptr in the
  147 + coroutine frame), so user code may drop its strand handle while
  148 + the invoker is mid-flight.
  149 +
  150 + The frame's allocator recycles a single per-service slot. The
  151 + trailer points at the service (lifetime: execution_context),
  152 + NOT the impl (lifetime: per-strand), so operator delete is
  153 + safe even after the impl has been destroyed.
  154 + */
  155 + struct strand_invoker
  156 + {
  157 + struct promise_type
216   { 158   {
ECB 217 - 16 auto invoker = make_invoker(impl); 159 + // Stored in the coroutine frame so its address is stable for
ECB 218 - 16 auto& self = invoker.h_.promise().self_; 160 + // posting to the inner executor.
ECB 219 - 16 self.h = invoker.h_; 161 + continuation self_;
DCB 220 - 16 ex.post(self);  
DCB 221 - 16 }  
222   162  
223 - friend class strand_service; 163 + void*
HITGNC   164 + 19002 operator new(
  165 + std::size_t n,
  166 + std::shared_ptr<strand_impl> const& impl)
  167 + {
HITGNC   168 + 19002 auto* svc = impl->service_.load(std::memory_order_acquire);
HITGNC   169 + 19002 constexpr auto A = alignof(strand_service_impl*);
HITGNC   170 + 19002 std::size_t padded = (n + A - 1) & ~(A - 1);
HITGNC   171 + 19002 std::size_t total = padded + sizeof(strand_service_impl*);
  172 +
HITGNC   173 + 19002 void* p = svc->invoker_frame_cache_.exchange(
  174 + nullptr, std::memory_order_acquire);
HITGNC   175 + 19002 if(!p || p == kCacheClosed)
HITGNC   176 + 8720 p = ::operator new(total);
  177 +
HITGNC   178 + 19002 *reinterpret_cast<strand_service_impl**>(
HITGNC   179 + 19002 static_cast<char*>(p) + padded) = svc;
HITGNC   180 + 19002 return p;
  181 + }
  182 +
  183 + void
HITGNC   184 + 19002 operator delete(void* p, std::size_t n) noexcept
  185 + {
HITGNC   186 + 19002 constexpr auto A = alignof(strand_service_impl*);
HITGNC   187 + 19002 std::size_t padded = (n + A - 1) & ~(A - 1);
HITGNC   188 + 19002 auto* svc = *reinterpret_cast<strand_service_impl**>(
  189 + static_cast<char*>(p) + padded);
  190 +
HITGNC   191 + 19002 void* expected = nullptr;
HITGNC   192 + 19002 if(!svc->invoker_frame_cache_.compare_exchange_strong(
  193 + expected, p, std::memory_order_release))
HITGNC   194 + 8703 ::operator delete(p);
HITGNC   195 + 19002 }
  196 +
  197 + strand_invoker
HITGNC   198 + 19002 get_return_object() noexcept
  199 + {
HITGNC   200 + 19002 return {std::coroutine_handle<promise_type>::from_promise(*this)};
  201 + }
  202 +
HITGNC   203 + 19002 std::suspend_always initial_suspend() noexcept { return {}; }
HITGNC   204 + 19002 std::suspend_never final_suspend() noexcept { return {}; }
HITGNC   205 + 19002 void return_void() noexcept {}
MISUNC   206 + void unhandled_exception() { std::terminate(); }
  207 + };
  208 +
  209 + std::coroutine_handle<promise_type> h_;
224   }; 210   };
225   211  
226 - //---------------------------------------------------------- 212 + // The by-value parameter lives in the coroutine frame for the
  213 + // invoker's lifetime, keeping the impl alive past any user-side
  214 + // strand drop.
  215 + static
  216 + strand_invoker
HITGNC   217 + 19002 make_invoker(std::shared_ptr<strand_impl> impl)
  218 + {
  219 + auto* p = impl.get();
  220 + for(;;)
  221 + {
  222 + strand_service_impl::set_dispatch_thread(*p);
  223 + strand_service_impl::dispatch_pending(*p);
  224 + if(strand_service_impl::try_unlock(*p))
  225 + {
  226 + strand_service_impl::clear_dispatch_thread(*p);
  227 + co_return;
  228 + }
  229 + }
HITGNC   230 + 38004 }
  231 +
  232 + void
HITGNC   233 + 19002 strand_service_impl::post_invoker(
  234 + std::shared_ptr<strand_impl> impl,
  235 + executor_ref ex)
  236 + {
HITGNC   237 + 19002 auto invoker = make_invoker(std::move(impl));
HITGNC   238 + 19002 auto& self = invoker.h_.promise().self_;
HITGNC   239 + 19002 self.h = invoker.h_;
HITGNC   240 + 19002 ex.post(self);
HITGNC   241 + 19002 }
  242 +
HITGNC   243 + 22884 strand_impl::~strand_impl()
  244 + {
HITGNC   245 + 11442 auto* svc = service_.load(std::memory_order_acquire);
HITGNC   246 + 11442 if(!svc) return;
HITGNC   247 + 11442 std::lock_guard<std::mutex> lock(svc->mutex_);
HITGNC   248 + 11442 svc->impl_list_.remove(this);
HITGNC   249 + 11442 }
227   250  
HITCBC 228   25 strand_service:: 251   30 strand_service::
HITCBC 229   25 strand_service() 252   30 strand_service()
HITCBC 230   25 : service() 253   30 : service()
231   { 254   {
HITCBC 232   25 } 255   30 }
233   256  
HITCBC 234   25 strand_service:: 257   30 strand_service::
235   ~strand_service() = default; 258   ~strand_service() = default;
236   259  
237   bool 260   bool
HITCBC 238   12 strand_service:: 261   12 strand_service::
239   running_in_this_thread(strand_impl& impl) noexcept 262   running_in_this_thread(strand_impl& impl) noexcept
240   { 263   {
HITCBC 241   12 return impl.dispatch_thread_.load() == std::this_thread::get_id(); 264   12 return impl.dispatch_thread_.load() == std::this_thread::get_id();
242   } 265   }
243   266  
244   std::coroutine_handle<> 267   std::coroutine_handle<>
HITCBC 245   8 strand_service:: 268   8 strand_service::
246 - dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) 269 + dispatch(
  270 + std::shared_ptr<strand_impl> const& impl,
  271 + executor_ref ex,
  272 + std::coroutine_handle<> h)
247   { 273   {
HITCBC 248 - 8 if(running_in_this_thread(impl)) 274 + 8 if(running_in_this_thread(*impl))
HITCBC 249   3 return h; 275   3 return h;
250   276  
HITCBC 251 - 5 if(strand_service_impl::enqueue(impl, h)) 277 + 5 if(strand_service_impl::enqueue(*impl, h))
HITCBC 252   5 strand_service_impl::post_invoker(impl, ex); 278   5 strand_service_impl::post_invoker(impl, ex);
HITCBC 253   5 return std::noop_coroutine(); 279   5 return std::noop_coroutine();
254   } 280   }
255   281  
256   void 282   void
HITCBC 257   330 strand_service:: 283   30335 strand_service::
258 - post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) 284 + post(
  285 + std::shared_ptr<strand_impl> const& impl,
  286 + executor_ref ex,
  287 + std::coroutine_handle<> h)
259   { 288   {
HITCBC 260 - 330 if(strand_service_impl::enqueue(impl, h)) 289 + 30335 if(strand_service_impl::enqueue(*impl, h))
HITCBC 261   11 strand_service_impl::post_invoker(impl, ex); 290   18997 strand_service_impl::post_invoker(impl, ex);
HITCBC 262   330 } 291   30335 }
263   292  
264   strand_service& 293   strand_service&
HITCBC 265   29 get_strand_service(execution_context& ctx) 294   11442 get_strand_service(execution_context& ctx)
266   { 295   {
HITCBC 267   29 return ctx.use_service<strand_service_impl>(); 296   11442 return ctx.use_service<strand_service_impl>();
268   } 297   }
269   298  
270   } // namespace detail 299   } // namespace detail
271   } // namespace capy 300   } // namespace capy
272   } // namespace boost 301   } // namespace boost