89.58% Lines (43/48) 100.00% Functions (7/7)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12   12  
13   #include <boost/corosio/detail/config.hpp> 13   #include <boost/corosio/detail/config.hpp>
14   #include <boost/corosio/detail/intrusive.hpp> 14   #include <boost/corosio/detail/intrusive.hpp>
15   #include <boost/capy/ex/execution_context.hpp> 15   #include <boost/capy/ex/execution_context.hpp>
  16 + #include <boost/capy/test/thread_name.hpp>
16   17  
17   #include <condition_variable> 18   #include <condition_variable>
  19 + #include <cstdio>
18   #include <mutex> 20   #include <mutex>
19   #include <stdexcept> 21   #include <stdexcept>
20   #include <thread> 22   #include <thread>
21   #include <vector> 23   #include <vector>
22   24  
23   namespace boost::corosio::detail { 25   namespace boost::corosio::detail {
24   26  
25   /** Base class for thread pool work items. 27   /** Base class for thread pool work items.
26   28  
27   Derive from this to create work that can be posted to a 29   Derive from this to create work that can be posted to a
28   @ref thread_pool. Uses static function pointer dispatch, 30   @ref thread_pool. Uses static function pointer dispatch,
29   consistent with the IOCP `op` pattern. 31   consistent with the IOCP `op` pattern.
30   32  
31   @par Example 33   @par Example
32   @code 34   @code
33   struct my_work : pool_work_item 35   struct my_work : pool_work_item
34   { 36   {
35   int* result; 37   int* result;
36   static void execute( pool_work_item* w ) noexcept 38   static void execute( pool_work_item* w ) noexcept
37   { 39   {
38   auto* self = static_cast<my_work*>( w ); 40   auto* self = static_cast<my_work*>( w );
39   *self->result = 42; 41   *self->result = 42;
40   } 42   }
41   }; 43   };
42   44  
43   my_work w; 45   my_work w;
44   w.func_ = &my_work::execute; 46   w.func_ = &my_work::execute;
45   w.result = &r; 47   w.result = &r;
46   pool.post( &w ); 48   pool.post( &w );
47   @endcode 49   @endcode
48   */ 50   */
49   struct pool_work_item : intrusive_queue<pool_work_item>::node 51   struct pool_work_item : intrusive_queue<pool_work_item>::node
50   { 52   {
51   /// Static dispatch function signature. 53   /// Static dispatch function signature.
52   using func_type = void (*)(pool_work_item*) noexcept; 54   using func_type = void (*)(pool_work_item*) noexcept;
53   55  
54   /// Completion handler invoked by the worker thread. 56   /// Completion handler invoked by the worker thread.
55   func_type func_ = nullptr; 57   func_type func_ = nullptr;
56   }; 58   };
57   59  
58   /** Shared thread pool for dispatching blocking operations. 60   /** Shared thread pool for dispatching blocking operations.
59   61  
60   Provides a fixed pool of reusable worker threads for operations 62   Provides a fixed pool of reusable worker threads for operations
61   that cannot be integrated with async I/O (e.g. blocking DNS 63   that cannot be integrated with async I/O (e.g. blocking DNS
62   calls). Registered as an `execution_context::service` so it 64   calls). Registered as an `execution_context::service` so it
63   is a singleton per io_context. 65   is a singleton per io_context.
64   66  
65   Threads are created eagerly in the constructor. The default 67   Threads are created eagerly in the constructor. The default
66   thread count is 1. 68   thread count is 1.
67   69  
68   @par Thread Safety 70   @par Thread Safety
69   All public member functions are thread-safe. 71   All public member functions are thread-safe.
70   72  
71   @par Shutdown 73   @par Shutdown
72   Sets a shutdown flag, notifies all threads, and joins them. 74   Sets a shutdown flag, notifies all threads, and joins them.
73   In-flight blocking calls complete naturally before the thread 75   In-flight blocking calls complete naturally before the thread
74   exits. 76   exits.
75   */ 77   */
76   class thread_pool final : public capy::execution_context::service 78   class thread_pool final : public capy::execution_context::service
77   { 79   {
78   std::mutex mutex_; 80   std::mutex mutex_;
79   std::condition_variable cv_; 81   std::condition_variable cv_;
80   intrusive_queue<pool_work_item> work_queue_; 82   intrusive_queue<pool_work_item> work_queue_;
81   std::vector<std::thread> threads_; 83   std::vector<std::thread> threads_;
82   bool shutdown_ = false; 84   bool shutdown_ = false;
83   85  
84 - void worker_loop(); 86 + void worker_loop(unsigned index);
85   87  
86   public: 88   public:
87   using key_type = thread_pool; 89   using key_type = thread_pool;
88   90  
89   /** Construct the thread pool service. 91   /** Construct the thread pool service.
90   92  
91   Eagerly creates all worker threads. 93   Eagerly creates all worker threads.
92   94  
93   @par Exception Safety 95   @par Exception Safety
94   Strong guarantee. If thread creation fails, all 96   Strong guarantee. If thread creation fails, all
95   already-created threads are shut down and joined 97   already-created threads are shut down and joined
96   before the exception propagates. 98   before the exception propagates.
97   99  
98   @param ctx Reference to the owning execution_context. 100   @param ctx Reference to the owning execution_context.
99   @param num_threads Number of worker threads. Must be 101   @param num_threads Number of worker threads. Must be
100   at least 1. 102   at least 1.
101   103  
102   @throws std::logic_error If `num_threads` is 0. 104   @throws std::logic_error If `num_threads` is 0.
103   */ 105   */
HITCBC 104   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1) 106   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
HITCBC 105   607 { 107   607 {
106   (void)ctx; 108   (void)ctx;
HITCBC 107   607 if (!num_threads) 109   607 if (!num_threads)
HITCBC 108   1 throw std::logic_error("thread_pool requires at least 1 thread"); 110   1 throw std::logic_error("thread_pool requires at least 1 thread");
HITCBC 109   606 threads_.reserve(num_threads); 111   606 threads_.reserve(num_threads);
110   try 112   try
111   { 113   {
HITCBC 112   1215 for (unsigned i = 0; i < num_threads; ++i) 114   1215 for (unsigned i = 0; i < num_threads; ++i)
HITCBC 113 - 1218 threads_.emplace_back([this] { worker_loop(); }); 115 + 1218 threads_.emplace_back([this, i] { worker_loop(i + 1); });
114   } 116   }
MISUBC 115   catch (...) 117   catch (...)
116   { 118   {
MISUBC 117   shutdown(); 119   shutdown();
MISUBC 118   throw; 120   throw;
MISUBC 119   } 121   }
HITCBC 120   609 } 122   609 }
121   123  
HITCBC 122   1211 ~thread_pool() override = default; 124   1211 ~thread_pool() override = default;
123   125  
124   thread_pool(thread_pool const&) = delete; 126   thread_pool(thread_pool const&) = delete;
125   thread_pool& operator=(thread_pool const&) = delete; 127   thread_pool& operator=(thread_pool const&) = delete;
126   128  
127   /** Enqueue a work item for execution on the thread pool. 129   /** Enqueue a work item for execution on the thread pool.
128   130  
129   Zero-allocation: the caller owns the work item's storage. 131   Zero-allocation: the caller owns the work item's storage.
130   132  
131   @param w The work item to execute. Must remain valid until 133   @param w The work item to execute. Must remain valid until
132   its `func_` has been called. 134   its `func_` has been called.
133   135  
134   @return `true` if the item was enqueued, `false` if the 136   @return `true` if the item was enqueued, `false` if the
135   pool has already shut down. 137   pool has already shut down.
136   */ 138   */
137   bool post(pool_work_item* w) noexcept; 139   bool post(pool_work_item* w) noexcept;
138   140  
139   /** Shut down the thread pool. 141   /** Shut down the thread pool.
140   142  
141   Signals all threads to exit after draining any 143   Signals all threads to exit after draining any
142   remaining queued work, then joins them. 144   remaining queued work, then joins them.
143   */ 145   */
144   void shutdown() override; 146   void shutdown() override;
145   }; 147   };
146   148  
147   inline void 149   inline void
HITCBC 148 - 609 thread_pool::worker_loop() 150 + 609 thread_pool::worker_loop(unsigned index)
149   { 151   {
  152 + // Name format chosen to fit Linux's 15-char pthread limit:
  153 + // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999".
  154 + char name[16];
HITGNC   155 + 609 std::snprintf(name, sizeof(name), "tpool-svc-%u", index);
HITGNC   156 + 609 capy::set_current_thread_name(name);
  157 +
150   for (;;) 158   for (;;)
151   { 159   {
152   pool_work_item* w; 160   pool_work_item* w;
153   { 161   {
HITCBC 154   788 std::unique_lock<std::mutex> lock(mutex_); 162   788 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 155   788 cv_.wait( 163   788 cv_.wait(
HITCBC 156   1307 lock, [this] { return shutdown_ || !work_queue_.empty(); }); 164   1330 lock, [this] { return shutdown_ || !work_queue_.empty(); });
157   165  
HITCBC 158   788 w = work_queue_.pop(); 166   788 w = work_queue_.pop();
HITCBC 159   788 if (!w) 167   788 if (!w)
160   { 168   {
HITCBC 161   609 if (shutdown_) 169   609 if (shutdown_)
HITCBC 162   1218 return; 170   1218 return;
MISUBC 163   continue; 171   continue;
164   } 172   }
HITCBC 165   788 } 173   788 }
HITCBC 166   179 w->func_(w); 174   179 w->func_(w);
HITCBC 167   179 } 175   179 }
168   } 176   }
169   177  
170   inline bool 178   inline bool
HITCBC 171   180 thread_pool::post(pool_work_item* w) noexcept 179   180 thread_pool::post(pool_work_item* w) noexcept
172   { 180   {
173   { 181   {
HITCBC 174   180 std::lock_guard<std::mutex> lock(mutex_); 182   180 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 175   180 if (shutdown_) 183   180 if (shutdown_)
HITCBC 176   1 return false; 184   1 return false;
HITCBC 177   179 work_queue_.push(w); 185   179 work_queue_.push(w);
HITCBC 178   180 } 186   180 }
HITCBC 179   179 cv_.notify_one(); 187   179 cv_.notify_one();
HITCBC 180   179 return true; 188   179 return true;
181   } 189   }
182   190  
183   inline void 191   inline void
HITCBC 184   610 thread_pool::shutdown() 192   610 thread_pool::shutdown()
185   { 193   {
186   { 194   {
HITCBC 187   610 std::lock_guard<std::mutex> lock(mutex_); 195   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 188   610 shutdown_ = true; 196   610 shutdown_ = true;
HITCBC 189   610 } 197   610 }
HITCBC 190   610 cv_.notify_all(); 198   610 cv_.notify_all();
191   199  
HITCBC 192   1219 for (auto& t : threads_) 200   1219 for (auto& t : threads_)
193   { 201   {
HITCBC 194   609 if (t.joinable()) 202   609 if (t.joinable())
HITCBC 195   609 t.join(); 203   609 t.join();
196   } 204   }
HITCBC 197   610 threads_.clear(); 205   610 threads_.clear();
198   206  
199   { 207   {
HITCBC 200   610 std::lock_guard<std::mutex> lock(mutex_); 208   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 201   610 while (work_queue_.pop()) 209   610 while (work_queue_.pop())
202   ; 210   ;
HITCBC 203   610 } 211   610 }
HITCBC 204   610 } 212   610 }
205   213  
206   } // namespace boost::corosio::detail 214   } // namespace boost::corosio::detail
207   215  
208   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 216   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP