src/ex/detail/strand_service.cpp
95.6% Lines (109/114)
95.8% List of functions (23/24)
Functions (24)
Function
Calls
Lines
Blocks
boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&)
:47
30x
100.0%
100.0%
boost::capy::detail::strand_service_impl::create_implementation()
:52
11442x
100.0%
80.0%
boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>)
:74
30340x
100.0%
82.0%
boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&)
:87
19855x
100.0%
86.0%
boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&)
:98
19855x
100.0%
100.0%
boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&)
:110
19855x
100.0%
100.0%
boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&)
:116
19002x
100.0%
100.0%
boost::capy::detail::strand_service_impl::shutdown()
:127
30x
60.0%
59.0%
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, std::shared_ptr<boost::capy::detail::strand_impl> const&)
:164
19002x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long)
:184
19002x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::get_return_object()
:198
19002x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::initial_suspend()
:203
19002x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::final_suspend()
:204
19002x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::return_void()
:205
19002x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::unhandled_exception()
:206
0
0.0%
0.0%
boost::capy::detail::make_invoker(std::shared_ptr<boost::capy::detail::strand_impl>)
:217
19002x
100.0%
47.0%
boost::capy::detail::strand_service_impl::post_invoker(std::shared_ptr<boost::capy::detail::strand_impl>, boost::capy::executor_ref)
:233
19002x
100.0%
75.0%
boost::capy::detail::strand_impl::~strand_impl()
:243
11442x
100.0%
77.0%
boost::capy::detail::strand_service::strand_service()
:251
30x
100.0%
100.0%
boost::capy::detail::strand_service::~strand_service()
:257
30x
100.0%
100.0%
boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&)
:261
12x
100.0%
100.0%
boost::capy::detail::strand_service::dispatch(std::shared_ptr<boost::capy::detail::strand_impl> const&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>)
:268
8x
100.0%
89.0%
boost::capy::detail::strand_service::post(std::shared_ptr<boost::capy::detail::strand_impl> const&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>)
:283
30335x
100.0%
82.0%
boost::capy::detail::get_strand_service(boost::capy::execution_context&)
:294
11442x
100.0%
100.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/capy | ||
| 8 | // | ||
| 9 | |||
| 10 | #include "src/ex/detail/strand_impl.hpp" | ||
| 11 | #include <boost/capy/ex/detail/strand_service.hpp> | ||
| 12 | #include <boost/capy/continuation.hpp> | ||
| 13 | #include <coroutine> | ||
| 14 | #include <memory> | ||
| 15 | #include <utility> | ||
| 16 | |||
| 17 | namespace boost { | ||
| 18 | namespace capy { | ||
| 19 | namespace detail { | ||
| 20 | |||
| 21 | // Sentinel stored in invoker_frame_cache_ after shutdown to prevent | ||
| 22 | // in-flight invokers from repopulating a freed cache slot. | ||
| 23 | inline void* const kCacheClosed = reinterpret_cast<void*>(1); | ||
| 24 | |||
| 25 | /** Concrete strand_service. | ||
| 26 | |||
| 27 | Holds a shared mutex pool (193 entries), a linked list of live | ||
| 28 | impls (for shutdown traversal), and a single-slot invoker | ||
| 29 | coroutine frame cache shared across all strands of this service. | ||
| 30 | |||
| 31 | The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are | ||
| 32 | public so the namespace-scope `make_invoker` coroutine and the | ||
| 33 | `strand_service` static methods can call them without friendship. | ||
| 34 | */ | ||
| 35 | class strand_service_impl : public strand_service | ||
| 36 | { | ||
| 37 | public: | ||
| 38 | static constexpr std::size_t num_mutexes = 193; | ||
| 39 | |||
| 40 | std::mutex mutex_; | ||
| 41 | std::size_t salt_ = 0; | ||
| 42 | std::shared_ptr<std::mutex> mutexes_[num_mutexes]; | ||
| 43 | intrusive_list<strand_impl> impl_list_; | ||
| 44 | std::atomic<void*> invoker_frame_cache_{nullptr}; | ||
| 45 | |||
| 46 | explicit | ||
| 47 | 30x | strand_service_impl(execution_context&) | |
| 48 | 30x | { | |
| 49 | 30x | } | |
| 50 | |||
| 51 | std::shared_ptr<strand_impl> | ||
| 52 | 11442x | create_implementation() override | |
| 53 | { | ||
| 54 | 11442x | auto new_impl = std::make_shared<strand_impl>(); | |
| 55 | |||
| 56 | 11442x | std::lock_guard<std::mutex> lock(mutex_); | |
| 57 | |||
| 58 | 11442x | std::size_t s = salt_++; | |
| 59 | 11442x | std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get()); | |
| 60 | 11442x | idx += idx >> 3; | |
| 61 | 11442x | idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2); | |
| 62 | 11442x | idx %= num_mutexes; | |
| 63 | 11442x | if(!mutexes_[idx]) | |
| 64 | 666x | mutexes_[idx] = std::make_shared<std::mutex>(); | |
| 65 | 11442x | new_impl->mutex_ = mutexes_[idx].get(); | |
| 66 | |||
| 67 | 11442x | impl_list_.push_back(new_impl.get()); | |
| 68 | 11442x | new_impl->service_.store(this, std::memory_order_release); | |
| 69 | |||
| 70 | 22884x | return new_impl; | |
| 71 | 11442x | } | |
| 72 | |||
| 73 | static bool | ||
| 74 | 30340x | enqueue(strand_impl& impl, std::coroutine_handle<> h) | |
| 75 | { | ||
| 76 | 30340x | std::lock_guard<std::mutex> lock(*impl.mutex_); | |
| 77 | 30340x | impl.pending_.push(h); | |
| 78 | 30340x | if(!impl.locked_) | |
| 79 | { | ||
| 80 | 19002x | impl.locked_ = true; | |
| 81 | 19002x | return true; | |
| 82 | } | ||
| 83 | 11338x | return false; | |
| 84 | 30340x | } | |
| 85 | |||
| 86 | static void | ||
| 87 | 19855x | dispatch_pending(strand_impl& impl) | |
| 88 | { | ||
| 89 | 19855x | strand_queue::taken_batch batch; | |
| 90 | { | ||
| 91 | 19855x | std::lock_guard<std::mutex> lock(*impl.mutex_); | |
| 92 | 19855x | batch = impl.pending_.take_all(); | |
| 93 | 19855x | } | |
| 94 | 19855x | impl.pending_.dispatch_batch(batch); | |
| 95 | 19855x | } | |
| 96 | |||
| 97 | static bool | ||
| 98 | 19855x | try_unlock(strand_impl& impl) | |
| 99 | { | ||
| 100 | 19855x | std::lock_guard<std::mutex> lock(*impl.mutex_); | |
| 101 | 19855x | if(impl.pending_.empty()) | |
| 102 | { | ||
| 103 | 19002x | impl.locked_ = false; | |
| 104 | 19002x | return true; | |
| 105 | } | ||
| 106 | 853x | return false; | |
| 107 | 19855x | } | |
| 108 | |||
| 109 | static void | ||
| 110 | 19855x | set_dispatch_thread(strand_impl& impl) noexcept | |
| 111 | { | ||
| 112 | 19855x | impl.dispatch_thread_.store(std::this_thread::get_id()); | |
| 113 | 19855x | } | |
| 114 | |||
| 115 | static void | ||
| 116 | 19002x | clear_dispatch_thread(strand_impl& impl) noexcept | |
| 117 | { | ||
| 118 | 19002x | impl.dispatch_thread_.store(std::thread::id{}); | |
| 119 | 19002x | } | |
| 120 | |||
| 121 | // Defined below; needs strand_invoker complete. | ||
| 122 | static void | ||
| 123 | post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex); | ||
| 124 | |||
| 125 | protected: | ||
| 126 | void | ||
| 127 | 30x | shutdown() override | |
| 128 | { | ||
| 129 | 30x | std::lock_guard<std::mutex> lock(mutex_); | |
| 130 | 30x | while(auto* p = impl_list_.pop_front()) | |
| 131 | { | ||
| 132 | ✗ | std::lock_guard<std::mutex> impl_lock(*p->mutex_); | |
| 133 | ✗ | p->locked_ = true; | |
| 134 | ✗ | p->service_.store(nullptr, std::memory_order_release); | |
| 135 | ✗ | } | |
| 136 | |||
| 137 | 30x | void* fp = invoker_frame_cache_.exchange( | |
| 138 | kCacheClosed, std::memory_order_acq_rel); | ||
| 139 | 30x | if(fp) ::operator delete(fp); | |
| 140 | 30x | } | |
| 141 | }; | ||
| 142 | |||
| 143 | /** Invoker coroutine that drains a strand's pending queue. | ||
| 144 | |||
| 145 | Runs once the strand transitions from unlocked to locked. Holds | ||
| 146 | the impl alive via the coroutine parameter (a shared_ptr in the | ||
| 147 | coroutine frame), so user code may drop its strand handle while | ||
| 148 | the invoker is mid-flight. | ||
| 149 | |||
| 150 | The frame's allocator recycles a single per-service slot. The | ||
| 151 | trailer points at the service (lifetime: execution_context), | ||
| 152 | NOT the impl (lifetime: per-strand), so operator delete is | ||
| 153 | safe even after the impl has been destroyed. | ||
| 154 | */ | ||
| 155 | struct strand_invoker | ||
| 156 | { | ||
| 157 | struct promise_type | ||
| 158 | { | ||
| 159 | // Stored in the coroutine frame so its address is stable for | ||
| 160 | // posting to the inner executor. | ||
| 161 | continuation self_; | ||
| 162 | |||
| 163 | void* | ||
| 164 | 19002x | operator new( | |
| 165 | std::size_t n, | ||
| 166 | std::shared_ptr<strand_impl> const& impl) | ||
| 167 | { | ||
| 168 | 19002x | auto* svc = impl->service_.load(std::memory_order_acquire); | |
| 169 | 19002x | constexpr auto A = alignof(strand_service_impl*); | |
| 170 | 19002x | std::size_t padded = (n + A - 1) & ~(A - 1); | |
| 171 | 19002x | std::size_t total = padded + sizeof(strand_service_impl*); | |
| 172 | |||
| 173 | 19002x | void* p = svc->invoker_frame_cache_.exchange( | |
| 174 | nullptr, std::memory_order_acquire); | ||
| 175 | 19002x | if(!p || p == kCacheClosed) | |
| 176 | 8720x | p = ::operator new(total); | |
| 177 | |||
| 178 | 19002x | *reinterpret_cast<strand_service_impl**>( | |
| 179 | 19002x | static_cast<char*>(p) + padded) = svc; | |
| 180 | 19002x | return p; | |
| 181 | } | ||
| 182 | |||
| 183 | void | ||
| 184 | 19002x | operator delete(void* p, std::size_t n) noexcept | |
| 185 | { | ||
| 186 | 19002x | constexpr auto A = alignof(strand_service_impl*); | |
| 187 | 19002x | std::size_t padded = (n + A - 1) & ~(A - 1); | |
| 188 | 19002x | auto* svc = *reinterpret_cast<strand_service_impl**>( | |
| 189 | static_cast<char*>(p) + padded); | ||
| 190 | |||
| 191 | 19002x | void* expected = nullptr; | |
| 192 | 19002x | if(!svc->invoker_frame_cache_.compare_exchange_strong( | |
| 193 | expected, p, std::memory_order_release)) | ||
| 194 | 8703x | ::operator delete(p); | |
| 195 | 19002x | } | |
| 196 | |||
| 197 | strand_invoker | ||
| 198 | 19002x | get_return_object() noexcept | |
| 199 | { | ||
| 200 | 19002x | return {std::coroutine_handle<promise_type>::from_promise(*this)}; | |
| 201 | } | ||
| 202 | |||
| 203 | 19002x | std::suspend_always initial_suspend() noexcept { return {}; } | |
| 204 | 19002x | std::suspend_never final_suspend() noexcept { return {}; } | |
| 205 | 19002x | void return_void() noexcept {} | |
| 206 | ✗ | void unhandled_exception() { std::terminate(); } | |
| 207 | }; | ||
| 208 | |||
| 209 | std::coroutine_handle<promise_type> h_; | ||
| 210 | }; | ||
| 211 | |||
| 212 | // The by-value parameter lives in the coroutine frame for the | ||
| 213 | // invoker's lifetime, keeping the impl alive past any user-side | ||
| 214 | // strand drop. | ||
| 215 | static | ||
| 216 | strand_invoker | ||
| 217 | 19002x | make_invoker(std::shared_ptr<strand_impl> impl) | |
| 218 | { | ||
| 219 | auto* p = impl.get(); | ||
| 220 | for(;;) | ||
| 221 | { | ||
| 222 | strand_service_impl::set_dispatch_thread(*p); | ||
| 223 | strand_service_impl::dispatch_pending(*p); | ||
| 224 | if(strand_service_impl::try_unlock(*p)) | ||
| 225 | { | ||
| 226 | strand_service_impl::clear_dispatch_thread(*p); | ||
| 227 | co_return; | ||
| 228 | } | ||
| 229 | } | ||
| 230 | 38004x | } | |
| 231 | |||
| 232 | void | ||
| 233 | 19002x | strand_service_impl::post_invoker( | |
| 234 | std::shared_ptr<strand_impl> impl, | ||
| 235 | executor_ref ex) | ||
| 236 | { | ||
| 237 | 19002x | auto invoker = make_invoker(std::move(impl)); | |
| 238 | 19002x | auto& self = invoker.h_.promise().self_; | |
| 239 | 19002x | self.h = invoker.h_; | |
| 240 | 19002x | ex.post(self); | |
| 241 | 19002x | } | |
| 242 | |||
| 243 | 22884x | strand_impl::~strand_impl() | |
| 244 | { | ||
| 245 | 11442x | auto* svc = service_.load(std::memory_order_acquire); | |
| 246 | 11442x | if(!svc) return; | |
| 247 | 11442x | std::lock_guard<std::mutex> lock(svc->mutex_); | |
| 248 | 11442x | svc->impl_list_.remove(this); | |
| 249 | 11442x | } | |
| 250 | |||
| 251 | 30x | strand_service:: | |
| 252 | 30x | strand_service() | |
| 253 | 30x | : service() | |
| 254 | { | ||
| 255 | 30x | } | |
| 256 | |||
| 257 | 30x | strand_service:: | |
| 258 | ~strand_service() = default; | ||
| 259 | |||
| 260 | bool | ||
| 261 | 12x | strand_service:: | |
| 262 | running_in_this_thread(strand_impl& impl) noexcept | ||
| 263 | { | ||
| 264 | 12x | return impl.dispatch_thread_.load() == std::this_thread::get_id(); | |
| 265 | } | ||
| 266 | |||
| 267 | std::coroutine_handle<> | ||
| 268 | 8x | strand_service:: | |
| 269 | dispatch( | ||
| 270 | std::shared_ptr<strand_impl> const& impl, | ||
| 271 | executor_ref ex, | ||
| 272 | std::coroutine_handle<> h) | ||
| 273 | { | ||
| 274 | 8x | if(running_in_this_thread(*impl)) | |
| 275 | 3x | return h; | |
| 276 | |||
| 277 | 5x | if(strand_service_impl::enqueue(*impl, h)) | |
| 278 | 5x | strand_service_impl::post_invoker(impl, ex); | |
| 279 | 5x | return std::noop_coroutine(); | |
| 280 | } | ||
| 281 | |||
| 282 | void | ||
| 283 | 30335x | strand_service:: | |
| 284 | post( | ||
| 285 | std::shared_ptr<strand_impl> const& impl, | ||
| 286 | executor_ref ex, | ||
| 287 | std::coroutine_handle<> h) | ||
| 288 | { | ||
| 289 | 30335x | if(strand_service_impl::enqueue(*impl, h)) | |
| 290 | 18997x | strand_service_impl::post_invoker(impl, ex); | |
| 291 | 30335x | } | |
| 292 | |||
| 293 | strand_service& | ||
| 294 | 11442x | get_strand_service(execution_context& ctx) | |
| 295 | { | ||
| 296 | 11442x | return ctx.use_service<strand_service_impl>(); | |
| 297 | } | ||
| 298 | |||
| 299 | } // namespace detail | ||
| 300 | } // namespace capy | ||
| 301 | } // namespace boost | ||
| 302 |