devices/virtio/console/sys/
linux.rs

1// Copyright 2022 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::collections::VecDeque;
6use std::io;
7use std::sync::Arc;
8use std::time::Duration;
9use std::time::Instant;
10
11use anyhow::Context;
12use base::error;
13use base::Event;
14use base::EventToken;
15use base::FileSync;
16use base::RawDescriptor;
17use base::WaitContext;
18use base::WorkerThread;
19use sync::Mutex;
20
21use crate::serial::sys::InStreamType;
22use crate::serial_device::SerialInput;
23use crate::serial_device::SerialOptions;
24use crate::virtio::console::device::ConsoleDevice;
25use crate::virtio::console::port::ConsolePort;
26use crate::virtio::console::port::ConsolePortInfo;
27use crate::virtio::console::Console;
28use crate::virtio::ProtectionType;
29use crate::SerialDevice;
30
31impl SerialDevice for Console {
32    fn new(
33        protection_type: ProtectionType,
34        _event: Event,
35        input: Option<Box<dyn SerialInput>>,
36        output: Option<Box<dyn io::Write + Send>>,
37        // TODO(b/171331752): connect filesync functionality.
38        _sync: Option<Box<dyn FileSync + Send>>,
39        options: SerialOptions,
40        keep_rds: Vec<RawDescriptor>,
41    ) -> Console {
42        Console::new(
43            protection_type,
44            input,
45            output,
46            keep_rds,
47            options.pci_address,
48            options.max_queue_sizes,
49        )
50    }
51}
52
53impl SerialDevice for ConsoleDevice {
54    fn new(
55        protection_type: ProtectionType,
56        _event: Event,
57        input: Option<Box<dyn SerialInput>>,
58        output: Option<Box<dyn io::Write + Send>>,
59        _sync: Option<Box<dyn FileSync + Send>>,
60        options: SerialOptions,
61        keep_rds: Vec<RawDescriptor>,
62    ) -> ConsoleDevice {
63        let info = ConsolePortInfo {
64            name: options.name,
65            console: options.console,
66        };
67        let port = ConsolePort::new(input, output, Some(info), keep_rds);
68        ConsoleDevice::new_single_port(protection_type, port)
69    }
70}
71
72impl SerialDevice for ConsolePort {
73    fn new(
74        _protection_type: ProtectionType,
75        _event: Event,
76        input: Option<Box<dyn SerialInput>>,
77        output: Option<Box<dyn io::Write + Send>>,
78        // TODO(b/171331752): connect filesync functionality.
79        _sync: Option<Box<dyn FileSync + Send>>,
80        options: SerialOptions,
81        keep_rds: Vec<RawDescriptor>,
82    ) -> ConsolePort {
83        let info = ConsolePortInfo {
84            name: options.name,
85            console: options.console,
86        };
87        ConsolePort::new(input, output, Some(info), keep_rds)
88    }
89}
90
91/// Starts a thread that reads input and sends the input back via the provided buffer.
92///
93/// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
94/// is available, the caller should lock `input_buffer` and read data out of the inner
95/// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
96///
97/// # Arguments
98///
99/// * `input` - Data source that the reader thread will wait on to send data back to the buffer
100/// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
101pub(in crate::virtio::console) fn spawn_input_thread(
102    mut input: InStreamType,
103    in_avail_evt: Event,
104    input_buffer: Arc<Mutex<VecDeque<u8>>>,
105) -> WorkerThread<InStreamType> {
106    WorkerThread::start("v_console_input", move |kill_evt| {
107        // If there is already data, signal immediately.
108        if !input_buffer.lock().is_empty() {
109            in_avail_evt.signal().unwrap();
110        }
111        if let Err(e) = read_input(&mut input, &in_avail_evt, input_buffer, kill_evt) {
112            error!("console input thread exited with error: {:#}", e);
113        }
114        input
115    })
116}
117
118fn read_input(
119    input: &mut InStreamType,
120    thread_in_avail_evt: &Event,
121    buffer: Arc<Mutex<VecDeque<u8>>>,
122    kill_evt: Event,
123) -> anyhow::Result<()> {
124    #[derive(EventToken)]
125    enum Token {
126        ConsoleEvent,
127        Kill,
128    }
129
130    let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
131        (&kill_evt, Token::Kill),
132        (input.get_read_notifier(), Token::ConsoleEvent),
133    ])
134    .context("failed creating WaitContext")?;
135
136    let mut kill_timeout = None;
137    let mut rx_buf = [0u8; 1 << 12];
138    'wait: loop {
139        let events = wait_ctx.wait().context("Failed to wait for events")?;
140        for event in events.iter() {
141            match event.token {
142                Token::Kill => {
143                    // Ignore the kill event until there are no other events to process so that
144                    // we drain `input` as much as possible. The next `wait_ctx.wait()` call will
145                    // immediately re-entry this case since we don't call `kill_evt.wait()`.
146                    if events.iter().all(|e| matches!(e.token, Token::Kill)) {
147                        break 'wait;
148                    }
149                    const TIMEOUT_DURATION: Duration = Duration::from_millis(500);
150                    match kill_timeout {
151                        None => {
152                            kill_timeout = Some(Instant::now() + TIMEOUT_DURATION);
153                        }
154                        Some(t) => {
155                            if Instant::now() >= t {
156                                error!(
157                                    "failed to drain console input within {:?}, giving up",
158                                    TIMEOUT_DURATION
159                                );
160                                break 'wait;
161                            }
162                        }
163                    }
164                }
165                Token::ConsoleEvent => {
166                    match input.read(&mut rx_buf) {
167                        Ok(0) => break 'wait, // Assume the stream of input has ended.
168                        Ok(size) => {
169                            buffer.lock().extend(&rx_buf[0..size]);
170                            thread_in_avail_evt.signal().unwrap();
171                        }
172                        // Being interrupted is not an error, but everything else is.
173                        Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
174                        Err(e) => {
175                            return Err(e).context("failed to read console input");
176                        }
177                    }
178                }
179            }
180        }
181    }
182
183    Ok(())
184}