src/ex/detail/strand_service.cpp

95.6% Lines (109/114) 95.8% List of functions (23/24)
strand_service.cpp
f(x) Functions (24)
Function Calls Lines Blocks
boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&) :47 30x 100.0% 100.0%
boost::capy::detail::strand_service_impl::create_implementation() :52 11442x 100.0% 80.0%
boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>) :74 30340x 100.0% 82.0%
boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&) :87 19855x 100.0% 86.0%
boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&) :98 19855x 100.0% 100.0%
boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&) :110 19855x 100.0% 100.0%
boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&) :116 19002x 100.0% 100.0%
boost::capy::detail::strand_service_impl::shutdown() :127 30x 60.0% 59.0%
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, std::shared_ptr<boost::capy::detail::strand_impl> const&) :164 19002x 100.0% 100.0%
boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long) :184 19002x 100.0% 100.0%
boost::capy::detail::strand_invoker::promise_type::get_return_object() :198 19002x 100.0% 100.0%
boost::capy::detail::strand_invoker::promise_type::initial_suspend() :203 19002x 100.0% 100.0%
boost::capy::detail::strand_invoker::promise_type::final_suspend() :204 19002x 100.0% 100.0%
boost::capy::detail::strand_invoker::promise_type::return_void() :205 19002x 100.0% 100.0%
boost::capy::detail::strand_invoker::promise_type::unhandled_exception() :206 0 0.0% 0.0%
boost::capy::detail::make_invoker(std::shared_ptr<boost::capy::detail::strand_impl>) :217 19002x 100.0% 47.0%
boost::capy::detail::strand_service_impl::post_invoker(std::shared_ptr<boost::capy::detail::strand_impl>, boost::capy::executor_ref) :233 19002x 100.0% 75.0%
boost::capy::detail::strand_impl::~strand_impl() :243 11442x 100.0% 77.0%
boost::capy::detail::strand_service::strand_service() :251 30x 100.0% 100.0%
boost::capy::detail::strand_service::~strand_service() :257 30x 100.0% 100.0%
boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&) :261 12x 100.0% 100.0%
boost::capy::detail::strand_service::dispatch(std::shared_ptr<boost::capy::detail::strand_impl> const&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :268 8x 100.0% 89.0%
boost::capy::detail::strand_service::post(std::shared_ptr<boost::capy::detail::strand_impl> const&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :283 30335x 100.0% 82.0%
boost::capy::detail::get_strand_service(boost::capy::execution_context&) :294 11442x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_impl.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <coroutine>
14 #include <memory>
15 #include <utility>
16
17 namespace boost {
18 namespace capy {
19 namespace detail {
20
21 // Sentinel stored in invoker_frame_cache_ after shutdown to prevent
22 // in-flight invokers from repopulating a freed cache slot.
23 inline void* const kCacheClosed = reinterpret_cast<void*>(1); // non-null marker: operator delete's CAS expects nullptr, so a closed slot makes it free the frame instead of caching it
24
25 /** Concrete strand_service.
26
27 Holds a shared mutex pool (193 entries), a linked list of live
28 impls (for shutdown traversal), and a single-slot invoker
29 coroutine frame cache shared across all strands of this service.
30
31 The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are
32 public so the namespace-scope `make_invoker` coroutine and the
33 `strand_service` static methods can call them without friendship.
34 */
35 class strand_service_impl : public strand_service
36 {
37 public:
38 static constexpr std::size_t num_mutexes = 193;
39
40 std::mutex mutex_; // held while touching salt_, mutexes_ and impl_list_
41 std::size_t salt_ = 0; // bumped once per created impl and mixed into the mutex-slot hash
42 std::shared_ptr<std::mutex> mutexes_[num_mutexes]; // slots are filled lazily by create_implementation()
43 intrusive_list<strand_impl> impl_list_;
44 std::atomic<void*> invoker_frame_cache_{nullptr}; // nullptr = empty, kCacheClosed = closed by shutdown(), anything else = a parked invoker frame
45
46 explicit
47 30x strand_service_impl(execution_context&)
48 30x {
49 30x }
50
51 std::shared_ptr<strand_impl>
52 11442x create_implementation() override
53 {
54 11442x auto new_impl = std::make_shared<strand_impl>();
55
56 11442x std::lock_guard<std::mutex> lock(mutex_);
57
58 11442x std::size_t s = salt_++;
59 11442x std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get()); // hash the impl's address together with the salt ...
60 11442x idx += idx >> 3;
61 11442x idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2);
62 11442x idx %= num_mutexes; // ... and reduce it to a slot in the mutex pool
63 11442x if(!mutexes_[idx])
64 666x mutexes_[idx] = std::make_shared<std::mutex>();
65 11442x new_impl->mutex_ = mutexes_[idx].get(); // impl keeps a raw pointer; ownership stays with mutexes_
66
67 11442x impl_list_.push_back(new_impl.get());
68 11442x new_impl->service_.store(this, std::memory_order_release); // read back with acquire loads in operator new and ~strand_impl
69
70 22884x return new_impl;
71 11442x }
72
73 static bool
74 30340x enqueue(strand_impl& impl, std::coroutine_handle<> h)
75 {
76 30340x std::lock_guard<std::mutex> lock(*impl.mutex_);
77 30340x impl.pending_.push(h);
78 30340x if(!impl.locked_)
79 {
80 19002x impl.locked_ = true;
81 19002x return true; // caller took the strand: dispatch()/post() react by calling post_invoker()
82 }
83 11338x return false; // strand already locked: nothing more for the caller to do
84 30340x }
85
86 static void
87 19855x dispatch_pending(strand_impl& impl)
88 {
89 19855x strand_queue::taken_batch batch;
90 {
91 19855x std::lock_guard<std::mutex> lock(*impl.mutex_);
92 19855x batch = impl.pending_.take_all();
93 19855x }
94 19855x impl.pending_.dispatch_batch(batch); // runs after the lock above has been released
95 19855x }
96
97 static bool
98 19855x try_unlock(strand_impl& impl)
99 {
100 19855x std::lock_guard<std::mutex> lock(*impl.mutex_);
101 19855x if(impl.pending_.empty())
102 {
103 19002x impl.locked_ = false;
104 19002x return true;
105 }
106 853x return false; // more work is pending: locked_ stays true and make_invoker loops again
107 19855x }
108
109 static void
110 19855x set_dispatch_thread(strand_impl& impl) noexcept
111 {
112 19855x impl.dispatch_thread_.store(std::this_thread::get_id()); // compared against the caller's id in running_in_this_thread()
113 19855x }
114
115 static void
116 19002x clear_dispatch_thread(strand_impl& impl) noexcept
117 {
118 19002x impl.dispatch_thread_.store(std::thread::id{}); // default-constructed id, i.e. not any thread's get_id()
119 19002x }
120
121 // Defined below; needs strand_invoker complete.
122 static void
123 post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex);
124
125 protected:
126 void
127 30x shutdown() override
128 {
129 30x std::lock_guard<std::mutex> lock(mutex_);
130 30x while(auto* p = impl_list_.pop_front())
131 {
132 std::lock_guard<std::mutex> impl_lock(*p->mutex_);
133 p->locked_ = true; // enqueue() now always returns false for this impl, so no new invoker is posted
134 p->service_.store(nullptr, std::memory_order_release); // ~strand_impl sees nullptr and returns without touching impl_list_
135 }
136
137 30x void* fp = invoker_frame_cache_.exchange( // close the cache and take whatever frame was parked in it
138 kCacheClosed, std::memory_order_acq_rel);
139 30x if(fp) ::operator delete(fp); // NOTE(review): operator new does exchange(nullptr) on this slot, which would overwrite kCacheClosed if it ran after this point — confirm that cannot happen
140 30x }
141 };
142
143 /** Invoker coroutine that drains a strand's pending queue.
144
145 Runs once the strand transitions from unlocked to locked. Holds
146 the impl alive via the coroutine parameter (a shared_ptr in the
147 coroutine frame), so user code may drop its strand handle while
148 the invoker is mid-flight.
149
150 The frame's allocator recycles a single per-service slot. The
151 trailer points at the service (lifetime: execution_context),
152 NOT the impl (lifetime: per-strand), so operator delete is
153 safe even after the impl has been destroyed.
154 */
155 struct strand_invoker
156 {
157 struct promise_type
158 {
159 // Stored in the coroutine frame so its address is stable for
160 // posting to the inner executor.
161 continuation self_;
162
163 void*
164 19002x operator new(
165 std::size_t n,
166 std::shared_ptr<strand_impl> const& impl)
167 {
168 19002x auto* svc = impl->service_.load(std::memory_order_acquire); // NOTE(review): shutdown() stores nullptr here and svc is dereferenced below without a check — confirm no invoker can be created after shutdown
169 19002x constexpr auto A = alignof(strand_service_impl*);
170 19002x std::size_t padded = (n + A - 1) & ~(A - 1); // round the frame size up to pointer alignment
171 19002x std::size_t total = padded + sizeof(strand_service_impl*); // frame plus one trailing service pointer
172
173 19002x void* p = svc->invoker_frame_cache_.exchange( // take the parked frame, leaving the slot empty
174 nullptr, std::memory_order_acquire);
175 19002x if(!p || p == kCacheClosed) // NOTE(review): the exchange above has already replaced kCacheClosed with nullptr in this case
176 8720x p = ::operator new(total); // NOTE(review): a reused frame is not size-checked; presumably every invoker frame has the same n — confirm
177
178 19002x *reinterpret_cast<strand_service_impl**>( // write the trailer that operator delete reads back
179 19002x static_cast<char*>(p) + padded) = svc;
180 19002x return p;
181 }
182
183 void
184 19002x operator delete(void* p, std::size_t n) noexcept
185 {
186 19002x constexpr auto A = alignof(strand_service_impl*);
187 19002x std::size_t padded = (n + A - 1) & ~(A - 1); // same rounding as operator new, so the trailer offset matches
188 19002x auto* svc = *reinterpret_cast<strand_service_impl**>(
189 static_cast<char*>(p) + padded);
190
191 19002x void* expected = nullptr;
192 19002x if(!svc->invoker_frame_cache_.compare_exchange_strong( // park the frame only if the slot is empty
193 expected, p, std::memory_order_release))
194 8703x ::operator delete(p); // slot occupied or closed (kCacheClosed): release the memory
195 19002x }
196
197 strand_invoker
198 19002x get_return_object() noexcept
199 {
200 19002x return {std::coroutine_handle<promise_type>::from_promise(*this)};
201 }
202
203 19002x std::suspend_always initial_suspend() noexcept { return {}; } // created suspended; post_invoker() posts the handle to the executor
204 19002x std::suspend_never final_suspend() noexcept { return {}; } // no final suspension, so nothing has to call destroy() on h_
205 19002x void return_void() noexcept {}
206 void unhandled_exception() { std::terminate(); } // an exception escaping the invoker body ends the program
207 };
208
209 std::coroutine_handle<promise_type> h_;
210 };
211
212 // The by-value parameter lives in the coroutine frame for the
213 // invoker's lifetime, keeping the impl alive past any user-side
214 // strand drop.
215 static
216 strand_invoker
217 19002x make_invoker(std::shared_ptr<strand_impl> impl)
218 {
219 auto* p = impl.get();
220 for(;;) // one pass per batch; repeats while try_unlock() reports more pending work
221 {
222 strand_service_impl::set_dispatch_thread(*p); // re-stored every pass, before the batch is dispatched
223 strand_service_impl::dispatch_pending(*p);
224 if(strand_service_impl::try_unlock(*p)) // true: queue was empty and locked_ has been cleared
225 {
226 strand_service_impl::clear_dispatch_thread(*p);
227 co_return;
228 }
229 }
230 38004x }
231
232 void
233 19002x strand_service_impl::post_invoker(
234 std::shared_ptr<strand_impl> impl,
235 executor_ref ex)
236 {
237 19002x auto invoker = make_invoker(std::move(impl)); // coroutine starts suspended (initial_suspend is suspend_always)
238 19002x auto& self = invoker.h_.promise().self_; // continuation object stored inside the coroutine's promise
239 19002x self.h = invoker.h_;
240 19002x ex.post(self); // hand the continuation to the executor; presumably it resumes self.h later — confirm against executor_ref
241 19002x }
242
243 22884x strand_impl::~strand_impl()
244 {
245 11442x auto* svc = service_.load(std::memory_order_acquire);
246 11442x if(!svc) return; // shutdown() already popped this impl from the list and stored nullptr
247 11442x std::lock_guard<std::mutex> lock(svc->mutex_); // impl_list_ is modified under the service mutex
248 11442x svc->impl_list_.remove(this);
249 11442x }
250
251 30x strand_service::
252 30x strand_service()
253 30x : service() // value-initializes `service` (presumably the base class — confirm in strand_service.hpp)
254 {
255 30x }
256
257 30x strand_service::
258 ~strand_service() = default; // defaulted out of line, in this translation unit
259
260 bool
261 12x strand_service::
262 running_in_this_thread(strand_impl& impl) noexcept
263 {
264 12x return impl.dispatch_thread_.load() == std::this_thread::get_id(); // dispatch_thread_ is written by set_dispatch_thread()/clear_dispatch_thread() from the invoker
265 }
266
267 std::coroutine_handle<>
268 8x strand_service::
269 dispatch(
270 std::shared_ptr<strand_impl> const& impl,
271 executor_ref ex,
272 std::coroutine_handle<> h)
273 {
274 8x if(running_in_this_thread(*impl))
275 3x return h; // this thread is the strand's recorded dispatch thread: hand h back instead of queueing it
276
277 5x if(strand_service_impl::enqueue(*impl, h)) // true only for the caller that flipped locked_
278 5x strand_service_impl::post_invoker(impl, ex);
279 5x return std::noop_coroutine(); // h was queued; the returned handle does nothing when resumed
280 }
281
282 void
283 30335x strand_service::
284 post(
285 std::shared_ptr<strand_impl> const& impl,
286 executor_ref ex,
287 std::coroutine_handle<> h)
288 {
289 30335x if(strand_service_impl::enqueue(*impl, h)) // always queues h, with no running_in_this_thread() shortcut
290 18997x strand_service_impl::post_invoker(impl, ex); // only the caller that flipped locked_ posts an invoker
291 30335x }
292
293 strand_service&
294 11442x get_strand_service(execution_context& ctx)
295 {
296 11442x return ctx.use_service<strand_service_impl>(); // returned through the strand_service base reference; presumably created on first use — confirm against execution_context
297 }
298
299 } // namespace detail
300 } // namespace capy
301 } // namespace boost
302