TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_TEST_STREAM_HPP
12 : #define BOOST_CAPY_TEST_STREAM_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/make_buffer.hpp>
18 : #include <boost/capy/continuation.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <boost/capy/read.hpp>
24 : #include <boost/capy/task.hpp>
25 : #include <boost/capy/test/fuse.hpp>
26 : #include <boost/capy/test/run_blocking.hpp>
27 :
28 : #include <atomic>
29 : #include <memory>
30 : #include <new>
31 : #include <stop_token>
32 : #include <string>
33 : #include <string_view>
34 : #include <utility>
35 :
36 : namespace boost {
37 : namespace capy {
38 : namespace test {
39 :
40 : /** A connected stream for testing bidirectional I/O.
41 :
42 : Streams are created in pairs via @ref make_stream_pair.
43 : Data written to one end becomes available for reading on
44 : the other. If no data is available when @ref read_some
45 : is called, the calling coroutine suspends until the peer
46 : calls @ref write_some. The shared @ref fuse enables error
47 : injection at controlled points in both directions.
48 :
49 : When the fuse injects an error or throws on one end, the
50 : other end is automatically closed: any suspended reader is
51 : resumed with `error::eof`, and subsequent operations on
52 : both ends return `error::eof`. Calling @ref close on one
53 : end signals eof to the peer's reads after draining any
54 : buffered data, while the peer may still write.
55 :
56 : @par Thread Safety
57 : Single-threaded only. Both ends of the pair must be
58 : accessed from the same thread. Concurrent access is
59 : undefined behavior.
60 :
61 : @par Example
62 : @code
63 : fuse f;
64 : auto [a, b] = make_stream_pair( f );
65 :
66 : auto r = f.armed( [&]( fuse& ) -> task<> {
67 : auto [ec, n] = co_await a.write_some(
68 : const_buffer( "hello", 5 ) );
69 : if( ec )
70 : co_return;
71 :
72 : char buf[32];
73 : auto [ec2, n2] = co_await b.read_some(
74 : mutable_buffer( buf, sizeof( buf ) ) );
75 : if( ec2 )
76 : co_return;
77 : // buf contains "hello"
78 : } );
79 : @endcode
80 :
81 : @see make_stream_pair, fuse
82 : */
83 : class stream
84 : {
85 : // Single-threaded only. No concurrent access to either
86 : // end of the pair. Both streams and all operations must
87 : // run on the same thread.
88 :
89 : struct half
90 : {
91 : std::string buf;
92 : std::size_t max_read_size = std::size_t(-1);
93 : continuation pending_cont_;
94 : executor_ref pending_ex;
95 : // Points at the suspended reader's claim flag (owned by the
96 : // read awaitable). Lets a peer wake coordinate with a stop
97 : // callback so the parked read is resumed exactly once.
98 : std::atomic<bool>* pending_claimed = nullptr;
99 : bool eof = false;
100 : };
101 :
102 : struct state
103 : {
104 : fuse f;
105 : bool closed = false;
106 : half sides[2];
107 :
108 HIT 286 : explicit state(fuse f_) noexcept
109 858 : : f(std::move(f_))
110 : {
111 286 : }
112 :
113 : // Resume a suspended reader on this side, if any. Claims the
114 : // reader's atomic so it is never double-resumed by a racing
115 : // stop callback; the loser of the race skips the post.
116 662 : static void wake(half& side)
117 : {
118 662 : if(! side.pending_cont_.h)
119 637 : return;
120 50 : if(! side.pending_claimed ||
121 25 : ! side.pending_claimed->exchange(
122 : true, std::memory_order_acq_rel))
123 : {
124 25 : side.pending_ex.post(side.pending_cont_);
125 : }
126 25 : side.pending_cont_.h = {};
127 25 : side.pending_ex = {};
128 25 : side.pending_claimed = nullptr;
129 : }
130 :
131 : // Set closed and resume any suspended readers
132 : // with eof on both sides.
133 208 : void close()
134 : {
135 208 : closed = true;
136 624 : for(auto& side : sides)
137 416 : wake(side);
138 208 : }
139 : };
140 :
141 : // Wraps the maybe_fail() call. If the guard is
142 : // not disarmed before destruction (fuse returned
143 : // an error, or threw an exception), closes both
144 : // ends so any suspended peer gets eof.
145 : struct close_guard
146 : {
147 : state* st;
148 : bool armed = true;
149 301 : void disarm() noexcept { armed = false; }
150 509 : ~close_guard() noexcept(false) { if(armed) st->close(); }
151 : };
152 :
153 : std::shared_ptr<state> state_;
154 : int index_;
155 :
156 572 : stream(
157 : std::shared_ptr<state> sp,
158 : int index) noexcept
159 572 : : state_(std::move(sp))
160 572 : , index_(index)
161 : {
162 572 : }
163 :
164 : friend std::pair<stream, stream>
165 : make_stream_pair(fuse);
166 :
167 : public:
168 : stream(stream const&) = delete;
169 : stream& operator=(stream const&) = delete;
170 672 : stream(stream&&) = default;
171 : stream& operator=(stream&&) = default;
172 :
173 : /** Signal end-of-stream to the peer.
174 :
175 : Marks the peer's read direction as closed.
176 : If the peer is suspended in @ref read_some,
177 : it is resumed. The peer drains any buffered
178 : data before receiving `error::eof`. Writes
179 : from the peer are unaffected.
180 : */
181 : void
182 3 : close()
183 : {
184 3 : int peer = 1 - index_;
185 3 : auto& side = state_->sides[peer];
186 3 : side.eof = true;
187 3 : state::wake(side);
188 3 : }
189 :
190 : /** Set the maximum bytes returned per read.
191 :
192 : Limits how many bytes @ref read_some returns in
193 : a single call, simulating chunked network delivery.
194 : The default is unlimited.
195 :
196 : @param n Maximum bytes per read.
197 : */
198 : void
199 54 : set_max_read_size(std::size_t n) noexcept
200 : {
201 54 : state_->sides[index_].max_read_size = n;
202 54 : }
203 :
204 : /** Asynchronously read data from the stream.
205 :
206 : Transfers up to `buffer_size(buffers)` bytes from
207 : data written by the peer. If no data is available,
208 : the calling coroutine suspends until the peer calls
209 : @ref write_some. Before every read, the attached
210 : @ref fuse is consulted to possibly inject an error.
211 : If the fuse fires, the peer is automatically closed.
212 : If the stream is closed, returns `error::eof`.
213 : The returned `std::size_t` is the number of bytes
214 : transferred.
215 :
216 : @param buffers The mutable buffer sequence to receive data.
217 :
218 : @return An awaitable that await-returns `(error_code,std::size_t)`.
219 :
220 : @par Cancellation
221 : Cancellation applies only to a read that would otherwise suspend:
222 : if no data is available and the environment's stop token is
223 : requested (before or during the wait), the read resumes with
224 : `error::canceled`. A read that can complete immediately from
225 : buffered data is unaffected by the stop token.
226 :
227 : @see fuse, close
228 : */
229 : template<MutableBufferSequence MB>
230 : auto
231 280 : read_some(MB buffers)
232 : {
233 : // The read suspends when no data is available, parking its
234 : // continuation on the side until the peer writes/closes. To
235 : // support cancellation it follows the same pattern as
236 : // delay_awaitable: a stop callback claims the resume (racing
237 : // the peer wake via an atomic) and posts the continuation
238 : // through the executor. Because it owns a std::atomic and a
239 : // std::stop_callback, the awaitable needs explicit move and
240 : // destruction (the task promise moves it into its
241 : // transform_awaiter before awaiting).
242 : struct awaitable
243 : {
244 : stream* self_;
245 : MB buffers_;
246 :
247 : // Declared before stop_cb_buf_: the stop callback reads
248 : // these, so they must outlive a blocking stop_cb_ destructor.
249 : continuation cont_;
250 : executor_ref ex_;
251 : half* side_ = nullptr;
252 : std::atomic<bool> claimed_{false};
253 : bool canceled_ = false;
254 : bool stop_cb_active_ = false;
255 :
256 : struct cancel_fn
257 : {
258 : awaitable* self_;
259 :
260 15 : void operator()() const noexcept
261 : {
262 15 : if(! self_->claimed_.exchange(
263 : true, std::memory_order_acq_rel))
264 : {
265 3 : self_->canceled_ = true;
266 3 : self_->ex_.post(self_->cont_);
267 : }
268 15 : }
269 : };
270 :
271 : using stop_cb_t = std::stop_callback<cancel_fn>;
272 :
273 : // Declared last: its destructor may block while the callback
274 : // accesses the members above. A union gives correct alignment
275 : // for stop_cb_t without an alignas specifier, which avoids
276 : // MSVC's C4324 padding warning on this function-local class
277 : // (the member-level pragma used by delay_awaitable does not
278 : // suppress it here). Lifetime is managed manually: placement
279 : // new in await_suspend, explicit destruction once done.
280 : union { stop_cb_t stop_cb_; };
281 :
282 280 : awaitable(stream* self, MB buffers) noexcept
283 280 : : self_(self)
284 280 : , buffers_(buffers)
285 : {
286 280 : }
287 :
288 : /// @pre Not yet awaited (no active stop callback).
289 280 : awaitable(awaitable&& o) noexcept
290 280 : : self_(o.self_)
291 280 : , buffers_(o.buffers_)
292 280 : , cont_(o.cont_)
293 280 : , ex_(o.ex_)
294 280 : , side_(o.side_)
295 280 : , claimed_(o.claimed_.load(std::memory_order_relaxed))
296 280 : , canceled_(o.canceled_)
297 280 : , stop_cb_active_(std::exchange(o.stop_cb_active_, false))
298 : {
299 280 : }
300 :
301 560 : ~awaitable()
302 : {
303 560 : if(stop_cb_active_)
304 1 : stop_cb_.~stop_cb_t();
305 : // Unlink from the side if still parked (e.g. the
306 : // coroutine was destroyed while suspended), so a later
307 : // peer wake does not dereference a freed claim flag.
308 560 : if(side_ && side_->pending_claimed == &claimed_)
309 : {
310 1 : side_->pending_cont_.h = {};
311 1 : side_->pending_ex = {};
312 1 : side_->pending_claimed = nullptr;
313 : }
314 560 : }
315 :
316 : awaitable(awaitable const&) = delete;
317 : awaitable& operator=(awaitable const&) = delete;
318 : awaitable& operator=(awaitable&&) = delete;
319 :
320 280 : bool await_ready() const noexcept
321 : {
322 280 : if(buffer_empty(buffers_))
323 8 : return true;
324 272 : auto* st = self_->state_.get();
325 272 : auto& side = st->sides[self_->index_];
326 542 : return st->closed || side.eof ||
327 542 : !side.buf.empty();
328 : }
329 :
330 29 : std::coroutine_handle<> await_suspend(
331 : std::coroutine_handle<> h,
332 : io_env const* env) noexcept
333 : {
334 : // Park the continuation, then register the stop callback.
335 : // If stop is already requested, the callback fires inline
336 : // during construction: it claims the resume and posts the
337 : // continuation through the executor (never a symmetric
338 : // self-transfer, which would leak this frame under
339 : // run_async). The parked read is then resumed with
340 : // error::canceled by the run loop.
341 29 : auto& side = self_->state_->sides[
342 29 : self_->index_];
343 29 : cont_.h = h;
344 29 : ex_ = env->executor;
345 29 : side_ = &side;
346 29 : side.pending_cont_.h = h;
347 29 : side.pending_ex = env->executor;
348 29 : side.pending_claimed = &claimed_;
349 :
350 29 : ::new(static_cast<void*>(&stop_cb_)) stop_cb_t(
351 29 : env->stop_token, cancel_fn{this});
352 29 : stop_cb_active_ = true;
353 :
354 29 : return std::noop_coroutine();
355 : }
356 :
357 : io_result<std::size_t>
358 279 : await_resume()
359 : {
360 279 : if(stop_cb_active_)
361 : {
362 28 : stop_cb_.~stop_cb_t();
363 28 : stop_cb_active_ = false;
364 : }
365 :
366 279 : if(buffer_empty(buffers_))
367 8 : return {{}, 0};
368 :
369 271 : if(canceled_)
370 : {
371 : // The stop callback posted us but left the side
372 : // untouched; unlink if a peer wake has not already.
373 3 : if(side_ && side_->pending_claimed == &claimed_)
374 : {
375 3 : side_->pending_cont_.h = {};
376 3 : side_->pending_ex = {};
377 3 : side_->pending_claimed = nullptr;
378 : }
379 3 : return {error::canceled, 0};
380 : }
381 :
382 268 : auto* st = self_->state_.get();
383 268 : auto& side = st->sides[
384 268 : self_->index_];
385 :
386 268 : if(st->closed)
387 12 : return {error::eof, 0};
388 :
389 256 : if(side.eof && side.buf.empty())
390 3 : return {error::eof, 0};
391 :
392 253 : if(!side.eof)
393 : {
394 253 : close_guard g{st};
395 253 : auto ec = st->f.maybe_fail();
396 200 : if(ec)
397 53 : return {ec, 0};
398 147 : g.disarm();
399 253 : }
400 :
401 294 : std::size_t const n = buffer_copy(
402 147 : buffers_, make_buffer(side.buf),
403 : side.max_read_size);
404 147 : side.buf.erase(0, n);
405 147 : return {{}, n};
406 : }
407 : };
408 280 : return awaitable{this, buffers};
409 : }
410 :
411 : /** Asynchronously write data to the stream.
412 :
413 : Transfers up to `buffer_size(buffers)` bytes to the
414 : peer's incoming buffer. If the peer is suspended in
415 : @ref read_some, it is resumed. Before every write,
416 : the attached @ref fuse is consulted to possibly inject
417 : an error. If the fuse fires, the peer is automatically
418 : closed. If the stream is closed, returns `error::eof`.
419 : The returned `std::size_t` is the number of bytes
420 : transferred.
421 :
422 : @param buffers The const buffer sequence containing
423 : data to write.
424 :
425 : @return An awaitable that await-returns `(error_code,std::size_t)`.
426 :
427 : @par Cancellation
428 : If the environment's stop token has been requested, the write
429 : completes immediately with `error::canceled` and transfers no
430 : data. An empty buffer sequence is a no-op that completes
431 : successfully regardless of the stop token.
432 :
433 : @see fuse, close
434 : */
435 : template<ConstBufferSequence CB>
436 : auto
437 261 : write_some(CB buffers)
438 : {
439 : struct awaitable
440 : {
441 : stream* self_;
442 : CB buffers_;
443 : bool canceled_ = false;
444 :
445 261 : bool await_ready() const noexcept { return false; }
446 :
447 : // The write completes synchronously; await_suspend is only
448 : // used to observe the environment's stop token. Returning
449 : // false means the coroutine does not actually suspend.
450 : bool
451 261 : await_suspend(
452 : std::coroutine_handle<>,
453 : io_env const* env) noexcept
454 : {
455 261 : canceled_ = env->stop_token.stop_requested();
456 261 : return false;
457 : }
458 :
459 : io_result<std::size_t>
460 261 : await_resume()
461 : {
462 261 : std::size_t n = buffer_size(buffers_);
463 261 : if(n == 0)
464 4 : return {{}, 0};
465 :
466 257 : if(canceled_)
467 1 : return {error::canceled, 0};
468 :
469 256 : auto* st = self_->state_.get();
470 :
471 256 : if(st->closed)
472 MIS 0 : return {error::eof, 0};
473 :
474 HIT 256 : close_guard g{st};
475 256 : auto ec = st->f.maybe_fail();
476 205 : if(ec)
477 51 : return {ec, 0};
478 154 : g.disarm();
479 :
480 154 : int peer = 1 - self_->index_;
481 154 : auto& side = st->sides[peer];
482 :
483 154 : std::size_t const old_size = side.buf.size();
484 154 : side.buf.resize(old_size + n);
485 154 : buffer_copy(make_buffer(
486 154 : side.buf.data() + old_size, n),
487 154 : buffers_, n);
488 :
489 154 : state::wake(side);
490 :
491 154 : return {{}, n};
492 256 : }
493 : };
494 261 : return awaitable{this, buffers};
495 : }
496 :
497 : /** Inject data into this stream's peer for reading.
498 :
499 : Appends data directly to the peer's incoming buffer,
500 : bypassing the fuse. If the peer is suspended in
501 : @ref read_some, it is resumed. This is test setup,
502 : not an operation under test.
503 :
504 : @param sv The data to inject.
505 :
506 : @see make_stream_pair
507 : */
508 : void
509 89 : provide(std::string_view sv)
510 : {
511 89 : int peer = 1 - index_;
512 89 : auto& side = state_->sides[peer];
513 89 : side.buf.append(sv);
514 89 : state::wake(side);
515 89 : }
516 :
517 : /** Read from this stream and verify the content.
518 :
519 : Reads exactly `expected.size()` bytes from the stream
520 : and compares against the expected string. The read goes
521 : through the normal path including the fuse.
522 :
523 : @param expected The expected content.
524 :
525 : @return A pair of `(error_code, bool)`. The error_code
526 : is set if a read error occurs (e.g. fuse injection).
527 : The bool is true if the data matches.
528 :
529 : @see provide
530 : */
531 : std::pair<std::error_code, bool>
532 38 : expect(std::string_view expected)
533 : {
534 38 : std::error_code result;
535 38 : bool match = false;
536 141 : run_blocking()([](
537 : stream& self,
538 : std::string_view expected,
539 : std::error_code& result,
540 : bool& match) -> task<>
541 : {
542 : std::string buf(expected.size(), '\0');
543 : auto [ec, n] = co_await read(
544 : self, mutable_buffer(
545 : buf.data(), buf.size()));
546 : if(ec)
547 : {
548 : result = ec;
549 : co_return;
550 : }
551 : match = (std::string_view(
552 : buf.data(), n) == expected);
553 161 : }(*this, expected, result, match));
554 58 : return {result, match};
555 : }
556 :
557 : /** Return the stream's pending read data.
558 :
559 : Returns a view of the data waiting to be read
560 : from this stream. This is a direct peek at the
561 : internal buffer, bypassing the fuse.
562 :
563 : @return A view of the pending data.
564 :
565 : @see provide, expect
566 : */
567 : std::string_view
568 : data() const noexcept
569 : {
570 : return state_->sides[index_].buf;
571 : }
572 : };
573 :
574 : /** Create a connected pair of test streams.
575 :
576 : Data written to one stream becomes readable on the other.
577 : If a coroutine calls @ref stream::read_some when no data
578 : is available, it suspends until the peer writes. Before
579 : every read or write, the @ref fuse is consulted to
580 : possibly inject an error for testing fault scenarios.
581 : When the fuse fires, the peer is automatically closed.
582 :
583 : @param f The fuse used to inject errors during operations.
584 :
585 : @return A pair of connected streams.
586 :
587 : @see stream, fuse
588 : */
589 : inline std::pair<stream, stream>
590 286 : make_stream_pair(fuse f = {})
591 : {
592 286 : auto sp = std::make_shared<stream::state>(std::move(f));
593 572 : return {stream(sp, 0), stream(sp, 1)};
594 286 : }
595 :
596 : } // test
597 : } // capy
598 : } // boost
599 :
600 : #endif
|