// Short aliases for the nested types of the dependent T_AsyncEventLoop and
// T_AsyncStreams types, so the rest of the class can name them without
// repeating `typename ...::` at every use.
38 using T_AsyncLoopWakeUp =
typename T_AsyncEventLoop::LoopWakeUp;
39 using T_AsyncBufferView =
typename T_AsyncStreams::BufferView;
40 using T_AsyncBuffersPool =
typename T_AsyncStreams::BuffersPool;
41 using T_AsyncReadableStream =
typename T_AsyncStreams::ReadableStream;
42 using T_AsyncWritableStream =
typename T_AsyncStreams::WritableStream;
// Request element types; these are the element types of the read / write queue spans held by the class.
43 using T_AsyncReadableRequest =
typename T_AsyncReadableStream::Request;
44 using T_AsyncWritableRequest =
typename T_AsyncWritableStream::Request;
// First template argument of the HttpClientEvent used for eventResponseHead
// (presumably the maximum number of registered listeners - confirm against HttpClientEvent).
46 static constexpr int MaxListeners = 8;
50 virtual Result asyncRead()
override {
return Result(
true); }
// Tail of an initializer whose signature lies outside this excerpt:
// installs the write queue span, then delegates to the base writable stream's init() with the buffers pool.
59 this->setWriteQueue(writeQueueSpan);
60 return T_AsyncWritableStream::init(buffersPool);
// Write hook of the request body sink: hands the buffer ID and its completion
// callback over to the owning client via onRequestBodyWritableBuffer().
// Checked with SC_TRY_MSG: `owner` must be non-null (message below otherwise).
// The enclosing braces and the final return are not visible in this excerpt.
64 virtual Result asyncWrite(
typename T_AsyncBufferView::ID bufferID,
65 Function<
void(
typename T_AsyncBufferView::ID)> cb)
override
67 SC_TRY_MSG(owner !=
nullptr,
"HttpClientAsyncT::RequestBodySink missing owner");
// Ownership of the callback is moved to the owner.
68 owner->onRequestBodyWritableBuffer(bufferID,
move(cb));
// Destroy hook of the request body sink.
// When an owner exists, its operation reports a request in flight and the request
// body has not been marked finished, the destruction is reported to the owner as a
// request-body error. The base stream is then told that destruction has completed.
// NOTE(review): owner->requestBodyFinished is read here with no lock visible, while
// the functions that write it take requestBodyMutex - confirm which thread calls this.
72 virtual Result asyncDestroyWritable()
override
74 if (owner !=
nullptr and owner->operation.isRequestInFlight() and not owner->requestBodyFinished)
76 owner->onRequestBodyWritableError(
Result::Error(
"HttpClientAsyncT: request body sink destroyed"));
78 T_AsyncWritableStream::finishedDestroyingWritable();
// End-of-stream hook of the request body sink: tells the owner that the request
// body is finished via onRequestBodyWritableFinished().
// Any null check on `owner` and the returned value sit on lines not visible in this excerpt.
82 virtual bool canEndWritable()
override
86 owner->onRequestBodyWritableFinished();
// Event emitted from onResponseHead() with the HttpClientResponse it receives.
94 HttpClientEvent<MaxListeners, HttpClientResponse&> eventResponseHead;
// Body of the initializer (its signature lies outside this excerpt).
// Precondition checks: not initialized yet (eventLoop still null) and all three
// caller-supplied spans non-empty.
99 SC_TRY_MSG(eventLoop ==
nullptr,
"HttpClientAsyncT: already initialized");
100 SC_TRY_MSG(asyncMemory.responseBuffers.sizeInElements() > 0,
"HttpClientAsyncT: response buffers missing");
101 SC_TRY_MSG(asyncMemory.responseReadQueue.sizeInElements() > 0,
"HttpClientAsyncT: response read queue missing");
102 SC_TRY_MSG(asyncMemory.requestWriteQueue.sizeInElements() > 0,
"HttpClientAsyncT: request write queue missing");
// Remember the caller-provided queues; start() uses them when it sets up the streams.
105 responseReadQueue = asyncMemory.responseReadQueue;
106 requestWriteQueue = asyncMemory.requestWriteQueue;
// Hand the buffer views to the response pool. If backing memory was supplied too,
// slice it into as many equal parts as there are buffer views.
108 responseBuffersPool.setBuffers(asyncMemory.responseBuffers);
109 if (asyncMemory.responseBufferMemory.sizeInBytes() > 0)
111 SC_TRY(T_AsyncBuffersPool::sliceInEqualParts(asyncMemory.responseBuffers, asyncMemory.responseBufferMemory,
112 asyncMemory.responseBuffers.sizeInElements()));
// Both streams have auto-destroy switched off; this class calls destroy() on them
// explicitly in close(), start() and onError().
115 responseBodyStream.setReadQueue(responseReadQueue);
116 responseBodyStream.setAutoDestroy(
false);
117 requestBodySink.setAutoDestroy(
false);
// The wake-up callback captures `this` and forwards to onWakeUp().
118 wakeUp.callback = [
this](
typename T_AsyncLoopWakeUp::Result& result) { onWakeUp(result); };
120 SC_TRY(operation.init(client, operationMemory));
// Body of the shutdown routine (its signature lies outside this excerpt).
// Stop the loop wake-up if it was started; the result of stop() is deliberately ignored.
133 if (wakeUpStarted and eventLoop !=
nullptr)
135 (void)wakeUp.stop(*eventLoop);
136 wakeUpStarted =
false;
// Destroy the request body sink only if it is still writing or has not been destroyed yet.
139 if (requestBodySink.isStillWriting() or not requestBodySink.hasBeenDestroyed())
141 requestBodySink.destroy();
143 responseBodyStream.destroy();
// Reset while requestBodyBuffersPool is still set, so that a leftover active buffer
// is unreferenced on the pool it was referenced on.
144 resetRequestBodyState();
146 SC_TRY(operation.close());
150 requestBodyBuffersPool =
nullptr;
155 Result cancel() {
return operation.cancel(); }
157 Result start(
const HttpClientRequest& request, HttpClientResponse& response,
158 T_AsyncBuffersPool* requestBodyPool =
nullptr)
160 SC_TRY_MSG(initialized,
"HttpClientAsyncT: not initialized");
161 if (request.streamedBodySize > 0)
163 SC_TRY_MSG(requestBodyPool !=
nullptr,
"HttpClientAsyncT: streamed request body requires buffers pool");
166 requestBodyBuffersPool = requestBodyPool;
167 resetRequestBodyState();
169 responseBodyStream.setReadQueue(responseReadQueue);
170 SC_TRY(responseBodyStream.init(responseBuffersPool));
171 SC_TRY(responseBodyStream.start());
173 if (request.streamedBodySize > 0)
175 SC_TRY(requestBodySink.init(*requestBodyBuffersPool, requestWriteQueue, *
this));
178 if (not wakeUpStarted)
180 SC_TRY(wakeUp.start(*eventLoop));
181 wakeUpStarted =
true;
184 const Result res = operation.
start(request, response,
this, request.streamedBodySize > 0 ?
this :
nullptr);
187 if (request.streamedBodySize > 0)
189 requestBodySink.destroy();
191 responseBodyStream.destroy();
197 [[nodiscard]] T_AsyncReadableStream& getResponseBodyStream() {
return responseBodyStream; }
198 [[nodiscard]] T_AsyncWritableStream& getRequestBodySink() {
return requestBodySink; }
200 [[nodiscard]]
bool isInitialized()
const {
return initialized; }
201 [[nodiscard]]
bool isRequestInFlight()
const {
return operation.isRequestInFlight(); }
204 virtual void onResponseHead(HttpClientResponse& response)
override { eventResponseHead.emit(response); }
// Response-body callback: copies the received bytes into a buffer requested from
// responseBuffersPool and pushes that buffer into responseBodyStream.
// (`writable` is declared on a line not visible in this excerpt.)
206 virtual void onResponseBody(Span<const char> data)
override
208 typename T_AsyncBufferView::ID bufferID;
// No buffer available: report the failure on the stream and cancel the operation
// (the cancel result is deliberately ignored).
210 if (not responseBuffersPool.requestNewBuffer(data.sizeInBytes(), bufferID, writable))
212 responseBodyStream.emitError(
Result::Error(
"HttpClientAsyncT: response buffer exhausted"));
213 (void)operation.cancel();
// NOTE(review): no size check is visible before this memcpy, whereas pullRequestBody()
// guards its memcpy with `bytesWritten > 0` - confirm behaviour for empty `data`.
216 memcpy(writable.data(), data.data(), data.sizeInBytes());
217 const bool continuePushing = responseBodyStream.push(bufferID, data.sizeInBytes());
// Drop this function's reference on the buffer once it has been pushed.
218 responseBuffersPool.unrefBuffer(bufferID);
// push() returned false: cancel the operation.
219 if (not continuePushing)
221 (void)operation.cancel();
// Response-complete callback: signals end of data on the response body stream,
// then clears the request body bookkeeping.
225 virtual void onResponseComplete()
override
227 responseBodyStream.pushEnd();
228 resetRequestBodyState();
// Error callback: forwards the error to the response body stream, destroys the
// request body sink when a request body pool is set, and clears the request body bookkeeping.
231 virtual void onError(Result error)
override
233 responseBodyStream.emitError(error);
// A non-null pool is used here as the indicator that a request body sink was set up.
234 if (requestBodyBuffersPool !=
nullptr)
236 requestBodySink.destroy();
238 resetRequestBodyState();
// Pulls the next piece of the request body into `dest`.
// Waits on requestBodyCV until a buffer is active, the body is marked finished, or an
// error has been recorded; then copies as many bytes as fit from the active buffer.
// Several statements (initial output values, the returns of the error / finished
// branches, the final return) are on lines not visible in this excerpt.
241 virtual Result pullRequestBody(Span<char> dest,
size_t& bytesWritten,
bool& endReached)
override
246 requestBodyMutex.lock();
// Loop around the wait: the condition is re-checked after every wake-up.
247 while (not requestBodyHasActive and not requestBodyFinished and not requestBodyErrorSet)
249 requestBodyCV.wait(requestBodyMutex);
// Error branch: copy the stored error while still holding the mutex, then release it.
252 if (requestBodyErrorSet)
254 const Result error = requestBodyError;
255 requestBodyMutex.unlock();
// Finished branch: taken only when no buffer is pending any more.
259 if (not requestBodyHasActive and requestBodyFinished)
261 requestBodyMutex.unlock();
// Snapshot the active buffer and offset under the lock; the copy below runs with the mutex released.
266 const typename T_AsyncBufferView::ID activeBuffer = requestBodyActiveBuffer;
267 const size_t activeOffset = requestBodyActiveOffset;
268 requestBodyMutex.unlock();
270 SC_TRY_MSG(requestBodyBuffersPool !=
nullptr,
"HttpClientAsyncT: missing request body buffers pool");
272 Span<const char> readable;
273 SC_TRY(requestBodyBuffersPool->getReadableData(activeBuffer, readable));
// NOTE(review): size_t subtraction - assumes activeOffset <= readable.sizeInBytes(); it would wrap otherwise.
275 const size_t remaining = readable.sizeInBytes() - activeOffset;
// Copy the smaller of "bytes left in the buffer" and "room in dest".
276 bytesWritten = remaining < dest.sizeInBytes() ? remaining : dest.sizeInBytes();
277 if (bytesWritten > 0)
279 memcpy(dest.data(), readable.data() + activeOffset, bytesWritten);
// Advance the shared offset under the lock and note whether the buffer is now fully consumed.
282 requestBodyMutex.lock();
283 requestBodyActiveOffset += bytesWritten;
284 const bool consumedBuffer = requestBodyActiveOffset >= readable.sizeInBytes();
285 requestBodyMutex.unlock();
// Presumably executed only when consumedBuffer is true (the guard is not visible here):
// completes the active buffer with Result(true).
289 finishRequestBodyActive(Result(
true));
// Notification callback from the operation: triggers the event loop wake-up, provided
// it was started and an event loop is set. The wakeUp() result is deliberately ignored.
// The wake-up callback installed by init() then runs onWakeUp().
295 virtual void notifyHttpClientOperation(HttpClientOperation&)
override
297 if (wakeUpStarted and eventLoop !=
nullptr)
299 (void)wakeUp.wakeUp(*eventLoop);
// Wake-up handler: polls the operation and re-arms the wake-up request.
303 void onWakeUp(
typename T_AsyncLoopWakeUp::Result& result)
305 const Result pollResult = operation.
poll();
// Presumably guarded by a check that pollResult is a failure (the condition line is not
// visible in this excerpt): forwards the poll result to the response body stream as an error.
308 responseBodyStream.emitError(pollResult);
// Keep the wake-up request active so that later notifications are delivered as well.
310 result.reactivateRequest(
true);
// Returns all request-body bookkeeping to its initial values, under requestBodyMutex.
// A still-active buffer is unreferenced on requestBodyBuffersPool (when that is non-null),
// and the stored completion callback is discarded without being invoked.
313 void resetRequestBodyState()
315 requestBodyMutex.lock();
316 if (requestBodyHasActive and requestBodyBuffersPool !=
nullptr)
318 requestBodyBuffersPool->unrefBuffer(requestBodyActiveBuffer);
320 requestBodyActiveBuffer = {};
321 requestBodyActiveCallback = {};
322 requestBodyActiveOffset = 0;
323 requestBodyHasActive =
false;
324 requestBodyFinished =
false;
325 requestBodyErrorSet =
false;
326 requestBodyError = Result(
true);
// Wake any waiter on the condition variable so that it re-evaluates the (now cleared) state.
327 requestBodyCV.broadcast();
328 requestBodyMutex.unlock();
// Completes the currently active request-body buffer with result `res`.
// When a buffer is active and a pool is set: takes the buffer ID and callback out of the
// shared state, clears that state, unreferences the buffer, releases the mutex, and only
// then reports completion to the sink - so finishedWriting() runs outside the lock.
331 void finishRequestBodyActive(Result res)
333 requestBodyMutex.lock();
334 if (requestBodyHasActive and requestBodyBuffersPool !=
nullptr)
336 typename T_AsyncBufferView::ID bufferID = requestBodyActiveBuffer;
338 Function<void(
typename T_AsyncBufferView::ID)> callback =
move(requestBodyActiveCallback);
340 requestBodyHasActive =
false;
341 requestBodyActiveBuffer = {};
342 requestBodyActiveOffset = 0;
343 requestBodyBuffersPool->unrefBuffer(bufferID);
344 requestBodyMutex.unlock();
345 requestBodySink.finishedWriting(bufferID,
move(callback), res);
// Unlock for the path where nothing was active. Presumably the branch above returns
// before reaching this line (the return is not visible in this excerpt).
349 requestBodyMutex.unlock();
// Called by the request body sink's asyncWrite(): records `bufferID` and its completion
// callback as the active request-body buffer and wakes any waiter on requestBodyCV.
353 void onRequestBodyWritableBuffer(
typename T_AsyncBufferView::ID bufferID,
354 Function<
void(
typename T_AsyncBufferView::ID)>&& cb)
356 requestBodyMutex.lock();
// Take a reference on the buffer; it is released again by finishRequestBodyActive()
// or resetRequestBodyState().
357 if (requestBodyBuffersPool !=
nullptr)
359 requestBodyBuffersPool->refBuffer(bufferID);
361 requestBodyActiveBuffer = bufferID;
362 requestBodyActiveCallback =
move(cb);
363 requestBodyActiveOffset = 0;
364 requestBodyHasActive =
true;
365 requestBodyCV.broadcast();
366 requestBodyMutex.unlock();
// Marks the request body as finished (under requestBodyMutex) and wakes any waiter on
// requestBodyCV. Called from the request body sink's canEndWritable().
369 void onRequestBodyWritableFinished()
371 requestBodyMutex.lock();
372 requestBodyFinished =
true;
373 requestBodyCV.broadcast();
374 requestBodyMutex.unlock();
// Records `error` as the request-body error (under requestBodyMutex) and wakes any waiter
// on requestBodyCV; pullRequestBody() reads the stored error in its error branch.
377 void onRequestBodyWritableError(Result error)
379 requestBodyMutex.lock();
380 requestBodyErrorSet =
true;
381 requestBodyError = error;
382 requestBodyCV.broadcast();
383 requestBodyMutex.unlock();
// Underlying HTTP operation that start / poll / cancel / close are delegated to.
386 HttpClientOperation operation;
// Event loop used for the wake-up; init() treats a non-null value as "already initialized".
387 T_AsyncEventLoop* eventLoop =
nullptr;
// Streams exposed through getResponseBodyStream() / getRequestBodySink(), and the pool
// that onResponseBody() requests response buffers from.
389 ResponseBodyStream responseBodyStream;
390 RequestBodySink requestBodySink;
391 T_AsyncBuffersPool responseBuffersPool;
// Caller-provided queue storage, copied from asyncMemory during initialization.
393 Span<T_AsyncReadableRequest> responseReadQueue;
394 Span<T_AsyncWritableRequest> requestWriteQueue;
// Pool for streamed request bodies, set by start(); not owned by this class as far as this excerpt shows.
396 T_AsyncBuffersPool* requestBodyBuffersPool =
nullptr;
// Loop wake-up used by notifyHttpClientOperation(), and whether it has been started.
398 T_AsyncLoopWakeUp wakeUp;
399 bool wakeUpStarted =
false;
// Mutex and condition variable guarding the requestBody* fields below.
401 HttpClientLocalMutex requestBodyMutex;
402 HttpClientLocalConditionVariable requestBodyCV;
// Currently active request-body buffer and the completion callback that belongs to it.
404 typename T_AsyncBufferView::ID requestBodyActiveBuffer = {};
405 Function<void(
typename T_AsyncBufferView::ID)> requestBodyActiveCallback;
// Number of bytes of the active buffer already handed out by pullRequestBody().
407 size_t requestBodyActiveOffset = 0;
// State flags that pullRequestBody() waits on.
408 bool requestBodyHasActive =
false;
409 bool requestBodyFinished =
false;
410 bool requestBodyErrorSet =
false;
// Error stored by onRequestBodyWritableError(); Result(true) while none is recorded.
411 Result requestBodyError = Result(
true);
// Checked by start() and returned by isInitialized().
413 bool initialized =
false;