Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Common/CompilerMacrosExport.h"
6#include "../Common/PlatformMacrosInstructionSet.h"
7#ifndef SC_EXPORT_LIBRARY_ASYNC_STREAMS
8#define SC_EXPORT_LIBRARY_ASYNC_STREAMS 0
9#endif
10#define SC_ASYNC_STREAMS_EXPORT SC_COMPILER_LIBRARY_EXPORT(SC_EXPORT_LIBRARY_ASYNC_STREAMS)
11
12#include "../Common/AlignedStorage.h"
13#include "../Common/Assert.h"
14#include "../Common/Function.h"
15#include "../Common/IGrowableBufferSpan.h"
16#include "../Common/PlatformMacrosInstructionSet.h"
17#include "../Common/Result.h"
18#include "../Common/Span.h"
19#include "Internal/CircularQueue.h"
20#include "Internal/Event.h"
21
51
54namespace SC
55{
// Declares the AsyncStreamsAssert assert provider, exported through SC_ASYNC_STREAMS_EXPORT.
56SC_DECLARE_ASSERT_PROVIDER(AsyncStreamsAssert, SC_ASYNC_STREAMS_EXPORT);
57
// Library-local assert macros: each one forwards `e` to the matching SC_ASSERT_PROVIDER_* macro,
// passing SC::AsyncStreamsAssert as the provider.
58#define SC_ASYNC_STREAMS_ASSERT_RELEASE(e) SC_ASSERT_PROVIDER_RELEASE(SC::AsyncStreamsAssert, e)
59#define SC_ASYNC_STREAMS_ASSERT_DEBUG(e) SC_ASSERT_PROVIDER_DEBUG(SC::AsyncStreamsAssert, e)
// TRUST_RESULT expands to the release assert applied to the given expression.
60#define SC_ASYNC_STREAMS_TRUST_RESULT(expression) SC_ASYNC_STREAMS_ASSERT_RELEASE(expression)
61
// A Span of bytes memory to be read or written by async streams.
// The constructor that is used decides the Type: default -> Empty, Span<char> -> Writable,
// Span<const char> or string literal -> ReadOnly, any other forwarded object -> Growable.
// No constructor visible in this block selects Type::Child.
63struct SC_ASYNC_STREAMS_EXPORT AsyncBufferView
64{
// Numeric identifier of a buffer; InvalidValue (-1) marks "no buffer".
65 struct SC_ASYNC_STREAMS_EXPORT ID
66 {
67 using NumericType = int32_t;
68
69 static constexpr NumericType InvalidValue = -1;
70
71 NumericType identifier;
72
// Default construction yields an invalid ID; the explicit overload wraps a raw value.
73 constexpr ID() : identifier(InvalidValue) {}
74 explicit constexpr ID(int32_t value) : identifier(value) {}
75
// isValid() is true for every identifier other than InvalidValue; operator== compares identifiers.
76 [[nodiscard]] constexpr bool isValid() const { return identifier != InvalidValue; }
77 [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
78 };
// Kind of memory referenced by the view (stored in the `type` member below).
79 enum class Type : uint8_t
80 {
81 Empty,
82 Writable,
83 ReadOnly,
84 Growable,
85 Child,
86 };
87
// Creates an Empty view: empty span, zero offset / length / refs, not reusable.
88 AsyncBufferView() : writableData(), offset(0), length(0), refs(0), type(Type::Empty), reUse(false) {}
// Creates a Writable view over `data`; length is data.sizeInBytes() and there is no parent.
89 AsyncBufferView(Span<char> data) : writableData(data)
90 {
91 type = Type::Writable;
92 offset = 0;
93 length = data.sizeInBytes();
94 parentID = ID();
95 }
// Creates a ReadOnly view over `data`; length is data.sizeInBytes() and there is no parent.
96 AsyncBufferView(Span<const char> data) : readonlyData(data)
97 {
98 type = Type::ReadOnly;
99 offset = 0;
100 length = data.sizeInBytes();
101 parentID = ID();
102 }
103
// Tags this AsyncBufferView as reusable after its refCount goes to zero.
105 void setReusable(bool reusable) { reUse = reusable; }
106
// Saves a copy (or a moved instance) of a String / Buffer (or anything that works with
// GrowableBuffer<T>) inside the view, marking it as Growable with zero length.
// NOTE(review): this forwarding constructor is unconstrained, so it is also viable (as an exact
// match) when the argument is a non-const lvalue AsyncBufferView - confirm that is never intended.
110 template <typename T>
111 AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue
112 {
113 type = Type::Growable;
114 offset = 0;
115 length = 0;
116 parentID = ID();
117 // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<>
118 // that it will be able to construct (and destruct) the right GrowableBuffer<T> from just a piece of storage
119 // and return a pointer to the corresponding IGrowableBuffer* interface
// The lambda owns `t`. With construct == true it placement-news a GrowableBuffer<Type> in `storage`
// from the captured object and returns its address; with construct == false it runs the destructor
// of the GrowableBuffer<Type> living in `storage` and returns nullptr.
120 getGrowableBuffer = [t = forward<T>(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer*
121 {
122 using Type = typename TypeTraits::RemoveReference<T>::type;
123 if (construct)
124 {
125 placementNew(storage.reinterpret_as<GrowableBuffer<Type>>(), t);
126 return &storage.reinterpret_as<GrowableBuffer<Type>>();
127 }
128 else
129 {
130 dtor(storage.reinterpret_as<GrowableBuffer<Type>>());
131 return nullptr;
132 }
133 };
134 }
135
// Creates a ReadOnly view over a character array; the length is N - 1, i.e. the last element
// (the null terminator of a string literal) is excluded. parentID keeps its default (invalid) value.
136 template <int N>
137 AsyncBufferView(const char (&literal)[N])
138 {
139 readonlyData = {literal, N - 1};
140 type = Type::ReadOnly;
141 offset = 0;
142 length = N - 1;
143 }
144
// Returns the Type tag assigned by the constructor.
145 Type getType() const { return type; }
146
147 private:
// Capture size (in bytes) reserved inside the Function<> below for the type-erased object.
148#if SC_PLATFORM_64_BIT
149 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy
150#else
151 static constexpr int TypeErasedCaptureSize = sizeof(void*) * 6; // This is enough to hold String / Buffer by copy
152#endif
// Size (in bytes) of the storage handed to getGrowableBuffer to build the GrowableBuffer<T>.
153 static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6;
154
155 using GrowableStorage = AlignedStorage<TypeErasedGrowableSize>;
// Set only by the Growable constructor; constructs / destructs the GrowableBuffer<T> on demand.
156 Function<IGrowableBuffer*(GrowableStorage&, bool), TypeErasedCaptureSize> getGrowableBuffer;
157
// Memory referenced by the view: writableData for Writable views, readonlyData for ReadOnly ones.
158 union
159 {
160 Span<char> writableData;
161 Span<const char> readonlyData;
162 };
// Default-constructed (invalid) in every constructor of this block.
// NOTE(review): presumably set for Type::Child views by AsyncBuffersPool - confirm in the implementation.
163 AsyncBufferView::ID parentID;
164
// AsyncBuffersPool is granted access to the private members of the view.
165 friend struct AsyncBuffersPool;
166
167 size_t offset = 0;
168 size_t length = 0;
169 int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it
170 Type type = Type::Empty; // If it's Empty, Writable, ReadOnly, Growable or Child
171 bool reUse = false; // If it can be re-used after refs == 0
172};
173
// Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
// NOTE(review): original lines 180, 184 and 193 carry no text in this listing; the member
// documentation of this page also describes refBuffer, unrefBuffer and getBuffer, whose
// declarations are not visible here.
176struct SC_ASYNC_STREAMS_EXPORT AsyncBuffersPool
177{
180
184
// Access data span owned by the buffer (read-only flavour).
186 Result getReadableData(AsyncBufferView::ID bufferID, Span<const char>& data);
187
// Access data span owned by the buffer (writable flavour).
189 Result getWritableData(AsyncBufferView::ID bufferID, Span<char>& data);
190
193
// Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
195 Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
196
// Sets the new size in bytes for the buffer.
198 void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes);
199
// Adds a buffer to the pool in any empty slot (found by scanning from start to end).
201 Result pushBuffer(AsyncBufferView&& buffer, AsyncBufferView::ID& bufferID);
202
// Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusable.
204 static Result sliceInEqualParts(Span<AsyncBufferView> buffers, Span<char> memory, size_t numSlices);
205
// Sets memory for the new buffers (the pool only stores the span, it does not copy the views).
207 void setBuffers(Span<AsyncBufferView> newBuffers) { buffers = newBuffers; }
208
// Gets size of buffers held by the pool, as a number of AsyncBufferView elements.
210 [[nodiscard]] size_t getNumBuffers() const { return buffers.sizeInElements(); }
211
// Creates a child view that references a slice of the parent buffer.
213 Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length,
214 AsyncBufferView::ID& outChildBufferID);
215
216 private:
// User provided views assigned through setBuffers.
218 Span<AsyncBufferView> buffers;
219};
220
// Async source abstraction emitting data events in caller provided byte buffers.
// NOTE(review): original lines 257, 276 and 324 carry no text in this listing; the member
// documentation of this page also describes resumeReading, getBuffersPool and
// finishedDestroyingReadable, whose declarations are not visible here.
227struct SC_ASYNC_STREAMS_EXPORT AsyncReadableStream
228{
231
// Element of the read queue: identifies a buffer that has been pushed to the stream.
232 struct Request
233 {
234 AsyncBufferView::ID bufferID;
235 };
236
// Listener capacity passed to each Event<> below.
237 static constexpr int MaxListeners = 8;
238
// NOTE(review): the four descriptions below are matched to the events by name - confirm.
// eventError: emitted when an error occurs.
// eventData:  emitted when a new buffer has been read.
// eventEnd:   emitted when there is no more data.
// eventClose: emitted when the underlying resource has been closed.
239 Event<MaxListeners, Result> eventError;
240 Event<MaxListeners, AsyncBufferView::ID> eventData;
241 Event<MaxListeners> eventEnd;
242 Event<MaxListeners> eventClose;
243
// Initializes the stream with a buffers pool.
// NOTE(review): presumably the pool is kept in `buffers` for later requests - definition not shown here.
247 Result init(AsyncBuffersPool& buffersPool);
248
// Starts the readable stream, that will emit eventData.
250 Result start();
251
// Pauses the readable stream (that can be later resumed).
253 void pause();
254
257
// Forcefully destroys the readable stream before calling end event releasing all resources.
260 void destroy();
261
// Returns true if the stream is ended (state is State::Ended).
263 [[nodiscard]] bool isEnded() const { return state == State::Ended; }
264
// Returns true if there are buffered reads waiting to be emitted (read queue not empty).
266 [[nodiscard]] bool hasQueuedData() const { return not readQueue.isEmpty(); }
267
// Returns true if the stream has been already destroyed (asynchronously through destroy()).
269 [[nodiscard]] bool hasBeenDestroyed() const { return destroyed; }
270
// Returns true when start() is currently valid for this stream (state is State::CanRead).
272 [[nodiscard]] bool canStart() const { return state == State::CanRead; }
273
276
// Sets the read queue for this readable stream.
278 constexpr void setReadQueue(Span<Request> requests) { readQueue = requests; }
279
// Returns the size of read queue.
281 [[nodiscard]] size_t getReadQueueSize() const { return readQueue.size(); }
282
// Use push from inside AsyncReadableStream::asyncRead function to queue received data.
285 [[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize);
286
// Push back a buffer to the front of the read queue (e.g. for un-consumed data).
288 Result unshift(AsyncBufferView::ID bufferID);
289
// Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
291 void pushEnd();
292
// Use reactivate(true) from inside AsyncReadableStream::asyncRead to ask the state machine for
// more work (the extracted description is truncated; see State::SyncReadMore below).
295 void reactivate(bool doReactivate);
296
// Signals an async error received.
298 void emitError(Result error);
299
// Returns an unused buffer from pool or pauses the stream if none is available.
301 [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span<char>& data);
302
// If set to true will automatically call .destroy() when Ended state is reached.
304 void setAutoDestroy(bool value) { autoDestroy = value; }
305
// Returns true if stream will automatically call .destroy() when Ended state is reached.
307 [[nodiscard]] bool getAutoDestroy() const { return autoDestroy; }
308
309 protected:
// Protected destructor: objects cannot be deleted through an AsyncReadableStream pointer
// from outside the class hierarchy.
310 virtual ~AsyncReadableStream();
311
// Function that every stream must define to implement its custom read operation.
313 virtual Result asyncRead() = 0;
314
// Function that streams may define to resume an already pending async read.
316 virtual Result asyncResumeReading();
317
// Function that a readable stream can re-implement to release its internal resources.
320 virtual Result asyncDestroyReadable();
321
324
325 private:
326 void maybeDestroyEndedReadable();
327 void emitOnData();
328 void executeRead();
329
// States of the readable stream state machine (each value is described inline).
330 enum class State : uint8_t
331 {
332 Stopped, // Stream must be inited
333 CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume)
334 Reading, // A read is being issued (may be sync or async)
335 SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync)
336 SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true)
337 AsyncReading, // An async read is in flight
338 AsyncPushing, // AsyncReading + AsyncReadableStream::push
339 Pausing, // Pause requested while read in flight
340 Paused, // Actually paused with no read in flight
341 Ended, // Emitted all data, no more data will be emitted
342 Destroying, // Destroy has been requested and it's in progress
343 Errored, // Error occurred
344 };
345 State state = State::Stopped;
346
// Reported by hasBeenDestroyed() and getAutoDestroy() respectively; auto destroy is on by default.
347 bool destroyed = false;
348 bool autoDestroy = true;
349
// Null until a pool is provided (NOTE(review): presumably assigned by init - confirm).
350 AsyncBuffersPool* buffers = nullptr;
351
// Queue of pushed buffers; its storage is supplied by the user through setReadQueue.
352 CircularQueue<Request> readQueue;
353};
354
// Async destination abstraction where bytes can be written to.
// NOTE(review): original lines 417, 423 and 462 carry no text in this listing; the member
// documentation of this page also describes resumeWriting, getBuffersPool and
// finishedDestroyingWritable, whose declarations are not visible here.
363struct SC_ASYNC_STREAMS_EXPORT AsyncWritableStream
364{
367
// AsyncPipeline is granted access to the protected / private members of the stream.
368 friend struct AsyncPipeline;
369
// Element of the write queue: the buffer to write and the callback associated with it.
370 struct Request
371 {
372 AsyncBufferView::ID bufferID;
373
374 Function<void(AsyncBufferView::ID)> cb;
375 };
// Listener capacity passed to each Event<> below.
376 static constexpr int MaxListeners = 8;
377
// NOTE(review): the four descriptions below are matched to the events by name - confirm.
// eventError:  emitted when an error occurs.
// eventDrain:  emitted when write queue is empty.
// eventFinish: emitted when no more data can be written.
// eventClose:  emitted when the underlying resource has been closed.
378 Event<MaxListeners, Result> eventError;
379
380 Event<MaxListeners> eventDrain;
381 Event<MaxListeners> eventFinish;
382 Event<MaxListeners> eventClose;
383
// Initializes the stream with a buffers pool.
// NOTE(review): presumably the pool is kept in `buffers` for later use - definition not shown here.
387 Result init(AsyncBuffersPool& buffersPool);
388
// Sets the write queue for this writable stream.
390 constexpr void setWriteQueue(Span<Request> requests) { writeQueue = requests; }
391
// Returns the size of write queue.
393 [[nodiscard]] size_t getWriteQueueSize() const { return writeQueue.size(); }
394
// Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream).
// `cb` defaults to an empty Function (the rest of the extracted description is truncated).
401 Result write(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb = {});
402
// Push a new buffer view to the queue, registering it with the allocator.
405 Result write(AsyncBufferView&& bufferView, Function<void(AsyncBufferView::ID)> cb = {});
406
// Ends the writable stream, waiting for all in-flight and queued writes to finish.
409 void end();
410
// Forcefully destroys the writable stream before calling end event releasing all resources.
413 void destroy();
414
417
// Signals that the given buffer (previously queued by write) has been fully written.
419 void finishedWriting(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb, Result res);
420
423
// Puts back a buffer at the top of the write queue.
425 Result unshift(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)>&& cb);
426
// Signals an async error received.
428 void emitError(Result error);
429
// Will emit error if the passed in Result is false.
431 void tryAsync(Result potentialError);
432
// Returns true if this stream is writing something (state is Writing or Ending).
434 [[nodiscard]] bool isStillWriting() const { return state == State::Writing or state == State::Ending; }
435
// Returns true if the stream has been already destroyed (asynchronously through destroy()).
437 [[nodiscard]] bool hasBeenDestroyed() const { return destroyed; }
438
// If set to true will automatically call .destroy() when Ended state is reached.
440 void setAutoDestroy(bool value) { autoDestroy = value; }
441
// Returns true if stream will automatically call .destroy() when Ended state is reached.
443 [[nodiscard]] bool getAutoDestroy() const { return autoDestroy; }
444
445 protected:
// Protected destructor: objects cannot be deleted through an AsyncWritableStream pointer
// from outside the class hierarchy (friends excepted).
446 virtual ~AsyncWritableStream();
447
// Function that every stream must define to implement its custom write operation.
449 virtual Result asyncWrite(AsyncBufferView::ID, Function<void(AsyncBufferView::ID)> func) = 0;
450
// Allows keeping a writable in ENDING state until it has finished flushing all pending data.
454 virtual bool canEndWritable();
455
// Function that a writable stream can re-implement to release its internal resources.
458 virtual Result asyncDestroyWritable();
459
462
// Puts the state machine back to State::Stopped.
463 void stop() { state = State::Stopped; }
464
465 private:
// A write can be accepted only while Stopped or Writing, and only if the write queue has room.
466 [[nodiscard]] bool canAcceptWrite() const
467 {
468 return (state == State::Stopped or state == State::Writing) and not writeQueue.isFull();
469 }
470
// States of the writable stream state machine.
471 enum class State : uint8_t
472 {
473 Stopped,
474 Writing,
475 Ending,
476 Ended,
477 Destroying,
478 Errored,
479 };
480 State state = State::Stopped;
481
// Reported by hasBeenDestroyed() and getAutoDestroy() respectively; auto destroy is on by default.
482 bool destroyed = false;
483 bool autoDestroy = true;
484
// Null until a pool is provided (NOTE(review): presumably assigned by init - confirm).
485 AsyncBuffersPool* buffers = nullptr;
486
// Queue of pending writes; its storage is supplied by the user through setWriteQueue.
487 CircularQueue<Request> writeQueue;
488};
489
// A stream that can both produce and consume buffers.
// It inherits publicly from both AsyncReadableStream and AsyncWritableStream.
491struct SC_ASYNC_STREAMS_EXPORT AsyncDuplexStream : public AsyncReadableStream, public AsyncWritableStream
492{
494
// Initializes the duplex stream from a buffers pool plus the request spans of both sides.
// NOTE(review): presumably forwards to both base init() calls and sets the read / write queues -
// the definition is not shown here.
495 Result init(AsyncBuffersPool& buffersPool, Span<AsyncReadableStream::Request> readableRequests,
496 Span<AsyncWritableStream::Request> writableRequests);
497
// Overrides the pure virtual AsyncReadableStream::asyncRead (the custom read operation of a stream).
498 virtual Result asyncRead() override;
499};
500
// A duplex stream that produces new buffers transforming received buffers.
// Subclasses provide onProcess / onFinalize; asyncWrite and canEndWritable are overridden privately.
502struct SC_ASYNC_STREAMS_EXPORT AsyncTransformStream : public AsyncDuplexStream
503{
505
// NOTE(review): presumably called by implementations once onProcess / onFinalize have completed,
// reporting what remains of the input / output spans - confirm against the implementation.
506 void afterProcess(Span<const char> inputAfter, Span<char> outputAfter);
507 void afterFinalize(Span<char> outputAfter, bool streamEnded);
508
// Pure virtual hooks that every transform must implement: onProcess receives an input span and
// an output span, onFinalize only an output span.
509 virtual Result onProcess(Span<const char>, Span<char>) = 0;
510 virtual Result onFinalize(Span<char>) = 0;
511
512 private:
// Overrides of AsyncWritableStream::asyncWrite and AsyncWritableStream::canEndWritable.
513 virtual Result asyncWrite(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb) override;
514 virtual bool canEndWritable() override;
515
// NOTE(review): presumably the write callback, data spans and buffer IDs of the transformation
// currently in progress - confirm against the implementation.
516 Function<void(AsyncBufferView::ID)> inputCallback;
517
518 Span<const char> inputData;
519 Span<char> outputData;
520
521 AsyncBufferView::ID inputBufferID;
522 AsyncBufferView::ID outputBufferID;
523
524 Result prepare(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
525
526 void tryFinalize();
527
// State of the transform, distinct from the State enums of the readable / writable bases.
528 enum class State
529 {
530 None,
531 Paused,
532 Processing,
533 Finalizing,
534 Finalized,
535 };
536 State state = State::None;
537};
538
// Traits exposing AsyncStreams public types through a single template parameter.
// NOTE(review): the body (original lines 543-547) is not visible in this listing.
541struct SC_ASYNC_STREAMS_EXPORT AsyncStreams
542{
548};
549
// Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
// The user fills `source`, `transforms` and `sinks`, then calls pipe() and start().
557struct SC_ASYNC_STREAMS_EXPORT AsyncPipeline
558{
// Capacities of the event listeners and of the fixed-size transforms / sinks arrays.
559 static constexpr int MaxListeners = 8;
560 static constexpr int MaxTransforms = 8;
561 static constexpr int MaxSinks = 8;
562
// Default constructible only: copy and move (construction and assignment) are deleted.
563 AsyncPipeline() = default;
564 AsyncPipeline(const AsyncPipeline&) = delete;
565 AsyncPipeline(AsyncPipeline&&) = delete;
566 AsyncPipeline& operator=(const AsyncPipeline&) = delete;
567 AsyncPipeline& operator=(AsyncPipeline&&) = delete;
569
// Streams composing the pipeline; every slot is null by default.
570 AsyncReadableStream* source = nullptr;
571 AsyncDuplexStream* transforms[MaxTransforms] = {nullptr};
572 AsyncWritableStream* sinks[MaxSinks] = {nullptr};
// Reports errors by source, transforms or sinks.
573 Event<MaxListeners, Result> eventError = {};
574
// NOTE(review): presumably validates the configuration and registers the events needed to
// forward data between source, transforms and sinks - the definition is not shown here.
577 Result pipe();
578
// Unregisters all events from source, transforms and sinks.
580 [[nodiscard]] bool unpipe();
581
// Starts the pipeline.
584 Result start();
585
586 // Internal state used by the pipeline implementation.
587 AsyncReadableStream* dispatchReadable = nullptr;
588 AsyncReadableStream* transformInputs[MaxTransforms] = {nullptr};
589
// NOTE(review): the line introducing this aggregate (original line 590) is missing from this
// listing; its members are used below under the name PendingWrite.
591 {
592 AsyncReadableStream* readable = nullptr;
593 AsyncWritableStream* writable = nullptr;
594 AsyncBufferView::ID bufferID;
595 };
// One slot for every possible transform and sink.
596 PendingWrite pendingWrites[MaxTransforms + MaxSinks] = {};
597 bool shouldEndWhenDrained = false;
598 bool endingPipes = false;
599
600 // TODO: Add a pause and cancel/step
601 private:
// Private helpers of the pipeline implementation (definitions are not shown here).
602 void emitError(Result res);
603 Result checkBuffersPool();
604 Result chainTransforms(AsyncReadableStream*& readable);
605 Result validate();
606
607 void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncReadableStream& readable, AsyncWritableStream& writable);
608 void dispatchToPipes(AsyncBufferView::ID bufferID);
609 void endPipes();
610 void afterSinkEnd();
611 void afterWrite(AsyncBufferView::ID bufferID);
612 void dispatchToTransform(AsyncBufferView::ID bufferID, size_t transformIndex);
613 bool retryPendingWrites();
614 bool hasPendingWrites() const;
615 bool hasPendingWritesForReadable(const AsyncReadableStream& readable) const;
616 void releasePendingWrites();
617 bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen);
618
619 PendingWrite* findPendingWrite(AsyncReadableStream& readable, AsyncWritableStream& writable);
620};
621} // namespace SC
Definition AsyncStreams.h:66
A Span of bytes memory to be read or written by async streams.
Definition AsyncStreams.h:64
AsyncBufferView(T &&t)
Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer<T...
Definition AsyncStreams.h:111
void setReusable(bool reusable)
Tags this AsyncBufferView as reusable after its refCount goes to zero.
Definition AsyncStreams.h:105
Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams.
Definition AsyncStreams.h:177
Result getReadableData(AsyncBufferView::ID bufferID, Span< const char > &data)
Access data span owned by the buffer.
void setBuffers(Span< AsyncBufferView > newBuffers)
Sets memory for the new buffers.
Definition AsyncStreams.h:207
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
AsyncBufferView * getBuffer(AsyncBufferView::ID bufferID)
Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid)
Result createChildView(AsyncBufferView::ID parentBufferID, size_t offset, size_t length, AsyncBufferView::ID &outChildBufferID)
Creates a child view that references a slice of the parent buffer.
void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes)
Sets the new size in bytes for the buffer.
Result getWritableData(AsyncBufferView::ID bufferID, Span< char > &data)
Access data span owned by the buffer.
static Result sliceInEqualParts(Span< AsyncBufferView > buffers, Span< char > memory, size_t numSlices)
Splits a span of memory in equally sized slices, assigning them to buffers and marking them as reusab...
Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount.
Result pushBuffer(AsyncBufferView &&buffer, AsyncBufferView::ID &bufferID)
Adds a buffer to the pool in any empty slot (found by scanning from start to end)
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
size_t getNumBuffers() const
Gets size of buffers held by the pool.
Definition AsyncStreams.h:210
A stream that can both produce and consume buffers.
Definition AsyncStreams.h:492
virtual Result asyncRead() override
Function that every stream must define to implement its custom read operation.
Definition AsyncStreams.h:591
Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream.
Definition AsyncStreams.h:558
Result pipe()
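Registers events on source, transforms and sinks to pipe data between them; errors by source, transforms or sinks are reported through eventError.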
bool unpipe()
Unregisters all events from source, transforms and sinks.
Result start()
Starts the pipeline.
Definition AsyncStreams.h:233
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:228
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:307
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
void resumeReading()
Resumes the readable stream paused by AsyncReadableStream::pause.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
Event< MaxListeners > eventEnd
Emitted when there is no more data.
Definition AsyncStreams.h:241
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:269
Result finishedDestroyingReadable()
Called from inside asyncDestroy to transition from Destroying to Destroyed state (emitting eventClose...
virtual Result asyncResumeReading()
Function that streams may define to resume an already pending async read.
bool hasQueuedData() const
Returns true if there are buffered reads waiting to be emitted.
Definition AsyncStreams.h:266
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition AsyncStreams.h:242
Event< MaxListeners, AsyncBufferView::ID > eventData
Emitted when a new buffer has been read.
Definition AsyncStreams.h:240
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
size_t getReadQueueSize() const
Returns the size of read queue.
Definition AsyncStreams.h:281
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:304
virtual Result asyncDestroyReadable()
Function that a readable stream can re-implement to release its internal resources.
void destroy()
Forcefully destroys the readable stream before calling end event releasing all resources.
virtual Result asyncRead()=0
Function that every stream must define to implement its custom read operation.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
bool isEnded() const
Returns true if the stream is ended (AsyncReadableStream::end has been called)
Definition AsyncStreams.h:263
constexpr void setReadQueue(Span< Request > requests)
Sets the read queue for this readable stream.
Definition AsyncStreams.h:278
Result init(AsyncBuffersPool &buffersPool)
Initializes the readable stream with the AsyncBuffersPool used to request its buffers.
void reactivate(bool doReactivate)
Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the state machine to ...
void pause()
Pauses the readable stream (that can be later resumed)
bool canStart() const
Returns true when start() is currently valid for this stream.
Definition AsyncStreams.h:272
Result unshift(AsyncBufferView::ID bufferID)
Push back a buffer to the front of the read queue (e.g. for un-consumed data)
Result start()
Starts the readable stream, that will emit eventData.
Traits exposing AsyncStreams public types through a single template parameter.
Definition AsyncStreams.h:542
A duplex stream that produces new buffers transforming received buffers.
Definition AsyncStreams.h:503
Definition AsyncStreams.h:371
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:364
size_t getWriteQueueSize() const
Returns the size of write queue.
Definition AsyncStreams.h:393
Event< MaxListeners > eventFinish
Emitted when no more data can be written.
Definition AsyncStreams.h:381
Result write(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> cb={})
Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) When t...
void end()
Ends the writable stream, waiting for all in-flight and queued writes to finish.
void destroy()
Forcefully destroys the writable stream before calling end event releasing all resources.
void emitError(Result error)
Signals an async error received.
void finishedDestroyingWritable()
Function that MUST be called by re-implementations of asyncDestroyWritable once they're done.
Event< MaxListeners > eventDrain
Emitted when write queue is empty.
Definition AsyncStreams.h:380
virtual bool canEndWritable()
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
virtual Result asyncWrite(AsyncBufferView::ID, Function< void(AsyncBufferView::ID)> func)=0
Function that every stream must define to implement its custom write operation.
bool isStillWriting() const
Returns true if this stream is writing something.
Definition AsyncStreams.h:434
void resumeWriting()
Resumes writing queued requests for this stream.
virtual Result asyncDestroyWritable()
Function that a writable stream can re-implement to release its internal resources.
void setAutoDestroy(bool value)
If set to true will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:440
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:437
Result init(AsyncBuffersPool &buffersPool)
Initializes the writable stream with the AsyncBuffersPool that owns the buffers it writes.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
bool getAutoDestroy() const
Returns true if stream will automatically call .destroy() when Ended state is reached.
Definition AsyncStreams.h:443
void tryAsync(Result potentialError)
Will emit error if the passed in Result is false.
Result unshift(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb)
Puts back a buffer at the top of the write queue.
Result write(AsyncBufferView &&bufferView, Function< void(AsyncBufferView::ID)> cb={})
Push a new buffer view to the queue, registering it with the allocator.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
Event< MaxListeners > eventClose
Emitted when the underlying resource has been closed.
Definition AsyncStreams.h:382
constexpr void setWriteQueue(Span< Request > requests)
Sets the write queue for this writable stream.
Definition AsyncStreams.h:390