TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
12 : #define BOOST_CAPY_TEST_READ_STREAM_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/make_buffer.hpp>
18 : #include <boost/capy/cond.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/test/fuse.hpp>
23 :
24 : #include <string>
25 : #include <string_view>
26 :
27 : namespace boost {
28 : namespace capy {
29 : namespace test {
30 :
31 : /** A mock stream for testing read operations.
32 :
33 : Use this to verify code that performs reads without needing
34 : real I/O. Call @ref provide to supply data, then @ref read_some
35 : to consume it. The associated @ref fuse enables error injection
36 : at controlled points. An optional `max_read_size` constructor
37 : parameter limits bytes per read to simulate chunked delivery.
38 :
39 : This class satisfies the @ref ReadStream concept.
40 :
41 : @par Thread Safety
42 : Not thread-safe.
43 :
44 : @par Example
45 : @code
46 : fuse f;
47 : read_stream rs( f );
48 : rs.provide( "Hello, " );
49 : rs.provide( "World!" );
50 :
51 : auto r = f.armed( [&]( fuse& ) -> task<void> {
52 : char buf[32];
53 : auto [ec, n] = co_await rs.read_some(
54 : mutable_buffer( buf, sizeof( buf ) ) );
55 : if( ec )
56 : co_return;
57 : // buf contains "Hello, World!"
58 : } );
59 : @endcode
60 :
61 : @see fuse, ReadStream
62 : */
63 : class read_stream
64 : {
65 : fuse f_; // consulted by read_some to inject failures
66 : std::string data_; // every byte appended through provide()
67 : std::size_t pos_ = 0; // offset into data_ of the next unread byte
68 : std::size_t max_read_size_; // cap on bytes copied by a single read
69 :
70 : public:
71 : /** Construct a read stream.
72 :
73 : @param f The fuse used to inject errors during reads.
74 :
75 : @param max_read_size Maximum bytes returned per read.
76 : Use to simulate chunked network delivery. The default, `std::size_t(-1)`, imposes no limit.
77 : */
78 HIT 1457 : explicit read_stream(
79 : fuse f = {},
80 : std::size_t max_read_size = std::size_t(-1)) noexcept
81 1457 : : f_(std::move(f))
82 1457 : , max_read_size_(max_read_size)
83 : {
84 1457 : }
85 :
86 : /** Append data to be returned by subsequent reads.
87 :
88 : Successive calls concatenate; the read position is not changed.
89 :
90 : @param sv The data to append.
91 : */
92 : void
93 1423 : provide(std::string_view sv)
94 : {
95 1423 : data_.append(sv);
96 1423 : }
97 :
98 : /// Discard all provided data and rewind the read position; the fuse and read limit are untouched.
99 : void
100 6 : clear() noexcept
101 : {
102 6 : data_.clear();
103 6 : pos_ = 0;
104 6 : }
105 :
106 : /// Return how many provided bytes have not yet been consumed by reads.
107 : std::size_t
108 20 : available() const noexcept
109 : {
110 20 : return data_.size() - pos_;
111 : }
112 :
113 : /** Asynchronously read data from the stream.
114 :
115 : Copies up to `buffer_size( buffers )` bytes, further capped by `max_read_size`,
116 : from the unread data into the mutable buffer sequence. With no unread data the
117 : result is `error::eof`. The attached @ref fuse is consulted first and may inject
118 : an error, except when the buffer sequence is empty or the read is canceled.
119 : The returned `std::size_t` is the number of bytes transferred.
120 :
121 : @par Effects
122 : On success, advances the internal read position by the number of
123 : bytes copied. If an error is injected by the fuse, the read position
124 : remains unchanged.
125 :
126 : @par Exception Safety
127 : Documented as a no-throw guarantee. NOTE(review): `await_resume` is not
128 : `noexcept` and calls `fuse::maybe_fail`; confirm the fuse cannot throw here.
129 : @par Cancellation
130 : If the environment's stop token has been requested, the read
131 : completes immediately with `error::canceled` and transfers no
132 : data. This lets code under test exercise its cancellation paths.
133 : An empty buffer sequence is a no-op that completes successfully
134 : regardless of the stop token.
135 :
136 : @param buffers The mutable buffer sequence to receive data.
137 :
138 : @return An awaitable that await-returns `(error_code,std::size_t)`.
139 :
140 : @see fuse
141 : */
142 : template<MutableBufferSequence MB>
143 : auto
144 1804 : read_some(MB buffers)
145 : {
146 : struct awaitable
147 : {
148 : read_stream* self_; // stream to read from; not owned
149 : MB buffers_; // caller's destination buffer sequence, held by value
150 : bool canceled_ = false; // stop-token state captured in await_suspend
151 :
152 1804 : bool await_ready() const noexcept { return false; }
153 :
154 : // The operation completes synchronously, but await_suspend
155 : // is the only place io_env is delivered (the promise's
156 : // transform_awaiter forwards it here). Returning false means
157 : // the coroutine does not actually suspend — it resumes
158 : // immediately — so the read still completes synchronously
159 : // while having observed the stop token. See io_env, IoAwaitable.
160 : bool
161 1804 : await_suspend(
162 : std::coroutine_handle<>,
163 : io_env const* env) noexcept
164 : {
165 1804 : canceled_ = env->stop_token.stop_requested();
166 1804 : return false;
167 : }
168 :
169 : io_result<std::size_t>
170 1804 : await_resume()
171 : {
172 : // Empty buffer is a no-op regardless of
173 : // stream state, stop token, or fuse.
174 1804 : if(buffer_empty(buffers_))
175 7 : return {{}, 0};
176 : // Stop was already requested in await_suspend: report it without touching the fuse or the data.
177 1797 : if(canceled_)
178 1 : return {error::canceled, 0};
179 : // Give the fuse its chance to inject a failure before any stream state is examined.
180 1796 : auto ec = self_->f_.maybe_fail();
181 1563 : if(ec)
182 233 : return {ec, 0};
183 : // Everything provided so far has been consumed.
184 1330 : if(self_->pos_ >= self_->data_.size())
185 115 : return {error::eof, 0};
186 : // Offer at most max_read_size_ unread bytes, then advance by what buffer_copy reports.
187 1215 : std::size_t avail = self_->data_.size() - self_->pos_;
188 1215 : if(avail > self_->max_read_size_)
189 276 : avail = self_->max_read_size_;
190 1215 : auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
191 1215 : std::size_t const n = buffer_copy(buffers_, src);
192 1215 : self_->pos_ += n;
193 1215 : return {{}, n};
194 : }
195 : };
196 1804 : return awaitable{this, buffers}; // stores a raw pointer to *this: the stream must outlive the awaitable
197 : }
198 : };
199 :
200 : } // test
201 : } // capy
202 : } // boost
203 :
204 : #endif
|