devices/virtio/snd/vios_backend/
streams.rs

// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use std::collections::VecDeque;
6use std::sync::mpsc::channel;
7use std::sync::mpsc::Receiver;
8use std::sync::mpsc::Sender;
9use std::sync::Arc;
10use std::thread;
11use std::time::Duration;
12use std::time::Instant;
13
14use base::error;
15use base::set_rt_prio_limit;
16use base::set_rt_round_robin;
17use base::warn;
18use data_model::Le32;
19use serde::Deserialize;
20use serde::Serialize;
21use sync::Mutex;
22
23use super::Error as VioSError;
24use super::Result;
25use super::SoundError;
26use super::*;
27use crate::virtio::snd::common::from_virtio_frame_rate;
28use crate::virtio::snd::constants::*;
29use crate::virtio::snd::layout::*;
30use crate::virtio::DescriptorChain;
31use crate::virtio::Queue;
32
/// Messages that the worker can send to the stream (thread).
///
/// Every variant except `Buffer` and `Break` carries the descriptor chain of a control request;
/// the stream thread writes the resulting status into that chain and returns it on the control
/// queue.
pub enum StreamMsg {
    /// Apply the given PCM parameters to the stream.
    SetParams(DescriptorChain, virtio_snd_pcm_set_params),
    /// Prepare the stream.
    Prepare(DescriptorChain),
    /// Start the stream; this also resets the stream's buffer pacing clock.
    Start(DescriptorChain),
    /// Stop the stream.
    Stop(DescriptorChain),
    /// Release the stream.
    Release(DescriptorChain),
    /// An IO buffer. It is queued by the stream thread and handled according to the current
    /// stream state; no reply is sent on the control queue for it.
    Buffer(DescriptorChain),
    /// Makes the stream thread leave its message loop.
    Break,
}
43
44#[derive(Clone, Serialize, Deserialize)]
45pub enum StreamState {
46    New,
47    ParamsSet,
48    Prepared,
49    Started,
50    Stopped,
51    Released,
52}
53
/// Per-stream state owned by the stream thread.
pub struct Stream {
    /// Identifier passed to the VioS client for every operation on this stream.
    stream_id: u32,
    /// Receiving end of the channel on which the worker sends `StreamMsg`s.
    receiver: Receiver<Box<StreamMsg>>,
    /// Shared client used to talk to the sound server.
    vios_client: Arc<Mutex<VioSClient>>,
    /// Queue on which control request descriptors are returned.
    control_queue: Arc<Mutex<Queue>>,
    /// Queue on which IO buffer descriptors are returned.
    io_queue: Arc<Mutex<Queue>>,
    /// When true, buffers are filled with data requested from the client; otherwise their
    /// contents are injected into the client.
    capture: bool,
    /// State reached after the last handled control message.
    current_state: StreamState,
    /// Duration of one period, derived from the last accepted stream parameters.
    period: Duration,
    /// Reference instant for buffer pacing; reset whenever a `Start` message is handled.
    start_time: Instant,
    /// Offset from `start_time` before which the most recently processed buffer must not be
    /// returned to the guest.
    next_buffer: Duration,
    /// Buffers received but not yet returned to the guest.
    buffer_queue: VecDeque<DescriptorChain>,
}
67
/// Serializable subset of a stream's state: returned by the stream thread when it exits and
/// accepted by `Stream::try_new` to seed a new stream thread.
#[derive(Clone, Serialize, Deserialize)]
pub struct StreamSnapshot {
    /// State of the stream when the snapshot was taken.
    pub current_state: StreamState,
    /// Period duration of the stream when the snapshot was taken.
    pub period: Duration,
    /// Buffer pacing offset of the stream when the snapshot was taken.
    pub next_buffer: Duration,
}
74
75impl Stream {
76    /// Start a new stream thread and return its handler.
77    pub fn try_new(
78        stream_id: u32,
79        vios_client: Arc<Mutex<VioSClient>>,
80        control_queue: Arc<Mutex<Queue>>,
81        io_queue: Arc<Mutex<Queue>>,
82        capture: bool,
83        stream_state: Option<StreamSnapshot>,
84    ) -> Result<StreamProxy> {
85        let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();
86        let thread = thread::Builder::new()
87            .name(format!("v_snd_stream:{stream_id}"))
88            .spawn(move || {
89                try_set_real_time_priority();
90                let (current_state, period, next_buffer) =
91                    if let Some(stream_state) = stream_state.clone() {
92                        (
93                            stream_state.current_state,
94                            stream_state.period,
95                            stream_state.next_buffer,
96                        )
97                    } else {
98                        (
99                            StreamState::New,
100                            Duration::from_millis(0),
101                            Duration::from_millis(0),
102                        )
103                    };
104
105                let mut stream = Stream {
106                    stream_id,
107                    receiver,
108                    vios_client: vios_client.clone(),
109                    control_queue,
110                    io_queue,
111                    capture,
112                    current_state,
113                    period,
114                    start_time: Instant::now(),
115                    next_buffer,
116                    buffer_queue: VecDeque::new(),
117                };
118
119                if let Some(stream_state) = stream_state {
120                    if let Err(e) = vios_client
121                        .lock()
122                        .restore_stream(stream_id, stream_state.current_state)
123                    {
124                        error!("failed to restore stream params: {}", e);
125                    };
126                }
127                if let Err(e) = stream.stream_loop() {
128                    error!("virtio-snd: Error in stream {}: {}", stream_id, e);
129                }
130                let state = stream.current_state.clone();
131                StreamSnapshot {
132                    current_state: state,
133                    period: stream.period,
134                    next_buffer: stream.next_buffer,
135                }
136            })
137            .map_err(SoundError::CreateThread)?;
138        Ok(StreamProxy {
139            sender,
140            thread: Some(thread),
141        })
142    }
143
144    fn stream_loop(&mut self) -> Result<()> {
145        loop {
146            if !self.recv_msg()? {
147                break;
148            }
149            self.maybe_process_queued_buffers()?;
150        }
151        Ok(())
152    }
153
154    fn recv_msg(&mut self) -> Result<bool> {
155        let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
156        let (code, desc, next_state) = match *msg {
157            StreamMsg::SetParams(desc, params) => {
158                let code = match self.vios_client.lock().set_stream_parameters_raw(params) {
159                    Ok(()) => {
160                        let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
161                        self.period = Duration::from_nanos(
162                            (params.period_bytes.to_native() as u64 * 1_000_000_000u64)
163                                / frame_rate
164                                / params.channels as u64
165                                / bytes_per_sample(params.format) as u64,
166                        );
167                        VIRTIO_SND_S_OK
168                    }
169                    Err(e) => {
170                        error!(
171                            "virtio-snd: Error setting parameters for stream {}: {}",
172                            self.stream_id, e
173                        );
174                        vios_error_to_status_code(e)
175                    }
176                };
177                (code, desc, StreamState::ParamsSet)
178            }
179            StreamMsg::Prepare(desc) => {
180                let code = match self.vios_client.lock().prepare_stream(self.stream_id) {
181                    Ok(()) => VIRTIO_SND_S_OK,
182                    Err(e) => {
183                        error!(
184                            "virtio-snd: Failed to prepare stream {}: {}",
185                            self.stream_id, e
186                        );
187                        vios_error_to_status_code(e)
188                    }
189                };
190                (code, desc, StreamState::Prepared)
191            }
192            StreamMsg::Start(desc) => {
193                let code = match self.vios_client.lock().start_stream(self.stream_id) {
194                    Ok(()) => VIRTIO_SND_S_OK,
195                    Err(e) => {
196                        error!(
197                            "virtio-snd: Failed to start stream {}: {}",
198                            self.stream_id, e
199                        );
200                        vios_error_to_status_code(e)
201                    }
202                };
203                self.start_time = Instant::now();
204                self.next_buffer = Duration::from_millis(0);
205                (code, desc, StreamState::Started)
206            }
207            StreamMsg::Stop(desc) => {
208                let code = match self.vios_client.lock().stop_stream(self.stream_id) {
209                    Ok(()) => VIRTIO_SND_S_OK,
210                    Err(e) => {
211                        error!(
212                            "virtio-snd: Failed to stop stream {}: {}",
213                            self.stream_id, e
214                        );
215                        vios_error_to_status_code(e)
216                    }
217                };
218                (code, desc, StreamState::Stopped)
219            }
220            StreamMsg::Release(desc) => {
221                let code = match self.vios_client.lock().release_stream(self.stream_id) {
222                    Ok(()) => VIRTIO_SND_S_OK,
223                    Err(e) => {
224                        error!(
225                            "virtio-snd: Failed to release stream {}: {}",
226                            self.stream_id, e
227                        );
228                        vios_error_to_status_code(e)
229                    }
230                };
231                (code, desc, StreamState::Released)
232            }
233            StreamMsg::Buffer(d) => {
234                // Buffers may arrive while in several states:
235                // - Prepared: Buffer should be queued and played when start cmd arrives
236                // - Started: Buffer should be processed immediately
237                // - Stopped: Buffer should be returned to the guest immediately
238                // Because we may need to wait to process the buffer, we always queue it and
239                // decide what to do with queued buffers after every message.
240                self.buffer_queue.push_back(d);
241                // return here to avoid replying on control queue below
242                return Ok(true);
243            }
244            StreamMsg::Break => {
245                return Ok(false);
246            }
247        };
248        reply_control_op_status(code, desc, &self.control_queue)?;
249        self.current_state = next_state;
250        Ok(true)
251    }
252
253    fn maybe_process_queued_buffers(&mut self) -> Result<()> {
254        match self.current_state {
255            StreamState::Started => {
256                while let Some(mut desc) = self.buffer_queue.pop_front() {
257                    let reader = &mut desc.reader;
258                    // Ignore the first buffer, it was already read by the time this thread
259                    // receives the descriptor
260                    reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
261                    let writer = &mut desc.writer;
262                    let io_res = if self.capture {
263                        let buffer_size =
264                            writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
265                        self.vios_client.lock().request_audio_data(
266                            self.stream_id,
267                            buffer_size,
268                            |vslice| writer.write_from_volatile_slice(*vslice),
269                        )
270                    } else {
271                        self.vios_client.lock().inject_audio_data(
272                            self.stream_id,
273                            reader.available_bytes(),
274                            |vslice| reader.read_to_volatile_slice(vslice),
275                        )
276                    };
277                    let (code, latency) = match io_res {
278                        Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
279                        Err(e) => {
280                            error!(
281                                "virtio-snd: Failed IO operation in stream {}: {}",
282                                self.stream_id, e
283                            );
284                            (VIRTIO_SND_S_IO_ERR, 0)
285                        }
286                    };
287                    if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
288                        status: Le32::from(code),
289                        latency_bytes: Le32::from(latency),
290                    }) {
291                        error!(
292                            "virtio-snd: Failed to write pcm status from stream {} thread: {}",
293                            self.stream_id, e
294                        );
295                    }
296
297                    self.next_buffer += self.period;
298                    let elapsed = self.start_time.elapsed();
299                    if elapsed < self.next_buffer {
300                        // Completing an IO request can be considered an elapsed period
301                        // notification by the driver, so we must wait the right amount of time to
302                        // release the buffer if the sound server client returned too soon.
303                        std::thread::sleep(self.next_buffer - elapsed);
304                    }
305                    {
306                        let mut io_queue_lock = self.io_queue.lock();
307                        io_queue_lock.add_used(desc);
308                        io_queue_lock.trigger_interrupt();
309                    }
310                }
311            }
312            StreamState::Stopped | StreamState::Released => {
313                // For some reason playback buffers can arrive after stop and release (maybe because
314                // buffer-ready notifications arrive over eventfds and those are processed in
315                // random order?). The spec requires the device to not confirm the release of a
316                // stream until all IO buffers have been released, but that's impossible to
317                // guarantee if a buffer arrives after release is requested. Luckily it seems to
318                // work fine if the buffer is released after the release command is completed.
319                while let Some(desc) = self.buffer_queue.pop_front() {
320                    reply_pcm_buffer_status(VIRTIO_SND_S_OK, 0, desc, &self.io_queue)?;
321                }
322            }
323            StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
324            _ => {
325                if !self.buffer_queue.is_empty() {
326                    warn!("virtio-snd: Buffers received while in unexpected state");
327                }
328            }
329        }
330        Ok(())
331    }
332}
333
334impl Drop for Stream {
335    fn drop(&mut self) {
336        // Try to stop and release the stream in case it was playing, these operations will fail if
337        // the stream is already released, just ignore that failure
338        let _ = self.vios_client.lock().stop_stream(self.stream_id);
339        let _ = self.vios_client.lock().release_stream(self.stream_id);
340
341        // Also release any pending buffer
342        while let Some(desc) = self.buffer_queue.pop_front() {
343            if let Err(e) = reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, desc, &self.io_queue) {
344                error!(
345                    "virtio-snd: Failed to reply buffer on stream {}: {}",
346                    self.stream_id, e
347                );
348            }
349        }
350    }
351}
352
/// Basically a proxy to the thread handling a particular stream.
pub struct StreamProxy {
    /// Sending end of the channel read by the stream thread.
    sender: Sender<Box<StreamMsg>>,
    /// Handle of the stream thread; `None` once the thread has been stopped and joined.
    thread: Option<thread::JoinHandle<StreamSnapshot>>,
}
358
359impl StreamProxy {
360    /// Access the underlying sender to clone it or send messages
361    pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {
362        &self.sender
363    }
364
365    /// Send a message to the stream thread on the other side of this sender
366    pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {
367        sender
368            .send(Box::new(msg))
369            .map_err(SoundError::StreamThreadSend)
370    }
371
372    /// Convenience function to send a message to this stream's thread
373    pub fn send(&self, msg: StreamMsg) -> Result<()> {
374        Self::send_msg(&self.sender, msg)
375    }
376
377    pub fn stop_thread(mut self) -> StreamSnapshot {
378        self.stop_thread_inner().unwrap()
379    }
380
381    fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {
382        if let Some(th) = self.thread.take() {
383            if let Err(e) = self.send(StreamMsg::Break) {
384                error!(
385                    "virtio-snd: Failed to send Break msg to stream thread: {}",
386                    e
387                );
388            }
389            match th.join() {
390                Ok(state) => Some(state),
391                Err(e) => panic!("virtio-snd: Panic detected on stream thread: {e:?}"),
392            }
393        } else {
394            None
395        }
396    }
397}
398
399impl Drop for StreamProxy {
400    fn drop(&mut self) {
401        let _ = self.stop_thread_inner();
402    }
403}
404
405/// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
406/// may fail due to insuficient permissions.
407pub fn try_set_real_time_priority() {
408    const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
409    if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
410        .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
411    {
412        warn!("Failed to set audio stream thread to real time: {}", e);
413    }
414}
415
416/// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
417pub fn vios_error_to_status_code(e: VioSError) -> u32 {
418    match e {
419        VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
420        _ => VIRTIO_SND_S_NOT_SUPP,
421    }
422}
423
424/// Encapsulates sending the virtio_snd_hdr struct back to the driver.
425pub fn reply_control_op_status(
426    code: u32,
427    mut desc: DescriptorChain,
428    queue: &Arc<Mutex<Queue>>,
429) -> Result<()> {
430    let writer = &mut desc.writer;
431    writer
432        .write_obj(virtio_snd_hdr {
433            code: Le32::from(code),
434        })
435        .map_err(SoundError::QueueIO)?;
436    {
437        let mut queue_lock = queue.lock();
438        queue_lock.add_used(desc);
439        queue_lock.trigger_interrupt();
440    }
441    Ok(())
442}
443
444/// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
445pub fn reply_pcm_buffer_status(
446    status: u32,
447    latency_bytes: u32,
448    mut desc: DescriptorChain,
449    queue: &Arc<Mutex<Queue>>,
450) -> Result<()> {
451    let writer = &mut desc.writer;
452    if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
453        writer
454            .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
455    }
456    writer
457        .write_obj(virtio_snd_pcm_status {
458            status: Le32::from(status),
459            latency_bytes: Le32::from(latency_bytes),
460        })
461        .map_err(SoundError::QueueIO)?;
462    {
463        let mut queue_lock = queue.lock();
464        queue_lock.add_used(desc);
465        queue_lock.trigger_interrupt();
466    }
467    Ok(())
468}
469
470fn bytes_per_sample(format: u8) -> usize {
471    match format {
472        VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
473        VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
474        VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
475        VIRTIO_SND_PCM_FMT_S8 => 1usize,
476        VIRTIO_SND_PCM_FMT_U8 => 1usize,
477        VIRTIO_SND_PCM_FMT_S16 => 2usize,
478        VIRTIO_SND_PCM_FMT_U16 => 2usize,
479        VIRTIO_SND_PCM_FMT_S32 => 4usize,
480        VIRTIO_SND_PCM_FMT_U32 => 4usize,
481        VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
482        VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
483        // VIRTIO_SND_PCM_FMT_DSD_U8
484        // VIRTIO_SND_PCM_FMT_DSD_U16
485        // VIRTIO_SND_PCM_FMT_DSD_U32
486        // VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
487        // VIRTIO_SND_PCM_FMT_S18_3
488        // VIRTIO_SND_PCM_FMT_U18_3
489        // VIRTIO_SND_PCM_FMT_S20_3
490        // VIRTIO_SND_PCM_FMT_U20_3
491        // VIRTIO_SND_PCM_FMT_S24_3
492        // VIRTIO_SND_PCM_FMT_U24_3
493        // VIRTIO_SND_PCM_FMT_S20
494        // VIRTIO_SND_PCM_FMT_U20
495        // VIRTIO_SND_PCM_FMT_S24
496        // VIRTIO_SND_PCM_FMT_U24
497        _ => {
498            // Some of these formats are not consistently stored in a particular size (24bits is
499            // sometimes stored in a 32bit word) while others are of variable size.
500            // The size per sample estimated here is designed to greatly underestimate the time it
501            // takes to play a buffer and depend instead on timings provided by the sound server if
502            // it supports these formats.
503            warn!(
504                "Unknown sample size for format {}, depending on sound server timing instead.",
505                format
506            );
507            1000usize
508        }
509    }
510}