95.61% Lines (109/114)
95.83% Functions (23/24)
| TLA | Baseline | Branch | ||||||
|---|---|---|---|---|---|---|---|---|
| Line | Hits | Code | Line | Hits | Code | |||
| 1 | // | 1 | // | |||||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | |||||
| 3 | // | 3 | // | |||||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||||
| 6 | // | 6 | // | |||||
| 7 | // Official repository: https://github.com/cppalliance/capy | 7 | // Official repository: https://github.com/cppalliance/capy | |||||
| 8 | // | 8 | // | |||||
| 9 | 9 | |||||||
| 10 | - | #include "src/ex/detail/strand_queue.hpp" | 10 | + | #include "src/ex/detail/strand_impl.hpp" | |||
| 11 | #include <boost/capy/ex/detail/strand_service.hpp> | 11 | #include <boost/capy/ex/detail/strand_service.hpp> | |||||
| 12 | - | #include <atomic> | ||||||
| 13 | #include <boost/capy/continuation.hpp> | 12 | #include <boost/capy/continuation.hpp> | |||||
| 14 | #include <coroutine> | 13 | #include <coroutine> | |||||
| 15 | - | #include <mutex> | 14 | + | #include <memory> | |||
| 16 | - | #include <thread> | ||||||
| 17 | #include <utility> | 15 | #include <utility> | |||||
| 18 | 16 | |||||||
| 19 | namespace boost { | 17 | namespace boost { | |||||
| 20 | namespace capy { | 18 | namespace capy { | |||||
| 21 | namespace detail { | 19 | namespace detail { | |||||
| 22 | 20 | |||||||
| 23 | - | //---------------------------------------------------------- | 21 | + | // Sentinel stored in invoker_frame_cache_ after shutdown to prevent | |||
| 24 | - | |||||||
| 25 | - | /** Implementation state for a strand. | ||||||
| 26 | - | |||||||
| 27 | - | Each strand_impl provides serialization for coroutines | ||||||
| 28 | - | dispatched through strands that share it. | ||||||
| 29 | - | */ | ||||||
| 30 | - | // Sentinel stored in cached_frame_ after shutdown to prevent | ||||||
| 31 | // in-flight invokers from repopulating a freed cache slot. | 22 | // in-flight invokers from repopulating a freed cache slot. | |||||
| 32 | inline void* const kCacheClosed = reinterpret_cast<void*>(1); | 23 | inline void* const kCacheClosed = reinterpret_cast<void*>(1); | |||||
| 33 | 24 | |||||||
| 34 | - | struct strand_impl | 25 | + | /** Concrete strand_service. | |||
| 35 | - | { | ||||||
| 36 | - | std::mutex mutex_; | ||||||
| 37 | - | strand_queue pending_; | ||||||
| 38 | - | bool locked_ = false; | ||||||
| 39 | - | std::atomic<std::thread::id> dispatch_thread_{}; | ||||||
| 40 | - | std::atomic<void*> cached_frame_{nullptr}; | ||||||
| 41 | - | }; | ||||||
| 42 | - | |||||||
| 43 | - | //---------------------------------------------------------- | ||||||
| 44 | - | |||||||
| 45 | - | /** Invoker coroutine for strand dispatch. | ||||||
| 46 | - | |||||||
| 47 | - | Uses custom allocator to recycle frame - one allocation | ||||||
| 48 | - | per strand_impl lifetime, stored in trailer for recovery. | ||||||
| 49 | - | */ | ||||||
| 50 | - | struct strand_invoker | ||||||
| 51 | - | { | ||||||
| 52 | - | struct promise_type | ||||||
| 53 | - | { | ||||||
| 54 | - | // Used to post the invoker through the inner executor. | ||||||
| 55 | - | // Lives in the coroutine frame (heap-allocated), so has | ||||||
| 56 | - | // a stable address for the duration of the queue residency. | ||||||
| 57 | - | continuation self_; | ||||||
| 58 | - | |||||||
| DCB | 59 | - | 16 | void* operator new(std::size_t n, strand_impl& impl) | ||||
| 60 | - | { | ||||||
| DCB | 61 | - | 16 | constexpr auto A = alignof(strand_impl*); | ||||
| DCB | 62 | - | 16 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| DCB | 63 | - | 16 | std::size_t total = padded + sizeof(strand_impl*); | ||||
| 64 | - | |||||||
| DCB | 65 | - | 16 | void* p = impl.cached_frame_.exchange( | ||||
| 66 | - | nullptr, std::memory_order_acquire); | ||||||
| DCB | 67 | - | 16 | if(!p || p == kCacheClosed) | ||||
| DCB | 68 | - | 13 | p = ::operator new(total); | ||||
| 69 | - | |||||||
| 70 | - | // Trailer lets delete recover impl | ||||||
| DCB | 71 | - | 16 | *reinterpret_cast<strand_impl**>( | ||||
| DCB | 72 | - | 16 | static_cast<char*>(p) + padded) = &impl; | ||||
| DCB | 73 | - | 16 | return p; | ||||
| 74 | - | } | ||||||
| 75 | - | |||||||
| DCB | 76 | - | 16 | void operator delete(void* p, std::size_t n) noexcept | ||||
| 77 | - | { | ||||||
| DCB | 78 | - | 16 | constexpr auto A = alignof(strand_impl*); | ||||
| DCB | 79 | - | 16 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| 80 | - | |||||||
| DCB | 81 | - | 16 | auto* impl = *reinterpret_cast<strand_impl**>( | ||||
| 82 | - | static_cast<char*>(p) + padded); | ||||||
| 83 | - | |||||||
| DCB | 84 | - | 16 | void* expected = nullptr; | ||||
| DCB | 85 | - | 16 | if(!impl->cached_frame_.compare_exchange_strong( | ||||
| 86 | - | expected, p, std::memory_order_release)) | ||||||
| DUB | 87 | - | ✗ | ::operator delete(p); | ||||
| DCB | 88 | - | 16 | } | ||||
| 89 | - | |||||||
| DCB | 90 | - | 16 | strand_invoker get_return_object() noexcept | ||||
| DCB | 91 | - | 16 | { return {std::coroutine_handle<promise_type>::from_promise(*this)}; } | ||||
| 92 | - | |||||||
| DCB | 93 | - | 16 | std::suspend_always initial_suspend() noexcept { return {}; } | ||||
| DCB | 94 | - | 16 | std::suspend_never final_suspend() noexcept { return {}; } | ||||
| DCB | 95 | - | 16 | void return_void() noexcept {} | ||||
| DUB | 96 | - | ✗ | void unhandled_exception() { std::terminate(); } | ||||
| 97 | - | }; | ||||||
| 98 | - | |||||||
| 99 | - | std::coroutine_handle<promise_type> h_; | ||||||
| 100 | - | }; | ||||||
| 101 | - | |||||||
| 102 | - | //---------------------------------------------------------- | ||||||
| 103 | 26 | |||||||
| 104 | - | /** Concrete implementation of strand_service. | 27 | + | Holds a shared mutex pool (193 entries), a linked list of live | |||
| 28 | + | impls (for shutdown traversal), and a single-slot invoker | ||||||
| 29 | + | coroutine frame cache shared across all strands of this service. | ||||||
| 105 | 30 | |||||||
| 106 | - | Holds the fixed pool of strand_impl objects. | 31 | + | The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are | |||
| 32 | + | public so the namespace-scope `make_invoker` coroutine and the | ||||||
| 33 | + | `strand_service` static methods can call them without friendship. | ||||||
| 107 | */ | 34 | */ | |||||
| 108 | class strand_service_impl : public strand_service | 35 | class strand_service_impl : public strand_service | |||||
| 109 | { | 36 | { | |||||
| 110 | - | static constexpr std::size_t num_impls = 211; | 37 | + | public: | |||
| 38 | + | static constexpr std::size_t num_mutexes = 193; | ||||||
| 111 | - | strand_impl impls_[num_impls]; | ||||||
| 112 | - | std::size_t salt_ = 0; | ||||||
| 113 | 39 | |||||||
| 114 | std::mutex mutex_; | 40 | std::mutex mutex_; | |||||
| 41 | + | std::size_t salt_ = 0; | ||||||
| 42 | + | std::shared_ptr<std::mutex> mutexes_[num_mutexes]; | ||||||
| 43 | + | intrusive_list<strand_impl> impl_list_; | ||||||
| 44 | + | std::atomic<void*> invoker_frame_cache_{nullptr}; | ||||||
| 115 | - | public: | ||||||
| 116 | 45 | |||||||
| 117 | explicit | 46 | explicit | |||||
| HITCBC | 118 | 25 | strand_service_impl(execution_context&) | 47 | 30 | strand_service_impl(execution_context&) | ||
| HITCBC | 119 | 5300 | { | 48 | 30 | { | ||
| HITCBC | 120 | 25 | } | 49 | 30 | } | ||
| 121 | 50 | |||||||
| 122 | - | strand_impl* | 51 | + | std::shared_ptr<strand_impl> | |||
| HITCBC | 123 | - | 29 | get_implementation() override | 52 | + | 11442 | create_implementation() override |
| 124 | { | 53 | { | |||||
| HITGNC | 54 | + | 11442 | auto new_impl = std::make_shared<strand_impl>(); | ||||
| 55 | + | |||||||
| DCB | 125 | - | 29 | std::size_t index = salt_++; | ||||
| DCB | 126 | - | 29 | index = index % num_impls; | ||||
| DCB | 127 | - | 29 | return &impls_[index]; | ||||
| DCB | 128 | - | 29 | } | ||||
| HITCBC | 129 | 29 | std::lock_guard<std::mutex> lock(mutex_); | 56 | 11442 | std::lock_guard<std::mutex> lock(mutex_); | ||
| 130 | 57 | |||||||
| HITGIC | 131 | - | protected: | 58 | + | 11442 | std::size_t s = salt_++; | |
| HITGIC | 132 | - | void | 59 | + | 11442 | std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get()); | |
| HITCBC | 133 | - | 25 | shutdown() override | 60 | + | 11442 | idx += idx >> 3; |
| HITGIC | 134 | - | { | 61 | + | 11442 | idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2); | |
| HITCBC | 135 | - | 5300 | for(std::size_t i = 0; i < num_impls; ++i) | 62 | + | 11442 | idx %= num_mutexes; |
| HITGIC | 136 | - | { | 63 | + | 11442 | if(!mutexes_[idx]) | |
| HITCBC | 137 | - | 5275 | std::lock_guard<std::mutex> lock(impls_[i].mutex_); | 64 | + | 666 | mutexes_[idx] = std::make_shared<std::mutex>(); |
| HITCBC | 138 | - | 5275 | impls_[i].locked_ = true; | 65 | + | 11442 | new_impl->mutex_ = mutexes_[idx].get(); |
| 139 | 66 | |||||||
| HITCBC | 140 | - | 5275 | void* p = impls_[i].cached_frame_.exchange( | 67 | + | 11442 | impl_list_.push_back(new_impl.get()); |
| HITGIC | 141 | - | kCacheClosed, std::memory_order_acquire); | 68 | + | 11442 | new_impl->service_.store(this, std::memory_order_release); | |
| ECB | 142 | - | 5275 | if(p) | 69 | + | ||
| HITCBC | 143 | - | 13 | ::operator delete(p); | 70 | + | 22884 | return new_impl; |
| DCB | 144 | - | 5275 | } | ||||
| HITCBC | 145 | 25 | } | 71 | 11442 | } | ||
| 146 | - | private: | ||||||
| 147 | 72 | |||||||
| 148 | static bool | 73 | static bool | |||||
| HITCBC | 149 | 335 | enqueue(strand_impl& impl, std::coroutine_handle<> h) | 74 | 30340 | enqueue(strand_impl& impl, std::coroutine_handle<> h) | ||
| 150 | { | 75 | { | |||||
| HITCBC | 151 | - | 335 | std::lock_guard<std::mutex> lock(impl.mutex_); | 76 | + | 30340 | std::lock_guard<std::mutex> lock(*impl.mutex_); |
| HITCBC | 152 | 335 | impl.pending_.push(h); | 77 | 30340 | impl.pending_.push(h); | ||
| HITCBC | 153 | 335 | if(!impl.locked_) | 78 | 30340 | if(!impl.locked_) | ||
| 154 | { | 79 | { | |||||
| HITCBC | 155 | 16 | impl.locked_ = true; | 80 | 19002 | impl.locked_ = true; | ||
| HITCBC | 156 | 16 | return true; | 81 | 19002 | return true; | ||
| 157 | } | 82 | } | |||||
| HITCBC | 158 | 319 | return false; | 83 | 11338 | return false; | ||
| HITCBC | 159 | 335 | } | 84 | 30340 | } | ||
| 160 | 85 | |||||||
| 161 | static void | 86 | static void | |||||
| HITCBC | 162 | 23 | dispatch_pending(strand_impl& impl) | 87 | 19855 | dispatch_pending(strand_impl& impl) | ||
| 163 | { | 88 | { | |||||
| HITCBC | 164 | 23 | strand_queue::taken_batch batch; | 89 | 19855 | strand_queue::taken_batch batch; | ||
| 165 | { | 90 | { | |||||
| HITCBC | 166 | - | 23 | std::lock_guard<std::mutex> lock(impl.mutex_); | 91 | + | 19855 | std::lock_guard<std::mutex> lock(*impl.mutex_); |
| HITCBC | 167 | 23 | batch = impl.pending_.take_all(); | 92 | 19855 | batch = impl.pending_.take_all(); | ||
| HITCBC | 168 | 23 | } | 93 | 19855 | } | ||
| HITCBC | 169 | 23 | impl.pending_.dispatch_batch(batch); | 94 | 19855 | impl.pending_.dispatch_batch(batch); | ||
| HITCBC | 170 | 23 | } | 95 | 19855 | } | ||
| 171 | 96 | |||||||
| 172 | static bool | 97 | static bool | |||||
| HITCBC | 173 | 23 | try_unlock(strand_impl& impl) | 98 | 19855 | try_unlock(strand_impl& impl) | ||
| 174 | { | 99 | { | |||||
| HITCBC | 175 | - | 23 | std::lock_guard<std::mutex> lock(impl.mutex_); | 100 | + | 19855 | std::lock_guard<std::mutex> lock(*impl.mutex_); |
| HITCBC | 176 | 23 | if(impl.pending_.empty()) | 101 | 19855 | if(impl.pending_.empty()) | ||
| 177 | { | 102 | { | |||||
| HITCBC | 178 | 16 | impl.locked_ = false; | 103 | 19002 | impl.locked_ = false; | ||
| HITCBC | 179 | 16 | return true; | 104 | 19002 | return true; | ||
| 180 | } | 105 | } | |||||
| HITCBC | 181 | 7 | return false; | 106 | 853 | return false; | ||
| HITCBC | 182 | 23 | } | 107 | 19855 | } | ||
| 183 | 108 | |||||||
| 184 | static void | 109 | static void | |||||
| HITCBC | 185 | 23 | set_dispatch_thread(strand_impl& impl) noexcept | 110 | 19855 | set_dispatch_thread(strand_impl& impl) noexcept | ||
| 186 | { | 111 | { | |||||
| HITCBC | 187 | 23 | impl.dispatch_thread_.store(std::this_thread::get_id()); | 112 | 19855 | impl.dispatch_thread_.store(std::this_thread::get_id()); | ||
| HITCBC | 188 | 23 | } | 113 | 19855 | } | ||
| 189 | 114 | |||||||
| 190 | static void | 115 | static void | |||||
| HITCBC | 191 | 16 | clear_dispatch_thread(strand_impl& impl) noexcept | 116 | 19002 | clear_dispatch_thread(strand_impl& impl) noexcept | ||
| 192 | { | 117 | { | |||||
| HITCBC | 193 | 16 | impl.dispatch_thread_.store(std::thread::id{}); | 118 | 19002 | impl.dispatch_thread_.store(std::thread::id{}); | ||
| HITCBC | 194 | 16 | } | 119 | 19002 | } | ||
| 195 | 120 | |||||||
| 196 | - | // Loops until queue empty (aggressive). Alternative: per-batch fairness | 121 | + | // Defined below; needs strand_invoker complete. | |||
| 197 | - | // (repost after each batch to let other work run) - explore if starvation observed. | 122 | + | static void | |||
| 198 | - | static strand_invoker | 123 | + | post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex); | |||
| ECB | 199 | - | 16 | make_invoker(strand_impl& impl) | 124 | + | ||
| 125 | + | protected: | ||||||
| 126 | + | void | ||||||
| HITGNC | 127 | + | 30 | shutdown() override | ||||
| 200 | { | 128 | { | |||||
| HITGIC | 201 | - | strand_impl* p = &impl; | 129 | + | 30 | std::lock_guard<std::mutex> lock(mutex_); | |
| HITGIC | 202 | - | for(;;) | 130 | + | 30 | while(auto* p = impl_list_.pop_front()) | |
| 203 | { | 131 | { | |||||
| MISUIC | 204 | - | set_dispatch_thread(*p); | 132 | + | ✗ | std::lock_guard<std::mutex> impl_lock(*p->mutex_); | |
| MISUIC | 205 | - | dispatch_pending(*p); | 133 | + | ✗ | p->locked_ = true; | |
| MISUIC | 206 | - | if(try_unlock(*p)) | 134 | + | ✗ | p->service_.store(nullptr, std::memory_order_release); | |
| 207 | - | { | ||||||
| 208 | - | clear_dispatch_thread(*p); | ||||||
| 209 | - | co_return; | ||||||
| 210 | - | } | ||||||
| MISUIC | 211 | } | 135 | ✗ | } | |||
| 136 | + | |||||||
| HITGNC | 137 | + | 30 | void* fp = invoker_frame_cache_.exchange( | ||||
| 138 | + | kCacheClosed, std::memory_order_acq_rel); | ||||||
| HITGNC | 139 | + | 30 | if(fp) ::operator delete(fp); | ||||
| HITCBC | 212 | 32 | } | 140 | 30 | } | ||
| 141 | + | }; | ||||||
| 213 | 142 | |||||||
| 214 | - | static void | 143 | + | /** Invoker coroutine that drains a strand's pending queue. | |||
| ECB | 215 | - | 16 | post_invoker(strand_impl& impl, executor_ref ex) | 144 | + | ||
| 145 | + | Runs once the strand transitions from unlocked to locked. Holds | ||||||
| 146 | + | the impl alive via the coroutine parameter (a shared_ptr in the | ||||||
| 147 | + | coroutine frame), so user code may drop its strand handle while | ||||||
| 148 | + | the invoker is mid-flight. | ||||||
| 149 | + | |||||||
| 150 | + | The frame's allocator recycles a single per-service slot. The | ||||||
| 151 | + | trailer points at the service (lifetime: execution_context), | ||||||
| 152 | + | NOT the impl (lifetime: per-strand), so operator delete is | ||||||
| 153 | + | safe even after the impl has been destroyed. | ||||||
| 154 | + | */ | ||||||
| 155 | + | struct strand_invoker | ||||||
| 156 | + | { | ||||||
| 157 | + | struct promise_type | ||||||
| 216 | { | 158 | { | |||||
| ECB | 217 | - | 16 | auto invoker = make_invoker(impl); | 159 | + | // Stored in the coroutine frame so its address is stable for | |
| ECB | 218 | - | 16 | auto& self = invoker.h_.promise().self_; | 160 | + | // posting to the inner executor. | |
| ECB | 219 | - | 16 | self.h = invoker.h_; | 161 | + | continuation self_; | |
| DCB | 220 | - | 16 | ex.post(self); | ||||
| DCB | 221 | - | 16 | } | ||||
| 222 | 162 | |||||||
| 223 | - | friend class strand_service; | 163 | + | void* | |||
| HITGNC | 164 | + | 19002 | operator new( | ||||
| 165 | + | std::size_t n, | ||||||
| 166 | + | std::shared_ptr<strand_impl> const& impl) | ||||||
| 167 | + | { | ||||||
| HITGNC | 168 | + | 19002 | auto* svc = impl->service_.load(std::memory_order_acquire); | ||||
| HITGNC | 169 | + | 19002 | constexpr auto A = alignof(strand_service_impl*); | ||||
| HITGNC | 170 | + | 19002 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| HITGNC | 171 | + | 19002 | std::size_t total = padded + sizeof(strand_service_impl*); | ||||
| 172 | + | |||||||
| HITGNC | 173 | + | 19002 | void* p = svc->invoker_frame_cache_.exchange( | ||||
| 174 | + | nullptr, std::memory_order_acquire); | ||||||
| HITGNC | 175 | + | 19002 | if(!p || p == kCacheClosed) | ||||
| HITGNC | 176 | + | 8720 | p = ::operator new(total); | ||||
| 177 | + | |||||||
| HITGNC | 178 | + | 19002 | *reinterpret_cast<strand_service_impl**>( | ||||
| HITGNC | 179 | + | 19002 | static_cast<char*>(p) + padded) = svc; | ||||
| HITGNC | 180 | + | 19002 | return p; | ||||
| 181 | + | } | ||||||
| 182 | + | |||||||
| 183 | + | void | ||||||
| HITGNC | 184 | + | 19002 | operator delete(void* p, std::size_t n) noexcept | ||||
| 185 | + | { | ||||||
| HITGNC | 186 | + | 19002 | constexpr auto A = alignof(strand_service_impl*); | ||||
| HITGNC | 187 | + | 19002 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| HITGNC | 188 | + | 19002 | auto* svc = *reinterpret_cast<strand_service_impl**>( | ||||
| 189 | + | static_cast<char*>(p) + padded); | ||||||
| 190 | + | |||||||
| HITGNC | 191 | + | 19002 | void* expected = nullptr; | ||||
| HITGNC | 192 | + | 19002 | if(!svc->invoker_frame_cache_.compare_exchange_strong( | ||||
| 193 | + | expected, p, std::memory_order_release)) | ||||||
| HITGNC | 194 | + | 8703 | ::operator delete(p); | ||||
| HITGNC | 195 | + | 19002 | } | ||||
| 196 | + | |||||||
| 197 | + | strand_invoker | ||||||
| HITGNC | 198 | + | 19002 | get_return_object() noexcept | ||||
| 199 | + | { | ||||||
| HITGNC | 200 | + | 19002 | return {std::coroutine_handle<promise_type>::from_promise(*this)}; | ||||
| 201 | + | } | ||||||
| 202 | + | |||||||
| HITGNC | 203 | + | 19002 | std::suspend_always initial_suspend() noexcept { return {}; } | ||||
| HITGNC | 204 | + | 19002 | std::suspend_never final_suspend() noexcept { return {}; } | ||||
| HITGNC | 205 | + | 19002 | void return_void() noexcept {} | ||||
| MISUNC | 206 | + | ✗ | void unhandled_exception() { std::terminate(); } | ||||
| 207 | + | }; | ||||||
| 208 | + | |||||||
| 209 | + | std::coroutine_handle<promise_type> h_; | ||||||
| 224 | }; | 210 | }; | |||||
| 225 | 211 | |||||||
| 226 | - | //---------------------------------------------------------- | 212 | + | // The by-value parameter lives in the coroutine frame for the | |||
| 213 | + | // invoker's lifetime, keeping the impl alive past any user-side | ||||||
| 214 | + | // strand drop. | ||||||
| 215 | + | static | ||||||
| 216 | + | strand_invoker | ||||||
| HITGNC | 217 | + | 19002 | make_invoker(std::shared_ptr<strand_impl> impl) | ||||
| 218 | + | { | ||||||
| 219 | + | auto* p = impl.get(); | ||||||
| 220 | + | for(;;) | ||||||
| 221 | + | { | ||||||
| 222 | + | strand_service_impl::set_dispatch_thread(*p); | ||||||
| 223 | + | strand_service_impl::dispatch_pending(*p); | ||||||
| 224 | + | if(strand_service_impl::try_unlock(*p)) | ||||||
| 225 | + | { | ||||||
| 226 | + | strand_service_impl::clear_dispatch_thread(*p); | ||||||
| 227 | + | co_return; | ||||||
| 228 | + | } | ||||||
| 229 | + | } | ||||||
| HITGNC | 230 | + | 38004 | } | ||||
| 231 | + | |||||||
| 232 | + | void | ||||||
| HITGNC | 233 | + | 19002 | strand_service_impl::post_invoker( | ||||
| 234 | + | std::shared_ptr<strand_impl> impl, | ||||||
| 235 | + | executor_ref ex) | ||||||
| 236 | + | { | ||||||
| HITGNC | 237 | + | 19002 | auto invoker = make_invoker(std::move(impl)); | ||||
| HITGNC | 238 | + | 19002 | auto& self = invoker.h_.promise().self_; | ||||
| HITGNC | 239 | + | 19002 | self.h = invoker.h_; | ||||
| HITGNC | 240 | + | 19002 | ex.post(self); | ||||
| HITGNC | 241 | + | 19002 | } | ||||
| 242 | + | |||||||
| HITGNC | 243 | + | 22884 | strand_impl::~strand_impl() | ||||
| 244 | + | { | ||||||
| HITGNC | 245 | + | 11442 | auto* svc = service_.load(std::memory_order_acquire); | ||||
| HITGNC | 246 | + | 11442 | if(!svc) return; | ||||
| HITGNC | 247 | + | 11442 | std::lock_guard<std::mutex> lock(svc->mutex_); | ||||
| HITGNC | 248 | + | 11442 | svc->impl_list_.remove(this); | ||||
| HITGNC | 249 | + | 11442 | } | ||||
| 227 | 250 | |||||||
| HITCBC | 228 | 25 | strand_service:: | 251 | 30 | strand_service:: | ||
| HITCBC | 229 | 25 | strand_service() | 252 | 30 | strand_service() | ||
| HITCBC | 230 | 25 | : service() | 253 | 30 | : service() | ||
| 231 | { | 254 | { | |||||
| HITCBC | 232 | 25 | } | 255 | 30 | } | ||
| 233 | 256 | |||||||
| HITCBC | 234 | 25 | strand_service:: | 257 | 30 | strand_service:: | ||
| 235 | ~strand_service() = default; | 258 | ~strand_service() = default; | |||||
| 236 | 259 | |||||||
| 237 | bool | 260 | bool | |||||
| HITCBC | 238 | 12 | strand_service:: | 261 | 12 | strand_service:: | ||
| 239 | running_in_this_thread(strand_impl& impl) noexcept | 262 | running_in_this_thread(strand_impl& impl) noexcept | |||||
| 240 | { | 263 | { | |||||
| HITCBC | 241 | 12 | return impl.dispatch_thread_.load() == std::this_thread::get_id(); | 264 | 12 | return impl.dispatch_thread_.load() == std::this_thread::get_id(); | ||
| 242 | } | 265 | } | |||||
| 243 | 266 | |||||||
| 244 | std::coroutine_handle<> | 267 | std::coroutine_handle<> | |||||
| HITCBC | 245 | 8 | strand_service:: | 268 | 8 | strand_service:: | ||
| 246 | - | dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | 269 | + | dispatch( | |||
| 270 | + | std::shared_ptr<strand_impl> const& impl, | ||||||
| 271 | + | executor_ref ex, | ||||||
| 272 | + | std::coroutine_handle<> h) | ||||||
| 247 | { | 273 | { | |||||
| HITCBC | 248 | - | 8 | if(running_in_this_thread(impl)) | 274 | + | 8 | if(running_in_this_thread(*impl)) |
| HITCBC | 249 | 3 | return h; | 275 | 3 | return h; | ||
| 250 | 276 | |||||||
| HITCBC | 251 | - | 5 | if(strand_service_impl::enqueue(impl, h)) | 277 | + | 5 | if(strand_service_impl::enqueue(*impl, h)) |
| HITCBC | 252 | 5 | strand_service_impl::post_invoker(impl, ex); | 278 | 5 | strand_service_impl::post_invoker(impl, ex); | ||
| HITCBC | 253 | 5 | return std::noop_coroutine(); | 279 | 5 | return std::noop_coroutine(); | ||
| 254 | } | 280 | } | |||||
| 255 | 281 | |||||||
| 256 | void | 282 | void | |||||
| HITCBC | 257 | 330 | strand_service:: | 283 | 30335 | strand_service:: | ||
| 258 | - | post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | 284 | + | post( | |||
| 285 | + | std::shared_ptr<strand_impl> const& impl, | ||||||
| 286 | + | executor_ref ex, | ||||||
| 287 | + | std::coroutine_handle<> h) | ||||||
| 259 | { | 288 | { | |||||
| HITCBC | 260 | - | 330 | if(strand_service_impl::enqueue(impl, h)) | 289 | + | 30335 | if(strand_service_impl::enqueue(*impl, h)) |
| HITCBC | 261 | 11 | strand_service_impl::post_invoker(impl, ex); | 290 | 18997 | strand_service_impl::post_invoker(impl, ex); | ||
| HITCBC | 262 | 330 | } | 291 | 30335 | } | ||
| 263 | 292 | |||||||
| 264 | strand_service& | 293 | strand_service& | |||||
| HITCBC | 265 | 29 | get_strand_service(execution_context& ctx) | 294 | 11442 | get_strand_service(execution_context& ctx) | ||
| 266 | { | 295 | { | |||||
| HITCBC | 267 | 29 | return ctx.use_service<strand_service_impl>(); | 296 | 11442 | return ctx.use_service<strand_service_impl>(); | ||
| 268 | } | 297 | } | |||||
| 269 | 298 | |||||||
| 270 | } // namespace detail | 299 | } // namespace detail | |||||
| 271 | } // namespace capy | 300 | } // namespace capy | |||||
| 272 | } // namespace boost | 301 | } // namespace boost | |||||