cros_async/
common_executor.rs1use std::future::Future;
6use std::io::Result;
7use std::pin::Pin;
8use std::sync::atomic::AtomicI32;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::sync::Weak;
12use std::task::Context;
13use std::task::Poll;
14
15use async_task::Task;
16use base::warn;
17use base::AsRawDescriptor;
18use base::AsRawDescriptors;
19use base::RawDescriptor;
20use futures::task::noop_waker;
21use pin_utils::pin_mut;
22use sync::Mutex;
23
24use crate::queue::RunnableQueue;
25use crate::waker::WeakWake;
26use crate::AsyncError;
27use crate::AsyncResult;
28use crate::BlockingPool;
29use crate::DetachedTasks;
30use crate::ExecutorTrait;
31use crate::IntoAsync;
32use crate::IoSource;
33use crate::TaskHandle;
34
35pub trait Reactor: Send + Sync + Sized {
37 fn new() -> Result<Self>;
38
39 fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
47
48 fn on_thread_start(&self) {}
50
51 fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()>;
57
58 fn wake(&self);
61
62 fn new_source<F: AsRawDescriptor>(
64 &self,
65 ex: &Arc<RawExecutor<Self>>,
66 f: F,
67 ) -> AsyncResult<IoSource<F>>;
68
69 fn wrap_task_handle<R>(task: RawTaskHandle<Self, R>) -> TaskHandle<R>;
70}
71
// Executor state values stored in `RawExecutor::state`.
//
// NOTE(review): the bit patterns look arbitrary; presumably they only need to be distinct and
// unlikely to appear by accident — confirm before changing them.

// The run loop is inside, or about to enter, `Reactor::wait_for_work`. A waker that replaces this
// value calls `Reactor::wake` to interrupt the wait.
const WAITING: i32 = 0x1d5b_c019_u32 as i32;

// The run loop is running tasks. This is the initial state and the one restored at the top of
// every loop iteration and by the `set_processing` callback.
const PROCESSING: i32 = 0xd474_77bc_u32 as i32;

// A wake-up arrived. When the run loop sees this instead of `PROCESSING` it loops again without
// calling `Reactor::wait_for_work`.
const WOKEN: i32 = 0x3e4d_3276_u32 as i32;
82
83pub struct RawExecutor<Re: Reactor + 'static> {
84 pub reactor: Re,
85 queue: RunnableQueue,
86 blocking_pool: BlockingPool,
87 state: AtomicI32,
88 detached_tasks: Mutex<DetachedTasks>,
89}
90
91impl<Re: Reactor> RawExecutor<Re> {
92 pub fn new_with(reactor: Re) -> AsyncResult<Arc<Self>> {
93 Ok(Arc::new(RawExecutor {
94 reactor,
95 queue: RunnableQueue::new(),
96 blocking_pool: Default::default(),
97 state: AtomicI32::new(PROCESSING),
98 detached_tasks: Mutex::new(DetachedTasks::new()),
99 }))
100 }
101
102 pub fn new() -> AsyncResult<Arc<Self>> {
103 Self::new_with(Re::new().map_err(AsyncError::Io)?)
104 }
105
106 fn wake(&self) {
107 let oldstate = self.state.swap(WOKEN, Ordering::AcqRel);
108 if oldstate == WAITING {
109 self.reactor.wake();
110 }
111 }
112
113 fn run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output> {
114 self.reactor.on_thread_start();
115
116 pin_mut!(done);
117
118 loop {
119 self.state.store(PROCESSING, Ordering::Release);
120 for runnable in self.queue.iter() {
121 runnable.run();
122 }
123
124 if let Ok(mut tasks) = self.detached_tasks.try_lock() {
125 tasks.poll(cx);
126 }
127
128 if let Poll::Ready(val) = done.as_mut().poll(cx) {
129 return Ok(val);
130 }
131
132 let oldstate = self.state.compare_exchange(
133 PROCESSING,
134 WAITING,
135 Ordering::AcqRel,
136 Ordering::Acquire,
137 );
138 if let Err(oldstate) = oldstate {
139 debug_assert_eq!(oldstate, WOKEN);
140 continue;
142 }
143
144 self.reactor
145 .wait_for_work(|| self.state.store(PROCESSING, Ordering::Release))
146 .map_err(AsyncError::Io)?;
147 }
148 }
149}
150
151impl<Re: Reactor + 'static> ExecutorTrait for Arc<RawExecutor<Re>> {
152 fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
153 self.reactor.new_source(self, f)
154 }
155
156 fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
157 where
158 F: Future + Send + 'static,
159 F::Output: Send + 'static,
160 {
161 let raw = Arc::downgrade(self);
162 let schedule = move |runnable| {
163 if let Some(r) = raw.upgrade() {
164 r.queue.push_back(runnable);
165 r.wake();
166 }
167 };
168 let (runnable, task) = async_task::spawn(f, schedule);
169 runnable.schedule();
170 Re::wrap_task_handle(RawTaskHandle {
171 task,
172 raw: Arc::downgrade(self),
173 })
174 }
175
176 fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
177 where
178 F: Future + 'static,
179 F::Output: 'static,
180 {
181 let raw = Arc::downgrade(self);
182 let schedule = move |runnable| {
183 if let Some(r) = raw.upgrade() {
184 r.queue.push_back(runnable);
185 r.wake();
186 }
187 };
188 let (runnable, task) = async_task::spawn_local(f, schedule);
189 runnable.schedule();
190 Re::wrap_task_handle(RawTaskHandle {
191 task,
192 raw: Arc::downgrade(self),
193 })
194 }
195
196 fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
197 where
198 F: FnOnce() -> R + Send + 'static,
199 R: Send + 'static,
200 {
201 self.spawn(self.blocking_pool.spawn(f))
202 }
203
204 fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
205 let waker = super::waker::new_waker(Arc::downgrade(self));
206 let mut ctx = Context::from_waker(&waker);
207
208 self.run_internal(&mut ctx, f)
209 }
210}
211
212impl<Re: Reactor + AsRawDescriptors> AsRawDescriptors for RawExecutor<Re> {
213 fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
214 self.reactor.as_raw_descriptors()
215 }
216}
217
218impl<Re: Reactor> WeakWake for RawExecutor<Re> {
219 fn wake_by_ref(weak_self: &Weak<Self>) {
220 if let Some(arc_self) = weak_self.upgrade() {
221 RawExecutor::wake(&arc_self);
222 }
223 }
224}
225
226impl<Re: Reactor> Drop for RawExecutor<Re> {
227 fn drop(&mut self) {
228 let final_future = self.reactor.on_executor_drop();
229
230 let waker = noop_waker();
231 let mut cx = Context::from_waker(&waker);
232 if let Err(e) = self.run_internal(&mut cx, final_future) {
233 warn!("Failed to drive RawExecutor to completion: {}", e);
234 }
235 }
236}
237
238pub struct RawTaskHandle<Re: Reactor + 'static, R> {
239 task: Task<R>,
240 raw: Weak<RawExecutor<Re>>,
241}
242
243impl<Re: Reactor, R: Send + 'static> RawTaskHandle<Re, R> {
244 pub fn detach(self) {
245 if let Some(raw) = self.raw.upgrade() {
246 raw.detached_tasks.lock().push(self.task);
247 }
248 }
249
250 pub async fn cancel(self) -> Option<R> {
251 self.task.cancel().await
252 }
253}
254
255impl<Re: Reactor, R: 'static> Future for RawTaskHandle<Re, R> {
256 type Output = R;
257
258 fn poll(
259 mut self: std::pin::Pin<&mut Self>,
260 cx: &mut std::task::Context,
261 ) -> std::task::Poll<Self::Output> {
262 Pin::new(&mut self.task).poll(cx)
263 }
264}