TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <coroutine>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/scheduler_op.hpp>
28 : #include <boost/corosio/detail/endpoint_convert.hpp>
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 : #include <fcntl.h>
33 :
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stop_token>
39 :
40 : #include <netinet/in.h>
41 : #include <sys/select.h>
42 : #include <sys/socket.h>
43 : #include <sys/uio.h>
44 :
45 : /*
46 : select Operation State
47 : ======================
48 :
49 : Each async I/O operation has a corresponding select_op-derived struct that
50 : holds the operation's state while it's in flight. The socket impl owns
51 : fixed slots for each operation type (conn_, rd_, wr_), so only one
52 : operation of each type can be pending per socket at a time.
53 :
54 : This mirrors the epoll_op design for consistency across backends.
55 :
56 : Completion vs Cancellation Race
57 : -------------------------------
58 : The `registered` atomic uses a tri-state (unregistered, registering,
59 : registered) to handle two races: (1) between register_fd() and the
60 : reactor seeing an event, and (2) between reactor completion and cancel().
61 :
62 : The registering state closes the window where an event could arrive
63 : after register_fd() but before the boolean was set. The reactor and
64 : cancel() both treat registering the same as registered when claiming.
65 :
66 : Whoever atomically exchanges to unregistered "claims" the operation
67 : and is responsible for completing it. The loser sees unregistered and
68 : does nothing. The initiating thread uses compare_exchange to transition
69 : from registering to registered; if this fails, the reactor or cancel
70 : already claimed the op.
71 :
72 : Impl Lifetime Management
73 : ------------------------
74 : When cancel() posts an op to the scheduler's ready queue, the socket impl
75 : might be destroyed before the scheduler processes the op. The `impl_ptr`
76 : member holds a shared_ptr to the impl, keeping it alive until the op
77 : completes.
78 :
79 : EOF Detection
80 : -------------
81 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
82 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
83 :
84 : SIGPIPE Prevention
85 : ------------------
86 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
87 : SIGPIPE when the peer has closed.
88 : */
89 :
90 : namespace boost::corosio::detail {
91 :
92 : // Forward declarations for cancellation support
93 : class select_socket;
94 : class select_acceptor;
95 :
/** Registration state for async operations.

    Tri-state enum to handle the race between register_fd() and
    run_reactor() seeing an event. Storing `registering` before
    calling register_fd() ensures events delivered during the
    registration window are not dropped.

    The underlying type is std::uint8_t (declared in <cstdint>).
*/
enum class select_registration_state : std::uint8_t
{
    unregistered, ///< Not registered with reactor
    registering,  ///< register_fd() called, not yet confirmed
    registered    ///< Fully registered, ready for events
};
109 :
110 : struct select_op : scheduler_op
111 : {
112 : struct canceller
113 : {
114 : select_op* op;
115 : void operator()() const noexcept;
116 : };
117 :
118 : std::coroutine_handle<> h;
119 : capy::executor_ref ex;
120 : std::error_code* ec_out = nullptr;
121 : std::size_t* bytes_out = nullptr;
122 :
123 : int fd = -1;
124 : int errn = 0;
125 : std::size_t bytes_transferred = 0;
126 :
127 : std::atomic<bool> cancelled{false};
128 : std::atomic<select_registration_state> registered{
129 : select_registration_state::unregistered};
130 : std::optional<std::stop_callback<canceller>> stop_cb;
131 :
132 : // Prevents use-after-free when socket is closed with pending ops.
133 : std::shared_ptr<void> impl_ptr;
134 :
135 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
136 : select_socket* socket_impl_ = nullptr;
137 : select_acceptor* acceptor_impl_ = nullptr;
138 :
139 HIT 31093 : select_op() = default;
140 :
141 149089 : void reset() noexcept
142 : {
143 149089 : fd = -1;
144 149089 : errn = 0;
145 149089 : bytes_transferred = 0;
146 149089 : cancelled.store(false, std::memory_order_relaxed);
147 149089 : registered.store(
148 : select_registration_state::unregistered, std::memory_order_relaxed);
149 149089 : impl_ptr.reset();
150 149089 : socket_impl_ = nullptr;
151 149089 : acceptor_impl_ = nullptr;
152 149089 : }
153 :
154 142199 : void operator()() override
155 : {
156 142199 : stop_cb.reset();
157 :
158 142199 : if (ec_out)
159 : {
160 142199 : if (cancelled.load(std::memory_order_acquire))
161 199 : *ec_out = capy::error::canceled;
162 142000 : else if (errn != 0)
163 1 : *ec_out = make_err(errn);
164 141999 : else if (is_read_operation() && bytes_transferred == 0)
165 5 : *ec_out = capy::error::eof;
166 : else
167 141994 : *ec_out = {};
168 : }
169 :
170 142199 : if (bytes_out)
171 142199 : *bytes_out = bytes_transferred;
172 :
173 : // Move to stack before destroying the frame
174 142199 : capy::executor_ref saved_ex(ex);
175 142199 : std::coroutine_handle<> saved_h(h);
176 142199 : impl_ptr.reset();
177 142199 : dispatch_coro(saved_ex, saved_h).resume();
178 142199 : }
179 :
180 71013 : virtual bool is_read_operation() const noexcept
181 : {
182 71013 : return false;
183 : }
184 : virtual void cancel() noexcept = 0;
185 :
186 MIS 0 : void destroy() override
187 : {
188 0 : stop_cb.reset();
189 0 : impl_ptr.reset();
190 0 : }
191 :
192 HIT 93963 : void request_cancel() noexcept
193 : {
194 93963 : cancelled.store(true, std::memory_order_release);
195 93963 : }
196 :
197 : // NOLINTNEXTLINE(performance-unnecessary-value-param)
198 : void start(std::stop_token token)
199 : {
200 : cancelled.store(false, std::memory_order_release);
201 : stop_cb.reset();
202 : socket_impl_ = nullptr;
203 : acceptor_impl_ = nullptr;
204 :
205 : if (token.stop_possible())
206 : stop_cb.emplace(token, canceller{this});
207 : }
208 :
209 : // NOLINTNEXTLINE(performance-unnecessary-value-param)
210 145643 : void start(std::stop_token token, select_socket* impl)
211 : {
212 145643 : cancelled.store(false, std::memory_order_release);
213 145643 : stop_cb.reset();
214 145643 : socket_impl_ = impl;
215 145643 : acceptor_impl_ = nullptr;
216 :
217 145643 : if (token.stop_possible())
218 99 : stop_cb.emplace(token, canceller{this});
219 145643 : }
220 :
221 : // NOLINTNEXTLINE(performance-unnecessary-value-param)
222 3446 : void start(std::stop_token token, select_acceptor* impl)
223 : {
224 3446 : cancelled.store(false, std::memory_order_release);
225 3446 : stop_cb.reset();
226 3446 : socket_impl_ = nullptr;
227 3446 : acceptor_impl_ = impl;
228 :
229 3446 : if (token.stop_possible())
230 MIS 0 : stop_cb.emplace(token, canceller{this});
231 HIT 3446 : }
232 :
233 148927 : void complete(int err, std::size_t bytes) noexcept
234 : {
235 148927 : errn = err;
236 148927 : bytes_transferred = bytes;
237 148927 : }
238 :
239 MIS 0 : virtual void perform_io() noexcept {}
240 : };
241 :
242 : struct select_connect_op final : select_op
243 : {
244 : endpoint target_endpoint;
245 :
246 HIT 3444 : void reset() noexcept
247 : {
248 3444 : select_op::reset();
249 3444 : target_endpoint = endpoint{};
250 3444 : }
251 :
252 3444 : void perform_io() noexcept override
253 : {
254 : // connect() completion status is retrieved via SO_ERROR, not return value
255 3444 : int err = 0;
256 3444 : socklen_t len = sizeof(err);
257 3444 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
258 MIS 0 : err = errno;
259 HIT 3444 : complete(err, 0);
260 3444 : }
261 :
262 : // Defined in sockets.cpp where select_socket is complete
263 : void operator()() override;
264 : void cancel() noexcept override;
265 : };
266 :
267 : struct select_read_op final : select_op
268 : {
269 : static constexpr std::size_t max_buffers = 16;
270 : iovec iovecs[max_buffers];
271 : int iovec_count = 0;
272 : bool empty_buffer_read = false;
273 :
274 70986 : bool is_read_operation() const noexcept override
275 : {
276 70986 : return !empty_buffer_read;
277 : }
278 :
279 71181 : void reset() noexcept
280 : {
281 71181 : select_op::reset();
282 71181 : iovec_count = 0;
283 71181 : empty_buffer_read = false;
284 71181 : }
285 :
286 123 : void perform_io() noexcept override
287 : {
288 123 : ssize_t n = ::readv(fd, iovecs, iovec_count);
289 123 : if (n >= 0)
290 123 : complete(0, static_cast<std::size_t>(n));
291 : else
292 MIS 0 : complete(errno, 0);
293 HIT 123 : }
294 :
295 : void cancel() noexcept override;
296 : };
297 :
298 : struct select_write_op final : select_op
299 : {
300 : static constexpr std::size_t max_buffers = 16;
301 : iovec iovecs[max_buffers];
302 : int iovec_count = 0;
303 :
304 71018 : void reset() noexcept
305 : {
306 71018 : select_op::reset();
307 71018 : iovec_count = 0;
308 71018 : }
309 :
310 MIS 0 : void perform_io() noexcept override
311 : {
312 0 : msghdr msg{};
313 0 : msg.msg_iov = iovecs;
314 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
315 :
316 0 : ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
317 0 : if (n >= 0)
318 0 : complete(0, static_cast<std::size_t>(n));
319 : else
320 0 : complete(errno, 0);
321 0 : }
322 :
323 : void cancel() noexcept override;
324 : };
325 :
326 : struct select_accept_op final : select_op
327 : {
328 : int accepted_fd = -1;
329 : io_object::implementation* peer_impl = nullptr;
330 : io_object::implementation** impl_out = nullptr;
331 :
332 HIT 3446 : void reset() noexcept
333 : {
334 3446 : select_op::reset();
335 3446 : accepted_fd = -1;
336 3446 : peer_impl = nullptr;
337 3446 : impl_out = nullptr;
338 3446 : }
339 :
340 3441 : void perform_io() noexcept override
341 : {
342 3441 : sockaddr_in addr{};
343 3441 : socklen_t addrlen = sizeof(addr);
344 :
345 : // Note: select backend uses accept() + fcntl instead of accept4()
346 : // for broader POSIX compatibility
347 3441 : int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
348 :
349 3441 : if (new_fd >= 0)
350 : {
351 : // Reject fds that exceed select()'s FD_SETSIZE limit.
352 : // Better to fail now than during later async operations.
353 3441 : if (new_fd >= FD_SETSIZE)
354 : {
355 MIS 0 : ::close(new_fd);
356 0 : complete(EINVAL, 0);
357 0 : return;
358 : }
359 :
360 : // Set non-blocking and close-on-exec flags.
361 : // A non-blocking socket is essential for the async reactor;
362 : // if we can't configure it, fail rather than risk blocking.
363 HIT 3441 : int flags = ::fcntl(new_fd, F_GETFL, 0);
364 3441 : if (flags == -1)
365 : {
366 MIS 0 : int err = errno;
367 0 : ::close(new_fd);
368 0 : complete(err, 0);
369 0 : return;
370 : }
371 :
372 HIT 3441 : if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
373 : {
374 MIS 0 : int err = errno;
375 0 : ::close(new_fd);
376 0 : complete(err, 0);
377 0 : return;
378 : }
379 :
380 HIT 3441 : if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
381 : {
382 MIS 0 : int err = errno;
383 0 : ::close(new_fd);
384 0 : complete(err, 0);
385 0 : return;
386 : }
387 :
388 HIT 3441 : accepted_fd = new_fd;
389 3441 : complete(0, 0);
390 : }
391 : else
392 : {
393 MIS 0 : complete(errno, 0);
394 : }
395 : }
396 :
397 : // Defined in acceptors.cpp where select_acceptor is complete
398 : void operator()() override;
399 : void cancel() noexcept override;
400 : };
401 :
402 : } // namespace boost::corosio::detail
403 :
404 : #endif // BOOST_COROSIO_HAS_SELECT
405 :
406 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
|