cros_async/
executor.rs

// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::sync::OnceLock;
9
10#[cfg(any(target_os = "android", target_os = "linux"))]
11use base::warn;
12#[cfg(any(target_os = "android", target_os = "linux"))]
13use base::AsRawDescriptors;
14#[cfg(any(target_os = "android", target_os = "linux"))]
15use base::RawDescriptor;
16use serde::Deserialize;
17use serde_keyvalue::argh::FromArgValue;
18use serde_keyvalue::ErrorKind;
19use serde_keyvalue::KeyValueDeserializer;
20
21use crate::common_executor;
22use crate::common_executor::RawExecutor;
23#[cfg(any(target_os = "android", target_os = "linux"))]
24use crate::sys::linux;
25#[cfg(windows)]
26use crate::sys::windows;
27use crate::sys::ExecutorKindSys;
28use crate::AsyncResult;
29use crate::IntoAsync;
30use crate::IoSource;
31
32cfg_if::cfg_if! {
33    if #[cfg(feature = "tokio")] {
34        use crate::tokio_executor::TokioExecutor;
35        use crate::tokio_executor::TokioTaskHandle;
36    }
37}
38
39#[derive(Clone, Copy, Debug, PartialEq, Eq)]
40pub enum ExecutorKind {
41    SysVariants(ExecutorKindSys),
42    #[cfg(feature = "tokio")]
43    Tokio,
44}
45
46impl From<ExecutorKindSys> for ExecutorKind {
47    fn from(e: ExecutorKindSys) -> ExecutorKind {
48        ExecutorKind::SysVariants(e)
49    }
50}
51
52/// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
53/// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and
54/// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.
55static DEFAULT_EXECUTOR_KIND: OnceLock<ExecutorKind> = OnceLock::new();
56
57impl Default for ExecutorKind {
58    fn default() -> Self {
59        #[cfg(any(target_os = "android", target_os = "linux"))]
60        let default_fn = || ExecutorKindSys::Fd.into();
61        #[cfg(windows)]
62        let default_fn = || ExecutorKindSys::Handle.into();
63        *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn)
64    }
65}
66
67/// The error type for [`Executor::set_default_executor_kind()`].
68#[derive(thiserror::Error, Debug)]
69pub enum SetDefaultExecutorKindError {
70    /// The default executor kind is set more than once.
71    #[error("The default executor kind is already set to {0:?}")]
72    SetMoreThanOnce(ExecutorKind),
73
74    #[cfg(any(target_os = "android", target_os = "linux"))]
75    /// io_uring is unavailable. The reason might be the lack of the kernel support,
76    /// but is not limited to that.
77    #[error("io_uring is unavailable: {0}")]
78    UringUnavailable(linux::uring_executor::Error),
79}
80
81impl FromArgValue for ExecutorKind {
82    fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> {
83        // `from_arg_value` returns a `String` as error, but our deserializer API defines its own
84        // error type. Perform parsing from a closure so we can easily map returned errors.
85        let builder = move || {
86            let mut des = KeyValueDeserializer::from(value);
87
88            let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) {
89                #[cfg(any(target_os = "android", target_os = "linux"))]
90                ("epoll", None) => ExecutorKindSys::Fd.into(),
91                #[cfg(any(target_os = "android", target_os = "linux"))]
92                ("uring", None) => ExecutorKindSys::Uring.into(),
93                #[cfg(windows)]
94                ("handle", None) => ExecutorKindSys::Handle.into(),
95                #[cfg(windows)]
96                ("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(),
97                #[cfg(windows)]
98                ("overlapped", Some(',')) => {
99                    if des.parse_identifier()? != "concurrency" {
100                        let kind = ErrorKind::SerdeError("expected `concurrency`".to_string());
101                        return Err(des.error_here(kind));
102                    }
103                    if des.next_char() != Some('=') {
104                        return Err(des.error_here(ErrorKind::ExpectedEqual));
105                    }
106                    let concurrency = des.parse_number()?;
107                    ExecutorKindSys::Overlapped {
108                        concurrency: Some(concurrency),
109                    }
110                    .into()
111                }
112                #[cfg(feature = "tokio")]
113                ("tokio", None) => ExecutorKind::Tokio,
114                (_identifier, _next) => {
115                    let kind = ErrorKind::SerdeError("unexpected kind".to_string());
116                    return Err(des.error_here(kind));
117                }
118            };
119            des.finish()?;
120            Ok(kind)
121        };
122
123        builder().map_err(|e| e.to_string())
124    }
125}
126
127impl serde::Serialize for ExecutorKind {
128    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
129    where
130        S: serde::Serializer,
131    {
132        match self {
133            ExecutorKind::SysVariants(sv) => sv.serialize(serializer),
134            #[cfg(feature = "tokio")]
135            ExecutorKind::Tokio => "tokio".serialize(serializer),
136        }
137    }
138}
139
140impl<'de> Deserialize<'de> for ExecutorKind {
141    fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error>
142    where
143        D: serde::Deserializer<'de>,
144    {
145        let string = String::deserialize(deserializer)?;
146        ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom)
147    }
148}
149
150/// Reference to a task managed by the executor.
151///
152/// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to
153/// continue running the background.
154///
155/// `await`ing the `TaskHandle` waits for the task to finish and yields its result.
156pub enum TaskHandle<R> {
157    #[cfg(any(target_os = "android", target_os = "linux"))]
158    Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>),
159    #[cfg(any(target_os = "android", target_os = "linux"))]
160    Uring(common_executor::RawTaskHandle<linux::UringReactor, R>),
161    #[cfg(windows)]
162    Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>),
163    #[cfg(feature = "tokio")]
164    Tokio(TokioTaskHandle<R>),
165}
166
167impl<R: Send + 'static> TaskHandle<R> {
168    pub fn detach(self) {
169        match self {
170            #[cfg(any(target_os = "android", target_os = "linux"))]
171            TaskHandle::Fd(f) => f.detach(),
172            #[cfg(any(target_os = "android", target_os = "linux"))]
173            TaskHandle::Uring(u) => u.detach(),
174            #[cfg(windows)]
175            TaskHandle::Handle(h) => h.detach(),
176            #[cfg(feature = "tokio")]
177            TaskHandle::Tokio(t) => t.detach(),
178        }
179    }
180
181    // Cancel the task and wait for it to stop. Returns the result of the task if it was already
182    // finished.
183    pub async fn cancel(self) -> Option<R> {
184        match self {
185            #[cfg(any(target_os = "android", target_os = "linux"))]
186            TaskHandle::Fd(f) => f.cancel().await,
187            #[cfg(any(target_os = "android", target_os = "linux"))]
188            TaskHandle::Uring(u) => u.cancel().await,
189            #[cfg(windows)]
190            TaskHandle::Handle(h) => h.cancel().await,
191            #[cfg(feature = "tokio")]
192            TaskHandle::Tokio(t) => t.cancel().await,
193        }
194    }
195}
196
197impl<R: 'static> Future for TaskHandle<R> {
198    type Output = R;
199
200    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
201        match self.get_mut() {
202            #[cfg(any(target_os = "android", target_os = "linux"))]
203            TaskHandle::Fd(f) => Pin::new(f).poll(cx),
204            #[cfg(any(target_os = "android", target_os = "linux"))]
205            TaskHandle::Uring(u) => Pin::new(u).poll(cx),
206            #[cfg(windows)]
207            TaskHandle::Handle(h) => Pin::new(h).poll(cx),
208            #[cfg(feature = "tokio")]
209            TaskHandle::Tokio(t) => Pin::new(t).poll(cx),
210        }
211    }
212}
213
214pub(crate) trait ExecutorTrait {
215    fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;
216
217    fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
218    where
219        F: Future + Send + 'static,
220        F::Output: Send + 'static;
221
222    fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
223    where
224        F: FnOnce() -> R + Send + 'static,
225        R: Send + 'static;
226
227    fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
228    where
229        F: Future + 'static,
230        F::Output: 'static;
231
232    fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;
233}
234
235/// An executor for scheduling tasks that poll futures to completion.
236///
237/// All asynchronous operations must run within an executor, which is capable of spawning futures as
238/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
239///
240/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
241/// create a new reference, not a new executor.
242///
243/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
244/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
245/// interface duplication, but as far as we understand that is unavoidable.
246///
247/// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
248/// for further details.
249///
250/// # Examples
251///
252/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
253///
254/// ```
255/// use std::cmp::min;
256/// use std::error::Error;
257/// use std::fs::{File, OpenOptions};
258///
259/// use cros_async::{AsyncResult, Executor, IoSource, complete3};
260/// const CHUNK_SIZE: usize = 32;
261///
262/// // Write all bytes from `data` to `f`.
263/// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
264///     while data.len() > 0 {
265///         let (count, mut buf) = f.write_from_vec(None, data).await?;
266///
267///         data = buf.split_off(count);
268///     }
269///
270///     Ok(())
271/// }
272///
273/// // Transfer `len` bytes of data from `from` to `to`.
274/// async fn transfer_data(
275///     from: IoSource<File>,
276///     to: IoSource<File>,
277///     len: usize,
278/// ) -> AsyncResult<usize> {
279///     let mut rem = len;
280///
281///     while rem > 0 {
282///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
283///         let (count, mut data) = from.read_to_vec(None, buf).await?;
284///
285///         if count == 0 {
286///             // End of file. Return the number of bytes transferred.
287///             return Ok(len - rem);
288///         }
289///
290///         data.truncate(count);
291///         write_file(&to, data).await?;
292///
293///         rem = rem.saturating_sub(count);
294///     }
295///
296///     Ok(len)
297/// }
298///
299/// #[cfg(any(target_os = "android", target_os = "linux"))]
300/// # fn do_it() -> Result<(), Box<dyn Error>> {
301///     let ex = Executor::new()?;
302///
303///     let (rx, tx) = base::linux::pipe()?;
304///     let zero = File::open("/dev/zero")?;
305///     let zero_bytes = CHUNK_SIZE * 7;
306///     let zero_to_pipe = transfer_data(
307///         ex.async_from(zero)?,
308///         ex.async_from(tx.try_clone()?)?,
309///         zero_bytes,
310///     );
311///
312///     let rand = File::open("/dev/urandom")?;
313///     let rand_bytes = CHUNK_SIZE * 19;
314///     let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
315///
316///     let null = OpenOptions::new().write(true).open("/dev/null")?;
317///     let null_bytes = zero_bytes + rand_bytes;
318///     let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
319///
320///     ex.run_until(complete3(
321///         async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
322///         async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
323///         async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
324///     ))?;
325///
326/// #     Ok(())
327/// # }
328/// #[cfg(any(target_os = "android", target_os = "linux"))]
329/// # do_it().unwrap();
330/// ```
331#[derive(Clone)]
332pub enum Executor {
333    #[cfg(any(target_os = "android", target_os = "linux"))]
334    Fd(Arc<RawExecutor<linux::EpollReactor>>),
335    #[cfg(any(target_os = "android", target_os = "linux"))]
336    Uring(Arc<RawExecutor<linux::UringReactor>>),
337    #[cfg(windows)]
338    Handle(Arc<RawExecutor<windows::HandleReactor>>),
339    #[cfg(windows)]
340    Overlapped(Arc<RawExecutor<windows::HandleReactor>>),
341    #[cfg(feature = "tokio")]
342    Tokio(TokioExecutor),
343}
344
345impl Executor {
346    /// Create a new `Executor`.
347    pub fn new() -> AsyncResult<Self> {
348        Executor::with_executor_kind(ExecutorKind::default())
349    }
350
351    /// Create a new `Executor` of the given `ExecutorKind`.
352    pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
353        Ok(match kind {
354            #[cfg(any(target_os = "android", target_os = "linux"))]
355            ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?),
356            #[cfg(any(target_os = "android", target_os = "linux"))]
357            ExecutorKind::SysVariants(ExecutorKindSys::Uring) => {
358                Executor::Uring(RawExecutor::new()?)
359            }
360            #[cfg(windows)]
361            ExecutorKind::SysVariants(ExecutorKindSys::Handle) => {
362                Executor::Handle(RawExecutor::new()?)
363            }
364            #[cfg(windows)]
365            ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => {
366                let reactor = match concurrency {
367                    Some(concurrency) => windows::HandleReactor::new_with(concurrency)?,
368                    None => windows::HandleReactor::new()?,
369                };
370                Executor::Overlapped(RawExecutor::new_with(reactor)?)
371            }
372            #[cfg(feature = "tokio")]
373            ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?),
374        })
375    }
376
377    /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
378    pub fn set_default_executor_kind(
379        executor_kind: ExecutorKind,
380    ) -> Result<(), SetDefaultExecutorKindError> {
381        #[cfg(any(target_os = "android", target_os = "linux"))]
382        if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) {
383            linux::uring_executor::check_uring_availability()
384                .map_err(SetDefaultExecutorKindError::UringUnavailable)?;
385            if !crate::is_uring_stable() {
386                warn!(
387                    "Enabling io_uring executor on the kernel version where io_uring is unstable"
388                );
389            }
390        }
391        DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
392            // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
393            SetDefaultExecutorKindError::SetMoreThanOnce(
394                *DEFAULT_EXECUTOR_KIND
395                    .get()
396                    .expect("Failed to get DEFAULT_EXECUTOR_KIND"),
397            ))
398    }
399
400    /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
401    /// `IoSource` to directly start async operations without needing a separate reference to the
402    /// executor.
403    pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
404        match self {
405            #[cfg(any(target_os = "android", target_os = "linux"))]
406            Executor::Fd(ex) => ex.async_from(f),
407            #[cfg(any(target_os = "android", target_os = "linux"))]
408            Executor::Uring(ex) => ex.async_from(f),
409            #[cfg(windows)]
410            Executor::Handle(ex) => ex.async_from(f),
411            #[cfg(windows)]
412            Executor::Overlapped(ex) => ex.async_from(f),
413            #[cfg(feature = "tokio")]
414            Executor::Tokio(ex) => ex.async_from(f),
415        }
416    }
417
418    /// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the
419    /// If the executor is not overlapped, then Handle source is returned.
420    /// returned `IoSource` to directly start async operations without needing a separate reference
421    /// to the executor.
422    #[cfg(windows)]
423    pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
424        match self {
425            Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new(
426                f, ex, false,
427            )?)),
428            _ => self.async_from(f),
429        }
430    }
431
432    /// Spawn a new future for this executor to run to completion. Callers may use the returned
433    /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
434    /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
435    /// future associated with it use `TaskHandle::detach`.
436    ///
437    /// # Examples
438    ///
439    /// ```
440    /// # use cros_async::AsyncResult;
441    /// # fn example_spawn() -> AsyncResult<()> {
442    /// #      use std::thread;
443    ///
444    /// #      use cros_async::Executor;
445    ///        use futures::executor::block_on;
446    ///
447    /// #      let ex = Executor::new()?;
448    ///
449    /// #      // Spawn a thread that runs the executor.
450    /// #      let ex2 = ex.clone();
451    /// #      thread::spawn(move || ex2.run());
452    ///
453    ///       let task = ex.spawn(async { 7 + 13 });
454    ///
455    ///       let result = block_on(task);
456    ///       assert_eq!(result, 20);
457    /// #     Ok(())
458    /// # }
459    ///
460    /// # example_spawn().unwrap();
461    /// ```
462    pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
463    where
464        F: Future + Send + 'static,
465        F::Output: Send + 'static,
466    {
467        match self {
468            #[cfg(any(target_os = "android", target_os = "linux"))]
469            Executor::Fd(ex) => ex.spawn(f),
470            #[cfg(any(target_os = "android", target_os = "linux"))]
471            Executor::Uring(ex) => ex.spawn(f),
472            #[cfg(windows)]
473            Executor::Handle(ex) => ex.spawn(f),
474            #[cfg(windows)]
475            Executor::Overlapped(ex) => ex.spawn(f),
476            #[cfg(feature = "tokio")]
477            Executor::Tokio(ex) => ex.spawn(f),
478        }
479    }
480
481    /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
482    /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
483    /// thread where `run()` or `run_until()` is called.
484    ///
485    /// # Panics
486    ///
487    /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
488    /// added by calling `spawn_local` from a different thread.
489    ///
490    /// # Examples
491    ///
492    /// ```
493    /// # use cros_async::AsyncResult;
494    /// # fn example_spawn_local() -> AsyncResult<()> {
495    /// #      use cros_async::Executor;
496    ///
497    /// #      let ex = Executor::new()?;
498    ///
499    ///        let task = ex.spawn_local(async { 7 + 13 });
500    ///
501    ///        let result = ex.run_until(task)?;
502    ///        assert_eq!(result, 20);
503    ///        Ok(())
504    /// # }
505    ///
506    /// # example_spawn_local().unwrap();
507    /// ```
508    pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
509    where
510        F: Future + 'static,
511        F::Output: 'static,
512    {
513        match self {
514            #[cfg(any(target_os = "android", target_os = "linux"))]
515            Executor::Fd(ex) => ex.spawn_local(f),
516            #[cfg(any(target_os = "android", target_os = "linux"))]
517            Executor::Uring(ex) => ex.spawn_local(f),
518            #[cfg(windows)]
519            Executor::Handle(ex) => ex.spawn_local(f),
520            #[cfg(windows)]
521            Executor::Overlapped(ex) => ex.spawn_local(f),
522            #[cfg(feature = "tokio")]
523            Executor::Tokio(ex) => ex.spawn_local(f),
524        }
525    }
526
527    /// Run the provided closure on a dedicated thread where blocking is allowed.
528    ///
529    /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
530    /// the returned `TaskHandle` may not cancel the operation if it was already started on a
531    /// worker thread.
532    ///
533    /// # Panics
534    ///
535    /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
536    /// already completed.
537    ///
538    /// # Examples
539    ///
540    /// ```edition2018
541    /// # use cros_async::Executor;
542    ///
543    /// # async fn do_it(ex: &Executor) {
544    ///     let res = ex.spawn_blocking(move || {
545    ///         // Do some CPU-intensive or blocking work here.
546    ///
547    ///         42
548    ///     }).await;
549    ///
550    ///     assert_eq!(res, 42);
551    /// # }
552    ///
553    /// # let ex = Executor::new().unwrap();
554    /// # ex.run_until(do_it(&ex)).unwrap();
555    /// ```
556    pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
557    where
558        F: FnOnce() -> R + Send + 'static,
559        R: Send + 'static,
560    {
561        match self {
562            #[cfg(any(target_os = "android", target_os = "linux"))]
563            Executor::Fd(ex) => ex.spawn_blocking(f),
564            #[cfg(any(target_os = "android", target_os = "linux"))]
565            Executor::Uring(ex) => ex.spawn_blocking(f),
566            #[cfg(windows)]
567            Executor::Handle(ex) => ex.spawn_blocking(f),
568            #[cfg(windows)]
569            Executor::Overlapped(ex) => ex.spawn_blocking(f),
570            #[cfg(feature = "tokio")]
571            Executor::Tokio(ex) => ex.spawn_blocking(f),
572        }
573    }
574
575    /// Run the executor indefinitely, driving all spawned futures to completion. This method will
576    /// block the current thread and only return in the case of an error.
577    ///
578    /// # Panics
579    ///
580    /// Once this method has been called on a thread, it may only be called on that thread from that
581    /// point on. Attempting to call it from another thread will panic.
582    ///
583    /// # Examples
584    ///
585    /// ```
586    /// # use cros_async::AsyncResult;
587    /// # fn example_run() -> AsyncResult<()> {
588    ///       use std::thread;
589    ///
590    ///       use cros_async::Executor;
591    ///       use futures::executor::block_on;
592    ///
593    ///       let ex = Executor::new()?;
594    ///
595    ///       // Spawn a thread that runs the executor.
596    ///       let ex2 = ex.clone();
597    ///       thread::spawn(move || ex2.run());
598    ///
599    ///       let task = ex.spawn(async { 7 + 13 });
600    ///
601    ///       let result = block_on(task);
602    ///       assert_eq!(result, 20);
603    /// #     Ok(())
604    /// # }
605    ///
606    /// # example_run().unwrap();
607    /// ```
608    pub fn run(&self) -> AsyncResult<()> {
609        self.run_until(std::future::pending())
610    }
611
612    /// Drive all futures spawned in this executor until `f` completes. This method will block the
613    /// current thread only until `f` is complete and there may still be unfinished futures in the
614    /// executor.
615    ///
616    /// # Panics
617    ///
618    /// Once this method has been called on a thread, from then onwards it may only be called on
619    /// that thread. Attempting to call it from another thread will panic.
620    ///
621    /// # Examples
622    ///
623    /// ```
624    /// # use cros_async::AsyncResult;
625    /// # fn example_run_until() -> AsyncResult<()> {
626    ///       use cros_async::Executor;
627    ///
628    ///       let ex = Executor::new()?;
629    ///
630    ///       let task = ex.spawn_local(async { 7 + 13 });
631    ///
632    ///       let result = ex.run_until(task)?;
633    ///       assert_eq!(result, 20);
634    /// #     Ok(())
635    /// # }
636    ///
637    /// # example_run_until().unwrap();
638    /// ```
639    pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
640        match self {
641            #[cfg(any(target_os = "android", target_os = "linux"))]
642            Executor::Fd(ex) => ex.run_until(f),
643            #[cfg(any(target_os = "android", target_os = "linux"))]
644            Executor::Uring(ex) => ex.run_until(f),
645            #[cfg(windows)]
646            Executor::Handle(ex) => ex.run_until(f),
647            #[cfg(windows)]
648            Executor::Overlapped(ex) => ex.run_until(f),
649            #[cfg(feature = "tokio")]
650            Executor::Tokio(ex) => ex.run_until(f),
651        }
652    }
653}
654
655#[cfg(any(target_os = "android", target_os = "linux"))]
656impl AsRawDescriptors for Executor {
657    fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
658        match self {
659            Executor::Fd(ex) => ex.as_raw_descriptors(),
660            Executor::Uring(ex) => ex.as_raw_descriptors(),
661            #[cfg(feature = "tokio")]
662            Executor::Tokio(ex) => ex.as_raw_descriptors(),
663        }
664    }
665}