5#include "../Foundation/Compiler.h"
6#ifndef SC_EXPORT_LIBRARY_ASYNC_STREAMS
7#define SC_EXPORT_LIBRARY_ASYNC_STREAMS 0
9#define SC_ASYNC_STREAMS_EXPORT SC_COMPILER_LIBRARY_EXPORT(SC_EXPORT_LIBRARY_ASYNC_STREAMS)
11#include "../Foundation/AlignedStorage.h"
12#include "../Foundation/Function.h"
13#include "../Foundation/Internal/IGrowableBuffer.h"
14#include "../Foundation/Result.h"
15#include "../Foundation/Span.h"
16#include "Internal/CircularQueue.h"
17#include "Internal/Event.h"
57 struct SC_ASYNC_STREAMS_EXPORT
ID
61 static constexpr NumericType InvalidValue = -1;
63 NumericType identifier;
65 constexpr ID() : identifier(InvalidValue) {}
66 explicit constexpr ID(
int32_t value) : identifier(value) {}
68 [[nodiscard]]
constexpr bool isValid()
const {
return identifier != InvalidValue; }
69 [[nodiscard]]
constexpr bool operator==(
ID other)
const {
return identifier == other.identifier; }
80 AsyncBufferView() : writableData(), offset(0), length(0), refs(0), type(Type::Empty), reUse(false) {}
81 AsyncBufferView(Span<char> data) : writableData(data)
83 type = Type::Writable;
85 length = data.sizeInBytes();
88 AsyncBufferView(Span<const char> data) : readonlyData(data)
90 type = Type::ReadOnly;
92 length = data.sizeInBytes();
102 template <
typename T>
105 type = Type::Growable;
112 getGrowableBuffer = [t = forward<T>(t)](
GrowableStorage& storage,
bool construct)
mutable -> IGrowableBuffer*
114 using Type =
typename TypeTraits::RemoveReference<T>::type;
117 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
118 return &storage.reinterpret_as<GrowableBuffer<Type>>();
122 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
131 readonlyData = {literal, N - 1};
132 type = Type::ReadOnly;
137 Type getType()
const {
return type; }
140#if SC_PLATFORM_64_BIT
141 static constexpr int TypeErasedCaptureSize =
sizeof(
void*) * 3;
143 static constexpr int TypeErasedCaptureSize =
sizeof(
void*) * 6;
145 static constexpr int TypeErasedGrowableSize =
sizeof(
void*) * 6;
147 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
148 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
152 Span<char> writableData;
153 Span<const char> readonlyData;
155 AsyncBufferView::ID parentID;
157 friend struct AsyncBuffersPool;
162 Type type = Type::Empty;
202 [[nodiscard]]
size_t getNumBuffers()
const {
return buffers.sizeInElements(); }
229 static constexpr int MaxListeners = 8;
231 Event<MaxListeners, Result> eventError;
255 [[nodiscard]]
bool isEnded()
const {
return state == State::Ended; }
261 [[nodiscard]]
bool canStart()
const {
return state == State::CanRead; }
312 void maybeDestroyEndedReadable();
331 State state = State::Stopped;
333 bool destroyed =
false;
334 bool autoDestroy =
true;
338 CircularQueue<Request> readQueue;
362 static constexpr int MaxListeners = 8;
364 Event<MaxListeners, Result> eventError;
420 [[nodiscard]]
bool isStillWriting()
const {
return state == State::Writing or state == State::Ending; }
449 void stop() { state = State::Stopped; }
452 [[nodiscard]]
bool canAcceptWrite()
const
454 return (state == State::Stopped or state == State::Writing) and not writeQueue.isFull();
457 enum class State : uint8_t
466 State state = State::Stopped;
468 bool destroyed =
false;
469 bool autoDestroy =
true;
471 AsyncBuffersPool* buffers =
nullptr;
473 CircularQueue<Request> writeQueue;
493 void afterFinalize(
Span<char> outputAfter,
bool streamEnded);
500 virtual bool canEndWritable()
override;
522 State state = State::None;
545 static constexpr int MaxListeners = 8;
546 static constexpr int MaxTransforms = 8;
547 static constexpr int MaxSinks = 8;
559 Event<MaxListeners, Result> eventError = {};
582 PendingWrite pendingWrites[MaxTransforms + MaxSinks] = {};
583 bool shouldEndWhenDrained =
false;
587 void emitError(
Result res);
588 Result checkBuffersPool();
598 bool retryPendingWrites();
599 bool hasPendingWrites()
const;
601 void releasePendingWrites();
struct SC_FOUNDATION_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:27
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:37
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:58
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:56
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:103
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:97
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:169
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void setBuffers(Span< AsyncBufferView > newBuffers)
Sets memory for the new buffers.
Definition AsyncStreams.h:199
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length, AsyncBufferView::ID &outChildBufferID)
Creates a child view that references a slice of the parent buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
static Result sliceInEqualParts(Span< AsyncBufferView > buffers, Span< char > memory, size_t numSlices)
Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusab...
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
size_t getNumBuffers() const
Gets size of buffers held by the pool.
Definition AsyncStreams.h:202
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:478
virtual Result asyncRead() override
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:577
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:544
Result pipe()
Reports errors by source, transforms or sinks.
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
Definition AsyncStreams.h:225
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:220
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:296
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when a new buffer has been read.
Definition AsyncStreams.h:233
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:258
Result finishedDestroyingReadable()
Called from inside asyncDestroy to transition from Destroying to Destroyed state (emitting eventClose...
Event< MaxListeners > eventClose
Emitted when there is no more data.
Definition AsyncStreams.h:234
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when an error occurs.
Definition AsyncStreams.h:232
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
size_t getReadQueueSize() const
Returns the size of read queue.
Definition AsyncStreams.h:270
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:293
virtual Result asyncDestroyReadable()
Function that a readable stream can re-implement to release its internal resources.
void destroy()
Forcefully destroys the readable stream before calling end event releasing all resources.
virtual Result asyncRead()=0
Function that every stream must define to implement its custom read operation.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:255
constexpr void setReadQueue(Span< Request > requests)
Sets the read queue for this readable stream.
Definition AsyncStreams.h:267
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
bool canStart() const
Returns true when start() is currently valid for this stream.
Definition AsyncStreams.h:261
Result unshift(AsyncBufferView::ID bufferID)
Push back a buffer to the front of the read queue (e.g. for un-consumed data)
Result start()
Starts the readable stream, that will emit eventData.
Traits exposing AsyncStreams public types through a single template parameter.
Definition AsyncStreams.h:528
Definition AsyncStreams.h:357
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:350
size_t getWriteQueueSize() const
Returns the size of write queue.
Definition AsyncStreams.h:379
Event< MaxListeners > eventFinish
Emitted when write queue is empty.
Definition AsyncStreams.h:367
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void destroy()
Forcefully destroys the writable stream before calling end event releasing all resources.
void emitError(Result error)
Signals an async error received.
void finishedDestroyingWritable()
Function that MUST be called by re-implementations of asyncDestroyWritable once they're done.
Event< MaxListeners > eventDrain
Emitted when an error occurs.
Definition AsyncStreams.h:366
virtual bool canEndWritable()
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
virtual Result asyncWrite(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)> func)=0
Function that every stream must define to implement its custom write operation.
bool isStillWriting() const
Returns true if this stream is writing something.
Definition AsyncStreams.h:420
void resumeWriting()
Resumes writing queued requests for this stream.
virtual Result asyncDestroyWritable()
Function that a writable stream can re-implement to release its internal resources.
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:426
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:423
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:429
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Event< MaxListeners > eventClose
Emitted when no more data can be written.
Definition AsyncStreams.h:368
constexpr void setWriteQueue(Span< Request > requests)
Sets the write queue for this writable stream.
Definition AsyncStreams.h:376
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29