1pub mod async_api;
43
44use std::cmp::min;
45use std::error;
46use std::fmt;
47use std::fmt::Display;
48use std::io;
49use std::io::Read;
50use std::io::Write;
51#[cfg(unix)]
52use std::os::unix::io::RawFd as RawDescriptor;
53#[cfg(windows)]
54use std::os::windows::io::RawHandle as RawDescriptor;
55use std::result::Result;
56use std::str::FromStr;
57use std::time::Duration;
58use std::time::Instant;
59
60pub use async_api::AsyncStream;
61pub use async_api::AudioStreamsExecutor;
62use async_trait::async_trait;
63use remain::sorted;
64use serde::Deserialize;
65use serde::Serialize;
66use thiserror::Error;
67
68#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
69pub enum SampleFormat {
70 U8,
71 S16LE,
72 S24LE,
73 S32LE,
74}
75
76impl SampleFormat {
77 pub fn sample_bytes(self) -> usize {
78 use SampleFormat::*;
79 match self {
80 U8 => 1,
81 S16LE => 2,
82 S24LE => 4, S32LE => 4,
84 }
85 }
86}
87
88impl Display for SampleFormat {
89 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90 use SampleFormat::*;
91 match self {
92 U8 => write!(f, "Unsigned 8 bit"),
93 S16LE => write!(f, "Signed 16 bit Little Endian"),
94 S24LE => write!(f, "Signed 24 bit Little Endian"),
95 S32LE => write!(f, "Signed 32 bit Little Endian"),
96 }
97 }
98}
99
100impl FromStr for SampleFormat {
101 type Err = SampleFormatError;
102 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
103 match s {
104 "U8" => Ok(SampleFormat::U8),
105 "S16_LE" => Ok(SampleFormat::S16LE),
106 "S24_LE" => Ok(SampleFormat::S24LE),
107 "S32_LE" => Ok(SampleFormat::S32LE),
108 _ => Err(SampleFormatError::InvalidSampleFormat),
109 }
110 }
111}
112
113#[sorted]
115#[derive(Error, Debug)]
116pub enum SampleFormatError {
117 #[error("Must be in [U8, S16_LE, S24_LE, S32_LE]")]
118 InvalidSampleFormat,
119}
120
/// Direction of an audio stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StreamDirection {
    /// Playback direction.
    Playback,
    /// Capture direction.
    Capture,
}
127
128#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
130pub enum StreamEffect {
131 #[default]
132 NoEffect,
133 #[serde(alias = "aec")]
134 EchoCancellation,
135}
136
137pub mod capture;
138pub mod shm_streams;
139
/// Boxed `Send + Sync` error trait object used as the error type of the
/// fallible stream APIs in this module.
pub type BoxError = Box<dyn error::Error + Send + Sync>;
142
143#[sorted]
145#[derive(Error, Debug)]
146pub enum StreamEffectError {
147 #[error("Must be in [EchoCancellation, aec]")]
148 InvalidEffect,
149}
150
151impl FromStr for StreamEffect {
152 type Err = StreamEffectError;
153 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
154 match s {
155 "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
156 _ => Err(StreamEffectError::InvalidEffect),
157 }
158 }
159}
160
161#[sorted]
162#[derive(Error, Debug)]
163pub enum Error {
164 #[error("Unimplemented")]
165 Unimplemented,
166}
167
168pub trait StreamSourceGenerator: Sync + Send {
171 fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError>;
172}
173
174#[async_trait(?Send)]
176pub trait StreamSource: Send {
177 #[allow(clippy::type_complexity)]
180 fn new_playback_stream(
181 &mut self,
182 num_channels: usize,
183 format: SampleFormat,
184 frame_rate: u32,
185 buffer_size: usize,
186 ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;
187
188 #[allow(clippy::type_complexity)]
191 fn new_async_playback_stream(
192 &mut self,
193 _num_channels: usize,
194 _format: SampleFormat,
195 _frame_rate: u32,
196 _buffer_size: usize,
197 _ex: &dyn AudioStreamsExecutor,
198 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
199 Err(Box::new(Error::Unimplemented))
200 }
201
202 #[allow(clippy::type_complexity)]
205 async fn async_new_async_playback_stream(
206 &mut self,
207 num_channels: usize,
208 format: SampleFormat,
209 frame_rate: u32,
210 buffer_size: usize,
211 ex: &dyn AudioStreamsExecutor,
212 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
213 self.new_async_playback_stream(num_channels, format, frame_rate, buffer_size, ex)
214 }
215
216 #[allow(clippy::type_complexity)]
220 fn new_capture_stream(
221 &mut self,
222 num_channels: usize,
223 format: SampleFormat,
224 frame_rate: u32,
225 buffer_size: usize,
226 _effects: &[StreamEffect],
227 ) -> Result<
228 (
229 Box<dyn StreamControl>,
230 Box<dyn capture::CaptureBufferStream>,
231 ),
232 BoxError,
233 > {
234 Ok((
235 Box::new(NoopStreamControl::new()),
236 Box::new(capture::NoopCaptureStream::new(
237 num_channels,
238 format,
239 frame_rate,
240 buffer_size,
241 )),
242 ))
243 }
244
245 #[allow(clippy::type_complexity)]
249 fn new_async_capture_stream(
250 &mut self,
251 num_channels: usize,
252 format: SampleFormat,
253 frame_rate: u32,
254 buffer_size: usize,
255 _effects: &[StreamEffect],
256 _ex: &dyn AudioStreamsExecutor,
257 ) -> Result<
258 (
259 Box<dyn StreamControl>,
260 Box<dyn capture::AsyncCaptureBufferStream>,
261 ),
262 BoxError,
263 > {
264 Ok((
265 Box::new(NoopStreamControl::new()),
266 Box::new(capture::NoopCaptureStream::new(
267 num_channels,
268 format,
269 frame_rate,
270 buffer_size,
271 )),
272 ))
273 }
274
275 #[allow(clippy::type_complexity)]
278 async fn async_new_async_capture_stream(
279 &mut self,
280 num_channels: usize,
281 format: SampleFormat,
282 frame_rate: u32,
283 buffer_size: usize,
284 effects: &[StreamEffect],
285 ex: &dyn AudioStreamsExecutor,
286 ) -> Result<
287 (
288 Box<dyn StreamControl>,
289 Box<dyn capture::AsyncCaptureBufferStream>,
290 ),
291 BoxError,
292 > {
293 self.new_async_capture_stream(num_channels, format, frame_rate, buffer_size, effects, ex)
294 }
295
296 fn keep_rds(&self) -> Option<Vec<RawDescriptor>> {
299 None
300 }
301}
302
303pub trait PlaybackBufferStream: Send {
305 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;
306
307 fn write_playback_buffer<'b, 's: 'b>(
310 &'s mut self,
311 f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
312 ) -> Result<(), BoxError> {
313 let mut buf = self.next_playback_buffer()?;
314 f(&mut buf)?;
315 buf.commit();
316 Ok(())
317 }
318}
319
320impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
321 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
322 (**self).next_playback_buffer()
323 }
324}
325
326#[async_trait(?Send)]
329pub trait AsyncPlaybackBufferStream: Send {
330 async fn next_playback_buffer<'a>(
331 &'a mut self,
332 _ex: &dyn AudioStreamsExecutor,
333 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
334}
335
336#[async_trait(?Send)]
337impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
338 async fn next_playback_buffer<'a>(
339 &'a mut self,
340 ex: &dyn AudioStreamsExecutor,
341 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
342 (**self).next_playback_buffer(ex).await
343 }
344}
345
346pub async fn async_write_playback_buffer<F>(
351 stream: &mut dyn AsyncPlaybackBufferStream,
352 f: F,
353 ex: &dyn AudioStreamsExecutor,
354) -> Result<(), BoxError>
355where
356 F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
357{
358 let mut buf = stream.next_playback_buffer(ex).await?;
359 f(&mut buf)?;
360 buf.commit().await;
361 Ok(())
362}
363
/// Controls for a stream; both methods default to doing nothing.
pub trait StreamControl: Send + Sync {
    /// Sets the volume scaler. No-op by default.
    fn set_volume(&mut self, _scaler: f64) {}

    /// Sets the mute state. No-op by default.
    fn set_mute(&mut self, _mute: bool) {}
}
370
/// Callback interface invoked when a buffer is committed.
pub trait BufferCommit {
    /// Called with the number of frames being committed.
    fn commit(&mut self, nframes: usize);

    /// Returns the latency in bytes; `0` unless overridden.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
385
386#[async_trait(?Send)]
389pub trait AsyncBufferCommit {
390 async fn commit(&mut self, nframes: usize);
394 fn latency_bytes(&self) -> u32 {
399 0
400 }
401}
402
403#[sorted]
405#[derive(Error, Debug)]
406pub enum PlaybackBufferError {
407 #[error("Invalid buffer length")]
408 InvalidLength,
409 #[error("Slicing of playback buffer out of bounds")]
410 SliceOutOfBounds,
411}
412
/// Cursor over a mutable byte buffer that is treated as a sequence of frames.
struct AudioBuffer<'a> {
    /// Backing storage.
    buffer: &'a mut [u8],
    /// Number of bytes of `buffer` that have already been processed.
    offset: usize,
    /// Size of a single frame in bytes.
    frame_size: usize,
}

impl AudioBuffer<'_> {
    /// Returns how many whole frames fit into the backing buffer.
    pub fn frame_capacity(&self) -> usize {
        self.buffer.len() / self.frame_size
    }

    /// Rounds `size` down to a whole number of frames and caps it to the bytes
    /// left after the current offset.
    fn calc_len(&self, size: usize) -> usize {
        let whole_frames = size / self.frame_size * self.frame_size;
        let remaining = self.buffer.len() - self.offset;
        whole_frames.min(remaining)
    }

    /// Hands `cb` a mutable slice of at most `size` bytes (whole frames only)
    /// starting at the current offset, then advances the offset past it.
    ///
    /// Returns the length of the slice passed to `cb`.
    pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&mut self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Hands `cb` a shared slice of at most `size` bytes (whole frames only)
    /// starting at the current offset, then advances the offset past it.
    ///
    /// Returns the length of the slice passed to `cb`.
    pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Performs a single `read` from `reader` into the unprocessed tail of the
    /// buffer and advances the offset by the number of bytes read.
    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
        let tail = &mut self.buffer[self.offset..];
        let count = reader.read(tail)?;
        self.offset += count;
        Ok(count)
    }

    /// Performs a single `write` of the unprocessed tail of the buffer to
    /// `writer` and advances the offset by the number of bytes written.
    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
        let tail = &self.buffer[self.offset..];
        let count = writer.write(tail)?;
        self.offset += count;
        Ok(count)
    }
}

impl Write for AudioBuffer<'_> {
    /// Copies as much of `buf` as fits into the space after the current offset
    /// and advances the offset by the number of bytes copied.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut free_space = &mut self.buffer[self.offset..];
        let written = free_space.write(buf)?;
        self.offset += written;
        Ok(written)
    }

    /// Nothing is buffered separately, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Read for AudioBuffer<'_> {
    /// Copies bytes starting at the current offset into `buf`, limited to a
    /// whole number of frames that fit in `buf`, and advances the offset.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let len = buf.len() / self.frame_size * self.frame_size;
        let mut destination = &mut buf[..len];
        let copied = destination.write(&self.buffer[self.offset..])?;
        self.offset += copied;
        Ok(copied)
    }
}
486
487pub struct PlaybackBuffer<'a> {
490 buffer: AudioBuffer<'a>,
491 drop: &'a mut dyn BufferCommit,
492}
493
494impl<'a> PlaybackBuffer<'a> {
495 pub fn new<F>(
498 frame_size: usize,
499 buffer: &'a mut [u8],
500 drop: &'a mut F,
501 ) -> Result<Self, PlaybackBufferError>
502 where
503 F: BufferCommit,
504 {
505 if buffer.len() % frame_size != 0 {
506 return Err(PlaybackBufferError::InvalidLength);
507 }
508
509 Ok(PlaybackBuffer {
510 buffer: AudioBuffer {
511 buffer,
512 offset: 0,
513 frame_size,
514 },
515 drop,
516 })
517 }
518
519 pub fn frame_capacity(&self) -> usize {
521 self.buffer.frame_capacity()
522 }
523
524 pub fn commit(&mut self) {
527 self.drop
528 .commit(self.buffer.offset / self.buffer.frame_size);
529 }
530
531 pub fn latency_bytes(&self) -> u32 {
534 self.drop.latency_bytes()
535 }
536
537 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
539 self.buffer.write_copy_cb(size, cb)
540 }
541}
542
543impl Write for PlaybackBuffer<'_> {
544 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
545 self.buffer.write(buf)
546 }
547
548 fn flush(&mut self) -> io::Result<()> {
549 self.buffer.flush()
550 }
551}
552
553pub struct AsyncPlaybackBuffer<'a> {
555 buffer: AudioBuffer<'a>,
556 trigger: &'a mut dyn AsyncBufferCommit,
557}
558
559impl<'a> AsyncPlaybackBuffer<'a> {
560 pub fn new<F>(
563 frame_size: usize,
564 buffer: &'a mut [u8],
565 trigger: &'a mut F,
566 ) -> Result<Self, PlaybackBufferError>
567 where
568 F: AsyncBufferCommit,
569 {
570 if buffer.len() % frame_size != 0 {
571 return Err(PlaybackBufferError::InvalidLength);
572 }
573
574 Ok(AsyncPlaybackBuffer {
575 buffer: AudioBuffer {
576 buffer,
577 offset: 0,
578 frame_size,
579 },
580 trigger,
581 })
582 }
583
584 pub fn frame_capacity(&self) -> usize {
586 self.buffer.frame_capacity()
587 }
588
589 pub async fn commit(&mut self) {
592 self.trigger
593 .commit(self.buffer.offset / self.buffer.frame_size)
594 .await;
595 }
596
597 pub fn latency_bytes(&self) -> u32 {
599 self.trigger.latency_bytes()
600 }
601
602 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
604 self.buffer.write_copy_cb(size, cb)
605 }
606
607 pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
609 self.buffer.copy_from(reader)
610 }
611}
612
613impl Write for AsyncPlaybackBuffer<'_> {
614 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
615 self.buffer.write(buf)
616 }
617
618 fn flush(&mut self) -> io::Result<()> {
619 self.buffer.flush()
620 }
621}
622pub struct NoopStream {
624 buffer: Vec<u8>,
625 frame_size: usize,
626 interval: Duration,
627 next_frame: Duration,
628 start_time: Option<Instant>,
629 buffer_drop: NoopBufferCommit,
630}
631
632struct NoopBufferCommit {
634 which_buffer: bool,
635}
636
637impl BufferCommit for NoopBufferCommit {
638 fn commit(&mut self, _nwritten: usize) {
639 self.which_buffer ^= true;
641 }
642}
643
644#[async_trait(?Send)]
645impl AsyncBufferCommit for NoopBufferCommit {
646 async fn commit(&mut self, _nwritten: usize) {
647 self.which_buffer ^= true;
649 }
650}
651
652impl NoopStream {
653 pub fn new(
654 num_channels: usize,
655 format: SampleFormat,
656 frame_rate: u32,
657 buffer_size: usize,
658 ) -> Self {
659 let frame_size = format.sample_bytes() * num_channels;
660 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
661 NoopStream {
662 buffer: vec![0; buffer_size * frame_size],
663 frame_size,
664 interval,
665 next_frame: interval,
666 start_time: None,
667 buffer_drop: NoopBufferCommit {
668 which_buffer: false,
669 },
670 }
671 }
672}
673
674impl PlaybackBufferStream for NoopStream {
675 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
676 if let Some(start_time) = self.start_time {
677 let elapsed = start_time.elapsed();
678 if elapsed < self.next_frame {
679 std::thread::sleep(self.next_frame - elapsed);
680 }
681 self.next_frame += self.interval;
682 } else {
683 self.start_time = Some(Instant::now());
684 self.next_frame = self.interval;
685 }
686 Ok(PlaybackBuffer::new(
687 self.frame_size,
688 &mut self.buffer,
689 &mut self.buffer_drop,
690 )?)
691 }
692}
693
694#[async_trait(?Send)]
695impl AsyncPlaybackBufferStream for NoopStream {
696 async fn next_playback_buffer<'a>(
697 &'a mut self,
698 ex: &dyn AudioStreamsExecutor,
699 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
700 if let Some(start_time) = self.start_time {
701 let elapsed = start_time.elapsed();
702 if elapsed < self.next_frame {
703 ex.delay(self.next_frame - elapsed).await?;
704 }
705 self.next_frame += self.interval;
706 } else {
707 self.start_time = Some(Instant::now());
708 self.next_frame = self.interval;
709 }
710 Ok(AsyncPlaybackBuffer::new(
711 self.frame_size,
712 &mut self.buffer,
713 &mut self.buffer_drop,
714 )?)
715 }
716}
717
718#[derive(Default)]
720pub struct NoopStreamControl;
721
722impl NoopStreamControl {
723 pub fn new() -> Self {
724 NoopStreamControl {}
725 }
726}
727
728impl StreamControl for NoopStreamControl {}
729
730#[derive(Default)]
732pub struct NoopStreamSource;
733
734impl NoopStreamSource {
735 pub fn new() -> Self {
736 NoopStreamSource {}
737 }
738}
739
740impl StreamSource for NoopStreamSource {
741 #[allow(clippy::type_complexity)]
742 fn new_playback_stream(
743 &mut self,
744 num_channels: usize,
745 format: SampleFormat,
746 frame_rate: u32,
747 buffer_size: usize,
748 ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
749 Ok((
750 Box::new(NoopStreamControl::new()),
751 Box::new(NoopStream::new(
752 num_channels,
753 format,
754 frame_rate,
755 buffer_size,
756 )),
757 ))
758 }
759
760 #[allow(clippy::type_complexity)]
761 fn new_async_playback_stream(
762 &mut self,
763 num_channels: usize,
764 format: SampleFormat,
765 frame_rate: u32,
766 buffer_size: usize,
767 _ex: &dyn AudioStreamsExecutor,
768 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
769 Ok((
770 Box::new(NoopStreamControl::new()),
771 Box::new(NoopStream::new(
772 num_channels,
773 format,
774 frame_rate,
775 buffer_size,
776 )),
777 ))
778 }
779}
780
781pub struct NoopStreamSourceGenerator;
784
785impl NoopStreamSourceGenerator {
786 pub fn new() -> Self {
787 NoopStreamSourceGenerator {}
788 }
789}
790
791impl Default for NoopStreamSourceGenerator {
792 fn default() -> Self {
793 Self::new()
794 }
795}
796
797impl StreamSourceGenerator for NoopStreamSourceGenerator {
798 fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
799 Ok(Box::new(NoopStreamSource))
800 }
801}
802
#[cfg(test)]
mod tests {
    use futures::FutureExt;
    use io::Write;

    use super::async_api::test::TestExecutor;
    use super::*;

    /// A buffer whose length is not a multiple of the frame size is rejected.
    #[test]
    fn invalid_buffer_length() {
        // One byte more than a whole number of 2-byte frames.
        let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
        let mut buffer_drop = NoopBufferCommit {
            which_buffer: false,
        };
        assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
    }

    /// `copy_from` fills the whole buffer from a slice reader.
    #[test]
    fn audio_buffer_copy_from() {
        const PERIOD_SIZE: usize = 8192;
        const NUM_CHANNELS: usize = 6;
        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
        let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
        let mut aud_buf = AudioBuffer {
            buffer: &mut dst_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        aud_buf
            .copy_from(&mut &src_buf[..])
            .expect("all data should be copied.");
        assert_eq!(dst_buf, src_buf);
    }

    /// `copy_from` fills the whole buffer from an endless reader.
    #[test]
    fn audio_buffer_copy_from_repeat() {
        const PERIOD_SIZE: usize = 8192;
        const NUM_CHANNELS: usize = 6;
        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
        let mut aud_buf = AudioBuffer {
            buffer: &mut dst_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        let bytes = aud_buf
            .copy_from(&mut io::repeat(1))
            .expect("all data should be copied.");
        assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
        assert_eq!(dst_buf, [1u8; PERIOD_SIZE * FRAME_SIZE]);
    }

    /// `copy_to` drains the whole buffer into a slice writer.
    #[test]
    fn audio_buffer_copy_to() {
        const PERIOD_SIZE: usize = 8192;
        const NUM_CHANNELS: usize = 6;
        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
        let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
        let mut aud_buf = AudioBuffer {
            buffer: &mut src_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        aud_buf
            .copy_to(&mut &mut dst_buf[..])
            .expect("all data should be copied.");
        assert_eq!(dst_buf, src_buf);
    }

    /// `copy_to` reports the full length when writing to a sink.
    #[test]
    fn audio_buffer_copy_to_sink() {
        const PERIOD_SIZE: usize = 8192;
        const NUM_CHANNELS: usize = 6;
        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
        let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
        let mut aud_buf = AudioBuffer {
            buffer: &mut src_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        let bytes = aud_buf
            .copy_to(&mut io::sink())
            .expect("all data should be copied.");
        assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
    }

    /// `AudioBuffer` works as the destination of `io::copy`.
    #[test]
    fn io_copy_audio_buffer() {
        const PERIOD_SIZE: usize = 8192;
        const NUM_CHANNELS: usize = 6;
        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
        let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
        let mut aud_buf = AudioBuffer {
            buffer: &mut dst_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        io::copy(&mut &src_buf[..], &mut aud_buf).expect("all data should be copied.");
        assert_eq!(dst_buf, src_buf);
    }

    /// Committing reports the number of frames written.
    #[test]
    fn commit() {
        struct TestCommit {
            frame_count: usize,
        }
        impl BufferCommit for TestCommit {
            fn commit(&mut self, nwritten: usize) {
                self.frame_count += nwritten;
            }
        }
        let mut test_commit = TestCommit { frame_count: 0 };
        {
            const FRAME_SIZE: usize = 4;
            let mut buf = [0u8; 480 * FRAME_SIZE];
            let mut pb_buf = PlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
            pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
            pb_buf.commit();
        }
        assert_eq!(test_commit.frame_count, 480);
    }

    /// A 16 bit stereo stream of 480 frames accepts exactly 480 * 2 * 2 bytes.
    #[test]
    fn sixteen_bit_stereo() {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
            .unwrap();
        let mut copy_cb = |buf: &mut PlaybackBuffer| {
            assert_eq!(buf.buffer.frame_capacity(), 480);
            let pb_buf = [0xa5u8; 480 * 2 * 2];
            assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
            Ok(())
        };
        stream.write_playback_buffer(&mut copy_cb).unwrap();
    }

    /// The second buffer is only handed out after one buffer period (10 ms).
    #[test]
    fn consumption_rate() {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
            .unwrap();
        let start = Instant::now();
        {
            let mut copy_cb = |buf: &mut PlaybackBuffer| {
                let pb_buf = [0xa5u8; 480 * 2 * 2];
                assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
                Ok(())
            };
            stream.write_playback_buffer(&mut copy_cb).unwrap();
        }
        // The second call should block until the first buffer is consumed.
        let mut assert_cb = |_: &mut PlaybackBuffer| {
            let elapsed = start.elapsed();
            assert!(
                elapsed > Duration::from_millis(10),
                "next_playback_buffer didn't block long enough {}",
                elapsed.subsec_millis()
            );
            Ok(())
        };
        stream.write_playback_buffer(&mut assert_cb).unwrap();
    }

    /// Async committing reports the number of frames written.
    #[test]
    fn async_commit() {
        struct TestCommit {
            frame_count: usize,
        }
        #[async_trait(?Send)]
        impl AsyncBufferCommit for TestCommit {
            async fn commit(&mut self, nwritten: usize) {
                self.frame_count += nwritten;
            }
        }
        async fn this_test() {
            let mut test_commit = TestCommit { frame_count: 0 };
            {
                const FRAME_SIZE: usize = 4;
                let mut buf = [0u8; 480 * FRAME_SIZE];
                let mut pb_buf =
                    AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
                pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
                pb_buf.commit().await;
            }
            assert_eq!(test_commit.frame_count, 480);
        }

        // `now_or_never` returns `None` if the future is still pending; treat
        // that as a failure so the assertions above cannot be skipped silently.
        this_test()
            .now_or_never()
            .expect("async_commit future should complete in a single poll");
    }

    /// The second async buffer is only handed out after one buffer period.
    #[test]
    fn consumption_rate_async() {
        async fn this_test(ex: &TestExecutor) {
            let mut server = NoopStreamSource::new();
            let (_, mut stream) = server
                .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
                .unwrap();
            let start = Instant::now();
            {
                let copy_func = |buf: &mut AsyncPlaybackBuffer| {
                    let pb_buf = [0xa5u8; 480 * 2 * 2];
                    assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
                    Ok(())
                };
                async_write_playback_buffer(&mut *stream, copy_func, ex)
                    .await
                    .unwrap();
            }
            // The second call should wait until the first buffer is consumed.
            let assert_func = |_: &mut AsyncPlaybackBuffer| {
                let elapsed = start.elapsed();
                assert!(
                    elapsed > Duration::from_millis(10),
                    "write_playback_buffer didn't block long enough {}",
                    elapsed.subsec_millis()
                );
                Ok(())
            };

            async_write_playback_buffer(&mut *stream, assert_func, ex)
                .await
                .unwrap();
        }

        let ex = TestExecutor {};
        // NOTE(review): assumes `TestExecutor::delay` completes within a single
        // poll (e.g. by blocking); otherwise the timing assertion above would
        // never run, which is exactly what this `expect` guards against.
        this_test(&ex)
            .now_or_never()
            .expect("consumption_rate_async future should complete in a single poll");
    }

    /// The no-op generator can be used through the trait object interface.
    #[test]
    fn generate_noop_stream_source() {
        let generator: Box<dyn StreamSourceGenerator> = Box::new(NoopStreamSourceGenerator::new());
        generator
            .generate()
            .expect("failed to generate stream source");
    }
}