base_tokio/sys/linux/
tube.rs

1// Copyright 2024 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::os::fd::AsRawFd;
6
7use tokio::io::unix::AsyncFd;
8
9/// An async version of `base::Tube`.
10pub struct TubeTokio(AsyncFd<base::Tube>);
11
12impl TubeTokio {
13    pub fn new(tube: base::Tube) -> anyhow::Result<Self> {
14        base::add_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)?;
15        Ok(Self(AsyncFd::new(tube)?))
16    }
17
18    pub async fn into_inner(self) -> base::Tube {
19        let tube = self.0.into_inner();
20        base::clear_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)
21            .expect("failed to clear O_NONBLOCK");
22        tube
23    }
24
25    pub async fn send<T: serde::Serialize + Send + 'static>(
26        &mut self,
27        msg: T,
28    ) -> base::TubeResult<()> {
29        loop {
30            let mut guard = self.0.writable().await.map_err(base::TubeError::Send)?;
31            let io_result = guard.try_io(|inner| {
32                // Re-using the non-async send is potentially hazardous since it isn't explicitly
33                // written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
34                // single write syscall, it should be OK.
35                let r = inner.get_ref().send(&msg);
36                // Transpose the `std::io::Error` errors outside so that `try_io` can check them
37                // for `WouldBlock`.
38                match r {
39                    Ok(x) => Ok(Ok(x)),
40                    Err(base::TubeError::Send(e)) => Err(e),
41                    Err(e) => Ok(Err(e)),
42                }
43            });
44
45            match io_result {
46                Ok(result) => {
47                    return match result {
48                        Ok(Ok(x)) => Ok(x),
49                        Ok(Err(e)) => Err(e),
50                        Err(e) => Err(base::TubeError::Send(e)),
51                    }
52                }
53                Err(_would_block) => continue,
54            }
55        }
56    }
57
58    pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(
59        &mut self,
60    ) -> base::TubeResult<T> {
61        loop {
62            let mut guard = self.0.readable().await.map_err(base::TubeError::Recv)?;
63            let io_result = guard.try_io(|inner| {
64                // Re-using the non-async recv is potentially hazardous since it isn't explicitly
65                // written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
66                // single read syscall, it should be OK.
67                let r = inner.get_ref().recv();
68                // Transpose the `std::io::Error` errors outside so that `try_io` can check them
69                // for `WouldBlock`.
70                match r {
71                    Ok(x) => Ok(Ok(x)),
72                    Err(base::TubeError::Recv(e)) => Err(e),
73                    Err(e) => Ok(Err(e)),
74                }
75            });
76
77            match io_result {
78                Ok(result) => {
79                    return match result {
80                        Ok(Ok(x)) => Ok(x),
81                        Ok(Err(e)) => Err(e),
82                        Err(e) => Err(base::TubeError::Recv(e)),
83                    }
84                }
85                Err(_would_block) => continue,
86            }
87        }
88    }
89}