38 using T_AsyncLoopWakeUp =
typename T_AsyncEventLoop::LoopWakeUp;
39 using T_AsyncBufferView =
typename T_AsyncStreams::BufferView;
40 using T_AsyncBuffersPool =
typename T_AsyncStreams::BuffersPool;
41 using T_AsyncReadableStream =
typename T_AsyncStreams::ReadableStream;
42 using T_AsyncWritableStream =
typename T_AsyncStreams::WritableStream;
43 using T_AsyncReadableRequest =
typename T_AsyncReadableStream::Request;
44 using T_AsyncWritableRequest =
typename T_AsyncWritableStream::Request;
46 static constexpr int MaxListeners = 8;
50 virtual Result asyncRead()
override {
return Result(
true); }
59 this->setWriteQueue(writeQueueSpan);
60 return T_AsyncWritableStream::init(buffersPool);
64 virtual Result asyncWrite(
typename T_AsyncBufferView::ID bufferID,
65 Function<
void(
typename T_AsyncBufferView::ID)> cb)
override
67 SC_TRY_MSG(owner !=
nullptr,
"HttpClientAsyncT::RequestBodySink missing owner");
68 owner->onRequestBodyWritableBuffer(bufferID,
move(cb));
72 virtual Result asyncDestroyWritable()
override
74 if (owner !=
nullptr and owner->operation.isRequestInFlight() and not owner->requestBodyFinished)
76 owner->onRequestBodyWritableError(
Result::Error(
"HttpClientAsyncT: request body sink destroyed"));
78 T_AsyncWritableStream::finishedDestroyingWritable();
82 virtual bool canEndWritable()
override
86 owner->onRequestBodyWritableFinished();
94 HttpClientEvent<MaxListeners, HttpClientResponse&> eventResponseHead;
99 SC_TRY_MSG(eventLoop ==
nullptr,
"HttpClientAsyncT: already initialized");
100 SC_TRY_MSG(asyncMemory.responseBuffers.sizeInElements() > 0,
"HttpClientAsyncT: response buffers missing");
101 SC_TRY_MSG(asyncMemory.responseReadQueue.sizeInElements() > 0,
"HttpClientAsyncT: response read queue missing");
102 SC_TRY_MSG(asyncMemory.requestWriteQueue.sizeInElements() > 0,
"HttpClientAsyncT: request write queue missing");
105 responseReadQueue = asyncMemory.responseReadQueue;
106 requestWriteQueue = asyncMemory.requestWriteQueue;
108 responseBuffersPool.setBuffers(asyncMemory.responseBuffers);
109 if (asyncMemory.responseBufferMemory.sizeInBytes() > 0)
111 SC_TRY(T_AsyncBuffersPool::sliceInEqualParts(asyncMemory.responseBuffers, asyncMemory.responseBufferMemory,
112 asyncMemory.responseBuffers.sizeInElements()));
115 responseBodyStream.setReadQueue(responseReadQueue);
116 responseBodyStream.setAutoDestroy(
false);
117 requestBodySink.setAutoDestroy(
false);
118 wakeUp.callback = [
this](
typename T_AsyncLoopWakeUp::Result& result) { onWakeUp(result); };
120 SC_TRY(operation.init(client, operationMemory));
133 if (wakeUpStarted and eventLoop !=
nullptr)
135 (void)wakeUp.stop(*eventLoop);
136 wakeUpStarted =
false;
139 if (requestBodySink.isStillWriting() or not requestBodySink.hasBeenDestroyed())
141 requestBodySink.destroy();
143 responseBodyStream.destroy();
144 resetRequestBodyState();
146 SC_TRY(operation.close());
150 requestBodyBuffersPool =
nullptr;
155 Result cancel() {
return operation.cancel(); }
157 Result start(
const HttpClientRequest& request, HttpClientResponse& response,
158 T_AsyncBuffersPool* requestBodyPool =
nullptr)
160 SC_TRY_MSG(initialized,
"HttpClientAsyncT: not initialized");
161 if (request.body.isStreamed())
163 SC_TRY_MSG(requestBodyPool !=
nullptr,
"HttpClientAsyncT: streamed request body requires buffers pool");
166 requestBodyBuffersPool = requestBodyPool;
167 resetRequestBodyState();
169 responseBodyStream.setReadQueue(responseReadQueue);
170 SC_TRY(responseBodyStream.init(responseBuffersPool));
171 SC_TRY(responseBodyStream.start());
173 if (request.body.isStreamed())
175 SC_TRY(requestBodySink.init(*requestBodyBuffersPool, requestWriteQueue, *
this));
178 if (not wakeUpStarted)
180 SC_TRY(wakeUp.start(*eventLoop));
181 wakeUpStarted =
true;
184 HttpClientRequest asyncRequest = request;
185 if (asyncRequest.body.isStreamed())
187 asyncRequest.body.provider =
this;
190 const Result res = operation.
start(asyncRequest, response,
this);
193 if (request.body.isStreamed())
195 requestBodySink.destroy();
197 responseBodyStream.destroy();
203 [[nodiscard]] T_AsyncReadableStream& getResponseBodyStream() {
return responseBodyStream; }
204 [[nodiscard]] T_AsyncWritableStream& getRequestBodySink() {
return requestBodySink; }
206 [[nodiscard]]
bool isInitialized()
const {
return initialized; }
207 [[nodiscard]]
bool isRequestInFlight()
const {
return operation.isRequestInFlight(); }
210 virtual void onResponseHead(HttpClientResponse& response)
override { eventResponseHead.emit(response); }
212 virtual void onResponseBody(Span<const char> data)
override
214 typename T_AsyncBufferView::ID bufferID;
216 if (not responseBuffersPool.requestNewBuffer(data.sizeInBytes(), bufferID, writable))
218 responseBodyStream.emitError(
Result::Error(
"HttpClientAsyncT: response buffer exhausted"));
219 (void)operation.cancel();
222 memcpy(writable.data(), data.data(), data.sizeInBytes());
223 const bool continuePushing = responseBodyStream.push(bufferID, data.sizeInBytes());
224 responseBuffersPool.unrefBuffer(bufferID);
225 if (not continuePushing)
227 (void)operation.cancel();
231 virtual void onResponseComplete()
override
233 responseBodyStream.pushEnd();
234 resetRequestBodyState();
237 virtual void onError(Result error)
override
239 responseBodyStream.emitError(error);
240 if (requestBodyBuffersPool !=
nullptr)
242 requestBodySink.destroy();
244 resetRequestBodyState();
247 virtual Result pullRequestBody(Span<char> dest,
size_t& bytesWritten,
bool& endReached)
override
252 requestBodyMutex.lock();
253 while (not requestBodyHasActive and not requestBodyFinished and not requestBodyErrorSet)
255 requestBodyCV.wait(requestBodyMutex);
258 if (requestBodyErrorSet)
260 const Result error = requestBodyError;
261 requestBodyMutex.unlock();
265 if (not requestBodyHasActive and requestBodyFinished)
267 requestBodyMutex.unlock();
272 const typename T_AsyncBufferView::ID activeBuffer = requestBodyActiveBuffer;
273 const size_t activeOffset = requestBodyActiveOffset;
274 requestBodyMutex.unlock();
276 SC_TRY_MSG(requestBodyBuffersPool !=
nullptr,
"HttpClientAsyncT: missing request body buffers pool");
278 Span<const char> readable;
279 SC_TRY(requestBodyBuffersPool->getReadableData(activeBuffer, readable));
281 const size_t remaining = readable.sizeInBytes() - activeOffset;
282 bytesWritten = remaining < dest.sizeInBytes() ? remaining : dest.sizeInBytes();
283 if (bytesWritten > 0)
285 memcpy(dest.data(), readable.data() + activeOffset, bytesWritten);
288 requestBodyMutex.lock();
289 requestBodyActiveOffset += bytesWritten;
290 const bool consumedBuffer = requestBodyActiveOffset >= readable.sizeInBytes();
291 requestBodyMutex.unlock();
295 finishRequestBodyActive(Result(
true));
301 virtual void notifyHttpClientOperation(HttpClientOperation&)
override
303 if (wakeUpStarted and eventLoop !=
nullptr)
305 (void)wakeUp.wakeUp(*eventLoop);
309 void onWakeUp(
typename T_AsyncLoopWakeUp::Result& result)
311 const Result pollResult = operation.
poll();
314 responseBodyStream.emitError(pollResult);
316 result.reactivateRequest(
true);
319 void resetRequestBodyState()
321 requestBodyMutex.lock();
322 if (requestBodyHasActive and requestBodyBuffersPool !=
nullptr)
324 requestBodyBuffersPool->unrefBuffer(requestBodyActiveBuffer);
326 requestBodyActiveBuffer = {};
327 requestBodyActiveCallback = {};
328 requestBodyActiveOffset = 0;
329 requestBodyHasActive =
false;
330 requestBodyFinished =
false;
331 requestBodyErrorSet =
false;
332 requestBodyError = Result(
true);
333 requestBodyCV.broadcast();
334 requestBodyMutex.unlock();
337 void finishRequestBodyActive(Result res)
339 requestBodyMutex.lock();
340 if (requestBodyHasActive and requestBodyBuffersPool !=
nullptr)
342 typename T_AsyncBufferView::ID bufferID = requestBodyActiveBuffer;
344 Function<void(
typename T_AsyncBufferView::ID)> callback =
move(requestBodyActiveCallback);
346 requestBodyHasActive =
false;
347 requestBodyActiveBuffer = {};
348 requestBodyActiveOffset = 0;
349 requestBodyBuffersPool->unrefBuffer(bufferID);
350 requestBodyMutex.unlock();
351 requestBodySink.finishedWriting(bufferID,
move(callback), res);
355 requestBodyMutex.unlock();
359 void onRequestBodyWritableBuffer(
typename T_AsyncBufferView::ID bufferID,
360 Function<
void(
typename T_AsyncBufferView::ID)>&& cb)
362 requestBodyMutex.lock();
363 if (requestBodyBuffersPool !=
nullptr)
365 requestBodyBuffersPool->refBuffer(bufferID);
367 requestBodyActiveBuffer = bufferID;
368 requestBodyActiveCallback =
move(cb);
369 requestBodyActiveOffset = 0;
370 requestBodyHasActive =
true;
371 requestBodyCV.broadcast();
372 requestBodyMutex.unlock();
375 void onRequestBodyWritableFinished()
377 requestBodyMutex.lock();
378 requestBodyFinished =
true;
379 requestBodyCV.broadcast();
380 requestBodyMutex.unlock();
383 void onRequestBodyWritableError(Result error)
385 requestBodyMutex.lock();
386 requestBodyErrorSet =
true;
387 requestBodyError = error;
388 requestBodyCV.broadcast();
389 requestBodyMutex.unlock();
392 HttpClientOperation operation;
393 T_AsyncEventLoop* eventLoop =
nullptr;
395 ResponseBodyStream responseBodyStream;
396 RequestBodySink requestBodySink;
397 T_AsyncBuffersPool responseBuffersPool;
399 Span<T_AsyncReadableRequest> responseReadQueue;
400 Span<T_AsyncWritableRequest> requestWriteQueue;
402 T_AsyncBuffersPool* requestBodyBuffersPool =
nullptr;
404 T_AsyncLoopWakeUp wakeUp;
405 bool wakeUpStarted =
false;
407 HttpClientLocalMutex requestBodyMutex;
408 HttpClientLocalConditionVariable requestBodyCV;
410 typename T_AsyncBufferView::ID requestBodyActiveBuffer = {};
411 Function<void(
typename T_AsyncBufferView::ID)> requestBodyActiveCallback;
413 size_t requestBodyActiveOffset = 0;
414 bool requestBodyHasActive =
false;
415 bool requestBodyFinished =
false;
416 bool requestBodyErrorSet =
false;
417 Result requestBodyError = Result(
true);
419 bool initialized =
false;