TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
12 : #define BOOST_CAPY_TEST_WRITE_SINK_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/make_buffer.hpp>
18 : #include <coroutine>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/error.hpp>
22 : #include <boost/capy/test/fuse.hpp>
23 :
24 : #include <algorithm>
25 : #include <string>
26 : #include <string_view>
27 :
28 : namespace boost {
29 : namespace capy {
30 : namespace test {
31 :
32 : /** A mock sink for testing write operations.
33 :
34 : Use this to verify code that performs complete writes without needing
35 : real I/O. Call @ref write to write data, then @ref data to retrieve
36 : what was written. The associated @ref fuse enables error injection
37 : at controlled points.
38 :
39 : This class satisfies the @ref WriteSink concept by providing partial
40 : writes via `write_some` (satisfying @ref WriteStream), complete
41 : writes via `write`, and EOF signaling via `write_eof`.
42 :
43 : @par Thread Safety
44 : Not thread-safe.
45 :
46 : @par Example
47 : @code
48 : fuse f;
49 : write_sink ws( f );
50 :
51 : auto r = f.armed( [&]( fuse& ) -> task<void> {
52 : auto [ec, n] = co_await ws.write(
53 : const_buffer( "Hello", 5 ) );
54 : if( ec )
55 : co_return;
56 : auto [ec2] = co_await ws.write_eof();
57 : if( ec2 )
58 : co_return;
59 : // ws.data() returns "Hello"
60 : } );
61 : @endcode
62 :
63 : @see fuse, WriteSink
64 : */
65 : class write_sink
66 : {
67 : fuse f_;
68 : std::string data_;
69 : std::string expect_;
70 : std::size_t max_write_size_;
71 : bool eof_called_ = false;
72 :
73 : std::error_code
74 HIT 236 : consume_match_() noexcept
75 : {
76 236 : if(data_.empty() || expect_.empty())
77 228 : return {};
78 8 : std::size_t const n = (std::min)(data_.size(), expect_.size());
79 8 : if(std::string_view(data_.data(), n) !=
80 16 : std::string_view(expect_.data(), n))
81 4 : return error::test_failure;
82 4 : data_.erase(0, n);
83 4 : expect_.erase(0, n);
84 4 : return {};
85 : }
86 :
87 : public:
88 : /** Construct a write sink.
89 :
90 : @param f The fuse used to inject errors during writes.
91 :
92 : @param max_write_size Maximum bytes transferred per write.
93 : Use to simulate chunked delivery.
94 : */
95 416 : explicit write_sink(
96 : fuse f = {},
97 : std::size_t max_write_size = std::size_t(-1)) noexcept
98 416 : : f_(std::move(f))
99 416 : , max_write_size_(max_write_size)
100 : {
101 416 : }
102 :
103 : /// Return the written data as a string view.
104 : std::string_view
105 100 : data() const noexcept
106 : {
107 100 : return data_;
108 : }
109 :
110 : /** Set the expected data for subsequent writes.
111 :
112 : Stores the expected data and immediately tries to match
113 : against any data already written. Matched data is consumed
114 : from both buffers.
115 :
116 : @param sv The expected data.
117 :
118 : @return An error if existing data does not match.
119 : */
120 : std::error_code
121 16 : expect(std::string_view sv)
122 : {
123 16 : expect_.assign(sv);
124 16 : return consume_match_();
125 : }
126 :
127 : /// Return the number of bytes written.
128 : std::size_t
129 9 : size() const noexcept
130 : {
131 9 : return data_.size();
132 : }
133 :
134 : /// Return whether write_eof has been called.
135 : bool
136 66 : eof_called() const noexcept
137 : {
138 66 : return eof_called_;
139 : }
140 :
141 : /// Clear all data and reset state.
142 : void
143 4 : clear() noexcept
144 : {
145 4 : data_.clear();
146 4 : expect_.clear();
147 4 : eof_called_ = false;
148 4 : }
149 :
150 : /** Asynchronously write some data to the sink.
151 :
152 : Transfers up to `buffer_size( buffers )` bytes from the provided
153 : const buffer sequence to the internal buffer. Before every write,
154 : the attached @ref fuse is consulted to possibly inject an error.
155 :
156 : @param buffers The const buffer sequence containing data to write.
157 :
158 : @return An awaitable that await-returns `(error_code,std::size_t)`.
159 :
160 : @par Cancellation
161 : If the environment's stop token has been requested, the write
162 : completes immediately with `error::canceled` and transfers no
163 : data. An empty buffer sequence is a no-op that completes
164 : successfully regardless of the stop token.
165 :
166 : @see fuse
167 : */
168 : template<ConstBufferSequence CB>
169 : auto
170 77 : write_some(CB buffers)
171 : {
172 : struct awaitable
173 : {
174 : write_sink* self_;
175 : CB buffers_;
176 : bool canceled_ = false;
177 :
178 77 : bool await_ready() const noexcept { return false; }
179 :
180 : // The operation completes synchronously, but await_suspend is
181 : // the only place io_env is delivered (the promise's
182 : // transform_awaiter forwards it here). Returning false means
183 : // the coroutine does not actually suspend; it resumes
184 : // immediately, having observed the stop token. See io_env,
185 : // IoAwaitable.
186 : bool
187 77 : await_suspend(
188 : std::coroutine_handle<>,
189 : io_env const* env) noexcept
190 : {
191 77 : canceled_ = env->stop_token.stop_requested();
192 77 : return false;
193 : }
194 :
195 : io_result<std::size_t>
196 77 : await_resume()
197 : {
198 77 : if(buffer_empty(buffers_))
199 2 : return {{}, 0};
200 :
201 75 : if(canceled_)
202 1 : return {error::canceled, 0};
203 :
204 74 : auto ec = self_->f_.maybe_fail();
205 53 : if(ec)
206 21 : return {ec, 0};
207 :
208 32 : std::size_t n = buffer_size(buffers_);
209 32 : n = (std::min)(n, self_->max_write_size_);
210 :
211 32 : std::size_t const old_size = self_->data_.size();
212 32 : self_->data_.resize(old_size + n);
213 32 : buffer_copy(make_buffer(
214 32 : self_->data_.data() + old_size, n), buffers_, n);
215 :
216 32 : ec = self_->consume_match_();
217 32 : if(ec)
218 : {
219 MIS 0 : self_->data_.resize(old_size);
220 0 : return {ec, 0};
221 : }
222 :
223 HIT 32 : return {{}, n};
224 : }
225 : };
226 77 : return awaitable{this, buffers};
227 : }
228 :
229 : /** Asynchronously write data to the sink.
230 :
231 : Transfers all bytes from the provided const buffer sequence
232 : to the internal buffer. Unlike @ref write_some, this ignores
233 : `max_write_size` and writes all available data, matching the
234 : @ref WriteSink semantic contract.
235 :
236 : @param buffers The const buffer sequence containing data to write.
237 :
238 : @return An awaitable that await-returns `(error_code,std::size_t)`.
239 :
240 : @par Cancellation
241 : If the environment's stop token has been requested, the write
242 : completes immediately with `error::canceled` and transfers no
243 : data.
244 :
245 : @see fuse
246 : */
247 : template<ConstBufferSequence CB>
248 : auto
249 303 : write(CB buffers)
250 : {
251 : struct awaitable
252 : {
253 : write_sink* self_;
254 : CB buffers_;
255 : bool canceled_ = false;
256 :
257 303 : bool await_ready() const noexcept { return false; }
258 :
259 : // Reads the stop token without suspending; see the comment
260 : // on write_some() for details.
261 : bool
262 303 : await_suspend(
263 : std::coroutine_handle<>,
264 : io_env const* env) noexcept
265 : {
266 303 : canceled_ = env->stop_token.stop_requested();
267 303 : return false;
268 : }
269 :
270 : io_result<std::size_t>
271 303 : await_resume()
272 : {
273 303 : if(canceled_)
274 1 : return {error::canceled, 0};
275 :
276 302 : auto ec = self_->f_.maybe_fail();
277 241 : if(ec)
278 61 : return {ec, 0};
279 :
280 180 : std::size_t n = buffer_size(buffers_);
281 180 : if(n == 0)
282 2 : return {{}, 0};
283 :
284 178 : std::size_t const old_size = self_->data_.size();
285 178 : self_->data_.resize(old_size + n);
286 178 : buffer_copy(make_buffer(
287 178 : self_->data_.data() + old_size, n), buffers_);
288 :
289 178 : ec = self_->consume_match_();
290 178 : if(ec)
291 2 : return {ec, n};
292 :
293 176 : return {{}, n};
294 : }
295 : };
296 303 : return awaitable{this, buffers};
297 : }
298 :
299 : /** Atomically write data and signal end-of-stream.
300 :
301 : Transfers all bytes from the provided const buffer sequence to
302 : the internal buffer and signals end-of-stream. Before the write,
303 : the attached @ref fuse is consulted to possibly inject an error
304 : for testing fault scenarios.
305 :
306 : @par Effects
307 : On success, appends the written bytes to the internal buffer
308 : and marks the sink as finalized.
309 : If an error is injected by the fuse, the internal buffer remains
310 : unchanged.
311 :
312 : @par Exception Safety
313 : No-throw guarantee.
314 :
315 : @par Cancellation
316 : If the environment's stop token has been requested, the operation
317 : completes immediately with `error::canceled`, transfers no data,
318 : and does not signal end-of-stream.
319 :
320 : @param buffers The const buffer sequence containing data to write.
321 :
322 : @return An awaitable that await-returns `(error_code,std::size_t)`.
323 :
324 : @see fuse
325 : */
326 : template<ConstBufferSequence CB>
327 : auto
328 35 : write_eof(CB buffers)
329 : {
330 : struct awaitable
331 : {
332 : write_sink* self_;
333 : CB buffers_;
334 : bool canceled_ = false;
335 :
336 35 : bool await_ready() const noexcept { return false; }
337 :
338 : // Reads the stop token without suspending; see the comment
339 : // on write_some() for details.
340 : bool
341 35 : await_suspend(
342 : std::coroutine_handle<>,
343 : io_env const* env) noexcept
344 : {
345 35 : canceled_ = env->stop_token.stop_requested();
346 35 : return false;
347 : }
348 :
349 : io_result<std::size_t>
350 35 : await_resume()
351 : {
352 35 : if(canceled_)
353 1 : return {error::canceled, 0};
354 :
355 34 : auto ec = self_->f_.maybe_fail();
356 23 : if(ec)
357 11 : return {ec, 0};
358 :
359 12 : std::size_t n = buffer_size(buffers_);
360 12 : if(n > 0)
361 : {
362 10 : std::size_t const old_size = self_->data_.size();
363 10 : self_->data_.resize(old_size + n);
364 10 : buffer_copy(make_buffer(
365 10 : self_->data_.data() + old_size, n), buffers_);
366 :
367 10 : ec = self_->consume_match_();
368 10 : if(ec)
369 MIS 0 : return {ec, n};
370 : }
371 :
372 HIT 12 : self_->eof_called_ = true;
373 :
374 12 : return {{}, n};
375 : }
376 : };
377 35 : return awaitable{this, buffers};
378 : }
379 :
380 : /** Signal end-of-stream.
381 :
382 : Marks the sink as finalized, indicating no more data will be
383 : written. Before signaling, the attached @ref fuse is consulted
384 : to possibly inject an error for testing fault scenarios.
385 :
386 : @par Effects
387 : On success, marks the sink as finalized.
388 : If an error is injected by the fuse, the state remains unchanged.
389 :
390 : @par Exception Safety
391 : No-throw guarantee.
392 :
393 : @par Cancellation
394 : If the environment's stop token has been requested, the operation
395 : completes immediately with `error::canceled` and does not signal
396 : end-of-stream.
397 :
398 : @return An awaitable that await-returns `(error_code)`.
399 :
400 : @see fuse
401 : */
402 : auto
403 83 : write_eof()
404 : {
405 : struct awaitable
406 : {
407 : write_sink* self_;
408 : bool canceled_ = false;
409 :
410 83 : bool await_ready() const noexcept { return false; }
411 :
412 : // Reads the stop token without suspending; see the comment
413 : // on write_some() for details.
414 : bool
415 83 : await_suspend(
416 : std::coroutine_handle<>,
417 : io_env const* env) noexcept
418 : {
419 83 : canceled_ = env->stop_token.stop_requested();
420 83 : return false;
421 : }
422 :
423 : io_result<>
424 83 : await_resume()
425 : {
426 83 : if(canceled_)
427 1 : return {error::canceled};
428 :
429 82 : auto ec = self_->f_.maybe_fail();
430 60 : if(ec)
431 22 : return {ec};
432 :
433 38 : self_->eof_called_ = true;
434 38 : return {};
435 : }
436 : };
437 83 : return awaitable{this};
438 : }
439 : };
440 :
441 : } // test
442 : } // capy
443 : } // boost
444 :
445 : #endif
|