1use std::os::unix::prelude::AsRawFd;
6use std::os::unix::prelude::RawFd;
7use std::time::Duration;
8
9use serde::de::DeserializeOwned;
10use serde::Deserialize;
11use serde::Serialize;
12
13use crate::descriptor::AsRawDescriptor;
14use crate::descriptor_reflection::deserialize_with_descriptors;
15use crate::descriptor_reflection::SerializeDescriptors;
16use crate::handle_eintr;
17use crate::tube::Error;
18use crate::tube::RecvTube;
19use crate::tube::Result;
20use crate::tube::SendTube;
21use crate::RawDescriptor;
22use crate::ReadNotifier;
23use crate::ScmSocket;
24use crate::UnixSeqpacket;
25use crate::SCM_SOCKET_MAX_FD_COUNT;
26
27const TUBE_MAX_FDS: usize = 32;
29
30#[derive(Serialize, Deserialize)]
32pub struct Tube {
33 socket: ScmSocket<UnixSeqpacket>,
34}
35
36impl Tube {
37 pub fn pair() -> Result<(Tube, Tube)> {
40 let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?;
41 let tube1 = Tube::try_from(socket1)?;
42 let tube2 = Tube::try_from(socket2)?;
43 Ok((tube1, tube2))
44 }
45
46 #[deprecated]
49 pub fn try_clone(&self) -> Result<Self> {
50 self.socket
51 .inner()
52 .try_clone()
53 .map_err(Error::Clone)?
54 .try_into()
55 }
56
57 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
61 self.send_with_max_fds(msg, TUBE_MAX_FDS)
62 }
63
64 pub fn send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()> {
67 if max_fds > SCM_SOCKET_MAX_FD_COUNT {
68 return Err(Error::SendTooManyFds);
69 }
70 let msg_serialize = SerializeDescriptors::new(&msg);
71 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
72 let msg_descriptors = msg_serialize.into_descriptors();
73
74 if msg_descriptors.len() > max_fds {
75 return Err(Error::SendTooManyFds);
76 }
77
78 handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors))
79 .map_err(Error::Send)?;
80 Ok(())
81 }
82
83 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
87 self.recv_with_max_fds(TUBE_MAX_FDS)
88 }
89
90 pub fn recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T> {
92 if max_fds > SCM_SOCKET_MAX_FD_COUNT {
93 return Err(Error::RecvTooManyFds);
94 }
95
96 let msg_size =
101 handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
102 let mut msg_json = vec![0u8; msg_size];
106
107 let (msg_json_size, msg_descriptors) =
108 handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds))
109 .map_err(Error::Recv)?;
110
111 if msg_json_size == 0 {
112 return Err(Error::Disconnected);
113 }
114
115 deserialize_with_descriptors(
116 || serde_json::from_slice(&msg_json[0..msg_json_size]),
117 msg_descriptors,
118 )
119 .map_err(Error::Json)
120 }
121
122 pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> {
123 self.socket
124 .inner()
125 .set_write_timeout(timeout)
126 .map_err(Error::SetSendTimeout)
127 }
128
129 pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> {
130 self.socket
131 .inner()
132 .set_read_timeout(timeout)
133 .map_err(Error::SetRecvTimeout)
134 }
135
136 #[cfg(feature = "proto_tube")]
137 fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
138 let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
139 let no_fds: [RawFd; 0] = [];
140
141 handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?;
142
143 Ok(())
144 }
145
146 #[cfg(feature = "proto_tube")]
147 fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
148 let msg_size =
149 handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
150 let mut msg_bytes = vec![0u8; msg_size];
151
152 let (msg_bytes_size, _) =
153 handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS))
154 .map_err(Error::Recv)?;
155
156 if msg_bytes_size == 0 {
157 return Err(Error::Disconnected);
158 }
159
160 protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto)
161 }
162}
163
164impl TryFrom<UnixSeqpacket> for Tube {
165 type Error = Error;
166
167 fn try_from(socket: UnixSeqpacket) -> Result<Self> {
168 Ok(Tube {
169 socket: socket.try_into().map_err(Error::ScmSocket)?,
170 })
171 }
172}
173
174impl AsRawDescriptor for Tube {
175 fn as_raw_descriptor(&self) -> RawDescriptor {
176 self.socket.as_raw_descriptor()
177 }
178}
179
180impl AsRawFd for Tube {
181 fn as_raw_fd(&self) -> RawFd {
182 self.socket.inner().as_raw_descriptor()
183 }
184}
185
186impl ReadNotifier for Tube {
187 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
188 &self.socket
189 }
190}
191
192impl AsRawDescriptor for SendTube {
193 fn as_raw_descriptor(&self) -> RawDescriptor {
194 self.0.as_raw_descriptor()
195 }
196}
197
198impl AsRawDescriptor for RecvTube {
199 fn as_raw_descriptor(&self) -> RawDescriptor {
200 self.0.as_raw_descriptor()
201 }
202}
203
204#[cfg(feature = "proto_tube")]
208pub struct ProtoTube(Tube);
209
210#[cfg(feature = "proto_tube")]
211impl ProtoTube {
212 pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
213 Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
214 }
215
216 pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
217 self.0.send_proto(msg)
218 }
219
220 pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
221 self.0.recv_proto()
222 }
223}
224
225#[cfg(feature = "proto_tube")]
226impl From<Tube> for ProtoTube {
227 fn from(tube: Tube) -> Self {
228 ProtoTube(tube)
229 }
230}
231
232#[cfg(all(feature = "proto_tube", test))]
233#[allow(unused_variables)]
234mod tests {
235 use protos::cdisk_spec::ComponentDisk;
237
238 use super::*;
239
240 #[test]
241 fn tube_serializes_and_deserializes() {
242 let (pt1, pt2) = ProtoTube::pair().unwrap();
243 let proto = ComponentDisk {
244 file_path: "/some/cool/path".to_string(),
245 offset: 99,
246 ..ComponentDisk::new()
247 };
248
249 pt1.send_proto(&proto).unwrap();
250
251 let recv_proto = pt2.recv_proto().unwrap();
252
253 assert!(proto.eq(&recv_proto));
254 }
255}