Sane C++ Libraries
C++ Platform Abstraction Libraries
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Foundation/Function.h"
6#include "../Foundation/Result.h"
7#include "../Foundation/Span.h"
8#include "../Foundation/StrongID.h"
9#include "Internal/CircularQueue.h"
10#include "Internal/Event.h"
11
17
20namespace SC
21{
22
25{
26 struct Tag
27 {
28 };
29 using ID = StrongID<Tag>;
30 Span<char> data;
31
32 private:
33 Span<char> originalData;
34 friend struct AsyncBuffersPool;
35
36 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
37};
38
42{
45
48
52
55
58
61
63 Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
64
66 void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes);
67};
68
76{
77 struct Request
78 {
79 AsyncBufferView::ID bufferID;
80 };
83
84 static constexpr int MaxListeners = 8;
85
86 Event<MaxListeners, Result> eventError;
87 Event<MaxListeners, AsyncBufferView::ID> eventData;
88 Event<MaxListeners> eventEnd;
89 Event<MaxListeners> eventClose;
90
94 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
95
98
100 void pause();
101
103 void resume();
104
106 void destroy();
107
109 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
110
113
115 void push(AsyncBufferView::ID bufferID, size_t newSize);
116
118 void pushEnd();
119
122 void reactivate(bool doReactivate);
123
125 void emitError(Result error);
126
128 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
129
130 private:
131 void emitOnData();
132 void executeRead();
133
134 enum class State
135 {
136 Stopped, // Stream must be inited
137 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
138 Reading, // A read is being issued (may be sync or async)
139 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
140 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
141 AsyncReading, // An async read is in flight
142 AsyncPushing, // AsyncReading + AsyncReadableStream::push
143 Pausing, // Pause requested while read in flight
144 Paused, // Actually paused with no read in flight
145 Ended, // Emitted all data, no more data will be emitted
146 Destroying, // Readable is waiting for async call before
147 Destroyed, // Readable has been destroyed before emitting all data
148 Errored, // Error occurred
149 };
150 State state = State::Stopped;
151
152 AsyncBuffersPool* buffers = nullptr;
153
154 CircularQueue<Request> readQueue;
155};
156
166{
169
170 struct Request
171 {
172 AsyncBufferView::ID bufferID;
173
175 };
176 static constexpr int MaxListeners = 8;
177
178 Event<MaxListeners, Result> eventError;
179 Event<MaxListeners> eventDrain;
180 Event<MaxListeners> eventFinish;
181
185 Result init(AsyncBuffersPool& buffersPool, Span<Request> requests);
186
194
198
201 template <size_t N>
202 [[nodiscard]] Result write(const char (&str)[N])
203 {
204 return write(Span<const char>(str, N - 1));
205 }
206
209 void end();
210
213
216
217 private:
218 bool tryAsync(Result potentialError);
219
220 enum class State
221 {
222 Stopped,
223 Writing,
224 Ending,
225 Ended
226 };
227 State state = State::Stopped;
228
229 AsyncBuffersPool* buffers = nullptr;
230
231 CircularQueue<Request> writeQueue;
232};
233
242{
243 static constexpr int MaxListeners = 8;
244 Event<MaxListeners, Result> eventError;
245
246 // TODO: Make all these private
248
249 struct Sink
250 {
251 AsyncWritableStream* sink = nullptr;
252 };
253 Span<Sink> sinks;
254
258
259 // TODO: Add a pause and cancel/step
260 private:
261 void onBufferRead(AsyncBufferView::ID bufferID);
262 void onBufferWritten(AsyncBufferView::ID bufferID);
263};
264} // namespace SC
int int32_t
Platform independent (4) bytes signed int.
Definition: PrimitiveTypes.h:46
Definition: AsyncStreams.h:27
A Span of bytes memory to be read or written by async streams.
Definition: AsyncStreams.h:25
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition: AsyncStreams.h:42
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result getData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Span< AsyncBufferView > buffers
Span of buffers to be filled in by the user.
Definition: AsyncStreams.h:44
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
Result getData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
User specified source.
Definition: AsyncStreams.h:250
Pipes reads on SC::AsyncReadableStream to SC::AsyncWritableStream.
Definition: AsyncStreams.h:242
AsyncReadableStream * source
Emitted when an error occurs.
Definition: AsyncStreams.h:247
Result start()
User specified sinks.
Definition: AsyncStreams.h:78
Async source abstraction emitting data events in caller provided byte buffers.
Definition: AsyncStreams.h:76
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
Event< MaxListeners > eventEnd
Emitted when a new buffer has been read.
Definition: AsyncStreams.h:88
Event< MaxListeners > eventClose
Emitted when there is no more data.
Definition: AsyncStreams.h:89
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when an error occurs.
Definition: AsyncStreams.h:87
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
void resume()
Resumes the readable stream paused by AsyncReadableStream::pause.
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Emitted when the underlying resource has been closed.
void destroy()
Forcefully destroys the readable stream before it's end event releasing all resources.
void push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition: AsyncStreams.h:109
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
Result start()
Starts the readable stream, that will emit eventData.
Function< Result()> asyncRead
Function that every stream must define to implement its custom read operation.
Definition: AsyncStreams.h:82
Definition: AsyncStreams.h:171
Async destination abstraction where bytes can be written to.
Definition: AsyncStreams.h:166
Event< MaxListeners > eventFinish
Emitted when write queue is empty.
Definition: AsyncStreams.h:180
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
Event< MaxListeners > eventDrain
Emitted when an error occurs.
Definition: AsyncStreams.h:179
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
Function< Result(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)>)> asyncWrite
Function that every stream must define to implement its custom write operation.
Definition: AsyncStreams.h:168
Result write(const char(&str)[N])
Write a C-string literal in the stream.
Definition: AsyncStreams.h:202
Result init(AsyncBuffersPool &buffersPool, Span< Request > requests)
Emitted when no more data can be written.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Result write(Span< const char > data, Function< void(AsyncBufferView::ID)> cb={})
Try requesting a buffer big enough and copy data into it.
Wraps function pointers, member functions and lambdas without ever allocating.
Definition: Function.h:50
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition: Result.h:11
View over a contiguous sequence of items (pointer + size in elements).
Definition: Span.h:20