1use std::io::Read;
6use std::sync::mpsc::Sender;
7use std::sync::Arc;
8use std::thread;
9
10use base::error;
11use base::warn;
12use base::Event;
13use base::EventToken;
14use base::WaitContext;
15use data_model::Le32;
16use sync::Mutex;
17use zerocopy::Immutable;
18use zerocopy::IntoBytes;
19
20use super::super::constants::*;
21use super::super::layout::*;
22use super::streams::*;
23use super::Result;
24use super::SoundError;
25use super::*;
26use crate::virtio::DescriptorChain;
27use crate::virtio::Queue;
28
29pub struct Worker {
30 pub control_queue: Arc<Mutex<Queue>>,
32 pub event_queue: Option<Queue>,
33 vios_client: Arc<Mutex<VioSClient>>,
34 streams: Vec<StreamProxy>,
35 pub tx_queue: Arc<Mutex<Queue>>,
36 pub rx_queue: Arc<Mutex<Queue>>,
37 io_thread: Option<thread::JoinHandle<Result<()>>>,
38 io_kill: Event,
39 pub saved_stream_state: Vec<StreamSnapshot>,
43}
44
45impl Worker {
46 pub fn try_new(
48 vios_client: Arc<Mutex<VioSClient>>,
49 control_queue: Arc<Mutex<Queue>>,
50 event_queue: Queue,
51 tx_queue: Arc<Mutex<Queue>>,
52 rx_queue: Arc<Mutex<Queue>>,
53 saved_stream_state: Vec<StreamSnapshot>,
54 ) -> Result<Worker> {
55 let num_streams = vios_client.lock().num_streams();
56 let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);
57 {
58 for stream_id in 0..num_streams {
59 let capture = vios_client
60 .lock()
61 .stream_info(stream_id)
62 .map(|i| i.direction == VIRTIO_SND_D_INPUT)
63 .unwrap_or(false);
64 let io_queue = if capture { &rx_queue } else { &tx_queue };
65 streams.push(Stream::try_new(
66 stream_id,
67 vios_client.clone(),
68 control_queue.clone(),
69 io_queue.clone(),
70 capture,
71 saved_stream_state.get(stream_id as usize).cloned(),
72 )?);
73 }
74 }
75 let (self_kill_io, kill_io) = Event::new()
76 .and_then(|e| Ok((e.try_clone()?, e)))
77 .map_err(SoundError::CreateEvent)?;
78
79 let senders: Vec<Sender<Box<StreamMsg>>> =
80 streams.iter().map(|sp| sp.msg_sender().clone()).collect();
81 let tx_queue_thread = tx_queue.clone();
82 let rx_queue_thread = rx_queue.clone();
83 let io_thread = thread::Builder::new()
84 .name("v_snd_io".to_string())
85 .spawn(move || {
86 try_set_real_time_priority();
87
88 io_loop(tx_queue_thread, rx_queue_thread, senders, kill_io)
89 })
90 .map_err(SoundError::CreateThread)?;
91 Ok(Worker {
92 control_queue,
93 event_queue: Some(event_queue),
94 vios_client,
95 streams,
96 tx_queue,
97 rx_queue,
98 io_thread: Some(io_thread),
99 io_kill: self_kill_io,
100 saved_stream_state: Vec::new(),
101 })
102 }
103
104 pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
107 let event_notifier = self
108 .vios_client
109 .lock()
110 .get_event_notifier()
111 .map_err(SoundError::ClientEventNotifier)?;
112 #[derive(EventToken)]
113 enum Token {
114 ControlQAvailable,
115 EventQAvailable,
116 EventTriggered,
117 Kill,
118 }
119 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
120 (self.control_queue.lock().event(), Token::ControlQAvailable),
121 (
122 self.event_queue.as_ref().expect("queue missing").event(),
123 Token::EventQAvailable,
124 ),
125 (&event_notifier, Token::EventTriggered),
126 (&kill_evt, Token::Kill),
127 ])
128 .map_err(SoundError::WaitCtx)?;
129
130 let mut event_queue = self.event_queue.take().expect("event_queue missing");
131 'wait: loop {
132 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
133
134 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
135 match wait_evt.token {
136 Token::ControlQAvailable => {
137 self.control_queue
138 .lock()
139 .event()
140 .wait()
141 .map_err(SoundError::QueueEvt)?;
142 self.process_controlq_buffers()?;
143 }
144 Token::EventQAvailable => {
145 event_queue.event().wait().map_err(SoundError::QueueEvt)?;
149 }
150 Token::EventTriggered => {
151 event_notifier.wait().map_err(SoundError::QueueEvt)?;
152 self.process_event_triggered(&mut event_queue)?;
153 }
154 Token::Kill => {
155 let _ = kill_evt.wait();
156 break 'wait;
157 }
158 }
159 }
160 }
161 self.saved_stream_state = self
162 .streams
163 .drain(..)
164 .map(|stream| stream.stop_thread())
165 .collect();
166 self.event_queue = Some(event_queue);
167 Ok(())
168 }
169
170 fn stop_io_thread(&mut self) {
171 if let Err(e) = self.io_kill.signal() {
172 error!(
173 "virtio-snd: Failed to send Break msg to stream thread: {}",
174 e
175 );
176 }
177 if let Some(th) = self.io_thread.take() {
178 match th.join() {
179 Err(e) => {
180 error!("virtio-snd: Panic detected on stream thread: {:?}", e);
181 }
182 Ok(r) => {
183 if let Err(e) = r {
184 error!("virtio-snd: IO thread exited with and error: {}", e);
185 }
186 }
187 }
188 }
189 }
190
191 fn process_controlq_buffers(&mut self) -> Result<()> {
194 while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {
195 let reader = &mut avail_desc.reader;
196 let available_bytes = reader.available_bytes();
197 let Ok(hdr) = reader.peek_obj::<virtio_snd_hdr>() else {
198 error!(
199 "virtio-snd: Message received on control queue is too small: {}",
200 available_bytes
201 );
202 return reply_control_op_status(
203 VIRTIO_SND_S_BAD_MSG,
204 avail_desc,
205 &self.control_queue,
206 );
207 };
208 let mut read_buf = vec![0u8; available_bytes];
209 reader
210 .read_exact(&mut read_buf)
211 .map_err(SoundError::QueueIO)?;
212 let request_type = hdr.code.to_native();
213 match request_type {
214 VIRTIO_SND_R_JACK_INFO => {
215 let (code, info_vec) = {
216 match self.parse_info_query(&read_buf) {
217 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
218 Some((start_id, count)) => {
219 let end_id = start_id.saturating_add(count);
220 if end_id > self.vios_client.lock().num_jacks() {
221 error!(
222 "virtio-snd: Requested info on invalid jacks ids: {}..{}",
223 start_id,
224 end_id - 1
225 );
226 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
227 } else {
228 (
229 VIRTIO_SND_S_OK,
230 (start_id..end_id)
233 .map(|id| {
234 self.vios_client.lock().jack_info(id).unwrap()
235 })
236 .collect(),
237 )
238 }
239 }
240 }
241 };
242 self.send_info_reply(avail_desc, code, info_vec)?;
243 }
244 VIRTIO_SND_R_JACK_REMAP => {
245 let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
246 error!(
247 "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
248 read_buf.len()
249 );
250 VIRTIO_SND_S_BAD_MSG
251 } else {
252 let mut request: virtio_snd_jack_remap = Default::default();
253 request.as_mut_bytes().copy_from_slice(&read_buf);
254 let jack_id = request.hdr.jack_id.to_native();
255 let association = request.association.to_native();
256 let sequence = request.sequence.to_native();
257 if let Err(e) =
258 self.vios_client
259 .lock()
260 .remap_jack(jack_id, association, sequence)
261 {
262 error!("virtio-snd: Failed to remap jack: {}", e);
263 vios_error_to_status_code(e)
264 } else {
265 VIRTIO_SND_S_OK
266 }
267 };
268 let writer = &mut avail_desc.writer;
269 writer
270 .write_obj(virtio_snd_hdr {
271 code: Le32::from(code),
272 })
273 .map_err(SoundError::QueueIO)?;
274 {
275 let mut queue_lock = self.control_queue.lock();
276 queue_lock.add_used(avail_desc);
277 queue_lock.trigger_interrupt();
278 }
279 }
280 VIRTIO_SND_R_CHMAP_INFO => {
281 let (code, info_vec) = {
282 match self.parse_info_query(&read_buf) {
283 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
284 Some((start_id, count)) => {
285 let end_id = start_id.saturating_add(count);
286 let num_chmaps = self.vios_client.lock().num_chmaps();
287 if end_id > num_chmaps {
288 error!(
289 "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
290 start_id,
291 end_id - 1
292 );
293 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
294 } else {
295 (
296 VIRTIO_SND_S_OK,
297 (start_id..end_id)
300 .map(|id| {
301 self.vios_client.lock().chmap_info(id).unwrap()
302 })
303 .collect(),
304 )
305 }
306 }
307 }
308 };
309 self.send_info_reply(avail_desc, code, info_vec)?;
310 }
311 VIRTIO_SND_R_PCM_INFO => {
312 let (code, info_vec) = {
313 match self.parse_info_query(&read_buf) {
314 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
315 Some((start_id, count)) => {
316 let end_id = start_id.saturating_add(count);
317 if end_id > self.vios_client.lock().num_streams() {
318 error!(
319 "virtio-snd: Requested info on invalid stream ids: {}..{}",
320 start_id,
321 end_id - 1
322 );
323 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
324 } else {
325 (
326 VIRTIO_SND_S_OK,
327 (start_id..end_id)
330 .map(|id| {
331 self.vios_client.lock().stream_info(id).unwrap()
332 })
333 .collect(),
334 )
335 }
336 }
337 }
338 };
339 self.send_info_reply(avail_desc, code, info_vec)?;
340 }
341 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
342 VIRTIO_SND_R_PCM_PREPARE => {
343 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
344 }
345 VIRTIO_SND_R_PCM_RELEASE => {
346 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
347 }
348 VIRTIO_SND_R_PCM_START => {
349 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
350 }
351 VIRTIO_SND_R_PCM_STOP => {
352 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
353 }
354 VIRTIO_SND_R_CTL_INFO => {
355 let (code, info_vec) = {
356 match self.parse_info_query(&read_buf) {
357 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
358 Some((start_id, count)) => {
359 let end_id = start_id.saturating_add(count);
360 if end_id > self.vios_client.lock().num_controls() {
361 error!(
362 "virtio-snd: Requested info on invalid control ids: {}..{}",
363 start_id,
364 end_id - 1
365 );
366 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
367 } else {
368 (
369 VIRTIO_SND_S_OK,
370 (start_id..end_id)
371 .map(|id| {
372 self.vios_client.lock().control_info(id).unwrap()
375 })
376 .collect(),
377 )
378 }
379 }
380 }
381 };
382 self.send_info_reply(avail_desc, code, info_vec)?;
383 }
384 VIRTIO_SND_R_CTL_READ => {
385 if read_buf.len() != std::mem::size_of::<virtio_snd_ctl_hdr>() {
386 error!(
387 "virtio-snd: The driver sent the wrong number bytes for a ctl_hdr struct: {}",
388 read_buf.len()
389 );
390 reply_control_op_status(
391 VIRTIO_SND_S_BAD_MSG,
392 avail_desc,
393 &self.control_queue,
394 )?;
395 } else {
396 let mut hdr: virtio_snd_ctl_hdr = Default::default();
397 hdr.as_mut_bytes().copy_from_slice(&read_buf);
398 let control_id = hdr.control_id.to_native();
399 match self.vios_client.lock().get_control(control_id) {
400 Ok(value) => {
401 let writer = &mut avail_desc.writer;
402 writer
403 .write_obj(virtio_snd_hdr {
404 code: Le32::from(VIRTIO_SND_S_OK),
405 })
406 .map_err(SoundError::QueueIO)?;
407 writer.write_obj(value).map_err(SoundError::QueueIO)?;
408 {
409 let mut queue_lock = self.control_queue.lock();
410 queue_lock.add_used(avail_desc);
411 queue_lock.trigger_interrupt();
412 }
413 }
414 Err(e) => {
415 error!("virtio-snd: Failed to get control: {}", e);
416 reply_control_op_status(
417 vios_error_to_status_code(e),
418 avail_desc,
419 &self.control_queue,
420 )?;
421 }
422 }
423 }
424 }
425 VIRTIO_SND_R_CTL_WRITE => {
426 const HDR_SIZE: usize = std::mem::size_of::<virtio_snd_ctl_hdr>();
427 if read_buf.len() != std::mem::size_of::<virtio_snd_ctl_value>() + HDR_SIZE {
428 error!(
429 "virtio-snd: The driver sent the wrong number bytes for a ctl_value struct: {}",
430 read_buf.len()
431 );
432 reply_control_op_status(
433 VIRTIO_SND_S_BAD_MSG,
434 avail_desc,
435 &self.control_queue,
436 )?;
437 } else {
438 let mut hdr: virtio_snd_ctl_hdr = Default::default();
439 let mut val: virtio_snd_ctl_value = Default::default();
440 hdr.as_mut_bytes().copy_from_slice(&read_buf[..HDR_SIZE]);
441 val.as_mut_bytes().copy_from_slice(&read_buf[HDR_SIZE..]);
442 let control_id = hdr.control_id.to_native();
443 match self.vios_client.lock().set_control(control_id, val) {
444 Ok(_) => {
445 reply_control_op_status(
446 VIRTIO_SND_S_OK,
447 avail_desc,
448 &self.control_queue,
449 )?;
450 }
451 Err(e) => {
452 error!("virtio-snd: Failed to set control: {}", e);
453 reply_control_op_status(
454 vios_error_to_status_code(e),
455 avail_desc,
456 &self.control_queue,
457 )?;
458 }
459 }
460 }
461 }
462 _ => {
463 error!(
464 "virtio-snd: Unknown control queue mesage code: {}",
465 request_type
466 );
467 reply_control_op_status(
468 VIRTIO_SND_S_NOT_SUPP,
469 avail_desc,
470 &self.control_queue,
471 )?;
472 }
473 }
474 }
475 Ok(())
476 }
477
478 fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {
479 while let Some(evt) = self.vios_client.lock().pop_event() {
480 if let Some(mut desc) = event_queue.pop() {
481 let writer = &mut desc.writer;
482 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
483 event_queue.add_used(desc);
484 event_queue.trigger_interrupt();
485 } else {
486 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
487 }
488 }
489 Ok(())
490 }
491
492 fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
493 if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
494 error!(
495 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
496 read_buf.len()
497 );
498 return None;
499 }
500 let mut query: virtio_snd_query_info = Default::default();
501 query.as_mut_bytes().copy_from_slice(read_buf);
502 let start_id = query.start_id.to_native();
503 let count = query.count.to_native();
504 Some((start_id, count))
505 }
506
507 fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
509 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
510 error!(
511 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
512 read_buf.len()
513 );
514 return reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue);
515 }
516 let mut params: virtio_snd_pcm_set_params = Default::default();
517 params.as_mut_bytes().copy_from_slice(read_buf);
518 let stream_id = params.hdr.stream_id.to_native();
519 if stream_id < self.vios_client.lock().num_streams() {
520 self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
521 } else {
522 error!(
523 "virtio-snd: Driver requested operation on invalid stream: {}",
524 stream_id
525 );
526 reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue)
527 }
528 }
529
530 fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
532 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
533 error!(
534 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
535 read_buf.len()
536 );
537 return reply_control_op_status(
538 VIRTIO_SND_S_BAD_MSG,
539 match msg {
540 StreamMsg::Prepare(d)
541 | StreamMsg::Start(d)
542 | StreamMsg::Stop(d)
543 | StreamMsg::Release(d) => d,
544 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
545 },
546 &self.control_queue,
547 );
548 }
549 let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
550 pcm_hdr.as_mut_bytes().copy_from_slice(read_buf);
551 let stream_id = pcm_hdr.stream_id.to_native();
552 if stream_id < self.vios_client.lock().num_streams() {
553 self.streams[stream_id as usize].send(msg)
554 } else {
555 error!(
556 "virtio-snd: Driver requested operation on invalid stream: {}",
557 stream_id
558 );
559 reply_control_op_status(
560 VIRTIO_SND_S_BAD_MSG,
561 match msg {
562 StreamMsg::Prepare(d)
563 | StreamMsg::Start(d)
564 | StreamMsg::Stop(d)
565 | StreamMsg::Release(d) => d,
566 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
567 },
568 &self.control_queue,
569 )
570 }
571 }
572
573 fn send_info_reply<T: Immutable + IntoBytes>(
574 &mut self,
575 mut desc: DescriptorChain,
576 code: u32,
577 info_vec: Vec<T>,
578 ) -> Result<()> {
579 let writer = &mut desc.writer;
580 writer
581 .write_obj(virtio_snd_hdr {
582 code: Le32::from(code),
583 })
584 .map_err(SoundError::QueueIO)?;
585 for info in info_vec {
586 writer.write_obj(info).map_err(SoundError::QueueIO)?;
587 }
588 {
589 let mut queue_lock = self.control_queue.lock();
590 queue_lock.add_used(desc);
591 queue_lock.trigger_interrupt();
592 }
593 Ok(())
594 }
595}
596
597impl Drop for Worker {
598 fn drop(&mut self) {
599 self.stop_io_thread();
600 }
601}
602
603fn io_loop(
604 tx_queue: Arc<Mutex<Queue>>,
605 rx_queue: Arc<Mutex<Queue>>,
606 senders: Vec<Sender<Box<StreamMsg>>>,
607 kill_evt: Event,
608) -> Result<()> {
609 #[derive(EventToken)]
610 enum Token {
611 TxQAvailable,
612 RxQAvailable,
613 Kill,
614 }
615 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
616 (tx_queue.lock().event(), Token::TxQAvailable),
617 (rx_queue.lock().event(), Token::RxQAvailable),
618 (&kill_evt, Token::Kill),
619 ])
620 .map_err(SoundError::WaitCtx)?;
621
622 'wait: loop {
623 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
624 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
625 let queue = match wait_evt.token {
626 Token::TxQAvailable => {
627 tx_queue
628 .lock()
629 .event()
630 .wait()
631 .map_err(SoundError::QueueEvt)?;
632 &tx_queue
633 }
634 Token::RxQAvailable => {
635 rx_queue
636 .lock()
637 .event()
638 .wait()
639 .map_err(SoundError::QueueEvt)?;
640 &rx_queue
641 }
642 Token::Kill => {
643 let _ = kill_evt.wait();
644 break 'wait;
645 }
646 };
647 while let Some(mut avail_desc) = lock_pop_unlock(queue) {
648 let reader = &mut avail_desc.reader;
649 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
650 let stream_id = xfer.stream_id.to_native();
651 if stream_id as usize >= senders.len() {
652 error!(
653 "virtio-snd: Driver sent buffer for invalid stream: {}",
654 stream_id
655 );
656 reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue)?;
657 } else {
658 StreamProxy::send_msg(
659 &senders[stream_id as usize],
660 StreamMsg::Buffer(avail_desc),
661 )?;
662 }
663 }
664 }
665 }
666 Ok(())
667}
668
669fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {
673 queue.lock().pop()
674}