base/
tube.rs

1// Copyright 2021 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::io;
6use std::time::Duration;
7
8use remain::sorted;
9use serde::de::DeserializeOwned;
10use serde::Deserialize;
11use serde::Serialize;
12use thiserror::Error as ThisError;
13
14pub use crate::sys::tube::*;
15
16impl Tube {
17    /// Given a Tube end, creates two new ends, one each for sending and receiving.
18    pub fn split_to_send_recv(self) -> Result<(SendTube, RecvTube)> {
19        // Safe because receiving isn't allowd on this end.
20        #[allow(deprecated)]
21        let send_end = self.try_clone()?;
22
23        Ok((SendTube(send_end), RecvTube(self)))
24    }
25
26    /// Creates a Send/Recv pair of Tubes.
27    pub fn directional_pair() -> Result<(SendTube, RecvTube)> {
28        let (t1, t2) = Self::pair()?;
29        Ok((SendTube(t1), RecvTube(t2)))
30    }
31
32    pub fn try_clone_send_tube(&self) -> Result<SendTube> {
33        // Safe because receiving is only allowed on original Tube.
34        #[allow(deprecated)]
35        let send_end = self.try_clone()?;
36        Ok(SendTube(send_end))
37    }
38}
39
40use crate::AsRawDescriptor;
41use crate::ReadNotifier;
42
43#[derive(Serialize, Deserialize)]
44#[serde(transparent)]
45/// A Tube end which can only send messages. Cloneable.
46pub struct SendTube(pub(crate) Tube);
47
48#[allow(dead_code)]
49impl SendTube {
50    /// TODO(b/145998747, b/184398671): this method should be removed.
51    pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
52        unimplemented!("To be removed/refactored upstream.");
53    }
54
55    pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
56        self.0.send(msg)
57    }
58
59    pub fn try_clone(&self) -> Result<Self> {
60        Ok(SendTube(
61            #[allow(deprecated)]
62            self.0.try_clone()?,
63        ))
64    }
65
66    /// Never call this function, it is for use by cros_async to provide
67    /// directional wrapper types only. Using it in any other context may
68    /// violate concurrency assumptions. (Type splitting across crates has put
69    /// us in a situation where we can't use Rust privacy to enforce this.)
70    #[deprecated]
71    pub fn into_tube(self) -> Tube {
72        self.0
73    }
74}
75
76#[derive(Serialize, Deserialize)]
77#[serde(transparent)]
78/// A Tube end which can only recv messages.
79pub struct RecvTube(pub(crate) Tube);
80
81#[allow(dead_code)]
82impl RecvTube {
83    pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
84        self.0.recv()
85    }
86
87    /// TODO(b/145998747, b/184398671): this method should be removed.
88    pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
89        unimplemented!("To be removed/refactored upstream.");
90    }
91
92    /// Never call this function, it is for use by cros_async to provide
93    /// directional wrapper types only. Using it in any other context may
94    /// violate concurrency assumptions. (Type splitting across crates has put
95    /// us in a situation where we can't use Rust privacy to enforce this.)
96    #[deprecated]
97    pub fn into_tube(self) -> Tube {
98        self.0
99    }
100}
101
102impl ReadNotifier for RecvTube {
103    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
104        self.0.get_read_notifier()
105    }
106}
107
108#[sorted]
109#[derive(ThisError, Debug)]
110pub enum Error {
111    #[cfg(windows)]
112    #[error("attempt to duplicate descriptor via broker failed")]
113    BrokerDupDescriptor,
114    #[error("failed to clone transport: {0}")]
115    Clone(io::Error),
116    #[error("tube was disconnected")]
117    Disconnected,
118    #[error("failed to duplicate descriptor: {0}")]
119    DupDescriptor(io::Error),
120    #[cfg(windows)]
121    #[error("failed to flush named pipe: {0}")]
122    Flush(io::Error),
123    #[cfg(unix)]
124    #[error("byte framing mode is not supported")]
125    InvalidFramingMode,
126    #[error("failed to serialize/deserialize json from packet: {0}")]
127    Json(serde_json::Error),
128    #[error("cancelled a queued async operation")]
129    OperationCancelled,
130    #[error("failed to crate tube pair: {0}")]
131    Pair(io::Error),
132    #[cfg(any(windows, feature = "proto_tube"))]
133    #[error("encountered protobuf error: {0}")]
134    Proto(protobuf::Error),
135    #[error("failed to receive packet: {0}")]
136    Recv(io::Error),
137    #[error("attempted to receive too many file descriptors")]
138    RecvTooManyFds,
139    #[error("Received a message with a zero sized body. This should not happen.")]
140    RecvUnexpectedEmptyBody,
141    #[cfg(unix)]
142    #[error("failed to construct ScmSocket: {0}")]
143    ScmSocket(io::Error),
144    #[error("failed to send packet: {0}")]
145    Send(io::Error),
146    #[error("failed to write packet to intermediate buffer: {0}")]
147    SendIoBuf(io::Error),
148    #[error("attempted to send too many file descriptors")]
149    SendTooManyFds,
150    #[error("failed to set recv timeout: {0}")]
151    SetRecvTimeout(io::Error),
152    #[error("failed to set send timeout: {0}")]
153    SetSendTimeout(io::Error),
154}
155
156pub type Result<T> = std::result::Result<T, Error>;