Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/AlignedStorage.h"
6#include "../Foundation/Function.h"
7#include "../Foundation/Internal/IGrowableBuffer.h"
8#include "../Foundation/Result.h"
9#include "../Foundation/Span.h"
10#include "Internal/CircularQueue.h"
11#include "Internal/Event.h"
12
42
45namespace SC
46{
47
/// @brief A Span of bytes memory to be read or written by async streams.
/// A view is Empty, Writable, ReadOnly or Growable (see Type); every constructor below sets `type`.
49struct SC_COMPILER_EXPORT AsyncBufferView
50{
    /// @brief Identifier used to refer to a buffer (wraps an int32_t).
51 struct SC_COMPILER_EXPORT ID
52 {
53 using NumericType = int32_t;
54 NumericType identifier;
55 static constexpr NumericType InvalidValue = -1;
56
    // A default constructed ID holds InvalidValue; building one from a raw integer must be explicit.
57 constexpr ID() : identifier(InvalidValue) {}
58 explicit constexpr ID(int32_t value) : identifier(value) {}
59
    // Two IDs are equal when their numeric identifiers are equal.
60 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
61 };
    /// @brief Kind of memory referenced by the view, set by the constructor that created it.
62 enum class Type : uint8_t
63 {
64 Empty,
65 Writable,
66 ReadOnly,
67 Growable,
68 };
69
    // Empty view / view over writable memory / view over read-only memory.
    // The Span constructors are not explicit, so spans convert implicitly to AsyncBufferView.
70 AsyncBufferView() { type = Type::Empty; }
71 AsyncBufferView(Span<char> data) : writableData(data) { type = Type::Writable; }
72 AsyncBufferView(Span<const char> data) : readonlyData(data) { type = Type::ReadOnly; }
73
    /// @brief Tags this AsyncBufferView as reusable after its refCount goes to zero.
75 void setReusable(bool reusable) { reUse = reusable; }
76
    /// @brief Saves a copy (or a moved instance) of a String / Buffer (or anything that works with
    /// GrowableBuffer<T>) inside the view, marking it as Type::Growable.
    /// NOTE(review): this constructor template is unconstrained, so overload resolution can prefer it over
    /// the copy constructor when passed a non-const AsyncBufferView lvalue - confirm whether it should be constrained.
80 template <typename T>
81 AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue
82 {
83 type = Type::Growable;
84 // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<>
85 // that it will be able to construct (and destruct) the right GrowableBuffer<T> from just a piece of storage
86 // and return a pointer to the corresponding IGrowableBuffer* interface
    // The init-capture stores `t` by value inside the lambda (copied or moved depending on T).
87 getGrowableBuffer = [t = forward<T>(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer*
88 {
89 using Type = typename TypeTraits::RemoveReference<T>::type;
    // construct == true : placement-new a GrowableBuffer<Type> over `storage` from the captured object
    // construct == false: run the destructor of the GrowableBuffer<Type> living in `storage`
90 if (construct)
91 {
92 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
93 return &storage.reinterpret_as<GrowableBuffer<Type>>();
94 }
95 else
96 {
97 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
98 return nullptr;
99 }
100 };
101 }
102
    /// @brief Builds a ReadOnly view over a char array, leaving out its last element (N - 1 bytes).
    /// Presumably meant for string literals, where the skipped element is the null terminator.
103 template <int N>
104 AsyncBufferView(const char (&literal)[N])
105 {
106 readonlyData = {literal, N - 1};
107 type = Type::ReadOnly;
108 }
109
    /// @brief Returns the kind of this view (Empty, Writable, ReadOnly or Growable).
110 Type getType() const { return type; }
111
112 private:
113 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy
114 static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6;
115
    // Storage where the type-erased GrowableBuffer<T> gets constructed / destructed by getGrowableBuffer.
116 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
117 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
118
    // Data span of the view; the constructors write writableData or readonlyData together with the matching `type`.
119 union
120 {
121 Span<char> writableData = {};
122 Span<const char> readonlyData;
123 };
124
    // NOTE(review): presumably the span as originally assigned, kept aside while the span above
    // gets resized - not set anywhere in this header, confirm in AsyncBuffersPool (friend below).
125 union
126 {
127 Span<char> originalWritableData = {};
128 Span<const char> originalReadonlyData;
129 };
130 friend struct AsyncBuffersPool;
131
132 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
133 Type type; // If it's Empty, Writable, ReadOnly or Growable
134 bool reUse = false; // If it can be re-used after refs == 0
135};
136
/// @brief Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
/// NOTE(review): the number-only lines below carry no declaration in this listing; the generated reference
/// also lists refBuffer, unrefBuffer, getReadableData, getWritableData, getBuffer and pushBuffer for this
/// type - presumably they are declared on those lines, confirm against the real header.
139struct SC_COMPILER_EXPORT AsyncBuffersPool
140{
143
147
150
153
156
    /// @brief Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
    /// `bufferID` and `data` are taken by non-const reference (output parameters).
158 Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
159
    /// @brief Sets the new size in bytes for the buffer.
161 void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes);
162
165
    /// @brief Splits a span of memory in equally sized slices, assigning them to buffers and marking them
    /// as reusable.
167 static Result sliceInEqualParts(Span<AsyncBufferView> buffers, Span<char> memory, size_t numSlices);
168
    /// @brief Sets memory for the new buffers (the pool only stores the Span, it does not own the views).
170 void setBuffers(Span<AsyncBufferView> newBuffers) { buffers = newBuffers; }
171
    /// @brief Gets size of buffers held by the pool (number of AsyncBufferView elements in the Span).
173 [[nodiscard]] size_t getNumBuffers() const { return buffers.sizeInElements(); }
174
175 private:
    // User provided span of buffer views, assigned by setBuffers.
177 Span<AsyncBufferView> buffers;
178};
179
/// @brief Async source abstraction emitting data events in caller provided byte buffers.
/// NOTE(review): the number-only lines below carry no declaration in this listing; the generated reference
/// also lists asyncRead, init, start, resumeReading and getBuffersPool for this type - confirm against the
/// real header.
186struct SC_COMPILER_EXPORT AsyncReadableStream
187{
    // Element type of the read queue (see setReadQueue), holding the ID of a buffer.
188 struct Request
189 {
190 AsyncBufferView::ID bufferID;
191 };
194
    // Capacity passed as first template argument to every Event below.
195 static constexpr int MaxListeners = 8;
196
    // NOTE(review): the generated reference attaches each description to the *following* declaration;
    // in declaration order they read: eventError = emitted when an error occurs, eventData = emitted when
    // a new buffer has been read, eventEnd = emitted when there is no more data, eventClose = emitted when
    // the underlying resource has been closed - confirm against the real header.
197 Event<MaxListeners, Result> eventError;
198 Event<MaxListeners, AsyncBufferView::ID> eventData;
199 Event<MaxListeners> eventEnd;
200 Event<MaxListeners> eventClose;
201
206
209
    /// @brief Pauses the readable stream (that can be later resumed).
211 void pause();
212
215
    /// @brief Forcefully destroys the readable stream before calling end event releasing all resources.
218 void destroy();
219
    /// @brief Returns true if the stream is ended, that is when the internal state is State::Ended.
221 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
222
225
    /// @brief Sets the read queue for this readable stream.
227 constexpr void setReadQueue(Span<Request> requests) { readQueue = requests; }
228
    /// @brief Returns the size of read queue.
230 [[nodiscard]] size_t getReadQueueSize() const { return readQueue.size(); }
231
    /// @brief Use push from inside AsyncReadableStream::asyncRead function to queue received data.
234 [[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize);
235
    /// @brief Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
237 void pushEnd();
238
    /// @brief Use reactivate(true) from inside AsyncReadableStream::asyncRead to drive the state machine
    /// (the generated description is truncated; see State::SyncReadMore below).
241 void reactivate(bool doReactivate);
242
    /// @brief Signals an async error received.
244 void emitError(Result error);
245
    /// @brief Returns an unused buffer from pool or pauses the stream if none is available.
247 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
248
249 private:
250 void emitOnData();
251 void executeRead();
252
    // States of the readable stream state machine.
253 enum class State
254 {
255 Stopped, // Stream must be inited
256 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
257 Reading, // A read is being issued (may be sync or async)
258 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
259 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
260 AsyncReading, // An async read is in flight
261 AsyncPushing, // AsyncReading + AsyncReadableStream::push
262 Pausing, // Pause requested while read in flight
263 Paused, // Actually paused with no read in flight
264 Ended, // Emitted all data, no more data will be emitted
265 Destroyed, // Readable has been destroyed before emitting all data
266 Errored, // Error occurred
267 };
268 State state = State::Stopped;
269
    // Non-owning pointer to the buffers pool, null until assigned (presumably by init - confirm).
270 AsyncBuffersPool* buffers = nullptr;
271
    // Queue of read requests, backed by the Span passed to setReadQueue.
272 CircularQueue<Request> readQueue;
273};
274
/// @brief Async destination abstraction where bytes can be written to.
/// NOTE(review): the number-only lines below carry no declaration in this listing; the generated reference
/// also lists asyncWrite, init, resumeWriting, getBuffersPool and canEndWritable for this type - confirm
/// against the real header.
283struct SC_COMPILER_EXPORT AsyncWritableStream
284{
287
    // Element type of the write queue (see setWriteQueue), holding the ID of a buffer.
288 struct Request
289 {
290 AsyncBufferView::ID bufferID;
291
293 };
    // Capacity passed as first template argument to every Event below.
294 static constexpr int MaxListeners = 8;
295
    // NOTE(review): the generated reference attaches each description to the *following* declaration;
    // in declaration order they read: eventError = emitted when an error occurs, eventDrain = emitted when
    // write queue is empty, eventFinish = emitted when no more data can be written - confirm against the
    // real header.
296 Event<MaxListeners, Result> eventError;
297
298 Event<MaxListeners> eventDrain;
299 Event<MaxListeners> eventFinish;
300
305
    /// @brief Sets the write queue for this writable stream.
307 constexpr void setWriteQueue(Span<Request> requests) { writeQueue = requests; }
308
    /// @brief Returns the size of write queue.
310 [[nodiscard]] size_t getWriteQueueSize() const { return writeQueue.size(); }
311
    /// @brief Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream).
    /// `cb` is optional (defaults to an empty Function).
318 Result write(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb = {});
319
    /// @brief Push a new buffer view to the queue, registering it with the allocator.
322 Result write(AsyncBufferView&& bufferView, Function<void(AsyncBufferView::ID)> cb = {});
323
    /// @brief Ends the writable stream, waiting for all in-flight and queued writes to finish.
326 void end();
327
    /// @brief Forcefully destroys the writable stream before calling end event releasing all resources.
330 void destroy();
331
334
    /// @brief Signals that the given buffer (previously queued by write) has been fully written.
336 void finishedWriting(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb, Result res);
337
340
    /// @brief Puts back a buffer at the top of the write queue.
342 Result unshift(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb);
343
    /// @brief Signals an async error received.
345 void emitError(Result error);
346
351
    /// @brief Will emit error if the passed in Result is false.
353 void tryAsync(Result potentialError);
354
    /// @brief Returns true if this stream is writing something (state is Writing or Ending).
356 bool isStillWriting() const { return state == State::Writing or state == State::Ending; }
357
358 protected:
    // Resets the state machine to State::Stopped; accessible to derived streams only.
359 void stop() { state = State::Stopped; }
360
361 private:
    // States of the writable stream state machine.
362 enum class State
363 {
364 Stopped,
365 Writing,
366 Ending,
367 Ended
368 };
369 State state = State::Stopped;
370
    // Non-owning pointer to the buffers pool, null until assigned (presumably by init - confirm).
371 AsyncBuffersPool* buffers = nullptr;
372
    // Queue of write requests, backed by the Span passed to setWriteQueue.
373 CircularQueue<Request> writeQueue;
374};
375
/// @brief A stream that can both produce and consume buffers.
/// Publicly inherits from both AsyncReadableStream and AsyncWritableStream, so members declared with the
/// same name in both bases (for example eventError, destroy, emitError) need explicit qualification.
377struct SC_COMPILER_EXPORT AsyncDuplexStream : public AsyncReadableStream, public AsyncWritableStream
378{
380
    // Takes the shared buffers pool plus one request span for the readable side and one for the writable
    // side (presumably forwarded to setReadQueue / setWriteQueue - confirm in the implementation).
381 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
382 Span<AsyncWritableStream::Request> writableRequests);
383};
384
/// @brief A duplex stream that produces new buffers transforming received buffers.
386struct SC_COMPILER_EXPORT AsyncTransformStream : public AsyncDuplexStream
387{
389
    // NOTE(review): presumably called by the user supplied processing / finalizing code to report what is
    // left of the input and output spans once a step completes - confirm in the implementation.
390 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
391 void afterFinalize(Span<char> outputAfter, bool streamEnded);
392
    // User provided callback receiving a writable span and returning a Result.
394 Function<Result(Span<char>)> onFinalize;
395
396 private:
    // Callback taking a buffer ID; same signature as the `cb` parameter of transform / prepare below.
397 Function<void(AsyncBufferView::ID)> inputCallback;
398
    // Read-only input span and writable output span.
399 Span<const char> inputData;
400 Span<char> outputData;
401
    // Buffer IDs for input and output; default constructed IDs hold AsyncBufferView::ID::InvalidValue.
402 AsyncBufferView::ID inputBufferID;
403 AsyncBufferView::ID outputBufferID;
404
405 Result transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
406 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
407
408 bool canEndTransform();
409 void tryFinalize();
410
    // State of the transform; this enum and the `state` member hide the private State / state of the bases.
411 enum class State
412 {
413 None,
414 Paused,
415 Processing,
416 Finalizing,
417 Finalized,
418 };
419 State state = State::None;
420};
421
/// @brief Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
/// NOTE(review): the number-only lines below carry no declaration in this listing; the generated reference
/// also lists pipe() and start() for this type - confirm against the real header.
429struct SC_COMPILER_EXPORT AsyncPipeline
430{
    // Fixed capacities: event listeners, transforms[] slots and sinks[] slots.
431 static constexpr int MaxListeners = 8;
432 static constexpr int MaxTransforms = 8;
433 static constexpr int MaxSinks = 8;
434
    // The pipeline is neither copyable nor movable.
435 AsyncPipeline() = default;
436 AsyncPipeline(const AsyncPipeline&) = delete;
437 AsyncPipeline(AsyncPipeline&&) = delete;
438 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
439 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
441
    // Non-owning pointers to the streams composing the pipeline; all of them start as nullptr.
442 AsyncReadableStream* source = nullptr;
443 AsyncDuplexStream* transforms[MaxTransforms] = {nullptr};
444 AsyncWritableStream* sinks[MaxSinks] = {nullptr};
    // NOTE(review): "Reports errors by source, transforms or sinks" is attached to pipe() in the generated
    // reference but presumably describes this event - confirm against the real header.
445 Event<MaxListeners, Result> eventError = {};
446
450
    /// @brief Unregisters all events from source, transforms and sinks.
452 [[nodiscard]] bool unpipe();
453
457
458 // TODO: Add a pause and cancel/step
459 private:
460 void emitError(Result res);
461 Result checkBuffersPool();
462 Result chainTransforms(AsyncReadableStream*& readable);
463 Result validate();
464
465 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable);
466 void dispatchToPipes(AsyncBufferView::ID bufferID);
467 void endPipes();
468 void afterSinkEnd();
469 void afterWrite(AsyncBufferView::ID bufferID);
470 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
471};
472} // namespace SC
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Definition AsyncStreams.h:52
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:50
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:81
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:75
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:140
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void setBuffers(Span< AsyncBufferView > newBuffers)
Sets memory for the new buffers.
Definition AsyncStreams.h:170
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
static Result sliceInEqualParts(Span< AsyncBufferView > buffers, Span< char > memory, size_t numSlices)
Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusab...
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
size_t getNumBuffers() const
Gets size of buffers held by the pool.
Definition AsyncStreams.h:173
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:378
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:430
Result pipe()
Pipes the source through transforms into sinks (counterpart of unpipe). Errors by source, transforms or sinks are reported through eventError.
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
Definition AsyncStreams.h:189
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:187
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when there is no more data.
Definition AsyncStreams.h:199
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition AsyncStreams.h:200
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when a new buffer has been read.
Definition AsyncStreams.h:198
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
size_t getReadQueueSize() const
Returns the size of read queue.
Definition AsyncStreams.h:230
void destroy()
Forcefully destroys the readable stream before calling end event releasing all resources.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:221
constexpr void setReadQueue(Span< Request > requests)
Sets the read queue for this readable stream.
Definition AsyncStreams.h:227
Result init(AsyncBuffersPool &buffersPool)
Initializes the readable stream with the AsyncBuffersPool it will request buffers from.
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:193
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:387
Definition AsyncStreams.h:289
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:284
size_t getWriteQueueSize() const
Returns the size of write queue.
Definition AsyncStreams.h:310
Event< MaxListeners > eventFinish
Emitted when no more data can be written.
Definition AsyncStreams.h:299
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void destroy()
Forcefully destroys the writable stream before calling end event releasing all resources.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when write queue is empty.
Definition AsyncStreams.h:298
bool isStillWriting() const
Returns true if this stream is writing something.
Definition AsyncStreams.h:356
void resumeWriting()
Resumes writing queued requests for this stream.
Result init(AsyncBuffersPool &buffersPool)
Initializes the writable stream with the AsyncBuffersPool holding the buffers it will write.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition AsyncStreams.h:286
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncStreams.h:350
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
constexpr void setWriteQueue(Span< Request > requests)
Sets the write queue for this writable stream.
Definition AsyncStreams.h:307
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29