1use std::io;
6use std::time::Duration;
7
8use remain::sorted;
9use serde::de::DeserializeOwned;
10use serde::Deserialize;
11use serde::Serialize;
12use thiserror::Error as ThisError;
13
14pub use crate::sys::tube::*;
15
16impl Tube {
17 pub fn split_to_send_recv(self) -> Result<(SendTube, RecvTube)> {
19 #[allow(deprecated)]
21 let send_end = self.try_clone()?;
22
23 Ok((SendTube(send_end), RecvTube(self)))
24 }
25
26 pub fn directional_pair() -> Result<(SendTube, RecvTube)> {
28 let (t1, t2) = Self::pair()?;
29 Ok((SendTube(t1), RecvTube(t2)))
30 }
31
32 pub fn try_clone_send_tube(&self) -> Result<SendTube> {
33 #[allow(deprecated)]
35 let send_end = self.try_clone()?;
36 Ok(SendTube(send_end))
37 }
38}
39
40use crate::AsRawDescriptor;
41use crate::ReadNotifier;
42
43#[derive(Serialize, Deserialize)]
44#[serde(transparent)]
45pub struct SendTube(pub(crate) Tube);
47
48#[allow(dead_code)]
49impl SendTube {
50 pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
52 unimplemented!("To be removed/refactored upstream.");
53 }
54
55 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
56 self.0.send(msg)
57 }
58
59 pub fn try_clone(&self) -> Result<Self> {
60 Ok(SendTube(
61 #[allow(deprecated)]
62 self.0.try_clone()?,
63 ))
64 }
65
66 #[deprecated]
71 pub fn into_tube(self) -> Tube {
72 self.0
73 }
74}
75
76#[derive(Serialize, Deserialize)]
77#[serde(transparent)]
78pub struct RecvTube(pub(crate) Tube);
80
81#[allow(dead_code)]
82impl RecvTube {
83 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
84 self.0.recv()
85 }
86
87 pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
89 unimplemented!("To be removed/refactored upstream.");
90 }
91
92 #[deprecated]
97 pub fn into_tube(self) -> Tube {
98 self.0
99 }
100}
101
102impl ReadNotifier for RecvTube {
103 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
104 self.0.get_read_notifier()
105 }
106}
107
108#[sorted]
109#[derive(ThisError, Debug)]
110pub enum Error {
111 #[cfg(windows)]
112 #[error("attempt to duplicate descriptor via broker failed")]
113 BrokerDupDescriptor,
114 #[error("failed to clone transport: {0}")]
115 Clone(io::Error),
116 #[error("tube was disconnected")]
117 Disconnected,
118 #[error("failed to duplicate descriptor: {0}")]
119 DupDescriptor(io::Error),
120 #[cfg(windows)]
121 #[error("failed to flush named pipe: {0}")]
122 Flush(io::Error),
123 #[cfg(unix)]
124 #[error("byte framing mode is not supported")]
125 InvalidFramingMode,
126 #[error("failed to serialize/deserialize json from packet: {0}")]
127 Json(serde_json::Error),
128 #[error("cancelled a queued async operation")]
129 OperationCancelled,
130 #[error("failed to crate tube pair: {0}")]
131 Pair(io::Error),
132 #[cfg(any(windows, feature = "proto_tube"))]
133 #[error("encountered protobuf error: {0}")]
134 Proto(protobuf::Error),
135 #[error("failed to receive packet: {0}")]
136 Recv(io::Error),
137 #[error("attempted to receive too many file descriptors")]
138 RecvTooManyFds,
139 #[error("Received a message with a zero sized body. This should not happen.")]
140 RecvUnexpectedEmptyBody,
141 #[cfg(unix)]
142 #[error("failed to construct ScmSocket: {0}")]
143 ScmSocket(io::Error),
144 #[error("failed to send packet: {0}")]
145 Send(io::Error),
146 #[error("failed to write packet to intermediate buffer: {0}")]
147 SendIoBuf(io::Error),
148 #[error("attempted to send too many file descriptors")]
149 SendTooManyFds,
150 #[error("failed to set recv timeout: {0}")]
151 SetRecvTimeout(io::Error),
152 #[error("failed to set send timeout: {0}")]
153 SetSendTimeout(io::Error),
154}
155
156pub type Result<T> = std::result::Result<T, Error>;