1use std::io;
35use std::io::Read;
36use std::io::Write;
37use std::time::Duration;
38use std::time::Instant;
39
40use async_trait::async_trait;
41use remain::sorted;
42use thiserror::Error;
43
44use super::async_api::AudioStreamsExecutor;
45use super::AsyncBufferCommit;
46use super::AudioBuffer;
47use super::BoxError;
48use super::BufferCommit;
49use super::NoopBufferCommit;
50use super::SampleFormat;
51
52pub trait CaptureBufferStream: Send {
54 fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>;
55
56 fn read_capture_buffer<'b, 's: 'b>(
59 &'s mut self,
60 f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>,
61 ) -> Result<(), BoxError> {
62 let mut buf = self.next_capture_buffer()?;
63 f(&mut buf)?;
64 buf.commit();
65 Ok(())
66 }
67}
68
69impl<S: CaptureBufferStream + ?Sized> CaptureBufferStream for &mut S {
70 fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
71 (**self).next_capture_buffer()
72 }
73}
74
75#[async_trait(?Send)]
76pub trait AsyncCaptureBufferStream: Send {
77 async fn next_capture_buffer<'a>(
78 &'a mut self,
79 _ex: &dyn AudioStreamsExecutor,
80 ) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
81}
82
83#[async_trait(?Send)]
84impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
85 async fn next_capture_buffer<'a>(
86 &'a mut self,
87 ex: &dyn AudioStreamsExecutor,
88 ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
89 (**self).next_capture_buffer(ex).await
90 }
91}
92
93pub struct CaptureBuffer<'a> {
98 buffer: AudioBuffer<'a>,
99 drop: &'a mut dyn BufferCommit,
100}
101
102pub struct AsyncCaptureBuffer<'a> {
104 buffer: AudioBuffer<'a>,
105 trigger: &'a mut dyn AsyncBufferCommit,
106}
107
108#[sorted]
110#[derive(Error, Debug)]
111pub enum CaptureBufferError {
112 #[error("Invalid buffer length")]
113 InvalidLength,
114}
115
116impl<'a> CaptureBuffer<'a> {
117 pub fn new<F>(
120 frame_size: usize,
121 buffer: &'a mut [u8],
122 drop: &'a mut F,
123 ) -> Result<Self, CaptureBufferError>
124 where
125 F: BufferCommit,
126 {
127 if buffer.len() % frame_size != 0 {
128 return Err(CaptureBufferError::InvalidLength);
129 }
130
131 Ok(CaptureBuffer {
132 buffer: AudioBuffer {
133 buffer,
134 frame_size,
135 offset: 0,
136 },
137 drop,
138 })
139 }
140
141 pub fn frame_capacity(&self) -> usize {
143 self.buffer.frame_capacity()
144 }
145
146 pub fn commit(&mut self) {
151 self.drop.commit(self.frame_capacity());
152 }
153
154 pub fn latency_bytes(&self) -> u32 {
155 self.drop.latency_bytes()
156 }
157
158 pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
160 self.buffer.read_copy_cb(size, cb)
161 }
162}
163
164impl Read for CaptureBuffer<'_> {
165 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166 self.buffer.read(buf)
167 }
168}
169
170impl<'a> AsyncCaptureBuffer<'a> {
171 pub fn new<F>(
174 frame_size: usize,
175 buffer: &'a mut [u8],
176 trigger: &'a mut F,
177 ) -> Result<Self, CaptureBufferError>
178 where
179 F: AsyncBufferCommit,
180 {
181 if buffer.len() % frame_size != 0 {
182 return Err(CaptureBufferError::InvalidLength);
183 }
184
185 Ok(AsyncCaptureBuffer {
186 buffer: AudioBuffer {
187 buffer,
188 frame_size,
189 offset: 0,
190 },
191 trigger,
192 })
193 }
194
195 pub fn frame_capacity(&self) -> usize {
197 self.buffer.frame_capacity()
198 }
199
200 pub async fn commit(&mut self) {
205 self.trigger.commit(self.frame_capacity()).await;
206 }
207
208 pub fn latency_bytes(&self) -> u32 {
209 self.trigger.latency_bytes()
210 }
211
212 pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
214 self.buffer.read_copy_cb(size, cb)
215 }
216
217 pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
219 self.buffer.copy_to(writer)
220 }
221}
222
223impl Read for AsyncCaptureBuffer<'_> {
224 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
225 self.buffer.read(buf)
226 }
227}
228
229pub struct NoopCaptureStream {
231 buffer: Vec<u8>,
232 frame_size: usize,
233 interval: Duration,
234 next_frame: Duration,
235 start_time: Option<Instant>,
236 buffer_drop: NoopBufferCommit,
237}
238
239impl NoopCaptureStream {
240 pub fn new(
241 num_channels: usize,
242 format: SampleFormat,
243 frame_rate: u32,
244 buffer_size: usize,
245 ) -> Self {
246 let frame_size = format.sample_bytes() * num_channels;
247 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
248 NoopCaptureStream {
249 buffer: vec![0; buffer_size * frame_size],
250 frame_size,
251 interval,
252 next_frame: interval,
253 start_time: None,
254 buffer_drop: NoopBufferCommit {
255 which_buffer: false,
256 },
257 }
258 }
259}
260
261impl CaptureBufferStream for NoopCaptureStream {
262 fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
263 if let Some(start_time) = self.start_time {
264 let elapsed = start_time.elapsed();
265 if elapsed < self.next_frame {
266 std::thread::sleep(self.next_frame - elapsed);
267 }
268 self.next_frame += self.interval;
269 } else {
270 self.start_time = Some(Instant::now());
271 self.next_frame = self.interval;
272 }
273 Ok(CaptureBuffer::new(
274 self.frame_size,
275 &mut self.buffer,
276 &mut self.buffer_drop,
277 )?)
278 }
279}
280
281#[async_trait(?Send)]
282impl AsyncCaptureBufferStream for NoopCaptureStream {
283 async fn next_capture_buffer<'a>(
284 &'a mut self,
285 ex: &dyn AudioStreamsExecutor,
286 ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
287 if let Some(start_time) = self.start_time {
288 let elapsed = start_time.elapsed();
289 if elapsed < self.next_frame {
290 ex.delay(self.next_frame - elapsed).await?;
291 }
292 self.next_frame += self.interval;
293 } else {
294 self.start_time = Some(Instant::now());
295 self.next_frame = self.interval;
296 }
297 Ok(AsyncCaptureBuffer::new(
298 self.frame_size,
299 &mut self.buffer,
300 &mut self.buffer_drop,
301 )?)
302 }
303}
304
305pub async fn async_read_capture_buffer<F>(
310 stream: &mut dyn AsyncCaptureBufferStream,
311 f: F,
312 ex: &dyn AudioStreamsExecutor,
313) -> Result<(), BoxError>
314where
315 F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,
316{
317 let mut buf = stream.next_capture_buffer(ex).await?;
318 f(&mut buf)?;
319 buf.commit().await;
320 Ok(())
321}
322
323#[cfg(test)]
324mod tests {
325 use futures::FutureExt;
326
327 use super::super::async_api::test::TestExecutor;
328 use super::super::*;
329 use super::*;
330
331 #[test]
332 fn invalid_buffer_length() {
333 let mut cp_buf = [0xa5u8; 480 * 2 * 2 + 1];
335 let mut buffer_drop = NoopBufferCommit {
336 which_buffer: false,
337 };
338 assert!(CaptureBuffer::new(2, &mut cp_buf, &mut buffer_drop).is_err());
339 }
340
341 #[test]
342 fn commit() {
343 struct TestCommit {
344 frame_count: usize,
345 }
346 impl BufferCommit for TestCommit {
347 fn commit(&mut self, nwritten: usize) {
348 self.frame_count += nwritten;
349 }
350 }
351 let mut test_commit = TestCommit { frame_count: 0 };
352 {
353 const FRAME_SIZE: usize = 4;
354 let mut buf = [0u8; 480 * FRAME_SIZE];
355 let mut cp_buf = CaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
356 let mut local_buf = [0u8; 240 * FRAME_SIZE];
357 assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
358 cp_buf.commit();
359 }
360 assert_eq!(test_commit.frame_count, 480);
362 }
363
364 #[test]
365 fn sixteen_bit_stereo() {
366 let mut server = NoopStreamSource::new();
367 let (_, mut stream) = server
368 .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
369 .unwrap();
370 let mut copy_func = |b: &mut CaptureBuffer| {
371 assert_eq!(b.buffer.frame_capacity(), 480);
372 let mut pb_buf = [0xa5u8; 480 * 2 * 2];
373 assert_eq!(b.read(&mut pb_buf).unwrap(), 480 * 2 * 2);
374 Ok(())
375 };
376 stream.read_capture_buffer(&mut copy_func).unwrap();
377 }
378
379 #[test]
380 fn consumption_rate() {
381 let mut server = NoopStreamSource::new();
382 let (_, mut stream) = server
383 .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
384 .unwrap();
385 let start = Instant::now();
386 {
387 let mut copy_func = |b: &mut CaptureBuffer| {
388 let mut cp_buf = [0xa5u8; 480 * 2 * 2];
389 assert_eq!(b.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
390 for buf in cp_buf.iter() {
391 assert_eq!(*buf, 0, "Read samples should all be zeros.");
392 }
393 Ok(())
394 };
395 stream.read_capture_buffer(&mut copy_func).unwrap();
396 }
397 let mut assert_func = |_: &mut CaptureBuffer| {
399 let elapsed = start.elapsed();
400 assert!(
401 elapsed > Duration::from_millis(10),
402 "next_capture_buffer didn't block long enough {}",
403 elapsed.subsec_millis()
404 );
405 Ok(())
406 };
407 stream.read_capture_buffer(&mut assert_func).unwrap();
408 }
409
410 #[test]
411 fn async_commit() {
412 struct TestCommit {
413 frame_count: usize,
414 }
415 #[async_trait(?Send)]
416 impl AsyncBufferCommit for TestCommit {
417 async fn commit(&mut self, nwritten: usize) {
418 self.frame_count += nwritten;
419 }
420 }
421 async fn this_test() {
422 let mut test_commit = TestCommit { frame_count: 0 };
423 {
424 const FRAME_SIZE: usize = 4;
425 let mut buf = [0u8; 480 * FRAME_SIZE];
426 let mut cp_buf =
427 AsyncCaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
428 let mut local_buf = [0u8; 240 * FRAME_SIZE];
429 assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
430 cp_buf.commit().await;
431 }
432 assert_eq!(test_commit.frame_count, 480);
434 }
435
436 this_test().now_or_never();
437 }
438
439 #[test]
440 fn consumption_rate_async() {
441 async fn this_test(ex: &TestExecutor) {
442 let mut server = NoopStreamSource::new();
443 let (_, mut stream) = server
444 .new_async_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[], ex)
445 .unwrap();
446 let start = Instant::now();
447 {
448 let copy_func = |buf: &mut AsyncCaptureBuffer| {
449 let mut cp_buf = [0xa5u8; 480 * 2 * 2];
450 assert_eq!(buf.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
451 for buf in cp_buf.iter() {
452 assert_eq!(*buf, 0, "Read samples should all be zeros.");
453 }
454 Ok(())
455 };
456 async_read_capture_buffer(&mut *stream, copy_func, ex)
457 .await
458 .unwrap();
459 }
460 let assert_func = |_: &mut AsyncCaptureBuffer| {
462 let elapsed = start.elapsed();
463 assert!(
464 elapsed > Duration::from_millis(10),
465 "write_playback_buffer didn't block long enough {}",
466 elapsed.subsec_millis()
467 );
468 Ok(())
469 };
470 async_read_capture_buffer(&mut *stream, assert_func, ex)
471 .await
472 .unwrap();
473 }
474
475 let ex = TestExecutor {};
476 this_test(&ex).now_or_never();
477 }
478}