devices/virtio/snd/vios_backend/
streams.rs1use std::collections::VecDeque;
6use std::sync::mpsc::channel;
7use std::sync::mpsc::Receiver;
8use std::sync::mpsc::Sender;
9use std::sync::Arc;
10use std::thread;
11use std::time::Duration;
12use std::time::Instant;
13
14use base::error;
15use base::set_rt_prio_limit;
16use base::set_rt_round_robin;
17use base::warn;
18use data_model::Le32;
19use serde::Deserialize;
20use serde::Serialize;
21use sync::Mutex;
22
23use super::Error as VioSError;
24use super::Result;
25use super::SoundError;
26use super::*;
27use crate::virtio::snd::common::from_virtio_frame_rate;
28use crate::virtio::snd::constants::*;
29use crate::virtio::snd::layout::*;
30use crate::virtio::DescriptorChain;
31use crate::virtio::Queue;
32
33pub enum StreamMsg {
35 SetParams(DescriptorChain, virtio_snd_pcm_set_params),
36 Prepare(DescriptorChain),
37 Start(DescriptorChain),
38 Stop(DescriptorChain),
39 Release(DescriptorChain),
40 Buffer(DescriptorChain),
41 Break,
42}
43
44#[derive(Clone, Serialize, Deserialize)]
45pub enum StreamState {
46 New,
47 ParamsSet,
48 Prepared,
49 Started,
50 Stopped,
51 Released,
52}
53
54pub struct Stream {
55 stream_id: u32,
56 receiver: Receiver<Box<StreamMsg>>,
57 vios_client: Arc<Mutex<VioSClient>>,
58 control_queue: Arc<Mutex<Queue>>,
59 io_queue: Arc<Mutex<Queue>>,
60 capture: bool,
61 current_state: StreamState,
62 period: Duration,
63 start_time: Instant,
64 next_buffer: Duration,
65 buffer_queue: VecDeque<DescriptorChain>,
66}
67
68#[derive(Clone, Serialize, Deserialize)]
69pub struct StreamSnapshot {
70 pub current_state: StreamState,
71 pub period: Duration,
72 pub next_buffer: Duration,
73}
74
75impl Stream {
76 pub fn try_new(
78 stream_id: u32,
79 vios_client: Arc<Mutex<VioSClient>>,
80 control_queue: Arc<Mutex<Queue>>,
81 io_queue: Arc<Mutex<Queue>>,
82 capture: bool,
83 stream_state: Option<StreamSnapshot>,
84 ) -> Result<StreamProxy> {
85 let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();
86 let thread = thread::Builder::new()
87 .name(format!("v_snd_stream:{stream_id}"))
88 .spawn(move || {
89 try_set_real_time_priority();
90 let (current_state, period, next_buffer) =
91 if let Some(stream_state) = stream_state.clone() {
92 (
93 stream_state.current_state,
94 stream_state.period,
95 stream_state.next_buffer,
96 )
97 } else {
98 (
99 StreamState::New,
100 Duration::from_millis(0),
101 Duration::from_millis(0),
102 )
103 };
104
105 let mut stream = Stream {
106 stream_id,
107 receiver,
108 vios_client: vios_client.clone(),
109 control_queue,
110 io_queue,
111 capture,
112 current_state,
113 period,
114 start_time: Instant::now(),
115 next_buffer,
116 buffer_queue: VecDeque::new(),
117 };
118
119 if let Some(stream_state) = stream_state {
120 if let Err(e) = vios_client
121 .lock()
122 .restore_stream(stream_id, stream_state.current_state)
123 {
124 error!("failed to restore stream params: {}", e);
125 };
126 }
127 if let Err(e) = stream.stream_loop() {
128 error!("virtio-snd: Error in stream {}: {}", stream_id, e);
129 }
130 let state = stream.current_state.clone();
131 StreamSnapshot {
132 current_state: state,
133 period: stream.period,
134 next_buffer: stream.next_buffer,
135 }
136 })
137 .map_err(SoundError::CreateThread)?;
138 Ok(StreamProxy {
139 sender,
140 thread: Some(thread),
141 })
142 }
143
144 fn stream_loop(&mut self) -> Result<()> {
145 loop {
146 if !self.recv_msg()? {
147 break;
148 }
149 self.maybe_process_queued_buffers()?;
150 }
151 Ok(())
152 }
153
154 fn recv_msg(&mut self) -> Result<bool> {
155 let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
156 let (code, desc, next_state) = match *msg {
157 StreamMsg::SetParams(desc, params) => {
158 let code = match self.vios_client.lock().set_stream_parameters_raw(params) {
159 Ok(()) => {
160 let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
161 self.period = Duration::from_nanos(
162 (params.period_bytes.to_native() as u64 * 1_000_000_000u64)
163 / frame_rate
164 / params.channels as u64
165 / bytes_per_sample(params.format) as u64,
166 );
167 VIRTIO_SND_S_OK
168 }
169 Err(e) => {
170 error!(
171 "virtio-snd: Error setting parameters for stream {}: {}",
172 self.stream_id, e
173 );
174 vios_error_to_status_code(e)
175 }
176 };
177 (code, desc, StreamState::ParamsSet)
178 }
179 StreamMsg::Prepare(desc) => {
180 let code = match self.vios_client.lock().prepare_stream(self.stream_id) {
181 Ok(()) => VIRTIO_SND_S_OK,
182 Err(e) => {
183 error!(
184 "virtio-snd: Failed to prepare stream {}: {}",
185 self.stream_id, e
186 );
187 vios_error_to_status_code(e)
188 }
189 };
190 (code, desc, StreamState::Prepared)
191 }
192 StreamMsg::Start(desc) => {
193 let code = match self.vios_client.lock().start_stream(self.stream_id) {
194 Ok(()) => VIRTIO_SND_S_OK,
195 Err(e) => {
196 error!(
197 "virtio-snd: Failed to start stream {}: {}",
198 self.stream_id, e
199 );
200 vios_error_to_status_code(e)
201 }
202 };
203 self.start_time = Instant::now();
204 self.next_buffer = Duration::from_millis(0);
205 (code, desc, StreamState::Started)
206 }
207 StreamMsg::Stop(desc) => {
208 let code = match self.vios_client.lock().stop_stream(self.stream_id) {
209 Ok(()) => VIRTIO_SND_S_OK,
210 Err(e) => {
211 error!(
212 "virtio-snd: Failed to stop stream {}: {}",
213 self.stream_id, e
214 );
215 vios_error_to_status_code(e)
216 }
217 };
218 (code, desc, StreamState::Stopped)
219 }
220 StreamMsg::Release(desc) => {
221 let code = match self.vios_client.lock().release_stream(self.stream_id) {
222 Ok(()) => VIRTIO_SND_S_OK,
223 Err(e) => {
224 error!(
225 "virtio-snd: Failed to release stream {}: {}",
226 self.stream_id, e
227 );
228 vios_error_to_status_code(e)
229 }
230 };
231 (code, desc, StreamState::Released)
232 }
233 StreamMsg::Buffer(d) => {
234 self.buffer_queue.push_back(d);
241 return Ok(true);
243 }
244 StreamMsg::Break => {
245 return Ok(false);
246 }
247 };
248 reply_control_op_status(code, desc, &self.control_queue)?;
249 self.current_state = next_state;
250 Ok(true)
251 }
252
253 fn maybe_process_queued_buffers(&mut self) -> Result<()> {
254 match self.current_state {
255 StreamState::Started => {
256 while let Some(mut desc) = self.buffer_queue.pop_front() {
257 let reader = &mut desc.reader;
258 reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
261 let writer = &mut desc.writer;
262 let io_res = if self.capture {
263 let buffer_size =
264 writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
265 self.vios_client.lock().request_audio_data(
266 self.stream_id,
267 buffer_size,
268 |vslice| writer.write_from_volatile_slice(*vslice),
269 )
270 } else {
271 self.vios_client.lock().inject_audio_data(
272 self.stream_id,
273 reader.available_bytes(),
274 |vslice| reader.read_to_volatile_slice(vslice),
275 )
276 };
277 let (code, latency) = match io_res {
278 Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
279 Err(e) => {
280 error!(
281 "virtio-snd: Failed IO operation in stream {}: {}",
282 self.stream_id, e
283 );
284 (VIRTIO_SND_S_IO_ERR, 0)
285 }
286 };
287 if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
288 status: Le32::from(code),
289 latency_bytes: Le32::from(latency),
290 }) {
291 error!(
292 "virtio-snd: Failed to write pcm status from stream {} thread: {}",
293 self.stream_id, e
294 );
295 }
296
297 self.next_buffer += self.period;
298 let elapsed = self.start_time.elapsed();
299 if elapsed < self.next_buffer {
300 std::thread::sleep(self.next_buffer - elapsed);
304 }
305 {
306 let mut io_queue_lock = self.io_queue.lock();
307 io_queue_lock.add_used(desc);
308 io_queue_lock.trigger_interrupt();
309 }
310 }
311 }
312 StreamState::Stopped | StreamState::Released => {
313 while let Some(desc) = self.buffer_queue.pop_front() {
320 reply_pcm_buffer_status(VIRTIO_SND_S_OK, 0, desc, &self.io_queue)?;
321 }
322 }
323 StreamState::Prepared => {} _ => {
325 if !self.buffer_queue.is_empty() {
326 warn!("virtio-snd: Buffers received while in unexpected state");
327 }
328 }
329 }
330 Ok(())
331 }
332}
333
334impl Drop for Stream {
335 fn drop(&mut self) {
336 let _ = self.vios_client.lock().stop_stream(self.stream_id);
339 let _ = self.vios_client.lock().release_stream(self.stream_id);
340
341 while let Some(desc) = self.buffer_queue.pop_front() {
343 if let Err(e) = reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, desc, &self.io_queue) {
344 error!(
345 "virtio-snd: Failed to reply buffer on stream {}: {}",
346 self.stream_id, e
347 );
348 }
349 }
350 }
351}
352
353pub struct StreamProxy {
355 sender: Sender<Box<StreamMsg>>,
356 thread: Option<thread::JoinHandle<StreamSnapshot>>,
357}
358
359impl StreamProxy {
360 pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {
362 &self.sender
363 }
364
365 pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {
367 sender
368 .send(Box::new(msg))
369 .map_err(SoundError::StreamThreadSend)
370 }
371
372 pub fn send(&self, msg: StreamMsg) -> Result<()> {
374 Self::send_msg(&self.sender, msg)
375 }
376
377 pub fn stop_thread(mut self) -> StreamSnapshot {
378 self.stop_thread_inner().unwrap()
379 }
380
381 fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {
382 if let Some(th) = self.thread.take() {
383 if let Err(e) = self.send(StreamMsg::Break) {
384 error!(
385 "virtio-snd: Failed to send Break msg to stream thread: {}",
386 e
387 );
388 }
389 match th.join() {
390 Ok(state) => Some(state),
391 Err(e) => panic!("virtio-snd: Panic detected on stream thread: {e:?}"),
392 }
393 } else {
394 None
395 }
396 }
397}
398
399impl Drop for StreamProxy {
400 fn drop(&mut self) {
401 let _ = self.stop_thread_inner();
402 }
403}
404
405pub fn try_set_real_time_priority() {
408 const AUDIO_THREAD_RTPRIO: u16 = 10; if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
410 .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
411 {
412 warn!("Failed to set audio stream thread to real time: {}", e);
413 }
414}
415
416pub fn vios_error_to_status_code(e: VioSError) -> u32 {
418 match e {
419 VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
420 _ => VIRTIO_SND_S_NOT_SUPP,
421 }
422}
423
424pub fn reply_control_op_status(
426 code: u32,
427 mut desc: DescriptorChain,
428 queue: &Arc<Mutex<Queue>>,
429) -> Result<()> {
430 let writer = &mut desc.writer;
431 writer
432 .write_obj(virtio_snd_hdr {
433 code: Le32::from(code),
434 })
435 .map_err(SoundError::QueueIO)?;
436 {
437 let mut queue_lock = queue.lock();
438 queue_lock.add_used(desc);
439 queue_lock.trigger_interrupt();
440 }
441 Ok(())
442}
443
444pub fn reply_pcm_buffer_status(
446 status: u32,
447 latency_bytes: u32,
448 mut desc: DescriptorChain,
449 queue: &Arc<Mutex<Queue>>,
450) -> Result<()> {
451 let writer = &mut desc.writer;
452 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
453 writer
454 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
455 }
456 writer
457 .write_obj(virtio_snd_pcm_status {
458 status: Le32::from(status),
459 latency_bytes: Le32::from(latency_bytes),
460 })
461 .map_err(SoundError::QueueIO)?;
462 {
463 let mut queue_lock = queue.lock();
464 queue_lock.add_used(desc);
465 queue_lock.trigger_interrupt();
466 }
467 Ok(())
468}
469
470fn bytes_per_sample(format: u8) -> usize {
471 match format {
472 VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
473 VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
474 VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
475 VIRTIO_SND_PCM_FMT_S8 => 1usize,
476 VIRTIO_SND_PCM_FMT_U8 => 1usize,
477 VIRTIO_SND_PCM_FMT_S16 => 2usize,
478 VIRTIO_SND_PCM_FMT_U16 => 2usize,
479 VIRTIO_SND_PCM_FMT_S32 => 4usize,
480 VIRTIO_SND_PCM_FMT_U32 => 4usize,
481 VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
482 VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
483 _ => {
498 warn!(
504 "Unknown sample size for format {}, depending on sound server timing instead.",
505 format
506 );
507 1000usize
508 }
509 }
510}