audio_streams/
audio_streams.rs

1// Copyright 2019 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Provides an interface for playing and recording audio.
6//!
7//! When implementing an audio playback system, the `StreamSource` trait is implemented.
8//! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9//! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10//! buffers can be filled with `write_playback_buffer`.
11//!
12//! Users playing audio fill the provided buffers with audio. When a `PlaybackBuffer` is dropped,
13//! the samples written to it are committed to the `PlaybackBufferStream` it came from.
14//!
15//! ```
16//! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17//! use std::io::Write;
18//!
19//! const buffer_size: usize = 120;
20//! const num_channels: usize = 2;
21//!
22//! # fn main() -> std::result::Result<(), BoxError> {
23//! let mut stream_source = NoopStreamSource::new();
24//! let sample_format = SampleFormat::S16LE;
25//! let frame_size = num_channels * sample_format.sample_bytes();
26//!
27//! let (_, mut stream) = stream_source
28//!     .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29//! // Play 10 buffers of DC.
30//! let mut buf = Vec::new();
31//! buf.resize(buffer_size * frame_size, 0xa5u8);
32//! for _ in 0..10 {
33//!     let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34//!         assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35//!         Ok(())
36//!     };
37//!     stream.write_playback_buffer(&mut copy_cb)?;
38//! }
39//! # Ok (())
40//! # }
41//! ```
42pub mod async_api;
43
44use std::cmp::min;
45use std::error;
46use std::fmt;
47use std::fmt::Display;
48use std::io;
49use std::io::Read;
50use std::io::Write;
51#[cfg(unix)]
52use std::os::unix::io::RawFd as RawDescriptor;
53#[cfg(windows)]
54use std::os::windows::io::RawHandle as RawDescriptor;
55use std::result::Result;
56use std::str::FromStr;
57use std::time::Duration;
58use std::time::Instant;
59
60pub use async_api::AsyncStream;
61pub use async_api::AudioStreamsExecutor;
62use async_trait::async_trait;
63use remain::sorted;
64use serde::Deserialize;
65use serde::Serialize;
66use thiserror::Error;
67
68#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
69pub enum SampleFormat {
70    U8,
71    S16LE,
72    S24LE,
73    S32LE,
74}
75
76impl SampleFormat {
77    pub fn sample_bytes(self) -> usize {
78        use SampleFormat::*;
79        match self {
80            U8 => 1,
81            S16LE => 2,
82            S24LE => 4, // Not a typo, S24_LE samples are stored in 4 byte chunks.
83            S32LE => 4,
84        }
85    }
86}
87
88impl Display for SampleFormat {
89    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90        use SampleFormat::*;
91        match self {
92            U8 => write!(f, "Unsigned 8 bit"),
93            S16LE => write!(f, "Signed 16 bit Little Endian"),
94            S24LE => write!(f, "Signed 24 bit Little Endian"),
95            S32LE => write!(f, "Signed 32 bit Little Endian"),
96        }
97    }
98}
99
100impl FromStr for SampleFormat {
101    type Err = SampleFormatError;
102    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
103        match s {
104            "U8" => Ok(SampleFormat::U8),
105            "S16_LE" => Ok(SampleFormat::S16LE),
106            "S24_LE" => Ok(SampleFormat::S24LE),
107            "S32_LE" => Ok(SampleFormat::S32LE),
108            _ => Err(SampleFormatError::InvalidSampleFormat),
109        }
110    }
111}
112
113/// Errors that are possible from a `SampleFormat`.
114#[sorted]
115#[derive(Error, Debug)]
116pub enum SampleFormatError {
117    #[error("Must be in [U8, S16_LE, S24_LE, S32_LE]")]
118    InvalidSampleFormat,
119}
120
121/// Valid directions of an audio stream.
122#[derive(Copy, Clone, Debug, PartialEq, Eq)]
123pub enum StreamDirection {
124    Playback,
125    Capture,
126}
127
128/// Valid effects for an audio stream.
129#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
130pub enum StreamEffect {
131    #[default]
132    NoEffect,
133    #[serde(alias = "aec")]
134    EchoCancellation,
135}
136
137pub mod capture;
138pub mod shm_streams;
139
140/// Errors that can pass across threads.
141pub type BoxError = Box<dyn error::Error + Send + Sync>;
142
143/// Errors that are possible from a `StreamEffect`.
144#[sorted]
145#[derive(Error, Debug)]
146pub enum StreamEffectError {
147    #[error("Must be in [EchoCancellation, aec]")]
148    InvalidEffect,
149}
150
151impl FromStr for StreamEffect {
152    type Err = StreamEffectError;
153    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
154        match s {
155            "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
156            _ => Err(StreamEffectError::InvalidEffect),
157        }
158    }
159}
160
161#[sorted]
162#[derive(Error, Debug)]
163pub enum Error {
164    #[error("Unimplemented")]
165    Unimplemented,
166}
167
168/// `StreamSourceGenerator` is a trait used to abstract types that create [`StreamSource`].
169/// It can be used when multiple types of `StreamSource` are needed.
170pub trait StreamSourceGenerator: Sync + Send {
171    fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError>;
172}
173
174/// `StreamSource` creates streams for playback or capture of audio.
175#[async_trait(?Send)]
176pub trait StreamSource: Send {
177    /// Returns a stream control and buffer generator object. These are separate as the buffer
178    /// generator might want to be passed to the audio stream.
179    #[allow(clippy::type_complexity)]
180    fn new_playback_stream(
181        &mut self,
182        num_channels: usize,
183        format: SampleFormat,
184        frame_rate: u32,
185        buffer_size: usize,
186    ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;
187
188    /// Returns a stream control and async buffer generator object. These are separate as the buffer
189    /// generator might want to be passed to the audio stream.
190    #[allow(clippy::type_complexity)]
191    fn new_async_playback_stream(
192        &mut self,
193        _num_channels: usize,
194        _format: SampleFormat,
195        _frame_rate: u32,
196        _buffer_size: usize,
197        _ex: &dyn AudioStreamsExecutor,
198    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
199        Err(Box::new(Error::Unimplemented))
200    }
201
202    /// Returns a stream control and async buffer generator object asynchronously.
203    /// Default implementation calls and blocks on `new_async_playback_stream()`.
204    #[allow(clippy::type_complexity)]
205    async fn async_new_async_playback_stream(
206        &mut self,
207        num_channels: usize,
208        format: SampleFormat,
209        frame_rate: u32,
210        buffer_size: usize,
211        ex: &dyn AudioStreamsExecutor,
212    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
213        self.new_async_playback_stream(num_channels, format, frame_rate, buffer_size, ex)
214    }
215
216    /// Returns a stream control and buffer generator object. These are separate as the buffer
217    /// generator might want to be passed to the audio stream.
218    /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
219    #[allow(clippy::type_complexity)]
220    fn new_capture_stream(
221        &mut self,
222        num_channels: usize,
223        format: SampleFormat,
224        frame_rate: u32,
225        buffer_size: usize,
226        _effects: &[StreamEffect],
227    ) -> Result<
228        (
229            Box<dyn StreamControl>,
230            Box<dyn capture::CaptureBufferStream>,
231        ),
232        BoxError,
233    > {
234        Ok((
235            Box::new(NoopStreamControl::new()),
236            Box::new(capture::NoopCaptureStream::new(
237                num_channels,
238                format,
239                frame_rate,
240                buffer_size,
241            )),
242        ))
243    }
244
245    /// Returns a stream control and async buffer generator object. These are separate as the buffer
246    /// generator might want to be passed to the audio stream.
247    /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
248    #[allow(clippy::type_complexity)]
249    fn new_async_capture_stream(
250        &mut self,
251        num_channels: usize,
252        format: SampleFormat,
253        frame_rate: u32,
254        buffer_size: usize,
255        _effects: &[StreamEffect],
256        _ex: &dyn AudioStreamsExecutor,
257    ) -> Result<
258        (
259            Box<dyn StreamControl>,
260            Box<dyn capture::AsyncCaptureBufferStream>,
261        ),
262        BoxError,
263    > {
264        Ok((
265            Box::new(NoopStreamControl::new()),
266            Box::new(capture::NoopCaptureStream::new(
267                num_channels,
268                format,
269                frame_rate,
270                buffer_size,
271            )),
272        ))
273    }
274
275    /// Returns a stream control and async buffer generator object asynchronously.
276    /// Default implementation calls and blocks on `new_async_capture_stream()`.
277    #[allow(clippy::type_complexity)]
278    async fn async_new_async_capture_stream(
279        &mut self,
280        num_channels: usize,
281        format: SampleFormat,
282        frame_rate: u32,
283        buffer_size: usize,
284        effects: &[StreamEffect],
285        ex: &dyn AudioStreamsExecutor,
286    ) -> Result<
287        (
288            Box<dyn StreamControl>,
289            Box<dyn capture::AsyncCaptureBufferStream>,
290        ),
291        BoxError,
292    > {
293        self.new_async_capture_stream(num_channels, format, frame_rate, buffer_size, effects, ex)
294    }
295
296    /// Returns any open file descriptors needed by the implementor. The FD list helps users of the
297    /// StreamSource enter Linux jails making sure not to close needed FDs.
298    fn keep_rds(&self) -> Option<Vec<RawDescriptor>> {
299        None
300    }
301}
302
303/// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
304pub trait PlaybackBufferStream: Send {
305    fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;
306
307    /// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
308    /// write playback data to the given `PlaybackBuffer`.
309    fn write_playback_buffer<'b, 's: 'b>(
310        &'s mut self,
311        f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
312    ) -> Result<(), BoxError> {
313        let mut buf = self.next_playback_buffer()?;
314        f(&mut buf)?;
315        buf.commit();
316        Ok(())
317    }
318}
319
320impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
321    fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
322        (**self).next_playback_buffer()
323    }
324}
325
326/// `PlaybackBufferStream` provides `PlaybackBuffer`s asynchronously to fill with audio samples for
327/// playback.
328#[async_trait(?Send)]
329pub trait AsyncPlaybackBufferStream: Send {
330    async fn next_playback_buffer<'a>(
331        &'a mut self,
332        _ex: &dyn AudioStreamsExecutor,
333    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
334}
335
336#[async_trait(?Send)]
337impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
338    async fn next_playback_buffer<'a>(
339        &'a mut self,
340        ex: &dyn AudioStreamsExecutor,
341    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
342        (**self).next_playback_buffer(ex).await
343    }
344}
345
346/// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
347/// write playback data to the given `AsyncPlaybackBuffer`.
348///
349/// This cannot be a trait method because trait methods with generic parameters are not object safe.
350pub async fn async_write_playback_buffer<F>(
351    stream: &mut dyn AsyncPlaybackBufferStream,
352    f: F,
353    ex: &dyn AudioStreamsExecutor,
354) -> Result<(), BoxError>
355where
356    F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
357{
358    let mut buf = stream.next_playback_buffer(ex).await?;
359    f(&mut buf)?;
360    buf.commit().await;
361    Ok(())
362}
363
364/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
365/// is separate from the stream so it can be owned by a different thread if needed.
366pub trait StreamControl: Send + Sync {
367    fn set_volume(&mut self, _scaler: f64) {}
368    fn set_mute(&mut self, _mute: bool) {}
369}
370
371/// `BufferCommit` is a cleanup funcion that must be called before dropping the buffer,
372/// allowing arbitrary code to be run after the buffer is filled or read by the user.
373pub trait BufferCommit {
374    /// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
375    /// indicates the number of audio frames that were read or written to the device.
376    fn commit(&mut self, nframes: usize);
377    /// `latency_bytes` the current device latency.
378    /// For playback it means how many bytes need to be consumed
379    /// before the current playback buffer will be played.
380    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
381    fn latency_bytes(&self) -> u32 {
382        0
383    }
384}
385
386/// `AsyncBufferCommit` is a cleanup funcion that must be called before dropping the buffer,
387/// allowing arbitrary code to be run after the buffer is filled or read by the user.
388#[async_trait(?Send)]
389pub trait AsyncBufferCommit {
390    /// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
391    /// automatically. `nframes` indicates the number of audio frames that were read or written to
392    /// the device.
393    async fn commit(&mut self, nframes: usize);
394    /// `latency_bytes` the current device latency.
395    /// For playback it means how many bytes need to be consumed
396    /// before the current playback buffer will be played.
397    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
398    fn latency_bytes(&self) -> u32 {
399        0
400    }
401}
402
403/// Errors that are possible from a `PlaybackBuffer`.
404#[sorted]
405#[derive(Error, Debug)]
406pub enum PlaybackBufferError {
407    #[error("Invalid buffer length")]
408    InvalidLength,
409    #[error("Slicing of playback buffer out of bounds")]
410    SliceOutOfBounds,
411}
412
413/// `AudioBuffer` is one buffer that holds buffer_size audio frames.
414/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
415struct AudioBuffer<'a> {
416    buffer: &'a mut [u8],
417    offset: usize,     // Read or Write offset in frames.
418    frame_size: usize, // Size of a frame in bytes.
419}
420
421impl AudioBuffer<'_> {
422    /// Returns the number of audio frames that fit in the buffer.
423    pub fn frame_capacity(&self) -> usize {
424        self.buffer.len() / self.frame_size
425    }
426
427    fn calc_len(&self, size: usize) -> usize {
428        min(
429            size / self.frame_size * self.frame_size,
430            self.buffer.len() - self.offset,
431        )
432    }
433
434    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
435    pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
436        // only write complete frames.
437        let len = self.calc_len(size);
438        cb(&mut self.buffer[self.offset..(self.offset + len)]);
439        self.offset += len;
440        Ok(len)
441    }
442
443    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
444    pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
445        let len = self.calc_len(size);
446        cb(&self.buffer[self.offset..(self.offset + len)]);
447        self.offset += len;
448        Ok(len)
449    }
450
451    /// Copy data from an io::Reader
452    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
453        let bytes = reader.read(&mut self.buffer[self.offset..])?;
454        self.offset += bytes;
455        Ok(bytes)
456    }
457
458    /// Copy data to an io::Write
459    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
460        let bytes = writer.write(&self.buffer[self.offset..])?;
461        self.offset += bytes;
462        Ok(bytes)
463    }
464}
465
466impl Write for AudioBuffer<'_> {
467    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
468        let written = (&mut self.buffer[self.offset..]).write(&buf[..buf.len()])?;
469        self.offset += written;
470        Ok(written)
471    }
472
473    fn flush(&mut self) -> io::Result<()> {
474        Ok(())
475    }
476}
477
478impl Read for AudioBuffer<'_> {
479    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
480        let len = buf.len() / self.frame_size * self.frame_size;
481        let written = (&mut buf[..len]).write(&self.buffer[self.offset..])?;
482        self.offset += written;
483        Ok(written)
484    }
485}
486
487/// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
488/// allow access to an audio buffer and notifes the owning stream of write completion when dropped.
489pub struct PlaybackBuffer<'a> {
490    buffer: AudioBuffer<'a>,
491    drop: &'a mut dyn BufferCommit,
492}
493
494impl<'a> PlaybackBuffer<'a> {
495    /// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
496    /// `buffer`.
497    pub fn new<F>(
498        frame_size: usize,
499        buffer: &'a mut [u8],
500        drop: &'a mut F,
501    ) -> Result<Self, PlaybackBufferError>
502    where
503        F: BufferCommit,
504    {
505        if buffer.len() % frame_size != 0 {
506            return Err(PlaybackBufferError::InvalidLength);
507        }
508
509        Ok(PlaybackBuffer {
510            buffer: AudioBuffer {
511                buffer,
512                offset: 0,
513                frame_size,
514            },
515            drop,
516        })
517    }
518
519    /// Returns the number of audio frames that fit in the buffer.
520    pub fn frame_capacity(&self) -> usize {
521        self.buffer.frame_capacity()
522    }
523
524    /// This triggers the commit of `BufferCommit`. This should be called after the data is copied
525    /// to the buffer.
526    pub fn commit(&mut self) {
527        self.drop
528            .commit(self.buffer.offset / self.buffer.frame_size);
529    }
530
531    /// It returns how many bytes need to be consumed
532    /// before the current playback buffer will be played.
533    pub fn latency_bytes(&self) -> u32 {
534        self.drop.latency_bytes()
535    }
536
537    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
538    pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
539        self.buffer.write_copy_cb(size, cb)
540    }
541}
542
543impl Write for PlaybackBuffer<'_> {
544    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
545        self.buffer.write(buf)
546    }
547
548    fn flush(&mut self) -> io::Result<()> {
549        self.buffer.flush()
550    }
551}
552
553/// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
554pub struct AsyncPlaybackBuffer<'a> {
555    buffer: AudioBuffer<'a>,
556    trigger: &'a mut dyn AsyncBufferCommit,
557}
558
559impl<'a> AsyncPlaybackBuffer<'a> {
560    /// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
561    /// in `buffer`.
562    pub fn new<F>(
563        frame_size: usize,
564        buffer: &'a mut [u8],
565        trigger: &'a mut F,
566    ) -> Result<Self, PlaybackBufferError>
567    where
568        F: AsyncBufferCommit,
569    {
570        if buffer.len() % frame_size != 0 {
571            return Err(PlaybackBufferError::InvalidLength);
572        }
573
574        Ok(AsyncPlaybackBuffer {
575            buffer: AudioBuffer {
576                buffer,
577                offset: 0,
578                frame_size,
579            },
580            trigger,
581        })
582    }
583
584    /// Returns the number of audio frames that fit in the buffer.
585    pub fn frame_capacity(&self) -> usize {
586        self.buffer.frame_capacity()
587    }
588
589    /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
590    /// copied to the buffer.
591    pub async fn commit(&mut self) {
592        self.trigger
593            .commit(self.buffer.offset / self.buffer.frame_size)
594            .await;
595    }
596
597    /// It returns the latency in terms of bytes that the capture buffer was recorded.
598    pub fn latency_bytes(&self) -> u32 {
599        self.trigger.latency_bytes()
600    }
601
602    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
603    pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
604        self.buffer.write_copy_cb(size, cb)
605    }
606
607    /// Copy data from an io::Reader
608    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
609        self.buffer.copy_from(reader)
610    }
611}
612
613impl Write for AsyncPlaybackBuffer<'_> {
614    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
615        self.buffer.write(buf)
616    }
617
618    fn flush(&mut self) -> io::Result<()> {
619        self.buffer.flush()
620    }
621}
622/// Stream that accepts playback samples but drops them.
623pub struct NoopStream {
624    buffer: Vec<u8>,
625    frame_size: usize,
626    interval: Duration,
627    next_frame: Duration,
628    start_time: Option<Instant>,
629    buffer_drop: NoopBufferCommit,
630}
631
632/// NoopStream data that is needed from the buffer complete callback.
633struct NoopBufferCommit {
634    which_buffer: bool,
635}
636
637impl BufferCommit for NoopBufferCommit {
638    fn commit(&mut self, _nwritten: usize) {
639        // When a buffer completes, switch to the other one.
640        self.which_buffer ^= true;
641    }
642}
643
644#[async_trait(?Send)]
645impl AsyncBufferCommit for NoopBufferCommit {
646    async fn commit(&mut self, _nwritten: usize) {
647        // When a buffer completes, switch to the other one.
648        self.which_buffer ^= true;
649    }
650}
651
652impl NoopStream {
653    pub fn new(
654        num_channels: usize,
655        format: SampleFormat,
656        frame_rate: u32,
657        buffer_size: usize,
658    ) -> Self {
659        let frame_size = format.sample_bytes() * num_channels;
660        let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
661        NoopStream {
662            buffer: vec![0; buffer_size * frame_size],
663            frame_size,
664            interval,
665            next_frame: interval,
666            start_time: None,
667            buffer_drop: NoopBufferCommit {
668                which_buffer: false,
669            },
670        }
671    }
672}
673
674impl PlaybackBufferStream for NoopStream {
675    fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
676        if let Some(start_time) = self.start_time {
677            let elapsed = start_time.elapsed();
678            if elapsed < self.next_frame {
679                std::thread::sleep(self.next_frame - elapsed);
680            }
681            self.next_frame += self.interval;
682        } else {
683            self.start_time = Some(Instant::now());
684            self.next_frame = self.interval;
685        }
686        Ok(PlaybackBuffer::new(
687            self.frame_size,
688            &mut self.buffer,
689            &mut self.buffer_drop,
690        )?)
691    }
692}
693
694#[async_trait(?Send)]
695impl AsyncPlaybackBufferStream for NoopStream {
696    async fn next_playback_buffer<'a>(
697        &'a mut self,
698        ex: &dyn AudioStreamsExecutor,
699    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
700        if let Some(start_time) = self.start_time {
701            let elapsed = start_time.elapsed();
702            if elapsed < self.next_frame {
703                ex.delay(self.next_frame - elapsed).await?;
704            }
705            self.next_frame += self.interval;
706        } else {
707            self.start_time = Some(Instant::now());
708            self.next_frame = self.interval;
709        }
710        Ok(AsyncPlaybackBuffer::new(
711            self.frame_size,
712            &mut self.buffer,
713            &mut self.buffer_drop,
714        )?)
715    }
716}
717
718/// No-op control for `NoopStream`s.
719#[derive(Default)]
720pub struct NoopStreamControl;
721
722impl NoopStreamControl {
723    pub fn new() -> Self {
724        NoopStreamControl {}
725    }
726}
727
728impl StreamControl for NoopStreamControl {}
729
730/// Source of `NoopStream` and `NoopStreamControl` objects.
731#[derive(Default)]
732pub struct NoopStreamSource;
733
734impl NoopStreamSource {
735    pub fn new() -> Self {
736        NoopStreamSource {}
737    }
738}
739
740impl StreamSource for NoopStreamSource {
741    #[allow(clippy::type_complexity)]
742    fn new_playback_stream(
743        &mut self,
744        num_channels: usize,
745        format: SampleFormat,
746        frame_rate: u32,
747        buffer_size: usize,
748    ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
749        Ok((
750            Box::new(NoopStreamControl::new()),
751            Box::new(NoopStream::new(
752                num_channels,
753                format,
754                frame_rate,
755                buffer_size,
756            )),
757        ))
758    }
759
760    #[allow(clippy::type_complexity)]
761    fn new_async_playback_stream(
762        &mut self,
763        num_channels: usize,
764        format: SampleFormat,
765        frame_rate: u32,
766        buffer_size: usize,
767        _ex: &dyn AudioStreamsExecutor,
768    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
769        Ok((
770            Box::new(NoopStreamControl::new()),
771            Box::new(NoopStream::new(
772                num_channels,
773                format,
774                frame_rate,
775                buffer_size,
776            )),
777        ))
778    }
779}
780
781/// `NoopStreamSourceGenerator` is a struct that implements [`StreamSourceGenerator`]
782/// to generate [`NoopStreamSource`].
783pub struct NoopStreamSourceGenerator;
784
785impl NoopStreamSourceGenerator {
786    pub fn new() -> Self {
787        NoopStreamSourceGenerator {}
788    }
789}
790
791impl Default for NoopStreamSourceGenerator {
792    fn default() -> Self {
793        Self::new()
794    }
795}
796
797impl StreamSourceGenerator for NoopStreamSourceGenerator {
798    fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
799        Ok(Box::new(NoopStreamSource))
800    }
801}
802
803#[cfg(test)]
804mod tests {
805    use futures::FutureExt;
806    use io::Write;
807
808    use super::async_api::test::TestExecutor;
809    use super::*;
810
811    #[test]
812    fn invalid_buffer_length() {
813        // Playback buffers can't be created with a size that isn't divisible by the frame size.
814        let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
815        let mut buffer_drop = NoopBufferCommit {
816            which_buffer: false,
817        };
818        assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
819    }
820
821    #[test]
822    fn audio_buffer_copy_from() {
823        const PERIOD_SIZE: usize = 8192;
824        const NUM_CHANNELS: usize = 6;
825        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
826        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
827        let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
828        let mut aud_buf = AudioBuffer {
829            buffer: &mut dst_buf,
830            offset: 0,
831            frame_size: FRAME_SIZE,
832        };
833        aud_buf
834            .copy_from(&mut &src_buf[..])
835            .expect("all data should be copied.");
836        assert_eq!(dst_buf, src_buf);
837    }
838
839    #[test]
840    fn audio_buffer_copy_from_repeat() {
841        const PERIOD_SIZE: usize = 8192;
842        const NUM_CHANNELS: usize = 6;
843        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
844        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
845        let mut aud_buf = AudioBuffer {
846            buffer: &mut dst_buf,
847            offset: 0,
848            frame_size: FRAME_SIZE,
849        };
850        let bytes = aud_buf
851            .copy_from(&mut io::repeat(1))
852            .expect("all data should be copied.");
853        assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
854        assert_eq!(dst_buf, [1u8; PERIOD_SIZE * FRAME_SIZE]);
855    }
856
857    #[test]
858    fn audio_buffer_copy_to() {
859        const PERIOD_SIZE: usize = 8192;
860        const NUM_CHANNELS: usize = 6;
861        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
862        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
863        let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
864        let mut aud_buf = AudioBuffer {
865            buffer: &mut src_buf,
866            offset: 0,
867            frame_size: FRAME_SIZE,
868        };
869        aud_buf
870            .copy_to(&mut &mut dst_buf[..])
871            .expect("all data should be copied.");
872        assert_eq!(dst_buf, src_buf);
873    }
874
875    #[test]
876    fn audio_buffer_copy_to_sink() {
877        const PERIOD_SIZE: usize = 8192;
878        const NUM_CHANNELS: usize = 6;
879        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
880        let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
881        let mut aud_buf = AudioBuffer {
882            buffer: &mut src_buf,
883            offset: 0,
884            frame_size: FRAME_SIZE,
885        };
886        let bytes = aud_buf
887            .copy_to(&mut io::sink())
888            .expect("all data should be copied.");
889        assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
890    }
891
892    #[test]
893    fn io_copy_audio_buffer() {
894        const PERIOD_SIZE: usize = 8192;
895        const NUM_CHANNELS: usize = 6;
896        const FRAME_SIZE: usize = NUM_CHANNELS * 2;
897        let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
898        let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
899        let mut aud_buf = AudioBuffer {
900            buffer: &mut dst_buf,
901            offset: 0,
902            frame_size: FRAME_SIZE,
903        };
904        io::copy(&mut &src_buf[..], &mut aud_buf).expect("all data should be copied.");
905        assert_eq!(dst_buf, src_buf);
906    }
907
908    #[test]
909    fn commit() {
910        struct TestCommit {
911            frame_count: usize,
912        }
913        impl BufferCommit for TestCommit {
914            fn commit(&mut self, nwritten: usize) {
915                self.frame_count += nwritten;
916            }
917        }
918        let mut test_commit = TestCommit { frame_count: 0 };
919        {
920            const FRAME_SIZE: usize = 4;
921            let mut buf = [0u8; 480 * FRAME_SIZE];
922            let mut pb_buf = PlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
923            pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
924            pb_buf.commit();
925        }
926        assert_eq!(test_commit.frame_count, 480);
927    }
928
929    #[test]
930    fn sixteen_bit_stereo() {
931        let mut server = NoopStreamSource::new();
932        let (_, mut stream) = server
933            .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
934            .unwrap();
935        let mut copy_cb = |buf: &mut PlaybackBuffer| {
936            assert_eq!(buf.buffer.frame_capacity(), 480);
937            let pb_buf = [0xa5u8; 480 * 2 * 2];
938            assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
939            Ok(())
940        };
941        stream.write_playback_buffer(&mut copy_cb).unwrap();
942    }
943
944    #[test]
945    fn consumption_rate() {
946        let mut server = NoopStreamSource::new();
947        let (_, mut stream) = server
948            .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
949            .unwrap();
950        let start = Instant::now();
951        {
952            let mut copy_cb = |buf: &mut PlaybackBuffer| {
953                let pb_buf = [0xa5u8; 480 * 2 * 2];
954                assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
955                Ok(())
956            };
957            stream.write_playback_buffer(&mut copy_cb).unwrap();
958        }
959        // The second call should block until the first buffer is consumed.
960        let mut assert_cb = |_: &mut PlaybackBuffer| {
961            let elapsed = start.elapsed();
962            assert!(
963                elapsed > Duration::from_millis(10),
964                "next_playback_buffer didn't block long enough {}",
965                elapsed.subsec_millis()
966            );
967            Ok(())
968        };
969        stream.write_playback_buffer(&mut assert_cb).unwrap();
970    }
971
972    #[test]
973    fn async_commit() {
974        struct TestCommit {
975            frame_count: usize,
976        }
977        #[async_trait(?Send)]
978        impl AsyncBufferCommit for TestCommit {
979            async fn commit(&mut self, nwritten: usize) {
980                self.frame_count += nwritten;
981            }
982        }
983        async fn this_test() {
984            let mut test_commit = TestCommit { frame_count: 0 };
985            {
986                const FRAME_SIZE: usize = 4;
987                let mut buf = [0u8; 480 * FRAME_SIZE];
988                let mut pb_buf =
989                    AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
990                pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
991                pb_buf.commit().await;
992            }
993            assert_eq!(test_commit.frame_count, 480);
994        }
995
996        this_test().now_or_never();
997    }
998
999    #[test]
1000    fn consumption_rate_async() {
1001        async fn this_test(ex: &TestExecutor) {
1002            let mut server = NoopStreamSource::new();
1003            let (_, mut stream) = server
1004                .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
1005                .unwrap();
1006            let start = Instant::now();
1007            {
1008                let copy_func = |buf: &mut AsyncPlaybackBuffer| {
1009                    let pb_buf = [0xa5u8; 480 * 2 * 2];
1010                    assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
1011                    Ok(())
1012                };
1013                async_write_playback_buffer(&mut *stream, copy_func, ex)
1014                    .await
1015                    .unwrap();
1016            }
1017            // The second call should block until the first buffer is consumed.
1018            let assert_func = |_: &mut AsyncPlaybackBuffer| {
1019                let elapsed = start.elapsed();
1020                assert!(
1021                    elapsed > Duration::from_millis(10),
1022                    "write_playback_buffer didn't block long enough {}",
1023                    elapsed.subsec_millis()
1024                );
1025                Ok(())
1026            };
1027
1028            async_write_playback_buffer(&mut *stream, assert_func, ex)
1029                .await
1030                .unwrap();
1031        }
1032
1033        let ex = TestExecutor {};
1034        this_test(&ex).now_or_never();
1035    }
1036
1037    #[test]
1038    fn generate_noop_stream_source() {
1039        let generator: Box<dyn StreamSourceGenerator> = Box::new(NoopStreamSourceGenerator::new());
1040        generator
1041            .generate()
1042            .expect("failed to generate stream source");
1043    }
1044}