URING++
event_loop.h
#pragma once

#include <bitset>
#include <cassert>
#include <cstdint>
#include <liburing.h>
#include <memory>
#include <new>
#include <stdexcept>
#include <unordered_set>

#include "uringpp/awaitable.h"
#include "uringpp/error.h"
#include "uringpp/task.h"

#include "uringpp/detail/noncopyable.h"

namespace uringpp {

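// Kernel features reported at ring setup time (mirrors the IORING_FEAT_* bits).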
enum class feature {
  SINGLE_MMAP,
  NODROP,
  SUBMIT_STABLE,
  RW_CUR_POS,
  CUR_PERSONALITY,
  FAST_POLL,
  POLL_32BITS,
  SQPOLL_NONFIXED,
  EXT_ARG,
  NATIVE_WORKERS,
  RSRC_TAGS,
};

class event_loop : public noncopyable,
                   public std::enable_shared_from_this<event_loop> {
  struct io_uring ring_;
  unsigned int cqe_count_;
  std::unordered_set<feature> supported_features_;
  std::bitset<IORING_OP_LAST> supported_ops_;
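  // RAII wrapper around an io_uring probe, used to query which opcodes the
  // running kernel supports.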
  class probe_ring {
    struct io_uring_probe *probe_;

  public:
    probe_ring(struct io_uring *ring);
    std::bitset<IORING_OP_LAST> supported_ops();
    ~probe_ring();
  };

  void check_feature(uint32_t features, uint32_t test_bit, enum feature efeat);

  void init_supported_features(struct io_uring_params const &params);

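  // Grab a free SQE; if the submission queue is full, reap outstanding CQEs,
  // submit what is pending, and retry once before giving up.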
  struct io_uring_sqe *get_sqe() {
    auto sqe = ::io_uring_get_sqe(&ring_);
    if (sqe != nullptr) [[likely]] {
      return sqe;
    }
    ::io_uring_cq_advance(&ring_, cqe_count_);
    cqe_count_ = 0;
    ::io_uring_submit(&ring_);
    sqe = ::io_uring_get_sqe(&ring_);
    if (sqe == nullptr) [[unlikely]] {
      throw std::runtime_error("failed to allocate sqe");
    }
    return sqe;
  }

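  // Attach the caller-supplied flags and hand the SQE to an awaitable; the
  // awaiting coroutine suspends until process_cqe() sees the matching CQE.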
  sqe_awaitable await_sqe(struct io_uring_sqe *sqe, uint8_t flags) {
    ::io_uring_sqe_set_flags(sqe, flags);
    return sqe_awaitable(sqe);
  }

public:
  static std::shared_ptr<event_loop> create(unsigned int entries = 128,
                                            uint32_t flags = 0, int wq_fd = -1);

  event_loop(unsigned int entries = 128, uint32_t flags = 0, int wq_fd = -1,
             int sq_thread_cpu = -1, int sq_thread_idle = -1);

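  // Drive the loop until the given task's coroutine has finished.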
  template <class T> void block_on(task<T> t) {
    while (!t.h_.done()) {
      poll();
    }
  }

  int fd() const { return ring_.ring_fd; }

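  // Walk the completion queue, resuming the coroutine recorded in each CQE's
  // user_data; returns the number of completions processed.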
  int process_cqe() {
    io_uring_cqe *cqe;
    unsigned head;

    io_uring_for_each_cqe(&ring_, head, cqe) {
      ++cqe_count_;
      auto awaitable =
          reinterpret_cast<sqe_awaitable *>(::io_uring_cqe_get_data(cqe));
      if (awaitable != nullptr) [[likely]] {
        awaitable->rc_ = cqe->res;
        awaitable->h_.resume();
      }
    }
    int nr_processed = cqe_count_;
    ::io_uring_cq_advance(&ring_, cqe_count_);
    cqe_count_ = 0;
    return nr_processed;
  }

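  // Submit pending SQEs and reap whatever completions are already available,
  // without blocking.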
  int poll_no_wait() {
    ::io_uring_submit(&ring_);
    return process_cqe();
  }

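  // Submit pending SQEs and block until at least one completion arrives.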
  void poll() {
    ::io_uring_submit_and_wait(&ring_, 1);
    process_cqe();
  }

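  // Each wrapper below grabs a free SQE, preps it with the matching liburing
  // helper, and returns an awaitable for its completion; the assert guards
  // against opcodes the running kernel does not support.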
  sqe_awaitable openat(int dfd, const char *path, int flags, mode_t mode,
                       uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_OPENAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_openat(sqe, dfd, path, flags, mode);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable openat2(int dfd, const char *path, struct open_how *how,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_OPENAT2));
    auto *sqe = get_sqe();
    ::io_uring_prep_openat2(sqe, dfd, path, how);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable readv(int fd, const iovec *iovecs, unsigned nr_vecs,
                      off_t offset = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_READV));
    auto *sqe = get_sqe();
    ::io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable writev(int fd, const iovec *iovecs, unsigned nr_vecs,
                       off_t offset = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_WRITEV));
    auto *sqe = get_sqe();
    ::io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable read(int fd, void *buf, unsigned nbytes, off_t offset = 0,
                     uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_READ));
    auto *sqe = get_sqe();
    ::io_uring_prep_read(sqe, fd, buf, nbytes, offset);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable write(int fd, const void *buf, unsigned nbytes,
                      off_t offset = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_WRITE));
    auto *sqe = get_sqe();
    ::io_uring_prep_write(sqe, fd, buf, nbytes, offset);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable read_fixed(int fd, void *buf, unsigned nbytes, off_t offset,
                           int buf_index, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_READ_FIXED));
    auto *sqe = get_sqe();
    ::io_uring_prep_read_fixed(sqe, fd, buf, nbytes, offset, buf_index);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable write_fixed(int fd, const void *buf, unsigned nbytes,
                            off_t offset, int buf_index,
                            uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_WRITE_FIXED));
    auto *sqe = get_sqe();
    ::io_uring_prep_write_fixed(sqe, fd, buf, nbytes, offset, buf_index);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable fsync(int fd, unsigned fsync_flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_FSYNC));
    auto *sqe = get_sqe();
    ::io_uring_prep_fsync(sqe, fd, fsync_flags);
    return await_sqe(sqe, sqe_flags);
  }

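  // No dedicated prep helper is used here; the SQE is built with the generic
  // io_uring_prep_rw() and the flags stored in its sync_range_flags field.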
  sqe_awaitable sync_file_range(int fd, off64_t offset, off64_t nbytes,
                                unsigned sync_range_flags,
                                uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SYNC_FILE_RANGE));
    auto *sqe = get_sqe();
    ::io_uring_prep_rw(IORING_OP_SYNC_FILE_RANGE, sqe, fd, nullptr, nbytes,
                       offset);
    sqe->sync_range_flags = sync_range_flags;
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable recvmsg(int sockfd, msghdr *msg, uint32_t flags,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_RECVMSG));
    auto *sqe = get_sqe();
    ::io_uring_prep_recvmsg(sqe, sockfd, msg, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable sendmsg(int sockfd, const msghdr *msg, uint32_t flags,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SENDMSG));
    auto *sqe = get_sqe();
    ::io_uring_prep_sendmsg(sqe, sockfd, msg, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable recv(int sockfd, void *buf, unsigned nbytes, uint32_t flags,
                     uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_RECV));
    auto *sqe = get_sqe();
    ::io_uring_prep_recv(sqe, sockfd, buf, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable send(int sockfd, const void *buf, unsigned nbytes,
                     uint32_t flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SEND));
    auto *sqe = get_sqe();
    ::io_uring_prep_send(sqe, sockfd, buf, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable poll_add(int fd, short poll_mask, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_POLL_ADD));
    auto *sqe = get_sqe();
    ::io_uring_prep_poll_add(sqe, fd, poll_mask);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable nop(uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_NOP));
    auto *sqe = get_sqe();
    ::io_uring_prep_nop(sqe);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable accept(int fd, sockaddr *addr, socklen_t *addrlen,
                       int flags = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_ACCEPT));
    auto *sqe = get_sqe();
    ::io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable connect(int fd, sockaddr *addr, socklen_t addrlen,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_CONNECT));
    auto *sqe = get_sqe();
    ::io_uring_prep_connect(sqe, fd, addr, addrlen);
    return await_sqe(sqe, sqe_flags);
  }

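  // A count of 0 makes this a pure timeout: the CQE arrives only once ts
  // expires, rather than after some number of other completions.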
  sqe_awaitable timeout(__kernel_timespec *ts, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_TIMEOUT));
    auto *sqe = get_sqe();
    ::io_uring_prep_timeout(sqe, ts, 0, 0);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable close(int fd, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_CLOSE));
    auto *sqe = get_sqe();
    ::io_uring_prep_close(sqe, fd);
    return await_sqe(sqe, sqe_flags);
  }

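  // Fire-and-forget close: user_data is cleared, so process_cqe() silently
  // drops the completion and nothing is awaited.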
  void close_detach(int fd, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_CLOSE));
    auto *sqe = get_sqe();
    ::io_uring_prep_close(sqe, fd);
    ::io_uring_sqe_set_flags(sqe, sqe_flags);
    ::io_uring_sqe_set_data(sqe, nullptr);
  }

  sqe_awaitable statx(int dfd, const char *path, int flags, unsigned mask,
                      struct statx *statxbuf, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_STATX));
    auto *sqe = get_sqe();
    ::io_uring_prep_statx(sqe, dfd, path, flags, mask, statxbuf);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable splice(int fd_in, loff_t off_in, int fd_out, loff_t off_out,
                       size_t nbytes, unsigned flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SPLICE));
    auto *sqe = get_sqe();
    ::io_uring_prep_splice(sqe, fd_in, off_in, fd_out, off_out, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable tee(int fd_in, int fd_out, size_t nbytes, unsigned flags,
                    uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_TEE));
    auto *sqe = get_sqe();
    ::io_uring_prep_tee(sqe, fd_in, fd_out, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable shutdown(int fd, int how, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SHUTDOWN));
    auto *sqe = get_sqe();
    ::io_uring_prep_shutdown(sqe, fd, how);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable renameat(int olddfd, const char *oldpath, int newdfd,
                         const char *newpath, unsigned flags,
                         uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_RENAMEAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_renameat(sqe, olddfd, oldpath, newdfd, newpath, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable mkdirat(int dirfd, const char *pathname, mode_t mode,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_MKDIRAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_mkdirat(sqe, dirfd, pathname, mode);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable symlinkat(const char *target, int newdirfd,
                          const char *linkpath, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SYMLINKAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_symlinkat(sqe, target, newdirfd, linkpath);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable linkat(int olddirfd, const char *oldpath, int newdirfd,
                       const char *newpath, int flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_LINKAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_linkat(sqe, olddirfd, oldpath, newdirfd, newpath, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable unlinkat(int dfd, const char *path, unsigned flags,
                         uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_UNLINKAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_unlinkat(sqe, dfd, path, flags);
    return await_sqe(sqe, sqe_flags);
  }

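  // Fixed-file and fixed-buffer registration. Registration failures are
  // raised through check_nerrno(); the unregister calls return the raw
  // liburing result instead.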
  void update_files(unsigned off, int *fds, size_t nfds) {
    check_nerrno(::io_uring_register_files_update(&ring_, off, fds, nfds),
                 "failed to update files");
  }

  void register_files(int const *fds, size_t nfds) {
    check_nerrno(::io_uring_register_files(&ring_, fds, nfds),
                 "failed to register files");
  }

  int unregister_files() { return ::io_uring_unregister_files(&ring_); }

  void register_buffers(struct iovec const *iovecs, unsigned nr_iovecs) {
    check_nerrno(::io_uring_register_buffers(&ring_, iovecs, nr_iovecs),
                 "failed to register buffers");
  }

  int unregister_buffers() noexcept {
    return ::io_uring_unregister_buffers(&ring_);
  }

  ~event_loop();
};

} // namespace uringpp

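Below is a minimal usage sketch, not part of the header: it assumes task<T> can carry a void result and that co_await on an sqe_awaitable resumes with the CQE result code, as the rc_ plumbing in process_cqe() suggests.

#include <fcntl.h>

#include "uringpp/event_loop.h"

// Hypothetical example: open a file via the ring, read from it, then
// close it without awaiting the completion.
uringpp::task<void> demo(std::shared_ptr<uringpp::event_loop> loop) {
  int fd = co_await loop->openat(AT_FDCWD, "/etc/hostname", O_RDONLY, 0);
  if (fd >= 0) {
    char buf[256];
    int n = co_await loop->read(fd, buf, sizeof(buf)); // n mirrors cqe->res
    (void)n;
    loop->close_detach(fd); // fire-and-forget close
  }
}

int main() {
  auto loop = uringpp::event_loop::create(); // 128 SQ entries by default
  loop->block_on(demo(loop));
}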