LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 74.4 % 129 96 33
Test Date: 2026-02-18 17:16:27 Functions: 84.2 % 19 16 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/corosio/io/io_object.hpp>
      19                 : #include <boost/corosio/endpoint.hpp>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <coroutine>
      22                 : #include <boost/capy/error.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <boost/corosio/detail/make_err.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/scheduler_op.hpp>
      28                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      29                 : 
      30                 : #include <unistd.h>
      31                 : #include <errno.h>
      32                 : #include <fcntl.h>
      33                 : 
      34                 : #include <atomic>
      35                 : #include <cstddef>
      36                 : #include <memory>
      37                 : #include <optional>
      38                 : #include <stop_token>
      39                 : 
      40                 : #include <netinet/in.h>
      41                 : #include <sys/select.h>
      42                 : #include <sys/socket.h>
      43                 : #include <sys/uio.h>
      44                 : 
      45                 : /*
      46                 :     select Operation State
      47                 :     ======================
      48                 : 
      49                 :     Each async I/O operation has a corresponding select_op-derived struct that
      50                 :     holds the operation's state while it's in flight. The socket impl owns
      51                 :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      52                 :     operation of each type can be pending per socket at a time.
      53                 : 
      54                 :     This mirrors the epoll_op design for consistency across backends.
      55                 : 
      56                 :     Completion vs Cancellation Race
      57                 :     -------------------------------
      58                 :     The `registered` atomic uses a tri-state (unregistered, registering,
      59                 :     registered) to handle two races: (1) between register_fd() and the
      60                 :     reactor seeing an event, and (2) between reactor completion and cancel().
      61                 : 
      62                 :     The registering state closes the window where an event could arrive
      63                 :     after register_fd() but before the boolean was set. The reactor and
      64                 :     cancel() both treat registering the same as registered when claiming.
      65                 : 
      66                 :     Whoever atomically exchanges to unregistered "claims" the operation
      67                 :     and is responsible for completing it. The loser sees unregistered and
      68                 :     does nothing. The initiating thread uses compare_exchange to transition
      69                 :     from registering to registered; if this fails, the reactor or cancel
      70                 :     already claimed the op.
      71                 : 
      72                 :     Impl Lifetime Management
      73                 :     ------------------------
      74                 :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      75                 :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      76                 :     member holds a shared_ptr to the impl, keeping it alive until the op
      77                 :     completes.
      78                 : 
      79                 :     EOF Detection
      80                 :     -------------
      81                 :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      82                 :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      83                 : 
      84                 :     SIGPIPE Prevention
      85                 :     ------------------
      86                 :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      87                 :     SIGPIPE when the peer has closed.
      88                 : */
      89                 : 
      90                 : namespace boost::corosio::detail {
      91                 : 
      92                 : // Forward declarations for cancellation support
      93                 : class select_socket;
      94                 : class select_acceptor;
      95                 : 
      96                 : /** Registration state for async operations.
      97                 : 
      98                 :     Tri-state enum to handle the race between register_fd() and
      99                 :     run_reactor() seeing an event. Setting REGISTERING before
     100                 :     calling register_fd() ensures events delivered during the
     101                 :     registration window are not dropped.
     102                 : */
     103                 : enum class select_registration_state : std::uint8_t
     104                 : {
     105                 :     unregistered, ///< Not registered with reactor
     106                 :     registering,  ///< register_fd() called, not yet confirmed
     107                 :     registered    ///< Fully registered, ready for events
     108                 : };
     109                 : 
     110                 : struct select_op : scheduler_op
     111                 : {
     112                 :     struct canceller
     113                 :     {
     114                 :         select_op* op;
     115                 :         void operator()() const noexcept;
     116                 :     };
     117                 : 
     118                 :     std::coroutine_handle<> h;
     119                 :     capy::executor_ref ex;
     120                 :     std::error_code* ec_out = nullptr;
     121                 :     std::size_t* bytes_out  = nullptr;
     122                 : 
     123                 :     int fd                        = -1;
     124                 :     int errn                      = 0;
     125                 :     std::size_t bytes_transferred = 0;
     126                 : 
     127                 :     std::atomic<bool> cancelled{false};
     128                 :     std::atomic<select_registration_state> registered{
     129                 :         select_registration_state::unregistered};
     130                 :     std::optional<std::stop_callback<canceller>> stop_cb;
     131                 : 
     132                 :     // Prevents use-after-free when socket is closed with pending ops.
     133                 :     std::shared_ptr<void> impl_ptr;
     134                 : 
     135                 :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
     136                 :     select_socket* socket_impl_     = nullptr;
     137                 :     select_acceptor* acceptor_impl_ = nullptr;
     138                 : 
     139 HIT       31093 :     select_op() = default;
     140                 : 
     141          149089 :     void reset() noexcept
     142                 :     {
     143          149089 :         fd                = -1;
     144          149089 :         errn              = 0;
     145          149089 :         bytes_transferred = 0;
     146          149089 :         cancelled.store(false, std::memory_order_relaxed);
     147          149089 :         registered.store(
     148                 :             select_registration_state::unregistered, std::memory_order_relaxed);
     149          149089 :         impl_ptr.reset();
     150          149089 :         socket_impl_   = nullptr;
     151          149089 :         acceptor_impl_ = nullptr;
     152          149089 :     }
     153                 : 
     154          142199 :     void operator()() override
     155                 :     {
     156          142199 :         stop_cb.reset();
     157                 : 
     158          142199 :         if (ec_out)
     159                 :         {
     160          142199 :             if (cancelled.load(std::memory_order_acquire))
     161             199 :                 *ec_out = capy::error::canceled;
     162          142000 :             else if (errn != 0)
     163               1 :                 *ec_out = make_err(errn);
     164          141999 :             else if (is_read_operation() && bytes_transferred == 0)
     165               5 :                 *ec_out = capy::error::eof;
     166                 :             else
     167          141994 :                 *ec_out = {};
     168                 :         }
     169                 : 
     170          142199 :         if (bytes_out)
     171          142199 :             *bytes_out = bytes_transferred;
     172                 : 
     173                 :         // Move to stack before destroying the frame
     174          142199 :         capy::executor_ref saved_ex(ex);
     175          142199 :         std::coroutine_handle<> saved_h(h);
     176          142199 :         impl_ptr.reset();
     177          142199 :         dispatch_coro(saved_ex, saved_h).resume();
     178          142199 :     }
     179                 : 
     180           71013 :     virtual bool is_read_operation() const noexcept
     181                 :     {
     182           71013 :         return false;
     183                 :     }
     184                 :     virtual void cancel() noexcept = 0;
     185                 : 
     186 MIS           0 :     void destroy() override
     187                 :     {
     188               0 :         stop_cb.reset();
     189               0 :         impl_ptr.reset();
     190               0 :     }
     191                 : 
     192 HIT       93963 :     void request_cancel() noexcept
     193                 :     {
     194           93963 :         cancelled.store(true, std::memory_order_release);
     195           93963 :     }
     196                 : 
     197                 :     // NOLINTNEXTLINE(performance-unnecessary-value-param)
     198                 :     void start(std::stop_token token)
     199                 :     {
     200                 :         cancelled.store(false, std::memory_order_release);
     201                 :         stop_cb.reset();
     202                 :         socket_impl_   = nullptr;
     203                 :         acceptor_impl_ = nullptr;
     204                 : 
     205                 :         if (token.stop_possible())
     206                 :             stop_cb.emplace(token, canceller{this});
     207                 :     }
     208                 : 
     209                 :     // NOLINTNEXTLINE(performance-unnecessary-value-param)
     210          145643 :     void start(std::stop_token token, select_socket* impl)
     211                 :     {
     212          145643 :         cancelled.store(false, std::memory_order_release);
     213          145643 :         stop_cb.reset();
     214          145643 :         socket_impl_   = impl;
     215          145643 :         acceptor_impl_ = nullptr;
     216                 : 
     217          145643 :         if (token.stop_possible())
     218              99 :             stop_cb.emplace(token, canceller{this});
     219          145643 :     }
     220                 : 
     221                 :     // NOLINTNEXTLINE(performance-unnecessary-value-param)
     222            3446 :     void start(std::stop_token token, select_acceptor* impl)
     223                 :     {
     224            3446 :         cancelled.store(false, std::memory_order_release);
     225            3446 :         stop_cb.reset();
     226            3446 :         socket_impl_   = nullptr;
     227            3446 :         acceptor_impl_ = impl;
     228                 : 
     229            3446 :         if (token.stop_possible())
     230 MIS           0 :             stop_cb.emplace(token, canceller{this});
     231 HIT        3446 :     }
     232                 : 
     233          148927 :     void complete(int err, std::size_t bytes) noexcept
     234                 :     {
     235          148927 :         errn              = err;
     236          148927 :         bytes_transferred = bytes;
     237          148927 :     }
     238                 : 
     239 MIS           0 :     virtual void perform_io() noexcept {}
     240                 : };
     241                 : 
     242                 : struct select_connect_op final : select_op
     243                 : {
     244                 :     endpoint target_endpoint;
     245                 : 
     246 HIT        3444 :     void reset() noexcept
     247                 :     {
     248            3444 :         select_op::reset();
     249            3444 :         target_endpoint = endpoint{};
     250            3444 :     }
     251                 : 
     252            3444 :     void perform_io() noexcept override
     253                 :     {
     254                 :         // connect() completion status is retrieved via SO_ERROR, not return value
     255            3444 :         int err       = 0;
     256            3444 :         socklen_t len = sizeof(err);
     257            3444 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     258 MIS           0 :             err = errno;
     259 HIT        3444 :         complete(err, 0);
     260            3444 :     }
     261                 : 
     262                 :     // Defined in sockets.cpp where select_socket is complete
     263                 :     void operator()() override;
     264                 :     void cancel() noexcept override;
     265                 : };
     266                 : 
     267                 : struct select_read_op final : select_op
     268                 : {
     269                 :     static constexpr std::size_t max_buffers = 16;
     270                 :     iovec iovecs[max_buffers];
     271                 :     int iovec_count        = 0;
     272                 :     bool empty_buffer_read = false;
     273                 : 
     274           70986 :     bool is_read_operation() const noexcept override
     275                 :     {
     276           70986 :         return !empty_buffer_read;
     277                 :     }
     278                 : 
     279           71181 :     void reset() noexcept
     280                 :     {
     281           71181 :         select_op::reset();
     282           71181 :         iovec_count       = 0;
     283           71181 :         empty_buffer_read = false;
     284           71181 :     }
     285                 : 
     286             123 :     void perform_io() noexcept override
     287                 :     {
     288             123 :         ssize_t n = ::readv(fd, iovecs, iovec_count);
     289             123 :         if (n >= 0)
     290             123 :             complete(0, static_cast<std::size_t>(n));
     291                 :         else
     292 MIS           0 :             complete(errno, 0);
     293 HIT         123 :     }
     294                 : 
     295                 :     void cancel() noexcept override;
     296                 : };
     297                 : 
     298                 : struct select_write_op final : select_op
     299                 : {
     300                 :     static constexpr std::size_t max_buffers = 16;
     301                 :     iovec iovecs[max_buffers];
     302                 :     int iovec_count = 0;
     303                 : 
     304           71018 :     void reset() noexcept
     305                 :     {
     306           71018 :         select_op::reset();
     307           71018 :         iovec_count = 0;
     308           71018 :     }
     309                 : 
     310 MIS           0 :     void perform_io() noexcept override
     311                 :     {
     312               0 :         msghdr msg{};
     313               0 :         msg.msg_iov    = iovecs;
     314               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     315                 : 
     316               0 :         ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
     317               0 :         if (n >= 0)
     318               0 :             complete(0, static_cast<std::size_t>(n));
     319                 :         else
     320               0 :             complete(errno, 0);
     321               0 :     }
     322                 : 
     323                 :     void cancel() noexcept override;
     324                 : };
     325                 : 
     326                 : struct select_accept_op final : select_op
     327                 : {
     328                 :     int accepted_fd                      = -1;
     329                 :     io_object::implementation* peer_impl = nullptr;
     330                 :     io_object::implementation** impl_out = nullptr;
     331                 : 
     332 HIT        3446 :     void reset() noexcept
     333                 :     {
     334            3446 :         select_op::reset();
     335            3446 :         accepted_fd = -1;
     336            3446 :         peer_impl   = nullptr;
     337            3446 :         impl_out    = nullptr;
     338            3446 :     }
     339                 : 
     340            3441 :     void perform_io() noexcept override
     341                 :     {
     342            3441 :         sockaddr_in addr{};
     343            3441 :         socklen_t addrlen = sizeof(addr);
     344                 : 
     345                 :         // Note: select backend uses accept() + fcntl instead of accept4()
     346                 :         // for broader POSIX compatibility
     347            3441 :         int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
     348                 : 
     349            3441 :         if (new_fd >= 0)
     350                 :         {
     351                 :             // Reject fds that exceed select()'s FD_SETSIZE limit.
     352                 :             // Better to fail now than during later async operations.
     353            3441 :             if (new_fd >= FD_SETSIZE)
     354                 :             {
     355 MIS           0 :                 ::close(new_fd);
     356               0 :                 complete(EINVAL, 0);
     357               0 :                 return;
     358                 :             }
     359                 : 
     360                 :             // Set non-blocking and close-on-exec flags.
     361                 :             // A non-blocking socket is essential for the async reactor;
     362                 :             // if we can't configure it, fail rather than risk blocking.
     363 HIT        3441 :             int flags = ::fcntl(new_fd, F_GETFL, 0);
     364            3441 :             if (flags == -1)
     365                 :             {
     366 MIS           0 :                 int err = errno;
     367               0 :                 ::close(new_fd);
     368               0 :                 complete(err, 0);
     369               0 :                 return;
     370                 :             }
     371                 : 
     372 HIT        3441 :             if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
     373                 :             {
     374 MIS           0 :                 int err = errno;
     375               0 :                 ::close(new_fd);
     376               0 :                 complete(err, 0);
     377               0 :                 return;
     378                 :             }
     379                 : 
     380 HIT        3441 :             if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
     381                 :             {
     382 MIS           0 :                 int err = errno;
     383               0 :                 ::close(new_fd);
     384               0 :                 complete(err, 0);
     385               0 :                 return;
     386                 :             }
     387                 : 
     388 HIT        3441 :             accepted_fd = new_fd;
     389            3441 :             complete(0, 0);
     390                 :         }
     391                 :         else
     392                 :         {
     393 MIS           0 :             complete(errno, 0);
     394                 :         }
     395                 :     }
     396                 : 
     397                 :     // Defined in acceptors.cpp where select_acceptor is complete
     398                 :     void operator()() override;
     399                 :     void cancel() noexcept override;
     400                 : };
     401                 : 
     402                 : } // namespace boost::corosio::detail
     403                 : 
     404                 : #endif // BOOST_COROSIO_HAS_SELECT
     405                 : 
     406                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
        

Generated by: LCOV version 2.3