#include <bitset>
#include <cassert>
#include <memory>
#include <stdexcept>
#include <unordered_set>

#include <liburing.h>

#include "uringpp/awaitable.h"
#include "uringpp/error.h"
#include "uringpp/task.h"

#include "uringpp/detail/noncopyable.h"

class event_loop : public noncopyable,
                   public std::enable_shared_from_this<event_loop> {
  struct io_uring ring_;
  unsigned int cqe_count_; // CQEs seen but not yet advanced past
  std::unordered_set<feature> supported_features_;
  std::bitset<IORING_OP_LAST> supported_ops_;

  struct io_uring_probe *probe_;

  void probe_ring(struct io_uring *ring);
  std::bitset<IORING_OP_LAST> supported_ops();

  void check_feature(uint32_t features, uint32_t test_bit, enum feature efeat);

  void init_supported_features(struct io_uring_params const &params);

  struct io_uring_sqe *get_sqe() {
    auto sqe = ::io_uring_get_sqe(&ring_);
    if (sqe != nullptr) [[likely]] {
      return sqe;
    }
    // The submission queue is full: reap seen completions, flush pending
    // submissions, and retry once.
    ::io_uring_cq_advance(&ring_, cqe_count_);
    cqe_count_ = 0;
    ::io_uring_submit(&ring_);
    sqe = ::io_uring_get_sqe(&ring_);
    if (sqe == nullptr) [[unlikely]] {
      throw std::runtime_error("failed to allocate sqe");
    }
    return sqe;
  }

  sqe_awaitable await_sqe(struct io_uring_sqe *sqe, uint8_t flags) {
    ::io_uring_sqe_set_flags(sqe, flags);
    // Assumed constructor: bind the awaitable to this sqe; the awaitable's
    // address is attached as user data when it is awaited.
    return sqe_awaitable(sqe);
  }

public:
  static std::shared_ptr<event_loop> create(unsigned int entries = 128,
                                            uint32_t flags = 0, int wq_fd = -1);

  event_loop(unsigned int entries = 128, uint32_t flags = 0, int wq_fd = -1,
             int sq_thread_cpu = -1, int sq_thread_idle = -1);

  template <class T> void block_on(task<T> t) {
    while (!t.h_.done()) {
      poll(); // submit pending SQEs and wait for a completion (see below)
    }
  }

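  // Usage sketch (illustrative, not part of the header): create() builds the
  // loop and block_on() drives a coroutine to completion. It assumes that
  // awaiting an sqe_awaitable yields the CQE result as an int.
  //
  //   task<void> hello(std::shared_ptr<event_loop> loop) {
  //     co_await loop->nop(); // one round-trip through the ring
  //   }
  //
  //   int main() {
  //     auto loop = event_loop::create();
  //     loop->block_on(hello(loop));
  //   }
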
  int fd() const { return ring_.ring_fd; }

  // Drain the completion queue, resuming the coroutine recorded in each
  // CQE's user data.
  int process_cqe() {
    unsigned head;
    struct io_uring_cqe *cqe;
    io_uring_for_each_cqe(&ring_, head, cqe) {
      ++cqe_count_;
      auto awaitable =
          reinterpret_cast<sqe_awaitable *>(::io_uring_cqe_get_data(cqe));
      if (awaitable != nullptr) [[likely]] {
        awaitable->rc_ = cqe->res;
        awaitable->h_.resume();
      }
    }
    int nr_processed = cqe_count_;
    ::io_uring_cq_advance(&ring_, cqe_count_);
    cqe_count_ = 0;
    return nr_processed;
  }

  // Submit pending SQEs without blocking (name assumed for this wrapper).
  int poll_no_wait() {
    ::io_uring_submit(&ring_);
    return process_cqe();
  }

  // Submit pending SQEs and block until at least one completion arrives
  // (name assumed for this wrapper).
  int poll() {
    ::io_uring_submit_and_wait(&ring_, 1);
    return process_cqe();
  }

  sqe_awaitable openat(int dfd, const char *path, int flags, mode_t mode,
                       uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_OPENAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_openat(sqe, dfd, path, flags, mode);
    return await_sqe(sqe, sqe_flags);
  }

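  // Example (sketch; assumes <fcntl.h> and a surrounding task<> coroutine).
  // The awaited value is the CQE result: the new fd, or -errno on failure.
  //
  //   int fd = co_await loop->openat(AT_FDCWD, "data.txt", O_RDONLY, 0);
  //   if (fd < 0) { /* handle -errno */ }
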
  sqe_awaitable openat2(int dfd, const char *path, struct open_how *how,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_OPENAT2));
    auto *sqe = get_sqe();
    ::io_uring_prep_openat2(sqe, dfd, path, how);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable readv(int fd, const iovec *iovecs, unsigned nr_vecs,
                      off_t offset = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_READV));
    auto *sqe = get_sqe();
    ::io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset);
    return await_sqe(sqe, sqe_flags);
  }

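  // Example (sketch): scatter one read across two buffers. The awaited
  // value is the number of bytes read, or -errno on failure.
  //
  //   char hdr[16], body[4096];
  //   iovec iov[2] = {{hdr, sizeof(hdr)}, {body, sizeof(body)}};
  //   int n = co_await loop->readv(fd, iov, 2, /*offset=*/0);
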
  sqe_awaitable writev(int fd, const iovec *iovecs, unsigned nr_vecs,
                       off_t offset = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_WRITEV));
    auto *sqe = get_sqe();
    ::io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable read(int fd, void *buf, unsigned nbytes, off_t offset = 0,
                     uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_READ));
    auto *sqe = get_sqe();
    ::io_uring_prep_read(sqe, fd, buf, nbytes, offset);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable write(int fd, const void *buf, unsigned nbytes,
                      off_t offset = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_WRITE));
    auto *sqe = get_sqe();
    ::io_uring_prep_write(sqe, fd, buf, nbytes, offset);
    return await_sqe(sqe, sqe_flags);
  }

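  // Example (sketch): a copy loop over regular files using read()/write()
  // with an explicit offset, since a fixed offset of 0 would reread the
  // same bytes forever on a seekable file.
  //
  //   char buf[4096];
  //   off_t off = 0;
  //   for (;;) {
  //     int n = co_await loop->read(in_fd, buf, sizeof(buf), off);
  //     if (n <= 0) break;
  //     co_await loop->write(out_fd, buf, n, off);
  //     off += n;
  //   }
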
  sqe_awaitable read_fixed(int fd, void *buf, unsigned nbytes, off_t offset,
                           int buf_index, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_READ_FIXED));
    auto *sqe = get_sqe();
    ::io_uring_prep_read_fixed(sqe, fd, buf, nbytes, offset, buf_index);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable write_fixed(int fd, const void *buf, unsigned nbytes,
                            off_t offset, int buf_index,
                            uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_WRITE_FIXED));
    auto *sqe = get_sqe();
    ::io_uring_prep_write_fixed(sqe, fd, buf, nbytes, offset, buf_index);
    return await_sqe(sqe, sqe_flags);
  }

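  // Example (sketch): the *_fixed variants require the buffer to be
  // registered via register_buffers() first; buf_index selects the
  // registered iovec.
  //
  //   static char block[4096];
  //   iovec reg = {block, sizeof(block)};
  //   loop->register_buffers(&reg, 1);
  //   int n = co_await loop->read_fixed(fd, block, sizeof(block),
  //                                     /*offset=*/0, /*buf_index=*/0);
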
  sqe_awaitable fsync(int fd, unsigned fsync_flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_FSYNC));
    auto *sqe = get_sqe();
    ::io_uring_prep_fsync(sqe, fd, fsync_flags);
    return await_sqe(sqe, sqe_flags);
  }

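  // Example (sketch): flush written data to stable storage. Passing
  // IORING_FSYNC_DATASYNC in fsync_flags gives fdatasync() semantics.
  //
  //   co_await loop->write(fd, buf, n, 0);
  //   co_await loop->fsync(fd, 0); // full fsync
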
  sqe_awaitable sync_file_range(int fd, off64_t offset, off64_t nbytes,
                                unsigned sync_range_flags,
                                uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SYNC_FILE_RANGE));
    auto *sqe = get_sqe();
    ::io_uring_prep_rw(IORING_OP_SYNC_FILE_RANGE, sqe, fd, nullptr, nbytes,
                       offset);
    sqe->sync_range_flags = sync_range_flags;
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable recvmsg(int sockfd, msghdr *msg, uint32_t flags,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_RECVMSG));
    auto *sqe = get_sqe();
    ::io_uring_prep_recvmsg(sqe, sockfd, msg, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable sendmsg(int sockfd, const msghdr *msg, uint32_t flags,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SENDMSG));
    auto *sqe = get_sqe();
    ::io_uring_prep_sendmsg(sqe, sockfd, msg, flags);
    return await_sqe(sqe, sqe_flags);
  }

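  // Example (sketch): recvmsg()/sendmsg() take a fully populated msghdr,
  // so they also cover scatter/gather and ancillary data on sockets.
  //
  //   char buf[1024];
  //   iovec iov = {buf, sizeof(buf)};
  //   msghdr msg = {};
  //   msg.msg_iov = &iov;
  //   msg.msg_iovlen = 1;
  //   int n = co_await loop->recvmsg(sockfd, &msg, 0);
  //   if (n > 0) {
  //     iov.iov_len = n; // echo back only the bytes received
  //     co_await loop->sendmsg(sockfd, &msg, 0);
  //   }
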
  sqe_awaitable recv(int sockfd, void *buf, unsigned nbytes, uint32_t flags,
                     uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_RECV));
    auto *sqe = get_sqe();
    ::io_uring_prep_recv(sqe, sockfd, buf, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable send(int sockfd, const void *buf, unsigned nbytes,
                     uint32_t flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SEND));
    auto *sqe = get_sqe();
    ::io_uring_prep_send(sqe, sockfd, buf, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

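  // Example (sketch): byte-stream echo with recv()/send(). A recv result
  // of 0 means the peer closed the connection; short sends are ignored
  // here for brevity.
  //
  //   char buf[4096];
  //   for (;;) {
  //     int n = co_await loop->recv(conn_fd, buf, sizeof(buf), 0);
  //     if (n <= 0) break;
  //     co_await loop->send(conn_fd, buf, n, 0);
  //   }
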
  sqe_awaitable poll_add(int fd, short poll_mask, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_POLL_ADD));
    auto *sqe = get_sqe();
    ::io_uring_prep_poll_add(sqe, fd, poll_mask);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable nop(uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_NOP));
    auto *sqe = get_sqe();
    ::io_uring_prep_nop(sqe);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable accept(int fd, sockaddr *addr, socklen_t *addrlen,
                       int flags = 0, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_ACCEPT));
    auto *sqe = get_sqe();
    ::io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
    return await_sqe(sqe, sqe_flags);
  }

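  // Example (sketch): an accept loop; `listen_fd` is assumed to be a bound,
  // listening socket set up with the usual socket()/bind()/listen() calls.
  //
  //   for (;;) {
  //     int conn = co_await loop->accept(listen_fd, nullptr, nullptr);
  //     if (conn < 0) break;
  //     // spawn a task to serve `conn`...
  //   }
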
  sqe_awaitable connect(int fd, sockaddr *addr, socklen_t addrlen,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_CONNECT));
    auto *sqe = get_sqe();
    ::io_uring_prep_connect(sqe, fd, addr, addrlen);
    return await_sqe(sqe, sqe_flags);
  }

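  // Example (sketch; assumes <netinet/in.h> and <arpa/inet.h>):
  //
  //   sockaddr_in addr = {};
  //   addr.sin_family = AF_INET;
  //   addr.sin_port = htons(8080);
  //   ::inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
  //   int rc = co_await loop->connect(
  //       sockfd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr));
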
  sqe_awaitable timeout(__kernel_timespec *ts, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_TIMEOUT));
    auto *sqe = get_sqe();
    ::io_uring_prep_timeout(sqe, ts, 0, 0);
    return await_sqe(sqe, sqe_flags);
  }

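  // Example (sketch): an asynchronous sleep. The timer's completion carries
  // -ETIME, which is the expected "timer fired" result, not an error. `ts`
  // must stay alive until the completion arrives, as it does here.
  //
  //   __kernel_timespec ts{.tv_sec = 1, .tv_nsec = 0};
  //   co_await loop->timeout(&ts); // resumes about one second later
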
  sqe_awaitable close(int fd, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_CLOSE));
    auto *sqe = get_sqe();
    ::io_uring_prep_close(sqe, fd);
    return await_sqe(sqe, sqe_flags);
  }

  void close_detach(int fd, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_CLOSE));
    auto *sqe = get_sqe();
    ::io_uring_prep_close(sqe, fd);
    ::io_uring_sqe_set_flags(sqe, sqe_flags);
    // Null user data: the completion is reaped but resumes no coroutine,
    // so the close runs fire-and-forget.
    ::io_uring_sqe_set_data(sqe, nullptr);
  }

  sqe_awaitable statx(int dfd, const char *path, int flags, unsigned mask,
                      struct statx *statxbuf, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_STATX));
    auto *sqe = get_sqe();
    ::io_uring_prep_statx(sqe, dfd, path, flags, mask, statxbuf);
    return await_sqe(sqe, sqe_flags);
  }

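  // Example (sketch; assumes <sys/stat.h>): fetch a file's size without
  // blocking the thread. `use` is a placeholder for the caller's logic.
  //
  //   struct statx stx;
  //   int rc = co_await loop->statx(AT_FDCWD, "data.txt", 0,
  //                                 STATX_SIZE, &stx);
  //   if (rc == 0) use(stx.stx_size);
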
  sqe_awaitable splice(int fd_in, loff_t off_in, int fd_out, loff_t off_out,
                       size_t nbytes, unsigned flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SPLICE));
    auto *sqe = get_sqe();
    ::io_uring_prep_splice(sqe, fd_in, off_in, fd_out, off_out, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

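  // Example (sketch): file-to-socket transfer staged through a pipe, since
  // splice requires one side to be a pipe. An offset of -1 means "no
  // offset" (pipes) or "use the current file position".
  //
  //   int pipefd[2];
  //   ::pipe(pipefd);
  //   int n = co_await loop->splice(file_fd, 0, pipefd[1], -1, 4096, 0);
  //   if (n > 0) co_await loop->splice(pipefd[0], -1, sock_fd, -1, n, 0);
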
  sqe_awaitable tee(int fd_in, int fd_out, size_t nbytes, unsigned flags,
                    uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_TEE));
    auto *sqe = get_sqe();
    ::io_uring_prep_tee(sqe, fd_in, fd_out, nbytes, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable shutdown(int fd, int how, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SHUTDOWN));
    auto *sqe = get_sqe();
    ::io_uring_prep_shutdown(sqe, fd, how);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable renameat(int olddfd, const char *oldpath, int newdfd,
                         const char *newpath, unsigned flags,
                         uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_RENAMEAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_renameat(sqe, olddfd, oldpath, newdfd, newpath, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable mkdirat(int dirfd, const char *pathname, mode_t mode,
                        uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_MKDIRAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_mkdirat(sqe, dirfd, pathname, mode);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable symlinkat(const char *target, int newdirfd,
                          const char *linkpath, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_SYMLINKAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_symlinkat(sqe, target, newdirfd, linkpath);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable linkat(int olddirfd, const char *oldpath, int newdirfd,
                       const char *newpath, int flags, uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_LINKAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_linkat(sqe, olddirfd, oldpath, newdirfd, newpath, flags);
    return await_sqe(sqe, sqe_flags);
  }

  sqe_awaitable unlinkat(int dfd, const char *path, unsigned flags,
                         uint8_t sqe_flags = 0) {
    assert(supported_ops_.test(IORING_OP_UNLINKAT));
    auto *sqe = get_sqe();
    ::io_uring_prep_unlinkat(sqe, dfd, path, flags);
    return await_sqe(sqe, sqe_flags);
  }

  void update_files(unsigned off, int *fds, size_t nfds) {
    check_nerrno(::io_uring_register_files_update(&ring_, off, fds, nfds),
                 "failed to update files");
  }

  void register_files(int const *fds, size_t nfds) {
    check_nerrno(::io_uring_register_files(&ring_, fds, nfds),
                 "failed to register files");
  }

  int unregister_files() { return ::io_uring_unregister_files(&ring_); }

  void register_buffers(struct iovec const *iovecs, unsigned nr_iovecs) {
    check_nerrno(::io_uring_register_buffers(&ring_, iovecs, nr_iovecs),
                 "failed to register buffers");
  }

  int unregister_buffers() noexcept {
    return ::io_uring_unregister_buffers(&ring_);
  }
};