Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
HttpClientAsync.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../HttpClient/HttpClient.h"
6#include "../HttpClient/Internal/HttpClientEvent.h"
7#include "../HttpClient/Internal/HttpClientThreading.h"
8
9#include <string.h>
10
11namespace SC
12{
15
18template <typename T_AsyncStreams>
19struct SC_HTTP_CLIENT_EXPORT HttpClientAsyncOperationMemoryT
20{
21 using T_AsyncBufferView = typename T_AsyncStreams::BufferView;
22 using T_AsyncReadableStream = typename T_AsyncStreams::ReadableStream;
23 using T_AsyncWritableStream = typename T_AsyncStreams::WritableStream;
24 using T_AsyncReadableRequest = typename T_AsyncReadableStream::Request;
25 using T_AsyncWritableRequest = typename T_AsyncWritableStream::Request;
26
27 Span<T_AsyncBufferView> responseBuffers;
28 Span<char> responseBufferMemory;
29 Span<T_AsyncReadableRequest> responseReadQueue;
30 Span<T_AsyncWritableRequest> requestWriteQueue;
31};
32
33template <typename T_AsyncEventLoop, typename T_AsyncStreams>
37{
38 using T_AsyncLoopWakeUp = typename T_AsyncEventLoop::LoopWakeUp;
39 using T_AsyncBufferView = typename T_AsyncStreams::BufferView;
40 using T_AsyncBuffersPool = typename T_AsyncStreams::BuffersPool;
41 using T_AsyncReadableStream = typename T_AsyncStreams::ReadableStream;
42 using T_AsyncWritableStream = typename T_AsyncStreams::WritableStream;
43 using T_AsyncReadableRequest = typename T_AsyncReadableStream::Request;
44 using T_AsyncWritableRequest = typename T_AsyncWritableStream::Request;
45
46 static constexpr int MaxListeners = 8;
47
48 struct ResponseBodyStream final : public T_AsyncReadableStream
49 {
50 virtual Result asyncRead() override { return Result(true); }
51 };
52
53 struct RequestBodySink final : public T_AsyncWritableStream
54 {
55 Result init(T_AsyncBuffersPool& buffersPool, Span<T_AsyncWritableRequest> writeQueueSpan,
56 HttpClientAsyncT& ownerValue)
57 {
58 owner = &ownerValue;
59 this->setWriteQueue(writeQueueSpan);
60 return T_AsyncWritableStream::init(buffersPool);
61 }
62
63 private:
64 virtual Result asyncWrite(typename T_AsyncBufferView::ID bufferID,
65 Function<void(typename T_AsyncBufferView::ID)> cb) override
66 {
67 SC_TRY_MSG(owner != nullptr, "HttpClientAsyncT::RequestBodySink missing owner");
68 owner->onRequestBodyWritableBuffer(bufferID, move(cb));
69 return Result(true);
70 }
71
72 virtual Result asyncDestroyWritable() override
73 {
74 if (owner != nullptr and owner->operation.isRequestInFlight() and not owner->requestBodyFinished)
75 {
76 owner->onRequestBodyWritableError(Result::Error("HttpClientAsyncT: request body sink destroyed"));
77 }
78 T_AsyncWritableStream::finishedDestroyingWritable();
79 return Result(true);
80 }
81
82 virtual bool canEndWritable() override
83 {
84 if (owner != nullptr)
85 {
86 owner->onRequestBodyWritableFinished();
87 }
88 return true;
89 }
90
91 HttpClientAsyncT* owner = nullptr;
92 };
93
94 HttpClientEvent<MaxListeners, HttpClientResponse&> eventResponseHead;
95
96 Result init(HttpClient& client, T_AsyncEventLoop& loop, const HttpClientOperationMemory& operationMemory,
98 {
99 SC_TRY_MSG(eventLoop == nullptr, "HttpClientAsyncT: already initialized");
100 SC_TRY_MSG(asyncMemory.responseBuffers.sizeInElements() > 0, "HttpClientAsyncT: response buffers missing");
101 SC_TRY_MSG(asyncMemory.responseReadQueue.sizeInElements() > 0, "HttpClientAsyncT: response read queue missing");
102 SC_TRY_MSG(asyncMemory.requestWriteQueue.sizeInElements() > 0, "HttpClientAsyncT: request write queue missing");
103
104 eventLoop = &loop;
105 responseReadQueue = asyncMemory.responseReadQueue;
106 requestWriteQueue = asyncMemory.requestWriteQueue;
107
108 responseBuffersPool.setBuffers(asyncMemory.responseBuffers);
109 if (asyncMemory.responseBufferMemory.sizeInBytes() > 0)
110 {
111 SC_TRY(T_AsyncBuffersPool::sliceInEqualParts(asyncMemory.responseBuffers, asyncMemory.responseBufferMemory,
112 asyncMemory.responseBuffers.sizeInElements()));
113 }
114
115 responseBodyStream.setReadQueue(responseReadQueue);
116 responseBodyStream.setAutoDestroy(false);
117 requestBodySink.setAutoDestroy(false);
118 wakeUp.callback = [this](typename T_AsyncLoopWakeUp::Result& result) { onWakeUp(result); };
119
120 SC_TRY(operation.init(client, operationMemory));
121 operation.setNotifier(this);
122 initialized = true;
123 return Result(true);
124 }
125
126 Result close()
127 {
128 if (not initialized)
129 {
130 return Result(true);
131 }
132
133 if (wakeUpStarted and eventLoop != nullptr)
134 {
135 (void)wakeUp.stop(*eventLoop);
136 wakeUpStarted = false;
137 }
138
139 if (requestBodySink.isStillWriting() or not requestBodySink.hasBeenDestroyed())
140 {
141 requestBodySink.destroy();
142 }
143 responseBodyStream.destroy();
144 resetRequestBodyState();
145
146 SC_TRY(operation.close());
147 operation.setNotifier(nullptr);
148
149 eventLoop = nullptr;
150 requestBodyBuffersPool = nullptr;
151 initialized = false;
152 return Result(true);
153 }
154
155 Result cancel() { return operation.cancel(); }
156
157 Result start(const HttpClientRequest& request, HttpClientResponse& response,
158 T_AsyncBuffersPool* requestBodyPool = nullptr)
159 {
160 SC_TRY_MSG(initialized, "HttpClientAsyncT: not initialized");
161 if (request.streamedBodySize > 0)
162 {
163 SC_TRY_MSG(requestBodyPool != nullptr, "HttpClientAsyncT: streamed request body requires buffers pool");
164 }
165
166 requestBodyBuffersPool = requestBodyPool;
167 resetRequestBodyState();
168
169 responseBodyStream.setReadQueue(responseReadQueue);
170 SC_TRY(responseBodyStream.init(responseBuffersPool));
171 SC_TRY(responseBodyStream.start());
172
173 if (request.streamedBodySize > 0)
174 {
175 SC_TRY(requestBodySink.init(*requestBodyBuffersPool, requestWriteQueue, *this));
176 }
177
178 if (not wakeUpStarted)
179 {
180 SC_TRY(wakeUp.start(*eventLoop));
181 wakeUpStarted = true;
182 }
183
184 const Result res = operation.start(request, response, this, request.streamedBodySize > 0 ? this : nullptr);
185 if (not res)
186 {
187 if (request.streamedBodySize > 0)
188 {
189 requestBodySink.destroy();
190 }
191 responseBodyStream.destroy();
192 return res;
193 }
194 return Result(true);
195 }
196
197 [[nodiscard]] T_AsyncReadableStream& getResponseBodyStream() { return responseBodyStream; }
198 [[nodiscard]] T_AsyncWritableStream& getRequestBodySink() { return requestBodySink; }
199
200 [[nodiscard]] bool isInitialized() const { return initialized; }
201 [[nodiscard]] bool isRequestInFlight() const { return operation.isRequestInFlight(); }
202
203 private:
204 virtual void onResponseHead(HttpClientResponse& response) override { eventResponseHead.emit(response); }
205
206 virtual void onResponseBody(Span<const char> data) override
207 {
208 typename T_AsyncBufferView::ID bufferID;
209 Span<char> writable;
210 if (not responseBuffersPool.requestNewBuffer(data.sizeInBytes(), bufferID, writable))
211 {
212 responseBodyStream.emitError(Result::Error("HttpClientAsyncT: response buffer exhausted"));
213 (void)operation.cancel();
214 return;
215 }
216 memcpy(writable.data(), data.data(), data.sizeInBytes());
217 const bool continuePushing = responseBodyStream.push(bufferID, data.sizeInBytes());
218 responseBuffersPool.unrefBuffer(bufferID);
219 if (not continuePushing)
220 {
221 (void)operation.cancel();
222 }
223 }
224
225 virtual void onResponseComplete() override
226 {
227 responseBodyStream.pushEnd();
228 resetRequestBodyState();
229 }
230
231 virtual void onError(Result error) override
232 {
233 responseBodyStream.emitError(error);
234 if (requestBodyBuffersPool != nullptr)
235 {
236 requestBodySink.destroy();
237 }
238 resetRequestBodyState();
239 }
240
241 virtual Result pullRequestBody(Span<char> dest, size_t& bytesWritten, bool& endReached) override
242 {
243 bytesWritten = 0;
244 endReached = false;
245
246 requestBodyMutex.lock();
247 while (not requestBodyHasActive and not requestBodyFinished and not requestBodyErrorSet)
248 {
249 requestBodyCV.wait(requestBodyMutex);
250 }
251
252 if (requestBodyErrorSet)
253 {
254 const Result error = requestBodyError;
255 requestBodyMutex.unlock();
256 return error;
257 }
258
259 if (not requestBodyHasActive and requestBodyFinished)
260 {
261 requestBodyMutex.unlock();
262 endReached = true;
263 return Result(true);
264 }
265
266 const typename T_AsyncBufferView::ID activeBuffer = requestBodyActiveBuffer;
267 const size_t activeOffset = requestBodyActiveOffset;
268 requestBodyMutex.unlock();
269
270 SC_TRY_MSG(requestBodyBuffersPool != nullptr, "HttpClientAsyncT: missing request body buffers pool");
271
272 Span<const char> readable;
273 SC_TRY(requestBodyBuffersPool->getReadableData(activeBuffer, readable));
274
275 const size_t remaining = readable.sizeInBytes() - activeOffset;
276 bytesWritten = remaining < dest.sizeInBytes() ? remaining : dest.sizeInBytes();
277 if (bytesWritten > 0)
278 {
279 memcpy(dest.data(), readable.data() + activeOffset, bytesWritten);
280 }
281
282 requestBodyMutex.lock();
283 requestBodyActiveOffset += bytesWritten;
284 const bool consumedBuffer = requestBodyActiveOffset >= readable.sizeInBytes();
285 requestBodyMutex.unlock();
286
287 if (consumedBuffer)
288 {
289 finishRequestBodyActive(Result(true));
290 }
291
292 return Result(true);
293 }
294
295 virtual void notifyHttpClientOperation(HttpClientOperation&) override
296 {
297 if (wakeUpStarted and eventLoop != nullptr)
298 {
299 (void)wakeUp.wakeUp(*eventLoop);
300 }
301 }
302
303 void onWakeUp(typename T_AsyncLoopWakeUp::Result& result)
304 {
305 const Result pollResult = operation.poll();
306 if (not pollResult)
307 {
308 responseBodyStream.emitError(pollResult);
309 }
310 result.reactivateRequest(true);
311 }
312
313 void resetRequestBodyState()
314 {
315 requestBodyMutex.lock();
316 if (requestBodyHasActive and requestBodyBuffersPool != nullptr)
317 {
318 requestBodyBuffersPool->unrefBuffer(requestBodyActiveBuffer);
319 }
320 requestBodyActiveBuffer = {};
321 requestBodyActiveCallback = {};
322 requestBodyActiveOffset = 0;
323 requestBodyHasActive = false;
324 requestBodyFinished = false;
325 requestBodyErrorSet = false;
326 requestBodyError = Result(true);
327 requestBodyCV.broadcast();
328 requestBodyMutex.unlock();
329 }
330
331 void finishRequestBodyActive(Result res)
332 {
333 requestBodyMutex.lock();
334 if (requestBodyHasActive and requestBodyBuffersPool != nullptr)
335 {
336 typename T_AsyncBufferView::ID bufferID = requestBodyActiveBuffer;
337
338 Function<void(typename T_AsyncBufferView::ID)> callback = move(requestBodyActiveCallback);
339
340 requestBodyHasActive = false;
341 requestBodyActiveBuffer = {};
342 requestBodyActiveOffset = 0;
343 requestBodyBuffersPool->unrefBuffer(bufferID);
344 requestBodyMutex.unlock();
345 requestBodySink.finishedWriting(bufferID, move(callback), res);
346 }
347 else
348 {
349 requestBodyMutex.unlock();
350 }
351 }
352
353 void onRequestBodyWritableBuffer(typename T_AsyncBufferView::ID bufferID,
354 Function<void(typename T_AsyncBufferView::ID)>&& cb)
355 {
356 requestBodyMutex.lock();
357 if (requestBodyBuffersPool != nullptr)
358 {
359 requestBodyBuffersPool->refBuffer(bufferID);
360 }
361 requestBodyActiveBuffer = bufferID;
362 requestBodyActiveCallback = move(cb);
363 requestBodyActiveOffset = 0;
364 requestBodyHasActive = true;
365 requestBodyCV.broadcast();
366 requestBodyMutex.unlock();
367 }
368
369 void onRequestBodyWritableFinished()
370 {
371 requestBodyMutex.lock();
372 requestBodyFinished = true;
373 requestBodyCV.broadcast();
374 requestBodyMutex.unlock();
375 }
376
377 void onRequestBodyWritableError(Result error)
378 {
379 requestBodyMutex.lock();
380 requestBodyErrorSet = true;
381 requestBodyError = error;
382 requestBodyCV.broadcast();
383 requestBodyMutex.unlock();
384 }
385
386 HttpClientOperation operation;
387 T_AsyncEventLoop* eventLoop = nullptr;
388
389 ResponseBodyStream responseBodyStream;
390 RequestBodySink requestBodySink;
391 T_AsyncBuffersPool responseBuffersPool;
392
393 Span<T_AsyncReadableRequest> responseReadQueue;
394 Span<T_AsyncWritableRequest> requestWriteQueue;
395
396 T_AsyncBuffersPool* requestBodyBuffersPool = nullptr;
397
398 T_AsyncLoopWakeUp wakeUp;
399 bool wakeUpStarted = false;
400
401 HttpClientLocalMutex requestBodyMutex;
402 HttpClientLocalConditionVariable requestBodyCV;
403
404 typename T_AsyncBufferView::ID requestBodyActiveBuffer = {};
405 Function<void(typename T_AsyncBufferView::ID)> requestBodyActiveCallback;
406
407 size_t requestBodyActiveOffset = 0;
408 bool requestBodyHasActive = false;
409 bool requestBodyFinished = false;
410 bool requestBodyErrorSet = false;
411 Result requestBodyError = Result(true);
412
413 bool initialized = false;
414};
415
417} // namespace SC
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:274
struct SC_FOUNDATION_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
#define SC_TRY_MSG(expression, failedMessage)
Checks the value of the given expression and if failed, returns a result with failedMessage to caller...
Definition Result.h:60
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:49
Caller-owned memory for one HttpClientAsyncT operation.
Definition HttpClientAsync.h:20
Definition HttpClientAsync.h:54
Definition HttpClientAsync.h:49
Definition HttpClientAsync.h:37
Listener receiving response notifications during HttpClientOperation::poll.
Definition HttpClient.h:85
Caller-owned memory for one HttpClientOperation.
Definition HttpClient.h:145
Optional notifier used by external adapters to wake up their own event loop.
Definition HttpClient.h:108
Result start(const HttpClientRequest &request, HttpClientResponse &response, HttpClientOperationListener *listener=nullptr, HttpClientRequestBodyProvider *bodyProvider=nullptr)
Starts a new request on this operation.
void setNotifier(HttpClientOperationNotifier *notifierValue)
Registers an optional notifier used by adapters such as HttpClientAsyncT.
Definition HttpClient.h:239
Result poll(uint32_t timeoutMilliseconds=0)
Processes queued backend events and optionally waits for more work.
Pull-based provider for streamed request bodies.
Definition HttpClient.h:72
Reusable HTTP backend/session owner.
Definition HttpClient.h:160
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
static constexpr Result Error(const char(&msg)[numChars])
Constructs an Error from a pointer to an ASCII string literal.
Definition Result.h:25
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29