Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
HttpClientAsync.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../HttpClient/HttpClient.h"
6#include "../HttpClient/Internal/HttpClientEvent.h"
7#include "../HttpClient/Internal/HttpClientThreading.h"
8
9#include <string.h>
10
11namespace SC
12{
15
18template <typename T_AsyncStreams>
19struct SC_HTTP_CLIENT_EXPORT HttpClientAsyncOperationMemoryT
20{
21 using T_AsyncBufferView = typename T_AsyncStreams::BufferView;
22 using T_AsyncReadableStream = typename T_AsyncStreams::ReadableStream;
23 using T_AsyncWritableStream = typename T_AsyncStreams::WritableStream;
24 using T_AsyncReadableRequest = typename T_AsyncReadableStream::Request;
25 using T_AsyncWritableRequest = typename T_AsyncWritableStream::Request;
26
27 Span<T_AsyncBufferView> responseBuffers;
28 Span<char> responseBufferMemory;
29 Span<T_AsyncReadableRequest> responseReadQueue;
30 Span<T_AsyncWritableRequest> requestWriteQueue;
31};
32
33template <typename T_AsyncEventLoop, typename T_AsyncStreams>
37{
38 using T_AsyncLoopWakeUp = typename T_AsyncEventLoop::LoopWakeUp;
39 using T_AsyncBufferView = typename T_AsyncStreams::BufferView;
40 using T_AsyncBuffersPool = typename T_AsyncStreams::BuffersPool;
41 using T_AsyncReadableStream = typename T_AsyncStreams::ReadableStream;
42 using T_AsyncWritableStream = typename T_AsyncStreams::WritableStream;
43 using T_AsyncReadableRequest = typename T_AsyncReadableStream::Request;
44 using T_AsyncWritableRequest = typename T_AsyncWritableStream::Request;
45
46 static constexpr int MaxListeners = 8;
47
48 struct ResponseBodyStream final : public T_AsyncReadableStream
49 {
50 virtual Result asyncRead() override { return Result(true); }
51 };
52
53 struct RequestBodySink final : public T_AsyncWritableStream
54 {
55 Result init(T_AsyncBuffersPool& buffersPool, Span<T_AsyncWritableRequest> writeQueueSpan,
56 HttpClientAsyncT& ownerValue)
57 {
58 owner = &ownerValue;
59 this->setWriteQueue(writeQueueSpan);
60 return T_AsyncWritableStream::init(buffersPool);
61 }
62
63 private:
64 virtual Result asyncWrite(typename T_AsyncBufferView::ID bufferID,
65 Function<void(typename T_AsyncBufferView::ID)> cb) override
66 {
67 SC_TRY_MSG(owner != nullptr, "HttpClientAsyncT::RequestBodySink missing owner");
68 owner->onRequestBodyWritableBuffer(bufferID, move(cb));
69 return Result(true);
70 }
71
72 virtual Result asyncDestroyWritable() override
73 {
74 if (owner != nullptr and owner->operation.isRequestInFlight() and not owner->requestBodyFinished)
75 {
76 owner->onRequestBodyWritableError(Result::Error("HttpClientAsyncT: request body sink destroyed"));
77 }
78 T_AsyncWritableStream::finishedDestroyingWritable();
79 return Result(true);
80 }
81
82 virtual bool canEndWritable() override
83 {
84 if (owner != nullptr)
85 {
86 owner->onRequestBodyWritableFinished();
87 }
88 return true;
89 }
90
91 HttpClientAsyncT* owner = nullptr;
92 };
93
94 HttpClientEvent<MaxListeners, HttpClientResponse&> eventResponseHead;
95
96 Result init(HttpClient& client, T_AsyncEventLoop& loop, const HttpClientOperationMemory& operationMemory,
98 {
99 SC_TRY_MSG(eventLoop == nullptr, "HttpClientAsyncT: already initialized");
100 SC_TRY_MSG(asyncMemory.responseBuffers.sizeInElements() > 0, "HttpClientAsyncT: response buffers missing");
101 SC_TRY_MSG(asyncMemory.responseReadQueue.sizeInElements() > 0, "HttpClientAsyncT: response read queue missing");
102 SC_TRY_MSG(asyncMemory.requestWriteQueue.sizeInElements() > 0, "HttpClientAsyncT: request write queue missing");
103
104 eventLoop = &loop;
105 responseReadQueue = asyncMemory.responseReadQueue;
106 requestWriteQueue = asyncMemory.requestWriteQueue;
107
108 responseBuffersPool.setBuffers(asyncMemory.responseBuffers);
109 if (asyncMemory.responseBufferMemory.sizeInBytes() > 0)
110 {
111 SC_TRY(T_AsyncBuffersPool::sliceInEqualParts(asyncMemory.responseBuffers, asyncMemory.responseBufferMemory,
112 asyncMemory.responseBuffers.sizeInElements()));
113 }
114
115 responseBodyStream.setReadQueue(responseReadQueue);
116 responseBodyStream.setAutoDestroy(false);
117 requestBodySink.setAutoDestroy(false);
118 wakeUp.callback = [this](typename T_AsyncLoopWakeUp::Result& result) { onWakeUp(result); };
119
120 SC_TRY(operation.init(client, operationMemory));
121 operation.setNotifier(this);
122 initialized = true;
123 return Result(true);
124 }
125
126 Result close()
127 {
128 if (not initialized)
129 {
130 return Result(true);
131 }
132
133 if (wakeUpStarted and eventLoop != nullptr)
134 {
135 (void)wakeUp.stop(*eventLoop);
136 wakeUpStarted = false;
137 }
138
139 if (requestBodySink.isStillWriting() or not requestBodySink.hasBeenDestroyed())
140 {
141 requestBodySink.destroy();
142 }
143 responseBodyStream.destroy();
144 resetRequestBodyState();
145
146 SC_TRY(operation.close());
147 operation.setNotifier(nullptr);
148
149 eventLoop = nullptr;
150 requestBodyBuffersPool = nullptr;
151 initialized = false;
152 return Result(true);
153 }
154
155 Result cancel() { return operation.cancel(); }
156
157 Result start(const HttpClientRequest& request, HttpClientResponse& response,
158 T_AsyncBuffersPool* requestBodyPool = nullptr)
159 {
160 SC_TRY_MSG(initialized, "HttpClientAsyncT: not initialized");
161 if (request.body.isStreamed())
162 {
163 SC_TRY_MSG(requestBodyPool != nullptr, "HttpClientAsyncT: streamed request body requires buffers pool");
164 }
165
166 requestBodyBuffersPool = requestBodyPool;
167 resetRequestBodyState();
168
169 responseBodyStream.setReadQueue(responseReadQueue);
170 SC_TRY(responseBodyStream.init(responseBuffersPool));
171 SC_TRY(responseBodyStream.start());
172
173 if (request.body.isStreamed())
174 {
175 SC_TRY(requestBodySink.init(*requestBodyBuffersPool, requestWriteQueue, *this));
176 }
177
178 if (not wakeUpStarted)
179 {
180 SC_TRY(wakeUp.start(*eventLoop));
181 wakeUpStarted = true;
182 }
183
184 HttpClientRequest asyncRequest = request;
185 if (asyncRequest.body.isStreamed())
186 {
187 asyncRequest.body.provider = this;
188 }
189
190 const Result res = operation.start(asyncRequest, response, this);
191 if (not res)
192 {
193 if (request.body.isStreamed())
194 {
195 requestBodySink.destroy();
196 }
197 responseBodyStream.destroy();
198 return res;
199 }
200 return Result(true);
201 }
202
203 [[nodiscard]] T_AsyncReadableStream& getResponseBodyStream() { return responseBodyStream; }
204 [[nodiscard]] T_AsyncWritableStream& getRequestBodySink() { return requestBodySink; }
205
206 [[nodiscard]] bool isInitialized() const { return initialized; }
207 [[nodiscard]] bool isRequestInFlight() const { return operation.isRequestInFlight(); }
208
209 private:
210 virtual void onResponseHead(HttpClientResponse& response) override { eventResponseHead.emit(response); }
211
212 virtual void onResponseBody(Span<const char> data) override
213 {
214 typename T_AsyncBufferView::ID bufferID;
215 Span<char> writable;
216 if (not responseBuffersPool.requestNewBuffer(data.sizeInBytes(), bufferID, writable))
217 {
218 responseBodyStream.emitError(Result::Error("HttpClientAsyncT: response buffer exhausted"));
219 (void)operation.cancel();
220 return;
221 }
222 memcpy(writable.data(), data.data(), data.sizeInBytes());
223 const bool continuePushing = responseBodyStream.push(bufferID, data.sizeInBytes());
224 responseBuffersPool.unrefBuffer(bufferID);
225 if (not continuePushing)
226 {
227 (void)operation.cancel();
228 }
229 }
230
231 virtual void onResponseComplete() override
232 {
233 responseBodyStream.pushEnd();
234 resetRequestBodyState();
235 }
236
237 virtual void onError(Result error) override
238 {
239 responseBodyStream.emitError(error);
240 if (requestBodyBuffersPool != nullptr)
241 {
242 requestBodySink.destroy();
243 }
244 resetRequestBodyState();
245 }
246
247 virtual Result pullRequestBody(Span<char> dest, size_t& bytesWritten, bool& endReached) override
248 {
249 bytesWritten = 0;
250 endReached = false;
251
252 requestBodyMutex.lock();
253 while (not requestBodyHasActive and not requestBodyFinished and not requestBodyErrorSet)
254 {
255 requestBodyCV.wait(requestBodyMutex);
256 }
257
258 if (requestBodyErrorSet)
259 {
260 const Result error = requestBodyError;
261 requestBodyMutex.unlock();
262 return error;
263 }
264
265 if (not requestBodyHasActive and requestBodyFinished)
266 {
267 requestBodyMutex.unlock();
268 endReached = true;
269 return Result(true);
270 }
271
272 const typename T_AsyncBufferView::ID activeBuffer = requestBodyActiveBuffer;
273 const size_t activeOffset = requestBodyActiveOffset;
274 requestBodyMutex.unlock();
275
276 SC_TRY_MSG(requestBodyBuffersPool != nullptr, "HttpClientAsyncT: missing request body buffers pool");
277
278 Span<const char> readable;
279 SC_TRY(requestBodyBuffersPool->getReadableData(activeBuffer, readable));
280
281 const size_t remaining = readable.sizeInBytes() - activeOffset;
282 bytesWritten = remaining < dest.sizeInBytes() ? remaining : dest.sizeInBytes();
283 if (bytesWritten > 0)
284 {
285 memcpy(dest.data(), readable.data() + activeOffset, bytesWritten);
286 }
287
288 requestBodyMutex.lock();
289 requestBodyActiveOffset += bytesWritten;
290 const bool consumedBuffer = requestBodyActiveOffset >= readable.sizeInBytes();
291 requestBodyMutex.unlock();
292
293 if (consumedBuffer)
294 {
295 finishRequestBodyActive(Result(true));
296 }
297
298 return Result(true);
299 }
300
301 virtual void notifyHttpClientOperation(HttpClientOperation&) override
302 {
303 if (wakeUpStarted and eventLoop != nullptr)
304 {
305 (void)wakeUp.wakeUp(*eventLoop);
306 }
307 }
308
309 void onWakeUp(typename T_AsyncLoopWakeUp::Result& result)
310 {
311 const Result pollResult = operation.poll();
312 if (not pollResult)
313 {
314 responseBodyStream.emitError(pollResult);
315 }
316 result.reactivateRequest(true);
317 }
318
319 void resetRequestBodyState()
320 {
321 requestBodyMutex.lock();
322 if (requestBodyHasActive and requestBodyBuffersPool != nullptr)
323 {
324 requestBodyBuffersPool->unrefBuffer(requestBodyActiveBuffer);
325 }
326 requestBodyActiveBuffer = {};
327 requestBodyActiveCallback = {};
328 requestBodyActiveOffset = 0;
329 requestBodyHasActive = false;
330 requestBodyFinished = false;
331 requestBodyErrorSet = false;
332 requestBodyError = Result(true);
333 requestBodyCV.broadcast();
334 requestBodyMutex.unlock();
335 }
336
337 void finishRequestBodyActive(Result res)
338 {
339 requestBodyMutex.lock();
340 if (requestBodyHasActive and requestBodyBuffersPool != nullptr)
341 {
342 typename T_AsyncBufferView::ID bufferID = requestBodyActiveBuffer;
343
344 Function<void(typename T_AsyncBufferView::ID)> callback = move(requestBodyActiveCallback);
345
346 requestBodyHasActive = false;
347 requestBodyActiveBuffer = {};
348 requestBodyActiveOffset = 0;
349 requestBodyBuffersPool->unrefBuffer(bufferID);
350 requestBodyMutex.unlock();
351 requestBodySink.finishedWriting(bufferID, move(callback), res);
352 }
353 else
354 {
355 requestBodyMutex.unlock();
356 }
357 }
358
359 void onRequestBodyWritableBuffer(typename T_AsyncBufferView::ID bufferID,
360 Function<void(typename T_AsyncBufferView::ID)>&& cb)
361 {
362 requestBodyMutex.lock();
363 if (requestBodyBuffersPool != nullptr)
364 {
365 requestBodyBuffersPool->refBuffer(bufferID);
366 }
367 requestBodyActiveBuffer = bufferID;
368 requestBodyActiveCallback = move(cb);
369 requestBodyActiveOffset = 0;
370 requestBodyHasActive = true;
371 requestBodyCV.broadcast();
372 requestBodyMutex.unlock();
373 }
374
375 void onRequestBodyWritableFinished()
376 {
377 requestBodyMutex.lock();
378 requestBodyFinished = true;
379 requestBodyCV.broadcast();
380 requestBodyMutex.unlock();
381 }
382
383 void onRequestBodyWritableError(Result error)
384 {
385 requestBodyMutex.lock();
386 requestBodyErrorSet = true;
387 requestBodyError = error;
388 requestBodyCV.broadcast();
389 requestBodyMutex.unlock();
390 }
391
392 HttpClientOperation operation;
393 T_AsyncEventLoop* eventLoop = nullptr;
394
395 ResponseBodyStream responseBodyStream;
396 RequestBodySink requestBodySink;
397 T_AsyncBuffersPool responseBuffersPool;
398
399 Span<T_AsyncReadableRequest> responseReadQueue;
400 Span<T_AsyncWritableRequest> requestWriteQueue;
401
402 T_AsyncBuffersPool* requestBodyBuffersPool = nullptr;
403
404 T_AsyncLoopWakeUp wakeUp;
405 bool wakeUpStarted = false;
406
407 HttpClientLocalMutex requestBodyMutex;
408 HttpClientLocalConditionVariable requestBodyCV;
409
410 typename T_AsyncBufferView::ID requestBodyActiveBuffer = {};
411 Function<void(typename T_AsyncBufferView::ID)> requestBodyActiveCallback;
412
413 size_t requestBodyActiveOffset = 0;
414 bool requestBodyHasActive = false;
415 bool requestBodyFinished = false;
416 bool requestBodyErrorSet = false;
417 Result requestBodyError = Result(true);
418
419 bool initialized = false;
420};
421
423} // namespace SC
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:297
struct SC_FOUNDATION_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
#define SC_TRY_MSG(expression, failedMessage)
Checks the value of the given expression and if failed, returns a result with failedMessage to caller...
Definition Result.h:60
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:49
Caller-owned memory for one HttpClientAsyncT operation.
Definition HttpClientAsync.h:20
Definition HttpClientAsync.h:54
Definition HttpClientAsync.h:49
Definition HttpClientAsync.h:37
Listener receiving response notifications during HttpClientOperation::poll.
Definition HttpClient.h:365
Caller-owned memory for one HttpClientOperation.
Definition HttpClient.h:431
Optional notifier used by external adapters to wake up their own event loop.
Definition HttpClient.h:388
void setNotifier(HttpClientOperationNotifier *notifierValue)
Registers an optional notifier used by adapters such as HttpClientAsyncT.
Definition HttpClient.h:530
Result start(const HttpClientRequest &request, HttpClientResponse &response, HttpClientOperationListener *listener=nullptr)
Starts a new request on this operation.
Result poll(uint32_t timeoutMilliseconds=0)
Processes queued backend events and optionally waits for more work.
Pull-based provider for streamed request bodies.
Definition HttpClient.h:352
Reusable HTTP backend/session owner.
Definition HttpClient.h:447
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
static constexpr Result Error(const char(&msg)[numChars])
Constructs an Error from a pointer to an ASCII string literal.
Definition Result.h:25
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29