//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/select/select_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <limits>
#include <mutex>
#include <unordered_map>

47  
namespace boost::corosio::detail {
47  
namespace boost::corosio::detail {
48  

48  

49  
struct select_op;
49  
struct select_op;
50  

50  

/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It uses a single reactor model
    where one thread runs select() while other threads wait on a condition
    variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - Portability: Works on all POSIX systems

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same work counting semantics
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    ~select_scheduler() override;

    select_scheduler(select_scheduler const&)            = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a file descriptor for monitoring.

        @param fd The file descriptor to register.
        @param op The operation associated with this fd.
        @param events Event mask: 1 = read, 2 = write, 3 = both.
    */
    void register_fd(int fd, select_op* op, int events) const;

    /** Unregister a file descriptor from monitoring.

        @param fd The file descriptor to unregister.
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
    */
    void deregister_fd(int fd, int events) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    // Event flags for register_fd/deregister_fd
    static constexpr int event_read  = 1;
    static constexpr int event_write = 2;

private:
    // Run one iteration: execute one handler or run the reactor.
    // timeout_us < 0 blocks indefinitely, 0 polls, > 0 bounds the wait.
    std::size_t do_one(long timeout_us);
    void run_reactor(std::unique_lock<std::mutex>& lock);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Guards completed_ops_ and the reactor coordination state below.
    mutable std::mutex mutex_;
    // Signaled when handlers become available or the scheduler stops.
    mutable std::condition_variable wakeup_event_;
    mutable op_queue completed_ops_;
    // Count of pending handlers/operations; run() stops at zero.
    mutable std::atomic<long> outstanding_work_;
    std::atomic<bool> stopped_;
    bool shutdown_; // set under mutex_ by shutdown()

    // Per-fd state for tracking registered operations
    struct fd_state
    {
        select_op* read_op  = nullptr;
        select_op* write_op = nullptr;
    };
    mutable std::unordered_map<int, fd_state> registered_fds_;
    // Highest registered fd; -1 when none. NOTE(review): presumably the
    // nfds argument for select() — confirm against register_fd's definition.
    mutable int max_fd_ = -1;

    // Single reactor thread coordination
    mutable bool reactor_running_     = false;
    mutable bool reactor_interrupted_ = false;
    mutable int idle_thread_count_    = 0;

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing timer starvation.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};
185  

185  

/*
    select Scheduler - Single Reactor Model
    =======================================

    This scheduler mirrors the epoll_scheduler design but uses select() instead
    of epoll for I/O multiplexing. The thread coordination strategy is identical:
    one thread becomes the "reactor" while others wait on a condition variable.

    Thread Model
    ------------
    - ONE thread runs select() at a time (the reactor thread)
    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()

    Key Differences from epoll
    --------------------------
    - Uses self-pipe instead of eventfd for interruption (more portable)
    - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
    - FD_SETSIZE limit (~1024 fds on most systems)
    - Level-triggered only (no edge-triggered mode)

    Self-Pipe Pattern
    -----------------
    To interrupt a blocking select() call (e.g., when work is posted or a timer
    expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
    always in the read_fds set, so select() returns immediately. We drain the
    pipe to clear the readable state.

    fd-to-op Mapping
    ----------------
    We use an unordered_map<int, fd_state> to track which operations are
    registered for each fd. This allows O(1) lookup when select() returns
    ready fds. Each fd can have at most one read op and one write op registered.
*/
220  

220  

namespace select {

// One frame of the per-thread stack of active schedulers. Frames live on
// the stack inside thread_context_guard and are linked through `next`.
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    select_scheduler const* key; // scheduler this frame belongs to
    scheduler_context* next;     // enclosing (outer) frame, or nullptr
};

// Head of the current thread's context stack; consulted by
// select_scheduler::running_in_this_thread().
inline thread_local_ptr<scheduler_context> context_stack;

// RAII guard: pushes a frame for `ctx` onto the thread-local context
// stack on construction and pops it on destruction.
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(select_scheduler const* ctx) noexcept
        : frame_{ctx, context_stack.get()}
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        context_stack.set(frame_.next);
    }
};

// RAII guard: calls work_finished() on the scheduler when destroyed,
// balancing an earlier work count increment.
struct work_guard
{
    select_scheduler* self;
    ~work_guard()
    {
        self->work_finished();
    }
};

} // namespace select
257  

257  

258  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
258  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
259  
    : pipe_fds_{-1, -1}
259  
    : pipe_fds_{-1, -1}
260  
    , outstanding_work_(0)
260  
    , outstanding_work_(0)
261  
    , stopped_(false)
261  
    , stopped_(false)
262  
    , shutdown_(false)
262  
    , shutdown_(false)
263  
    , max_fd_(-1)
263  
    , max_fd_(-1)
264  
    , reactor_running_(false)
264  
    , reactor_running_(false)
265  
    , reactor_interrupted_(false)
265  
    , reactor_interrupted_(false)
266  
    , idle_thread_count_(0)
266  
    , idle_thread_count_(0)
267  
{
267  
{
268  
    // Create self-pipe for interrupting select()
268  
    // Create self-pipe for interrupting select()
269  
    if (::pipe(pipe_fds_) < 0)
269  
    if (::pipe(pipe_fds_) < 0)
270  
        detail::throw_system_error(make_err(errno), "pipe");
270  
        detail::throw_system_error(make_err(errno), "pipe");
271  

271  

272  
    // Set both ends to non-blocking and close-on-exec
272  
    // Set both ends to non-blocking and close-on-exec
273  
    for (int i = 0; i < 2; ++i)
273  
    for (int i = 0; i < 2; ++i)
274  
    {
274  
    {
275  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
275  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
276  
        if (flags == -1)
276  
        if (flags == -1)
277  
        {
277  
        {
278  
            int errn = errno;
278  
            int errn = errno;
279  
            ::close(pipe_fds_[0]);
279  
            ::close(pipe_fds_[0]);
280  
            ::close(pipe_fds_[1]);
280  
            ::close(pipe_fds_[1]);
281  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
281  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
282  
        }
282  
        }
283  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
283  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
284  
        {
284  
        {
285  
            int errn = errno;
285  
            int errn = errno;
286  
            ::close(pipe_fds_[0]);
286  
            ::close(pipe_fds_[0]);
287  
            ::close(pipe_fds_[1]);
287  
            ::close(pipe_fds_[1]);
288  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
288  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
289  
        }
289  
        }
290  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
290  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
291  
        {
291  
        {
292  
            int errn = errno;
292  
            int errn = errno;
293  
            ::close(pipe_fds_[0]);
293  
            ::close(pipe_fds_[0]);
294  
            ::close(pipe_fds_[1]);
294  
            ::close(pipe_fds_[1]);
295  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
295  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
296  
        }
296  
        }
297  
    }
297  
    }
298  

298  

299  
    timer_svc_ = &get_timer_service(ctx, *this);
299  
    timer_svc_ = &get_timer_service(ctx, *this);
300  
    timer_svc_->set_on_earliest_changed(
300  
    timer_svc_->set_on_earliest_changed(
301  
        timer_service::callback(this, [](void* p) {
301  
        timer_service::callback(this, [](void* p) {
302  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
302  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
303  
        }));
303  
        }));
304  

304  

305  
    // Initialize resolver service
305  
    // Initialize resolver service
306  
    get_resolver_service(ctx, *this);
306  
    get_resolver_service(ctx, *this);
307  

307  

308  
    // Initialize signal service
308  
    // Initialize signal service
309  
    get_signal_service(ctx, *this);
309  
    get_signal_service(ctx, *this);
310  

310  

311  
    // Push task sentinel to interleave reactor runs with handler execution
311  
    // Push task sentinel to interleave reactor runs with handler execution
312  
    completed_ops_.push(&task_op_);
312  
    completed_ops_.push(&task_op_);
313  
}
313  
}
314  

314  

315  
inline select_scheduler::~select_scheduler()
315  
inline select_scheduler::~select_scheduler()
316  
{
316  
{
317  
    if (pipe_fds_[0] >= 0)
317  
    if (pipe_fds_[0] >= 0)
318  
        ::close(pipe_fds_[0]);
318  
        ::close(pipe_fds_[0]);
319  
    if (pipe_fds_[1] >= 0)
319  
    if (pipe_fds_[1] >= 0)
320  
        ::close(pipe_fds_[1]);
320  
        ::close(pipe_fds_[1]);
321  
}
321  
}
322  

322  

// Shut down the scheduler: mark shutdown, destroy (not run) every pending
// operation, zero the work count, and wake the reactor and all idle threads.
inline void
select_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        // Drain the queue, destroying operations without running them.
        // The lock is released around destroy() because an op's destructor
        // may run arbitrary code; the task sentinel is a member of this
        // object and must be skipped, never destroyed.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }
    }

    outstanding_work_.store(0, std::memory_order_release);

    // Wake the reactor thread (if blocked in select()) via the self-pipe,
    // but only if the write end is still open.
    if (pipe_fds_[1] >= 0)
        interrupt_reactor();

    // Wake every thread waiting for handler work so it can observe shutdown.
    wakeup_event_.notify_all();
}
347  

347  

348  
inline void
348  
inline void
349  
select_scheduler::post(std::coroutine_handle<> h) const
349  
select_scheduler::post(std::coroutine_handle<> h) const
350  
{
350  
{
351  
    struct post_handler final : scheduler_op
351  
    struct post_handler final : scheduler_op
352  
    {
352  
    {
353  
        std::coroutine_handle<> h_;
353  
        std::coroutine_handle<> h_;
354  

354  

355  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
355  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
356  

356  

357  
        ~post_handler() override = default;
357  
        ~post_handler() override = default;
358  

358  

359  
        void operator()() override
359  
        void operator()() override
360  
        {
360  
        {
361  
            auto h = h_;
361  
            auto h = h_;
362  
            delete this;
362  
            delete this;
363  
            h.resume();
363  
            h.resume();
364  
        }
364  
        }
365  

365  

366  
        void destroy() override
366  
        void destroy() override
367  
        {
367  
        {
368  
            delete this;
368  
            delete this;
369  
        }
369  
        }
370  
    };
370  
    };
371  

371  

372  
    auto ph = std::make_unique<post_handler>(h);
372  
    auto ph = std::make_unique<post_handler>(h);
373  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
373  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
374  

374  

375  
    std::unique_lock lock(mutex_);
375  
    std::unique_lock lock(mutex_);
376  
    completed_ops_.push(ph.release());
376  
    completed_ops_.push(ph.release());
377  
    wake_one_thread_and_unlock(lock);
377  
    wake_one_thread_and_unlock(lock);
378  
}
378  
}
379  

379  

380  
inline void
380  
inline void
381  
select_scheduler::post(scheduler_op* h) const
381  
select_scheduler::post(scheduler_op* h) const
382  
{
382  
{
383  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
383  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
384  

384  

385  
    std::unique_lock lock(mutex_);
385  
    std::unique_lock lock(mutex_);
386  
    completed_ops_.push(h);
386  
    completed_ops_.push(h);
387  
    wake_one_thread_and_unlock(lock);
387  
    wake_one_thread_and_unlock(lock);
388  
}
388  
}
389  

389  

390  
inline bool
390  
inline bool
391  
select_scheduler::running_in_this_thread() const noexcept
391  
select_scheduler::running_in_this_thread() const noexcept
392  
{
392  
{
393  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
393  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
394  
        if (c->key == this)
394  
        if (c->key == this)
395  
            return true;
395  
            return true;
396  
    return false;
396  
    return false;
397  
}
397  
}
398  

398  

399  
inline void
399  
inline void
400  
select_scheduler::stop()
400  
select_scheduler::stop()
401  
{
401  
{
402  
    bool expected = false;
402  
    bool expected = false;
403  
    if (stopped_.compare_exchange_strong(
403  
    if (stopped_.compare_exchange_strong(
404  
            expected, true, std::memory_order_release,
404  
            expected, true, std::memory_order_release,
405  
            std::memory_order_relaxed))
405  
            std::memory_order_relaxed))
406  
    {
406  
    {
407  
        // Wake all threads so they notice stopped_ and exit
407  
        // Wake all threads so they notice stopped_ and exit
408  
        {
408  
        {
409  
            std::lock_guard lock(mutex_);
409  
            std::lock_guard lock(mutex_);
410  
            wakeup_event_.notify_all();
410  
            wakeup_event_.notify_all();
411  
        }
411  
        }
412  
        interrupt_reactor();
412  
        interrupt_reactor();
413  
    }
413  
    }
414  
}
414  
}
415  

415  

// Return whether the scheduler is stopped. The acquire load pairs with the
// release operations in stop() and restart().
inline bool
select_scheduler::stopped() const noexcept
{
    return stopped_.load(std::memory_order_acquire);
}
421  

421  

// Clear the stop flag so subsequent run()/poll() calls may process work
// again. Does not wake any threads by itself.
inline void
select_scheduler::restart()
{
    stopped_.store(false, std::memory_order_release);
}
427  

427  

428  
inline std::size_t
428  
inline std::size_t
429  
select_scheduler::run()
429  
select_scheduler::run()
430  
{
430  
{
431  
    if (stopped_.load(std::memory_order_acquire))
431  
    if (stopped_.load(std::memory_order_acquire))
432  
        return 0;
432  
        return 0;
433  

433  

434  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
434  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
435  
    {
435  
    {
436  
        stop();
436  
        stop();
437  
        return 0;
437  
        return 0;
438  
    }
438  
    }
439  

439  

440  
    select::thread_context_guard ctx(this);
440  
    select::thread_context_guard ctx(this);
441  

441  

442  
    std::size_t n = 0;
442  
    std::size_t n = 0;
443  
    while (do_one(-1))
443  
    while (do_one(-1))
444  
        if (n != (std::numeric_limits<std::size_t>::max)())
444  
        if (n != (std::numeric_limits<std::size_t>::max)())
445  
            ++n;
445  
            ++n;
446  
    return n;
446  
    return n;
447  
}
447  
}
448  

448  

449  
inline std::size_t
449  
inline std::size_t
450  
select_scheduler::run_one()
450  
select_scheduler::run_one()
451  
{
451  
{
452  
    if (stopped_.load(std::memory_order_acquire))
452  
    if (stopped_.load(std::memory_order_acquire))
453  
        return 0;
453  
        return 0;
454  

454  

455  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
455  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
456  
    {
456  
    {
457  
        stop();
457  
        stop();
458  
        return 0;
458  
        return 0;
459  
    }
459  
    }
460  

460  

461  
    select::thread_context_guard ctx(this);
461  
    select::thread_context_guard ctx(this);
462  
    return do_one(-1);
462  
    return do_one(-1);
463  
}
463  
}
464  

464  

465  
inline std::size_t
465  
inline std::size_t
466  
select_scheduler::wait_one(long usec)
466  
select_scheduler::wait_one(long usec)
467  
{
467  
{
468  
    if (stopped_.load(std::memory_order_acquire))
468  
    if (stopped_.load(std::memory_order_acquire))
469  
        return 0;
469  
        return 0;
470  

470  

471  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
471  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
472  
    {
472  
    {
473  
        stop();
473  
        stop();
474  
        return 0;
474  
        return 0;
475  
    }
475  
    }
476  

476  

477  
    select::thread_context_guard ctx(this);
477  
    select::thread_context_guard ctx(this);
478  
    return do_one(usec);
478  
    return do_one(usec);
479  
}
479  
}
480  

480  

481  
inline std::size_t
481  
inline std::size_t
482  
select_scheduler::poll()
482  
select_scheduler::poll()
483  
{
483  
{
484  
    if (stopped_.load(std::memory_order_acquire))
484  
    if (stopped_.load(std::memory_order_acquire))
485  
        return 0;
485  
        return 0;
486  

486  

487  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
487  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
488  
    {
488  
    {
489  
        stop();
489  
        stop();
490  
        return 0;
490  
        return 0;
491  
    }
491  
    }
492  

492  

493  
    select::thread_context_guard ctx(this);
493  
    select::thread_context_guard ctx(this);
494  

494  

495  
    std::size_t n = 0;
495  
    std::size_t n = 0;
496  
    while (do_one(0))
496  
    while (do_one(0))
497  
        if (n != (std::numeric_limits<std::size_t>::max)())
497  
        if (n != (std::numeric_limits<std::size_t>::max)())
498  
            ++n;
498  
            ++n;
499  
    return n;
499  
    return n;
500  
}
500  
}
501  

501  

502  
inline std::size_t
502  
inline std::size_t
503  
select_scheduler::poll_one()
503  
select_scheduler::poll_one()
504  
{
504  
{
505  
    if (stopped_.load(std::memory_order_acquire))
505  
    if (stopped_.load(std::memory_order_acquire))
506  
        return 0;
506  
        return 0;
507  

507  

508  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
508  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
509  
    {
509  
    {
510  
        stop();
510  
        stop();
511  
        return 0;
511  
        return 0;
512  
    }
512  
    }
513  

513  

514  
    select::thread_context_guard ctx(this);
514  
    select::thread_context_guard ctx(this);
515  
    return do_one(0);
515  
    return do_one(0);
516  
}
516  
}
517  

517  

518  
inline void
518  
inline void
519  
select_scheduler::register_fd(int fd, select_op* op, int events) const
519  
select_scheduler::register_fd(int fd, select_op* op, int events) const
520  
{
520  
{
521  
    // Validate fd is within select() limits
521  
    // Validate fd is within select() limits
522  
    if (fd < 0 || fd >= FD_SETSIZE)
522  
    if (fd < 0 || fd >= FD_SETSIZE)
523  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
523  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
524  

524  

525  
    {
525  
    {
526  
        std::lock_guard lock(mutex_);
526  
        std::lock_guard lock(mutex_);
527  

527  

528  
        auto& state = registered_fds_[fd];
528  
        auto& state = registered_fds_[fd];
529  
        if (events & event_read)
529  
        if (events & event_read)
530  
            state.read_op = op;
530  
            state.read_op = op;
531  
        if (events & event_write)
531  
        if (events & event_write)
532  
            state.write_op = op;
532  
            state.write_op = op;
533  

533  

534  
        if (fd > max_fd_)
534  
        if (fd > max_fd_)
535  
            max_fd_ = fd;
535  
            max_fd_ = fd;
536  
    }
536  
    }
537  

537  

538  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
538  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
539  
    // with the newly registered fd.
539  
    // with the newly registered fd.
540  
    interrupt_reactor();
540  
    interrupt_reactor();
541  
}
541  
}
542  

542  

543  
inline void
543  
inline void
544  
select_scheduler::deregister_fd(int fd, int events) const
544  
select_scheduler::deregister_fd(int fd, int events) const
545  
{
545  
{
546  
    std::lock_guard lock(mutex_);
546  
    std::lock_guard lock(mutex_);
547  

547  

548  
    auto it = registered_fds_.find(fd);
548  
    auto it = registered_fds_.find(fd);
549  
    if (it == registered_fds_.end())
549  
    if (it == registered_fds_.end())
550  
        return;
550  
        return;
551  

551  

552  
    if (events & event_read)
552  
    if (events & event_read)
553  
        it->second.read_op = nullptr;
553  
        it->second.read_op = nullptr;
554  
    if (events & event_write)
554  
    if (events & event_write)
555  
        it->second.write_op = nullptr;
555  
        it->second.write_op = nullptr;
556  

556  

557  
    // Remove entry if both are null
557  
    // Remove entry if both are null
558  
    if (!it->second.read_op && !it->second.write_op)
558  
    if (!it->second.read_op && !it->second.write_op)
559  
    {
559  
    {
560  
        registered_fds_.erase(it);
560  
        registered_fds_.erase(it);
561  

561  

562  
        // Recalculate max_fd_ if needed
562  
        // Recalculate max_fd_ if needed
563  
        if (fd == max_fd_)
563  
        if (fd == max_fd_)
564  
        {
564  
        {
565  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
565  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
566  
            for (auto& [registered_fd, state] : registered_fds_)
566  
            for (auto& [registered_fd, state] : registered_fds_)
567  
            {
567  
            {
568  
                if (registered_fd > max_fd_)
568  
                if (registered_fd > max_fd_)
569  
                    max_fd_ = registered_fd;
569  
                    max_fd_ = registered_fd;
570  
            }
570  
            }
571  
        }
571  
        }
572  
    }
572  
    }
573  
}
573  
}
574  

574  

575  
inline void
575  
inline void
576  
select_scheduler::work_started() noexcept
576  
select_scheduler::work_started() noexcept
577  
{
577  
{
578  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
578  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
579  
}
579  
}
580  

580  

581  
inline void
581  
inline void
582  
select_scheduler::work_finished() noexcept
582  
select_scheduler::work_finished() noexcept
583  
{
583  
{
584  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
584  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
585  
        stop();
585  
        stop();
586  
}
586  
}
587  

587  

588  
inline void
588  
inline void
589  
select_scheduler::interrupt_reactor() const
589  
select_scheduler::interrupt_reactor() const
590  
{
590  
{
591  
    char byte               = 1;
591  
    char byte               = 1;
592  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
592  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
593  
}
593  
}
594  

594  

595  
inline void
595  
inline void
596  
select_scheduler::wake_one_thread_and_unlock(
596  
select_scheduler::wake_one_thread_and_unlock(
597  
    std::unique_lock<std::mutex>& lock) const
597  
    std::unique_lock<std::mutex>& lock) const
598  
{
598  
{
599  
    if (idle_thread_count_ > 0)
599  
    if (idle_thread_count_ > 0)
600  
    {
600  
    {
601  
        // Idle worker exists - wake it via condvar
601  
        // Idle worker exists - wake it via condvar
602  
        wakeup_event_.notify_one();
602  
        wakeup_event_.notify_one();
603  
        lock.unlock();
603  
        lock.unlock();
604  
    }
604  
    }
605  
    else if (reactor_running_ && !reactor_interrupted_)
605  
    else if (reactor_running_ && !reactor_interrupted_)
606  
    {
606  
    {
607  
        // No idle workers but reactor is running - interrupt it
607  
        // No idle workers but reactor is running - interrupt it
608  
        reactor_interrupted_ = true;
608  
        reactor_interrupted_ = true;
609  
        lock.unlock();
609  
        lock.unlock();
610  
        interrupt_reactor();
610  
        interrupt_reactor();
611  
    }
611  
    }
612  
    else
612  
    else
613  
    {
613  
    {
614  
        // No one to wake
614  
        // No one to wake
615  
        lock.unlock();
615  
        lock.unlock();
616  
    }
616  
    }
617  
}
617  
}
618  

618  

619  
inline long
619  
inline long
620  
select_scheduler::calculate_timeout(long requested_timeout_us) const
620  
select_scheduler::calculate_timeout(long requested_timeout_us) const
621  
{
621  
{
622  
    if (requested_timeout_us == 0)
622  
    if (requested_timeout_us == 0)
623  
        return 0;
623  
        return 0;
624  

624  

625  
    auto nearest = timer_svc_->nearest_expiry();
625  
    auto nearest = timer_svc_->nearest_expiry();
626  
    if (nearest == timer_service::time_point::max())
626  
    if (nearest == timer_service::time_point::max())
627  
        return requested_timeout_us;
627  
        return requested_timeout_us;
628  

628  

629  
    auto now = std::chrono::steady_clock::now();
629  
    auto now = std::chrono::steady_clock::now();
630  
    if (nearest <= now)
630  
    if (nearest <= now)
631  
        return 0;
631  
        return 0;
632  

632  

633  
    auto timer_timeout_us =
633  
    auto timer_timeout_us =
634  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
634  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
635  
            .count();
635  
            .count();
636  

636  

637  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
637  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
638  
    constexpr auto long_max =
638  
    constexpr auto long_max =
639  
        static_cast<long long>((std::numeric_limits<long>::max)());
639  
        static_cast<long long>((std::numeric_limits<long>::max)());
640  
    auto capped_timer_us =
640  
    auto capped_timer_us =
641  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
641  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
642  
                              static_cast<long long>(0)),
642  
                              static_cast<long long>(0)),
643  
                   long_max);
643  
                   long_max);
644  

644  

645  
    if (requested_timeout_us < 0)
645  
    if (requested_timeout_us < 0)
646  
        return static_cast<long>(capped_timer_us);
646  
        return static_cast<long>(capped_timer_us);
647  

647  

648  
    // requested_timeout_us is already long, so min() result fits in long
648  
    // requested_timeout_us is already long, so min() result fits in long
649  
    return static_cast<long>(
649  
    return static_cast<long>(
650  
        (std::min)(static_cast<long long>(requested_timeout_us),
650  
        (std::min)(static_cast<long long>(requested_timeout_us),
651  
                   capped_timer_us));
651  
                   capped_timer_us));
652  
}
652  
}
653  

653  

654  
// Run one pass of the select() reactor. Called with `lock` held; the lock
// is released around the blocking ::select() call and re-acquired before
// touching completed_ops_. Readiness is translated into completed
// operations which are pushed onto completed_ops_ and signalled to
// worker threads via wakeup_event_.
inline void
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
{
    // Calculate timeout considering timers, use 0 if interrupted: an
    // interrupted reactor must not block, only collect what is ready.
    long effective_timeout_us =
        reactor_interrupted_ ? 0 : calculate_timeout(-1);

    // Build fd_sets from registered_fds_ (done under the lock, since
    // register_fd/deregister_fd mutate the map concurrently).
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // Always include the interrupt pipe so interrupt_reactor() can make
    // a blocked select() return.
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    // Add registered fds
    for (auto& [fd, state] : registered_fds_)
    {
        if (state.read_op)
            FD_SET(fd, &read_fds);
        if (state.write_op)
        {
            FD_SET(fd, &write_fds);
            // Also monitor for errors on connect operations
            FD_SET(fd, &except_fds);
        }
        if (fd > nfds)
            nfds = fd;
    }

    // Convert timeout to timeval; a negative timeout leaves tv_ptr null,
    // which makes select() block indefinitely.
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec  = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr     = &tv;
    }

    lock.unlock();

    // Capture errno immediately: process_expired() below may clobber it.
    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
    int saved_errno = errno;

    // Process timers outside the lock
    timer_svc_->process_expired();

    // EINTR is benign (signal delivery); any other failure is fatal.
    if (ready < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "select");

    // Re-acquire lock before modifying completed_ops_
    lock.lock();

    // Drain the interrupt pipe if readable, so a past wakeup does not
    // make the next select() return immediately.
    // NOTE(review): this drain loop terminates only if the pipe read end
    // is non-blocking — confirm O_NONBLOCK is set where the pipe is made.
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
    {
        char buf[256];
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
        {
        }
    }

    // Process I/O completions
    int completions_queued = 0;
    if (ready > 0)
    {
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
        std::vector<int> fds_to_check;
        fds_to_check.reserve(registered_fds_.size());
        for (auto& [fd, state] : registered_fds_)
            fds_to_check.push_back(fd);

        for (int fd : fds_to_check)
        {
            // Re-look-up each fd: an earlier iteration (or a completion
            // side effect) may have erased it.
            auto it = registered_fds_.find(fd);
            if (it == registered_fds_.end())
                continue;

            auto& state = it->second;

            // Check for errors (especially for connect operations)
            bool has_error = FD_ISSET(fd, &except_fds);

            // Process read readiness
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
            {
                auto* op = state.read_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete. Losing
                // the exchange means another path (e.g. cancellation)
                // already owns the op, so we must not touch it.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.read_op = nullptr;

                    if (has_error)
                    {
                        // Retrieve the socket error; fall back to errno if
                        // getsockopt itself fails, and to EIO if the socket
                        // reports no error despite the exception flag.
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Process write readiness
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
            {
                auto* op = state.write_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.write_op = nullptr;

                    if (has_error)
                    {
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Clean up empty entries
            if (!state.read_op && !state.write_op)
                registered_fds_.erase(it);
        }
    }

    // Wake exactly as many workers as there are completions to run.
    if (completions_queued > 0)
    {
        if (completions_queued == 1)
            wakeup_event_.notify_one();
        else
            wakeup_event_.notify_all();
    }
}
822  

822  

823  
// Core dispatch loop shared by run()/run_one()/wait_one()/poll()/poll_one().
// Pops one item from completed_ops_ and either runs it (returning 1), runs
// the reactor when the sentinel task_op_ is popped, or parks the thread.
// timeout_us semantics: <0 wait indefinitely, 0 never block, >0 bound the
// wait in microseconds. Returns the number of handlers executed (0 or 1).
inline std::size_t
select_scheduler::do_one(long timeout_us)
{
    std::unique_lock lock(mutex_);

    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // The sentinel task_op_ marks "this thread should run the
        // reactor". It is re-pushed before every return/continue so that
        // exactly one copy of it circulates in the queue.
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            if (!more_handlers)
            {
                // Queue is empty: bail out rather than run the reactor
                // when there is no work left, or when the caller asked
                // for a non-blocking pass.
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
                if (timeout_us == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
            }

            // If other handlers are queued (or this is a poll pass) the
            // reactor must not block — mark it pre-interrupted.
            reactor_interrupted_ = more_handlers || timeout_us == 0;
            reactor_running_     = true;

            // Hand the queued handlers to an idle worker while this
            // thread goes into the reactor.
            if (more_handlers && idle_thread_count_ > 0)
                wakeup_event_.notify_one();

            // run_reactor() releases and re-acquires the lock internally.
            run_reactor(lock);

            reactor_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        if (op != nullptr)
        {
            // Run the handler outside the lock; work_guard keeps the
            // scheduler's work count balanced across the invocation.
            lock.unlock();
            select::work_guard g{this};
            (*op)();
            return 1;
        }

        // Queue empty (not even the sentinel): nothing to do if all work
        // is gone or the caller asked for a non-blocking pass.
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
            return 0;

        if (timeout_us == 0)
            return 0;

        // Park this thread until new work is signalled.
        // NOTE(review): after a spurious wakeup with timeout_us > 0 the
        // loop re-waits with the full timeout rather than the remaining
        // time — confirm whether callers rely on a strict deadline.
        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
        else
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
        --idle_thread_count_;
    }
}
888  

888  

889  
} // namespace boost::corosio::detail
889  
} // namespace boost::corosio::detail
890  

890  

891  
#endif // BOOST_COROSIO_HAS_SELECT
891  
#endif // BOOST_COROSIO_HAS_SELECT
892  

892  

893  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
893  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP