1use std::convert::TryInto;
47use std::ffi::CStr;
48use std::fs::File;
49use std::future::Future;
50use std::io;
51use std::mem;
52use std::mem::MaybeUninit;
53use std::os::unix::io::FromRawFd;
54use std::os::unix::io::RawFd;
55use std::pin::Pin;
56use std::sync::Arc;
57use std::sync::LazyLock;
58use std::sync::Weak;
59use std::task::Context;
60use std::task::Poll;
61use std::task::Waker;
62use std::thread;
63use std::thread::ThreadId;
64
65use base::trace;
66use base::warn;
67use base::AsRawDescriptor;
68use base::EventType;
69use base::IoBufMut;
70use base::RawDescriptor;
71use io_uring::URingAllowlist;
72use io_uring::URingContext;
73use io_uring::URingOperation;
74use remain::sorted;
75use slab::Slab;
76use sync::Mutex;
77use thiserror::Error as ThisError;
78
79use crate::common_executor::RawExecutor;
80use crate::common_executor::RawTaskHandle;
81use crate::common_executor::Reactor;
82use crate::mem::BackingMemory;
83use crate::waker::WakerToken;
84use crate::waker::WeakWake;
85use crate::AsyncError;
86use crate::AsyncResult;
87use crate::IoSource;
88use crate::MemRegion;
89use crate::TaskHandle;
90
91#[sorted]
92#[derive(Debug, ThisError)]
93pub enum Error {
94 #[error("Error creating the fd waiting context: {0}")]
96 CreatingContext(io_uring::Error),
97 #[error("Failed to discard a block: {0}")]
99 Discard(base::Error),
100 #[error("Failed to copy the FD for the polling context: {0}")]
102 DuplicatingFd(base::Error),
103 #[error("Error enabling the URing context: {0}")]
105 EnablingContext(io_uring::Error),
106 #[error("The executor is gone")]
108 ExecutorGone,
109 #[error("Invalid offset/len for getting an iovec")]
111 InvalidOffset,
112 #[error("Invalid source, FD not registered for use")]
114 InvalidSource,
115 #[error("Error during IO: {0}")]
117 Io(io::Error),
118 #[error("Error registering restrictions to the URing context: {0}")]
120 RegisteringURingRestriction(io_uring::Error),
121 #[error("Error removing from the URing context: {0}")]
123 RemovingWaker(io_uring::Error),
124 #[error("Error adding to the URing context: {0}")]
126 SubmittingOp(io_uring::Error),
127 #[error("URingContext failure: {0}")]
129 URingContextError(io_uring::Error),
130 #[error("URing::enter: {0}")]
132 URingEnter(io_uring::Error),
133}
134pub type Result<T> = std::result::Result<T, Error>;
135
136impl From<Error> for io::Error {
137 fn from(e: Error) -> Self {
138 use Error::*;
139 match e {
140 Discard(e) => e.into(),
141 DuplicatingFd(e) => e.into(),
142 ExecutorGone => io::Error::other(ExecutorGone),
143 InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
144 InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
145 Io(e) => e,
146 CreatingContext(e) => e.into(),
147 RemovingWaker(e) => e.into(),
148 SubmittingOp(e) => e.into(),
149 URingContextError(e) => e.into(),
150 URingEnter(e) => e.into(),
151 EnablingContext(e) => e.into(),
152 RegisteringURingRestriction(e) => e.into(),
153 }
154 }
155}
156
157impl From<Error> for AsyncError {
158 fn from(e: Error) -> AsyncError {
159 AsyncError::SysVariants(e.into())
160 }
161}
162
163static IS_URING_STABLE: LazyLock<bool> = LazyLock::new(|| {
164 let mut utsname = MaybeUninit::zeroed();
165
166 let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
169 if res < 0 {
170 return false;
171 }
172
173 let utsname = unsafe { utsname.assume_init() };
176
177 let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };
180
181 let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
182 Ok(c) => c,
183 Err(_) => return false,
184 };
185
186 match (components.next(), components.next()) {
188 (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
189 URingContext::new(8, None).is_ok()
191 }
192 _ => false,
193 }
194});
195
196pub fn is_uring_stable() -> bool {
200 *IS_URING_STABLE
201}
202
203pub(crate) fn check_uring_availability() -> Result<()> {
207 URingContext::new(8, None)
208 .map(drop)
209 .map_err(Error::URingContextError)
210}
211
212pub struct RegisteredSource {
213 tag: usize,
214 ex: Weak<RawExecutor<UringReactor>>,
215}
216
217impl RegisteredSource {
218 pub fn start_read_to_mem(
219 &self,
220 file_offset: Option<u64>,
221 mem: Arc<dyn BackingMemory + Send + Sync>,
222 addrs: impl IntoIterator<Item = MemRegion>,
223 ) -> Result<PendingOperation> {
224 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
225 let token = ex
226 .reactor
227 .submit_read_to_vectored(self, mem, file_offset, addrs)?;
228
229 Ok(PendingOperation {
230 waker_token: Some(token),
231 ex: self.ex.clone(),
232 submitted: false,
233 })
234 }
235
236 pub fn start_write_from_mem(
237 &self,
238 file_offset: Option<u64>,
239 mem: Arc<dyn BackingMemory + Send + Sync>,
240 addrs: impl IntoIterator<Item = MemRegion>,
241 ) -> Result<PendingOperation> {
242 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
243 let token = ex
244 .reactor
245 .submit_write_from_vectored(self, mem, file_offset, addrs)?;
246
247 Ok(PendingOperation {
248 waker_token: Some(token),
249 ex: self.ex.clone(),
250 submitted: false,
251 })
252 }
253
254 pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
255 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
256 let token = ex.reactor.submit_fallocate(self, offset, len, mode)?;
257
258 Ok(PendingOperation {
259 waker_token: Some(token),
260 ex: self.ex.clone(),
261 submitted: false,
262 })
263 }
264
265 pub fn start_fsync(&self) -> Result<PendingOperation> {
266 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
267 let token = ex.reactor.submit_fsync(self)?;
268
269 Ok(PendingOperation {
270 waker_token: Some(token),
271 ex: self.ex.clone(),
272 submitted: false,
273 })
274 }
275
276 pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
277 let events = EventType::Read;
278
279 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
280 let token = ex.reactor.submit_poll(self, events)?;
281
282 Ok(PendingOperation {
283 waker_token: Some(token),
284 ex: self.ex.clone(),
285 submitted: false,
286 })
287 }
288}
289
290impl Drop for RegisteredSource {
291 fn drop(&mut self) {
292 if let Some(ex) = self.ex.upgrade() {
293 ex.reactor.deregister_source(self);
294 }
295 }
296}
297
/// Entry count requested for the reactor's ring; also the initial capacity of its two slabs.
const NUM_ENTRIES: usize = 256;
300
301struct OpData {
303 _file: Arc<File>,
304 _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
305 waker: Option<Waker>,
306 canceled: bool,
307}
308
309enum OpStatus {
311 Nop,
312 Pending(OpData),
313 Completed(Option<::std::io::Result<u32>>),
314}
315
316struct Ring {
317 ops: Slab<OpStatus>,
318 registered_sources: Slab<Arc<File>>,
319}
320
321pub struct UringReactor {
323 ctx: URingContext,
326 ring: Mutex<Ring>,
327 thread_id: Mutex<Option<ThreadId>>,
328}
329
330impl UringReactor {
331 fn new() -> Result<UringReactor> {
332 let mut restrictions = URingAllowlist::new();
334 let ops = [
335 URingOperation::Writev,
336 URingOperation::Readv,
337 URingOperation::Nop,
338 URingOperation::Fsync,
339 URingOperation::Fallocate,
340 URingOperation::PollAdd,
341 URingOperation::PollRemove,
342 URingOperation::AsyncCancel,
343 ];
344 for op in ops {
345 restrictions.allow_submit_operation(op);
346 }
347
348 let ctx =
349 URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;
350
351 Ok(UringReactor {
352 ctx,
353 ring: Mutex::new(Ring {
354 ops: Slab::with_capacity(NUM_ENTRIES),
355 registered_sources: Slab::with_capacity(NUM_ENTRIES),
356 }),
357 thread_id: Mutex::new(None),
358 })
359 }
360
361 fn runs_tasks_on_current_thread(&self) -> bool {
362 let executor_thread = self.thread_id.lock();
363 executor_thread
364 .map(|id| id == thread::current().id())
365 .unwrap_or(false)
366 }
367
368 fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
369 let mut ring = self.ring.lock();
370
371 let op = ring
372 .ops
373 .get_mut(token.0)
374 .expect("`get_result` called on unknown operation");
375 match op {
376 OpStatus::Nop => panic!("`get_result` called on nop"),
377 OpStatus::Pending(data) => {
378 if data.canceled {
379 panic!("`get_result` called on canceled operation");
380 }
381 data.waker = Some(cx.waker().clone());
382 None
383 }
384 OpStatus::Completed(res) => {
385 let out = res.take();
386 ring.ops.remove(token.0);
387 Some(out.expect("Missing result in completed operation"))
388 }
389 }
390 }
391
392 fn cancel_operation(&self, token: WakerToken) {
394 let mut ring = self.ring.lock();
395 let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {
396 match op {
397 OpStatus::Nop => panic!("`cancel_operation` called on nop"),
398 OpStatus::Pending(data) => {
399 if data.canceled {
400 panic!("uring operation canceled more than once");
401 }
402
403 if let Some(waker) = data.waker.take() {
404 waker.wake();
405 }
406 data.waker = None;
408 data.canceled = true;
409
410 true
414 }
415 OpStatus::Completed(_) => {
416 ring.ops.remove(token.0);
417 false
418 }
419 }
420 } else {
421 false
422 };
423 std::mem::drop(ring);
424 if submit_cancel {
425 let _best_effort = self.submit_cancel_async(token.0);
426 }
427 }
428
429 pub(crate) fn register_source<F: AsRawDescriptor>(
430 &self,
431 raw: &Arc<RawExecutor<UringReactor>>,
432 fd: &F,
433 ) -> Result<RegisteredSource> {
434 let duped_fd = unsafe { File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?) };
438
439 Ok(RegisteredSource {
440 tag: self
441 .ring
442 .lock()
443 .registered_sources
444 .insert(Arc::new(duped_fd)),
445 ex: Arc::downgrade(raw),
446 })
447 }
448
449 fn deregister_source(&self, source: &RegisteredSource) {
450 self.ring.lock().registered_sources.remove(source.tag);
454 }
455
456 fn submit_poll(
457 &self,
458 source: &RegisteredSource,
459 events: base::EventType,
460 ) -> Result<WakerToken> {
461 let mut ring = self.ring.lock();
462 let src = ring
463 .registered_sources
464 .get(source.tag)
465 .ok_or(Error::InvalidSource)?
466 .clone();
467 let entry = ring.ops.vacant_entry();
468 let next_op_token = entry.key();
469 self.ctx
470 .add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))
471 .map_err(Error::SubmittingOp)?;
472 entry.insert(OpStatus::Pending(OpData {
473 _file: src,
474 _mem: None,
475 waker: None,
476 canceled: false,
477 }));
478
479 Ok(WakerToken(next_op_token))
480 }
481
482 fn submit_fallocate(
483 &self,
484 source: &RegisteredSource,
485 offset: u64,
486 len: u64,
487 mode: u32,
488 ) -> Result<WakerToken> {
489 let mut ring = self.ring.lock();
490 let src = ring
491 .registered_sources
492 .get(source.tag)
493 .ok_or(Error::InvalidSource)?
494 .clone();
495 let entry = ring.ops.vacant_entry();
496 let next_op_token = entry.key();
497 self.ctx
498 .add_fallocate(
499 src.as_raw_descriptor(),
500 offset,
501 len,
502 mode,
503 usize_to_u64(next_op_token),
504 )
505 .map_err(Error::SubmittingOp)?;
506
507 entry.insert(OpStatus::Pending(OpData {
508 _file: src,
509 _mem: None,
510 waker: None,
511 canceled: false,
512 }));
513
514 Ok(WakerToken(next_op_token))
515 }
516
517 fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {
518 let mut ring = self.ring.lock();
519 let entry = ring.ops.vacant_entry();
520 let next_op_token = entry.key();
521 self.ctx
522 .async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))
523 .map_err(Error::SubmittingOp)?;
524
525 entry.insert(OpStatus::Nop);
526
527 Ok(WakerToken(next_op_token))
528 }
529
530 fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
531 let mut ring = self.ring.lock();
532 let src = ring
533 .registered_sources
534 .get(source.tag)
535 .ok_or(Error::InvalidSource)?
536 .clone();
537 let entry = ring.ops.vacant_entry();
538 let next_op_token = entry.key();
539 self.ctx
540 .add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))
541 .map_err(Error::SubmittingOp)?;
542 entry.insert(OpStatus::Pending(OpData {
543 _file: src,
544 _mem: None,
545 waker: None,
546 canceled: false,
547 }));
548
549 Ok(WakerToken(next_op_token))
550 }
551
552 fn submit_read_to_vectored(
553 &self,
554 source: &RegisteredSource,
555 mem: Arc<dyn BackingMemory + Send + Sync>,
556 offset: Option<u64>,
557 addrs: impl IntoIterator<Item = MemRegion>,
558 ) -> Result<WakerToken> {
559 let iovecs = addrs
560 .into_iter()
561 .map(|mem_range| {
562 let vslice = mem
563 .get_volatile_slice(mem_range)
564 .map_err(|_| Error::InvalidOffset)?;
565 Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
569 })
570 .collect::<Result<Vec<_>>>()?;
571 let iovecs = Pin::from(iovecs.into_boxed_slice());
572
573 let mut ring = self.ring.lock();
574 let src = ring
575 .registered_sources
576 .get(source.tag)
577 .ok_or(Error::InvalidSource)?
578 .clone();
579
580 let entry = ring.ops.vacant_entry();
581 let next_op_token = entry.key();
582
583 unsafe {
588 self.ctx
589 .add_readv(
590 iovecs,
591 src.as_raw_descriptor(),
592 offset,
593 usize_to_u64(next_op_token),
594 )
595 .map_err(Error::SubmittingOp)?;
596 }
597
598 entry.insert(OpStatus::Pending(OpData {
599 _file: src,
600 _mem: Some(mem),
601 waker: None,
602 canceled: false,
603 }));
604
605 Ok(WakerToken(next_op_token))
606 }
607
608 fn submit_write_from_vectored(
609 &self,
610 source: &RegisteredSource,
611 mem: Arc<dyn BackingMemory + Send + Sync>,
612 offset: Option<u64>,
613 addrs: impl IntoIterator<Item = MemRegion>,
614 ) -> Result<WakerToken> {
615 let iovecs = addrs
616 .into_iter()
617 .map(|mem_range| {
618 let vslice = mem
619 .get_volatile_slice(mem_range)
620 .map_err(|_| Error::InvalidOffset)?;
621 Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
625 })
626 .collect::<Result<Vec<_>>>()?;
627 let iovecs = Pin::from(iovecs.into_boxed_slice());
628
629 let mut ring = self.ring.lock();
630 let src = ring
631 .registered_sources
632 .get(source.tag)
633 .ok_or(Error::InvalidSource)?
634 .clone();
635
636 let entry = ring.ops.vacant_entry();
637 let next_op_token = entry.key();
638
639 unsafe {
644 self.ctx
645 .add_writev(
646 iovecs,
647 src.as_raw_descriptor(),
648 offset,
649 usize_to_u64(next_op_token),
650 )
651 .map_err(Error::SubmittingOp)?;
652 }
653
654 entry.insert(OpStatus::Pending(OpData {
655 _file: src,
656 _mem: Some(mem),
657 waker: None,
658 canceled: false,
659 }));
660
661 Ok(WakerToken(next_op_token))
662 }
663}
664
665impl Reactor for UringReactor {
666 fn new() -> std::io::Result<Self> {
667 Ok(UringReactor::new()?)
668 }
669
670 fn wake(&self) {
671 let mut ring = self.ring.lock();
672 let entry = ring.ops.vacant_entry();
673 let next_op_token = entry.key();
674 if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
675 warn!("Failed to add NOP for waking up executor: {}", e);
676 }
677 entry.insert(OpStatus::Nop);
678 mem::drop(ring);
679
680 match self.ctx.submit() {
681 Ok(()) => {}
682 Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
685 Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
686 }
687 }
688
689 fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
690 #[allow(clippy::needless_collect)]
696 let ops: Vec<_> = self
697 .ring
698 .lock()
699 .ops
700 .iter_mut()
701 .filter_map(|op| match op.1 {
702 OpStatus::Pending(data) if !data.canceled => Some(op.0),
703 _ => None,
704 })
705 .collect();
706 for token in ops {
707 self.cancel_operation(WakerToken(token));
708 }
709
710 *self.thread_id.lock() = None;
714
715 Box::pin(futures::future::poll_fn(|_cx| {
722 if self.ring.lock().ops.is_empty() {
723 Poll::Ready(())
724 } else {
725 Poll::Pending
726 }
727 }))
728 }
729
730 fn on_thread_start(&self) {
731 let current_thread = thread::current().id();
732 let mut thread_id = self.thread_id.lock();
733 assert_eq!(
734 *thread_id.get_or_insert(current_thread),
735 current_thread,
736 "`UringReactor::wait_for_work` cannot be called from more than one thread"
737 );
738 }
739
740 fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
741 trace!(
742 "Waiting on events, {} pending ops",
743 self.ring.lock().ops.len()
744 );
745 let events = self.ctx.wait().map_err(Error::URingEnter)?;
746
747 set_processing();
750
751 let mut ring = self.ring.lock();
752 for (raw_token, result) in events {
753 let token = raw_token
757 .try_into()
758 .expect("`u64` doesn't fit inside a `usize`");
759
760 let op = ring
761 .ops
762 .get_mut(token)
763 .expect("Received completion token for unexpected operation");
764 match mem::replace(op, OpStatus::Completed(Some(result))) {
765 OpStatus::Nop => mem::drop(ring.ops.remove(token)),
767 OpStatus::Pending(data) => {
768 if data.canceled {
769 ring.ops.remove(token);
772 }
773 if let Some(waker) = data.waker {
774 waker.wake();
775 }
776 }
777 OpStatus::Completed(_) => panic!("uring operation completed more than once"),
778 }
779 }
780
781 Ok(())
782 }
783
784 fn new_source<F: AsRawDescriptor>(
785 &self,
786 ex: &Arc<RawExecutor<Self>>,
787 f: F,
788 ) -> AsyncResult<IoSource<F>> {
789 Ok(IoSource::Uring(super::UringSource::new(f, ex)?))
790 }
791
792 fn wrap_task_handle<R>(task: RawTaskHandle<UringReactor, R>) -> TaskHandle<R> {
793 TaskHandle::Uring(task)
794 }
795}
796
797impl AsRawDescriptor for UringReactor {
798 fn as_raw_descriptor(&self) -> RawDescriptor {
799 self.ctx.as_raw_descriptor()
800 }
801}
802
803impl WeakWake for UringReactor {
804 fn wake_by_ref(weak_self: &Weak<Self>) {
805 if let Some(arc_self) = weak_self.upgrade() {
806 Reactor::wake(&*arc_self);
807 }
808 }
809}
810
811impl Drop for UringReactor {
812 fn drop(&mut self) {
813 assert!(self.ring.lock().ops.is_empty());
815 }
816}
817
818unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
822 let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
823 if ret < 0 {
824 Err(Error::DuplicatingFd(base::Error::last()))
825 } else {
826 Ok(ret)
827 }
828}
829
/// Converts a `usize` to `u64`.
///
/// # Panics
///
/// Panics if the value does not fit in a `u64`.
#[inline]
fn usize_to_u64(val: usize) -> u64 {
    let widened: std::result::Result<u64, _> = val.try_into();
    widened.expect("`usize` doesn't fit inside a `u64`")
}
835
836pub struct PendingOperation {
837 waker_token: Option<WakerToken>,
838 ex: Weak<RawExecutor<UringReactor>>,
839 submitted: bool,
840}
841
842impl Future for PendingOperation {
843 type Output = Result<u32>;
844
845 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
846 let token = self
847 .waker_token
848 .as_ref()
849 .expect("PendingOperation polled after returning Poll::Ready");
850 if let Some(ex) = self.ex.upgrade() {
851 if let Some(result) = ex.reactor.get_result(token, cx) {
852 self.waker_token = None;
853 Poll::Ready(result.map_err(Error::Io))
854 } else {
855 if !self.submitted && !ex.reactor.runs_tasks_on_current_thread() {
859 match ex.reactor.ctx.submit() {
860 Ok(()) => self.submitted = true,
861 Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
865 Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
866 }
867 }
868 Poll::Pending
869 }
870 } else {
871 Poll::Ready(Err(Error::ExecutorGone))
872 }
873 }
874}
875
876impl Drop for PendingOperation {
877 fn drop(&mut self) {
878 if let Some(waker_token) = self.waker_token.take() {
879 if let Some(ex) = self.ex.upgrade() {
880 ex.reactor.cancel_operation(waker_token);
881 }
882 }
883 }
884}
885
886#[cfg(test)]
887mod tests {
888 use std::future::Future;
889 use std::io::Read;
890 use std::io::Write;
891 use std::mem;
892 use std::pin::Pin;
893 use std::rc::Rc;
894 use std::task::Context;
895 use std::task::Poll;
896
897 use futures::executor::block_on;
898
899 use super::*;
900 use crate::mem::BackingMemory;
901 use crate::mem::MemRegion;
902 use crate::mem::VecIoWrapper;
903 use crate::BlockingPool;
904 use crate::ExecutorTrait;
905
906 struct UringQueueEmpty<'a> {
908 ex: &'a Arc<RawExecutor<UringReactor>>,
909 }
910
911 impl Future for UringQueueEmpty<'_> {
912 type Output = ();
913
914 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
915 if self.ex.reactor.ring.lock().ops.is_empty() {
916 Poll::Ready(())
917 } else {
918 Poll::Pending
919 }
920 }
921 }
922
/// The backing memory of a read must stay referenced after the future is dropped, until the
/// operation has left the reactor's table.
#[test]
fn dont_drop_backing_mem_read() {
    if !is_uring_stable() {
        return;
    }

    // This test keeps one reference; the reactor is expected to hold the second one.
    let backing =
        Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

    // Nothing is written yet, so a read queued on `rx` cannot complete right away.
    let (rx, mut tx) = base::pipe().unwrap();

    let ex = RawExecutor::<UringReactor>::new().unwrap();

    let source = ex
        .reactor
        .register_source(&ex, &rx)
        .expect("register source failed");

    let read_op = source
        .start_read_to_mem(None, Arc::clone(&backing), [MemRegion { offset: 0, len: 8 }])
        .expect("failed to start read to mem");

    // Local reference plus the one stored with the tracked operation.
    assert_eq!(Arc::strong_count(&backing), 2);

    // Dropping the future cancels the operation but must not release the memory yet.
    drop(read_op);
    assert_eq!(Arc::strong_count(&backing), 2);

    // Give the pending read data, then run until the reactor's table has drained.
    tx.write_all(&[0u8; 8]).expect("write failed");
    ex.run_until(UringQueueEmpty { ex: &ex })
        .expect("Failed to wait for read pipe ready");
    assert_eq!(Arc::strong_count(&backing), 1);
}
969
/// The backing memory of a write must stay referenced after the future is dropped, until the
/// operation has left the reactor's table.
#[test]
fn dont_drop_backing_mem_write() {
    if !is_uring_stable() {
        return;
    }

    // This test keeps one reference; the reactor is expected to hold the second one.
    let backing =
        Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

    // NOTE(review): `new_pipe_full` presumably returns a pipe with no room left, so a write
    // queued on `tx` cannot complete right away — confirm against `base`.
    let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");

    let ex = RawExecutor::<UringReactor>::new().unwrap();

    let source = ex
        .reactor
        .register_source(&ex, &tx)
        .expect("register source failed");

    let write_op = source
        .start_write_from_mem(None, Arc::clone(&backing), [MemRegion { offset: 0, len: 8 }])
        .expect("failed to start write to mem");

    // Local reference plus the one stored with the tracked operation.
    assert_eq!(Arc::strong_count(&backing), 2);

    // Dropping the future cancels the operation but must not release the memory yet.
    drop(write_op);
    assert_eq!(Arc::strong_count(&backing), 2);

    // Read one page out of the pipe, then run until the reactor's table has drained.
    let mut page = vec![0u8; base::round_up_to_page_size(1)];
    rx.read_exact(&mut page).expect("read to empty failed");
    ex.run_until(UringQueueEmpty { ex: &ex })
        .expect("Failed to wait for write pipe ready");
    assert_eq!(Arc::strong_count(&backing), 1);
}
1017
/// A read that is canceled inside a task must not prevent a later write from completing with
/// the expected byte count.
#[test]
fn canceled_before_completion() {
    if !is_uring_stable() {
        return;
    }

    /// Drops the operation, which cancels it.
    async fn cancel_io(op: PendingOperation) {
        mem::drop(op);
    }

    /// Awaits the operation and checks the value it resolves to.
    async fn check_result(op: PendingOperation, expected: u32) {
        let actual = op.await.expect("operation failed to complete");
        assert_eq!(expected, actual);
    }

    let read_backing =
        Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;

    let (rx, tx) = base::pipe().expect("Pipe failed");

    let ex = RawExecutor::<UringReactor>::new().unwrap();

    let rx_source = ex
        .reactor
        .register_source(&ex, &rx)
        .expect("register source failed");
    let tx_source = ex
        .reactor
        .register_source(&ex, &tx)
        .expect("register source failed");

    let read_op = rx_source
        .start_read_to_mem(
            None,
            Arc::clone(&read_backing),
            [MemRegion { offset: 0, len: 8 }],
        )
        .expect("failed to start read to mem");

    // The read is canceled from a detached task once the executor runs it.
    ex.spawn_local(cancel_io(read_op)).detach();

    // Queue a write on the other end of the same pipe.
    let write_backing =
        Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
    let write_op = tx_source
        .start_write_from_mem(
            None,
            Arc::clone(&write_backing),
            [MemRegion { offset: 0, len: 8 }],
        )
        .expect("failed to start write from mem");

    ex.run_until(check_result(write_op, 8))
        .expect("Failed to run executor");
}
1065
/// Dropping the executor while a write is pending must make the operation report
/// `ExecutorGone`, and the pipe must afterwards carry only the data written directly.
///
/// Marked `#[ignore]`, so it only runs when explicitly requested.
#[ignore]
#[test]
fn drop_before_completion() {
    if !is_uring_stable() {
        return;
    }

    const VALUE: u64 = 0xef6c_a8df_b842_eb9c;

    /// Awaits the operation and requires it to fail with `ExecutorGone`.
    async fn check_op(op: PendingOperation) {
        let err = op.await.expect_err("Op completed successfully");
        match err {
            Error::ExecutorGone => {}
            e => panic!("Unexpected error from op: {e}"),
        }
    }

    let (mut rx, mut tx) = base::pipe().expect("Pipe failed");

    let ex = RawExecutor::<UringReactor>::new().unwrap();

    let tx_source = ex
        .reactor
        .register_source(&ex, &tx)
        .expect("Failed to register source");
    let payload = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
    let write_op = tx_source
        .start_write_from_mem(
            None,
            payload,
            [MemRegion {
                offset: 0,
                len: mem::size_of::<u64>(),
            }],
        )
        .expect("Failed to start write from mem");

    ex.spawn_local(check_op(write_op)).detach();

    // The executor goes away before the queued write had a chance to run.
    mem::drop(ex);

    // Write different bytes directly; they must be what is read back below.
    let new_val = [0x2e; 8];
    tx.write_all(&new_val).unwrap();

    let mut read_back = 0u64.to_ne_bytes();
    rx.read_exact(&mut read_back[..])
        .expect("Failed to read from pipe");

    assert_eq!(read_back, new_val);
}
1119
/// Dropping the executor must drop a detached task that is parked on a blocking-pool job,
/// without that task ever resuming.
#[test]
fn drop_detached_blocking_pool() {
    if !is_uring_stable() {
        return;
    }

    /// Shuts the wrapped pool down (with a one second deadline) when dropped.
    struct Cleanup(BlockingPool);

    impl Drop for Cleanup {
        fn drop(&mut self) {
            self.0
                .shutdown(Some(
                    std::time::Instant::now() + std::time::Duration::from_secs(1),
                ))
                .unwrap();
        }
    }

    // Progress marker: 1 = task started, 2 = task resumed after the blocking job.
    let progress = Rc::new(std::cell::Cell::new(0));
    {
        let ex = RawExecutor::<UringReactor>::new().unwrap();
        let task_progress = progress.clone();
        ex.spawn_local(async move {
            task_progress.set(1);
            let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
            // Capacity 0 makes this a rendezvous channel: `send` returns only once the
            // worker is receiving.
            let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
            let blocking_task = pool.0.spawn(move || {
                // First receive: the rendezvous with the task below.
                assert_eq!(recv.recv(), Ok(()));
                // Second receive: fails once the sender has been dropped.
                assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
            });
            // Ensures the worker really started before the task parks on it.
            send.send(()).unwrap();
            blocking_task.await;
            task_progress.set(2);
        })
        .detach();
        ex.run_until(async {}).unwrap();
    }
    // The task got as far as the await but never resumed, and its `Rc` clone was released.
    assert_eq!(progress.get(), 1);
    Rc::try_unwrap(progress).expect("Rc had too many refs");
}
1174
/// An executor that ran on another thread can be dropped from this one, and an operation that
/// outlives it reports `ExecutorGone`.
#[test]
fn drop_on_different_thread() {
    if !is_uring_stable() {
        return;
    }

    let ex = RawExecutor::<UringReactor>::new().unwrap();

    // Run the executor once on a separate thread and wait for that thread to finish.
    let remote_ex = ex.clone();
    let runner = thread::spawn(move || remote_ex.run_until(async {}));

    runner.join().unwrap().unwrap();

    // Queue a write from this thread while the executor is still alive.
    let (_rx, tx) = base::pipe().expect("Pipe failed");
    let tx = ex
        .reactor
        .register_source(&ex, &tx)
        .expect("Failed to register source");
    let payload = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
    let write_op = tx
        .start_write_from_mem(
            None,
            payload,
            [MemRegion {
                offset: 0,
                len: mem::size_of::<u64>(),
            }],
        )
        .expect("Failed to start write from mem");

    mem::drop(ex);

    match block_on(write_op)
        .expect_err("Pending operation completed after executor was dropped")
    {
        Error::ExecutorGone => {}
        e => panic!("Unexpected error after dropping executor: {e}"),
    }
}
1214}