99.37% Lines (157/158) 100.00% Functions (25/25)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
  3 + // Copyright (c) 2026 Michael Vandeberg
3   // 4   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 7   //
7   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
8   // 9   //
9   10  
10   #ifndef BOOST_CAPY_TEST_STREAM_HPP 11   #ifndef BOOST_CAPY_TEST_STREAM_HPP
11   #define BOOST_CAPY_TEST_STREAM_HPP 12   #define BOOST_CAPY_TEST_STREAM_HPP
12   13  
13   #include <boost/capy/detail/config.hpp> 14   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/buffers.hpp> 15   #include <boost/capy/buffers.hpp>
15   #include <boost/capy/buffers/buffer_copy.hpp> 16   #include <boost/capy/buffers/buffer_copy.hpp>
16   #include <boost/capy/buffers/make_buffer.hpp> 17   #include <boost/capy/buffers/make_buffer.hpp>
17   #include <boost/capy/continuation.hpp> 18   #include <boost/capy/continuation.hpp>
18   #include <coroutine> 19   #include <coroutine>
19   #include <boost/capy/ex/io_env.hpp> 20   #include <boost/capy/ex/io_env.hpp>
20   #include <boost/capy/io_result.hpp> 21   #include <boost/capy/io_result.hpp>
21   #include <boost/capy/error.hpp> 22   #include <boost/capy/error.hpp>
22   #include <boost/capy/read.hpp> 23   #include <boost/capy/read.hpp>
23   #include <boost/capy/task.hpp> 24   #include <boost/capy/task.hpp>
24   #include <boost/capy/test/fuse.hpp> 25   #include <boost/capy/test/fuse.hpp>
25   #include <boost/capy/test/run_blocking.hpp> 26   #include <boost/capy/test/run_blocking.hpp>
26   27  
  28 + #include <atomic>
27   #include <memory> 29   #include <memory>
  30 + #include <new>
28   #include <stop_token> 31   #include <stop_token>
29   #include <string> 32   #include <string>
30   #include <string_view> 33   #include <string_view>
31   #include <utility> 34   #include <utility>
32   35  
33   namespace boost { 36   namespace boost {
34   namespace capy { 37   namespace capy {
35   namespace test { 38   namespace test {
36   39  
37   /** A connected stream for testing bidirectional I/O. 40   /** A connected stream for testing bidirectional I/O.
38   41  
39   Streams are created in pairs via @ref make_stream_pair. 42   Streams are created in pairs via @ref make_stream_pair.
40   Data written to one end becomes available for reading on 43   Data written to one end becomes available for reading on
41   the other. If no data is available when @ref read_some 44   the other. If no data is available when @ref read_some
42   is called, the calling coroutine suspends until the peer 45   is called, the calling coroutine suspends until the peer
43   calls @ref write_some. The shared @ref fuse enables error 46   calls @ref write_some. The shared @ref fuse enables error
44   injection at controlled points in both directions. 47   injection at controlled points in both directions.
45   48  
46   When the fuse injects an error or throws on one end, the 49   When the fuse injects an error or throws on one end, the
47   other end is automatically closed: any suspended reader is 50   other end is automatically closed: any suspended reader is
48   resumed with `error::eof`, and subsequent operations on 51   resumed with `error::eof`, and subsequent operations on
49   both ends return `error::eof`. Calling @ref close on one 52   both ends return `error::eof`. Calling @ref close on one
50   end signals eof to the peer's reads after draining any 53   end signals eof to the peer's reads after draining any
51   buffered data, while the peer may still write. 54   buffered data, while the peer may still write.
52   55  
53   @par Thread Safety 56   @par Thread Safety
54   Single-threaded only. Both ends of the pair must be 57   Single-threaded only. Both ends of the pair must be
55   accessed from the same thread. Concurrent access is 58   accessed from the same thread. Concurrent access is
56   undefined behavior. 59   undefined behavior.
57   60  
58   @par Example 61   @par Example
59   @code 62   @code
60   fuse f; 63   fuse f;
61   auto [a, b] = make_stream_pair( f ); 64   auto [a, b] = make_stream_pair( f );
62   65  
63   auto r = f.armed( [&]( fuse& ) -> task<> { 66   auto r = f.armed( [&]( fuse& ) -> task<> {
64   auto [ec, n] = co_await a.write_some( 67   auto [ec, n] = co_await a.write_some(
65   const_buffer( "hello", 5 ) ); 68   const_buffer( "hello", 5 ) );
66   if( ec ) 69   if( ec )
67   co_return; 70   co_return;
68   71  
69   char buf[32]; 72   char buf[32];
70   auto [ec2, n2] = co_await b.read_some( 73   auto [ec2, n2] = co_await b.read_some(
71   mutable_buffer( buf, sizeof( buf ) ) ); 74   mutable_buffer( buf, sizeof( buf ) ) );
72   if( ec2 ) 75   if( ec2 )
73   co_return; 76   co_return;
74   // buf contains "hello" 77   // buf contains "hello"
75   } ); 78   } );
76   @endcode 79   @endcode
77   80  
78   @see make_stream_pair, fuse 81   @see make_stream_pair, fuse
79   */ 82   */
80   class stream 83   class stream
81   { 84   {
82   // Single-threaded only. No concurrent access to either 85   // Single-threaded only. No concurrent access to either
83   // end of the pair. Both streams and all operations must 86   // end of the pair. Both streams and all operations must
84   // run on the same thread. 87   // run on the same thread.
85   88  
86   struct half 89   struct half
87   { 90   {
88   std::string buf; 91   std::string buf;
89   std::size_t max_read_size = std::size_t(-1); 92   std::size_t max_read_size = std::size_t(-1);
90   continuation pending_cont_; 93   continuation pending_cont_;
91   executor_ref pending_ex; 94   executor_ref pending_ex;
  95 + // Points at the suspended reader's claim flag (owned by the
  96 + // read awaitable). Lets a peer wake coordinate with a stop
  97 + // callback so the parked read is resumed exactly once.
  98 + std::atomic<bool>* pending_claimed = nullptr;
92   bool eof = false; 99   bool eof = false;
93   }; 100   };
94   101  
95   struct state 102   struct state
96   { 103   {
97   fuse f; 104   fuse f;
98   bool closed = false; 105   bool closed = false;
99   half sides[2]; 106   half sides[2];
100   107  
HITCBC 101   280 explicit state(fuse f_) noexcept 108   286 explicit state(fuse f_) noexcept
HITCBC 102   840 : f(std::move(f_)) 109   858 : f(std::move(f_))
103   { 110   {
HITCBC 104   280 } 111   286 }
105   112  
  113 + // Resume a suspended reader on this side, if any. Claims the
  114 + // reader's atomic so it is never double-resumed by a racing
  115 + // stop callback; the loser of the race skips the post.
HITGNC   116 + 662 static void wake(half& side)
  117 + {
HITGNC   118 + 662 if(! side.pending_cont_.h)
HITGNC   119 + 637 return;
HITGNC   120 + 50 if(! side.pending_claimed ||
HITGNC   121 + 25 ! side.pending_claimed->exchange(
  122 + true, std::memory_order_acq_rel))
  123 + {
HITGNC   124 + 25 side.pending_ex.post(side.pending_cont_);
  125 + }
HITGNC   126 + 25 side.pending_cont_.h = {};
HITGNC   127 + 25 side.pending_ex = {};
HITGNC   128 + 25 side.pending_claimed = nullptr;
  129 + }
  130 +
106   // Set closed and resume any suspended readers 131   // Set closed and resume any suspended readers
107   // with eof on both sides. 132   // with eof on both sides.
HITCBC 108   208 void close() 133   208 void close()
109   { 134   {
HITCBC 110   208 closed = true; 135   208 closed = true;
HITCBC 111   624 for(auto& side : sides) 136   624 for(auto& side : sides)
HITGIC 112 - { 137 + 416 wake(side);
DCB 113 - 416 if(side.pending_cont_.h)  
114 - {  
DCB 115 - 12 side.pending_ex.post(side.pending_cont_);  
DCB 116 - 12 side.pending_cont_.h = {};  
DCB 117 - 12 side.pending_ex = {};  
118 - }  
119 - }  
HITCBC 120   208 } 138   208 }
121   }; 139   };
122   140  
123   // Wraps the maybe_fail() call. If the guard is 141   // Wraps the maybe_fail() call. If the guard is
124   // not disarmed before destruction (fuse returned 142   // not disarmed before destruction (fuse returned
125   // an error, or threw an exception), closes both 143   // an error, or threw an exception), closes both
126   // ends so any suspended peer gets eof. 144   // ends so any suspended peer gets eof.
127   struct close_guard 145   struct close_guard
128   { 146   {
129   state* st; 147   state* st;
130   bool armed = true; 148   bool armed = true;
HITCBC 131   300 void disarm() noexcept { armed = false; } 149   301 void disarm() noexcept { armed = false; }
HITCBC 132   508 ~close_guard() noexcept(false) { if(armed) st->close(); } 150   509 ~close_guard() noexcept(false) { if(armed) st->close(); }
133   }; 151   };
134   152  
135   std::shared_ptr<state> state_; 153   std::shared_ptr<state> state_;
136   int index_; 154   int index_;
137   155  
HITCBC 138   560 stream( 156   572 stream(
139   std::shared_ptr<state> sp, 157   std::shared_ptr<state> sp,
140   int index) noexcept 158   int index) noexcept
HITCBC 141   560 : state_(std::move(sp)) 159   572 : state_(std::move(sp))
HITCBC 142   560 , index_(index) 160   572 , index_(index)
143   { 161   {
HITCBC 144   560 } 162   572 }
145   163  
146   friend std::pair<stream, stream> 164   friend std::pair<stream, stream>
147   make_stream_pair(fuse); 165   make_stream_pair(fuse);
148   166  
149   public: 167   public:
150   stream(stream const&) = delete; 168   stream(stream const&) = delete;
151   stream& operator=(stream const&) = delete; 169   stream& operator=(stream const&) = delete;
HITCBC 152   660 stream(stream&&) = default; 170   672 stream(stream&&) = default;
153   stream& operator=(stream&&) = default; 171   stream& operator=(stream&&) = default;
154   172  
155   /** Signal end-of-stream to the peer. 173   /** Signal end-of-stream to the peer.
156   174  
157   Marks the peer's read direction as closed. 175   Marks the peer's read direction as closed.
158   If the peer is suspended in @ref read_some, 176   If the peer is suspended in @ref read_some,
159   it is resumed. The peer drains any buffered 177   it is resumed. The peer drains any buffered
160   data before receiving `error::eof`. Writes 178   data before receiving `error::eof`. Writes
161   from the peer are unaffected. 179   from the peer are unaffected.
162   */ 180   */
163   void 181   void
HITCBC 164   3 close() 182   3 close()
165   { 183   {
HITCBC 166   3 int peer = 1 - index_; 184   3 int peer = 1 - index_;
HITCBC 167   3 auto& side = state_->sides[peer]; 185   3 auto& side = state_->sides[peer];
HITCBC 168   3 side.eof = true; 186   3 side.eof = true;
HITCBC 169 - 3 if(side.pending_cont_.h) 187 + 3 state::wake(side);
170 - {  
DCB 171 - 1 side.pending_ex.post(side.pending_cont_);  
DCB 172 - 1 side.pending_cont_.h = {};  
DCB 173 - 1 side.pending_ex = {};  
174 - }  
HITCBC 175   3 } 188   3 }
176   189  
177   /** Set the maximum bytes returned per read. 190   /** Set the maximum bytes returned per read.
178   191  
179   Limits how many bytes @ref read_some returns in 192   Limits how many bytes @ref read_some returns in
180   a single call, simulating chunked network delivery. 193   a single call, simulating chunked network delivery.
181   The default is unlimited. 194   The default is unlimited.
182   195  
183   @param n Maximum bytes per read. 196   @param n Maximum bytes per read.
184   */ 197   */
185   void 198   void
HITCBC 186   54 set_max_read_size(std::size_t n) noexcept 199   54 set_max_read_size(std::size_t n) noexcept
187   { 200   {
HITCBC 188   54 state_->sides[index_].max_read_size = n; 201   54 state_->sides[index_].max_read_size = n;
HITCBC 189   54 } 202   54 }
190   203  
191   /** Asynchronously read data from the stream. 204   /** Asynchronously read data from the stream.
192   205  
193   Transfers up to `buffer_size(buffers)` bytes from 206   Transfers up to `buffer_size(buffers)` bytes from
194   data written by the peer. If no data is available, 207   data written by the peer. If no data is available,
195   the calling coroutine suspends until the peer calls 208   the calling coroutine suspends until the peer calls
196   @ref write_some. Before every read, the attached 209   @ref write_some. Before every read, the attached
197   @ref fuse is consulted to possibly inject an error. 210   @ref fuse is consulted to possibly inject an error.
198   If the fuse fires, the peer is automatically closed. 211   If the fuse fires, the peer is automatically closed.
199   If the stream is closed, returns `error::eof`. 212   If the stream is closed, returns `error::eof`.
200   The returned `std::size_t` is the number of bytes 213   The returned `std::size_t` is the number of bytes
201   transferred. 214   transferred.
202   215  
203   @param buffers The mutable buffer sequence to receive data. 216   @param buffers The mutable buffer sequence to receive data.
204   217  
205   @return An awaitable that await-returns `(error_code,std::size_t)`. 218   @return An awaitable that await-returns `(error_code,std::size_t)`.
206   219  
  220 + @par Cancellation
  221 + Cancellation applies only to a read that would otherwise suspend:
  222 + if no data is available and the environment's stop token is
  223 + requested (before or during the wait), the read resumes with
  224 + `error::canceled`. A read that can complete immediately from
  225 + buffered data is unaffected by the stop token.
  226 +
207   @see fuse, close 227   @see fuse, close
208   */ 228   */
209   template<MutableBufferSequence MB> 229   template<MutableBufferSequence MB>
210   auto 230   auto
HITCBC 211   275 read_some(MB buffers) 231   280 read_some(MB buffers)
212   { 232   {
  233 + // The read suspends when no data is available, parking its
  234 + // continuation on the side until the peer writes/closes. To
  235 + // support cancellation it follows the same pattern as
  236 + // delay_awaitable: a stop callback claims the resume (racing
  237 + // the peer wake via an atomic) and posts the continuation
  238 + // through the executor. Because it owns a std::atomic and a
  239 + // std::stop_callback, the awaitable needs explicit move and
  240 + // destruction (the task promise moves it into its
  241 + // transform_awaiter before awaiting).
213   struct awaitable 242   struct awaitable
214   { 243   {
215   stream* self_; 244   stream* self_;
216   MB buffers_; 245   MB buffers_;
217   246  
  247 + // Declared before stop_cb_buf_: the stop callback reads
  248 + // these, so they must outlive a blocking stop_cb_ destructor.
  249 + continuation cont_;
  250 + executor_ref ex_;
  251 + half* side_ = nullptr;
  252 + std::atomic<bool> claimed_{false};
  253 + bool canceled_ = false;
  254 + bool stop_cb_active_ = false;
  255 +
  256 + struct cancel_fn
  257 + {
  258 + awaitable* self_;
  259 +
HITGNC   260 + 15 void operator()() const noexcept
  261 + {
HITGNC   262 + 15 if(! self_->claimed_.exchange(
  263 + true, std::memory_order_acq_rel))
  264 + {
HITGNC   265 + 3 self_->canceled_ = true;
HITGNC   266 + 3 self_->ex_.post(self_->cont_);
  267 + }
HITGNC   268 + 15 }
  269 + };
  270 +
  271 + using stop_cb_t = std::stop_callback<cancel_fn>;
  272 +
  273 + // Declared last: its destructor may block while the callback
  274 + // accesses the members above. A union gives correct alignment
  275 + // for stop_cb_t without an alignas specifier, which avoids
  276 + // MSVC's C4324 padding warning on this function-local class
  277 + // (the member-level pragma used by delay_awaitable does not
  278 + // suppress it here). Lifetime is managed manually: placement
  279 + // new in await_suspend, explicit destruction once done.
  280 + union { stop_cb_t stop_cb_; };
  281 +
HITGNC   282 + 280 awaitable(stream* self, MB buffers) noexcept
HITGNC   283 + 280 : self_(self)
HITGNC   284 + 280 , buffers_(buffers)
  285 + {
HITGNC   286 + 280 }
  287 +
  288 + /// @pre Not yet awaited (no active stop callback).
HITGNC   289 + 280 awaitable(awaitable&& o) noexcept
HITGNC   290 + 280 : self_(o.self_)
HITGNC   291 + 280 , buffers_(o.buffers_)
HITGNC   292 + 280 , cont_(o.cont_)
HITGNC   293 + 280 , ex_(o.ex_)
HITGNC   294 + 280 , side_(o.side_)
HITGNC   295 + 280 , claimed_(o.claimed_.load(std::memory_order_relaxed))
HITGNC   296 + 280 , canceled_(o.canceled_)
HITGNC   297 + 280 , stop_cb_active_(std::exchange(o.stop_cb_active_, false))
  298 + {
HITGNC   299 + 280 }
  300 +
HITGNC   301 + 560 ~awaitable()
  302 + {
HITGNC   303 + 560 if(stop_cb_active_)
HITGNC   304 + 1 stop_cb_.~stop_cb_t();
  305 + // Unlink from the side if still parked (e.g. the
  306 + // coroutine was destroyed while suspended), so a later
  307 + // peer wake does not dereference a freed claim flag.
HITGNC   308 + 560 if(side_ && side_->pending_claimed == &claimed_)
  309 + {
HITGNC   310 + 1 side_->pending_cont_.h = {};
HITGNC   311 + 1 side_->pending_ex = {};
HITGNC   312 + 1 side_->pending_claimed = nullptr;
  313 + }
HITGNC   314 + 560 }
  315 +
  316 + awaitable(awaitable const&) = delete;
  317 + awaitable& operator=(awaitable const&) = delete;
  318 + awaitable& operator=(awaitable&&) = delete;
  319 +
HITCBC 218   275 bool await_ready() const noexcept 320   280 bool await_ready() const noexcept
219   { 321   {
HITCBC 220   275 if(buffer_empty(buffers_)) 322   280 if(buffer_empty(buffers_))
HITCBC 221   8 return true; 323   8 return true;
HITCBC 222   267 auto* st = self_->state_.get(); 324   272 auto* st = self_->state_.get();
HITCBC 223   267 auto& side = st->sides[self_->index_]; 325   272 auto& side = st->sides[self_->index_];
HITCBC 224   532 return st->closed || side.eof || 326   542 return st->closed || side.eof ||
HITCBC 225   532 !side.buf.empty(); 327   542 !side.buf.empty();
226   } 328   }
227   329  
HITCBC 228   25 std::coroutine_handle<> await_suspend( 330   29 std::coroutine_handle<> await_suspend(
229   std::coroutine_handle<> h, 331   std::coroutine_handle<> h,
230   io_env const* env) noexcept 332   io_env const* env) noexcept
231   { 333   {
  334 + // Park the continuation, then register the stop callback.
  335 + // If stop is already requested, the callback fires inline
  336 + // during construction: it claims the resume and posts the
  337 + // continuation through the executor (never a symmetric
  338 + // self-transfer, which would leak this frame under
  339 + // run_async). The parked read is then resumed with
  340 + // error::canceled by the run loop.
HITCBC 232   25 auto& side = self_->state_->sides[ 341   29 auto& side = self_->state_->sides[
HITCBC 233   25 self_->index_]; 342   29 self_->index_];
HITGNC   343 + 29 cont_.h = h;
HITGNC   344 + 29 ex_ = env->executor;
HITGNC   345 + 29 side_ = &side;
HITCBC 234   25 side.pending_cont_.h = h; 346   29 side.pending_cont_.h = h;
HITCBC 235   25 side.pending_ex = env->executor; 347   29 side.pending_ex = env->executor;
HITGNC   348 + 29 side.pending_claimed = &claimed_;
  349 +
HITGNC   350 + 29 ::new(static_cast<void*>(&stop_cb_)) stop_cb_t(
HITGNC   351 + 29 env->stop_token, cancel_fn{this});
HITGNC   352 + 29 stop_cb_active_ = true;
  353 +
HITCBC 236   25 return std::noop_coroutine(); 354   29 return std::noop_coroutine();
237   } 355   }
238   356  
239   io_result<std::size_t> 357   io_result<std::size_t>
HITCBC 240   275 await_resume() 358   279 await_resume()
241   { 359   {
HITGNC   360 + 279 if(stop_cb_active_)
  361 + {
HITGNC   362 + 28 stop_cb_.~stop_cb_t();
HITGNC   363 + 28 stop_cb_active_ = false;
  364 + }
  365 +
HITCBC 242   275 if(buffer_empty(buffers_)) 366   279 if(buffer_empty(buffers_))
HITCBC 243   8 return {{}, 0}; 367   8 return {{}, 0};
244   368  
HITGNC   369 + 271 if(canceled_)
  370 + {
  371 + // The stop callback posted us but left the side
  372 + // untouched; unlink if a peer wake has not already.
HITGNC   373 + 3 if(side_ && side_->pending_claimed == &claimed_)
  374 + {
HITGNC   375 + 3 side_->pending_cont_.h = {};
HITGNC   376 + 3 side_->pending_ex = {};
HITGNC   377 + 3 side_->pending_claimed = nullptr;
  378 + }
HITGNC   379 + 3 return {error::canceled, 0};
  380 + }
  381 +
HITCBC 245   267 auto* st = self_->state_.get(); 382   268 auto* st = self_->state_.get();
HITCBC 246   267 auto& side = st->sides[ 383   268 auto& side = st->sides[
HITCBC 247   267 self_->index_]; 384   268 self_->index_];
248   385  
HITCBC 249   267 if(st->closed) 386   268 if(st->closed)
HITCBC 250   12 return {error::eof, 0}; 387   12 return {error::eof, 0};
251   388  
HITCBC 252   255 if(side.eof && side.buf.empty()) 389   256 if(side.eof && side.buf.empty())
HITCBC 253   3 return {error::eof, 0}; 390   3 return {error::eof, 0};
254   391  
HITCBC 255   252 if(!side.eof) 392   253 if(!side.eof)
256   { 393   {
HITCBC 257   252 close_guard g{st}; 394   253 close_guard g{st};
HITCBC 258   252 auto ec = st->f.maybe_fail(); 395   253 auto ec = st->f.maybe_fail();
HITCBC 259   199 if(ec) 396   200 if(ec)
HITCBC 260   53 return {ec, 0}; 397   53 return {ec, 0};
HITCBC 261   146 g.disarm(); 398   147 g.disarm();
HITCBC 262   252 } 399   253 }
263   400  
HITCBC 264   292 std::size_t const n = buffer_copy( 401   294 std::size_t const n = buffer_copy(
HITCBC 265   146 buffers_, make_buffer(side.buf), 402   147 buffers_, make_buffer(side.buf),
266   side.max_read_size); 403   side.max_read_size);
HITCBC 267   146 side.buf.erase(0, n); 404   147 side.buf.erase(0, n);
HITCBC 268   146 return {{}, n}; 405   147 return {{}, n};
269   } 406   }
270   }; 407   };
HITCBC 271   275 return awaitable{this, buffers}; 408   280 return awaitable{this, buffers};
272   } 409   }
273   410  
274   /** Asynchronously write data to the stream. 411   /** Asynchronously write data to the stream.
275   412  
276   Transfers up to `buffer_size(buffers)` bytes to the 413   Transfers up to `buffer_size(buffers)` bytes to the
277   peer's incoming buffer. If the peer is suspended in 414   peer's incoming buffer. If the peer is suspended in
278   @ref read_some, it is resumed. Before every write, 415   @ref read_some, it is resumed. Before every write,
279   the attached @ref fuse is consulted to possibly inject 416   the attached @ref fuse is consulted to possibly inject
280   an error. If the fuse fires, the peer is automatically 417   an error. If the fuse fires, the peer is automatically
281   closed. If the stream is closed, returns `error::eof`. 418   closed. If the stream is closed, returns `error::eof`.
282   The returned `std::size_t` is the number of bytes 419   The returned `std::size_t` is the number of bytes
283   transferred. 420   transferred.
284   421  
285   @param buffers The const buffer sequence containing 422   @param buffers The const buffer sequence containing
286   data to write. 423   data to write.
287   424  
288   @return An awaitable that await-returns `(error_code,std::size_t)`. 425   @return An awaitable that await-returns `(error_code,std::size_t)`.
289   426  
  427 + @par Cancellation
  428 + If the environment's stop token has been requested, the write
  429 + completes immediately with `error::canceled` and transfers no
  430 + data. An empty buffer sequence is a no-op that completes
  431 + successfully regardless of the stop token.
  432 +
290   @see fuse, close 433   @see fuse, close
291   */ 434   */
292   template<ConstBufferSequence CB> 435   template<ConstBufferSequence CB>
293   auto 436   auto
HITCBC 294   260 write_some(CB buffers) 437   261 write_some(CB buffers)
295   { 438   {
296   struct awaitable 439   struct awaitable
297   { 440   {
298   stream* self_; 441   stream* self_;
299   CB buffers_; 442   CB buffers_;
  443 + bool canceled_ = false;
300   444  
HITCBC 301 - 260 bool await_ready() const noexcept { return true; } 445 + 261 bool await_ready() const noexcept { return false; }
302   446  
EUB 303 - void await_suspend( 447 + // The write completes synchronously; await_suspend is only
  448 + // used to observe the environment's stop token. Returning
  449 + // false means the coroutine does not actually suspend.
  450 + bool
HITGNC   451 + 261 await_suspend(
304   std::coroutine_handle<>, 452   std::coroutine_handle<>,
305 - io_env const*) const noexcept 453 + io_env const* env) noexcept
306   { 454   {
HITGNC   455 + 261 canceled_ = env->stop_token.stop_requested();
HITGNC   456 + 261 return false;
EUB 307   } 457   }
308   458  
309   io_result<std::size_t> 459   io_result<std::size_t>
HITCBC 310   260 await_resume() 460   261 await_resume()
311   { 461   {
HITCBC 312   260 std::size_t n = buffer_size(buffers_); 462   261 std::size_t n = buffer_size(buffers_);
HITCBC 313   260 if(n == 0) 463   261 if(n == 0)
HITCBC 314   4 return {{}, 0}; 464   4 return {{}, 0};
315   465  
HITGNC   466 + 257 if(canceled_)
HITGNC   467 + 1 return {error::canceled, 0};
  468 +
HITCBC 316   256 auto* st = self_->state_.get(); 469   256 auto* st = self_->state_.get();
317   470  
HITCBC 318   256 if(st->closed) 471   256 if(st->closed)
MISUBC 319   return {error::eof, 0}; 472   return {error::eof, 0};
320   473  
HITCBC 321   256 close_guard g{st}; 474   256 close_guard g{st};
HITCBC 322   256 auto ec = st->f.maybe_fail(); 475   256 auto ec = st->f.maybe_fail();
HITCBC 323   205 if(ec) 476   205 if(ec)
HITCBC 324   51 return {ec, 0}; 477   51 return {ec, 0};
HITCBC 325   154 g.disarm(); 478   154 g.disarm();
326   479  
HITCBC 327   154 int peer = 1 - self_->index_; 480   154 int peer = 1 - self_->index_;
HITCBC 328   154 auto& side = st->sides[peer]; 481   154 auto& side = st->sides[peer];
329   482  
HITCBC 330   154 std::size_t const old_size = side.buf.size(); 483   154 std::size_t const old_size = side.buf.size();
HITCBC 331   154 side.buf.resize(old_size + n); 484   154 side.buf.resize(old_size + n);
HITCBC 332   154 buffer_copy(make_buffer( 485   154 buffer_copy(make_buffer(
HITCBC 333   154 side.buf.data() + old_size, n), 486   154 side.buf.data() + old_size, n),
HITCBC 334   154 buffers_, n); 487   154 buffers_, n);
335   488  
HITCBC 336 - 154 if(side.pending_cont_.h) 489 + 154 state::wake(side);
337 - {  
DCB 338 - 12 side.pending_ex.post(side.pending_cont_);  
DCB 339 - 12 side.pending_cont_.h = {};  
DCB 340 - 12 side.pending_ex = {};  
341 - }  
342   490  
HITCBC 343   154 return {{}, n}; 491   154 return {{}, n};
HITCBC 344   256 } 492   256 }
345   }; 493   };
HITCBC 346   260 return awaitable{this, buffers}; 494   261 return awaitable{this, buffers};
347   } 495   }
348   496  
349   /** Inject data into this stream's peer for reading. 497   /** Inject data into this stream's peer for reading.
350   498  
351   Appends data directly to the peer's incoming buffer, 499   Appends data directly to the peer's incoming buffer,
352   bypassing the fuse. If the peer is suspended in 500   bypassing the fuse. If the peer is suspended in
353   @ref read_some, it is resumed. This is test setup, 501   @ref read_some, it is resumed. This is test setup,
354   not an operation under test. 502   not an operation under test.
355   503  
356   @param sv The data to inject. 504   @param sv The data to inject.
357   505  
358   @see make_stream_pair 506   @see make_stream_pair
359   */ 507   */
360   void 508   void
HITCBC 361   87 provide(std::string_view sv) 509   89 provide(std::string_view sv)
362   { 510   {
HITCBC 363   87 int peer = 1 - index_; 511   89 int peer = 1 - index_;
HITCBC 364   87 auto& side = state_->sides[peer]; 512   89 auto& side = state_->sides[peer];
HITCBC 365   87 side.buf.append(sv); 513   89 side.buf.append(sv);
HITCBC 366 - 87 if(side.pending_cont_.h) 514 + 89 state::wake(side);
367 - {  
DUB 368 - side.pending_ex.post(side.pending_cont_);  
DUB 369 - side.pending_cont_.h = {};  
DUB 370 - side.pending_ex = {};  
371 - }  
HITCBC 372   87 } 515   89 }
373   516  
374   /** Read from this stream and verify the content. 517   /** Read from this stream and verify the content.
375   518  
376   Reads exactly `expected.size()` bytes from the stream 519   Reads exactly `expected.size()` bytes from the stream
377   and compares against the expected string. The read goes 520   and compares against the expected string. The read goes
378   through the normal path including the fuse. 521   through the normal path including the fuse.
379   522  
380   @param expected The expected content. 523   @param expected The expected content.
381   524  
382   @return A pair of `(error_code, bool)`. The error_code 525   @return A pair of `(error_code, bool)`. The error_code
383   is set if a read error occurs (e.g. fuse injection). 526   is set if a read error occurs (e.g. fuse injection).
384   The bool is true if the data matches. 527   The bool is true if the data matches.
385   528  
386   @see provide 529   @see provide
387   */ 530   */
388   std::pair<std::error_code, bool> 531   std::pair<std::error_code, bool>
HITCBC 389   38 expect(std::string_view expected) 532   38 expect(std::string_view expected)
390   { 533   {
HITCBC 391   38 std::error_code result; 534   38 std::error_code result;
HITCBC 392   38 bool match = false; 535   38 bool match = false;
HITCBC 393   141 run_blocking()([]( 536   141 run_blocking()([](
394   stream& self, 537   stream& self,
395   std::string_view expected, 538   std::string_view expected,
396   std::error_code& result, 539   std::error_code& result,
397   bool& match) -> task<> 540   bool& match) -> task<>
398   { 541   {
399   std::string buf(expected.size(), '\0'); 542   std::string buf(expected.size(), '\0');
400   auto [ec, n] = co_await read( 543   auto [ec, n] = co_await read(
401   self, mutable_buffer( 544   self, mutable_buffer(
402   buf.data(), buf.size())); 545   buf.data(), buf.size()));
403   if(ec) 546   if(ec)
404   { 547   {
405   result = ec; 548   result = ec;
406   co_return; 549   co_return;
407   } 550   }
408   match = (std::string_view( 551   match = (std::string_view(
409   buf.data(), n) == expected); 552   buf.data(), n) == expected);
HITCBC 410   161 }(*this, expected, result, match)); 553   161 }(*this, expected, result, match));
HITCBC 411   58 return {result, match}; 554   58 return {result, match};
412   } 555   }
413   556  
414   /** Return the stream's pending read data. 557   /** Return the stream's pending read data.
415   558  
416   Returns a view of the data waiting to be read 559   Returns a view of the data waiting to be read
417   from this stream. This is a direct peek at the 560   from this stream. This is a direct peek at the
418   internal buffer, bypassing the fuse. 561   internal buffer, bypassing the fuse.
419   562  
420   @return A view of the pending data. 563   @return A view of the pending data.
421   564  
422   @see provide, expect 565   @see provide, expect
423   */ 566   */
424   std::string_view 567   std::string_view
425   data() const noexcept 568   data() const noexcept
426   { 569   {
427   return state_->sides[index_].buf; 570   return state_->sides[index_].buf;
428   } 571   }
429   }; 572   };
430   573  
431   /** Create a connected pair of test streams. 574   /** Create a connected pair of test streams.
432   575  
433   Data written to one stream becomes readable on the other. 576   Data written to one stream becomes readable on the other.
434   If a coroutine calls @ref stream::read_some when no data 577   If a coroutine calls @ref stream::read_some when no data
435   is available, it suspends until the peer writes. Before 578   is available, it suspends until the peer writes. Before
436   every read or write, the @ref fuse is consulted to 579   every read or write, the @ref fuse is consulted to
437   possibly inject an error for testing fault scenarios. 580   possibly inject an error for testing fault scenarios.
438   When the fuse fires, the peer is automatically closed. 581   When the fuse fires, the peer is automatically closed.
439   582  
440   @param f The fuse used to inject errors during operations. 583   @param f The fuse used to inject errors during operations.
441   584  
442   @return A pair of connected streams. 585   @return A pair of connected streams.
443   586  
444   @see stream, fuse 587   @see stream, fuse
445   */ 588   */
446   inline std::pair<stream, stream> 589   inline std::pair<stream, stream>
HITCBC 447   280 make_stream_pair(fuse f = {}) 590   286 make_stream_pair(fuse f = {})
448   { 591   {
HITCBC 449   280 auto sp = std::make_shared<stream::state>(std::move(f)); 592   286 auto sp = std::make_shared<stream::state>(std::move(f));
HITCBC 450   560 return {stream(sp, 0), stream(sp, 1)}; 593   572 return {stream(sp, 0), stream(sp, 1)};
HITCBC 451   280 } 594   286 }
452   595  
453   } // test 596   } // test
454   } // capy 597   } // capy
455   } // boost 598   } // boost
456   599  
457   #endif 600   #endif