TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_TEST_BUFFER_SINK_HPP
12 : #define BOOST_CAPY_TEST_BUFFER_SINK_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/make_buffer.hpp>
17 : #include <coroutine>
18 : #include <boost/capy/ex/io_env.hpp>
19 : #include <boost/capy/io_result.hpp>
20 : #include <boost/capy/test/fuse.hpp>
21 :
22 : #include <algorithm>
23 : #include <span>
24 : #include <string>
25 : #include <string_view>
26 :
27 : namespace boost {
28 : namespace capy {
29 : namespace test {
30 :
31 : /** A mock buffer sink for testing callee-owns-buffers write operations.
32 :
33 : Use this to verify code that writes data using the callee-owns-buffers
34 : pattern without needing real I/O. Call @ref prepare to get writable
35 : buffers, write into them, then call @ref commit to finalize. The
36 : associated @ref fuse enables error injection at controlled points.
37 :
38 : This class satisfies the @ref BufferSink concept by providing
39 : internal storage that callers write into directly.
40 :
41 : @par Thread Safety
42 : Not thread-safe.
43 :
44 : @par Example
45 : @code
46 : fuse f;
47 : buffer_sink bs( f );
48 :
49 : auto r = f.armed( [&]( fuse& ) -> task<void> {
50 : mutable_buffer arr[16];
51 : auto bufs = bs.prepare( arr );
52 : if( bufs.empty() )
53 : co_return;
54 :
55 : // Write data into the first prepared buffer
56 : std::memcpy( bufs[0].data(), "Hello", 5 );
57 :
58 : auto [ec] = co_await bs.commit( 5 );
59 : if( ec )
60 : co_return;
61 :
62 : auto [ec2] = co_await bs.commit_eof( 0 );
63 : // bs.data() returns "Hello"
64 : } );
65 : @endcode
66 :
67 : @see fuse, BufferSink
68 : */
69 : class buffer_sink
70 : {
71 : fuse f_;
72 : std::string data_;
73 : std::string prepare_buf_;
74 : std::size_t prepare_size_ = 0;
75 : std::size_t max_prepare_size_;
76 : bool eof_called_ = false;
77 :
78 : public:
79 : /** Construct a buffer sink.
80 :
81 : @param f The fuse used to inject errors during commits.
82 :
83 : @param max_prepare_size Maximum bytes available per prepare.
84 : Use to simulate limited buffer space.
85 : */
86 HIT 560 : explicit buffer_sink(
87 : fuse f = {},
88 : std::size_t max_prepare_size = 4096) noexcept
89 560 : : f_(std::move(f))
90 560 : , max_prepare_size_(max_prepare_size)
91 : {
92 560 : prepare_buf_.resize(max_prepare_size_);
93 560 : }
94 :
95 : /// Return the written data as a string view.
96 : std::string_view
97 82 : data() const noexcept
98 : {
99 82 : return data_;
100 : }
101 :
102 : /// Return the number of bytes written.
103 : std::size_t
104 14 : size() const noexcept
105 : {
106 14 : return data_.size();
107 : }
108 :
109 : /// Return whether commit_eof has been called.
110 : bool
111 79 : eof_called() const noexcept
112 : {
113 79 : return eof_called_;
114 : }
115 :
116 : /// Clear all data and reset state.
117 : void
118 2 : clear() noexcept
119 : {
120 2 : data_.clear();
121 2 : prepare_size_ = 0;
122 2 : eof_called_ = false;
123 2 : }
124 :
125 : /** Prepare writable buffers.
126 :
127 : Fills the provided span with mutable buffer descriptors pointing
128 : to internal storage. The caller writes data into these buffers,
129 : then calls @ref commit to finalize.
130 :
131 : @param dest Span of mutable_buffer to fill.
132 :
133 : @return A span of filled buffers (empty or 1 buffer in this implementation).
134 : */
135 : std::span<mutable_buffer>
136 842 : prepare(std::span<mutable_buffer> dest)
137 : {
138 842 : if(dest.empty())
139 2 : return {};
140 :
141 840 : prepare_size_ = max_prepare_size_;
142 840 : dest[0] = make_buffer(prepare_buf_.data(), prepare_size_);
143 840 : return dest.first(1);
144 : }
145 :
146 : /** Commit bytes written to the prepared buffers.
147 :
148 : Transfers `n` bytes from the prepared buffer to the internal
149 : data buffer. Before committing, the attached @ref fuse is
150 : consulted to possibly inject an error for testing fault scenarios.
151 :
152 : @param n The number of bytes to commit.
153 :
154 : @return An awaitable that await-returns `(error_code)`.
155 :
156 : @par Cancellation
157 : If the environment's stop token has been requested, the commit
158 : completes immediately with `error::canceled` and commits no data.
159 :
160 : @see fuse
161 : */
162 : auto
163 739 : commit(std::size_t n)
164 : {
165 : struct awaitable
166 : {
167 : buffer_sink* self_;
168 : std::size_t n_;
169 : bool canceled_ = false;
170 :
171 739 : bool await_ready() const noexcept { return false; }
172 :
173 : // The operation completes synchronously, but await_suspend is
174 : // the only place io_env is delivered (the promise's
175 : // transform_awaiter forwards it here). Returning false means
176 : // the coroutine does not actually suspend; it resumes
177 : // immediately, having observed the stop token. See io_env,
178 : // IoAwaitable.
179 : bool
180 739 : await_suspend(
181 : std::coroutine_handle<>,
182 : io_env const* env) noexcept
183 : {
184 739 : canceled_ = env->stop_token.stop_requested();
185 739 : return false;
186 : }
187 :
188 : io_result<>
189 739 : await_resume()
190 : {
191 739 : if(canceled_)
192 1 : return {error::canceled};
193 :
194 738 : auto ec = self_->f_.maybe_fail();
195 650 : if(ec)
196 166 : return {ec};
197 :
198 484 : std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
199 484 : self_->data_.append(self_->prepare_buf_.data(), to_commit);
200 484 : self_->prepare_size_ = 0;
201 :
202 484 : return {};
203 : }
204 : };
205 739 : return awaitable{this, n};
206 : }
207 :
208 : /** Commit final bytes and signal end-of-stream.
209 :
210 : Transfers `n` bytes from the prepared buffer to the internal
211 : data buffer and marks the sink as finalized. Before committing,
212 : the attached @ref fuse is consulted to possibly inject an error
213 : for testing fault scenarios.
214 :
215 : @param n The number of bytes to commit.
216 :
217 : @return An awaitable that await-returns `(error_code)`.
218 :
219 : @par Cancellation
220 : If the environment's stop token has been requested, the operation
221 : completes immediately with `error::canceled`, commits no data, and
222 : does not signal end-of-stream.
223 :
224 : @see fuse
225 : */
226 : auto
227 189 : commit_eof(std::size_t n)
228 : {
229 : struct awaitable
230 : {
231 : buffer_sink* self_;
232 : std::size_t n_;
233 : bool canceled_ = false;
234 :
235 189 : bool await_ready() const noexcept { return false; }
236 :
237 : // Reads the stop token without suspending; see the comment
238 : // on commit() for details.
239 : bool
240 189 : await_suspend(
241 : std::coroutine_handle<>,
242 : io_env const* env) noexcept
243 : {
244 189 : canceled_ = env->stop_token.stop_requested();
245 189 : return false;
246 : }
247 :
248 : io_result<>
249 189 : await_resume()
250 : {
251 189 : if(canceled_)
252 1 : return {error::canceled};
253 :
254 188 : auto ec = self_->f_.maybe_fail();
255 136 : if(ec)
256 52 : return {ec};
257 :
258 84 : std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
259 84 : self_->data_.append(self_->prepare_buf_.data(), to_commit);
260 84 : self_->prepare_size_ = 0;
261 :
262 84 : self_->eof_called_ = true;
263 84 : return {};
264 : }
265 : };
266 189 : return awaitable{this, n};
267 : }
268 : };
269 :
270 : } // test
271 : } // capy
272 : } // boost
273 :
274 : #endif
|