100.00% Lines (7/7) 100.00% Functions (3/3)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
9   // 9   //
10   10  
11   #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP 11   #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12   #define BOOST_CAPY_EX_THREAD_POOL_HPP 12   #define BOOST_CAPY_EX_THREAD_POOL_HPP
13   13  
14   #include <boost/capy/detail/config.hpp> 14   #include <boost/capy/detail/config.hpp>
15   #include <boost/capy/continuation.hpp> 15   #include <boost/capy/continuation.hpp>
16   #include <coroutine> 16   #include <coroutine>
17   #include <boost/capy/ex/execution_context.hpp> 17   #include <boost/capy/ex/execution_context.hpp>
18   #include <cstddef> 18   #include <cstddef>
19   #include <string_view> 19   #include <string_view>
20   20  
21   namespace boost { 21   namespace boost {
22   namespace capy { 22   namespace capy {
23   23  
24   /** A pool of threads for executing work concurrently. 24   /** A pool of threads for executing work concurrently.
25   25  
26   Use this when you need to run coroutines on multiple threads 26   Use this when you need to run coroutines on multiple threads
27   without the overhead of creating and destroying threads for 27   without the overhead of creating and destroying threads for
28   each task. Work items are distributed across the pool using 28   each task. Work items are distributed across the pool using
29   a shared queue. 29   a shared queue.
30   30  
31   @par Thread Safety 31   @par Thread Safety
32   Distinct objects: Safe. 32   Distinct objects: Safe.
33   Shared objects: Unsafe. 33   Shared objects: Unsafe.
34   34  
35   @par Example 35   @par Example
36   @code 36   @code
37   thread_pool pool(4); // 4 worker threads 37   thread_pool pool(4); // 4 worker threads
38   auto ex = pool.get_executor(); 38   auto ex = pool.get_executor();
39   ex.post(some_coroutine); 39   ex.post(some_coroutine);
40   pool.join(); // wait for outstanding work to complete 40   pool.join(); // wait for outstanding work to complete
41   // pool destructor stops the pool, discarding any pending work 41   // pool destructor stops the pool, discarding any pending work
42   @endcode 42   @endcode
43   */ 43   */
44   class BOOST_CAPY_DECL 44   class BOOST_CAPY_DECL
45   thread_pool 45   thread_pool
46   : public execution_context 46   : public execution_context
47   { 47   {
48   class impl; 48   class impl;
49   impl* impl_; 49   impl* impl_;
50   50  
51   public: 51   public:
52   class executor_type; 52   class executor_type;
53   53  
54   /** Destroy the thread pool. 54   /** Destroy the thread pool.
55   55  
56   Signals all worker threads to stop, waits for them to 56   Signals all worker threads to stop, waits for them to
57   finish, and destroys any pending work items. 57   finish, and destroys any pending work items.
58   */ 58   */
59   ~thread_pool(); 59   ~thread_pool();
60   60  
61   /** Construct a thread pool. 61   /** Construct a thread pool.
62   62  
63   Creates a pool with the specified number of worker threads. 63   Creates a pool with the specified number of worker threads.
64   If `num_threads` is zero, the number of threads is set to 64   If `num_threads` is zero, the number of threads is set to
65   the hardware concurrency, or one if that cannot be determined. 65   the hardware concurrency, or one if that cannot be determined.
66   66  
67   @param num_threads The number of worker threads, or zero 67   @param num_threads The number of worker threads, or zero
68   for automatic selection. 68   for automatic selection.
69   69  
70   @param thread_name_prefix The prefix for worker thread names. 70   @param thread_name_prefix The prefix for worker thread names.
71   Thread names appear as "{prefix}0", "{prefix}1", etc. 71   Thread names appear as "{prefix}0", "{prefix}1", etc.
72   The prefix is truncated to 12 characters. Defaults to 72   The prefix is truncated to 12 characters. Defaults to
73   "capy-pool-". 73   "capy-pool-".
74   */ 74   */
75   explicit 75   explicit
76   thread_pool( 76   thread_pool(
77   std::size_t num_threads = 0, 77   std::size_t num_threads = 0,
78   std::string_view thread_name_prefix = "capy-pool-"); 78   std::string_view thread_name_prefix = "capy-pool-");
79   79  
80   thread_pool(thread_pool const&) = delete; 80   thread_pool(thread_pool const&) = delete;
81   thread_pool& operator=(thread_pool const&) = delete; 81   thread_pool& operator=(thread_pool const&) = delete;
82   82  
83   /** Wait for all outstanding work to complete. 83   /** Wait for all outstanding work to complete.
84   84  
85   Releases the internal work guard, then blocks the calling 85   Releases the internal work guard, then blocks the calling
86   thread until all outstanding work tracked by 86   thread until all outstanding work tracked by
87   @ref executor_type::on_work_started and 87   @ref executor_type::on_work_started and
88   @ref executor_type::on_work_finished completes. After all 88   @ref executor_type::on_work_finished completes. After all
89   work finishes, joins the worker threads. 89   work finishes, joins the worker threads.
90   90  
91   If @ref stop is called while `join()` is blocking, the 91   If @ref stop is called while `join()` is blocking, the
92   pool stops without waiting for remaining work to 92   pool stops without waiting for remaining work to
93   complete. Worker threads finish their current item and 93   complete. Worker threads finish their current item and
94   exit; `join()` still waits for all threads to be joined 94   exit; `join()` still waits for all threads to be joined
95   before returning. 95   before returning.
96   96  
97   This function is idempotent. The first call performs the 97   This function is idempotent. The first call performs the
98   join; subsequent calls return immediately. 98   join; subsequent calls return immediately.
99   99  
100   @par Preconditions 100   @par Preconditions
101   Must not be called from a thread in this pool (undefined 101   Must not be called from a thread in this pool (undefined
102   behavior). 102   behavior).
103   103  
104   @par Postconditions 104   @par Postconditions
105   All worker threads have been joined. The pool cannot be 105   All worker threads have been joined. The pool cannot be
106   reused. 106   reused.
107   107  
108   @par Thread Safety 108   @par Thread Safety
109   May be called from any thread not in this pool. 109   May be called from any thread not in this pool.
110   */ 110   */
111   void 111   void
112   join() noexcept; 112   join() noexcept;
113   113  
114   /** Request all worker threads to stop. 114   /** Request all worker threads to stop.
115   115  
116   Signals all threads to exit after finishing their current 116   Signals all threads to exit after finishing their current
117   work item. Queued work that has not started is abandoned. 117   work item. Queued work that has not started is abandoned.
118   Does not wait for threads to exit. 118   Does not wait for threads to exit.
119   119  
120   If @ref join is blocking on another thread, calling 120   If @ref join is blocking on another thread, calling
121   `stop()` causes it to stop waiting for outstanding 121   `stop()` causes it to stop waiting for outstanding
122   work. The `join()` call still waits for worker threads 122   work. The `join()` call still waits for worker threads
123   to finish their current item and exit before returning. 123   to finish their current item and exit before returning.
124   */ 124   */
125   void 125   void
126   stop() noexcept; 126   stop() noexcept;
127   127  
128   /** Return an executor for this thread pool. 128   /** Return an executor for this thread pool.
129   129  
130   @return An executor associated with this thread pool. 130   @return An executor associated with this thread pool.
131   */ 131   */
132   executor_type 132   executor_type
133   get_executor() const noexcept; 133   get_executor() const noexcept;
134   }; 134   };
135   135  
136   /** An executor that submits work to a thread_pool. 136   /** An executor that submits work to a thread_pool.
137   137  
138   Executors are lightweight handles that can be copied and stored. 138   Executors are lightweight handles that can be copied and stored.
139   All copies refer to the same underlying thread pool. 139   All copies refer to the same underlying thread pool.
140   140  
141   @par Thread Safety 141   @par Thread Safety
142   Distinct objects: Safe. 142   Distinct objects: Safe.
143   Shared objects: Safe. 143   Shared objects: Safe.
144   */ 144   */
145   class thread_pool::executor_type 145   class thread_pool::executor_type
146   { 146   {
147   friend class thread_pool; 147   friend class thread_pool;
148   148  
149   thread_pool* pool_ = nullptr; 149   thread_pool* pool_ = nullptr;
150   150  
151   explicit 151   explicit
HITCBC 152   11582 executor_type(thread_pool& pool) noexcept 152   11583 executor_type(thread_pool& pool) noexcept
HITCBC 153   11582 : pool_(&pool) 153   11583 : pool_(&pool)
154   { 154   {
HITCBC 155   11582 } 155   11583 }
156   156  
157   public: 157   public:
158   /** Construct a default null executor. 158   /** Construct a default null executor.
159   159  
160   The resulting executor is not associated with any pool. 160   The resulting executor is not associated with any pool.
161   `context()`, `dispatch()`, and `post()` require the 161   `context()`, `dispatch()`, and `post()` require the
162   executor to be associated with a pool before use. 162   executor to be associated with a pool before use.
163   */ 163   */
164   executor_type() = default; 164   executor_type() = default;
165   165  
166   /// Return the underlying thread pool. 166   /// Return the underlying thread pool.
167   thread_pool& 167   thread_pool&
HITCBC 168   11817 context() const noexcept 168   11818 context() const noexcept
169   { 169   {
HITCBC 170   11817 return *pool_; 170   11818 return *pool_;
171   } 171   }
172   172  
173   /** Notify that work has started. 173   /** Notify that work has started.
174   174  
175   Increments the outstanding work count. Must be paired 175   Increments the outstanding work count. Must be paired
176   with a subsequent call to @ref on_work_finished. 176   with a subsequent call to @ref on_work_finished.
177   177  
178   @see on_work_finished, work_guard 178   @see on_work_finished, work_guard
179   */ 179   */
180   BOOST_CAPY_DECL 180   BOOST_CAPY_DECL
181   void 181   void
182   on_work_started() const noexcept; 182   on_work_started() const noexcept;
183   183  
184   /** Notify that work has finished. 184   /** Notify that work has finished.
185   185  
186   Decrements the outstanding work count. When the count 186   Decrements the outstanding work count. When the count
187   reaches zero after @ref thread_pool::join has been called, 187   reaches zero after @ref thread_pool::join has been called,
188   the pool's worker threads are signaled to stop. 188   the pool's worker threads are signaled to stop.
189   189  
190   @pre A preceding call to @ref on_work_started was made. 190   @pre A preceding call to @ref on_work_started was made.
191   191  
192   @see on_work_started, work_guard 192   @see on_work_started, work_guard
193   */ 193   */
194   BOOST_CAPY_DECL 194   BOOST_CAPY_DECL
195   void 195   void
196   on_work_finished() const noexcept; 196   on_work_finished() const noexcept;
197   197  
198   /** Dispatch a continuation for execution. 198   /** Dispatch a continuation for execution.
199   199  
200   If the calling thread is a worker of this pool, returns 200   If the calling thread is a worker of this pool, returns
201   `c.h` for symmetric transfer so the caller can resume the 201   `c.h` for symmetric transfer so the caller can resume the
202   continuation inline. Otherwise, posts the continuation to 202   continuation inline. Otherwise, posts the continuation to
203   the pool for execution on a worker thread and returns 203   the pool for execution on a worker thread and returns
204   `std::noop_coroutine()`. 204   `std::noop_coroutine()`.
205   205  
206   @param c The continuation to execute. On the post path, 206   @param c The continuation to execute. On the post path,
207   must remain at a stable address until dequeued 207   must remain at a stable address until dequeued
208   and resumed. 208   and resumed.
209   209  
210   @return `c.h` when the calling thread is a pool worker; 210   @return `c.h` when the calling thread is a pool worker;
211   `std::noop_coroutine()` otherwise. 211   `std::noop_coroutine()` otherwise.
212   */ 212   */
213   BOOST_CAPY_DECL 213   BOOST_CAPY_DECL
214   std::coroutine_handle<> 214   std::coroutine_handle<>
215   dispatch(continuation& c) const; 215   dispatch(continuation& c) const;
216   216  
217   /** Post a continuation to the thread pool. 217   /** Post a continuation to the thread pool.
218   218  
219   The continuation will be resumed on one of the pool's 219   The continuation will be resumed on one of the pool's
220   worker threads. The continuation must remain at a stable 220   worker threads. The continuation must remain at a stable
221   address until it is dequeued and resumed. 221   address until it is dequeued and resumed.
222   222  
223   @param c The continuation to execute. 223   @param c The continuation to execute.
224   */ 224   */
225   BOOST_CAPY_DECL 225   BOOST_CAPY_DECL
226   void 226   void
227   post(continuation& c) const; 227   post(continuation& c) const;
228   228  
229   /// Return true if two executors refer to the same thread pool. 229   /// Return true if two executors refer to the same thread pool.
230   bool 230   bool
HITCBC 231   13 operator==(executor_type const& other) const noexcept 231   13 operator==(executor_type const& other) const noexcept
232   { 232   {
HITCBC 233   13 return pool_ == other.pool_; 233   13 return pool_ == other.pool_;
234   } 234   }
235   }; 235   };
236   236  
237   } // capy 237   } // capy
238   } // boost 238   } // boost
239   239  
240   #endif 240   #endif