Sane C++ Libraries
C++ Platform Abstraction Libraries
Loading...
Searching...
No Matches
Async.h
1// Copyright (c) Stefano Cristiano
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "../Async/Internal/IntrusiveDoubleLinkedList.h"
6#include "../File/File.h"
7#include "../FileSystem/FileSystem.h"
8#include "../Foundation/Function.h"
9#include "../Foundation/OpaqueObject.h"
10#include "../Socket/Socket.h"
11#include "../Threading/Atomic.h"
12#include "../Threading/ThreadPool.h"
13
14namespace SC
15{
16struct ThreadPool;
17struct ThreadPoolTask;
18struct EventObject;
19} // namespace SC
44
47namespace SC
48{
49struct AsyncEventLoop;
50struct AsyncResult;
51struct AsyncSequence;
52struct AsyncTaskSequence;
53
54namespace detail
55{
56struct AsyncWinOverlapped;
57struct AsyncWinOverlappedDefinition
58{
59 static constexpr int Windows = sizeof(void*) * 4 + sizeof(uint64_t);
60 static constexpr size_t Alignment = alignof(void*);
61
62 using Object = AsyncWinOverlapped;
63};
64using WinOverlappedOpaque = OpaqueObject<AsyncWinOverlappedDefinition>;
65
66struct AsyncWinWaitDefinition
67{
68 using Handle = FileDescriptor::Handle; // fd
69 static constexpr Handle Invalid = FileDescriptor::Invalid; // invalid fd
70
71 static Result releaseHandle(Handle& waitHandle);
72};
73struct SC_COMPILER_EXPORT WinWaitHandle : public UniqueHandle<AsyncWinWaitDefinition>
74{
75};
76} // namespace detail
77
116struct SC_COMPILER_EXPORT AsyncRequest
117{
118 AsyncRequest* next = nullptr;
119 AsyncRequest* prev = nullptr;
120
121 void setDebugName(const char* newDebugName);
122
124 void executeOn(AsyncSequence& sequence);
125
130
133
135 enum class Type : uint8_t
136 {
137 LoopTimeout,
138 LoopWakeUp,
139 LoopWork,
140 ProcessExit,
141 SocketAccept,
142 SocketConnect,
143 SocketSend,
144 SocketSendTo,
145 SocketReceive,
146 SocketReceiveFrom,
147 FileRead,
148 FileWrite,
149 FileSend,
150 FilePoll,
151 FileSystemOperation,
152 };
153
156 AsyncRequest(Type type) : state(State::Free), type(type), flags(0), unused(0), userFlags(0) {}
157
164 Result stop(AsyncEventLoop& eventLoop, Function<void(AsyncResult&)>* afterStopped = nullptr);
165
167 [[nodiscard]] bool isFree() const;
168
170 [[nodiscard]] bool isCancelling() const;
171
173 [[nodiscard]] bool isActive() const;
174
176 [[nodiscard]] Type getType() const { return type; }
177
180
182 void setUserFlags(uint16_t externalFlags) { userFlags = externalFlags; }
183
185 uint16_t getUserFlags() const { return userFlags; }
186
188 [[nodiscard]] Function<void(AsyncResult&)>* getCloseCallback() { return closeCallback; }
189
190 [[nodiscard]] const Function<void(AsyncResult&)>* getCloseCallback() const { return closeCallback; }
191
192 protected:
193 Result checkState();
194
195 void queueSubmission(AsyncEventLoop& eventLoop);
196
197 AsyncSequence* sequence = nullptr;
198
199 AsyncTaskSequence* getTask();
200
201 private:
202 Function<void(AsyncResult&)>* closeCallback = nullptr;
203
204 friend struct AsyncEventLoop;
205 friend struct AsyncResult;
206
207 void markAsFree();
208
209 [[nodiscard]] static const char* TypeToString(Type type);
210 enum class State : uint8_t
211 {
212 Free, // not in any queue, this can be started with an async.start(...)
213 Setup, // when in submission queue waiting to be setup (after an async.start(...))
214 Submitting, // when in submission queue waiting to be activated or re-activated
215 Active, // when monitored by OS syscall or in activeLoopWakeUps / activeTimeouts queues
216 Reactivate, // when flagged for reactivation inside the callback (after a result.reactivateRequest(true))
217 Cancelling, // when in cancellation queue waiting for a cancelAsync (on active async)
218 };
219
220#if SC_ASYNC_ENABLE_LOG
221 const char* debugName = "None";
222#endif
223 State state; // 1 byte
224 Type type; // 1 byte
225 int16_t flags; // 2 bytes
226
227 uint16_t unused; // 2 bytes
228 uint16_t userFlags; // 2 bytes
229};
230
235struct SC_COMPILER_EXPORT AsyncSequence
236{
237 AsyncSequence* next = nullptr;
238 AsyncSequence* prev = nullptr;
239
240 bool clearSequenceOnCancel = true;
241 bool clearSequenceOnError = true;
242 private:
243 friend struct AsyncEventLoop;
244 bool runningAsync = false; // true if an async from this sequence is being run
245 bool tracked = false;
246
247 IntrusiveDoubleLinkedList<AsyncRequest> submissions;
248};
249
251struct SC_COMPILER_EXPORT AsyncCompletionData{};
252
255struct SC_COMPILER_EXPORT AsyncResult
256{
258 AsyncResult(AsyncEventLoop& eventLoop, AsyncRequest& request, SC::Result& res, bool* hasBeenReactivated = nullptr)
259 : eventLoop(eventLoop), async(request), hasBeenReactivated(hasBeenReactivated), returnCode(res)
260 {}
261
264 void reactivateRequest(bool shouldBeReactivated);
265
267 [[nodiscard]] const SC::Result& isValid() const { return returnCode; }
268
269 AsyncEventLoop& eventLoop;
270 AsyncRequest& async;
271
272 protected:
273 friend struct AsyncEventLoop;
274
275 bool shouldCallCallback = true;
276 bool* hasBeenReactivated = nullptr;
277
278 SC::Result& returnCode;
279};
280
284template <typename T, typename C>
286{
287 T& getAsync() { return static_cast<T&>(AsyncResult::async); }
288 const T& getAsync() const { return static_cast<const T&>(AsyncResult::async); }
289
291
292 C completionData;
293 int32_t eventIndex = 0;
294};
295
300struct SC_COMPILER_EXPORT AsyncLoopTimeout : public AsyncRequest
301{
302 AsyncLoopTimeout() : AsyncRequest(Type::LoopTimeout) {}
303
306 using AsyncRequest::start;
307
309 SC::Result start(AsyncEventLoop& eventLoop, TimeMs relativeTimeout);
310
312
314
316 TimeMs getExpirationTime() const { return expirationTime; }
317
318 private:
319 SC::Result validate(AsyncEventLoop&);
320 friend struct AsyncEventLoop;
321 TimeMs expirationTime;
322};
323
336struct SC_COMPILER_EXPORT AsyncLoopWakeUp : public AsyncRequest
337{
338 AsyncLoopWakeUp() : AsyncRequest(Type::LoopWakeUp) {}
339
342 using AsyncRequest::start;
343
345 SC::Result start(AsyncEventLoop& eventLoop, EventObject& eventObject);
346
349
351 EventObject* eventObject = nullptr;
352
353 private:
354 friend struct AsyncEventLoop;
355 SC::Result validate(AsyncEventLoop&);
356
357 Atomic<bool> pending = false;
358};
359
364struct SC_COMPILER_EXPORT AsyncProcessExit : public AsyncRequest
365{
366 AsyncProcessExit() : AsyncRequest(Type::ProcessExit) {}
367
369 {
370 int exitStatus;
371 };
372
373 struct Result : public AsyncResultOf<AsyncProcessExit, CompletionData>
374 {
375 using AsyncResultOf<AsyncProcessExit, CompletionData>::AsyncResultOf;
376
377 SC::Result get(int& status)
378 {
379 status = completionData.exitStatus;
380 return returnCode;
381 }
382 };
383 using AsyncRequest::start;
384
388 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle process);
389
391
392 private:
393 friend struct AsyncEventLoop;
394 SC::Result validate(AsyncEventLoop&);
395
396 FileDescriptor::Handle handle = FileDescriptor::Invalid;
397#if SC_PLATFORM_WINDOWS
399 detail::WinWaitHandle waitHandle;
400 AsyncEventLoop* eventLoop = nullptr;
401#elif SC_PLATFORM_LINUX
402 FileDescriptor pidFd;
403#endif
404};
405
406struct AsyncSocketAccept;
407namespace detail
408{
411struct SC_COMPILER_EXPORT AsyncSocketAcceptData
412{
413#if SC_PLATFORM_WINDOWS
414 void (*pAcceptEx)() = nullptr;
416 SocketDescriptor clientSocket;
417 uint8_t acceptBuffer[288] = {0};
418#elif SC_PLATFORM_LINUX
419 AlignedStorage<28> sockAddrHandle;
420 uint32_t sockAddrLen;
421#endif
422};
423
425struct SC_COMPILER_EXPORT AsyncSocketAcceptBase : public AsyncRequest
426{
427 AsyncSocketAcceptBase() : AsyncRequest(Type::SocketAccept) {}
428
429 struct CompletionData : public AsyncCompletionData
430 {
431 SocketDescriptor acceptedClient;
432 };
433
434 struct Result : public AsyncResultOf<AsyncSocketAccept, CompletionData>
435 {
436 using AsyncResultOf<AsyncSocketAccept, CompletionData>::AsyncResultOf;
437
438 SC::Result moveTo(SocketDescriptor& client)
439 {
440 SC_TRY(returnCode);
441 return client.assign(move(completionData.acceptedClient));
442 }
443 };
444 using AsyncRequest::start;
445
447 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor, AsyncSocketAcceptData& data);
448 SC::Result validate(AsyncEventLoop&);
449
450 Function<void(Result&)> callback;
451 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
452 SocketFlags::AddressFamily addressFamily = SocketFlags::AddressFamilyIPV4;
453 AsyncSocketAcceptData* acceptData = nullptr;
454};
455
456} // namespace detail
457
467struct SC_COMPILER_EXPORT AsyncSocketAccept : public detail::AsyncSocketAcceptBase
468{
469 AsyncSocketAccept() { AsyncSocketAcceptBase::acceptData = &data; }
470 using AsyncSocketAcceptBase::start;
471
473 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor);
474
475 private:
476 detail::AsyncSocketAcceptData data;
477};
478
487struct SC_COMPILER_EXPORT AsyncSocketConnect : public AsyncRequest
488{
489 AsyncSocketConnect() : AsyncRequest(Type::SocketConnect) {}
490
493 using AsyncRequest::start;
494
496 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress address);
497
499
500 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
501 SocketIPAddress ipAddress;
502
503 private:
504 friend struct AsyncEventLoop;
505 SC::Result validate(AsyncEventLoop&);
506
507#if SC_PLATFORM_WINDOWS
508 void (*pConnectEx)() = nullptr;
510#endif
511};
512
521struct SC_COMPILER_EXPORT AsyncSocketSend : public AsyncRequest
522{
523 AsyncSocketSend() : AsyncRequest(Type::SocketSend) {}
525 {
526 size_t numBytes = 0;
527 };
529 using AsyncRequest::start;
530
533
536
538
539 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
540
543 bool singleBuffer = true;
544
545 protected:
546 AsyncSocketSend(Type type) : AsyncRequest(type) {}
547 friend struct AsyncEventLoop;
548 SC::Result validate(AsyncEventLoop&);
549
550 size_t totalBytesWritten = 0;
551#if SC_PLATFORM_WINDOWS
553#endif
554};
555
565struct SC_COMPILER_EXPORT AsyncSocketSendTo : public AsyncSocketSend
566{
567 AsyncSocketSendTo() : AsyncSocketSend(Type::SocketSendTo) {}
568
569 SocketIPAddress address;
570
571 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
572 Span<const char> data);
573
574 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, SocketIPAddress ipAddress,
575 Span<Span<const char>> data);
576
577 private:
578 using AsyncSocketSend::start;
579 friend struct AsyncEventLoop;
580 SC::Result validate(AsyncEventLoop&);
581#if SC_PLATFORM_LINUX
582 AlignedStorage<56> typeErasedMsgHdr;
583#endif
584};
585
597struct SC_COMPILER_EXPORT AsyncSocketReceive : public AsyncRequest
598{
599 AsyncSocketReceive() : AsyncRequest(Type::SocketReceive) {}
600
602 {
603 size_t numBytes = 0;
604 bool disconnected = false;
605 };
606
607 struct Result : public AsyncResultOf<AsyncSocketReceive, CompletionData>
608 {
609 using AsyncResultOf<AsyncSocketReceive, CompletionData>::AsyncResultOf;
610
615 {
616 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, outData));
617 return returnCode;
618 }
619
620 SocketIPAddress getSourceAddress() const;
621 };
622 using AsyncRequest::start;
623
625 SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor, Span<char> data);
626
628
630 SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
631
632 protected:
634 friend struct AsyncEventLoop;
635 SC::Result validate(AsyncEventLoop&);
636#if SC_PLATFORM_WINDOWS
638#endif
639};
640
650struct SC_COMPILER_EXPORT AsyncSocketReceiveFrom : public AsyncSocketReceive
651{
652 AsyncSocketReceiveFrom() : AsyncSocketReceive(Type::SocketReceiveFrom) {}
653 using AsyncSocketReceive::start;
654
655 private:
656 SocketIPAddress address;
657 friend struct AsyncSocketReceive;
658 friend struct AsyncEventLoop;
659#if SC_PLATFORM_LINUX
660 AlignedStorage<56> typeErasedMsgHdr;
661#endif
662};
663
685struct SC_COMPILER_EXPORT AsyncFileRead : public AsyncRequest
686{
687 AsyncFileRead() : AsyncRequest(Type::FileRead) { handle = FileDescriptor::Invalid; }
688
690 {
691 size_t numBytes = 0;
692 bool endOfFile = false;
693 };
694
695 struct Result : public AsyncResultOf<AsyncFileRead, CompletionData>
696 {
697 using AsyncResultOf<AsyncFileRead, CompletionData>::AsyncResultOf;
698
699 SC::Result get(Span<char>& data)
700 {
701 SC_TRY(getAsync().buffer.sliceStartLength(0, completionData.numBytes, data));
702 return returnCode;
703 }
704 };
705 using AsyncRequest::start;
706
708 SC::Result start(AsyncEventLoop& eventLoop, const FileDescriptor& descriptor, Span<char> data);
709
710 Function<void(Result&)> callback;
712 FileDescriptor::Handle handle;
714
716 uint64_t getOffset() const { return offset; }
717
720 void setOffset(uint64_t fileOffset)
721 {
722 useOffset = true;
723 offset = fileOffset;
724 }
725
726 private:
727 friend struct AsyncEventLoop;
728 SC::Result validate(AsyncEventLoop&);
729
730 bool useOffset = false;
731 bool endedSync = false;
732
733 uint64_t offset = 0;
734#if SC_PLATFORM_WINDOWS
735 uint64_t readCursor = 0;
737#endif
738};
739
757struct SC_COMPILER_EXPORT AsyncFileWrite : public AsyncRequest
758{
759 AsyncFileWrite() : AsyncRequest(Type::FileWrite) { handle = FileDescriptor::Invalid; }
760
762 {
763 size_t numBytes = 0;
764 };
765
766 struct Result : public AsyncResultOf<AsyncFileWrite, CompletionData>
767 {
768 using AsyncResultOf<AsyncFileWrite, CompletionData>::AsyncResultOf;
769
770 SC::Result get(size_t& writtenSizeInBytes)
771 {
772 writtenSizeInBytes = completionData.numBytes;
773 return returnCode;
774 }
775 };
776
777 using AsyncRequest::start;
778
780 SC::Result start(AsyncEventLoop& eventLoop, const FileDescriptor& descriptor, Span<Span<const char>> data);
781
784
786 SC::Result start(AsyncEventLoop& eventLoop, const FileDescriptor& descriptor, Span<const char> data);
787
790
792
793 FileDescriptor::Handle handle;
795
798 bool singleBuffer = true;
799
801 uint64_t getOffset() const { return offset; }
802
805 void setOffset(uint64_t fileOffset)
806 {
807 useOffset = true;
808 offset = fileOffset;
809 }
810
811 private:
812 friend struct AsyncEventLoop;
813 SC::Result validate(AsyncEventLoop&);
814
815#if SC_PLATFORM_WINDOWS
816 bool endedSync = false;
817#else
818 bool isWatchable = false;
819#endif
820 bool useOffset = false;
821 uint64_t offset = 0xffffffffffffffff;
822
823 size_t totalBytesWritten = 0;
824#if SC_PLATFORM_WINDOWS
826#endif
827};
828
833struct SC_COMPILER_EXPORT AsyncFilePoll : public AsyncRequest
834{
835 AsyncFilePoll() : AsyncRequest(Type::FilePoll) {}
836
839
841 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
842
843#if SC_PLATFORM_WINDOWS
844 [[nodiscard]] void* getOverlappedPtr();
845#endif
846
847 Function<void(Result&)> callback;
848
849 private:
850 friend struct AsyncEventLoop;
851 SC::Result validate(AsyncEventLoop&);
852
853 FileDescriptor::Handle handle = FileDescriptor::Invalid;
854#if SC_PLATFORM_WINDOWS
856#endif
857};
858
883struct SC_COMPILER_EXPORT AsyncFileSend : public AsyncRequest
884{
885 AsyncFileSend() : AsyncRequest(Type::FileSend) {}
886
888 {
889 size_t bytesTransferred = 0;
890 bool usedZeroCopy = false;
891 };
892
893 struct Result : public AsyncResultOf<AsyncFileSend, CompletionData>
894 {
895 using AsyncResultOf<AsyncFileSend, CompletionData>::AsyncResultOf;
896
898 [[nodiscard]] size_t getBytesTransferred() const { return completionData.bytesTransferred; }
899
901 [[nodiscard]] bool usedZeroCopy() const { return completionData.usedZeroCopy; }
902
904 [[nodiscard]] bool isComplete() const
905 {
906 return returnCode && completionData.bytesTransferred == getAsync().length;
907 }
908 };
909
910 using AsyncRequest::start;
911
921 SC::Result start(AsyncEventLoop& eventLoop, const FileDescriptor& file, const SocketDescriptor& socket,
922 int64_t offset = 0, size_t length = 0, size_t pipeSize = 0);
923
925
926 // Internal handles (set by start())
927 FileDescriptor::Handle fileHandle = FileDescriptor::Invalid;
928 SocketDescriptor::Handle socketHandle = SocketDescriptor::Invalid;
929
930 int64_t offset = 0;
931 size_t length = 0;
932 size_t bytesSent = 0;
933 private:
934 friend struct AsyncEventLoop;
935 SC::Result validate(AsyncEventLoop&);
936
937#if SC_PLATFORM_WINDOWS
939#elif SC_PLATFORM_LINUX
940 size_t pipeBufferSize = 0;
941 PipeDescriptor splicePipe;
942#endif
943};
944
945// forward declared because it must be defined after AsyncTaskSequence
946struct AsyncLoopWork;
948
950{
951 FileDescriptor::Handle handle = FileDescriptor::Invalid; // for open
952
953 int code = 0; // for open/close
954 size_t numBytes = 0; // for read
955};
956
957namespace detail
958{
959// A simple hand-made variant of all completion types
960struct SC_COMPILER_EXPORT AsyncCompletionVariant
961{
962 AsyncCompletionVariant() {}
963 ~AsyncCompletionVariant() { destroy(); }
964
965 AsyncCompletionVariant(const AsyncCompletionVariant&) = delete;
966 AsyncCompletionVariant(AsyncCompletionVariant&&) = delete;
967 AsyncCompletionVariant& operator=(const AsyncCompletionVariant&) = delete;
968 AsyncCompletionVariant& operator=(AsyncCompletionVariant&&) = delete;
969
970 bool inited = false;
971
972 AsyncRequest::Type type;
973 union
974 {
975 AsyncCompletionData completionDataLoopWork; // Defined after AsyncCompletionVariant / AsyncTaskSequence
976 AsyncLoopTimeout::CompletionData completionDataLoopTimeout;
977 AsyncLoopWakeUp::CompletionData completionDataLoopWakeUp;
978 AsyncProcessExit::CompletionData completionDataProcessExit;
979 AsyncSocketAccept::CompletionData completionDataSocketAccept;
980 AsyncSocketConnect::CompletionData completionDataSocketConnect;
981 AsyncSocketSend::CompletionData completionDataSocketSend;
982 AsyncSocketSendTo::CompletionData completionDataSocketSendTo;
983 AsyncSocketReceive::CompletionData completionDataSocketReceive;
984 AsyncSocketReceiveFrom::CompletionData completionDataSocketReceiveFrom;
985 AsyncFileRead::CompletionData completionDataFileRead;
986 AsyncFileWrite::CompletionData completionDataFileWrite;
987 AsyncFileSend::CompletionData completionDataFileSend;
988 AsyncFilePoll::CompletionData completionDataFilePoll;
989
990 AsyncFileSystemOperationCompletionData completionDataFileSystemOperation;
991 };
992
993 auto& getCompletion(AsyncLoopWork&) { return completionDataLoopWork; }
994 auto& getCompletion(AsyncLoopTimeout&) { return completionDataLoopTimeout; }
995 auto& getCompletion(AsyncLoopWakeUp&) { return completionDataLoopWakeUp; }
996 auto& getCompletion(AsyncProcessExit&) { return completionDataProcessExit; }
997 auto& getCompletion(AsyncSocketAccept&) { return completionDataSocketAccept; }
998 auto& getCompletion(AsyncSocketConnect&) { return completionDataSocketConnect; }
999 auto& getCompletion(AsyncSocketSend&) { return completionDataSocketSend; }
1000 auto& getCompletion(AsyncSocketReceive&) { return completionDataSocketReceive; }
1001 auto& getCompletion(AsyncFileRead&) { return completionDataFileRead; }
1002 auto& getCompletion(AsyncFileWrite&) { return completionDataFileWrite; }
1003 auto& getCompletion(AsyncFileSend&) { return completionDataFileSend; }
1004 auto& getCompletion(AsyncFilePoll&) { return completionDataFilePoll; }
1005 auto& getCompletion(AsyncFileSystemOperation&) { return completionDataFileSystemOperation; }
1006
1007 template <typename T>
1008 auto& construct(T& t)
1009 {
1010 destroy();
1011 placementNew(getCompletion(t));
1012 inited = true;
1013 type = t.getType();
1014 return getCompletion(t);
1015 }
1016 void destroy();
1017};
1018} // namespace detail
1019
1023struct SC_COMPILER_EXPORT AsyncTaskSequence : public AsyncSequence
1024{
1025 protected:
1026 ThreadPoolTask task;
1027 ThreadPool* threadPool = nullptr;
1028
1029 friend struct AsyncEventLoop;
1030 friend struct AsyncRequest;
1031
1032 detail::AsyncCompletionVariant completion;
1033
1034 SC::Result returnCode = SC::Result(true);
1035};
1036
1042struct SC_COMPILER_EXPORT AsyncLoopWork : public AsyncRequest
1043{
1044 AsyncLoopWork() : AsyncRequest(Type::LoopWork) {}
1045
1048
1052
1053 Function<SC::Result()> work;
1055
1056 private:
1057 friend struct AsyncEventLoop;
1058 SC::Result validate(AsyncEventLoop&);
1059 AsyncTaskSequence task;
1060};
1061
1095struct SC_COMPILER_EXPORT AsyncFileSystemOperation : public AsyncRequest
1096{
1097 AsyncFileSystemOperation() : AsyncRequest(Type::FileSystemOperation) {}
1098 ~AsyncFileSystemOperation() { destroy(); }
1099#ifdef CopyFile
1100#undef CopyFile
1101#endif
1102#ifdef RemoveDirectory
1103#undef RemoveDirectory
1104#endif
1105 enum class Operation
1106 {
1107 None = 0,
1108 Open,
1109 Close,
1110 Read,
1111 Write,
1112 CopyFile,
1113 CopyDirectory,
1114 Rename,
1115 RemoveDirectory,
1116 RemoveFile,
1117 };
1118
1121
1124
1126
1133
1138 SC::Result close(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle);
1139
1146 SC::Result read(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<char> buffer, uint64_t offset);
1147
1154 SC::Result write(AsyncEventLoop& eventLoop, FileDescriptor::Handle handle, Span<const char> buffer,
1155 uint64_t offset);
1156
1163 SC::Result copyFile(AsyncEventLoop& eventLoop, StringSpan path, StringSpan destinationPath,
1165
1174
1181
1188
1194
1195 private:
1196 friend struct AsyncEventLoop;
1197 Operation operation = Operation::None;
1198 AsyncLoopWork loopWork;
1199 CompletionData completionData;
1200
1201 void onOperationCompleted(AsyncLoopWork::Result& res);
1202
1203 struct FileDescriptorData
1204 {
1205 FileDescriptor::Handle handle;
1206 };
1207
1208 struct OpenData
1209 {
1210 StringSpan path;
1211 FileOpen mode;
1212 };
1213
1214 struct ReadData
1215 {
1216 FileDescriptor::Handle handle;
1217 Span<char> buffer;
1218 uint64_t offset;
1219 };
1220
1221 struct WriteData
1222 {
1223 FileDescriptor::Handle handle;
1224 Span<const char> buffer;
1225 uint64_t offset;
1226 };
1227
1228 struct CopyFileData
1229 {
1230 StringSpan path;
1231 StringSpan destinationPath;
1232 FileSystemCopyFlags copyFlags;
1233 };
1234
1235 using CopyDirectoryData = CopyFileData;
1236
1237 using CloseData = FileDescriptorData;
1238
1239 struct RenameData
1240 {
1241 StringSpan path;
1242 StringSpan newPath;
1243 };
1244
1245 struct RemoveData
1246 {
1247 StringSpan path;
1248 };
1249
1250 union
1251 {
1252 OpenData openData;
1253 CloseData closeData;
1254 ReadData readData;
1255 WriteData writeData;
1256 CopyFileData copyFileData;
1257 CopyDirectoryData copyDirectoryData;
1258 RenameData renameData;
1259 RemoveData removeData;
1260 };
1261
1262 void destroy();
1263
1264 SC::Result start(AsyncEventLoop& eventLoop, FileDescriptor::Handle fileDescriptor);
1265 SC::Result validate(AsyncEventLoop&);
1266};
1267
1271struct SC_COMPILER_EXPORT AsyncKernelEvents
1272{
1274
1275 private:
1276 int numberOfEvents = 0;
1277 friend struct AsyncEventLoop;
1278};
1279
1281struct SC_COMPILER_EXPORT AsyncEventLoopListeners
1282{
1283 Function<void(AsyncEventLoop&)> beforeBlockingPoll;
1284 Function<void(AsyncEventLoop&)> afterBlockingPoll;
1285};
1286
1293struct SC_COMPILER_EXPORT AsyncEventLoop
1294{
1296 struct Options
1297 {
1298 enum class ApiType : uint8_t
1299 {
1300 Automatic = 0,
1301 ForceUseIOURing,
1302 ForceUseEpoll,
1303 };
1305
1306 Options() { apiType = ApiType::Automatic; }
1307 };
1308
1310
1311 AsyncEventLoop(const AsyncEventLoop&) = delete;
1312 AsyncEventLoop(AsyncEventLoop&&) = delete;
1313 AsyncEventLoop& operator=(AsyncEventLoop&&) = delete;
1314 AsyncEventLoop& operator=(const AsyncEventLoop&) = delete;
1315
1318
1321
1325
1328
1330 [[nodiscard]] bool isInitialized() const;
1331
1333 [[nodiscard]] bool needsThreadPoolForFileOperations() const;
1334
1341
1352
1359
1365
1385
1392
1396
1399
1402
1405
1408
1411
1414
1417
1420
1422 [[nodiscard]] TimeMs getLoopTime() const;
1423
1425 [[nodiscard]] int getNumberOfActiveRequests() const;
1426
1428 [[nodiscard]] int getNumberOfSubmittedRequests() const;
1429
1433
1436
1439
1441 void enumerateRequests(Function<void(AsyncRequest&)> enumerationCallback);
1442
1446
1448 [[nodiscard]] static bool isExcludedFromActiveCount(const AsyncRequest& async);
1449
1452 [[nodiscard]] static bool tryLoadingLiburing();
1453
1456
1457 struct Internal;
1458
1459 public:
1460 struct SC_COMPILER_EXPORT InternalDefinition
1461 {
1462 static constexpr int Windows = 536;
1463 static constexpr int Apple = 504;
1464 static constexpr int Linux = 720;
1465 static constexpr int Default = Linux;
1466
1467 static constexpr size_t Alignment = 8;
1468
1469 using Object = Internal;
1470 };
1471
1473
1474 private:
1475 InternalOpaque internalOpaque;
1476 Internal& internal;
1477
1478 friend struct AsyncRequest;
1479 friend struct AsyncFileWrite;
1480 friend struct AsyncFileRead;
1481 friend struct AsyncFileSystemOperation;
1482 friend struct AsyncResult;
1483};
1484
1488struct SC_COMPILER_EXPORT AsyncEventLoopMonitor
1489{
1491
1495
1498
1506
1512
1513 private:
1514#if SC_COMPILER_MSVC
1515#pragma warning(push)
1516#pragma warning(disable : 4324) // useless warning on 32 bit... (structure was padded due to __declspec(align()))
1517#endif
1518 alignas(uint64_t) uint8_t eventsMemory[8 * 1024]; // 8 Kb of kernel events
1519#if SC_COMPILER_MSVC
1520#pragma warning(pop)
1521#endif
1522
1523 AsyncKernelEvents asyncKernelEvents;
1524 AsyncEventLoop* eventLoop = nullptr;
1525 AsyncLoopWakeUp eventLoopWakeUp;
1526
1527 Thread eventLoopThread;
1528 EventObject eventObjectEnterBlockingMode;
1529 EventObject eventObjectExitBlockingMode;
1530
1531 Atomic<bool> finished = false;
1532 Atomic<bool> needsWakeUp = true;
1533
1534 bool wakeUpHasBeenCalled = false;
1535
1536 Result monitoringLoopThread(Thread& thread);
1537};
1538
1539} // namespace SC
unsigned short uint16_t
Platform independent (2) bytes unsigned int.
Definition PrimitiveTypes.h:37
constexpr T && move(T &value)
Converts an lvalue to an rvalue reference.
Definition Compiler.h:264
unsigned char uint8_t
Platform independent (1) byte unsigned int.
Definition PrimitiveTypes.h:36
unsigned long long uint64_t
Platform independent (8) bytes unsigned int.
Definition PrimitiveTypes.h:42
short int16_t
Platform independent (2) bytes signed int.
Definition PrimitiveTypes.h:45
long long int64_t
Platform independent (8) bytes signed int.
Definition PrimitiveTypes.h:50
int int32_t
Platform independent (4) bytes signed int.
Definition PrimitiveTypes.h:46
#define SC_TRY(expression)
Checks the value of the given expression and if failed, returns this value to caller.
Definition Result.h:49
struct SC_COMPILER_EXPORT Function
Wraps function pointers, member functions and lambdas without ever allocating.
Definition Function.h:19
A buffer of bytes with given alignment.
Definition AlignedStorage.h:29
Empty base struct for all AsyncRequest-derived CompletionData (internal) structs.
Definition Async.h:251
Allow library user to provide callbacks signaling different phases of async event loop cycle.
Definition Async.h:1282
Monitors Async I/O events from a background thread using a blocking kernel function (no CPU usage on ...
Definition Async.h:1489
Function< void(void)> onNewEventsAvailable
Informs to call dispatchCompletions on GUI Event Loop.
Definition Async.h:1490
Result startMonitoring()
Queue all async requests submissions and start monitoring loop events on a background thread.
Result close()
Stop monitoring the AsyncEventLoop, disposing all resources.
Result stopMonitoringAndDispatchCompletions()
Stops monitoring events on the background thread and dispatches callbacks for completed requests.
Result create(AsyncEventLoop &eventLoop)
Create the monitoring thread for an AsyncEventLoop.
Options given to AsyncEventLoop::create.
Definition Async.h:1297
ApiType apiType
Criteria to choose Async IO API.
Definition Async.h:1304
ApiType
Definition Async.h:1299
Asynchronous I/O (files, sockets, timers, processes, fs events, threads wake-up) (see Async) AsyncEve...
Definition Async.h:1294
bool needsThreadPoolForFileOperations() const
Returns true if backend needs a thread pool for non-blocking fs operations (anything but io_uring bas...
Result associateExternallyCreatedFileDescriptor(FileDescriptor &outDescriptor)
Associates a previously created File Descriptor with the eventLoop.
Result wakeUpFromExternalThread()
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
Result runNoWait()
Process active requests if any, dispatching their completions, or returns immediately without blockin...
static Result removeAllAssociationsFor(SocketDescriptor &outDescriptor)
Removes association of a TCP Socket with any event loop.
void updateTime()
Updates loop time to "now".
static bool isExcludedFromActiveCount(const AsyncRequest &async)
Checks if excludeFromActiveCount() has been called on the given request.
Result associateExternallyCreatedSocket(SocketDescriptor &outDescriptor)
Associates a previously created TCP / UDP socket with the eventLoop.
Result blockingPoll(AsyncKernelEvents &kernelEvents)
Blocks until at least one event happens, ensuring forward progress, without executing completions.
void clearSequence(AsyncSequence &sequence)
Clears the sequence.
int getNumberOfSubmittedRequests() const
Obtain the total number of submitted requests.
Result submitRequests(AsyncKernelEvents &kernelEvents)
Submits all queued async requests.
void enumerateRequests(Function< void(AsyncRequest &)> enumerationCallback)
Enumerates all requests objects associated with this loop.
TimeMs getLoopTime() const
Get Loop time (monotonic)
Result start(AsyncRequest &async)
Queues an async request request that has been correctly setup.
AsyncLoopTimeout * findEarliestLoopTimeout() const
Returns the next AsyncLoopTimeout that will be executed (shortest relativeTimeout)
void setListeners(AsyncEventLoopListeners *listeners)
Sets reference to listeners that will signal different events in loop lifetime.
Result dispatchCompletions(AsyncKernelEvents &kernelEvents)
Invokes completions for the AsyncKernelEvents collected by a call to AsyncEventLoop::blockingPoll.
void interrupt()
Interrupts the event loop even if it has active request on it.
Result wakeUpFromExternalThread(AsyncLoopWakeUp &wakeUp)
Wake up the event loop from a thread different than the one where run() is called (and potentially bl...
bool isInitialized() const
Returns true if create has been already called (successfully)
Result create(Options options=Options())
Creates the event loop kernel object.
static bool tryLoadingLiburing()
Check if liburing is loadable (only on Linux)
Result createAsyncTCPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async TCP (IPV4 / IPV6) socket registered with the eventLoop.
Result close()
Closes the event loop kernel object.
Result createAsyncUDPSocket(SocketFlags::AddressFamily family, SocketDescriptor &outDescriptor)
Creates an async UCP (IPV4 / IPV6) socket registered with the eventLoop.
static Result removeAllAssociationsFor(FileDescriptor &outDescriptor)
Removes association of a File Descriptor with any event loop.
Result runOnce()
Blocks until at least one request proceeds, ensuring forward progress, dispatching all completions.
void excludeFromActiveCount(AsyncRequest &async)
Excludes the request from active handles count (to avoid it keeping event loop alive)
Result run()
Blocks until there are no more active queued requests, dispatching all completions.
void includeInActiveCount(AsyncRequest &async)
Reverses the effect of excludeFromActiveCount for the request.
int getNumberOfActiveRequests() const
Obtain the total number of active requests.
Starts an handle polling operation.
Definition Async.h:834
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle fileDescriptor)
Starts a file descriptor poll operation, monitoring its readiness with appropriate OS API.
Definition Async.h:690
Definition Async.h:696
Starts a file read operation, reading bytes from a file (or pipe).
Definition Async.h:686
FileDescriptor::Handle handle
The writeable span of memory where to data will be written.
Definition Async.h:712
Span< char > buffer
Callback called when some data has been read from the file into the buffer.
Definition Async.h:711
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which start reading.
Definition Async.h:720
SC::Result start(AsyncEventLoop &eventLoop, const FileDescriptor &descriptor, Span< char > data)
Sets async request members and calls AsyncEventLoop::start.
uint64_t getOffset() const
The file/pipe descriptor handle to read data from.
Definition Async.h:716
Definition Async.h:888
Definition Async.h:894
size_t getBytesTransferred() const
Get the number of bytes transferred.
Definition Async.h:898
bool usedZeroCopy() const
Check if zero-copy was used for this transfer.
Definition Async.h:901
bool isComplete() const
Check if the entire requested range was sent.
Definition Async.h:904
Sends file contents to a socket using zero-copy when available (sendfile, TransmitFile).
Definition Async.h:884
Function< void(Result &)> callback
Called when send completes or fails.
Definition Async.h:924
SC::Result start(AsyncEventLoop &eventLoop, const FileDescriptor &file, const SocketDescriptor &socket, int64_t offset=0, size_t length=0, size_t pipeSize=0)
Start the file send operation.
Starts an asynchronous file system operation (open, close, read, write, sendFile, stat,...
Definition Async.h:1096
SC::Result copyDirectory(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a directory from one location to another.
SC::Result removeEmptyDirectory(AsyncEventLoop &eventLoop, StringSpan path)
Removes a directory asynchronously.
SC::Result rename(AsyncEventLoop &eventLoop, StringSpan path, StringSpan newPath)
Renames a file.
SC::Result removeFile(AsyncEventLoop &eventLoop, StringSpan path)
Removes a file asynchronously.
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the thread pool to use for the operation.
SC::Result read(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< char > buffer, uint64_t offset)
Reads data from a file descriptor at a given offset.
SC::Result write(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle, Span< const char > buffer, uint64_t offset)
Writes data to a file descriptor at a given offset.
SC::Result close(AsyncEventLoop &eventLoop, FileDescriptor::Handle handle)
Closes a file descriptor asynchronously.
Function< void(Result &)> callback
Called after the operation is completed, on the event loop thread.
Definition Async.h:1125
SC::Result copyFile(AsyncEventLoop &eventLoop, StringSpan path, StringSpan destinationPath, FileSystemCopyFlags copyFlags=FileSystemCopyFlags())
Copies a file from one location to another.
SC::Result open(AsyncEventLoop &eventLoop, StringSpan path, FileOpen mode)
Opens a file asynchronously and returns its corresponding file descriptor.
Definition Async.h:767
Starts a file write operation, writing bytes to a file (or pipe).
Definition Async.h:758
uint64_t getOffset() const
Returns the last offset set with AsyncFileWrite::setOffset.
Definition Async.h:801
FileDescriptor::Handle handle
The file/pipe descriptor to write data to.
Definition Async.h:793
SC::Result start(AsyncEventLoop &eventLoop, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
SC::Result start(AsyncEventLoop &eventLoop, const FileDescriptor &descriptor, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
void setOffset(uint64_t fileOffset)
Sets the offset in bytes at which start writing.
Definition Async.h:805
Function< void(Result &)> callback
Callback called when descriptor is ready to be written with more data.
Definition Async.h:791
SC::Result start(AsyncEventLoop &eventLoop, const FileDescriptor &descriptor, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< Span< const char > > buffers
The read-only spans of memory where to read the data from.
Definition Async.h:797
SC::Result start(AsyncEventLoop &eventLoop, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
Span< const char > buffer
The read-only span of memory where to read the data from.
Definition Async.h:796
Allows user to supply a block of memory that will store kernel I/O events retrieved from AsyncEventLo...
Definition Async.h:1272
Span< uint8_t > eventsMemory
User supplied block of memory used to store kernel I/O events.
Definition Async.h:1273
Starts a Timeout that is invoked only once after expiration (relative) time has passed.
Definition Async.h:301
TimeMs getExpirationTime() const
Gets computed absolute expiration time that determines when this timeout get executed.
Definition Async.h:316
SC::Result start(AsyncEventLoop &eventLoop, TimeMs relativeTimeout)
Sets async request members and calls AsyncEventLoop::start.
TimeMs relativeTimeout
First timer expiration (relative) time in milliseconds.
Definition Async.h:313
Function< void(Result &)> callback
Called after given expiration time since AsyncLoopTimeout::start has passed.
Definition Async.h:311
Starts a wake-up operation, allowing threads to execute callbacks on loop thread.
Definition Async.h:337
SC::Result start(AsyncEventLoop &eventLoop, EventObject &eventObject)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Callback called by SC::AsyncEventLoop::run after SC::AsyncLoopWakeUp::wakeUp.
Definition Async.h:350
SC::Result wakeUp(AsyncEventLoop &eventLoop)
Wakes up event loop, scheduling AsyncLoopWakeUp::callback on next AsyncEventLoop::run (or its variati...
Executes work in a thread pool and then invokes a callback on the event loop thread.
Definition Async.h:1043
Function< void(Result &)> callback
Called to execute the work in a background threadpool thread.
Definition Async.h:1054
SC::Result setThreadPool(ThreadPool &threadPool)
Sets the ThreadPool that will supply the thread to run the async work on.
Definition Async.h:374
Starts monitoring a process, notifying about its termination.
Definition Async.h:365
SC::Result start(AsyncEventLoop &eventLoop, FileDescriptor::Handle process)
Sets async request members and calls AsyncEventLoop::start.
Function< void(Result &)> callback
Called when process has exited.
Definition Async.h:390
Base class for all async requests, holding state and type.
Definition Async.h:117
bool isCancelling() const
Returns true if this request is being cancelled.
AsyncRequest(Type type)
Constructs a free async request of given type.
Definition Async.h:156
Result start(AsyncEventLoop &eventLoop)
Shortcut for AsyncEventLoop::start.
uint16_t getUserFlags() const
Gets user flags, holding some meaningful data for the caller.
Definition Async.h:185
Function< void(AsyncResult &)> * getCloseCallback()
Returns currently set close callback (if any) passed to AsyncRequest::stop.
Definition Async.h:188
bool isActive() const
Returns true if this request is active or being reactivated.
bool isFree() const
Returns true if this request is free.
void disableThreadPool()
Disables the thread-pool usage for this request.
Type getType() const
Returns request type.
Definition Async.h:176
void setUserFlags(uint16_t externalFlags)
Sets user flags, holding some meaningful data for the caller.
Definition Async.h:182
Result executeOn(AsyncTaskSequence &task, ThreadPool &pool)
Adds the request to be executed on a specific AsyncTaskSequence.
Result stop(AsyncEventLoop &eventLoop, Function< void(AsyncResult &)> *afterStopped=nullptr)
Ask to stop current async operation.
void executeOn(AsyncSequence &sequence)
Adds the request to be executed on a specific AsyncSequence.
Type
Type of async request.
Definition Async.h:136
Helper holding CompletionData for a specific AsyncRequest-derived class.
Definition Async.h:286
Base class for all async results (argument of completion callbacks).
Definition Async.h:256
const SC::Result & isValid() const
Check if the returnCode of this result is valid.
Definition Async.h:267
AsyncResult(AsyncEventLoop &eventLoop, AsyncRequest &request, SC::Result &res, bool *hasBeenReactivated=nullptr)
Constructs an async result from a request and a result.
Definition Async.h:258
void reactivateRequest(bool shouldBeReactivated)
Ask the event loop to re-activate this request after it was already completed.
Execute AsyncRequests serially, by submitting the next one after the previous one is completed.
Definition Async.h:236
Starts a socket accept operation, obtaining a new socket from a listening socket.
Definition Async.h:468
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &socketDescriptor)
Sets async request members and calls AsyncEventLoop::start.
Starts a socket connect operation, connecting to a remote endpoint.
Definition Async.h:488
Function< void(Result &)> callback
Called after socket is finally connected to endpoint.
Definition Async.h:498
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, SocketIPAddress address)
Sets async request members and calls AsyncEventLoop::start.
Starts an unconnected socket receive from operation, receiving bytes from a remote endpoint.
Definition Async.h:651
Definition Async.h:608
SC::Result get(Span< char > &outData)
Get a Span of the actually read data.
Definition Async.h:614
Starts a socket receive operation, receiving bytes from a remote endpoint.
Definition Async.h:598
AsyncSocketReceive(Type type)
The Socket Descriptor handle to read data from.
Definition Async.h:633
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< char > buffer
The writeable span of memory where to data will be written.
Definition Async.h:629
Function< void(Result &)> callback
Called after data has been received.
Definition Async.h:627
Starts an unconnected socket send to operation, sending bytes to a remote endpoint.
Definition Async.h:566
Starts a socket send operation, sending bytes to a remote endpoint.
Definition Async.h:522
Function< void(Result &)> callback
Called when socket is ready to send more data.
Definition Async.h:537
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< const char > data)
Sets async request members and calls AsyncEventLoop::start.
Span< Span< const char > > buffers
Spans of bytes to send (singleBuffer == false)
Definition Async.h:542
Span< const char > buffer
Span of bytes to send (singleBuffer == true)
Definition Async.h:541
SC::Result start(AsyncEventLoop &eventLoop, const SocketDescriptor &descriptor, Span< Span< const char > > data)
Sets async request members and calls AsyncEventLoop::start.
An AsyncSequence using a SC::ThreadPool to execute one or more SC::AsyncRequest in a background threa...
Definition Async.h:1024
Atomic variables (only for int and bool for now).
Definition Atomic.h:41
An automatically reset event object to synchronize two threads.
Definition Threading.h:229
[UniqueHandleDeclaration2Snippet]
Definition File.h:79
Options used to open a file descriptor.
Definition File.h:50
A structure to describe copy flags.
Definition FileSystem.h:25
Hides implementation details from public headers (static PIMPL).
Definition OpaqueObject.h:31
Read / Write pipe (Process stdin/stdout and IPC communication)
Definition File.h:222
An ascii string used as boolean result. SC_TRY macro forwards errors to caller.
Definition Result.h:13
Low-level OS socket handle.
Definition Socket.h:153
AddressFamily
Sets the address family of an IP Address (IPv4 or IPV6)
Definition Socket.h:60
Native representation of an IP Address.
Definition Socket.h:100
View over a contiguous sequence of items (pointer + size in elements).
Definition Span.h:29
An read-only view over a string (to avoid including Strings library when parsing is not needed).
Definition StringSpan.h:37
A small task containing a function to execute that can be queued in the thread pool.
Definition ThreadPool.h:16
Simple thread pool that executes tasks in a fixed number of worker threads.
Definition ThreadPool.h:38
A native OS thread.
Definition Threading.h:113
A vocabulary type representing a time interval in milliseconds since epoch.
Definition PrimitiveTypes.h:63