Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/Compiler.h"
6#ifndef SC_EXPORT_LIBRARY_ASYNC_STREAMS
7#define SC_EXPORT_LIBRARY_ASYNC_STREAMS 0
8#endif
9#define SC_ASYNC_STREAMS_EXPORT SC_COMPILER_LIBRARY_EXPORT(SC_EXPORT_LIBRARY_ASYNC_STREAMS)
10
11#include "../Foundation/AlignedStorage.h"
12#include "../Foundation/Function.h"
13#include "../Foundation/Internal/IGrowableBuffer.h"
14#include "../Foundation/Result.h"
15#include "../Foundation/Span.h"
16#include "Internal/CircularQueue.h"
17#include "Internal/Event.h"
18
48
51namespace SC
52{
53
55struct SC_ASYNC_STREAMS_EXPORT AsyncBufferView
56{
57 struct SC_ASYNC_STREAMS_EXPORT ID
58 {
59 using NumericType = int32_t;
60
61 static constexpr NumericType InvalidValue = -1;
62
63 NumericType identifier;
64
65 constexpr ID() : identifier(InvalidValue) {}
66 explicit constexpr ID(int32_t value) : identifier(value) {}
67
68 [[nodiscard]] constexpr bool isValid() const { return identifier != InvalidValue; }
69 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
70 };
71 enum class Type : uint8_t
72 {
73 Empty,
74 Writable,
75 ReadOnly,
76 Growable,
77 Child,
78 };
79
80 AsyncBufferView() : writableData(), offset(0), length(0), refs(0), type(Type::Empty), reUse(false) {}
81 AsyncBufferView(Span<char> data) : writableData(data)
82 {
83 type = Type::Writable;
84 offset = 0;
85 length = data.sizeInBytes();
86 parentID = ID();
87 }
88 AsyncBufferView(Span<const char> data) : readonlyData(data)
89 {
90 type = Type::ReadOnly;
91 offset = 0;
92 length = data.sizeInBytes();
93 parentID = ID();
94 }
95
97 void setReusable(bool reusable) { reUse = reusable; }
98
102 template <typename T>
103 AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue
104 {
105 type = Type::Growable;
106 offset = 0;
107 length = 0;
108 parentID = ID();
109 // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<>
110 // that it will be able to construct (and destruct) the right GrowableBuffer<T> from just a piece of storage
111 // and return a pointer to the corresponding IGrowableBuffer* interface
112 getGrowableBuffer = [t = forward<T>(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer*
113 {
114 using Type = typename TypeTraits::RemoveReference<T>::type;
115 if (construct)
116 {
117 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
118 return &storage.reinterpret_as<GrowableBuffer<Type>>();
119 }
120 else
121 {
122 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
123 return nullptr;
124 }
125 };
126 }
127
128 template <int N>
129 AsyncBufferView(const char (&literal)[N])
130 {
131 readonlyData = {literal, N - 1};
132 type = Type::ReadOnly;
133 offset = 0;
134 length = N - 1;
135 }
136
137 Type getType() const { return type; }
138
139 private:
140#if SC_PLATFORM_64_BIT
141 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy
142#else
143 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 6; // This is enough to hold String / Buffer by copy
144#endif
145 static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6;
146
147 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
148 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
149
150 union
151 {
152 Span<char> writableData;
153 Span<const char> readonlyData;
154 };
155 AsyncBufferView::ID parentID;
156
157 friend struct AsyncBuffersPool;
158
159 size_t offset = 0;
160 size_t length = 0;
161 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
162 Type type = Type::Empty; // If it's Empty, Writable, ReadOnly, Growable or Child
163 bool reUse = false; // If it can be re-used after refs == 0
164};
165
168struct SC_ASYNC_STREAMS_EXPORT AsyncBuffersPool
169{
172
176
179
182
185
187 Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
188
190 void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes);
191
194
196 static Result sliceInEqualParts(Span<AsyncBufferView> buffers, Span<char> memory, size_t numSlices);
197
199 void setBuffers(Span<AsyncBufferView> newBuffers) { buffers = newBuffers; }
200
202 [[nodiscard]] size_t getNumBuffers() const { return buffers.sizeInElements(); }
203
205 Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length,
206 AsyncBufferView::ID& outChildBufferID);
207
208 private:
210 Span<AsyncBufferView> buffers;
211};
212
219struct SC_ASYNC_STREAMS_EXPORT AsyncReadableStream
220{
223
224 struct Request
225 {
226 AsyncBufferView::ID bufferID;
227 };
228
229 static constexpr int MaxListeners = 8;
230
231 Event<MaxListeners, Result> eventError;
232 Event<MaxListeners, AsyncBufferView::ID> eventData;
233 Event<MaxListeners> eventEnd;
234 Event<MaxListeners> eventClose;
235
240
243
245 void pause();
246
249
252 void destroy();
253
255 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
256
258 [[nodiscard]] bool hasQueuedData() const { return not readQueue.isEmpty(); }
259
261 [[nodiscard]] bool hasBeenDestroyed() const { return destroyed; }
262
264 [[nodiscard]] bool canStart() const { return state == State::CanRead; }
265
268
270 constexpr void setReadQueue(Span<Request> requests) { readQueue = requests; }
271
273 [[nodiscard]] size_t getReadQueueSize() const { return readQueue.size(); }
274
277 [[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize);
278
281
283 void pushEnd();
284
287 void reactivate(bool doReactivate);
288
290 void emitError(Result error);
291
293 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
294
296 void setAutoDestroy(bool value) { autoDestroy = value; }
297
299 [[nodiscard]] bool getAutoDestroy() const { return autoDestroy; }
300
301 protected:
302 virtual ~AsyncReadableStream();
303
305 virtual Result asyncRead() = 0;
306
309
313
316
317 private:
318 void maybeDestroyEndedReadable();
319 void emitOnData();
320 void executeRead();
321
322 enum class State : uint8_t
323 {
324 Stopped, // Stream must be inited
325 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
326 Reading, // A read is being issued (may be sync or async)
327 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
328 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
329 AsyncReading, // An async read is in flight
330 AsyncPushing, // AsyncReading + AsyncReadableStream::push
331 Pausing, // Pause requested while read in flight
332 Paused, // Actually paused with no read in flight
333 Ended, // Emitted all data, no more data will be emitted
334 Destroying, // Destroy has been requested and it's in progress
335 Errored, // Error occurred
336 };
337 State state = State::Stopped;
338
339 bool destroyed = false;
340 bool autoDestroy = true;
341
342 AsyncBuffersPool* buffers = nullptr;
343
344 CircularQueue<Request> readQueue;
345};
346
355struct SC_ASYNC_STREAMS_EXPORT AsyncWritableStream
356{
359
360 friend struct AsyncPipeline;
361
362 struct Request
363 {
364 AsyncBufferView::ID bufferID;
365
367 };
368 static constexpr int MaxListeners = 8;
369
370 Event<MaxListeners, Result> eventError;
371
372 Event<MaxListeners> eventDrain;
373 Event<MaxListeners> eventFinish;
374 Event<MaxListeners> eventClose;
375
380
382 constexpr void setWriteQueue(Span<Request> requests) { writeQueue = requests; }
383
385 [[nodiscard]] size_t getWriteQueueSize() const { return writeQueue.size(); }
386
393 Result write(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb = {});
394
397 Result write(AsyncBufferView&& bufferView, Function<void(AsyncBufferView::ID)> cb = {});
398
401 void end();
402
405 void destroy();
406
409
411 void finishedWriting(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb, Result res);
412
415
417 Result unshift(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb);
418
420 void emitError(Result error);
421
423 void tryAsync(Result potentialError);
424
426 [[nodiscard]] bool isStillWriting() const { return state == State::Writing or state == State::Ending; }
427
429 [[nodiscard]] bool hasBeenDestroyed() const { return destroyed; }
430
432 void setAutoDestroy(bool value) { autoDestroy = value; }
433
435 [[nodiscard]] bool getAutoDestroy() const { return autoDestroy; }
436
437 protected:
438 virtual ~AsyncWritableStream();
439
441 virtual Result asyncWrite(AsyncBufferView::ID, Function<void(AsyncBufferView::ID)> func) = 0;
442
446 virtual bool canEndWritable();
447
451
454
455 void stop() { state = State::Stopped; }
456
457 private:
458 [[nodiscard]] bool canAcceptWrite() const
459 {
460 return (state == State::Stopped or state == State::Writing) and not writeQueue.isFull();
461 }
462
463 enum class State : uint8_t
464 {
465 Stopped,
466 Writing,
467 Ending,
468 Ended,
469 Destroying,
470 Errored,
471 };
472 State state = State::Stopped;
473
474 bool destroyed = false;
475 bool autoDestroy = true;
476
477 AsyncBuffersPool* buffers = nullptr;
478
479 CircularQueue<Request> writeQueue;
480};
481
483struct SC_ASYNC_STREAMS_EXPORT AsyncDuplexStream : public AsyncReadableStream, public AsyncWritableStream
484{
486
487 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
488 Span<AsyncWritableStream::Request> writableRequests);
489
490 virtual Result asyncRead() override;
491};
492
494struct SC_ASYNC_STREAMS_EXPORT AsyncTransformStream : public AsyncDuplexStream
495{
497
498 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
499 void afterFinalize(Span<char> outputAfter, bool streamEnded);
500
501 virtual Result onProcess(Span<const char>, Span<char>) = 0;
502 virtual Result onFinalize(Span<char>) = 0;
503
504 private:
505 virtual Result asyncWrite(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb) override;
506 virtual bool canEndWritable() override;
507
508 Function<void(AsyncBufferView::ID)> inputCallback;
509
510 Span<const char> inputData;
511 Span<char> outputData;
512
513 AsyncBufferView::ID inputBufferID;
514 AsyncBufferView::ID outputBufferID;
515
516 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
517
518 void tryFinalize();
519
520 enum class State
521 {
522 None,
523 Paused,
524 Processing,
525 Finalizing,
526 Finalized,
527 };
528 State state = State::None;
529};
530
533struct SC_ASYNC_STREAMS_EXPORT AsyncStreams
534{
540};
541
549struct SC_ASYNC_STREAMS_EXPORT AsyncPipeline
550{
551 static constexpr int MaxListeners = 8;
552 static constexpr int MaxTransforms = 8;
553 static constexpr int MaxSinks = 8;
554
555 AsyncPipeline() = default;
556 AsyncPipeline(const AsyncPipeline&) = delete;
557 AsyncPipeline(AsyncPipeline&&) = delete;
558 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
559 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
561
562 AsyncReadableStream* source = nullptr;
563 AsyncDuplexStream* transforms[MaxTransforms] = {nullptr};
564 AsyncWritableStream* sinks[MaxSinks] = {nullptr};
565 Event<MaxListeners, Result> eventError = {};
566
570
572 [[nodiscard]] bool unpipe();
573
577
578 // Internal state used by the pipeline implementation.
579 AsyncReadableStream* dispatchReadable = nullptr;
580 AsyncReadableStream* transformInputs[MaxTransforms] = {nullptr};
581
583 {
584 AsyncReadableStream* readable = nullptr;
585 AsyncWritableStream* writable = nullptr;
586 AsyncBufferView::ID bufferID;
587 };
588 PendingWrite pendingWrites[MaxTransforms + MaxSinks] = {};
589 bool shouldEndWhenDrained = false;
590 bool endingPipes = false;
591
592 // TODO: Add a pause and cancel/step
593 private:
594 void emitError(Result res);
595 Result checkBuffersPool();
596 Result chainTransforms(AsyncReadableStream*& readable);
597 Result validate();
598
599 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncReadableStream& readable, AsyncWritableStream& writable);
600 void dispatchToPipes(AsyncBufferView::ID bufferID);
601 void endPipes();
602 void afterSinkEnd();
603 void afterWrite(AsyncBufferView::ID bufferID);
604 void dispatchToTransform(AsyncBufferView::ID bufferID, size_t transformIndex);
605 bool retryPendingWrites();
606 bool hasPendingWrites() const;
607 bool hasPendingWritesForReadable(const AsyncReadableStream& readable) const;
608 void releasePendingWrites();
609 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
610
611 PendingWrite* findPendingWrite(AsyncReadableStream& readable, AsyncWritableStream& writable);
612};
613} // namespace SC
struct SC_FOUNDATION_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:27
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:37
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:58
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:56
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:103
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:97
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:169
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void setBuffers(Span< AsyncBufferView > newBuffers)
Sets memory for the new buffers.
Definition AsyncStreams.h:199
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length, AsyncBufferView::ID &outChildBufferID)
Creates a child view that references a slice of the parent buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
static Result sliceInEqualParts(Span< AsyncBufferView > buffers, Span< char > memory, size_t numSlices)
Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusab...
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
size_t getNumBuffers() const
Gets size of buffers held by the pool.
Definition AsyncStreams.h:202
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:484
virtual Result asyncRead() override
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:583
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:550
Result pipe()
Reports errors by source, transforms or sinks.
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
Definition AsyncStreams.h:225
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:220
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:299
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when a new buffer has been read.
Definition AsyncStreams.h:233
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:261
Result finishedDestroyingReadable()
Called from inside asyncDestroy to transition from Destroying to Destroyed state (emitting eventClose...
virtual Result asyncResumeReading()
Function that streams may define to resume an already pending async read.
bool hasQueuedData() const
Returns true if there are buffered reads waiting to be emitted.
Definition AsyncStreams.h:258
Event< MaxListeners > eventClose
Emitted when there is no more data.
Definition AsyncStreams.h:234
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when an error occurs.
Definition AsyncStreams.h:232
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
size_t getReadQueueSize() const
Returns the size of read queue.
Definition AsyncStreams.h:273
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:296
virtual Result asyncDestroyReadable()
Function that a readable stream can re-implement to release its internal resources.
void destroy()
Forcefully destroys the readable stream before calling end event releasing all resources.
virtual Result asyncRead()=0
Function that every stream must define to implement its custom read operation.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:255
constexpr void setReadQueue(Span< Request > requests)
Sets the read queue for this readable stream.
Definition AsyncStreams.h:270
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
bool canStart() const
Returns true when start() is currently valid for this stream.
Definition AsyncStreams.h:264
Result unshift(AsyncBufferView::ID bufferID)
Push back a buffer to the front of the read queue (e.g. for un-consumed data)
Result start()
Starts the readable stream, that will emit eventData.
Traits exposing AsyncStreams public types through a single template parameter.
Definition AsyncStreams.h:534
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:495
Definition AsyncStreams.h:363
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:356
size_t getWriteQueueSize() const
Returns the size of write queue.
Definition AsyncStreams.h:385
Event< MaxListeners > eventFinish
Emitted when write queue is empty.
Definition AsyncStreams.h:373
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void destroy()
Forcefully destroys the writable stream before calling end event releasing all resources.
void emitError(Result error)
Signals an async error received.
void finishedDestroyingWritable()
Function that MUST be called by re-implementations of asyncDestroyWritable once they're done.
Event< MaxListeners > eventDrain
Emitted when an error occurs.
Definition AsyncStreams.h:372
virtual bool canEndWritable()
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
virtual Result asyncWrite(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)> func)=0
Function that every stream must define to implement its custom write operation.
bool isStillWriting() const
Returns true if this stream is writing something.
Definition AsyncStreams.h:426
void resumeWriting()
Resumes writing queued requests for this stream.
virtual Result asyncDestroyWritable()
Function that a writable stream can re-implement to release its internal resources.
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:432
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:429
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:435
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Event< MaxListeners > eventClose
Emitted when no more data can be written.
Definition AsyncStreams.h:374
constexpr void setWriteQueue(Span< Request > requests)
Sets the write queue for this writable stream.
Definition AsyncStreams.h:382
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29