// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]
use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use base::AsRawDescriptor;
use base::EventType;
use base::IoBufMut;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;
use crate::bindings::*;
use crate::syscalls::*;
/// Holds per-operation, user specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;
#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
/// Failed to map the completion ring.
#[error("Failed to mmap completion ring {0}")]
MappingCompleteRing(base::MmapError),
/// Failed to map submit entries.
#[error("Failed to mmap submit entries {0}")]
MappingSubmitEntries(base::MmapError),
/// Failed to map the submit ring.
#[error("Failed to mmap submit ring {0}")]
MappingSubmitRing(base::MmapError),
/// Too many ops are already queued.
#[error("No space for more ring entries, try increasing the size passed to `new`")]
NoSpace,
/// The call to `io_uring_enter` failed with the given errno.
#[error("Failed to enter io uring: {0}")]
RingEnter(libc::c_int),
/// The call to `io_uring_register` failed with the given errno.
#[error("Failed to register operations for io uring: {0}")]
RingRegister(libc::c_int),
/// The call to `io_uring_setup` failed with the given errno.
#[error("Failed to setup io uring {0}")]
Setup(libc::c_int),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
RingEnter(errno) => io::Error::from_raw_os_error(errno),
Setup(errno) => io::Error::from_raw_os_error(errno),
e => io::Error::new(io::ErrorKind::Other, e),
}
}
}
pub struct SubmitQueue {
submit_ring: SubmitQueueState,
submit_queue_entries: SubmitQueueEntries,
submitting: usize, // The number of ops in the process of being submitted.
pub added: usize, // The number of ops added since the last call to `io_uring_enter`.
num_sqes: usize, // The total number of sqes allocated in shared memory.
}
// Helper functions to set io_uring_sqe bindgen union members in a less verbose manner.
impl io_uring_sqe {
pub fn set_addr(&mut self, val: u64) {
self.__bindgen_anon_2.addr = val;
}
pub fn set_off(&mut self, val: u64) {
self.__bindgen_anon_1.off = val;
}
pub fn set_buf_index(&mut self, val: u16) {
self.__bindgen_anon_4.buf_index = val;
}
pub fn set_rw_flags(&mut self, val: libc::c_int) {
self.__bindgen_anon_3.rw_flags = val;
}
pub fn set_poll_events(&mut self, val: u32) {
let val = if cfg!(target_endian = "big") {
// Swap words on big-endian platforms to match the original ABI where poll_events was 16
// bits wide.
val.rotate_left(16)
} else {
val
};
self.__bindgen_anon_3.poll32_events = val;
}
}
// Convert a file offset to the raw io_uring offset format.
// Some => explicit offset
// None => use current file position
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
// File offsets are interpreted as off64_t inside io_uring, with -1 representing the current
// file position.
const USE_CURRENT_FILE_POS: libc::off64_t = -1;
offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}
impl SubmitQueue {
// Call `f` with the next available sqe or return an error if none are available.
// After `f` returns, the sqe is appended to the kernel's queue.
fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
where
F: FnMut(&mut io_uring_sqe),
{
if self.added == self.num_sqes {
return Err(Error::NoSpace);
}
// Find the next free submission entry in the submit ring and fill it in via `f`.
// The below raw pointer derefs are safe because the memory the pointers use lives as long
// as the mmap in self.
let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
let next_tail = tail.wrapping_add(1);
if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
return Err(Error::NoSpace);
}
// `tail` is the next sqe to use.
let index = (tail & self.submit_ring.ring_mask) as usize;
let sqe = self.submit_queue_entries.get_mut(index).unwrap();
f(sqe);
// Tell the kernel which sqe to process for this ring slot; here the slot index and the sqe
// index are the same.
self.submit_ring.set_array_entry(index, index as u32);
// Ensure the above writes to sqe are seen before the tail is updated.
// set_tail uses Release ordering when storing to the ring.
self.submit_ring.pointers.set_tail(next_tail);
self.added += 1;
Ok(())
}
// Returns the number of entries that have been added to this SubmitQueue since the last time
// `prepare_submit` was called.
fn prepare_submit(&mut self) -> usize {
let out = self.added - self.submitting;
self.submitting = self.added;
out
}
// Indicates that we failed to submit `count` entries to the kernel and that they should be
// retried.
fn fail_submit(&mut self, count: usize) {
debug_assert!(count <= self.submitting);
self.submitting -= count;
}
// Indicates that `count` entries have been submitted to the kernel and so the space may be
// reused for new entries.
fn complete_submit(&mut self, count: usize) {
debug_assert!(count <= self.submitting);
self.submitting -= count;
self.added -= count;
}
}
/// Enum to represent all io_uring operations
#[repr(u32)]
pub enum URingOperation {
Nop = io_uring_op_IORING_OP_NOP,
Readv = io_uring_op_IORING_OP_READV,
Writev = io_uring_op_IORING_OP_WRITEV,
Fsync = io_uring_op_IORING_OP_FSYNC,
ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
PollAdd = io_uring_op_IORING_OP_POLL_ADD,
PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
Sendmsg = io_uring_op_IORING_OP_SENDMSG,
Recvmsg = io_uring_op_IORING_OP_RECVMSG,
Timeout = io_uring_op_IORING_OP_TIMEOUT,
TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
Accept = io_uring_op_IORING_OP_ACCEPT,
AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
Connect = io_uring_op_IORING_OP_CONNECT,
Fallocate = io_uring_op_IORING_OP_FALLOCATE,
Openat = io_uring_op_IORING_OP_OPENAT,
Close = io_uring_op_IORING_OP_CLOSE,
FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
Statx = io_uring_op_IORING_OP_STATX,
Read = io_uring_op_IORING_OP_READ,
Write = io_uring_op_IORING_OP_WRITE,
Fadvise = io_uring_op_IORING_OP_FADVISE,
Madvise = io_uring_op_IORING_OP_MADVISE,
Send = io_uring_op_IORING_OP_SEND,
Recv = io_uring_op_IORING_OP_RECV,
Openat2 = io_uring_op_IORING_OP_OPENAT2,
EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
Splice = io_uring_op_IORING_OP_SPLICE,
ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
Tee = io_uring_op_IORING_OP_TEE,
Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
Renameat = io_uring_op_IORING_OP_RENAMEAT,
Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
Linkat = io_uring_op_IORING_OP_LINKAT,
}
/// Represents an allowlist of the restrictions to be registered to a uring.
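///
/// # Example
///
/// A minimal sketch of building an allowlist. This assumes `URingAllowlist` and `URingOperation`
/// are exported from the crate root alongside `URingContext`.
///
/// ```no_run
/// # use io_uring::{URingAllowlist, URingContext, URingOperation};
/// // Only permit readv and writev sqes; the kernel will reject everything else.
/// let mut allowlist = URingAllowlist::new();
/// allowlist
///     .allow_submit_operation(URingOperation::Readv)
///     .allow_submit_operation(URingOperation::Writev);
/// let uring = URingContext::new(16, Some(&allowlist)).unwrap();
/// ```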
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);
impl URingAllowlist {
/// Creates a new `URingAllowlist` that allows no operations.
pub fn new() -> Self {
URingAllowlist::default()
}
/// Allow `operation` to be submitted to the submit queue of the io_uring.
pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
self.0.push(io_uring_restriction {
opcode: IORING_RESTRICTION_SQE_OP as u16,
__bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
sqe_op: operation as u8,
},
..Default::default()
});
self
}
}
/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring
/// .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
/// .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
/// ```
pub struct URingContext {
ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
pub submit_ring: Mutex<SubmitQueue>,
pub complete_ring: CompleteQueueState,
}
impl URingContext {
/// Creates a `URingContext` where the underlying uring has a space for `num_entries`
/// simultaneous operations. If `allowlist` is given, all operations other
/// than those explicitly permitted by `allowlist` are prohibited.
pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
let mut ring_params = io_uring_params::default();
if allowlist.is_some() {
// To register restrictions, a uring must start in a disabled state.
ring_params.flags |= IORING_SETUP_R_DISABLED;
}
// SAFETY:
// The below unsafe block isolates the creation of the URingContext. Each step on its own
// is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
// base addresses maintains safety guarantees assuming the kernel API guarantees are
// trusted.
unsafe {
// Safe because the kernel is trusted to only modify params and `File` is created with
// an FD that it takes complete ownership of.
let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
let ring_file = File::from_raw_fd(fd);
// Register the restrictions if an allowlist was given.
if let Some(restrictions) = allowlist {
// Safe because IORING_REGISTER_RESTRICTIONS does not modify the memory and
// `restrictions` contains a valid pointer and length.
io_uring_register(
fd,
IORING_REGISTER_RESTRICTIONS,
restrictions.0.as_ptr() as *const c_void,
restrictions.0.len() as u32,
)
.map_err(Error::RingRegister)?;
// Enable the rings, since the uring was created in a disabled state.
// Safe because IORING_REGISTER_ENABLE_RINGS does not modify any memory and takes no pointer.
io_uring_register(fd, IORING_REGISTER_ENABLE_RINGS, null::<c_void>(), 0)
.map_err(Error::RingRegister)?;
}
// Mmap the submit and completion queues.
// Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
// is checked.
let submit_ring = SubmitQueueState::new(
MemoryMappingBuilder::new(
ring_params.sq_off.array as usize
+ ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
)
.from_file(&ring_file)
.offset(u64::from(IORING_OFF_SQ_RING))
.protection(Protection::read_write())
.populate()
.build()
.map_err(Error::MappingSubmitRing)?,
&ring_params,
);
let num_sqe = ring_params.sq_entries as usize;
let submit_queue_entries = SubmitQueueEntries {
mmap: MemoryMappingBuilder::new(
ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
)
.from_file(&ring_file)
.offset(u64::from(IORING_OFF_SQES))
.protection(Protection::read_write())
.populate()
.build()
.map_err(Error::MappingSubmitEntries)?,
len: num_sqe,
};
let complete_ring = CompleteQueueState::new(
MemoryMappingBuilder::new(
ring_params.cq_off.cqes as usize
+ ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
)
.from_file(&ring_file)
.offset(u64::from(IORING_OFF_CQ_RING))
.protection(Protection::read_write())
.populate()
.build()
.map_err(Error::MappingCompleteRing)?,
&ring_params,
);
Ok(URingContext {
ring_file,
submit_ring: Mutex::new(SubmitQueue {
submit_ring,
submit_queue_entries,
submitting: 0,
added: 0,
num_sqes: ring_params.sq_entries as usize,
}),
complete_ring,
})
}
}
/// Asynchronously writes to `fd` from the addresses given by the iovecs yielded by `iovecs`.
/// # Safety
/// See `add_writev`; this variant accepts an iterator of iovecs, which is useful when a vector of
/// iovecs doesn't already exist.
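///
/// # Example
///
/// A minimal sketch of writing one buffer to an fd; the file path and user_data value are only
/// illustrative.
///
/// ```no_run
/// # use std::fs::OpenOptions;
/// # use std::os::unix::io::AsRawFd;
/// # use io_uring::URingContext;
/// let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// let mut buf = vec![0xaau8; 4096];
/// let iovec = libc::iovec {
///     iov_base: buf.as_mut_ptr() as *mut libc::c_void,
///     iov_len: buf.len(),
/// };
/// // SAFETY: `buf` is neither dropped nor mutated until the completion for user_data 3 has been
/// // returned from `wait()`.
/// unsafe {
///     uring
///         .add_writev_iter(std::iter::once(iovec), f.as_raw_fd(), None, 3)
///         .unwrap();
/// }
/// let (user_data, _res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 3);
/// ```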
pub unsafe fn add_writev_iter<I>(
&self,
iovecs: I,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()>
where
I: Iterator<Item = libc::iovec>,
{
self.add_writev(
Pin::from(
// Safe because the caller is required to guarantee that the memory pointed to by
// `iovecs` lives until the transaction is complete and the completion has been
// returned from `wait()`.
iovecs
.map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
.collect::<Vec<_>>()
.into_boxed_slice(),
),
fd,
offset,
user_data,
)
}
/// Asynchronously writes to `fd` from the addresses given in `iovecs`.
/// # Safety
/// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the caller
/// guarantees there are no other references to that memory and that the memory lives until the
/// transaction is complete and that completion has been returned from the `wait` function. In
/// addition there must not be any mutable references to the data pointed to by `iovecs` until
/// the operation completes. Ensure that the fd remains open until the op completes as well.
/// The iovecs reference must be kept alive until the op returns.
pub unsafe fn add_writev(
&self,
iovecs: Pin<Box<[IoBufMut<'static>]>>,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
sqe.len = iovecs.len() as u32;
sqe.set_off(file_offset_to_raw_offset(offset));
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.user_data = user_data;
sqe.flags = 0;
sqe.fd = fd;
})?;
self.complete_ring.add_op_data(user_data, iovecs);
Ok(())
}
/// Asynchronously reads from `fd` into the addresses given by the iovecs yielded by `iovecs`.
/// # Safety
/// See `add_readv`; this variant accepts an iterator of iovecs, which is useful when a vector of
/// iovecs doesn't already exist.
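///
/// # Example
///
/// A minimal sketch of reading into one buffer; `/dev/zero` and the user_data value are only
/// illustrative.
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use io_uring::URingContext;
/// let f = File::open("/dev/zero").unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// let mut buf = vec![0u8; 4096];
/// let iovec = libc::iovec {
///     iov_base: buf.as_mut_ptr() as *mut libc::c_void,
///     iov_len: buf.len(),
/// };
/// // SAFETY: `buf` outlives the operation and is not accessed again until the completion for
/// // user_data 7 has been returned from `wait()`.
/// unsafe {
///     uring
///         .add_readv_iter(std::iter::once(iovec), f.as_raw_fd(), Some(0), 7)
///         .unwrap();
/// }
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 7);
/// assert_eq!(res.unwrap(), 4096);
/// ```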
pub unsafe fn add_readv_iter<I>(
&self,
iovecs: I,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()>
where
I: Iterator<Item = libc::iovec>,
{
self.add_readv(
Pin::from(
// Safe because the caller is required to guarantee that the memory pointed to by
// `iovecs` lives until the transaction is complete and the completion has been
// returned from `wait()`.
iovecs
.map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
.collect::<Vec<_>>()
.into_boxed_slice(),
),
fd,
offset,
user_data,
)
}
/// Asynchronously reads from `fd` to the addresses given in `iovecs`.
/// # Safety
/// `add_readv` will write to the address given by `iovecs`. This is only safe if the caller
/// guarantees there are no other references to that memory and that the memory lives until the
/// transaction is complete and that completion has been returned from the `wait` function. In
/// addition there must not be any references to the data pointed to by `iovecs` until the
/// operation completes. Ensure that the fd remains open until the op completes as well.
/// The iovecs reference must be kept alive until the op returns.
pub unsafe fn add_readv(
&self,
iovecs: Pin<Box<[IoBufMut<'static>]>>,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_READV as u8;
sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
sqe.len = iovecs.len() as u32;
sqe.set_off(file_offset_to_raw_offset(offset));
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.user_data = user_data;
sqe.flags = 0;
sqe.fd = fd;
})?;
self.complete_ring.add_op_data(user_data, iovecs);
Ok(())
}
/// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of the
/// io_uring itself and for waking up a thread that's blocked inside a wait() call.
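///
/// # Example
///
/// A minimal sketch; the user_data value 88 is arbitrary.
///
/// ```no_run
/// # use io_uring::URingContext;
/// let uring = URingContext::new(16, None).unwrap();
/// uring.add_nop(88).unwrap();
/// // The nop completes immediately with a result of 0.
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 88);
/// assert_eq!(res.unwrap(), 0);
/// ```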
pub fn add_nop(&self, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
sqe.fd = -1;
sqe.user_data = user_data;
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.set_rw_flags(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
/// Syncs all completed operations; the ordering with respect to in-flight async ops is not
/// defined.
pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
sqe.fd = fd;
sqe.user_data = user_data;
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.set_rw_flags(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
/// See the usage of `fallocate`; this asynchronously performs the same operation.
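///
/// # Example
///
/// A minimal sketch of punching a hole in a file; the path and user_data value are only
/// illustrative.
///
/// ```no_run
/// # use std::fs::OpenOptions;
/// # use std::os::unix::io::AsRawFd;
/// # use io_uring::URingContext;
/// let f = OpenOptions::new().write(true).open("/tmp/sparse-file").unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// // Deallocate the first 4 KiB without changing the file size.
/// let mode = (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u32;
/// uring.add_fallocate(f.as_raw_fd(), 0, 4096, mode, 5).unwrap();
/// let (_user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert!(res.is_ok());
/// ```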
pub fn add_fallocate(
&self,
fd: RawFd,
offset: u64,
len: u64,
mode: u32,
user_data: UserData,
) -> Result<()> {
// Note that the fallocate length is passed in the addr field of the sqe, and the mode is
// passed in the len field.
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;
sqe.fd = fd;
sqe.set_addr(len);
sqe.len = mode;
sqe.set_off(offset);
sqe.user_data = user_data;
sqe.set_buf_index(0);
sqe.set_rw_flags(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
/// Adds an FD to be polled based on the given flags.
/// The user must keep the FD open until the operation completion is returned from
/// `wait`.
/// Note that io_uring poll is always one-shot: after the completion for the fd is returned, the
/// fd must be re-added to receive future events.
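///
/// # Example
///
/// A minimal sketch of re-arming a one-shot poll; `/dev/zero` and the user_data value are only
/// illustrative.
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open("/dev/zero").unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 7).unwrap();
/// for _ in 0..2 {
///     let (user_data, _res) = uring.wait().unwrap().next().unwrap();
///     // The poll is one-shot, so re-add the fd to keep receiving readiness events.
///     uring.add_poll_fd(f.as_raw_fd(), EventType::Read, user_data).unwrap();
/// }
/// ```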
pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
sqe.fd = fd;
sqe.user_data = user_data;
sqe.set_poll_events(events.into());
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
/// Removes an FD that was previously added with `add_poll_fd`.
pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
sqe.fd = fd;
sqe.user_data = user_data;
sqe.set_poll_events(events.into());
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
/// Attempts to cancel an already issued request. `addr` must contain the user_data field of the
/// request that should be cancelled. The cancellation request will complete with one of the
/// following result codes: if the request was found and cancelled, the res field of the cqe will
/// contain 0; if it was not found, res will contain -ENOENT; if it was found but had already
/// started, res will contain -EALREADY, and the request may or may not terminate. In general,
/// requests that are interruptible (like socket IO) will get cancelled, while disk IO requests
/// cannot be cancelled once they have started.
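///
/// # Example
///
/// A minimal sketch; the user_data values 1 and 2 and `/dev/zero` are only illustrative.
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open("/dev/zero").unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 1).unwrap();
/// // Ask the kernel to cancel the op submitted with user_data 1. The cancel op itself completes
/// // with user_data 2 and one of the result codes above, surfaced through the io::Result.
/// uring.async_cancel(1, 2).unwrap();
/// for (_user_data, _res) in uring.wait().unwrap() {
///     // Inspect the results of both the poll op and the cancel op here.
/// }
/// ```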
pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
sqe.user_data = user_data;
sqe.set_addr(addr);
sqe.len = 0;
sqe.fd = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
// Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
// waiting for `wait_nr` operations to complete.
fn enter(&self, wait_nr: u64) -> Result<()> {
let added = self.submit_ring.lock().prepare_submit();
if added == 0 && wait_nr == 0 {
return Ok(());
}
let flags = if wait_nr > 0 {
IORING_ENTER_GETEVENTS
} else {
0
};
let res =
// SAFETY:
// Safe because the only memory modified is in the completion queue.
unsafe { io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags) };
// An EINTR means we did successfully submit the events.
if res.is_ok() || res == Err(libc::EINTR) {
self.submit_ring.lock().complete_submit(added);
} else {
self.submit_ring.lock().fail_submit(added);
}
match res {
Ok(()) => Ok(()),
// EBUSY means that some completed events need to be processed before more can
// be submitted, so wait for some sqes to finish without submitting new ones.
// EINTR means we were interrupted while waiting, so start waiting again.
Err(libc::EBUSY) | Err(libc::EINTR) if wait_nr != 0 => {
loop {
let res =
// SAFETY:
// Safe because the only memory modified is in the completion queue.
unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
if res != Err(libc::EINTR) {
return res.map_err(Error::RingEnter);
}
}
}
Err(e) => Err(Error::RingEnter(e)),
}
}
/// Sends operations added with the `add_*` functions to the kernel.
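///
/// # Example
///
/// A minimal sketch of the submit-now, wait-later pattern; the user_data value is arbitrary.
///
/// ```no_run
/// # use io_uring::URingContext;
/// let uring = URingContext::new(16, None).unwrap();
/// uring.add_nop(1).unwrap();
/// // Hand the prepared sqes to the kernel without blocking for completions.
/// uring.submit().unwrap();
/// // ... later, collect the completion.
/// let (user_data, _res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 1);
/// ```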
pub fn submit(&self) -> Result<()> {
self.enter(0)
}
/// Sends operations added with the `add_*` functions to the kernel and returns an iterator over
/// any completed operations. `wait` blocks until at least one completion is ready. If
/// called without any new events added, this simply waits for any existing events to
/// complete and returns as soon as one or more are ready.
pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
// We only want to wait for events if there aren't already events in the completion queue.
let wait_nr = if self.complete_ring.num_ready() > 0 {
0
} else {
1
};
// The CompletionQueue will iterate all completed ops.
match self.enter(wait_nr) {
Ok(()) => Ok(&self.complete_ring),
// If we cannot submit any more entries then we need to pull stuff out of the completion
// ring, so just return the completion ring. This can only happen when `wait_nr` is 0 so
// we know there are already entries in the completion queue.
Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
Err(e) => Err(e),
}
}
}
impl AsRawFd for URingContext {
fn as_raw_fd(&self) -> RawFd {
self.ring_file.as_raw_fd()
}
}
impl AsRawDescriptor for URingContext {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.ring_file.as_raw_descriptor()
}
}
struct SubmitQueueEntries {
mmap: MemoryMapping,
len: usize,
}
impl SubmitQueueEntries {
fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
if index >= self.len {
return None;
}
// SAFETY:
// Safe because the mutable borrow of self restricts us to one mutable reference at a time and
// we trust that the kernel has returned enough memory in io_uring_setup and mmap.
let mut_ref = unsafe { &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index) };
// Clear any state.
*mut_ref = io_uring_sqe::default();
Some(mut_ref)
}
}
struct SubmitQueueState {
_mmap: MemoryMapping,
pointers: QueuePointers,
ring_mask: u32,
array: AtomicPtr<u32>,
}
impl SubmitQueueState {
// # Safety
// Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
// the params struct passed to io_uring_setup.
unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
let ptr = mmap.as_ptr();
// The casts are safe because a u32 is atomic on all supported architectures and the
// pointers will live until after self is dropped because the mmap is owned.
let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
// This offset is guaranteed to be within the mmap so unwrap the result.
let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
SubmitQueueState {
_mmap: mmap,
pointers: QueuePointers { head, tail },
ring_mask,
array,
}
}
// Sets the kernel's array entry at the given `index` to `value`.
fn set_array_entry(&self, index: usize, value: u32) {
// SAFETY:
// Safe because self being constructed from the correct mmap guarantees that the memory is
// valid to write to.
unsafe {
std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value);
}
}
}
#[derive(Default)]
struct CompleteQueueData {
// For ops that pass in arrays of iovecs, they need to be valid for the duration of the
// operation because the kernel might read them at any time.
pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}
pub struct CompleteQueueState {
mmap: MemoryMapping,
pointers: QueuePointers,
ring_mask: u32,
cqes_offset: u32,
data: Mutex<CompleteQueueData>,
}
impl CompleteQueueState {
/// # Safety
/// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
/// the params struct passed to io_uring_setup.
unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
let ptr = mmap.as_ptr();
let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
CompleteQueueState {
mmap,
pointers: QueuePointers { head, tail },
ring_mask,
cqes_offset: params.cq_off.cqes,
data: Default::default(),
}
}
fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
self.data.lock().pending_op_addrs.insert(user_data, addrs);
}
fn get_cqe(&self, head: u32) -> &io_uring_cqe {
// SAFETY:
// Safe because we trust that the kernel has returned enough memory in io_uring_setup
// and mmap and index is checked within range by the ring_mask.
unsafe {
let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
as *const io_uring_cqe;
let index = head & self.ring_mask;
&*cqes.add(index as usize)
}
}
pub fn num_ready(&self) -> u32 {
let tail = self.pointers.tail(Ordering::Acquire);
let head = self.pointers.head(Ordering::Relaxed);
tail.saturating_sub(head)
}
fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
// Take the lock on self.data first so that 2 threads don't try to pop the same completed op
// from the queue.
let mut data = self.data.lock();
// Safe because the pointers to the atomics are valid and the cqe must be in range
// because the kernel provided mask is applied to the index.
let head = self.pointers.head(Ordering::Relaxed);
// Synchronize the read of tail after the read of head.
if head == self.pointers.tail(Ordering::Acquire) {
return None;
}
let cqe = self.get_cqe(head);
let user_data = cqe.user_data;
let res = cqe.res;
// free the addrs saved for this op.
let _ = data.pending_op_addrs.remove(&user_data);
// Store the new head and ensure the reads above complete before the kernel sees the update to
// head; `set_head` uses `Release` ordering.
let new_head = head.wrapping_add(1);
self.pointers.set_head(new_head);
let io_res = match res {
r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
r => Ok(r as u32),
};
Some((user_data, io_res))
}
}
// Return the completed ops with their result.
impl<'c> Iterator for &'c CompleteQueueState {
type Item = (UserData, std::io::Result<u32>);
fn next(&mut self) -> Option<Self::Item> {
self.pop_front()
}
}
struct QueuePointers {
head: *const AtomicU32,
tail: *const AtomicU32,
}
// SAFETY:
// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
// SAFETY: See safety comments for impl Send
unsafe impl Sync for QueuePointers {}
impl QueuePointers {
// Loads the tail pointer atomically with the given ordering.
fn tail(&self, ordering: Ordering) -> u32 {
// SAFETY:
// Safe because self being constructed from the correct mmap guarantees that the memory is
// valid to read.
unsafe { (*self.tail).load(ordering) }
}
// Stores the new value of the tail in the submit queue. This allows the kernel to start
// processing entries that have been added up until the given tail pointer.
// Always stores with release ordering as that is the only valid way to use the pointer.
fn set_tail(&self, next_tail: u32) {
// SAFETY:
// Safe because self being constructed from the correct mmap guarantees that the memory is
// valid to read and write, and it's used as an atomic to cover mutability concerns.
unsafe { (*self.tail).store(next_tail, Ordering::Release) }
}
// Loads the head pointer atomically with the given ordering.
fn head(&self, ordering: Ordering) -> u32 {
// SAFETY:
// Safe because self being constructed from the correct mmap guarantees that the memory is
// valid to read.
unsafe { (*self.head).load(ordering) }
}
// Stores the new value of the head in the completion queue. This tells the kernel that the
// entries up to the given head pointer have been consumed and their slots can be reused.
// Always stores with release ordering as that is the only valid way to use the pointer.
fn set_head(&self, next_head: u32) {
// SAFETY:
// Safe because self being constructed from the correct mmap guarantees that the memory is
// valid to read and write, and it's used as an atomic to cover mutability concerns.
unsafe { (*self.head).store(next_head, Ordering::Release) }
}
}