4#include "../Common/CompilerOffsetOf.h"
5#include "AsyncStreams.h"
12template <
typename AsyncRequestType,
typename AsyncEventLoopType>
// Async request owned by this stream; it is started on `eventLoop` and later stopped on it.
20 AsyncRequestType request;
// Event loop used to start / stop `request`. Null until init() stores the address of the loop it receives;
// starting the request is rejected with an error while this is still null.
26 AsyncEventLoopType* eventLoop =
nullptr;
// When true, finalizeReadableDestruction() also closes the handle held by `request`.
28 bool autoCloseDescriptor =
false;
32 SC_ASYNC_STREAMS_ASSERT_RELEASE(request.isFree());
35 request.callback.template bind<Self, &Self::afterRead>(*
this);
36 SC_TRY_MSG(eventLoop !=
nullptr,
"AsyncRequestReadableStream eventLoop == nullptr");
37 const Result startResult = request.start(*eventLoop);
52 finalizeReadableDestruction();
57 return request.stop(*eventLoop, &getStopCallback());
61 void afterRead(
typename AsyncRequestType::Result& result)
66 SC_ASYNC_STREAMS_ASSERT_RELEASE(request.isFree());
75 const bool continuePushing = this->
push(bufferID, data.sizeInBytes());
76 SC_ASYNC_STREAMS_ASSERT_RELEASE(result.getAsync().isFree());
88 request.callback.template bind<Self, &Self::afterRead>(*
this);
89 result.reactivateRequest(
true);
102 void finalizeReadableDestruction()
104 if (bufferID.isValid())
109 if (autoCloseDescriptor)
111 SC_ASYNC_STREAMS_ASSERT_RELEASE(request.closeHandle());
// Static callback installed through getStopCallback() and passed to request.stop().
// It maps the async request carried by `result` back to the stream that embeds it as
// its `request` member, then finalizes that stream's destruction.
// T_AsyncResult is the result type delivered by the stop operation (instantiated with AsyncResult).
117 template <
typename T_AsyncResult>
118 static void stopReadableCallback(T_AsyncResult& result)
120 SC_COMPILER_WARNING_PUSH_OFFSETOF;
// result.async is cast to the concrete request type and handed to SC_COMPILER_FIELD_OFFSET together
// with Self / request. NOTE(review): presumably a "container-of" style computation that yields the
// enclosing Self from the address of its `request` field - confirm against CompilerOffsetOf.h.
121 Self& stream = SC_COMPILER_FIELD_OFFSET(Self, request,
static_cast<AsyncRequestType&
>(result.async));
122 stream.finalizeReadableDestruction();
123 SC_COMPILER_WARNING_POP_OFFSETOF;
// Returns, by reference, the callback that is passed to request.stop(): a function-local static
// Function initialized from stopReadableCallback<AsyncResult>. Being static, the same object is
// handed out on every call.
128 static Function<void(AsyncResult&)>& getStopCallback() {
static Function<void(AsyncResult&)> cb = &stopReadableCallback<AsyncResult>;
return cb; }
132 template <
typename DescriptorType>
133 Result init(AsyncBuffersPool& buffersPool, AsyncEventLoopType& loop,
const DescriptorType& descriptor)
135 SC_TRY_MSG(not request.isCancelling(),
"AsyncRequestReadableStream - Destroy in progress");
136 this->eventLoop = &loop;
137 SC_TRY(descriptor.get(this->request.handle, Result::Error(
"Missing descriptor")));
142template <
typename AsyncRequestType,
typename AsyncEventLoopType>
// Async request owned by this stream; its `buffer` is filled from the buffers pool before the
// request is started on `eventLoop`, and it is later stopped on the same loop.
150 AsyncRequestType request;
// Event loop used to start / stop `request`. Null until init() stores the address of the loop it receives;
// starting the request is rejected with an error while this is still null.
156 AsyncEventLoopType* eventLoop =
nullptr;
// When true, finalizeWritableDestruction() also closes the handle held by `request`.
158 bool autoCloseDescriptor =
false;
164 bufferID = newBufferID;
165 SC_ASYNC_STREAMS_ASSERT_RELEASE(not callback.isValid());
167 SC_TRY(this->
getBuffersPool().getReadableData(bufferID, request.buffer));
168 request.callback.template bind<Self, &Self::afterWrite>(*
this);
169 SC_TRY_MSG(eventLoop !=
nullptr,
"AsyncRequestWritableStream eventLoop == nullptr");
170 const Result res = request.start(*eventLoop);
180 if (request.isFree())
182 finalizeWritableDestruction();
187 return request.stop(*eventLoop, &getStopCallback());
193 void afterWrite(
typename AsyncRequestType::Result& result)
198 auto cb = move(callback);
203 void finalizeWritableDestruction()
205 if (autoCloseDescriptor)
207 SC_ASYNC_STREAMS_ASSERT_RELEASE(request.closeHandle());
213 template <
typename T_AsyncResult>
214 static void stopWritableCallback(T_AsyncResult& result)
216 SC_COMPILER_WARNING_PUSH_OFFSETOF;
217 Self& stream = SC_COMPILER_FIELD_OFFSET(Self, request,
static_cast<AsyncRequestType&
>(result.async));
218 stream.finalizeWritableDestruction();
219 SC_COMPILER_WARNING_POP_OFFSETOF;
// Returns, by reference, the callback that is passed to request.stop(): a function-local static
// Function initialized from stopWritableCallback<AsyncResult>. Being static, the same object is
// handed out on every call.
224 static Function<void(AsyncResult&)>& getStopCallback() {
static Function<void(AsyncResult&)> cb = &stopWritableCallback<AsyncResult>;
return cb; }
227 template <
typename DescriptorType>
228 Result init(AsyncBuffersPool& buffersPool, AsyncEventLoopType& loop,
const DescriptorType& descriptor)
230 this->eventLoop = &loop;
231 SC_TRY(descriptor.get(this->request.handle, Result::Error(
"Missing descriptor")));
Definition AsyncStreams.h:66
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
Uses an SC::AsyncFileRead to stream data from a file.
Definition AsyncRequestStreams.h:238
Uses an SC::AsyncSocketReceive to stream data from a socket.
Definition AsyncRequestStreams.h:242
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:228
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool hasBeenDestroyed() const
Returns true if the stream has already been destroyed (asynchronously through destroy()).
Definition AsyncStreams.h:269
Result finishedDestroyingReadable()
Called from inside asyncDestroy to transition from Destroying to Destroyed state (emitting eventClose...
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
Definition AsyncRequestStreams.h:14
virtual Result asyncDestroyReadable() override
Function that a readable stream can re-implement to release its internal resources.
Definition AsyncRequestStreams.h:48
void setAutoCloseDescriptor(bool value)
Automatically closes descriptor during read stream close event.
Definition AsyncRequestStreams.h:18
virtual Result asyncRead() override
Function that every stream must define to implement its custom read operation.
Definition AsyncRequestStreams.h:30
Definition AsyncRequestStreams.h:144
void setAutoCloseDescriptor(bool value)
Automatically closes descriptor during write stream finish event.
Definition AsyncRequestStreams.h:148
virtual Result asyncDestroyWritable() override
Function that a writable stream can re-implement to release its internal resources.
Definition AsyncRequestStreams.h:178
virtual bool canEndWritable() override
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncRequestStreams.h:191
AsyncBufferView::ID BufferViewID
AsyncFileRead / AsyncFileWrite / AsyncSocketReceive / AsyncSocketSend.
Definition AsyncRequestStreams.h:153
virtual Result asyncWrite(BufferViewID newBufferID, Function< void(BufferViewID)> cb) override
Function that every stream must define to implement its custom write operation.
Definition AsyncRequestStreams.h:162
Uses an SC::AsyncFileWrite to stream data to a file.
Definition AsyncRequestStreams.h:240
Uses an SC::AsyncSocketSend to stream data to a socket.
Definition AsyncRequestStreams.h:244
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:364
void finishedDestroyingWritable()
Function that MUST be called by re-implementations of asyncDestroyWritable once they're done.
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.