Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/AlignedStorage.h"
6#include "../Foundation/Function.h"
7#include "../Foundation/Internal/IGrowableBuffer.h"
8#include "../Foundation/Result.h"
9#include "../Foundation/Span.h"
10#include "Internal/CircularQueue.h"
11#include "Internal/Event.h"
12
42
45namespace SC
46{
47
49struct SC_COMPILER_EXPORT AsyncBufferView
50{
51 struct SC_COMPILER_EXPORT ID
52 {
53 using NumericType = int32_t;
54
55 static constexpr NumericType InvalidValue = -1;
56
57 NumericType identifier;
58
59 constexpr ID() : identifier(InvalidValue) {}
60 explicit constexpr ID(int32_t value) : identifier(value) {}
61
62 [[nodiscard]] constexpr bool isValid() const { return identifier != InvalidValue; }
63 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
64 };
65 enum class Type : uint8_t
66 {
67 Empty,
68 Writable,
69 ReadOnly,
70 Growable,
71 Child,
72 };
73
74 AsyncBufferView() : writableData(), offset(0), length(0), refs(0), type(Type::Empty), reUse(false) {}
75 AsyncBufferView(Span<char> data) : writableData(data)
76 {
77 type = Type::Writable;
78 offset = 0;
79 length = data.sizeInBytes();
80 parentID = ID();
81 }
82 AsyncBufferView(Span<const char> data) : readonlyData(data)
83 {
84 type = Type::ReadOnly;
85 offset = 0;
86 length = data.sizeInBytes();
87 parentID = ID();
88 }
89
91 void setReusable(bool reusable) { reUse = reusable; }
92
96 template <typename T>
97 AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue
98 {
99 type = Type::Growable;
100 offset = 0;
101 length = 0;
102 parentID = ID();
103 // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<>
104 // that it will be able to construct (and destruct) the right GrowableBuffer<T> from just a piece of storage
105 // and return a pointer to the corresponding IGrowableBuffer* interface
106 getGrowableBuffer = [t = forward<T>(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer*
107 {
108 using Type = typename TypeTraits::RemoveReference<T>::type;
109 if (construct)
110 {
111 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
112 return &storage.reinterpret_as<GrowableBuffer<Type>>();
113 }
114 else
115 {
116 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
117 return nullptr;
118 }
119 };
120 }
121
122 template <int N>
123 AsyncBufferView(const char (&literal)[N])
124 {
125 readonlyData = {literal, N - 1};
126 type = Type::ReadOnly;
127 offset = 0;
128 length = N - 1;
129 }
130
131 Type getType() const { return type; }
132
133 private:
134#if SC_PLATFORM_64_BIT
135 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy
136#else
137 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 6; // This is enough to hold String / Buffer by copy
138#endif
139 static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6;
140
141 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
142 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
143
144 union
145 {
146 Span<char> writableData;
147 Span<const char> readonlyData;
148 };
149 AsyncBufferView::ID parentID;
150
151 friend struct AsyncBuffersPool;
152
153 size_t offset = 0;
154 size_t length = 0;
155 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
156 Type type = Type::Empty; // If it's Empty, Writable, ReadOnly, Growable or Child
157 bool reUse = false; // If it can be re-used after refs == 0
158};
159
162struct SC_COMPILER_EXPORT AsyncBuffersPool
163{
166
170
173
176
179
181 Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
182
184 void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes);
185
188
190 static Result sliceInEqualParts(Span<AsyncBufferView> buffers, Span<char> memory, size_t numSlices);
191
193 void setBuffers(Span<AsyncBufferView> newBuffers) { buffers = newBuffers; }
194
196 [[nodiscard]] size_t getNumBuffers() const { return buffers.sizeInElements(); }
197
199 Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length,
200 AsyncBufferView::ID& outChildBufferID);
201
202 private:
204 Span<AsyncBufferView> buffers;
205};
206
213struct SC_COMPILER_EXPORT AsyncReadableStream
214{
217
218 struct Request
219 {
220 AsyncBufferView::ID bufferID;
221 };
222
223 static constexpr int MaxListeners = 8;
224
225 Event<MaxListeners, Result> eventError;
226 Event<MaxListeners, AsyncBufferView::ID> eventData;
227 Event<MaxListeners> eventEnd;
228 Event<MaxListeners> eventClose;
229
234
237
239 void pause();
240
243
246 void destroy();
247
249 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
250
252 [[nodiscard]] bool hasBeenDestroyed() const { return destroyed; }
253
256
258 constexpr void setReadQueue(Span<Request> requests) { readQueue = requests; }
259
261 [[nodiscard]] size_t getReadQueueSize() const { return readQueue.size(); }
262
265 [[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize);
266
269
271 void pushEnd();
272
275 void reactivate(bool doReactivate);
276
278 void emitError(Result error);
279
281 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
282
284 void setAutoDestroy(bool value) { autoDestroy = value; }
285
287 [[nodiscard]] bool getAutoDestroy() const { return autoDestroy; }
288
289 protected:
290 virtual ~AsyncReadableStream();
291
293 virtual Result asyncRead() = 0;
294
298
301
302 private:
303 void maybeDestroyEndedReadable();
304 void emitOnData();
305 void executeRead();
306
307 enum class State : uint8_t
308 {
309 Stopped, // Stream must be inited
310 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
311 Reading, // A read is being issued (may be sync or async)
312 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
313 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
314 AsyncReading, // An async read is in flight
315 AsyncPushing, // AsyncReading + AsyncReadableStream::push
316 Pausing, // Pause requested while read in flight
317 Paused, // Actually paused with no read in flight
318 Ended, // Emitted all data, no more data will be emitted
319 Destroying, // Destroy has been requested and it's in progress
320 Errored, // Error occurred
321 };
322 State state = State::Stopped;
323
324 bool destroyed = false;
325 bool autoDestroy = true;
326
327 AsyncBuffersPool* buffers = nullptr;
328
329 CircularQueue<Request> readQueue;
330};
331
340struct SC_COMPILER_EXPORT AsyncWritableStream
341{
344
345 friend struct AsyncPipeline;
346
347 struct Request
348 {
349 AsyncBufferView::ID bufferID;
350
352 };
353 static constexpr int MaxListeners = 8;
354
355 Event<MaxListeners, Result> eventError;
356
357 Event<MaxListeners> eventDrain;
358 Event<MaxListeners> eventFinish;
359 Event<MaxListeners> eventClose;
360
365
367 constexpr void setWriteQueue(Span<Request> requests) { writeQueue = requests; }
368
370 [[nodiscard]] size_t getWriteQueueSize() const { return writeQueue.size(); }
371
378 Result write(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb = {});
379
382 Result write(AsyncBufferView&& bufferView, Function<void(AsyncBufferView::ID)> cb = {});
383
386 void end();
387
390 void destroy();
391
394
396 void finishedWriting(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb, Result res);
397
400
402 Result unshift(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb);
403
405 void emitError(Result error);
406
408 void tryAsync(Result potentialError);
409
411 [[nodiscard]] bool isStillWriting() const { return state == State::Writing or state == State::Ending; }
412
414 [[nodiscard]] bool hasBeenDestroyed() const { return destroyed; }
415
417 void setAutoDestroy(bool value) { autoDestroy = value; }
418
420 [[nodiscard]] bool getAutoDestroy() const { return autoDestroy; }
421
422 protected:
423 virtual ~AsyncWritableStream();
424
426 virtual Result asyncWrite(AsyncBufferView::ID, Function<void(AsyncBufferView::ID)> func) = 0;
427
431 virtual bool canEndWritable();
432
436
439
440 void stop() { state = State::Stopped; }
441
442 private:
443 [[nodiscard]] bool canAcceptWrite() const
444 {
445 return (state == State::Stopped or state == State::Writing) and not writeQueue.isFull();
446 }
447
448 enum class State : uint8_t
449 {
450 Stopped,
451 Writing,
452 Ending,
453 Ended,
454 Destroying,
455 Errored,
456 };
457 State state = State::Stopped;
458
459 bool destroyed = false;
460 bool autoDestroy = true;
461
462 AsyncBuffersPool* buffers = nullptr;
463
464 CircularQueue<Request> writeQueue;
465};
466
468struct SC_COMPILER_EXPORT AsyncDuplexStream : public AsyncReadableStream, public AsyncWritableStream
469{
471
472 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
473 Span<AsyncWritableStream::Request> writableRequests);
474
475 virtual Result asyncRead() override;
476};
477
479struct SC_COMPILER_EXPORT AsyncTransformStream : public AsyncDuplexStream
480{
482
483 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
484 void afterFinalize(Span<char> outputAfter, bool streamEnded);
485
486 virtual Result onProcess(Span<const char>, Span<char>) = 0;
487 virtual Result onFinalize(Span<char>) = 0;
488
489 private:
490 virtual Result asyncWrite(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb) override;
491 virtual bool canEndWritable() override;
492
493 Function<void(AsyncBufferView::ID)> inputCallback;
494
495 Span<const char> inputData;
496 Span<char> outputData;
497
498 AsyncBufferView::ID inputBufferID;
499 AsyncBufferView::ID outputBufferID;
500
501 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
502
503 void tryFinalize();
504
505 enum class State
506 {
507 None,
508 Paused,
509 Processing,
510 Finalizing,
511 Finalized,
512 };
513 State state = State::None;
514};
515
523struct SC_COMPILER_EXPORT AsyncPipeline
524{
525 static constexpr int MaxListeners = 8;
526 static constexpr int MaxTransforms = 8;
527 static constexpr int MaxSinks = 8;
528
529 AsyncPipeline() = default;
530 AsyncPipeline(const AsyncPipeline&) = delete;
531 AsyncPipeline(AsyncPipeline&&) = delete;
532 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
533 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
535
536 AsyncReadableStream* source = nullptr;
537 AsyncDuplexStream* transforms[MaxTransforms] = {nullptr};
538 AsyncWritableStream* sinks[MaxSinks] = {nullptr};
539 Event<MaxListeners, Result> eventError = {};
540
544
546 [[nodiscard]] bool unpipe();
547
551
552 // Internal state used by the pipeline implementation.
553 AsyncReadableStream* dispatchReadable = nullptr;
554 AsyncReadableStream* transformInputs[MaxTransforms] = {nullptr};
555
557 {
558 AsyncReadableStream* readable = nullptr;
559 AsyncWritableStream* writable = nullptr;
560 AsyncBufferView::ID bufferID;
561 };
562 PendingWrite pendingWrites[MaxTransforms + MaxSinks] = {};
563 bool shouldEndWhenDrained = false;
564
565 // TODO: Add a pause and cancel/step
566 private:
567 void emitError(Result res);
568 Result checkBuffersPool();
569 Result chainTransforms(AsyncReadableStream*& readable);
570 Result validate();
571
572 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncReadableStream& readable, AsyncWritableStream& writable);
573 void dispatchToPipes(AsyncBufferView::ID bufferID);
574 void endPipes();
575 void afterSinkEnd();
576 void afterWrite(AsyncBufferView::ID bufferID);
577 void dispatchToTransform(AsyncBufferView::ID bufferID, size_t transformIndex);
578 bool retryPendingWrites();
579 bool hasPendingWrites() const;
580 bool hasPendingWritesForReadable(const AsyncReadableStream& readable) const;
581 void releasePendingWrites();
582 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
583
584 PendingWrite* findPendingWrite(AsyncReadableStream& readable, AsyncWritableStream& writable);
585};
586} // namespace SC
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:52
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:50
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:97
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:91
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:163
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void setBuffers(Span< AsyncBufferView > newBuffers)
Sets memory for the new buffers.
Definition AsyncStreams.h:193
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length, AsyncBufferView::ID &outChildBufferID)
Creates a child view that references a slice of the parent buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
static Result sliceInEqualParts(Span< AsyncBufferView > buffers, Span< char > memory, size_t numSlices)
Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusab...
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
size_t getNumBuffers() const
Gets size of buffers held by the pool.
Definition AsyncStreams.h:196
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:469
virtual Result asyncRead() override
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:557
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:524
Result pipe()
Reports errors by source, transforms or sinks.
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
Definition AsyncStreams.h:219
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:214
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:287
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when a new buffer has been read.
Definition AsyncStreams.h:227
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:252
Result finishedDestroyingReadable()
Called from inside asyncDestroy to transition from Destroying to Destroyed state (emitting eventClose...
Event< MaxListeners > eventClose
Emitted when there is no more data.
Definition AsyncStreams.h:228
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when an error occurs.
Definition AsyncStreams.h:226
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
size_t getReadQueueSize() const
Returns the size of read queue.
Definition AsyncStreams.h:261
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:284
virtual Result asyncDestroyReadable()
Function that a readable stream can re-implement to release its internal resources.
void destroy()
Forcefully destroys the readable stream before calling end event releasing all resources.
virtual Result asyncRead()=0
Function that every stream must define to implement its custom read operation.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:249
constexpr void setReadQueue(Span< Request > requests)
Sets the read queue for this readable stream.
Definition AsyncStreams.h:258
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result unshift(AsyncBufferView::ID bufferID)
Push back a buffer to the front of the read queue (e.g. for un-consumed data)
Result start()
Starts the readable stream, that will emit eventData.
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:480
Definition AsyncStreams.h:348
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:341
size_t getWriteQueueSize() const
Returns the size of write queue.
Definition AsyncStreams.h:370
Event< MaxListeners > eventFinish
Emitted when write queue is empty.
Definition AsyncStreams.h:358
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void destroy()
Forcefully destroys the writable stream before calling end event releasing all resources.
void emitError(Result error)
Signals an async error received.
void finishedDestroyingWritable()
Function that MUST be called by re-implementations of asyncDestroyWritable once they're done.
Event< MaxListeners > eventDrain
Emitted when an error occurs.
Definition AsyncStreams.h:357
virtual bool canEndWritable()
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
virtual Result asyncWrite(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)> func)=0
Function that every stream must define to implement its custom write operation.
bool isStillWriting() const
Returns true if this stream is writing something.
Definition AsyncStreams.h:411
void resumeWriting()
Resumes writing queued requests for this stream.
virtual Result asyncDestroyWritable()
Function that a writable stream can re-implement to release its internal resources.
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:417
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:414
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:420
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Event< MaxListeners > eventClose
Emitted when no more data can be written.
Definition AsyncStreams.h:359
constexpr void setWriteQueue(Span< Request > requests)
Sets the write queue for this writable stream.
Definition AsyncStreams.h:367
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29