LCOV - code coverage report
Current view: top level - corosio/detail - thread_pool.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 89.6 % 48 43 5
Test Date: 2026-05-11 17:11:56 Functions: 100.0 % 8 8

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
      11                 : #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/config.hpp>
      14                 : #include <boost/corosio/detail/intrusive.hpp>
      15                 : #include <boost/capy/ex/execution_context.hpp>
      16                 : #include <boost/capy/test/thread_name.hpp>
      17                 : 
      18                 : #include <condition_variable>
      19                 : #include <cstdio>
      20                 : #include <mutex>
      21                 : #include <stdexcept>
      22                 : #include <thread>
      23                 : #include <vector>
      24                 : 
      25                 : namespace boost::corosio::detail {
      26                 : 
      27                 : /** Base class for thread pool work items.
      28                 : 
      29                 :     Derive from this to create work that can be posted to a
      30                 :     @ref thread_pool. Uses static function pointer dispatch,
      31                 :     consistent with the IOCP `op` pattern.
      32                 : 
      33                 :     @par Example
      34                 :     @code
      35                 :     struct my_work : pool_work_item
      36                 :     {
      37                 :         int* result;
      38                 :         static void execute( pool_work_item* w ) noexcept
      39                 :         {
      40                 :             auto* self = static_cast<my_work*>( w );
      41                 :             *self->result = 42;
      42                 :         }
      43                 :     };
      44                 : 
      45                 :     my_work w;
      46                 :     w.func_ = &my_work::execute;
      47                 :     w.result = &r;
      48                 :     pool.post( &w );
      49                 :     @endcode
      50                 : */
      51                 : struct pool_work_item : intrusive_queue<pool_work_item>::node
      52                 : {
      53                 :     /// Static dispatch function signature.
      54                 :     using func_type = void (*)(pool_work_item*) noexcept;
      55                 : 
      56                 :     /// Completion handler invoked by the worker thread.
      57                 :     func_type func_ = nullptr;
      58                 : };
      59                 : 
      60                 : /** Shared thread pool for dispatching blocking operations.
      61                 : 
      62                 :     Provides a fixed pool of reusable worker threads for operations
      63                 :     that cannot be integrated with async I/O (e.g. blocking DNS
      64                 :     calls). Registered as an `execution_context::service` so it
      65                 :     is a singleton per io_context.
      66                 : 
      67                 :     Threads are created eagerly in the constructor. The default
      68                 :     thread count is 1.
      69                 : 
      70                 :     @par Thread Safety
      71                 :     All public member functions are thread-safe.
      72                 : 
      73                 :     @par Shutdown
      74                 :     Sets a shutdown flag, notifies all threads, and joins them.
      75                 :     In-flight blocking calls complete naturally before the thread
      76                 :     exits.
      77                 : */
      78                 : class thread_pool final : public capy::execution_context::service
      79                 : {
      80                 :     std::mutex mutex_;
      81                 :     std::condition_variable cv_;
      82                 :     intrusive_queue<pool_work_item> work_queue_;
      83                 :     std::vector<std::thread> threads_;
      84                 :     bool shutdown_ = false;
      85                 : 
      86                 :     void worker_loop(unsigned index);
      87                 : 
      88                 : public:
      89                 :     using key_type = thread_pool;
      90                 : 
      91                 :     /** Construct the thread pool service.
      92                 : 
      93                 :         Eagerly creates all worker threads.
      94                 : 
      95                 :         @par Exception Safety
      96                 :         Strong guarantee. If thread creation fails, all
      97                 :         already-created threads are shut down and joined
      98                 :         before the exception propagates.
      99                 : 
     100                 :         @param ctx Reference to the owning execution_context.
     101                 :         @param num_threads Number of worker threads. Must be
     102                 :                at least 1.
     103                 : 
     104                 :         @throws std::logic_error If `num_threads` is 0.
     105                 :     */
     106 HIT         607 :     explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
     107             607 :     {
     108                 :         (void)ctx;
     109             607 :         if (!num_threads)
     110               1 :             throw std::logic_error("thread_pool requires at least 1 thread");
     111             606 :         threads_.reserve(num_threads);
     112                 :         try
     113                 :         {
     114            1215 :             for (unsigned i = 0; i < num_threads; ++i)
     115            1218 :                 threads_.emplace_back([this, i] { worker_loop(i + 1); });
     116                 :         }
     117 MIS           0 :         catch (...)
     118                 :         {
     119               0 :             shutdown();
     120               0 :             throw;
     121               0 :         }
     122 HIT         609 :     }
     123                 : 
     124            1211 :     ~thread_pool() override = default;
     125                 : 
     126                 :     thread_pool(thread_pool const&)            = delete;
     127                 :     thread_pool& operator=(thread_pool const&) = delete;
     128                 : 
     129                 :     /** Enqueue a work item for execution on the thread pool.
     130                 : 
     131                 :         Zero-allocation: the caller owns the work item's storage.
     132                 : 
     133                 :         @param w The work item to execute. Must remain valid until
     134                 :                  its `func_` has been called.
     135                 : 
     136                 :         @return `true` if the item was enqueued, `false` if the
     137                 :                 pool has already shut down.
     138                 :     */
     139                 :     bool post(pool_work_item* w) noexcept;
     140                 : 
     141                 :     /** Shut down the thread pool.
     142                 : 
     143                 :         Signals all threads to exit after draining any
     144                 :         remaining queued work, then joins them.
     145                 :     */
     146                 :     void shutdown() override;
     147                 : };
     148                 : 
     149                 : inline void
     150             609 : thread_pool::worker_loop(unsigned index)
     151                 : {
     152                 :     // Name format chosen to fit Linux's 15-char pthread limit:
     153                 :     // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999".
     154                 :     char name[16];
     155             609 :     std::snprintf(name, sizeof(name), "tpool-svc-%u", index);
     156             609 :     capy::set_current_thread_name(name);
     157                 : 
     158                 :     for (;;)
     159                 :     {
     160                 :         pool_work_item* w;
     161                 :         {
     162             788 :             std::unique_lock<std::mutex> lock(mutex_);
     163             788 :             cv_.wait(
     164            1330 :                 lock, [this] { return shutdown_ || !work_queue_.empty(); });
     165                 : 
     166             788 :             w = work_queue_.pop();
     167             788 :             if (!w)
     168                 :             {
     169             609 :                 if (shutdown_)
     170            1218 :                     return;
     171 MIS           0 :                 continue;
     172                 :             }
     173 HIT         788 :         }
     174             179 :         w->func_(w);
     175             179 :     }
     176                 : }
     177                 : 
     178                 : inline bool
     179             180 : thread_pool::post(pool_work_item* w) noexcept
     180                 : {
     181                 :     {
     182             180 :         std::lock_guard<std::mutex> lock(mutex_);
     183             180 :         if (shutdown_)
     184               1 :             return false;
     185             179 :         work_queue_.push(w);
     186             180 :     }
     187             179 :     cv_.notify_one();
     188             179 :     return true;
     189                 : }
     190                 : 
     191                 : inline void
     192             610 : thread_pool::shutdown()
     193                 : {
     194                 :     {
     195             610 :         std::lock_guard<std::mutex> lock(mutex_);
     196             610 :         shutdown_ = true;
     197             610 :     }
     198             610 :     cv_.notify_all();
     199                 : 
     200            1219 :     for (auto& t : threads_)
     201                 :     {
     202             609 :         if (t.joinable())
     203             609 :             t.join();
     204                 :     }
     205             610 :     threads_.clear();
     206                 : 
     207                 :     {
     208             610 :         std::lock_guard<std::mutex> lock(mutex_);
     209             610 :         while (work_queue_.pop())
     210                 :             ;
     211             610 :     }
     212             610 : }
     213                 : 
     214                 : } // namespace boost::corosio::detail
     215                 : 
     216                 : #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
        

Generated by: LCOV version 2.3