Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
Async.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Async/Internal/IntrusiveDoubleLinkedList.h"
6#include "../File/File.h"
7#include "../FileSystem/FileSystem.h"
8#include "../Foundation/Function.h"
9#include "../Foundation/OpaqueObject.h"
10#include "../Socket/Socket.h"
11#include "../Threading/Atomic.h"
12#include "../Threading/ThreadPool.h"
13
14namespace SC
15{
16struct ThreadPool;
17struct ThreadPoolTask;
18struct EventObject;
19} // namespace SC
44
47namespace SC
48{
49struct AsyncEventLoop;
50struct AsyncResult;
51struct AsyncSequence;
52struct AsyncTaskSequence;
53
54namespace detail
55{
56struct AsyncWinOverlapped;
57struct AsyncWinOverlappedDefinition
58{
59 static constexpr int Windows = sizeof(void*) * 4 + sizeof(uint64_t);
60 static constexpr size_t Alignment = alignof(void*);
61
62 using Object = AsyncWinOverlapped;
63};
64using WinOverlappedOpaque = OpaqueObject<AsyncWinOverlappedDefinition>;
65
66struct AsyncWinWaitDefinition
67{
68 using Handle = FileDescriptor::Handle; // fd
69 static constexpr Handle Invalid = FileDescriptor::Invalid; // invalid fd
70
71 static Result releaseHandle(Handle& waitHandle);
72};
73struct SC_COMPILER_EXPORT WinWaitHandle : public UniqueHandle<AsyncWinWaitDefinition>
74{
75};
76} // namespace detail
77
116struct SC_COMPILER_EXPORT AsyncRequest
117{
118 AsyncRequest* next = nullptr;
119 AsyncRequest* prev = nullptr;
120
121 void setDebugName(const char* newDebugName);
122
124 void executeOn(AsyncSequence& sequence);
125
130
133
135 enum class Type : uint8_t
136 {
137 LoopTimeout,
138 LoopWakeUp,
139 LoopWork,
140 ProcessExit,
141 SocketAccept,
142 SocketConnect,
143 SocketSend,
144 SocketSendTo,
145 SocketReceive,
146 SocketReceiveFrom,
147 FileRead,
148 FileWrite,
149 FileSend,
150 FilePoll,
151 FileSystemOperation,
152 };
153
156 AsyncRequest(Type type) : state(State::Free), type(type), flags(0), unused(0), userFlags(0) {}
157
164 Result stop(AsyncEventLoop& eventLoop, Function<void(AsyncResult&)>* afterStopped = nullptr);
165
167 [[nodiscard]] bool isFree() const;
168
170 [[nodiscard]] bool isCancelling() const;
171
173 [[nodiscard]] bool isActive() const;
174
176 [[nodiscard]] Type getType() const { return type; }
177
180
182 void setUserFlags(uint16_t externalFlags) { userFlags = externalFlags; }
183
185 uint16_t getUserFlags() const { return userFlags; }
186
188 [[nodiscard]] Function<void(AsyncResult&)>* getCloseCallback() { return closeCallback; }
189
190 [[nodiscard]] const Function<void(AsyncResult&)>* getCloseCallback() const { return closeCallback; }
191
192 protected:
193 Result checkState();
194
195 void queueSubmission(AsyncEventLoop& eventLoop);
196
197 AsyncSequence* sequence = nullptr;
198
199 AsyncTaskSequence* getTask();
200
201 private:
202 Function<void(AsyncResult&)>* closeCallback = nullptr;
203
204 friend struct AsyncEventLoop;
205 friend struct AsyncResult;
206
207 void markAsFree();
208
209 [[nodiscard]] static const char* TypeToString(Type type);
210 enum class State : uint8_t
211 {
212 Free, // not in any queue, this can be started with an async.start(...)
213 Setup, // when in submission queue waiting to be setup (after an async.start(...))
214 Submitting, // when in submission queue waiting to be activated or re-activated
215 Active, // when monitored by OS syscall or in activeLoopWakeUps / activeTimeouts queues
216 Reactivate, // when flagged for reactivation inside the callback (after a result.reactivateRequest(true))
217 Cancelling, // when in cancellation queue waiting for a cancelAsync (on active async)
218 };
219
220#if SC_ASYNC_ENABLE_LOG
221 const char* debugName = "None";
222#endif
223 State state; // 1 byte
224 Type type; // 1 byte
225 int16_t flags; // 2 bytes
226
227 uint16_t unused; // 2 bytes
228 uint16_t userFlags; // 2 bytes
229};
230
235struct SC_COMPILER_EXPORT AsyncSequence
236{
237 AsyncSequence* next = nullptr;
238 AsyncSequence* prev = nullptr;
239
240 bool clearSequenceOnCancel = true;
241 bool clearSequenceOnError = true;
242 private:
243 friend struct AsyncEventLoop;
244 bool runningAsync = false; // true if an async from this sequence is being run
245 bool tracked = false;
246
247 IntrusiveDoubleLinkedList<AsyncRequest> submissions;
248};
249
251struct SC_COMPILER_EXPORT AsyncCompletionData{};
252
255struct SC_COMPILER_EXPORT AsyncResult
256{
258 AsyncResult(AsyncEventLoop& eventLoop, AsyncRequest& request, SC::Result& res, bool* hasBeenReactivated = nullptr)
259 : eventLoop(eventLoop), async(request), hasBeenReactivated(hasBeenReactivated), returnCode(res)
260 {}
261
264 void reactivateRequest(bool shouldBeReactivated);
265
267 [[nodiscard]] const SC::Result& isValid() const { return returnCode; }
268
269 AsyncEventLoop& eventLoop;
270 AsyncRequest& async;
271
272 protected:
273 friend struct AsyncEventLoop;
274
275 bool shouldCallCallback = true;
276 bool* hasBeenReactivated = nullptr;
277
278 SC::Result& returnCode;
279};
280
284template <typename T, typename C>
286{
287 T& getAsync() { return static_cast<T&>(AsyncResult::async); }
288 const T& getAsync() const { return static_cast<const T&>(AsyncResult::async); }
289
291
292 C completionData;
293 int32_t eventIndex = 0;
294};
295
300struct SC_COMPILER_EXPORT AsyncLoopTimeout : public AsyncRequest
301{
302 AsyncLoopTimeout() : AsyncRequest(Type::LoopTimeout) {}
303
306 using AsyncRequest::start;
307
309 SC::Result start(AsyncEventLoop& eventLoop, TimeMs relativeTimeout);
310
312
314
316 TimeMs getExpirationTime() const { return expirationTime; }
317
318 private:
319 SC::Result validate(AsyncEventLoop&);
320 friend struct AsyncEventLoop;
321 TimeMs expirationTime;
322};
323
336struct SC_COMPILER_EXPORT AsyncLoopWakeUp : public AsyncRequest
337{
338 AsyncLoopWakeUp() : AsyncRequest(Type::LoopWakeUp) {}
339
342 using AsyncRequest::start;
343
345 SC::Result start(AsyncEventLoop& eventLoop, EventObject& eventObject);
346
349
351 EventObject* eventObject = nullptr;
352
353 private:
354 friend struct AsyncEventLoop;
355 SC::Result validate(AsyncEventLoop&);
356
357 Atomic<bool> pending = false;
358};
359
364struct SC_COMPILER_EXPORT AsyncProcessExit : public AsyncRequest
365{
366 AsyncProcessExit() : AsyncRequest(Type::ProcessExit) {}
367
369 {
370 int exitStatus;
371 };
372
373 struct Result : public AsyncResultOf<AsyncProcessExit, CompletionData>
374 {
375 using AsyncResultOf<AsyncProcessExit, CompletionData>::AsyncResultOf;
376
377 SC::Result get(int& status)
378 {
379 status = completionData.exitStatus;
380 return returnCode;
381 }
382 };
383 using AsyncRequest::start;
384
388 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle process);
389
391
392 private:
393 friend struct AsyncEventLoop;
394 SC::Result validate(AsyncEventLoop&);
395
396 FileDescriptor::Handle handle = FileDescriptor::Invalid;
397#if SC_PLATFORM_WINDOWS
399 detail::WinWaitHandle waitHandle;
400 AsyncEventLoop* eventLoop = nullptr;
401#elif SC_PLATFORM_LINUX
402 FileDescriptor pidFd;
403#endif
404};
405
406struct AsyncSocketAccept;
407namespace detail
408{
411struct SC_COMPILER_EXPORT AsyncSocketAcceptData
412{
413#if SC_PLATFORM_WINDOWS
414 void (*pAcceptEx)() = nullptr;
416 SocketDescriptor clientSocket;
417 uint8_t acceptBuffer[288] = {0};
418#elif SC_PLATFORM_LINUX
419 AlignedStorage<28> sockAddrHandle;
420 uint32_t sockAddrLen;
421#endif
422};
423
425struct SC_COMPILER_EXPORT AsyncSocketAcceptBase : public AsyncRequest
426{
427 AsyncSocketAcceptBase() : AsyncRequest(Type::SocketAccept) {}
428
429 struct CompletionData : public AsyncCompletionData
430 {
431 SocketDescriptor acceptedClient;
432 };
433
434 struct Result : public AsyncResultOf<AsyncSocketAccept, CompletionData>
435 {
436 using AsyncResultOf<AsyncSocketAccept, CompletionData>::AsyncResultOf;
437
438 SC::Result moveTo(SocketDescriptor& client)
439 {
440 SC_TRY(returnCode);
441 return client.assign(move(completionData.acceptedClient));
442 }
443 };
444 using AsyncRequest::start;
445
447 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor, AsyncSocketAcceptData& data);
448 SC::Result validate(AsyncEventLoop&);
449
450 Function<void(Result&)> callback;
451 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
452 SocketFlags::AddressFamily addressFamily = SocketFlags::AddressFamilyIPV4;
453 AsyncSocketAcceptData* acceptData = nullptr;
454};
455
456} // namespace detail
457
467struct SC_COMPILER_EXPORT AsyncSocketAccept : public detail::AsyncSocketAcceptBase
468{
469 AsyncSocketAccept() { AsyncSocketAcceptBase::acceptData = &data; }
470 using AsyncSocketAcceptBase::start;
471
473 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor);
474
475 private:
476 detail::AsyncSocketAcceptData data;
477};
478
487struct SC_COMPILER_EXPORT AsyncSocketConnect : public AsyncRequest
488{
489 AsyncSocketConnect() : AsyncRequest(Type::SocketConnect) {}
490
493 using AsyncRequest::start;
494
496 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress address);
497
499
500 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
501 SocketIPAddress ipAddress;
502
503 private:
504 friend struct AsyncEventLoop;
505 SC::Result validate(AsyncEventLoop&);
506
507#if SC_PLATFORM_WINDOWS
508 void (*pConnectEx)() = nullptr;
510#endif
511};
512
521struct SC_COMPILER_EXPORT AsyncSocketSend : public AsyncRequest
522{
523 AsyncSocketSend() : AsyncRequest(Type::SocketSend) {}
525 {
526 size_t numBytes = 0;
527 };
529 using AsyncRequest::start;
530
533
536
538
539 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
540
543 bool singleBuffer = true;
544
545 protected:
546 AsyncSocketSend(Type type) : AsyncRequest(type) {}
547 friend struct AsyncEventLoop;
548 SC::Result validate(AsyncEventLoop&);
549
550 size_t totalBytesWritten = 0;
551#if SC_PLATFORM_WINDOWS
553#endif
554};
555
565struct SC_COMPILER_EXPORT AsyncSocketSendTo : public AsyncSocketSend
566{
567 AsyncSocketSendTo() : AsyncSocketSend(Type::SocketSendTo) {}
568
569 SocketIPAddress address;
570
571 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
572 Span<const char> data);
573
574 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
575 Span<Span<const char>> data);
576
577 private:
578 using AsyncSocketSend::start;
579 friend struct AsyncEventLoop;
580 SC::Result validate(AsyncEventLoop&);
581#if SC_PLATFORM_LINUX
582 AlignedStorage<56> typeErasedMsgHdr;
583#endif
584};
585
597struct SC_COMPILER_EXPORT AsyncSocketReceive : public AsyncRequest
598{
599 AsyncSocketReceive() : AsyncRequest(Type::SocketReceive) {}
600
602 {
603 size_t numBytes = 0;
604 bool disconnected = false;
605 };
606
607 struct Result : public AsyncResultOf<AsyncSocketReceive, CompletionData>
608 {
609 using AsyncResultOf<AsyncSocketReceive, CompletionData>::AsyncResultOf;
610
615 {
616 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, outData));
617 return returnCode;
618 }
619
620 SocketIPAddress getSourceAddress() const;
621 };
622 using AsyncRequest::start;
623
625 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, Span<char> data);
626
628
630 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
631
632 protected:
634 friend struct AsyncEventLoop;
635 SC::Result validate(AsyncEventLoop&);
636#if SC_PLATFORM_WINDOWS
638#endif
639};
640
650struct SC_COMPILER_EXPORT AsyncSocketReceiveFrom : public AsyncSocketReceive
651{
652 AsyncSocketReceiveFrom() : AsyncSocketReceive(Type::SocketReceiveFrom) {}
653 using AsyncSocketReceive::start;
654
655 private:
656 SocketIPAddress address;
657 friend struct AsyncSocketReceive;
658 friend struct AsyncEventLoop;
659#if SC_PLATFORM_LINUX
660 AlignedStorage<56> typeErasedMsgHdr;
661#endif
662};
663
685struct SC_COMPILER_EXPORT AsyncFileRead : public AsyncRequest
686{
687 AsyncFileRead() : AsyncRequest(Type::FileRead) { handle = FileDescriptor::Invalid; }
688
690 {
691 size_t numBytes = 0;
692 bool endOfFile = false;
693 };
694
695 struct Result : public AsyncResultOf<AsyncFileRead, CompletionData>
696 {
697 using AsyncResultOf<AsyncFileRead, CompletionData>::AsyncResultOf;
698
699 SC::Result get(Span<char>& data)
700 {
701 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, data));
702 return returnCode;
703 }
704 };
705 using AsyncRequest::start;
706
707 Function<void(Result&)> callback;
709 FileDescriptor::Handle handle;
711
713 uint64_t getOffset() const { return offset; }
714
717 void setOffset(uint64_t fileOffset)
718 {
719 useOffset = true;
720 offset = fileOffset;
721 }
722
723 private:
724 friend struct AsyncEventLoop;
725 SC::Result validate(AsyncEventLoop&);
726
727 bool useOffset = false;
728 bool endedSync = false;
729
730 uint64_t offset = 0;
731#if SC_PLATFORM_WINDOWS
732 uint64_t readCursor = 0;
734#endif
735};
736
754struct SC_COMPILER_EXPORT AsyncFileWrite : public AsyncRequest
755{
756 AsyncFileWrite() : AsyncRequest(Type::FileWrite) { handle = FileDescriptor::Invalid; }
757
759 {
760 size_t numBytes = 0;
761 };
762
763 struct Result : public AsyncResultOf<AsyncFileWrite, CompletionData>
764 {
765 using AsyncResultOf<AsyncFileWrite, CompletionData>::AsyncResultOf;
766
767 SC::Result get(size_t& writtenSizeInBytes)
768 {
769 writtenSizeInBytes = completionData.numBytes;
770 return returnCode;
771 }
772 };
773
774 using AsyncRequest::start;
775
778
781
783
784 FileDescriptor::Handle handle;
786
789 bool singleBuffer = true;
790
792 uint64_t getOffset() const { return offset; }
793
796 void setOffset(uint64_t fileOffset)
797 {
798 useOffset = true;
799 offset = fileOffset;
800 }
801
802 private:
803 friend struct AsyncEventLoop;
804 SC::Result validate(AsyncEventLoop&);
805
806#if SC_PLATFORM_WINDOWS
807 bool endedSync = false;
808#else
809 bool isWatchable = false;
810#endif
811 bool useOffset = false;
812 uint64_t offset = 0xffffffffffffffff;
813
814 size_t totalBytesWritten = 0;
815#if SC_PLATFORM_WINDOWS
817#endif
818};
819
824struct SC_COMPILER_EXPORT AsyncFilePoll : public AsyncRequest
825{
826 AsyncFilePoll() : AsyncRequest(Type::FilePoll) {}
827
830
832 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
833
834#if SC_PLATFORM_WINDOWS
835 [[nodiscard]] void* getOverlappedPtr();
836#endif
837
838 Function<void(Result&)> callback;
839
840 private:
841 friend struct AsyncEventLoop;
842 SC::Result validate(AsyncEventLoop&);
843
844 FileDescriptor::Handle handle = FileDescriptor::Invalid;
845#if SC_PLATFORM_WINDOWS
847#endif
848};
849
874struct SC_COMPILER_EXPORT AsyncFileSend : public AsyncRequest
875{
876 AsyncFileSend() : AsyncRequest(Type::FileSend) {}
877
879 {
880 size_t bytesTransferred = 0;
881 bool usedZeroCopy = false;
882 };
883
884 struct Result : public AsyncResultOf<AsyncFileSend, CompletionData>
885 {
886 using AsyncResultOf<AsyncFileSend, CompletionData>::AsyncResultOf;
887
889 [[nodiscard]] size_t getBytesTransferred() const { return completionData.bytesTransferred; }
890
892 [[nodiscard]] bool usedZeroCopy() const { return completionData.usedZeroCopy; }
893
895 [[nodiscard]] bool isComplete() const
896 {
897 return returnCode && completionData.bytesTransferred == getAsync().length;
898 }
899 };
900
901 using AsyncRequest::start;
902
912 SC::Result start(AsyncEventLoop& eventLoop, const FileDescriptor& file, const SocketDescriptor& socket,
913 int64_t offset = 0, size_t length = 0, size_t pipeSize = 0);
914
916
917 // Internal handles (set by start())
918 FileDescriptor::Handle fileHandle = FileDescriptor::Invalid;
919 SocketDescriptor::Handle socketHandle = SocketDescriptor::Invalid;
920
921 int64_t offset = 0;
922 size_t length = 0;
923 size_t bytesSent = 0;
924 private:
925 friend struct AsyncEventLoop;
926 SC::Result validate(AsyncEventLoop&);
927
928#if SC_PLATFORM_WINDOWS
930#elif SC_PLATFORM_LINUX
931 size_t pipeBufferSize = 0;
932 PipeDescriptor splicePipe;
933#endif
934};
935
936// forward declared because it must be defined after AsyncTaskSequence
937struct AsyncLoopWork;
939
941{
942 FileDescriptor::Handle handle = FileDescriptor::Invalid; // for open
943
944 int code = 0; // for open/close
945 size_t numBytes = 0; // for read
946};
947
948namespace detail
949{
950// A simple hand-made variant of all completion types
951struct SC_COMPILER_EXPORT AsyncCompletionVariant
952{
953 AsyncCompletionVariant() {}
954 ~AsyncCompletionVariant() { destroy(); }
955
956 AsyncCompletionVariant(const AsyncCompletionVariant&) = delete;
957 AsyncCompletionVariant(AsyncCompletionVariant&&) = delete;
958 AsyncCompletionVariant& operator=(const AsyncCompletionVariant&) = delete;
959 AsyncCompletionVariant& operator=(AsyncCompletionVariant&&) = delete;
960
961 bool inited = false;
962
963 AsyncRequest::Type type;
964 union
965 {
966 AsyncCompletionData completionDataLoopWork; // Defined after AsyncCompletionVariant / AsyncTaskSequence
967 AsyncLoopTimeout::CompletionData completionDataLoopTimeout;
968 AsyncLoopWakeUp::CompletionData completionDataLoopWakeUp;
969 AsyncProcessExit::CompletionData completionDataProcessExit;
970 AsyncSocketAccept::CompletionData completionDataSocketAccept;
971 AsyncSocketConnect::CompletionData completionDataSocketConnect;
972 AsyncSocketSend::CompletionData completionDataSocketSend;
973 AsyncSocketSendTo::CompletionData completionDataSocketSendTo;
974 AsyncSocketReceive::CompletionData completionDataSocketReceive;
975 AsyncSocketReceiveFrom::CompletionData completionDataSocketReceiveFrom;
976 AsyncFileRead::CompletionData completionDataFileRead;
977 AsyncFileWrite::CompletionData completionDataFileWrite;
978 AsyncFileSend::CompletionData completionDataFileSend;
979 AsyncFilePoll::CompletionData completionDataFilePoll;
980
981 AsyncFileSystemOperationCompletionData completionDataFileSystemOperation;
982 };
983
984 auto& getCompletion(AsyncLoopWork&) { return completionDataLoopWork; }
985 auto& getCompletion(AsyncLoopTimeout&) { return completionDataLoopTimeout; }
986 auto& getCompletion(AsyncLoopWakeUp&) { return completionDataLoopWakeUp; }
987 auto& getCompletion(AsyncProcessExit&) { return completionDataProcessExit; }
988 auto& getCompletion(AsyncSocketAccept&) { return completionDataSocketAccept; }
989 auto& getCompletion(AsyncSocketConnect&) { return completionDataSocketConnect; }
990 auto& getCompletion(AsyncSocketSend&) { return completionDataSocketSend; }
991 auto& getCompletion(AsyncSocketReceive&) { return completionDataSocketReceive; }
992 auto& getCompletion(AsyncFileRead&) { return completionDataFileRead; }
993 auto& getCompletion(AsyncFileWrite&) { return completionDataFileWrite; }
994 auto& getCompletion(AsyncFileSend&) { return completionDataFileSend; }
995 auto& getCompletion(AsyncFilePoll&) { return completionDataFilePoll; }
996 auto& getCompletion(AsyncFileSystemOperation&) { return completionDataFileSystemOperation; }
997
998 template <typename T>
999 auto& construct(T& t)
1000 {
1001 destroy();
1002 placementNew(getCompletion(t));
1003 inited = true;
1004 type = t.getType();
1005 return getCompletion(t);
1006 }
1007 void destroy();
1008};
1009} // namespace detail
1010
1014struct SC_COMPILER_EXPORT AsyncTaskSequence : public AsyncSequence
1015{
1016 protected:
1017 ThreadPoolTask task;
1018 ThreadPool* threadPool = nullptr;
1019
1020 friend struct AsyncEventLoop;
1021 friend struct AsyncRequest;
1022
1023 detail::AsyncCompletionVariant completion;
1024
1025 SC::Result returnCode = SC::Result(true);
1026};
1027
1033struct SC_COMPILER_EXPORT AsyncLoopWork : public AsyncRequest
1034{
1035 AsyncLoopWork() : AsyncRequest(Type::LoopWork) {}
1036
1039
1043
1044 Function<SC::Result()> work;
1046
1047 private:
1048 friend struct AsyncEventLoop;
1049 SC::Result validate(AsyncEventLoop&);
1050 AsyncTaskSequence task;
1051};
1052
1086struct SC_COMPILER_EXPORT AsyncFileSystemOperation : public AsyncRequest
1087{
1088 AsyncFileSystemOperation() : AsyncRequest(Type::FileSystemOperation) {}
1089 ~AsyncFileSystemOperation() { destroy(); }
1090#ifdef CopyFile
1091#undef CopyFile
1092#endif
1093#ifdef RemoveDirectory
1094#undef RemoveDirectory
1095#endif
1096 enum class Operation
1097 {
1098 None = 0,
1099 Open,
1100 Close,
1101 Read,
1102 Write,
1103 CopyFile,
1104 CopyDirectory,
1105 Rename,
1106 RemoveDirectory,
1107 RemoveFile,
1108 };
1109
1112
1115
1117
1124
1129 SC::Result close(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle);
1130
1137 SC::Result read(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<char> buffer, uint64_t offset);
1138
1145 SC::Result write(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<const char> buffer,
1146 uint64_t offset);
1147
1154 SC::Result copyFile(AsyncEventLoop& eventLoop, StringSpan path, StringSpan destinationPath,
1156
1165
1172
1179
1185
1186 private:
1187 friend struct AsyncEventLoop;
1188 Operation operation = Operation::None;
1189 AsyncLoopWork loopWork;
1190 CompletionData completionData;
1191
1192 void onOperationCompleted(AsyncLoopWork::Result& res);
1193
1194 struct FileDescriptorData
1195 {
1196 FileDescriptor::Handle handle;
1197 };
1198
1199 struct OpenData
1200 {
1201 StringSpan path;
1202 FileOpen mode;
1203 };
1204
1205 struct ReadData
1206 {
1207 FileDescriptor::Handle handle;
1208 Span<char> buffer;
1209 uint64_t offset;
1210 };
1211
1212 struct WriteData
1213 {
1214 FileDescriptor::Handle handle;
1215 Span<const char> buffer;
1216 uint64_t offset;
1217 };
1218
1219 struct CopyFileData
1220 {
1221 StringSpan path;
1222 StringSpan destinationPath;
1223 FileSystemCopyFlags copyFlags;
1224 };
1225
1226 using CopyDirectoryData = CopyFileData;
1227
1228 using CloseData = FileDescriptorData;
1229
1230 struct RenameData
1231 {
1232 StringSpan path;
1233 StringSpan newPath;
1234 };
1235
1236 struct RemoveData
1237 {
1238 StringSpan path;
1239 };
1240
1241 union
1242 {
1243 OpenData openData;
1244 CloseData closeData;
1245 ReadData readData;
1246 WriteData writeData;
1247 CopyFileData copyFileData;
1248 CopyDirectoryData copyDirectoryData;
1249 RenameData renameData;
1250 RemoveData removeData;
1251 };
1252
1253 void destroy();
1254
1255 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
1256 SC::Result validate(AsyncEventLoop&);
1257};
1258
1262struct SC_COMPILER_EXPORT AsyncKernelEvents
1263{
1265
1266 private:
1267 int numberOfEvents = 0;
1268 friend struct AsyncEventLoop;
1269};
1270
1272struct SC_COMPILER_EXPORT AsyncEventLoopListeners
1273{
1274 Function<void(AsyncEventLoop&)> beforeBlockingPoll;
1275 Function<void(AsyncEventLoop&)> afterBlockingPoll;
1276};
1277
1284struct SC_COMPILER_EXPORT AsyncEventLoop
1285{
1287 struct Options
1288 {
1289 enum class ApiType : uint8_t
1290 {
1291 Automatic = 0,
1292 ForceUseIOURing,
1293 ForceUseEpoll,
1294 };
1296
1297 Options() { apiType = ApiType::Automatic; }
1298 };
1299
1301
1302 AsyncEventLoop(const AsyncEventLoop&) = delete;
1303 AsyncEventLoop(AsyncEventLoop&&) = delete;
1304 AsyncEventLoop& operator=(AsyncEventLoop&&) = delete;
1305 AsyncEventLoop& operator=(const AsyncEventLoop&) = delete;
1306
1309
1312
1316
1319
1321 [[nodiscard]] bool isInitialized() const;
1322
1324 [[nodiscard]] bool needsThreadPoolForFileOperations() const;
1325
1332
1343
1350
1356
1376
1383
1387
1390
1393
1396
1399
1402
1405
1408
1411
1413 [[nodiscard]] TimeMs getLoopTime() const;
1414
1416 [[nodiscard]] int getNumberOfActiveRequests() const;
1417
1419 [[nodiscard]] int getNumberOfSubmittedRequests() const;
1420
1424
1427
1430
1432 void enumerateRequests(Function<void(AsyncRequest&)> enumerationCallback);
1433
1437
1439 [[nodiscard]] static bool isExcludedFromActiveCount(const AsyncRequest& async);
1440
1443 [[nodiscard]] static bool tryLoadingLiburing();
1444
1447
1448 struct Internal;
1449
1450 public:
1451 struct SC_COMPILER_EXPORT InternalDefinition
1452 {
1453 static constexpr int Windows = 536;
1454 static constexpr int Apple = 528;
1455 static constexpr int Linux = 744;
1456 static constexpr int Default = Linux;
1457
1458 static constexpr size_t Alignment = 8;
1459
1460 using Object = Internal;
1461 };
1462
1464
1465 private:
1466 InternalOpaque internalOpaque;
1467 Internal& internal;
1468
1469 friend struct AsyncRequest;
1470 friend struct AsyncFileWrite;
1471 friend struct AsyncFileRead;
1472 friend struct AsyncFileSystemOperation;
1473 friend struct AsyncResult;
1474};
1475
1479struct SC_COMPILER_EXPORT AsyncEventLoopMonitor
1480{
1482
1486
1489
1497
1503
1504 private:
1505#if SC_COMPILER_MSVC
1506#pragma warning(push)
1507#pragma warning(disable : 4324) // useless warning on 32 bit... (structure was padded due to __declspec(align()))
1508#endif
1509 alignas(uint64_t) uint8_t eventsMemory[8 * 1024]; // 8 Kb of kernel events
1510#if SC_COMPILER_MSVC
1511#pragma warning(pop)
1512#endif
1513
1514 AsyncKernelEvents asyncKernelEvents;
1515 AsyncEventLoop* eventLoop = nullptr;
1516 AsyncLoopWakeUp eventLoopWakeUp;
1517
1518 Thread eventLoopThread;
1519 EventObject eventObjectEnterBlockingMode;
1520 EventObject eventObjectExitBlockingMode;
1521
1522 Atomic<bool> finished = false;
1523 Atomic<bool> needsWakeUp = true;
1524
1525 bool wakeUpHasBeenCalled = false;
1526
1527 Result monitoringLoopThread(Thread& thread);
1528};
1529
1530} // namespace SC
unsigned short uint16_t
Platform independent (2) bytes unsigned int.
Definition PrimitiveTypes.h:37
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:264
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
unsigned long long uint64_t
Platform independent (8) bytes unsigned int.
Definition PrimitiveTypes.h:42
short int16_t
Platform independent (2) bytes signed int.
Definition PrimitiveTypes.h:45
long long int64_t
Platform independent (8) bytes signed int.
Definition PrimitiveTypes.h:50
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:49
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Empty base struct for all AsyncRequest-derived CompletionData (internal) structs.
Definition Async.h:251
Allow library user to provide callbacks signaling different phases of async event loop cycle.
Definition Async.h:1273
Monitors Async I/O events from a background thread using a blocking kernel function (no CPU usage on ...
Definition Async.h:1480
Function< void(void)> onNewEventsAvailable
Informs to call dispatchCompletions on GUI Event Loop.
Definition Async.h:1481
Result startMonitoring()
Queue all async requests submissions and start monitoring loop events on a background thread.
Result close()
Stop monitoring the AsyncEventLoop, disposing all resources.
Result stopMonitoringAndDispatchCompletions()
Stops monitoring events on the background thread and dispatches callbacks for completed requests.
Result create(AsyncEventLoop &eventLoop)
Create the monitoring thread for an AsyncEventLoop.
Options given to AsyncEventLoop::create.
Definition Async.h:1288
ApiType apiType
Criteria to choose Async IO API.
Definition Async.h:1295
ApiType
Definition Async.h:1290
Asynchronous I/O (files, sockets, timers, processes, fs events, threads wake-up) (see Async) AsyncEve...
Definition Async.h:1285
bool needsThreadPoolForFileOperations() const
Returns true if backend needs a thread pool for non-blocking fs operations (anything but io_uring bas...
Result associateExternallyCreatedFileDescriptor(FileDescriptor &outDescriptor)
Associates a previously created File Descriptor with the eventLoop.
Result wakeUpFromExternalThread()
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
Result runNoWait()
Process active requests if any, dispatching their completions, or returns immediately without blockin...
static Result removeAllAssociationsFor(SocketDescriptor &outDescriptor)
Removes association of a TCP Socket with any event loop.
void updateTime()
Updates loop time to "now".
static bool isExcludedFromActiveCount(const AsyncRequest &async)
Checks if excludeFromActiveCount() has been called on the given request.
Result associateExternallyCreatedSocket(SocketDescriptor &outDescriptor)
Associates a previously created TCP / UDP socket with the eventLoop.
Result blockingPoll(AsyncKernelEvents &kernelEvents)
Blocks until at least one event happens, ensuring forward progress, without executing completions.
void clearSequence(AsyncSequence &sequence)
Clears the sequence.
int getNumberOfSubmittedRequests() const
Obtain the total number of submitted requests.
Result submitRequests(AsyncKernelEvents &kernelEvents)
Submits all queued async requests.
void enumerateRequests(Function< void(AsyncRequest &)> enumerationCallback)
Enumerates all requests objects associated with this loop.
TimeMs getLoopTime() const
Get Loop time (monotonic)
Result start(AsyncRequest &async)
Queues an async request request that has been correctly setup.
AsyncLoopTimeout * findEarliestLoopTimeout() const
Returns the next AsyncLoopTimeout that will be executed (shortest relativeTimeout)
void setListeners(AsyncEventLoopListeners *listeners)
Sets reference to listeners that will signal different events in loop lifetime.
Result dispatchCompletions(AsyncKernelEvents &kernelEvents)
Invokes completions for the AsyncKernelEvents collected by a call to AsyncEventLoop::blockingPoll.
void interrupt()
Interrupts the event loop even if it has active request on it.
Result wakeUpFromExternalThread(AsyncLoopWakeUp &wakeUp)
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
bool isInitialized() const
Returns true if create has been already called (successfully)
Result create(Options options=Options())
Creates the event loop kernel object.
static bool tryLoadingLiburing()
Check if liburing is loadable (only on Linux)
Result createAsyncTCPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async TCP (IPV4 / IPV6) socket registered with the eventLoop.
Result close()
Closes the event loop kernel object.
Result createAsyncUDPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async UCP (IPV4 / IPV6) socket registered with the eventLoop.
static Result removeAllAssociationsFor(FileDescriptor &outDescriptor)
Removes association of a File Descriptor with any event loop.
Result runOnce()
Blocks until at least one request proceeds, ensuring forward progress, dispatching all completions.
void excludeFromActiveCount(AsyncRequest &async)
Excludes the request from active handles count (to avoid it keeping event loop alive)
Result run()
Blocks until there are no more active queued requests, dispatching all completions.
void includeInActiveCount(AsyncRequest &async)
Reverses the effect of excludeFromActiveCount for the request.
int getNumberOfActiveRequests() const
Obtain the total number of active requests.
Starts an handle polling operation.
Definition Async.h:825
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle fileDescriptor)
Starts a file descriptor poll operation, monitoring its readiness with appropriate OS API.
Definition Async.h:690
Definition Async.h:696
Starts a file read operation, reading bytes from a file (or pipe).
Definition Async.h:686
FileDescriptor::Handle handle
The writeable span of memory where to data will be written.
Definition Async.h:709
Span< char > buffer
Callback called when some data has been read from the file into the buffer.
Definition Async.h:708
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which start reading.
Definition Async.h:717
uint64_t getOffset() const
The file/pipe descriptor handle to read data from.
Definition Async.h:713
Definition Async.h:879
Definition Async.h:885
size_t getBytesTransferred() const
Get the number of bytes transferred.
Definition Async.h:889
bool usedZeroCopy() const
Check if zero-copy was used for this transfer.
Definition Async.h:892
bool isComplete() const
Check if the entire requested range was sent.
Definition Async.h:895
Sends file contents to a socket using zero-copy when available (sendfile, TransmitFile).
Definition Async.h:875
Function< void(Result &)> callback
Called when send completes or fails.
Definition Async.h:915
SC::Result start(AsyncEventLoop &eventLoop, const FileDescriptor &file, const SocketDescriptor &socket, int64_t offset=0, size_t length=0, size_t pipeSize=0)
Start the file send operation.
Starts an asynchronous file system operation (open, close, read, write, sendFile, stat,...
Definition Async.h:1087
SC::Result copyDirectory(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a directory from one location to another.
SC::Result removeEmptyDirectory(AsyncEventLoop &eventLoop, StringSpan path)
Removes a directory asynchronously.
SC::Result rename(AsyncEventLoop &eventLoop, StringSpan path, StringSpan newPath)
Renames a file.
SC::Result removeFile(AsyncEventLoop &eventLoop, StringSpan path)
Removes a file asynchronously.
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the thread pool to use for the operation.
SC::Result read(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< char > buffer, uint64_t offset)
Reads data from a file descriptor at a given offset.
SC::Result write(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< const char > buffer, uint64_t offset)
Writes data to a file descriptor at a given offset.
SC::Result close(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle)
Closes a file descriptor asynchronously.
Function< void(Result &)> callback
Called after the operation is completed, on the event loop thread.
Definition Async.h:1116
SC::Result copyFile(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a file from one location to another.
SC::Result open(AsyncEventLoop &eventLoop, StringSpan path, FileOpen mode)
Opens a file asynchronously and returns its corresponding file descriptor.
Definition Async.h:764
Starts a file write operation, writing bytes to a file (or pipe).
Definition Async.h:755
uint64_t getOffset() const
Returns the last offset set with AsyncFileWrite::setOffset.
Definition Async.h:792
FileDescriptor::Handle handle
The file/pipe descriptor to write data to.
Definition Async.h:784
SC::Result start(AsyncEventLoop &eventLoop, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which start writing.
Definition Async.h:796
Function< void(Result &)> callback
Callback called when descriptor is ready to be written with more data.
Definition Async.h:782
Span< Span< const char > > buffers
The read-only spans of memory where to read the data from.
Definition Async.h:788
SC::Result start(AsyncEventLoop &eventLoop, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
Span< const char > buffer
The read-only span of memory where to read the data from.
Definition Async.h:787
Allows user to supply a block of memory that will store kernel I/O events retrieved from AsyncEventLo...
Definition Async.h:1263
Span< uint8_t > eventsMemory
User supplied block of memory used to store kernel I/O events.
Definition Async.h:1264
Starts a Timeout that is invoked only once after expiration (relative) time has passed.
Definition Async.h:301
TimeMs getExpirationTime() const
Gets computed absolute expiration time that determines when this timeout get executed.
Definition Async.h:316
SC::Result start(AsyncEventLoop &eventLoop, TimeMs relativeTimeout)
Sets async request members and calls AsyncEventLoop::start.
TimeMs relativeTimeout
First timer expiration (relative) time in milliseconds.
Definition Async.h:313
Function< void(Result &)> callback
Called after given expiration time since AsyncLoopTimeout::start has passed.
Definition Async.h:311
Starts a wake-up operation, allowing threads to execute callbacks on loop thread.
Definition Async.h:337
SC::Result start(AsyncEventLoop &eventLoop, EventObject &eventObject)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Callback called by SC::AsyncEventLoop::run after SC::AsyncLoopWakeUp::wakeUp.
Definition Async.h:350
SC::Result wakeUp(AsyncEventLoop &eventLoop)
Wakes up event loop, scheduling AsyncLoopWakeUp::callback on next AsyncEventLoop::run (or its variati...
Executes work in a thread pool and then invokes a callback on the event loop thread.
Definition Async.h:1034
Function< void(Result &)> callback
Called to execute the work in a background threadpool thread.
Definition Async.h:1045
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the ThreadPool that will supply the thread to run the async work on.
Definition Async.h:374
Starts monitoring a process, notifying about its termination.
Definition Async.h:365
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle process)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Called when process has exited.
Definition Async.h:390
Base class for all async requests, holding state and type.
Definition Async.h:117
bool isCancelling() const
Returns true if this request is being cancelled.
AsyncRequest(Type type)
Constructs a free async request of given type.
Definition Async.h:156
Result start(AsyncEventLoop &eventLoop)
Shortcut for AsyncEventLoop::start.
uint16_t getUserFlags() const
Gets user flags, holding some meaningful data for the caller.
Definition Async.h:185
Function< void(AsyncResult &)> * getCloseCallback()
Returns currently set close callback (if any) passed to AsyncRequest::stop.
Definition Async.h:188
bool isActive() const
Returns true if this request is active or being reactivated.
bool isFree() const
Returns true if this request is free.
void disableThreadPool()
Disables the thread-pool usage for this request.
Type getType() const
Returns request type.
Definition Async.h:176
void setUserFlags(uint16_t externalFlags)
Sets user flags, holding some meaningful data for the caller.
Definition Async.h:182
Result executeOn(AsyncTaskSequence &task, ThreadPool &pool)
Adds the request to be executed on a specific AsyncTaskSequence.
Result stop(AsyncEventLoop &eventLoop, Function< void(AsyncResult &)> *afterStopped=nullptr)
Ask to stop current async operation.
void executeOn(AsyncSequence &sequence)
Adds the request to be executed on a specific AsyncSequence.
Type
Type of async request.
Definition Async.h:136
Helper holding CompletionData for a specific AsyncRequest-derived class.
Definition Async.h:286
Base class for all async results (argument of completion callbacks).
Definition Async.h:256
const SC::Result & isValid() const
Check if the returnCode of this result is valid.
Definition Async.h:267
AsyncResult(AsyncEventLoop &eventLoop, AsyncRequest &request, SC::Result &res, bool *hasBeenReactivated=nullptr)
Constructs an async result from a request and a result.
Definition Async.h:258
void reactivateRequest(bool shouldBeReactivated)
Ask the event loop to re-activate this request after it was already completed.
Execute AsyncRequests serially, by submitting the next one after the previous one is completed.
Definition Async.h:236
Starts a socket accept operation, obtaining a new socket from a listening socket.
Definition Async.h:468
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &socketDescriptor)
Sets async request members and calls AsyncEventLoop::start.
Starts a socket connect operation, connecting to a remote endpoint.
Definition Async.h:488
Function< void(Result &)> callback
Called after socket is finally connected to endpoint.
Definition Async.h:498
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, SocketIPAddress address)
Sets async request members and calls AsyncEventLoop::start.
Starts an unconnected socket receive from operation, receiving bytes from a remote endpoint.
Definition Async.h:651
Definition Async.h:608
SC::Result get(Span< char > &outData)
Get a Span of the actually read data.
Definition Async.h:614
Starts a socket receive operation, receiving bytes from a remote endpoint.
Definition Async.h:598
AsyncSocketReceive(Type type)
The Socket Descriptor handle to read data from.
Definition Async.h:633
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< char > buffer
The writeable span of memory where to data will be written.
Definition Async.h:629
Function< void(Result &)> callback
Called after data has been received.
Definition Async.h:627
Starts an unconnected socket send to operation, sending bytes to a remote endpoint.
Definition Async.h:566
Starts a socket send operation, sending bytes to a remote endpoint.
Definition Async.h:522
Function< void(Result &)> callback
Called when socket is ready to send more data.
Definition Async.h:537
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< Span< const char > > buffers
Spans of bytes to send (singleBuffer == false)
Definition Async.h:542
Span< const char > buffer
Span of bytes to send (singleBuffer == true)
Definition Async.h:541
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
An AsyncSequence using a SC::ThreadPool to execute one or more SC::AsyncRequest in a background threa...
Definition Async.h:1015
Atomic variables (only for int and bool for now).
Definition Atomic.h:41
An automatically reset event object to synchronize two threads.
Definition Threading.h:229
[UniqueHandleDeclaration2Snippet]
Definition File.h:78
Options used to open a file descriptor.
Definition File.h:49
A structure to describe copy flags.
Definition FileSystem.h:25
Hides implementation details from public headers (static PIMPL).
Definition OpaqueObject.h:31
Read / Write pipe (Process stdin/stdout and IPC communication)
Definition File.h:221
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
Low-level OS socket handle.
Definition Socket.h:153
AddressFamily
Sets the address family of an IP Address (IPv4 or IPV6)
Definition Socket.h:60
Native representation of an IP Address.
Definition Socket.h:100
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29
An read-only view over a string (to avoid including Strings library when parsing is not needed).
Definition StringSpan.h:37
A small task containing a function to execute that can be queued in the thread pool.
Definition ThreadPool.h:16
Simple thread pool that executes tasks in a fixed number of worker threads.
Definition ThreadPool.h:38
A native OS thread.
Definition Threading.h:113
A vocabulary type representing a time interval in milliseconds since epoch.
Definition PrimitiveTypes.h:63