1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::io;
use std::time::Duration;

use remain::sorted;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error as ThisError;

pub use crate::sys::tube::*;

impl Tube {
    /// Consumes this Tube end and returns a send-only end and a receive-only end.
    ///
    /// The send end wraps a clone of this Tube, while the receive end wraps the original.
    ///
    /// # Errors
    ///
    /// Returns the error produced by cloning the Tube.
    pub fn split_to_send_recv(self) -> Result<(SendTube, RecvTube)> {
        // Using the deprecated clone is acceptable here: the clone is immediately wrapped in
        // `SendTube`, so receiving isn't allowed on that end.
        #[allow(deprecated)]
        let sender = SendTube(self.try_clone()?);
        let receiver = RecvTube(self);

        Ok((sender, receiver))
    }

    /// Creates a new Tube pair and wraps one end as a `SendTube` and the other as a `RecvTube`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Self::pair()`.
    pub fn directional_pair() -> Result<(SendTube, RecvTube)> {
        let (first, second) = Self::pair()?;
        Ok((SendTube(first), RecvTube(second)))
    }

    /// Clones this Tube and returns the clone wrapped as a send-only `SendTube`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by cloning the Tube.
    pub fn try_clone_send_tube(&self) -> Result<SendTube> {
        // Using the deprecated clone is acceptable here: receiving stays possible only on the
        // original Tube, since the clone is handed out as a `SendTube`.
        #[allow(deprecated)]
        let duplicate = self.try_clone()?;
        Ok(SendTube(duplicate))
    }
}

use crate::AsRawDescriptor;
use crate::ReadNotifier;

#[derive(Serialize, Deserialize)]
#[serde(transparent)]
/// A Tube end which can only send messages.
///
/// This is a newtype around [`Tube`]; because of `#[serde(transparent)]` it is serialized and
/// deserialized exactly as the wrapped `Tube` would be. Additional send ends can be obtained
/// with `try_clone`.
pub struct SendTube(pub(crate) Tube);

#[allow(dead_code)]
impl SendTube {
    /// TODO(b/145998747, b/184398671): this method should be removed.
    pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
        unimplemented!("To be removed/refactored upstream.");
    }

    pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
        self.0.send(msg)
    }

    pub fn try_clone(&self) -> Result<Self> {
        Ok(SendTube(
            #[allow(deprecated)]
            self.0.try_clone()?,
        ))
    }

    /// Never call this function, it is for use by cros_async to provide
    /// directional wrapper types only. Using it in any other context may
    /// violate concurrency assumptions. (Type splitting across crates has put
    /// us in a situation where we can't use Rust privacy to enforce this.)
    #[deprecated]
    pub fn into_tube(self) -> Tube {
        self.0
    }
}

#[derive(Serialize, Deserialize)]
#[serde(transparent)]
/// A Tube end which can only recv messages.
///
/// This is a newtype around [`Tube`]; because of `#[serde(transparent)]` it is serialized and
/// deserialized exactly as the wrapped `Tube` would be.
pub struct RecvTube(pub(crate) Tube);

#[allow(dead_code)]
impl RecvTube {
    pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
        self.0.recv()
    }

    /// TODO(b/145998747, b/184398671): this method should be removed.
    pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
        unimplemented!("To be removed/refactored upstream.");
    }

    /// Never call this function, it is for use by cros_async to provide
    /// directional wrapper types only. Using it in any other context may
    /// violate concurrency assumptions. (Type splitting across crates has put
    /// us in a situation where we can't use Rust privacy to enforce this.)
    #[deprecated]
    pub fn into_tube(self) -> Tube {
        self.0
    }
}

impl ReadNotifier for RecvTube {
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        self.0.get_read_notifier()
    }
}

#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
    #[cfg(windows)]
    #[error("attempt to duplicate descriptor via broker failed")]
    BrokerDupDescriptor,
    #[error("failed to clone transport: {0}")]
    Clone(io::Error),
    #[error("tube was disconnected")]
    Disconnected,
    #[error("failed to duplicate descriptor: {0}")]
    DupDescriptor(io::Error),
    #[cfg(windows)]
    #[error("failed to flush named pipe: {0}")]
    Flush(io::Error),
    #[cfg(unix)]
    #[error("byte framing mode is not supported")]
    InvalidFramingMode,
    #[error("failed to serialize/deserialize json from packet: {0}")]
    Json(serde_json::Error),
    #[error("cancelled a queued async operation")]
    OperationCancelled,
    #[error("failed to crate tube pair: {0}")]
    Pair(io::Error),
    #[cfg(any(windows, feature = "proto_tube"))]
    #[error("encountered protobuf error: {0}")]
    Proto(protobuf::Error),
    #[error("failed to receive packet: {0}")]
    Recv(io::Error),
    #[error("attempted to receive too many file descriptors")]
    RecvTooManyFds,
    #[error("Received a message with a zero sized body. This should not happen.")]
    RecvUnexpectedEmptyBody,
    #[cfg(unix)]
    #[error("failed to construct ScmSocket: {0}")]
    ScmSocket(io::Error),
    #[error("failed to send packet: {0}")]
    Send(io::Error),
    #[error("failed to write packet to intermediate buffer: {0}")]
    SendIoBuf(io::Error),
    #[error("attempted to send too many file descriptors")]
    SendTooManyFds,
    #[error("failed to set recv timeout: {0}")]
    SetRecvTimeout(io::Error),
    #[error("failed to set send timeout: {0}")]
    SetSendTimeout(io::Error),
}

/// Shorthand for a `std::result::Result` whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;