TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/corosio
9 : //
10 :
11 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13 :
14 : #include <boost/corosio/timer.hpp>
15 : #include <boost/corosio/io_context.hpp>
16 : #include <boost/corosio/detail/scheduler_op.hpp>
17 : #include <boost/corosio/native/native_scheduler.hpp>
18 : #include <boost/corosio/detail/intrusive.hpp>
19 : #include <boost/corosio/detail/thread_local_ptr.hpp>
20 : #include <boost/capy/error.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <system_error>
24 :
25 : #include <atomic>
26 : #include <chrono>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <limits>
30 : #include <mutex>
31 : #include <optional>
32 : #include <stop_token>
33 : #include <vector>
34 :
35 : namespace boost::corosio::detail {
36 :
37 : struct scheduler;
38 :
39 : /*
40 : Timer Service
41 : =============
42 :
43 : Data Structures
44 : ---------------
45 : waiter_node holds per-waiter state: coroutine handle, executor,
46 : error output, stop_token, embedded completion_op. Each concurrent
47 : co_await t.wait() allocates one waiter_node.
48 :
49 : timer_service::implementation holds per-timer state: expiry,
50 : heap index, and an intrusive_list of waiter_nodes. Multiple
51 : coroutines can wait on the same timer simultaneously.
52 :
53 : timer_service owns a min-heap of active timers, a free list
54 : of recycled impls, and a free list of recycled waiter_nodes. The
55 : heap is ordered by expiry time; the scheduler queries
56 : nearest_expiry() to set the epoll/timerfd timeout.
57 :
58 : Optimization Strategy
59 : ---------------------
60 : 1. Deferred heap insertion — expires_after() stores the expiry
61 : but does not insert into the heap. Insertion happens in wait().
62 : 2. Thread-local impl cache — single-slot per-thread cache.
63 : 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 : 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 : 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 : 6. Thread-local waiter cache — single-slot per-thread cache.
67 :
68 : Concurrency
69 : -----------
70 : stop_token callbacks can fire from any thread. The impl_
71 : pointer on waiter_node is used as a "still in list" marker.
72 : */
73 :
// Per-wait state record. Only the name is needed by the timer_service
// member declarations below; the full definition follows the class.
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;

// Deletes whatever the thread-local impl and waiter cache slots hold and
// resets both slots to nullptr. Defined after the slots themselves.
inline void timer_service_invalidate_cache() noexcept;
77 :
78 : // timer_service class body — member function definitions are
79 : // out-of-class (after implementation and waiter_node are complete)
80 : class BOOST_COROSIO_DECL timer_service final
81 : : public capy::execution_context::service
82 : , public io_object::io_service
83 : {
84 : public:
85 : using clock_type = std::chrono::steady_clock;
86 : using time_point = clock_type::time_point;
87 :
88 : class callback
89 : {
90 : void* ctx_ = nullptr;
91 : void (*fn_)(void*) = nullptr;
92 :
93 : public:
94 HIT 340 : callback() = default;
95 340 : callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
96 :
97 : explicit operator bool() const noexcept
98 : {
99 : return fn_ != nullptr;
100 : }
101 8297 : void operator()() const
102 : {
103 8297 : if (fn_)
104 8297 : fn_(ctx_);
105 8297 : }
106 : };
107 :
108 : struct implementation;
109 :
110 : private:
111 : struct heap_entry
112 : {
113 : time_point time_;
114 : implementation* timer_;
115 : };
116 :
117 : scheduler* sched_ = nullptr;
118 : mutable std::mutex mutex_;
119 : std::vector<heap_entry> heap_;
120 : implementation* free_list_ = nullptr;
121 : waiter_node* waiter_free_list_ = nullptr;
122 : callback on_earliest_changed_;
123 : // Avoids mutex in nearest_expiry() and empty()
124 : mutable std::atomic<std::int64_t> cached_nearest_ns_{
125 : (std::numeric_limits<std::int64_t>::max)()};
126 :
127 : public:
128 340 : inline timer_service(capy::execution_context&, scheduler& sched)
129 340 : : sched_(&sched)
130 : {
131 340 : }
132 :
133 16654 : inline scheduler& get_scheduler() noexcept
134 : {
135 16654 : return *sched_;
136 : }
137 :
138 680 : ~timer_service() override = default;
139 :
140 : timer_service(timer_service const&) = delete;
141 : timer_service& operator=(timer_service const&) = delete;
142 :
143 340 : inline void set_on_earliest_changed(callback cb)
144 : {
145 340 : on_earliest_changed_ = cb;
146 340 : }
147 :
148 : inline bool empty() const noexcept
149 : {
150 : return cached_nearest_ns_.load(std::memory_order_acquire) ==
151 : (std::numeric_limits<std::int64_t>::max)();
152 : }
153 :
154 19779 : inline time_point nearest_expiry() const noexcept
155 : {
156 19779 : auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
157 19779 : return time_point(time_point::duration(ns));
158 : }
159 :
160 : inline void shutdown() override;
161 : inline io_object::implementation* construct() override;
162 : inline void destroy(io_object::implementation* p) override;
163 : inline void destroy_impl(implementation& impl);
164 : inline waiter_node* create_waiter();
165 : inline void destroy_waiter(waiter_node* w);
166 : inline std::size_t update_timer(implementation& impl, time_point new_time);
167 : inline void insert_waiter(implementation& impl, waiter_node* w);
168 : inline std::size_t cancel_timer(implementation& impl);
169 : inline void cancel_waiter(waiter_node* w);
170 : inline std::size_t cancel_one_waiter(implementation& impl);
171 : inline std::size_t process_expired();
172 :
173 : private:
174 97208 : inline void refresh_cached_nearest() noexcept
175 : {
176 97208 : auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
177 96818 : : heap_[0].time_.time_since_epoch().count();
178 97208 : cached_nearest_ns_.store(ns, std::memory_order_release);
179 97208 : }
180 :
181 : inline void remove_timer_impl(implementation& impl);
182 : inline void up_heap(std::size_t index);
183 : inline void down_heap(std::size_t index);
184 : inline void swap_heap(std::size_t i1, std::size_t i2);
185 : };
186 :
// State for one outstanding wait on a timer. A node is linked into its
// timer's waiter list while the wait is pending, and carries its own
// scheduler operation so completing it needs no further allocation.
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
    : intrusive_list<waiter_node>::node
{
    // Embedded completion op — avoids heap allocation per fire/cancel
    struct completion_op final : scheduler_op
    {
        // Back-pointer to the enclosing node; set by waiter_node's constructor.
        waiter_node* waiter_ = nullptr;

        // Static entry point handed to scheduler_op; forwards to
        // operator() unless owner is null.
        static void do_complete(
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);

        completion_op() noexcept : scheduler_op(&do_complete) {}

        // Delivers the result, recycles the node and posts the coroutine.
        void operator()() override;
        // No-op — lifetime owned by waiter_node, not the scheduler queue
        void destroy() override {}
    };

    // Per-waiter stop_token cancellation
    struct canceller
    {
        waiter_node* waiter_;
        // Asks the owning service to cancel this one waiter.
        void operator()() const;
    };

    // nullptr once removed from timer's waiter list (concurrency marker)
    timer_service::implementation* impl_ = nullptr;
    // Service that created the node and to which it is returned.
    timer_service* svc_ = nullptr;
    // Coroutine to post on completion, and the executor to post it through.
    std::coroutine_handle<> h_;
    capy::executor_ref d_;
    // Optional destination for the result; ec_value_ is copied here.
    std::error_code* ec_out_ = nullptr;
    // Stop token for this wait and the callback registered on it. The
    // callback is reset when the completion op runs.
    std::stop_token token_;
    std::optional<std::stop_callback<canceller>> stop_cb_;
    completion_op op_;
    // Result of the wait: default-constructed on expiry, canceled otherwise.
    std::error_code ec_value_;
    // Linkage used while the node sits on the service's waiter free list.
    waiter_node* next_free_ = nullptr;

    waiter_node() noexcept
    {
        op_.waiter_ = this;
    }
};
229 :
// Per-timer state managed by timer_service.
// NOTE(review): heap_index_, expiry_ and might_have_pending_waits_ are used
// throughout this file but not declared here — presumably inherited from
// timer::implementation; confirm against timer.hpp.
struct timer_service::implementation final : timer::implementation
{
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using duration = clock_type::duration;

    // Owning service; also compared by the thread-local impl cache to
    // detect entries that belong to a different service.
    timer_service* svc_ = nullptr;
    // Waiters currently suspended on this timer.
    intrusive_list<waiter_node> waiters_;

    // Free list linkage (reused when impl is on free_list)
    implementation* next_free_ = nullptr;

    inline explicit implementation(timer_service& svc) noexcept;

    // Starts a wait: either posts the coroutine immediately when the
    // timer has already expired, or registers a waiter_node. Always
    // returns std::noop_coroutine().
    inline std::coroutine_handle<> wait(
        std::coroutine_handle<>,
        capy::executor_ref,
        std::stop_token,
        std::error_code*) override;
};
250 :
// Thread-local caches avoid hot-path mutex acquisitions:
//   1. Impl cache — single-slot, validated by comparing svc_
//   2. Waiter cache — single-slot, no service affinity
// timer_service_invalidate_cache() deletes and clears both slots; it is
// called at the start of timer_service::shutdown().
// NOTE(review): presumably that only reaches the slots of the thread that
// runs shutdown(); entries cached on other threads are dealt with by the
// svc_ comparison in try_pop_tl_cache() — confirm against thread_local_ptr.

inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
inline thread_local_ptr<waiter_node> tl_cached_waiter;
258 :
259 : inline timer_service::implementation*
260 8613 : try_pop_tl_cache(timer_service* svc) noexcept
261 : {
262 8613 : auto* impl = tl_cached_impl.get();
263 8613 : if (impl)
264 : {
265 8440 : tl_cached_impl.set(nullptr);
266 8440 : if (impl->svc_ == svc)
267 8440 : return impl;
268 : // Stale impl from a destroyed service
269 MIS 0 : delete impl;
270 : }
271 HIT 173 : return nullptr;
272 : }
273 :
274 : inline bool
275 8613 : try_push_tl_cache(timer_service::implementation* impl) noexcept
276 : {
277 8613 : if (!tl_cached_impl.get())
278 : {
279 8565 : tl_cached_impl.set(impl);
280 8565 : return true;
281 : }
282 48 : return false;
283 : }
284 :
285 : inline waiter_node*
286 8327 : try_pop_waiter_tl_cache() noexcept
287 : {
288 8327 : auto* w = tl_cached_waiter.get();
289 8327 : if (w)
290 : {
291 8185 : tl_cached_waiter.set(nullptr);
292 8185 : return w;
293 : }
294 142 : return nullptr;
295 : }
296 :
297 : inline bool
298 8327 : try_push_waiter_tl_cache(waiter_node* w) noexcept
299 : {
300 8327 : if (!tl_cached_waiter.get())
301 : {
302 8269 : tl_cached_waiter.set(w);
303 8269 : return true;
304 : }
305 58 : return false;
306 : }
307 :
308 : inline void
309 340 : timer_service_invalidate_cache() noexcept
310 : {
311 340 : delete tl_cached_impl.get();
312 340 : tl_cached_impl.set(nullptr);
313 :
314 340 : delete tl_cached_waiter.get();
315 340 : tl_cached_waiter.set(nullptr);
316 340 : }
317 :
318 : // timer_service out-of-class member function definitions
319 :
320 173 : inline timer_service::implementation::implementation(
321 173 : timer_service& svc) noexcept
322 173 : : svc_(&svc)
323 : {
324 173 : }
325 :
// Releases everything the service still tracks: the thread-local cache
// slots, every timer left in the heap (with its waiters), and both
// free lists.
// NOTE(review): mutex_ is not taken here — presumably no other thread
// touches the service while shutdown() runs; confirm.
inline void
timer_service::shutdown()
{
    // Delete whatever the thread-local impl/waiter slots currently hold.
    timer_service_invalidate_cache();

    // Cancel waiting timers still in the heap
    for (auto& entry : heap_)
    {
        auto* impl = entry.timer_;
        while (auto* w = impl->waiters_.pop_front())
        {
            // Drop the stop callback first so it can no longer call back
            // into the service for this waiter.
            w->stop_cb_.reset();
            // The suspended coroutine is destroyed, not resumed.
            w->h_.destroy();
            // Balances the work_started() issued when the wait began.
            sched_->work_finished();
            delete w;
        }
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        delete impl;
    }
    heap_.clear();
    // Publish the "no timers" sentinel.
    cached_nearest_ns_.store(
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);

    // Delete free-listed impls
    while (free_list_)
    {
        auto* next = free_list_->next_free_;
        delete free_list_;
        free_list_ = next;
    }

    // Delete free-listed waiters
    while (waiter_free_list_)
    {
        auto* next = waiter_free_list_->next_free_;
        delete waiter_free_list_;
        waiter_free_list_ = next;
    }
}
365 :
366 : inline io_object::implementation*
367 8613 : timer_service::construct()
368 : {
369 8613 : implementation* impl = try_pop_tl_cache(this);
370 8613 : if (impl)
371 : {
372 8440 : impl->svc_ = this;
373 8440 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
374 8440 : impl->might_have_pending_waits_ = false;
375 8440 : return impl;
376 : }
377 :
378 173 : std::lock_guard lock(mutex_);
379 173 : if (free_list_)
380 : {
381 MIS 0 : impl = free_list_;
382 0 : free_list_ = impl->next_free_;
383 0 : impl->next_free_ = nullptr;
384 0 : impl->svc_ = this;
385 0 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
386 0 : impl->might_have_pending_waits_ = false;
387 : }
388 : else
389 : {
390 HIT 173 : impl = new implementation(*this);
391 : }
392 173 : return impl;
393 173 : }
394 :
395 : inline void
396 8613 : timer_service::destroy(io_object::implementation* p)
397 : {
398 8613 : destroy_impl(static_cast<implementation&>(*p));
399 8613 : }
400 :
401 : inline void
402 8613 : timer_service::destroy_impl(implementation& impl)
403 : {
404 8613 : cancel_timer(impl);
405 :
406 8613 : if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
407 : {
408 MIS 0 : std::lock_guard lock(mutex_);
409 0 : remove_timer_impl(impl);
410 0 : refresh_cached_nearest();
411 0 : }
412 :
413 HIT 8613 : if (try_push_tl_cache(&impl))
414 8565 : return;
415 :
416 48 : std::lock_guard lock(mutex_);
417 48 : impl.next_free_ = free_list_;
418 48 : free_list_ = &impl;
419 48 : }
420 :
421 : inline waiter_node*
422 8327 : timer_service::create_waiter()
423 : {
424 8327 : if (auto* w = try_pop_waiter_tl_cache())
425 8185 : return w;
426 :
427 142 : std::lock_guard lock(mutex_);
428 142 : if (waiter_free_list_)
429 : {
430 MIS 0 : auto* w = waiter_free_list_;
431 0 : waiter_free_list_ = w->next_free_;
432 0 : w->next_free_ = nullptr;
433 0 : return w;
434 : }
435 :
436 HIT 142 : return new waiter_node();
437 142 : }
438 :
439 : inline void
440 8327 : timer_service::destroy_waiter(waiter_node* w)
441 : {
442 8327 : if (try_push_waiter_tl_cache(w))
443 8269 : return;
444 :
445 58 : std::lock_guard lock(mutex_);
446 58 : w->next_free_ = waiter_free_list_;
447 58 : waiter_free_list_ = w;
448 58 : }
449 :
450 : inline std::size_t
451 6 : timer_service::update_timer(implementation& impl, time_point new_time)
452 : {
453 : bool in_heap =
454 6 : (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
455 6 : if (!in_heap && impl.waiters_.empty())
456 MIS 0 : return 0;
457 :
458 HIT 6 : bool notify = false;
459 6 : intrusive_list<waiter_node> canceled;
460 :
461 : {
462 6 : std::lock_guard lock(mutex_);
463 :
464 16 : while (auto* w = impl.waiters_.pop_front())
465 : {
466 10 : w->impl_ = nullptr;
467 10 : canceled.push_back(w);
468 10 : }
469 :
470 6 : if (impl.heap_index_ < heap_.size())
471 : {
472 6 : time_point old_time = heap_[impl.heap_index_].time_;
473 6 : heap_[impl.heap_index_].time_ = new_time;
474 :
475 6 : if (new_time < old_time)
476 6 : up_heap(impl.heap_index_);
477 : else
478 MIS 0 : down_heap(impl.heap_index_);
479 :
480 HIT 6 : notify = (impl.heap_index_ == 0);
481 : }
482 :
483 6 : refresh_cached_nearest();
484 6 : }
485 :
486 6 : std::size_t count = 0;
487 16 : while (auto* w = canceled.pop_front())
488 : {
489 10 : w->ec_value_ = make_error_code(capy::error::canceled);
490 10 : sched_->post(&w->op_);
491 10 : ++count;
492 10 : }
493 :
494 6 : if (notify)
495 6 : on_earliest_changed_();
496 :
497 6 : return count;
498 : }
499 :
500 : inline void
501 8327 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
502 : {
503 8327 : bool notify = false;
504 : {
505 8327 : std::lock_guard lock(mutex_);
506 8327 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
507 : {
508 8305 : impl.heap_index_ = heap_.size();
509 8305 : heap_.push_back({impl.expiry_, &impl});
510 8305 : up_heap(heap_.size() - 1);
511 8305 : notify = (impl.heap_index_ == 0);
512 8305 : refresh_cached_nearest();
513 : }
514 8327 : impl.waiters_.push_back(w);
515 8327 : }
516 8327 : if (notify)
517 8291 : on_earliest_changed_();
518 8327 : }
519 :
520 : inline std::size_t
521 8621 : timer_service::cancel_timer(implementation& impl)
522 : {
523 8621 : if (!impl.might_have_pending_waits_)
524 8605 : return 0;
525 :
526 : // Not in heap and no waiters — just clear the flag
527 16 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
528 MIS 0 : impl.waiters_.empty())
529 : {
530 0 : impl.might_have_pending_waits_ = false;
531 0 : return 0;
532 : }
533 :
534 HIT 16 : intrusive_list<waiter_node> canceled;
535 :
536 : {
537 16 : std::lock_guard lock(mutex_);
538 16 : remove_timer_impl(impl);
539 36 : while (auto* w = impl.waiters_.pop_front())
540 : {
541 20 : w->impl_ = nullptr;
542 20 : canceled.push_back(w);
543 20 : }
544 16 : refresh_cached_nearest();
545 16 : }
546 :
547 16 : impl.might_have_pending_waits_ = false;
548 :
549 16 : std::size_t count = 0;
550 36 : while (auto* w = canceled.pop_front())
551 : {
552 20 : w->ec_value_ = make_error_code(capy::error::canceled);
553 20 : sched_->post(&w->op_);
554 20 : ++count;
555 20 : }
556 :
557 16 : return count;
558 : }
559 :
560 : inline void
561 4 : timer_service::cancel_waiter(waiter_node* w)
562 : {
563 : {
564 4 : std::lock_guard lock(mutex_);
565 : // Already removed by cancel_timer or process_expired
566 4 : if (!w->impl_)
567 MIS 0 : return;
568 HIT 4 : auto* impl = w->impl_;
569 4 : w->impl_ = nullptr;
570 4 : impl->waiters_.remove(w);
571 4 : if (impl->waiters_.empty())
572 : {
573 2 : remove_timer_impl(*impl);
574 2 : impl->might_have_pending_waits_ = false;
575 : }
576 4 : refresh_cached_nearest();
577 4 : }
578 :
579 4 : w->ec_value_ = make_error_code(capy::error::canceled);
580 4 : sched_->post(&w->op_);
581 : }
582 :
583 : inline std::size_t
584 2 : timer_service::cancel_one_waiter(implementation& impl)
585 : {
586 2 : if (!impl.might_have_pending_waits_)
587 MIS 0 : return 0;
588 :
589 HIT 2 : waiter_node* w = nullptr;
590 :
591 : {
592 2 : std::lock_guard lock(mutex_);
593 2 : w = impl.waiters_.pop_front();
594 2 : if (!w)
595 MIS 0 : return 0;
596 HIT 2 : w->impl_ = nullptr;
597 2 : if (impl.waiters_.empty())
598 : {
599 MIS 0 : remove_timer_impl(impl);
600 0 : impl.might_have_pending_waits_ = false;
601 : }
602 HIT 2 : refresh_cached_nearest();
603 2 : }
604 :
605 2 : w->ec_value_ = make_error_code(capy::error::canceled);
606 2 : sched_->post(&w->op_);
607 2 : return 1;
608 : }
609 :
610 : inline std::size_t
611 88875 : timer_service::process_expired()
612 : {
613 88875 : intrusive_list<waiter_node> expired;
614 :
615 : {
616 88875 : std::lock_guard lock(mutex_);
617 88875 : auto now = clock_type::now();
618 :
619 97162 : while (!heap_.empty() && heap_[0].time_ <= now)
620 : {
621 8287 : implementation* t = heap_[0].timer_;
622 8287 : remove_timer_impl(*t);
623 16578 : while (auto* w = t->waiters_.pop_front())
624 : {
625 8291 : w->impl_ = nullptr;
626 8291 : w->ec_value_ = {};
627 8291 : expired.push_back(w);
628 8291 : }
629 8287 : t->might_have_pending_waits_ = false;
630 : }
631 :
632 88875 : refresh_cached_nearest();
633 88875 : }
634 :
635 88875 : std::size_t count = 0;
636 97166 : while (auto* w = expired.pop_front())
637 : {
638 8291 : sched_->post(&w->op_);
639 8291 : ++count;
640 8291 : }
641 :
642 88875 : return count;
643 : }
644 :
645 : inline void
646 8305 : timer_service::remove_timer_impl(implementation& impl)
647 : {
648 8305 : std::size_t index = impl.heap_index_;
649 8305 : if (index >= heap_.size())
650 MIS 0 : return; // Not in heap
651 :
652 HIT 8305 : if (index == heap_.size() - 1)
653 : {
654 : // Last element, just pop
655 101 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
656 101 : heap_.pop_back();
657 : }
658 : else
659 : {
660 : // Swap with last and reheapify
661 8204 : swap_heap(index, heap_.size() - 1);
662 8204 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
663 8204 : heap_.pop_back();
664 :
665 8204 : if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
666 MIS 0 : up_heap(index);
667 : else
668 HIT 8204 : down_heap(index);
669 : }
670 : }
671 :
672 : inline void
673 8311 : timer_service::up_heap(std::size_t index)
674 : {
675 16503 : while (index > 0)
676 : {
677 8206 : std::size_t parent = (index - 1) / 2;
678 8206 : if (!(heap_[index].time_ < heap_[parent].time_))
679 14 : break;
680 8192 : swap_heap(index, parent);
681 8192 : index = parent;
682 : }
683 8311 : }
684 :
685 : inline void
686 8204 : timer_service::down_heap(std::size_t index)
687 : {
688 8204 : std::size_t child = index * 2 + 1;
689 8204 : while (child < heap_.size())
690 : {
691 4 : std::size_t min_child = (child + 1 == heap_.size() ||
692 MIS 0 : heap_[child].time_ < heap_[child + 1].time_)
693 HIT 4 : ? child
694 4 : : child + 1;
695 :
696 4 : if (heap_[index].time_ < heap_[min_child].time_)
697 4 : break;
698 :
699 MIS 0 : swap_heap(index, min_child);
700 0 : index = min_child;
701 0 : child = index * 2 + 1;
702 : }
703 HIT 8204 : }
704 :
705 : inline void
706 16396 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
707 : {
708 16396 : heap_entry tmp = heap_[i1];
709 16396 : heap_[i1] = heap_[i2];
710 16396 : heap_[i2] = tmp;
711 16396 : heap_[i1].timer_->heap_index_ = i1;
712 16396 : heap_[i2].timer_->heap_index_ = i2;
713 16396 : }
714 :
715 : // waiter_node out-of-class member function definitions
716 :
717 : inline void
718 4 : waiter_node::canceller::operator()() const
719 : {
720 4 : waiter_->svc_->cancel_waiter(waiter_);
721 4 : }
722 :
723 : inline void
724 MIS 0 : waiter_node::completion_op::do_complete(
725 : void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
726 : {
727 0 : if (!owner)
728 0 : return;
729 0 : static_cast<completion_op*>(base)->operator()();
730 : }
731 :
732 : inline void
733 HIT 8327 : waiter_node::completion_op::operator()()
734 : {
735 8327 : auto* w = waiter_;
736 8327 : w->stop_cb_.reset();
737 8327 : if (w->ec_out_)
738 8327 : *w->ec_out_ = w->ec_value_;
739 :
740 8327 : auto h = w->h_;
741 8327 : auto d = w->d_;
742 8327 : auto* svc = w->svc_;
743 8327 : auto& sched = svc->get_scheduler();
744 :
745 8327 : svc->destroy_waiter(w);
746 :
747 8327 : d.post(h);
748 8327 : sched.work_finished();
749 8327 : }
750 :
751 : inline std::coroutine_handle<>
752 8328 : timer_service::implementation::wait(
753 : std::coroutine_handle<> h,
754 : capy::executor_ref d,
755 : std::stop_token token,
756 : std::error_code* ec)
757 : {
758 : // Already-expired fast path — no waiter_node, no mutex.
759 : // Post instead of dispatch so the coroutine yields to the
760 : // scheduler, allowing other queued work to run.
761 8328 : if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
762 : {
763 8306 : if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
764 : {
765 1 : if (ec)
766 1 : *ec = {};
767 1 : d.post(h);
768 1 : return std::noop_coroutine();
769 : }
770 : }
771 :
772 8327 : auto* w = svc_->create_waiter();
773 8327 : w->impl_ = this;
774 8327 : w->svc_ = svc_;
775 8327 : w->h_ = h;
776 8327 : w->d_ = d;
777 8327 : w->token_ = std::move(token);
778 8327 : w->ec_out_ = ec;
779 :
780 8327 : svc_->insert_waiter(*this, w);
781 8327 : might_have_pending_waits_ = true;
782 8327 : svc_->get_scheduler().work_started();
783 :
784 8327 : if (w->token_.stop_possible())
785 4 : w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
786 :
787 8327 : return std::noop_coroutine();
788 : }
789 :
790 : // Free functions
791 :
792 : struct timer_service_access
793 : {
794 8613 : static native_scheduler& get_scheduler(io_context& ctx) noexcept
795 : {
796 8613 : return static_cast<native_scheduler&>(*ctx.sched_);
797 : }
798 : };
799 :
800 : // Bypass find_service() mutex by reading the scheduler's cached pointer
801 : inline io_object::io_service&
802 8613 : timer_service_direct(capy::execution_context& ctx) noexcept
803 : {
804 8613 : return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
805 8613 : .timer_svc_;
806 : }
807 :
808 : inline std::size_t
809 6 : timer_service_update_expiry(timer::implementation& base)
810 : {
811 6 : auto& impl = static_cast<timer_service::implementation&>(base);
812 6 : return impl.svc_->update_timer(impl, impl.expiry_);
813 : }
814 :
815 : inline std::size_t
816 8 : timer_service_cancel(timer::implementation& base) noexcept
817 : {
818 8 : auto& impl = static_cast<timer_service::implementation&>(base);
819 8 : return impl.svc_->cancel_timer(impl);
820 : }
821 :
822 : inline std::size_t
823 2 : timer_service_cancel_one(timer::implementation& base) noexcept
824 : {
825 2 : auto& impl = static_cast<timer_service::implementation&>(base);
826 2 : return impl.svc_->cancel_one_waiter(impl);
827 : }
828 :
829 : inline timer_service&
830 340 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
831 : {
832 340 : return ctx.make_service<timer_service>(sched);
833 : }
834 :
835 : } // namespace boost::corosio::detail
836 :
837 : #endif
|