Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
AsyncRequestStreams.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4#include "../Foundation/Assert.h"
5#include "AsyncStreams.h"
8namespace SC
9{
10struct AsyncResult;
11
12template <typename AsyncRequestType, typename AsyncEventLoopType>
14{
16
18 void setAutoCloseDescriptor(bool value) { autoCloseDescriptor = value; }
19
20 AsyncRequestType request;
21
22 protected:
25
26 AsyncEventLoopType* eventLoop = nullptr;
27 BufferViewID bufferID;
28 bool autoCloseDescriptor = false;
29
30 virtual Result asyncRead() override
31 {
32 SC_ASSERT_RELEASE(request.isFree());
33 if (this->getBufferOrPause(0, bufferID, request.buffer))
34 {
35 request.callback.template bind<Self, &Self::afterRead>(*this);
36 SC_TRY_MSG(eventLoop != nullptr, "AsyncRequestReadableStream eventLoop == nullptr");
37 const Result startResult = request.start(*eventLoop);
38 if (not startResult)
39 {
40 this->getBuffersPool().unrefBuffer(bufferID);
41 bufferID = {};
42 return startResult; // Error occurred during request start
43 }
44 }
45 return Result(true);
46 }
47
48 virtual Result asyncDestroyReadable() override
49 {
50 if (request.isFree())
51 {
52 finalizeReadableDestruction();
53 return Result(true);
54 }
55 else
56 {
57 return request.stop(*eventLoop, &getStopCallback());
58 }
59 }
60
61 void afterRead(typename AsyncRequestType::Result& result)
62 {
63 Span<char> data;
64 if (result.get(data))
65 {
66 SC_ASSERT_RELEASE(request.isFree());
67 if (result.isEnded())
68 {
69 this->getBuffersPool().unrefBuffer(bufferID);
70 bufferID = {};
71 this->pushEnd();
72 }
73 else
74 {
75 const bool continuePushing = this->push(bufferID, data.sizeInBytes());
76 SC_ASSERT_RELEASE(result.getAsync().isFree());
77 // Only unref if destroy() wasn't called during push callback (which would have already unref'd)
78 if (not this->hasBeenDestroyed())
79 {
80 this->getBuffersPool().unrefBuffer(bufferID);
81 bufferID = {};
82 }
83 // Check if we're still pushing (so not, paused, destroyed or errored etc.)
84 if (continuePushing)
85 {
86 if (this->getBufferOrPause(0, bufferID, result.getAsync().buffer))
87 {
88 request.callback.template bind<Self, &Self::afterRead>(*this);
89 result.reactivateRequest(true);
90 }
91 }
92 }
93 }
94 else
95 {
96 this->getBuffersPool().unrefBuffer(bufferID);
97 bufferID = {};
98 this->emitError(result.isValid());
99 }
100 }
101
102 void finalizeReadableDestruction()
103 {
104 if (bufferID.isValid())
105 {
106 this->getBuffersPool().unrefBuffer(bufferID);
107 bufferID = {};
108 }
109 if (autoCloseDescriptor)
110 {
111 SC_ASSERT_RELEASE(request.closeHandle());
112 }
114 request = {};
115 }
116
117 template <typename T_AsyncResult>
118 static void stopReadableCallback(T_AsyncResult& result)
119 {
121 Self& stream = SC_COMPILER_FIELD_OFFSET(Self, request, static_cast<AsyncRequestType&>(result.async));
122 stream.finalizeReadableDestruction();
124 }
125
126 private:
127 // clang-format off
128 static Function<void(AsyncResult&)>& getStopCallback() { static Function<void(AsyncResult&)> cb = &stopReadableCallback<AsyncResult>; return cb; }
129 // clang-format on
130
131 public:
132 template <typename DescriptorType>
133 Result init(AsyncBuffersPool& buffersPool, AsyncEventLoopType& loop, const DescriptorType& descriptor)
134 {
135 SC_TRY_MSG(not request.isCancelling(), "AsyncRequestReadableStream - Destroy in progress");
136 this->eventLoop = &loop;
137 SC_TRY(descriptor.get(this->request.handle, Result::Error("Missing descriptor")));
138 return AsyncReadableStream::init(buffersPool);
139 }
140};
141
142template <typename AsyncRequestType, typename AsyncEventLoopType>
144{
146
148 void setAutoCloseDescriptor(bool value) { autoCloseDescriptor = value; }
149
150 AsyncRequestType request;
151
152 protected:
155
156 AsyncEventLoopType* eventLoop = nullptr;
157 BufferViewID bufferID;
158 bool autoCloseDescriptor = false;
159
160 Function<void(BufferViewID)> callback;
161
162 virtual Result asyncWrite(BufferViewID newBufferID, Function<void(BufferViewID)> cb) override
163 {
164 bufferID = newBufferID;
165 SC_ASSERT_RELEASE(not callback.isValid());
166 callback = move(cb);
167 SC_TRY(this->getBuffersPool().getReadableData(bufferID, request.buffer));
168 request.callback.template bind<Self, &Self::afterWrite>(*this);
169 SC_TRY_MSG(eventLoop != nullptr, "AsyncRequestWritableStream eventLoop == nullptr");
170 const Result res = request.start(*eventLoop);
171 if (res)
172 {
173 this->getBuffersPool().refBuffer(bufferID);
174 }
175 return res;
176 }
177
178 virtual Result asyncDestroyWritable() override
179 {
180 if (request.isFree())
181 {
182 finalizeWritableDestruction();
183 return Result(true);
184 }
185 else
186 {
187 return request.stop(*eventLoop, &getStopCallback());
188 }
189 }
190
191 virtual bool canEndWritable() override { return request.isFree(); }
192
193 void afterWrite(typename AsyncRequestType::Result& result)
194 {
195 BufferViewID savedBufferID = bufferID;
196 this->getBuffersPool().unrefBuffer(bufferID);
197 bufferID = {};
198 auto cb = move(callback);
199 callback = {};
200 this->finishedWriting(savedBufferID, move(cb), result.isValid());
201 }
202
203 void finalizeWritableDestruction()
204 {
205 if (autoCloseDescriptor)
206 {
207 SC_ASSERT_RELEASE(request.closeHandle());
208 }
209 request = {};
211 }
212
213 template <typename T_AsyncResult>
214 static void stopWritableCallback(T_AsyncResult& result)
215 {
217 Self& stream = SC_COMPILER_FIELD_OFFSET(Self, request, static_cast<AsyncRequestType&>(result.async));
218 stream.finalizeWritableDestruction();
220 }
221
222 private:
223 // clang-format off
224 static Function<void(AsyncResult&)>& getStopCallback() { static Function<void(AsyncResult&)> cb = &stopWritableCallback<AsyncResult>; return cb; }
225 // clang-format on
226 public:
227 template <typename DescriptorType>
228 Result init(AsyncBuffersPool& buffersPool, AsyncEventLoopType& loop, const DescriptorType& descriptor)
229 {
230 this->eventLoop = &loop;
231 SC_TRY(descriptor.get(this->request.handle, Result::Error("Missing descriptor")));
232 return AsyncWritableStream::init(buffersPool);
233 }
234};
235
236// clang-format off
238template <typename AsyncEventLoopType> struct SC_COMPILER_EXPORT AsyncReadableFileStream : public AsyncRequestReadableStream<typename AsyncEventLoopType::FileRead, AsyncEventLoopType>{};
240template <typename AsyncEventLoopType> struct SC_COMPILER_EXPORT AsyncWritableFileStream : public AsyncRequestWritableStream<typename AsyncEventLoopType::FileWrite, AsyncEventLoopType>{};
242template <typename AsyncEventLoopType> struct SC_COMPILER_EXPORT AsyncReadableSocketStream : public AsyncRequestReadableStream<typename AsyncEventLoopType::SocketReceive, AsyncEventLoopType>{};
244template <typename AsyncEventLoopType> struct SC_COMPILER_EXPORT AsyncWritableSocketStream : public AsyncRequestWritableStream<typename AsyncEventLoopType::SocketSend, AsyncEventLoopType>{};
245// clang-format on
246
247} // namespace SC
#define SC_COMPILER_WARNING_POP
Pops warning from inside a macro.
Definition Compiler.h:110
#define SC_ASSERT_RELEASE(e)
Assert expression e to be true.
Definition Assert.h:48
#define SC_COMPILER_WARNING_PUSH_OFFSETOF
Disables invalid-offsetof gcc and clang warning.
Definition Compiler.h:130
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:264
#define SC_TRY_MSG(expression, failedMessage)
Checks the value of the given expression and if failed, returns a result with failedMessage to caller...
Definition Result.h:60
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:49
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
Definition AsyncStreams.h:52
void refBuffer(AsyncBufferView::ID bufferID)
Increments a buffer reference count.
void unrefBuffer(AsyncBufferView::ID bufferID)
Decrements a buffer reference count.
Uses an SC::AsyncFileRead to stream data from a file.
Definition AsyncRequestStreams.h:238
Uses an SC::AsyncSocketReceive to stream data from a socket.
Definition AsyncRequestStreams.h:242
Async source abstraction emitting data events in caller provided byte buffers.
Definition AsyncStreams.h:214
void pushEnd()
Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end.
void emitError(Result error)
Signals an async error received.
bool push(AsyncBufferView::ID bufferID, size_t newSize)
Use push from inside AsyncReadableStream::asyncRead function to queue received data.
bool hasBeenDestroyed() const
Returns true if the stream has been already destroyed (asynchronously through destroy())
Definition AsyncStreams.h:252
Result finishedDestroyingReadable()
Called from inside asyncDestroy to transition from Destroying to Destroyed state (emitting eventClose...
AsyncBuffersPool & getBuffersPool()
Obtains the AsyncBuffersPool to request more buffers.
bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID &bufferID, Span< char > &data)
Returns an unused buffer from pool or pauses the stream if none is available.
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
Definition AsyncRequestStreams.h:14
virtual Result asyncDestroyReadable() override
Function that a readable stream can re-implement to release its internal resources.
Definition AsyncRequestStreams.h:48
void setAutoCloseDescriptor(bool value)
Automatically closes descriptor during read stream close event.
Definition AsyncRequestStreams.h:18
virtual Result asyncRead() override
Function that every stream must define to implement its custom read operation.
Definition AsyncRequestStreams.h:30
Definition AsyncRequestStreams.h:144
void setAutoCloseDescriptor(bool value)
Automatically closes descriptor during write stream finish event.
Definition AsyncRequestStreams.h:148
virtual Result asyncDestroyWritable() override
Function that a writable stream can re-implement to release its internal resources.
Definition AsyncRequestStreams.h:178
virtual bool canEndWritable() override
Allows keeping a writable in ENDING state until it has finished flushing all pending data.
Definition AsyncRequestStreams.h:191
AsyncBufferView::ID BufferViewID
AsyncFileRead / AsyncFileWrite / AsyncSocketReceive / AsyncSocketSend.
Definition AsyncRequestStreams.h:153
virtual Result asyncWrite(BufferViewID newBufferID, Function< void(BufferViewID)> cb) override
Function that every stream must define to implement its custom write operation.
Definition AsyncRequestStreams.h:162
Uses an SC::AsyncFileWrite to stream data to a file.
Definition AsyncRequestStreams.h:240
Uses an SC::AsyncSocketSend to stream data to a socket.
Definition AsyncRequestStreams.h:244
Async destination abstraction where bytes can be written to.
Definition AsyncStreams.h:341
void finishedDestroyingWritable()
Function that MUST be called by re-implementations of asyncDestroyWritable once they're done.
Result init(AsyncBuffersPool &buffersPool)
Emitted when the underlying resource has been closed.
void finishedWriting(AsyncBufferView::ID bufferID, Function< void(AsyncBufferView::ID)> &&cb, Result res)
Signals that the given buffer (previously queued by write) has been fully written.
AsyncBuffersPool & getBuffersPool()
Obtains the buffers pool to access its data.
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
static constexpr Result Error(const char(&msg)[numChars])
Constructs an Error from a pointer to an ASCII string literal.
Definition Result.h:25
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29