cros_async/lib.rs
1// Copyright 2020 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An Executor and future combinators based on operations that block on file descriptors.
6//!
7//! This crate is meant to be used with the `futures-rs` crate that provides further combinators
8//! and utility functions to combine and manage futures. All futures will run until they block on a
9//! file descriptor becoming readable or writable. Facilities are provided to register future
10//! wakers based on such events.
11//!
12//! # Running top-level futures.
13//!
//! Use helper functions based on the desired behavior of your application.
15//!
16//! ## Completing one of several futures.
17//!
18//! If there are several top level tasks that should run until any one completes, use the "select"
19//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
20//! function will return when the first future completes. The uncompleted futures will also be
21//! returned so they can be run further or otherwise cleaned up. These functions are inspired by
22//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
23//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
24//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
25//!
26//! ## Completing all of several futures.
27//!
28//! If there are several top level tasks that all need to be completed, use the "complete" family
29//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
30//! function will return only once all the futures passed to it have completed. These functions are
31//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
32//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
33//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
34//! [`complete5`](fn.complete5.html).
35//!
36//! # Implementing new FD-based futures.
37//!
//! For URing, implementations should provide an implementation of the `IoSource` trait.
39//! For the FD executor, new futures can use the existing ability to poll a source to build async
40//! functionality on top of.
41//!
42//! # Implementations
43//!
44//! Currently there are two paths for using the asynchronous IO. One uses a WaitContext and drives
//! futures based on the FDs signaling they are ready for the operation. This method will exist so
46//! long as kernels < 5.4 are supported.
47//! The other method submits operations to io_uring and is signaled when they complete. This is more
48//! efficient, but only supported on kernel 5.4+.
49//! If `IoSource::new` is used to interface with async IO, then the correct backend will be chosen
50//! automatically.
51//!
52//! # Examples
53//!
54//! See the docs for `IoSource` if support for kernels <5.4 is required. Focus on `UringSource` if
55//! all systems have support for io_uring.
56
57mod async_types;
58pub mod audio_streams_async;
59mod blocking;
60mod common_executor;
61mod complete;
62mod event;
63mod executor;
64mod io_ext;
65mod io_source;
66pub mod mem;
67mod queue;
68mod select;
69pub mod sync;
70pub mod sys;
71mod timer;
72#[cfg(feature = "tokio")]
73mod tokio_executor;
74mod waker;
75
76use std::future::Future;
77use std::pin::Pin;
78use std::task::Poll;
79
80pub use async_types::*;
81pub use base::Event;
82#[cfg(any(target_os = "android", target_os = "linux"))]
83pub use blocking::sys::linux::block_on::block_on;
84pub use blocking::unblock;
85pub use blocking::unblock_disarm;
86pub use blocking::BlockingPool;
87pub use blocking::CancellableBlockingPool;
88pub use blocking::TimeoutAction;
89pub use event::EventAsync;
90pub use executor::Executor;
91pub use executor::ExecutorKind;
92pub(crate) use executor::ExecutorTrait;
93pub use executor::TaskHandle;
94#[cfg(windows)]
95pub use futures::executor::block_on;
96use futures::stream::FuturesUnordered;
97pub use io_ext::AsyncError;
98pub use io_ext::AsyncResult;
99pub use io_ext::AsyncWrapper;
100pub use io_ext::IntoAsync;
101pub use io_source::IoSource;
102pub use mem::BackingMemory;
103pub use mem::MemRegion;
104pub use mem::MemRegionIter;
105pub use mem::VecIoWrapper;
106use remain::sorted;
107pub use select::SelectResult;
108#[cfg(any(target_os = "android", target_os = "linux"))]
109pub use sys::linux::uring_executor::is_uring_stable;
110use thiserror::Error as ThisError;
111pub use timer::TimerAsync;
112
// `remain::sorted` checks at compile time that the variants below stay in sorted order, so new
// variants must be inserted at their alphabetical position.
#[sorted]
#[derive(ThisError, Debug)]
/// Top-level error type of this crate; each variant wraps the error of one underlying component.
pub enum Error {
    /// Error from EventAsync
    #[error("Failure in EventAsync: {0}")]
    EventAsync(base::Error),
    /// Error from the handle executor.
    #[cfg(windows)]
    #[error("Failure in the handle executor: {0}")]
    HandleExecutor(sys::windows::handle_executor::Error),
    /// Error from an I/O operation.
    #[error("IO error: {0}")]
    Io(std::io::Error),
    /// Error from the polled(FD) source, which includes error from the FD executor.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[error("An error with a poll source: {0}")]
    PollSource(sys::linux::poll_source::Error),
    /// Error from Timer.
    #[error("Failure in Timer: {0}")]
    Timer(base::Error),
    /// Error from TimerAsync.
    #[error("Failure in TimerAsync: {0}")]
    TimerAsync(AsyncError),
    /// Error from the uring executor.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[error("Failure in the uring executor: {0}")]
    URingExecutor(sys::linux::uring_executor::Error),
}
/// Shorthand for a `std::result::Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
141
/// Heterogeneous collection of `async_task::Task` that are running in a "detached" state.
///
/// We keep them around to ensure they are dropped before the executor they are running on.
///
/// Every task is stored type-erased, as a pinned and boxed `Send` future with `()` output, inside
/// a single `FuturesUnordered`.
pub(crate) struct DetachedTasks(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>);
146
147impl DetachedTasks {
148 pub(crate) fn new() -> Self {
149 DetachedTasks(FuturesUnordered::new())
150 }
151
152 pub(crate) fn push<R: Send + 'static>(&self, task: async_task::Task<R>) {
153 // Convert to fallible, otherwise poll could panic if the `Runnable` is dropped early.
154 let task = task.fallible();
155 self.0.push(Box::pin(async {
156 let _ = task.await;
157 }));
158 }
159
160 /// Polls all the tasks, dropping any that complete.
161 pub(crate) fn poll(&mut self, cx: &mut std::task::Context) {
162 use futures::Stream;
163 while let Poll::Ready(Some(_)) = Pin::new(&mut self.0).poll_next(cx) {}
164 }
165}
166
167// Select helpers to run until any future completes.
168
169/// Creates a combinator that runs the two given futures until one completes, returning a tuple
170/// containing the result of the finished future and the still pending future.
171///
172/// # Example
173///
174/// ```
175/// use cros_async::{SelectResult, select2, block_on};
176/// use futures::future::pending;
177/// use futures::pin_mut;
178///
179/// let first = async {5};
180/// let second = async {let () = pending().await;};
181/// pin_mut!(first);
182/// pin_mut!(second);
183/// match block_on(select2(first, second)) {
184/// (SelectResult::Finished(5), SelectResult::Pending(_second)) => (),
185/// _ => panic!("Select didn't return the first future"),
186/// };
187/// ```
188pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
189 f1: F1,
190 f2: F2,
191) -> (SelectResult<F1>, SelectResult<F2>) {
192 select::Select2::new(f1, f2).await
193}
194
195/// Creates a combinator that runs the three given futures until one or more completes, returning a
196/// tuple containing the result of the finished future(s) and the still pending future(s).
197///
198/// # Example
199///
200/// ```
201/// use cros_async::{SelectResult, select3, block_on};
202/// use futures::future::pending;
203/// use futures::pin_mut;
204///
205/// let first = async {4};
206/// let second = async {let () = pending().await;};
207/// let third = async {5};
208/// pin_mut!(first);
209/// pin_mut!(second);
210/// pin_mut!(third);
211/// match block_on(select3(first, second, third)) {
212/// (SelectResult::Finished(4),
213/// SelectResult::Pending(_second),
214/// SelectResult::Finished(5)) => (),
215/// _ => panic!("Select didn't return the futures"),
216/// };
217/// ```
218pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
219 f1: F1,
220 f2: F2,
221 f3: F3,
222) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
223 select::Select3::new(f1, f2, f3).await
224}
225
226/// Creates a combinator that runs the four given futures until one or more completes, returning a
227/// tuple containing the result of the finished future(s) and the still pending future(s).
228///
229/// # Example
230///
231/// ```
232/// use cros_async::{SelectResult, select4, block_on};
233/// use futures::future::pending;
234/// use futures::pin_mut;
235///
236/// let first = async {4};
237/// let second = async {let () = pending().await;};
238/// let third = async {5};
239/// let fourth = async {let () = pending().await;};
240/// pin_mut!(first);
241/// pin_mut!(second);
242/// pin_mut!(third);
243/// pin_mut!(fourth);
244/// match block_on(select4(first, second, third, fourth)) {
245/// (SelectResult::Finished(4), SelectResult::Pending(_second),
246/// SelectResult::Finished(5), SelectResult::Pending(_fourth)) => (),
247/// _ => panic!("Select didn't return the futures"),
248/// };
249/// ```
250pub async fn select4<
251 F1: Future + Unpin,
252 F2: Future + Unpin,
253 F3: Future + Unpin,
254 F4: Future + Unpin,
255>(
256 f1: F1,
257 f2: F2,
258 f3: F3,
259 f4: F4,
260) -> (
261 SelectResult<F1>,
262 SelectResult<F2>,
263 SelectResult<F3>,
264 SelectResult<F4>,
265) {
266 select::Select4::new(f1, f2, f3, f4).await
267}
268
269/// Creates a combinator that runs the five given futures until one or more completes, returning a
270/// tuple containing the result of the finished future(s) and the still pending future(s).
271///
272/// # Example
273///
274/// ```
275/// use cros_async::{SelectResult, select5, block_on};
276/// use futures::future::pending;
277/// use futures::pin_mut;
278///
279/// let first = async {4};
280/// let second = async {let () = pending().await;};
281/// let third = async {5};
282/// let fourth = async {let () = pending().await;};
283/// let fifth = async {6};
284/// pin_mut!(first);
285/// pin_mut!(second);
286/// pin_mut!(third);
287/// pin_mut!(fourth);
288/// pin_mut!(fifth);
289/// match block_on(select5(first, second, third, fourth, fifth)) {
290/// (SelectResult::Finished(4), SelectResult::Pending(_second),
291/// SelectResult::Finished(5), SelectResult::Pending(_fourth),
292/// SelectResult::Finished(6)) => (),
293/// _ => panic!("Select didn't return the futures"),
294/// };
295/// ```
296pub async fn select5<
297 F1: Future + Unpin,
298 F2: Future + Unpin,
299 F3: Future + Unpin,
300 F4: Future + Unpin,
301 F5: Future + Unpin,
302>(
303 f1: F1,
304 f2: F2,
305 f3: F3,
306 f4: F4,
307 f5: F5,
308) -> (
309 SelectResult<F1>,
310 SelectResult<F2>,
311 SelectResult<F3>,
312 SelectResult<F4>,
313 SelectResult<F5>,
314) {
315 select::Select5::new(f1, f2, f3, f4, f5).await
316}
317
318/// Creates a combinator that runs the six given futures until one or more completes, returning a
319/// tuple containing the result of the finished future(s) and the still pending future(s).
320///
321/// # Example
322///
323/// ```
324/// use cros_async::{SelectResult, select6, block_on};
325/// use futures::future::pending;
326/// use futures::pin_mut;
327///
328/// let first = async {1};
329/// let second = async {let () = pending().await;};
330/// let third = async {3};
331/// let fourth = async {let () = pending().await;};
332/// let fifth = async {5};
333/// let sixth = async {6};
334/// pin_mut!(first);
335/// pin_mut!(second);
336/// pin_mut!(third);
337/// pin_mut!(fourth);
338/// pin_mut!(fifth);
339/// pin_mut!(sixth);
340/// match block_on(select6(first, second, third, fourth, fifth, sixth)) {
341/// (SelectResult::Finished(1), SelectResult::Pending(_second),
342/// SelectResult::Finished(3), SelectResult::Pending(_fourth),
343/// SelectResult::Finished(5), SelectResult::Finished(6)) => (),
344/// _ => panic!("Select didn't return the futures"),
345/// };
346/// ```
347pub async fn select6<
348 F1: Future + Unpin,
349 F2: Future + Unpin,
350 F3: Future + Unpin,
351 F4: Future + Unpin,
352 F5: Future + Unpin,
353 F6: Future + Unpin,
354>(
355 f1: F1,
356 f2: F2,
357 f3: F3,
358 f4: F4,
359 f5: F5,
360 f6: F6,
361) -> (
362 SelectResult<F1>,
363 SelectResult<F2>,
364 SelectResult<F3>,
365 SelectResult<F4>,
366 SelectResult<F5>,
367 SelectResult<F6>,
368) {
369 select::Select6::new(f1, f2, f3, f4, f5, f6).await
370}
371
372pub async fn select7<
373 F1: Future + Unpin,
374 F2: Future + Unpin,
375 F3: Future + Unpin,
376 F4: Future + Unpin,
377 F5: Future + Unpin,
378 F6: Future + Unpin,
379 F7: Future + Unpin,
380>(
381 f1: F1,
382 f2: F2,
383 f3: F3,
384 f4: F4,
385 f5: F5,
386 f6: F6,
387 f7: F7,
388) -> (
389 SelectResult<F1>,
390 SelectResult<F2>,
391 SelectResult<F3>,
392 SelectResult<F4>,
393 SelectResult<F5>,
394 SelectResult<F6>,
395 SelectResult<F7>,
396) {
397 select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await
398}
399
400pub async fn select8<
401 F1: Future + Unpin,
402 F2: Future + Unpin,
403 F3: Future + Unpin,
404 F4: Future + Unpin,
405 F5: Future + Unpin,
406 F6: Future + Unpin,
407 F7: Future + Unpin,
408 F8: Future + Unpin,
409>(
410 f1: F1,
411 f2: F2,
412 f3: F3,
413 f4: F4,
414 f5: F5,
415 f6: F6,
416 f7: F7,
417 f8: F8,
418) -> (
419 SelectResult<F1>,
420 SelectResult<F2>,
421 SelectResult<F3>,
422 SelectResult<F4>,
423 SelectResult<F5>,
424 SelectResult<F6>,
425 SelectResult<F7>,
426 SelectResult<F8>,
427) {
428 select::Select8::new(f1, f2, f3, f4, f5, f6, f7, f8).await
429}
430
431pub async fn select9<
432 F1: Future + Unpin,
433 F2: Future + Unpin,
434 F3: Future + Unpin,
435 F4: Future + Unpin,
436 F5: Future + Unpin,
437 F6: Future + Unpin,
438 F7: Future + Unpin,
439 F8: Future + Unpin,
440 F9: Future + Unpin,
441>(
442 f1: F1,
443 f2: F2,
444 f3: F3,
445 f4: F4,
446 f5: F5,
447 f6: F6,
448 f7: F7,
449 f8: F8,
450 f9: F9,
451) -> (
452 SelectResult<F1>,
453 SelectResult<F2>,
454 SelectResult<F3>,
455 SelectResult<F4>,
456 SelectResult<F5>,
457 SelectResult<F6>,
458 SelectResult<F7>,
459 SelectResult<F8>,
460 SelectResult<F9>,
461) {
462 select::Select9::new(f1, f2, f3, f4, f5, f6, f7, f8, f9).await
463}
464
465pub async fn select10<
466 F1: Future + Unpin,
467 F2: Future + Unpin,
468 F3: Future + Unpin,
469 F4: Future + Unpin,
470 F5: Future + Unpin,
471 F6: Future + Unpin,
472 F7: Future + Unpin,
473 F8: Future + Unpin,
474 F9: Future + Unpin,
475 F10: Future + Unpin,
476>(
477 f1: F1,
478 f2: F2,
479 f3: F3,
480 f4: F4,
481 f5: F5,
482 f6: F6,
483 f7: F7,
484 f8: F8,
485 f9: F9,
486 f10: F10,
487) -> (
488 SelectResult<F1>,
489 SelectResult<F2>,
490 SelectResult<F3>,
491 SelectResult<F4>,
492 SelectResult<F5>,
493 SelectResult<F6>,
494 SelectResult<F7>,
495 SelectResult<F8>,
496 SelectResult<F9>,
497 SelectResult<F10>,
498) {
499 select::Select10::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10).await
500}
501
502pub async fn select11<
503 F1: Future + Unpin,
504 F2: Future + Unpin,
505 F3: Future + Unpin,
506 F4: Future + Unpin,
507 F5: Future + Unpin,
508 F6: Future + Unpin,
509 F7: Future + Unpin,
510 F8: Future + Unpin,
511 F9: Future + Unpin,
512 F10: Future + Unpin,
513 F11: Future + Unpin,
514>(
515 f1: F1,
516 f2: F2,
517 f3: F3,
518 f4: F4,
519 f5: F5,
520 f6: F6,
521 f7: F7,
522 f8: F8,
523 f9: F9,
524 f10: F10,
525 f11: F11,
526) -> (
527 SelectResult<F1>,
528 SelectResult<F2>,
529 SelectResult<F3>,
530 SelectResult<F4>,
531 SelectResult<F5>,
532 SelectResult<F6>,
533 SelectResult<F7>,
534 SelectResult<F8>,
535 SelectResult<F9>,
536 SelectResult<F10>,
537 SelectResult<F11>,
538) {
539 select::Select11::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11).await
540}
541
542pub async fn select12<
543 F1: Future + Unpin,
544 F2: Future + Unpin,
545 F3: Future + Unpin,
546 F4: Future + Unpin,
547 F5: Future + Unpin,
548 F6: Future + Unpin,
549 F7: Future + Unpin,
550 F8: Future + Unpin,
551 F9: Future + Unpin,
552 F10: Future + Unpin,
553 F11: Future + Unpin,
554 F12: Future + Unpin,
555>(
556 f1: F1,
557 f2: F2,
558 f3: F3,
559 f4: F4,
560 f5: F5,
561 f6: F6,
562 f7: F7,
563 f8: F8,
564 f9: F9,
565 f10: F10,
566 f11: F11,
567 f12: F12,
568) -> (
569 SelectResult<F1>,
570 SelectResult<F2>,
571 SelectResult<F3>,
572 SelectResult<F4>,
573 SelectResult<F5>,
574 SelectResult<F6>,
575 SelectResult<F7>,
576 SelectResult<F8>,
577 SelectResult<F9>,
578 SelectResult<F10>,
579 SelectResult<F11>,
580 SelectResult<F12>,
581) {
582 select::Select12::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12).await
583}
584
585// Combination helpers to run until all futures are complete.
586
587/// Creates a combinator that runs the two given futures to completion, returning a tuple of the
588/// outputs each yields.
589///
590/// # Example
591///
592/// ```
593/// use cros_async::{complete2, block_on};
594///
595/// let first = async {5};
596/// let second = async {6};
597/// assert_eq!(block_on(complete2(first, second)), (5,6));
598/// ```
599pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
600where
601 F1: Future,
602 F2: Future,
603{
604 complete::Complete2::new(f1, f2).await
605}
606
607/// Creates a combinator that runs the three given futures to completion, returning a tuple of the
608/// outputs each yields.
609///
610/// # Example
611///
612/// ```
613/// use cros_async::{complete3, block_on};
614///
615/// let first = async {5};
616/// let second = async {6};
617/// let third = async {7};
618/// assert_eq!(block_on(complete3(first, second, third)), (5,6,7));
619/// ```
620pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
621where
622 F1: Future,
623 F2: Future,
624 F3: Future,
625{
626 complete::Complete3::new(f1, f2, f3).await
627}
628
629/// Creates a combinator that runs the four given futures to completion, returning a tuple of the
630/// outputs each yields.
631///
632/// # Example
633///
634/// ```
635/// use cros_async::{complete4, block_on};
636///
637/// let first = async {5};
638/// let second = async {6};
639/// let third = async {7};
640/// let fourth = async {8};
641/// assert_eq!(block_on(complete4(first, second, third, fourth)), (5,6,7,8));
642/// ```
643pub async fn complete4<F1, F2, F3, F4>(
644 f1: F1,
645 f2: F2,
646 f3: F3,
647 f4: F4,
648) -> (F1::Output, F2::Output, F3::Output, F4::Output)
649where
650 F1: Future,
651 F2: Future,
652 F3: Future,
653 F4: Future,
654{
655 complete::Complete4::new(f1, f2, f3, f4).await
656}
657
658/// Creates a combinator that runs the five given futures to completion, returning a tuple of the
659/// outputs each yields.
660///
661/// # Example
662///
663/// ```
664/// use cros_async::{complete5, block_on};
665///
666/// let first = async {5};
667/// let second = async {6};
668/// let third = async {7};
669/// let fourth = async {8};
670/// let fifth = async {9};
671/// assert_eq!(block_on(complete5(first, second, third, fourth, fifth)),
672/// (5,6,7,8,9));
673/// ```
674pub async fn complete5<F1, F2, F3, F4, F5>(
675 f1: F1,
676 f2: F2,
677 f3: F3,
678 f4: F4,
679 f5: F5,
680) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
681where
682 F1: Future,
683 F2: Future,
684 F3: Future,
685 F4: Future,
686 F5: Future,
687{
688 complete::Complete5::new(f1, f2, f3, f4, f5).await
689}