pub enum Executor {
    Uring(Arc<RawExecutor<UringReactor>>),
    Fd(Arc<RawExecutor<EpollReactor>>),
}
An executor for scheduling tasks that poll futures to completion.
All asynchronous operations must run within an executor, which is capable of spawning futures as tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only create a new reference, not a new executor.
Note that a language limitation (a trait object can have at most one non-auto trait) requires this to be represented on the POSIX side as an enum rather than a trait. This leads to some code and interface duplication, which, as far as we understand, is unavoidable.
See https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75 for further details.
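The handle-and-enum layout described above can be sketched with the standard library alone. The names `UringLike`, `EpollLike`, and `Handle` are hypothetical stand-ins, not cros_async types. The sketch only illustrates that cloning the enum clones an `Arc`, and that every method has to be forwarded through a `match`.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the two reactor-backed executors.
struct UringLike;
struct EpollLike;

impl UringLike {
    fn name(&self) -> &'static str {
        "uring"
    }
}

impl EpollLike {
    fn name(&self) -> &'static str {
        "epoll"
    }
}

// An enum of reference-counted handles, mirroring the shape of `Executor`.
// Cloning it clones the inner `Arc`, not the value behind it.
#[derive(Clone)]
enum Handle {
    Uring(Arc<UringLike>),
    Fd(Arc<EpollLike>),
}

impl Handle {
    // Each method is forwarded by hand to the active variant; this is the
    // kind of duplication the note above refers to.
    fn name(&self) -> &'static str {
        match self {
            Handle::Uring(ex) => ex.name(),
            Handle::Fd(ex) => ex.name(),
        }
    }

    // Number of handles that currently share the underlying value.
    fn handle_count(&self) -> usize {
        match self {
            Handle::Uring(ex) => Arc::strong_count(ex),
            Handle::Fd(ex) => Arc::strong_count(ex),
        }
    }
}
```

After `let b = a.clone();` both handles refer to the same underlying value and `handle_count` reports 2.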
Examples
Concurrently wait for multiple files to become readable/writable and then read/write the data.
use std::cmp::min;
use std::error::Error;
use std::fs::{File, OpenOptions};
use cros_async::{AsyncResult, Executor, IoSource, complete3};
const CHUNK_SIZE: usize = 32;
// Write all bytes from `data` to `f`.
async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
    while !data.is_empty() {
        let (count, mut buf) = f.write_from_vec(None, data).await?;
        // Keep only the bytes that have not been written yet.
        data = buf.split_off(count);
    }
    Ok(())
}
// Transfer `len` bytes of data from `from` to `to`.
async fn transfer_data(
    from: IoSource<File>,
    to: IoSource<File>,
    len: usize,
) -> AsyncResult<usize> {
    let mut rem = len;
    while rem > 0 {
        let buf = vec![0u8; min(rem, CHUNK_SIZE)];
        let (count, mut data) = from.read_to_vec(None, buf).await?;
        if count == 0 {
            // End of file. Return the number of bytes transferred.
            return Ok(len - rem);
        }
        data.truncate(count);
        write_file(&to, data).await?;
        rem = rem.saturating_sub(count);
    }
    Ok(len)
}
#[cfg(any(target_os = "android", target_os = "linux"))]
let ex = Executor::new()?;
let (rx, tx) = base::unix::pipe(true)?;
let zero = File::open("/dev/zero")?;
let zero_bytes = CHUNK_SIZE * 7;
let zero_to_pipe = transfer_data(
    ex.async_from(zero)?,
    ex.async_from(tx.try_clone()?)?,
    zero_bytes,
);
let rand = File::open("/dev/urandom")?;
let rand_bytes = CHUNK_SIZE * 19;
let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
let null = OpenOptions::new().write(true).open("/dev/null")?;
let null_bytes = zero_bytes + rand_bytes;
let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
ex.run_until(complete3(
    async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
    async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
    async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
))?;
Variants
Uring(Arc<RawExecutor<UringReactor>>)
Fd(Arc<RawExecutor<EpollReactor>>)
Implementations
impl Executor
pub fn new() -> AsyncResult<Self>
Create a new Executor.
pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>
Create a new Executor of the given ExecutorKind.
pub fn set_default_executor_kind(
    executor_kind: ExecutorKind
) -> Result<(), SetDefaultExecutorKindError>
Set the default ExecutorKind for Self::new(). This call is effective only once.
The first call sets the default and returns Ok(()). Any later call returns SetDefaultExecutorKindError::SetMoreThanOnce, which contains the ExecutorKind value configured by the first call.
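The set-once behavior can be sketched with `std::sync::OnceLock`. This is only an illustration of the contract described above. `Kind`, `SetDefaultError`, `DEFAULT_KIND`, and `set_default_kind` are hypothetical names, and nothing here is taken from the cros_async implementation.

```rust
use std::sync::OnceLock;

// Hypothetical stand-in for `ExecutorKind`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    Uring,
    Fd,
}

// Hypothetical stand-in for `SetDefaultExecutorKindError`.
#[derive(Debug, PartialEq)]
enum SetDefaultError {
    // Carries the value configured by the first successful call.
    SetMoreThanOnce(Kind),
}

// Process-wide default that can be written at most once.
static DEFAULT_KIND: OnceLock<Kind> = OnceLock::new();

// The first call stores `kind`. Every later call fails and reports the
// value that is already stored.
fn set_default_kind(kind: Kind) -> Result<(), SetDefaultError> {
    DEFAULT_KIND.set(kind).map_err(|_| {
        let existing = *DEFAULT_KIND
            .get()
            .expect("set() failed, so a value must already be stored");
        SetDefaultError::SetMoreThanOnce(existing)
    })
}
```

Calling `set_default_kind(Kind::Fd)` and then `set_default_kind(Kind::Uring)` yields `Ok(())` followed by `Err(SetMoreThanOnce(Kind::Fd))`, and the stored default stays `Kind::Fd`.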
pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>
Create a new IoSource<F> associated with self. Callers may then use the returned IoSource to directly start async operations without needing a separate reference to the executor.
pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
Spawn a new future for this executor to run to completion. Callers may use the returned TaskHandle to await the result of f. Dropping the returned TaskHandle will cancel f, preventing it from being polled again. To drop a TaskHandle without canceling the future associated with it, use TaskHandle::detach.
Examples
use futures::executor::block_on;
let task = ex.spawn(async { 7 + 13 });
let result = block_on(task);
assert_eq!(result, 20);
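The difference between dropping and detaching a handle can be modeled without the executor. `ModelHandle` below is a hypothetical type that only records whether its task would have been canceled. It is not the cros_async TaskHandle and makes no claim about how that type is implemented.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical model of a task handle that cancels its task when dropped.
struct ModelHandle {
    canceled: Arc<AtomicBool>,
    detached: bool,
}

impl ModelHandle {
    // Returns the handle together with a flag the caller can inspect later.
    fn new() -> (ModelHandle, Arc<AtomicBool>) {
        let canceled = Arc::new(AtomicBool::new(false));
        let handle = ModelHandle {
            canceled: Arc::clone(&canceled),
            detached: false,
        };
        (handle, canceled)
    }

    // Consume the handle without canceling the task.
    fn detach(mut self) {
        self.detached = true;
        // `self` is dropped here, but `Drop` sees `detached == true`.
    }
}

impl Drop for ModelHandle {
    fn drop(&mut self) {
        if !self.detached {
            // A plain drop marks the task as canceled.
            self.canceled.store(true, Ordering::SeqCst);
        }
    }
}
```

In this model, `drop(handle)` sets the flag, while `handle.detach()` leaves it unset.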
pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
    F: Future + 'static,
    F::Output: 'static,
Spawn a thread-local task for this executor to drive to completion. Like spawn, but without requiring Send on F or F::Output. This method should only be called from the same thread where run() or run_until() is called.
Panics
Executor::run and Executor::run_until will panic if they try to poll a future that was added by calling spawn_local from a different thread.
Examples
let task = ex.spawn_local(async { 7 + 13 });
let result = ex.run_until(task)?;
assert_eq!(result, 20);
pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
Run the provided closure on a dedicated thread where blocking is allowed.
Callers may await the returned TaskHandle to wait for the result of f. Dropping the returned TaskHandle may not cancel the operation if it was already started on a worker thread.
Panics
Awaiting the TaskHandle after the Executor is dropped will panic if the work was not already completed.
Examples
let res = ex.spawn_blocking(move || {
    // Do some CPU-intensive or blocking work here.
    42
}).await;
assert_eq!(res, 42);
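The `Send + 'static` bounds on `spawn_blocking` are the same ones `std::thread::spawn` requires, because the closure and its result cross a thread boundary. The sketch below uses a hypothetical helper, `run_blocking`, built only on the standard library. It returns a `JoinHandle` that is joined, whereas `spawn_blocking` returns a TaskHandle that is awaited.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical helper: run `f` on its own thread so that it may block freely.
// The bounds mirror those in the `spawn_blocking` signature.
fn run_blocking<F, R>(f: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    thread::spawn(f)
}
```

A caller would write `run_blocking(|| expensive()).join()`, where `expensive` is whatever blocking function needs to stay off the executor thread.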
pub fn run(&self) -> AsyncResult<()>
Run the executor indefinitely, driving all spawned futures to completion. This method will block the current thread and only return in the case of an error.
Panics
Once this method has been called on a thread, it may only be called on that thread from that point on. Attempting to call it from another thread will panic.
Examples
use std::thread;
use cros_async::Executor;
use futures::executor::block_on;
let ex = Executor::new()?;
// Spawn a thread that runs the executor.
let ex2 = ex.clone();
thread::spawn(move || ex2.run());
let task = ex.spawn(async { 7 + 13 });
let result = block_on(task);
assert_eq!(result, 20);
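The thread-affinity rule in the Panics section can be modeled by recording the first calling thread and comparing against it on every later call. `PinnedRunner` and `enter` are hypothetical names for this sketch, which assumes nothing about how cros_async enforces the rule.

```rust
use std::sync::OnceLock;
use std::thread::{self, ThreadId};

// Hypothetical model of an executor that may only be run from one thread.
struct PinnedRunner {
    owner: OnceLock<ThreadId>,
}

impl PinnedRunner {
    fn new() -> Self {
        PinnedRunner {
            owner: OnceLock::new(),
        }
    }

    // Record the first calling thread; panic if a different thread calls later.
    fn enter(&self) {
        let current = thread::current().id();
        let owner = *self.owner.get_or_init(|| current);
        assert_eq!(owner, current, "runner entered from a different thread");
    }
}
```

Repeated calls to `enter` from the first thread succeed, while a call from any other thread panics.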
pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>
Drive all futures spawned in this executor until f completes. This method blocks the current thread only until f is complete; there may still be unfinished futures in the executor when it returns.
Panics
Once this method has been called on a thread, from then onwards it may only be called on that thread. Attempting to call it from another thread will panic.
Examples
use cros_async::Executor;
let ex = Executor::new()?;
let task = ex.spawn_local(async { 7 + 13 });
let result = ex.run_until(task)?;
assert_eq!(result, 20);
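The core idea of driving a single future to completion on the calling thread can be sketched with the standard library. `drive_until` and `ThreadWaker` are hypothetical names. Unlike `run_until`, this simplified loop does not poll other spawned tasks or service I/O. It only polls `f` and parks the thread until the waker fires.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Wakes the driving thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical, simplified analogue of `run_until`: poll `f` on the current
// thread and park between polls until the future asks to be woken.
fn drive_until<F: Future>(f: F) -> F::Output {
    let mut f = pin!(f);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match f.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

For example, `drive_until(async { 7 + 13 })` returns the future's output on the calling thread.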