1use std::future::Future;
6use std::io;
7use std::mem;
8use std::os::fd::AsRawFd;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::sync::Weak;
12use std::task::Context;
13use std::task::Poll;
14use std::task::Waker;
15
16use base::add_fd_flags;
17use base::warn;
18use base::AsRawDescriptor;
19use base::AsRawDescriptors;
20use base::Event;
21use base::EventType;
22use base::RawDescriptor;
23use base::WaitContext;
24use remain::sorted;
25use slab::Slab;
26use sync::Mutex;
27use thiserror::Error as ThisError;
28
29use crate::common_executor::RawExecutor;
30use crate::common_executor::RawTaskHandle;
31use crate::common_executor::Reactor;
32use crate::waker::WakerToken;
33use crate::AsyncResult;
34use crate::IoSource;
35use crate::TaskHandle;
36
37#[sorted]
38#[derive(Debug, ThisError)]
39pub enum Error {
40 #[error("Couldn't clear the wake eventfd")]
41 CantClearWakeEvent(base::Error),
42 #[error("Failed to clone the Event for waking the executor: {0}")]
44 CloneEvent(base::Error),
45 #[error("Failed to create the Event for waking the executor: {0}")]
47 CreateEvent(base::Error),
48 #[error("An error creating the fd waiting context: {0}")]
50 CreatingContext(base::Error),
51 #[error("Failed to copy the FD for the polling context: {0}")]
53 DuplicatingFd(std::io::Error),
54 #[error("Executor failed")]
55 ExecutorError(anyhow::Error),
56 #[error("The FDExecutor is gone")]
58 ExecutorGone,
59 #[error("An error occurred setting the FD non-blocking: {0}.")]
61 SettingNonBlocking(base::Error),
62 #[error("An error adding to the Aio context: {0}")]
64 SubmittingWaker(base::Error),
65 #[error("Unknown waker")]
67 UnknownWaker,
68 #[error("WaitContext failure: {0}")]
70 WaitContextError(base::Error),
71}
72pub type Result<T> = std::result::Result<T, Error>;
73
74impl From<Error> for io::Error {
75 fn from(e: Error) -> Self {
76 use Error::*;
77 match e {
78 CantClearWakeEvent(e) => e.into(),
79 CloneEvent(e) => e.into(),
80 CreateEvent(e) => e.into(),
81 DuplicatingFd(e) => e,
82 ExecutorError(e) => io::Error::other(e),
83 ExecutorGone => io::Error::other(e),
84 CreatingContext(e) => e.into(),
85 SettingNonBlocking(e) => e.into(),
86 SubmittingWaker(e) => e.into(),
87 UnknownWaker => io::Error::other(e),
88 WaitContextError(e) => e.into(),
89 }
90 }
91}
92
93struct OpData {
95 file: Arc<std::os::fd::OwnedFd>,
96 waker: Option<Waker>,
97}
98
99enum OpStatus {
101 Pending(OpData),
102 Completed,
103 WakeEvent,
105}
106
107pub struct RegisteredSource<F> {
110 pub(crate) source: F,
111 ex: Weak<RawExecutor<EpollReactor>>,
112 pub(crate) duped_fd: Arc<std::os::fd::OwnedFd>,
116}
117
118impl<F: AsRawDescriptor> RegisteredSource<F> {
119 pub(crate) fn new(raw: &Arc<RawExecutor<EpollReactor>>, f: F) -> Result<Self> {
120 let raw_fd = f.as_raw_descriptor();
121 assert_ne!(raw_fd, -1);
122
123 add_fd_flags(raw_fd, libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
124
125 let duped_fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(raw_fd) }
128 .try_clone_to_owned()
129 .map_err(Error::DuplicatingFd)?;
130 Ok(RegisteredSource {
131 source: f,
132 ex: Arc::downgrade(raw),
133 duped_fd: Arc::new(duped_fd),
134 })
135 }
136
137 pub fn wait_readable(&self) -> Result<PendingOperation> {
140 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
141
142 let token = ex
143 .reactor
144 .add_operation(Arc::clone(&self.duped_fd), EventType::Read)?;
145
146 Ok(PendingOperation {
147 token: Some(token),
148 ex: self.ex.clone(),
149 })
150 }
151
152 pub fn wait_writable(&self) -> Result<PendingOperation> {
155 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
156
157 let token = ex
158 .reactor
159 .add_operation(Arc::clone(&self.duped_fd), EventType::Write)?;
160
161 Ok(PendingOperation {
162 token: Some(token),
163 ex: self.ex.clone(),
164 })
165 }
166}
167
168pub struct PendingOperation {
172 token: Option<WakerToken>,
173 ex: Weak<RawExecutor<EpollReactor>>,
174}
175
176impl Future for PendingOperation {
177 type Output = Result<()>;
178
179 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
180 let token = self
181 .token
182 .as_ref()
183 .expect("PendingOperation polled after returning Poll::Ready");
184 if let Some(ex) = self.ex.upgrade() {
185 if ex.reactor.is_ready(token, cx) {
186 self.token = None;
187 Poll::Ready(Ok(()))
188 } else {
189 Poll::Pending
190 }
191 } else {
192 Poll::Ready(Err(Error::ExecutorGone))
193 }
194 }
195}
196
197impl Drop for PendingOperation {
198 fn drop(&mut self) {
199 if let Some(token) = self.token.take() {
200 if let Some(ex) = self.ex.upgrade() {
201 let _ = ex.reactor.cancel_operation(token);
202 }
203 }
204 }
205}
206
207pub struct EpollReactor {
209 poll_ctx: WaitContext<usize>,
210 ops: Mutex<Slab<OpStatus>>,
211 wake_event: Event,
215}
216
217impl EpollReactor {
218 fn new() -> Result<Self> {
219 let reactor = EpollReactor {
220 poll_ctx: WaitContext::new().map_err(Error::CreatingContext)?,
221 ops: Mutex::new(Slab::with_capacity(64)),
222 wake_event: {
223 let wake_event = Event::new().map_err(Error::CreateEvent)?;
224 add_fd_flags(wake_event.as_raw_descriptor(), libc::O_NONBLOCK)
225 .map_err(Error::SettingNonBlocking)?;
226 wake_event
227 },
228 };
229
230 {
232 let mut ops = reactor.ops.lock();
233 let entry = ops.vacant_entry();
234 let next_token = entry.key();
235 reactor
236 .poll_ctx
237 .add_for_event(&reactor.wake_event, EventType::Read, next_token)
238 .map_err(Error::SubmittingWaker)?;
239 entry.insert(OpStatus::WakeEvent);
240 }
241
242 Ok(reactor)
243 }
244
245 fn add_operation(
246 &self,
247 file: Arc<std::os::fd::OwnedFd>,
248 event_type: EventType,
249 ) -> Result<WakerToken> {
250 let mut ops = self.ops.lock();
251 let entry = ops.vacant_entry();
252 let next_token = entry.key();
253 self.poll_ctx
254 .add_for_event(&base::Descriptor(file.as_raw_fd()), event_type, next_token)
255 .map_err(Error::SubmittingWaker)?;
256 entry.insert(OpStatus::Pending(OpData { file, waker: None }));
257 Ok(WakerToken(next_token))
258 }
259
260 fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
261 let mut ops = self.ops.lock();
262
263 let op = ops
264 .get_mut(token.0)
265 .expect("`is_ready` called on unknown operation");
266 match op {
267 OpStatus::Pending(data) => {
268 data.waker = Some(cx.waker().clone());
269 false
270 }
271 OpStatus::Completed => {
272 ops.remove(token.0);
273 true
274 }
275 OpStatus::WakeEvent => unreachable!(),
277 }
278 }
279
280 fn cancel_operation(&self, token: WakerToken) -> Result<()> {
282 match self.ops.lock().remove(token.0) {
283 OpStatus::Pending(data) => self
284 .poll_ctx
285 .delete(&base::Descriptor(data.file.as_raw_fd()))
286 .map_err(Error::WaitContextError),
287 OpStatus::Completed => Ok(()),
288 OpStatus::WakeEvent => unreachable!(),
290 }
291 }
292}
293
294impl Reactor for EpollReactor {
295 fn new() -> std::io::Result<Self> {
296 Ok(EpollReactor::new()?)
297 }
298
299 fn wake(&self) {
300 if let Err(e) = self.wake_event.signal() {
301 warn!("Failed to notify executor that a future is ready: {}", e);
302 }
303 }
304
305 fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
306 for op in self.ops.lock().drain() {
313 match op {
314 OpStatus::Pending(mut data) => {
315 if let Some(waker) = data.waker.take() {
316 waker.wake();
317 }
318
319 if let Err(e) = self
320 .poll_ctx
321 .delete(&base::Descriptor(data.file.as_raw_fd()))
322 {
323 warn!("Failed to remove file from EpollCtx: {}", e);
324 }
325 }
326 OpStatus::Completed => {}
327 OpStatus::WakeEvent => {}
328 }
329 }
330
331 Box::pin(async {})
333 }
334
335 fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
336 let events = self.poll_ctx.wait().map_err(Error::WaitContextError)?;
337
338 set_processing();
341 for e in events.iter() {
342 let token = e.token;
343 let mut ops = self.ops.lock();
344
345 if let Some(op) = ops.get_mut(token) {
348 let (file, waker) = match mem::replace(op, OpStatus::Completed) {
349 OpStatus::Pending(OpData { file, waker }) => (file, waker),
350 OpStatus::Completed => panic!("poll operation completed more than once"),
351 OpStatus::WakeEvent => {
352 *op = OpStatus::WakeEvent;
353 match self.wake_event.wait() {
354 Ok(_) => {}
355 Err(e) if e.errno() == libc::EWOULDBLOCK => {}
356 Err(e) => return Err(e.into()),
357 }
358 continue;
359 }
360 };
361
362 mem::drop(ops);
363
364 self.poll_ctx
365 .delete(&base::Descriptor(file.as_raw_fd()))
366 .map_err(Error::WaitContextError)?;
367
368 if let Some(waker) = waker {
369 waker.wake();
370 }
371 }
372 }
373 Ok(())
374 }
375
376 fn new_source<F: AsRawDescriptor>(
377 &self,
378 ex: &Arc<RawExecutor<Self>>,
379 f: F,
380 ) -> AsyncResult<IoSource<F>> {
381 Ok(IoSource::Epoll(super::PollSource::new(f, ex)?))
382 }
383
384 fn wrap_task_handle<R>(task: RawTaskHandle<EpollReactor, R>) -> TaskHandle<R> {
385 TaskHandle::Fd(task)
386 }
387}
388
389impl AsRawDescriptors for EpollReactor {
390 fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
391 vec![
392 self.poll_ctx.as_raw_descriptor(),
393 self.wake_event.as_raw_descriptor(),
394 ]
395 }
396}
397
398#[cfg(test)]
399mod test {
400 use std::cell::RefCell;
401 use std::fs::File;
402 use std::io::Read;
403 use std::io::Write;
404 use std::rc::Rc;
405
406 use futures::future::Either;
407
408 use super::*;
409 use crate::BlockingPool;
410 use crate::ExecutorTrait;
411
412 #[test]
413 fn test_it() {
414 async fn do_test(ex: &Arc<RawExecutor<EpollReactor>>) {
415 let (r, _w) = base::pipe().unwrap();
416 let done = Box::pin(async { 5usize });
417 let source = RegisteredSource::new(ex, r).unwrap();
418 let pending = source.wait_readable().unwrap();
419 match futures::future::select(pending, done).await {
420 Either::Right((5, pending)) => std::mem::drop(pending),
421 _ => panic!("unexpected select result"),
422 }
423 }
424
425 let ex = RawExecutor::<EpollReactor>::new().unwrap();
426 ex.run_until(do_test(&ex)).unwrap();
427
428 async fn my_async(x: Rc<RefCell<u64>>) {
430 x.replace(4);
431 }
432
433 let x = Rc::new(RefCell::new(0));
434 {
435 let ex = RawExecutor::<EpollReactor>::new().unwrap();
436 ex.run_until(my_async(x.clone())).unwrap();
437 }
438 assert_eq!(*x.borrow(), 4);
439 }
440
441 #[test]
442 fn drop_before_completion() {
443 const VALUE: u64 = 0x66ae_cb65_12fb_d260;
444
445 async fn write_value(mut tx: File) {
446 let buf = VALUE.to_ne_bytes();
447 tx.write_all(&buf[..]).expect("Failed to write to pipe");
448 }
449
450 async fn check_op(op: PendingOperation) {
451 let err = op.await.expect_err("Task completed successfully");
452 match err {
453 Error::ExecutorGone => {}
454 e => panic!("Unexpected error from task: {e}"),
455 }
456 }
457
458 let (mut rx, tx) = base::pipe().expect("Pipe failed");
459
460 let ex = RawExecutor::<EpollReactor>::new().unwrap();
461
462 let source = RegisteredSource::new(&ex, tx.try_clone().unwrap()).unwrap();
463 let op = source.wait_writable().unwrap();
464
465 ex.spawn_local(write_value(tx)).detach();
466 ex.spawn_local(check_op(op)).detach();
467
468 mem::drop(ex);
470
471 let mut buf = 0u64.to_ne_bytes();
472 rx.read_exact(&mut buf[..])
473 .expect("Failed to read from pipe");
474
475 assert_eq!(u64::from_ne_bytes(buf), VALUE);
476 }
477
478 #[test]
480 fn drop_detached_blocking_pool() {
481 struct Cleanup(BlockingPool);
482
483 impl Drop for Cleanup {
484 fn drop(&mut self) {
485 self.0
487 .shutdown(Some(
488 std::time::Instant::now() + std::time::Duration::from_secs(1),
489 ))
490 .unwrap();
491 }
492 }
493
494 let rc = Rc::new(std::cell::Cell::new(0));
495 {
496 let ex = RawExecutor::<EpollReactor>::new().unwrap();
497 let rc_clone = rc.clone();
498 ex.spawn_local(async move {
499 rc_clone.set(1);
500 let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
501 let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
502 let blocking_task = pool.0.spawn(move || {
504 assert_eq!(recv.recv(), Ok(()));
506 assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
508 });
509 send.send(()).unwrap();
514 blocking_task.await;
516 rc_clone.set(2);
517 })
518 .detach();
519 ex.run_until(async {}).unwrap();
520 }
525 assert_eq!(rc.get(), 1);
526 Rc::try_unwrap(rc).expect("Rc had too many refs");
527 }
528
529 #[test]
532 fn test_non_io_waker() {
533 use std::task::Poll;
534
535 struct Sleep(Option<u64>);
536
537 impl Future for Sleep {
538 type Output = ();
539
540 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
541 if let Some(ms) = self.0.take() {
542 let waker = cx.waker().clone();
543 std::thread::spawn(move || {
544 std::thread::sleep(std::time::Duration::from_millis(ms));
545 waker.wake();
546 });
547 Poll::Pending
548 } else {
549 Poll::Ready(())
550 }
551 }
552 }
553
554 let ex = RawExecutor::<EpollReactor>::new().unwrap();
555 ex.run_until(async move {
556 Sleep(Some(1)).await;
558 Sleep(Some(1)).await;
559 })
560 .unwrap();
561 }
562}