base/sys/unix/
tube.rs

1// Copyright 2021 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::os::unix::prelude::AsRawFd;
6use std::os::unix::prelude::RawFd;
7use std::time::Duration;
8
9use serde::de::DeserializeOwned;
10use serde::Deserialize;
11use serde::Serialize;
12
13use crate::descriptor::AsRawDescriptor;
14use crate::descriptor_reflection::deserialize_with_descriptors;
15use crate::descriptor_reflection::SerializeDescriptors;
16use crate::handle_eintr;
17use crate::tube::Error;
18use crate::tube::RecvTube;
19use crate::tube::Result;
20use crate::tube::SendTube;
21use crate::RawDescriptor;
22use crate::ReadNotifier;
23use crate::ScmSocket;
24use crate::UnixSeqpacket;
25use crate::SCM_SOCKET_MAX_FD_COUNT;
26
27// This size matches the inline buffer size of CmsgBuffer.
28const TUBE_MAX_FDS: usize = 32;
29
30/// Bidirectional tube that support both send and recv.
31#[derive(Serialize, Deserialize)]
32pub struct Tube {
33    socket: ScmSocket<UnixSeqpacket>,
34}
35
36impl Tube {
37    /// Create a pair of connected tubes. Request is sent in one direction while response is in the
38    /// other direction.
39    pub fn pair() -> Result<(Tube, Tube)> {
40        let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?;
41        let tube1 = Tube::try_from(socket1)?;
42        let tube2 = Tube::try_from(socket2)?;
43        Ok((tube1, tube2))
44    }
45
46    /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a
47    /// directional Tube pair instead.
48    #[deprecated]
49    pub fn try_clone(&self) -> Result<Self> {
50        self.socket
51            .inner()
52            .try_clone()
53            .map_err(Error::Clone)?
54            .try_into()
55    }
56
57    /// Sends a message via a Tube.
58    /// The number of file descriptors that this method can send is limited to `TUBE_MAX_FDS`.
59    /// If you want to send more descriptors, use `send_with_max_fds` instead.
60    pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
61        self.send_with_max_fds(msg, TUBE_MAX_FDS)
62    }
63
64    /// Sends a message with at most `max_fds` file descriptors via a Tube.
65    /// Note that `max_fds` must not exceed `SCM_SOCKET_MAX_FD_COUNT` (= 253).
66    pub fn send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()> {
67        if max_fds > SCM_SOCKET_MAX_FD_COUNT {
68            return Err(Error::SendTooManyFds);
69        }
70        let msg_serialize = SerializeDescriptors::new(&msg);
71        let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
72        let msg_descriptors = msg_serialize.into_descriptors();
73
74        if msg_descriptors.len() > max_fds {
75            return Err(Error::SendTooManyFds);
76        }
77
78        handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors))
79            .map_err(Error::Send)?;
80        Ok(())
81    }
82
83    /// Recieves a message from a Tube.
84    /// If the sender sent file descriptors more than TUBE_MAX_FDS with `send_with_max_fds`, use
85    /// `recv_with_max_fds` instead.
86    pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
87        self.recv_with_max_fds(TUBE_MAX_FDS)
88    }
89
90    /// Recieves a message with at most `max_fds` file descriptors from a Tube.
91    pub fn recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T> {
92        if max_fds > SCM_SOCKET_MAX_FD_COUNT {
93            return Err(Error::RecvTooManyFds);
94        }
95
96        // WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube
97        // is readable, then a call to `Tube::recv` will not block (which ought to be true since we
98        // use SOCK_SEQPACKET and a single recvmsg call currently).
99
100        let msg_size =
101            handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
102        // This buffer is the right size, as the size received in next_packet_size() represents the
103        // size of only the message itself and not the file descriptors. The descriptors are stored
104        // separately in msghdr::msg_control.
105        let mut msg_json = vec![0u8; msg_size];
106
107        let (msg_json_size, msg_descriptors) =
108            handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds))
109                .map_err(Error::Recv)?;
110
111        if msg_json_size == 0 {
112            return Err(Error::Disconnected);
113        }
114
115        deserialize_with_descriptors(
116            || serde_json::from_slice(&msg_json[0..msg_json_size]),
117            msg_descriptors,
118        )
119        .map_err(Error::Json)
120    }
121
122    pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> {
123        self.socket
124            .inner()
125            .set_write_timeout(timeout)
126            .map_err(Error::SetSendTimeout)
127    }
128
129    pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> {
130        self.socket
131            .inner()
132            .set_read_timeout(timeout)
133            .map_err(Error::SetRecvTimeout)
134    }
135
136    #[cfg(feature = "proto_tube")]
137    fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
138        let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
139        let no_fds: [RawFd; 0] = [];
140
141        handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?;
142
143        Ok(())
144    }
145
146    #[cfg(feature = "proto_tube")]
147    fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
148        let msg_size =
149            handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
150        let mut msg_bytes = vec![0u8; msg_size];
151
152        let (msg_bytes_size, _) =
153            handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS))
154                .map_err(Error::Recv)?;
155
156        if msg_bytes_size == 0 {
157            return Err(Error::Disconnected);
158        }
159
160        protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto)
161    }
162}
163
164impl TryFrom<UnixSeqpacket> for Tube {
165    type Error = Error;
166
167    fn try_from(socket: UnixSeqpacket) -> Result<Self> {
168        Ok(Tube {
169            socket: socket.try_into().map_err(Error::ScmSocket)?,
170        })
171    }
172}
173
174impl AsRawDescriptor for Tube {
175    fn as_raw_descriptor(&self) -> RawDescriptor {
176        self.socket.as_raw_descriptor()
177    }
178}
179
180impl AsRawFd for Tube {
181    fn as_raw_fd(&self) -> RawFd {
182        self.socket.inner().as_raw_descriptor()
183    }
184}
185
186impl ReadNotifier for Tube {
187    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
188        &self.socket
189    }
190}
191
192impl AsRawDescriptor for SendTube {
193    fn as_raw_descriptor(&self) -> RawDescriptor {
194        self.0.as_raw_descriptor()
195    }
196}
197
198impl AsRawDescriptor for RecvTube {
199    fn as_raw_descriptor(&self) -> RawDescriptor {
200        self.0.as_raw_descriptor()
201    }
202}
203
204/// Wrapper for Tube used for sending and receiving protos - avoids extra overhead of serialization
205/// via serde_json. Since protos should be standalone objects we do not support sending of file
206/// descriptors as a normal Tube would.
207#[cfg(feature = "proto_tube")]
208pub struct ProtoTube(Tube);
209
210#[cfg(feature = "proto_tube")]
211impl ProtoTube {
212    pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
213        Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
214    }
215
216    pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
217        self.0.send_proto(msg)
218    }
219
220    pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
221        self.0.recv_proto()
222    }
223}
224
225#[cfg(feature = "proto_tube")]
226impl From<Tube> for ProtoTube {
227    fn from(tube: Tube) -> Self {
228        ProtoTube(tube)
229    }
230}
231
232#[cfg(all(feature = "proto_tube", test))]
233#[allow(unused_variables)]
234mod tests {
235    // not testing this proto specifically, just need an existing one to test the ProtoTube.
236    use protos::cdisk_spec::ComponentDisk;
237
238    use super::*;
239
240    #[test]
241    fn tube_serializes_and_deserializes() {
242        let (pt1, pt2) = ProtoTube::pair().unwrap();
243        let proto = ComponentDisk {
244            file_path: "/some/cool/path".to_string(),
245            offset: 99,
246            ..ComponentDisk::new()
247        };
248
249        pt1.send_proto(&proto).unwrap();
250
251        let recv_proto = pt2.recv_proto().unwrap();
252
253        assert!(proto.eq(&recv_proto));
254    }
255}