Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/AlignedStorage.h"
6#include "../Foundation/Function.h"
7#include "../Foundation/Internal/IGrowableBuffer.h"
8#include "../Foundation/Result.h"
9#include "../Foundation/Span.h"
10#include "Internal/CircularQueue.h"
11#include "Internal/Event.h"
12
42
45namespace SC
46{
49{
50 struct ID
51 {
52 using NumericType = int32_t;
53 NumericType identifier;
54 static constexpr NumericType InvalidValue = -1;
55
56 constexpr ID() : identifier(InvalidValue) {}
57 explicit constexpr ID(int32_t value) : identifier(value) {}
58
59 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
60 };
61 enum class Type
62 {
63 Empty,
64 Writable,
65 ReadOnly,
66 Growable,
67 };
68
69 AsyncBufferView() { type = Type::Empty; }
70 AsyncBufferView(Span<char> data) : writableData(data) { type = Type::Writable; }
71 AsyncBufferView(Span<const char> data) : readonlyData(data) { type = Type::ReadOnly; }
72
76 template <typename T>
77 AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue
78 {
79 type = Type::Growable;
80 // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<>
81 // that it will be able to construct (and destruct) the right GrowableBuffer<T> from just a piece of storage
82 // and return a pointer to the corresponding IGrowableBuffer* interface
83 getGrowableBuffer = [t = forward<T>(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer*
84 {
85 using Type = typename TypeTraits::RemoveReference<T>::type;
86 if (construct)
87 {
88 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
89 return &storage.reinterpret_as<GrowableBuffer<Type>>();
90 }
91 else
92 {
93 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
94 return nullptr;
95 }
96 };
97 }
98
99 template <int N>
100 AsyncBufferView(const char (&literal)[N])
101 {
102 readonlyData = {literal, N - 1};
103 type = Type::ReadOnly;
104 }
105
106 Type getType() const { return type; }
107
108 private:
109 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy
110 static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6;
111
112 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
113 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
114
115 union
116 {
117 Span<char> writableData;
118 Span<const char> readonlyData;
119 };
120
121 union
122 {
123 Span<char> originalWritableData;
124 Span<const char> originalReadonlyData;
125 };
126 friend struct AsyncBuffersPool;
127
128 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
129 Type type;
130};
131
164
172{
173 struct Request
174 {
175 AsyncBufferView::ID bufferID;
176 };
179
180 static constexpr int MaxListeners = 8;
181
182 Event<MaxListeners, Result> eventError;
183 Event<MaxListeners, AsyncBufferView::ID> eventData;
184 Event<MaxListeners> eventEnd;
185 Event<MaxListeners> eventClose;
186
190 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
191
194
196 void pause();
197
200
202 void destroy();
203
205 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
206
209
211 void push(AsyncBufferView::ID bufferID, size_t newSize);
212
214 void pushEnd();
215
218 void reactivate(bool doReactivate);
219
221 void emitError(Result error);
222
224 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
225
226 private:
227 void emitOnData();
228 void executeRead();
229
230 enum class State
231 {
232 Stopped, // Stream must be inited
233 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
234 Reading, // A read is being issued (may be sync or async)
235 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
236 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
237 AsyncReading, // An async read is in flight
238 AsyncPushing, // AsyncReading + AsyncReadableStream::push
239 Pausing, // Pause requested while read in flight
240 Paused, // Actually paused with no read in flight
241 Ended, // Emitted all data, no more data will be emitted
242 Destroying, // Readable is waiting for async call before
243 Destroyed, // Readable has been destroyed before emitting all data
244 Errored, // Error occurred
245 };
246 State state = State::Stopped;
247
248 AsyncBuffersPool* buffers = nullptr;
249
250 CircularQueue<Request> readQueue;
251};
252
262{
265
266 struct Request
267 {
268 AsyncBufferView::ID bufferID;
269
271 };
272 static constexpr int MaxListeners = 8;
273
274 Event<MaxListeners, Result> eventError;
275 Event<MaxListeners> eventDrain;
276 Event<MaxListeners> eventFinish;
277
281 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
282
290
294
297 void end();
298
301
304
307
310
312 void emitError(Result error);
313
318
320 void tryAsync(Result potentialError);
321
322 void stop() { state = State::Stopped; }
323
324 private:
325 enum class State
326 {
327 Stopped,
328 Writing,
329 Ending,
330 Ended
331 };
332 State state = State::Stopped;
333
334 AsyncBuffersPool* buffers = nullptr;
335
336 CircularQueue<Request> writeQueue;
337};
338
341{
343
344 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
345 Span<AsyncWritableStream::Request> writableRequests);
346};
347
350{
352
353 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
354 void afterFinalize(Span<char> outputAfter, bool streamEnded);
355
357 Function<Result(Span<char>)> onFinalize;
358
359 private:
360 Function<void(AsyncBufferView::ID)> inputCallback;
361
362 Span<const char> inputData;
363 Span<char> outputData;
364
365 AsyncBufferView::ID inputBufferID;
366 AsyncBufferView::ID outputBufferID;
367
368 Result transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
369 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
370
371 bool canEndTransform();
372 void tryFinalize();
373
374 enum class State
375 {
376 None,
377 Paused,
378 Processing,
379 Finalizing,
380 Finalized,
381 };
382 State state = State::None;
383};
384
393{
394 static constexpr int MaxListeners = 8;
395 static constexpr int MaxTransforms = 8;
396 static constexpr int MaxSinks = 8;
397
398 AsyncPipeline() = default;
399 AsyncPipeline(const AsyncPipeline&) = delete;
400 AsyncPipeline(AsyncPipeline&&) = delete;
401 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
402 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
404
405 AsyncReadableStream* source = nullptr;
406 AsyncDuplexStream* transforms[MaxTransforms] = {nullptr};
407 AsyncWritableStream* sinks[MaxSinks] = {nullptr};
408 Event<MaxListeners, Result> eventError = {};
409
413
415 [[nodiscard]] bool unpipe();
416
420
421 // TODO: Add a pause and cancel/step
422 private:
423 void emitError(Result res);
424 Result checkBuffersPool();
425 Result chainTransforms(AsyncReadableStream*& readable);
426 Result validate();
427
428 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable);
429 void dispatchToPipes(AsyncBufferView::ID bufferID);
430 void endPipes();
431 void afterWrite(AsyncBufferView::ID bufferID);
432 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
433};
434} // namespace SC
constexpr T && forward(typename TypeTraits::RemoveReference< T >::type &value)
Forwards an lvalue or an rvalue as an rvalue reference.
Definition Compiler.h:267
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:51
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:49
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:77
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:135
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Span< AsyncBufferView > buffers
Span of buffers to be filled in by the user.
Definition AsyncStreams.h:137
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:341
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:393
Result pipe()
Reports errors by source, transforms or sinks.
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
AsyncDuplexStream * transforms[MaxTransforms]
Provided source (must be != nullptr)
Definition AsyncStreams.h:406
Event< MaxListeners, Result > eventError
Provided sinks (at least one must be != nullptr)
Definition AsyncStreams.h:408
AsyncWritableStream * sinks[MaxSinks]
Provided transforms (optional, can be all nullptrs)
Definition AsyncStreams.h:407
Definition AsyncStreams.h:174
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:172
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
Event< MaxListeners > eventEnd
Emitted when a new buffer has been read.
Definition AsyncStreams.h:184
Event< MaxListeners > eventClose
Emitted when there is no more data.
Definition AsyncStreams.h:185
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when an error occurs.
Definition AsyncStreams.h:183
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Emitted when the underlying resource has been closed.
void destroy()
Forcefully destroys the readable stream before it's end event releasing all resources.
void push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:205
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:178
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:350
Definition AsyncStreams.h:267
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:262
Event< MaxListeners > eventFinish
Emitted when write queue is empty.
Definition AsyncStreams.h:276
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when an error occurs.
Definition AsyncStreams.h:275
void resumeWriting()
Resumes writing queued requests for this stream.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition AsyncStreams.h:264
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Emitted when no more data can be written.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncStreams.h:317
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:12
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29