io_uring/uring.rs

// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

use base::AsRawDescriptor;
use base::EventType;
use base::IoBufMut;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::bindings::*;
use crate::syscalls::*;

/// Holds per-operation, user-specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to map the completion ring.
    #[error("Failed to mmap completion ring: {0}")]
    MappingCompleteRing(base::MmapError),
    /// Failed to map submit entries.
    #[error("Failed to mmap submit entries: {0}")]
    MappingSubmitEntries(base::MmapError),
    /// Failed to map the submit ring.
    #[error("Failed to mmap submit ring: {0}")]
    MappingSubmitRing(base::MmapError),
    /// Too many ops are already queued.
    #[error("No space for more ring entries, try increasing the size passed to `new`")]
    NoSpace,
    /// The call to `io_uring_enter` failed with the given errno.
    #[error("Failed to enter io uring: {0}")]
    RingEnter(libc::c_int),
    /// The call to `io_uring_register` failed with the given errno.
    #[error("Failed to register operations for io uring: {0}")]
    RingRegister(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    #[error("Failed to setup io uring: {0}")]
    Setup(libc::c_int),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            RingEnter(errno) => io::Error::from_raw_os_error(errno),
            Setup(errno) => io::Error::from_raw_os_error(errno),
            e => io::Error::other(e),
        }
    }
}

pub struct SubmitQueue {
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    submitting: usize, // The number of ops in the process of being submitted.
    pub added: usize,  // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize,   // The total number of sqes allocated in shared memory.
}

// Helper functions to set io_uring_sqe bindgen union members in a less verbose manner.
impl io_uring_sqe {
    pub fn set_addr(&mut self, val: u64) {
        self.__bindgen_anon_2.addr = val;
    }
    pub fn set_off(&mut self, val: u64) {
        self.__bindgen_anon_1.off = val;
    }

    pub fn set_buf_index(&mut self, val: u16) {
        self.__bindgen_anon_4.buf_index = val;
    }

    pub fn set_rw_flags(&mut self, val: libc::c_int) {
        self.__bindgen_anon_3.rw_flags = val;
    }

    pub fn set_poll_events(&mut self, val: u32) {
        let val = if cfg!(target_endian = "big") {
            // Swap words on big-endian platforms to match the original ABI where poll_events was 16
            // bits wide.
            val.rotate_left(16)
        } else {
            val
        };
        self.__bindgen_anon_3.poll32_events = val;
    }
}

// Convert a file offset to the raw io_uring offset format.
// Some => explicit offset
// None => use current file position
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
    // File offsets are interpreted as off64_t inside io_uring, with -1 representing the current
    // file position.
    const USE_CURRENT_FILE_POS: libc::off64_t = -1;
    offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}

impl SubmitQueue {
    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it via the callback.
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe);

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

    // Returns the number of entries that have been added to this SubmitQueue since the last time
    // `prepare_submit` was called.
    fn prepare_submit(&mut self) -> usize {
        let out = self.added - self.submitting;
        self.submitting = self.added;

        out
    }

    // Indicates that we failed to submit `count` entries to the kernel and that they should be
    // retried.
    fn fail_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
    }

    // Indicates that `count` entries have been submitted to the kernel and so the space may be
    // reused for new entries.
    fn complete_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
        self.added -= count;
    }
}

/// Enum to represent all io_uring operations.
#[repr(u32)]
pub enum URingOperation {
    Nop = io_uring_op_IORING_OP_NOP,
    Readv = io_uring_op_IORING_OP_READV,
    Writev = io_uring_op_IORING_OP_WRITEV,
    Fsync = io_uring_op_IORING_OP_FSYNC,
    ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
    WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
    PollAdd = io_uring_op_IORING_OP_POLL_ADD,
    PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
    SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
    Sendmsg = io_uring_op_IORING_OP_SENDMSG,
    Recvmsg = io_uring_op_IORING_OP_RECVMSG,
    Timeout = io_uring_op_IORING_OP_TIMEOUT,
    TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
    Accept = io_uring_op_IORING_OP_ACCEPT,
    AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
    LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
    Connect = io_uring_op_IORING_OP_CONNECT,
    Fallocate = io_uring_op_IORING_OP_FALLOCATE,
    Openat = io_uring_op_IORING_OP_OPENAT,
    Close = io_uring_op_IORING_OP_CLOSE,
    FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
    Statx = io_uring_op_IORING_OP_STATX,
    Read = io_uring_op_IORING_OP_READ,
    Write = io_uring_op_IORING_OP_WRITE,
    Fadvise = io_uring_op_IORING_OP_FADVISE,
    Madvise = io_uring_op_IORING_OP_MADVISE,
    Send = io_uring_op_IORING_OP_SEND,
    Recv = io_uring_op_IORING_OP_RECV,
    Openat2 = io_uring_op_IORING_OP_OPENAT2,
    EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
    Splice = io_uring_op_IORING_OP_SPLICE,
    ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
    RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
    Tee = io_uring_op_IORING_OP_TEE,
    Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
    Renameat = io_uring_op_IORING_OP_RENAMEAT,
    Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
    Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
    Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
    Linkat = io_uring_op_IORING_OP_LINKAT,
}

/// Represents an allowlist of the restrictions to be registered to a uring.
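///
/// A minimal sketch of registering an allowlist (this assumes `URingAllowlist` and
/// `URingOperation` are exported from the crate root like `URingContext`; the entry count and
/// allowed op are only illustrative):
///
/// ```no_run
/// # use io_uring::{URingAllowlist, URingContext, URingOperation};
/// let mut allowlist = URingAllowlist::new();
/// allowlist.allow_submit_operation(URingOperation::Nop);
/// // Once registered, the kernel rejects submissions of any opcode not in the allowlist.
/// let uring = URingContext::new(16, Some(&allowlist)).unwrap();
/// ```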
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);

impl URingAllowlist {
    /// Create a new `URingAllowlist` which allows no operation.
    pub fn new() -> Self {
        URingAllowlist::default()
    }

    /// Allow `operation` to be submitted to the submit queue of the io_uring.
    pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
        self.0.push(io_uring_restriction {
            opcode: io_uring_register_restriction_op_IORING_RESTRICTION_SQE_OP as u16,
            __bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
                sqe_op: operation as u8,
            },
            ..Default::default()
        });
        self
    }
}

/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example: polling an FD for readable status
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring
///     .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
///     .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    pub submit_ring: Mutex<SubmitQueue>,
    pub complete_ring: CompleteQueueState,
}

impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations. If `allowlist` is given, all operations other
    /// than those explicitly permitted by `allowlist` are prohibited.
    pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
        let mut ring_params = io_uring_params::default();
        if allowlist.is_some() {
            // To register restrictions, a uring must start in a disabled state.
            ring_params.flags |= IORING_SETUP_R_DISABLED;
        }

        // SAFETY:
        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Register the restrictions if an allowlist was given.
            if let Some(restrictions) = allowlist {
                // Safe because IORING_REGISTER_RESTRICTIONS does not modify the memory and
                // `restrictions` contains a valid pointer and length.
                io_uring_register(
                    fd,
                    io_uring_register_op_IORING_REGISTER_RESTRICTIONS,
                    restrictions.0.as_ptr() as *const c_void,
                    restrictions.0.len() as u32,
                )
                .map_err(Error::RingRegister)?;

                // Enable the URingContext since it was started in a disabled state.
                // Safe because IORING_REGISTER_ENABLE_RINGS does not modify the memory.
                io_uring_register(
                    fd,
                    io_uring_register_op_IORING_REGISTER_ENABLE_RINGS,
                    null::<c_void>(),
                    0,
                )
                .map_err(Error::RingRegister)?;
            }

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
            // is checked.
            let submit_ring = SubmitQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMappingBuilder::new(
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQES))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_CQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring: Mutex::new(SubmitQueue {
                    submit_ring,
                    submit_queue_entries,
                    submitting: 0,
                    added: 0,
                    num_sqes: ring_params.sq_entries as usize,
                }),
                complete_ring,
            })
        }
    }

    /// # Safety
    /// See `add_writev`, but accepts an iterator of iovecs for use when a vector doesn't already
    /// exist.
    pub unsafe fn add_writev_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function. In
    /// addition there must not be any mutable references to the data pointed to by `iovecs` until
    /// the operation completes. Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op completes.
    pub unsafe fn add_writev(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// # Safety
    /// See `add_readv`, but accepts an iterator of iovecs for use when a vector doesn't already
    /// exist.
    pub unsafe fn add_readv_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the addresses given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function. In
    /// addition there must not be any references to the data pointed to by `iovecs` until the
    /// operation completes. Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op completes.
    pub unsafe fn add_readv(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_READV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of the
    /// io_uring itself and for waking up a thread that's blocked inside a wait() call.
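    ///
    /// A minimal sketch (the `user_data` value 77 is arbitrary):
    ///
    /// ```no_run
    /// # use io_uring::URingContext;
    /// let uring = URingContext::new(16, None).unwrap();
    /// uring.add_nop(77).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 77);
    /// assert_eq!(res.unwrap(), 0); // A NOP completes with a result of 0.
    /// ```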
    pub fn add_nop(&self, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
            sqe.fd = -1;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Syncs all completed operations; the ordering with respect to in-flight async ops is not
    /// defined.
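    ///
    /// A minimal sketch, assuming a writable file (the path and `user_data` are illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::create("/tmp/example").unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// uring.add_fsync(f.as_raw_fd(), 7).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 7);
    /// res.unwrap(); // 0 on success.
    /// ```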
    pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
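    ///
    /// A minimal sketch that reserves 1 MiB in a file (the path, mode 0, and `user_data` are
    /// illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::create("/tmp/example").unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// // Mode 0 is the default fallocate behavior: allocate the range, extending the file if needed.
    /// uring.add_fallocate(f.as_raw_fd(), 0, 1 << 20, 0, 33).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 33);
    /// res.unwrap();
    /// ```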
    pub fn add_fallocate(
        &self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that len for fallocate is passed in the addr field of the sqe and the mode uses the
        // len field.
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.set_addr(len);
            sqe.len = mode;
            sqe.set_off(offset);
            sqe.user_data = user_data;

            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring is always a one-shot poll. After the fd is returned, it must be re-added
    /// to get future events.
    pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Removes an FD that was previously added with `add_poll_fd`.
    pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Attempt to cancel an already issued request. `addr` must contain the `user_data` field of the
    /// request that should be cancelled. The cancellation request will complete with one of the
    /// following result codes. If found, the res field of the cqe will contain 0. If not found,
    /// res will contain -ENOENT. If found and cancellation was attempted, the res field will contain
    /// -EALREADY. In this case, the request may or may not terminate. In general, requests that
    /// are interruptible (like socket IO) will get cancelled, while disk IO requests cannot be
    /// cancelled if already started.
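    ///
    /// A minimal sketch that cancels a previously added poll (the fd, the `user_data` values, and
    /// the expectation that the poll is still pending are illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use base::EventType;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 1).unwrap();
    /// uring.submit().unwrap();
    /// // Ask the kernel to cancel the op whose user_data is 1; the cancel op itself is 2.
    /// uring.async_cancel(1, 2).unwrap();
    /// for (user_data, res) in uring.wait().unwrap() {
    ///     println!("op {} completed with {:?}", user_data, res);
    /// }
    /// ```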
    pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
            sqe.user_data = user_data;
            sqe.set_addr(addr);

            sqe.len = 0;
            sqe.fd = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
    // waiting for `wait_nr` operations to complete.
    fn enter(&self, wait_nr: u64) -> Result<()> {
        let added = self.submit_ring.lock().prepare_submit();
        if added == 0 && wait_nr == 0 {
            return Ok(());
        }

        let flags = if wait_nr > 0 {
            IORING_ENTER_GETEVENTS
        } else {
            0
        };
        let res =
            // SAFETY:
            // Safe because the only memory modified is in the completion queue.
            unsafe { io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags) };

        // An EINTR means we did successfully submit the events.
        if res.is_ok() || res == Err(libc::EINTR) {
            self.submit_ring.lock().complete_submit(added);
        } else {
            self.submit_ring.lock().fail_submit(added);
        }

        match res {
            Ok(()) => Ok(()),
            // EBUSY means that some completed events need to be processed before more can
            // be submitted, so wait for some sqes to finish without submitting new ones.
            // EINTR means we were interrupted while waiting, so start waiting again.
            Err(libc::EBUSY) | Err(libc::EINTR) if wait_nr != 0 => {
                loop {
                    let res =
                        // SAFETY:
                        // Safe because the only memory modified is in the completion queue.
                        unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
                    if res != Err(libc::EINTR) {
                        return res.map_err(Error::RingEnter);
                    }
                }
            }
            Err(e) => Err(Error::RingEnter(e)),
        }
    }

    /// Sends operations added with the `add_*` functions to the kernel.
    pub fn submit(&self) -> Result<()> {
        self.enter(0)
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator over
    /// any completed operations. `wait` blocks until at least one completion is ready. If
    /// called without any new events added, this simply waits for any existing events to
    /// complete and returns as soon as one or more are ready.
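    ///
    /// A minimal sketch of draining every ready completion after queueing several ops (the entry
    /// count and `user_data` values are illustrative):
    ///
    /// ```no_run
    /// # use io_uring::URingContext;
    /// let uring = URingContext::new(16, None).unwrap();
    /// for i in 0..4 {
    ///     uring.add_nop(i).unwrap();
    /// }
    /// for (user_data, res) in uring.wait().unwrap() {
    ///     println!("op {} completed with {:?}", user_data, res);
    /// }
    /// ```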
    pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
        // We only want to wait for events if there aren't already events in the completion queue.
        let wait_nr = if self.complete_ring.num_ready() > 0 {
            0
        } else {
            1
        };

        // The CompletionQueue will iterate all completed ops.
        match self.enter(wait_nr) {
            Ok(()) => Ok(&self.complete_ring),
            // If we cannot submit any more entries then we need to pull stuff out of the completion
            // ring, so just return the completion ring. This can only happen when `wait_nr` is 0 so
            // we know there are already entries in the completion queue.
            Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
            Err(e) => Err(e),
        }
    }
}

impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

impl AsRawDescriptor for URingContext {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ring_file.as_raw_descriptor()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        // SAFETY:
        // Safe because the mut borrow of self restricts us to one mutable reference at a time and
        // we trust that the kernel has returned enough memory in io_uring_setup and mmap.
        let mut_ref = unsafe { &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index) };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: AtomicPtr<u32>,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Casts are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to write to.
        unsafe {
            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value);
        }
    }
}

#[derive(Default)]
struct CompleteQueueData {
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}

pub struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    data: Mutex<CompleteQueueData>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            data: Default::default(),
        }
    }

    fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
        self.data.lock().pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        // SAFETY:
        // Safe because we trust that the kernel has returned enough memory in io_uring_setup
        // and mmap and index is checked within range by the ring_mask.
        unsafe {
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    pub fn num_ready(&self) -> u32 {
        let tail = self.pointers.tail(Ordering::Acquire);
        let head = self.pointers.head(Ordering::Relaxed);

        tail.saturating_sub(head)
    }

    fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
        // Take the lock on self.data first so that 2 threads don't try to pop the same completed op
        // from the queue.
        let mut data = self.data.lock();

        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = data.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

// Return the completed ops with their result.
impl Iterator for &CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

// SAFETY:
// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
// SAFETY: See safety comments for impl Send
unsafe impl Sync for QueuePointers {}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head in the completion queue. This tells the kernel that the
    // entries up to (but not including) the given head have been consumed and may be reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}