1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::os::fd::AsRawFd;
use tokio::io::unix::AsyncFd;
/// An async version of `base::Tube`.
pub struct TubeTokio(AsyncFd<base::Tube>);
impl TubeTokio {
pub fn new(tube: base::Tube) -> anyhow::Result<Self> {
base::add_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)?;
Ok(Self(AsyncFd::new(tube)?))
}
pub async fn into_inner(self) -> base::Tube {
let tube = self.0.into_inner();
base::clear_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)
.expect("failed to clear O_NONBLOCK");
tube
}
pub async fn send<T: serde::Serialize + Send + 'static>(
&mut self,
msg: T,
) -> base::TubeResult<()> {
loop {
let mut guard = self.0.writable().await.map_err(base::TubeError::Send)?;
let io_result = guard.try_io(|inner| {
// Re-using the non-async send is potentially hazardous since it isn't explicitly
// written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
// single write syscall, it should be OK.
let r = inner.get_ref().send(&msg);
// Transpose the `std::io::Error` errors outside so that `try_io` can check them
// for `WouldBlock`.
match r {
Ok(x) => Ok(Ok(x)),
Err(base::TubeError::Send(e)) => Err(e),
Err(e) => Ok(Err(e)),
}
});
match io_result {
Ok(result) => {
return match result {
Ok(Ok(x)) => Ok(x),
Ok(Err(e)) => Err(e),
Err(e) => Err(base::TubeError::Send(e)),
}
}
Err(_would_block) => continue,
}
}
}
pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(
&mut self,
) -> base::TubeResult<T> {
loop {
let mut guard = self.0.readable().await.map_err(base::TubeError::Recv)?;
let io_result = guard.try_io(|inner| {
// Re-using the non-async recv is potentially hazardous since it isn't explicitly
// written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
// single read syscall, it should be OK.
let r = inner.get_ref().recv();
// Transpose the `std::io::Error` errors outside so that `try_io` can check them
// for `WouldBlock`.
match r {
Ok(x) => Ok(Ok(x)),
Err(base::TubeError::Recv(e)) => Err(e),
Err(e) => Ok(Err(e)),
}
});
match io_result {
Ok(result) => {
return match result {
Ok(Ok(x)) => Ok(x),
Ok(Err(e)) => Err(e),
Err(e) => Err(base::TubeError::Recv(e)),
}
}
Err(_would_block) => continue,
}
}
}
}