LCOV - code coverage report
Current view: top level - capy/test - stream.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 99.4 % 158 157 1
Test Date: 2026-06-24 15:55:48 Functions: 91.8 % 49 45 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/capy
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_CAPY_TEST_STREAM_HPP
      12                 : #define BOOST_CAPY_TEST_STREAM_HPP
      13                 : 
      14                 : #include <boost/capy/detail/config.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_copy.hpp>
      17                 : #include <boost/capy/buffers/make_buffer.hpp>
      18                 : #include <boost/capy/continuation.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/io_env.hpp>
      21                 : #include <boost/capy/io_result.hpp>
      22                 : #include <boost/capy/error.hpp>
      23                 : #include <boost/capy/read.hpp>
      24                 : #include <boost/capy/task.hpp>
      25                 : #include <boost/capy/test/fuse.hpp>
      26                 : #include <boost/capy/test/run_blocking.hpp>
      27                 : 
      28                 : #include <atomic>
      29                 : #include <memory>
      30                 : #include <new>
      31                 : #include <stop_token>
      32                 : #include <string>
      33                 : #include <string_view>
      34                 : #include <utility>
      35                 : 
      36                 : namespace boost {
      37                 : namespace capy {
      38                 : namespace test {
      39                 : 
      40                 : /** A connected stream for testing bidirectional I/O.
      41                 : 
      42                 :     Streams are created in pairs via @ref make_stream_pair.
      43                 :     Data written to one end becomes available for reading on
      44                 :     the other. If no data is available when @ref read_some
      45                 :     is called, the calling coroutine suspends until the peer
      46                 :     calls @ref write_some. The shared @ref fuse enables error
      47                 :     injection at controlled points in both directions.
      48                 : 
      49                 :     When the fuse injects an error or throws on one end, the
      50                 :     other end is automatically closed: any suspended reader is
      51                 :     resumed with `error::eof`, and subsequent operations on
      52                 :     both ends return `error::eof`. Calling @ref close on one
      53                 :     end signals eof to the peer's reads after draining any
      54                 :     buffered data, while the peer may still write.
      55                 : 
      56                 :     @par Thread Safety
      57                 :     Single-threaded only. Both ends of the pair must be
      58                 :     accessed from the same thread. Concurrent access is
      59                 :     undefined behavior.
      60                 : 
      61                 :     @par Example
      62                 :     @code
      63                 :     fuse f;
      64                 :     auto [a, b] = make_stream_pair( f );
      65                 : 
      66                 :     auto r = f.armed( [&]( fuse& ) -> task<> {
      67                 :         auto [ec, n] = co_await a.write_some(
      68                 :             const_buffer( "hello", 5 ) );
      69                 :         if( ec )
      70                 :             co_return;
      71                 : 
      72                 :         char buf[32];
      73                 :         auto [ec2, n2] = co_await b.read_some(
      74                 :             mutable_buffer( buf, sizeof( buf ) ) );
      75                 :         if( ec2 )
      76                 :             co_return;
      77                 :         // buf contains "hello"
      78                 :     } );
      79                 :     @endcode
      80                 : 
      81                 :     @see make_stream_pair, fuse
      82                 : */
      83                 : class stream
      84                 : {
      85                 :     // Single-threaded only. No concurrent access to either
      86                 :     // end of the pair. Both streams and all operations must
      87                 :     // run on the same thread.
      88                 : 
      89                 :     struct half
      90                 :     {
      91                 :         std::string buf;
      92                 :         std::size_t max_read_size = std::size_t(-1);
      93                 :         continuation pending_cont_;
      94                 :         executor_ref pending_ex;
      95                 :         // Points at the suspended reader's claim flag (owned by the
      96                 :         // read awaitable). Lets a peer wake coordinate with a stop
      97                 :         // callback so the parked read is resumed exactly once.
      98                 :         std::atomic<bool>* pending_claimed = nullptr;
      99                 :         bool eof = false;
     100                 :     };
     101                 : 
     102                 :     struct state
     103                 :     {
     104                 :         fuse f;
     105                 :         bool closed = false;
     106                 :         half sides[2];
     107                 : 
     108 HIT         286 :         explicit state(fuse f_) noexcept
     109             858 :             : f(std::move(f_))
     110                 :         {
     111             286 :         }
     112                 : 
     113                 :         // Resume a suspended reader on this side, if any. Claims the
     114                 :         // reader's atomic so it is never double-resumed by a racing
     115                 :         // stop callback; the loser of the race skips the post.
     116             662 :         static void wake(half& side)
     117                 :         {
     118             662 :             if(! side.pending_cont_.h)
     119             637 :                 return;
     120              50 :             if(! side.pending_claimed ||
     121              25 :                 ! side.pending_claimed->exchange(
     122                 :                     true, std::memory_order_acq_rel))
     123                 :             {
     124              25 :                 side.pending_ex.post(side.pending_cont_);
     125                 :             }
     126              25 :             side.pending_cont_.h = {};
     127              25 :             side.pending_ex = {};
     128              25 :             side.pending_claimed = nullptr;
     129                 :         }
     130                 : 
     131                 :         // Set closed and resume any suspended readers
     132                 :         // with eof on both sides.
     133             208 :         void close()
     134                 :         {
     135             208 :             closed = true;
     136             624 :             for(auto& side : sides)
     137             416 :                 wake(side);
     138             208 :         }
     139                 :     };
     140                 : 
     141                 :     // Wraps the maybe_fail() call. If the guard is
     142                 :     // not disarmed before destruction (fuse returned
     143                 :     // an error, or threw an exception), closes both
     144                 :     // ends so any suspended peer gets eof.
     145                 :     struct close_guard
     146                 :     {
     147                 :         state* st;
     148                 :         bool armed = true;
     149             301 :         void disarm() noexcept { armed = false; }
     150             509 :         ~close_guard() noexcept(false) { if(armed) st->close(); }
     151                 :     };
     152                 : 
     153                 :     std::shared_ptr<state> state_;
     154                 :     int index_;
     155                 : 
     156             572 :     stream(
     157                 :         std::shared_ptr<state> sp,
     158                 :         int index) noexcept
     159             572 :         : state_(std::move(sp))
     160             572 :         , index_(index)
     161                 :     {
     162             572 :     }
     163                 : 
     164                 :     friend std::pair<stream, stream>
     165                 :     make_stream_pair(fuse);
     166                 : 
     167                 : public:
     168                 :     stream(stream const&) = delete;
     169                 :     stream& operator=(stream const&) = delete;
     170             672 :     stream(stream&&) = default;
     171                 :     stream& operator=(stream&&) = default;
     172                 : 
     173                 :     /** Signal end-of-stream to the peer.
     174                 : 
     175                 :         Marks the peer's read direction as closed.
     176                 :         If the peer is suspended in @ref read_some,
     177                 :         it is resumed. The peer drains any buffered
     178                 :         data before receiving `error::eof`. Writes
     179                 :         from the peer are unaffected.
     180                 :     */
     181                 :     void
     182               3 :     close()
     183                 :     {
     184               3 :         int peer = 1 - index_;
     185               3 :         auto& side = state_->sides[peer];
     186               3 :         side.eof = true;
     187               3 :         state::wake(side);
     188               3 :     }
     189                 : 
     190                 :     /** Set the maximum bytes returned per read.
     191                 : 
     192                 :         Limits how many bytes @ref read_some returns in
     193                 :         a single call, simulating chunked network delivery.
     194                 :         The default is unlimited.
     195                 : 
     196                 :         @param n Maximum bytes per read.
     197                 :     */
     198                 :     void
     199              54 :     set_max_read_size(std::size_t n) noexcept
     200                 :     {
     201              54 :         state_->sides[index_].max_read_size = n;
     202              54 :     }
     203                 : 
     204                 :     /** Asynchronously read data from the stream.
     205                 : 
     206                 :         Transfers up to `buffer_size(buffers)` bytes from
     207                 :         data written by the peer. If no data is available,
     208                 :         the calling coroutine suspends until the peer calls
     209                 :         @ref write_some. Before every read, the attached
     210                 :         @ref fuse is consulted to possibly inject an error.
     211                 :         If the fuse fires, the peer is automatically closed.
     212                 :         If the stream is closed, returns `error::eof`.
     213                 :         The returned `std::size_t` is the number of bytes
     214                 :         transferred.
     215                 : 
     216                 :         @param buffers The mutable buffer sequence to receive data.
     217                 : 
     218                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     219                 : 
     220                 :         @par Cancellation
     221                 :         Cancellation applies only to a read that would otherwise suspend:
     222                 :         if no data is available and the environment's stop token is
     223                 :         requested (before or during the wait), the read resumes with
     224                 :         `error::canceled`. A read that can complete immediately from
     225                 :         buffered data is unaffected by the stop token.
     226                 : 
     227                 :         @see fuse, close
     228                 :     */
     229                 :     template<MutableBufferSequence MB>
     230                 :     auto
     231             280 :     read_some(MB buffers)
     232                 :     {
     233                 :         // The read suspends when no data is available, parking its
     234                 :         // continuation on the side until the peer writes/closes. To
     235                 :         // support cancellation it follows the same pattern as
     236                 :         // delay_awaitable: a stop callback claims the resume (racing
     237                 :         // the peer wake via an atomic) and posts the continuation
     238                 :         // through the executor. Because it owns a std::atomic and a
     239                 :         // std::stop_callback, the awaitable needs explicit move and
     240                 :         // destruction (the task promise moves it into its
     241                 :         // transform_awaiter before awaiting).
     242                 :         struct awaitable
     243                 :         {
     244                 :             stream* self_;
     245                 :             MB buffers_;
     246                 : 
     247                 :             // Declared before stop_cb_buf_: the stop callback reads
     248                 :             // these, so they must outlive a blocking stop_cb_ destructor.
     249                 :             continuation cont_;
     250                 :             executor_ref ex_;
     251                 :             half* side_ = nullptr;
     252                 :             std::atomic<bool> claimed_{false};
     253                 :             bool canceled_ = false;
     254                 :             bool stop_cb_active_ = false;
     255                 : 
     256                 :             struct cancel_fn
     257                 :             {
     258                 :                 awaitable* self_;
     259                 : 
     260              15 :                 void operator()() const noexcept
     261                 :                 {
     262              15 :                     if(! self_->claimed_.exchange(
     263                 :                         true, std::memory_order_acq_rel))
     264                 :                     {
     265               3 :                         self_->canceled_ = true;
     266               3 :                         self_->ex_.post(self_->cont_);
     267                 :                     }
     268              15 :                 }
     269                 :             };
     270                 : 
     271                 :             using stop_cb_t = std::stop_callback<cancel_fn>;
     272                 : 
     273                 :             // Declared last: its destructor may block while the callback
     274                 :             // accesses the members above. A union gives correct alignment
     275                 :             // for stop_cb_t without an alignas specifier, which avoids
     276                 :             // MSVC's C4324 padding warning on this function-local class
     277                 :             // (the member-level pragma used by delay_awaitable does not
     278                 :             // suppress it here). Lifetime is managed manually: placement
     279                 :             // new in await_suspend, explicit destruction once done.
     280                 :             union { stop_cb_t stop_cb_; };
     281                 : 
     282             280 :             awaitable(stream* self, MB buffers) noexcept
     283             280 :                 : self_(self)
     284             280 :                 , buffers_(buffers)
     285                 :             {
     286             280 :             }
     287                 : 
     288                 :             /// @pre Not yet awaited (no active stop callback).
     289             280 :             awaitable(awaitable&& o) noexcept
     290             280 :                 : self_(o.self_)
     291             280 :                 , buffers_(o.buffers_)
     292             280 :                 , cont_(o.cont_)
     293             280 :                 , ex_(o.ex_)
     294             280 :                 , side_(o.side_)
     295             280 :                 , claimed_(o.claimed_.load(std::memory_order_relaxed))
     296             280 :                 , canceled_(o.canceled_)
     297             280 :                 , stop_cb_active_(std::exchange(o.stop_cb_active_, false))
     298                 :             {
     299             280 :             }
     300                 : 
     301             560 :             ~awaitable()
     302                 :             {
     303             560 :                 if(stop_cb_active_)
     304               1 :                     stop_cb_.~stop_cb_t();
     305                 :                 // Unlink from the side if still parked (e.g. the
     306                 :                 // coroutine was destroyed while suspended), so a later
     307                 :                 // peer wake does not dereference a freed claim flag.
     308             560 :                 if(side_ && side_->pending_claimed == &claimed_)
     309                 :                 {
     310               1 :                     side_->pending_cont_.h = {};
     311               1 :                     side_->pending_ex = {};
     312               1 :                     side_->pending_claimed = nullptr;
     313                 :                 }
     314             560 :             }
     315                 : 
     316                 :             awaitable(awaitable const&) = delete;
     317                 :             awaitable& operator=(awaitable const&) = delete;
     318                 :             awaitable& operator=(awaitable&&) = delete;
     319                 : 
     320             280 :             bool await_ready() const noexcept
     321                 :             {
     322             280 :                 if(buffer_empty(buffers_))
     323               8 :                     return true;
     324             272 :                 auto* st = self_->state_.get();
     325             272 :                 auto& side = st->sides[self_->index_];
     326             542 :                 return st->closed || side.eof ||
     327             542 :                     !side.buf.empty();
     328                 :             }
     329                 : 
     330              29 :             std::coroutine_handle<> await_suspend(
     331                 :                 std::coroutine_handle<> h,
     332                 :                 io_env const* env) noexcept
     333                 :             {
     334                 :                 // Park the continuation, then register the stop callback.
     335                 :                 // If stop is already requested, the callback fires inline
     336                 :                 // during construction: it claims the resume and posts the
     337                 :                 // continuation through the executor (never a symmetric
     338                 :                 // self-transfer, which would leak this frame under
     339                 :                 // run_async). The parked read is then resumed with
     340                 :                 // error::canceled by the run loop.
     341              29 :                 auto& side = self_->state_->sides[
     342              29 :                     self_->index_];
     343              29 :                 cont_.h = h;
     344              29 :                 ex_ = env->executor;
     345              29 :                 side_ = &side;
     346              29 :                 side.pending_cont_.h = h;
     347              29 :                 side.pending_ex = env->executor;
     348              29 :                 side.pending_claimed = &claimed_;
     349                 : 
     350              29 :                 ::new(static_cast<void*>(&stop_cb_)) stop_cb_t(
     351              29 :                     env->stop_token, cancel_fn{this});
     352              29 :                 stop_cb_active_ = true;
     353                 : 
     354              29 :                 return std::noop_coroutine();
     355                 :             }
     356                 : 
     357                 :             io_result<std::size_t>
     358             279 :             await_resume()
     359                 :             {
     360             279 :                 if(stop_cb_active_)
     361                 :                 {
     362              28 :                     stop_cb_.~stop_cb_t();
     363              28 :                     stop_cb_active_ = false;
     364                 :                 }
     365                 : 
     366             279 :                 if(buffer_empty(buffers_))
     367               8 :                     return {{}, 0};
     368                 : 
     369             271 :                 if(canceled_)
     370                 :                 {
     371                 :                     // The stop callback posted us but left the side
     372                 :                     // untouched; unlink if a peer wake has not already.
     373               3 :                     if(side_ && side_->pending_claimed == &claimed_)
     374                 :                     {
     375               3 :                         side_->pending_cont_.h = {};
     376               3 :                         side_->pending_ex = {};
     377               3 :                         side_->pending_claimed = nullptr;
     378                 :                     }
     379               3 :                     return {error::canceled, 0};
     380                 :                 }
     381                 : 
     382             268 :                 auto* st = self_->state_.get();
     383             268 :                 auto& side = st->sides[
     384             268 :                     self_->index_];
     385                 : 
     386             268 :                 if(st->closed)
     387              12 :                     return {error::eof, 0};
     388                 : 
     389             256 :                 if(side.eof && side.buf.empty())
     390               3 :                     return {error::eof, 0};
     391                 : 
     392             253 :                 if(!side.eof)
     393                 :                 {
     394             253 :                     close_guard g{st};
     395             253 :                     auto ec = st->f.maybe_fail();
     396             200 :                     if(ec)
     397              53 :                         return {ec, 0};
     398             147 :                     g.disarm();
     399             253 :                 }
     400                 : 
     401             294 :                 std::size_t const n = buffer_copy(
     402             147 :                     buffers_, make_buffer(side.buf),
     403                 :                     side.max_read_size);
     404             147 :                 side.buf.erase(0, n);
     405             147 :                 return {{}, n};
     406                 :             }
     407                 :         };
     408             280 :         return awaitable{this, buffers};
     409                 :     }
     410                 : 
     411                 :     /** Asynchronously write data to the stream.
     412                 : 
     413                 :         Transfers up to `buffer_size(buffers)` bytes to the
     414                 :         peer's incoming buffer. If the peer is suspended in
     415                 :         @ref read_some, it is resumed. Before every write,
     416                 :         the attached @ref fuse is consulted to possibly inject
     417                 :         an error. If the fuse fires, the peer is automatically
     418                 :         closed. If the stream is closed, returns `error::eof`.
     419                 :         The returned `std::size_t` is the number of bytes
     420                 :         transferred.
     421                 : 
     422                 :         @param buffers The const buffer sequence containing
     423                 :             data to write.
     424                 : 
     425                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     426                 : 
     427                 :         @par Cancellation
     428                 :         If the environment's stop token has been requested, the write
     429                 :         completes immediately with `error::canceled` and transfers no
     430                 :         data. An empty buffer sequence is a no-op that completes
     431                 :         successfully regardless of the stop token.
     432                 : 
     433                 :         @see fuse, close
     434                 :     */
     435                 :     template<ConstBufferSequence CB>
     436                 :     auto
     437             261 :     write_some(CB buffers)
     438                 :     {
     439                 :         struct awaitable
     440                 :         {
     441                 :             stream* self_;
     442                 :             CB buffers_;
     443                 :             bool canceled_ = false;
     444                 : 
     445             261 :             bool await_ready() const noexcept { return false; }
     446                 : 
     447                 :             // The write completes synchronously; await_suspend is only
     448                 :             // used to observe the environment's stop token. Returning
     449                 :             // false means the coroutine does not actually suspend.
     450                 :             bool
     451             261 :             await_suspend(
     452                 :                 std::coroutine_handle<>,
     453                 :                 io_env const* env) noexcept
     454                 :             {
     455             261 :                 canceled_ = env->stop_token.stop_requested();
     456             261 :                 return false;
     457                 :             }
     458                 : 
     459                 :             io_result<std::size_t>
     460             261 :             await_resume()
     461                 :             {
     462             261 :                 std::size_t n = buffer_size(buffers_);
     463             261 :                 if(n == 0)
     464               4 :                     return {{}, 0};
     465                 : 
     466             257 :                 if(canceled_)
     467               1 :                     return {error::canceled, 0};
     468                 : 
     469             256 :                 auto* st = self_->state_.get();
     470                 : 
     471             256 :                 if(st->closed)
     472 MIS           0 :                     return {error::eof, 0};
     473                 : 
     474 HIT         256 :                 close_guard g{st};
     475             256 :                 auto ec = st->f.maybe_fail();
     476             205 :                 if(ec)
     477              51 :                     return {ec, 0};
     478             154 :                 g.disarm();
     479                 : 
     480             154 :                 int peer = 1 - self_->index_;
     481             154 :                 auto& side = st->sides[peer];
     482                 : 
     483             154 :                 std::size_t const old_size = side.buf.size();
     484             154 :                 side.buf.resize(old_size + n);
     485             154 :                 buffer_copy(make_buffer(
     486             154 :                     side.buf.data() + old_size, n),
     487             154 :                     buffers_, n);
     488                 : 
     489             154 :                 state::wake(side);
     490                 : 
     491             154 :                 return {{}, n};
     492             256 :             }
     493                 :         };
     494             261 :         return awaitable{this, buffers};
     495                 :     }
     496                 : 
     497                 :     /** Inject data into this stream's peer for reading.
     498                 : 
     499                 :         Appends data directly to the peer's incoming buffer,
     500                 :         bypassing the fuse. If the peer is suspended in
     501                 :         @ref read_some, it is resumed. This is test setup,
     502                 :         not an operation under test.
     503                 : 
     504                 :         @param sv The data to inject.
     505                 : 
     506                 :         @see make_stream_pair
     507                 :     */
     508                 :     void
     509              89 :     provide(std::string_view sv)
     510                 :     {
     511              89 :         int peer = 1 - index_;
     512              89 :         auto& side = state_->sides[peer];
     513              89 :         side.buf.append(sv);
     514              89 :         state::wake(side);
     515              89 :     }
     516                 : 
     517                 :     /** Read from this stream and verify the content.
     518                 : 
     519                 :         Reads exactly `expected.size()` bytes from the stream
     520                 :         and compares against the expected string. The read goes
     521                 :         through the normal path including the fuse.
     522                 : 
     523                 :         @param expected The expected content.
     524                 : 
     525                 :         @return A pair of `(error_code, bool)`. The error_code
     526                 :             is set if a read error occurs (e.g. fuse injection).
     527                 :             The bool is true if the data matches.
     528                 : 
     529                 :         @see provide
     530                 :     */
     531                 :     std::pair<std::error_code, bool>
     532              38 :     expect(std::string_view expected)
     533                 :     {
     534              38 :         std::error_code result;
     535              38 :         bool match = false;
     536             141 :         run_blocking()([](
     537                 :             stream& self,
     538                 :             std::string_view expected,
     539                 :             std::error_code& result,
     540                 :             bool& match) -> task<>
     541                 :         {
     542                 :             std::string buf(expected.size(), '\0');
     543                 :             auto [ec, n] = co_await read(
     544                 :                 self, mutable_buffer(
     545                 :                     buf.data(), buf.size()));
     546                 :             if(ec)
     547                 :             {
     548                 :                 result = ec;
     549                 :                 co_return;
     550                 :             }
     551                 :             match = (std::string_view(
     552                 :                 buf.data(), n) == expected);
     553             161 :         }(*this, expected, result, match));
     554              58 :         return {result, match};
     555                 :     }
     556                 : 
     557                 :     /** Return the stream's pending read data.
     558                 : 
     559                 :         Returns a view of the data waiting to be read
     560                 :         from this stream. This is a direct peek at the
     561                 :         internal buffer, bypassing the fuse.
     562                 : 
     563                 :         @return A view of the pending data.
     564                 : 
     565                 :         @see provide, expect
     566                 :     */
     567                 :     std::string_view
     568                 :     data() const noexcept
     569                 :     {
     570                 :         return state_->sides[index_].buf;
     571                 :     }
     572                 : };
     573                 : 
     574                 : /** Create a connected pair of test streams.
     575                 : 
     576                 :     Data written to one stream becomes readable on the other.
     577                 :     If a coroutine calls @ref stream::read_some when no data
     578                 :     is available, it suspends until the peer writes. Before
     579                 :     every read or write, the @ref fuse is consulted to
     580                 :     possibly inject an error for testing fault scenarios.
     581                 :     When the fuse fires, the peer is automatically closed.
     582                 : 
     583                 :     @param f The fuse used to inject errors during operations.
     584                 : 
     585                 :     @return A pair of connected streams.
     586                 : 
     587                 :     @see stream, fuse
     588                 : */
     589                 : inline std::pair<stream, stream>
     590             286 : make_stream_pair(fuse f = {})
     591                 : {
     592             286 :     auto sp = std::make_shared<stream::state>(std::move(f));
     593             572 :     return {stream(sp, 0), stream(sp, 1)};
     594             286 : }
     595                 : 
     596                 : } // test
     597                 : } // capy
     598                 : } // boost
     599                 : 
     600                 : #endif
        

Generated by: LCOV version 2.3