Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/Function.h"
6#include "../Foundation/Result.h"
7#include "../Foundation/Span.h"
8#include "Internal/CircularQueue.h"
9#include "Internal/Event.h"
10
40
43namespace SC
44{
47{
// Typed wrapper around an int32_t that identifies an AsyncBufferView
// (it is the bufferID type accepted by the AsyncBuffersPool accessors).
48 struct ID
49 {
50 using NumericType = int32_t;
51 NumericType identifier; // Raw numeric value of this ID
52 static constexpr NumericType InvalidValue = -1; // Value held by a default-constructed ID
53
54 constexpr ID() : identifier(InvalidValue) {} // A default-constructed ID is invalid
55 explicit constexpr ID(int32_t value) : identifier(value) {} // explicit: no implicit int32_t -> ID conversion
56
57 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; } // Compares the raw values
58 };
59
60 Span<char> data;
61
62 private:
63 Span<char> originalData;
64 friend struct AsyncBuffersPool;
65
66 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
67};
68
98
106{
// A read request: element type of readQueue and of the Span<Request> passed to init.
107 struct Request
108 {
109 AsyncBufferView::ID bufferID; // ID of the buffer associated with this request
110 };
113
114 static constexpr int MaxListeners = 8;
115
116 Event<MaxListeners, Result> eventError;
117 Event<MaxListeners, AsyncBufferView::ID> eventData;
118 Event<MaxListeners> eventEnd;
119 Event<MaxListeners> eventClose;
120
124 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
125
128
130 void pause();
131
134
136 void destroy();
137
// Returns true when the stream state is State::Ended (all data emitted, no more data will be emitted).
139 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
140
143
145 void push(AsyncBufferView::ID bufferID, size_t newSize);
146
148 void pushEnd();
149
152 void reactivate(bool doReactivate);
153
155 void emitError(Result error);
156
158 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
159
160 private:
161 void emitOnData();
162 void executeRead();
163
// States of the readable stream state machine (the state member starts as State::Stopped).
164 enum class State
165 {
166 Stopped, // Stream must be initialized (see init)
167 CanRead, // Stream is ready to issue a read (AsyncReadableStream::start / AsyncReadableStream::resumeReading)
168 Reading, // A read is being issued (may be sync or async)
169 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
170 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
171 AsyncReading, // An async read is in flight
172 AsyncPushing, // AsyncReading + AsyncReadableStream::push
173 Pausing, // Pause requested while a read is in flight
174 Paused, // Actually paused with no read in flight
175 Ended, // Emitted all data, no more data will be emitted
176 Destroying, // Readable is waiting for an async call before being destroyed (original comment was cut short - TODO confirm)
177 Destroyed, // Readable has been destroyed before emitting all data
178 Errored, // Error occurred
179 };
180 State state = State::Stopped;
181
182 AsyncBuffersPool* buffers = nullptr;
183
184 CircularQueue<Request> readQueue;
185};
186
196{
199
200 struct Request
201 {
202 AsyncBufferView::ID bufferID;
203
205 };
206 static constexpr int MaxListeners = 8;
207
208 Event<MaxListeners, Result> eventError;
209 Event<MaxListeners> eventDrain;
210 Event<MaxListeners> eventFinish;
211
215 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
216
224
228
// Writes a C-string literal to the stream.
// N is the array size deduced from the argument; only the first N - 1 characters are
// forwarded, so the last array element (the null terminator of a literal) is not written.
// Returns the Result of the write(Span<const char>, ...) overload it forwards to.
231 template <size_t N>
232 Result write(const char (&str)[N])
233 {
234 return write(Span<const char>(str, N - 1));
235 }
236
239 void end();
240
243
246
249
252
254 void emitError(Result error);
255
260
262 void tryAsync(Result potentialError);
263
// Sets the writable stream state to State::Stopped (which is also its default state).
264 void stop() { state = State::Stopped; }
265
266 private:
// States of the writable stream (the state member starts as State::Stopped).
267 enum class State
268 {
269 Stopped, // Default state; also set explicitly by stop()
270 Writing, // NOTE(review): presumably writes are being processed - confirm in the implementation
271 Ending, // NOTE(review): presumably entered by end() while pending writes are flushed (see canEndWritable) - confirm
272 Ended // NOTE(review): presumably reached once the stream has finished ending - confirm
273 };
274 State state = State::Stopped;
275
276 AsyncBuffersPool* buffers = nullptr;
277
278 CircularQueue<Request> writeQueue;
279};
280
283{
285
286 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
287 Span<AsyncWritableStream::Request> writableRequests);
288};
289
292{
294
295 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
296 void afterFinalize(Span<char> outputAfter, bool streamEnded);
297
299 Function<Result(Span<char>)> onFinalize;
300
301 private:
302 Function<void(AsyncBufferView::ID)> inputCallback;
303
304 Span<const char> inputData;
305 Span<char> outputData;
306
307 AsyncBufferView::ID inputBufferID;
308 AsyncBufferView::ID outputBufferID;
309
310 Result transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
311 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
312
313 bool canEndTransform();
314 void tryFinalize();
315
// Processing states of the transform stream (the state member starts as State::None).
316 enum class State
317 {
318 None, // Default state
319 Paused, // NOTE(review): meaning is not visible in this header - confirm in the implementation
320 Processing, // NOTE(review): presumably a transform is in progress until afterProcess is called - confirm
321 Finalizing, // NOTE(review): presumably tryFinalize / onFinalize is in progress - confirm
322 Finalized, // NOTE(review): presumably reached after afterFinalize reports the stream ended - confirm
323 };
324 State state = State::None;
325};
326
335{
336 static constexpr int MaxListeners = 8;
337
// AsyncPipeline is default-constructible, but it can be neither copied nor moved:
// both copy and move construction / assignment are explicitly deleted.
338 AsyncPipeline() = default;
339 AsyncPipeline(const AsyncPipeline&) = delete;
340 AsyncPipeline(AsyncPipeline&&) = delete;
341 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
342 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
344 Event<MaxListeners, Result> eventError;
345
348
351 Span<AsyncWritableStream*> asyncSinks);
352
354 [[nodiscard]] bool unpipe();
355
359
360 // TODO: Add a pause and cancel/step
361 private:
362 AsyncReadableStream* source = nullptr;
363
365
366 Span<AsyncDuplexStream*> transforms;
367
368 void emitError(Result res);
369 Result checkBuffersPool();
370 Result chainTransforms(AsyncReadableStream*& readable);
371
372 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable);
373 void dispatchToPipes(AsyncBufferView::ID bufferID);
374 void endPipes();
375 void afterWrite(AsyncBufferView::ID bufferID);
376 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
377};
378} // namespace SC
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
Definition AsyncStreams.h:49
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:47
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:72
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result getData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Span< AsyncBufferView > buffers
Span of buffers to be filled in by the user.
Definition AsyncStreams.h:74
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
Result getData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:283
Pipes reads on SC::AsyncReadableStream to SC::AsyncWritableStream.
Definition AsyncStreams.h:335
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result pipe(AsyncReadableStream &asyncSource, Span< AsyncDuplexStream * > asyncTransforms, Span< AsyncWritableStream * > asyncSinks)
Inits the pipeline with a source, transforms and some writable sinks.
Result start()
Starts the pipeline.
Result pipe(AsyncReadableStream &asyncSource, Span< AsyncWritableStream * > asyncSinks)
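Inits the pipeline with a source and some writable sinks, without transforms. (Errors by source, transforms or sinks are reported through eventError.)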
Definition AsyncStreams.h:108
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:106
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
Event< MaxListeners > eventEnd
Emitted when there is no more data.
Definition AsyncStreams.h:118
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition AsyncStreams.h:119
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when a new buffer has been read.
Definition AsyncStreams.h:117
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the readable stream with the buffers pool and the storage for its read requests.
void destroy()
Forcefully destroys the readable stream before its end event, releasing all resources.
void push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (all data has been emitted and no more data will be emitted)
Definition AsyncStreams.h:139
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:112
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:292
Definition AsyncStreams.h:201
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:196
Event< MaxListeners > eventFinish
Emitted when no more data can be written.
Definition AsyncStreams.h:210
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when write queue is empty.
Definition AsyncStreams.h:209
void resumeWriting()
Resumes writing queued requests for this stream.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition AsyncStreams.h:198
Result write(const char(&str)[N])
Write a C-string literal in the stream.
Definition AsyncStreams.h:232
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the writable stream with the buffers pool and the storage for its write requests.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncStreams.h:259
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Result write(Span< const char > data, Function< void(AsyncBufferView::ID)> cb={})
Try requesting a buffer big enough and copy data into it.
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:12
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29