1use std::fs::File;
6use std::io::Error as IOError;
7use std::slice;
8use std::sync::atomic::AtomicUsize;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::Instant;
13
14use async_trait::async_trait;
15use audio_streams::AsyncBufferCommit;
16use audio_streams::AsyncPlaybackBuffer;
17use audio_streams::AsyncPlaybackBufferStream;
18use audio_streams::AudioStreamsExecutor;
19use audio_streams::BoxError;
20use audio_streams::NoopStreamControl;
21use audio_streams::SampleFormat;
22use audio_streams::StreamControl;
23use audio_streams::StreamSource;
24use audio_streams::StreamSourceGenerator;
25use base::warn;
26use base::MappedRegion;
27use base::MemoryMapping;
28use base::MemoryMappingBuilder;
29use base::MmapError;
30use thiserror::Error as ThisError;
31
32#[derive(ThisError, Debug)]
33pub enum Error {
34 #[error("Failed to build memory mapping: {0}")]
35 BuildMemoryMapping(MmapError),
36 #[error("Failed to clone file descriptor: {0}")]
37 Clone(IOError),
38 #[error("Not implemented")]
39 Unimplemented,
40}
41
42pub struct FileStream {
50 memory_mapping: AudioMemoryMapping,
52 offset: Arc<AtomicUsize>,
54 frame_size: usize,
56 buffer_mem_length: usize,
58
59 interval_ms: Duration,
61 next_frame: Duration,
63 start_time: Option<Instant>,
65 buffer_drop: FileStreamBufferCommit,
67}
68
69impl FileStream {
70 fn new(
71 memory_mapping: AudioMemoryMapping,
72 offset: Arc<AtomicUsize>,
73 frame_size: usize,
74 buffer_mem_length: usize,
75 interval_ms: Duration,
76 ) -> Self {
77 let max_offset = memory_mapping.size();
78 FileStream {
79 memory_mapping,
80 offset: offset.clone(),
81 frame_size,
82 buffer_mem_length,
83
84 interval_ms,
85 next_frame: interval_ms,
86 start_time: None,
87 buffer_drop: FileStreamBufferCommit {
88 frame_size,
89 offset,
90 max_offset,
91 },
92 }
93 }
94}
95
96#[async_trait(?Send)]
97impl AsyncPlaybackBufferStream for FileStream {
98 async fn next_playback_buffer<'a>(
99 &'a mut self,
100 ex: &dyn AudioStreamsExecutor,
101 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
102 if let Some(start_time) = self.start_time {
103 let elapsed = start_time.elapsed();
104 if elapsed < self.next_frame {
105 ex.delay(self.next_frame - elapsed).await?;
106 }
107 self.next_frame += self.interval_ms;
108 } else {
109 self.start_time = Some(Instant::now());
110 self.next_frame = self.interval_ms;
111 }
112
113 let offset = self.offset.load(Ordering::Relaxed);
114 let buffer = self
115 .memory_mapping
116 .get_slice_mut(offset, self.buffer_mem_length);
117
118 Ok(AsyncPlaybackBuffer::new(
119 self.frame_size,
120 buffer,
121 &mut self.buffer_drop,
122 )?)
123 }
124}
125
126struct FileStreamSource {
127 file: File,
128 file_size: usize,
129 offset: Arc<AtomicUsize>,
130}
131
132impl FileStreamSource {
133 fn new(file: File, file_size: usize, offset: Arc<AtomicUsize>) -> Self {
134 FileStreamSource {
135 file,
136 file_size,
137 offset,
138 }
139 }
140}
141
142impl StreamSource for FileStreamSource {
143 fn new_async_playback_stream(
144 &mut self,
145 num_channels: usize,
146 format: SampleFormat,
147 frame_rate: u32,
148 buffer_size: usize,
149 _ex: &dyn AudioStreamsExecutor,
150 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
151 let memory_mapping = MemoryMappingBuilder::new(self.file_size)
152 .from_file(&self.file)
153 .build()
154 .map_err(Error::BuildMemoryMapping)?;
155
156 let frame_size = format.sample_bytes() * num_channels;
157 let buffer_mem_length = buffer_size * frame_size;
158 let memory_mapping = AudioMemoryMapping::new(memory_mapping, buffer_mem_length);
159 let interval_ms = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
160 Ok((
161 Box::new(NoopStreamControl::new()),
162 Box::new(FileStream::new(
163 memory_mapping,
164 self.offset.clone(),
165 frame_size,
166 buffer_mem_length,
167 interval_ms,
168 )),
169 ))
170 }
171
172 fn new_playback_stream(
173 &mut self,
174 _num_channels: usize,
175 _format: SampleFormat,
176 _frame_rate: u32,
177 _buffer_size: usize,
178 ) -> Result<
179 (
180 Box<dyn StreamControl>,
181 Box<dyn audio_streams::PlaybackBufferStream>,
182 ),
183 BoxError,
184 > {
185 Err(Box::new(Error::Unimplemented))
186 }
187}
188
189pub struct FileStreamSourceGenerator {
192 file: File,
194 file_size: usize,
196 offset: Arc<AtomicUsize>,
198}
199
200impl FileStreamSourceGenerator {
201 pub fn new(file: File, file_size: usize) -> Self {
209 FileStreamSourceGenerator {
210 file,
211 file_size,
212 offset: Arc::new(AtomicUsize::new(0)),
213 }
214 }
215}
216
217impl StreamSourceGenerator for FileStreamSourceGenerator {
218 fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
219 Ok(Box::new(FileStreamSource::new(
220 self.file.try_clone().map_err(Error::Clone)?,
221 self.file_size,
222 self.offset.clone(),
223 )))
224 }
225}
226
227struct FileStreamBufferCommit {
228 frame_size: usize,
229 offset: Arc<AtomicUsize>,
230 max_offset: usize,
231}
232
233#[async_trait(?Send)]
234impl AsyncBufferCommit for FileStreamBufferCommit {
235 async fn commit(&mut self, nwritten: usize) {
236 let written_bytes = nwritten * self.frame_size;
237 if self.offset.load(Ordering::Relaxed) + written_bytes < self.max_offset {
238 self.offset.fetch_add(written_bytes, Ordering::Relaxed);
239 }
240 }
241}
242
243struct AudioMemoryMapping {
244 memory_mapping: MemoryMapping,
245 zero_buffer: Vec<u8>,
246}
247
248impl AudioMemoryMapping {
249 fn new(memory_mapping: MemoryMapping, buffer_mem_length: usize) -> Self {
250 AudioMemoryMapping {
251 memory_mapping,
252 zero_buffer: vec![0; buffer_mem_length],
253 }
254 }
255
256 fn get_slice_mut(&mut self, offset: usize, len: usize) -> &mut [u8] {
257 if offset + len >= self.memory_mapping.size() {
258 warn!("Accessing unallocated region");
259 return &mut self.zero_buffer;
260 }
261 unsafe { slice::from_raw_parts_mut(self.memory_mapping.as_ptr().add(offset), len) }
264 }
265
266 fn size(&self) -> usize {
267 self.memory_mapping.size()
268 }
269}