1use std::io;
6use std::os::fd::AsRawFd;
7use std::sync::Arc;
8
9use base::handle_eintr_errno;
10use base::sys::fallocate;
11use base::sys::FallocateMode;
12use base::AsRawDescriptor;
13use base::VolatileSlice;
14use remain::sorted;
15use thiserror::Error as ThisError;
16
17use super::fd_executor;
18use super::fd_executor::EpollReactor;
19use super::fd_executor::RegisteredSource;
20use crate::common_executor::RawExecutor;
21use crate::mem::BackingMemory;
22use crate::AsyncError;
23use crate::AsyncResult;
24use crate::MemRegion;
25
26#[sorted]
27#[derive(ThisError, Debug)]
28pub enum Error {
29 #[error("An error occurred attempting to register a waker with the executor: {0}.")]
31 AddingWaker(fd_executor::Error),
32 #[error("Failed to discard a block: {0}")]
34 Discard(base::Error),
35 #[error("An executor error occurred: {0}")]
37 Executor(fd_executor::Error),
38 #[error("An error occurred when executing fallocate synchronously: {0}")]
40 Fallocate(base::Error),
41 #[error("An error occurred when executing fdatasync synchronously: {0}")]
43 Fdatasync(base::Error),
44 #[error("An error occurred when executing fsync synchronously: {0}")]
46 Fsync(base::Error),
47 #[error("An error occurred when reading the FD: {0}.")]
49 Read(base::Error),
50 #[error("An error occurred when seeking the FD: {0}.")]
52 Seeking(base::Error),
53 #[error("An error occurred when writing the FD: {0}.")]
55 Write(base::Error),
56}
57pub type Result<T> = std::result::Result<T, Error>;
58
59impl From<Error> for io::Error {
60 fn from(e: Error) -> Self {
61 use Error::*;
62 match e {
63 AddingWaker(e) => e.into(),
64 Executor(e) => e.into(),
65 Discard(e) => e.into(),
66 Fallocate(e) => e.into(),
67 Fdatasync(e) => e.into(),
68 Fsync(e) => e.into(),
69 Read(e) => e.into(),
70 Seeking(e) => e.into(),
71 Write(e) => e.into(),
72 }
73 }
74}
75
76impl From<Error> for AsyncError {
77 fn from(e: Error) -> AsyncError {
78 AsyncError::SysVariants(e.into())
79 }
80}
81
82pub struct PollSource<F> {
84 registered_source: RegisteredSource<F>,
85}
86
87impl<F: AsRawDescriptor> PollSource<F> {
88 pub fn new(f: F, ex: &Arc<RawExecutor<EpollReactor>>) -> Result<Self> {
90 RegisteredSource::new(ex, f)
91 .map({
92 |f| PollSource {
93 registered_source: f,
94 }
95 })
96 .map_err(Error::Executor)
97 }
98}
99
100impl<F: AsRawDescriptor> PollSource<F> {
101 pub async fn read_to_vec(
103 &self,
104 file_offset: Option<u64>,
105 mut vec: Vec<u8>,
106 ) -> AsyncResult<(usize, Vec<u8>)> {
107 loop {
108 let res = if let Some(offset) = file_offset {
109 handle_eintr_errno!(unsafe {
112 libc::pread64(
113 self.registered_source.duped_fd.as_raw_fd(),
114 vec.as_mut_ptr() as *mut libc::c_void,
115 vec.len(),
116 offset as libc::off64_t,
117 )
118 })
119 } else {
120 handle_eintr_errno!(unsafe {
123 libc::read(
124 self.registered_source.duped_fd.as_raw_fd(),
125 vec.as_mut_ptr() as *mut libc::c_void,
126 vec.len(),
127 )
128 })
129 };
130
131 if res >= 0 {
132 return Ok((res as usize, vec));
133 }
134
135 match base::Error::last() {
136 e if e.errno() == libc::EWOULDBLOCK => {
137 let op = self
138 .registered_source
139 .wait_readable()
140 .map_err(Error::AddingWaker)?;
141 op.await.map_err(Error::Executor)?;
142 }
143 e => return Err(Error::Read(e).into()),
144 }
145 }
146 }
147
148 pub async fn read_to_mem(
150 &self,
151 file_offset: Option<u64>,
152 mem: Arc<dyn BackingMemory + Send + Sync>,
153 mem_offsets: impl IntoIterator<Item = MemRegion>,
154 ) -> AsyncResult<usize> {
155 let mut iovecs = mem_offsets
156 .into_iter()
157 .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
158 .collect::<Vec<VolatileSlice>>();
159
160 loop {
161 let res = if let Some(offset) = file_offset {
162 handle_eintr_errno!(unsafe {
167 libc::preadv64(
168 self.registered_source.duped_fd.as_raw_fd(),
169 iovecs.as_mut_ptr() as *mut _,
170 iovecs.len() as i32,
171 offset as libc::off64_t,
172 )
173 })
174 } else {
175 handle_eintr_errno!(unsafe {
180 libc::readv(
181 self.registered_source.duped_fd.as_raw_fd(),
182 iovecs.as_mut_ptr() as *mut _,
183 iovecs.len() as i32,
184 )
185 })
186 };
187
188 if res >= 0 {
189 return Ok(res as usize);
190 }
191
192 match base::Error::last() {
193 e if e.errno() == libc::EWOULDBLOCK => {
194 let op = self
195 .registered_source
196 .wait_readable()
197 .map_err(Error::AddingWaker)?;
198 op.await.map_err(Error::Executor)?;
199 }
200 e => return Err(Error::Read(e).into()),
201 }
202 }
203 }
204
205 pub async fn wait_readable(&self) -> AsyncResult<()> {
207 let op = self
208 .registered_source
209 .wait_readable()
210 .map_err(Error::AddingWaker)?;
211 op.await.map_err(Error::Executor)?;
212 Ok(())
213 }
214
215 pub async fn write_from_vec(
217 &self,
218 file_offset: Option<u64>,
219 vec: Vec<u8>,
220 ) -> AsyncResult<(usize, Vec<u8>)> {
221 loop {
222 let res = if let Some(offset) = file_offset {
223 handle_eintr_errno!(unsafe {
226 libc::pwrite64(
227 self.registered_source.duped_fd.as_raw_fd(),
228 vec.as_ptr() as *const libc::c_void,
229 vec.len(),
230 offset as libc::off64_t,
231 )
232 })
233 } else {
234 handle_eintr_errno!(unsafe {
237 libc::write(
238 self.registered_source.duped_fd.as_raw_fd(),
239 vec.as_ptr() as *const libc::c_void,
240 vec.len(),
241 )
242 })
243 };
244
245 if res >= 0 {
246 return Ok((res as usize, vec));
247 }
248
249 match base::Error::last() {
250 e if e.errno() == libc::EWOULDBLOCK => {
251 let op = self
252 .registered_source
253 .wait_writable()
254 .map_err(Error::AddingWaker)?;
255 op.await.map_err(Error::Executor)?;
256 }
257 e => return Err(Error::Write(e).into()),
258 }
259 }
260 }
261
262 pub async fn write_from_mem(
264 &self,
265 file_offset: Option<u64>,
266 mem: Arc<dyn BackingMemory + Send + Sync>,
267 mem_offsets: impl IntoIterator<Item = MemRegion>,
268 ) -> AsyncResult<usize> {
269 let iovecs = mem_offsets
270 .into_iter()
271 .map(|mem_range| mem.get_volatile_slice(mem_range))
272 .filter_map(|r| r.ok())
273 .collect::<Vec<VolatileSlice>>();
274
275 loop {
276 let res = if let Some(offset) = file_offset {
277 handle_eintr_errno!(unsafe {
282 libc::pwritev64(
283 self.registered_source.duped_fd.as_raw_fd(),
284 iovecs.as_ptr() as *mut _,
285 iovecs.len() as i32,
286 offset as libc::off64_t,
287 )
288 })
289 } else {
290 handle_eintr_errno!(unsafe {
295 libc::writev(
296 self.registered_source.duped_fd.as_raw_fd(),
297 iovecs.as_ptr() as *mut _,
298 iovecs.len() as i32,
299 )
300 })
301 };
302
303 if res >= 0 {
304 return Ok(res as usize);
305 }
306
307 match base::Error::last() {
308 e if e.errno() == libc::EWOULDBLOCK => {
309 let op = self
310 .registered_source
311 .wait_writable()
312 .map_err(Error::AddingWaker)?;
313 op.await.map_err(Error::Executor)?;
314 }
315 e => return Err(Error::Write(e).into()),
316 }
317 }
318 }
319
320 pub async fn fsync(&self) -> AsyncResult<()> {
324 let ret = handle_eintr_errno!(unsafe {
326 libc::fsync(self.registered_source.duped_fd.as_raw_fd())
327 });
328 if ret == 0 {
329 Ok(())
330 } else {
331 Err(Error::Fsync(base::Error::last()).into())
332 }
333 }
334
335 pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
337 Ok(fallocate(
338 &self.registered_source.duped_fd,
339 FallocateMode::PunchHole,
340 file_offset,
341 len,
342 )
343 .map_err(Error::Fallocate)?)
344 }
345
346 pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
348 Ok(fallocate(
349 &self.registered_source.duped_fd,
350 FallocateMode::ZeroRange,
351 file_offset,
352 len,
353 )
354 .map_err(Error::Fallocate)?)
355 }
356
357 pub async fn fdatasync(&self) -> AsyncResult<()> {
360 let ret = handle_eintr_errno!(unsafe {
362 libc::fdatasync(self.registered_source.duped_fd.as_raw_fd())
363 });
364 if ret == 0 {
365 Ok(())
366 } else {
367 Err(Error::Fdatasync(base::Error::last()).into())
368 }
369 }
370
371 pub fn into_source(self) -> F {
373 self.registered_source.source
374 }
375
376 pub fn as_source_mut(&mut self) -> &mut F {
378 &mut self.registered_source.source
379 }
380
381 pub fn as_source(&self) -> &F {
383 &self.registered_source.source
384 }
385}
386
387#[cfg(test)]
389mod tests {
390 use std::fs::File;
391
392 use super::*;
393 use crate::ExecutorTrait;
394
395 #[test]
396 fn memory_leak() {
397 async fn owns_poll_source(source: PollSource<File>) {
400 let _ = source.wait_readable().await;
401 }
402
403 let (rx, _tx) = base::pipe().unwrap();
404 let ex = RawExecutor::<EpollReactor>::new().unwrap();
405 let source = PollSource::new(rx, &ex).unwrap();
406 ex.spawn_local(owns_poll_source(source)).detach();
407
408 }
413}