5#include "../Foundation/AlignedStorage.h"
6#include "../Foundation/Function.h"
7#include "../Foundation/Internal/IGrowableBuffer.h"
8#include "../Foundation/Result.h"
9#include "../Foundation/Span.h"
10#include "Internal/CircularQueue.h"
11#include "Internal/Event.h"
51 struct SC_COMPILER_EXPORT
ID
54 NumericType identifier;
55 static constexpr NumericType InvalidValue = -1;
57 constexpr ID() : identifier(InvalidValue) {}
58 explicit constexpr ID(
int32_t value) : identifier(value) {}
60 [[nodiscard]]
constexpr bool operator==(
ID other)
const {
return identifier == other.identifier; }
71 AsyncBufferView() : writableData(), offset(0), length(0), refs(0), type(Type::Empty), reUse(false) {}
72 AsyncBufferView(Span<char> data) : writableData(data)
74 type = Type::Writable;
76 length = data.sizeInBytes();
79 AsyncBufferView(Span<const char> data) : readonlyData(data)
81 type = Type::ReadOnly;
83 length = data.sizeInBytes();
96 type = Type::Growable;
103 getGrowableBuffer = [t = forward<T>(t)](
GrowableStorage& storage,
bool construct)
mutable -> IGrowableBuffer*
105 using Type =
typename TypeTraits::RemoveReference<T>::type;
108 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
109 return &storage.reinterpret_as<GrowableBuffer<Type>>();
113 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
122 readonlyData = {literal, N - 1};
123 type = Type::ReadOnly;
128 Type getType()
const {
return type; }
131#if SC_PLATFORM_64_BIT
132 static constexpr int TypeErasedCaptureSize =
sizeof(
void*) * 3;
134 static constexpr int TypeErasedCaptureSize =
sizeof(
void*) * 6;
136 static constexpr int TypeErasedGrowableSize =
sizeof(
void*) * 6;
138 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
139 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
143 Span<char> writableData;
144 Span<const char> readonlyData;
146 AsyncBufferView::ID parentID;
148 friend struct AsyncBuffersPool;
153 Type type = Type::Empty;
193 [[nodiscard]]
size_t getNumBuffers()
const {
return buffers.sizeInElements(); }
219 static constexpr int MaxListeners = 8;
221 Event<MaxListeners, Result> eventError;
245 [[nodiscard]]
bool isEnded()
const {
return state == State::Ended; }
295 State state = State::Stopped;
299 CircularQueue<Request> readQueue;
321 static constexpr int MaxListeners = 8;
323 Event<MaxListeners, Result> eventError;
383 [[nodiscard]]
bool isStillWriting()
const {
return state == State::Writing or state == State::Ending; }
386 void stop() { state = State::Stopped; }
396 State state = State::Stopped;
398 AsyncBuffersPool* buffers =
nullptr;
400 CircularQueue<Request> writeQueue;
418 void afterFinalize(
Span<char> outputAfter,
bool streamEnded);
435 bool canEndTransform();
446 State state = State::None;
458 static constexpr int MaxListeners = 8;
459 static constexpr int MaxTransforms = 8;
460 static constexpr int MaxSinks = 8;
472 Event<MaxListeners, Result> eventError = {};
487 void emitError(
Result res);
488 Result checkBuffersPool();
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:52
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:50
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:94
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:88
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:160
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void setBuffers(Span< AsyncBufferView > newBuffers)
Sets memory for the new buffers.
Definition AsyncStreams.h:190
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length, AsyncBufferView::ID &outChildBufferID)
Creates a child view that references a slice of the parent buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
static Result sliceInEqualParts(Span< AsyncBufferView > buffers, Span< char > memory, size_t numSlices)
Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusab...
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
size_t getNumBuffers() const
Gets size of buffers held by the pool.
Definition AsyncStreams.h:193
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:405
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:457
Result pipe()
Reports errors by source, transforms or sinks.
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
Definition AsyncStreams.h:213
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:211
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when a new buffer has been read.
Definition AsyncStreams.h:223
Event< MaxListeners > eventClose
Emitted when there is no more data.
Definition AsyncStreams.h:224
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when an error occurs.
Definition AsyncStreams.h:222
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
size_t getReadQueueSize() const
Returns the size of read queue.
Definition AsyncStreams.h:254
void destroy()
Forcefully destroys the readable stream before calling end event releasing all resources.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:245
constexpr void setReadQueue(Span< Request > requests)
Sets the read queue for this readable stream.
Definition AsyncStreams.h:251
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result unshift(AsyncBufferView::ID bufferID)
Push back a buffer to the front of the read queue (e.g. for un-consumed data)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:217
Definition AsyncStreams.h:316
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:311
size_t getWriteQueueSize() const
Returns the size of write queue.
Definition AsyncStreams.h:337
Event< MaxListeners > eventFinish
Emitted when write queue is empty.
Definition AsyncStreams.h:326
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void destroy()
Forcefully destroys the writable stream before calling end event releasing all resources.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when an error occurs.
Definition AsyncStreams.h:325
bool isStillWriting() const
Returns true if this stream is writing something.
Definition AsyncStreams.h:383
void resumeWriting()
Resumes writing queued requests for this stream.
Result init(AsyncBuffersPool &buffersPool)
Emitted when no more data can be written.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition AsyncStreams.h:313
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncStreams.h:377
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
constexpr void setWriteQueue(Span< Request > requests)
Sets the write queue for this writable stream.
Definition AsyncStreams.h:334
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29