cros_async/sys/linux/fd_executor.rs

// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::future::Future;
use std::io;
use std::mem;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;

use base::add_fd_flags;
use base::warn;
use base::AsRawDescriptor;
use base::AsRawDescriptors;
use base::Event;
use base::EventType;
use base::RawDescriptor;
use base::WaitContext;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::common_executor::RawExecutor;
use crate::common_executor::RawTaskHandle;
use crate::common_executor::Reactor;
use crate::waker::WakerToken;
use crate::AsyncResult;
use crate::IoSource;
use crate::TaskHandle;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to clear the wake eventfd.
    #[error("Couldn't clear the wake eventfd")]
    CantClearWakeEvent(base::Error),
    /// Failed to clone the Event for waking the executor.
    #[error("Failed to clone the Event for waking the executor: {0}")]
    CloneEvent(base::Error),
    /// Failed to create the Event for waking the executor.
    #[error("Failed to create the Event for waking the executor: {0}")]
    CreateEvent(base::Error),
    /// Creating a context to wait on FDs failed.
    #[error("An error creating the fd waiting context: {0}")]
    CreatingContext(base::Error),
    /// Failed to copy the FD for the polling context.
    #[error("Failed to copy the FD for the polling context: {0}")]
    DuplicatingFd(std::io::Error),
    /// The executor failed.
    #[error("Executor failed")]
    ExecutorError(anyhow::Error),
    /// The Executor is gone.
    #[error("The FDExecutor is gone")]
    ExecutorGone,
    /// An error occurred when setting the FD non-blocking.
    #[error("An error occurred setting the FD non-blocking: {0}.")]
    SettingNonBlocking(base::Error),
    /// Failed to submit the waker to the polling context.
    #[error("An error adding to the Aio context: {0}")]
    SubmittingWaker(base::Error),
    /// A Waker was canceled, but the operation isn't running.
    #[error("Unknown waker")]
    UnknownWaker,
    /// WaitContext failure.
    #[error("WaitContext failure: {0}")]
    WaitContextError(base::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

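// Conversion note: variants that wrap a `base::Error` convert through its `Into<io::Error>` impl,
// `DuplicatingFd` already carries an `io::Error`, and the remaining variants are wrapped whole
// with `io::Error::other`.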
impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            CantClearWakeEvent(e) => e.into(),
            CloneEvent(e) => e.into(),
            CreateEvent(e) => e.into(),
            DuplicatingFd(e) => e,
            ExecutorError(e) => io::Error::other(e),
            ExecutorGone => io::Error::other(e),
            CreatingContext(e) => e.into(),
            SettingNonBlocking(e) => e.into(),
            SubmittingWaker(e) => e.into(),
            UnknownWaker => io::Error::other(e),
            WaitContextError(e) => e.into(),
        }
    }
}

// A poll operation that has been submitted and is potentially being waited on.
struct OpData {
    file: Arc<std::os::fd::OwnedFd>,
    waker: Option<Waker>,
}

// The current status of a submitted operation.
enum OpStatus {
    Pending(OpData),
    Completed,
    // Special status that identifies the "wake up" eventfd, which is essentially always pending.
    WakeEvent,
}

// An IO source previously registered with an EpollReactor. Used to initiate asynchronous IO with
// the associated executor.
pub struct RegisteredSource<F> {
    pub(crate) source: F,
    ex: Weak<RawExecutor<EpollReactor>>,
    /// A clone of `source`'s underlying FD. Allows us to ensure that the FD isn't closed during
    /// the epoll wait call. There are well-defined semantics for closing an FD in an epoll
    /// context, so it might be possible to eliminate this dup if someone thinks hard about it.
    pub(crate) duped_fd: Arc<std::os::fd::OwnedFd>,
}

impl<F: AsRawDescriptor> RegisteredSource<F> {
    pub(crate) fn new(raw: &Arc<RawExecutor<EpollReactor>>, f: F) -> Result<Self> {
        let raw_fd = f.as_raw_descriptor();
        assert_ne!(raw_fd, -1);

        add_fd_flags(raw_fd, libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;

        // SAFETY: The FD is open for the duration of the BorrowedFd lifetime (this line) and not
        // -1 (checked above).
        let duped_fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(raw_fd) }
            .try_clone_to_owned()
            .map_err(Error::DuplicatingFd)?;
        Ok(RegisteredSource {
            source: f,
            ex: Arc::downgrade(raw),
            duped_fd: Arc::new(duped_fd),
        })
    }

    // Start an asynchronous operation to wait for this source to become readable. The returned
    // future will not be ready until the source is readable.
    pub fn wait_readable(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;

        let token = ex
            .reactor
            .add_operation(Arc::clone(&self.duped_fd), EventType::Read)?;

        Ok(PendingOperation {
            token: Some(token),
            ex: self.ex.clone(),
        })
    }

    // Start an asynchronous operation to wait for this source to become writable. The returned
    // future will not be ready until the source is writable.
    pub fn wait_writable(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;

        let token = ex
            .reactor
            .add_operation(Arc::clone(&self.duped_fd), EventType::Write)?;

        Ok(PendingOperation {
            token: Some(token),
            ex: self.ex.clone(),
        })
    }
}
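
// Example (illustrative sketch, not itself compiled): a caller registers an FD-backed source once
// and then awaits readiness before each non-blocking operation. `ex` is assumed to be an
// `Arc<RawExecutor<EpollReactor>>`, as in the tests at the bottom of this file:
//
//     let source = RegisteredSource::new(&ex, fd)?;
//     source.wait_readable()?.await?;
//     // `fd` was reported readable; perform the non-blocking read, re-awaiting on EWOULDBLOCK.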

/// A token returned from `add_operation` that can be used to cancel the waker before it completes.
/// Used to manage getting the result from the underlying executor for a completed operation.
/// Dropping a `PendingOperation` will get the result from the executor.
pub struct PendingOperation {
    token: Option<WakerToken>,
    ex: Weak<RawExecutor<EpollReactor>>,
}

impl Future for PendingOperation {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let token = self
            .token
            .as_ref()
            .expect("PendingOperation polled after returning Poll::Ready");
        if let Some(ex) = self.ex.upgrade() {
            if ex.reactor.is_ready(token, cx) {
                self.token = None;
                Poll::Ready(Ok(()))
            } else {
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(Error::ExecutorGone))
        }
    }
}

impl Drop for PendingOperation {
    fn drop(&mut self) {
        if let Some(token) = self.token.take() {
            if let Some(ex) = self.ex.upgrade() {
                let _ = ex.reactor.cancel_operation(token);
            }
        }
    }
}

/// `Reactor` that manages async IO work using epoll.
pub struct EpollReactor {
    poll_ctx: WaitContext<usize>,
    ops: Mutex<Slab<OpStatus>>,
    // This event is always present in `poll_ctx` with the special op status `WakeEvent`. It is
    // used by `RawExecutor::wake` to break other threads out of `poll_ctx.wait()` calls (usually
    // to notify them that `queue` has new work).
    wake_event: Event,
}

impl EpollReactor {
    fn new() -> Result<Self> {
        let reactor = EpollReactor {
            poll_ctx: WaitContext::new().map_err(Error::CreatingContext)?,
            ops: Mutex::new(Slab::with_capacity(64)),
            wake_event: {
                let wake_event = Event::new().map_err(Error::CreateEvent)?;
                add_fd_flags(wake_event.as_raw_descriptor(), libc::O_NONBLOCK)
                    .map_err(Error::SettingNonBlocking)?;
                wake_event
            },
        };

        // Add the special "wake up" op.
        {
            let mut ops = reactor.ops.lock();
            let entry = ops.vacant_entry();
            let next_token = entry.key();
            reactor
                .poll_ctx
                .add_for_event(&reactor.wake_event, EventType::Read, next_token)
                .map_err(Error::SubmittingWaker)?;
            entry.insert(OpStatus::WakeEvent);
        }

        Ok(reactor)
    }

    fn add_operation(
        &self,
        file: Arc<std::os::fd::OwnedFd>,
        event_type: EventType,
    ) -> Result<WakerToken> {
        let mut ops = self.ops.lock();
        let entry = ops.vacant_entry();
        let next_token = entry.key();
        self.poll_ctx
            .add_for_event(&base::Descriptor(file.as_raw_fd()), event_type, next_token)
            .map_err(Error::SubmittingWaker)?;
        entry.insert(OpStatus::Pending(OpData { file, waker: None }));
        Ok(WakerToken(next_token))
    }

    fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
        let mut ops = self.ops.lock();

        let op = ops
            .get_mut(token.0)
            .expect("`is_ready` called on unknown operation");
        match op {
            OpStatus::Pending(data) => {
                data.waker = Some(cx.waker().clone());
                false
            }
            OpStatus::Completed => {
                ops.remove(token.0);
                true
            }
            // unreachable because we never create a WakerToken for `wake_event`.
            OpStatus::WakeEvent => unreachable!(),
        }
    }

    // Remove the waker for the given token if it hasn't fired yet.
    fn cancel_operation(&self, token: WakerToken) -> Result<()> {
        match self.ops.lock().remove(token.0) {
            OpStatus::Pending(data) => self
                .poll_ctx
                .delete(&base::Descriptor(data.file.as_raw_fd()))
                .map_err(Error::WaitContextError),
            OpStatus::Completed => Ok(()),
            // unreachable because we never create a WakerToken for `wake_event`.
            OpStatus::WakeEvent => unreachable!(),
        }
    }
}
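
// Lifecycle of a submitted operation, as implemented above (descriptive summary only):
//
//   add_operation:    the FD is registered with `poll_ctx` and a slab entry is created as
//                     `Pending` with no waker.
//   is_ready:         called from `PendingOperation::poll`; stores the task's `Waker` while the
//                     entry is still `Pending`, or removes the entry and reports readiness once
//                     `wait_for_work` has marked it `Completed`.
//   cancel_operation: invoked by `PendingOperation::drop`; removes the entry and, if it was still
//                     `Pending`, deletes the FD from `poll_ctx` so the reactor stops watching it.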

impl Reactor for EpollReactor {
    fn new() -> std::io::Result<Self> {
        Ok(EpollReactor::new()?)
    }

    fn wake(&self) {
        if let Err(e) = self.wake_event.signal() {
            warn!("Failed to notify executor that a future is ready: {}", e);
        }
    }

    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        // At this point, there are no strong references to the executor (see `on_executor_drop`
        // docs). That means all the `RegisteredSource::ex` references will fail to upgrade, so no
        // more IO work can be submitted.

        // Wake up any futures still waiting on poll operations as they are just going to get an
        // ExecutorGone error now.
        for op in self.ops.lock().drain() {
            match op {
                OpStatus::Pending(mut data) => {
                    if let Some(waker) = data.waker.take() {
                        waker.wake();
                    }

                    if let Err(e) = self
                        .poll_ctx
                        .delete(&base::Descriptor(data.file.as_raw_fd()))
                    {
                        warn!("Failed to remove file from EpollCtx: {}", e);
                    }
                }
                OpStatus::Completed => {}
                OpStatus::WakeEvent => {}
            }
        }

        // Now run the executor one more time to drive any remaining futures to completion.
        Box::pin(async {})
    }

    fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
        let events = self.poll_ctx.wait().map_err(Error::WaitContextError)?;

        // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
        // writing to the eventfd.
        set_processing();
        for e in events.iter() {
            let token = e.token;
            let mut ops = self.ops.lock();

            // The op could have been canceled and removed by another thread, so ignore it if it
            // doesn't exist.
            if let Some(op) = ops.get_mut(token) {
                let (file, waker) = match mem::replace(op, OpStatus::Completed) {
                    OpStatus::Pending(OpData { file, waker }) => (file, waker),
                    OpStatus::Completed => panic!("poll operation completed more than once"),
                    OpStatus::WakeEvent => {
                        *op = OpStatus::WakeEvent;
                        match self.wake_event.wait() {
                            Ok(_) => {}
                            Err(e) if e.errno() == libc::EWOULDBLOCK => {}
                            Err(e) => return Err(e.into()),
                        }
                        continue;
                    }
                };

                mem::drop(ops);

                self.poll_ctx
                    .delete(&base::Descriptor(file.as_raw_fd()))
                    .map_err(Error::WaitContextError)?;

                if let Some(waker) = waker {
                    waker.wake();
                }
            }
        }
        Ok(())
    }

    fn new_source<F: AsRawDescriptor>(
        &self,
        ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>> {
        Ok(IoSource::Epoll(super::PollSource::new(f, ex)?))
    }

    fn wrap_task_handle<R>(task: RawTaskHandle<EpollReactor, R>) -> TaskHandle<R> {
        TaskHandle::Fd(task)
    }
}
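
// How `wake` and `wait_for_work` cooperate (descriptive summary of the code above): `wake` signals
// the non-blocking `wake_event`, which is registered in `poll_ctx` under the special `WakeEvent`
// op, so a thread blocked in `poll_ctx.wait()` returns. `wait_for_work` then clears the eventfd
// (tolerating EWOULDBLOCK if another thread already did) and leaves the `WakeEvent` op in place,
// while genuine FD events are marked `Completed`, removed from `poll_ctx`, and have their wakers
// fired.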

impl AsRawDescriptors for EpollReactor {
    fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
        vec![
            self.poll_ctx.as_raw_descriptor(),
            self.wake_event.as_raw_descriptor(),
        ]
    }
}

#[cfg(test)]
mod test {
    use std::cell::RefCell;
    use std::fs::File;
    use std::io::Read;
    use std::io::Write;
    use std::rc::Rc;

    use futures::future::Either;

    use super::*;
    use crate::BlockingPool;
    use crate::ExecutorTrait;

    #[test]
    fn test_it() {
        async fn do_test(ex: &Arc<RawExecutor<EpollReactor>>) {
            let (r, _w) = base::pipe().unwrap();
            let done = Box::pin(async { 5usize });
            let source = RegisteredSource::new(ex, r).unwrap();
            let pending = source.wait_readable().unwrap();
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => std::mem::drop(pending),
                _ => panic!("unexpected select result"),
            }
        }

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();

        // Example of starting the framework and running a future:
        async fn my_async(x: Rc<RefCell<u64>>) {
            x.replace(4);
        }

        let x = Rc::new(RefCell::new(0));
        {
            let ex = RawExecutor::<EpollReactor>::new().unwrap();
            ex.run_until(my_async(x.clone())).unwrap();
        }
        assert_eq!(*x.borrow(), 4);
    }

    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0x66ae_cb65_12fb_d260;

        async fn write_value(mut tx: File) {
            let buf = VALUE.to_ne_bytes();
            tx.write_all(&buf[..]).expect("Failed to write to pipe");
        }

        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Task completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from task: {e}"),
            }
        }

        let (mut rx, tx) = base::pipe().expect("Pipe failed");

        let ex = RawExecutor::<EpollReactor>::new().unwrap();

        let source = RegisteredSource::new(&ex, tx.try_clone().unwrap()).unwrap();
        let op = source.wait_writable().unwrap();

        ex.spawn_local(write_value(tx)).detach();
        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It should still run until the write to the pipe is complete.
        mem::drop(ex);

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(u64::from_ne_bytes(buf), VALUE);
    }
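
    // Illustrative sketch, not part of the original suite: once the executor has been dropped,
    // `RegisteredSource::wait_readable` should report `ExecutorGone` because the `Weak` executor
    // reference can no longer be upgraded.
    #[test]
    fn wait_readable_after_executor_gone() {
        let (r, _w) = base::pipe().expect("Pipe failed");

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        let source = RegisteredSource::new(&ex, r).unwrap();
        mem::drop(ex);

        match source.wait_readable() {
            Err(Error::ExecutorGone) => {}
            _ => panic!("expected ExecutorGone after dropping the executor"),
        }
    }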

    // Dropping a task that owns a BlockingPool shouldn't leak the pool.
    #[test]
    fn drop_detached_blocking_pool() {
        struct Cleanup(BlockingPool);

        impl Drop for Cleanup {
            fn drop(&mut self) {
                // Make sure we shut down cleanly (BlockingPool::drop just prints a warning).
                self.0
                    .shutdown(Some(
                        std::time::Instant::now() + std::time::Duration::from_secs(1),
                    ))
                    .unwrap();
            }
        }

        let rc = Rc::new(std::cell::Cell::new(0));
        {
            let ex = RawExecutor::<EpollReactor>::new().unwrap();
            let rc_clone = rc.clone();
            ex.spawn_local(async move {
                rc_clone.set(1);
                let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
                let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
                // Spawn a blocking task.
                let blocking_task = pool.0.spawn(move || {
                    // Rendezvous.
                    assert_eq!(recv.recv(), Ok(()));
                    // Wait for drop.
                    assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
                });
                // Make sure it has actually started (using a "rendezvous channel" send).
                //
                // Without this step, we'd have a race where we could shut down the blocking pool
                // before the worker thread pops off the task.
                send.send(()).unwrap();
                // Wait for it to finish.
                blocking_task.await;
                rc_clone.set(2);
            })
            .detach();
            ex.run_until(async {}).unwrap();
            // `ex` is dropped here. If everything is working as expected, it should drop all of
            // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
            // `Drop` impl will try to join all the worker threads, which should work because the
            // send half of the channel is closed.
        }
        assert_eq!(rc.get(), 1);
        Rc::try_unwrap(rc).expect("Rc had too many refs");
    }

    // Test the waker implementation. This code path doesn't get hit by `IoSource`, only by
    // backend-agnostic libraries, like `BlockingPool` and `futures::channel`.
    #[test]
    fn test_non_io_waker() {
        use std::task::Poll;

        struct Sleep(Option<u64>);

        impl Future for Sleep {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                if let Some(ms) = self.0.take() {
                    let waker = cx.waker().clone();
                    std::thread::spawn(move || {
                        std::thread::sleep(std::time::Duration::from_millis(ms));
                        waker.wake();
                    });
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            }
        }

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        ex.run_until(async move {
            // Test twice because there was once a bug where the second time panic'd.
            Sleep(Some(1)).await;
            Sleep(Some(1)).await;
        })
        .unwrap();
    }
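
    // Illustrative sketch, not part of the original suite: dropping a `PendingOperation` before
    // it completes should cancel the underlying waker (via `cancel_operation`) without disturbing
    // the executor.
    #[test]
    fn cancel_by_dropping_pending_operation() {
        let ex = RawExecutor::<EpollReactor>::new().unwrap();

        let (r, _w) = base::pipe().expect("Pipe failed");
        let source = RegisteredSource::new(&ex, r).unwrap();

        // The pipe has no data yet, so this operation cannot complete on its own.
        let op = source.wait_readable().unwrap();

        // Dropping the pending operation removes its entry from the reactor.
        mem::drop(op);

        // The executor should still run unrelated futures normally afterwards.
        assert_eq!(ex.run_until(async { 7usize }).unwrap(), 7);
    }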
}