Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/AlignedStorage.h"
6#include "../Foundation/Function.h"
7#include "../Foundation/Internal/IGrowableBuffer.h"
8#include "../Foundation/Result.h"
9#include "../Foundation/Span.h"
10#include "Internal/CircularQueue.h"
11#include "Internal/Event.h"
12
42
45namespace SC
46{
49{
// Identifier wrapping an int32_t; referenced as AsyncBufferView::ID by the stream declarations in this listing.
// A default-constructed ID holds InvalidValue (-1).
50 struct ID
51 {
52 using NumericType = int32_t;
53 NumericType identifier;
54 static constexpr NumericType InvalidValue = -1;
55
// Default construction yields the invalid identifier
56 constexpr ID() : identifier(InvalidValue) {}
// explicit: a raw integer must be turned into an ID deliberately
57 explicit constexpr ID(int32_t value) : identifier(value) {}
58
// Two IDs compare equal when their numeric identifiers match
59 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
60 };
// Kind of data held by an AsyncBufferView; every constructor of the view sets exactly one of these values.
61 enum class Type : uint8_t
62 {
// Set by the default constructor
63 Empty,
// Set by the Span<char> constructor
64 Writable,
// Set by the Span<const char> and string literal constructors
65 ReadOnly,
// Set by the templated (type-erasing) constructor
66 Growable,
67 };
68
// Builds an empty view (type == Type::Empty)
69 AsyncBufferView() { type = Type::Empty; }
// Builds a view over the given writable span (type == Type::Writable)
70 AsyncBufferView(Span<char> data) : writableData(data) { type = Type::Writable; }
// Builds a view over the given read-only span (type == Type::ReadOnly)
71 AsyncBufferView(Span<const char> data) : readonlyData(data) { type = Type::ReadOnly; }
72
// Tags this AsyncBufferView as reusable after its reference count goes to zero (stores the flag in reUse)
74 void setReusable(bool reusable) { reUse = reusable; }
75
// Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T>)
// inside the view and marks it as Type::Growable.
// NOTE(review): T&& is unconstrained here, so overload resolution may select this constructor for a non-const
// AsyncBufferView lvalue instead of the copy constructor — confirm whether a constraint is needed.
79 template <typename T>
80 AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue
81 {
82 type = Type::Growable;
83 // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<>
84 // that it will be able to construct (and destruct) the right GrowableBuffer<T> from just a piece of storage
85 // and return a pointer to the corresponding IGrowableBuffer* interface
// The lambda owns `t` (forwarded into the capture) and is invoked with construct == true / false
86 getGrowableBuffer = [t = forward<T>(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer*
87 {
// Strip the reference so GrowableBuffer is instantiated on the plain type for both lvalue and rvalue arguments
88 using Type = typename TypeTraits::RemoveReference<T>::type;
89 if (construct)
90 {
// construct == true: build GrowableBuffer<Type> in the provided storage from the captured object
91 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
92 return &storage.reinterpret_as<GrowableBuffer<Type>>();
93 }
94 else
95 {
// construct == false: destroy the GrowableBuffer<Type> living in the storage; nothing to hand back
96 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
97 return nullptr;
98 }
99 };
100 }
101
// Builds a read-only view over a char array (typically a string literal).
102 template <int N>
103 AsyncBufferView(const char (&literal)[N])
104 {
// N - 1 leaves out the last array element (the terminating null of a string literal)
105 readonlyData = {literal, N - 1};
106 type = Type::ReadOnly;
107 }
108
// Returns the Type tag assigned by the constructor that built this view
109 Type getType() const { return type; }
110
// Private storage and bookkeeping of AsyncBufferView (AsyncBuffersPool is granted access through the friend declaration).
111 private:
112 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy
// Size in bytes of the aligned storage handed to getGrowableBuffer to host a GrowableBuffer<T>
113 static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6;
114
115 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
// Type-erased factory: called with true it constructs the growable buffer in the storage and returns its interface,
// called with false it destroys it and returns nullptr (see the templated constructor)
116 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
117
// Current data span; which member is meaningful depends on `type`
118 union
119 {
120 Span<char> writableData;
121 Span<const char> readonlyData;
122 };
123
// NOTE(review): presumably the span as originally provided, kept so a resized view can be restored — confirm in AsyncBuffersPool
124 union
125 {
126 Span<char> originalWritableData;
127 Span<const char> originalReadonlyData;
128 };
129 friend struct AsyncBuffersPool;
130
131 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
132 Type type; // If it's Empty, Writable, ReadOnly or Growable
133 bool reUse = false; // If it can be re-used after refs == 0
134};
135
168
176{
// Element type of the readable stream queue (readQueue is a CircularQueue<Request>); it carries the ID of one buffer.
177 struct Request
178 {
179 AsyncBufferView::ID bufferID;
180 };
183
// First template argument of every Event declared below
184 static constexpr int MaxListeners = 8;
185
// Emitted when an error occurs
186 Event<MaxListeners, Result> eventError;
// Emitted when a new buffer has been read
187 Event<MaxListeners, AsyncBufferView::ID> eventData;
// Emitted when there is no more data
188 Event<MaxListeners> eventEnd;
// Emitted when the underlying resource has been closed
189 Event<MaxListeners> eventClose;
190
194 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
195
198
// Pauses the readable stream (that can be later resumed)
200 void pause();
201
204
// Forcefully destroys the readable stream before its end event, releasing all resources
206 void destroy();
207
// Returns true only when the internal state is State::Ended
209 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
210
213
// Use push from inside the AsyncReadableStream::asyncRead function to queue received data.
// NOTE(review): newSize is presumably the number of valid bytes in the buffer — confirm in the implementation.
216 [[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize);
217
// Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end
219 void pushEnd();
220
// Meant to be called from inside AsyncReadableStream::asyncRead; per the State comments,
// reactivate(true) following a sync push corresponds to the SyncReadMore state
223 void reactivate(bool doReactivate);
224
// Signals an async error received
226 void emitError(Result error);
227
// Returns an unused buffer from pool or pauses the stream if none is available.
// bufferID and data are output parameters (non-const references).
229 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
230
231 private:
232 void emitOnData();
233 void executeRead();
234
// Internal state machine of the readable stream; the `state` member starts as State::Stopped.
235 enum class State
236 {
237 Stopped, // Stream must be inited
238 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
239 Reading, // A read is being issued (may be sync or async)
240 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
241 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
242 AsyncReading, // An async read is in flight
243 AsyncPushing, // AsyncReading + AsyncReadableStream::push
244 Pausing, // Pause requested while read in flight
245 Paused, // Actually paused with no read in flight
246 Ended, // Emitted all data, no more data will be emitted
247 Destroyed, // Readable has been destroyed before emitting all data
248 Errored, // Error occurred
249 };
250 State state = State::Stopped;
251
252 AsyncBuffersPool* buffers = nullptr;
253
254 CircularQueue<Request> readQueue;
255};
256
266{
269
270 struct Request
271 {
272 AsyncBufferView::ID bufferID;
273
275 };
// First template argument of every Event declared below
276 static constexpr int MaxListeners = 8;
277
// Emitted when an error occurs
278 Event<MaxListeners, Result> eventError;
279
// Emitted when write queue is empty
280 Event<MaxListeners> eventDrain;
// Emitted when no more data can be written
281 Event<MaxListeners> eventFinish;
282
282
286 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
287
295
299
// Ends the writable stream, waiting for all in-flight and queued writes to finish
302 void end();
303
306
309
312
315
// Signals an async error received
317 void emitError(Result error);
318
323
// Will emit error if the passed in Result is false
325 void tryAsync(Result potentialError);
326
// Puts the state machine back to State::Stopped (the same value `state` is initialized with); nothing else is touched here
327 void stop() { state = State::Stopped; }
328
329 private:
// Internal state of the writable stream.
330 enum class State
331 {
// Initial value of `state`, also restored by stop()
332 Stopped,
333 Writing,
// NOTE(review): presumably entered by end() while canEndWritable keeps the stream from finishing — confirm in the implementation
334 Ending,
335 Ended
336 };
337 State state = State::Stopped;
338
339 AsyncBuffersPool* buffers = nullptr;
340
341 CircularQueue<Request> writeQueue;
342};
343
346{
348
349 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
350 Span<AsyncWritableStream::Request> writableRequests);
351};
352
355{
357
358 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
359 void afterFinalize(Span<char> outputAfter, bool streamEnded);
360
362 Function<Result(Span<char>)> onFinalize;
363
364 private:
365 Function<void(AsyncBufferView::ID)> inputCallback;
366
367 Span<const char> inputData;
368 Span<char> outputData;
369
370 AsyncBufferView::ID inputBufferID;
371 AsyncBufferView::ID outputBufferID;
372
373 Result transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
374 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
375
376 bool canEndTransform();
377 void tryFinalize();
378
// Internal state of the transform stream; the `state` member starts as State::None.
// NOTE(review): Finalizing / Finalized presumably relate to onFinalize, tryFinalize and afterFinalize — confirm in the implementation.
379 enum class State
380 {
381 None,
382 Paused,
383 Processing,
384 Finalizing,
385 Finalized,
386 };
387 State state = State::None;
388};
389
398{
// Capacity constants: listeners of eventError, and sizes of the transforms / sinks arrays
399 static constexpr int MaxListeners = 8;
400 static constexpr int MaxTransforms = 8;
401 static constexpr int MaxSinks = 8;
402
// The pipeline can be default constructed but neither copied nor moved
403 AsyncPipeline() = default;
404 AsyncPipeline(const AsyncPipeline&) = delete;
405 AsyncPipeline(AsyncPipeline&&) = delete;
406 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
407 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
409
// Provided source (must be != nullptr)
410 AsyncReadableStream* source = nullptr;
// Provided transforms (optional, can be all nullptrs)
411 AsyncDuplexStream* transforms[MaxTransforms] = {nullptr};
// Provided sinks (at least one must be != nullptr)
412 AsyncWritableStream* sinks[MaxSinks] = {nullptr};
// Reports errors by source, transforms or sinks
413 Event<MaxListeners, Result> eventError = {};
414
418
// Unregisters all events from source, transforms and sinks
420 [[nodiscard]] bool unpipe();
421
425
426 // TODO: Add a pause and cancel/step
427 private:
428 void emitError(Result res);
429 Result checkBuffersPool();
430 Result chainTransforms(AsyncReadableStream*& readable);
431 Result validate();
432
433 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable);
434 void dispatchToPipes(AsyncBufferView::ID bufferID);
435 void endPipes();
436 void afterWrite(AsyncBufferView::ID bufferID);
437 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
438};
439} // namespace SC
constexpr T && forward(typename TypeTraits::RemoveReference< T >::type &value)
Forwards an lvalue or an rvalue as an rvalue reference.
Definition Compiler.h:267
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:51
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:49
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:80
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:74
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:139
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Span< AsyncBufferView > buffers
Span of buffers to be filled in by the user.
Definition AsyncStreams.h:141
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:346
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:398
Result pipe()
Connects source, transforms and sinks (counterpart of unpipe(), which unregisters their events).
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
AsyncDuplexStream * transforms[MaxTransforms]
Provided transforms (optional, can be all nullptrs)
Definition AsyncStreams.h:411
Event< MaxListeners, Result > eventError
Reports errors by source, transforms or sinks.
Definition AsyncStreams.h:413
AsyncWritableStream * sinks[MaxSinks]
Provided sinks (at least one must be != nullptr)
Definition AsyncStreams.h:412
Definition AsyncStreams.h:178
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:176
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when there is no more data.
Definition AsyncStreams.h:188
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition AsyncStreams.h:189
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when a new buffer has been read.
Definition AsyncStreams.h:187
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the readable stream with the buffers pool and the storage for its request queue.
void destroy()
Forcefully destroys the readable stream before its end event, releasing all resources.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:209
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:182
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:355
Definition AsyncStreams.h:271
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:266
Event< MaxListeners > eventFinish
Emitted when no more data can be written.
Definition AsyncStreams.h:281
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when write queue is empty.
Definition AsyncStreams.h:280
void resumeWriting()
Resumes writing queued requests for this stream.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition AsyncStreams.h:268
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the writable stream with the buffers pool and the storage for its request queue.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncStreams.h:322
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:12
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29