Sane C++ Libraries
C++ Platform Abstraction Libraries
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/Function.h"
6#include "../Foundation/Result.h"
7#include "../Foundation/Span.h"
8#include "../Foundation/StrongID.h"
9#include "Internal/CircularQueue.h"
10#include "Internal/Event.h"
11
41
44namespace SC
45{
46
// AsyncBufferView (its declaration line is elided in this listing): a Span of bytes memory to be
// read or written by async streams.
49{
// Empty tag type, used only to make ID a distinct StrongID instantiation.
50 struct Tag
51 {
52 };
// Strongly typed identifier through which buffers are referenced (see the AsyncBuffersPool API).
53 using ID = StrongID<Tag>;
// Span of bytes exposed by this view.
54 Span<char> data;
55
56 private:
// NOTE(review): presumably the span as originally supplied, kept so that `data` can be resized
// and later restored - confirm against AsyncBuffersPool.
57 Span<char> originalData;
// Grants AsyncBuffersPool access to the private members of this struct.
58 friend struct AsyncBuffersPool;
59
60 int32_t refs = 0; // Reference count: AsyncReadable (single) or AsyncWritable (multiple) using it
61};
62
// AsyncBuffersPool (its declaration line is elided in this listing): holds a Span of
// AsyncBufferView (allocated by user) holding available memory for the streams.
// Several member declarations are elided here too; only their line numbers remain.
66{
69
72
76
79
82
85
// Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
// bufferID and data are reference (output) parameters.
// NOTE(review): presumably they receive the ID and the span of the granted buffer - confirm.
87 Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
88
// Sets the new size in bytes for the buffer identified by bufferID.
90 void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes);
91};
92
// AsyncReadableStream (its declaration line is elided in this listing): async source abstraction
// emitting data events in caller provided byte buffers.
// Some member declarations are elided here; only their line numbers remain.
100{
// Element type of the read queue; holds the ID of a buffer.
101 struct Request
102 {
103 AsyncBufferView::ID bufferID;
104 };
107
// First template argument of every Event below.
// NOTE(review): presumably the maximum number of listeners per event - confirm in Event.h.
108 static constexpr int MaxListeners = 8;
109
// Emitted when an error occurs (carries the failing Result).
110 Event<MaxListeners, Result> eventError;
// Emitted when a new buffer has been read (carries the buffer ID).
111 Event<MaxListeners, AsyncBufferView::ID> eventData;
// Emitted when there is no more data.
112 Event<MaxListeners> eventEnd;
// Emitted when the underlying resource has been closed.
113 Event<MaxListeners> eventClose;
114
// Initializes the stream with a buffers pool and caller-provided storage for read requests.
// NOTE(review): presumably `requests` backs readQueue and `buffersPool` is stored in `buffers` - confirm.
118 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
119
122
// Pauses the readable stream (it can be resumed later).
124 void pause();
125
128
// Forcefully destroys the readable stream before its end event, releasing all resources.
130 void destroy();
131
// Returns true when the state machine is in State::Ended.
133 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
134
137
// Use push from inside the asyncRead function to queue received data.
// NOTE(review): newSize is presumably the number of valid bytes in the buffer - confirm.
139 void push(AsyncBufferView::ID bufferID, size_t newSize);
140
// Use pushEnd from inside asyncRead to signal production end.
142 void pushEnd();
143
// Use reactivate(true) from inside asyncRead to ask the state machine to read more
// (see State::SyncReadMore below).
146 void reactivate(bool doReactivate);
147
// Signals an async error received.
149 void emitError(Result error);
150
// Returns an unused buffer from the pool, or pauses the stream if none is available.
152 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
153
154 private:
155 void emitOnData();
156 void executeRead();
157
// States of the readable stream state machine.
158 enum class State
159 {
160 Stopped, // Stream must be inited
161 CanRead, // Stream is ready to issue a read (AsyncReadableStream::start / AsyncReadableStream::resumeReading)
162 Reading, // A read is being issued (may be sync or async)
163 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
164 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
165 AsyncReading, // An async read is in flight
166 AsyncPushing, // AsyncReading + AsyncReadableStream::push
167 Pausing, // Pause requested while read in flight
168 Paused, // Actually paused with no read in flight
169 Ended, // Emitted all data, no more data will be emitted
170 Destroying, // Readable is waiting for an async call (presumably before completing destruction)
171 Destroyed, // Readable has been destroyed before emitting all data
172 Errored, // Error occurred
173 };
// Current state; a default-constructed stream starts in Stopped.
174 State state = State::Stopped;
175
// Buffers pool used by this stream; null until assigned.
176 AsyncBuffersPool* buffers = nullptr;
177
// Queue of read requests.
178 CircularQueue<Request> readQueue;
179};
180
// AsyncWritableStream (its declaration line is elided in this listing): async destination
// abstraction where bytes can be written to.
// Some member declarations are elided here; only their line numbers remain.
190{
193
// Element type of the write queue; holds the ID of a buffer (a further member is elided).
194 struct Request
195 {
196 AsyncBufferView::ID bufferID;
197
199 };
// First template argument of every Event below.
// NOTE(review): presumably the maximum number of listeners per event - confirm in Event.h.
200 static constexpr int MaxListeners = 8;
201
// Emitted when an error occurs (carries the failing Result).
202 Event<MaxListeners, Result> eventError;
// Emitted when write queue is empty.
203 Event<MaxListeners> eventDrain;
// Emitted when no more data can be written.
204 Event<MaxListeners> eventFinish;
205
// Initializes the stream with a buffers pool and caller-provided storage for write requests.
// NOTE(review): presumably `requests` backs writeQueue and `buffersPool` is stored in `buffers` - confirm.
209 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
210
218
222
// Writes a C-string literal in the stream.
// Forwards the first N - 1 characters to the Span<const char> overload of write, i.e. everything
// but the last array element (the null terminator when str is a string literal).
225 template <size_t N>
226 [[nodiscard]] Result write(const char (&str)[N])
227 {
228 return write(Span<const char>(str, N - 1));
229 }
230
// Ends the writable stream, waiting for all in-flight and queued writes to finish.
233 void end();
234
237
240
243
246
// Signals an async error received.
248 void emitError(Result error);
249
254
// Will emit error if the passed in Result is false.
256 void tryAsync(Result potentialError);
257
// Sets the state back to State::Stopped (nothing else is touched by this inline body).
258 void stop() { state = State::Stopped; }
259
260 private:
// States of the writable stream.
261 enum class State
262 {
263 Stopped,
264 Writing,
265 Ending,
266 Ended
267 };
// Current state; a default-constructed stream starts in Stopped.
268 State state = State::Stopped;
269
// Buffers pool used by this stream; null until assigned.
270 AsyncBuffersPool* buffers = nullptr;
271
// Queue of write requests.
272 CircularQueue<Request> writeQueue;
273};
274
// AsyncDuplexStream (its declaration line is elided in this listing): a stream that can both
// produce and consume buffers.
277{
279
// Initializes the stream with a buffers pool and two caller-provided request storages,
// one typed for AsyncReadableStream requests and one for AsyncWritableStream requests.
280 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
281 Span<AsyncWritableStream::Request> writableRequests);
282};
283
// AsyncTransformStream (its declaration line is elided in this listing): a duplex stream that
// produces new buffers transforming received buffers.
286{
288
// NOTE(review): presumably called by the transform implementation once a processing step is done,
// passing the input still to consume and the output space still available - confirm.
289 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
// NOTE(review): presumably called once a finalize step is done; streamEnded would tell whether
// the transform has produced all of its output - confirm.
290 void afterFinalize(Span<char> outputAfter, bool streamEnded);
291
// User supplied callback receiving an output span and returning a Result.
// NOTE(review): presumably invoked to flush remaining data when the stream is ending - confirm.
293 Function<Result(Span<char>)> onFinalize;
294
295 private:
// Callback taking a buffer ID (same signature as the `cb` parameter of transform / prepare).
296 Function<void(AsyncBufferView::ID)> inputCallback;
297
// Input and output spans currently tracked by the stream.
298 Span<const char> inputData;
299 Span<char> outputData;
300
// IDs of the buffers currently tracked for input and output.
301 AsyncBufferView::ID inputBufferID;
302 AsyncBufferView::ID outputBufferID;
303
304 Result transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
305 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
306
307 bool canEndTransform();
308 void tryFinalize();
309
// States of the transform stream.
310 enum class State
311 {
312 None,
313 Paused,
314 Processing,
315 Finalizing,
316 Finalized,
317 };
// Current state; a default-constructed stream starts in None.
318 State state = State::None;
319};
320
// AsyncPipeline (its declaration line is elided in this listing): pipes reads on
// SC::AsyncReadableStream to SC::AsyncWritableStream.
// Some member declarations are elided here; only their line numbers remain.
329{
// First template argument of eventError.
// NOTE(review): presumably the maximum number of listeners - confirm in Event.h.
330 static constexpr int MaxListeners = 8;
331
// Default constructible, but neither copyable nor movable.
332 AsyncPipeline() = default;
333 AsyncPipeline(const AsyncPipeline&) = delete;
334 AsyncPipeline(AsyncPipeline&&) = delete;
335 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
336 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
// Reports errors by source, transforms or sinks.
338 Event<MaxListeners, Result> eventError;
339
342
// Last line of a pipe(...) declaration whose first line is elided in this listing.
345 Span<AsyncWritableStream*> asyncSinks);
346
// Unregisters all events from source, transforms and sinks.
348 [[nodiscard]] bool unpipe();
349
353
354 // TODO: Add a pause and cancel/step
355 private:
// Source stream of the pipeline; null until assigned.
356 AsyncReadableStream* source = nullptr;
357
359
// Transform streams of the pipeline.
360 Span<AsyncDuplexStream*> transforms;
361
362 void emitError(Result res);
363 Result checkBuffersPool();
364 Result chainTransforms(AsyncReadableStream*& readable);
365
366 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable);
367 void dispatchToPipes(AsyncBufferView::ID bufferID);
368 void endPipes();
369 void afterWrite(AsyncBufferView::ID bufferID);
370 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
371};
372} // namespace SC
int int32_t
Platform independent (4) bytes signed int.
Definition: PrimitiveTypes.h:46
Definition: AsyncStreams.h:51
A Span of bytes memory to be read or written by async streams.
Definition: AsyncStreams.h:49
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition: AsyncStreams.h:66
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result getData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Span< AsyncBufferView > buffers
Span of buffers to be filled in by the user.
Definition: AsyncStreams.h:68
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
Result getData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
A stream that can both produce and consume buffers.
Definition: AsyncStreams.h:277
Pipes reads on SC::AsyncReadableStream to SC::AsyncWritableStream.
Definition: AsyncStreams.h:329
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result pipe(AsyncReadableStream &asyncSource, Span< AsyncDuplexStream * > asyncTransforms, Span< AsyncWritableStream * > asyncSinks)
Inits the pipeline with a source, transforms and some writable sinks.
Result start()
Starts the pipeline.
Result pipe(AsyncReadableStream &asyncSource, Span< AsyncWritableStream * > asyncSinks)
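Inits the pipeline with a source and some writable sinks, without transforms. (The description "Reports errors by source, transforms or sinks." belongs to AsyncPipeline::eventError.)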
Definition: AsyncStreams.h:102
Async source abstraction emitting data events in caller provided byte buffers.
Definition: AsyncStreams.h:100
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
Event< MaxListeners > eventEnd
Emitted when there is no more data.
Definition: AsyncStreams.h:112
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition: AsyncStreams.h:113
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when a new buffer has been read.
Definition: AsyncStreams.h:111
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the readable stream with a buffers pool and caller-provided storage for its read requests.
void destroy()
Forcefully destroys the readable stream before its end event, releasing all resources.
void push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is in the Ended state (all data has been emitted; see AsyncReadableStream::pushEnd).
Definition: AsyncStreams.h:133
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition: AsyncStreams.h:106
A duplex stream that produces new buffers transforming received buffers.
Definition: AsyncStreams.h:286
Definition: AsyncStreams.h:195
Async destination abstraction where bytes can be written to.
Definition: AsyncStreams.h:190
Event< MaxListeners > eventFinish
Emitted when no more data can be written.
Definition: AsyncStreams.h:204
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when write queue is empty.
Definition: AsyncStreams.h:203
void resumeWriting()
Resumes writing queued requests for this stream.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition: AsyncStreams.h:192
Result write(const char(&str)[N])
Write a C-string literal in the stream.
Definition: AsyncStreams.h:226
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the writable stream with a buffers pool and caller-provided storage for its write requests.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition: AsyncStreams.h:253
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Result write(Span< const char > data, Function< void(AsyncBufferView::ID)> cb={})
Try requesting a buffer big enough and copy data into it.
Wraps function pointers, member functions and lambdas without ever allocating.
Definition: Function.h:19
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition: Result.h:12
View over a contiguous sequence of items (pointer + size in elements).
Definition: Span.h:24