use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;

use base::sys::FallocateMode;
use base::AsRawDescriptor;

use super::uring_executor::RegisteredSource;
use super::uring_executor::Result;
use super::uring_executor::UringReactor;
use crate::common_executor::RawExecutor;
use crate::mem::BackingMemory;
use crate::mem::MemRegion;
use crate::mem::VecIoWrapper;
use crate::AsyncResult;

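/// `UringSource` wraps a descriptor-backed IO source and registers it with the executor's
/// `UringReactor` so that reads, writes, fallocate, and fsync requests can be submitted through
/// io_uring.
///
/// A minimal usage sketch (illustrative only, not a compiled doctest; it assumes an executor and a
/// readable file are already available):
///
/// ```ignore
/// let ex = RawExecutor::<UringReactor>::new().unwrap();
/// let file = std::fs::File::open("/dev/zero").unwrap();
/// let source = UringSource::new(file, &ex).unwrap();
/// let (len, _buf) = ex
///     .run_until(source.read_to_vec(None, vec![0u8; 16]))
///     .unwrap()
///     .unwrap();
/// assert_eq!(len, 16);
/// ```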
pub struct UringSource<F: AsRawDescriptor> {
    registered_source: RegisteredSource,
    source: F,
}

impl<F: AsRawDescriptor> UringSource<F> {
    /// Creates a new `UringSource` that wraps the given `io_source` object.
    pub fn new(io_source: F, ex: &Arc<RawExecutor<UringReactor>>) -> Result<UringSource<F>> {
        let r = ex.reactor.register_source(ex, &io_source)?;
        Ok(UringSource {
            registered_source: r,
            source: io_source,
        })
    }

    /// Reads from the source at `file_offset` and fills the given `vec`, returning the number of
    /// bytes read along with the vec.
    pub async fn read_to_vec(
        &self,
        file_offset: Option<u64>,
        vec: Vec<u8>,
    ) -> AsyncResult<(usize, Vec<u8>)> {
        let buf = Arc::new(VecIoWrapper::from(vec));
        let op = self.registered_source.start_read_to_mem(
            file_offset,
            buf.clone(),
            [MemRegion {
                offset: 0,
                len: buf.len(),
            }],
        )?;
        let len = op.await?;
        let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
            v.into()
        } else {
            panic!("too many refs on buf");
        };

        Ok((len as usize, bytes))
    }

    /// Waits for the source to become readable.
    pub async fn wait_readable(&self) -> AsyncResult<()> {
        let op = self.registered_source.poll_fd_readable()?;
        op.await?;
        Ok(())
    }

    /// Reads from the source at `file_offset` into the regions of `mem` given by `mem_offsets`.
    pub async fn read_to_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        mem_offsets: impl IntoIterator<Item = MemRegion>,
    ) -> AsyncResult<usize> {
        let op = self
            .registered_source
            .start_read_to_mem(file_offset, mem, mem_offsets)?;
        let len = op.await?;
        Ok(len as usize)
    }

    /// Writes the given `vec` to the source starting at `file_offset`, returning the number of
    /// bytes written along with the vec.
    pub async fn write_from_vec(
        &self,
        file_offset: Option<u64>,
        vec: Vec<u8>,
    ) -> AsyncResult<(usize, Vec<u8>)> {
        let buf = Arc::new(VecIoWrapper::from(vec));
        let op = self.registered_source.start_write_from_mem(
            file_offset,
            buf.clone(),
            [MemRegion {
                offset: 0,
                len: buf.len(),
            }],
        )?;
        let len = op.await?;
        let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
            v.into()
        } else {
            panic!("too many refs on buf");
        };

        Ok((len as usize, bytes))
    }

    /// Writes the regions of `mem` given by `mem_offsets` to the source starting at `file_offset`.
    pub async fn write_from_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        mem_offsets: impl IntoIterator<Item = MemRegion>,
    ) -> AsyncResult<usize> {
        let op = self
            .registered_source
            .start_write_from_mem(file_offset, mem, mem_offsets)?;
        let len = op.await?;
        Ok(len as usize)
    }

    /// Deallocates the given range of the file.
    pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
        let op = self.registered_source.start_fallocate(
            file_offset,
            len,
            FallocateMode::PunchHole.into(),
        )?;
        let _ = op.await?;
        Ok(())
    }

    /// Fills the given range of the file with zeroes.
    pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
        let op = self.registered_source.start_fallocate(
            file_offset,
            len,
            FallocateMode::ZeroRange.into(),
        )?;
        let _ = op.await?;
        Ok(())
    }

    /// Syncs all completed write operations to the backing storage.
    pub async fn fsync(&self) -> AsyncResult<()> {
        let op = self.registered_source.start_fsync()?;
        let _ = op.await?;
        Ok(())
    }

    /// Syncs the data of all completed write operations to the backing storage. Currently this
    /// simply falls back to a full `fsync`.
    pub async fn fdatasync(&self) -> AsyncResult<()> {
        // There is no separate fdatasync path here; fall back to fsync.
        self.fsync().await
    }

    /// Yields the underlying IO source.
    pub fn into_source(self) -> F {
        self.source
    }

    /// Provides a ref to the underlying IO source.
    pub fn as_source(&self) -> &F {
        &self.source
    }

    /// Provides a mutable ref to the underlying IO source.
    pub fn as_source_mut(&mut self) -> &mut F {
        &mut self.source
    }
}

impl<F: AsRawDescriptor> Deref for UringSource<F> {
    type Target = F;

    fn deref(&self) -> &Self::Target {
        &self.source
    }
}

impl<F: AsRawDescriptor> DerefMut for UringSource<F> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.source
    }
}

#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::Waker;

    use sync::Mutex;

    use super::super::uring_executor::is_uring_stable;
    use super::super::UringSource;
    use super::*;
    use crate::sys::linux::ExecutorKindSys;
    use crate::Executor;
    use crate::ExecutorTrait;
    use crate::IoSource;

    async fn read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64 {
        // Initialize the vec with a non-zero pattern so the read visibly overwrites it.
        let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
        let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
        assert_eq!(ret, std::mem::size_of::<u64>());
        let mut val = 0u64.to_ne_bytes();
        val.copy_from_slice(&u64_mem);
        u64::from_ne_bytes(val)
    }

    #[test]
    fn event() {
        if !is_uring_stable() {
            return;
        }

        use base::Event;
        use base::EventExt;

        async fn write_event(ev: Event, wait: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let wait = UringSource::new(wait, ex).unwrap();
            ev.write_count(55).unwrap();
            read_u64(&wait).await;
            ev.write_count(66).unwrap();
            read_u64(&wait).await;
            ev.write_count(77).unwrap();
            read_u64(&wait).await;
        }

        async fn read_events(ev: Event, signal: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let source = UringSource::new(ev, ex).unwrap();
            assert_eq!(read_u64(&source).await, 55);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 66);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 77);
            signal.signal().unwrap();
        }

        let event = Event::new().unwrap();
        let signal_wait = Event::new().unwrap();
        let ex = RawExecutor::<UringReactor>::new().unwrap();
        let write_task = write_event(
            event.try_clone().unwrap(),
            signal_wait.try_clone().unwrap(),
            &ex,
        );
        let read_task = read_events(event, signal_wait, &ex);
        ex.run_until(futures::future::join(read_task, write_task))
            .unwrap();
    }

    #[test]
    fn pend_on_pipe() {
        if !is_uring_stable() {
            return;
        }

        use std::io::Write;

        use futures::future::Either;

        async fn do_test(ex: &Arc<RawExecutor<UringReactor>>) {
            let (read_source, mut w) = base::pipe().unwrap();
            let source = UringSource::new(read_source, ex).unwrap();
            let done = Box::pin(async { 5usize });
            let pending = Box::pin(read_u64(&source));
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => {
                    // Write to the pipe so the outstanding uring read can complete, then drop the
                    // still-pending future.
                    w.write_all(&[0]).expect("failed to write to pipe");
                    ::std::mem::drop(pending);
                }
                _ => panic!("unexpected select result"),
            };
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();
    }
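
    // A minimal round-trip sketch of the write path (not part of the original test suite): it
    // assumes `base::pipe()` returns a connected (read, write) pair of `File`s, as used in
    // `pend_on_pipe` above, writes a small buffer through a `UringSource` wrapping the write end,
    // and reads it back through a second `UringSource` wrapping the read end.
    #[test]
    fn pipe_write_read_roundtrip() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let (r, w) = base::pipe().unwrap();
            let writer = UringSource::new(w, ex).unwrap();
            let reader = UringSource::new(r, ex).unwrap();

            let (written, data) = writer.write_from_vec(None, vec![0xaau8; 16]).await.unwrap();
            assert_eq!(written, data.len());

            let (read, buf) = reader.read_to_vec(None, vec![0u8; 16]).await.unwrap();
            assert_eq!(read, 16);
            assert!(buf.iter().all(|&b| b == 0xaa));
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }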

    #[test]
    fn range_error() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let v = vec![0x55u8; 64];
            let vw = Arc::new(VecIoWrapper::from(v));
            // The region (offset 32, len 33) extends one byte past the 64-byte buffer, so the
            // read must fail.
            let ret = source
                .read_to_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&vw),
                    [MemRegion {
                        offset: 32,
                        len: 33,
                    }],
                )
                .await;
            assert!(ret.is_err());
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }
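
    // A companion sketch to `range_error` (not part of the original test suite): the same setup,
    // but with an in-bounds region (offset 32, len 32), so the `read_to_mem` call is expected to
    // succeed and report 32 bytes read from /dev/zero.
    #[test]
    fn range_ok() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let vw = Arc::new(VecIoWrapper::from(vec![0x55u8; 64]));
            let len = source
                .read_to_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&vw),
                    [MemRegion {
                        offset: 32,
                        len: 32,
                    }],
                )
                .await
                .unwrap();
            assert_eq!(len, 32);
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }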

    #[test]
    fn wait_read() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            source.wait_readable().await.unwrap();
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    struct State {
        should_quit: bool,
        waker: Option<Waker>,
    }

    impl State {
        fn wake(&mut self) {
            self.should_quit = true;
            let waker = self.waker.take();

            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    // A future that stays pending until `State::wake` is called; used below to keep an executor
    // running on a background thread until the test is done with it.
    struct Quit {
        state: Arc<Mutex<State>>,
    }

    impl Future for Quit {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
            let mut state = self.state.lock();
            if state.should_quit {
                return Poll::Ready(());
            }

            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    // Awaits a read on a uring-backed source from a poll (fd) based executor while the uring
    // executor runs on another thread.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_uring_from_poll() {
        if !is_uring_stable() {
            return;
        }
        async fn go(source: IoSource<File>) {
            let v = vec![0xa4u8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = uring_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || uring_ex.run_until(quit));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        poll_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }

    // The reverse of the test above: awaits a read on a poll (fd) backed source from the uring
    // executor while the poll executor runs on another thread.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_poll_from_uring() {
        if !is_uring_stable() {
            return;
        }
        async fn go(source: IoSource<File>) {
            let v = vec![0x2cu8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = poll_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || poll_ex.run_until(quit));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        uring_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }
}