cros_async/sys/linux/uring_executor.rs

// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// TODO: Move this doc to one of the public APIs; it isn't io_uring specific.

//! `UringReactor`
//!
//! ## Read/Write buffer management.
//!
//! There are two key issues when managing asynchronous IO buffers in Rust.
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
//!    not have any references to it during that time.
//! 2) The memory must remain valid as long as the kernel has a reference to it.
//!
//! ### The kernel's mutable borrow of the buffer
//!
//! Because the buffers used for read and write must be passed to the kernel for an unknown
//! duration, the functions must maintain ownership of the memory.  The core of this problem is that
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
//! future has a reference to.  The buffer can be modified at any point from submission until the
//! operation completes. The operation can't be synchronously canceled when the future is dropped,
//! and Drop can't be used for safety guarantees. To ensure the kernel never mutates memory that
//! Rust also borrows, only memory that implements `BackingMemory` is accepted.  For implementors
//! of `BackingMemory` the mutable borrow isn't an issue because they are already OK with external
//! modifications to the memory (like a `VolatileSlice`).
//!
//! ### Buffer lifetime
//!
//! What if the kernel's reference to the buffer outlives the buffer itself?  This could happen if
//! a read operation was submitted and then the memory was dropped.  To solve this, the executor
//! takes an Arc to the backing memory. Vecs being read to are also wrapped in an Arc before being
//! passed to the executor.  The executor holds the Arc and ensures all operations are complete
//! before dropping it, which guarantees the memory is valid for the duration.
//!
//! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
//! dropped (we can't rely on drop running), there is no way to ensure the kernel's buffer remains
//! valid until the operation completes unless the executor holds an Arc to the memory on the heap.
//!
//! ## Using `Vec` for reads/writes.
//!
//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
//! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an Arc to
//! ensure it lives long enough.
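//!
//! ## Example
//!
//! A minimal sketch of the ownership flow described above, mirroring the unit tests at the bottom
//! of this file. `register_source` is crate-internal and `file` stands in for any
//! `AsRawDescriptor` source, so this is illustrative rather than a public-API example:
//!
//! ```ignore
//! // The buffer lives on the heap behind an Arc so the kernel's borrow can outlive the future.
//! let bm = Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
//!
//! let ex = RawExecutor::<UringReactor>::new()?;
//! let source = ex.reactor.register_source(&ex, &file)?;
//!
//! // The reactor clones the Arc, so dropping the returned future can't invalidate the buffer
//! // while the kernel still references it.
//! let op = source.start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])?;
//! let bytes_read = ex.run_until(op)??;
//! ```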

use std::convert::TryInto;
use std::ffi::CStr;
use std::fs::File;
use std::future::Future;
use std::io;
use std::mem;
use std::mem::MaybeUninit;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::thread;
use std::thread::ThreadId;

use base::trace;
use base::warn;
use base::AsRawDescriptor;
use base::EventType;
use base::IoBufMut;
use base::RawDescriptor;
use io_uring::URingAllowlist;
use io_uring::URingContext;
use io_uring::URingOperation;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::common_executor::RawExecutor;
use crate::common_executor::RawTaskHandle;
use crate::common_executor::Reactor;
use crate::mem::BackingMemory;
use crate::waker::WakerToken;
use crate::waker::WeakWake;
use crate::AsyncError;
use crate::AsyncResult;
use crate::IoSource;
use crate::MemRegion;
use crate::TaskHandle;
#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Creating a context to wait on FDs failed.
    #[error("Error creating the fd waiting context: {0}")]
    CreatingContext(io_uring::Error),
    /// Failed to discard a block.
    #[error("Failed to discard a block: {0}")]
    Discard(base::Error),
    /// Failed to copy the FD for the polling context.
    #[error("Failed to copy the FD for the polling context: {0}")]
    DuplicatingFd(base::Error),
    /// Enabling a context failed.
    #[error("Error enabling the URing context: {0}")]
    EnablingContext(io_uring::Error),
    /// The Executor is gone.
    #[error("The executor is gone")]
    ExecutorGone,
    /// Invalid offset or length given for an iovec in backing memory.
    #[error("Invalid offset/len for getting an iovec")]
    InvalidOffset,
    /// Invalid FD source specified.
    #[error("Invalid source, FD not registered for use")]
    InvalidSource,
    /// Error doing the IO.
    #[error("Error during IO: {0}")]
    Io(io::Error),
    /// Registering operation restrictions to a uring failed.
    #[error("Error registering restrictions to the URing context: {0}")]
    RegisteringURingRestriction(io_uring::Error),
    /// Failed to remove the waker from the polling context.
    #[error("Error removing from the URing context: {0}")]
    RemovingWaker(io_uring::Error),
    /// Failed to submit the operation to the polling context.
    #[error("Error adding to the URing context: {0}")]
    SubmittingOp(io_uring::Error),
    /// URingContext failure.
    #[error("URingContext failure: {0}")]
    URingContextError(io_uring::Error),
    /// Failed to submit or wait for io_uring events.
    #[error("URing::enter: {0}")]
    URingEnter(io_uring::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            Discard(e) => e.into(),
            DuplicatingFd(e) => e.into(),
            ExecutorGone => io::Error::other(ExecutorGone),
            InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
            InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
            Io(e) => e,
            CreatingContext(e) => e.into(),
            RemovingWaker(e) => e.into(),
            SubmittingOp(e) => e.into(),
            URingContextError(e) => e.into(),
            URingEnter(e) => e.into(),
            EnablingContext(e) => e.into(),
            RegisteringURingRestriction(e) => e.into(),
        }
    }
}

impl From<Error> for AsyncError {
    fn from(e: Error) -> AsyncError {
        AsyncError::SysVariants(e.into())
    }
}
static IS_URING_STABLE: LazyLock<bool> = LazyLock::new(|| {
    let mut utsname = MaybeUninit::zeroed();

    // SAFETY:
    // Safe because this will only modify `utsname` and we check the return value.
    let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
    if res < 0 {
        return false;
    }

    // SAFETY:
    // Safe because the kernel has initialized `utsname`.
    let utsname = unsafe { utsname.assume_init() };

    // SAFETY:
    // Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
    let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };

    let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
        Ok(c) => c,
        Err(_) => return false,
    };

    // Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
    match (components.next(), components.next()) {
        (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
            // The kernel version is new enough so check if we can actually make a uring context.
            URingContext::new(8, None).is_ok()
        }
        _ => false,
    }
});

/// Checks if the uring executor is stable.
/// Caches the result so that the check is only run once.
/// Useful for falling back to the FD executor on pre-uring kernels.
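///
/// # Example
///
/// A sketch of the fallback pattern this check enables (illustrative only; the fd-based fallback
/// executor mentioned in the comment lives elsewhere in this crate):
///
/// ```ignore
/// if is_uring_stable() {
///     let ex = RawExecutor::<UringReactor>::new()?;
///     // ... spawn tasks on the uring-backed executor ...
/// } else {
///     // Fall back to the fd-based (epoll) executor on pre-uring kernels.
/// }
/// ```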
pub fn is_uring_stable() -> bool {
    *IS_URING_STABLE
}

/// Checks uring availability by attempting to create a uring context.
/// If uring creation succeeds, it returns `Ok(())`; otherwise it returns an `URingContextError`.
/// It fails if the kernel does not support io_uring, though that is not the only possible cause.
pub(crate) fn check_uring_availability() -> Result<()> {
    URingContext::new(8, None)
        .map(drop)
        .map_err(Error::URingContextError)
}

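/// A file descriptor registered with a `UringReactor`, used to start asynchronous uring
/// operations against that descriptor. Dropping a `RegisteredSource` deregisters it from the
/// reactor; operations already submitted keep their own `Arc` to the duped file and run to
/// completion.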
pub struct RegisteredSource {
    tag: usize,
    ex: Weak<RawExecutor<UringReactor>>,
}

impl RegisteredSource {
    pub fn start_read_to_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: impl IntoIterator<Item = MemRegion>,
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex
            .reactor
            .submit_read_to_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_write_from_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: impl IntoIterator<Item = MemRegion>,
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex
            .reactor
            .submit_write_from_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.reactor.submit_fallocate(self, offset, len, mode)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fsync(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.reactor.submit_fsync(self)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
        let events = EventType::Read;

        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.reactor.submit_poll(self, events)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }
}

impl Drop for RegisteredSource {
    fn drop(&mut self) {
        if let Some(ex) = self.ex.upgrade() {
            ex.reactor.deregister_source(self);
        }
    }
}

// Number of entries in the ring.
const NUM_ENTRIES: usize = 256;

// An operation that has been submitted to the uring and is potentially being waited on.
struct OpData {
    _file: Arc<File>,
    _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
    waker: Option<Waker>,
    canceled: bool,
}

// The current status of an operation that's been submitted to the uring.
enum OpStatus {
    Nop,
    Pending(OpData),
    Completed(Option<::std::io::Result<u32>>),
}

struct Ring {
    ops: Slab<OpStatus>,
    registered_sources: Slab<Arc<File>>,
}

/// `Reactor` that manages async IO work using io_uring.
pub struct UringReactor {
    // The URingContext needs to be first so that it is dropped first, closing the uring fd, and
    // releasing the resources borrowed by the kernel before we free them.
    ctx: URingContext,
    ring: Mutex<Ring>,
    thread_id: Mutex<Option<ThreadId>>,
}

impl UringReactor {
    fn new() -> Result<UringReactor> {
        // To enhance security, allow only the operations that the UringReactor actually submits.
        let mut restrictions = URingAllowlist::new();
        let ops = [
            URingOperation::Writev,
            URingOperation::Readv,
            URingOperation::Nop,
            URingOperation::Fsync,
            URingOperation::Fallocate,
            URingOperation::PollAdd,
            URingOperation::PollRemove,
            URingOperation::AsyncCancel,
        ];
        for op in ops {
            restrictions.allow_submit_operation(op);
        }

        let ctx =
            URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;

        Ok(UringReactor {
            ctx,
            ring: Mutex::new(Ring {
                ops: Slab::with_capacity(NUM_ENTRIES),
                registered_sources: Slab::with_capacity(NUM_ENTRIES),
            }),
            thread_id: Mutex::new(None),
        })
    }

    fn runs_tasks_on_current_thread(&self) -> bool {
        let executor_thread = self.thread_id.lock();
        executor_thread
            .map(|id| id == thread::current().id())
            .unwrap_or(false)
    }

    fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
        let mut ring = self.ring.lock();

        let op = ring
            .ops
            .get_mut(token.0)
            .expect("`get_result` called on unknown operation");
        match op {
            OpStatus::Nop => panic!("`get_result` called on nop"),
            OpStatus::Pending(data) => {
                if data.canceled {
                    panic!("`get_result` called on canceled operation");
                }
                data.waker = Some(cx.waker().clone());
                None
            }
            OpStatus::Completed(res) => {
                let out = res.take();
                ring.ops.remove(token.0);
                Some(out.expect("Missing result in completed operation"))
            }
        }
    }

    // Remove the waker for the given token if it hasn't fired yet.
    fn cancel_operation(&self, token: WakerToken) {
        let mut ring = self.ring.lock();
        let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {
            match op {
                OpStatus::Nop => panic!("`cancel_operation` called on nop"),
                OpStatus::Pending(data) => {
                    if data.canceled {
                        panic!("uring operation canceled more than once");
                    }

                    // Clear the waker as it is no longer needed. The future is being dropped, so
                    // there is no point in waking it here.
                    data.waker = None;
                    data.canceled = true;

                    // Keep the rest of the op data as the uring might still be accessing either
                    // the source or the backing memory, so it needs to live until the kernel
                    // completes the operation.
                    true
                }
                OpStatus::Completed(_) => {
                    ring.ops.remove(token.0);
                    false
                }
            }
        } else {
            false
        };
        std::mem::drop(ring);
        if submit_cancel {
            let _best_effort = self.submit_cancel_async(token.0);
        }
    }

    pub(crate) fn register_source<F: AsRawDescriptor>(
        &self,
        raw: &Arc<RawExecutor<UringReactor>>,
        fd: &F,
    ) -> Result<RegisteredSource> {
        // SAFETY:
        // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
        // will only be added to the poll loop.
        let duped_fd = unsafe { File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?) };

        Ok(RegisteredSource {
            tag: self
                .ring
                .lock()
                .registered_sources
                .insert(Arc::new(duped_fd)),
            ex: Arc::downgrade(raw),
        })
    }

    fn deregister_source(&self, source: &RegisteredSource) {
        // There isn't any need to pull the pending ops out; they all have Arcs to the file and
        // memory they need, so let them complete. Deregistering with pending ops is not a common
        // path, so there is no need to optimize that case yet.
        self.ring.lock().registered_sources.remove(source.tag);
    }

    fn submit_poll(
        &self,
        source: &RegisteredSource,
        events: base::EventType,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .ok_or(Error::InvalidSource)?
            .clone();
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;
        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_fallocate(
        &self,
        source: &RegisteredSource,
        offset: u64,
        len: u64,
        mode: u32,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .ok_or(Error::InvalidSource)?
            .clone();
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fallocate(
                src.as_raw_descriptor(),
                offset,
                len,
                mode,
                usize_to_u64(next_op_token),
            )
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Nop);

        Ok(WakerToken(next_op_token))
    }

    fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .ok_or(Error::InvalidSource)?
            .clone();
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;
        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_read_to_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: Option<u64>,
        addrs: impl IntoIterator<Item = MemRegion>,
    ) -> Result<WakerToken> {
        let iovecs = addrs
            .into_iter()
            .map(|mem_range| {
                let vslice = mem
                    .get_volatile_slice(mem_range)
                    .map_err(|_| Error::InvalidOffset)?;
                // SAFETY:
                // Safe because we guarantee that the memory pointed to by `iovecs` lives until the
                // transaction is complete and the completion has been returned from `wait()`.
                Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
            })
            .collect::<Result<Vec<_>>>()?;
        let iovecs = Pin::from(iovecs.into_boxed_slice());

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .ok_or(Error::InvalidSource)?
            .clone();

        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // SAFETY:
        // Safe because all the addresses are within the Memory that an Arc is kept for the
        // duration to ensure the memory is valid while the kernel accesses it.
        // Tested by `dont_drop_backing_mem_read` unit test.
        unsafe {
            self.ctx
                .add_readv(
                    iovecs,
                    src.as_raw_descriptor(),
                    offset,
                    usize_to_u64(next_op_token),
                )
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_write_from_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: Option<u64>,
        addrs: impl IntoIterator<Item = MemRegion>,
    ) -> Result<WakerToken> {
        let iovecs = addrs
            .into_iter()
            .map(|mem_range| {
                let vslice = mem
                    .get_volatile_slice(mem_range)
                    .map_err(|_| Error::InvalidOffset)?;
                // SAFETY:
                // Safe because we guarantee that the memory pointed to by `iovecs` lives until the
                // transaction is complete and the completion has been returned from `wait()`.
                Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
            })
            .collect::<Result<Vec<_>>>()?;
        let iovecs = Pin::from(iovecs.into_boxed_slice());

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .ok_or(Error::InvalidSource)?
            .clone();

        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // SAFETY:
        // Safe because all the addresses are within the Memory that an Arc is kept for the
        // duration to ensure the memory is valid while the kernel accesses it.
        // Tested by `dont_drop_backing_mem_write` unit test.
        unsafe {
            self.ctx
                .add_writev(
                    iovecs,
                    src.as_raw_descriptor(),
                    offset,
                    usize_to_u64(next_op_token),
                )
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }
}

impl Reactor for UringReactor {
    fn new() -> std::io::Result<Self> {
        Ok(UringReactor::new()?)
    }

    fn wake(&self) {
        let mut ring = self.ring.lock();
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
            warn!("Failed to add NOP for waking up executor: {}", e);
        }
        entry.insert(OpStatus::Nop);
        mem::drop(ring);

        match self.ctx.submit() {
            Ok(()) => {}
            // If the kernel's submit ring is full then we know we won't block when calling
            // io_uring_enter, which is all we really care about.
            Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
            Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
        }
    }

    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        // At this point, there are no strong references to the executor (see `on_executor_drop`
        // docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO
        // work can be submitted.

        // Submit cancellations for all pending operations.
        #[allow(clippy::needless_collect)]
        let ops: Vec<_> = self
            .ring
            .lock()
            .ops
            .iter_mut()
            .filter_map(|op| match op.1 {
                OpStatus::Pending(data) if !data.canceled => Some(op.0),
                _ => None,
            })
            .collect();
        for token in ops {
            self.cancel_operation(WakerToken(token));
        }

        // Since the UringReactor is wrapped in an Arc it may end up being dropped from a different
        // thread than the one that called `run` or `run_until`. Since we know there are no other
        // references, just clear the thread id so that we don't panic.
        *self.thread_id.lock() = None;

        // Make sure all pending uring operations are completed as the kernel may try to write to
        // memory that we are about to drop.
        //
        // This future doesn't use the waker; it assumes the future will always be polled after
        // processing other woken futures.
        // TODO: Find a more robust solution.
        Box::pin(futures::future::poll_fn(|_cx| {
            if self.ring.lock().ops.is_empty() {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }))
    }

    fn on_thread_start(&self) {
        let current_thread = thread::current().id();
        let mut thread_id = self.thread_id.lock();
        assert_eq!(
            *thread_id.get_or_insert(current_thread),
            current_thread,
            "`UringReactor::wait_for_work` cannot be called from more than one thread"
        );
    }

    fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
        trace!(
            "Waiting on events, {} pending ops",
            self.ring.lock().ops.len()
        );
        let events = self.ctx.wait().map_err(Error::URingEnter)?;

        // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
        // writing to the eventfd.
        set_processing();

        let mut ring = self.ring.lock();
        for (raw_token, result) in events {
            // While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
            // something that we originally gave to the kernel and that was created from a
            // `usize` so we should always be able to convert it back into a `usize`.
            let token = raw_token
                .try_into()
                .expect("`u64` doesn't fit inside a `usize`");

            let op = ring
                .ops
                .get_mut(token)
                .expect("Received completion token for unexpected operation");
            match mem::replace(op, OpStatus::Completed(Some(result))) {
                // No one is waiting on a Nop.
                OpStatus::Nop => mem::drop(ring.ops.remove(token)),
                OpStatus::Pending(data) => {
                    if data.canceled {
                        // No one is waiting for this operation and the uring is done with
                        // it so it's safe to remove.
                        ring.ops.remove(token);
                    }
                    if let Some(waker) = data.waker {
                        waker.wake();
                    }
                }
                OpStatus::Completed(_) => panic!("uring operation completed more than once"),
            }
        }

        Ok(())
    }

    fn new_source<F: AsRawDescriptor>(
        &self,
        ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>> {
        Ok(IoSource::Uring(super::UringSource::new(f, ex)?))
    }

    fn wrap_task_handle<R>(task: RawTaskHandle<UringReactor, R>) -> TaskHandle<R> {
        TaskHandle::Uring(task)
    }
}

impl AsRawDescriptor for UringReactor {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ctx.as_raw_descriptor()
    }
}

impl WeakWake for UringReactor {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            Reactor::wake(&*arc_self);
        }
    }
}

impl Drop for UringReactor {
    fn drop(&mut self) {
        // The ring should have been drained when the executor's Drop ran.
        assert!(self.ring.lock().ops.is_empty());
    }
}

// SAFETY:
// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
// waiting in TLS to be added to the main polling context.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
    let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
    if ret < 0 {
        Err(Error::DuplicatingFd(base::Error::last()))
    } else {
        Ok(ret)
    }
}

// Converts a `usize` into a `u64` and panics if the conversion fails.
#[inline]
fn usize_to_u64(val: usize) -> u64 {
    val.try_into().expect("`usize` doesn't fit inside a `u64`")
}

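/// A submitted uring operation. Resolves to the operation's result (for reads and writes, the
/// number of bytes transferred) once the kernel delivers the completion. Dropping a
/// `PendingOperation` before it completes cancels it; the reactor keeps the file and backing
/// memory alive until the kernel is done with the operation.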
pub struct PendingOperation {
    waker_token: Option<WakerToken>,
    ex: Weak<RawExecutor<UringReactor>>,
    submitted: bool,
}

impl Future for PendingOperation {
    type Output = Result<u32>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let token = self
            .waker_token
            .as_ref()
            .expect("PendingOperation polled after returning Poll::Ready");
        if let Some(ex) = self.ex.upgrade() {
            if let Some(result) = ex.reactor.get_result(token, cx) {
                self.waker_token = None;
                Poll::Ready(result.map_err(Error::Io))
            } else {
                // If we haven't submitted the operation yet, and the executor runs on a different
                // thread, then submit it now. Otherwise the executor will submit it automatically
                // the next time it calls UringContext::wait.
                if !self.submitted && !ex.reactor.runs_tasks_on_current_thread() {
                    match ex.reactor.ctx.submit() {
                        Ok(()) => self.submitted = true,
                        // If the kernel ring is full then wait until some ops are removed from the
                        // completion queue. This op should get submitted the next time the executor
                        // calls UringContext::wait.
                        Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
                        Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
                    }
                }
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(Error::ExecutorGone))
        }
    }
}

impl Drop for PendingOperation {
    fn drop(&mut self) {
        if let Some(waker_token) = self.waker_token.take() {
            if let Some(ex) = self.ex.upgrade() {
                ex.reactor.cancel_operation(waker_token);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::io::Read;
    use std::io::Write;
    use std::mem;
    use std::pin::Pin;
    use std::rc::Rc;
    use std::task::Context;
    use std::task::Poll;

    use futures::executor::block_on;

    use super::*;
    use crate::mem::BackingMemory;
    use crate::mem::MemRegion;
    use crate::mem::VecIoWrapper;
    use crate::BlockingPool;
    use crate::ExecutorTrait;

    // A future that returns ready when the uring queue is empty.
    struct UringQueueEmpty<'a> {
        ex: &'a Arc<RawExecutor<UringReactor>>,
    }

    impl Future for UringQueueEmpty<'_> {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            if self.ex.reactor.ring.lock().ops.is_empty() {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }
    }

    #[test]
    fn dont_drop_backing_mem_read() {
        if !is_uring_stable() {
            return;
        }

        // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
        // op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use pipes to create a future that will block forever.
        let (rx, mut tx) = base::pipe().unwrap();

        // Set up the TLS for the uring_executor by creating one.
        let ex = RawExecutor::<UringReactor>::new().unwrap();

        // Register the receive side of the pipe with the executor.
        let registered_source = ex
            .reactor
            .register_source(&ex, &rx)
            .expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1.
        // Write to the pipe to wake the read pipe and then wait for the uring result in the
        // executor.
        tx.write_all(&[0u8; 8]).expect("write failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for read pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn dont_drop_backing_mem_write() {
        if !is_uring_stable() {
            return;
        }

        // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
        // op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use pipes to create a future that will block forever.
        let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");

        // Set up the TLS for the uring_executor by creating one.
        let ex = RawExecutor::<UringReactor>::new().unwrap();

        // Register the send side of the pipe with the executor.
        let registered_source = ex
            .reactor
            .register_source(&ex, &tx)
            .expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_write_from_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write from mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1.
        // Read from the pipe to unblock the pending write and then wait for the uring result in
        // the executor.
        let mut buf = vec![0u8; base::round_up_to_page_size(1)];
        rx.read_exact(&mut buf).expect("read to empty failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for write pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn canceled_before_completion() {
        if !is_uring_stable() {
            return;
        }

        async fn cancel_io(op: PendingOperation) {
            mem::drop(op);
        }

        async fn check_result(op: PendingOperation, expected: u32) {
            let actual = op.await.expect("operation failed to complete");
            assert_eq!(expected, actual);
        }

        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;

        let (rx, tx) = base::pipe().expect("Pipe failed");

        let ex = RawExecutor::<UringReactor>::new().unwrap();

        let rx_source = ex
            .reactor
            .register_source(&ex, &rx)
            .expect("register source failed");
        let tx_source = ex
            .reactor
            .register_source(&ex, &tx)
            .expect("register source failed");

        let read_task = rx_source
            .start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        ex.spawn_local(cancel_io(read_task)).detach();

        // Write to the pipe so that the kernel operation will complete.
        let buf =
            Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
        let write_task = tx_source
            .start_write_from_mem(None, Arc::clone(&buf), [MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write from mem");

        ex.run_until(check_result(write_task, 8))
            .expect("Failed to run executor");
    }

    // We drain all ops on drop, so it's not guaranteed that the operation won't finish.
    #[ignore]
    #[test]
    fn drop_before_completion() {
        if !is_uring_stable() {
            return;
        }

        const VALUE: u64 = 0xef6c_a8df_b842_eb9c;

        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Op completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from op: {e}"),
            }
        }

        let (mut rx, mut tx) = base::pipe().expect("Pipe failed");

        let ex = RawExecutor::<UringReactor>::new().unwrap();

        let tx_source = ex
            .reactor
            .register_source(&ex, &tx)
            .expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
        let op = tx_source
            .start_write_from_mem(
                None,
                bm,
                [MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It shouldn't run the write operation.
        mem::drop(ex);

        // Make sure the executor did not complete the uring operation.
        let new_val = [0x2e; 8];
        tx.write_all(&new_val).unwrap();

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(buf, new_val);
    }

    // Dropping a task that owns a BlockingPool shouldn't leak the pool.
    #[test]
    fn drop_detached_blocking_pool() {
        if !is_uring_stable() {
            return;
        }

        struct Cleanup(BlockingPool);

        impl Drop for Cleanup {
            fn drop(&mut self) {
                // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
                self.0
                    .shutdown(Some(
                        std::time::Instant::now() + std::time::Duration::from_secs(1),
                    ))
                    .unwrap();
            }
        }

        let rc = Rc::new(std::cell::Cell::new(0));
        {
            let ex = RawExecutor::<UringReactor>::new().unwrap();
            let rc_clone = rc.clone();
            ex.spawn_local(async move {
                rc_clone.set(1);
                let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
                let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
                // Spawn a blocking task.
                let blocking_task = pool.0.spawn(move || {
                    // Rendezvous.
                    assert_eq!(recv.recv(), Ok(()));
                    // Wait for drop.
                    assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
                });
                // Make sure it has actually started (using a "rendezvous channel" send).
                //
                // Without this step, we'll have a race where we can shutdown the blocking pool
                // before the worker thread pops off the task.
                send.send(()).unwrap();
                // Wait for it to finish.
                blocking_task.await;
                rc_clone.set(2);
            })
            .detach();
            ex.run_until(async {}).unwrap();
            // `ex` is dropped here. If everything is working as expected, it should drop all of
            // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
            // `Drop` impl will try to join all the worker threads, which should work because the
            // send half of the channel is closed.
        }
        assert_eq!(rc.get(), 1);
        Rc::try_unwrap(rc).expect("Rc had too many refs");
    }

    #[test]
    fn drop_on_different_thread() {
        if !is_uring_stable() {
            return;
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();

        let ex2 = ex.clone();
        let t = thread::spawn(move || ex2.run_until(async {}));

        t.join().unwrap().unwrap();

        // Leave an uncompleted operation in the queue so that the drop impl will try to drive it
        // to completion.
        let (_rx, tx) = base::pipe().expect("Pipe failed");
        let tx = ex
            .reactor
            .register_source(&ex, &tx)
            .expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
        let op = tx
            .start_write_from_mem(
                None,
                bm,
                [MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        mem::drop(ex);

        match block_on(op).expect_err("Pending operation completed after executor was dropped") {
            Error::ExecutorGone => {}
            e => panic!("Unexpected error after dropping executor: {e}"),
        }
    }
}