1#[cfg(any(target_os = "android", target_os = "linux"))]
6use std::os::unix::io::RawFd;
7use std::sync::Arc;
8use std::sync::Condvar;
9use std::sync::Mutex;
10use std::time::Duration;
11use std::time::Instant;
12
13use remain::sorted;
14use thiserror::Error;
15
16use crate::BoxError;
17use crate::SampleFormat;
18use crate::StreamDirection;
19use crate::StreamEffect;
20
21type GenericResult<T> = std::result::Result<T, BoxError>;
22
23pub trait BufferSet {
27 fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>;
33
34 fn ignore(&mut self) -> GenericResult<()>;
36}
37
38#[sorted]
39#[derive(Error, Debug)]
40pub enum Error {
41 #[error("Provided number of frames {0} exceeds requested number of frames {1}")]
42 TooManyFrames(usize, usize),
43}
44
45pub struct ServerRequest<'a> {
48 requested_frames: usize,
49 buffer_set: &'a mut dyn BufferSet,
50}
51
52impl<'a> ServerRequest<'a> {
53 pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self {
66 Self {
67 requested_frames,
68 buffer_set,
69 }
70 }
71
72 pub fn requested_frames(&self) -> usize {
77 self.requested_frames
78 }
79
80 pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> {
98 if frames > self.requested_frames {
99 return Err(Box::new(Error::TooManyFrames(
100 frames,
101 self.requested_frames,
102 )));
103 }
104
105 self.buffer_set.callback(offset, frames)
106 }
107
108 pub fn ignore_request(self) -> GenericResult<()> {
114 self.buffer_set.ignore()
115 }
116}
117
118pub trait ShmStream: Send {
120 fn frame_size(&self) -> usize;
122
123 fn num_channels(&self) -> usize;
125
126 fn frame_rate(&self) -> u32;
128
129 fn wait_for_next_action_with_timeout(
156 &mut self,
157 timeout: Duration,
158 ) -> GenericResult<Option<ServerRequest>>;
159}
160
161pub trait SharedMemory {
163 type Error: std::error::Error;
164
165 fn anon(size: u64) -> Result<Self, Self::Error>
167 where
168 Self: Sized;
169
170 fn size(&self) -> u64;
175
176 #[cfg(any(target_os = "android", target_os = "linux"))]
178 fn as_raw_fd(&self) -> RawFd;
179}
180
181pub trait ShmStreamSource<E: std::error::Error>: Send {
183 #[allow(clippy::too_many_arguments)]
208 fn new_stream(
209 &mut self,
210 direction: StreamDirection,
211 num_channels: usize,
212 format: SampleFormat,
213 frame_rate: u32,
214 buffer_size: usize,
215 effects: &[StreamEffect],
216 client_shm: &dyn SharedMemory<Error = E>,
217 buffer_offsets: [u64; 2],
218 ) -> GenericResult<Box<dyn ShmStream>>;
219
220 #[cfg(any(target_os = "android", target_os = "linux"))]
226 fn keep_fds(&self) -> Vec<RawFd> {
227 Vec::new()
228 }
229}
230
231pub struct NullShmStream {
233 num_channels: usize,
234 frame_rate: u32,
235 buffer_size: usize,
236 frame_size: usize,
237 interval: Duration,
238 next_frame: Duration,
239 start_time: Instant,
240}
241
242impl NullShmStream {
243 pub fn new(
246 buffer_size: usize,
247 num_channels: usize,
248 format: SampleFormat,
249 frame_rate: u32,
250 ) -> Self {
251 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
252 Self {
253 num_channels,
254 frame_rate,
255 buffer_size,
256 frame_size: format.sample_bytes() * num_channels,
257 interval,
258 next_frame: interval,
259 start_time: Instant::now(),
260 }
261 }
262}
263
264impl BufferSet for NullShmStream {
265 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
266 Ok(())
267 }
268
269 fn ignore(&mut self) -> GenericResult<()> {
270 Ok(())
271 }
272}
273
274impl ShmStream for NullShmStream {
275 fn frame_size(&self) -> usize {
276 self.frame_size
277 }
278
279 fn num_channels(&self) -> usize {
280 self.num_channels
281 }
282
283 fn frame_rate(&self) -> u32 {
284 self.frame_rate
285 }
286
287 fn wait_for_next_action_with_timeout(
288 &mut self,
289 timeout: Duration,
290 ) -> GenericResult<Option<ServerRequest>> {
291 let elapsed = self.start_time.elapsed();
292 if elapsed < self.next_frame {
293 if timeout < self.next_frame - elapsed {
294 std::thread::sleep(timeout);
295 return Ok(None);
296 } else {
297 std::thread::sleep(self.next_frame - elapsed);
298 }
299 }
300 self.next_frame += self.interval;
301 Ok(Some(ServerRequest::new(self.buffer_size, self)))
302 }
303}
304
305#[derive(Default)]
307pub struct NullShmStreamSource;
308
309impl NullShmStreamSource {
310 pub fn new() -> Self {
311 NullShmStreamSource
312 }
313}
314
315impl<E: std::error::Error> ShmStreamSource<E> for NullShmStreamSource {
316 fn new_stream(
317 &mut self,
318 _direction: StreamDirection,
319 num_channels: usize,
320 format: SampleFormat,
321 frame_rate: u32,
322 buffer_size: usize,
323 _effects: &[StreamEffect],
324 _client_shm: &dyn SharedMemory<Error = E>,
325 _buffer_offsets: [u64; 2],
326 ) -> GenericResult<Box<dyn ShmStream>> {
327 let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate);
328 Ok(Box::new(new_stream))
329 }
330}
331
332#[derive(Clone)]
333pub struct MockShmStream {
334 num_channels: usize,
335 frame_rate: u32,
336 request_size: usize,
337 frame_size: usize,
338 request_notifier: Arc<(Mutex<bool>, Condvar)>,
339}
340
341impl MockShmStream {
342 pub fn new(
345 num_channels: usize,
346 frame_rate: u32,
347 format: SampleFormat,
348 buffer_size: usize,
349 ) -> Self {
350 #[allow(clippy::mutex_atomic)]
351 Self {
352 num_channels,
353 frame_rate,
354 request_size: buffer_size,
355 frame_size: format.sample_bytes() * num_channels,
356 request_notifier: Arc::new((Mutex::new(false), Condvar::new())),
357 }
358 }
359
360 pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool {
366 let (lock, cvar) = &*self.request_notifier;
367 let mut requested = lock.lock().unwrap();
368 *requested = true;
369 cvar.notify_one();
370 let start_time = Instant::now();
371 while *requested {
372 requested = cvar.wait_timeout(requested, timeout).unwrap().0;
373 if start_time.elapsed() > timeout {
374 *requested = false;
376 return false;
377 }
378 }
379
380 true
381 }
382
383 fn notify_request(&mut self) {
384 let (lock, cvar) = &*self.request_notifier;
385 let mut requested = lock.lock().unwrap();
386 *requested = false;
387 cvar.notify_one();
388 }
389}
390
391impl BufferSet for MockShmStream {
392 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
393 self.notify_request();
394 Ok(())
395 }
396
397 fn ignore(&mut self) -> GenericResult<()> {
398 self.notify_request();
399 Ok(())
400 }
401}
402
403impl ShmStream for MockShmStream {
404 fn frame_size(&self) -> usize {
405 self.frame_size
406 }
407
408 fn num_channels(&self) -> usize {
409 self.num_channels
410 }
411
412 fn frame_rate(&self) -> u32 {
413 self.frame_rate
414 }
415
416 fn wait_for_next_action_with_timeout(
417 &mut self,
418 timeout: Duration,
419 ) -> GenericResult<Option<ServerRequest>> {
420 {
421 let start_time = Instant::now();
422 let (lock, cvar) = &*self.request_notifier;
423 let mut requested = lock.lock().unwrap();
424 while !*requested {
425 requested = cvar.wait_timeout(requested, timeout).unwrap().0;
426 if start_time.elapsed() > timeout {
427 return Ok(None);
428 }
429 }
430 }
431
432 Ok(Some(ServerRequest::new(self.request_size, self)))
433 }
434}
435
436#[derive(Clone, Default)]
438pub struct MockShmStreamSource {
439 last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>,
440}
441
442impl MockShmStreamSource {
443 pub fn new() -> Self {
444 Default::default()
445 }
446
447 pub fn get_last_stream(&self) -> MockShmStream {
450 let (last_stream, cvar) = &*self.last_stream;
451 let mut stream = last_stream.lock().unwrap();
452 loop {
453 match &*stream {
454 None => stream = cvar.wait(stream).unwrap(),
455 Some(ref s) => return s.clone(),
456 };
457 }
458 }
459}
460
461impl<E: std::error::Error> ShmStreamSource<E> for MockShmStreamSource {
462 fn new_stream(
463 &mut self,
464 _direction: StreamDirection,
465 num_channels: usize,
466 format: SampleFormat,
467 frame_rate: u32,
468 buffer_size: usize,
469 _effects: &[StreamEffect],
470 _client_shm: &dyn SharedMemory<Error = E>,
471 _buffer_offsets: [u64; 2],
472 ) -> GenericResult<Box<dyn ShmStream>> {
473 let (last_stream, cvar) = &*self.last_stream;
474 let mut stream = last_stream.lock().unwrap();
475
476 let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size);
477 *stream = Some(new_stream.clone());
478 cvar.notify_one();
479 Ok(Box::new(new_stream))
480 }
481}
482
483#[cfg(all(test, unix))]
485pub mod tests {
486 use super::*;
487
488 struct MockSharedMemory {}
489
490 impl SharedMemory for MockSharedMemory {
491 type Error = super::Error;
492
493 fn anon(_: u64) -> Result<Self, Self::Error> {
494 Ok(MockSharedMemory {})
495 }
496
497 fn size(&self) -> u64 {
498 0
499 }
500
501 #[cfg(any(target_os = "android", target_os = "linux"))]
502 fn as_raw_fd(&self) -> RawFd {
503 0
504 }
505 }
506
507 #[test]
508 fn mock_trigger_callback() {
509 let stream_source = MockShmStreamSource::new();
510 let mut thread_stream_source = stream_source.clone();
511
512 let buffer_size = 480;
513 let num_channels = 2;
514 let format = SampleFormat::S24LE;
515 let shm = MockSharedMemory {};
516
517 let handle = std::thread::spawn(move || {
518 let mut stream = thread_stream_source
519 .new_stream(
520 StreamDirection::Playback,
521 num_channels,
522 format,
523 44100,
524 buffer_size,
525 &[],
526 &shm,
527 [400, 8000],
528 )
529 .expect("Failed to create stream");
530
531 let request = stream
532 .wait_for_next_action_with_timeout(Duration::from_secs(5))
533 .expect("Failed to wait for next action");
534 match request {
535 Some(r) => {
536 let requested = r.requested_frames();
537 r.set_buffer_offset_and_frames(872, requested)
538 .expect("Failed to set buffer offset and frames");
539 requested
540 }
541 None => 0,
542 }
543 });
544
545 let mut stream = stream_source.get_last_stream();
546 assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1)));
547
548 let requested_frames = handle.join().expect("Failed to join thread");
549 assert_eq!(requested_frames, buffer_size);
550 }
551
552 #[test]
553 fn null_consumption_rate() {
554 let frame_rate = 44100;
555 let buffer_size = 480;
556 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
557
558 let shm = MockSharedMemory {};
559
560 let start = Instant::now();
561
562 let mut stream_source = NullShmStreamSource::new();
563 let mut stream = stream_source
564 .new_stream(
565 StreamDirection::Playback,
566 2,
567 SampleFormat::S24LE,
568 frame_rate,
569 buffer_size,
570 &[],
571 &shm,
572 [400, 8000],
573 )
574 .expect("Failed to create stream");
575
576 let timeout = Duration::from_secs(5);
577 let request = stream
578 .wait_for_next_action_with_timeout(timeout)
579 .expect("Failed to wait for first request")
580 .expect("First request should not have timed out");
581 request
582 .set_buffer_offset_and_frames(276, 480)
583 .expect("Failed to set buffer offset and length");
584
585 let _request = stream
587 .wait_for_next_action_with_timeout(timeout)
588 .expect("Failed to wait for second request");
589 let elapsed = start.elapsed();
590 assert!(
591 elapsed > interval,
592 "wait_for_next_action_with_timeout didn't block long enough: {elapsed:?}"
593 );
594
595 assert!(
596 elapsed < timeout,
597 "wait_for_next_action_with_timeout blocked for too long: {elapsed:?}"
598 );
599 }
600}