TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_TEST_READ_SOURCE_HPP
12 : #define BOOST_CAPY_TEST_READ_SOURCE_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/make_buffer.hpp>
18 : #include <coroutine>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/error.hpp>
22 : #include <boost/capy/test/fuse.hpp>
23 :
24 : #include <string>
25 : #include <string_view>
26 :
27 : namespace boost {
28 : namespace capy {
29 : namespace test {
30 :
31 : /** A mock source for testing read operations.
32 :
33 : Use this to verify code that performs complete reads without needing
34 : real I/O. Call @ref provide to supply data, then @ref read
35 : to consume it. The associated @ref fuse enables error injection
36 : at controlled points.
37 :
38 : This class satisfies the @ref ReadSource concept by providing both
39 : partial reads via `read_some` (satisfying @ref ReadStream) and
40 : complete reads via `read` that fill the entire buffer sequence
41 : before returning.
42 :
43 : @par Thread Safety
44 : Not thread-safe.
45 :
46 : @par Example
47 : @code
48 : fuse f;
49 : read_source rs( f );
50 : rs.provide( "Hello, " );
51 : rs.provide( "World!" );
52 :
53 : auto r = f.armed( [&]( fuse& ) -> task<void> {
54 : char buf[32];
55 : auto [ec, n] = co_await rs.read(
56 : mutable_buffer( buf, sizeof( buf ) ) );
57 : if( ec )
58 : co_return;
59 : // buf contains "Hello, World!"
60 : } );
61 : @endcode
62 :
63 : @see fuse, ReadSource
64 : */
65 : class read_source
66 : {
67 : fuse f_;
68 : std::string data_;
69 : std::size_t pos_ = 0;
70 : std::size_t max_read_size_;
71 :
72 : public:
73 : /** Construct a read source.
74 :
75 : @param f The fuse used to inject errors during reads.
76 :
77 : @param max_read_size Maximum bytes returned per read.
78 : Use to simulate chunked delivery.
79 : */
80 HIT 451 : explicit read_source(
81 : fuse f = {},
82 : std::size_t max_read_size = std::size_t(-1)) noexcept
83 451 : : f_(std::move(f))
84 451 : , max_read_size_(max_read_size)
85 : {
86 451 : }
87 :
88 : /** Append data to be returned by subsequent reads.
89 :
90 : Multiple calls accumulate data that @ref read returns.
91 :
92 : @param sv The data to append.
93 : */
94 : void
95 406 : provide(std::string_view sv)
96 : {
97 406 : data_.append(sv);
98 406 : }
99 :
100 : /// Clear all data and reset the read position.
101 : void
102 2 : clear() noexcept
103 : {
104 2 : data_.clear();
105 2 : pos_ = 0;
106 2 : }
107 :
108 : /// Return the number of bytes available for reading.
109 : std::size_t
110 30 : available() const noexcept
111 : {
112 30 : return data_.size() - pos_;
113 : }
114 :
115 : /** Asynchronously read some data from the source.
116 :
117 : Transfers up to `buffer_size( buffers )` bytes from the internal
118 : buffer to the provided mutable buffer sequence. If no data
119 : remains, returns `error::eof`. Before every read, the attached
120 : @ref fuse is consulted to possibly inject an error for testing
121 : fault scenarios.
122 :
123 : @param buffers The mutable buffer sequence to receive data.
124 :
125 : @return An awaitable that await-returns `(error_code,std::size_t)`.
126 :
127 : @par Cancellation
128 : If the environment's stop token has been requested, the read
129 : completes immediately with `error::canceled` and transfers no
130 : data. An empty buffer sequence is a no-op that completes
131 : successfully regardless of the stop token.
132 :
133 : @see fuse
134 : */
135 : template<MutableBufferSequence MB>
136 : auto
137 117 : read_some(MB buffers)
138 : {
139 : struct awaitable
140 : {
141 : read_source* self_;
142 : MB buffers_;
143 : bool canceled_ = false;
144 :
145 117 : bool await_ready() const noexcept { return false; }
146 :
147 : // The operation completes synchronously, but await_suspend is
148 : // the only place io_env is delivered (the promise's
149 : // transform_awaiter forwards it here). Returning false means
150 : // the coroutine does not actually suspend; it resumes
151 : // immediately, having observed the stop token. See io_env,
152 : // IoAwaitable.
153 : bool
154 117 : await_suspend(
155 : std::coroutine_handle<>,
156 : io_env const* env) noexcept
157 : {
158 117 : canceled_ = env->stop_token.stop_requested();
159 117 : return false;
160 : }
161 :
162 : io_result<std::size_t>
163 117 : await_resume()
164 : {
165 117 : if(buffer_empty(buffers_))
166 4 : return {{}, 0};
167 :
168 113 : if(canceled_)
169 1 : return {error::canceled, 0};
170 :
171 112 : auto ec = self_->f_.maybe_fail();
172 80 : if(ec)
173 32 : return {ec, 0};
174 :
175 48 : if(self_->pos_ >= self_->data_.size())
176 4 : return {error::eof, 0};
177 :
178 44 : std::size_t avail = self_->data_.size() - self_->pos_;
179 44 : if(avail > self_->max_read_size_)
180 14 : avail = self_->max_read_size_;
181 44 : auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
182 44 : std::size_t const n = buffer_copy(buffers_, src);
183 44 : self_->pos_ += n;
184 44 : return {{}, n};
185 : }
186 : };
187 117 : return awaitable{this, buffers};
188 : }
189 :
190 : /** Asynchronously read data from the source.
191 :
192 : Fills the entire buffer sequence from the internal data.
193 : If the available data is less than the buffer size, returns
194 : `error::eof` with the number of bytes transferred. Before
195 : every read, the attached @ref fuse is consulted to possibly
196 : inject an error for testing fault scenarios.
197 :
198 : Unlike @ref read_some, this ignores `max_read_size` and
199 : transfers all available data in a single operation, matching
200 : the @ref ReadSource semantic contract.
201 :
202 : @param buffers The mutable buffer sequence to receive data.
203 :
204 : @return An awaitable that await-returns `(error_code,std::size_t)`.
205 :
206 : @par Cancellation
207 : If the environment's stop token has been requested, the read
208 : completes immediately with `error::canceled` and transfers no
209 : data. An empty buffer sequence is a no-op that completes
210 : successfully regardless of the stop token.
211 :
212 : @see fuse
213 : */
214 : template<MutableBufferSequence MB>
215 : auto
216 439 : read(MB buffers)
217 : {
218 : struct awaitable
219 : {
220 : read_source* self_;
221 : MB buffers_;
222 : bool canceled_ = false;
223 :
224 439 : bool await_ready() const noexcept { return false; }
225 :
226 : // Reads the stop token without suspending; see the comment
227 : // on read_some() for details.
228 : bool
229 439 : await_suspend(
230 : std::coroutine_handle<>,
231 : io_env const* env) noexcept
232 : {
233 439 : canceled_ = env->stop_token.stop_requested();
234 439 : return false;
235 : }
236 :
237 : io_result<std::size_t>
238 439 : await_resume()
239 : {
240 439 : if(buffer_empty(buffers_))
241 2 : return {{}, 0};
242 :
243 437 : if(canceled_)
244 1 : return {error::canceled, 0};
245 :
246 436 : auto ec = self_->f_.maybe_fail();
247 338 : if(ec)
248 98 : return {ec, 0};
249 :
250 240 : if(self_->pos_ >= self_->data_.size())
251 22 : return {error::eof, 0};
252 :
253 218 : std::size_t avail = self_->data_.size() - self_->pos_;
254 218 : auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
255 218 : std::size_t const n = buffer_copy(buffers_, src);
256 218 : self_->pos_ += n;
257 :
258 218 : if(n < buffer_size(buffers_))
259 84 : return {error::eof, n};
260 134 : return {{}, n};
261 : }
262 : };
263 439 : return awaitable{this, buffers};
264 : }
265 : };
266 :
267 : } // test
268 : } // capy
269 : } // boost
270 :
271 : #endif
|