Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
Async.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Async/Internal/IntrusiveDoubleLinkedList.h"
6#include "../File/File.h"
7#include "../FileSystem/FileSystem.h"
8#include "../Foundation/Function.h"
9#include "../Foundation/OpaqueObject.h"
10#include "../Socket/Socket.h"
11#include "../Threading/Atomic.h"
12#include "../Threading/ThreadPool.h"
13
14namespace SC
15{
16struct ThreadPool;
17struct ThreadPoolTask;
18struct EventObject;
19} // namespace SC
44
47namespace SC
48{
49struct AsyncEventLoop;
50struct AsyncResult;
51struct AsyncSequence;
52struct AsyncTaskSequence;
53
54namespace detail
55{
56struct AsyncWinOverlapped;
// Traits struct handed to OpaqueObject (see the alias below) describing the storage
// needed for AsyncWinOverlapped, which is only forward declared in this header.
57struct AsyncWinOverlappedDefinition
58{
// Four pointers plus one 64-bit integer.
// NOTE(review): presumably the byte size reserved on Windows — confirm against OpaqueObject.
59 static constexpr int Windows = sizeof(void*) * 4 + sizeof(uint64_t);
// The storage is aligned like a pointer.
60 static constexpr size_t Alignment = alignof(void*);
61
// The concrete type hidden behind the opaque storage.
62 using Object = AsyncWinOverlapped;
63};
// Opaque object type instantiated from the definition above.
64using WinOverlappedOpaque = OpaqueObject<AsyncWinOverlappedDefinition>;
65
// Handle traits used by detail::WinWaitHandle (a UniqueHandle specialization).
66struct AsyncWinWaitDefinition
67{
// The wrapped handle has the same type as a file descriptor handle.
68 using Handle = FileDescriptor::Handle; // fd
// Sentinel value representing "no handle".
69 static constexpr Handle Invalid = FileDescriptor::Invalid; // invalid fd
70
// Only declared here; the implementation lives elsewhere.
// NOTE(review): presumably invoked by UniqueHandle to release the wait handle — confirm.
71 static Result releaseHandle(Handle& waitHandle);
72};
// UniqueHandle specialization built on AsyncWinWaitDefinition; adds no members of its own.
// Used as the `waitHandle` member of AsyncProcessExit on Windows.
73struct SC_COMPILER_EXPORT WinWaitHandle : public UniqueHandle<AsyncWinWaitDefinition>
74{
75};
76} // namespace detail
77
// Common base of every async request declared in this header (AsyncLoopTimeout, AsyncSocketSend, ...).
// Stores the request kind (Type), its lifecycle State and a few flag words.
// NOTE(review): some members (listing lines 126-134) are not visible in this listing.
116struct SC_COMPILER_EXPORT AsyncRequest
117{
// Intrusive list links; AsyncSequence keeps an IntrusiveDoubleLinkedList<AsyncRequest> of submissions.
118 AsyncRequest* next = nullptr;
119 AsyncRequest* prev = nullptr;
120
// NOTE(review): the debugName member only exists when SC_ASYNC_ENABLE_LOG is set — confirm behaviour otherwise.
121 void setDebugName(const char* newDebugName);
122
// NOTE(review): presumably binds this request to the given sequence (see `sequence` below) — confirm.
124 void executeOn(AsyncSequence& sequence);
125
130
133
// Kind of request; every derived struct passes its own enumerator to the constructor below.
135 enum class Type : uint8_t
136 {
137 LoopTimeout,
138 LoopWakeUp,
139 LoopWork,
140 ProcessExit,
141 SocketAccept,
142 SocketConnect,
143 SocketSend,
144 SocketSendTo,
145 SocketReceive,
146 SocketReceiveFrom,
147 FileRead,
148 FileWrite,
149 FilePoll,
150 FileSystemOperation,
151 };
152
// Creates a request of the given kind in State::Free with all flag words zeroed.
155 AsyncRequest(Type type) : state(State::Free), type(type), flags(0), unused(0), userFlags(0) {}
156
// Asks the given event loop to stop this request; afterStopped is optional (defaults to nullptr).
// NOTE(review): presumably afterStopped becomes the close callback returned by getCloseCallback — confirm.
163 Result stop(AsyncEventLoop& eventLoop, Function<void(AsyncResult&)>* afterStopped = nullptr);
164
// State queries; implementations are not visible here (presumably they compare `state` with State values).
166 [[nodiscard]] bool isFree() const;
167
169 [[nodiscard]] bool isCancelling() const;
170
172 [[nodiscard]] bool isActive() const;
173
// Returns the kind given at construction.
175 [[nodiscard]] Type getType() const { return type; }
176
179
// Stores an arbitrary 16-bit value supplied by the caller.
181 void setUserFlags(uint16_t externalFlags) { userFlags = externalFlags; }
182
// Returns the value last stored with setUserFlags (0 after construction).
184 uint16_t getUserFlags() const { return userFlags; }
185
// Returns the stored close callback pointer (nullptr unless one has been set).
187 [[nodiscard]] Function<void(AsyncResult&)>* getCloseCallback() { return closeCallback; }
188
189 [[nodiscard]] const Function<void(AsyncResult&)>* getCloseCallback() const { return closeCallback; }
190
191 protected:
192 Result checkState();
193
194 void queueSubmission(AsyncEventLoop& eventLoop);
195
// Sequence this request belongs to, if any.
196 AsyncSequence* sequence = nullptr;
197
198 AsyncTaskSequence* getTask();
199
200 private:
201 Function<void(AsyncResult&)>* closeCallback = nullptr;
202
203 friend struct AsyncEventLoop;
204 friend struct AsyncResult;
205
206 void markAsFree();
207
208 [[nodiscard]] static const char* TypeToString(Type type);
// Lifecycle of a request inside the event loop.
209 enum class State : uint8_t
210 {
211 Free, // not in any queue, this can be started with an async.start(...)
212 Setup, // when in submission queue waiting to be setup (after an async.start(...))
213 Submitting, // when in submission queue waiting to be activated or re-activated
214 Active, // when monitored by OS syscall or in activeLoopWakeUps / activeTimeouts queues
215 Reactivate, // when flagged for reactivation inside the callback (after a result.reactivateRequest(true))
216 Cancelling, // when in cancellation queue waiting for a cancelAsync (on active async)
217 };
218
// Debug name is compiled in only when logging is enabled.
219#if SC_ASYNC_ENABLE_LOG
220 const char* debugName = "None";
221#endif
222 State state; // 1 byte
223 Type type; // 1 byte
224 int16_t flags; // 2 bytes
225
226 uint16_t unused; // 2 bytes
227 uint16_t userFlags; // 2 bytes
228};
229
// A sequence that async requests can be attached to (see AsyncRequest::executeOn).
// Holds the list of requests submitted on it plus two policy flags.
234struct SC_COMPILER_EXPORT AsyncSequence
235{
// Intrusive list links.
// NOTE(review): presumably the event loop keeps sequences in a linked list — confirm.
236 AsyncSequence* next = nullptr;
237 AsyncSequence* prev = nullptr;
238
// Both policies default to true.
// NOTE(review): presumably they make the loop clear the sequence when a request is cancelled or fails — confirm.
239 bool clearSequenceOnCancel = true;
240 bool clearSequenceOnError = true;
241 private:
242 friend struct AsyncEventLoop;
243 bool runningAsync = false; // true if an async from this sequence is being run
244 bool tracked = false;
245
// Requests queued on this sequence.
246 IntrusiveDoubleLinkedList<AsyncRequest> submissions;
247};
248
// Empty base struct for all AsyncRequest-derived CompletionData (internal) structs.
250struct SC_COMPILER_EXPORT AsyncCompletionData{};
251
// Result object passed to callbacks (Function<void(AsyncResult&)>).
// It only holds references: the event loop, the request and the result code must outlive it.
254struct SC_COMPILER_EXPORT AsyncResult
255{
// Binds the result to its loop, request and result code; hasBeenReactivated is optional.
257 AsyncResult(AsyncEventLoop& eventLoop, AsyncRequest& request, SC::Result& res, bool* hasBeenReactivated = nullptr)
258 : eventLoop(eventLoop), async(request), hasBeenReactivated(hasBeenReactivated), returnCode(res)
259 {}
260
// Called from inside a callback to flag the request for reactivation (see AsyncRequest::State::Reactivate).
263 void reactivateRequest(bool shouldBeReactivated);
264
// Returns the result code of the operation (a reference to the caller-owned SC::Result).
266 [[nodiscard]] const SC::Result& isValid() const { return returnCode; }
267
268 AsyncEventLoop& eventLoop;
269 AsyncRequest& async;
270
271 protected:
272 friend struct AsyncEventLoop;
273
274 bool shouldCallCallback = true;
// Optional out-flag supplied at construction; may be nullptr.
275 bool* hasBeenReactivated = nullptr;
276
277 SC::Result& returnCode;
278};
279
// NOTE(review): the declaration line (listing line 284) is missing from this listing. Usages such as
// AsyncResultOf<AsyncProcessExit, CompletionData> and the AsyncResult::async accesses below suggest this is
// `struct AsyncResultOf : public AsyncResult` — confirm against the real header.
// T is the concrete request type, C its completion data type.
283template <typename T, typename C>
285{
// Downcasts the stored AsyncRequest reference to the concrete request type T.
286 T& getAsync() { return static_cast<T&>(AsyncResult::async); }
287 const T& getAsync() const { return static_cast<const T&>(AsyncResult::async); }
288
290
// Operation-specific completion payload.
291 C completionData;
292 int32_t eventIndex = 0;
293};
294
// Starts a Timeout that is invoked only once after expiration (relative) time has passed.
// NOTE(review): the `callback` and `relativeTimeout` members (listing lines 310-312) are not visible here.
299struct SC_COMPILER_EXPORT AsyncLoopTimeout : public AsyncRequest
300{
301 AsyncLoopTimeout() : AsyncRequest(Type::LoopTimeout) {}
302
// Keeps the base class start overloads visible next to the overload declared below.
305 using AsyncRequest::start;
306
// Sets async request members and calls AsyncEventLoop::start.
308 SC::Result start(AsyncEventLoop& eventLoop, TimeMs relativeTimeout);
309
311
313
// Gets computed absolute expiration time that determines when this timeout gets executed.
315 TimeMs getExpirationTime() const { return expirationTime; }
316
317 private:
318 SC::Result validate(AsyncEventLoop&);
319 friend struct AsyncEventLoop;
320 TimeMs expirationTime;
321};
322
// Starts a wake-up operation, allowing threads to execute callbacks on loop thread.
// NOTE(review): the `callback` member and wakeUp() (listing lines 346-349) are not visible here.
335struct SC_COMPILER_EXPORT AsyncLoopWakeUp : public AsyncRequest
336{
337 AsyncLoopWakeUp() : AsyncRequest(Type::LoopWakeUp) {}
338
// Keeps the base class start overloads visible next to the overload declared below.
341 using AsyncRequest::start;
342
// Sets async request members and calls AsyncEventLoop::start.
344 SC::Result start(AsyncEventLoop& eventLoop, EventObject& eventObject);
345
348
// Optional event object; nullptr by default.
// NOTE(review): presumably signalled once the callback has run — confirm.
350 EventObject* eventObject = nullptr;
351
352 private:
353 friend struct AsyncEventLoop;
354 SC::Result validate(AsyncEventLoop&);
355
// Atomic flag, initially false.
// NOTE(review): presumably set when a wake-up was requested from another thread — confirm.
356 Atomic<bool> pending = false;
357};
358
// Request of type ProcessExit: started with a process handle, its Result exposes the process exit status.
363struct SC_COMPILER_EXPORT AsyncProcessExit : public AsyncRequest
364{
365 AsyncProcessExit() : AsyncRequest(Type::ProcessExit) {}
366
// NOTE(review): the header of this nested struct (listing line 367) is missing from the listing; it is used
// below as `CompletionData`.
368 {
// No default initializer: only meaningful once a completion has written it.
369 int exitStatus;
370 };
371
372 struct Result : public AsyncResultOf<AsyncProcessExit, CompletionData>
373 {
374 using AsyncResultOf<AsyncProcessExit, CompletionData>::AsyncResultOf;
375
// Copies the exit status into `status` and returns the operation result code.
// `status` is written even when the returned code signals failure.
376 SC::Result get(int& status)
377 {
378 status = completionData.exitStatus;
379 return returnCode;
380 }
381 };
382 using AsyncRequest::start;
383
// Starts the request for the given process handle.
387 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle process);
388
390
391 private:
392 friend struct AsyncEventLoop;
393 SC::Result validate(AsyncEventLoop&);
394
395 FileDescriptor::Handle handle = FileDescriptor::Invalid;
// Platform specific state (one listing line inside the Windows branch is missing).
396#if SC_PLATFORM_WINDOWS
398 detail::WinWaitHandle waitHandle;
399 AsyncEventLoop* eventLoop = nullptr;
400#elif SC_PLATFORM_LINUX
401 FileDescriptor pidFd;
402#endif
403};
404
405struct AsyncSocketAccept;
406namespace detail
407{
// Platform specific storage used by an accept request; referenced through
// AsyncSocketAcceptBase::acceptData and owned by AsyncSocketAccept.
410struct SC_COMPILER_EXPORT AsyncSocketAcceptData
411{
412#if SC_PLATFORM_WINDOWS
// NOTE(review): presumably the AcceptEx extension function pointer, stored type-erased — confirm.
413 void (*pAcceptEx)() = nullptr;
415 SocketDescriptor clientSocket;
// Zero-initialized scratch buffer of 288 bytes.
416 uint8_t acceptBuffer[288] = {0};
417#elif SC_PLATFORM_LINUX
// NOTE(review): presumably type-erased socket address storage and its length — confirm.
418 AlignedStorage<28> sockAddrHandle;
419 uint32_t sockAddrLen;
420#endif
421};
422
// Accept request that keeps its platform data outside of the struct (through the acceptData pointer).
// AsyncSocketAccept derives from it and supplies its own embedded AsyncSocketAcceptData.
424struct SC_COMPILER_EXPORT AsyncSocketAcceptBase : public AsyncRequest
425{
426 AsyncSocketAcceptBase() : AsyncRequest(Type::SocketAccept) {}
427
428 struct CompletionData : public AsyncCompletionData
429 {
// Descriptor of the client socket produced by the accept.
430 SocketDescriptor acceptedClient;
431 };
432
433 struct Result : public AsyncResultOf<AsyncSocketAccept, CompletionData>
434 {
435 using AsyncResultOf<AsyncSocketAccept, CompletionData>::AsyncResultOf;
436
// Returns the failed result code right away (SC_TRY), otherwise moves the accepted socket into `client`.
437 SC::Result moveTo(SocketDescriptor& client)
438 {
439 SC_TRY(returnCode);
440 return client.assign(move(completionData.acceptedClient));
441 }
442 };
443 using AsyncRequest::start;
444
// Starts accepting on socketDescriptor using caller supplied accept data.
446 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor, AsyncSocketAcceptData& data);
447 SC::Result validate(AsyncEventLoop&);
448
// Callback receiving the accept Result.
449 Function<void(Result&)> callback;
450 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
// Defaults to IPV4.
451 SocketFlags::AddressFamily addressFamily = SocketFlags::AddressFamilyIPV4;
// Not owned; must point to valid AsyncSocketAcceptData.
452 AsyncSocketAcceptData* acceptData = nullptr;
453};
454
455} // namespace detail
456
// Accept request that embeds its own AsyncSocketAcceptData, so callers do not have to provide one.
466struct SC_COMPILER_EXPORT AsyncSocketAccept : public detail::AsyncSocketAcceptBase
467{
// Points the base class at the embedded `data` member.
468 AsyncSocketAccept() { AsyncSocketAcceptBase::acceptData = &data; }
// Keeps the base class start overloads visible next to the overload declared below.
469 using AsyncSocketAcceptBase::start;
470
// Starts accepting on the given socket using the embedded accept data.
472 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor);
473
474 private:
475 detail::AsyncSocketAcceptData data;
476};
477
// Request of type SocketConnect: connects a socket to an IP address.
// NOTE(review): the `callback` member (around listing line 497) is not visible in this listing.
486struct SC_COMPILER_EXPORT AsyncSocketConnect : public AsyncRequest
487{
488 AsyncSocketConnect() : AsyncRequest(Type::SocketConnect) {}
489
// Keeps the base class start overloads visible next to the overload declared below.
492 using AsyncRequest::start;
493
// Starts the connection of `descriptor` to `address`.
495 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress address);
496
498
499 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
500 SocketIPAddress ipAddress;
501
502 private:
503 friend struct AsyncEventLoop;
504 SC::Result validate(AsyncEventLoop&);
505
506#if SC_PLATFORM_WINDOWS
// NOTE(review): presumably the ConnectEx extension function pointer, stored type-erased — confirm.
507 void (*pConnectEx)() = nullptr;
509#endif
510};
511
// Request of type SocketSend; also the base class of AsyncSocketSendTo.
// NOTE(review): several members (CompletionData header, Result, start overloads, callback, buffers) are
// missing from this listing, so only the visible ones are described.
520struct SC_COMPILER_EXPORT AsyncSocketSend : public AsyncRequest
521{
522 AsyncSocketSend() : AsyncRequest(Type::SocketSend) {}
// NOTE(review): header of this nested struct is missing from the listing (presumably CompletionData).
524 {
525 size_t numBytes = 0;
526 };
528 using AsyncRequest::start;
529
532
535
537
538 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
539
// NOTE(review): presumably selects between a single buffer and a span of buffers — confirm.
542 bool singleBuffer = true;
543
544 protected:
// Lets derived requests (AsyncSocketSendTo) pass their own Type.
545 AsyncSocketSend(Type type) : AsyncRequest(type) {}
546 friend struct AsyncEventLoop;
547 SC::Result validate(AsyncEventLoop&);
548
549 size_t totalBytesWritten = 0;
550#if SC_PLATFORM_WINDOWS
552#endif
553};
554
// Request of type SocketSendTo: an AsyncSocketSend that additionally carries a destination address.
564struct SC_COMPILER_EXPORT AsyncSocketSendTo : public AsyncSocketSend
565{
566 AsyncSocketSendTo() : AsyncSocketSend(Type::SocketSendTo) {}
567
568 SocketIPAddress address;
569
// Sends a single buffer to ipAddress.
570 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
571 Span<const char> data);
572
// Sends multiple buffers to ipAddress.
573 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
574 Span<Span<const char>> data);
575
576 private:
// The base class start overloads are re-declared as private, so they are not callable on this type.
577 using AsyncSocketSend::start;
578 friend struct AsyncEventLoop;
579 SC::Result validate(AsyncEventLoop&);
580#if SC_PLATFORM_LINUX
// NOTE(review): presumably type-erased storage for a message header structure — confirm.
581 AlignedStorage<56> typeErasedMsgHdr;
582#endif
583};
584
// Request of type SocketReceive; also the base class of AsyncSocketReceiveFrom.
// NOTE(review): several members (CompletionData header, the signature of Result's getter, callback, buffer,
// the protected constructor taking a Type) are missing from this listing.
596struct SC_COMPILER_EXPORT AsyncSocketReceive : public AsyncRequest
597{
598 AsyncSocketReceive() : AsyncRequest(Type::SocketReceive) {}
599
// NOTE(review): header of this nested struct is missing from the listing (used below as CompletionData).
601 {
602 size_t numBytes = 0;
603 bool disconnected = false;
604 };
605
606 struct Result : public AsyncResultOf<AsyncSocketReceive, CompletionData>
607 {
608 using AsyncResultOf<AsyncSocketReceive, CompletionData>::AsyncResultOf;
609
// NOTE(review): the function signature (listing lines 610-613) is missing. The body slices the request
// buffer to its first completionData.numBytes bytes into outData, then returns the operation result code.
614 {
615 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, outData));
616 return returnCode;
617 }
618
619 SocketIPAddress getSourceAddress() const;
620 };
621 using AsyncRequest::start;
622
// Starts receiving from `descriptor` into `data`.
624 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, Span<char> data);
625
627
629 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
630
631 protected:
633 friend struct AsyncEventLoop;
634 SC::Result validate(AsyncEventLoop&);
635#if SC_PLATFORM_WINDOWS
637#endif
638};
639
// Request of type SocketReceiveFrom: an AsyncSocketReceive that also stores an address.
// NOTE(review): presumably the sender address reported by Result::getSourceAddress — confirm.
649struct SC_COMPILER_EXPORT AsyncSocketReceiveFrom : public AsyncSocketReceive
650{
651 AsyncSocketReceiveFrom() : AsyncSocketReceive(Type::SocketReceiveFrom) {}
// Re-exports the base class start overloads.
652 using AsyncSocketReceive::start;
653
654 private:
655 SocketIPAddress address;
// The base class and the event loop need access to `address`.
656 friend struct AsyncSocketReceive;
657 friend struct AsyncEventLoop;
658#if SC_PLATFORM_LINUX
// NOTE(review): presumably type-erased storage for a message header structure — confirm.
659 AlignedStorage<56> typeErasedMsgHdr;
660#endif
661};
662
// Starts a file read operation, reading bytes from a file (or pipe).
// NOTE(review): the CompletionData header and the `buffer` member are missing from this listing.
684struct SC_COMPILER_EXPORT AsyncFileRead : public AsyncRequest
685{
// `handle` has no default member initializer, so it is set to Invalid here.
686 AsyncFileRead() : AsyncRequest(Type::FileRead) { handle = FileDescriptor::Invalid; }
687
// NOTE(review): header of this nested struct is missing from the listing (used below as CompletionData).
689 {
690 size_t numBytes = 0;
691 bool endOfFile = false;
692 };
693
694 struct Result : public AsyncResultOf<AsyncFileRead, CompletionData>
695 {
696 using AsyncResultOf<AsyncFileRead, CompletionData>::AsyncResultOf;
697
// Sets `data` to the first completionData.numBytes bytes of the request buffer.
// Returns early if the slice fails (SC_TRY), otherwise returns the operation result code.
698 SC::Result get(Span<char>& data)
699 {
700 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, data));
701 return returnCode;
702 }
703 };
704 using AsyncRequest::start;
705
// Callback called when some data has been read from the file into the buffer.
706 Function<void(Result&)> callback;
// The file/pipe descriptor handle to read data from.
708 FileDescriptor::Handle handle;
710
// Returns the offset last stored with setOffset (0 if never set).
712 uint64_t getOffset() const { return offset; }
713
// Sets the offset in bytes at which start reading and marks it as explicitly requested.
716 void setOffset(uint64_t fileOffset)
717 {
718 useOffset = true;
719 offset = fileOffset;
720 }
721
722 private:
723 friend struct AsyncEventLoop;
724 SC::Result validate(AsyncEventLoop&);
725
// True only after setOffset has been called.
726 bool useOffset = false;
727 bool endedSync = false;
728
729 uint64_t offset = 0;
730#if SC_PLATFORM_WINDOWS
731 uint64_t readCursor = 0;
733#endif
734};
735
// Starts a file write operation, writing bytes to a file (or pipe).
// NOTE(review): the CompletionData header, the start overloads, `callback`, `buffer` and `buffers`
// are missing from this listing.
753struct SC_COMPILER_EXPORT AsyncFileWrite : public AsyncRequest
754{
// `handle` has no default member initializer, so it is set to Invalid here.
755 AsyncFileWrite() : AsyncRequest(Type::FileWrite) { handle = FileDescriptor::Invalid; }
756
// NOTE(review): header of this nested struct is missing from the listing (used below as CompletionData).
758 {
759 size_t numBytes = 0;
760 };
761
762 struct Result : public AsyncResultOf<AsyncFileWrite, CompletionData>
763 {
764 using AsyncResultOf<AsyncFileWrite, CompletionData>::AsyncResultOf;
765
// Copies the number of written bytes into writtenSizeInBytes and returns the operation result code.
// The output is written even when the returned code signals failure.
766 SC::Result get(size_t& writtenSizeInBytes)
767 {
768 writtenSizeInBytes = completionData.numBytes;
769 return returnCode;
770 }
771 };
772
773 using AsyncRequest::start;
774
777
780
782
// The file/pipe descriptor to write data to.
783 FileDescriptor::Handle handle;
785
// NOTE(review): presumably selects between `buffer` and `buffers` — confirm.
788 bool singleBuffer = true;
789
// Returns the last offset set with AsyncFileWrite::setOffset.
791 uint64_t getOffset() const { return offset; }
792
// Sets the offset in bytes at which start writing and marks it as explicitly requested.
795 void setOffset(uint64_t fileOffset)
796 {
797 useOffset = true;
798 offset = fileOffset;
799 }
800
801 private:
802 friend struct AsyncEventLoop;
803 SC::Result validate(AsyncEventLoop&);
804
805#if SC_PLATFORM_WINDOWS
806 bool endedSync = false;
807#else
808 bool isWatchable = false;
809#endif
// True only after setOffset has been called.
810 bool useOffset = false;
// All bits set until setOffset is called.
// NOTE(review): AsyncFileRead defaults its offset to 0 instead — confirm the difference is intentional.
811 uint64_t offset = 0xffffffffffffffff;
812
813 size_t totalBytesWritten = 0;
814#if SC_PLATFORM_WINDOWS
816#endif
817};
818
// Starts a handle polling operation.
// NOTE(review): the CompletionData / Result declarations (listing lines 826-829) are missing from this listing.
823struct SC_COMPILER_EXPORT AsyncFilePoll : public AsyncRequest
824{
825 AsyncFilePoll() : AsyncRequest(Type::FilePoll) {}
826
829
// Starts a file descriptor poll operation, monitoring its readiness with appropriate OS API.
831 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
832
833#if SC_PLATFORM_WINDOWS
// Windows only.
// NOTE(review): presumably returns the overlapped structure associated with this request — confirm.
834 [[nodiscard]] void* getOverlappedPtr();
835#endif
836
// Callback receiving the poll Result.
837 Function<void(Result&)> callback;
838
839 private:
840 friend struct AsyncEventLoop;
841 SC::Result validate(AsyncEventLoop&);
842
843 FileDescriptor::Handle handle = FileDescriptor::Invalid;
844#if SC_PLATFORM_WINDOWS
846#endif
847};
848
849// forward declared because it must be defined after AsyncTaskSequence
850struct AsyncLoopWork;
852
// NOTE(review): the declaration line (listing line 853) is missing from this listing. The type is referenced
// later as AsyncFileSystemOperationCompletionData (member of AsyncCompletionVariant) — confirm its base class.
// Each field is only meaningful for the operations named in its trailing comment.
854{
855 FileDescriptor::Handle handle = FileDescriptor::Invalid; // for open
856
857 int code = 0; // for open/close
858 size_t numBytes = 0; // for read
859};
860
861namespace detail
862{
863// A simple hand-made variant of all completion types
// Hand-written tagged union able to hold the CompletionData of any request type.
// `inited` tells whether a member is currently constructed, `type` which one.
864struct SC_COMPILER_EXPORT AsyncCompletionVariant
865{
// Starts empty (inited == false). `type` has no initializer and is only assigned by construct().
866 AsyncCompletionVariant() {}
// Releases the currently held member, if any (destroy() is defined elsewhere).
867 ~AsyncCompletionVariant() { destroy(); }
868
// Neither copyable nor movable.
869 AsyncCompletionVariant(const AsyncCompletionVariant&) = delete;
870 AsyncCompletionVariant(AsyncCompletionVariant&&) = delete;
871 AsyncCompletionVariant& operator=(const AsyncCompletionVariant&) = delete;
872 AsyncCompletionVariant& operator=(AsyncCompletionVariant&&) = delete;
873
874 bool inited = false;
875
876 AsyncRequest::Type type;
// One member per AsyncRequest::Type value.
877 union
878 {
879 AsyncCompletionData completionDataLoopWork; // Defined after AsyncCompletionVariant / AsyncTaskSequence
880 AsyncLoopTimeout::CompletionData completionDataLoopTimeout;
881 AsyncLoopWakeUp::CompletionData completionDataLoopWakeUp;
882 AsyncProcessExit::CompletionData completionDataProcessExit;
883 AsyncSocketAccept::CompletionData completionDataSocketAccept;
884 AsyncSocketConnect::CompletionData completionDataSocketConnect;
885 AsyncSocketSend::CompletionData completionDataSocketSend;
886 AsyncSocketSendTo::CompletionData completionDataSocketSendTo;
887 AsyncSocketReceive::CompletionData completionDataSocketReceive;
888 AsyncSocketReceiveFrom::CompletionData completionDataSocketReceiveFrom;
889 AsyncFileRead::CompletionData completionDataFileRead;
890 AsyncFileWrite::CompletionData completionDataFileWrite;
891 AsyncFilePoll::CompletionData completionDataFilePoll;
892
893 AsyncFileSystemOperationCompletionData completionDataFileSystemOperation;
894 };
895
// Overload set mapping a request object to the union member holding its completion data.
// There is no dedicated overload for AsyncSocketSendTo / AsyncSocketReceiveFrom: those types derive from
// AsyncSocketSend / AsyncSocketReceive, so the base class overloads are selected for them.
896 auto& getCompletion(AsyncLoopWork&) { return completionDataLoopWork; }
897 auto& getCompletion(AsyncLoopTimeout&) { return completionDataLoopTimeout; }
898 auto& getCompletion(AsyncLoopWakeUp&) { return completionDataLoopWakeUp; }
899 auto& getCompletion(AsyncProcessExit&) { return completionDataProcessExit; }
900 auto& getCompletion(AsyncSocketAccept&) { return completionDataSocketAccept; }
901 auto& getCompletion(AsyncSocketConnect&) { return completionDataSocketConnect; }
902 auto& getCompletion(AsyncSocketSend&) { return completionDataSocketSend; }
903 auto& getCompletion(AsyncSocketReceive&) { return completionDataSocketReceive; }
904 auto& getCompletion(AsyncFileRead&) { return completionDataFileRead; }
905 auto& getCompletion(AsyncFileWrite&) { return completionDataFileWrite; }
906 auto& getCompletion(AsyncFilePoll&) { return completionDataFilePoll; }
907 auto& getCompletion(AsyncFileSystemOperation&) { return completionDataFileSystemOperation; }
908
// Destroys whatever is currently held, constructs in place the member matching request `t`,
// records its type and returns a reference to the freshly constructed member.
909 template <typename T>
910 auto& construct(T& t)
911 {
912 destroy();
913 placementNew(getCompletion(t));
914 inited = true;
915 type = t.getType();
916 return getCompletion(t);
917 }
// Declared only; defined elsewhere.
918 void destroy();
919};
920} // namespace detail
921
// An AsyncSequence that additionally carries a thread pool task, the thread pool it uses,
// storage for a completion and a result code. All members are reserved to AsyncEventLoop / AsyncRequest.
925struct SC_COMPILER_EXPORT AsyncTaskSequence : public AsyncSequence
926{
927 protected:
928 ThreadPoolTask task;
// Not owned; nullptr until a thread pool is assigned.
929 ThreadPool* threadPool = nullptr;
930
931 friend struct AsyncEventLoop;
932 friend struct AsyncRequest;
933
// Completion data storage for the request associated with this task sequence.
934 detail::AsyncCompletionVariant completion;
935
// Starts as a successful result.
936 SC::Result returnCode = SC::Result(true);
937};
938
// Request of type LoopWork wrapping a user supplied `work` function.
// NOTE(review): presumably `work` runs on a thread pool and the callback on the event loop thread — confirm.
// NOTE(review): several members (listing lines 947-956 and 961) are not visible in this listing.
944struct SC_COMPILER_EXPORT AsyncLoopWork : public AsyncRequest
945{
946 AsyncLoopWork() : AsyncRequest(Type::LoopWork) {}
947
950
954
// The work to execute; it reports its outcome as an SC::Result.
955 Function<SC::Result()> work;
957
958 private:
959 friend struct AsyncEventLoop;
960 SC::Result validate(AsyncEventLoop&);
962};
963
// Starts an asynchronous file system operation (see the Operation enum for the supported ones).
// The parameters of the pending operation are kept in the anonymous union at the end of the struct.
// NOTE(review): several public declarations (CompletionData, Result, callback, setThreadPool, open, copyDirectory,
// rename, removeEmptyDirectory, removeFile) are missing from this listing.
997struct SC_COMPILER_EXPORT AsyncFileSystemOperation : public AsyncRequest
998{
999 AsyncFileSystemOperation() : AsyncRequest(Type::FileSystemOperation) {}
1000 ~AsyncFileSystemOperation() { destroy(); }
// Remove macros named CopyFile / RemoveDirectory (presumably defined by Windows headers) so that the
// enumerators below can use these names.
1001#ifdef CopyFile
1002#undef CopyFile
1003#endif
1004#ifdef RemoveDirectory
1005#undef RemoveDirectory
1006#endif
// The operation currently set up on this request (None when idle).
1007 enum class Operation
1008 {
1009 None = 0,
1010 Open,
1011 Close,
1012 Read,
1013 Write,
1014 CopyFile,
1015 CopyDirectory,
1016 Rename,
1017 RemoveDirectory,
1018 RemoveFile,
1019 };
1020
1023
1026
1028
1035
// Closes a file descriptor asynchronously.
1040 SC::Result close(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle);
1041
// Reads data from a file descriptor at a given offset.
1048 SC::Result read(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<char> buffer, uint64_t offset);
1049
// Writes data to a file descriptor at a given offset.
1056 SC::Result write(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<const char> buffer,
1057 uint64_t offset);
1058
// Copies a file from one location to another.
// NOTE(review): the end of this declaration (listing line 1066, the copyFlags parameter) is missing from the listing.
1065 SC::Result copyFile(AsyncEventLoop& eventLoop, StringSpan path, StringSpan destinationPath,
1067
1076
1083
1090
1096
1097 private:
1098 friend struct AsyncEventLoop;
1099 Operation operation = Operation::None;
// Embedded work request.
// NOTE(review): presumably used to run the operation on a thread pool — confirm.
1100 AsyncLoopWork loopWork;
1101 CompletionData completionData;
1102
1103 void onOperationCompleted(AsyncLoopWork::Result& res);
1104
// Parameter packs, one per kind of operation.
1105 struct FileDescriptorData
1106 {
1107 FileDescriptor::Handle handle;
1108 };
1109
1110 struct OpenData
1111 {
1112 StringSpan path;
1113 FileOpen mode;
1114 };
1115
1116 struct ReadData
1117 {
1118 FileDescriptor::Handle handle;
1119 Span<char> buffer;
1120 uint64_t offset;
1121 };
1122
1123 struct WriteData
1124 {
1125 FileDescriptor::Handle handle;
1126 Span<const char> buffer;
1127 uint64_t offset;
1128 };
1129
1130 struct CopyFileData
1131 {
1132 StringSpan path;
1133 StringSpan destinationPath;
1134 FileSystemCopyFlags copyFlags;
1135 };
1136
// Directory copy takes the same parameters as file copy.
1137 using CopyDirectoryData = CopyFileData;
1138
// Close only needs the handle.
1139 using CloseData = FileDescriptorData;
1140
1141 struct RenameData
1142 {
1143 StringSpan path;
1144 StringSpan newPath;
1145 };
1146
1147 struct RemoveData
1148 {
1149 StringSpan path;
1150 };
1151
// Parameters of the pending operation.
// NOTE(review): presumably `operation` tells which member is active — confirm in destroy().
1152 union
1153 {
1154 OpenData openData;
1155 CloseData closeData;
1156 ReadData readData;
1157 WriteData writeData;
1158 CopyFileData copyFileData;
1159 CopyDirectoryData copyDirectoryData;
1160 RenameData renameData;
1161 RemoveData removeData;
1162 };
1163
// Declared only; called by the destructor.
1164 void destroy();
1165
1166 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
1167 SC::Result validate(AsyncEventLoop&);
1168};
1169
// Allows user to supply a block of memory that will store kernel I/O events retrieved from the event loop.
// NOTE(review): the public `eventsMemory` span (listing line 1175) is missing from this listing.
1173struct SC_COMPILER_EXPORT AsyncKernelEvents
1174{
1176
1177 private:
// Number of events currently stored; written by AsyncEventLoop (friend).
1178 int numberOfEvents = 0;
1179 friend struct AsyncEventLoop;
1180};
1181
// Allow library user to provide callbacks signaling different phases of async event loop cycle.
1183struct SC_COMPILER_EXPORT AsyncEventLoopListeners
1184{
// NOTE(review): presumably invoked right before / right after the loop blocks polling for events — confirm.
1185 Function<void(AsyncEventLoop&)> beforeBlockingPoll;
1186 Function<void(AsyncEventLoop&)> afterBlockingPoll;
1187};
1188
// Asynchronous I/O event loop (files, sockets, timers, processes, fs events, threads wake-up).
// The loop can be neither copied nor moved; its implementation is hidden behind an opaque Internal object.
// NOTE(review): most public member functions (create, close, run, runOnce, start, ...) are missing from this
// listing; only the visible declarations are commented.
1195struct SC_COMPILER_EXPORT AsyncEventLoop
1196{
// Options given to AsyncEventLoop::create.
1198 struct Options
1199 {
// Criteria to choose the Async IO API.
1200 enum class ApiType : uint8_t
1201 {
1202 Automatic = 0,
1203 ForceUseIOURing,
1204 ForceUseEpoll,
1205 };
1207
// Defaults to automatic API selection.
1208 Options() { apiType = ApiType::Automatic; }
1209 };
1210
1212
// Neither copyable nor movable.
1213 AsyncEventLoop(const AsyncEventLoop&) = delete;
1214 AsyncEventLoop(AsyncEventLoop&&) = delete;
1215 AsyncEventLoop& operator=(AsyncEventLoop&&) = delete;
1216 AsyncEventLoop& operator=(const AsyncEventLoop&) = delete;
1217
1220
1223
1227
1230
// Returns true if create has been already called (successfully).
1232 [[nodiscard]] bool isInitialized() const;
1233
// Returns true if backend needs a thread pool for non-blocking fs operations.
1235 [[nodiscard]] bool needsThreadPoolForFileOperations() const;
1236
1243
1254
1261
1267
1287
1294
1298
1301
1304
1307
1310
1313
1316
1319
1322
// Get Loop time (monotonic).
1324 [[nodiscard]] TimeMs getLoopTime() const;
1325
// Obtain the total number of active requests.
1327 [[nodiscard]] int getNumberOfActiveRequests() const;
1328
// Obtain the total number of submitted requests.
1330 [[nodiscard]] int getNumberOfSubmittedRequests() const;
1331
1335
1338
1341
// Enumerates all requests objects associated with this loop.
1343 void enumerateRequests(Function<void(AsyncRequest&)> enumerationCallback);
1344
1348
// Checks if excludeFromActiveCount() has been called on the given request.
1350 [[nodiscard]] static bool isExcludedFromActiveCount(const AsyncRequest& async);
1351
// Check if liburing is loadable (only on Linux).
1354 [[nodiscard]] static bool tryLoadingLiburing();
1355
1358
// Implementation type, only forward declared here.
1359 struct Internal;
1360
1361 public:
// Traits describing the storage reserved for Internal.
// NOTE(review): the constants are presumably per-platform byte sizes consumed by OpaqueObject — confirm.
1362 struct SC_COMPILER_EXPORT InternalDefinition
1363 {
1364 static constexpr int Windows = 520;
1365 static constexpr int Apple = 512;
1366 static constexpr int Linux = 720;
// Platforms without a dedicated constant use the Linux value.
1367 static constexpr int Default = Linux;
1368
1369 static constexpr size_t Alignment = 8;
1370
1371 using Object = Internal;
1372 };
1373
1375
1376 private:
// NOTE(review): the InternalOpaque alias (listing line 1374) is missing from this listing.
1377 InternalOpaque internalOpaque;
// NOTE(review): presumably refers to the object stored inside internalOpaque — confirm in the constructor.
1378 Internal& internal;
1379
1380 friend struct AsyncRequest;
1381 friend struct AsyncFileWrite;
1382 friend struct AsyncFileRead;
1383 friend struct AsyncFileSystemOperation;
1384 friend struct AsyncResult;
1385};
1386
// Monitors Async I/O events from a background thread using a blocking kernel function.
// NOTE(review): the public API (onNewEventsAvailable, create, startMonitoring, stopMonitoringAndDispatchCompletions,
// close) is missing from this listing; only the private state is visible.
1390struct SC_COMPILER_EXPORT AsyncEventLoopMonitor
1391{
1393
1397
1400
1408
1414
1415 private:
// MSVC only: silence the "structure was padded" warning (4324) triggered by the alignas below.
1416#if SC_COMPILER_MSVC
1417#pragma warning(push)
1418#pragma warning(disable : 4324) // useless warning on 32 bit... (structure was padded due to __declspec(align()))
1419#endif
// Embedded buffer for kernel events, aligned like a uint64_t.
1420 alignas(uint64_t) uint8_t eventsMemory[8 * 1024]; // 8 Kb of kernel events
1421#if SC_COMPILER_MSVC
1422#pragma warning(pop)
1423#endif
1424
1425 AsyncKernelEvents asyncKernelEvents;
// Monitored loop; not owned, nullptr until one is assigned.
1426 AsyncEventLoop* eventLoop = nullptr;
1427 AsyncLoopWakeUp eventLoopWakeUp;
1428
// Background thread and the two events used to coordinate with it.
// NOTE(review): presumably the events signal entering and leaving the blocking poll — confirm.
1429 Thread eventLoopThread;
1430 EventObject eventObjectEnterBlockingMode;
1431 EventObject eventObjectExitBlockingMode;
1432
// Atomic flags shared with the background thread.
1433 Atomic<bool> finished = false;
1434 Atomic<bool> needsWakeUp = true;
1435
1436 bool wakeUpHasBeenCalled = false;
1437
// Entry point executed by eventLoopThread.
1438 Result monitoringLoopThread(Thread& thread);
1439};
1440
1441} // namespace SC
unsigned short uint16_t
Platform independent (2) bytes unsigned int.
Definition PrimitiveTypes.h:37
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:264
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
unsigned long long uint64_t
Platform independent (8) bytes unsigned int.
Definition PrimitiveTypes.h:42
short int16_t
Platform independent (2) bytes signed int.
Definition PrimitiveTypes.h:45
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:49
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Empty base struct for all AsyncRequest-derived CompletionData (internal) structs.
Definition Async.h:250
Allow library user to provide callbacks signaling different phases of async event loop cycle.
Definition Async.h:1184
Monitors Async I/O events from a background thread using a blocking kernel function (no CPU usage on ...
Definition Async.h:1391
Function< void(void)> onNewEventsAvailable
Informs to call dispatchCompletions on GUI Event Loop.
Definition Async.h:1392
Result startMonitoring()
Queue all async requests submissions and start monitoring loop events on a background thread.
Result close()
Stop monitoring the AsyncEventLoop, disposing all resources.
Result stopMonitoringAndDispatchCompletions()
Stops monitoring events on the background thread and dispatches callbacks for completed requests.
Result create(AsyncEventLoop &eventLoop)
Create the monitoring thread for an AsyncEventLoop.
Options given to AsyncEventLoop::create.
Definition Async.h:1199
ApiType apiType
Criteria to choose Async IO API.
Definition Async.h:1206
ApiType
Definition Async.h:1201
Asynchronous I/O (files, sockets, timers, processes, fs events, threads wake-up) (see Async) AsyncEve...
Definition Async.h:1196
bool needsThreadPoolForFileOperations() const
Returns true if backend needs a thread pool for non-blocking fs operations (anything but io_uring bas...
Result associateExternallyCreatedFileDescriptor(FileDescriptor &outDescriptor)
Associates a previously created File Descriptor with the eventLoop.
Result wakeUpFromExternalThread()
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
Result runNoWait()
Process active requests if any, dispatching their completions, or returns immediately without blockin...
static Result removeAllAssociationsFor(SocketDescriptor &outDescriptor)
Removes association of a TCP Socket with any event loop.
void updateTime()
Updates loop time to "now".
static bool isExcludedFromActiveCount(const AsyncRequest &async)
Checks if excludeFromActiveCount() has been called on the given request.
Result associateExternallyCreatedSocket(SocketDescriptor &outDescriptor)
Associates a previously created TCP / UDP socket with the eventLoop.
Result blockingPoll(AsyncKernelEvents &kernelEvents)
Blocks until at least one event happens, ensuring forward progress, without executing completions.
void clearSequence(AsyncSequence &sequence)
Clears the sequence.
int getNumberOfSubmittedRequests() const
Obtain the total number of submitted requests.
Result submitRequests(AsyncKernelEvents &kernelEvents)
Submits all queued async requests.
void enumerateRequests(Function< void(AsyncRequest &)> enumerationCallback)
Enumerates all requests objects associated with this loop.
TimeMs getLoopTime() const
Get Loop time (monotonic)
Result start(AsyncRequest &async)
Queues an async request that has been correctly set up.
AsyncLoopTimeout * findEarliestLoopTimeout() const
Returns the next AsyncLoopTimeout that will be executed (shortest relativeTimeout)
void setListeners(AsyncEventLoopListeners *listeners)
Sets reference to listeners that will signal different events in loop lifetime.
Result dispatchCompletions(AsyncKernelEvents &kernelEvents)
Invokes completions for the AsyncKernelEvents collected by a call to AsyncEventLoop::blockingPoll.
void interrupt()
Interrupts the event loop even if it has active request on it.
Result wakeUpFromExternalThread(AsyncLoopWakeUp &wakeUp)
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
bool isInitialized() const
Returns true if create has been already called (successfully)
Result create(Options options=Options())
Creates the event loop kernel object.
static bool tryLoadingLiburing()
Check if liburing is loadable (only on Linux)
Result createAsyncTCPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async TCP (IPV4 / IPV6) socket registered with the eventLoop.
Result close()
Closes the event loop kernel object.
Result createAsyncUDPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async UDP (IPV4 / IPV6) socket registered with the eventLoop.
static Result removeAllAssociationsFor(FileDescriptor &outDescriptor)
Removes association of a File Descriptor with any event loop.
Result runOnce()
Blocks until at least one request proceeds, ensuring forward progress, dispatching all completions.
void excludeFromActiveCount(AsyncRequest &async)
Excludes the request from active handles count (to avoid it keeping event loop alive)
Result run()
Blocks until there are no more active queued requests, dispatching all completions.
void includeInActiveCount(AsyncRequest &async)
Reverses the effect of excludeFromActiveCount for the request.
int getNumberOfActiveRequests() const
Obtain the total number of active requests.
Starts a handle polling operation.
Definition Async.h:824
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle fileDescriptor)
Starts a file descriptor poll operation, monitoring its readiness with appropriate OS API.
Definition Async.h:689
Definition Async.h:695
Starts a file read operation, reading bytes from a file (or pipe).
Definition Async.h:685
FileDescriptor::Handle handle
The file/pipe descriptor handle to read data from.
Definition Async.h:708
Span< char > buffer
The writeable span of memory where the data will be written (the callback is called when some data has been read from the file into this buffer).
Definition Async.h:707
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which start reading.
Definition Async.h:716
uint64_t getOffset() const
Returns the last offset set with AsyncFileRead::setOffset.
Definition Async.h:712
Starts an asynchronous file system operation (open, close, read, write, sendFile, stat,...
Definition Async.h:998
SC::Result copyDirectory(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a directory from one location to another.
SC::Result removeEmptyDirectory(AsyncEventLoop &eventLoop, StringSpan path)
Removes a directory asynchronously.
SC::Result rename(AsyncEventLoop &eventLoop, StringSpan path, StringSpan newPath)
Renames a file.
SC::Result removeFile(AsyncEventLoop &eventLoop, StringSpan path)
Removes a file asynchronously.
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the thread pool to use for the operation.
SC::Result read(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< char > buffer, uint64_t offset)
Reads data from a file descriptor at a given offset.
SC::Result write(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< const char > buffer, uint64_t offset)
Writes data to a file descriptor at a given offset.
SC::Result close(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle)
Closes a file descriptor asynchronously.
Function< void(Result &)> callback
Called after the operation is completed, on the event loop thread.
Definition Async.h:1027
SC::Result copyFile(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a file from one location to another.
SC::Result open(AsyncEventLoop &eventLoop, StringSpan path, FileOpen mode)
Opens a file asynchronously and returns its corresponding file descriptor.
Definition Async.h:763
Starts a file write operation, writing bytes to a file (or pipe).
Definition Async.h:754
uint64_t getOffset() const
Returns the last offset set with AsyncFileWrite::setOffset.
Definition Async.h:791
FileDescriptor::Handle handle
The file/pipe descriptor to write data to.
Definition Async.h:783
SC::Result start(AsyncEventLoop &eventLoop, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which to start writing.
Definition Async.h:795
Function< void(Result &)> callback
Callback called when descriptor is ready to be written with more data.
Definition Async.h:781
Span< Span< const char > > buffers
The read-only spans of memory to read the data from.
Definition Async.h:787
SC::Result start(AsyncEventLoop &eventLoop, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
Span< const char > buffer
The read-only span of memory to read the data from.
Definition Async.h:786
Allows user to supply a block of memory that will store kernel I/O events retrieved from AsyncEventLo...
Definition Async.h:1174
Span< uint8_t > eventsMemory
User supplied block of memory used to store kernel I/O events.
Definition Async.h:1175
Starts a Timeout that is invoked only once after expiration (relative) time has passed.
Definition Async.h:300
TimeMs getExpirationTime() const
Gets the computed absolute expiration time that determines when this timeout gets executed.
Definition Async.h:315
SC::Result start(AsyncEventLoop &eventLoop, TimeMs relativeTimeout)
Sets async request members and calls AsyncEventLoop::start.
TimeMs relativeTimeout
First timer expiration (relative) time in milliseconds.
Definition Async.h:312
Function< void(Result &)> callback
Called after given expiration time since AsyncLoopTimeout::start has passed.
Definition Async.h:310
Starts a wake-up operation, allowing threads to execute callbacks on loop thread.
Definition Async.h:336
SC::Result start(AsyncEventLoop &eventLoop, EventObject &eventObject)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Callback called by SC::AsyncEventLoop::run after SC::AsyncLoopWakeUp::wakeUp.
Definition Async.h:349
SC::Result wakeUp(AsyncEventLoop &eventLoop)
Wakes up event loop, scheduling AsyncLoopWakeUp::callback on next AsyncEventLoop::run (or its variati...
Executes work in a thread pool and then invokes a callback on the event loop thread.
Definition Async.h:945
Function< void(Result &)> callback
Called on the event loop thread after the work has been executed in a background threadpool thread.
Definition Async.h:956
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the ThreadPool that will supply the thread to run the async work on.
Definition Async.h:373
Starts monitoring a process, notifying about its termination.
Definition Async.h:364
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle process)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Called when process has exited.
Definition Async.h:389
Base class for all async requests, holding state and type.
Definition Async.h:117
bool isCancelling() const
Returns true if this request is being cancelled.
AsyncRequest(Type type)
Constructs a free async request of given type.
Definition Async.h:155
Result start(AsyncEventLoop &eventLoop)
Shortcut for AsyncEventLoop::start.
uint16_t getUserFlags() const
Gets user flags, holding some meaningful data for the caller.
Definition Async.h:184
Function< void(AsyncResult &)> * getCloseCallback()
Returns currently set close callback (if any) passed to AsyncRequest::stop.
Definition Async.h:187
bool isActive() const
Returns true if this request is active or being reactivated.
bool isFree() const
Returns true if this request is free.
void disableThreadPool()
Disables the thread-pool usage for this request.
Type getType() const
Returns request type.
Definition Async.h:175
void setUserFlags(uint16_t externalFlags)
Sets user flags, holding some meaningful data for the caller.
Definition Async.h:181
Result executeOn(AsyncTaskSequence &task, ThreadPool &pool)
Adds the request to be executed on a specific AsyncTaskSequence.
Result stop(AsyncEventLoop &eventLoop, Function< void(AsyncResult &)> *afterStopped=nullptr)
Asks to stop the current async operation.
void executeOn(AsyncSequence &sequence)
Adds the request to be executed on a specific AsyncSequence.
Type
Type of async request.
Definition Async.h:136
Helper holding CompletionData for a specific AsyncRequest-derived class.
Definition Async.h:285
Base class for all async results (argument of completion callbacks).
Definition Async.h:255
const SC::Result & isValid() const
Check if the returnCode of this result is valid.
Definition Async.h:266
AsyncResult(AsyncEventLoop &eventLoop, AsyncRequest &request, SC::Result &res, bool *hasBeenReactivated=nullptr)
Constructs an async result from a request and a result.
Definition Async.h:257
void reactivateRequest(bool shouldBeReactivated)
Asks the event loop to re-activate this request after it has already completed.
Execute AsyncRequests serially, by submitting the next one after the previous one is completed.
Definition Async.h:235
Starts a socket accept operation, obtaining a new socket from a listening socket.
Definition Async.h:467
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &socketDescriptor)
Sets async request members and calls AsyncEventLoop::start.
Starts a socket connect operation, connecting to a remote endpoint.
Definition Async.h:487
Function< void(Result &)> callback
Called after socket is finally connected to endpoint.
Definition Async.h:497
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, SocketIPAddress address)
Sets async request members and calls AsyncEventLoop::start.
Starts an unconnected socket receive from operation, receiving bytes from a remote endpoint.
Definition Async.h:650
Definition Async.h:607
SC::Result get(Span< char > &outData)
Get a Span of the actually read data.
Definition Async.h:613
Starts a socket receive operation, receiving bytes from a remote endpoint.
Definition Async.h:597
AsyncSocketReceive(Type type)
Constructs a socket receive request of the given request type.
Definition Async.h:632
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< char > buffer
The writable span of memory where the data will be written.
Definition Async.h:628
Function< void(Result &)> callback
Called after data has been received.
Definition Async.h:626
Starts an unconnected socket send to operation, sending bytes to a remote endpoint.
Definition Async.h:565
Starts a socket send operation, sending bytes to a remote endpoint.
Definition Async.h:521
Function< void(Result &)> callback
Called when socket is ready to send more data.
Definition Async.h:536
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< Span< const char > > buffers
Spans of bytes to send (singleBuffer == false)
Definition Async.h:541
Span< const char > buffer
Span of bytes to send (singleBuffer == true)
Definition Async.h:540
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
An AsyncSequence using a SC::ThreadPool to execute one or more SC::AsyncRequest in a background threa...
Definition Async.h:926
Atomic variables (only for int and bool for now).
Definition Atomic.h:41
An automatically reset event object to synchronize two threads.
Definition Threading.h:229
File descriptor handle wrapper (FileDescriptor, declared through UniqueHandle).
Definition File.h:78
Options used to open a file descriptor.
Definition File.h:49
A structure to describe copy flags.
Definition FileSystem.h:25
Hides implementation details from public headers (static PIMPL).
Definition OpaqueObject.h:31
An ASCII string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
Low-level OS socket handle.
Definition Socket.h:153
AddressFamily
Sets the address family of an IP Address (IPv4 or IPv6)
Definition Socket.h:60
Native representation of an IP Address.
Definition Socket.h:100
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29
A read-only view over a string (to avoid including Strings library when parsing is not needed).
Definition StringSpan.h:37
A small task containing a function to execute that can be queued in the thread pool.
Definition ThreadPool.h:16
Simple thread pool that executes tasks in a fixed number of worker threads.
Definition ThreadPool.h:38
A native OS thread.
Definition Threading.h:113
A vocabulary type representing a time interval in milliseconds since epoch.
Definition PrimitiveTypes.h:63