cros_async/sys/linux/
poll_source.rs

1// Copyright 2020 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::io;
6use std::os::fd::AsRawFd;
7use std::sync::Arc;
8
9use base::handle_eintr_errno;
10use base::sys::fallocate;
11use base::sys::FallocateMode;
12use base::AsRawDescriptor;
13use base::VolatileSlice;
14use remain::sorted;
15use thiserror::Error as ThisError;
16
17use super::fd_executor;
18use super::fd_executor::EpollReactor;
19use super::fd_executor::RegisteredSource;
20use crate::common_executor::RawExecutor;
21use crate::mem::BackingMemory;
22use crate::AsyncError;
23use crate::AsyncResult;
24use crate::MemRegion;
25
26#[sorted]
27#[derive(ThisError, Debug)]
28pub enum Error {
29    /// An error occurred attempting to register a waker with the executor.
30    #[error("An error occurred attempting to register a waker with the executor: {0}.")]
31    AddingWaker(fd_executor::Error),
32    /// Failed to discard a block
33    #[error("Failed to discard a block: {0}")]
34    Discard(base::Error),
35    /// An executor error occurred.
36    #[error("An executor error occurred: {0}")]
37    Executor(fd_executor::Error),
38    /// An error occurred when executing fallocate synchronously.
39    #[error("An error occurred when executing fallocate synchronously: {0}")]
40    Fallocate(base::Error),
41    /// An error occurred when executing fdatasync synchronously.
42    #[error("An error occurred when executing fdatasync synchronously: {0}")]
43    Fdatasync(base::Error),
44    /// An error occurred when executing fsync synchronously.
45    #[error("An error occurred when executing fsync synchronously: {0}")]
46    Fsync(base::Error),
47    /// An error occurred when reading the FD.
48    #[error("An error occurred when reading the FD: {0}.")]
49    Read(base::Error),
50    /// Can't seek file.
51    #[error("An error occurred when seeking the FD: {0}.")]
52    Seeking(base::Error),
53    /// An error occurred when writing the FD.
54    #[error("An error occurred when writing the FD: {0}.")]
55    Write(base::Error),
56}
57pub type Result<T> = std::result::Result<T, Error>;
58
59impl From<Error> for io::Error {
60    fn from(e: Error) -> Self {
61        use Error::*;
62        match e {
63            AddingWaker(e) => e.into(),
64            Executor(e) => e.into(),
65            Discard(e) => e.into(),
66            Fallocate(e) => e.into(),
67            Fdatasync(e) => e.into(),
68            Fsync(e) => e.into(),
69            Read(e) => e.into(),
70            Seeking(e) => e.into(),
71            Write(e) => e.into(),
72        }
73    }
74}
75
76impl From<Error> for AsyncError {
77    fn from(e: Error) -> AsyncError {
78        AsyncError::SysVariants(e.into())
79    }
80}
81
82/// Async wrapper for an IO source that uses the FD executor to drive async operations.
83pub struct PollSource<F> {
84    registered_source: RegisteredSource<F>,
85}
86
87impl<F: AsRawDescriptor> PollSource<F> {
88    /// Create a new `PollSource` from the given IO source.
89    pub fn new(f: F, ex: &Arc<RawExecutor<EpollReactor>>) -> Result<Self> {
90        RegisteredSource::new(ex, f)
91            .map({
92                |f| PollSource {
93                    registered_source: f,
94                }
95            })
96            .map_err(Error::Executor)
97    }
98}
99
100impl<F: AsRawDescriptor> PollSource<F> {
101    /// Reads from the iosource at `file_offset` and fill the given `vec`.
102    pub async fn read_to_vec(
103        &self,
104        file_offset: Option<u64>,
105        mut vec: Vec<u8>,
106    ) -> AsyncResult<(usize, Vec<u8>)> {
107        loop {
108            let res = if let Some(offset) = file_offset {
109                // SAFETY:
110                // Safe because this will only modify `vec` and we check the return value.
111                handle_eintr_errno!(unsafe {
112                    libc::pread64(
113                        self.registered_source.duped_fd.as_raw_fd(),
114                        vec.as_mut_ptr() as *mut libc::c_void,
115                        vec.len(),
116                        offset as libc::off64_t,
117                    )
118                })
119            } else {
120                // SAFETY:
121                // Safe because this will only modify `vec` and we check the return value.
122                handle_eintr_errno!(unsafe {
123                    libc::read(
124                        self.registered_source.duped_fd.as_raw_fd(),
125                        vec.as_mut_ptr() as *mut libc::c_void,
126                        vec.len(),
127                    )
128                })
129            };
130
131            if res >= 0 {
132                return Ok((res as usize, vec));
133            }
134
135            match base::Error::last() {
136                e if e.errno() == libc::EWOULDBLOCK => {
137                    let op = self
138                        .registered_source
139                        .wait_readable()
140                        .map_err(Error::AddingWaker)?;
141                    op.await.map_err(Error::Executor)?;
142                }
143                e => return Err(Error::Read(e).into()),
144            }
145        }
146    }
147
148    /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
149    pub async fn read_to_mem(
150        &self,
151        file_offset: Option<u64>,
152        mem: Arc<dyn BackingMemory + Send + Sync>,
153        mem_offsets: impl IntoIterator<Item = MemRegion>,
154    ) -> AsyncResult<usize> {
155        let mut iovecs = mem_offsets
156            .into_iter()
157            .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
158            .collect::<Vec<VolatileSlice>>();
159
160        loop {
161            let res = if let Some(offset) = file_offset {
162                // SAFETY:
163                // Safe because we trust the kernel not to write path the length given and the
164                // length is guaranteed to be valid from the pointer by
165                // io_slice_mut.
166                handle_eintr_errno!(unsafe {
167                    libc::preadv64(
168                        self.registered_source.duped_fd.as_raw_fd(),
169                        iovecs.as_mut_ptr() as *mut _,
170                        iovecs.len() as i32,
171                        offset as libc::off64_t,
172                    )
173                })
174            } else {
175                // SAFETY:
176                // Safe because we trust the kernel not to write path the length given and the
177                // length is guaranteed to be valid from the pointer by
178                // io_slice_mut.
179                handle_eintr_errno!(unsafe {
180                    libc::readv(
181                        self.registered_source.duped_fd.as_raw_fd(),
182                        iovecs.as_mut_ptr() as *mut _,
183                        iovecs.len() as i32,
184                    )
185                })
186            };
187
188            if res >= 0 {
189                return Ok(res as usize);
190            }
191
192            match base::Error::last() {
193                e if e.errno() == libc::EWOULDBLOCK => {
194                    let op = self
195                        .registered_source
196                        .wait_readable()
197                        .map_err(Error::AddingWaker)?;
198                    op.await.map_err(Error::Executor)?;
199                }
200                e => return Err(Error::Read(e).into()),
201            }
202        }
203    }
204
205    /// Wait for the FD of `self` to be readable.
206    pub async fn wait_readable(&self) -> AsyncResult<()> {
207        let op = self
208            .registered_source
209            .wait_readable()
210            .map_err(Error::AddingWaker)?;
211        op.await.map_err(Error::Executor)?;
212        Ok(())
213    }
214
215    /// Writes from the given `vec` to the file starting at `file_offset`.
216    pub async fn write_from_vec(
217        &self,
218        file_offset: Option<u64>,
219        vec: Vec<u8>,
220    ) -> AsyncResult<(usize, Vec<u8>)> {
221        loop {
222            let res = if let Some(offset) = file_offset {
223                // SAFETY:
224                // Safe because this will not modify any memory and we check the return value.
225                handle_eintr_errno!(unsafe {
226                    libc::pwrite64(
227                        self.registered_source.duped_fd.as_raw_fd(),
228                        vec.as_ptr() as *const libc::c_void,
229                        vec.len(),
230                        offset as libc::off64_t,
231                    )
232                })
233            } else {
234                // SAFETY:
235                // Safe because this will not modify any memory and we check the return value.
236                handle_eintr_errno!(unsafe {
237                    libc::write(
238                        self.registered_source.duped_fd.as_raw_fd(),
239                        vec.as_ptr() as *const libc::c_void,
240                        vec.len(),
241                    )
242                })
243            };
244
245            if res >= 0 {
246                return Ok((res as usize, vec));
247            }
248
249            match base::Error::last() {
250                e if e.errno() == libc::EWOULDBLOCK => {
251                    let op = self
252                        .registered_source
253                        .wait_writable()
254                        .map_err(Error::AddingWaker)?;
255                    op.await.map_err(Error::Executor)?;
256                }
257                e => return Err(Error::Write(e).into()),
258            }
259        }
260    }
261
262    /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
263    pub async fn write_from_mem(
264        &self,
265        file_offset: Option<u64>,
266        mem: Arc<dyn BackingMemory + Send + Sync>,
267        mem_offsets: impl IntoIterator<Item = MemRegion>,
268    ) -> AsyncResult<usize> {
269        let iovecs = mem_offsets
270            .into_iter()
271            .map(|mem_range| mem.get_volatile_slice(mem_range))
272            .filter_map(|r| r.ok())
273            .collect::<Vec<VolatileSlice>>();
274
275        loop {
276            let res = if let Some(offset) = file_offset {
277                // SAFETY:
278                // Safe because we trust the kernel not to write path the length given and the
279                // length is guaranteed to be valid from the pointer by
280                // io_slice_mut.
281                handle_eintr_errno!(unsafe {
282                    libc::pwritev64(
283                        self.registered_source.duped_fd.as_raw_fd(),
284                        iovecs.as_ptr() as *mut _,
285                        iovecs.len() as i32,
286                        offset as libc::off64_t,
287                    )
288                })
289            } else {
290                // SAFETY:
291                // Safe because we trust the kernel not to write path the length given and the
292                // length is guaranteed to be valid from the pointer by
293                // io_slice_mut.
294                handle_eintr_errno!(unsafe {
295                    libc::writev(
296                        self.registered_source.duped_fd.as_raw_fd(),
297                        iovecs.as_ptr() as *mut _,
298                        iovecs.len() as i32,
299                    )
300                })
301            };
302
303            if res >= 0 {
304                return Ok(res as usize);
305            }
306
307            match base::Error::last() {
308                e if e.errno() == libc::EWOULDBLOCK => {
309                    let op = self
310                        .registered_source
311                        .wait_writable()
312                        .map_err(Error::AddingWaker)?;
313                    op.await.map_err(Error::Executor)?;
314                }
315                e => return Err(Error::Write(e).into()),
316            }
317        }
318    }
319
320    /// # Safety
321    ///
322    /// Sync all completed write operations to the backing storage.
323    pub async fn fsync(&self) -> AsyncResult<()> {
324        // SAFETY: the duped_fd is valid and return value is checked.
325        let ret = handle_eintr_errno!(unsafe {
326            libc::fsync(self.registered_source.duped_fd.as_raw_fd())
327        });
328        if ret == 0 {
329            Ok(())
330        } else {
331            Err(Error::Fsync(base::Error::last()).into())
332        }
333    }
334
335    /// punch_hole
336    pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
337        Ok(fallocate(
338            &self.registered_source.duped_fd,
339            FallocateMode::PunchHole,
340            file_offset,
341            len,
342        )
343        .map_err(Error::Fallocate)?)
344    }
345
346    /// write_zeroes_at
347    pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
348        Ok(fallocate(
349            &self.registered_source.duped_fd,
350            FallocateMode::ZeroRange,
351            file_offset,
352            len,
353        )
354        .map_err(Error::Fallocate)?)
355    }
356
357    /// Sync all data of completed write operations to the backing storage, avoiding updating extra
358    /// metadata.
359    pub async fn fdatasync(&self) -> AsyncResult<()> {
360        // SAFETY: the duped_fd is valid and return value is checked.
361        let ret = handle_eintr_errno!(unsafe {
362            libc::fdatasync(self.registered_source.duped_fd.as_raw_fd())
363        });
364        if ret == 0 {
365            Ok(())
366        } else {
367            Err(Error::Fdatasync(base::Error::last()).into())
368        }
369    }
370
371    /// Yields the underlying IO source.
372    pub fn into_source(self) -> F {
373        self.registered_source.source
374    }
375
376    /// Provides a mutable ref to the underlying IO source.
377    pub fn as_source_mut(&mut self) -> &mut F {
378        &mut self.registered_source.source
379    }
380
381    /// Provides a ref to the underlying IO source.
382    pub fn as_source(&self) -> &F {
383        &self.registered_source.source
384    }
385}
386
387// NOTE: Prefer adding tests to io_source.rs if not backend specific.
388#[cfg(test)]
389mod tests {
390    use std::fs::File;
391
392    use super::*;
393    use crate::ExecutorTrait;
394
395    #[test]
396    fn memory_leak() {
397        // This test needs to run under ASAN to detect memory leaks.
398
399        async fn owns_poll_source(source: PollSource<File>) {
400            let _ = source.wait_readable().await;
401        }
402
403        let (rx, _tx) = base::pipe().unwrap();
404        let ex = RawExecutor::<EpollReactor>::new().unwrap();
405        let source = PollSource::new(rx, &ex).unwrap();
406        ex.spawn_local(owns_poll_source(source)).detach();
407
408        // Drop `ex` without running. This would cause a memory leak if PollSource owned a strong
409        // reference to the executor because it owns a reference to the future that owns PollSource
410        // (via its Runnable). The strong reference prevents the drop impl from running, which would
411        // otherwise poll the future and have it return with an error.
412    }
413}