cros_async/
common_executor.rs

1// Copyright 2023 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::future::Future;
6use std::io::Result;
7use std::pin::Pin;
8use std::sync::atomic::AtomicI32;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::sync::Weak;
12use std::task::Context;
13use std::task::Poll;
14
15use async_task::Task;
16use base::warn;
17use base::AsRawDescriptor;
18use base::AsRawDescriptors;
19use base::RawDescriptor;
20use futures::task::noop_waker;
21use pin_utils::pin_mut;
22use sync::Mutex;
23
24use crate::queue::RunnableQueue;
25use crate::waker::WeakWake;
26use crate::AsyncError;
27use crate::AsyncResult;
28use crate::BlockingPool;
29use crate::DetachedTasks;
30use crate::ExecutorTrait;
31use crate::IntoAsync;
32use crate::IoSource;
33use crate::TaskHandle;
34
/// Abstraction for IO backends.
///
/// A `RawExecutor` owns exactly one reactor and uses it to block while no task is runnable, to
/// be woken up again, and to create `IoSource`s.
pub trait Reactor: Send + Sync + Sized {
    /// Create a new reactor instance.
    ///
    /// `RawExecutor::new` calls this and reports a failure as `AsyncError::Io`.
    fn new() -> Result<Self>;

    /// Called when the executor is being dropped to allow orderly shutdown (e.g. cancelling IO
    /// work). The returned future is driven by the executor's run loop from `RawExecutor::drop`;
    /// if the run loop itself fails, a warning is logged instead.
    ///
    /// Note that, since this is called from `RawExecutor::drop`, there will not be any
    /// `Arc<RawExecutor>` left, so weak references to the executor will always fail to upgrade at
    /// this point. Reactors can potentially make use of this fact to keep more IO work from being
    /// submitted.
    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;

    /// Called when an executor run loop starts on a thread.
    ///
    /// The default implementation does nothing.
    fn on_thread_start(&self) {}

    /// Block until an event occurs (e.g. IO work is ready) or until `wake` is called.
    ///
    /// As an optimization, `set_processing` should be called immediately after wake up (i.e.
    /// before any book keeping is done) so that concurrent calls to wakers can safely skip making
    /// redundant calls to `Reactor::wake`.
    ///
    /// An `Err` returned from here aborts the executor's run loop.
    fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()>;

    /// Wake up any pending `wait_for_work` calls. If there are none pending, then wake up the next
    /// `wait_for_work` call (necessary to avoid race conditions).
    fn wake(&self);

    /// Create an `IoSource` for the backend.
    ///
    /// `ex` is the executor that owns this reactor and `f` is the descriptor-backed object to
    /// wrap.
    fn new_source<F: AsRawDescriptor>(
        &self,
        ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>>;

    /// Convert a `RawTaskHandle` bound to this reactor type into the reactor-agnostic
    /// `TaskHandle` that `spawn` and `spawn_local` return.
    fn wrap_task_handle<R>(task: RawTaskHandle<Self, R>) -> TaskHandle<R>;
}
71
// The three values below are the only ones this file stores in `RawExecutor::state`.
// NOTE(review): the specific bit patterns look arbitrary (presumably picked to be distinctive
// and unlikely to appear by accident) -- confirm before relying on any particular value.

// Indicates the executor is either within or about to make a `Reactor::wait_for_work` call. When a
// waker sees this value, it will call `Reactor::wake`. The run loop only enters this state from
// PROCESSING, via a compare-exchange.
const WAITING: i32 = 0x1d5b_c019u32 as i32;

// Indicates the executor is processing futures. This is the initial state, and it is stored again
// at the top of every run loop iteration and by the `set_processing` callback handed to
// `Reactor::wait_for_work`.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;

// Indicates one or more futures may be ready to make progress (i.e. causes the main loop to return
// directly to PROCESSING instead of WAITING). Written unconditionally by `RawExecutor::wake`.
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
82
/// Executor state that is generic over the `Reactor` IO backend: the reactor itself, the queue of
/// runnable tasks, a pool for blocking work, and the run loop state machine.
pub struct RawExecutor<Re: Reactor + 'static> {
    /// The IO backend used to wait for work, to wake the run loop, and to create `IoSource`s.
    pub reactor: Re,
    /// Runnables pushed by the schedule callbacks of spawned tasks and run by the run loop.
    queue: RunnableQueue,
    /// Pool backing `spawn_blocking`.
    blocking_pool: BlockingPool,
    /// Run loop state; always one of `WAITING`, `PROCESSING`, or `WOKEN`.
    state: AtomicI32,
    /// Tasks handed over through `RawTaskHandle::detach`; polled by the run loop.
    detached_tasks: Mutex<DetachedTasks>,
}
90
91impl<Re: Reactor> RawExecutor<Re> {
92    pub fn new_with(reactor: Re) -> AsyncResult<Arc<Self>> {
93        Ok(Arc::new(RawExecutor {
94            reactor,
95            queue: RunnableQueue::new(),
96            blocking_pool: Default::default(),
97            state: AtomicI32::new(PROCESSING),
98            detached_tasks: Mutex::new(DetachedTasks::new()),
99        }))
100    }
101
102    pub fn new() -> AsyncResult<Arc<Self>> {
103        Self::new_with(Re::new().map_err(AsyncError::Io)?)
104    }
105
106    fn wake(&self) {
107        let oldstate = self.state.swap(WOKEN, Ordering::AcqRel);
108        if oldstate == WAITING {
109            self.reactor.wake();
110        }
111    }
112
113    fn run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output> {
114        self.reactor.on_thread_start();
115
116        pin_mut!(done);
117
118        loop {
119            self.state.store(PROCESSING, Ordering::Release);
120            for runnable in self.queue.iter() {
121                runnable.run();
122            }
123
124            if let Ok(mut tasks) = self.detached_tasks.try_lock() {
125                tasks.poll(cx);
126            }
127
128            if let Poll::Ready(val) = done.as_mut().poll(cx) {
129                return Ok(val);
130            }
131
132            let oldstate = self.state.compare_exchange(
133                PROCESSING,
134                WAITING,
135                Ordering::AcqRel,
136                Ordering::Acquire,
137            );
138            if let Err(oldstate) = oldstate {
139                debug_assert_eq!(oldstate, WOKEN);
140                // One or more futures have become runnable.
141                continue;
142            }
143
144            self.reactor
145                .wait_for_work(|| self.state.store(PROCESSING, Ordering::Release))
146                .map_err(AsyncError::Io)?;
147        }
148    }
149}
150
151impl<Re: Reactor + 'static> ExecutorTrait for Arc<RawExecutor<Re>> {
152    fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
153        self.reactor.new_source(self, f)
154    }
155
156    fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
157    where
158        F: Future + Send + 'static,
159        F::Output: Send + 'static,
160    {
161        let raw = Arc::downgrade(self);
162        let schedule = move |runnable| {
163            if let Some(r) = raw.upgrade() {
164                r.queue.push_back(runnable);
165                r.wake();
166            }
167        };
168        let (runnable, task) = async_task::spawn(f, schedule);
169        runnable.schedule();
170        Re::wrap_task_handle(RawTaskHandle {
171            task,
172            raw: Arc::downgrade(self),
173        })
174    }
175
176    fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
177    where
178        F: Future + 'static,
179        F::Output: 'static,
180    {
181        let raw = Arc::downgrade(self);
182        let schedule = move |runnable| {
183            if let Some(r) = raw.upgrade() {
184                r.queue.push_back(runnable);
185                r.wake();
186            }
187        };
188        let (runnable, task) = async_task::spawn_local(f, schedule);
189        runnable.schedule();
190        Re::wrap_task_handle(RawTaskHandle {
191            task,
192            raw: Arc::downgrade(self),
193        })
194    }
195
196    fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
197    where
198        F: FnOnce() -> R + Send + 'static,
199        R: Send + 'static,
200    {
201        self.spawn(self.blocking_pool.spawn(f))
202    }
203
204    fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
205        let waker = super::waker::new_waker(Arc::downgrade(self));
206        let mut ctx = Context::from_waker(&waker);
207
208        self.run_internal(&mut ctx, f)
209    }
210}
211
212impl<Re: Reactor + AsRawDescriptors> AsRawDescriptors for RawExecutor<Re> {
213    fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
214        self.reactor.as_raw_descriptors()
215    }
216}
217
218impl<Re: Reactor> WeakWake for RawExecutor<Re> {
219    fn wake_by_ref(weak_self: &Weak<Self>) {
220        if let Some(arc_self) = weak_self.upgrade() {
221            RawExecutor::wake(&arc_self);
222        }
223    }
224}
225
226impl<Re: Reactor> Drop for RawExecutor<Re> {
227    fn drop(&mut self) {
228        let final_future = self.reactor.on_executor_drop();
229
230        let waker = noop_waker();
231        let mut cx = Context::from_waker(&waker);
232        if let Err(e) = self.run_internal(&mut cx, final_future) {
233            warn!("Failed to drive RawExecutor to completion: {}", e);
234        }
235    }
236}
237
/// Handle to a task spawned on a `RawExecutor`.
///
/// Polling the handle polls the underlying task for its output. `detach` hands the task over to
/// the executor and `cancel` cancels it.
pub struct RawTaskHandle<Re: Reactor + 'static, R> {
    /// The spawned task, as returned by `async_task::spawn` / `async_task::spawn_local`.
    task: Task<R>,
    /// Weak reference to the executor that spawned the task; only `detach` upgrades it.
    raw: Weak<RawExecutor<Re>>,
}
242
243impl<Re: Reactor, R: Send + 'static> RawTaskHandle<Re, R> {
244    pub fn detach(self) {
245        if let Some(raw) = self.raw.upgrade() {
246            raw.detached_tasks.lock().push(self.task);
247        }
248    }
249
250    pub async fn cancel(self) -> Option<R> {
251        self.task.cancel().await
252    }
253}
254
255impl<Re: Reactor, R: 'static> Future for RawTaskHandle<Re, R> {
256    type Output = R;
257
258    fn poll(
259        mut self: std::pin::Pin<&mut Self>,
260        cx: &mut std::task::Context,
261    ) -> std::task::Poll<Self::Output> {
262        Pin::new(&mut self.task).poll(cx)
263    }
264}