100.00% Lines (28/28) 100.00% Functions (13/13)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_EX_STRAND_HPP 10   #ifndef BOOST_CAPY_EX_STRAND_HPP
11   #define BOOST_CAPY_EX_STRAND_HPP 11   #define BOOST_CAPY_EX_STRAND_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/continuation.hpp> 14   #include <boost/capy/continuation.hpp>
15   #include <coroutine> 15   #include <coroutine>
16   #include <boost/capy/ex/detail/strand_service.hpp> 16   #include <boost/capy/ex/detail/strand_service.hpp>
17   17  
18   #include <type_traits> 18   #include <type_traits>
19   19  
20   namespace boost { 20   namespace boost {
21   namespace capy { 21   namespace capy {
22   22  
23   /** Provides serialized coroutine execution for any executor type. 23   /** Provides serialized coroutine execution for any executor type.
24   24  
25   A strand wraps an inner executor and ensures that coroutines 25   A strand wraps an inner executor and ensures that coroutines
26   dispatched through it never run concurrently. At most one 26   dispatched through it never run concurrently. At most one
27   coroutine executes at a time within a strand, even when the 27   coroutine executes at a time within a strand, even when the
28   underlying executor runs on multiple threads. 28   underlying executor runs on multiple threads.
29   29  
30   Strands are lightweight handles that can be copied freely. 30   Strands are lightweight handles that can be copied freely.
31   Copies share the same internal serialization state, so 31   Copies share the same internal serialization state, so
32   coroutines dispatched through any copy are serialized with 32   coroutines dispatched through any copy are serialized with
33   respect to all other copies. 33   respect to all other copies.
34   34  
35   @par Invariant 35   @par Invariant
36   Coroutines resumed through a strand shall not run concurrently. 36   Coroutines resumed through a strand shall not run concurrently.
37   37  
38   @par Implementation 38   @par Implementation
39 - The strand uses a service-based architecture with a fixed pool 39 + Each strand allocates a private serialization state. Strands
40 - of 211 implementation objects. New strands hash to select an 40 + constructed from the same execution context share a small pool
41 - impl from the pool. Strands that hash to the same index share 41 + of mutexes (193 entries) selected by hash; mutex sharing causes
42 - serialization, which is harmless (just extra serialization) 42 + only brief contention on the push/pop critical section, never
43 - and rare with 211 buckets. 43 + cross-strand state sharing. Construction cost: one
  44 + `std::make_shared` per strand.
44   45  
45   @par Executor Concept 46   @par Executor Concept
46   This class satisfies the `Executor` concept, providing: 47   This class satisfies the `Executor` concept, providing:
47   - `context()` - Returns the underlying execution context 48   - `context()` - Returns the underlying execution context
48   - `on_work_started()` / `on_work_finished()` - Work tracking 49   - `on_work_started()` / `on_work_finished()` - Work tracking
49   - `dispatch(continuation&)` - Resumes inline if already running in this strand; otherwise queues 50   - `dispatch(continuation&)` - Resumes inline if already running in this strand; otherwise queues
50   - `post(continuation&)` - Always queues for later execution 51   - `post(continuation&)` - Always queues for later execution
51   52  
52   @par Thread Safety 53   @par Thread Safety
53   Distinct objects: Safe. 54   Distinct objects: Safe.
54   Shared objects: Safe. 55   Shared objects: Safe.
55   56  
56   @par Example 57   @par Example
57   @code 58   @code
58   thread_pool pool(4); 59   thread_pool pool(4);
59   auto strand = make_strand(pool.get_executor()); 60   auto strand = make_strand(pool.get_executor());
60   61  
61   // These continuations will never run concurrently 62   // These continuations will never run concurrently
62   continuation c1{h1}, c2{h2}, c3{h3}; 63   continuation c1{h1}, c2{h2}, c3{h3};
63   strand.post(c1); 64   strand.post(c1);
64   strand.post(c2); 65   strand.post(c2);
65   strand.post(c3); 66   strand.post(c3);
66   @endcode 67   @endcode
67   68  
68   @tparam Ex The type of the underlying executor. Must 69   @tparam Ex The type of the underlying executor. Must
69   satisfy the `Executor` concept. 70   satisfy the `Executor` concept.
70   71  
71   @see make_strand, Executor 72   @see make_strand, Executor
72   */ 73   */
73   template<typename Ex> 74   template<typename Ex>
74   class strand 75   class strand
75   { 76   {
76 - detail::strand_impl* impl_; 77 + std::shared_ptr<detail::strand_impl> impl_;
77   Ex ex_; 78   Ex ex_;
78   79  
  80 + friend struct strand_test;
  81 +
79   public: 82   public:
80   /** The type of the underlying executor. 83   /** The type of the underlying executor.
81   */ 84   */
82   using inner_executor_type = Ex; 85   using inner_executor_type = Ex;
83   86  
84   /** Construct a strand for the specified executor. 87   /** Construct a strand for the specified executor.
85   88  
86 - Obtains a strand implementation from the service associated 89 + Allocates a fresh strand implementation from the service
87 - with the executor's context. The implementation is selected 90 + associated with the executor's context.
88 - from a fixed pool using a hash function.  
89   91  
90   @param ex The inner executor to wrap. Coroutines will 92   @param ex The inner executor to wrap. Coroutines will
91   ultimately be dispatched through this executor. 93   ultimately be dispatched through this executor.
92   94  
93   @note This constructor is disabled if the argument is a 95   @note This constructor is disabled if the argument is a
94   strand type, to prevent strand-of-strand wrapping. 96   strand type, to prevent strand-of-strand wrapping.
95   */ 97   */
96   template<typename Ex1, 98   template<typename Ex1,
97   typename = std::enable_if_t< 99   typename = std::enable_if_t<
98   !std::is_same_v<std::decay_t<Ex1>, strand> && 100   !std::is_same_v<std::decay_t<Ex1>, strand> &&
99   !detail::is_strand<std::decay_t<Ex1>>::value && 101   !detail::is_strand<std::decay_t<Ex1>>::value &&
100   std::is_convertible_v<Ex1, Ex>>> 102   std::is_convertible_v<Ex1, Ex>>>
101   explicit 103   explicit
HITCBC 102   29 strand(Ex1&& ex) 104   11442 strand(Ex1&& ex)
HITCBC 103   29 : impl_(detail::get_strand_service(ex.context()) 105   11442 : impl_(detail::get_strand_service(ex.context())
HITCBC 104 - 29 .get_implementation()) 106 + 11442 .create_implementation())
HITCBC 105   29 , ex_(std::forward<Ex1>(ex)) 107   11442 , ex_(std::forward<Ex1>(ex))
106   { 108   {
HITCBC 107   29 } 109   11442 }
108   110  
109   /** Construct a copy. 111   /** Construct a copy.
110   112  
111   Creates a strand that shares serialization state with 113   Creates a strand that shares serialization state with
112   the original. Coroutines dispatched through either strand 114   the original. Coroutines dispatched through either strand
113   will be serialized with respect to each other. 115   will be serialized with respect to each other.
114   */ 116   */
HITCBC 115   1 strand(strand const&) = default; 117   9 strand(strand const&) = default;
116   118  
117   /** Construct by moving. 119   /** Construct by moving.
118   120  
119   @note A moved-from strand is only safe to destroy 121   @note A moved-from strand is only safe to destroy
120   or reassign. 122   or reassign.
121   */ 123   */
HITGIC 122   strand(strand&&) = default; 124   11443 strand(strand&&) = default;
123   125  
124   /** Assign by copying. 126   /** Assign by copying.
125   */ 127   */
HITGIC 126   strand& operator=(strand const&) = default; 128   1 strand& operator=(strand const&) = default;
127   129  
128   /** Assign by moving. 130   /** Assign by moving.
129   131  
130   @note A moved-from strand is only safe to destroy 132   @note A moved-from strand is only safe to destroy
131   or reassign. 133   or reassign.
132   */ 134   */
HITGIC 133   strand& operator=(strand&&) = default; 135   1 strand& operator=(strand&&) = default;
134   136  
135   /** Return the underlying executor. 137   /** Return the underlying executor.
136   138  
137   @return A const reference to the inner executor. 139   @return A const reference to the inner executor.
138   */ 140   */
139   Ex const& 141   Ex const&
HITCBC 140   1 get_inner_executor() const noexcept 142   1 get_inner_executor() const noexcept
141   { 143   {
HITCBC 142   1 return ex_; 144   1 return ex_;
143   } 145   }
144   146  
145   /** Return the underlying execution context. 147   /** Return the underlying execution context.
146   148  
147   @return A reference to the execution context associated 149   @return A reference to the execution context associated
148   with the inner executor. 150   with the inner executor.
149   */ 151   */
150   auto& 152   auto&
HITCBC 151   5 context() const noexcept 153   5 context() const noexcept
152   { 154   {
HITCBC 153   5 return ex_.context(); 155   5 return ex_.context();
154   } 156   }
155   157  
156   /** Notify that work has started. 158   /** Notify that work has started.
157   159  
158   Delegates to the inner executor's `on_work_started()`. 160   Delegates to the inner executor's `on_work_started()`.
159   This is a no-op for most executor types. 161   This is a no-op for most executor types.
160   */ 162   */
161   void 163   void
HITCBC 162   6 on_work_started() const noexcept 164   6 on_work_started() const noexcept
163   { 165   {
HITCBC 164   6 ex_.on_work_started(); 166   6 ex_.on_work_started();
HITCBC 165   6 } 167   6 }
166   168  
167   /** Notify that work has finished. 169   /** Notify that work has finished.
168   170  
169   Delegates to the inner executor's `on_work_finished()`. 171   Delegates to the inner executor's `on_work_finished()`.
170   This is a no-op for most executor types. 172   This is a no-op for most executor types.
171   */ 173   */
172   void 174   void
HITCBC 173   6 on_work_finished() const noexcept 175   6 on_work_finished() const noexcept
174   { 176   {
HITCBC 175   6 ex_.on_work_finished(); 177   6 ex_.on_work_finished();
HITCBC 176   6 } 178   6 }
177   179  
178   /** Determine whether the strand is running in the current thread. 180   /** Determine whether the strand is running in the current thread.
179   181  
180   @return true if the current thread is executing a coroutine 182   @return true if the current thread is executing a coroutine
181   within this strand's dispatch loop. 183   within this strand's dispatch loop.
182   */ 184   */
183   bool 185   bool
HITCBC 184   4 running_in_this_thread() const noexcept 186   4 running_in_this_thread() const noexcept
185   { 187   {
HITCBC 186   4 return detail::strand_service::running_in_this_thread(*impl_); 188   4 return detail::strand_service::running_in_this_thread(*impl_);
187   } 189   }
188   190  
189   /** Compare two strands for equality. 191   /** Compare two strands for equality.
190   192  
191   Two strands are equal if they share the same internal 193   Two strands are equal if they share the same internal
192   serialization state. Equal strands serialize coroutines 194   serialization state. Equal strands serialize coroutines
193   with respect to each other. 195   with respect to each other.
194   196  
195   @param other The strand to compare against. 197   @param other The strand to compare against.
196   @return true if both strands share the same implementation. 198   @return true if both strands share the same implementation.
197   */ 199   */
198   bool 200   bool
HITCBC 199   4 operator==(strand const& other) const noexcept 201   499505 operator==(strand const& other) const noexcept
200   { 202   {
HITCBC 201 - 4 return impl_ == other.impl_; 203 + 499505 return impl_.get() == other.impl_.get();
202   } 204   }
203   205  
204   /** Post a continuation to the strand. 206   /** Post a continuation to the strand.
205   207  
206   The continuation is always queued for execution, never resumed 208   The continuation is always queued for execution, never resumed
207   immediately. When the strand becomes available, queued 209   immediately. When the strand becomes available, queued
208   work executes in FIFO order on the underlying executor. 210   work executes in FIFO order on the underlying executor.
209   211  
210   @par Ordering 212   @par Ordering
211   Guarantees strict FIFO ordering relative to other post() calls. 213   Guarantees strict FIFO ordering relative to other post() calls.
212   Use this instead of dispatch() when ordering matters. 214   Use this instead of dispatch() when ordering matters.
213   215  
214   @param c The continuation to post. The caller retains 216   @param c The continuation to post. The caller retains
215   ownership; the continuation must remain valid until 217   ownership; the continuation must remain valid until
216   it is dequeued and resumed. 218   it is dequeued and resumed.
217   */ 219   */
218   void 220   void
HITCBC 219   330 post(continuation& c) const 221   30335 post(continuation& c) const
220   { 222   {
HITCBC 221 - 330 detail::strand_service::post(*impl_, executor_ref(ex_), c.h); 223 + 30335 detail::strand_service::post(impl_, executor_ref(ex_), c.h);
HITCBC 222   330 } 224   30335 }
223   225  
224   /** Dispatch a continuation through the strand. 226   /** Dispatch a continuation through the strand.
225   227  
226   Returns a handle for symmetric transfer. If the calling 228   Returns a handle for symmetric transfer. If the calling
227   thread is already executing within this strand, returns `c.h`. 229   thread is already executing within this strand, returns `c.h`.
228   Otherwise, the continuation is queued and 230   Otherwise, the continuation is queued and
229   `std::noop_coroutine()` is returned. 231   `std::noop_coroutine()` is returned.
230   232  
231   @par Ordering 233   @par Ordering
232   Callers requiring strict FIFO ordering should use post() 234   Callers requiring strict FIFO ordering should use post()
233   instead, which always queues the continuation. 235   instead, which always queues the continuation.
234   236  
235   @param c The continuation to dispatch. The caller retains 237   @param c The continuation to dispatch. The caller retains
236   ownership; the continuation must remain valid until 238   ownership; the continuation must remain valid until
237   it is dequeued and resumed. 239   it is dequeued and resumed.
238   240  
239   @return A handle for symmetric transfer or `std::noop_coroutine()`. 241   @return A handle for symmetric transfer or `std::noop_coroutine()`.
240   */ 242   */
241   std::coroutine_handle<> 243   std::coroutine_handle<>
HITCBC 242   8 dispatch(continuation& c) const 244   8 dispatch(continuation& c) const
243   { 245   {
HITCBC 244 - 8 return detail::strand_service::dispatch(*impl_, executor_ref(ex_), c.h); 246 + 8 return detail::strand_service::dispatch(impl_, executor_ref(ex_), c.h);
245   } 247   }
246   }; 248   };
247   249  
248   // Deduction guide 250   // Deduction guide
249   template<typename Ex> 251   template<typename Ex>
250   strand(Ex) -> strand<Ex>; 252   strand(Ex) -> strand<Ex>;
251   253  
252   } // namespace capy 254   } // namespace capy
253   } // namespace boost 255   } // namespace boost
254   256  
255   #endif 257   #endif