Enum cros_async::Executor

pub enum Executor {
    Fd(Arc<RawExecutor<EpollReactor>>),
    Uring(Arc<RawExecutor<UringReactor>>),
    Tokio(TokioExecutor),
}

An executor for scheduling tasks that poll futures to completion.

All asynchronous operations must run within an executor, which is capable of spawning futures as tasks. This executor also provides a mechanism for performing asynchronous I/O operations.

The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only create a new reference, not a new executor.
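The handle semantics can be illustrated with a small std-only sketch. `Inner` and `Handle` are hypothetical stand-ins invented here, not cros_async types; the sketch only shows that cloning an `Arc`-backed handle adds a reference instead of building new state.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the executor state shared by all handles.
struct Inner {
    name: &'static str,
}

// Hypothetical handle type: cloning copies the `Arc`, not the `Inner`.
#[derive(Clone)]
struct Handle(Arc<Inner>);

impl Handle {
    fn new(name: &'static str) -> Self {
        Handle(Arc::new(Inner { name }))
    }

    // True when both handles refer to the same underlying state.
    fn same_executor(&self, other: &Handle) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }

    // Number of live handles that share the underlying state.
    fn handle_count(&self) -> usize {
        Arc::strong_count(&self.0)
    }

    fn name(&self) -> &'static str {
        self.0.name
    }
}
```

After `let b = a.clone();`, `a.same_executor(&b)` holds and `a.handle_count()` goes from 1 to 2, which is the behavior the paragraph above describes for cloned executors.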

Note that a language limitation (a trait object can name at most one non-auto trait) requires this to be represented on the POSIX side as an enum rather than a trait. This leads to some code and interface duplication, but as far as we understand that is unavoidable.

See https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75 for further details.
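As a rough illustration of the enum-instead-of-trait design, the sketch below forwards a method call to whichever backend the enum holds. `EpollBackend`, `UringBackend`, `Backend`, and `kind_name` are hypothetical names invented for this example and carry none of the real reactor logic.

```rust
use std::sync::Arc;

// Hypothetical backends standing in for the real reactor-based executors.
struct EpollBackend;
struct UringBackend;

impl EpollBackend {
    fn kind_name(&self) -> &'static str {
        "epoll"
    }
}

impl UringBackend {
    fn kind_name(&self) -> &'static str {
        "uring"
    }
}

// One variant per backend. Every public method matches on the variant
// and forwards the call, which is where the duplication comes from.
#[derive(Clone)]
enum Backend {
    Fd(Arc<EpollBackend>),
    Uring(Arc<UringBackend>),
}

impl Backend {
    fn kind_name(&self) -> &'static str {
        match self {
            Backend::Fd(ex) => ex.kind_name(),
            Backend::Uring(ex) => ex.kind_name(),
        }
    }
}
```

Each method added to the public type needs one more `match` like the one in `kind_name`, so the cost of this design grows with the size of the interface rather than with the number of call sites.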

§Examples

Concurrently wait for multiple files to become readable/writable and then read/write the data.

use std::cmp::min;
use std::error::Error;
use std::fs::{File, OpenOptions};

use cros_async::{AsyncResult, Executor, IoSource, complete3};
const CHUNK_SIZE: usize = 32;

// Write all bytes from `data` to `f`.
async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
    while !data.is_empty() {
        let (count, mut buf) = f.write_from_vec(None, data).await?;

        data = buf.split_off(count);
    }

    Ok(())
}

// Transfer `len` bytes of data from `from` to `to`.
async fn transfer_data(
    from: IoSource<File>,
    to: IoSource<File>,
    len: usize,
) -> AsyncResult<usize> {
    let mut rem = len;

    while rem > 0 {
        let buf = vec![0u8; min(rem, CHUNK_SIZE)];
        let (count, mut data) = from.read_to_vec(None, buf).await?;

        if count == 0 {
            // End of file. Return the number of bytes transferred.
            return Ok(len - rem);
        }

        data.truncate(count);
        write_file(&to, data).await?;

        rem = rem.saturating_sub(count);
    }

    Ok(len)
}

#[cfg(any(target_os = "android", target_os = "linux"))]
fn do_it() -> Result<(), Box<dyn Error>> {
    let ex = Executor::new()?;

    let (rx, tx) = base::linux::pipe()?;
    let zero = File::open("/dev/zero")?;
    let zero_bytes = CHUNK_SIZE * 7;
    let zero_to_pipe = transfer_data(
        ex.async_from(zero)?,
        ex.async_from(tx.try_clone()?)?,
        zero_bytes,
    );

    let rand = File::open("/dev/urandom")?;
    let rand_bytes = CHUNK_SIZE * 19;
    let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);

    let null = OpenOptions::new().write(true).open("/dev/null")?;
    let null_bytes = zero_bytes + rand_bytes;
    let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);

    ex.run_until(complete3(
        async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
        async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
        async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
    ))?;

    Ok(())
}

#[cfg(any(target_os = "android", target_os = "linux"))]
do_it().unwrap();

Variants§

Implementations§

impl Executor

pub fn new() -> AsyncResult<Self>

Create a new Executor.

pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>

Create a new Executor of the given ExecutorKind.

pub fn set_default_executor_kind(executor_kind: ExecutorKind) -> Result<(), SetDefaultExecutorKindError>

Set the default ExecutorKind for Self::new(). This call is effective only once.
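One way to get "effective only once" semantics is a set-once global, sketched here with `std::sync::OnceLock`. This is an assumption about the general technique, not a description of how cros_async implements it; `Kind`, `DEFAULT_KIND`, `set_default_kind`, and `default_kind` are hypothetical names.

```rust
use std::sync::OnceLock;

// Hypothetical stand-in for an executor kind selector.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    Fd,
    Uring,
}

// Process-wide default. It can be initialized at most once.
static DEFAULT_KIND: OnceLock<Kind> = OnceLock::new();

// Returns Err carrying the rejected value if a default was already set.
fn set_default_kind(kind: Kind) -> Result<(), Kind> {
    DEFAULT_KIND.set(kind)
}

// Falls back to `Kind::Fd` when no default was ever set. The fallback is
// an arbitrary choice made for this sketch.
fn default_kind() -> Kind {
    DEFAULT_KIND.get().copied().unwrap_or(Kind::Fd)
}
```

With this shape, the first `set_default_kind` call wins and every later call returns an error without changing the stored value, so callers should check the returned `Result` rather than assume the call took effect.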

pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>

Create a new IoSource<F> associated with self. Callers may then use the returned IoSource to directly start async operations without needing a separate reference to the executor.

pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where F: Future + Send + 'static, F::Output: Send + 'static,

Spawn a new future for this executor to run to completion. Callers may use the returned TaskHandle to await on the result of f. Dropping the returned TaskHandle will cancel f, preventing it from being polled again. To drop a TaskHandle without canceling the future associated with it use TaskHandle::detach.
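The cancel-on-drop and detach behavior can be modeled with a shared flag. The sketch below is a std-only analogy, not the real `TaskHandle`: `Handle`, its fields, and the flag-based cancellation are all hypothetical and exist only to show the ownership pattern.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical handle: it shares a cancellation flag with the task it owns.
struct Handle {
    cancelled: Arc<AtomicBool>,
    detached: bool,
}

impl Handle {
    fn new(cancelled: Arc<AtomicBool>) -> Self {
        Handle { cancelled, detached: false }
    }

    // Give up the handle without cancelling the task.
    fn detach(mut self) {
        self.detached = true;
        // `self` is dropped here, but `drop` sees `detached == true`.
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        if !self.detached {
            // A scheduler would check this flag and stop polling the task.
            self.cancelled.store(true, Ordering::SeqCst);
        }
    }
}
```

Dropping a `Handle` sets the flag, while calling `detach()` consumes the handle and leaves the flag untouched, mirroring the difference between dropping a TaskHandle and detaching it.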

§Examples

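      use std::thread;

      use cros_async::Executor;
      use futures::executor::block_on;

      let ex = Executor::new()?;

      // Run the executor on another thread so the spawned task is polled.
      let ex2 = ex.clone();
      thread::spawn(move || ex2.run());

      let task = ex.spawn(async { 7 + 13 });

      let result = block_on(task);
      assert_eq!(result, 20);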

pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where F: Future + 'static, F::Output: 'static,

Spawn a thread-local task for this executor to drive to completion. Like spawn but without requiring Send on F or F::Output. This method should only be called from the same thread where run() or run_until() is called.

§Panics

Executor::run and Executor::run_until will panic if they try to poll a future that was added by calling spawn_local from a different thread.

§Examples


      use cros_async::Executor;

      let ex = Executor::new()?;

      let task = ex.spawn_local(async { 7 + 13 });

      let result = ex.run_until(task)?;
      assert_eq!(result, 20);

pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
where F: FnOnce() -> R + Send + 'static, R: Send + 'static,

Run the provided closure on a dedicated thread where blocking is allowed.

Callers may await on the returned TaskHandle to wait for the result of f. Dropping the returned TaskHandle may not cancel the operation if it was already started on a worker thread.
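The same idea can be sketched synchronously with only the standard library: run the closure on its own thread and hand the result back over a channel. `run_blocking` is a hypothetical helper, and the `Receiver` it returns is a stand-in for the awaitable TaskHandle, not the real type.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Run `f` on its own thread and return a receiver for the result.
// This is a synchronous stand-in: the real method returns an awaitable handle.
fn run_blocking<F, R>(f: F) -> Receiver<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The send fails only if the receiver was dropped. The closure has
        // already run by then, so dropping the receiver does not cancel it.
        let _ = tx.send(f());
    });
    rx
}
```

Calling `run_blocking(|| 6 * 7).recv()` blocks the caller until the worker thread finishes. As in the description above, discarding the receiving end does not stop work that has already started.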

§Panics

Awaiting the TaskHandle after the Executor is dropped will panic if the work was not already completed.

§Examples

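    use cros_async::Executor;

    async fn do_it(ex: &Executor) {
        let res = ex.spawn_blocking(move || {
            // Do some CPU-intensive or blocking work here.

            42
        }).await;

        assert_eq!(res, 42);
    }

    let ex = Executor::new().unwrap();
    ex.run_until(do_it(&ex)).unwrap();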

pub fn run(&self) -> AsyncResult<()>

Run the executor indefinitely, driving all spawned futures to completion. This method will block the current thread and only return in the case of an error.

§Panics

Once this method has been called on a thread, it may only be called on that thread from that point on. Attempting to call it from another thread will panic.
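A thread-pinning rule like this can be enforced by recording the first caller's thread ID and comparing it on every later call. The sketch below assumes that approach purely for illustration; `PinnedRunner` is a hypothetical type and its `run` does no actual scheduling.

```rust
use std::sync::OnceLock;
use std::thread::{self, ThreadId};

// Hypothetical runner that remembers the first thread that called `run`.
struct PinnedRunner {
    owner: OnceLock<ThreadId>,
}

impl PinnedRunner {
    fn new() -> Self {
        PinnedRunner { owner: OnceLock::new() }
    }

    // Panics if called from a thread other than the first caller's.
    fn run(&self) {
        let current = thread::current().id();
        // The first call stores `current`; later calls read the stored ID.
        let owner = *self.owner.get_or_init(|| current);
        assert_eq!(owner, current, "run called from a different thread");
    }
}
```

Repeated calls from the original thread succeed, while a call made through a shared reference on another thread panics, which surfaces as an `Err` from that thread's `join()`.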

§Examples

      use std::thread;

      use cros_async::Executor;
      use futures::executor::block_on;

      let ex = Executor::new()?;

      // Spawn a thread that runs the executor.
      let ex2 = ex.clone();
      thread::spawn(move || ex2.run());

      let task = ex.spawn(async { 7 + 13 });

      let result = block_on(task);
      assert_eq!(result, 20);

pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>

Drive all futures spawned in this executor until f completes. This method blocks the current thread only until f is complete; unfinished futures may still remain in the executor when it returns.

§Panics

Once this method has been called on a thread, from then onwards it may only be called on that thread. Attempting to call it from another thread will panic.
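The core of driving a future until it completes can be sketched with the standard library alone. This is a minimal single-future loop, not the cros_async implementation: the free function `run_until` and the `ThreadWaker` type are hypothetical, and there is no reactor or task queue.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `run_until`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Poll `f` on the current thread until it completes, parking between polls.
// Unlike a real executor, this drives only `f`, not other spawned tasks.
fn run_until<F: Future>(f: F) -> F::Output {
    let mut f = pin!(f);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match f.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

A real executor also polls the other tasks it owns and waits on I/O while `f` is pending, which is why other futures can still be unfinished when the call returns.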

§Examples

      use cros_async::Executor;

      let ex = Executor::new()?;

      let task = ex.spawn_local(async { 7 + 13 });

      let result = ex.run_until(task)?;
      assert_eq!(result, 20);

Trait Implementations§

impl AsRawDescriptors for Executor

fn as_raw_descriptors(&self) -> Vec<RawDescriptor>

Returns the underlying raw descriptors.

impl AudioStreamsExecutor for Executor

fn async_unix_stream(&self, stream: UnixStream) -> Result<AsyncStream>

Create an object to allow async reads/writes from the specified UnixStream.

fn delay<'life0, 'async_trait>(&'life0 self, dur: Duration) -> Pin<Box<dyn Future<Output = Result<()>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns a future that resolves after the specified time.

fn wait_fd_readable<'life0, 'async_trait>(&'life0 self, fd: RawDescriptor) -> Pin<Box<dyn Future<Output = Result<()>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

impl Clone for Executor

fn clone(&self) -> Executor

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.