// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Provides helpers that make it easier to process virtio queues on an async executor.
#![cfg_attr(windows, allow(dead_code))]
use anyhow::bail;
use anyhow::Context;
use base::warn;
use cros_async::AsyncResult;
use cros_async::Executor;
use cros_async::TaskHandle;
use futures::future::AbortHandle;
use futures::future::Abortable;
use futures::future::Pending;
use futures::Future;
/// A queue for which processing can be started on an async executor.
///
/// `T` is the resource type of the queue, i.e. the device-specific data it needs in order to run.
/// For instance, a block device will likely need a file to provide its data.
pub enum AsyncQueueState<T: 'static> {
    /// Queue is currently stopped.
    Stopped(T),
    /// Queue is being processed as a `Task` on an `Executor`, and can be stopped by aborting the
    /// `AbortHandle`.
    Running((TaskHandle<T>, Executor, AbortHandle)),
    /// Something terrible happened and this queue is in a non-recoverable state.
    Broken,
}
impl<T: 'static> AsyncQueueState<T> {
    /// Start processing of the queue on `ex`, or stop and restart it with the new parameters if
    /// it was already running.
    ///
    /// `fut_provider` is a closure that is passed the resource of the queue, as well as an
    /// `Abortable` future. It must return a `Future` that takes ownership of the device's resource
    /// and processes the queue for as long as possible, but immediately quits and returns the
    /// device resource when the `Abortable` is signaled.
    ///
    /// If `fut_provider` or the `Future` it returns ends with an error, the queue is left in the
    /// `Broken` state and cannot be used anymore.
    ///
    /// The task is only scheduled and no processing actually starts in this method. The task is
    /// scheduled locally, which implies that `ex` must be run on the current thread.
    pub fn start<
        U: Future<Output = T> + 'static,
        F: FnOnce(T, Abortable<Pending<()>>) -> anyhow::Result<U>,
    >(
        &mut self,
        ex: &Executor,
        fut_provider: F,
    ) -> anyhow::Result<()> {
        if matches!(self, AsyncQueueState::Running(_)) {
            warn!("queue is already running, stopping it first");
            self.stop().context("while trying to restart queue")?;
        }

        // Take the resource out of `self`. `Broken` is left in its place so that an early
        // return below leaves the queue marked as unusable.
        let resource = match std::mem::replace(self, AsyncQueueState::Broken) {
            AsyncQueueState::Stopped(resource) => resource,
            _ => bail!("queue is in a bad state and cannot be started"),
        };

        // `wait_fut` never completes on its own; it only resolves once `abort_handle` is aborted.
        let (wait_fut, abort_handle) = futures::future::abortable(futures::future::pending::<()>());
        let queue_future = fut_provider(resource, wait_fut)?;
        let task = ex.spawn_local(queue_future);
        *self = AsyncQueueState::Running((task, ex.clone(), abort_handle));
        Ok(())
    }

    /// Stops a previously started queue.
    ///
    /// The executor on which the task has been started will be run if needed in order to retrieve
    /// the queue's resource.
    ///
    /// Returns `true` if the queue was running, `false` if it wasn't.
    pub fn stop(&mut self) -> AsyncResult<bool> {
        // TODO: schuffelen - All callers should use stop_async instead.
        match std::mem::replace(self, AsyncQueueState::Broken) {
            AsyncQueueState::Running((task, ex, handle)) => {
                // Abort the task and run it to completion to retrieve the queue's resource.
                handle.abort();
                let resource = ex.run_until(task)?;
                *self = AsyncQueueState::Stopped(resource);
                Ok(true)
            }
            state => {
                *self = state;
                Ok(false)
            }
        }
    }

    /// Stops a previously started queue without blocking the current thread.
    ///
    /// Unlike `stop`, this method does not run the executor itself. It aborts the task and awaits
    /// it to retrieve the queue's resource, so the executor on which the task has been started
    /// must be running for the returned future to complete.
    ///
    /// Returns `true` if the queue was running, `false` if it wasn't.
    pub async fn stop_async(&mut self) -> AsyncResult<bool> {
        match std::mem::replace(self, AsyncQueueState::Broken) {
            AsyncQueueState::Running((task, _, handle)) => {
                // Abort the task and await it to retrieve the queue's resource.
                handle.abort();
                let resource = task.await;
                *self = AsyncQueueState::Stopped(resource);
                Ok(true)
            }
            state => {
                *self = state;
                Ok(false)
            }
        }
    }
}
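
// Illustrative sketch, not part of the original file: a minimal unit test showing how `start`
// and `stop` are expected to be used together. It assumes that `Executor::new()` succeeds on
// the host running the test, and it uses a plain `u32` as a hypothetical stand-in for a real
// device resource such as a file or a queue handle.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn start_then_stop_returns_resource() {
        let ex = Executor::new().expect("failed to create executor");
        let mut queue = AsyncQueueState::Stopped(42u32);

        // Only schedules the task; nothing is polled until the executor runs.
        queue
            .start(&ex, |resource, abort| {
                Ok(async move {
                    // A real device would process descriptors here until `abort` resolves.
                    // This sketch simply waits for the abort signal and hands the resource back.
                    let _ = abort.await;
                    resource
                })
            })
            .expect("failed to start queue");
        assert!(matches!(queue, AsyncQueueState::Running(_)));

        // `stop` aborts the task and runs the executor until the resource is returned.
        assert!(queue.stop().expect("failed to stop queue"));
        assert!(matches!(queue, AsyncQueueState::Stopped(42)));

        // Stopping a queue that is not running is a no-op and reports `false`.
        assert!(!queue.stop().expect("failed to stop queue"));
    }
}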