4#include "../Foundation/Assert.h"
5#include "AsyncStreams.h"
12template <
typename AsyncRequestType,
typename AsyncEventLoopType>
20 AsyncRequestType request;
26 AsyncEventLoopType* eventLoop =
nullptr;
28 bool autoCloseDescriptor =
false;
35 request.callback.template bind<Self, &Self::afterRead>(*
this);
36 SC_TRY_MSG(eventLoop !=
nullptr,
"AsyncRequestReadableStream eventLoop == nullptr");
37 const Result startResult = request.start(*eventLoop);
52 finalizeReadableDestruction();
57 return request.stop(*eventLoop, &getStopCallback());
61 void afterRead(
typename AsyncRequestType::Result& result)
75 const bool continuePushing = this->
push(bufferID, data.sizeInBytes());
88 request.callback.template bind<Self, &Self::afterRead>(*
this);
89 result.reactivateRequest(
true);
102 void finalizeReadableDestruction()
104 if (bufferID.isValid())
109 if (autoCloseDescriptor)
117 template <
typename T_AsyncResult>
118 static void stopReadableCallback(T_AsyncResult& result)
121 Self& stream = SC_COMPILER_FIELD_OFFSET(Self, request,
static_cast<AsyncRequestType&
>(result.async));
122 stream.finalizeReadableDestruction();
128 static Function<void(AsyncResult&)>& getStopCallback() {
static Function<void(AsyncResult&)> cb = &stopReadableCallback<AsyncResult>;
return cb; }
132 template <
typename DescriptorType>
133 Result init(AsyncBuffersPool& buffersPool, AsyncEventLoopType& loop,
const DescriptorType& descriptor)
135 SC_TRY_MSG(not request.isCancelling(),
"AsyncRequestReadableStream - Destroy in progress");
136 this->eventLoop = &loop;
142template <
typename AsyncRequestType,
typename AsyncEventLoopType>
150 AsyncRequestType request;
156 AsyncEventLoopType* eventLoop =
nullptr;
158 bool autoCloseDescriptor =
false;
164 bufferID = newBufferID;
168 request.callback.template bind<Self, &Self::afterWrite>(*
this);
169 SC_TRY_MSG(eventLoop !=
nullptr,
"AsyncRequestWritableStream eventLoop == nullptr");
170 const Result res = request.start(*eventLoop);
180 if (request.isFree())
182 finalizeWritableDestruction();
187 return request.stop(*eventLoop, &getStopCallback());
193 void afterWrite(
typename AsyncRequestType::Result& result)
198 auto cb =
move(callback);
203 void finalizeWritableDestruction()
205 if (autoCloseDescriptor)
213 template <
typename T_AsyncResult>
214 static void stopWritableCallback(T_AsyncResult& result)
217 Self& stream = SC_COMPILER_FIELD_OFFSET(Self, request,
static_cast<AsyncRequestType&
>(result.async));
218 stream.finalizeWritableDestruction();
224 static Function<void(AsyncResult&)>& getStopCallback() {
static Function<void(AsyncResult&)> cb = &stopWritableCallback<AsyncResult>;
return cb; }
227 template <
typename DescriptorType>
228 Result init(AsyncBuffersPool& buffersPool, AsyncEventLoopType& loop,
const DescriptorType& descriptor)
230 this->eventLoop = &loop;
#define SC_COMPILER_WARNING_POP
Pops warning from inside a macro.
Definition Compiler.h:110
#define SC_ASSERT_RELEASE(e)
Assert expression e to be true.
Definition Assert.h:48
#define SC_COMPILER_WARNING_PUSH_OFFSETOF
Disables invalid-offsetof gcc and clang warning.
Definition Compiler.h:130
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:264
#define SC_TRY_MSG(expression, failedMessage)
Checks the value of the given expression and if failed, returns a result with failedMessage to caller...
Definition Result.h:60
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:49
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
Definition AsyncStreams.h:52
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
Uses an SC::AsyncFileRead to stream data from a file.
Definition AsyncRequestStreams.h:238
Uses an SC::AsyncSocketReceive to stream data from a socket.
Definition AsyncRequestStreams.h:242
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:214
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:252
Result finishedDestroyingReadable()
Called from inside asyncDestroy to transition from Destroying to Destroyed state (emitting eventClose...
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
Definition AsyncRequestStreams.h:14
virtual Result asyncDestroyReadable() override
Function that a readable stream can re-implement to release its internal resources.
Definition AsyncRequestStreams.h:48
void setAutoCloseDescriptor(bool value)
Automatically closes descriptor during read stream close event.
Definition AsyncRequestStreams.h:18
virtual Result asyncRead() override
Function that every stream must define to implement its custom read operation.
Definition AsyncRequestStreams.h:30
Definition AsyncRequestStreams.h:144
void setAutoCloseDescriptor(bool value)
Automatically closes descriptor during write stream finish event.
Definition AsyncRequestStreams.h:148
virtual Result asyncDestroyWritable() override
Function that a writable stream can re-implement to release its internal resources.
Definition AsyncRequestStreams.h:178
virtual bool canEndWritable() override
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncRequestStreams.h:191
AsyncBufferView::ID BufferViewID
AsyncFileRead / AsyncFileWrite / AsyncSocketReceive / AsyncSocketSend.
Definition AsyncRequestStreams.h:153
virtual Result asyncWrite(BufferViewID newBufferID, Function< void(BufferViewID)> cb) override
Function that every stream must define to implement its custom write operation.
Definition AsyncRequestStreams.h:162
Uses an SC::AsyncFileWrite to stream data to a file.
Definition AsyncRequestStreams.h:240
Uses an SC::AsyncSocketSend to stream data to a socket.
Definition AsyncRequestStreams.h:244
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:341
void finishedDestroyingWritable()
Function that MUST be called by re-implementations of asyncDestroyWritable once they're done.
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
static constexpr Result Error(const char(&msg)[numChars])
Constructs an Error from a pointer to an ASCII string literal.
Definition Result.h:25
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29