100.00% Lines (55/55) 100.00% Functions (14/14)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
  3 + // Copyright (c) 2026 Michael Vandeberg
3   // 4   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 7   //
7   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
8   // 9   //
9   10  
10   #ifndef BOOST_CAPY_TEST_BUFFER_SINK_HPP 11   #ifndef BOOST_CAPY_TEST_BUFFER_SINK_HPP
11   #define BOOST_CAPY_TEST_BUFFER_SINK_HPP 12   #define BOOST_CAPY_TEST_BUFFER_SINK_HPP
12   13  
13   #include <boost/capy/detail/config.hpp> 14   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/buffers.hpp> 15   #include <boost/capy/buffers.hpp>
15   #include <boost/capy/buffers/make_buffer.hpp> 16   #include <boost/capy/buffers/make_buffer.hpp>
16   #include <coroutine> 17   #include <coroutine>
17   #include <boost/capy/ex/io_env.hpp> 18   #include <boost/capy/ex/io_env.hpp>
18   #include <boost/capy/io_result.hpp> 19   #include <boost/capy/io_result.hpp>
19   #include <boost/capy/test/fuse.hpp> 20   #include <boost/capy/test/fuse.hpp>
20   21  
21   #include <algorithm> 22   #include <algorithm>
22   #include <span> 23   #include <span>
23   #include <string> 24   #include <string>
24   #include <string_view> 25   #include <string_view>
25   26  
26   namespace boost { 27   namespace boost {
27   namespace capy { 28   namespace capy {
28   namespace test { 29   namespace test {
29   30  
30   /** A mock buffer sink for testing callee-owns-buffers write operations. 31   /** A mock buffer sink for testing callee-owns-buffers write operations.
31   32  
32   Use this to verify code that writes data using the callee-owns-buffers 33   Use this to verify code that writes data using the callee-owns-buffers
33   pattern without needing real I/O. Call @ref prepare to get writable 34   pattern without needing real I/O. Call @ref prepare to get writable
34   buffers, write into them, then call @ref commit to finalize. The 35   buffers, write into them, then call @ref commit to finalize. The
35   associated @ref fuse enables error injection at controlled points. 36   associated @ref fuse enables error injection at controlled points.
36   37  
37   This class satisfies the @ref BufferSink concept by providing 38   This class satisfies the @ref BufferSink concept by providing
38   internal storage that callers write into directly. 39   internal storage that callers write into directly.
39   40  
40   @par Thread Safety 41   @par Thread Safety
41   Not thread-safe. 42   Not thread-safe.
42   43  
43   @par Example 44   @par Example
44   @code 45   @code
45   fuse f; 46   fuse f;
46   buffer_sink bs( f ); 47   buffer_sink bs( f );
47   48  
48   auto r = f.armed( [&]( fuse& ) -> task<void> { 49   auto r = f.armed( [&]( fuse& ) -> task<void> {
49   mutable_buffer arr[16]; 50   mutable_buffer arr[16];
50   auto bufs = bs.prepare( arr ); 51   auto bufs = bs.prepare( arr );
51   if( bufs.empty() ) 52   if( bufs.empty() )
52   co_return; 53   co_return;
53   54  
54   // Write data into the first prepared buffer 55   // Write data into the first prepared buffer
55   std::memcpy( bufs[0].data(), "Hello", 5 ); 56   std::memcpy( bufs[0].data(), "Hello", 5 );
56   57  
57   auto [ec] = co_await bs.commit( 5 ); 58   auto [ec] = co_await bs.commit( 5 );
58   if( ec ) 59   if( ec )
59   co_return; 60   co_return;
60   61  
61   auto [ec2] = co_await bs.commit_eof( 0 ); 62   auto [ec2] = co_await bs.commit_eof( 0 );
62   // bs.data() returns "Hello" 63   // bs.data() returns "Hello"
63   } ); 64   } );
64   @endcode 65   @endcode
65   66  
66   @see fuse, BufferSink 67   @see fuse, BufferSink
67   */ 68   */
68   class buffer_sink 69   class buffer_sink
69   { 70   {
70   fuse f_; 71   fuse f_;
71   std::string data_; 72   std::string data_;
72   std::string prepare_buf_; 73   std::string prepare_buf_;
73   std::size_t prepare_size_ = 0; 74   std::size_t prepare_size_ = 0;
74   std::size_t max_prepare_size_; 75   std::size_t max_prepare_size_;
75   bool eof_called_ = false; 76   bool eof_called_ = false;
76   77  
77   public: 78   public:
78   /** Construct a buffer sink. 79   /** Construct a buffer sink.
79   80  
80   @param f The fuse used to inject errors during commits. 81   @param f The fuse used to inject errors during commits.
81   82  
82   @param max_prepare_size Maximum bytes available per prepare. 83   @param max_prepare_size Maximum bytes available per prepare.
83   Use to simulate limited buffer space. 84   Use to simulate limited buffer space.
84   */ 85   */
HITCBC 85   558 explicit buffer_sink( 86   560 explicit buffer_sink(
86   fuse f = {}, 87   fuse f = {},
87   std::size_t max_prepare_size = 4096) noexcept 88   std::size_t max_prepare_size = 4096) noexcept
HITCBC 88   558 : f_(std::move(f)) 89   560 : f_(std::move(f))
HITCBC 89   558 , max_prepare_size_(max_prepare_size) 90   560 , max_prepare_size_(max_prepare_size)
90   { 91   {
HITCBC 91   558 prepare_buf_.resize(max_prepare_size_); 92   560 prepare_buf_.resize(max_prepare_size_);
HITCBC 92   558 } 93   560 }
93   94  
94   /// Return the written data as a string view. 95   /// Return the written data as a string view.
95   std::string_view 96   std::string_view
HITCBC 96   82 data() const noexcept 97   82 data() const noexcept
97   { 98   {
HITCBC 98   82 return data_; 99   82 return data_;
99   } 100   }
100   101  
101   /// Return the number of bytes written. 102   /// Return the number of bytes written.
102   std::size_t 103   std::size_t
HITCBC 103   12 size() const noexcept 104   14 size() const noexcept
104   { 105   {
HITCBC 105   12 return data_.size(); 106   14 return data_.size();
106   } 107   }
107   108  
108   /// Return whether commit_eof has been called. 109   /// Return whether commit_eof has been called.
109   bool 110   bool
HITCBC 110   78 eof_called() const noexcept 111   79 eof_called() const noexcept
111   { 112   {
HITCBC 112   78 return eof_called_; 113   79 return eof_called_;
113   } 114   }
114   115  
115   /// Clear all data and reset state. 116   /// Clear all data and reset state.
116   void 117   void
HITCBC 117   2 clear() noexcept 118   2 clear() noexcept
118   { 119   {
HITCBC 119   2 data_.clear(); 120   2 data_.clear();
HITCBC 120   2 prepare_size_ = 0; 121   2 prepare_size_ = 0;
HITCBC 121   2 eof_called_ = false; 122   2 eof_called_ = false;
HITCBC 122   2 } 123   2 }
123   124  
124   /** Prepare writable buffers. 125   /** Prepare writable buffers.
125   126  
126   Fills the provided span with mutable buffer descriptors pointing 127   Fills the provided span with mutable buffer descriptors pointing
127   to internal storage. The caller writes data into these buffers, 128   to internal storage. The caller writes data into these buffers,
128   then calls @ref commit to finalize. 129   then calls @ref commit to finalize.
129   130  
130   @param dest Span of mutable_buffer to fill. 131   @param dest Span of mutable_buffer to fill.
131   132  
132   @return A span of filled buffers (empty or 1 buffer in this implementation). 133   @return A span of filled buffers (empty or 1 buffer in this implementation).
133   */ 134   */
134   std::span<mutable_buffer> 135   std::span<mutable_buffer>
HITCBC 135   840 prepare(std::span<mutable_buffer> dest) 136   842 prepare(std::span<mutable_buffer> dest)
136   { 137   {
HITCBC 137   840 if(dest.empty()) 138   842 if(dest.empty())
HITCBC 138   2 return {}; 139   2 return {};
139   140  
HITCBC 140   838 prepare_size_ = max_prepare_size_; 141   840 prepare_size_ = max_prepare_size_;
HITCBC 141   838 dest[0] = make_buffer(prepare_buf_.data(), prepare_size_); 142   840 dest[0] = make_buffer(prepare_buf_.data(), prepare_size_);
HITCBC 142   838 return dest.first(1); 143   840 return dest.first(1);
143   } 144   }
144   145  
145   /** Commit bytes written to the prepared buffers. 146   /** Commit bytes written to the prepared buffers.
146   147  
147   Transfers `n` bytes from the prepared buffer to the internal 148   Transfers `n` bytes from the prepared buffer to the internal
148   data buffer. Before committing, the attached @ref fuse is 149   data buffer. Before committing, the attached @ref fuse is
149   consulted to possibly inject an error for testing fault scenarios. 150   consulted to possibly inject an error for testing fault scenarios.
150   151  
151   @param n The number of bytes to commit. 152   @param n The number of bytes to commit.
152   153  
153   @return An awaitable that await-returns `(error_code)`. 154   @return An awaitable that await-returns `(error_code)`.
154   155  
  156 + @par Cancellation
  157 + If the environment's stop token has been requested, the commit
  158 + completes immediately with `error::canceled` and commits no data.
  159 +
155   @see fuse 160   @see fuse
156   */ 161   */
157   auto 162   auto
HITCBC 158   738 commit(std::size_t n) 163   739 commit(std::size_t n)
159   { 164   {
160   struct awaitable 165   struct awaitable
161   { 166   {
162   buffer_sink* self_; 167   buffer_sink* self_;
163   std::size_t n_; 168   std::size_t n_;
  169 + bool canceled_ = false;
164   170  
HITCBC 165 - 738 bool await_ready() const noexcept { return true; } 171 + 739 bool await_ready() const noexcept { return false; }
166   172  
167 - // This method is required to satisfy Capy's IoAwaitable concept, 173 + // The operation completes synchronously, but await_suspend is
168 - // but is never called because await_ready() returns true. 174 + // the only place io_env is delivered (the promise's
169 - // 175 + // transform_awaiter forwards it here). Returning false means
170 - // Capy uses a two-layer awaitable system: the promise's 176 + // the coroutine does not actually suspend; it resumes
171 - // await_transform wraps awaitables in a transform_awaiter whose 177 + // immediately, having observed the stop token. See io_env,
172 - // standard await_suspend(coroutine_handle) calls this custom 178 + // IoAwaitable.
173 - // 2-argument overload, passing the io_env from the coroutine's 179 + bool
HITGIC 174 - // context. For synchronous test awaitables like this one, the 180 + 739 await_suspend(
175 - // coroutine never suspends, so this is not invoked. The signature  
176 - // exists to allow the same awaitable type to work with both  
177 - // synchronous (test) and asynchronous (real I/O) code.  
DUB 178 - void await_suspend(  
179   std::coroutine_handle<>, 181   std::coroutine_handle<>,
180 - io_env const*) const noexcept 182 + io_env const* env) noexcept
181   { 183   {
HITGNC   184 + 739 canceled_ = env->stop_token.stop_requested();
HITGNC   185 + 739 return false;
EUB 182   } 186   }
183   187  
184   io_result<> 188   io_result<>
HITCBC 185   738 await_resume() 189   739 await_resume()
186   { 190   {
HITGNC   191 + 739 if(canceled_)
HITGNC   192 + 1 return {error::canceled};
  193 +
HITCBC 187   738 auto ec = self_->f_.maybe_fail(); 194   738 auto ec = self_->f_.maybe_fail();
HITCBC 188   650 if(ec) 195   650 if(ec)
HITCBC 189   166 return {ec}; 196   166 return {ec};
190   197  
HITCBC 191   484 std::size_t to_commit = (std::min)(n_, self_->prepare_size_); 198   484 std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
HITCBC 192   484 self_->data_.append(self_->prepare_buf_.data(), to_commit); 199   484 self_->data_.append(self_->prepare_buf_.data(), to_commit);
HITCBC 193   484 self_->prepare_size_ = 0; 200   484 self_->prepare_size_ = 0;
194   201  
HITCBC 195   484 return {}; 202   484 return {};
196   } 203   }
197   }; 204   };
HITCBC 198   738 return awaitable{this, n}; 205   739 return awaitable{this, n};
199   } 206   }
200   207  
201   /** Commit final bytes and signal end-of-stream. 208   /** Commit final bytes and signal end-of-stream.
202   209  
203   Transfers `n` bytes from the prepared buffer to the internal 210   Transfers `n` bytes from the prepared buffer to the internal
204   data buffer and marks the sink as finalized. Before committing, 211   data buffer and marks the sink as finalized. Before committing,
205   the attached @ref fuse is consulted to possibly inject an error 212   the attached @ref fuse is consulted to possibly inject an error
206   for testing fault scenarios. 213   for testing fault scenarios.
207   214  
208   @param n The number of bytes to commit. 215   @param n The number of bytes to commit.
209   216  
210   @return An awaitable that await-returns `(error_code)`. 217   @return An awaitable that await-returns `(error_code)`.
211   218  
  219 + @par Cancellation
  220 + If the environment's stop token has been requested, the operation
  221 + completes immediately with `error::canceled`, commits no data, and
  222 + does not signal end-of-stream.
  223 +
212   @see fuse 224   @see fuse
213   */ 225   */
214   auto 226   auto
HITCBC 215   188 commit_eof(std::size_t n) 227   189 commit_eof(std::size_t n)
216   { 228   {
217   struct awaitable 229   struct awaitable
218   { 230   {
219   buffer_sink* self_; 231   buffer_sink* self_;
220   std::size_t n_; 232   std::size_t n_;
  233 + bool canceled_ = false;
221   234  
HITCBC 222 - 188 bool await_ready() const noexcept { return true; } 235 + 189 bool await_ready() const noexcept { return false; }
223   236  
224 - // This method is required to satisfy Capy's IoAwaitable concept, 237 + // Reads the stop token without suspending; see the comment
225 - // but is never called because await_ready() returns true. 238 + // on commit() for details.
226 - // See the comment on commit(std::size_t) for a detailed explanation. 239 + bool
HITGBC 227 - void await_suspend( 240 + 189 await_suspend(
228   std::coroutine_handle<>, 241   std::coroutine_handle<>,
229 - io_env const*) const noexcept 242 + io_env const* env) noexcept
230   { 243   {
HITGNC   244 + 189 canceled_ = env->stop_token.stop_requested();
HITGNC   245 + 189 return false;
EUB 231   } 246   }
232   247  
233   io_result<> 248   io_result<>
HITCBC 234   188 await_resume() 249   189 await_resume()
235   { 250   {
HITGNC   251 + 189 if(canceled_)
HITGNC   252 + 1 return {error::canceled};
  253 +
HITCBC 236   188 auto ec = self_->f_.maybe_fail(); 254   188 auto ec = self_->f_.maybe_fail();
HITCBC 237   136 if(ec) 255   136 if(ec)
HITCBC 238   52 return {ec}; 256   52 return {ec};
239   257  
HITCBC 240   84 std::size_t to_commit = (std::min)(n_, self_->prepare_size_); 258   84 std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
HITCBC 241   84 self_->data_.append(self_->prepare_buf_.data(), to_commit); 259   84 self_->data_.append(self_->prepare_buf_.data(), to_commit);
HITCBC 242   84 self_->prepare_size_ = 0; 260   84 self_->prepare_size_ = 0;
243   261  
HITCBC 244   84 self_->eof_called_ = true; 262   84 self_->eof_called_ = true;
HITCBC 245   84 return {}; 263   84 return {};
246   } 264   }
247   }; 265   };
HITCBC 248   188 return awaitable{this, n}; 266   189 return awaitable{this, n};
249   } 267   }
250   }; 268   };
251   269  
252   } // test 270   } // test
253   } // capy 271   } // capy
254   } // boost 272   } // boost
255   273  
256   #endif 274   #endif