LCOV - code coverage report
Current view: top level - corosio/detail - timer_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 87.3 % 354 309 45
Test Date: 2026-02-18 17:16:27 Functions: 95.6 % 45 43 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/corosio
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      12                 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      13                 : 
      14                 : #include <boost/corosio/timer.hpp>
      15                 : #include <boost/corosio/io_context.hpp>
      16                 : #include <boost/corosio/detail/scheduler_op.hpp>
      17                 : #include <boost/corosio/native/native_scheduler.hpp>
      18                 : #include <boost/corosio/detail/intrusive.hpp>
      19                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      20                 : #include <boost/capy/error.hpp>
      21                 : #include <boost/capy/ex/execution_context.hpp>
      22                 : #include <boost/capy/ex/executor_ref.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <atomic>
      26                 : #include <chrono>
      27                 : #include <coroutine>
      28                 : #include <cstddef>
      29                 : #include <limits>
      30                 : #include <mutex>
      31                 : #include <optional>
      32                 : #include <stop_token>
      33                 : #include <vector>
      34                 : 
      35                 : namespace boost::corosio::detail {
      36                 : 
      37                 : struct scheduler;
      38                 : 
      39                 : /*
      40                 :     Timer Service
      41                 :     =============
      42                 : 
      43                 :     Data Structures
      44                 :     ---------------
      45                 :     waiter_node holds per-waiter state: coroutine handle, executor,
      46                 :     error output, stop_token, embedded completion_op. Each concurrent
      47                 :     co_await t.wait() allocates one waiter_node.
      48                 : 
      49                 :     timer_service::implementation holds per-timer state: expiry,
      50                 :     heap index, and an intrusive_list of waiter_nodes. Multiple
      51                 :     coroutines can wait on the same timer simultaneously.
      52                 : 
      53                 :     timer_service owns a min-heap of active timers, a free list
      54                 :     of recycled impls, and a free list of recycled waiter_nodes. The
      55                 :     heap is ordered by expiry time; the scheduler queries
      56                 :     nearest_expiry() to set the epoll/timerfd timeout.
      57                 : 
      58                 :     Optimization Strategy
      59                 :     ---------------------
      60                 :     1. Deferred heap insertion — expires_after() stores the expiry
      61                 :        but does not insert into the heap. Insertion happens in wait().
      62                 :     2. Thread-local impl cache — single-slot per-thread cache.
      63                 :     3. Embedded completion_op — eliminates heap allocation per fire/cancel.
      64                 :     4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
      65                 :     5. might_have_pending_waits_ flag — skips lock when no wait issued.
      66                 :     6. Thread-local waiter cache — single-slot per-thread cache.
      67                 : 
      68                 :     Concurrency
      69                 :     -----------
      70                 :     stop_token callbacks can fire from any thread. The impl_
      71                 :     pointer on waiter_node is used as a "still in list" marker.
      72                 : */
      73                 : 
      74                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
      75                 : 
      76                 : inline void timer_service_invalidate_cache() noexcept;
      77                 : 
      78                 : // timer_service class body — member function definitions are
      79                 : // out-of-class (after implementation and waiter_node are complete)
      80                 : class BOOST_COROSIO_DECL timer_service final
      81                 :     : public capy::execution_context::service
      82                 :     , public io_object::io_service
      83                 : {
      84                 : public:
      85                 :     using clock_type = std::chrono::steady_clock;
      86                 :     using time_point = clock_type::time_point;
      87                 : 
      88                 :     class callback
      89                 :     {
      90                 :         void* ctx_         = nullptr;
      91                 :         void (*fn_)(void*) = nullptr;
      92                 : 
      93                 :     public:
      94 HIT         340 :         callback() = default;
      95             340 :         callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
      96                 : 
      97                 :         explicit operator bool() const noexcept
      98                 :         {
      99                 :             return fn_ != nullptr;
     100                 :         }
     101            8297 :         void operator()() const
     102                 :         {
     103            8297 :             if (fn_)
     104            8297 :                 fn_(ctx_);
     105            8297 :         }
     106                 :     };
     107                 : 
     108                 :     struct implementation;
     109                 : 
     110                 : private:
     111                 :     struct heap_entry
     112                 :     {
     113                 :         time_point time_;
     114                 :         implementation* timer_;
     115                 :     };
     116                 : 
     117                 :     scheduler* sched_ = nullptr;
     118                 :     mutable std::mutex mutex_;
     119                 :     std::vector<heap_entry> heap_;
     120                 :     implementation* free_list_     = nullptr;
     121                 :     waiter_node* waiter_free_list_ = nullptr;
     122                 :     callback on_earliest_changed_;
     123                 :     // Avoids mutex in nearest_expiry() and empty()
     124                 :     mutable std::atomic<std::int64_t> cached_nearest_ns_{
     125                 :         (std::numeric_limits<std::int64_t>::max)()};
     126                 : 
     127                 : public:
     128             340 :     inline timer_service(capy::execution_context&, scheduler& sched)
     129             340 :         : sched_(&sched)
     130                 :     {
     131             340 :     }
     132                 : 
     133           16654 :     inline scheduler& get_scheduler() noexcept
     134                 :     {
     135           16654 :         return *sched_;
     136                 :     }
     137                 : 
     138             680 :     ~timer_service() override = default;
     139                 : 
     140                 :     timer_service(timer_service const&)            = delete;
     141                 :     timer_service& operator=(timer_service const&) = delete;
     142                 : 
     143             340 :     inline void set_on_earliest_changed(callback cb)
     144                 :     {
     145             340 :         on_earliest_changed_ = cb;
     146             340 :     }
     147                 : 
     148                 :     inline bool empty() const noexcept
     149                 :     {
     150                 :         return cached_nearest_ns_.load(std::memory_order_acquire) ==
     151                 :             (std::numeric_limits<std::int64_t>::max)();
     152                 :     }
     153                 : 
     154           19779 :     inline time_point nearest_expiry() const noexcept
     155                 :     {
     156           19779 :         auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
     157           19779 :         return time_point(time_point::duration(ns));
     158                 :     }
     159                 : 
     160                 :     inline void shutdown() override;
     161                 :     inline io_object::implementation* construct() override;
     162                 :     inline void destroy(io_object::implementation* p) override;
     163                 :     inline void destroy_impl(implementation& impl);
     164                 :     inline waiter_node* create_waiter();
     165                 :     inline void destroy_waiter(waiter_node* w);
     166                 :     inline std::size_t update_timer(implementation& impl, time_point new_time);
     167                 :     inline void insert_waiter(implementation& impl, waiter_node* w);
     168                 :     inline std::size_t cancel_timer(implementation& impl);
     169                 :     inline void cancel_waiter(waiter_node* w);
     170                 :     inline std::size_t cancel_one_waiter(implementation& impl);
     171                 :     inline std::size_t process_expired();
     172                 : 
     173                 : private:
     174           97208 :     inline void refresh_cached_nearest() noexcept
     175                 :     {
     176           97208 :         auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
     177           96818 :                                 : heap_[0].time_.time_since_epoch().count();
     178           97208 :         cached_nearest_ns_.store(ns, std::memory_order_release);
     179           97208 :     }
     180                 : 
     181                 :     inline void remove_timer_impl(implementation& impl);
     182                 :     inline void up_heap(std::size_t index);
     183                 :     inline void down_heap(std::size_t index);
     184                 :     inline void swap_heap(std::size_t i1, std::size_t i2);
     185                 : };
     186                 : 
     187                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
     188                 :     : intrusive_list<waiter_node>::node
     189                 : {
     190                 :     // Embedded completion op — avoids heap allocation per fire/cancel
     191                 :     struct completion_op final : scheduler_op
     192                 :     {
     193                 :         waiter_node* waiter_ = nullptr;
     194                 : 
     195                 :         static void do_complete(
     196                 :             void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
     197                 : 
     198             142 :         completion_op() noexcept : scheduler_op(&do_complete) {}
     199                 : 
     200                 :         void operator()() override;
     201                 :         // No-op — lifetime owned by waiter_node, not the scheduler queue
     202 MIS           0 :         void destroy() override {}
     203                 :     };
     204                 : 
     205                 :     // Per-waiter stop_token cancellation
     206                 :     struct canceller
     207                 :     {
     208                 :         waiter_node* waiter_;
     209                 :         void operator()() const;
     210                 :     };
     211                 : 
     212                 :     // nullptr once removed from timer's waiter list (concurrency marker)
     213                 :     timer_service::implementation* impl_ = nullptr;
     214                 :     timer_service* svc_                  = nullptr;
     215                 :     std::coroutine_handle<> h_;
     216                 :     capy::executor_ref d_;
     217                 :     std::error_code* ec_out_ = nullptr;
     218                 :     std::stop_token token_;
     219                 :     std::optional<std::stop_callback<canceller>> stop_cb_;
     220                 :     completion_op op_;
     221                 :     std::error_code ec_value_;
     222                 :     waiter_node* next_free_ = nullptr;
     223                 : 
     224 HIT         142 :     waiter_node() noexcept
     225             142 :     {
     226             142 :         op_.waiter_ = this;
     227             142 :     }
     228                 : };
     229                 : 
     230                 : struct timer_service::implementation final : timer::implementation
     231                 : {
     232                 :     using clock_type = std::chrono::steady_clock;
     233                 :     using time_point = clock_type::time_point;
     234                 :     using duration   = clock_type::duration;
     235                 : 
     236                 :     timer_service* svc_ = nullptr;
     237                 :     intrusive_list<waiter_node> waiters_;
     238                 : 
     239                 :     // Free list linkage (reused when impl is on free_list)
     240                 :     implementation* next_free_ = nullptr;
     241                 : 
     242                 :     inline explicit implementation(timer_service& svc) noexcept;
     243                 : 
     244                 :     inline std::coroutine_handle<> wait(
     245                 :         std::coroutine_handle<>,
     246                 :         capy::executor_ref,
     247                 :         std::stop_token,
     248                 :         std::error_code*) override;
     249                 : };
     250                 : 
     251                 : // Thread-local caches avoid hot-path mutex acquisitions:
     252                 : // 1. Impl cache — single-slot, validated by comparing svc_
     253                 : // 2. Waiter cache — single-slot, no service affinity
     254                 : // All caches are cleared by timer_service_invalidate_cache() during shutdown.
     255                 : 
     256                 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
     257                 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
     258                 : 
     259                 : inline timer_service::implementation*
     260            8613 : try_pop_tl_cache(timer_service* svc) noexcept
     261                 : {
     262            8613 :     auto* impl = tl_cached_impl.get();
     263            8613 :     if (impl)
     264                 :     {
     265            8440 :         tl_cached_impl.set(nullptr);
     266            8440 :         if (impl->svc_ == svc)
     267            8440 :             return impl;
     268                 :         // Stale impl from a destroyed service
     269 MIS           0 :         delete impl;
     270                 :     }
     271 HIT         173 :     return nullptr;
     272                 : }
     273                 : 
     274                 : inline bool
     275            8613 : try_push_tl_cache(timer_service::implementation* impl) noexcept
     276                 : {
     277            8613 :     if (!tl_cached_impl.get())
     278                 :     {
     279            8565 :         tl_cached_impl.set(impl);
     280            8565 :         return true;
     281                 :     }
     282              48 :     return false;
     283                 : }
     284                 : 
     285                 : inline waiter_node*
     286            8327 : try_pop_waiter_tl_cache() noexcept
     287                 : {
     288            8327 :     auto* w = tl_cached_waiter.get();
     289            8327 :     if (w)
     290                 :     {
     291            8185 :         tl_cached_waiter.set(nullptr);
     292            8185 :         return w;
     293                 :     }
     294             142 :     return nullptr;
     295                 : }
     296                 : 
     297                 : inline bool
     298            8327 : try_push_waiter_tl_cache(waiter_node* w) noexcept
     299                 : {
     300            8327 :     if (!tl_cached_waiter.get())
     301                 :     {
     302            8269 :         tl_cached_waiter.set(w);
     303            8269 :         return true;
     304                 :     }
     305              58 :     return false;
     306                 : }
     307                 : 
     308                 : inline void
     309             340 : timer_service_invalidate_cache() noexcept
     310                 : {
     311             340 :     delete tl_cached_impl.get();
     312             340 :     tl_cached_impl.set(nullptr);
     313                 : 
     314             340 :     delete tl_cached_waiter.get();
     315             340 :     tl_cached_waiter.set(nullptr);
     316             340 : }
     317                 : 
     318                 : // timer_service out-of-class member function definitions
     319                 : 
     320             173 : inline timer_service::implementation::implementation(
     321             173 :     timer_service& svc) noexcept
     322             173 :     : svc_(&svc)
     323                 : {
     324             173 : }
     325                 : 
     326                 : inline void
     327             340 : timer_service::shutdown()
     328                 : {
     329             340 :     timer_service_invalidate_cache();
     330                 : 
     331                 :     // Cancel waiting timers still in the heap
     332             340 :     for (auto& entry : heap_)
     333                 :     {
     334 MIS           0 :         auto* impl = entry.timer_;
     335               0 :         while (auto* w = impl->waiters_.pop_front())
     336                 :         {
     337               0 :             w->stop_cb_.reset();
     338               0 :             w->h_.destroy();
     339               0 :             sched_->work_finished();
     340               0 :             delete w;
     341               0 :         }
     342               0 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     343               0 :         delete impl;
     344                 :     }
     345 HIT         340 :     heap_.clear();
     346             340 :     cached_nearest_ns_.store(
     347                 :         (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
     348                 : 
     349                 :     // Delete free-listed impls
     350             388 :     while (free_list_)
     351                 :     {
     352              48 :         auto* next = free_list_->next_free_;
     353              48 :         delete free_list_;
     354              48 :         free_list_ = next;
     355                 :     }
     356                 : 
     357                 :     // Delete free-listed waiters
     358             398 :     while (waiter_free_list_)
     359                 :     {
     360              58 :         auto* next = waiter_free_list_->next_free_;
     361              58 :         delete waiter_free_list_;
     362              58 :         waiter_free_list_ = next;
     363                 :     }
     364             340 : }
     365                 : 
     366                 : inline io_object::implementation*
     367            8613 : timer_service::construct()
     368                 : {
     369            8613 :     implementation* impl = try_pop_tl_cache(this);
     370            8613 :     if (impl)
     371                 :     {
     372            8440 :         impl->svc_        = this;
     373            8440 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     374            8440 :         impl->might_have_pending_waits_ = false;
     375            8440 :         return impl;
     376                 :     }
     377                 : 
     378             173 :     std::lock_guard lock(mutex_);
     379             173 :     if (free_list_)
     380                 :     {
     381 MIS           0 :         impl              = free_list_;
     382               0 :         free_list_        = impl->next_free_;
     383               0 :         impl->next_free_  = nullptr;
     384               0 :         impl->svc_        = this;
     385               0 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     386               0 :         impl->might_have_pending_waits_ = false;
     387                 :     }
     388                 :     else
     389                 :     {
     390 HIT         173 :         impl = new implementation(*this);
     391                 :     }
     392             173 :     return impl;
     393             173 : }
     394                 : 
     395                 : inline void
     396            8613 : timer_service::destroy(io_object::implementation* p)
     397                 : {
     398            8613 :     destroy_impl(static_cast<implementation&>(*p));
     399            8613 : }
     400                 : 
     401                 : inline void
     402            8613 : timer_service::destroy_impl(implementation& impl)
     403                 : {
     404            8613 :     cancel_timer(impl);
     405                 : 
     406            8613 :     if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
     407                 :     {
     408 MIS           0 :         std::lock_guard lock(mutex_);
     409               0 :         remove_timer_impl(impl);
     410               0 :         refresh_cached_nearest();
     411               0 :     }
     412                 : 
     413 HIT        8613 :     if (try_push_tl_cache(&impl))
     414            8565 :         return;
     415                 : 
     416              48 :     std::lock_guard lock(mutex_);
     417              48 :     impl.next_free_ = free_list_;
     418              48 :     free_list_      = &impl;
     419              48 : }
     420                 : 
     421                 : inline waiter_node*
     422            8327 : timer_service::create_waiter()
     423                 : {
     424            8327 :     if (auto* w = try_pop_waiter_tl_cache())
     425            8185 :         return w;
     426                 : 
     427             142 :     std::lock_guard lock(mutex_);
     428             142 :     if (waiter_free_list_)
     429                 :     {
     430 MIS           0 :         auto* w           = waiter_free_list_;
     431               0 :         waiter_free_list_ = w->next_free_;
     432               0 :         w->next_free_     = nullptr;
     433               0 :         return w;
     434                 :     }
     435                 : 
     436 HIT         142 :     return new waiter_node();
     437             142 : }
     438                 : 
     439                 : inline void
     440            8327 : timer_service::destroy_waiter(waiter_node* w)
     441                 : {
     442            8327 :     if (try_push_waiter_tl_cache(w))
     443            8269 :         return;
     444                 : 
     445              58 :     std::lock_guard lock(mutex_);
     446              58 :     w->next_free_     = waiter_free_list_;
     447              58 :     waiter_free_list_ = w;
     448              58 : }
     449                 : 
     450                 : inline std::size_t
     451               6 : timer_service::update_timer(implementation& impl, time_point new_time)
     452                 : {
     453                 :     bool in_heap =
     454               6 :         (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
     455               6 :     if (!in_heap && impl.waiters_.empty())
     456 MIS           0 :         return 0;
     457                 : 
     458 HIT           6 :     bool notify = false;
     459               6 :     intrusive_list<waiter_node> canceled;
     460                 : 
     461                 :     {
     462               6 :         std::lock_guard lock(mutex_);
     463                 : 
     464              16 :         while (auto* w = impl.waiters_.pop_front())
     465                 :         {
     466              10 :             w->impl_ = nullptr;
     467              10 :             canceled.push_back(w);
     468              10 :         }
     469                 : 
     470               6 :         if (impl.heap_index_ < heap_.size())
     471                 :         {
     472               6 :             time_point old_time           = heap_[impl.heap_index_].time_;
     473               6 :             heap_[impl.heap_index_].time_ = new_time;
     474                 : 
     475               6 :             if (new_time < old_time)
     476               6 :                 up_heap(impl.heap_index_);
     477                 :             else
     478 MIS           0 :                 down_heap(impl.heap_index_);
     479                 : 
     480 HIT           6 :             notify = (impl.heap_index_ == 0);
     481                 :         }
     482                 : 
     483               6 :         refresh_cached_nearest();
     484               6 :     }
     485                 : 
     486               6 :     std::size_t count = 0;
     487              16 :     while (auto* w = canceled.pop_front())
     488                 :     {
     489              10 :         w->ec_value_ = make_error_code(capy::error::canceled);
     490              10 :         sched_->post(&w->op_);
     491              10 :         ++count;
     492              10 :     }
     493                 : 
     494               6 :     if (notify)
     495               6 :         on_earliest_changed_();
     496                 : 
     497               6 :     return count;
     498                 : }
     499                 : 
     500                 : inline void
     501            8327 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
     502                 : {
     503            8327 :     bool notify = false;
     504                 :     {
     505            8327 :         std::lock_guard lock(mutex_);
     506            8327 :         if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
     507                 :         {
     508            8305 :             impl.heap_index_ = heap_.size();
     509            8305 :             heap_.push_back({impl.expiry_, &impl});
     510            8305 :             up_heap(heap_.size() - 1);
     511            8305 :             notify = (impl.heap_index_ == 0);
     512            8305 :             refresh_cached_nearest();
     513                 :         }
     514            8327 :         impl.waiters_.push_back(w);
     515            8327 :     }
     516            8327 :     if (notify)
     517            8291 :         on_earliest_changed_();
     518            8327 : }
     519                 : 
     520                 : inline std::size_t
     521            8621 : timer_service::cancel_timer(implementation& impl)
     522                 : {
     523            8621 :     if (!impl.might_have_pending_waits_)
     524            8605 :         return 0;
     525                 : 
     526                 :     // Not in heap and no waiters — just clear the flag
     527              16 :     if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
     528 MIS           0 :         impl.waiters_.empty())
     529                 :     {
     530               0 :         impl.might_have_pending_waits_ = false;
     531               0 :         return 0;
     532                 :     }
     533                 : 
     534 HIT          16 :     intrusive_list<waiter_node> canceled;
     535                 : 
     536                 :     {
     537              16 :         std::lock_guard lock(mutex_);
     538              16 :         remove_timer_impl(impl);
     539              36 :         while (auto* w = impl.waiters_.pop_front())
     540                 :         {
     541              20 :             w->impl_ = nullptr;
     542              20 :             canceled.push_back(w);
     543              20 :         }
     544              16 :         refresh_cached_nearest();
     545              16 :     }
     546                 : 
     547              16 :     impl.might_have_pending_waits_ = false;
     548                 : 
     549              16 :     std::size_t count = 0;
     550              36 :     while (auto* w = canceled.pop_front())
     551                 :     {
     552              20 :         w->ec_value_ = make_error_code(capy::error::canceled);
     553              20 :         sched_->post(&w->op_);
     554              20 :         ++count;
     555              20 :     }
     556                 : 
     557              16 :     return count;
     558                 : }
     559                 : 
     560                 : inline void
     561               4 : timer_service::cancel_waiter(waiter_node* w)
     562                 : {
     563                 :     {
     564               4 :         std::lock_guard lock(mutex_);
     565                 :         // Already removed by cancel_timer or process_expired
     566               4 :         if (!w->impl_)
     567 MIS           0 :             return;
     568 HIT           4 :         auto* impl = w->impl_;
     569               4 :         w->impl_   = nullptr;
     570               4 :         impl->waiters_.remove(w);
     571               4 :         if (impl->waiters_.empty())
     572                 :         {
     573               2 :             remove_timer_impl(*impl);
     574               2 :             impl->might_have_pending_waits_ = false;
     575                 :         }
     576               4 :         refresh_cached_nearest();
     577               4 :     }
     578                 : 
     579               4 :     w->ec_value_ = make_error_code(capy::error::canceled);
     580               4 :     sched_->post(&w->op_);
     581                 : }
     582                 : 
                           // Cancel the waiter at the front of impl's waiter list.
                           //
                           // Returns 1 if a waiter was detached and its completion posted with
                           // capy::error::canceled, or 0 if there was nothing to cancel.
     583                 : inline std::size_t
     584               2 : timer_service::cancel_one_waiter(implementation& impl)
     585                 : {
                               // Early out: this flag is read before mutex_ is taken.
     586               2 :     if (!impl.might_have_pending_waits_)
     587 MIS           0 :         return 0;
     588                 : 
     589 HIT           2 :     waiter_node* w = nullptr;
     590                 : 
     591                 :     {
     592               2 :         std::lock_guard lock(mutex_);
     593               2 :         w = impl.waiters_.pop_front();
                                   // The flag may be set while the list is already empty.
     594               2 :         if (!w)
     595 MIS           0 :             return 0;
                                   // Mark the waiter as detached from its timer.
     596 HIT           2 :         w->impl_ = nullptr;
     597               2 :         if (impl.waiters_.empty())
     598                 :         {
                                       // That was the last waiter: drop the timer from the heap.
     599 MIS           0 :             remove_timer_impl(impl);
     600               0 :             impl.might_have_pending_waits_ = false;
     601                 :         }
     602 HIT           2 :         refresh_cached_nearest();
     603               2 :     }
     604                 : 
                               // The completion is posted after the lock scope has ended.
     605               2 :     w->ec_value_ = make_error_code(capy::error::canceled);
     606               2 :     sched_->post(&w->op_);
     607               2 :     return 1;
     608                 : }
     609                 : 
                           // Complete every waiter whose timer has expired.
                           //
                           // Under mutex_, each heap entry at the top of the heap with
                           // time_ <= now is removed and all of its waiters are moved to a
                           // local list with a default-constructed (success) error code. The
                           // collected waiters are posted to the scheduler after the lock is
                           // released. Returns the number of waiters posted.
     610                 : inline std::size_t
     611           88875 : timer_service::process_expired()
     612                 : {
     613           88875 :     intrusive_list<waiter_node> expired;
     614                 : 
     615                 :     {
     616           88875 :         std::lock_guard lock(mutex_);
     617           88875 :         auto now = clock_type::now();
     618                 : 
                                   // heap_[0] is examined repeatedly until it is no longer due.
     619           97162 :         while (!heap_.empty() && heap_[0].time_ <= now)
     620                 :         {
     621            8287 :             implementation* t = heap_[0].timer_;
     622            8287 :             remove_timer_impl(*t);
     623           16578 :             while (auto* w = t->waiters_.pop_front())
     624                 :             {
                                           // Detach the waiter and record a success result.
     625            8291 :                 w->impl_     = nullptr;
     626            8291 :                 w->ec_value_ = {};
     627            8291 :                 expired.push_back(w);
     628            8291 :             }
     629            8287 :             t->might_have_pending_waits_ = false;
     630                 :         }
     631                 : 
     632           88875 :         refresh_cached_nearest();
     633           88875 :     }
     634                 : 
                               // Posting happens after the lock scope has ended.
     635           88875 :     std::size_t count = 0;
     636           97166 :     while (auto* w = expired.pop_front())
     637                 :     {
     638            8291 :         sched_->post(&w->op_);
     639            8291 :         ++count;
     640            8291 :     }
     641                 : 
     642           88875 :     return count;
     643                 : }
     644                 : 
                           // Remove impl's entry from the timer heap, if it has one.
                           //
                           // impl.heap_index_ locates the entry; an index outside the heap
                           // means the timer is not in the heap and nothing is done. On
                           // removal heap_index_ is reset to the size_t maximum, the same
                           // value implementation::wait() compares against.
                           // The callers visible in this file invoke it with mutex_ held.
     645                 : inline void
     646            8305 : timer_service::remove_timer_impl(implementation& impl)
     647                 : {
     648            8305 :     std::size_t index = impl.heap_index_;
     649            8305 :     if (index >= heap_.size())
     650 MIS           0 :         return; // Not in heap
     651                 : 
     652 HIT        8305 :     if (index == heap_.size() - 1)
     653                 :     {
     654                 :         // Last element, just pop
     655             101 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     656             101 :         heap_.pop_back();
     657                 :     }
     658                 :     else
     659                 :     {
     660                 :         // Swap with last and reheapify
     661            8204 :         swap_heap(index, heap_.size() - 1);
                                   // swap_heap rewrote impl.heap_index_; overwrite it with the
                                   // "not in heap" value before dropping the entry.
     662            8204 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     663            8204 :         heap_.pop_back();
     664                 : 
                                   // The entry moved into `index` may be earlier than its parent
                                   // (sift up) or not (sift down).
     665            8204 :         if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     666 MIS           0 :             up_heap(index);
     667                 :         else
     668 HIT        8204 :             down_heap(index);
     669                 :     }
     670                 : }
     671                 : 
                           // Sift the heap entry at `index` towards the root.
                           //
                           // The entry is swapped with its parent, at (index - 1) / 2, for as
                           // long as its time_ is strictly earlier than the parent's.
     672                 : inline void
     673            8311 : timer_service::up_heap(std::size_t index)
     674                 : {
     675           16503 :     while (index > 0)
     676                 :     {
     677            8206 :         std::size_t parent = (index - 1) / 2;
                                   // Stop once the entry is not strictly earlier than its parent.
     678            8206 :         if (!(heap_[index].time_ < heap_[parent].time_))
     679              14 :             break;
     680            8192 :         swap_heap(index, parent);
     681            8192 :         index = parent;
     682                 :     }
     683            8311 : }
     684                 : 
                           // Sift the heap entry at `index` towards the leaves.
                           //
                           // At each step the entry is compared with one of its children, at
                           // index * 2 + 1 and index * 2 + 2, and swapped with it unless the
                           // entry's time_ is strictly earlier than that child's.
     685                 : inline void
     686            8204 : timer_service::down_heap(std::size_t index)
     687                 : {
     688            8204 :     std::size_t child = index * 2 + 1;
     689            8204 :     while (child < heap_.size())
     690                 :     {
                                   // Use the left child when it is the only child or strictly
                                   // earlier than the right one; otherwise (including equal
                                   // times) use the right child.
     691               4 :         std::size_t min_child = (child + 1 == heap_.size() ||
     692 MIS           0 :                                  heap_[child].time_ < heap_[child + 1].time_)
     693 HIT           4 :             ? child
     694               4 :             : child + 1;
     695                 : 
                                   // The comparison is strict, so an entry with the same time as
                                   // the chosen child is still swapped downwards.
     696               4 :         if (heap_[index].time_ < heap_[min_child].time_)
     697               4 :             break;
     698                 : 
     699 MIS           0 :         swap_heap(index, min_child);
     700               0 :         index = min_child;
     701               0 :         child = index * 2 + 1;
     702                 :     }
     703 HIT        8204 : }
     704                 : 
                           // Exchange the heap entries at positions i1 and i2.
                           //
                           // Each entry's timer has its heap_index_ updated to the slot the
                           // entry now occupies.
     705                 : inline void
     706           16396 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
     707                 : {
     708           16396 :     heap_entry tmp                = heap_[i1];
     709           16396 :     heap_[i1]                     = heap_[i2];
     710           16396 :     heap_[i2]                     = tmp;
                               // Point both timers back at their new slots.
     711           16396 :     heap_[i1].timer_->heap_index_ = i1;
     712           16396 :     heap_[i2].timer_->heap_index_ = i2;
     713           16396 : }
     714                 : 
     715                 : // waiter_node out-of-class member function definitions
     716                 : 
                           // Callable stored in the waiter's stop_cb_ together with its stop
                           // token (see implementation::wait()). It forwards to the owning
                           // service's cancel_waiter() for this waiter.
     717                 : inline void
     718               4 : waiter_node::canceller::operator()() const
     719                 : {
     720               4 :     waiter_->svc_->cancel_waiter(waiter_);
     721               4 : }
     722                 : 
     723                 : inline void
     724 MIS           0 : waiter_node::completion_op::do_complete(
     725                 :     void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
     726                 : {
     727               0 :     if (!owner)
     728               0 :         return;
     729               0 :     static_cast<completion_op*>(base)->operator()();
     730                 : }
     731                 : 
                           // Deliver a finished wait to its coroutine.
                           //
                           // Resets the stop callback, writes the stored error code to the
                           // caller's output if one was supplied, destroys the waiter node,
                           // then posts the coroutine handle to its executor and reports the
                           // work item as finished to the scheduler.
     732                 : inline void
     733 HIT        8327 : waiter_node::completion_op::operator()()
     734                 : {
     735            8327 :     auto* w = waiter_;
     736            8327 :     w->stop_cb_.reset();
     737            8327 :     if (w->ec_out_)
     738            8327 :         *w->ec_out_ = w->ec_value_;
     739                 : 
                               // Copy what is still needed out of the waiter before
                               // destroy_waiter(w) is called; w is not used after that call.
     740            8327 :     auto h      = w->h_;
     741            8327 :     auto d      = w->d_;
     742            8327 :     auto* svc   = w->svc_;
     743            8327 :     auto& sched = svc->get_scheduler();
     744                 : 
     745            8327 :     svc->destroy_waiter(w);
     746                 : 
     747            8327 :     d.post(h);
                               // Pairs with the work_started() call in implementation::wait().
     748            8327 :     sched.work_finished();
     749            8327 : }
     750                 : 
                           // Start a wait on this timer for coroutine `h`.
                           //
                           // h      Coroutine handle that is posted when the wait finishes.
                           // d      Executor the handle is posted to.
                           // token  Stop token; if stop is possible, a canceller callback is
                           //        registered for the new waiter.
                           // ec     Optional output for the result; may be null.
                           //
                           // Always returns std::noop_coroutine(); the coroutine is only ever
                           // continued through d.post(h).
     751                 : inline std::coroutine_handle<>
     752            8328 : timer_service::implementation::wait(
     753                 :     std::coroutine_handle<> h,
     754                 :     capy::executor_ref d,
     755                 :     std::stop_token token,
     756                 :     std::error_code* ec)
     757                 : {
     758                 :     // Already-expired fast path — no waiter_node, no mutex.
     759                 :     // Post instead of dispatch so the coroutine yields to the
     760                 :     // scheduler, allowing other queued work to run.
     761            8328 :     if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
     762                 :     {
     763            8306 :         if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
     764                 :         {
     765               1 :             if (ec)
     766               1 :                 *ec = {};
     767               1 :             d.post(h);
     768               1 :             return std::noop_coroutine();
     769                 :         }
     770                 :     }
     771                 : 
                               // Slow path: create a waiter node and fill it in for this wait.
     772            8327 :     auto* w    = svc_->create_waiter();
     773            8327 :     w->impl_   = this;
     774            8327 :     w->svc_    = svc_;
     775            8327 :     w->h_      = h;
     776            8327 :     w->d_      = d;
     777            8327 :     w->token_  = std::move(token);
     778            8327 :     w->ec_out_ = ec;
     779                 : 
     780            8327 :     svc_->insert_waiter(*this, w);
     781            8327 :     might_have_pending_waits_ = true;
                               // Matched by work_finished() in completion_op::operator()().
     782            8327 :     svc_->get_scheduler().work_started();
     783                 : 
                               // NOTE(review): w is read here after insert_waiter() has handed it
                               // to the service; confirm it cannot be completed and destroyed
                               // before this point.
     784            8327 :     if (w->token_.stop_possible())
     785               4 :         w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
     786                 : 
     787            8327 :     return std::noop_coroutine();
     788                 : }
     789                 : 
     790                 : // Free functions
     791                 : 
                           // Accessor for the scheduler stored in an io_context.
                           // NOTE(review): reads ctx.sched_ directly, so this struct is
                           // presumably granted access by io_context - confirm.
     792                 : struct timer_service_access
     793                 : {
                               // Returns ctx's scheduler as a native_scheduler&. The
                               // static_cast is unchecked.
     794            8613 :     static native_scheduler& get_scheduler(io_context& ctx) noexcept
     795                 :     {
     796            8613 :         return static_cast<native_scheduler&>(*ctx.sched_);
     797                 :     }
     798                 : };
     799                 : 
     800                 : // Bypass find_service() mutex by reading the scheduler's cached pointer
                           // (the scheduler's timer_svc_ member). `ctx` is static_cast to
                           // io_context& and timer_svc_ is dereferenced without a check.
                           // NOTE(review): assumes ctx really is an io_context and timer_svc_
                           // is already set - confirm against callers.
     801                 : inline io_object::io_service&
     802            8613 : timer_service_direct(capy::execution_context& ctx) noexcept
     803                 : {
     804            8613 :     return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
     805            8613 :                 .timer_svc_;
     806                 : }
     807                 : 
                           // Free-function wrapper: casts `base` to
                           // timer_service::implementation and calls the owning service's
                           // update_timer() with the timer's current expiry_. Returns the
                           // value update_timer() returns.
     808                 : inline std::size_t
     809               6 : timer_service_update_expiry(timer::implementation& base)
     810                 : {
     811               6 :     auto& impl = static_cast<timer_service::implementation&>(base);
     812               6 :     return impl.svc_->update_timer(impl, impl.expiry_);
     813                 : }
     814                 : 
                           // Free-function wrapper: casts `base` to
                           // timer_service::implementation and calls the owning service's
                           // cancel_timer(). Returns the value cancel_timer() returns.
     815                 : inline std::size_t
     816               8 : timer_service_cancel(timer::implementation& base) noexcept
     817                 : {
     818               8 :     auto& impl = static_cast<timer_service::implementation&>(base);
     819               8 :     return impl.svc_->cancel_timer(impl);
     820                 : }
     821                 : 
                           // Free-function wrapper: casts `base` to
                           // timer_service::implementation and calls the owning service's
                           // cancel_one_waiter(). Returns 1 if a waiter was cancelled and 0
                           // otherwise.
     822                 : inline std::size_t
     823               2 : timer_service_cancel_one(timer::implementation& base) noexcept
     824                 : {
     825               2 :     auto& impl = static_cast<timer_service::implementation&>(base);
     826               2 :     return impl.svc_->cancel_one_waiter(impl);
     827                 : }
     828                 : 
                           // Returns the timer_service obtained from
                           // ctx.make_service<timer_service>(sched); `sched` is forwarded as
                           // the service's construction argument.
     829                 : inline timer_service&
     830             340 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     831                 : {
     832             340 :     return ctx.make_service<timer_service>(sched);
     833                 : }
     834                 : 
     835                 : } // namespace boost::corosio::detail
     836                 : 
     837                 : #endif
        

Generated by: LCOV version 2.3