Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/Function.h"
6#include "../Foundation/Result.h"
7#include "../Foundation/Span.h"
8#include "Internal/CircularQueue.h"
9#include "Internal/Event.h"
10
40
43namespace SC
44{
47{
// Strongly typed identifier of a buffer, wrapping a signed 32-bit integer.
48 struct ID
49 {
50 using NumericType = int32_t;
51 NumericType identifier; // Raw numeric value of this ID
52 static constexpr NumericType InvalidValue = -1; // Sentinel stored by a default-constructed ID
53
54 constexpr ID() : identifier(InvalidValue) {} // Default-constructed IDs are invalid
55 explicit constexpr ID(int32_t value) : identifier(value) {} // Stores value as-is, without validation
56
57 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; } // Equal when the raw identifiers match
58 [[nodiscard]] constexpr bool isValid() const { return identifier != InvalidValue; } // True unless identifier is InvalidValue
59 };
60
61 Span<char> data;
62
63 private:
64 Span<char> originalData;
65 friend struct AsyncBuffersPool;
66
67 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
68};
69
99
107{
// Element type of the read queue (readQueue is a CircularQueue<Request>).
108 struct Request
109 {
110 AsyncBufferView::ID bufferID; // ID of the buffer associated with this request
111 };
114
115 static constexpr int MaxListeners = 8;
116
117 Event<MaxListeners, Result> eventError;
118 Event<MaxListeners, AsyncBufferView::ID> eventData;
119 Event<MaxListeners> eventEnd;
120 Event<MaxListeners> eventClose;
121
125 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
126
129
131 void pause();
132
135
137 void destroy();
138
// Returns true only when the internal state is State::Ended.
140 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
141
144
146 void push(AsyncBufferView::ID bufferID, size_t newSize);
147
149 void pushEnd();
150
153 void reactivate(bool doReactivate);
154
156 void emitError(Result error);
157
159 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
160
161 private:
162 void emitOnData();
163 void executeRead();
164
// States of the readable stream state machine (the `state` member starts as Stopped).
165 enum class State
166 {
167 Stopped, // Stream must be inited
168 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
169 Reading, // A read is being issued (may be sync or async)
170 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
171 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
172 AsyncReading, // An async read is in flight
173 AsyncPushing, // AsyncReading + AsyncReadableStream::push
174 Pausing, // Pause requested while read in flight
175 Paused, // Actually paused with no read in flight
176 Ended, // Emitted all data, no more data will be emitted
177 Destroying, // Readable is waiting for async call before being destroyed (presumably followed by Destroyed -- confirm)
178 Destroyed, // Readable has been destroyed before emitting all data
179 Errored, // Error occurred
180 };
181 State state = State::Stopped;
182
183 AsyncBuffersPool* buffers = nullptr;
184
185 CircularQueue<Request> readQueue;
186};
187
197{
200
// Element type of the write queue (writeQueue is a CircularQueue<Request>).
// NOTE(review): this listing skips source line 205, so a member may be missing from this view.
201 struct Request
202 {
203 AsyncBufferView::ID bufferID; // ID of the buffer associated with this request
204
206 };
207 static constexpr int MaxListeners = 8;
208
209 Event<MaxListeners, Result> eventError;
210 Event<MaxListeners> eventDrain;
211 Event<MaxListeners> eventFinish;
212
216 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
217
225
229
// Writes a C-string literal to the stream by forwarding to the Span<const char> overload of write.
// The span length is N - 1, so the literal's trailing null terminator is not part of the written data.
232 template <size_t N>
233 [[nodiscard]] Result write(const char (&str)[N])
234 {
235 return write(Span<const char>(str, N - 1));
236 }
237
240 void end();
241
244
247
250
253
255 void emitError(Result error);
256
261
263 void tryAsync(Result potentialError);
264
// Sets the internal state back to State::Stopped; nothing else is touched here.
265 void stop() { state = State::Stopped; }
266
267 private:
// States of the writable stream (the `state` member starts as Stopped).
268 enum class State
269 {
270 Stopped, // Initial value; also the value assigned by stop()
271 Writing, // NOTE(review): presumably writes are queued or in flight -- confirm in the implementation
272 Ending, // NOTE(review): presumably end() was called and pending writes are still being flushed -- confirm
273 Ended // NOTE(review): presumably the terminal state once ending has completed -- confirm
274 };
275 State state = State::Stopped;
276
277 AsyncBuffersPool* buffers = nullptr;
278
279 CircularQueue<Request> writeQueue;
280};
281
284{
286
287 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
288 Span<AsyncWritableStream::Request> writableRequests);
289};
290
293{
295
296 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
297 void afterFinalize(Span<char> outputAfter, bool streamEnded);
298
300 Function<Result(Span<char>)> onFinalize;
301
302 private:
303 Function<void(AsyncBufferView::ID)> inputCallback;
304
305 Span<const char> inputData;
306 Span<char> outputData;
307
308 AsyncBufferView::ID inputBufferID;
309 AsyncBufferView::ID outputBufferID;
310
311 Result transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
312 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
313
314 bool canEndTransform();
315 void tryFinalize();
316
// States of the transform stream (the `state` member starts as None).
317 enum class State
318 {
319 None, // Initial value
320 Paused, // NOTE(review): meaning is not visible in this header -- confirm in the implementation
321 Processing, // NOTE(review): presumably a transform of input data is in progress -- confirm
322 Finalizing, // NOTE(review): presumably finalization (onFinalize / tryFinalize) is in progress -- confirm
323 Finalized, // NOTE(review): presumably finalization has completed -- confirm
324 };
325 State state = State::None;
326};
327
336{
337 static constexpr int MaxListeners = 8;
338
339 AsyncPipeline() = default;
340 AsyncPipeline(const AsyncPipeline&) = delete;
341 AsyncPipeline(AsyncPipeline&&) = delete;
342 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
343 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
345 Event<MaxListeners, Result> eventError;
346
349
352 Span<AsyncWritableStream*> asyncSinks);
353
355 [[nodiscard]] bool unpipe();
356
360
361 // TODO: Add a pause and cancel/step
362 private:
363 AsyncReadableStream* source = nullptr;
364
366
367 Span<AsyncDuplexStream*> transforms;
368
369 void emitError(Result res);
370 Result checkBuffersPool();
371 Result chainTransforms(AsyncReadableStream*& readable);
372
373 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable);
374 void dispatchToPipes(AsyncBufferView::ID bufferID);
375 void endPipes();
376 void afterWrite(AsyncBufferView::ID bufferID);
377 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
378};
379} // namespace SC
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
Definition AsyncStreams.h:49
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:47
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:73
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result getData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Span< AsyncBufferView > buffers
Span of buffers to be filled in by the user.
Definition AsyncStreams.h:75
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
Result getData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:284
Pipes reads on SC::AsyncReadableStream to SC::AsyncWritableStream.
Definition AsyncStreams.h:336
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result pipe(AsyncReadableStream &asyncSource, Span< AsyncDuplexStream * > asyncTransforms, Span< AsyncWritableStream * > asyncSinks)
Inits the pipeline with a source, transforms and some writable sinks.
Result start()
Starts the pipeline.
Result pipe(AsyncReadableStream &asyncSource, Span< AsyncWritableStream * > asyncSinks)
Inits the pipeline with a source and some writable sinks. (Errors by source, transforms or sinks are reported through eventError.)
Definition AsyncStreams.h:109
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:107
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
Event< MaxListeners > eventEnd
Emitted when there is no more data.
Definition AsyncStreams.h:119
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition AsyncStreams.h:120
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when a new buffer has been read.
Definition AsyncStreams.h:118
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the readable stream with the given AsyncBuffersPool and the span of Request storage passed in.
void destroy()
Forcefully destroys the readable stream before its end event, releasing all resources.
void push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:140
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:113
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:293
Definition AsyncStreams.h:202
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:197
Event< MaxListeners > eventFinish
Emitted when no more data can be written.
Definition AsyncStreams.h:211
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventDrain
Emitted when write queue is empty.
Definition AsyncStreams.h:210
void resumeWriting()
Resumes writing queued requests for this stream.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition AsyncStreams.h:199
Result write(const char(&str)[N])
Write a C-string literal in the stream.
Definition AsyncStreams.h:233
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Initializes the writable stream with the given AsyncBuffersPool and the span of Request storage passed in.
Function< bool()> canEndWritable
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncStreams.h:260
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Result write(Span< const char > data, Function< void(AsyncBufferView::ID)> cb={})
Try requesting a buffer big enough and copy data into it.
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:12
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29