Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/AlignedStorage.h"
6#include "../Foundation/Function.h"
7#include "../Foundation/Internal/IGrowableBuffer.h"
8#include "../Foundation/Result.h"
9#include "../Foundation/Span.h"
10#include "Internal/CircularQueue.h"
11#include "Internal/Event.h"
12
42
45namespace SC
46{
47
/// @brief A Span of bytes memory to be read or written by async streams.
///
/// A view is one of the kinds listed in AsyncBufferView::Type. Besides the data span it stores an
/// offset / length pair, a reference count and a re-use flag; all of them are private and
/// AsyncBuffersPool is declared as a friend.
49struct SC_COMPILER_EXPORT AsyncBufferView
50{
    /// @brief Typed wrapper around the numeric identifier of a buffer view.
    /// A default constructed ID holds InvalidValue (-1).
51 struct SC_COMPILER_EXPORT ID
52 {
53 using NumericType = int32_t;
54 NumericType identifier;
55 static constexpr NumericType InvalidValue = -1;
56
    // Default construction yields the invalid identifier
57 constexpr ID() : identifier(InvalidValue) {}
    // Explicit, so that a plain integer is never silently converted to an ID
58 explicit constexpr ID(int32_t value) : identifier(value) {}
59
    // Two IDs are equal when their numeric identifiers are equal
60 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
61 };
    /// @brief Kind of memory referenced by the view
62 enum class Type : uint8_t
63 {
64 Empty,
65 Writable,
66 ReadOnly,
67 Growable,
68 Child,
69 };
70
    /// @brief Builds an Empty view (zero offset, length and refs, not re-usable)
71 AsyncBufferView() : writableData(), offset(0), length(0), refs(0), type(Type::Empty), reUse(false) {}
    /// @brief Builds a Writable view spanning all of `data`, with no parent (parentID is the invalid ID)
72 AsyncBufferView(Span<char> data) : writableData(data)
73 {
74 type = Type::Writable;
75 offset = 0;
76 length = data.sizeInBytes();
77 parentID = ID();
78 }
    /// @brief Builds a ReadOnly view spanning all of `data`, with no parent (parentID is the invalid ID)
79 AsyncBufferView(Span<const char> data) : readonlyData(data)
80 {
81 type = Type::ReadOnly;
82 offset = 0;
83 length = data.sizeInBytes();
84 parentID = ID();
85 }
86
    /// @brief Tags this AsyncBufferView as reusable after its refCount goes to zero.
88 void setReusable(bool reusable) { reUse = reusable; }
89
    /// @brief Builds a Growable view saving a copy (or a moved instance) of `t`
    /// (a String / Buffer or anything that works with GrowableBuffer<T>).
    /// The view starts with zero offset and zero length.
    /// NOTE(review): this constructor is an unconstrained forwarding constructor, so it is a candidate
    /// for any single argument that is not matched better by the other overloads — confirm this is intended.
93 template <typename T>
94 AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue
95 {
96 type = Type::Growable;
97 offset = 0;
98 length = 0;
99 parentID = ID();
100 // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<>
101 // that it will be able to construct (and destruct) the right GrowableBuffer<T> from just a piece of storage
102 // and return a pointer to the corresponding IGrowableBuffer* interface
    // `t` is captured by value inside the lambda (copied or moved depending on how it was forwarded)
103 getGrowableBuffer = [t = forward<T>(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer*
104 {
105 using Type = typename TypeTraits::RemoveReference<T>::type;
106 if (construct)
107 {
    // construct == true: build GrowableBuffer<Type> in `storage` from the captured object and expose it
108 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
109 return &storage.reinterpret_as<GrowableBuffer<Type>>();
110 }
111 else
112 {
    // construct == false: destroy the GrowableBuffer<Type> living in `storage`, nothing to return
113 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
114 return nullptr;
115 }
116 };
117 }
118
    /// @brief Builds a ReadOnly view over a char array (typically a string literal).
    /// The last array element is excluded (N - 1), which for a string literal is the null terminator.
119 template <int N>
120 AsyncBufferView(const char (&literal)[N])
121 {
122 readonlyData = {literal, N - 1};
123 type = Type::ReadOnly;
124 offset = 0;
125 length = N - 1;
126 }
127
    /// @brief Returns the kind of this view (see AsyncBufferView::Type)
128 Type getType() const { return type; }
129
130 private:
    // Size in bytes of the inline capture storage of the type-erasing Function below
131#if SC_PLATFORM_64_BIT
132 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy
133#else
134 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 6; // This is enough to hold String / Buffer by copy
135#endif
    // Size in bytes of the storage where the GrowableBuffer<T> gets constructed
136 static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6;
137
138 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
    // Set only by the Growable constructor: constructs (true) or destroys (false) the GrowableBuffer in the storage
139 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
140
    // The two spans share storage; constructors initialize the one matching the view type
141 union
142 {
143 Span<char> writableData;
144 Span<const char> readonlyData;
145 };
    // Invalid ID in all constructors above; presumably set for Type::Child views — TODO confirm
146 AsyncBufferView::ID parentID;
147
148 friend struct AsyncBuffersPool;
149
150 size_t offset = 0;
151 size_t length = 0;
152 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
153 Type type = Type::Empty; // If it's Empty, Writable, ReadOnly, Growable or Child
154 bool reUse = false; // If it can be re-used after refs == 0
155};
156
/// @brief Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
///
/// NOTE(review): the embedded listing numbers skip several lines inside this struct and the reference
/// section of this page documents members (unrefBuffer, refBuffer, getReadableData, getWritableData,
/// getBuffer, pushBuffer) that have no declaration here — presumably dropped by the listing extraction,
/// TODO confirm against the original header.
159struct SC_COMPILER_EXPORT AsyncBuffersPool
160{
163
167
170
173
176
    /// @brief Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
    /// @param minimumSizeInBytes Minimum size of the requested buffer
    /// @param bufferID Receives the ID of the buffer
    /// @param data Receives the writable memory of the buffer
178 Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
179
    /// @brief Sets the new size in bytes for the buffer.
181 void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes);
182
185
    /// @brief Splits a span of memory in equally sized slices, assigning them to buffers and marking them
    /// as reusable.
187 static Result sliceInEqualParts(Span<AsyncBufferView> buffers, Span<char> memory, size_t numSlices);
188
    /// @brief Sets memory for the new buffers (the pool only stores the span passed in).
190 void setBuffers(Span<AsyncBufferView> newBuffers) { buffers = newBuffers; }
191
    /// @brief Returns the number of AsyncBufferView elements in the span set with setBuffers.
193 [[nodiscard]] size_t getNumBuffers() const { return buffers.sizeInElements(); }
194
    /// @brief Creates a child view that references a slice of the parent buffer.
    /// @param parentBufferID ID of the buffer to slice
    /// @param offset Start of the slice — presumably in bytes, relative to the parent, TODO confirm
    /// @param length Length of the slice — presumably in bytes, TODO confirm
    /// @param outChildBufferID Receives the ID of the child view
196 Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length,
197 AsyncBufferView::ID& outChildBufferID);
198
199 private:
    // Span of buffers provided through setBuffers
201 Span<AsyncBufferView> buffers;
202};
203
/// @brief Async source abstraction emitting data events in caller provided byte buffers.
///
/// NOTE(review): the reference section of this page documents members (asyncRead, init, start,
/// resumeReading, unshift, getBuffersPool) that have no declaration in this listing — presumably
/// dropped by the listing extraction, TODO confirm against the original header.
210struct SC_COMPILER_EXPORT AsyncReadableStream
211{
    /// @brief Element of the read queue, identifying a buffer by its ID
212 struct Request
213 {
214 AsyncBufferView::ID bufferID;
215 };
218
    // Capacity used for every Event declared below
219 static constexpr int MaxListeners = 8;
220
221 Event<MaxListeners, Result> eventError; // Emitted when an error occurs
222 Event<MaxListeners, AsyncBufferView::ID> eventData; // Emitted when a new buffer has been read
223 Event<MaxListeners> eventEnd; // Emitted when there is no more data
224 Event<MaxListeners> eventClose; // Emitted when the underlying resource has been closed
225
230
233
    /// @brief Pauses the readable stream (that can be later resumed)
235 void pause();
236
239
    /// @brief Forcefully destroys the readable stream before calling end event releasing all resources.
242 void destroy();
243
    /// @brief Returns true if the stream is ended (internal state is State::Ended)
245 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
246
249
    /// @brief Sets the read queue for this readable stream.
251 constexpr void setReadQueue(Span<Request> requests) { readQueue = requests; }
252
    /// @brief Returns the size of read queue.
254 [[nodiscard]] size_t getReadQueueSize() const { return readQueue.size(); }
255
    /// @brief Use push from inside AsyncReadableStream::asyncRead function to queue received data.
    /// @param bufferID ID of the buffer holding the received data
    /// @param newSize Presumably the number of bytes actually filled in the buffer — TODO confirm
258 [[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize);
259
262
    /// @brief Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
264 void pushEnd();
265
    /// @brief Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state
    /// machine to keep reading (see State::SyncReadMore) — exact semantics to be confirmed in the implementation.
268 void reactivate(bool doReactivate);
269
    /// @brief Signals an async error received.
271 void emitError(Result error);
272
    /// @brief Returns an unused buffer from pool or pauses the stream if none is available.
    /// @param minumumSizeInBytes Minimum size of the requested buffer
    /// @param bufferID Receives the ID of the buffer
    /// @param data Receives the writable memory of the buffer
274 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
275
276 private:
277 void emitOnData();
278 void executeRead();
279
    // States of the readable stream state machine
280 enum class State
281 {
282 Stopped, // Stream must be inited
283 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
284 Reading, // A read is being issued (may be sync or async)
285 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
286 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
287 AsyncReading, // An async read is in flight
288 AsyncPushing, // AsyncReading + AsyncReadableStream::push
289 Pausing, // Pause requested while read in flight
290 Paused, // Actually paused with no read in flight
291 Ended, // Emitted all data, no more data will be emitted
292 Destroyed, // Readable has been destroyed before emitting all data
293 Errored, // Error occurred
294 };
295 State state = State::Stopped;
296
    // Not owned; null until assigned (presumably by init — TODO confirm)
297 AsyncBuffersPool* buffers = nullptr;
298
    // Queue of requests backed by the span passed to setReadQueue
299 CircularQueue<Request> readQueue;
300};
301
/// @brief Async destination abstraction where bytes can be written to.
///
/// NOTE(review): the reference section of this page documents members (asyncWrite, canEndWritable,
/// init, resumeWriting, getBuffersPool) that have no declaration in this listing — presumably dropped
/// by the listing extraction, TODO confirm against the original header.
310struct SC_COMPILER_EXPORT AsyncWritableStream
311{
314
    /// @brief Element of the write queue, identifying a buffer by its ID.
    /// NOTE(review): listing line 319 is missing, a member may have been dropped here — TODO confirm
315 struct Request
316 {
317 AsyncBufferView::ID bufferID;
318
320 };
    // Capacity used for every Event declared below
321 static constexpr int MaxListeners = 8;
322
323 Event<MaxListeners, Result> eventError; // Emitted when an error occurs
324
325 Event<MaxListeners> eventDrain; // Emitted when write queue is empty
326 Event<MaxListeners> eventFinish; // Emitted when no more data can be written
327
332
    /// @brief Sets the write queue for this writable stream.
334 constexpr void setWriteQueue(Span<Request> requests) { writeQueue = requests; }
335
    /// @brief Returns the size of write queue.
337 [[nodiscard]] size_t getWriteQueueSize() const { return writeQueue.size(); }
338
    /// @brief Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream).
    /// @param bufferID ID of the buffer to write
    /// @param cb Optional callback receiving the buffer ID — presumably invoked when the write is done, TODO confirm
345 Result write(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb = {});
346
    /// @brief Push a new buffer view to the queue, registering it with the allocator.
    /// @param bufferView View to write (moved in)
    /// @param cb Optional callback receiving the buffer ID — presumably invoked when the write is done, TODO confirm
349 Result write(AsyncBufferView&& bufferView, Function<void(AsyncBufferView::ID)> cb = {});
350
    /// @brief Ends the writable stream, waiting for all in-flight and queued writes to finish.
353 void end();
354
    /// @brief Forcefully destroys the writable stream before calling end event releasing all resources.
357 void destroy();
358
361
    /// @brief Signals that the given buffer (previously queued by write) has been fully written.
    /// @param bufferID ID of the written buffer
    /// @param cb Callback associated with the write
    /// @param res Result of the write operation
363 void finishedWriting(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb, Result res);
364
367
    /// @brief Puts back a buffer at the top of the write queue.
369 Result unshift(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb);
370
    /// @brief Signals an async error received.
372 void emitError(Result error);
373
378
    /// @brief Will emit error if the passed in Result is false.
380 void tryAsync(Result potentialError);
381
    /// @brief Returns true if this stream is writing something (state is Writing or Ending).
383 [[nodiscard]] bool isStillWriting() const { return state == State::Writing or state == State::Ending; }
384
385 protected:
    // Brings the state back to State::Stopped (available to derived streams only)
386 void stop() { state = State::Stopped; }
387
388 private:
    // States of the writable stream state machine
389 enum class State
390 {
391 Stopped,
392 Writing,
393 Ending,
394 Ended
395 };
396 State state = State::Stopped;
397
    // Not owned; null until assigned (presumably by init — TODO confirm)
398 AsyncBuffersPool* buffers = nullptr;
399
    // Queue of requests backed by the span passed to setWriteQueue
400 CircularQueue<Request> writeQueue;
401};
402
/// @brief A stream that can both produce and consume buffers.
/// It derives from both AsyncReadableStream and AsyncWritableStream.
404struct SC_COMPILER_EXPORT AsyncDuplexStream : public AsyncReadableStream, public AsyncWritableStream
405{
407
    /// @brief Initializes the duplex stream.
    /// @param buffersPool Pool of buffers used by the stream
    /// @param readableRequests Memory for the requests of the readable side (presumably its read queue — TODO confirm)
    /// @param writableRequests Memory for the requests of the writable side (presumably its write queue — TODO confirm)
408 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
409 Span<AsyncWritableStream::Request> writableRequests);
410};
411
/// @brief A duplex stream that produces new buffers transforming received buffers.
///
/// NOTE(review): listing line 416 is missing, a member may have been dropped there — TODO confirm
/// against the original header.
413struct SC_COMPILER_EXPORT AsyncTransformStream : public AsyncDuplexStream
414{
416
    // NOTE(review): presumably called by the transform implementation after a processing step, passing
    // what is left of the input and output spans — TODO confirm in the implementation
417 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
    // NOTE(review): presumably called after a finalize step, passing what is left of the output span and
    // whether the transformed stream is ended — TODO confirm in the implementation
418 void afterFinalize(Span<char> outputAfter, bool streamEnded);
419
    // User provided function receiving a writable span; presumably invoked to flush remaining output when
    // the stream is ending — TODO confirm
421 Function<Result(Span<char>)> onFinalize;
422
423 private:
    // Callback receiving a buffer ID (same signature as the `cb` parameter of transform / prepare)
424 Function<void(AsyncBufferView::ID)> inputCallback;
425
426 Span<const char> inputData;
427 Span<char> outputData;
428
    // Both default to the invalid ID
429 AsyncBufferView::ID inputBufferID;
430 AsyncBufferView::ID outputBufferID;
431
432 Result transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
433 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
434
435 bool canEndTransform();
436 void tryFinalize();
437
    // States of the transform; this declaration is distinct from the private State enums of the base streams
438 enum class State
439 {
440 None,
441 Paused,
442 Processing,
443 Finalizing,
444 Finalized,
445 };
446 State state = State::None;
447};
448
/// @brief Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
///
/// NOTE(review): the reference section of this page documents members (pipe, start) that have no
/// declaration in this listing — presumably dropped by the listing extraction, TODO confirm against
/// the original header.
456struct SC_COMPILER_EXPORT AsyncPipeline
457{
    // Capacity of eventError and of the transforms / sinks arrays
458 static constexpr int MaxListeners = 8;
459 static constexpr int MaxTransforms = 8;
460 static constexpr int MaxSinks = 8;
461
    // The pipeline can be default constructed but neither copied nor moved
462 AsyncPipeline() = default;
463 AsyncPipeline(const AsyncPipeline&) = delete;
464 AsyncPipeline(AsyncPipeline&&) = delete;
465 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
466 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
468
    // Non-owning pointers to the streams taking part in the pipeline; all null by default
469 AsyncReadableStream* source = nullptr;
470 AsyncDuplexStream* transforms[MaxTransforms] = {nullptr};
471 AsyncWritableStream* sinks[MaxSinks] = {nullptr};
    // Reports errors by source, transforms or sinks
472 Event<MaxListeners, Result> eventError = {};
473
477
    /// @brief Unregisters all events from source, transforms and sinks.
479 [[nodiscard]] bool unpipe();
480
484
485 // TODO: Add a pause and cancel/step
486 private:
487 void emitError(Result res);
488 Result checkBuffersPool();
489 Result chainTransforms(AsyncReadableStream*& readable);
490 Result validate();
491
492 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable);
493 void dispatchToPipes(AsyncBufferView::ID bufferID);
494 void endPipes();
495 void afterSinkEnd();
496 void afterWrite(AsyncBufferView::ID bufferID);
497 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
498};
499} // namespace SC
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:52
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:50
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:94
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:88
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:160
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void setBuffers(Span< AsyncBufferView > newBuffers)
Sets memory for the new buffers.
Definition AsyncStreams.h:190
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length, AsyncBufferView::ID &outChildBufferID)
Creates a child view that references a slice of the parent buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
static Result sliceInEqualParts(Span< AsyncBufferView > buffers, Span< char > memory, size_t numSlices)
Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusab...
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
size_t getNumBuffers() const
Gets the number of buffers held by the pool.
Definition AsyncStreams.h:193
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:405
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:457
Result pipe()
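Connects source, transforms and sinks into a pipeline (counterpart of unpipe); errors by source, transforms or sinks are reported through eventError.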
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
Definition AsyncStreams.h:213
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:211
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when there is no more data.
Definition AsyncStreams.h:223
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition AsyncStreams.h:224
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when a new buffer has been read.
Definition AsyncStreams.h:222
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
size_t getReadQueueSize() const
Returns the size of read queue.
Definition AsyncStreams.h:254
void destroy()
Forcefully destroys the readable stream before calling end event releasing all resources.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:245
constexpr void setReadQueue(Span< Request > requests)
Sets the read queue for this readable stream.
Definition AsyncStreams.h:251
Result init(AsyncBuffersPool &buffersPool)
Initializes the readable stream with the AsyncBuffersPool it will request buffers from.
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result unshift(AsyncBufferView::ID bufferID)
Push back a buffer to the front of the read queue (e.g. for un-consumed data)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:217
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:414
Definition AsyncStreams.h:316
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:311
size_t getWriteQueueSize() const
Returns the size of write queue.
Definition AsyncStreams.h:337
Event< MaxListeners > eventFinish
Emitted when no more data can be written.
Definition AsyncStreams.h:326
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void destroy()
Forcefully destroys the writable stream before calling end event releasing all resources.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when write queue is empty.
Definition AsyncStreams.h:325
bool isStillWriting() const
Returns true if this stream is writing something.
Definition AsyncStreams.h:383
void resumeWriting()
Resumes writing queued requests for this stream.
Result init(AsyncBuffersPool &buffersPool)
Initializes the writable stream with the AsyncBuffersPool whose buffers it will write.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition AsyncStreams.h:313
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncStreams.h:377
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
constexpr void setWriteQueue(Span< Request > requests)
Sets the write queue for this writable stream.
Definition AsyncStreams.h:334
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29