audio_streams/
capture.rs

1// Copyright 2019 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! ```
6//! use audio_streams::{BoxError, capture::CaptureBuffer, SampleFormat, StreamSource,
7//!     NoopStreamSource};
8//! use std::io::Read;
9//!
10//! const buffer_size: usize = 120;
11//! const num_channels: usize = 2;
12//!
13//! # fn main() -> std::result::Result<(),BoxError> {
14//! let mut stream_source = NoopStreamSource::new();
15//! let sample_format = SampleFormat::S16LE;
16//! let frame_size = num_channels * sample_format.sample_bytes();
17//!
18//! let (_, mut stream) = stream_source
19//!     .new_capture_stream(num_channels, sample_format, 48000, buffer_size, &[])?;
20//! // Capture 10 buffers of zeros.
21//! let mut buf = Vec::new();
22//! buf.resize(buffer_size * frame_size, 0xa5u8);
23//! for _ in 0..10 {
24//!     let mut copy_func = |stream_buffer: &mut CaptureBuffer| {
25//!         assert_eq!(stream_buffer.read(&mut buf)?, buffer_size * frame_size);
26//!         Ok(())
27//!     };
28//!     stream.read_capture_buffer(&mut copy_func)?;
29//! }
30//! # Ok (())
31//! # }
32//! ```
33
34use std::io;
35use std::io::Read;
36use std::io::Write;
37use std::time::Duration;
38use std::time::Instant;
39
40use async_trait::async_trait;
41use remain::sorted;
42use thiserror::Error;
43
44use super::async_api::AudioStreamsExecutor;
45use super::AsyncBufferCommit;
46use super::AudioBuffer;
47use super::BoxError;
48use super::BufferCommit;
49use super::NoopBufferCommit;
50use super::SampleFormat;
51
52/// `CaptureBufferStream` provides `CaptureBuffer`s to read with audio samples from capture.
53pub trait CaptureBufferStream: Send {
54    fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>;
55
56    /// Call `f` with a `CaptureBuffer`, and trigger the buffer done call back after. `f` can read
57    /// the capture data from the given `CaptureBuffer`.
58    fn read_capture_buffer<'b, 's: 'b>(
59        &'s mut self,
60        f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>,
61    ) -> Result<(), BoxError> {
62        let mut buf = self.next_capture_buffer()?;
63        f(&mut buf)?;
64        buf.commit();
65        Ok(())
66    }
67}
68
69impl<S: CaptureBufferStream + ?Sized> CaptureBufferStream for &mut S {
70    fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
71        (**self).next_capture_buffer()
72    }
73}
74
/// Async counterpart of `CaptureBufferStream`: provides `AsyncCaptureBuffer`s to read captured
/// audio samples from.
#[async_trait(?Send)]
pub trait AsyncCaptureBufferStream: Send {
    /// Asynchronously returns the next `AsyncCaptureBuffer`, which borrows from the stream for
    /// the lifetime `'a`.
    ///
    /// `_ex` is the executor an implementation may use while waiting for the buffer; for example
    /// `NoopCaptureStream` awaits `ex.delay(..)` to pace its buffers.
    async fn next_capture_buffer<'a>(
        &'a mut self,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
}
82
83#[async_trait(?Send)]
84impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
85    async fn next_capture_buffer<'a>(
86        &'a mut self,
87        ex: &dyn AudioStreamsExecutor,
88    ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
89        (**self).next_capture_buffer(ex).await
90    }
91}
92
/// `CaptureBuffer` contains a block of audio samples obtained from a capture stream. It provides
/// a temporary view of those samples and notifies the capture stream, through its
/// `BufferCommit` callback, when `commit` is called.
/// Note that `commit` always reports `buffer.len() / frame_size` frames (the full
/// `frame_capacity`), since `CaptureBufferStream` assumes that users get all the samples from
/// the buffer.
pub struct CaptureBuffer<'a> {
    // View over the captured bytes together with the frame size and read offset.
    buffer: AudioBuffer<'a>,
    // Callback invoked by `commit` (and queried by `latency_bytes`).
    drop: &'a mut dyn BufferCommit,
}
101
/// Async version of `CaptureBuffer`.
pub struct AsyncCaptureBuffer<'a> {
    // View over the captured bytes together with the frame size and read offset.
    buffer: AudioBuffer<'a>,
    // Async callback awaited by `commit` (and queried by `latency_bytes`).
    trigger: &'a mut dyn AsyncBufferCommit,
}
107
/// Errors that are possible from a `CaptureBuffer`.
#[sorted]
#[derive(Error, Debug)]
pub enum CaptureBufferError {
    /// The backing buffer's length is not a whole multiple of the frame size.
    #[error("Invalid buffer length")]
    InvalidLength,
}
115
116impl<'a> CaptureBuffer<'a> {
117    /// Creates a new `CaptureBuffer` that holds a reference to the backing memory specified in
118    /// `buffer`.
119    pub fn new<F>(
120        frame_size: usize,
121        buffer: &'a mut [u8],
122        drop: &'a mut F,
123    ) -> Result<Self, CaptureBufferError>
124    where
125        F: BufferCommit,
126    {
127        if buffer.len() % frame_size != 0 {
128            return Err(CaptureBufferError::InvalidLength);
129        }
130
131        Ok(CaptureBuffer {
132            buffer: AudioBuffer {
133                buffer,
134                frame_size,
135                offset: 0,
136            },
137            drop,
138        })
139    }
140
141    /// Returns the number of audio frames that fit in the buffer.
142    pub fn frame_capacity(&self) -> usize {
143        self.buffer.frame_capacity()
144    }
145
146    /// This triggers the callback of `BufferCommit`. This should be called after the data is read
147    /// from the buffer.
148    ///
149    /// Always sends `frame_capacity`.
150    pub fn commit(&mut self) {
151        self.drop.commit(self.frame_capacity());
152    }
153
154    pub fn latency_bytes(&self) -> u32 {
155        self.drop.latency_bytes()
156    }
157
158    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
159    pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
160        self.buffer.read_copy_cb(size, cb)
161    }
162}
163
164impl Read for CaptureBuffer<'_> {
165    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166        self.buffer.read(buf)
167    }
168}
169
170impl<'a> AsyncCaptureBuffer<'a> {
171    /// Creates a new `AsyncCaptureBuffer` that holds a reference to the backing memory specified in
172    /// `buffer`.
173    pub fn new<F>(
174        frame_size: usize,
175        buffer: &'a mut [u8],
176        trigger: &'a mut F,
177    ) -> Result<Self, CaptureBufferError>
178    where
179        F: AsyncBufferCommit,
180    {
181        if buffer.len() % frame_size != 0 {
182            return Err(CaptureBufferError::InvalidLength);
183        }
184
185        Ok(AsyncCaptureBuffer {
186            buffer: AudioBuffer {
187                buffer,
188                frame_size,
189                offset: 0,
190            },
191            trigger,
192        })
193    }
194
195    /// Returns the number of audio frames that fit in the buffer.
196    pub fn frame_capacity(&self) -> usize {
197        self.buffer.frame_capacity()
198    }
199
200    /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
201    /// read from the buffer.
202    ///
203    /// Always sends `frame_capacity`.
204    pub async fn commit(&mut self) {
205        self.trigger.commit(self.frame_capacity()).await;
206    }
207
208    pub fn latency_bytes(&self) -> u32 {
209        self.trigger.latency_bytes()
210    }
211
212    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
213    pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
214        self.buffer.read_copy_cb(size, cb)
215    }
216
217    /// Copy data to an io::Write
218    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
219        self.buffer.copy_to(writer)
220    }
221}
222
223impl Read for AsyncCaptureBuffer<'_> {
224    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
225        self.buffer.read(buf)
226    }
227}
228
/// Stream that provides null capture samples.
pub struct NoopCaptureStream {
    // Zero-filled backing storage handed out as the capture data; sized
    // `buffer_size * frame_size` bytes at construction.
    buffer: Vec<u8>,
    // Bytes per audio frame (`sample_bytes * num_channels`).
    frame_size: usize,
    // Time covered by one buffer, in whole milliseconds.
    interval: Duration,
    // Offset from `start_time` before which the next buffer is not handed out.
    next_frame: Duration,
    // Set on the first `next_capture_buffer` call; `None` until then.
    start_time: Option<Instant>,
    // Commit callback passed to every buffer handed out by this stream.
    buffer_drop: NoopBufferCommit,
}
238
239impl NoopCaptureStream {
240    pub fn new(
241        num_channels: usize,
242        format: SampleFormat,
243        frame_rate: u32,
244        buffer_size: usize,
245    ) -> Self {
246        let frame_size = format.sample_bytes() * num_channels;
247        let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
248        NoopCaptureStream {
249            buffer: vec![0; buffer_size * frame_size],
250            frame_size,
251            interval,
252            next_frame: interval,
253            start_time: None,
254            buffer_drop: NoopBufferCommit {
255                which_buffer: false,
256            },
257        }
258    }
259}
260
261impl CaptureBufferStream for NoopCaptureStream {
262    fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
263        if let Some(start_time) = self.start_time {
264            let elapsed = start_time.elapsed();
265            if elapsed < self.next_frame {
266                std::thread::sleep(self.next_frame - elapsed);
267            }
268            self.next_frame += self.interval;
269        } else {
270            self.start_time = Some(Instant::now());
271            self.next_frame = self.interval;
272        }
273        Ok(CaptureBuffer::new(
274            self.frame_size,
275            &mut self.buffer,
276            &mut self.buffer_drop,
277        )?)
278    }
279}
280
281#[async_trait(?Send)]
282impl AsyncCaptureBufferStream for NoopCaptureStream {
283    async fn next_capture_buffer<'a>(
284        &'a mut self,
285        ex: &dyn AudioStreamsExecutor,
286    ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
287        if let Some(start_time) = self.start_time {
288            let elapsed = start_time.elapsed();
289            if elapsed < self.next_frame {
290                ex.delay(self.next_frame - elapsed).await?;
291            }
292            self.next_frame += self.interval;
293        } else {
294            self.start_time = Some(Instant::now());
295            self.next_frame = self.interval;
296        }
297        Ok(AsyncCaptureBuffer::new(
298            self.frame_size,
299            &mut self.buffer,
300            &mut self.buffer_drop,
301        )?)
302    }
303}
304
305/// Call `f` with a `AsyncCaptureBuffer`, and trigger the buffer done call back after. `f` can read
306/// the capture data from the given `AsyncCaptureBuffer`.
307///
308/// This cannot be a trait method because trait methods with generic parameters are not object safe.
309pub async fn async_read_capture_buffer<F>(
310    stream: &mut dyn AsyncCaptureBufferStream,
311    f: F,
312    ex: &dyn AudioStreamsExecutor,
313) -> Result<(), BoxError>
314where
315    F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,
316{
317    let mut buf = stream.next_capture_buffer(ex).await?;
318    f(&mut buf)?;
319    buf.commit().await;
320    Ok(())
321}
322
323#[cfg(test)]
324mod tests {
325    use futures::FutureExt;
326
327    use super::super::async_api::test::TestExecutor;
328    use super::super::*;
329    use super::*;
330
331    #[test]
332    fn invalid_buffer_length() {
333        // Capture buffers can't be created with a size that isn't divisible by the frame size.
334        let mut cp_buf = [0xa5u8; 480 * 2 * 2 + 1];
335        let mut buffer_drop = NoopBufferCommit {
336            which_buffer: false,
337        };
338        assert!(CaptureBuffer::new(2, &mut cp_buf, &mut buffer_drop).is_err());
339    }
340
341    #[test]
342    fn commit() {
343        struct TestCommit {
344            frame_count: usize,
345        }
346        impl BufferCommit for TestCommit {
347            fn commit(&mut self, nwritten: usize) {
348                self.frame_count += nwritten;
349            }
350        }
351        let mut test_commit = TestCommit { frame_count: 0 };
352        {
353            const FRAME_SIZE: usize = 4;
354            let mut buf = [0u8; 480 * FRAME_SIZE];
355            let mut cp_buf = CaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
356            let mut local_buf = [0u8; 240 * FRAME_SIZE];
357            assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
358            cp_buf.commit();
359        }
360        // This should be 480 no matter how many samples are read.
361        assert_eq!(test_commit.frame_count, 480);
362    }
363
364    #[test]
365    fn sixteen_bit_stereo() {
366        let mut server = NoopStreamSource::new();
367        let (_, mut stream) = server
368            .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
369            .unwrap();
370        let mut copy_func = |b: &mut CaptureBuffer| {
371            assert_eq!(b.buffer.frame_capacity(), 480);
372            let mut pb_buf = [0xa5u8; 480 * 2 * 2];
373            assert_eq!(b.read(&mut pb_buf).unwrap(), 480 * 2 * 2);
374            Ok(())
375        };
376        stream.read_capture_buffer(&mut copy_func).unwrap();
377    }
378
379    #[test]
380    fn consumption_rate() {
381        let mut server = NoopStreamSource::new();
382        let (_, mut stream) = server
383            .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
384            .unwrap();
385        let start = Instant::now();
386        {
387            let mut copy_func = |b: &mut CaptureBuffer| {
388                let mut cp_buf = [0xa5u8; 480 * 2 * 2];
389                assert_eq!(b.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
390                for buf in cp_buf.iter() {
391                    assert_eq!(*buf, 0, "Read samples should all be zeros.");
392                }
393                Ok(())
394            };
395            stream.read_capture_buffer(&mut copy_func).unwrap();
396        }
397        // The second call should block until the first buffer is consumed.
398        let mut assert_func = |_: &mut CaptureBuffer| {
399            let elapsed = start.elapsed();
400            assert!(
401                elapsed > Duration::from_millis(10),
402                "next_capture_buffer didn't block long enough {}",
403                elapsed.subsec_millis()
404            );
405            Ok(())
406        };
407        stream.read_capture_buffer(&mut assert_func).unwrap();
408    }
409
410    #[test]
411    fn async_commit() {
412        struct TestCommit {
413            frame_count: usize,
414        }
415        #[async_trait(?Send)]
416        impl AsyncBufferCommit for TestCommit {
417            async fn commit(&mut self, nwritten: usize) {
418                self.frame_count += nwritten;
419            }
420        }
421        async fn this_test() {
422            let mut test_commit = TestCommit { frame_count: 0 };
423            {
424                const FRAME_SIZE: usize = 4;
425                let mut buf = [0u8; 480 * FRAME_SIZE];
426                let mut cp_buf =
427                    AsyncCaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
428                let mut local_buf = [0u8; 240 * FRAME_SIZE];
429                assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
430                cp_buf.commit().await;
431            }
432            // This should be 480 no matter how many samples are read.
433            assert_eq!(test_commit.frame_count, 480);
434        }
435
436        this_test().now_or_never();
437    }
438
439    #[test]
440    fn consumption_rate_async() {
441        async fn this_test(ex: &TestExecutor) {
442            let mut server = NoopStreamSource::new();
443            let (_, mut stream) = server
444                .new_async_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[], ex)
445                .unwrap();
446            let start = Instant::now();
447            {
448                let copy_func = |buf: &mut AsyncCaptureBuffer| {
449                    let mut cp_buf = [0xa5u8; 480 * 2 * 2];
450                    assert_eq!(buf.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
451                    for buf in cp_buf.iter() {
452                        assert_eq!(*buf, 0, "Read samples should all be zeros.");
453                    }
454                    Ok(())
455                };
456                async_read_capture_buffer(&mut *stream, copy_func, ex)
457                    .await
458                    .unwrap();
459            }
460            // The second call should block until the first buffer is consumed.
461            let assert_func = |_: &mut AsyncCaptureBuffer| {
462                let elapsed = start.elapsed();
463                assert!(
464                    elapsed > Duration::from_millis(10),
465                    "write_playback_buffer didn't block long enough {}",
466                    elapsed.subsec_millis()
467                );
468                Ok(())
469            };
470            async_read_capture_buffer(&mut *stream, assert_func, ex)
471                .await
472                .unwrap();
473        }
474
475        let ex = TestExecutor {};
476        this_test(&ex).now_or_never();
477    }
478}