use std::cell::UnsafeCell;
use std::hint;
use std::mem;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::yield_now;

use super::super::sync::waiter::Kind as WaiterKind;
use super::super::sync::waiter::Waiter;
use super::super::sync::waiter::WaiterAdapter;
use super::super::sync::waiter::WaiterList;
use super::super::sync::waiter::WaitingFor;

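// The lock state is packed into a single `usize`, with the low byte holding the flag bits
// defined below and the rest of the word holding the reader count:
//
// - LOCKED: the lock is held in write mode.
// - HAS_WAITERS: the wait list is non-empty.
// - DESIGNATED_WAKER: a waiter has been woken but has not yet acquired the lock, so
//   unlockers can skip waking anyone else.
// - SPINLOCK: spin lock protecting the wait list.
// - WRITER_WAITING: a writer is queued; new readers back off so the writer is not starved.
// - LONG_WAIT: a waiter has been waiting a long time and was moved to the front of the
//   queue; new acquisitions back off until it has run.
// - READ_LOCK: the unit of the reader count, which is stored in the bits covered by
//   READ_MASK.
//
// SPIN_THRESHOLD bounds the exponential backoff used by the spin loops, and
// LONG_WAIT_THRESHOLD is the number of wake-ups a waiter endures without acquiring the lock
// before it is marked as a long waiter.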
const LOCKED: usize = 1 << 0;
const HAS_WAITERS: usize = 1 << 1;
const DESIGNATED_WAKER: usize = 1 << 2;
const SPINLOCK: usize = 1 << 3;
const WRITER_WAITING: usize = 1 << 4;
const LONG_WAIT: usize = 1 << 5;
const READ_LOCK: usize = 1 << 8;
const READ_MASK: usize = !0xff;

const SPIN_THRESHOLD: usize = 7;

const LONG_WAIT_THRESHOLD: usize = 19;

// Abstracts over the two ways of acquiring the lock; `Shared` and `Exclusive` below provide
// the state-bit arithmetic for readers and writers respectively.
trait Kind {
    // Bits that must be zero in the lock state before this kind of lock can be acquired.
    fn zero_to_acquire() -> usize;

    // Value added to the lock state when this kind of lock is acquired.
    fn add_to_acquire() -> usize;

    // Bits set in the lock state while a waiter of this kind is queued.
    fn set_when_waiting() -> usize;

    // Bits cleared from the lock state when this kind of lock is acquired.
    fn clear_on_acquire() -> usize;

    // Creates a new waiter of this kind for the given lock.
    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter>;
}

struct Shared;

impl Kind for Shared {
    fn zero_to_acquire() -> usize {
        LOCKED | WRITER_WAITING | LONG_WAIT
    }

    fn add_to_acquire() -> usize {
        READ_LOCK
    }

    fn set_when_waiting() -> usize {
        0
    }

    fn clear_on_acquire() -> usize {
        0
    }

    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
        Arc::new(Waiter::new(
            WaiterKind::Shared,
            cancel_waiter,
            raw as *const RawRwLock as usize,
            WaitingFor::Mutex,
        ))
    }
}

struct Exclusive;

impl Kind for Exclusive {
    fn zero_to_acquire() -> usize {
        LOCKED | READ_MASK | LONG_WAIT
    }

    fn add_to_acquire() -> usize {
        LOCKED
    }

    fn set_when_waiting() -> usize {
        WRITER_WAITING
    }

    fn clear_on_acquire() -> usize {
        WRITER_WAITING
    }

    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
        Arc::new(Waiter::new(
            WaiterKind::Exclusive,
            cancel_waiter,
            raw as *const RawRwLock as usize,
            WaitingFor::Mutex,
        ))
    }
}

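// Decides which waiters to wake when the lock is released. If the waiter at the front of the
// queue is a writer it is woken alone; otherwise every queued reader is woken, writers stay
// queued, and WRITER_WAITING is returned in the second tuple field so that it can be set when
// the lock state is updated.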
fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
    let mut to_wake = WaiterList::new(WaiterAdapter::new());
    let mut set_on_release = 0;
    let mut cursor = waiters.front_mut();

    let mut waking_readers = false;
    while let Some(w) = cursor.get() {
        match w.kind() {
            WaiterKind::Exclusive if !waking_readers => {
                let waiter = cursor.remove().unwrap();
                waiter.set_waiting_for(WaitingFor::None);
                to_wake.push_back(waiter);
                break;
            }

            WaiterKind::Shared => {
                let waiter = cursor.remove().unwrap();
                waiter.set_waiting_for(WaitingFor::None);
                to_wake.push_back(waiter);
                waking_readers = true;
            }

            WaiterKind::Exclusive => {
                set_on_release |= WRITER_WAITING;
                cursor.move_next();
            }
        }
    }

    (to_wake, set_on_release)
}

#[inline]
fn cpu_relax(iterations: usize) {
    for _ in 0..iterations {
        hint::spin_loop();
    }
}

pub(crate) struct RawRwLock {
    state: AtomicUsize,
    waiters: UnsafeCell<WaiterList>,
}

impl RawRwLock {
    pub fn new() -> RawRwLock {
        RawRwLock {
            state: AtomicUsize::new(0),
            waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
        }
    }

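    // Both lock methods follow the same pattern: a compare-and-swap from the uncontended
    // state as the fast path, one more CAS attempt if no blocking bits are set, and a fall
    // back to the queue-based `lock_slow` otherwise.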
    #[inline]
    pub async fn lock(&self) {
        match self
            .state
            .compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
        {
            Ok(_) => {}
            Err(oldstate) => {
                // If any bit that must be zero is set, or if the second CAS attempt fails,
                // take the slow path.
                if (oldstate & Exclusive::zero_to_acquire()) != 0
                    || self
                        .state
                        .compare_exchange_weak(
                            oldstate,
                            (oldstate + Exclusive::add_to_acquire())
                                & !Exclusive::clear_on_acquire(),
                            Ordering::Acquire,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    self.lock_slow::<Exclusive>(0, 0).await;
                }
            }
        }
    }

    #[inline]
    pub async fn read_lock(&self) {
        match self
            .state
            .compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
        {
            Ok(_) => {}
            Err(oldstate) => {
                if (oldstate & Shared::zero_to_acquire()) != 0
                    || self
                        .state
                        .compare_exchange_weak(
                            oldstate,
                            (oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
                            Ordering::Acquire,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    self.lock_slow::<Shared>(0, 0).await;
                }
            }
        }
    }

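    // The slow path for acquiring the lock. Spins with exponential backoff while the lock
    // looks close to available; otherwise takes the wait-list spin lock, enqueues a waiter,
    // and suspends until woken. `clear` is a set of bits to clear once the lock is acquired
    // and `zero_mask` removes bits from the kind's `zero_to_acquire` set; both are used by
    // the Condvar re-acquisition paths below.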
    #[cold]
    async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
        let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;

        let mut spin_count = 0;
        let mut wait_count = 0;
        let mut waiter = None;
        loop {
            let oldstate = self.state.load(Ordering::Relaxed);
            if (oldstate & zero_to_acquire) == 0 {
                // The lock looks available: try to grab it, clearing any bits we were asked
                // to clear on acquisition.
                if self
                    .state
                    .compare_exchange_weak(
                        oldstate,
                        (oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return;
                }
            } else if (oldstate & SPINLOCK) == 0 {
                // The lock is unavailable and the wait list is unlocked: try to take the
                // spin lock so we can add ourselves to the wait list.
                let w = waiter.get_or_insert_with(|| K::new_waiter(self));
                w.reset(WaitingFor::Mutex);

                if self
                    .state
                    .compare_exchange_weak(
                        oldstate,
                        (oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    let mut set_on_release = 0;

                    if wait_count < LONG_WAIT_THRESHOLD {
                        // Add the waiter to the end of the queue.
                        unsafe { (*self.waiters.get()).push_back(w.clone()) };
                    } else {
                        // We have been woken many times without acquiring the lock: jump to
                        // the front of the queue and set LONG_WAIT so new lock attempts back
                        // off until we have run.
                        unsafe { (*self.waiters.get()).push_front(w.clone()) };

                        set_on_release |= LONG_WAIT;

                        // Make sure we clear the bit once we do acquire the lock ...
                        clear |= LONG_WAIT;

                        // ... and don't let it stop us from acquiring it.
                        zero_to_acquire &= !LONG_WAIT;
                    }

                    // Release the spin lock, applying any bits recorded above.
                    let mut state = oldstate;
                    loop {
                        match self.state.compare_exchange_weak(
                            state,
                            (state | set_on_release) & !SPINLOCK,
                            Ordering::Release,
                            Ordering::Relaxed,
                        ) {
                            Ok(_) => break,
                            Err(w) => state = w,
                        }
                    }

                    w.wait().await;

                    // We were woken as the designated waker, so clear the bit on acquisition.
                    clear |= DESIGNATED_WAKER;

                    // Don't let WRITER_WAITING stop us: either we are that writer or we were
                    // deliberately woken ahead of it.
                    zero_to_acquire &= !WRITER_WAITING;

                    spin_count = 0;

                    wait_count += 1;

                    continue;
                }
            }

            // Spin with exponential backoff; past the threshold, yield the thread instead.
            if spin_count < SPIN_THRESHOLD {
                cpu_relax(1 << spin_count);
                spin_count += 1;
            } else {
                yield_now();
            }
        }
    }

    #[inline]
    pub fn unlock(&self) {
        // Drop the LOCKED bit. `unlock` must only be called while the lock is held in write
        // mode, which the debug assertions below check.
        let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);

        debug_assert_eq!(
            oldstate & READ_MASK,
            0,
            "`unlock` called on rwlock held in read-mode"
        );
        debug_assert_ne!(
            oldstate & LOCKED,
            0,
            "`unlock` called on rwlock not held in write-mode"
        );

        // Only wake waiters if there are any and no previously woken task is still on its
        // way to acquiring the lock.
        if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
            self.unlock_slow();
        }
    }

    #[inline]
    pub fn read_unlock(&self) {
        let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);

        debug_assert_eq!(
            oldstate & LOCKED,
            0,
            "`read_unlock` called on rwlock held in write-mode"
        );
        debug_assert_ne!(
            oldstate & READ_MASK,
            0,
            "`read_unlock` called on rwlock not held in read-mode"
        );

        // Only wake waiters if we were the last reader and no previously woken task is
        // still on its way to acquiring the lock.
        if (oldstate & HAS_WAITERS) != 0
            && (oldstate & DESIGNATED_WAKER) == 0
            && (oldstate & READ_MASK) == READ_LOCK
        {
            self.unlock_slow();
        }
    }

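    // The slow path for releasing the lock: takes the wait-list spin lock, picks the waiters
    // to wake via `get_wake_list`, publishes the new state bits, and then wakes the chosen
    // tasks outside the spin lock.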
    #[cold]
    fn unlock_slow(&self) {
        let mut spin_count = 0;

        loop {
            let oldstate = self.state.load(Ordering::Relaxed);
            if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
                // Nothing to wake, or someone else is already doing the waking.
                return;
            } else if (oldstate & SPINLOCK) == 0 {
                // Lock the wait list and mark ourselves as the designated waker.
                if self
                    .state
                    .compare_exchange_weak(
                        oldstate,
                        oldstate | SPINLOCK | DESIGNATED_WAKER,
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    let mut clear = SPINLOCK;

                    // SAFETY: the SPINLOCK bit guarantees exclusive access to the wait list.
                    let waiters = unsafe { &mut *self.waiters.get() };
                    let (wake_list, set_on_release) = get_wake_list(waiters);

                    if waiters.is_empty() {
                        clear |= HAS_WAITERS;
                    }

                    if wake_list.is_empty() {
                        // No one was woken, so there is no designated waker after all.
                        clear |= DESIGNATED_WAKER;
                    }

                    // Release the spin lock, applying the bits computed above.
                    let mut state = oldstate;
                    loop {
                        match self.state.compare_exchange_weak(
                            state,
                            (state | set_on_release) & !clear,
                            Ordering::Release,
                            Ordering::Relaxed,
                        ) {
                            Ok(_) => break,
                            Err(w) => state = w,
                        }
                    }

                    // Wake the tasks outside the spin lock.
                    for w in wake_list {
                        w.wake();
                    }

                    return;
                }
            }

            if spin_count < SPIN_THRESHOLD {
                cpu_relax(1 << spin_count);
                spin_count += 1;
            } else {
                yield_now();
            }
        }
    }

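    // Called when a pending waiter gives up, e.g. because the future waiting on the lock was
    // dropped. Removes the waiter from the queue and, if it had already been woken or
    // `wake_next` is set, passes the wake-up on to the next eligible waiters.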
    fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
        // Acquire the spin lock that guards the wait list.
        let mut oldstate = self.state.load(Ordering::Relaxed);
        while oldstate & SPINLOCK != 0
            || self
                .state
                .compare_exchange_weak(
                    oldstate,
                    oldstate | SPINLOCK,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                )
                .is_err()
        {
            hint::spin_loop();
            oldstate = self.state.load(Ordering::Relaxed);
        }

        // SAFETY: the SPINLOCK bit guarantees exclusive access to the wait list.
        let waiters = unsafe { &mut *self.waiters.get() };

        let mut clear = SPINLOCK;

        // If we are supposed to wake the next waiter, or if the canceled waiter was at the
        // front of the queue, then any long wait is over.
        if wake_next
            || waiters
                .front()
                .get()
                .map(|front| std::ptr::eq(front, waiter))
                .unwrap_or(false)
        {
            clear |= LONG_WAIT;
        }

        let waiting_for = waiter.is_waiting_for();

        // Remove the waiter from the queue if it is still linked and waiting on this lock.
        let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Mutex {
            // SAFETY: the waiter is linked into `waiters`, to which we have exclusive access.
            let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
            cursor.remove()
        } else {
            None
        };

        // If the waiter was already woken (or the caller asked us to wake the next waiter),
        // it may have consumed a wake-up meant for someone else, so pick new waiters to wake.
        let (wake_list, set_on_release) = if wake_next || waiting_for == WaitingFor::None {
            get_wake_list(waiters)
        } else {
            (WaiterList::new(WaiterAdapter::new()), 0)
        };

        if waiters.is_empty() {
            clear |= HAS_WAITERS;
        }

        if wake_list.is_empty() {
            // No one was woken, so there is no designated waker.
            clear |= DESIGNATED_WAKER;
        }

        if let WaiterKind::Exclusive = waiter.kind() {
            // The canceled waiter was a writer, so clear WRITER_WAITING; if other writers are
            // still queued, the bit is set again the next time `get_wake_list` skips over
            // them.
            clear |= WRITER_WAITING;
        }

        // Release the spin lock, applying the bits computed above.
        while self
            .state
            .compare_exchange_weak(
                oldstate,
                (oldstate & !clear) | set_on_release,
                Ordering::Release,
                Ordering::Relaxed,
            )
            .is_err()
        {
            hint::spin_loop();
            oldstate = self.state.load(Ordering::Relaxed);
        }

        for w in wake_list {
            w.wake();
        }

        // Drop the removed waiter only after the spin lock has been released.
        mem::drop(old_waiter);
    }
}

#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Send for RawRwLock {}
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Sync for RawRwLock {}

fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) {
    let raw_rwlock = raw as *const RawRwLock;

    // SAFETY: `raw` was created from a live `&RawRwLock` in `new_waiter`, and the future
    // that owns the waiter also borrows the lock, so the lock is still alive here.
    unsafe { (*raw_rwlock).cancel_waiter(waiter, wake_next) }
}

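// A sketch of typical usage (an illustration, not part of the original API docs), assuming
// this crate's `block_on` helper, which the tests below also use:
//
//     let lock = RwLock::new(0);
//
//     // Exclusive access.
//     {
//         let mut guard = block_on(lock.lock());
//         *guard += 1;
//     } // The guard is dropped here, releasing the lock.
//
//     // Shared access: any number of readers may hold the lock at once.
//     let value = block_on(lock.read_lock());
//     assert_eq!(*value, 1);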
639#[repr(align(128))]
704pub struct RwLock<T: ?Sized> {
705 raw: RawRwLock,
706 value: UnsafeCell<T>,
707}
708
709impl<T> RwLock<T> {
710 pub fn new(v: T) -> RwLock<T> {
712 RwLock {
713 raw: RawRwLock::new(),
714 value: UnsafeCell::new(v),
715 }
716 }
717
718 pub fn into_inner(self) -> T {
722 self.value.into_inner()
725 }
726}

impl<T: ?Sized> RwLock<T> {
    #[inline]
    pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
        self.raw.lock().await;

        // SAFETY: the write lock is held, giving us exclusive access to the value.
        RwLockWriteGuard {
            mu: self,
            value: unsafe { &mut *self.value.get() },
        }
    }

    #[inline]
    pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
        self.raw.read_lock().await;

        // SAFETY: the read lock is held, so there can be no concurrent writers.
        RwLockReadGuard {
            mu: self,
            value: unsafe { &*self.value.get() },
        }
    }

    // Called to re-acquire the lock after waiting on a Condvar. The waiter was woken as the
    // designated waker, so DESIGNATED_WAKER must be cleared when the lock is acquired.
    #[inline]
    pub(crate) async fn lock_from_cv(&self) -> RwLockWriteGuard<'_, T> {
        self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;

        RwLockWriteGuard {
            mu: self,
            value: unsafe { &mut *self.value.get() },
        }
    }

    // Like `lock_from_cv` but for readers, which are additionally allowed to acquire the
    // lock even while WRITER_WAITING is set.
    #[inline]
    pub(crate) async fn read_lock_from_cv(&self) -> RwLockReadGuard<'_, T> {
        self.raw
            .lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
            .await;

        RwLockReadGuard {
            mu: self,
            value: unsafe { &*self.value.get() },
        }
    }

    #[inline]
    fn unlock(&self) {
        self.raw.unlock();
    }

    #[inline]
    fn read_unlock(&self) {
        self.raw.read_unlock();
    }

    pub fn get_mut(&mut self) -> &mut T {
        // SAFETY: the exclusive borrow of `self` guarantees there are no other users.
        unsafe { &mut *self.value.get() }
    }
}

#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl<T: ?Sized + Send> Sync for RwLock<T> {}

impl<T: Default> Default for RwLock<T> {
    fn default() -> Self {
        Self::new(Default::default())
    }
}

impl<T> From<T> for RwLock<T> {
    fn from(source: T) -> Self {
        Self::new(source)
    }
}

pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
    mu: &'a RwLock<T>,
    value: &'a mut T,
}

impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
    pub(crate) fn into_inner(self) -> &'a RwLock<T> {
        self.mu
    }

    pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
        &self.mu.raw
    }
}

impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.value
    }
}

impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.value
    }
}

impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
    fn drop(&mut self) {
        self.mu.unlock()
    }
}

pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
    mu: &'a RwLock<T>,
    value: &'a T,
}

impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
    pub(crate) fn into_inner(self) -> &'a RwLock<T> {
        self.mu
    }

    pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
        &self.mu.raw
    }
}

impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.value
    }
}

impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
    fn drop(&mut self) {
        self.mu.read_unlock()
    }
}

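// The tests below mostly drive the lock futures by hand with a no-op waker so that they can
// assert on the raw state bits (HAS_WAITERS, WRITER_WAITING, and so on) at each step.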
911#[cfg(any(target_os = "android", target_os = "linux"))]
913#[cfg(test)]
914mod test {
915 use std::future::Future;
916 use std::mem;
917 use std::pin::Pin;
918 use std::rc::Rc;
919 use std::sync::atomic::AtomicUsize;
920 use std::sync::atomic::Ordering;
921 use std::sync::mpsc::channel;
922 use std::sync::mpsc::Sender;
923 use std::sync::Arc;
924 use std::task::Context;
925 use std::task::Poll;
926 use std::task::Waker;
927 use std::thread;
928 use std::time::Duration;
929
930 use futures::channel::oneshot;
931 use futures::pending;
932 use futures::select;
933 use futures::task::waker_ref;
934 use futures::task::ArcWake;
935 use futures::FutureExt;
936 use futures_executor::LocalPool;
937 use futures_executor::ThreadPool;
938 use futures_util::task::LocalSpawnExt;
939
940 use super::super::super::block_on;
941 use super::super::super::sync::Condvar;
942 use super::super::super::sync::SpinLock;
943 use super::*;
944
945 #[derive(Debug, Eq, PartialEq)]
946 struct NonCopy(u32);
947
948 struct TestWaker;
950 impl ArcWake for TestWaker {
951 fn wake_by_ref(_arc_self: &Arc<Self>) {}
952 }
953
    #[test]
    fn it_works() {
        let mu = RwLock::new(NonCopy(13));

        assert_eq!(*block_on(mu.lock()), NonCopy(13));
    }

    #[test]
    fn smoke() {
        let mu = RwLock::new(NonCopy(7));

        mem::drop(block_on(mu.lock()));
        mem::drop(block_on(mu.lock()));
    }

    #[test]
    fn rw_smoke() {
        let mu = RwLock::new(NonCopy(7));

        mem::drop(block_on(mu.lock()));
        mem::drop(block_on(mu.read_lock()));
        mem::drop((block_on(mu.read_lock()), block_on(mu.read_lock())));
        mem::drop(block_on(mu.lock()));
    }

    #[test]
    fn async_smoke() {
        async fn lock(mu: Rc<RwLock<NonCopy>>) {
            mu.lock().await;
        }

        async fn read_lock(mu: Rc<RwLock<NonCopy>>) {
            mu.read_lock().await;
        }

        async fn double_read_lock(mu: Rc<RwLock<NonCopy>>) {
            let first = mu.read_lock().await;
            mu.read_lock().await;

            // Touch the first guard so that it stays alive while the second read lock is
            // held.
            first.as_raw_rwlock();
        }

        let mu = Rc::new(RwLock::new(NonCopy(7)));

        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        spawner
            .spawn_local(lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");
        spawner
            .spawn_local(read_lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");
        spawner
            .spawn_local(double_read_lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");
        spawner
            .spawn_local(lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");

        ex.run();
    }

    #[test]
    fn send() {
        let mu = RwLock::new(NonCopy(19));

        thread::spawn(move || {
            let value = block_on(mu.lock());
            assert_eq!(*value, NonCopy(19));
        })
        .join()
        .unwrap();
    }

    #[test]
    fn arc_nested() {
        // Tests nested rwlocks and access to underlying data.
        let mu = RwLock::new(1);
        let arc = Arc::new(RwLock::new(mu));
        thread::spawn(move || {
            let nested = block_on(arc.lock());
            let lock2 = block_on(nested.lock());
            assert_eq!(*lock2, 1);
        })
        .join()
        .unwrap();
    }

    #[test]
    fn arc_access_in_unwind() {
        let arc = Arc::new(RwLock::new(1));
        let arc2 = arc.clone();
        thread::spawn(move || {
            struct Unwinder {
                i: Arc<RwLock<i32>>,
            }
            impl Drop for Unwinder {
                fn drop(&mut self) {
                    *block_on(self.i.lock()) += 1;
                }
            }
            let _u = Unwinder { i: arc2 };
            panic!();
        })
        .join()
        .expect_err("thread did not panic");
        let lock = block_on(arc.lock());
        assert_eq!(*lock, 2);
    }

    #[test]
    fn unsized_value() {
        let rwlock: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
        {
            let b = &mut *block_on(rwlock.lock());
            b[0] = 4;
            b[2] = 5;
        }
        let expected: &[i32] = &[4, 2, 5];
        assert_eq!(&*block_on(rwlock.lock()), expected);
    }

    #[test]
    fn high_contention() {
        const THREADS: usize = 17;
        const ITERATIONS: usize = 103;

        let mut threads = Vec::with_capacity(THREADS);

        let mu = Arc::new(RwLock::new(0usize));
        for _ in 0..THREADS {
            let mu2 = mu.clone();
            threads.push(thread::spawn(move || {
                for _ in 0..ITERATIONS {
                    *block_on(mu2.lock()) += 1;
                }
            }));
        }

        for t in threads.into_iter() {
            t.join().unwrap();
        }

        assert_eq!(*block_on(mu.read_lock()), THREADS * ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn high_contention_with_cancel() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        async fn increment(mu: Arc<RwLock<usize>>, alt_mu: Arc<RwLock<usize>>, tx: Sender<()>) {
            for _ in 0..ITERATIONS {
                select! {
                    mut count = mu.lock().fuse() => *count += 1,
                    mut count = alt_mu.lock().fuse() => *count += 1,
                }
            }
            tx.send(()).expect("Failed to send completion signal");
        }

        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let mu = Arc::new(RwLock::new(0usize));
        let alt_mu = Arc::new(RwLock::new(0usize));

        let (tx, rx) = channel();
        for _ in 0..TASKS {
            ex.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone()));
        }

        for _ in 0..TASKS {
            if let Err(e) = rx.recv_timeout(Duration::from_secs(10)) {
                panic!("Error while waiting for threads to complete: {e}");
            }
        }

        assert_eq!(
            *block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
            TASKS * ITERATIONS
        );
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
        assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn single_thread_async() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        async fn increment(mu: Rc<RwLock<usize>>) {
            for _ in 0..ITERATIONS {
                *mu.lock().await += 1;
            }
        }

        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        let mu = Rc::new(RwLock::new(0usize));
        for _ in 0..TASKS {
            spawner
                .spawn_local(increment(Rc::clone(&mu)))
                .expect("Failed to spawn task");
        }

        ex.run();

        assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn multi_thread_async() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        async fn increment(mu: Arc<RwLock<usize>>, tx: Sender<()>) {
            for _ in 0..ITERATIONS {
                *mu.lock().await += 1;
            }
            tx.send(()).expect("Failed to send completion signal");
        }

        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let mu = Arc::new(RwLock::new(0usize));
        let (tx, rx) = channel();
        for _ in 0..TASKS {
            ex.spawn_ok(increment(Arc::clone(&mu), tx.clone()));
        }

        for _ in 0..TASKS {
            rx.recv_timeout(Duration::from_secs(5))
                .expect("Failed to receive completion signal");
        }
        assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn get_mut() {
        let mut mu = RwLock::new(NonCopy(13));
        *mu.get_mut() = NonCopy(17);

        assert_eq!(mu.into_inner(), NonCopy(17));
    }

    #[test]
    fn into_inner() {
        let mu = RwLock::new(NonCopy(29));
        assert_eq!(mu.into_inner(), NonCopy(29));
    }

    #[test]
    fn into_inner_drop() {
        struct NeedsDrop(Arc<AtomicUsize>);
        impl Drop for NeedsDrop {
            fn drop(&mut self) {
                self.0.fetch_add(1, Ordering::AcqRel);
            }
        }

        let value = Arc::new(AtomicUsize::new(0));
        let needs_drop = RwLock::new(NeedsDrop(value.clone()));
        assert_eq!(value.load(Ordering::Acquire), 0);

        {
            let inner = needs_drop.into_inner();
            assert_eq!(inner.0.load(Ordering::Acquire), 0);
        }

        assert_eq!(value.load(Ordering::Acquire), 1);
    }

    #[test]
    fn rw_arc() {
        const THREADS: isize = 7;
        const ITERATIONS: isize = 13;

        let mu = Arc::new(RwLock::new(0isize));
        let mu2 = mu.clone();

        let (tx, rx) = channel();
        thread::spawn(move || {
            let mut guard = block_on(mu2.lock());
            for _ in 0..ITERATIONS {
                let tmp = *guard;
                *guard = -1;
                thread::yield_now();
                *guard = tmp + 1;
            }
            tx.send(()).unwrap();
        });

        let mut readers = Vec::with_capacity(10);
        for _ in 0..THREADS {
            let mu3 = mu.clone();
            let handle = thread::spawn(move || {
                let guard = block_on(mu3.read_lock());
                assert!(*guard >= 0);
            });

            readers.push(handle);
        }

        // Wait for the readers to finish their checks.
        for r in readers {
            r.join().expect("One or more readers saw a negative value");
        }

        // Wait for the writer to finish.
        rx.recv_timeout(Duration::from_secs(5)).unwrap();
        assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn rw_single_thread_async() {
        // A future that returns Poll::Pending the first time it is polled and hands its
        // waker to a separate thread, which wakes it shortly afterwards.
        struct TestFuture {
            polled: bool,
            waker: Arc<SpinLock<Option<Waker>>>,
        }

        impl Future for TestFuture {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                if self.polled {
                    Poll::Ready(())
                } else {
                    self.polled = true;
                    *self.waker.lock() = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        }

        fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
            loop {
                if let Some(w) = waker.lock().take() {
                    w.wake();
                    return;
                }

                // The waker has not been stored yet; wait a bit and try again.
                thread::sleep(Duration::from_millis(10));
            }
        }

        async fn writer(mu: Rc<RwLock<isize>>) {
            let mut guard = mu.lock().await;
            for _ in 0..ITERATIONS {
                let tmp = *guard;
                *guard = -1;
                let waker = Arc::new(SpinLock::new(None));
                let waker2 = Arc::clone(&waker);
                thread::spawn(move || wake_future(waker2));
                let fut = TestFuture {
                    polled: false,
                    waker,
                };
                fut.await;
                *guard = tmp + 1;
            }
        }

        async fn reader(mu: Rc<RwLock<isize>>) {
            let guard = mu.read_lock().await;
            assert!(*guard >= 0);
        }

        const TASKS: isize = 7;
        const ITERATIONS: isize = 13;

        let mu = Rc::new(RwLock::new(0isize));
        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        spawner
            .spawn_local(writer(Rc::clone(&mu)))
            .expect("Failed to spawn writer");

        for _ in 0..TASKS {
            spawner
                .spawn_local(reader(Rc::clone(&mu)))
                .expect("Failed to spawn reader");
        }

        ex.run();

        assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn rw_multi_thread_async() {
        async fn writer(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
            let mut guard = mu.lock().await;
            for _ in 0..ITERATIONS {
                let tmp = *guard;
                *guard = -1;
                thread::yield_now();
                *guard = tmp + 1;
            }

            mem::drop(guard);
            tx.send(()).unwrap();
        }

        async fn reader(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
            let guard = mu.read_lock().await;
            assert!(*guard >= 0);

            mem::drop(guard);
            tx.send(()).expect("Failed to send completion message");
        }

        const TASKS: isize = 7;
        const ITERATIONS: isize = 13;

        let mu = Arc::new(RwLock::new(0isize));
        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let (txw, rxw) = channel();
        ex.spawn_ok(writer(Arc::clone(&mu), txw));

        let (txr, rxr) = channel();
        for _ in 0..TASKS {
            ex.spawn_ok(reader(Arc::clone(&mu), txr.clone()));
        }

        // Wait for the readers to finish their checks.
        for _ in 0..TASKS {
            rxr.recv_timeout(Duration::from_secs(5))
                .expect("Failed to receive completion message from reader");
        }

        // Wait for the writer to finish.
        rxw.recv_timeout(Duration::from_secs(5))
            .expect("Failed to receive completion message from writer");

        assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn wake_all_readers() {
        async fn read(mu: Arc<RwLock<()>>) {
            let g = mu.read_lock().await;
            pending!();
            mem::drop(g);
        }

        async fn write(mu: Arc<RwLock<()>>) {
            mu.lock().await;
        }

        let mu = Arc::new(RwLock::new(()));
        let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
            Box::pin(read(mu.clone())),
            Box::pin(read(mu.clone())),
            Box::pin(read(mu.clone())),
            Box::pin(write(mu.clone())),
            Box::pin(read(mu.clone())),
        ];
        const NUM_READERS: usize = 4;

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Acquire the lock in write mode so that all the futures queue up as waiters.
        let g = block_on(mu.lock());

        for r in &mut futures {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // Dropping the write lock wakes all the readers but not the writer; on the next poll
        // the readers acquire the lock and then hit the `pending!`.
        mem::drop(g);
        for r in &mut futures {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        // All the readers should now hold the lock while the writer keeps waiting.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            READ_LOCK * NUM_READERS
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        let mut needs_poll = None;

        // The readers can all finish; only the writer needs one more poll once they are done.
        for (i, r) in futures.iter_mut().enumerate() {
            match r.as_mut().poll(&mut cx) {
                Poll::Ready(()) => {}
                Poll::Pending => {
                    if needs_poll.is_some() {
                        panic!("More than one future unable to complete");
                    }
                    needs_poll = Some(i);
                }
            }
        }

        if futures[needs_poll.expect("Writer unexpectedly able to complete")]
            .as_mut()
            .poll(&mut cx)
            .is_pending()
        {
            panic!("Writer unable to complete");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn long_wait() {
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        // `mark_ready` has been woken and requeued enough times to become a long waiter.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // This time the tight loop has to wait: the LONG_WAIT bit keeps it from re-acquiring
        // the lock ahead of the long waiter.
        if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
            panic!("tight_loop unexpectedly ready");
        }

        if mark.as_mut().poll(&mut cx).is_pending() {
            panic!("mark_ready not able to make progress");
        }

        if tl.as_mut().poll(&mut cx).is_pending() {
            panic!("tight_loop not able to finish");
        }

        assert!(*block_on(mu.lock()));
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn cancel_long_wait_before_wake() {
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // Canceling the long waiter should also clear the LONG_WAIT bit.
        mem::drop(mark);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), LOCKED);

        mem::drop(tl);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn cancel_long_wait_after_wake() {
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // This poll drops the lock and wakes the long waiter.
        if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
            panic!("tight_loop unexpectedly ready");
        }

        // Canceling the waiter after it was woken should still clear the LONG_WAIT bit.
        mem::drop(mark);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & LONG_WAIT, 0);

        // The lock can now be acquired normally.
        block_on(mark_ready(mu.clone()));

        if tl.as_mut().poll(&mut cx).is_pending() {
            panic!("tight_loop not able to finish");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn designated_waker() {
        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        let mu = Arc::new(RwLock::new(0));

        let mut futures = [
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
        ];

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let count = block_on(mu.lock());

        // Queue up two waiters while the lock is held.
        if let Poll::Ready(()) = futures[0].as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }
        if let Poll::Ready(()) = futures[1].as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING,
        );

        // Dropping the lock should wake the first waiter and mark it as the designated
        // waker.
        mem::drop(count);

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING,
        );

        // A fresh lock attempt can still barge in even though there is a designated waker.
        if futures[2].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 1);

        // There should still be a designated waker and a waiter in the queue.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
            DESIGNATED_WAKER
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        // Now let the designated waker and the remaining waiter run to completion.
        if futures[0].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 2);

        if futures[1].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 3);

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn cancel_designated_waker() {
        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        let mu = Arc::new(RwLock::new(0));

        let mut fut = Box::pin(inc(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let count = block_on(mu.lock());

        if let Poll::Ready(()) = fut.as_mut().poll(&mut cx) {
            panic!("Future unexpectedly ready when lock is held");
        }

        // This wakes the future and makes it the designated waker.
        mem::drop(count);

        // Dropping the future should clear the designated waker bit.
        mem::drop(fut);

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn cancel_before_wake() {
        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        let mu = Arc::new(RwLock::new(0));

        let mut fut1 = Box::pin(inc(mu.clone()));

        let mut fut2 = Box::pin(inc(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Lock the rwlock so that both futures have to queue up as waiters.
        let count = block_on(mu.lock());

        match fut1.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("Future is unexpectedly ready"),
        }

        match fut2.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("Future is unexpectedly ready"),
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // Drop fut1 before the lock is released.
        mem::drop(fut1);

        // There should be no designated waker.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER, 0);

        // Canceling a writer clears WRITER_WAITING even though fut2 is still queued.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);

        match fut2.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("Future is unexpectedly ready"),
        }

        mem::drop(count);

        match fut2.as_mut().poll(&mut cx) {
            Poll::Pending => panic!("Future is not ready to make progress"),
            Poll::Ready(()) => {}
        }

        assert_eq!(*block_on(mu.lock()), 1);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn cancel_after_wake() {
        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        let mu = Arc::new(RwLock::new(0));

        let mut fut1 = Box::pin(inc(mu.clone()));

        let mut fut2 = Box::pin(inc(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Lock the rwlock so that both futures have to queue up as waiters.
        let count = block_on(mu.lock());

        match fut1.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("Future is unexpectedly ready"),
        }

        match fut2.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("Future is unexpectedly ready"),
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // This wakes fut1 and makes it the designated waker.
        mem::drop(count);

        // Dropping fut1 should pass the wake-up on to fut2.
        mem::drop(fut1);

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
            DESIGNATED_WAKER
        );

        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);

        match fut2.as_mut().poll(&mut cx) {
            Poll::Pending => panic!("Future is not ready to make progress"),
            Poll::Ready(()) => {}
        }

        assert_eq!(*block_on(mu.lock()), 1);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn timeout() {
        async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<RwLock<()>>) {
            select! {
                res = timer.fuse() => {
                    match res {
                        Ok(()) => {},
                        Err(e) => panic!("Timer unexpectedly canceled: {e}"),
                    }
                }
                _ = mu.lock().fuse() => panic!("Successfully acquired lock"),
            }
        }

        let mu = Arc::new(RwLock::new(()));
        let (tx, rx) = oneshot::channel();

        let mut timeout = Box::pin(timed_lock(rx, mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let g = block_on(mu.lock());

        if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
            panic!("timed_lock unexpectedly ready");
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        // Firing the timer completes the select!, which drops the lock future and cancels
        // its waiter.
        tx.send(()).expect("Failed to send wakeup");

        if timeout.as_mut().poll(&mut cx).is_pending() {
            panic!("timed_lock not ready after timeout");
        }

        // The waiter should have been removed from the wait list.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

        mem::drop(g);

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn writer_waiting() {
        async fn read_zero(mu: Arc<RwLock<usize>>) {
            let val = mu.read_lock().await;
            pending!();

            assert_eq!(*val, 0);
        }

        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        async fn read_one(mu: Arc<RwLock<usize>>) {
            let val = mu.read_lock().await;

            assert_eq!(*val, 1);
        }

        let mu = Arc::new(RwLock::new(0));

        let mut r1 = Box::pin(read_zero(mu.clone()));
        let mut r2 = Box::pin(read_zero(mu.clone()));

        let mut w = Box::pin(inc(mu.clone()));
        let mut r3 = Box::pin(read_one(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        if let Poll::Ready(()) = r1.as_mut().poll(&mut cx) {
            panic!("read_zero unexpectedly ready");
        }
        if let Poll::Ready(()) = r2.as_mut().poll(&mut cx) {
            panic!("read_zero unexpectedly ready");
        }
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            2 * READ_LOCK
        );

        if let Poll::Ready(()) = w.as_mut().poll(&mut cx) {
            panic!("inc unexpectedly ready");
        }
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // The WRITER_WAITING bit should keep the next reader from acquiring the lock.
        if let Poll::Ready(()) = r3.as_mut().poll(&mut cx) {
            panic!("read_one unexpectedly ready");
        }
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            2 * READ_LOCK
        );

        if r1.as_mut().poll(&mut cx).is_pending() {
            panic!("read_zero unable to complete");
        }
        if r2.as_mut().poll(&mut cx).is_pending() {
            panic!("read_zero unable to complete");
        }
        if w.as_mut().poll(&mut cx).is_pending() {
            panic!("inc unable to complete");
        }
        if r3.as_mut().poll(&mut cx).is_pending() {
            panic!("read_one unable to complete");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn notify_one() {
        async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.read_lock().await;
            while *count == 0 {
                count = cv.wait_read(count).await;
            }
        }

        async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.lock().await;
            while *count == 0 {
                count = cv.wait(count).await;
            }

            *count -= 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut readers = [
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
        ];
        let mut writer = Box::pin(write(mu.clone(), cv.clone()));

        for r in &mut readers {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("reader unexpectedly ready");
            }
        }
        if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
            panic!("writer unexpectedly ready");
        }

        let mut count = block_on(mu.lock());
        *count = 1;

        cv.notify_one();

        for r in &mut readers {
            if r.as_mut().poll(&mut cx).is_ready() {
                panic!("reader unexpectedly ready");
            }
        }

        if writer.as_mut().poll(&mut cx).is_ready() {
            panic!("writer unexpectedly ready");
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        mem::drop(count);

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
            HAS_WAITERS | WRITER_WAITING
        );

        for r in &mut readers {
            if r.as_mut().poll(&mut cx).is_pending() {
                panic!("reader unable to complete");
            }
        }

        if writer.as_mut().poll(&mut cx).is_pending() {
            panic!("writer unable to complete");
        }

        assert_eq!(*block_on(mu.read_lock()), 0);
    }

    #[test]
    fn notify_when_unlocked() {
        async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.lock().await;

            while *count == 0 {
                count = cv.wait(count).await;
            }

            *count -= 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut futures = [
            Box::pin(dec(mu.clone(), cv.clone())),
            Box::pin(dec(mu.clone(), cv.clone())),
            Box::pin(dec(mu.clone(), cv.clone())),
            Box::pin(dec(mu.clone(), cv.clone())),
        ];

        for f in &mut futures {
            if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        *block_on(mu.lock()) = futures.len();
        cv.notify_all();

        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

        for f in &mut futures {
            if f.as_mut().poll(&mut cx).is_pending() {
                panic!("future unable to complete");
            }
        }
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn notify_reader_writer() {
        async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.read_lock().await;
            while *count == 0 {
                count = cv.wait_read(count).await;
            }

            // Yield once then return.
            pending!();
        }

        async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.lock().await;
            while *count == 0 {
                count = cv.wait(count).await;
            }

            *count -= 1;
        }

        async fn lock(mu: Arc<RwLock<usize>>) {
            mem::drop(mu.lock().await);
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(write(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
        ];
        const NUM_READERS: usize = 4;

        let mut l = Box::pin(lock(mu.clone()));

        for f in &mut futures {
            if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);

        let mut count = block_on(mu.lock());
        *count = 1;

        // Poll the lock future once so that it queues up as a waiter.
        if let Poll::Ready(()) = l.as_mut().poll(&mut cx) {
            panic!("lock() unexpectedly ready");
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
            HAS_WAITERS | WRITER_WAITING
        );

        cv.notify_all();

        mem::drop(count);

        if l.as_mut().poll(&mut cx).is_pending() {
            panic!("lock() unable to complete");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);

        // The readers re-acquire the lock and then hit the `pending!`; the writer has to
        // keep waiting for them.
        for f in &mut futures {
            if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            READ_LOCK * NUM_READERS
        );

        // At this point only the writer should be unable to complete.
        let mut needs_poll = None;
        for (i, r) in futures.iter_mut().enumerate() {
            match r.as_mut().poll(&mut cx) {
                Poll::Ready(()) => {}
                Poll::Pending => {
                    if needs_poll.is_some() {
                        panic!("More than one future unable to complete");
                    }
                    needs_poll = Some(i);
                }
            }
        }

        if futures[needs_poll.expect("Writer unexpectedly able to complete")]
            .as_mut()
            .poll(&mut cx)
            .is_pending()
        {
            panic!("Writer unable to complete");
        }

        assert_eq!(*block_on(mu.lock()), 0);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn notify_readers_with_read_lock() {
        async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.read_lock().await;
            while *count == 0 {
                count = cv.wait_read(count).await;
            }

            // Yield once then return.
            pending!();
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut futures = [
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
        ];

        for f in &mut futures {
            if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        // Increment the count and then grab a read lock.
        *block_on(mu.lock()) = 1;

        let g = block_on(mu.read_lock());

        // Notify the condvar while holding the read lock. The woken readers can acquire the
        // lock alongside the existing reader.
        cv.notify_all();

        for f in &mut futures {
            if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            READ_LOCK * (futures.len() + 1)
        );

        mem::drop(g);

        for f in &mut futures {
            if f.as_mut().poll(&mut cx).is_pending() {
                panic!("future unable to complete");
            }
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
}