use std::io;
use std::io::Read;
use std::io::Write;
use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait;
use remain::sorted;
use thiserror::Error;
use super::async_api::AudioStreamsExecutor;
use super::AsyncBufferCommit;
use super::AudioBuffer;
use super::BoxError;
use super::BufferCommit;
use super::NoopBufferCommit;
use super::SampleFormat;
pub trait CaptureBufferStream: Send {
fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>;
fn read_capture_buffer<'b, 's: 'b>(
&'s mut self,
f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>,
) -> Result<(), BoxError> {
let mut buf = self.next_capture_buffer()?;
f(&mut buf)?;
buf.commit();
Ok(())
}
}
impl<S: CaptureBufferStream + ?Sized> CaptureBufferStream for &mut S {
fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
(**self).next_capture_buffer()
}
}
#[async_trait(?Send)]
pub trait AsyncCaptureBufferStream: Send {
async fn next_capture_buffer<'a>(
&'a mut self,
_ex: &dyn AudioStreamsExecutor,
) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
}
#[async_trait(?Send)]
impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
async fn next_capture_buffer<'a>(
&'a mut self,
ex: &dyn AudioStreamsExecutor,
) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
(**self).next_capture_buffer(ex).await
}
}
pub struct CaptureBuffer<'a> {
buffer: AudioBuffer<'a>,
drop: &'a mut dyn BufferCommit,
}
pub struct AsyncCaptureBuffer<'a> {
buffer: AudioBuffer<'a>,
trigger: &'a mut dyn AsyncBufferCommit,
}
#[sorted]
#[derive(Error, Debug)]
pub enum CaptureBufferError {
#[error("Invalid buffer length")]
InvalidLength,
}
impl<'a> CaptureBuffer<'a> {
pub fn new<F>(
frame_size: usize,
buffer: &'a mut [u8],
drop: &'a mut F,
) -> Result<Self, CaptureBufferError>
where
F: BufferCommit,
{
if buffer.len() % frame_size != 0 {
return Err(CaptureBufferError::InvalidLength);
}
Ok(CaptureBuffer {
buffer: AudioBuffer {
buffer,
frame_size,
offset: 0,
},
drop,
})
}
pub fn frame_capacity(&self) -> usize {
self.buffer.frame_capacity()
}
pub fn commit(&mut self) {
self.drop.commit(self.frame_capacity());
}
pub fn latency_bytes(&self) -> u32 {
self.drop.latency_bytes()
}
pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
self.buffer.read_copy_cb(size, cb)
}
}
impl<'a> Read for CaptureBuffer<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.buffer.read(buf)
}
}
impl<'a> AsyncCaptureBuffer<'a> {
pub fn new<F>(
frame_size: usize,
buffer: &'a mut [u8],
trigger: &'a mut F,
) -> Result<Self, CaptureBufferError>
where
F: AsyncBufferCommit,
{
if buffer.len() % frame_size != 0 {
return Err(CaptureBufferError::InvalidLength);
}
Ok(AsyncCaptureBuffer {
buffer: AudioBuffer {
buffer,
frame_size,
offset: 0,
},
trigger,
})
}
pub fn frame_capacity(&self) -> usize {
self.buffer.frame_capacity()
}
pub async fn commit(&mut self) {
self.trigger.commit(self.frame_capacity()).await;
}
pub fn latency_bytes(&self) -> u32 {
self.trigger.latency_bytes()
}
pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
self.buffer.read_copy_cb(size, cb)
}
pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
self.buffer.copy_to(writer)
}
}
impl<'a> Read for AsyncCaptureBuffer<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.buffer.read(buf)
}
}
pub struct NoopCaptureStream {
buffer: Vec<u8>,
frame_size: usize,
interval: Duration,
next_frame: Duration,
start_time: Option<Instant>,
buffer_drop: NoopBufferCommit,
}
impl NoopCaptureStream {
pub fn new(
num_channels: usize,
format: SampleFormat,
frame_rate: u32,
buffer_size: usize,
) -> Self {
let frame_size = format.sample_bytes() * num_channels;
let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
NoopCaptureStream {
buffer: vec![0; buffer_size * frame_size],
frame_size,
interval,
next_frame: interval,
start_time: None,
buffer_drop: NoopBufferCommit {
which_buffer: false,
},
}
}
}
impl CaptureBufferStream for NoopCaptureStream {
fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
if let Some(start_time) = self.start_time {
let elapsed = start_time.elapsed();
if elapsed < self.next_frame {
std::thread::sleep(self.next_frame - elapsed);
}
self.next_frame += self.interval;
} else {
self.start_time = Some(Instant::now());
self.next_frame = self.interval;
}
Ok(CaptureBuffer::new(
self.frame_size,
&mut self.buffer,
&mut self.buffer_drop,
)?)
}
}
#[async_trait(?Send)]
impl AsyncCaptureBufferStream for NoopCaptureStream {
async fn next_capture_buffer<'a>(
&'a mut self,
ex: &dyn AudioStreamsExecutor,
) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
if let Some(start_time) = self.start_time {
let elapsed = start_time.elapsed();
if elapsed < self.next_frame {
ex.delay(self.next_frame - elapsed).await?;
}
self.next_frame += self.interval;
} else {
self.start_time = Some(Instant::now());
self.next_frame = self.interval;
}
Ok(AsyncCaptureBuffer::new(
self.frame_size,
&mut self.buffer,
&mut self.buffer_drop,
)?)
}
}
pub async fn async_read_capture_buffer<F>(
stream: &mut dyn AsyncCaptureBufferStream,
f: F,
ex: &dyn AudioStreamsExecutor,
) -> Result<(), BoxError>
where
F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,
{
let mut buf = stream.next_capture_buffer(ex).await?;
f(&mut buf)?;
buf.commit().await;
Ok(())
}
#[cfg(test)]
mod tests {
use futures::FutureExt;
use super::super::async_api::test::TestExecutor;
use super::super::*;
use super::*;
#[test]
fn invalid_buffer_length() {
let mut cp_buf = [0xa5u8; 480 * 2 * 2 + 1];
let mut buffer_drop = NoopBufferCommit {
which_buffer: false,
};
assert!(CaptureBuffer::new(2, &mut cp_buf, &mut buffer_drop).is_err());
}
#[test]
fn commit() {
struct TestCommit {
frame_count: usize,
}
impl BufferCommit for TestCommit {
fn commit(&mut self, nwritten: usize) {
self.frame_count += nwritten;
}
}
let mut test_commit = TestCommit { frame_count: 0 };
{
const FRAME_SIZE: usize = 4;
let mut buf = [0u8; 480 * FRAME_SIZE];
let mut cp_buf = CaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
let mut local_buf = [0u8; 240 * FRAME_SIZE];
assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
cp_buf.commit();
}
assert_eq!(test_commit.frame_count, 480);
}
#[test]
fn sixteen_bit_stereo() {
let mut server = NoopStreamSource::new();
let (_, mut stream) = server
.new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
.unwrap();
let mut copy_func = |b: &mut CaptureBuffer| {
assert_eq!(b.buffer.frame_capacity(), 480);
let mut pb_buf = [0xa5u8; 480 * 2 * 2];
assert_eq!(b.read(&mut pb_buf).unwrap(), 480 * 2 * 2);
Ok(())
};
stream.read_capture_buffer(&mut copy_func).unwrap();
}
#[test]
fn consumption_rate() {
let mut server = NoopStreamSource::new();
let (_, mut stream) = server
.new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
.unwrap();
let start = Instant::now();
{
let mut copy_func = |b: &mut CaptureBuffer| {
let mut cp_buf = [0xa5u8; 480 * 2 * 2];
assert_eq!(b.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
for buf in cp_buf.iter() {
assert_eq!(*buf, 0, "Read samples should all be zeros.");
}
Ok(())
};
stream.read_capture_buffer(&mut copy_func).unwrap();
}
let mut assert_func = |_: &mut CaptureBuffer| {
let elapsed = start.elapsed();
assert!(
elapsed > Duration::from_millis(10),
"next_capture_buffer didn't block long enough {}",
elapsed.subsec_millis()
);
Ok(())
};
stream.read_capture_buffer(&mut assert_func).unwrap();
}
#[test]
fn async_commit() {
struct TestCommit {
frame_count: usize,
}
#[async_trait(?Send)]
impl AsyncBufferCommit for TestCommit {
async fn commit(&mut self, nwritten: usize) {
self.frame_count += nwritten;
}
}
async fn this_test() {
let mut test_commit = TestCommit { frame_count: 0 };
{
const FRAME_SIZE: usize = 4;
let mut buf = [0u8; 480 * FRAME_SIZE];
let mut cp_buf =
AsyncCaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
let mut local_buf = [0u8; 240 * FRAME_SIZE];
assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
cp_buf.commit().await;
}
assert_eq!(test_commit.frame_count, 480);
}
this_test().now_or_never();
}
#[test]
fn consumption_rate_async() {
async fn this_test(ex: &TestExecutor) {
let mut server = NoopStreamSource::new();
let (_, mut stream) = server
.new_async_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[], ex)
.unwrap();
let start = Instant::now();
{
let copy_func = |buf: &mut AsyncCaptureBuffer| {
let mut cp_buf = [0xa5u8; 480 * 2 * 2];
assert_eq!(buf.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
for buf in cp_buf.iter() {
assert_eq!(*buf, 0, "Read samples should all be zeros.");
}
Ok(())
};
async_read_capture_buffer(&mut *stream, copy_func, ex)
.await
.unwrap();
}
let assert_func = |_: &mut AsyncCaptureBuffer| {
let elapsed = start.elapsed();
assert!(
elapsed > Duration::from_millis(10),
"write_playback_buffer didn't block long enough {}",
elapsed.subsec_millis()
);
Ok(())
};
async_read_capture_buffer(&mut *stream, assert_func, ex)
.await
.unwrap();
}
let ex = TestExecutor {};
this_test(&ex).now_or_never();
}
}