base_tokio/sys/linux/
tube.rs1use std::os::fd::AsRawFd;
6
7use tokio::io::unix::AsyncFd;
8
/// Async wrapper around a [`base::Tube`], holding it inside a Tokio [`AsyncFd`] so that
/// readiness of the tube's file descriptor can be awaited.
pub struct TubeTokio(AsyncFd<base::Tube>);
11
12impl TubeTokio {
13 pub fn new(tube: base::Tube) -> anyhow::Result<Self> {
14 base::add_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)?;
15 Ok(Self(AsyncFd::new(tube)?))
16 }
17
18 pub async fn into_inner(self) -> base::Tube {
19 let tube = self.0.into_inner();
20 base::clear_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)
21 .expect("failed to clear O_NONBLOCK");
22 tube
23 }
24
25 pub async fn send<T: serde::Serialize + Send + 'static>(
26 &mut self,
27 msg: T,
28 ) -> base::TubeResult<()> {
29 loop {
30 let mut guard = self.0.writable().await.map_err(base::TubeError::Send)?;
31 let io_result = guard.try_io(|inner| {
32 let r = inner.get_ref().send(&msg);
36 match r {
39 Ok(x) => Ok(Ok(x)),
40 Err(base::TubeError::Send(e)) => Err(e),
41 Err(e) => Ok(Err(e)),
42 }
43 });
44
45 match io_result {
46 Ok(result) => {
47 return match result {
48 Ok(Ok(x)) => Ok(x),
49 Ok(Err(e)) => Err(e),
50 Err(e) => Err(base::TubeError::Send(e)),
51 }
52 }
53 Err(_would_block) => continue,
54 }
55 }
56 }
57
58 pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(
59 &mut self,
60 ) -> base::TubeResult<T> {
61 loop {
62 let mut guard = self.0.readable().await.map_err(base::TubeError::Recv)?;
63 let io_result = guard.try_io(|inner| {
64 let r = inner.get_ref().recv();
68 match r {
71 Ok(x) => Ok(Ok(x)),
72 Err(base::TubeError::Recv(e)) => Err(e),
73 Err(e) => Ok(Err(e)),
74 }
75 });
76
77 match io_result {
78 Ok(result) => {
79 return match result {
80 Ok(Ok(x)) => Ok(x),
81 Ok(Err(e)) => Err(e),
82 Err(e) => Err(base::TubeError::Recv(e)),
83 }
84 }
85 Err(_would_block) => continue,
86 }
87 }
88 }
89}