audio_util/
file_streams.rs

1// Copyright 2023 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::fs::File;
6use std::io::Error as IOError;
7use std::slice;
8use std::sync::atomic::AtomicUsize;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::Instant;
13
14use async_trait::async_trait;
15use audio_streams::AsyncBufferCommit;
16use audio_streams::AsyncPlaybackBuffer;
17use audio_streams::AsyncPlaybackBufferStream;
18use audio_streams::AudioStreamsExecutor;
19use audio_streams::BoxError;
20use audio_streams::NoopStreamControl;
21use audio_streams::SampleFormat;
22use audio_streams::StreamControl;
23use audio_streams::StreamSource;
24use audio_streams::StreamSourceGenerator;
25use base::warn;
26use base::MappedRegion;
27use base::MemoryMapping;
28use base::MemoryMappingBuilder;
29use base::MmapError;
30use thiserror::Error as ThisError;
31
/// Errors produced while creating file-backed playback streams.
#[derive(ThisError, Debug)]
pub enum Error {
    /// Building the memory mapping over the output file failed.
    #[error("Failed to build memory mapping: {0}")]
    BuildMemoryMapping(MmapError),
    /// Cloning the output file handle for a new stream source failed.
    #[error("Failed to clone file descriptor: {0}")]
    Clone(IOError),
    /// The requested operation is not provided by this stream source.
    #[error("Not implemented")]
    Unimplemented,
}
41
/// An audio stream that can be used to write playback buffers to a file.
/// `FileStream` doesn't directly open and write to the file. It receives
/// an mmap of the file instead.
///
/// Note that `FileStream` needs the mmap-ed file to already have space allocated
/// for the data that will be written. When a playback buffer would not fit in the
/// mapped space, `AudioMemoryMapping::get_slice_mut` logs a warning and hands out
/// a scratch buffer instead, so that data does not reach the file.
pub struct FileStream {
    /// Memory mapping that receives the contents of the playback buffers.
    memory_mapping: AudioMemoryMapping,
    /// Byte offset into the mapping at which the next playback buffer starts.
    /// Shared with `buffer_drop`, which advances it when a buffer is committed.
    offset: Arc<AtomicUsize>,
    /// Number of bytes in a single audio frame.
    frame_size: usize,
    /// Length of each playback buffer in bytes.
    buffer_mem_length: usize,

    /// Time between two consecutive playback buffers.
    interval_ms: Duration,
    /// Elapsed time since `start_time` at which the next buffer is due.
    next_frame: Duration,
    /// Instant of the first `next_playback_buffer` call; `None` until then.
    start_time: Option<Instant>,
    /// Commit handler handed to every `AsyncPlaybackBuffer` created by this stream.
    buffer_drop: FileStreamBufferCommit,
}
68
69impl FileStream {
70    fn new(
71        memory_mapping: AudioMemoryMapping,
72        offset: Arc<AtomicUsize>,
73        frame_size: usize,
74        buffer_mem_length: usize,
75        interval_ms: Duration,
76    ) -> Self {
77        let max_offset = memory_mapping.size();
78        FileStream {
79            memory_mapping,
80            offset: offset.clone(),
81            frame_size,
82            buffer_mem_length,
83
84            interval_ms,
85            next_frame: interval_ms,
86            start_time: None,
87            buffer_drop: FileStreamBufferCommit {
88                frame_size,
89                offset,
90                max_offset,
91            },
92        }
93    }
94}
95
96#[async_trait(?Send)]
97impl AsyncPlaybackBufferStream for FileStream {
98    async fn next_playback_buffer<'a>(
99        &'a mut self,
100        ex: &dyn AudioStreamsExecutor,
101    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
102        if let Some(start_time) = self.start_time {
103            let elapsed = start_time.elapsed();
104            if elapsed < self.next_frame {
105                ex.delay(self.next_frame - elapsed).await?;
106            }
107            self.next_frame += self.interval_ms;
108        } else {
109            self.start_time = Some(Instant::now());
110            self.next_frame = self.interval_ms;
111        }
112
113        let offset = self.offset.load(Ordering::Relaxed);
114        let buffer = self
115            .memory_mapping
116            .get_slice_mut(offset, self.buffer_mem_length);
117
118        Ok(AsyncPlaybackBuffer::new(
119            self.frame_size,
120            buffer,
121            &mut self.buffer_drop,
122        )?)
123    }
124}
125
/// Stream source that creates playback streams writing into `file`.
struct FileStreamSource {
    /// File that backs the memory mapping of every stream created here.
    file: File,
    /// Number of bytes of `file` to map.
    file_size: usize,
    /// Write offset shared with the streams created from this source.
    offset: Arc<AtomicUsize>,
}

impl FileStreamSource {
    /// Bundles the output file, its mapped size and the shared write offset.
    fn new(file: File, file_size: usize, offset: Arc<AtomicUsize>) -> Self {
        Self {
            offset,
            file_size,
            file,
        }
    }
}
141
142impl StreamSource for FileStreamSource {
143    fn new_async_playback_stream(
144        &mut self,
145        num_channels: usize,
146        format: SampleFormat,
147        frame_rate: u32,
148        buffer_size: usize,
149        _ex: &dyn AudioStreamsExecutor,
150    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
151        let memory_mapping = MemoryMappingBuilder::new(self.file_size)
152            .from_file(&self.file)
153            .build()
154            .map_err(Error::BuildMemoryMapping)?;
155
156        let frame_size = format.sample_bytes() * num_channels;
157        let buffer_mem_length = buffer_size * frame_size;
158        let memory_mapping = AudioMemoryMapping::new(memory_mapping, buffer_mem_length);
159        let interval_ms = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
160        Ok((
161            Box::new(NoopStreamControl::new()),
162            Box::new(FileStream::new(
163                memory_mapping,
164                self.offset.clone(),
165                frame_size,
166                buffer_mem_length,
167                interval_ms,
168            )),
169        ))
170    }
171
172    fn new_playback_stream(
173        &mut self,
174        _num_channels: usize,
175        _format: SampleFormat,
176        _frame_rate: u32,
177        _buffer_size: usize,
178    ) -> Result<
179        (
180            Box<dyn StreamControl>,
181            Box<dyn audio_streams::PlaybackBufferStream>,
182        ),
183        BoxError,
184    > {
185        Err(Box::new(Error::Unimplemented))
186    }
187}
188
/// `FileStreamSourceGenerator` implements [`StreamSourceGenerator`] and
/// produces `FileStreamSource`s that all write to the same file.
pub struct FileStreamSourceGenerator {
    /// File that the playback buffers are written to.
    file: File,
    /// Size of the output file in bytes.
    file_size: usize,
    /// Number of bytes written to the file so far; shared with every
    /// generated source.
    offset: Arc<AtomicUsize>,
}

impl FileStreamSourceGenerator {
    /// Creates a new `FileStreamSourceGenerator` whose write offset starts at zero.
    /// It expects `file` to already have `file_size` bytes allocated.
    ///
    /// # Arguments
    ///
    /// * `file` - The file where the audio playback buffers will be written.
    /// * `file_size` - The number of bytes allocated for `file`.
    pub fn new(file: File, file_size: usize) -> Self {
        let offset = Arc::new(AtomicUsize::new(0));
        Self {
            file,
            file_size,
            offset,
        }
    }
}
216
217impl StreamSourceGenerator for FileStreamSourceGenerator {
218    fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
219        Ok(Box::new(FileStreamSource::new(
220            self.file.try_clone().map_err(Error::Clone)?,
221            self.file_size,
222            self.offset.clone(),
223        )))
224    }
225}
226
227struct FileStreamBufferCommit {
228    frame_size: usize,
229    offset: Arc<AtomicUsize>,
230    max_offset: usize,
231}
232
233#[async_trait(?Send)]
234impl AsyncBufferCommit for FileStreamBufferCommit {
235    async fn commit(&mut self, nwritten: usize) {
236        let written_bytes = nwritten * self.frame_size;
237        if self.offset.load(Ordering::Relaxed) + written_bytes < self.max_offset {
238            self.offset.fetch_add(written_bytes, Ordering::Relaxed);
239        }
240    }
241}
242
243struct AudioMemoryMapping {
244    memory_mapping: MemoryMapping,
245    zero_buffer: Vec<u8>,
246}
247
248impl AudioMemoryMapping {
249    fn new(memory_mapping: MemoryMapping, buffer_mem_length: usize) -> Self {
250        AudioMemoryMapping {
251            memory_mapping,
252            zero_buffer: vec![0; buffer_mem_length],
253        }
254    }
255
256    fn get_slice_mut(&mut self, offset: usize, len: usize) -> &mut [u8] {
257        if offset + len >= self.memory_mapping.size() {
258            warn!("Accessing unallocated region");
259            return &mut self.zero_buffer;
260        }
261        // SAFETY:
262        // safe because the region returned is owned by self.memory_mapping
263        unsafe { slice::from_raw_parts_mut(self.memory_mapping.as_ptr().add(offset), len) }
264    }
265
266    fn size(&self) -> usize {
267        self.memory_mapping.size()
268    }
269}