1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Steve Gerbino
3  
// Copyright (c) 2026 Steve Gerbino
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8  
// Official repository: https://github.com/cppalliance/corosio
8  
// Official repository: https://github.com/cppalliance/corosio
9  
//
9  
//
10  

10  

11  
#ifndef BOOST_COROSIO_IO_IO_STREAM_HPP
11  
#ifndef BOOST_COROSIO_IO_IO_STREAM_HPP
12  
#define BOOST_COROSIO_IO_IO_STREAM_HPP
12  
#define BOOST_COROSIO_IO_IO_STREAM_HPP
13  

13  

14  
#include <boost/corosio/detail/config.hpp>
14  
#include <boost/corosio/detail/config.hpp>
15  
#include <boost/corosio/io/io_read_stream.hpp>
15  
#include <boost/corosio/io/io_read_stream.hpp>
16  
#include <boost/corosio/io/io_write_stream.hpp>
16  
#include <boost/corosio/io/io_write_stream.hpp>
17  
#include <boost/corosio/io_buffer_param.hpp>
17  
#include <boost/corosio/io_buffer_param.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
19  

19  

20  
#include <coroutine>
20  
#include <coroutine>
21  
#include <cstddef>
21  
#include <cstddef>
22  
#include <stop_token>
22  
#include <stop_token>
23  
#include <system_error>
23  
#include <system_error>
24  

24  

25  
namespace boost::corosio {
25  
namespace boost::corosio {
26  

26  

27  
/** Platform stream with read/write operations.
27  
/** Platform stream with read/write operations.
28  

28  

29  
    Combines @ref io_read_stream and @ref io_write_stream into
29  
    Combines @ref io_read_stream and @ref io_write_stream into
30  
    a single bidirectional stream. The `read_some` and `write_some`
30  
    a single bidirectional stream. The `read_some` and `write_some`
31  
    operations are inherited from the base classes and dispatch
31  
    operations are inherited from the base classes and dispatch
32  
    through `do_read_some` / `do_write_some`, which this class
32  
    through `do_read_some` / `do_write_some`, which this class
33  
    implements by forwarding to the platform `implementation`.
33  
    implements by forwarding to the platform `implementation`.
34  

34  

35  
    The implementation hierarchy stays linear (no diamond):
35  
    The implementation hierarchy stays linear (no diamond):
36  
    `io_object::implementation` -> `io_stream::implementation`
36  
    `io_object::implementation` -> `io_stream::implementation`
37  
    -> `tcp_socket::implementation` -> backend impl.
37  
    -> `tcp_socket::implementation` -> backend impl.
38  

38  

39  
    @par Semantics
39  
    @par Semantics
40  
    Concrete classes wrap direct platform I/O completed by the kernel.
40  
    Concrete classes wrap direct platform I/O completed by the kernel.
41  
    Functions taking `io_stream&` signal "platform implementation
41  
    Functions taking `io_stream&` signal "platform implementation
42  
    required" - use this when you need actual kernel I/O rather than
42  
    required" - use this when you need actual kernel I/O rather than
43  
    a mock or test double.
43  
    a mock or test double.
44  

44  

45  
    For generic stream algorithms that work with test mocks,
45  
    For generic stream algorithms that work with test mocks,
46  
    use `template<capy::Stream S>` instead of `io_stream&`.
46  
    use `template<capy::Stream S>` instead of `io_stream&`.
47  

47  

48  
    @par Thread Safety
48  
    @par Thread Safety
49  
    Distinct objects: Safe.
49  
    Distinct objects: Safe.
50  
    Shared objects: Unsafe. All calls to a single stream must be made
50  
    Shared objects: Unsafe. All calls to a single stream must be made
51  
    from the same implicit or explicit serialization context.
51  
    from the same implicit or explicit serialization context.
52  

52  

53  
    @par Example
53  
    @par Example
54  
    @code
54  
    @code
55  
    // Read until buffer full or EOF
55  
    // Read until buffer full or EOF
56  
    capy::task<> read_all( io_stream& stream, std::span<char> buf )
56  
    capy::task<> read_all( io_stream& stream, std::span<char> buf )
57  
    {
57  
    {
58  
        std::size_t total = 0;
58  
        std::size_t total = 0;
59  
        while( total < buf.size() )
59  
        while( total < buf.size() )
60  
        {
60  
        {
61  
            auto [ec, n] = co_await stream.read_some(
61  
            auto [ec, n] = co_await stream.read_some(
62  
                capy::buffer( buf.data() + total, buf.size() - total ) );
62  
                capy::buffer( buf.data() + total, buf.size() - total ) );
63  
            if( ec == capy::cond::eof )
63  
            if( ec == capy::cond::eof )
64  
                break;
64  
                break;
65  
            if( ec )
65  
            if( ec )
66  
                capy::detail::throw_system_error( ec );
66  
                capy::detail::throw_system_error( ec );
67  
            total += n;
67  
            total += n;
68  
        }
68  
        }
69  
    }
69  
    }
70  
    @endcode
70  
    @endcode
71  

71  

72  
    @see io_read_stream, io_write_stream, tcp_socket
72  
    @see io_read_stream, io_write_stream, tcp_socket
73  
*/
73  
*/
74  
class BOOST_COROSIO_DECL io_stream
74  
class BOOST_COROSIO_DECL io_stream
75  
    : public io_read_stream
75  
    : public io_read_stream
76  
    , public io_write_stream
76  
    , public io_write_stream
77  
{
77  
{
78  
public:
78  
public:
79  
    /** Platform-specific stream implementation interface.
79  
    /** Platform-specific stream implementation interface.
80  

80  

81  
        Derived classes implement this interface to provide kernel-level
81  
        Derived classes implement this interface to provide kernel-level
82  
        read and write operations for each supported platform (IOCP,
82  
        read and write operations for each supported platform (IOCP,
83  
        epoll, kqueue, io_uring).
83  
        epoll, kqueue, io_uring).
84  
    */
84  
    */
85  
    struct implementation : io_object::implementation
85  
    struct implementation : io_object::implementation
86  
    {
86  
    {
87  
        /// Initiate platform read operation.
87  
        /// Initiate platform read operation.
88  
        virtual std::coroutine_handle<> read_some(
88  
        virtual std::coroutine_handle<> read_some(
89  
            std::coroutine_handle<>,
89  
            std::coroutine_handle<>,
90  
            capy::executor_ref,
90  
            capy::executor_ref,
91  
            io_buffer_param,
91  
            io_buffer_param,
92  
            std::stop_token,
92  
            std::stop_token,
93  
            std::error_code*,
93  
            std::error_code*,
94  
            std::size_t*) = 0;
94  
            std::size_t*) = 0;
95  

95  

96  
        /// Initiate platform write operation.
96  
        /// Initiate platform write operation.
97  
        virtual std::coroutine_handle<> write_some(
97  
        virtual std::coroutine_handle<> write_some(
98  
            std::coroutine_handle<>,
98  
            std::coroutine_handle<>,
99  
            capy::executor_ref,
99  
            capy::executor_ref,
100  
            io_buffer_param,
100  
            io_buffer_param,
101  
            std::stop_token,
101  
            std::stop_token,
102  
            std::error_code*,
102  
            std::error_code*,
103  
            std::size_t*) = 0;
103  
            std::size_t*) = 0;
104  
    };
104  
    };
105  

105  

106  
protected:
106  
protected:
107  
    io_stream() noexcept = default;
107  
    io_stream() noexcept = default;
108  

108  

109  
    /// Construct stream from a handle.
109  
    /// Construct stream from a handle.
110  
    explicit io_stream(handle h) noexcept : io_object(std::move(h)) {}
110  
    explicit io_stream(handle h) noexcept : io_object(std::move(h)) {}
111  

111  

112  
    /// Dispatch read through implementation vtable.
112  
    /// Dispatch read through implementation vtable.
113  
    std::coroutine_handle<> do_read_some(
113  
    std::coroutine_handle<> do_read_some(
114  
        std::coroutine_handle<> h,
114  
        std::coroutine_handle<> h,
115  
        capy::executor_ref ex,
115  
        capy::executor_ref ex,
116  
        io_buffer_param buffers,
116  
        io_buffer_param buffers,
117  
        std::stop_token token,
117  
        std::stop_token token,
118  
        std::error_code* ec,
118  
        std::error_code* ec,
119  
        std::size_t* bytes) override
119  
        std::size_t* bytes) override
120  
    {
120  
    {
121  
        return get().read_some(h, ex, buffers, std::move(token), ec, bytes);
121  
        return get().read_some(h, ex, buffers, std::move(token), ec, bytes);
122  
    }
122  
    }
123  

123  

124  
    /// Dispatch write through implementation vtable.
124  
    /// Dispatch write through implementation vtable.
125  
    std::coroutine_handle<> do_write_some(
125  
    std::coroutine_handle<> do_write_some(
126  
        std::coroutine_handle<> h,
126  
        std::coroutine_handle<> h,
127  
        capy::executor_ref ex,
127  
        capy::executor_ref ex,
128  
        io_buffer_param buffers,
128  
        io_buffer_param buffers,
129  
        std::stop_token token,
129  
        std::stop_token token,
130  
        std::error_code* ec,
130  
        std::error_code* ec,
131  
        std::size_t* bytes) override
131  
        std::size_t* bytes) override
132  
    {
132  
    {
133  
        return get().write_some(h, ex, buffers, std::move(token), ec, bytes);
133  
        return get().write_some(h, ex, buffers, std::move(token), ec, bytes);
134  
    }
134  
    }
135  

135  

136  
private:
136  
private:
137  
    /// Return implementation downcasted to stream interface.
137  
    /// Return implementation downcasted to stream interface.
138  
    implementation& get() const noexcept
138  
    implementation& get() const noexcept
139  
    {
139  
    {
140  
        return *static_cast<implementation*>(h_.get());
140  
        return *static_cast<implementation*>(h_.get());
141  
    }
141  
    }
142  
};
142  
};
143  

143  

144  
} // namespace boost::corosio
144  
} // namespace boost::corosio
145  

145  

146  
#endif
146  
#endif