cros_async/
lib.rs

1// Copyright 2020 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An Executor and future combinators based on operations that block on file descriptors.
6//!
7//! This crate is meant to be used with the `futures-rs` crate that provides further combinators
8//! and utility functions to combine and manage futures. All futures will run until they block on a
9//! file descriptor becoming readable or writable. Facilities are provided to register future
10//! wakers based on such events.
11//!
12//! # Running top-level futures.
13//!
//! Use helper functions based on the desired behavior of your application.
15//!
16//! ## Completing one of several futures.
17//!
18//! If there are several top level tasks that should run until any one completes, use the "select"
19//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
20//! function will return when the first future completes. The uncompleted futures will also be
21//! returned so they can be run further or otherwise cleaned up. These functions are inspired by
22//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
23//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
24//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
25//!
26//! ## Completing all of several futures.
27//!
28//! If there are several top level tasks that all need to be completed, use the "complete" family
29//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
30//! function will return only once all the futures passed to it have completed. These functions are
31//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
32//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
33//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
34//! [`complete5`](fn.complete5.html).
35//!
36//! # Implementing new FD-based futures.
37//!
//! URing-based implementations should provide an implementation of the `IoSource` trait.
39//! For the FD executor, new futures can use the existing ability to poll a source to build async
40//! functionality on top of.
41//!
42//! # Implementations
43//!
44//! Currently there are two paths for using the asynchronous IO. One uses a WaitContext and drives
//! futures based on the FDs signaling they are ready for the operation. This method will exist so
46//! long as kernels < 5.4 are supported.
47//! The other method submits operations to io_uring and is signaled when they complete. This is more
48//! efficient, but only supported on kernel 5.4+.
49//! If `IoSource::new` is used to interface with async IO, then the correct backend will be chosen
50//! automatically.
51//!
52//! # Examples
53//!
54//! See the docs for `IoSource` if support for kernels <5.4 is required. Focus on `UringSource` if
55//! all systems have support for io_uring.
56
57mod async_types;
58pub mod audio_streams_async;
59mod blocking;
60mod common_executor;
61mod complete;
62mod event;
63mod executor;
64mod io_ext;
65mod io_source;
66pub mod mem;
67mod queue;
68mod select;
69pub mod sync;
70pub mod sys;
71mod timer;
72#[cfg(feature = "tokio")]
73mod tokio_executor;
74mod waker;
75
76use std::future::Future;
77use std::pin::Pin;
78use std::task::Poll;
79
80pub use async_types::*;
81pub use base::Event;
82#[cfg(any(target_os = "android", target_os = "linux"))]
83pub use blocking::sys::linux::block_on::block_on;
84pub use blocking::unblock;
85pub use blocking::unblock_disarm;
86pub use blocking::BlockingPool;
87pub use blocking::CancellableBlockingPool;
88pub use blocking::TimeoutAction;
89pub use event::EventAsync;
90pub use executor::Executor;
91pub use executor::ExecutorKind;
92pub(crate) use executor::ExecutorTrait;
93pub use executor::TaskHandle;
94#[cfg(windows)]
95pub use futures::executor::block_on;
96use futures::stream::FuturesUnordered;
97pub use io_ext::AsyncError;
98pub use io_ext::AsyncResult;
99pub use io_ext::AsyncWrapper;
100pub use io_ext::IntoAsync;
101pub use io_source::IoSource;
102pub use mem::BackingMemory;
103pub use mem::MemRegion;
104pub use mem::MemRegionIter;
105pub use mem::VecIoWrapper;
106use remain::sorted;
107pub use select::SelectResult;
108#[cfg(any(target_os = "android", target_os = "linux"))]
109pub use sys::linux::uring_executor::is_uring_stable;
110use thiserror::Error as ThisError;
111pub use timer::TimerAsync;
112
/// Top-level error type of this crate.
///
/// Each variant wraps the underlying error of one component (event, timer, executor, ...).
/// Variants that only exist on some platforms are gated with `#[cfg(...)]`.
// NOTE: `#[sorted]` (from the `remain` crate) requires the variants to stay in sorted order;
// keep that in mind when adding a new one.
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
    /// Error from EventAsync
    #[error("Failure in EventAsync: {0}")]
    EventAsync(base::Error),
    /// Error from the handle executor.
    #[cfg(windows)]
    #[error("Failure in the handle executor: {0}")]
    HandleExecutor(sys::windows::handle_executor::Error),
    /// A plain `std::io::Error`.
    #[error("IO error: {0}")]
    Io(std::io::Error),
    /// Error from the polled(FD) source, which includes error from the FD executor.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[error("An error with a poll source: {0}")]
    PollSource(sys::linux::poll_source::Error),
    /// Error from Timer.
    #[error("Failure in Timer: {0}")]
    Timer(base::Error),
    /// Error from TimerAsync.
    #[error("Failure in TimerAsync: {0}")]
    TimerAsync(AsyncError),
    /// Error from the uring executor.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[error("Failure in the uring executor: {0}")]
    URingExecutor(sys::linux::uring_executor::Error),
}
/// Shorthand for a `std::result::Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
141
142/// Heterogeneous collection of `async_task:Task` that are running in a "detached" state.
143///
144/// We keep them around to ensure they are dropped before the executor they are running on.
145pub(crate) struct DetachedTasks(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>);
146
147impl DetachedTasks {
148    pub(crate) fn new() -> Self {
149        DetachedTasks(FuturesUnordered::new())
150    }
151
152    pub(crate) fn push<R: Send + 'static>(&self, task: async_task::Task<R>) {
153        // Convert to fallible, otherwise poll could panic if the `Runnable` is dropped early.
154        let task = task.fallible();
155        self.0.push(Box::pin(async {
156            let _ = task.await;
157        }));
158    }
159
160    /// Polls all the tasks, dropping any that complete.
161    pub(crate) fn poll(&mut self, cx: &mut std::task::Context) {
162        use futures::Stream;
163        while let Poll::Ready(Some(_)) = Pin::new(&mut self.0).poll_next(cx) {}
164    }
165}
166
167// Select helpers to run until any future completes.
168
169/// Creates a combinator that runs the two given futures until one completes, returning a tuple
170/// containing the result of the finished future and the still pending future.
171///
172///  # Example
173///
174///    ```
175///    use cros_async::{SelectResult, select2, block_on};
176///    use futures::future::pending;
177///    use futures::pin_mut;
178///
179///    let first = async {5};
180///    let second = async {let () = pending().await;};
181///    pin_mut!(first);
182///    pin_mut!(second);
183///    match block_on(select2(first, second)) {
184///        (SelectResult::Finished(5), SelectResult::Pending(_second)) => (),
185///        _ => panic!("Select didn't return the first future"),
186///    };
187///    ```
188pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
189    f1: F1,
190    f2: F2,
191) -> (SelectResult<F1>, SelectResult<F2>) {
192    select::Select2::new(f1, f2).await
193}
194
195/// Creates a combinator that runs the three given futures until one or more completes, returning a
196/// tuple containing the result of the finished future(s) and the still pending future(s).
197///
198///  # Example
199///
200///    ```
201///    use cros_async::{SelectResult, select3, block_on};
202///    use futures::future::pending;
203///    use futures::pin_mut;
204///
205///    let first = async {4};
206///    let second = async {let () = pending().await;};
207///    let third = async {5};
208///    pin_mut!(first);
209///    pin_mut!(second);
210///    pin_mut!(third);
211///    match block_on(select3(first, second, third)) {
212///        (SelectResult::Finished(4),
213///         SelectResult::Pending(_second),
214///         SelectResult::Finished(5)) => (),
215///        _ => panic!("Select didn't return the futures"),
216///    };
217///    ```
218pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
219    f1: F1,
220    f2: F2,
221    f3: F3,
222) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
223    select::Select3::new(f1, f2, f3).await
224}
225
226/// Creates a combinator that runs the four given futures until one or more completes, returning a
227/// tuple containing the result of the finished future(s) and the still pending future(s).
228///
229///  # Example
230///
231///    ```
232///    use cros_async::{SelectResult, select4, block_on};
233///    use futures::future::pending;
234///    use futures::pin_mut;
235///
236///    let first = async {4};
237///    let second = async {let () = pending().await;};
238///    let third = async {5};
239///    let fourth = async {let () = pending().await;};
240///    pin_mut!(first);
241///    pin_mut!(second);
242///    pin_mut!(third);
243///    pin_mut!(fourth);
244///    match block_on(select4(first, second, third, fourth)) {
245///        (SelectResult::Finished(4), SelectResult::Pending(_second),
246///         SelectResult::Finished(5), SelectResult::Pending(_fourth)) => (),
247///        _ => panic!("Select didn't return the futures"),
248///    };
249///    ```
250pub async fn select4<
251    F1: Future + Unpin,
252    F2: Future + Unpin,
253    F3: Future + Unpin,
254    F4: Future + Unpin,
255>(
256    f1: F1,
257    f2: F2,
258    f3: F3,
259    f4: F4,
260) -> (
261    SelectResult<F1>,
262    SelectResult<F2>,
263    SelectResult<F3>,
264    SelectResult<F4>,
265) {
266    select::Select4::new(f1, f2, f3, f4).await
267}
268
269/// Creates a combinator that runs the five given futures until one or more completes, returning a
270/// tuple containing the result of the finished future(s) and the still pending future(s).
271///
272///  # Example
273///
274///    ```
275///    use cros_async::{SelectResult, select5, block_on};
276///    use futures::future::pending;
277///    use futures::pin_mut;
278///
279///    let first = async {4};
280///    let second = async {let () = pending().await;};
281///    let third = async {5};
282///    let fourth = async {let () = pending().await;};
283///    let fifth = async {6};
284///    pin_mut!(first);
285///    pin_mut!(second);
286///    pin_mut!(third);
287///    pin_mut!(fourth);
288///    pin_mut!(fifth);
289///    match block_on(select5(first, second, third, fourth, fifth)) {
290///        (SelectResult::Finished(4), SelectResult::Pending(_second),
291///         SelectResult::Finished(5), SelectResult::Pending(_fourth),
292///         SelectResult::Finished(6)) => (),
293///        _ => panic!("Select didn't return the futures"),
294///    };
295///    ```
296pub async fn select5<
297    F1: Future + Unpin,
298    F2: Future + Unpin,
299    F3: Future + Unpin,
300    F4: Future + Unpin,
301    F5: Future + Unpin,
302>(
303    f1: F1,
304    f2: F2,
305    f3: F3,
306    f4: F4,
307    f5: F5,
308) -> (
309    SelectResult<F1>,
310    SelectResult<F2>,
311    SelectResult<F3>,
312    SelectResult<F4>,
313    SelectResult<F5>,
314) {
315    select::Select5::new(f1, f2, f3, f4, f5).await
316}
317
318/// Creates a combinator that runs the six given futures until one or more completes, returning a
319/// tuple containing the result of the finished future(s) and the still pending future(s).
320///
321///  # Example
322///
323///    ```
324///    use cros_async::{SelectResult, select6, block_on};
325///    use futures::future::pending;
326///    use futures::pin_mut;
327///
328///    let first = async {1};
329///    let second = async {let () = pending().await;};
330///    let third = async {3};
331///    let fourth = async {let () = pending().await;};
332///    let fifth = async {5};
333///    let sixth = async {6};
334///    pin_mut!(first);
335///    pin_mut!(second);
336///    pin_mut!(third);
337///    pin_mut!(fourth);
338///    pin_mut!(fifth);
339///    pin_mut!(sixth);
340///    match block_on(select6(first, second, third, fourth, fifth, sixth)) {
341///        (SelectResult::Finished(1), SelectResult::Pending(_second),
342///         SelectResult::Finished(3), SelectResult::Pending(_fourth),
343///         SelectResult::Finished(5), SelectResult::Finished(6)) => (),
344///        _ => panic!("Select didn't return the futures"),
345///    };
346///    ```
347pub async fn select6<
348    F1: Future + Unpin,
349    F2: Future + Unpin,
350    F3: Future + Unpin,
351    F4: Future + Unpin,
352    F5: Future + Unpin,
353    F6: Future + Unpin,
354>(
355    f1: F1,
356    f2: F2,
357    f3: F3,
358    f4: F4,
359    f5: F5,
360    f6: F6,
361) -> (
362    SelectResult<F1>,
363    SelectResult<F2>,
364    SelectResult<F3>,
365    SelectResult<F4>,
366    SelectResult<F5>,
367    SelectResult<F6>,
368) {
369    select::Select6::new(f1, f2, f3, f4, f5, f6).await
370}
371
372pub async fn select7<
373    F1: Future + Unpin,
374    F2: Future + Unpin,
375    F3: Future + Unpin,
376    F4: Future + Unpin,
377    F5: Future + Unpin,
378    F6: Future + Unpin,
379    F7: Future + Unpin,
380>(
381    f1: F1,
382    f2: F2,
383    f3: F3,
384    f4: F4,
385    f5: F5,
386    f6: F6,
387    f7: F7,
388) -> (
389    SelectResult<F1>,
390    SelectResult<F2>,
391    SelectResult<F3>,
392    SelectResult<F4>,
393    SelectResult<F5>,
394    SelectResult<F6>,
395    SelectResult<F7>,
396) {
397    select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await
398}
399
400pub async fn select8<
401    F1: Future + Unpin,
402    F2: Future + Unpin,
403    F3: Future + Unpin,
404    F4: Future + Unpin,
405    F5: Future + Unpin,
406    F6: Future + Unpin,
407    F7: Future + Unpin,
408    F8: Future + Unpin,
409>(
410    f1: F1,
411    f2: F2,
412    f3: F3,
413    f4: F4,
414    f5: F5,
415    f6: F6,
416    f7: F7,
417    f8: F8,
418) -> (
419    SelectResult<F1>,
420    SelectResult<F2>,
421    SelectResult<F3>,
422    SelectResult<F4>,
423    SelectResult<F5>,
424    SelectResult<F6>,
425    SelectResult<F7>,
426    SelectResult<F8>,
427) {
428    select::Select8::new(f1, f2, f3, f4, f5, f6, f7, f8).await
429}
430
431pub async fn select9<
432    F1: Future + Unpin,
433    F2: Future + Unpin,
434    F3: Future + Unpin,
435    F4: Future + Unpin,
436    F5: Future + Unpin,
437    F6: Future + Unpin,
438    F7: Future + Unpin,
439    F8: Future + Unpin,
440    F9: Future + Unpin,
441>(
442    f1: F1,
443    f2: F2,
444    f3: F3,
445    f4: F4,
446    f5: F5,
447    f6: F6,
448    f7: F7,
449    f8: F8,
450    f9: F9,
451) -> (
452    SelectResult<F1>,
453    SelectResult<F2>,
454    SelectResult<F3>,
455    SelectResult<F4>,
456    SelectResult<F5>,
457    SelectResult<F6>,
458    SelectResult<F7>,
459    SelectResult<F8>,
460    SelectResult<F9>,
461) {
462    select::Select9::new(f1, f2, f3, f4, f5, f6, f7, f8, f9).await
463}
464
465pub async fn select10<
466    F1: Future + Unpin,
467    F2: Future + Unpin,
468    F3: Future + Unpin,
469    F4: Future + Unpin,
470    F5: Future + Unpin,
471    F6: Future + Unpin,
472    F7: Future + Unpin,
473    F8: Future + Unpin,
474    F9: Future + Unpin,
475    F10: Future + Unpin,
476>(
477    f1: F1,
478    f2: F2,
479    f3: F3,
480    f4: F4,
481    f5: F5,
482    f6: F6,
483    f7: F7,
484    f8: F8,
485    f9: F9,
486    f10: F10,
487) -> (
488    SelectResult<F1>,
489    SelectResult<F2>,
490    SelectResult<F3>,
491    SelectResult<F4>,
492    SelectResult<F5>,
493    SelectResult<F6>,
494    SelectResult<F7>,
495    SelectResult<F8>,
496    SelectResult<F9>,
497    SelectResult<F10>,
498) {
499    select::Select10::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10).await
500}
501
502pub async fn select11<
503    F1: Future + Unpin,
504    F2: Future + Unpin,
505    F3: Future + Unpin,
506    F4: Future + Unpin,
507    F5: Future + Unpin,
508    F6: Future + Unpin,
509    F7: Future + Unpin,
510    F8: Future + Unpin,
511    F9: Future + Unpin,
512    F10: Future + Unpin,
513    F11: Future + Unpin,
514>(
515    f1: F1,
516    f2: F2,
517    f3: F3,
518    f4: F4,
519    f5: F5,
520    f6: F6,
521    f7: F7,
522    f8: F8,
523    f9: F9,
524    f10: F10,
525    f11: F11,
526) -> (
527    SelectResult<F1>,
528    SelectResult<F2>,
529    SelectResult<F3>,
530    SelectResult<F4>,
531    SelectResult<F5>,
532    SelectResult<F6>,
533    SelectResult<F7>,
534    SelectResult<F8>,
535    SelectResult<F9>,
536    SelectResult<F10>,
537    SelectResult<F11>,
538) {
539    select::Select11::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11).await
540}
541
542pub async fn select12<
543    F1: Future + Unpin,
544    F2: Future + Unpin,
545    F3: Future + Unpin,
546    F4: Future + Unpin,
547    F5: Future + Unpin,
548    F6: Future + Unpin,
549    F7: Future + Unpin,
550    F8: Future + Unpin,
551    F9: Future + Unpin,
552    F10: Future + Unpin,
553    F11: Future + Unpin,
554    F12: Future + Unpin,
555>(
556    f1: F1,
557    f2: F2,
558    f3: F3,
559    f4: F4,
560    f5: F5,
561    f6: F6,
562    f7: F7,
563    f8: F8,
564    f9: F9,
565    f10: F10,
566    f11: F11,
567    f12: F12,
568) -> (
569    SelectResult<F1>,
570    SelectResult<F2>,
571    SelectResult<F3>,
572    SelectResult<F4>,
573    SelectResult<F5>,
574    SelectResult<F6>,
575    SelectResult<F7>,
576    SelectResult<F8>,
577    SelectResult<F9>,
578    SelectResult<F10>,
579    SelectResult<F11>,
580    SelectResult<F12>,
581) {
582    select::Select12::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12).await
583}
584
585// Combination helpers to run until all futures are complete.
586
587/// Creates a combinator that runs the two given futures to completion, returning a tuple of the
588/// outputs each yields.
589///
590///  # Example
591///
592///    ```
593///    use cros_async::{complete2, block_on};
594///
595///    let first = async {5};
596///    let second = async {6};
597///    assert_eq!(block_on(complete2(first, second)), (5,6));
598///    ```
599pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
600where
601    F1: Future,
602    F2: Future,
603{
604    complete::Complete2::new(f1, f2).await
605}
606
607/// Creates a combinator that runs the three given futures to completion, returning a tuple of the
608/// outputs each yields.
609///
610///  # Example
611///
612///    ```
613///    use cros_async::{complete3, block_on};
614///
615///    let first = async {5};
616///    let second = async {6};
617///    let third = async {7};
618///    assert_eq!(block_on(complete3(first, second, third)), (5,6,7));
619///    ```
620pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
621where
622    F1: Future,
623    F2: Future,
624    F3: Future,
625{
626    complete::Complete3::new(f1, f2, f3).await
627}
628
629/// Creates a combinator that runs the four given futures to completion, returning a tuple of the
630/// outputs each yields.
631///
632///  # Example
633///
634///    ```
635///    use cros_async::{complete4, block_on};
636///
637///    let first = async {5};
638///    let second = async {6};
639///    let third = async {7};
640///    let fourth = async {8};
641///    assert_eq!(block_on(complete4(first, second, third, fourth)), (5,6,7,8));
642///    ```
643pub async fn complete4<F1, F2, F3, F4>(
644    f1: F1,
645    f2: F2,
646    f3: F3,
647    f4: F4,
648) -> (F1::Output, F2::Output, F3::Output, F4::Output)
649where
650    F1: Future,
651    F2: Future,
652    F3: Future,
653    F4: Future,
654{
655    complete::Complete4::new(f1, f2, f3, f4).await
656}
657
658/// Creates a combinator that runs the five given futures to completion, returning a tuple of the
659/// outputs each yields.
660///
661///  # Example
662///
663///    ```
664///    use cros_async::{complete5, block_on};
665///
666///    let first = async {5};
667///    let second = async {6};
668///    let third = async {7};
669///    let fourth = async {8};
670///    let fifth = async {9};
671///    assert_eq!(block_on(complete5(first, second, third, fourth, fifth)),
672///               (5,6,7,8,9));
673///    ```
674pub async fn complete5<F1, F2, F3, F4, F5>(
675    f1: F1,
676    f2: F2,
677    f3: F3,
678    f4: F4,
679    f5: F5,
680) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
681where
682    F1: Future,
683    F2: Future,
684    F3: Future,
685    F4: Future,
686    F5: Future,
687{
688    complete::Complete5::new(f1, f2, f3, f4, f5).await
689}