include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp
Lines: 77.7% (384/494)
Functions: 85.4% (41/48)
| Line | Hits | Source Code |
|---|---|---|
| 1 | // | |
| 2 | // Copyright (c) 2026 Steve Gerbino | |
| 3 | // | |
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
| 6 | // | |
| 7 | // Official repository: https://github.com/cppalliance/corosio | |
| 8 | // | |
| 9 | ||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | |
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | |
| 12 | ||
| 13 | #include <boost/corosio/detail/platform.hpp> | |
| 14 | ||
| 15 | #if BOOST_COROSIO_HAS_EPOLL | |
| 16 | ||
| 17 | #include <boost/corosio/detail/config.hpp> | |
| 18 | #include <boost/capy/ex/execution_context.hpp> | |
| 19 | ||
| 20 | #include <boost/corosio/native/native_scheduler.hpp> | |
| 21 | #include <boost/corosio/detail/scheduler_op.hpp> | |
| 22 | ||
| 23 | #include <boost/corosio/native/detail/epoll/epoll_op.hpp> | |
| 24 | #include <boost/corosio/detail/timer_service.hpp> | |
| 25 | #include <boost/corosio/detail/make_err.hpp> | |
| 26 | #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp> | |
| 27 | #include <boost/corosio/native/detail/posix/posix_signal_service.hpp> | |
| 28 | ||
| 29 | #include <boost/corosio/detail/except.hpp> | |
| 30 | #include <boost/corosio/detail/thread_local_ptr.hpp> | |
| 31 | ||
| 32 | #include <atomic> | |
| 33 | #include <chrono> | |
| 34 | #include <condition_variable> | |
| 35 | #include <cstddef> | |
| 36 | #include <cstdint> | |
| 37 | #include <limits> | |
| 38 | #include <mutex> | |
| 39 | #include <utility> | |
| 40 | ||
| 41 | #include <errno.h> | |
| 42 | #include <fcntl.h> | |
| 43 | #include <sys/epoll.h> | |
| 44 | #include <sys/eventfd.h> | |
| 45 | #include <sys/socket.h> | |
| 46 | #include <sys/timerfd.h> | |
| 47 | #include <unistd.h> | |
| 48 | ||
| 49 | namespace boost::corosio::detail { | |
| 50 | ||
| 51 | struct epoll_op; | |
| 52 | struct descriptor_state; | |
| 53 | namespace epoll { | |
| 54 | struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context; | |
| 55 | } // namespace epoll | |
| 56 | ||
| 57 | /** Linux scheduler using epoll for I/O multiplexing. | |
| 58 | ||
| 59 | This scheduler implements the scheduler interface using Linux epoll | |
| 60 | for efficient I/O event notification. It uses a single reactor model | |
| 61 | where one thread runs epoll_wait while other threads | |
| 62 | wait on a condition variable for handler work. This design provides: | |
| 63 | ||
| 64 | - Handler parallelism: N posted handlers can execute on N threads | |
| 65 | - No thundering herd: condition_variable wakes exactly one thread | |
| 66 | - IOCP parity: Behavior matches Windows I/O completion port semantics | |
| 67 | ||
| 68 | When threads call run(), they first try to execute queued handlers. | |
| 69 | If the queue is empty and no reactor is running, one thread becomes | |
| 70 | the reactor and runs epoll_wait. Other threads wait on a condition | |
| 71 | variable until handlers are available. | |
| 72 | ||
| 73 | @par Thread Safety | |
| 74 | All public member functions are thread-safe. | |
| 75 | */ | |
| 76 | class BOOST_COROSIO_DECL epoll_scheduler final | |
| 77 | : public native_scheduler | |
| 78 | , public capy::execution_context::service | |
| 79 | { | |
| 80 | public: | |
| 81 | using key_type = scheduler; | |
| 82 | ||
| 83 | /** Construct the scheduler. | |
| 84 | ||
| 85 | Creates an epoll instance, eventfd for reactor interruption, | |
| 86 | and timerfd for kernel-managed timer expiry. | |
| 87 | ||
| 88 | @param ctx Reference to the owning execution_context. | |
| 89 | @param concurrency_hint Hint for expected thread count (unused). | |
| 90 | */ | |
| 91 | epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1); | |
| 92 | ||
| 93 | /// Destroy the scheduler. | |
| 94 | ~epoll_scheduler() override; | |
| 95 | ||
| 96 | epoll_scheduler(epoll_scheduler const&) = delete; | |
| 97 | epoll_scheduler& operator=(epoll_scheduler const&) = delete; | |
| 98 | ||
| 99 | void shutdown() override; | |
| 100 | void post(std::coroutine_handle<> h) const override; | |
| 101 | void post(scheduler_op* h) const override; | |
| 102 | bool running_in_this_thread() const noexcept override; | |
| 103 | void stop() override; | |
| 104 | bool stopped() const noexcept override; | |
| 105 | void restart() override; | |
| 106 | std::size_t run() override; | |
| 107 | std::size_t run_one() override; | |
| 108 | std::size_t wait_one(long usec) override; | |
| 109 | std::size_t poll() override; | |
| 110 | std::size_t poll_one() override; | |
| 111 | ||
| 112 | /** Return the epoll file descriptor. | |
| 113 | ||
| 114 | Used by socket services to register file descriptors | |
| 115 | for I/O event notification. | |
| 116 | ||
| 117 | @return The epoll file descriptor. | |
| 118 | */ | |
| 119 | int epoll_fd() const noexcept | |
| 120 | { | |
| 121 | return epoll_fd_; | |
| 122 | } | |
| 123 | ||
| 124 | /** Reset the thread's inline completion budget. | |
| 125 | ||
| 126 | Called at the start of each posted completion handler to | |
| 127 | grant a fresh budget for speculative inline completions. | |
| 128 | */ | |
| 129 | void reset_inline_budget() const noexcept; | |
| 130 | ||
| 131 | /** Consume one unit of inline budget if available. | |
| 132 | ||
| 133 | @return True if budget was available and consumed. | |
| 134 | */ | |
| 135 | bool try_consume_inline_budget() const noexcept; | |
| 136 | ||
| 137 | /** Register a descriptor for persistent monitoring. | |
| 138 | ||
| 139 | The fd is registered once and stays registered until explicitly | |
| 140 | deregistered. Events are dispatched via descriptor_state which | |
| 141 | tracks pending read/write/connect operations. | |
| 142 | ||
| 143 | @param fd The file descriptor to register. | |
| 144 | @param desc Pointer to descriptor data (stored in epoll_event.data.ptr). | |
| 145 | */ | |
| 146 | void register_descriptor(int fd, descriptor_state* desc) const; | |
| 147 | ||
| 148 | /** Deregister a persistently registered descriptor. | |
| 149 | ||
| 150 | @param fd The file descriptor to deregister. | |
| 151 | */ | |
| 152 | void deregister_descriptor(int fd) const; | |
| 153 | ||
| 154 | void work_started() noexcept override; | |
| 155 | void work_finished() noexcept override; | |
| 156 | ||
| 157 | /** Offset the forthcoming work_finished() call from work_cleanup. | |
| 158 | ||
| 159 | Called by descriptor_state when all I/O returned EAGAIN and no | |
| 160 | handler will be executed. Must be called from a scheduler thread. | |
| 161 | */ | |
| 162 | void compensating_work_started() const noexcept; | |
| 163 | ||
| 164 | /** Drain work from thread context's private queue to global queue. | |
| 165 | ||
| 166 | Called by thread_context_guard destructor when a thread exits run(). | |
| 167 | Transfers pending work to the global queue under mutex protection. | |
| 168 | ||
| 169 | @param queue The private queue to drain. | |
| 170 | @param count Item count for wakeup decisions (wakes other threads if positive). | |
| 171 | */ | |
| 172 | void drain_thread_queue(op_queue& queue, long count) const; | |
| 173 | ||
| 174 | /** Post completed operations for deferred invocation. | |
| 175 | ||
| 176 | If called from a thread running this scheduler, operations go to | |
| 177 | the thread's private queue (fast path). Otherwise, operations are | |
| 178 | added to the global queue under mutex and a waiter is signaled. | |
| 179 | ||
| 180 | @par Preconditions | |
| 181 | work_started() must have been called for each operation. | |
| 182 | ||
| 183 | @param ops Queue of operations to post. | |
| 184 | */ | |
| 185 | void post_deferred_completions(op_queue& ops) const; | |
| 186 | ||
| 187 | private: | |
| 188 | struct work_cleanup | |
| 189 | { | |
| 190 | epoll_scheduler* scheduler; | |
| 191 | std::unique_lock<std::mutex>* lock; | |
| 192 | epoll::scheduler_context* ctx; | |
| 193 | ~work_cleanup(); | |
| 194 | }; | |
| 195 | ||
| 196 | struct task_cleanup | |
| 197 | { | |
| 198 | epoll_scheduler const* scheduler; | |
| 199 | std::unique_lock<std::mutex>* lock; | |
| 200 | epoll::scheduler_context* ctx; | |
| 201 | ~task_cleanup(); | |
| 202 | }; | |
| 203 | ||
| 204 | std::size_t do_one( | |
| 205 | std::unique_lock<std::mutex>& lock, | |
| 206 | long timeout_us, | |
| 207 | epoll::scheduler_context* ctx); | |
| 208 | void | |
| 209 | run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx); | |
| 210 | void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const; | |
| 211 | void interrupt_reactor() const; | |
| 212 | void update_timerfd() const; | |
| 213 | ||
| 214 | /** Set the signaled state and wake all waiting threads. | |
| 215 | ||
| 216 | @par Preconditions | |
| 217 | Mutex must be held. | |
| 218 | ||
| 219 | @param lock The held mutex lock. | |
| 220 | */ | |
| 221 | void signal_all(std::unique_lock<std::mutex>& lock) const; | |
| 222 | ||
| 223 | /** Set the signaled state and wake one waiter if any exist. | |
| 224 | ||
| 225 | Only unlocks and signals if at least one thread is waiting. | |
| 226 | Use this when the caller needs to perform a fallback action | |
| 227 | (such as interrupting the reactor) when no waiters exist. | |
| 228 | ||
| 229 | @par Preconditions | |
| 230 | Mutex must be held. | |
| 231 | ||
| 232 | @param lock The held mutex lock. | |
| 233 | ||
| 234 | @return `true` if unlocked and signaled, `false` if lock still held. | |
| 235 | */ | |
| 236 | bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const; | |
| 237 | ||
| 238 | /** Set the signaled state, unlock, and wake one waiter if any exist. | |
| 239 | ||
| 240 | Always unlocks the mutex. Use this when the caller will release | |
| 241 | the lock regardless of whether a waiter exists. | |
| 242 | ||
| 243 | @par Preconditions | |
| 244 | Mutex must be held. | |
| 245 | ||
| 246 | @param lock The held mutex lock. | |
| 247 | ||
| 248 | @return `true` if a waiter was signaled, `false` otherwise. | |
| 249 | */ | |
| 250 | bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const; | |
| 251 | ||
| 252 | /** Clear the signaled state before waiting. | |
| 253 | ||
| 254 | @par Preconditions | |
| 255 | Mutex must be held. | |
| 256 | */ | |
| 257 | void clear_signal() const; | |
| 258 | ||
| 259 | /** Block until the signaled state is set. | |
| 260 | ||
| 261 | Returns immediately if already signaled (fast-path). Otherwise | |
| 262 | increments the waiter count, waits on the condition variable, | |
| 263 | and decrements the waiter count upon waking. | |
| 264 | ||
| 265 | @par Preconditions | |
| 266 | Mutex must be held. | |
| 267 | ||
| 268 | @param lock The held mutex lock. | |
| 269 | */ | |
| 270 | void wait_for_signal(std::unique_lock<std::mutex>& lock) const; | |
| 271 | ||
| 272 | /** Block until signaled or timeout expires. | |
| 273 | ||
| 274 | @par Preconditions | |
| 275 | Mutex must be held. | |
| 276 | ||
| 277 | @param lock The held mutex lock. | |
| 278 | @param timeout_us Maximum time to wait in microseconds. | |
| 279 | */ | |
| 280 | void wait_for_signal_for( | |
| 281 | std::unique_lock<std::mutex>& lock, long timeout_us) const; | |
| 282 | ||
| 283 | int epoll_fd_; | |
| 284 | int event_fd_; // for interrupting reactor | |
| 285 | int timer_fd_; // timerfd for kernel-managed timer expiry | |
| 286 | mutable std::mutex mutex_; | |
| 287 | mutable std::condition_variable cond_; | |
| 288 | mutable op_queue completed_ops_; | |
| 289 | mutable std::atomic<long> outstanding_work_; | |
| 290 | bool stopped_; | |
| 291 | bool shutdown_; | |
| 292 | ||
| 293 | // True while a thread is blocked in epoll_wait. Used by | |
| 294 | // wake_one_thread_and_unlock and the timer callback to know when | |
| 295 | // an eventfd interrupt is needed instead of a condvar signal. | |
| 296 | mutable std::atomic<bool> task_running_{false}; | |
| 297 | ||
| 298 | // True when the reactor has been told to do a non-blocking poll | |
| 299 | // (more handlers queued or poll mode). Prevents redundant eventfd | |
| 300 | // writes and controls the epoll_wait timeout. | |
| 301 | mutable bool task_interrupted_ = false; | |
| 302 | ||
| 303 | // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2) | |
| 304 | mutable std::size_t state_ = 0; | |
| 305 | ||
| 306 | // Edge-triggered eventfd state | |
| 307 | mutable std::atomic<bool> eventfd_armed_{false}; | |
| 308 | ||
| 309 | // Set when the earliest timer changes; flushed before epoll_wait | |
| 310 | // blocks. Avoids timerfd_settime syscalls for timers that are | |
| 311 | // scheduled then cancelled without being waited on. | |
| 312 | mutable std::atomic<bool> timerfd_stale_{false}; | |
| 313 | ||
| 314 | // Sentinel operation for interleaving reactor runs with handler execution. | |
| 315 | // Ensures the reactor runs periodically even when handlers are continuously | |
| 316 | // posted, preventing starvation of I/O events, timers, and signals. | |
| 317 | struct task_op final : scheduler_op | |
| 318 | { | |
| 319 | ✗ | void operator()() override {} |
| 320 | ✗ | void destroy() override {} |
| 321 | }; | |
| 322 | task_op task_op_; | |
| 323 | }; | |
| 324 | ||
| 325 | //-------------------------------------------------------------------------- | |
| 326 | // | |
| 327 | // Implementation | |
| 328 | // | |
| 329 | //-------------------------------------------------------------------------- | |
| 330 | ||
| 331 | /* | |
| 332 | epoll Scheduler - Single Reactor Model | |
| 333 | ====================================== | |
| 334 | ||
| 335 | This scheduler uses a thread coordination strategy to provide handler | |
| 336 | parallelism and avoid the thundering herd problem. | |
| 337 | Instead of all threads blocking on epoll_wait(), one thread becomes the | |
| 338 | "reactor" while others wait on a condition variable for handler work. | |
| 339 | ||
| 340 | Thread Model | |
| 341 | ------------ | |
| 342 | - ONE thread runs epoll_wait() at a time (the reactor thread) | |
| 343 | - OTHER threads wait on cond_ (condition variable) for handlers | |
| 344 | - When work is posted, exactly one waiting thread wakes via notify_one() | |
| 345 | - This matches Windows IOCP semantics where N posted items wake N threads | |
| 346 | ||
| 347 | Event Loop Structure (do_one) | |
| 348 | ----------------------------- | |
| 349 | 1. Lock mutex, try to pop handler from queue | |
| 350 | 2. If got handler: execute it (unlocked), return | |
| 351 | 3. If queue empty and no reactor running: become reactor | |
| 352 | - Run epoll_wait (unlocked), queue I/O completions, loop back | |
| 353 | 4. If queue empty and reactor running: wait on condvar for work | |
| 354 | ||
| 355 | The task_running_ flag ensures only one thread owns epoll_wait(). | |
| 356 | After the reactor queues I/O completions, it loops back to try getting | |
| 357 | a handler, giving priority to handler execution over more I/O polling. | |
| 358 | ||
| 359 | Signaling State (state_) | |
| 360 | ------------------------ | |
| 361 | The state_ variable encodes two pieces of information: | |
| 362 | - Bit 0: signaled flag (1 = signaled, persists until cleared) | |
| 363 | - Upper bits: waiter count (each waiter adds 2 before blocking) | |
| 364 | ||
| 365 | This allows efficient coordination: | |
| 366 | - Signalers only call notify when waiters exist (state_ > 1) | |
| 367 | - Waiters check if already signaled before blocking (fast-path) | |
| 368 | ||
| 369 | Wake Coordination (wake_one_thread_and_unlock) | |
| 370 | ---------------------------------------------- | |
| 371 | When posting work: | |
| 372 | - If waiters exist (state_ > 1): signal and notify_one() | |
| 373 | - Else if reactor running: interrupt via eventfd write | |
| 374 | - Else: no-op (thread will find work when it checks queue) | |
| 375 | ||
| 376 | This avoids waking threads unnecessarily. With cascading wakes, | |
| 377 | each handler execution wakes at most one additional thread if | |
| 378 | more work exists in the queue. | |
| 379 | ||
| 380 | Work Counting | |
| 381 | ------------- | |
| 382 | outstanding_work_ tracks pending operations. When it hits zero, run() | |
| 383 | returns. Each operation increments on start, decrements on completion. | |
| 384 | ||
| 385 | Timer Integration | |
| 386 | ----------------- | |
| 387 | Timers are handled by timer_service via a timerfd registered with | |
| 388 | the epoll set. When the earliest expiry changes, timer_service marks | |
| 389 | the timerfd stale and interrupts the reactor, which reprograms the | |
| 390 | timerfd (update_timerfd) before blocking in epoll_wait again. | |
| 391 | */ | |
| 392 | ||
| 393 | namespace epoll { | |
| 394 | ||
| 395 | struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context | |
| 396 | { | |
| 397 | epoll_scheduler const* key; | |
| 398 | scheduler_context* next; | |
| 399 | op_queue private_queue; | |
| 400 | long private_outstanding_work; | |
| 401 | int inline_budget; | |
| 402 | int inline_budget_max; | |
| 403 | bool unassisted; | |
| 404 | ||
| 405 | 191 | scheduler_context(epoll_scheduler const* k, scheduler_context* n) |
| 406 | 191 | : key(k) |
| 407 | 191 | , next(n) |
| 408 | 191 | , private_outstanding_work(0) |
| 409 | 191 | , inline_budget(0) |
| 410 | 191 | , inline_budget_max(2) |
| 411 | 191 | , unassisted(false) |
| 412 | { | |
| 413 | 191 | } |
| 414 | }; | |
| 415 | ||
| 416 | inline thread_local_ptr<scheduler_context> context_stack; | |
| 417 | ||
| 418 | struct thread_context_guard | |
| 419 | { | |
| 420 | scheduler_context frame_; | |
| 421 | ||
| 422 | 191 | explicit thread_context_guard(epoll_scheduler const* ctx) noexcept |
| 423 | 191 | : frame_(ctx, context_stack.get()) |
| 424 | { | |
| 425 | 191 | context_stack.set(&frame_); |
| 426 | 191 | } |
| 427 | ||
| 428 | 191 | ~thread_context_guard() noexcept |
| 429 | { | |
| 430 | 191 | if (!frame_.private_queue.empty()) |
| 431 | ✗ | frame_.key->drain_thread_queue( |
| 432 | ✗ | frame_.private_queue, frame_.private_outstanding_work); |
| 433 | 191 | context_stack.set(frame_.next); |
| 434 | 191 | } |
| 435 | }; | |
| 436 | ||
| 437 | inline scheduler_context* | |
| 438 | 393275 | find_context(epoll_scheduler const* self) noexcept |
| 439 | { | |
| 440 | 393275 | for (auto* c = context_stack.get(); c != nullptr; c = c->next) |
| 441 | 391590 | if (c->key == self) |
| 442 | 391590 | return c; |
| 443 | 1685 | return nullptr; |
| 444 | } | |
| 445 | ||
| 446 | } // namespace epoll | |
| 447 | ||
| 448 | inline void | |
| 449 | 57516 | epoll_scheduler::reset_inline_budget() const noexcept |
| 450 | { | |
| 451 | 57516 | if (auto* ctx = epoll::find_context(this)) |
| 452 | { | |
| 453 | // Cap when no other thread absorbed queued work. A moderate | |
| 454 | // cap (4) amortizes scheduling for small buffers while avoiding | |
| 455 | // bursty I/O that fills socket buffers and stalls large transfers. | |
| 456 | 57516 | if (ctx->unassisted) |
| 457 | { | |
| 458 | 57516 | ctx->inline_budget_max = 4; |
| 459 | 57516 | ctx->inline_budget = 4; |
| 460 | 57516 | return; |
| 461 | } | |
| 462 | // Ramp up when previous cycle fully consumed budget. | |
| 463 | // Reset on partial consumption (EAGAIN hit or peer got scheduled). | |
| 464 | ✗ | if (ctx->inline_budget == 0) |
| 465 | ✗ | ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16); |
| 466 | ✗ | else if (ctx->inline_budget < ctx->inline_budget_max) |
| 467 | ✗ | ctx->inline_budget_max = 2; |
| 468 | ✗ | ctx->inline_budget = ctx->inline_budget_max; |
| 469 | } | |
| 470 | } | |
| 471 | ||
| 472 | inline bool | |
| 473 | 242589 | epoll_scheduler::try_consume_inline_budget() const noexcept |
| 474 | { | |
| 475 | 242589 | if (auto* ctx = epoll::find_context(this)) |
| 476 | { | |
| 477 | 242589 | if (ctx->inline_budget > 0) |
| 478 | { | |
| 479 | 194146 | --ctx->inline_budget; |
| 480 | 194146 | return true; |
| 481 | } | |
| 482 | } | |
| 483 | 48443 | return false; |
| 484 | } | |
| 485 | ||
| 486 | inline void | |
| 487 | 42210 | descriptor_state::operator()() |
| 488 | { | |
| 489 | 42210 | is_enqueued_.store(false, std::memory_order_relaxed); |
| 490 | ||
| 491 | // Take ownership of impl ref set by close_socket() to prevent | |
| 492 | // the owning impl from being freed while we're executing | |
| 493 | 42210 | auto prevent_impl_destruction = std::move(impl_ref_); |
| 494 | ||
| 495 | 42210 | std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire); |
| 496 | 42210 | if (ev == 0) |
| 497 | { | |
| 498 | ✗ | scheduler_->compensating_work_started(); |
| 499 | ✗ | return; |
| 500 | } | |
| 501 | ||
| 502 | 42210 | op_queue local_ops; |
| 503 | ||
| 504 | 42210 | int err = 0; |
| 505 | 42210 | if (ev & EPOLLERR) |
| 506 | { | |
| 507 | ✗ | socklen_t len = sizeof(err); |
| 508 | ✗ | if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) |
| 509 | ✗ | err = errno; |
| 510 | ✗ | if (err == 0) |
| 511 | ✗ | err = EIO; |
| 512 | } | |
| 513 | ||
| 514 | { | |
| 515 | 42210 | std::lock_guard lock(mutex); |
| 516 | 42210 | if (ev & EPOLLIN) |
| 517 | { | |
| 518 | 13412 | if (read_op) |
| 519 | { | |
| 520 | 4433 | auto* rd = read_op; |
| 521 | 4433 | if (err) |
| 522 | ✗ | rd->complete(err, 0); |
| 523 | else | |
| 524 | 4433 | rd->perform_io(); |
| 525 | ||
| 526 | 4433 | if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK) |
| 527 | { | |
| 528 | ✗ | rd->errn = 0; |
| 529 | } | |
| 530 | else | |
| 531 | { | |
| 532 | 4433 | read_op = nullptr; |
| 533 | 4433 | local_ops.push(rd); |
| 534 | } | |
| 535 | } | |
| 536 | else | |
| 537 | { | |
| 538 | 8979 | read_ready = true; |
| 539 | } | |
| 540 | } | |
| 541 | 42210 | if (ev & EPOLLOUT) |
| 542 | { | |
| 543 | 37781 | bool had_write_op = (connect_op || write_op); |
| 544 | 37781 | if (connect_op) |
| 545 | { | |
| 546 | 4432 | auto* cn = connect_op; |
| 547 | 4432 | if (err) |
| 548 | ✗ | cn->complete(err, 0); |
| 549 | else | |
| 550 | 4432 | cn->perform_io(); |
| 551 | 4432 | connect_op = nullptr; |
| 552 | 4432 | local_ops.push(cn); |
| 553 | } | |
| 554 | 37781 | if (write_op) |
| 555 | { | |
| 556 | ✗ | auto* wr = write_op; |
| 557 | ✗ | if (err) |
| 558 | ✗ | wr->complete(err, 0); |
| 559 | else | |
| 560 | ✗ | wr->perform_io(); |
| 561 | ||
| 562 | ✗ | if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK) |
| 563 | { | |
| 564 | ✗ | wr->errn = 0; |
| 565 | } | |
| 566 | else | |
| 567 | { | |
| 568 | ✗ | write_op = nullptr; |
| 569 | ✗ | local_ops.push(wr); |
| 570 | } | |
| 571 | } | |
| 572 | 37781 | if (!had_write_op) |
| 573 | 33349 | write_ready = true; |
| 574 | } | |
| 575 | 42210 | if (err) |
| 576 | { | |
| 577 | ✗ | if (read_op) |
| 578 | { | |
| 579 | ✗ | read_op->complete(err, 0); |
| 580 | ✗ | local_ops.push(std::exchange(read_op, nullptr)); |
| 581 | } | |
| 582 | ✗ | if (write_op) |
| 583 | { | |
| 584 | ✗ | write_op->complete(err, 0); |
| 585 | ✗ | local_ops.push(std::exchange(write_op, nullptr)); |
| 586 | } | |
| 587 | ✗ | if (connect_op) |
| 588 | { | |
| 589 | ✗ | connect_op->complete(err, 0); |
| 590 | ✗ | local_ops.push(std::exchange(connect_op, nullptr)); |
| 591 | } | |
| 592 | } | |
| 593 | 42210 | } |
| 594 | ||
| 595 | // Execute first handler inline — the scheduler's work_cleanup | |
| 596 | // accounts for this as the "consumed" work item | |
| 597 | 42210 | scheduler_op* first = local_ops.pop(); |
| 598 | 42210 | if (first) |
| 599 | { | |
| 600 | 8865 | scheduler_->post_deferred_completions(local_ops); |
| 601 | 8865 | (*first)(); |
| 602 | } | |
| 603 | else | |
| 604 | { | |
| 605 | 33345 | scheduler_->compensating_work_started(); |
| 606 | } | |
| 607 | 42210 | } |
| 608 | ||
| 609 | 205 | inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int) |
| 610 | 205 | : epoll_fd_(-1) |
| 611 | 205 | , event_fd_(-1) |
| 612 | 205 | , timer_fd_(-1) |
| 613 | 205 | , outstanding_work_(0) |
| 614 | 205 | , stopped_(false) |
| 615 | 205 | , shutdown_(false) |
| 616 | 205 | , task_running_{false} |
| 617 | 205 | , task_interrupted_(false) |
| 618 | 410 | , state_(0) |
| 619 | { | |
| 620 | 205 | epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); |
| 621 | 205 | if (epoll_fd_ < 0) |
| 622 | ✗ | detail::throw_system_error(make_err(errno), "epoll_create1"); |
| 623 | ||
| 624 | 205 | event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); |
| 625 | 205 | if (event_fd_ < 0) |
| 626 | { | |
| 627 | ✗ | int errn = errno; |
| 628 | ✗ | ::close(epoll_fd_); |
| 629 | ✗ | detail::throw_system_error(make_err(errn), "eventfd"); |
| 630 | } | |
| 631 | ||
| 632 | 205 | timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); |
| 633 | 205 | if (timer_fd_ < 0) |
| 634 | { | |
| 635 | ✗ | int errn = errno; |
| 636 | ✗ | ::close(event_fd_); |
| 637 | ✗ | ::close(epoll_fd_); |
| 638 | ✗ | detail::throw_system_error(make_err(errn), "timerfd_create"); |
| 639 | } | |
| 640 | ||
| 641 | 205 | epoll_event ev{}; |
| 642 | 205 | ev.events = EPOLLIN | EPOLLET; |
| 643 | 205 | ev.data.ptr = nullptr; |
| 644 | 205 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0) |
| 645 | { | |
| 646 | ✗ | int errn = errno; |
| 647 | ✗ | ::close(timer_fd_); |
| 648 | ✗ | ::close(event_fd_); |
| 649 | ✗ | ::close(epoll_fd_); |
| 650 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl"); |
| 651 | } | |
| 652 | ||
| 653 | 205 | epoll_event timer_ev{}; |
| 654 | 205 | timer_ev.events = EPOLLIN | EPOLLERR; |
| 655 | 205 | timer_ev.data.ptr = &timer_fd_; |
| 656 | 205 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0) |
| 657 | { | |
| 658 | ✗ | int errn = errno; |
| 659 | ✗ | ::close(timer_fd_); |
| 660 | ✗ | ::close(event_fd_); |
| 661 | ✗ | ::close(epoll_fd_); |
| 662 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)"); |
| 663 | } | |
| 664 | ||
| 665 | 205 | timer_svc_ = &get_timer_service(ctx, *this); |
| 666 | 205 | timer_svc_->set_on_earliest_changed( |
| 667 | 4845 | timer_service::callback(this, [](void* p) { |
| 668 | 4640 | auto* self = static_cast<epoll_scheduler*>(p); |
| 669 | 4640 | self->timerfd_stale_.store(true, std::memory_order_release); |
| 670 | 4640 | if (self->task_running_.load(std::memory_order_acquire)) |
| 671 | ✗ | self->interrupt_reactor(); |
| 672 | 4640 | })); |
| 673 | ||
| 674 | // Initialize resolver service | |
| 675 | 205 | get_resolver_service(ctx, *this); |
| 676 | ||
| 677 | // Initialize signal service | |
| 678 | 205 | get_signal_service(ctx, *this); |
| 679 | ||
| 680 | // Push task sentinel to interleave reactor runs with handler execution | |
| 681 | 205 | completed_ops_.push(&task_op_); |
| 682 | 205 | } |
| 683 | ||
| 684 | 410 | inline epoll_scheduler::~epoll_scheduler() |
| 685 | { | |
| 686 | 205 | if (timer_fd_ >= 0) |
| 687 | 205 | ::close(timer_fd_); |
| 688 | 205 | if (event_fd_ >= 0) |
| 689 | 205 | ::close(event_fd_); |
| 690 | 205 | if (epoll_fd_ >= 0) |
| 691 | 205 | ::close(epoll_fd_); |
| 692 | 410 | } |
| 693 | ||
| 694 | inline void | |
| 695 | 205 | epoll_scheduler::shutdown() |
| 696 | { | |
| 697 | { | |
| 698 | 205 | std::unique_lock lock(mutex_); |
| 699 | 205 | shutdown_ = true; |
| 700 | ||
| 701 | 439 | while (auto* h = completed_ops_.pop()) |
| 702 | { | |
| 703 | 234 | if (h == &task_op_) |
| 704 | 205 | continue; |
| 705 | 29 | lock.unlock(); |
| 706 | 29 | h->destroy(); |
| 707 | 29 | lock.lock(); |
| 708 | 234 | } |
| 709 | ||
| 710 | 205 | signal_all(lock); |
| 711 | 205 | } |
| 712 | ||
| 713 | 205 | outstanding_work_.store(0, std::memory_order_release); |
| 714 | ||
| 715 | 205 | if (event_fd_ >= 0) |
| 716 | 205 | interrupt_reactor(); |
| 717 | 205 | } |
| 718 | ||
| 719 | inline void | |
| 720 | 6482 | epoll_scheduler::post(std::coroutine_handle<> h) const |
| 721 | { | |
| 722 | struct post_handler final : scheduler_op | |
| 723 | { | |
| 724 | std::coroutine_handle<> h_; | |
| 725 | ||
| 726 | 6482 | explicit post_handler(std::coroutine_handle<> h) : h_(h) {} |
| 727 | ||
| 728 | 12964 | ~post_handler() override = default; |
| 729 | ||
| 730 | 6482 | void operator()() override |
| 731 | { | |
| 732 | 6482 | auto h = h_; |
| 733 | 6482 | delete this; |
| 734 | 6482 | h.resume(); |
| 735 | 6482 | } |
| 736 | ||
| 737 | ✗ | void destroy() override |
| 738 | { | |
| 739 | ✗ | delete this; |
| 740 | ✗ | } |
| 741 | }; | |
| 742 | ||
| 743 | 6482 | auto ph = std::make_unique<post_handler>(h); |
| 744 | ||
| 745 | // Fast path: same thread posts to private queue | |
| 746 | // Only count locally; work_cleanup batches to global counter | |
| 747 | 6482 | if (auto* ctx = epoll::find_context(this)) |
| 748 | { | |
| 749 | 4823 | ++ctx->private_outstanding_work; |
| 750 | 4823 | ctx->private_queue.push(ph.release()); |
| 751 | 4823 | return; |
| 752 | } | |
| 753 | ||
| 754 | // Slow path: cross-thread post requires mutex | |
| 755 | 1659 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); |
| 756 | ||
| 757 | 1659 | std::unique_lock lock(mutex_); |
| 758 | 1659 | completed_ops_.push(ph.release()); |
| 759 | 1659 | wake_one_thread_and_unlock(lock); |
| 760 | 6482 | } |
| 761 | ||
| 762 | inline void | |
| 763 | 53343 | epoll_scheduler::post(scheduler_op* h) const |
| 764 | { | |
| 765 | // Fast path: same thread posts to private queue | |
| 766 | // Only count locally; work_cleanup batches to global counter | |
| 767 | 53343 | if (auto* ctx = epoll::find_context(this)) |
| 768 | { | |
| 769 | 53317 | ++ctx->private_outstanding_work; |
| 770 | 53317 | ctx->private_queue.push(h); |
| 771 | 53317 | return; |
| 772 | } | |
| 773 | ||
| 774 | // Slow path: cross-thread post requires mutex | |
| 775 | 26 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); |
| 776 | ||
| 777 | 26 | std::unique_lock lock(mutex_); |
| 778 | 26 | completed_ops_.push(h); |
| 779 | 26 | wake_one_thread_and_unlock(lock); |
| 780 | 26 | } |
| 781 | ||
| 782 | inline bool | |
| 783 | 702 | epoll_scheduler::running_in_this_thread() const noexcept |
| 784 | { | |
| 785 | 702 | for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next) |
| 786 | 456 | if (c->key == this) |
| 787 | 456 | return true; |
| 788 | 246 | return false; |
| 789 | } | |
| 790 | ||
| 791 | inline void | |
| 792 | 201 | epoll_scheduler::stop() |
| 793 | { | |
| 794 | 201 | std::unique_lock lock(mutex_); |
| 795 | 201 | if (!stopped_) |
| 796 | { | |
| 797 | 168 | stopped_ = true; |
| 798 | 168 | signal_all(lock); |
| 799 | 168 | interrupt_reactor(); |
| 800 | } | |
| 801 | 201 | } |
| 802 | ||
| 803 | inline bool | |
| 804 | 18 | epoll_scheduler::stopped() const noexcept |
| 805 | { | |
| 806 | 18 | std::unique_lock lock(mutex_); |
| 807 | 36 | return stopped_; |
| 808 | 18 | } |
| 809 | ||
| 810 | inline void | |
| 811 | 52 | epoll_scheduler::restart() |
| 812 | { | |
| 813 | 52 | std::unique_lock lock(mutex_); |
| 814 | 52 | stopped_ = false; |
| 815 | 52 | } |
| 816 | ||
| 817 | inline std::size_t | |
| 818 | 187 | epoll_scheduler::run() |
| 819 | { | |
| 820 | 374 | if (outstanding_work_.load(std::memory_order_acquire) == 0) |
| 821 | { | |
| 822 | 28 | stop(); |
| 823 | 28 | return 0; |
| 824 | } | |
| 825 | ||
| 826 | 159 | epoll::thread_context_guard ctx(this); |
| 827 | 159 | std::unique_lock lock(mutex_); |
| 828 | ||
| 829 | 159 | std::size_t n = 0; |
| 830 | for (;;) | |
| 831 | { | |
| 832 | 102161 | if (!do_one(lock, -1, &ctx.frame_)) |
| 833 | 159 | break; |
| 834 | 102002 | if (n != (std::numeric_limits<std::size_t>::max)()) |
| 835 | 102002 | ++n; |
| 836 | 102002 | if (!lock.owns_lock()) |
| 837 | 48519 | lock.lock(); |
| 838 | } | |
| 839 | 159 | return n; |
| 840 | 159 | } |
| 841 | ||
| 842 | inline std::size_t | |
| 843 | 2 | epoll_scheduler::run_one() |
| 844 | { | |
| 845 | 4 | if (outstanding_work_.load(std::memory_order_acquire) == 0) |
| 846 | { | |
| 847 | ✗ | stop(); |
| 848 | ✗ | return 0; |
| 849 | } | |
| 850 | ||
| 851 | 2 | epoll::thread_context_guard ctx(this); |
| 852 | 2 | std::unique_lock lock(mutex_); |
| 853 | 2 | return do_one(lock, -1, &ctx.frame_); |
| 854 | 2 | } |
| 855 | ||
| 856 | inline std::size_t | |
| 857 | 34 | epoll_scheduler::wait_one(long usec) |
| 858 | { | |
| 859 | 68 | if (outstanding_work_.load(std::memory_order_acquire) == 0) |
| 860 | { | |
| 861 | 7 | stop(); |
| 862 | 7 | return 0; |
| 863 | } | |
| 864 | ||
| 865 | 27 | epoll::thread_context_guard ctx(this); |
| 866 | 27 | std::unique_lock lock(mutex_); |
| 867 | 27 | return do_one(lock, usec, &ctx.frame_); |
| 868 | 27 | } |
| 869 | ||
| 870 | inline std::size_t | |
| 871 | 2 | epoll_scheduler::poll() |
| 872 | { | |
| 873 | 4 | if (outstanding_work_.load(std::memory_order_acquire) == 0) |
| 874 | { | |
| 875 | 1 | stop(); |
| 876 | 1 | return 0; |
| 877 | } | |
| 878 | ||
| 879 | 1 | epoll::thread_context_guard ctx(this); |
| 880 | 1 | std::unique_lock lock(mutex_); |
| 881 | ||
| 882 | 1 | std::size_t n = 0; |
| 883 | for (;;) | |
| 884 | { | |
| 885 | 3 | if (!do_one(lock, 0, &ctx.frame_)) |
| 886 | 1 | break; |
| 887 | 2 | if (n != (std::numeric_limits<std::size_t>::max)()) |
| 888 | 2 | ++n; |
| 889 | 2 | if (!lock.owns_lock()) |
| 890 | 2 | lock.lock(); |
| 891 | } | |
| 892 | 1 | return n; |
| 893 | 1 | } |
| 894 | ||
| 895 | inline std::size_t | |
| 896 | 4 | epoll_scheduler::poll_one() |
| 897 | { | |
| 898 | 8 | if (outstanding_work_.load(std::memory_order_acquire) == 0) |
| 899 | { | |
| 900 | 2 | stop(); |
| 901 | 2 | return 0; |
| 902 | } | |
| 903 | ||
| 904 | 2 | epoll::thread_context_guard ctx(this); |
| 905 | 2 | std::unique_lock lock(mutex_); |
| 906 | 2 | return do_one(lock, 0, &ctx.frame_); |
| 907 | 2 | } |
| 908 | ||
| 909 | inline void | |
| 910 | 8938 | epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const |
| 911 | { | |
| 912 | 8938 | epoll_event ev{}; |
| 913 | 8938 | ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; |
| 914 | 8938 | ev.data.ptr = desc; |
| 915 | ||
| 916 | 8938 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) |
| 917 | ✗ | detail::throw_system_error(make_err(errno), "epoll_ctl (register)"); |
| 918 | ||
| 919 | 8938 | desc->registered_events = ev.events; |
| 920 | 8938 | desc->fd = fd; |
| 921 | 8938 | desc->scheduler_ = this; |
| 922 | ||
| 923 | 8938 | std::lock_guard lock(desc->mutex); |
| 924 | 8938 | desc->read_ready = false; |
| 925 | 8938 | desc->write_ready = false; |
| 926 | 8938 | } |
| 927 | ||
| 928 | inline void | |
| 929 | 8938 | epoll_scheduler::deregister_descriptor(int fd) const |
| 930 | { | |
| 931 | 8938 | ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); |
| 932 | 8938 | } |
| 933 | ||
| 934 | inline void | |
| 935 | 14441 | epoll_scheduler::work_started() noexcept |
| 936 | { | |
| 937 | 14441 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); |
| 938 | 14441 | } |
| 939 | ||
| 940 | inline void | |
| 941 | 20772 | epoll_scheduler::work_finished() noexcept |
| 942 | { | |
| 943 | 41544 | if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) |
| 944 | 162 | stop(); |
| 945 | 20772 | } |
| 946 | ||
| 947 | inline void | |
| 948 | 33345 | epoll_scheduler::compensating_work_started() const noexcept |
| 949 | { | |
| 950 | 33345 | auto* ctx = epoll::find_context(this); |
| 951 | 33345 | if (ctx) |
| 952 | 33345 | ++ctx->private_outstanding_work; |
| 953 | 33345 | } |
| 954 | ||
| 955 | inline void | |
| 956 | ✗ | epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const |
| 957 | { | |
| 958 | // Note: outstanding_work_ was already incremented when posting | |
| 959 | ✗ | std::unique_lock lock(mutex_); |
| 960 | ✗ | completed_ops_.splice(queue); |
| 961 | ✗ | if (count > 0) |
| 962 | ✗ | maybe_unlock_and_signal_one(lock); |
| 963 | ✗ | } |
| 964 | ||
| 965 | inline void | |
| 966 | 8865 | epoll_scheduler::post_deferred_completions(op_queue& ops) const |
| 967 | { | |
| 968 | 8865 | if (ops.empty()) |
| 969 | 8865 | return; |
| 970 | ||
| 971 | // Fast path: if on scheduler thread, use private queue | |
| 972 | ✗ | if (auto* ctx = epoll::find_context(this)) |
| 973 | { | |
| 974 | ✗ | ctx->private_queue.splice(ops); |
| 975 | ✗ | return; |
| 976 | } | |
| 977 | ||
| 978 | // Slow path: add to global queue and wake a thread | |
| 979 | ✗ | std::unique_lock lock(mutex_); |
| 980 | ✗ | completed_ops_.splice(ops); |
| 981 | ✗ | wake_one_thread_and_unlock(lock); |
| 982 | ✗ | } |
| 983 | ||
| 984 | inline void | |
| 985 | 399 | epoll_scheduler::interrupt_reactor() const |
| 986 | { | |
| 987 | // Only write if not already armed to avoid redundant writes | |
| 988 | 399 | bool expected = false; |
| 989 | 399 | if (eventfd_armed_.compare_exchange_strong( |
| 990 | expected, true, std::memory_order_release, | |
| 991 | std::memory_order_relaxed)) | |
| 992 | { | |
| 993 | 277 | std::uint64_t val = 1; |
| 994 | 277 | [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val)); |
| 995 | } | |
| 996 | 399 | } |
| 997 | ||
| 998 | inline void | |
| 999 | 373 | epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const |
| 1000 | { | |
| 1001 | 373 | state_ |= 1; |
| 1002 | 373 | cond_.notify_all(); |
| 1003 | 373 | } |
| 1004 | ||
| 1005 | inline bool | |
| 1006 | 1685 | epoll_scheduler::maybe_unlock_and_signal_one( |
| 1007 | std::unique_lock<std::mutex>& lock) const | |
| 1008 | { | |
| 1009 | 1685 | state_ |= 1; |
| 1010 | 1685 | if (state_ > 1) |
| 1011 | { | |
| 1012 | ✗ | lock.unlock(); |
| 1013 | ✗ | cond_.notify_one(); |
| 1014 | ✗ | return true; |
| 1015 | } | |
| 1016 | 1685 | return false; |
| 1017 | } | |
| 1018 | ||
| 1019 | inline bool | |
| 1020 | 128761 | epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const |
| 1021 | { | |
| 1022 | 128761 | state_ |= 1; |
| 1023 | 128761 | bool have_waiters = state_ > 1; |
| 1024 | 128761 | lock.unlock(); |
| 1025 | 128761 | if (have_waiters) |
| 1026 | ✗ | cond_.notify_one(); |
| 1027 | 128761 | return have_waiters; |
| 1028 | } | |
| 1029 | ||
| 1030 | inline void | |
| 1031 | ✗ | epoll_scheduler::clear_signal() const |
| 1032 | { | |
| 1033 | ✗ | state_ &= ~std::size_t(1); |
| 1034 | ✗ | } |
| 1035 | ||
| 1036 | inline void | |
| 1037 | ✗ | epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const |
| 1038 | { | |
| 1039 | ✗ | while ((state_ & 1) == 0) |
| 1040 | { | |
| 1041 | ✗ | state_ += 2; |
| 1042 | ✗ | cond_.wait(lock); |
| 1043 | ✗ | state_ -= 2; |
| 1044 | } | |
| 1045 | ✗ | } |
| 1046 | ||
| 1047 | inline void | |
| 1048 | ✗ | epoll_scheduler::wait_for_signal_for( |
| 1049 | std::unique_lock<std::mutex>& lock, long timeout_us) const | |
| 1050 | { | |
| 1051 | ✗ | if ((state_ & 1) == 0) |
| 1052 | { | |
| 1053 | ✗ | state_ += 2; |
| 1054 | ✗ | cond_.wait_for(lock, std::chrono::microseconds(timeout_us)); |
| 1055 | ✗ | state_ -= 2; |
| 1056 | } | |
| 1057 | ✗ | } |
| 1058 | ||
| 1059 | inline void | |
| 1060 | 1685 | epoll_scheduler::wake_one_thread_and_unlock( |
| 1061 | std::unique_lock<std::mutex>& lock) const | |
| 1062 | { | |
| 1063 | 1685 | if (maybe_unlock_and_signal_one(lock)) |
| 1064 | ✗ | return; |
| 1065 | ||
| 1066 | 1685 | if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_) |
| 1067 | { | |
| 1068 | 26 | task_interrupted_ = true; |
| 1069 | 26 | lock.unlock(); |
| 1070 | 26 | interrupt_reactor(); |
| 1071 | } | |
| 1072 | else | |
| 1073 | { | |
| 1074 | 1659 | lock.unlock(); |
| 1075 | } | |
| 1076 | } | |
| 1077 | ||
| 1078 | 102035 | inline epoll_scheduler::work_cleanup::~work_cleanup() |
| 1079 | { | |
| 1080 | 102035 | if (ctx) |
| 1081 | { | |
| 1082 | 102035 | long produced = ctx->private_outstanding_work; |
| 1083 | 102035 | if (produced > 1) |
| 1084 | 7 | scheduler->outstanding_work_.fetch_add( |
| 1085 | produced - 1, std::memory_order_relaxed); | |
| 1086 | 102028 | else if (produced < 1) |
| 1087 | 15196 | scheduler->work_finished(); |
| 1088 | 102035 | ctx->private_outstanding_work = 0; |
| 1089 | ||
| 1090 | 102035 | if (!ctx->private_queue.empty()) |
| 1091 | { | |
| 1092 | 53494 | lock->lock(); |
| 1093 | 53494 | scheduler->completed_ops_.splice(ctx->private_queue); |
| 1094 | } | |
| 1095 | } | |
| 1096 | else | |
| 1097 | { | |
| 1098 | ✗ | scheduler->work_finished(); |
| 1099 | } | |
| 1100 | 102035 | } |
| 1101 | ||
| 1102 | 71642 | inline epoll_scheduler::task_cleanup::~task_cleanup() |
| 1103 | { | |
| 1104 | 35821 | if (!ctx) |
| 1105 | ✗ | return; |
| 1106 | ||
| 1107 | 35821 | if (ctx->private_outstanding_work > 0) |
| 1108 | { | |
| 1109 | 4633 | scheduler->outstanding_work_.fetch_add( |
| 1110 | 4633 | ctx->private_outstanding_work, std::memory_order_relaxed); |
| 1111 | 4633 | ctx->private_outstanding_work = 0; |
| 1112 | } | |
| 1113 | ||
| 1114 | 35821 | if (!ctx->private_queue.empty()) |
| 1115 | { | |
| 1116 | 4633 | if (!lock->owns_lock()) |
| 1117 | ✗ | lock->lock(); |
| 1118 | 4633 | scheduler->completed_ops_.splice(ctx->private_queue); |
| 1119 | } | |
| 1120 | 35821 | } |
| 1121 | ||
| 1122 | inline void | |
| 1123 | 9261 | epoll_scheduler::update_timerfd() const |
| 1124 | { | |
| 1125 | 9261 | auto nearest = timer_svc_->nearest_expiry(); |
| 1126 | ||
| 1127 | 9261 | itimerspec ts{}; |
| 1128 | 9261 | int flags = 0; |
| 1129 | ||
| 1130 | 9261 | if (nearest == timer_service::time_point::max()) |
| 1131 | { | |
| 1132 | // No timers - disarm by setting to 0 (relative) | |
| 1133 | } | |
| 1134 | else | |
| 1135 | { | |
| 1136 | 9217 | auto now = std::chrono::steady_clock::now(); |
| 1137 | 9217 | if (nearest <= now) |
| 1138 | { | |
| 1139 | // Use 1ns instead of 0 - zero disarms the timerfd | |
| 1140 | 192 | ts.it_value.tv_nsec = 1; |
| 1141 | } | |
| 1142 | else | |
| 1143 | { | |
| 1144 | 9025 | auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>( |
| 1145 | 9025 | nearest - now) |
| 1146 | 9025 | .count(); |
| 1147 | 9025 | ts.it_value.tv_sec = nsec / 1000000000; |
| 1148 | 9025 | ts.it_value.tv_nsec = nsec % 1000000000; |
| 1149 | // Ensure non-zero to avoid disarming if duration rounds to 0 | |
| 1150 | 9025 | if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0) |
| 1151 | ✗ | ts.it_value.tv_nsec = 1; |
| 1152 | } | |
| 1153 | } | |
| 1154 | ||
| 1155 | 9261 | if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0) |
| 1156 | ✗ | detail::throw_system_error(make_err(errno), "timerfd_settime"); |
| 1157 | 9261 | } |
| 1158 | ||
| 1159 | inline void | |
| 1160 | 35821 | epoll_scheduler::run_task( |
| 1161 | std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx) | |
| 1162 | { | |
| 1163 | 35821 | int timeout_ms = task_interrupted_ ? 0 : -1; |
| 1164 | ||
| 1165 | 35821 | if (lock.owns_lock()) |
| 1166 | 9095 | lock.unlock(); |
| 1167 | ||
| 1168 | 35821 | task_cleanup on_exit{this, &lock, ctx}; |
| 1169 | ||
| 1170 | // Flush deferred timerfd programming before blocking | |
| 1171 | 35821 | if (timerfd_stale_.exchange(false, std::memory_order_acquire)) |
| 1172 | 4628 | update_timerfd(); |
| 1173 | ||
| 1174 | // Event loop runs without mutex held | |
| 1175 | epoll_event events[128]; | |
| 1176 | 35821 | int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms); |
| 1177 | ||
| 1178 | 35821 | if (nfds < 0 && errno != EINTR) |
| 1179 | ✗ | detail::throw_system_error(make_err(errno), "epoll_wait"); |
| 1180 | ||
| 1181 | 35821 | bool check_timers = false; |
| 1182 | 35821 | op_queue local_ops; |
| 1183 | ||
| 1184 | // Process events without holding the mutex | |
| 1185 | 82765 | for (int i = 0; i < nfds; ++i) |
| 1186 | { | |
| 1187 | 46944 | if (events[i].data.ptr == nullptr) |
| 1188 | { | |
| 1189 | std::uint64_t val; | |
| 1190 | // Mutex released above; analyzer can't track unlock via ref | |
| 1191 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | |
| 1192 | 72 | [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val)); |
| 1193 | 72 | eventfd_armed_.store(false, std::memory_order_relaxed); |
| 1194 | 72 | continue; |
| 1195 | 72 | } |
| 1196 | ||
| 1197 | 46872 | if (events[i].data.ptr == &timer_fd_) |
| 1198 | { | |
| 1199 | std::uint64_t expirations; | |
| 1200 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | |
| 1201 | [[maybe_unused]] auto r = | |
| 1202 | 4633 | ::read(timer_fd_, &expirations, sizeof(expirations)); |
| 1203 | 4633 | check_timers = true; |
| 1204 | 4633 | continue; |
| 1205 | 4633 | } |
| 1206 | ||
| 1207 | // Deferred I/O: just set ready events and enqueue descriptor | |
| 1208 | // No per-descriptor mutex locking in reactor hot path! | |
| 1209 | 42239 | auto* desc = static_cast<descriptor_state*>(events[i].data.ptr); |
| 1210 | 42239 | desc->add_ready_events(events[i].events); |
| 1211 | ||
| 1212 | // Only enqueue if not already enqueued | |
| 1213 | 42239 | bool expected = false; |
| 1214 | 42239 | if (desc->is_enqueued_.compare_exchange_strong( |
| 1215 | expected, true, std::memory_order_release, | |
| 1216 | std::memory_order_relaxed)) | |
| 1217 | { | |
| 1218 | 42239 | local_ops.push(desc); |
| 1219 | } | |
| 1220 | } | |
| 1221 | ||
| 1222 | // Process timers only when timerfd fires | |
| 1223 | 35821 | if (check_timers) |
| 1224 | { | |
| 1225 | 4633 | timer_svc_->process_expired(); |
| 1226 | 4633 | update_timerfd(); |
| 1227 | } | |
| 1228 | ||
| 1229 | 35821 | lock.lock(); |
| 1230 | ||
| 1231 | 35821 | if (!local_ops.empty()) |
| 1232 | 26273 | completed_ops_.splice(local_ops); |
| 1233 | 35821 | } |
| 1234 | ||
| 1235 | inline std::size_t | |
| 1236 | 102195 | epoll_scheduler::do_one( |
| 1237 | std::unique_lock<std::mutex>& lock, | |
| 1238 | long timeout_us, | |
| 1239 | epoll::scheduler_context* ctx) | |
| 1240 | { | |
| 1241 | for (;;) | |
| 1242 | { | |
| 1243 | 138016 | if (stopped_) |
| 1244 | 160 | return 0; |
| 1245 | ||
| 1246 | 137856 | scheduler_op* op = completed_ops_.pop(); |
| 1247 | ||
| 1248 | // Handle reactor sentinel - time to poll for I/O | |
| 1249 | 137856 | if (op == &task_op_) |
| 1250 | { | |
| 1251 | 35821 | bool more_handlers = !completed_ops_.empty(); |
| 1252 | ||
| 1253 | // Nothing to run the reactor for: no pending work to wait on, | |
| 1254 | // or caller requested a non-blocking poll | |
| 1255 | 44916 | if (!more_handlers && |
| 1256 | 18190 | (outstanding_work_.load(std::memory_order_acquire) == 0 || |
| 1257 | timeout_us == 0)) | |
| 1258 | { | |
| 1259 | ✗ | completed_ops_.push(&task_op_); |
| 1260 | ✗ | return 0; |
| 1261 | } | |
| 1262 | ||
| 1263 | 35821 | task_interrupted_ = more_handlers || timeout_us == 0; |
| 1264 | 35821 | task_running_.store(true, std::memory_order_release); |
| 1265 | ||
| 1266 | 35821 | if (more_handlers) |
| 1267 | 26726 | unlock_and_signal_one(lock); |
| 1268 | ||
| 1269 | 35821 | run_task(lock, ctx); |
| 1270 | ||
| 1271 | 35821 | task_running_.store(false, std::memory_order_relaxed); |
| 1272 | 35821 | completed_ops_.push(&task_op_); |
| 1273 | 35821 | continue; |
| 1274 | 35821 | } |
| 1275 | ||
| 1276 | // Handle operation | |
| 1277 | 102035 | if (op != nullptr) |
| 1278 | { | |
| 1279 | 102035 | bool more = !completed_ops_.empty(); |
| 1280 | ||
| 1281 | 102035 | if (more) |
| 1282 | 102035 | ctx->unassisted = !unlock_and_signal_one(lock); |
| 1283 | else | |
| 1284 | { | |
| 1285 | ✗ | ctx->unassisted = false; |
| 1286 | ✗ | lock.unlock(); |
| 1287 | } | |
| 1288 | ||
| 1289 | 102035 | work_cleanup on_exit{this, &lock, ctx}; |
| 1290 | ||
| 1291 | 102035 | (*op)(); |
| 1292 | 102035 | return 1; |
| 1293 | 102035 | } |
| 1294 | ||
| 1295 | // No pending work to wait on, or caller requested non-blocking poll | |
| 1296 | ✗ | if (outstanding_work_.load(std::memory_order_acquire) == 0 || |
| 1297 | timeout_us == 0) | |
| 1298 | ✗ | return 0; |
| 1299 | ||
| 1300 | ✗ | clear_signal(); |
| 1301 | ✗ | if (timeout_us < 0) |
| 1302 | ✗ | wait_for_signal(lock); |
| 1303 | else | |
| 1304 | ✗ | wait_for_signal_for(lock, timeout_us); |
| 1305 | 35821 | } |
| 1306 | } | |
| 1307 | ||
| 1308 | } // namespace boost::corosio::detail | |
| 1309 | ||
| 1310 | #endif // BOOST_COROSIO_HAS_EPOLL | |
| 1311 | ||
| 1312 | #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | |
| 1313 |
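
The "Signaling State (state_)" notes in the listing pack a sticky signaled bit and a waiter count into one `std::size_t`. Below is a minimal standalone sketch of that encoding, distilled from `clear_signal`, `wait_for_signal`, and `unlock_and_signal_one` above; the `signal_state` struct and its member names are invented here for illustration and are not part of corosio.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Sketch of the state_ encoding: bit 0 is a sticky "signaled" flag,
// and each blocked waiter adds 2 so producers can tell whether a
// notify_one() is actually needed. Callers hold a unique_lock on m.
struct signal_state
{
    std::mutex m;
    std::condition_variable cv;
    std::size_t state = 0; // bit 0: signaled, state / 2: waiter count

    void clear_signal()    // done before deciding to block
    {
        state &= ~std::size_t(1);
    }

    void wait(std::unique_lock<std::mutex>& lock)
    {
        while ((state & 1) == 0)
        {
            state += 2;    // register as a waiter
            cv.wait(lock);
            state -= 2;    // deregister after waking
        }
    }

    bool signal_one(std::unique_lock<std::mutex>& lock)
    {
        state |= 1;                    // set the sticky flag
        bool have_waiters = state > 1; // any registered waiter?
        lock.unlock();
        if (have_waiters)
            cv.notify_one();           // wake exactly one thread
        return have_waiters;
    }
};
```

Because the flag is sticky, a signal that arrives between `clear_signal` and `cv.wait` is not lost, and producers skip `notify_one` entirely when the upper bits show that no thread is blocked.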
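
`interrupt_reactor()` together with the `event_fd_` branch in `run_task()` implements an edge-triggered, write-at-most-once wakeup for the thread blocked in `epoll_wait`. The following self-contained sketch shows the same pattern against the raw Linux APIs; the variable names are invented, and the error handling the real scheduler performs is omitted.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

int main()
{
    int ep = ::epoll_create1(EPOLL_CLOEXEC);
    int ev = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

    epoll_event reg{};
    reg.events = EPOLLIN | EPOLLET; // edge-triggered, as in the scheduler
    reg.data.ptr = nullptr;         // nullptr marks "wakeup" events
    ::epoll_ctl(ep, EPOLL_CTL_ADD, ev, &reg);

    std::atomic<bool> armed{false};

    // Plays the role of interrupt_reactor(): write only when not armed.
    std::thread waker([&] {
        bool expected = false;
        if (armed.compare_exchange_strong(expected, true))
        {
            std::uint64_t one = 1;
            (void)::write(ev, &one, sizeof(one));
        }
    });

    // Plays the role of run_task(): block, then drain and re-arm.
    epoll_event out[4];
    int n = ::epoll_wait(ep, out, 4, -1);
    for (int i = 0; i < n; ++i)
    {
        if (out[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            (void)::read(ev, &val, sizeof(val)); // drain the counter
            armed.store(false);                  // allow the next wakeup
        }
    }

    waker.join();
    ::close(ev);
    ::close(ep);
    return 0;
}
```

The compare-exchange mirrors `eventfd_armed_`: once one wakeup write is in flight, later posts skip the `write` until the reactor has drained the counter and cleared the flag, which keeps cross-thread posting to at most one syscall per wakeup.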