Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
Async.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../File/File.h"
6#include "../FileSystem/FileSystem.h"
7#include "../Foundation/Function.h"
8#include "../Foundation/Internal/IntrusiveDoubleLinkedList.h"
9#include "../Foundation/OpaqueObject.h"
10#include "../Socket/Socket.h"
11#include "../Threading/Atomic.h"
12#include "../Threading/ThreadPool.h"
13#include "../Time/Time.h"
14
15namespace SC
16{
17struct ThreadPool;
18struct ThreadPoolTask;
19struct EventObject;
20} // namespace SC
45
48namespace SC
49{
50struct AsyncEventLoop;
51struct AsyncResult;
52struct AsyncSequence;
53struct AsyncTaskSequence;
54
55namespace detail
56{
57struct AsyncWinOverlapped;
58struct AsyncWinOverlappedDefinition
59{
60 static constexpr int Windows = sizeof(void*) * 4 + sizeof(uint64_t);
61 static constexpr size_t Alignment = alignof(void*);
62
63 using Object = AsyncWinOverlapped;
64};
65using WinOverlappedOpaque = OpaqueObject<AsyncWinOverlappedDefinition>;
66
67struct AsyncWinWaitDefinition
68{
69 using Handle = FileDescriptor::Handle; // fd
70 static constexpr Handle Invalid = FileDescriptor::Invalid; // invalid fd
71
72 static Result releaseHandle(Handle& waitHandle);
73};
74struct WinWaitHandle : public UniqueHandle<AsyncWinWaitDefinition>
75{
76};
77} // namespace detail
78
118{
119 AsyncRequest* next = nullptr;
120 AsyncRequest* prev = nullptr;
121
122 void setDebugName(const char* newDebugName);
123
125 void executeOn(AsyncSequence& sequence);
126
131
134
153
156 AsyncRequest(Type type) : state(State::Free), type(type), flags(0), userFlags(0), unused(0) {}
157
164 Result stop(AsyncEventLoop& eventLoop, Function<void(AsyncResult&)>* afterStopped = nullptr);
165
167 [[nodiscard]] bool isFree() const;
168
170 [[nodiscard]] bool isCancelling() const;
171
173 [[nodiscard]] bool isActive() const;
174
176 [[nodiscard]] Type getType() const { return type; }
177
180
182 void setUserFlags(uint16_t externalFlags) { userFlags = externalFlags; }
183
185 uint16_t getUserFlags() const { return userFlags; }
186
188 [[nodiscard]] Function<void(AsyncResult&)>* getCloseCallback() { return closeCallback; }
189
190 [[nodiscard]] const Function<void(AsyncResult&)>* getCloseCallback() const { return closeCallback; }
191
192 protected:
193 Result checkState();
194
195 void queueSubmission(AsyncEventLoop& eventLoop);
196
197 AsyncSequence* sequence = nullptr;
198
199 AsyncTaskSequence* getTask();
200
201 private:
202 Function<void(AsyncResult&)>* closeCallback = nullptr;
203
204 friend struct AsyncEventLoop;
205 friend struct AsyncResult;
206
207 void markAsFree();
208
209 [[nodiscard]] static const char* TypeToString(Type type);
210 enum class State : uint8_t
211 {
212 Free, // not in any queue, this can be started with an async.start(...)
213 Setup, // when in submission queue waiting to be setup (after an async.start(...))
214 Submitting, // when in submission queue waiting to be activated or re-activated
215 Active, // when monitored by OS syscall or in activeLoopWakeUps / activeTimeouts queues
216 Reactivate, // when flagged for reactivation inside the callback (after a result.reactivateRequest(true))
217 Cancelling, // when in cancellation queue waiting for a cancelAsync (on active async)
218 };
219
220#if SC_ASYNC_ENABLE_LOG
221 const char* debugName = "None";
222#endif
223 State state; // 1 byte
224 Type type; // 1 byte
225 int16_t flags; // 2 bytes
226
227 uint16_t unused; // 2 bytes
228 uint16_t userFlags; // 2 bytes
229};
230
236{
237 AsyncSequence* next = nullptr;
238 AsyncSequence* prev = nullptr;
239
242 private:
243 friend struct AsyncEventLoop;
244 bool runningAsync = false; // true if an async from this sequence is being run
245 bool tracked = false;
246
247 IntrusiveDoubleLinkedList<AsyncRequest> submissions;
248};
249
252{
253};
254
258{
260 AsyncResult(AsyncEventLoop& eventLoop, AsyncRequest& request, SC::Result& res, bool* hasBeenReactivated = nullptr)
261 : eventLoop(eventLoop), async(request), returnCode(res), hasBeenReactivated(hasBeenReactivated)
262 {}
263
266 void reactivateRequest(bool shouldBeReactivated);
267
269 [[nodiscard]] const SC::Result& isValid() const { return returnCode; }
270
271 AsyncEventLoop& eventLoop;
272 AsyncRequest& async;
273
274 protected:
275 friend struct AsyncEventLoop;
276
277 bool shouldCallCallback = true;
278 bool* hasBeenReactivated = nullptr;
279
280 SC::Result& returnCode;
281};
282
286template <typename T, typename C>
288{
289 T& getAsync() { return static_cast<T&>(AsyncResult::async); }
290 const T& getAsync() const { return static_cast<const T&>(AsyncResult::async); }
291
293
294 C completionData;
295 int32_t eventIndex = 0;
296};
297
325
361
367{
369
371 {
372 int exitStatus;
373 };
374
375 struct Result : public AsyncResultOf<AsyncProcessExit, CompletionData>
376 {
377 using AsyncResultOf<AsyncProcessExit, CompletionData>::AsyncResultOf;
378
379 SC::Result get(int& status)
380 {
381 status = completionData.exitStatus;
382 return returnCode;
383 }
384 };
386
390 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle process);
391
393
394 private:
395 friend struct AsyncEventLoop;
396 SC::Result validate(AsyncEventLoop&);
397
398 FileDescriptor::Handle handle = FileDescriptor::Invalid;
399#if SC_PLATFORM_WINDOWS
401 detail::WinWaitHandle waitHandle;
402 AsyncEventLoop* eventLoop = nullptr;
403#elif SC_PLATFORM_LINUX
404 FileDescriptor pidFd;
405#endif
406};
407
408struct AsyncSocketAccept;
409namespace detail
410{
413struct AsyncSocketAcceptData
414{
415#if SC_PLATFORM_WINDOWS
416 void (*pAcceptEx)() = nullptr;
418 SocketDescriptor clientSocket;
419 uint8_t acceptBuffer[288] = {0};
420#elif SC_PLATFORM_LINUX
421 AlignedStorage<28> sockAddrHandle;
422 uint32_t sockAddrLen;
423#endif
424};
425
427struct AsyncSocketAcceptBase : public AsyncRequest
428{
429 AsyncSocketAcceptBase() : AsyncRequest(Type::SocketAccept) {}
430
431 struct CompletionData : public AsyncCompletionData
432 {
433 SocketDescriptor acceptedClient;
434 };
435
436 struct Result : public AsyncResultOf<AsyncSocketAccept, CompletionData>
437 {
438 using AsyncResultOf<AsyncSocketAccept, CompletionData>::AsyncResultOf;
439
440 SC::Result moveTo(SocketDescriptor& client)
441 {
442 SC_TRY(returnCode);
443 return client.assign(move(completionData.acceptedClient));
444 }
445 };
447
449 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor, AsyncSocketAcceptData& data);
450 SC::Result validate(AsyncEventLoop&);
451
452 Function<void(Result&)> callback;
453 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
455 AsyncSocketAcceptData* acceptData = nullptr;
456};
457
458} // namespace detail
459
469struct AsyncSocketAccept : public detail::AsyncSocketAcceptBase
470{
471 AsyncSocketAccept() { AsyncSocketAcceptBase::acceptData = &data; }
472 using AsyncSocketAcceptBase::start;
473
475 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor);
476
477 private:
478 detail::AsyncSocketAcceptData data;
479};
480
490{
492
496
498 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress address);
499
501
502 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
503 SocketIPAddress ipAddress;
504
505 private:
506 friend struct AsyncEventLoop;
507 SC::Result validate(AsyncEventLoop&);
508
509#if SC_PLATFORM_WINDOWS
510 void (*pConnectEx)() = nullptr;
512#endif
513};
514
524{
527 {
528 size_t numBytes = 0;
529 };
532
535
538
540
541 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
542
545 bool singleBuffer = true;
546
547 protected:
548 AsyncSocketSend(Type type) : AsyncRequest(type) {}
549 friend struct AsyncEventLoop;
550 SC::Result validate(AsyncEventLoop&);
551
552 size_t totalBytesWritten = 0;
553#if SC_PLATFORM_WINDOWS
555#endif
556};
557
568{
570
571 SocketIPAddress address;
572
573 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
574 Span<const char> data);
575
576 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
577 Span<Span<const char>> data);
578
579 private:
581 friend struct AsyncEventLoop;
582 SC::Result validate(AsyncEventLoop&);
583#if SC_PLATFORM_LINUX
584 AlignedStorage<56> typeErasedMsgHdr;
585#endif
586};
587
600{
602
604 {
605 size_t numBytes = 0;
606 bool disconnected = false;
607 };
608
609 struct Result : public AsyncResultOf<AsyncSocketReceive, CompletionData>
610 {
611 using AsyncResultOf<AsyncSocketReceive, CompletionData>::AsyncResultOf;
612
617 {
618 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, outData));
619 return returnCode;
620 }
621
622 SocketIPAddress getSourceAddress() const;
623 };
625
627 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, Span<char> data);
628
630
632 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
633
634 protected:
636 friend struct AsyncEventLoop;
637 SC::Result validate(AsyncEventLoop&);
638#if SC_PLATFORM_WINDOWS
640#endif
641};
642
653{
656
657 private:
658 SocketIPAddress address;
659 friend struct AsyncSocketReceive;
660 friend struct AsyncEventLoop;
661#if SC_PLATFORM_LINUX
662 AlignedStorage<56> typeErasedMsgHdr;
663#endif
664};
665
688{
689 AsyncFileRead() : AsyncRequest(Type::FileRead) { handle = FileDescriptor::Invalid; }
690
692 {
693 size_t numBytes = 0;
694 bool endOfFile = false;
695 };
696
697 struct Result : public AsyncResultOf<AsyncFileRead, CompletionData>
698 {
699 using AsyncResultOf<AsyncFileRead, CompletionData>::AsyncResultOf;
700
701 SC::Result get(Span<char>& data)
702 {
703 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, data));
704 return returnCode;
705 }
706 };
708
709 Function<void(Result&)> callback;
711 FileDescriptor::Handle handle;
713
715 uint64_t getOffset() const { return offset; }
716
719 void setOffset(uint64_t fileOffset)
720 {
721 useOffset = true;
722 offset = fileOffset;
723 }
724
725 private:
726 friend struct AsyncEventLoop;
727 SC::Result validate(AsyncEventLoop&);
728 bool useOffset = false;
729 uint64_t offset = 0;
730#if SC_PLATFORM_WINDOWS
731 uint64_t readCursor = 0;
733#endif
734};
735
754{
755 AsyncFileWrite() : AsyncRequest(Type::FileWrite) { handle = FileDescriptor::Invalid; }
756
758 {
759 size_t numBytes = 0;
760 };
761
762 struct Result : public AsyncResultOf<AsyncFileWrite, CompletionData>
763 {
764 using AsyncResultOf<AsyncFileWrite, CompletionData>::AsyncResultOf;
765
766 SC::Result get(size_t& writtenSizeInBytes)
767 {
768 writtenSizeInBytes = completionData.numBytes;
769 return returnCode;
770 }
771 };
772
774
777
780
782
783 FileDescriptor::Handle handle;
785
788 bool singleBuffer = true;
789
791 uint64_t getOffset() const { return offset; }
792
795 void setOffset(uint64_t fileOffset)
796 {
797 useOffset = true;
798 offset = fileOffset;
799 }
800
801 private:
802 friend struct AsyncEventLoop;
803 SC::Result validate(AsyncEventLoop&);
804
805 bool isWatchable = false;
806 bool useOffset = false;
807 uint64_t offset = 0xffffffffffffffff;
808
809 size_t totalBytesWritten = 0;
810#if SC_PLATFORM_WINDOWS
812#endif
813};
814
820{
822
825
827 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
828
829#if SC_PLATFORM_WINDOWS
830 [[nodiscard]] auto& getOverlappedOpaque() { return overlapped; }
831#endif
832
833 Function<void(Result&)> callback;
834
835 private:
836 friend struct AsyncEventLoop;
837 SC::Result validate(AsyncEventLoop&);
838
839 FileDescriptor::Handle handle = FileDescriptor::Invalid;
840#if SC_PLATFORM_WINDOWS
842#endif
843};
844
845// forward declared because it must be defined after AsyncTaskSequence
846struct AsyncLoopWork;
847struct AsyncFileSystemOperation;
848
850{
851 FileDescriptor::Handle handle = FileDescriptor::Invalid; // for open
852
853 int code = 0; // for open/close
854 size_t numBytes = 0; // for read
855};
856
857namespace detail
858{
859// A simple hand-made variant of all completion types
860struct AsyncCompletionVariant
861{
862 AsyncCompletionVariant() {}
863 ~AsyncCompletionVariant() { destroy(); }
864
865 AsyncCompletionVariant(const AsyncCompletionVariant&) = delete;
866 AsyncCompletionVariant(AsyncCompletionVariant&&) = delete;
867 AsyncCompletionVariant& operator=(const AsyncCompletionVariant&) = delete;
868 AsyncCompletionVariant& operator=(AsyncCompletionVariant&&) = delete;
869
870 bool inited = false;
871
873 union
874 {
875 AsyncCompletionData completionDataLoopWork; // Defined after AsyncCompletionVariant / AsyncTaskSequence
876 AsyncLoopTimeout::CompletionData completionDataLoopTimeout;
877 AsyncLoopWakeUp::CompletionData completionDataLoopWakeUp;
878 AsyncProcessExit::CompletionData completionDataProcessExit;
879 AsyncSocketAccept::CompletionData completionDataSocketAccept;
880 AsyncSocketConnect::CompletionData completionDataSocketConnect;
881 AsyncSocketSend::CompletionData completionDataSocketSend;
882 AsyncSocketSendTo::CompletionData completionDataSocketSendTo;
883 AsyncSocketReceive::CompletionData completionDataSocketReceive;
884 AsyncSocketReceiveFrom::CompletionData completionDataSocketReceiveFrom;
885 AsyncFileRead::CompletionData completionDataFileRead;
886 AsyncFileWrite::CompletionData completionDataFileWrite;
887 AsyncFilePoll::CompletionData completionDataFilePoll;
888
889 AsyncFileSystemOperationCompletionData completionDataFileSystemOperation;
890 };
891
892 auto& getCompletion(AsyncLoopWork&) { return completionDataLoopWork; }
893 auto& getCompletion(AsyncLoopTimeout&) { return completionDataLoopTimeout; }
894 auto& getCompletion(AsyncLoopWakeUp&) { return completionDataLoopWakeUp; }
895 auto& getCompletion(AsyncProcessExit&) { return completionDataProcessExit; }
896 auto& getCompletion(AsyncSocketAccept&) { return completionDataSocketAccept; }
897 auto& getCompletion(AsyncSocketConnect&) { return completionDataSocketConnect; }
898 auto& getCompletion(AsyncSocketSend&) { return completionDataSocketSend; }
899 auto& getCompletion(AsyncSocketReceive&) { return completionDataSocketReceive; }
900 auto& getCompletion(AsyncFileRead&) { return completionDataFileRead; }
901 auto& getCompletion(AsyncFileWrite&) { return completionDataFileWrite; }
902 auto& getCompletion(AsyncFilePoll&) { return completionDataFilePoll; }
903 auto& getCompletion(AsyncFileSystemOperation&) { return completionDataFileSystemOperation; }
904
905 template <typename T>
906 auto& construct(T& t)
907 {
908 destroy();
909 placementNew(getCompletion(t));
910 inited = true;
911 type = t.getType();
912 return getCompletion(t);
913 }
914 void destroy();
915};
916} // namespace detail
917
922{
923 protected:
924 ThreadPoolTask task;
925 ThreadPool* threadPool = nullptr;
926
927 friend struct AsyncEventLoop;
928 friend struct AsyncRequest;
929
930 detail::AsyncCompletionVariant completion;
931
932 SC::Result returnCode = SC::Result(true);
933};
934
959
994{
996 ~AsyncFileSystemOperation() { destroy(); }
997
998 enum class Operation
999 {
1000 None = 0,
1001 Open,
1002 Close,
1003 Read,
1004 Write,
1005 CopyFile,
1006 CopyDirectory,
1007 Rename,
1008 RemoveDirectory,
1009 RemoveFile,
1010 };
1011
1014
1017
1019
1026
1031 SC::Result close(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle);
1032
1039 SC::Result read(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<char> buffer, uint64_t offset);
1040
1047 SC::Result write(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<const char> buffer,
1048 uint64_t offset);
1049
1056 SC::Result copyFile(AsyncEventLoop& eventLoop, StringSpan path, StringSpan destinationPath,
1058
1067
1074
1081
1087
1088 private:
1089 friend struct AsyncEventLoop;
1090 Operation operation = Operation::None;
1091 AsyncLoopWork loopWork;
1092 CompletionData completionData;
1093
1094 void onOperationCompleted(AsyncLoopWork::Result& res);
1095
1096 struct FileDescriptorData
1097 {
1098 FileDescriptor::Handle handle;
1099 };
1100
1101 struct OpenData
1102 {
1103 StringSpan path;
1104 FileOpen mode;
1105 };
1106
1107 struct ReadData
1108 {
1109 FileDescriptor::Handle handle;
1110 Span<char> buffer;
1111 uint64_t offset;
1112 };
1113
1114 struct WriteData
1115 {
1116 FileDescriptor::Handle handle;
1117 Span<const char> buffer;
1118 uint64_t offset;
1119 };
1120
1121 struct CopyFileData
1122 {
1123 StringSpan path;
1124 StringSpan destinationPath;
1125 FileSystemCopyFlags copyFlags;
1126 };
1127
1128 using CopyDirectoryData = CopyFileData;
1129
1130 using CloseData = FileDescriptorData;
1131
1132 struct RenameData
1133 {
1134 StringSpan path;
1135 StringSpan newPath;
1136 };
1137
1138 struct RemoveData
1139 {
1140 StringSpan path;
1141 };
1142
1143 union
1144 {
1145 OpenData openData;
1146 CloseData closeData;
1147 ReadData readData;
1148 WriteData writeData;
1149 CopyFileData copyFileData;
1150 CopyDirectoryData copyDirectoryData;
1151 RenameData renameData;
1152 RemoveData removeData;
1153 };
1154
1155 void destroy();
1156
1157 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
1158 SC::Result validate(AsyncEventLoop&);
1159};
1160
1165{
1167
1168 private:
1169 int numberOfEvents = 0;
1170 friend struct AsyncEventLoop;
1171};
1172
1175{
1176 Function<void(AsyncEventLoop&)> beforeBlockingPoll;
1177 Function<void(AsyncEventLoop&)> afterBlockingPoll;
1178};
1179
1187{
1189 struct Options
1190 {
1191 enum class ApiType : uint8_t
1192 {
1193 Automatic = 0,
1196 };
1198
1200 };
1201
1203
1206
1209
1213
1216
1218 [[nodiscard]] bool isInitialized() const;
1219
1226
1237
1244
1250
1270
1277
1281
1284
1287
1290
1293
1296
1299
1302
1305
1307 [[nodiscard]] Time::Monotonic getLoopTime() const;
1308
1310 [[nodiscard]] int getNumberOfActiveRequests() const;
1311
1313 [[nodiscard]] int getNumberOfSubmittedRequests() const;
1314
1318
1321
1324
1326 void enumerateRequests(Function<void(AsyncRequest&)> enumerationCallback);
1327
1331
1333 [[nodiscard]] static bool isExcludedFromActiveCount(const AsyncRequest& async);
1334
1337 [[nodiscard]] static bool tryLoadingLiburing();
1338
1341
1342 struct Internal;
1343
1344 private:
1345 struct InternalDefinition
1346 {
1347 static constexpr int Windows = 520;
1348 static constexpr int Apple = 512;
1349 static constexpr int Linux = 720;
1350 static constexpr int Default = Linux;
1351
1352 static constexpr size_t Alignment = 8;
1353
1354 using Object = Internal;
1355 };
1356
1357 public:
1358 using InternalOpaque = OpaqueObject<InternalDefinition>;
1359
1360 private:
1361 InternalOpaque internalOpaque;
1362 Internal& internal;
1363
1364 friend struct AsyncRequest;
1365 friend struct AsyncFileWrite;
1366 friend struct AsyncFileRead;
1367 friend struct AsyncFileSystemOperation;
1368 friend struct AsyncResult;
1369};
1370
1375{
1377
1381
1384
1392
1398
1399 private:
1400#if SC_COMPILER_MSVC
1401#pragma warning(push)
1402#pragma warning(disable : 4324) // useless warning on 32 bit... (structure was padded due to __declspec(align()))
1403#endif
1404 alignas(uint64_t) uint8_t eventsMemory[8 * 1024]; // 8 Kb of kernel events
1405#if SC_COMPILER_MSVC
1406#pragma warning(pop)
1407#endif
1408
1409 AsyncKernelEvents asyncKernelEvents;
1410 AsyncEventLoop* eventLoop = nullptr;
1411 AsyncLoopWakeUp eventLoopWakeUp;
1412
1413 Thread eventLoopThread;
1414 EventObject eventObjectEnterBlockingMode;
1415 EventObject eventObjectExitBlockingMode;
1416
1417 Atomic<bool> finished = false;
1418 Atomic<bool> needsWakeUp = true;
1419
1420 bool wakeUpHasBeenCalled = false;
1421
1422 Result monitoringLoopThread(Thread& thread);
1423};
1424
1425} // namespace SC
unsigned short uint16_t
Platform independent (2) bytes unsigned int.
Definition PrimitiveTypes.h:37
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:269
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
unsigned long long uint64_t
Platform independent (8) bytes unsigned int.
Definition PrimitiveTypes.h:42
unsigned int uint32_t
Platform independent (4) bytes unsigned int.
Definition PrimitiveTypes.h:38
short int16_t
Platform independent (2) bytes signed int.
Definition PrimitiveTypes.h:45
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:48
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Empty base struct for all AsyncRequest-derived CompletionData (internal) structs.
Definition Async.h:252
Allow library user to provide callbacks signaling different phases of async event loop cycle.
Definition Async.h:1175
Monitors Async I/O events from a background thread using a blocking kernel function (no CPU usage on ...
Definition Async.h:1375
Function< void(void)> onNewEventsAvailable
Informs to call dispatchCompletions on GUI Event Loop.
Definition Async.h:1376
Result startMonitoring()
Queue all async requests submissions and start monitoring loop events on a background thread.
Result close()
Stop monitoring the AsyncEventLoop, disposing all resources.
Result stopMonitoringAndDispatchCompletions()
Stops monitoring events on the background thread and dispatches callbacks for completed requests.
Result create(AsyncEventLoop &eventLoop)
Create the monitoring thread for an AsyncEventLoop.
Options given to AsyncEventLoop::create.
Definition Async.h:1190
ApiType apiType
Criteria to choose Async IO API.
Definition Async.h:1197
ApiType
Definition Async.h:1192
@ Automatic
Platform specific backend chooses the best API.
@ ForceUseEpoll
(Linux only) Tries to use epoll
@ ForceUseIOURing
(Linux only) Tries to use io_uring (failing if it's not found on the system)
Asynchronous I/O (files, sockets, timers, processes, fs events, threads wake-up) (see Async) AsyncEve...
Definition Async.h:1187
Time::Monotonic getLoopTime() const
Get Loop time.
Result associateExternallyCreatedFileDescriptor(FileDescriptor &outDescriptor)
Associates a previously created File Descriptor with the eventLoop.
Result wakeUpFromExternalThread()
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
Result runNoWait()
Process active requests if any, dispatching their completions, or returns immediately without blockin...
static Result removeAllAssociationsFor(SocketDescriptor &outDescriptor)
Removes association of a TCP Socket with any event loop.
void updateTime()
Updates loop time to "now".
static bool isExcludedFromActiveCount(const AsyncRequest &async)
Checks if excludeFromActiveCount() has been called on the given request.
Result associateExternallyCreatedSocket(SocketDescriptor &outDescriptor)
Associates a previously created TCP / UDP socket with the eventLoop.
Result blockingPoll(AsyncKernelEvents &kernelEvents)
Blocks until at least one event happens, ensuring forward progress, without executing completions.
void clearSequence(AsyncSequence &sequence)
Clears the sequence.
int getNumberOfSubmittedRequests() const
Obtain the total number of submitted requests.
Result submitRequests(AsyncKernelEvents &kernelEvents)
Submits all queued async requests.
void enumerateRequests(Function< void(AsyncRequest &)> enumerationCallback)
Enumerates all requests objects associated with this loop.
Result start(AsyncRequest &async)
Queues an async request request that has been correctly setup.
AsyncLoopTimeout * findEarliestLoopTimeout() const
Returns the next AsyncLoopTimeout that will be executed (shortest relativeTimeout)
void setListeners(AsyncEventLoopListeners *listeners)
Sets reference to listeners that will signal different events in loop lifetime.
Result dispatchCompletions(AsyncKernelEvents &kernelEvents)
Invokes completions for the AsyncKernelEvents collected by a call to AsyncEventLoop::blockingPoll.
void interrupt()
Interrupts the event loop even if it has active request on it.
Result wakeUpFromExternalThread(AsyncLoopWakeUp &wakeUp)
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
bool isInitialized() const
Returns true if create has been already called (successfully)
Result create(Options options=Options())
Creates the event loop kernel object.
static bool tryLoadingLiburing()
Check if liburing is loadable (only on Linux)
Result createAsyncTCPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async TCP (IPV4 / IPV6) socket registered with the eventLoop.
Result close()
Closes the event loop kernel object.
Result createAsyncUDPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async UCP (IPV4 / IPV6) socket registered with the eventLoop.
static Result removeAllAssociationsFor(FileDescriptor &outDescriptor)
Removes association of a File Descriptor with any event loop.
Result runOnce()
Blocks until at least one request proceeds, ensuring forward progress, dispatching all completions.
void excludeFromActiveCount(AsyncRequest &async)
Excludes the request from active handles count (to avoid it keeping event loop alive)
Result run()
Blocks until there are no more active queued requests, dispatching all completions.
void includeInActiveCount(AsyncRequest &async)
Reverses the effect of excludeFromActiveCount for the request.
int getNumberOfActiveRequests() const
Obtain the total number of active requests.
Starts an handle polling operation.
Definition Async.h:820
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle fileDescriptor)
Starts a file descriptor poll operation, monitoring its readiness with appropriate OS API.
Definition Async.h:692
Definition Async.h:698
Starts a file read operation, reading bytes from a file (or pipe).
Definition Async.h:688
FileDescriptor::Handle handle
The writeable span of memory where to data will be written.
Definition Async.h:711
Span< char > buffer
Callback called when some data has been read from the file into the buffer.
Definition Async.h:710
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which start reading.
Definition Async.h:719
uint64_t getOffset() const
The file/pipe descriptor handle to read data from.
Definition Async.h:715
Starts an asynchronous file system operation (open, close, read, write, sendFile, stat,...
Definition Async.h:994
SC::Result copyDirectory(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a directory from one location to another.
SC::Result removeEmptyDirectory(AsyncEventLoop &eventLoop, StringSpan path)
Removes a directory asynchronously.
SC::Result rename(AsyncEventLoop &eventLoop, StringSpan path, StringSpan newPath)
Renames a file.
SC::Result removeFile(AsyncEventLoop &eventLoop, StringSpan path)
Removes a file asynchronously.
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the thread pool to use for the operation.
SC::Result read(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< char > buffer, uint64_t offset)
Reads data from a file descriptor at a given offset.
SC::Result write(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< const char > buffer, uint64_t offset)
Writes data to a file descriptor at a given offset.
SC::Result close(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle)
Closes a file descriptor asynchronously.
Function< void(Result &)> callback
Called after the operation is completed, on the event loop thread.
Definition Async.h:1018
SC::Result copyFile(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a file from one location to another.
SC::Result open(AsyncEventLoop &eventLoop, StringSpan path, FileOpen mode)
Opens a file asynchronously and returns its corresponding file descriptor.
Definition Async.h:763
Starts a file write operation, writing bytes to a file (or pipe).
Definition Async.h:754
uint64_t getOffset() const
Returns the last offset set with AsyncFileWrite::setOffset.
Definition Async.h:791
FileDescriptor::Handle handle
The file/pipe descriptor to write data to.
Definition Async.h:783
SC::Result start(AsyncEventLoop &eventLoop, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which start writing.
Definition Async.h:795
Function< void(Result &)> callback
Callback called when descriptor is ready to be written with more data.
Definition Async.h:781
Span< Span< const char > > buffers
The read-only spans of memory where to read the data from.
Definition Async.h:787
SC::Result start(AsyncEventLoop &eventLoop, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
Span< const char > buffer
The read-only span of memory where to read the data from.
Definition Async.h:786
bool singleBuffer
Controls if buffer or buffers will be used.
Definition Async.h:788
Allows user to supply a block of memory that will store kernel I/O events retrieved from AsyncEventLo...
Definition Async.h:1165
Span< uint8_t > eventsMemory
User supplied block of memory used to store kernel I/O events.
Definition Async.h:1166
Starts a Timeout that is invoked only once after expiration (relative) time has passed.
Definition Async.h:303
SC::Result start(AsyncEventLoop &eventLoop, Time::Milliseconds relativeTimeout)
Sets async request members and calls AsyncEventLoop::start.
Time::Absolute getExpirationTime() const
Gets computed absolute expiration time that determines when this timeout get executed.
Definition Async.h:318
Function< void(Result &)> callback
Called after given expiration time since AsyncLoopTimeout::start has passed.
Definition Async.h:313
Time::Milliseconds relativeTimeout
First timer expiration (relative) time in milliseconds.
Definition Async.h:315
Starts a wake-up operation, allowing threads to execute callbacks on loop thread.
Definition Async.h:339
SC::Result start(AsyncEventLoop &eventLoop, EventObject &eventObject)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Callback called by SC::AsyncEventLoop::run after SC::AsyncLoopWakeUp::wakeUp.
Definition Async.h:352
SC::Result wakeUp(AsyncEventLoop &eventLoop)
Wakes up event loop, scheduling AsyncLoopWakeUp::callback on next AsyncEventLoop::run (or its variati...
EventObject * eventObject
Optional EventObject to let external threads wait for the callback to end.
Definition Async.h:353
Executes work in a thread pool and then invokes a callback on the event loop thread.
Definition Async.h:941
Function< void(Result &)> callback
Called to execute the work in a background threadpool thread.
Definition Async.h:952
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the ThreadPool that will supply the thread to run the async work on.
Definition Async.h:376
Starts monitoring a process, notifying about its termination.
Definition Async.h:367
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle process)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Called when process has exited.
Definition Async.h:392
Base class for all async requests, holding state and type.
Definition Async.h:118
bool isCancelling() const
Returns true if this request is being cancelled.
AsyncRequest(Type type)
Constructs a free async request of given type.
Definition Async.h:156
Result start(AsyncEventLoop &eventLoop)
Shortcut for AsyncEventLoop::start.
uint16_t getUserFlags() const
Gets user flags, holding some meaningful data for the caller.
Definition Async.h:185
Function< void(AsyncResult &)> * getCloseCallback()
Returns currently set close callback (if any) passed to AsyncRequest::stop.
Definition Async.h:188
bool isActive() const
Returns true if this request is active or being reactivated.
bool isFree() const
Returns true if this request is free.
void disableThreadPool()
Disables the thread-pool usage for this request.
Type getType() const
Returns request type.
Definition Async.h:176
void setUserFlags(uint16_t externalFlags)
Sets user flags, holding some meaningful data for the caller.
Definition Async.h:182
Result executeOn(AsyncTaskSequence &task, ThreadPool &pool)
Adds the request to be executed on a specific AsyncTaskSequence.
Result stop(AsyncEventLoop &eventLoop, Function< void(AsyncResult &)> *afterStopped=nullptr)
Ask to stop current async operation.
void executeOn(AsyncSequence &sequence)
Adds the request to be executed on a specific AsyncSequence.
Type
Type of async request.
Definition Async.h:137
@ SocketSend
Request is an AsyncSocketSend object.
@ SocketReceive
Request is an AsyncSocketReceive object.
@ SocketAccept
Request is an AsyncSocketAccept object.
@ FileWrite
Request is an AsyncFileWrite object.
@ SocketSendTo
Request is an SocketSendTo object.
@ FileSystemOperation
Request is an AsyncFileSystemOperation object.
@ LoopTimeout
Request is an AsyncLoopTimeout object.
@ ProcessExit
Request is an AsyncProcessExit object.
@ FileRead
Request is an AsyncFileRead object.
@ SocketReceiveFrom
Request is an SocketReceiveFrom object.
@ FilePoll
Request is an AsyncFilePoll object.
@ LoopWakeUp
Request is an AsyncLoopWakeUp object.
@ SocketConnect
Request is an AsyncSocketConnect object.
@ LoopWork
Request is an AsyncLoopWork object.
Helper holding CompletionData for a specific AsyncRequest-derived class.
Definition Async.h:288
Base class for all async results (argument of completion callbacks).
Definition Async.h:258
const SC::Result & isValid() const
Check if the returnCode of this result is valid.
Definition Async.h:269
AsyncResult(AsyncEventLoop &eventLoop, AsyncRequest &request, SC::Result &res, bool *hasBeenReactivated=nullptr)
Constructs an async result from a request and a result.
Definition Async.h:260
void reactivateRequest(bool shouldBeReactivated)
Ask the event loop to re-activate this request after it was already completed.
Execute AsyncRequests serially, by submitting the next one after the previous one is completed.
Definition Async.h:236
bool clearSequenceOnError
Do not queue next requests in the sequence when current one returns error.
Definition Async.h:241
bool clearSequenceOnCancel
Do not queue next requests in the sequence when current one is cancelled.
Definition Async.h:240
Starts a socket accept operation, obtaining a new socket from a listening socket.
Definition Async.h:470
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &socketDescriptor)
Sets async request members and calls AsyncEventLoop::start.
Starts a socket connect operation, connecting to a remote endpoint.
Definition Async.h:490
Function< void(Result &)> callback
Called after socket is finally connected to endpoint.
Definition Async.h:500
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, SocketIPAddress address)
Sets async request members and calls AsyncEventLoop::start.
Starts an unconnected socket receive from operation, receiving bytes from a remote endpoint.
Definition Async.h:653
Definition Async.h:610
SC::Result get(Span< char > &outData)
Get a Span of the actually read data.
Definition Async.h:616
Starts a socket receive operation, receiving bytes from a remote endpoint.
Definition Async.h:600
AsyncSocketReceive(Type type)
The Socket Descriptor handle to read data from.
Definition Async.h:635
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< char > buffer
The writeable span of memory where to data will be written.
Definition Async.h:631
Function< void(Result &)> callback
Called after data has been received.
Definition Async.h:629
Starts an unconnected socket send to operation, sending bytes to a remote endpoint.
Definition Async.h:568
Starts a socket send operation, sending bytes to a remote endpoint.
Definition Async.h:524
Function< void(Result &)> callback
Called when socket is ready to send more data.
Definition Async.h:539
bool singleBuffer
Controls if buffer or buffers will be used.
Definition Async.h:545
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< Span< const char > > buffers
Spans of bytes to send (singleBuffer == false)
Definition Async.h:544
Span< const char > buffer
Span of bytes to send (singleBuffer == true)
Definition Async.h:543
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
SocketDescriptor::Handle handle
The socket to send data to.
Definition Async.h:541
An AsyncSequence using a SC::ThreadPool to execute one or more SC::AsyncRequest in a background threa...
Definition Async.h:922
Atomic variables (only for int and bool for now).
Definition Atomic.h:97
An automatically reset event object to synchronize two threads.
Definition Threading.h:174
Open, read and write to/from a file descriptor (like a file or pipe).
Definition File.h:76
Options used to open a file descriptor.
Definition File.h:48
A structure to describe copy flags.
Definition FileSystem.h:26
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
Hides implementation details from public headers (static PIMPL).
Definition OpaqueObject.h:77
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:12
Low-level OS socket handle.
Definition Socket.h:164
AddressFamily
Sets the address family of an IP Address (IPv4 or IPV6)
Definition Socket.h:61
@ AddressFamilyIPV4
IP Address is IPV4.
Definition Socket.h:62
Native representation of an IP Address.
Definition Socket.h:105
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29
constexpr bool sliceStartLength(SizeType offsetInElements, SizeType lengthInElements, Span &destination) const
Creates another Span, starting at an offset in elements from current Span of specified length.
Definition Span.h:159
An read-only view over a string (to avoid including Strings library when parsing is not needed).
Definition StringSpan.h:31
A small task containing a function to execute that can be queued in the thread pool.
Definition ThreadPool.h:19
Simple thread pool that executes tasks in a fixed number of worker threads.
Definition ThreadPool.h:41
A native OS thread.
Definition Threading.h:118
Absolute time as realtime or monotonically increasing clock.
Definition Time.h:114
Type-safe wrapper of uint64 used to represent milliseconds.
Definition Time.h:44
Represent monotonically increasing time (use Monotonic::now for current time)
Definition Time.h:180