TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11 : #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/intrusive.hpp>
15 : #include <boost/capy/ex/execution_context.hpp>
16 : #include <boost/capy/test/thread_name.hpp>
17 :
18 : #include <condition_variable>
19 : #include <cstdio>
20 : #include <mutex>
21 : #include <stdexcept>
22 : #include <thread>
23 : #include <vector>
24 :
25 : namespace boost::corosio::detail {
26 :
/** Base class for thread pool work items.

    Derive from this to create work that can be posted to a
    @ref thread_pool. Uses static function pointer dispatch,
    consistent with the IOCP `op` pattern.

    Zero-allocation by design: the item links itself into the
    pool's intrusive queue, so the caller owns the storage and
    must keep it alive until `func_` has been invoked.

    @par Example
    @code
    struct my_work : pool_work_item
    {
        int* result;
        static void execute( pool_work_item* w ) noexcept
        {
            auto* self = static_cast<my_work*>( w );
            *self->result = 42;
        }
    };

    my_work w;
    w.func_ = &my_work::execute;
    w.result = &r;
    pool.post( &w );
    @endcode
*/
struct pool_work_item : intrusive_queue<pool_work_item>::node
{
    /// Static dispatch function signature. Declared `noexcept`
    /// because it is invoked directly on a worker thread, where
    /// an escaping exception would call std::terminate.
    using func_type = void (*)(pool_work_item*) noexcept;

    /// Completion handler invoked by the worker thread.
    /// Must be assigned before the item is posted.
    func_type func_ = nullptr;
};
59 :
60 : /** Shared thread pool for dispatching blocking operations.
61 :
62 : Provides a fixed pool of reusable worker threads for operations
63 : that cannot be integrated with async I/O (e.g. blocking DNS
64 : calls). Registered as an `execution_context::service` so it
65 : is a singleton per io_context.
66 :
67 : Threads are created eagerly in the constructor. The default
68 : thread count is 1.
69 :
70 : @par Thread Safety
71 : All public member functions are thread-safe.
72 :
73 : @par Shutdown
74 : Sets a shutdown flag, notifies all threads, and joins them.
75 : In-flight blocking calls complete naturally before the thread
76 : exits.
77 : */
78 : class thread_pool final : public capy::execution_context::service
79 : {
80 : std::mutex mutex_;
81 : std::condition_variable cv_;
82 : intrusive_queue<pool_work_item> work_queue_;
83 : std::vector<std::thread> threads_;
84 : bool shutdown_ = false;
85 :
86 : void worker_loop(unsigned index);
87 :
88 : public:
89 : using key_type = thread_pool;
90 :
91 : /** Construct the thread pool service.
92 :
93 : Eagerly creates all worker threads.
94 :
95 : @par Exception Safety
96 : Strong guarantee. If thread creation fails, all
97 : already-created threads are shut down and joined
98 : before the exception propagates.
99 :
100 : @param ctx Reference to the owning execution_context.
101 : @param num_threads Number of worker threads. Must be
102 : at least 1.
103 :
104 : @throws std::logic_error If `num_threads` is 0.
105 : */
106 HIT 607 : explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
107 607 : {
108 : (void)ctx;
109 607 : if (!num_threads)
110 1 : throw std::logic_error("thread_pool requires at least 1 thread");
111 606 : threads_.reserve(num_threads);
112 : try
113 : {
114 1215 : for (unsigned i = 0; i < num_threads; ++i)
115 1218 : threads_.emplace_back([this, i] { worker_loop(i + 1); });
116 : }
117 MIS 0 : catch (...)
118 : {
119 0 : shutdown();
120 0 : throw;
121 0 : }
122 HIT 609 : }
123 :
124 1211 : ~thread_pool() override = default;
125 :
126 : thread_pool(thread_pool const&) = delete;
127 : thread_pool& operator=(thread_pool const&) = delete;
128 :
129 : /** Enqueue a work item for execution on the thread pool.
130 :
131 : Zero-allocation: the caller owns the work item's storage.
132 :
133 : @param w The work item to execute. Must remain valid until
134 : its `func_` has been called.
135 :
136 : @return `true` if the item was enqueued, `false` if the
137 : pool has already shut down.
138 : */
139 : bool post(pool_work_item* w) noexcept;
140 :
141 : /** Shut down the thread pool.
142 :
143 : Signals all threads to exit after draining any
144 : remaining queued work, then joins them.
145 : */
146 : void shutdown() override;
147 : };
148 :
149 : inline void
150 609 : thread_pool::worker_loop(unsigned index)
151 : {
152 : // Name format chosen to fit Linux's 15-char pthread limit:
153 : // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999".
154 : char name[16];
155 609 : std::snprintf(name, sizeof(name), "tpool-svc-%u", index);
156 609 : capy::set_current_thread_name(name);
157 :
158 : for (;;)
159 : {
160 : pool_work_item* w;
161 : {
162 788 : std::unique_lock<std::mutex> lock(mutex_);
163 788 : cv_.wait(
164 1330 : lock, [this] { return shutdown_ || !work_queue_.empty(); });
165 :
166 788 : w = work_queue_.pop();
167 788 : if (!w)
168 : {
169 609 : if (shutdown_)
170 1218 : return;
171 MIS 0 : continue;
172 : }
173 HIT 788 : }
174 179 : w->func_(w);
175 179 : }
176 : }
177 :
178 : inline bool
179 180 : thread_pool::post(pool_work_item* w) noexcept
180 : {
181 : {
182 180 : std::lock_guard<std::mutex> lock(mutex_);
183 180 : if (shutdown_)
184 1 : return false;
185 179 : work_queue_.push(w);
186 180 : }
187 179 : cv_.notify_one();
188 179 : return true;
189 : }
190 :
191 : inline void
192 610 : thread_pool::shutdown()
193 : {
194 : {
195 610 : std::lock_guard<std::mutex> lock(mutex_);
196 610 : shutdown_ = true;
197 610 : }
198 610 : cv_.notify_all();
199 :
200 1219 : for (auto& t : threads_)
201 : {
202 609 : if (t.joinable())
203 609 : t.join();
204 : }
205 610 : threads_.clear();
206 :
207 : {
208 610 : std::lock_guard<std::mutex> lock(mutex_);
209 610 : while (work_queue_.pop())
210 : ;
211 610 : }
212 610 : }
213 :
214 : } // namespace boost::corosio::detail
215 :
216 : #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
|