cros_async/sys/linux/
uring_source.rs

1// Copyright 2020 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::ops::Deref;
6use std::ops::DerefMut;
7use std::sync::Arc;
8
9use base::sys::FallocateMode;
10use base::AsRawDescriptor;
11
12use super::uring_executor::RegisteredSource;
13use super::uring_executor::Result;
14use super::uring_executor::UringReactor;
15use crate::common_executor::RawExecutor;
16use crate::mem::BackingMemory;
17use crate::mem::MemRegion;
18use crate::mem::VecIoWrapper;
19use crate::AsyncResult;
20
21/// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
22/// registering an IO source with the uring that provides an `IoSource` implementation.
23pub struct UringSource<F: AsRawDescriptor> {
24    registered_source: RegisteredSource,
25    source: F,
26}
27
28impl<F: AsRawDescriptor> UringSource<F> {
29    /// Creates a new `UringSource` that wraps the given `io_source` object.
30    pub fn new(io_source: F, ex: &Arc<RawExecutor<UringReactor>>) -> Result<UringSource<F>> {
31        let r = ex.reactor.register_source(ex, &io_source)?;
32        Ok(UringSource {
33            registered_source: r,
34            source: io_source,
35        })
36    }
37
38    /// Reads from the iosource at `file_offset` and fill the given `vec`.
39    pub async fn read_to_vec(
40        &self,
41        file_offset: Option<u64>,
42        vec: Vec<u8>,
43    ) -> AsyncResult<(usize, Vec<u8>)> {
44        let buf = Arc::new(VecIoWrapper::from(vec));
45        let op = self.registered_source.start_read_to_mem(
46            file_offset,
47            buf.clone(),
48            [MemRegion {
49                offset: 0,
50                len: buf.len(),
51            }],
52        )?;
53        let len = op.await?;
54        let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
55            v.into()
56        } else {
57            panic!("too many refs on buf");
58        };
59
60        Ok((len as usize, bytes))
61    }
62
63    /// Wait for the FD of `self` to be readable.
64    pub async fn wait_readable(&self) -> AsyncResult<()> {
65        let op = self.registered_source.poll_fd_readable()?;
66        op.await?;
67        Ok(())
68    }
69
70    /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
71    pub async fn read_to_mem(
72        &self,
73        file_offset: Option<u64>,
74        mem: Arc<dyn BackingMemory + Send + Sync>,
75        mem_offsets: impl IntoIterator<Item = MemRegion>,
76    ) -> AsyncResult<usize> {
77        let op = self
78            .registered_source
79            .start_read_to_mem(file_offset, mem, mem_offsets)?;
80        let len = op.await?;
81        Ok(len as usize)
82    }
83
84    /// Writes from the given `vec` to the file starting at `file_offset`.
85    pub async fn write_from_vec(
86        &self,
87        file_offset: Option<u64>,
88        vec: Vec<u8>,
89    ) -> AsyncResult<(usize, Vec<u8>)> {
90        let buf = Arc::new(VecIoWrapper::from(vec));
91        let op = self.registered_source.start_write_from_mem(
92            file_offset,
93            buf.clone(),
94            [MemRegion {
95                offset: 0,
96                len: buf.len(),
97            }],
98        )?;
99        let len = op.await?;
100        let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
101            v.into()
102        } else {
103            panic!("too many refs on buf");
104        };
105
106        Ok((len as usize, bytes))
107    }
108
109    /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
110    pub async fn write_from_mem(
111        &self,
112        file_offset: Option<u64>,
113        mem: Arc<dyn BackingMemory + Send + Sync>,
114        mem_offsets: impl IntoIterator<Item = MemRegion>,
115    ) -> AsyncResult<usize> {
116        let op = self
117            .registered_source
118            .start_write_from_mem(file_offset, mem, mem_offsets)?;
119        let len = op.await?;
120        Ok(len as usize)
121    }
122
123    /// Deallocates the given range of a file.
124    pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
125        let op = self.registered_source.start_fallocate(
126            file_offset,
127            len,
128            FallocateMode::PunchHole.into(),
129        )?;
130        let _ = op.await?;
131        Ok(())
132    }
133
134    /// Fills the given range with zeroes.
135    pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
136        let op = self.registered_source.start_fallocate(
137            file_offset,
138            len,
139            FallocateMode::ZeroRange.into(),
140        )?;
141        let _ = op.await?;
142        Ok(())
143    }
144
145    /// Sync all completed write operations to the backing storage.
146    pub async fn fsync(&self) -> AsyncResult<()> {
147        let op = self.registered_source.start_fsync()?;
148        let _ = op.await?;
149        Ok(())
150    }
151
152    /// Sync all data of completed write operations to the backing storage. Currently, the
153    /// implementation is equivalent to fsync.
154    pub async fn fdatasync(&self) -> AsyncResult<()> {
155        // Currently io_uring does not implement fdatasync. Fall back to fsync.
156        // TODO(b/281609112): Implement real fdatasync with io_uring.
157        self.fsync().await
158    }
159
160    /// Yields the underlying IO source.
161    pub fn into_source(self) -> F {
162        self.source
163    }
164
165    /// Provides a mutable ref to the underlying IO source.
166    pub fn as_source(&self) -> &F {
167        &self.source
168    }
169
170    /// Provides a ref to the underlying IO source.
171    pub fn as_source_mut(&mut self) -> &mut F {
172        &mut self.source
173    }
174}
175
176impl<F: AsRawDescriptor> Deref for UringSource<F> {
177    type Target = F;
178
179    fn deref(&self) -> &Self::Target {
180        &self.source
181    }
182}
183
184impl<F: AsRawDescriptor> DerefMut for UringSource<F> {
185    fn deref_mut(&mut self) -> &mut Self::Target {
186        &mut self.source
187    }
188}
189
190// NOTE: Prefer adding tests to io_source.rs if not backend specific.
191#[cfg(test)]
192mod tests {
193    use std::fs::File;
194    use std::future::Future;
195    use std::pin::Pin;
196    use std::task::Context;
197    use std::task::Poll;
198    use std::task::Waker;
199
200    use sync::Mutex;
201
202    use super::super::uring_executor::is_uring_stable;
203    use super::super::UringSource;
204    use super::*;
205    use crate::sys::linux::ExecutorKindSys;
206    use crate::Executor;
207    use crate::ExecutorTrait;
208    use crate::IoSource;
209
210    async fn read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64 {
211        // Init a vec that translates to u64::max;
212        let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
213        let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
214        assert_eq!(ret, std::mem::size_of::<u64>());
215        let mut val = 0u64.to_ne_bytes();
216        val.copy_from_slice(&u64_mem);
217        u64::from_ne_bytes(val)
218    }
219
220    #[test]
221    fn event() {
222        if !is_uring_stable() {
223            return;
224        }
225
226        use base::Event;
227        use base::EventExt;
228
229        async fn write_event(ev: Event, wait: Event, ex: &Arc<RawExecutor<UringReactor>>) {
230            let wait = UringSource::new(wait, ex).unwrap();
231            ev.write_count(55).unwrap();
232            read_u64(&wait).await;
233            ev.write_count(66).unwrap();
234            read_u64(&wait).await;
235            ev.write_count(77).unwrap();
236            read_u64(&wait).await;
237        }
238
239        async fn read_events(ev: Event, signal: Event, ex: &Arc<RawExecutor<UringReactor>>) {
240            let source = UringSource::new(ev, ex).unwrap();
241            assert_eq!(read_u64(&source).await, 55);
242            signal.signal().unwrap();
243            assert_eq!(read_u64(&source).await, 66);
244            signal.signal().unwrap();
245            assert_eq!(read_u64(&source).await, 77);
246            signal.signal().unwrap();
247        }
248
249        let event = Event::new().unwrap();
250        let signal_wait = Event::new().unwrap();
251        let ex = RawExecutor::<UringReactor>::new().unwrap();
252        let write_task = write_event(
253            event.try_clone().unwrap(),
254            signal_wait.try_clone().unwrap(),
255            &ex,
256        );
257        let read_task = read_events(event, signal_wait, &ex);
258        ex.run_until(futures::future::join(read_task, write_task))
259            .unwrap();
260    }
261
262    #[test]
263    fn pend_on_pipe() {
264        if !is_uring_stable() {
265            return;
266        }
267
268        use std::io::Write;
269
270        use futures::future::Either;
271
272        async fn do_test(ex: &Arc<RawExecutor<UringReactor>>) {
273            let (read_source, mut w) = base::pipe().unwrap();
274            let source = UringSource::new(read_source, ex).unwrap();
275            let done = Box::pin(async { 5usize });
276            let pending = Box::pin(read_u64(&source));
277            match futures::future::select(pending, done).await {
278                Either::Right((5, pending)) => {
279                    // Write to the pipe so that the kernel will release the memory associated with
280                    // the uring read operation.
281                    w.write_all(&[0]).expect("failed to write to pipe");
282                    ::std::mem::drop(pending);
283                }
284                _ => panic!("unexpected select result"),
285            };
286        }
287
288        let ex = RawExecutor::<UringReactor>::new().unwrap();
289        ex.run_until(do_test(&ex)).unwrap();
290    }
291
292    #[test]
293    fn range_error() {
294        if !is_uring_stable() {
295            return;
296        }
297
298        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
299            let f = File::open("/dev/zero").unwrap();
300            let source = UringSource::new(f, ex).unwrap();
301            let v = vec![0x55u8; 64];
302            let vw = Arc::new(VecIoWrapper::from(v));
303            let ret = source
304                .read_to_mem(
305                    None,
306                    Arc::<VecIoWrapper>::clone(&vw),
307                    [MemRegion {
308                        offset: 32,
309                        len: 33,
310                    }],
311                )
312                .await;
313            assert!(ret.is_err());
314        }
315
316        let ex = RawExecutor::<UringReactor>::new().unwrap();
317        ex.run_until(go(&ex)).unwrap();
318    }
319
320    #[test]
321    fn wait_read() {
322        if !is_uring_stable() {
323            return;
324        }
325
326        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
327            let f = File::open("/dev/zero").unwrap();
328            let source = UringSource::new(f, ex).unwrap();
329            source.wait_readable().await.unwrap();
330        }
331
332        let ex = RawExecutor::<UringReactor>::new().unwrap();
333        ex.run_until(go(&ex)).unwrap();
334    }
335
336    struct State {
337        should_quit: bool,
338        waker: Option<Waker>,
339    }
340
341    impl State {
342        fn wake(&mut self) {
343            self.should_quit = true;
344            let waker = self.waker.take();
345
346            if let Some(waker) = waker {
347                waker.wake();
348            }
349        }
350    }
351
352    struct Quit {
353        state: Arc<Mutex<State>>,
354    }
355
356    impl Future for Quit {
357        type Output = ();
358
359        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
360            let mut state = self.state.lock();
361            if state.should_quit {
362                return Poll::Ready(());
363            }
364
365            state.waker = Some(cx.waker().clone());
366            Poll::Pending
367        }
368    }
369
370    #[cfg(any(target_os = "android", target_os = "linux"))]
371    #[test]
372    fn await_uring_from_poll() {
373        if !is_uring_stable() {
374            return;
375        }
376        // Start a uring operation and then await the result from an FdExecutor.
377        async fn go(source: IoSource<File>) {
378            let v = vec![0xa4u8; 16];
379            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
380            assert_eq!(len, 16);
381            assert!(vec.iter().all(|&b| b == 0));
382        }
383
384        let state = Arc::new(Mutex::new(State {
385            should_quit: false,
386            waker: None,
387        }));
388
389        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
390        let f = File::open("/dev/zero").unwrap();
391        let source = uring_ex.async_from(f).unwrap();
392
393        let quit = Quit {
394            state: state.clone(),
395        };
396        let handle = std::thread::spawn(move || uring_ex.run_until(quit));
397
398        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
399        poll_ex.run_until(go(source)).unwrap();
400
401        state.lock().wake();
402        handle.join().unwrap().unwrap();
403    }
404
405    #[cfg(any(target_os = "android", target_os = "linux"))]
406    #[test]
407    fn await_poll_from_uring() {
408        if !is_uring_stable() {
409            return;
410        }
411        // Start a poll operation and then await the result
412        async fn go(source: IoSource<File>) {
413            let v = vec![0x2cu8; 16];
414            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
415            assert_eq!(len, 16);
416            assert!(vec.iter().all(|&b| b == 0));
417        }
418
419        let state = Arc::new(Mutex::new(State {
420            should_quit: false,
421            waker: None,
422        }));
423
424        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
425        let f = File::open("/dev/zero").unwrap();
426        let source = poll_ex.async_from(f).unwrap();
427
428        let quit = Quit {
429            state: state.clone(),
430        };
431        let handle = std::thread::spawn(move || poll_ex.run_until(quit));
432
433        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
434        uring_ex.run_until(go(source)).unwrap();
435
436        state.lock().wake();
437        handle.join().unwrap().unwrap();
438    }
439}