TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_TEST_BUFFER_SOURCE_HPP
12 : #define BOOST_CAPY_TEST_BUFFER_SOURCE_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/make_buffer.hpp>
17 : #include <coroutine>
18 : #include <boost/capy/error.hpp>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/test/fuse.hpp>
22 :
23 : #include <algorithm>
24 : #include <span>
25 : #include <string>
26 : #include <string_view>
27 :
28 : namespace boost {
29 : namespace capy {
30 : namespace test {
31 :
32 : /** A mock buffer source for testing push operations.
33 :
34 : Use this to verify code that transfers data from a buffer source to
35 : a sink without needing real I/O. Call @ref provide to supply data,
36 : then @ref pull to retrieve buffer descriptors. The associated
37 : @ref fuse enables error injection at controlled points.
38 :
39 : This class satisfies the @ref BufferSource concept by providing
40 : a pull interface that fills an array of buffer descriptors and
41 : a consume interface to indicate bytes used.
42 :
43 : @par Thread Safety
44 : Not thread-safe.
45 :
46 : @par Example
47 : @code
48 : fuse f;
49 : buffer_source bs( f );
50 : bs.provide( "Hello, " );
51 : bs.provide( "World!" );
52 :
53 : auto r = f.armed( [&]( fuse& ) -> task<void> {
54 : const_buffer arr[16];
55 : auto [ec, bufs] = co_await bs.pull( arr );
56 : if( ec )
57 : co_return;
58 : // bufs contains buffer descriptors
59 : std::size_t n = buffer_size( bufs );
60 : bs.consume( n );
61 : } );
62 : @endcode
63 :
64 : @see fuse, BufferSource
65 : */
66 : class buffer_source
67 : {
68 : fuse f_;
69 : std::string data_;
70 : std::size_t pos_ = 0;
71 : std::size_t max_pull_size_;
72 :
73 : public:
74 : /** Construct a buffer source.
75 :
76 : @param f The fuse used to inject errors during pulls.
77 :
78 : @param max_pull_size Maximum bytes returned per pull.
79 : Use to simulate chunked delivery.
80 : */
81 HIT 377 : explicit buffer_source(
82 : fuse f = {},
83 : std::size_t max_pull_size = std::size_t(-1)) noexcept
84 377 : : f_(std::move(f))
85 377 : , max_pull_size_(max_pull_size)
86 : {
87 377 : }
88 :
89 : /** Append data to be returned by subsequent pulls.
90 :
91 : Multiple calls accumulate data that @ref pull returns.
92 :
93 : @param sv The data to append.
94 : */
95 : void
96 389 : provide(std::string_view sv)
97 : {
98 389 : data_.append(sv);
99 389 : }
100 :
101 : /// Clear all data and reset the read position.
102 : void
103 6 : clear() noexcept
104 : {
105 6 : data_.clear();
106 6 : pos_ = 0;
107 6 : }
108 :
109 : /// Return the number of bytes available for pulling.
110 : std::size_t
111 18 : available() const noexcept
112 : {
113 18 : return data_.size() - pos_;
114 : }
115 :
116 : /** Consume bytes from the source.
117 :
118 : Advances the internal read position by the specified number
119 : of bytes. The next call to @ref pull returns data starting
120 : after the consumed bytes.
121 :
122 : @param n The number of bytes to consume. Must not exceed the
123 : total size of buffers returned by the previous @ref pull.
124 : */
125 : void
126 319 : consume(std::size_t n) noexcept
127 : {
128 319 : pos_ += n;
129 319 : }
130 :
131 : /** Pull buffer data from the source.
132 :
133 : Fills the provided span with buffer descriptors pointing to
134 : internal data starting from the current unconsumed position.
135 : Returns a span of filled buffers. When no data remains,
136 : returns an empty span to signal completion.
137 :
138 : Calling pull multiple times without intervening @ref consume
139 : returns the same data. Use consume to advance past processed
140 : bytes.
141 :
142 : @param dest Span of const_buffer to fill.
143 :
144 : @return An awaitable that await-returns `(error_code,std::span<const_buffer>)`.
145 :
146 : @par Cancellation
147 : If the environment's stop token has been requested, the pull
148 : completes immediately with `error::canceled` and an empty span.
149 :
150 : @see consume, fuse
151 : */
152 : auto
153 657 : pull(std::span<const_buffer> dest)
154 : {
155 : struct awaitable
156 : {
157 : buffer_source* self_;
158 : std::span<const_buffer> dest_;
159 : bool canceled_ = false;
160 :
161 657 : bool await_ready() const noexcept { return false; }
162 :
163 : // The operation completes synchronously, but await_suspend is
164 : // the only place io_env is delivered (the promise's
165 : // transform_awaiter forwards it here). Returning false means
166 : // the coroutine does not actually suspend; it resumes
167 : // immediately, having observed the stop token. See io_env,
168 : // IoAwaitable.
169 : bool
170 657 : await_suspend(
171 : std::coroutine_handle<>,
172 : io_env const* env) noexcept
173 : {
174 657 : canceled_ = env->stop_token.stop_requested();
175 657 : return false;
176 : }
177 :
178 : io_result<std::span<const_buffer>>
179 657 : await_resume()
180 : {
181 657 : if(canceled_)
182 1 : return {error::canceled, {}};
183 :
184 656 : auto ec = self_->f_.maybe_fail();
185 546 : if(ec)
186 110 : return {ec, {}};
187 :
188 436 : if(self_->pos_ >= self_->data_.size())
189 72 : return {error::eof, {}};
190 :
191 364 : std::size_t avail = self_->data_.size() - self_->pos_;
192 364 : std::size_t to_return = (std::min)(avail, self_->max_pull_size_);
193 :
194 364 : if(dest_.empty())
195 2 : return {{}, {}};
196 :
197 : // Fill a single buffer descriptor
198 362 : dest_[0] = make_buffer(
199 362 : self_->data_.data() + self_->pos_,
200 : to_return);
201 :
202 362 : return {{}, dest_.first(1)};
203 : }
204 : };
205 657 : return awaitable{this, dest};
206 : }
207 : };
208 :
209 : } // test
210 : } // capy
211 : } // boost
212 :
213 : #endif
|