//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <algorithm> // (std::min) in reset_inline_budget
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory> // std::make_unique in post
#include <mutex>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
namespace epoll {
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
} // namespace epoll

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait() while other threads wait on a
    condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: Behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL epoll_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&)            = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    /** Offset a forthcoming work_finished() call made by work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Called by thread_context_guard destructor when a thread exits run().
        Transfers pending work to the global queue under mutex protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    struct work_cleanup
    {
        epoll_scheduler* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~work_cleanup();
    };

    struct task_cleanup
    {
        epoll_scheduler const* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~task_cleanup();
    };

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        epoll::scheduler_context* ctx);
    void
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if lock still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if a waiter was signaled, `false` otherwise.
    */
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast-path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // for interrupting reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};
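
// A minimal usage sketch (illustrative only, not part of this header):
// several threads call run(); one becomes the reactor, the others park on
// the condition variable until handlers are posted. The accessor used
// below is an assumption for illustration; in corosio the scheduler is
// normally obtained through its owning execution_context.
//
//   capy::execution_context ctx;
//   auto& sched = ctx.use_service<epoll_scheduler>(); // hypothetical accessor
//
//   std::vector<std::thread> pool;
//   for (int i = 0; i < 4; ++i)
//       pool.emplace_back([&] { sched.run(); });
//
//   sched.post(some_coroutine_handle); // wakes exactly one waiting thread
//   sched.stop();                      // signal_all() + reactor interrupt
//   for (auto& t : pool)
//       t.join();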

//--------------------------------------------------------------------------
//
// Implementation
//
//--------------------------------------------------------------------------

/*
    epoll Scheduler - Single Reactor Model
    ======================================

    This scheduler uses a thread coordination strategy to provide handler
    parallelism and avoid the thundering herd problem.
    Instead of all threads blocking on epoll_wait(), one thread becomes the
    "reactor" while others wait on a condition variable for handler work.

    Thread Model
    ------------
    - ONE thread runs epoll_wait() at a time (the reactor thread)
    - OTHER threads wait on cond_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()
    - This matches Windows IOCP semantics where N posted items wake N threads

    Event Loop Structure (do_one)
    -----------------------------
    1. Lock mutex, try to pop handler from queue
    2. If got handler: execute it (unlocked), return
    3. If queue empty and no reactor running: become reactor
       - Run epoll_wait (unlocked), queue I/O completions, loop back
    4. If queue empty and reactor running: wait on condvar for work

    The task_running_ flag ensures only one thread owns epoll_wait().
    After the reactor queues I/O completions, it loops back to try getting
    a handler, giving priority to handler execution over more I/O polling.

    Signaling State (state_)
    ------------------------
    The state_ variable encodes two pieces of information:
    - Bit 0: signaled flag (1 = signaled, persists until cleared)
    - Upper bits: waiter count (each waiter adds 2 before blocking)

    This allows efficient coordination:
    - Signalers only call notify when waiters exist (state_ > 1)
    - Waiters check if already signaled before blocking (fast-path)

    Wake Coordination (wake_one_thread_and_unlock)
    ----------------------------------------------
    When posting work:
    - If waiters exist (state_ > 1): signal and notify_one()
    - Else if reactor running: interrupt via eventfd write
    - Else: no-op (thread will find work when it checks queue)

    This avoids waking threads unnecessarily. With cascading wakes,
    each handler execution wakes at most one additional thread if
    more work exists in the queue.

    Work Counting
    -------------
    outstanding_work_ tracks pending operations. When it hits zero, run()
    returns. Each operation increments on start, decrements on completion.

    Timer Integration
    -----------------
    Timers are handled by timer_service. The reactor adjusts epoll_wait
    timeout to wake for the nearest timer expiry. When a new timer is
    scheduled earlier than current, timer_service calls interrupt_reactor()
    to re-evaluate the timeout.
*/
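
// Worked example of the state_ encoding above (illustrative): one thread
// waits, then a post() signals it. Bit 0 is the sticky signaled flag; each
// waiter contributes 2 to the upper bits.
//
//   state_ == 0            // idle: no waiters, not signaled
//   state_ += 2            // waiter registers          -> 2
//   state_ |= 1            // signaler sets the flag    -> 3
//                          // state_ > 1, so notify_one() wakes the waiter
//   state_ -= 2            // waiter deregisters        -> 1
//                          // bit 0 persists until clear_signal()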

namespace epoll {

struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;
    int inline_budget;
    int inline_budget_max;
    bool unassisted;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
        , inline_budget_max(2)
        , unassisted(false)
    {
    }
};

inline thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

inline scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}
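
// Illustration (not part of the header): the frame chain lets nested run()
// calls on different schedulers coexist on one thread. If a handler running
// under scheduler A calls B.run(), the thread-local chain is
//
//   context_stack -> [frame for B] -> [frame for A] -> nullptr
//
// so find_context(&A) and find_context(&B) both succeed, and each
// thread_context_guard restores the previous head when its run() exits.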

} // namespace epoll

inline void
epoll_scheduler::reset_inline_budget() const noexcept
{
    if (auto* ctx = epoll::find_context(this))
    {
        // Cap when no other thread absorbed queued work. A moderate
        // cap (4) amortizes scheduling for small buffers while avoiding
        // bursty I/O that fills socket buffers and stalls large transfers.
        if (ctx->unassisted)
        {
            ctx->inline_budget_max = 4;
            ctx->inline_budget     = 4;
            return;
        }
        // Ramp up when previous cycle fully consumed budget.
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
        if (ctx->inline_budget == 0)
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
        else if (ctx->inline_budget < ctx->inline_budget_max)
            ctx->inline_budget_max = 2;
        ctx->inline_budget = ctx->inline_budget_max;
    }
}
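
// Example trajectory under this policy (illustrative): a thread whose
// cycles fully consume the budget ramps the cap 2 -> 4 -> 8 -> 16 and
// holds at 16; a cycle that consumes only part of its budget drops the
// cap back to 2; a cycle that consumes none leaves the cap unchanged.
// Unassisted threads pin both cap and budget at 4.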

inline bool
epoll_scheduler::try_consume_inline_budget() const noexcept
{
    if (auto* ctx = epoll::find_context(this))
    {
        if (ctx->inline_budget > 0)
        {
            --ctx->inline_budget;
            return true;
        }
    }
    return false;
}

inline void
descriptor_state::operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}
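
// Dispatch sketch (illustrative): if epoll reports EPOLLIN | EPOLLOUT and
// both read_op and write_op finish without EAGAIN, local_ops holds
// [read_op, write_op]. The write op is handed to
// post_deferred_completions() (which may wake another thread) while the
// read op runs inline on this thread, charged against the work item that
// do_one() already consumed for this dispatch.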

inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events   = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            auto* self = static_cast<epoll_scheduler*>(p);
            self->timerfd_stale_.store(true, std::memory_order_release);
            if (self->task_running_.load(std::memory_order_acquire))
                self->interrupt_reactor();
        }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

inline epoll_scheduler::~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

inline void
epoll_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

inline void
epoll_scheduler::post(std::coroutine_handle<> h) const
{
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}

        ~post_handler() override = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = epoll::find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
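
// Illustrative awaiter built on post() (hypothetical helper, not part of
// this header): suspending a coroutine and resuming it from run() on a
// scheduler thread.
//
//   struct schedule_awaiter
//   {
//       epoll_scheduler const& sched;
//       bool await_ready() const noexcept { return false; }
//       void await_suspend(std::coroutine_handle<> h) const { sched.post(h); }
//       void await_resume() const noexcept {}
//   };
//
//   // co_await schedule_awaiter{sched}; // resumes on a scheduler thread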
inline void
epoll_scheduler::post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = epoll::find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

inline bool
epoll_scheduler::running_in_this_thread() const noexcept
{
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

inline void
epoll_scheduler::stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

inline bool
epoll_scheduler::stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

inline void
epoll_scheduler::restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}
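
// run(), run_one(), wait_one(), poll() and poll_one() below all follow the
// same pattern: bail out (and stop) if there is no outstanding work, push
// a thread context so posts made from handlers take the fast path, then
// drive do_one() with the appropriate timeout: -1 blocks, 0 polls, and
// wait_one's usec bounds a single wait. The counting loops saturate at
// size_t max rather than overflowing.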
inline std::size_t
epoll_scheduler::run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

inline std::size_t
epoll_scheduler::run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

inline std::size_t
epoll_scheduler::wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

inline std::size_t
epoll_scheduler::poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

inline std::size_t
epoll_scheduler::poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}
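
// Descriptors are registered once, for both read and write interest, with
// edge-triggered (EPOLLET) delivery. With this registration the reactor
// does not need to rearm interest per operation; readiness is instead
// cached in the descriptor_state flags reset below.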
inline void
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd                = fd;
    desc->scheduler_        = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready  = false;
    desc->write_ready = false;
}

inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

inline void
epoll_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
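
// work_started()/work_finished() bracket every unit of outstanding work.
// The acq_rel decrement makes the thread that drops the count to zero the
// one that observes all prior work, and that thread stops the scheduler.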
inline void
epoll_scheduler::work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

inline void
epoll_scheduler::compensating_work_started() const noexcept
{
    auto* ctx = epoll::find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

inline void
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

inline void
epoll_scheduler::post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = epoll::find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}
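
// eventfd_armed_ coalesces reactor wakeups: any number of cross-thread
// posts while the reactor is blocked produce a single eventfd write, and
// the flag is cleared again when run_task() drains the eventfd.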
inline void
epoll_scheduler::interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        std::uint64_t val       = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
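
// The condition-variable helpers below pack two things into state_
// (inferred from how it is manipulated):
//   bit 0        - the "signalled" flag, set by signal_all() and the
//                  *_signal_one() helpers, cleared by clear_signal()
//   bits 1 and up - a waiter count scaled by 2: each thread blocked in
//                  wait_for_signal() / wait_for_signal_for() adds 2 first
// For example, state_ == 5 means signalled with two threads waiting, so
// state_ > 1 inside the signal helpers means a notify is worth doing.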
inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

inline bool
epoll_scheduler::maybe_unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

inline bool
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}

inline void
epoll_scheduler::clear_signal() const
{
    state_ &= ~std::size_t(1);
}

inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
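
// Waking preference: first try to hand work to a thread blocked on the
// condition variable; only if nobody is waiting, and the reactor thread is
// parked in epoll_wait and not already flagged, interrupt it via the
// eventfd. Otherwise no wakeup is needed at all.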
inline void
epoll_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
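
// work_cleanup runs after each handler executed by do_one(). The handler
// consumed one unit of global work, while anything it posted on the fast
// path was only counted in ctx->private_outstanding_work, so the two are
// reconciled in one step: produced > 1 nets a single add of produced - 1,
// produced == 1 cancels out exactly, and produced == 0 becomes a plain
// work_finished(). task_cleanup (below) does the same bookkeeping after a
// reactor pass, except the reactor consumed no work item itself, so the
// whole private count is flushed.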
inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            scheduler->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        scheduler->work_finished();
    }
}

inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}
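
// A single timerfd tracks the nearest expiry across all timers managed by
// timer_svc_. The arithmetic below splits a nanosecond delta into the
// itimerspec fields; for example, a deadline 2.5ms out gives
// nsec = 2'500'000, so tv_sec = 0 and tv_nsec = 2'500'000.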
1121  

1121  

1122  
inline void
1122  
inline void
1123  
epoll_scheduler::update_timerfd() const
1123  
epoll_scheduler::update_timerfd() const
1124  
{
1124  
{
1125  
    auto nearest = timer_svc_->nearest_expiry();
1125  
    auto nearest = timer_svc_->nearest_expiry();
1126  

1126  

1127  
    itimerspec ts{};
1127  
    itimerspec ts{};
1128  
    int flags = 0;
1128  
    int flags = 0;
1129  

1129  

1130  
    if (nearest == timer_service::time_point::max())
1130  
    if (nearest == timer_service::time_point::max())
1131  
    {
1131  
    {
1132  
        // No timers - disarm by setting to 0 (relative)
1132  
        // No timers - disarm by setting to 0 (relative)
1133  
    }
1133  
    }
1134  
    else
1134  
    else
1135  
    {
1135  
    {
1136  
        auto now = std::chrono::steady_clock::now();
1136  
        auto now = std::chrono::steady_clock::now();
1137  
        if (nearest <= now)
1137  
        if (nearest <= now)
1138  
        {
1138  
        {
1139  
            // Use 1ns instead of 0 - zero disarms the timerfd
1139  
            // Use 1ns instead of 0 - zero disarms the timerfd
1140  
            ts.it_value.tv_nsec = 1;
1140  
            ts.it_value.tv_nsec = 1;
1141  
        }
1141  
        }
1142  
        else
1142  
        else
1143  
        {
1143  
        {
1144  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1144  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1145  
                            nearest - now)
1145  
                            nearest - now)
1146  
                            .count();
1146  
                            .count();
1147  
            ts.it_value.tv_sec  = nsec / 1000000000;
1147  
            ts.it_value.tv_sec  = nsec / 1000000000;
1148  
            ts.it_value.tv_nsec = nsec % 1000000000;
1148  
            ts.it_value.tv_nsec = nsec % 1000000000;
1149  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1149  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1150  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1150  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1151  
                ts.it_value.tv_nsec = 1;
1151  
                ts.it_value.tv_nsec = 1;
1152  
        }
1152  
        }
1153  
    }
1153  
    }
1154  

1154  

1155  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1155  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1156  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1156  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1157  
}
1157  
}
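
// One reactor pass. Three kinds of epoll payload are distinguished purely
// by data.ptr: nullptr is the eventfd used for wakeups, &timer_fd_ is the
// timer, and anything else is a descriptor_state. Descriptor work is
// deferred: the pass only records ready events and queues the descriptor
// (guarded by is_enqueued_ so it is queued at most once), so no
// per-descriptor mutex is taken while events are drained.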
1158  

1158  

1159  
inline void
1159  
inline void
1160  
epoll_scheduler::run_task(
1160  
epoll_scheduler::run_task(
1161  
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
1161  
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
1162  
{
1162  
{
1163  
    int timeout_ms = task_interrupted_ ? 0 : -1;
1163  
    int timeout_ms = task_interrupted_ ? 0 : -1;
1164  

1164  

1165  
    if (lock.owns_lock())
1165  
    if (lock.owns_lock())
1166  
        lock.unlock();
1166  
        lock.unlock();
1167  

1167  

1168  
    task_cleanup on_exit{this, &lock, ctx};
1168  
    task_cleanup on_exit{this, &lock, ctx};
1169  

1169  

1170  
    // Flush deferred timerfd programming before blocking
1170  
    // Flush deferred timerfd programming before blocking
1171  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
1171  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
1172  
        update_timerfd();
1172  
        update_timerfd();
1173  

1173  

1174  
    // Event loop runs without mutex held
1174  
    // Event loop runs without mutex held
1175  
    epoll_event events[128];
1175  
    epoll_event events[128];
1176  
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
1176  
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
1177  

1177  

1178  
    if (nfds < 0 && errno != EINTR)
1178  
    if (nfds < 0 && errno != EINTR)
1179  
        detail::throw_system_error(make_err(errno), "epoll_wait");
1179  
        detail::throw_system_error(make_err(errno), "epoll_wait");
1180  

1180  

1181  
    bool check_timers = false;
1181  
    bool check_timers = false;
1182  
    op_queue local_ops;
1182  
    op_queue local_ops;
1183  

1183  

1184  
    // Process events without holding the mutex
1184  
    // Process events without holding the mutex
1185  
    for (int i = 0; i < nfds; ++i)
1185  
    for (int i = 0; i < nfds; ++i)
1186  
    {
1186  
    {
1187  
        if (events[i].data.ptr == nullptr)
1187  
        if (events[i].data.ptr == nullptr)
1188  
        {
1188  
        {
1189  
            std::uint64_t val;
1189  
            std::uint64_t val;
1190  
            // Mutex released above; analyzer can't track unlock via ref
1190  
            // Mutex released above; analyzer can't track unlock via ref
1191  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1191  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1192  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
1192  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
1193  
            eventfd_armed_.store(false, std::memory_order_relaxed);
1193  
            eventfd_armed_.store(false, std::memory_order_relaxed);
1194  
            continue;
1194  
            continue;
1195  
        }
1195  
        }
1196  

1196  

1197  
        if (events[i].data.ptr == &timer_fd_)
1197  
        if (events[i].data.ptr == &timer_fd_)
1198  
        {
1198  
        {
1199  
            std::uint64_t expirations;
1199  
            std::uint64_t expirations;
1200  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1200  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1201  
            [[maybe_unused]] auto r =
1201  
            [[maybe_unused]] auto r =
1202  
                ::read(timer_fd_, &expirations, sizeof(expirations));
1202  
                ::read(timer_fd_, &expirations, sizeof(expirations));
1203  
            check_timers = true;
1203  
            check_timers = true;
1204  
            continue;
1204  
            continue;
1205  
        }
1205  
        }
1206  

1206  

1207  
        // Deferred I/O: just set ready events and enqueue descriptor
1207  
        // Deferred I/O: just set ready events and enqueue descriptor
1208  
        // No per-descriptor mutex locking in reactor hot path!
1208  
        // No per-descriptor mutex locking in reactor hot path!
1209  
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
1209  
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
1210  
        desc->add_ready_events(events[i].events);
1210  
        desc->add_ready_events(events[i].events);
1211  

1211  

1212  
        // Only enqueue if not already enqueued
1212  
        // Only enqueue if not already enqueued
1213  
        bool expected = false;
1213  
        bool expected = false;
1214  
        if (desc->is_enqueued_.compare_exchange_strong(
1214  
        if (desc->is_enqueued_.compare_exchange_strong(
1215  
                expected, true, std::memory_order_release,
1215  
                expected, true, std::memory_order_release,
1216  
                std::memory_order_relaxed))
1216  
                std::memory_order_relaxed))
1217  
        {
1217  
        {
1218  
            local_ops.push(desc);
1218  
            local_ops.push(desc);
1219  
        }
1219  
        }
1220  
    }
1220  
    }
1221  

1221  

1222  
    // Process timers only when timerfd fires
1222  
    // Process timers only when timerfd fires
1223  
    if (check_timers)
1223  
    if (check_timers)
1224  
    {
1224  
    {
1225  
        timer_svc_->process_expired();
1225  
        timer_svc_->process_expired();
1226  
        update_timerfd();
1226  
        update_timerfd();
1227  
    }
1227  
    }
1228  

1228  

1229  
    lock.lock();
1229  
    lock.lock();
1230  

1230  

1231  
    if (!local_ops.empty())
1231  
    if (!local_ops.empty())
1232  
        completed_ops_.splice(local_ops);
1232  
        completed_ops_.splice(local_ops);
1233  
}
1233  
}
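
// do_one() is the scheduler's dispatch loop. task_op_ is a sentinel that
// circulates through completed_ops_: whichever thread pops it becomes the
// reactor for one pass and re-queues the sentinel afterwards, so another
// thread (or the same one) can take the role next. Threads that find
// neither the sentinel nor a real op park on the condition variable.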
inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP