cros_async/sync/
mu.rs

1// Copyright 2020 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::cell::UnsafeCell;
6use std::hint;
7use std::mem;
8use std::ops::Deref;
9use std::ops::DerefMut;
10use std::sync::atomic::AtomicUsize;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::thread::yield_now;
14
15use super::super::sync::waiter::Kind as WaiterKind;
16use super::super::sync::waiter::Waiter;
17use super::super::sync::waiter::WaiterAdapter;
18use super::super::sync::waiter::WaiterList;
19use super::super::sync::waiter::WaitingFor;
20
// Bit layout of `RawRwLock::state`: bits 0-5 are the flags below, bits 6-7 are not used by any
// constant in this file, and bits 8 and above hold the count of shared (reader) locks.

// Set when the rwlock is exclusively locked.
const LOCKED: usize = 1 << 0;
// Set when there are one or more threads waiting to acquire the lock.
const HAS_WAITERS: usize = 1 << 1;
// Set when a thread has been woken up from the wait queue. Cleared when that thread either acquires
// the lock or adds itself back into the wait queue. Used to prevent unnecessary wake ups when a
// thread has been removed from the wait queue but has not gotten CPU time yet.
const DESIGNATED_WAKER: usize = 1 << 2;
// Used to provide exclusive access to the `waiters` field in `RawRwLock`. Should only be held
// while modifying the waiter list.
const SPINLOCK: usize = 1 << 3;
// Set when a thread that wants an exclusive lock adds itself to the wait queue. New threads
// attempting to acquire a shared lock will be prevented from getting it when this bit is set.
// However, this bit is ignored once a thread has gone through the wait queue at least once.
const WRITER_WAITING: usize = 1 << 4;
// Set when a thread has gone through the wait queue many times but has failed to acquire the lock
// every time it is woken up. When this bit is set, all other threads are prevented from acquiring
// the lock until the thread that set the `LONG_WAIT` bit has acquired the lock.
const LONG_WAIT: usize = 1 << 5;
// The bit that is added to the rwlock state in order to acquire a shared lock. Since more than one
// thread can acquire a shared lock, we cannot use a single bit. Instead we use all the remaining
// bits in the state to track the number of threads that have acquired a shared lock.
const READ_LOCK: usize = 1 << 8;
// Mask used for checking if any threads currently hold a shared lock. Covers every bit from
// `READ_LOCK` upwards.
const READ_MASK: usize = !0xff;

// The number of times the thread should just spin and attempt to re-acquire the lock. Each spin
// round busy-waits for `1 << spin_count` iterations, so the back-off grows exponentially.
const SPIN_THRESHOLD: usize = 7;

// The number of times the thread needs to go through the wait queue before it sets the `LONG_WAIT`
// bit and forces all other threads to wait for it to acquire the lock. This value is set relatively
// high so that we don't lose the benefit of having running threads unless it is absolutely
// necessary.
const LONG_WAIT_THRESHOLD: usize = 19;
55
// Common methods between shared and exclusive locks. Implemented by the `Shared` and `Exclusive`
// marker types so that `RawRwLock::lock_slow` can be written once and parameterized over the kind
// of lock being acquired. All methods are associated functions; no value of the implementing type
// is ever needed.
trait Kind {
    // The bits that must be zero for the thread to acquire this kind of lock. If any of these bits
    // are not zero then the thread will first spin and retry a few times before adding itself to
    // the wait queue.
    fn zero_to_acquire() -> usize;

    // The bit that must be added in order to acquire this kind of lock. This should either be
    // `LOCKED` or `READ_LOCK`.
    fn add_to_acquire() -> usize;

    // The bits that should be set when a thread adds itself to the wait queue while waiting to
    // acquire this kind of lock.
    fn set_when_waiting() -> usize;

    // The bits that should be cleared when a thread acquires this kind of lock.
    fn clear_on_acquire() -> usize;

    // The waiter that a thread should use when waiting to acquire this kind of lock. `raw` is the
    // lock the waiter will be queued on.
    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter>;
}
77
78// A lock type for shared read-only access to the data. More than one thread may hold this kind of
79// lock simultaneously.
80struct Shared;
81
82impl Kind for Shared {
83    fn zero_to_acquire() -> usize {
84        LOCKED | WRITER_WAITING | LONG_WAIT
85    }
86
87    fn add_to_acquire() -> usize {
88        READ_LOCK
89    }
90
91    fn set_when_waiting() -> usize {
92        0
93    }
94
95    fn clear_on_acquire() -> usize {
96        0
97    }
98
99    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
100        Arc::new(Waiter::new(
101            WaiterKind::Shared,
102            cancel_waiter,
103            raw as *const RawRwLock as usize,
104            WaitingFor::Mutex,
105        ))
106    }
107}
108
109// A lock type for mutually exclusive read-write access to the data. Only one thread can hold this
110// kind of lock at a time.
111struct Exclusive;
112
113impl Kind for Exclusive {
114    fn zero_to_acquire() -> usize {
115        LOCKED | READ_MASK | LONG_WAIT
116    }
117
118    fn add_to_acquire() -> usize {
119        LOCKED
120    }
121
122    fn set_when_waiting() -> usize {
123        WRITER_WAITING
124    }
125
126    fn clear_on_acquire() -> usize {
127        WRITER_WAITING
128    }
129
130    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
131        Arc::new(Waiter::new(
132            WaiterKind::Exclusive,
133            cancel_waiter,
134            raw as *const RawRwLock as usize,
135            WaitingFor::Mutex,
136        ))
137    }
138}
139
140// Scan `waiters` and return the ones that should be woken up. Also returns any bits that should be
141// set in the rwlock state when the current thread releases the spin lock protecting the waiter
142// list.
143//
144// If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
145// waiting for a shared lock are also woken up. If any waiters waiting for an exclusive lock are
146// found when iterating through the list, then the returned `usize` contains the `WRITER_WAITING`
147// bit, which should be set when the thread releases the spin lock.
148//
149// If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
150// no bits are set in the returned `usize`.
151fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
152    let mut to_wake = WaiterList::new(WaiterAdapter::new());
153    let mut set_on_release = 0;
154    let mut cursor = waiters.front_mut();
155
156    let mut waking_readers = false;
157    while let Some(w) = cursor.get() {
158        match w.kind() {
159            WaiterKind::Exclusive if !waking_readers => {
160                // This is the first waiter and it's a writer. No need to check the other waiters.
161                let waiter = cursor.remove().unwrap();
162                waiter.set_waiting_for(WaitingFor::None);
163                to_wake.push_back(waiter);
164                break;
165            }
166
167            WaiterKind::Shared => {
168                // This is a reader and the first waiter in the list was not a writer so wake up all
169                // the readers in the wait list.
170                let waiter = cursor.remove().unwrap();
171                waiter.set_waiting_for(WaitingFor::None);
172                to_wake.push_back(waiter);
173                waking_readers = true;
174            }
175
176            WaiterKind::Exclusive => {
177                // We found a writer while looking for more readers to wake up. Set the
178                // `WRITER_WAITING` bit to prevent any new readers from acquiring the lock. All
179                // readers currently in the wait list will ignore this bit since they already waited
180                // once.
181                set_on_release |= WRITER_WAITING;
182                cursor.move_next();
183            }
184        }
185    }
186
187    (to_wake, set_on_release)
188}
189
// Busy-waits by issuing the CPU spin-loop hint `iterations` times. Passing 0 returns immediately.
#[inline]
fn cpu_relax(iterations: usize) {
    (0..iterations).for_each(|_| hint::spin_loop());
}
196
// The untyped lock state machine shared by `RwLock`. It tracks who holds the lock and who is
// waiting for it, but does not own the protected value.
pub(crate) struct RawRwLock {
    // Packed lock word: the low bits are the flag constants (`LOCKED`, `HAS_WAITERS`,
    // `DESIGNATED_WAKER`, `SPINLOCK`, `WRITER_WAITING`, `LONG_WAIT`) and the bits covered by
    // `READ_MASK` count the shared locks currently held.
    state: AtomicUsize,
    // Queue of waiters that could not acquire the lock. Must only be accessed while the
    // `SPINLOCK` bit in `state` is held by the accessing thread.
    waiters: UnsafeCell<WaiterList>,
}
201
202impl RawRwLock {
203    pub fn new() -> RawRwLock {
204        RawRwLock {
205            state: AtomicUsize::new(0),
206            waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
207        }
208    }
209
210    #[inline]
211    pub async fn lock(&self) {
212        match self
213            .state
214            .compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
215        {
216            Ok(_) => {}
217            Err(oldstate) => {
218                // If any bits that should be zero are not zero or if we fail to acquire the lock
219                // with a single compare_exchange then go through the slow path.
220                if (oldstate & Exclusive::zero_to_acquire()) != 0
221                    || self
222                        .state
223                        .compare_exchange_weak(
224                            oldstate,
225                            (oldstate + Exclusive::add_to_acquire())
226                                & !Exclusive::clear_on_acquire(),
227                            Ordering::Acquire,
228                            Ordering::Relaxed,
229                        )
230                        .is_err()
231                {
232                    self.lock_slow::<Exclusive>(0, 0).await;
233                }
234            }
235        }
236    }
237
238    #[inline]
239    pub async fn read_lock(&self) {
240        match self
241            .state
242            .compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
243        {
244            Ok(_) => {}
245            Err(oldstate) => {
246                if (oldstate & Shared::zero_to_acquire()) != 0
247                    || self
248                        .state
249                        .compare_exchange_weak(
250                            oldstate,
251                            (oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
252                            Ordering::Acquire,
253                            Ordering::Relaxed,
254                        )
255                        .is_err()
256                {
257                    self.lock_slow::<Shared>(0, 0).await;
258                }
259            }
260        }
261    }
262
263    // Slow path for acquiring the lock. `clear` should contain any bits that need to be cleared
264    // when the lock is acquired. Any bits set in `zero_mask` are cleared from the bits returned by
265    // `K::zero_to_acquire()`.
266    #[cold]
267    async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
268        let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;
269
270        let mut spin_count = 0;
271        let mut wait_count = 0;
272        let mut waiter = None;
273        loop {
274            let oldstate = self.state.load(Ordering::Relaxed);
275            //  If all the bits in `zero_to_acquire` are actually zero then try to acquire the lock
276            //  directly.
277            if (oldstate & zero_to_acquire) == 0 {
278                if self
279                    .state
280                    .compare_exchange_weak(
281                        oldstate,
282                        (oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
283                        Ordering::Acquire,
284                        Ordering::Relaxed,
285                    )
286                    .is_ok()
287                {
288                    return;
289                }
290            } else if (oldstate & SPINLOCK) == 0 {
291                // The rwlock is locked and the spin lock is available.  Try to add this thread to
292                // the waiter queue.
293                let w = waiter.get_or_insert_with(|| K::new_waiter(self));
294                w.reset(WaitingFor::Mutex);
295
296                if self
297                    .state
298                    .compare_exchange_weak(
299                        oldstate,
300                        (oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
301                        Ordering::Acquire,
302                        Ordering::Relaxed,
303                    )
304                    .is_ok()
305                {
306                    let mut set_on_release = 0;
307
308                    if wait_count < LONG_WAIT_THRESHOLD {
309                        // Add the waiter to the back of the queue.
310                        // SAFETY:
311                        // Safe because we have acquired the spin lock and it provides exclusive
312                        // access to the waiter queue.
313                        unsafe { (*self.waiters.get()).push_back(w.clone()) };
314                    } else {
315                        // This waiter has gone through the queue too many times. Put it in the
316                        // front of the queue and block all other threads from acquiring the lock
317                        // until this one has acquired it at least once.
318                        // SAFETY:
319                        // Safe because we have acquired the spin lock and it provides exclusive
320                        // access to the waiter queue.
321                        unsafe { (*self.waiters.get()).push_front(w.clone()) };
322
323                        // Set the LONG_WAIT bit to prevent all other threads from acquiring the
324                        // lock.
325                        set_on_release |= LONG_WAIT;
326
327                        // Make sure we clear the LONG_WAIT bit when we do finally get the lock.
328                        clear |= LONG_WAIT;
329
330                        // Since we set the LONG_WAIT bit we shouldn't allow that bit to prevent us
331                        // from acquiring the lock.
332                        zero_to_acquire &= !LONG_WAIT;
333                    }
334
335                    // Release the spin lock.
336                    let mut state = oldstate;
337                    loop {
338                        match self.state.compare_exchange_weak(
339                            state,
340                            (state | set_on_release) & !SPINLOCK,
341                            Ordering::Release,
342                            Ordering::Relaxed,
343                        ) {
344                            Ok(_) => break,
345                            Err(w) => state = w,
346                        }
347                    }
348
349                    // Now wait until we are woken.
350                    w.wait().await;
351
352                    // The `DESIGNATED_WAKER` bit gets set when this thread is woken up by the
353                    // thread that originally held the lock. While this bit is set, no other waiters
354                    // will be woken up so it's important to clear it the next time we try to
355                    // acquire the main lock or the spin lock.
356                    clear |= DESIGNATED_WAKER;
357
358                    // Now that the thread has waited once, we no longer care if there is a writer
359                    // waiting. Only the limits of mutual exclusion can prevent us from acquiring
360                    // the lock.
361                    zero_to_acquire &= !WRITER_WAITING;
362
363                    // Reset the spin count since we just went through the wait queue.
364                    spin_count = 0;
365
366                    // Increment the wait count since we went through the wait queue.
367                    wait_count += 1;
368
369                    // Skip the `cpu_relax` below.
370                    continue;
371                }
372            }
373
374            // Both the lock and the spin lock are held by one or more other threads. First, we'll
375            // spin a few times in case we can acquire the lock or the spin lock. If that fails then
376            // we yield because we might be preventing the threads that do hold the 2 locks from
377            // getting cpu time.
378            if spin_count < SPIN_THRESHOLD {
379                cpu_relax(1 << spin_count);
380                spin_count += 1;
381            } else {
382                yield_now();
383            }
384        }
385    }
386
387    #[inline]
388    pub fn unlock(&self) {
389        // Fast path, if possible. We can directly clear the locked bit since we have exclusive
390        // access to the rwlock.
391        let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);
392
393        // Panic if we just tried to unlock a rwlock that wasn't held by this thread. This shouldn't
394        // really be possible since `unlock` is not a public method.
395        debug_assert_eq!(
396            oldstate & READ_MASK,
397            0,
398            "`unlock` called on rwlock held in read-mode"
399        );
400        debug_assert_ne!(
401            oldstate & LOCKED,
402            0,
403            "`unlock` called on rwlock not held in write-mode"
404        );
405
406        if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
407            // The oldstate has waiters but no designated waker has been chosen yet.
408            self.unlock_slow();
409        }
410    }
411
412    #[inline]
413    pub fn read_unlock(&self) {
414        // Fast path, if possible. We can directly subtract the READ_LOCK bit since we had
415        // previously added it.
416        let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);
417
418        debug_assert_eq!(
419            oldstate & LOCKED,
420            0,
421            "`read_unlock` called on rwlock held in write-mode"
422        );
423        debug_assert_ne!(
424            oldstate & READ_MASK,
425            0,
426            "`read_unlock` called on rwlock not held in read-mode"
427        );
428
429        if (oldstate & HAS_WAITERS) != 0
430            && (oldstate & DESIGNATED_WAKER) == 0
431            && (oldstate & READ_MASK) == READ_LOCK
432        {
433            // There are waiters, no designated waker has been chosen yet, and the last reader is
434            // unlocking so we have to take the slow path.
435            self.unlock_slow();
436        }
437    }
438
439    #[cold]
440    fn unlock_slow(&self) {
441        let mut spin_count = 0;
442
443        loop {
444            let oldstate = self.state.load(Ordering::Relaxed);
445            if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
446                // No more waiters or a designated waker has been chosen. Nothing left for us to do.
447                return;
448            } else if (oldstate & SPINLOCK) == 0 {
449                // The spin lock is not held by another thread. Try to acquire it. Also set the
450                // `DESIGNATED_WAKER` bit since we are likely going to wake up one or more threads.
451                if self
452                    .state
453                    .compare_exchange_weak(
454                        oldstate,
455                        oldstate | SPINLOCK | DESIGNATED_WAKER,
456                        Ordering::Acquire,
457                        Ordering::Relaxed,
458                    )
459                    .is_ok()
460                {
461                    // Acquired the spinlock. Try to wake a waiter. We may also end up wanting to
462                    // clear the HAS_WAITER and DESIGNATED_WAKER bits so start collecting the bits
463                    // to be cleared.
464                    let mut clear = SPINLOCK;
465
466                    // SAFETY:
467                    // Safe because the spinlock guarantees exclusive access to the waiter list and
468                    // the reference does not escape this function.
469                    let waiters = unsafe { &mut *self.waiters.get() };
470                    let (wake_list, set_on_release) = get_wake_list(waiters);
471
472                    // If the waiter list is now empty, clear the HAS_WAITERS bit.
473                    if waiters.is_empty() {
474                        clear |= HAS_WAITERS;
475                    }
476
477                    if wake_list.is_empty() {
478                        // Since we are not going to wake any waiters clear the DESIGNATED_WAKER bit
479                        // that we set when we acquired the spin lock.
480                        clear |= DESIGNATED_WAKER;
481                    }
482
483                    // Release the spin lock and clear any other bits as necessary. Also, set any
484                    // bits returned by `get_wake_list`. For now, this is just the `WRITER_WAITING`
485                    // bit, which needs to be set when we are waking up a bunch of readers and there
486                    // are still writers in the wait queue. This will prevent any readers that
487                    // aren't in `wake_list` from acquiring the read lock.
488                    let mut state = oldstate;
489                    loop {
490                        match self.state.compare_exchange_weak(
491                            state,
492                            (state | set_on_release) & !clear,
493                            Ordering::Release,
494                            Ordering::Relaxed,
495                        ) {
496                            Ok(_) => break,
497                            Err(w) => state = w,
498                        }
499                    }
500
501                    // Now wake the waiters, if any.
502                    for w in wake_list {
503                        w.wake();
504                    }
505
506                    // We're done.
507                    return;
508                }
509            }
510
511            // Spin and try again.  It's ok to block here as we have already released the lock.
512            if spin_count < SPIN_THRESHOLD {
513                cpu_relax(1 << spin_count);
514                spin_count += 1;
515            } else {
516                yield_now();
517            }
518        }
519    }
520
521    fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
522        let mut oldstate = self.state.load(Ordering::Relaxed);
523        while oldstate & SPINLOCK != 0
524            || self
525                .state
526                .compare_exchange_weak(
527                    oldstate,
528                    oldstate | SPINLOCK,
529                    Ordering::Acquire,
530                    Ordering::Relaxed,
531                )
532                .is_err()
533        {
534            hint::spin_loop();
535            oldstate = self.state.load(Ordering::Relaxed);
536        }
537
538        // SAFETY:
539        // Safe because the spin lock provides exclusive access and the reference does not escape
540        // this function.
541        let waiters = unsafe { &mut *self.waiters.get() };
542
543        let mut clear = SPINLOCK;
544
545        // If we are about to remove the first waiter in the wait list, then clear the LONG_WAIT
546        // bit. Also clear the bit if we are going to be waking some other waiters. In this case the
547        // waiter that set the bit may have already been removed from the waiter list (and could be
548        // the one that is currently being dropped). If it is still in the waiter list then clearing
549        // this bit may starve it for one more iteration through the lock_slow() loop, whereas not
550        // clearing this bit could cause a deadlock if the waiter that set it is the one that is
551        // being dropped.
552        if wake_next
553            || waiters
554                .front()
555                .get()
556                .map(|front| std::ptr::eq(front, waiter))
557                .unwrap_or(false)
558        {
559            clear |= LONG_WAIT;
560        }
561
562        let waiting_for = waiter.is_waiting_for();
563
564        // Don't drop the old waiter while holding the spin lock.
565        let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Mutex {
566            // SAFETY:
567            // We know that the waiter is still linked and is waiting for the rwlock, which
568            // guarantees that it is still linked into `self.waiters`.
569            let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
570            cursor.remove()
571        } else {
572            None
573        };
574
575        let (wake_list, set_on_release) = if wake_next || waiting_for == WaitingFor::None {
576            // Either the waiter was already woken or it's been removed from the rwlock's waiter
577            // list and is going to be woken. Either way, we need to wake up another thread.
578            get_wake_list(waiters)
579        } else {
580            (WaiterList::new(WaiterAdapter::new()), 0)
581        };
582
583        if waiters.is_empty() {
584            clear |= HAS_WAITERS;
585        }
586
587        if wake_list.is_empty() {
588            // We're not waking any other threads so clear the DESIGNATED_WAKER bit. In the worst
589            // case this leads to an additional thread being woken up but we risk a deadlock if we
590            // don't clear it.
591            clear |= DESIGNATED_WAKER;
592        }
593
594        if let WaiterKind::Exclusive = waiter.kind() {
595            // The waiter being dropped is a writer so clear the writer waiting bit for now. If we
596            // found more writers in the list while fetching waiters to wake up then this bit will
597            // be set again via `set_on_release`.
598            clear |= WRITER_WAITING;
599        }
600
601        while self
602            .state
603            .compare_exchange_weak(
604                oldstate,
605                (oldstate & !clear) | set_on_release,
606                Ordering::Release,
607                Ordering::Relaxed,
608            )
609            .is_err()
610        {
611            hint::spin_loop();
612            oldstate = self.state.load(Ordering::Relaxed);
613        }
614
615        for w in wake_list {
616            w.wake();
617        }
618
619        mem::drop(old_waiter);
620    }
621}
622
// TODO(b/315998194): Add safety comment
// NOTE(review): in the code visible here `state` is only touched through atomic operations and
// every access to `waiters` happens while the `SPINLOCK` bit is held. That is presumably the basis
// for this impl, but it also depends on the contents of `WaiterList` being safe to move between
// threads, which is defined elsewhere -- confirm before replacing the TODO.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Send for RawRwLock {}
// TODO(b/315998194): Add safety comment
// NOTE(review): sharing `&RawRwLock` across threads relies on the same spin-lock discipline for
// `waiters` as described for `Send`; the same caveat about `WaiterList` applies -- confirm before
// replacing the TODO.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Sync for RawRwLock {}
629
630fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) {
631    let raw_rwlock = raw as *const RawRwLock;
632
633    // SAFETY:
634    // Safe because the thread that owns the waiter that is being canceled must also own a reference
635    // to the rwlock, which ensures that this pointer is valid.
636    unsafe { (*raw_rwlock).cancel_waiter(waiter, wake_next) }
637}
638
/// A high-level primitive that provides safe, mutable access to a shared resource.
///
/// `RwLock` safely provides both shared, immutable access (via `read_lock()`) as well as exclusive,
/// mutable access (via `lock()`) to an underlying resource asynchronously while ensuring fairness
/// with no loss of performance. If you don't need `read_lock()` nor fairness, try upstream
/// `futures::lock::Mutex` instead.
///
/// # Poisoning
///
/// `RwLock` does not support lock poisoning so if a thread panics while holding the lock, the
/// poisoned data will be accessible by other threads in your program. If you need to guarantee that
/// other threads cannot access poisoned data then you may wish to wrap this `RwLock` inside another
/// type that provides the poisoning feature. See the implementation of `std::sync::Mutex` for an
/// example of this. Note `futures::lock::Mutex` does not support poisoning either.
///
/// # Fairness
///
/// This `RwLock` implementation does not guarantee that threads will acquire the lock in the same
/// order that they call `lock()` or `read_lock()`. However it will attempt to prevent long-term
/// starvation: if a thread repeatedly fails to acquire the lock beyond a threshold then all other
/// threads will fail to acquire the lock until the starved thread has acquired it. Note, on the
/// other hand, `futures::lock::Mutex` does not guarantee fairness.
///
/// Similarly, this `RwLock` will attempt to balance reader and writer threads: once there is a
/// writer thread waiting to acquire the lock no new reader threads will be allowed to acquire it.
/// However, any reader threads that were already waiting will still be allowed to acquire it.
///
/// # Examples
///
/// ```edition2018
/// use std::sync::Arc;
/// use std::thread;
/// use std::sync::mpsc::channel;
///
/// use cros_async::{block_on, sync::RwLock};
///
/// const N: usize = 10;
///
/// // Spawn a few threads to increment a shared variable (non-atomically), and
/// // let the main thread know once all increments are done.
/// //
/// // Here we're using an Arc to share memory among threads, and the data inside
/// // the Arc is protected with a rwlock.
/// let data = Arc::new(RwLock::new(0));
///
/// let (tx, rx) = channel();
/// for _ in 0..N {
///     let (data, tx) = (Arc::clone(&data), tx.clone());
///     thread::spawn(move || {
///         // The shared state can only be accessed once the lock is held.
///         // Our non-atomic increment is safe because we're the only thread
///         // which can access the shared state when the lock is held.
///         let mut data = block_on(data.lock());
///         *data += 1;
///         if *data == N {
///             tx.send(()).unwrap();
///         }
///         // the lock is unlocked here when `data` goes out of scope.
///     });
/// }
///
/// rx.recv().unwrap();
/// ```
// NOTE(review): the 128-byte alignment is presumably there to keep the lock on its own cache
// line(s) and avoid false sharing -- confirm the intent.
#[repr(align(128))]
pub struct RwLock<T: ?Sized> {
    // Lock state and wait queue; knows nothing about `T`.
    raw: RawRwLock,
    // The protected value. Only handed out through the guards returned by the locking methods,
    // or by value through `into_inner()`.
    value: UnsafeCell<T>,
}
708
709impl<T> RwLock<T> {
710    /// Create a new, unlocked `RwLock` ready for use.
711    pub fn new(v: T) -> RwLock<T> {
712        RwLock {
713            raw: RawRwLock::new(),
714            value: UnsafeCell::new(v),
715        }
716    }
717
718    /// Consume the `RwLock` and return the contained value. This method does not perform any
719    /// locking as the compiler will guarantee that there are no other references to `self` and the
720    /// caller owns the `RwLock`.
721    pub fn into_inner(self) -> T {
722        // Don't need to acquire the lock because the compiler guarantees that there are
723        // no references to `self`.
724        self.value.into_inner()
725    }
726}
727
728impl<T: ?Sized> RwLock<T> {
729    /// Acquires exclusive, mutable access to the resource protected by the `RwLock`, blocking the
730    /// current thread until it is able to do so. Upon returning, the current thread will be the
731    /// only thread with access to the resource. The `RwLock` will be released when the returned
732    /// `RwLockWriteGuard` is dropped.
733    ///
734    /// Calling `lock()` while holding a `RwLockWriteGuard` or a `RwLockReadGuard` will cause a
735    /// deadlock.
736    ///
737    /// Callers that are not in an async context may wish to use the `block_on` method to block the
738    /// thread until the `RwLock` is acquired.
739    #[inline]
740    pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
741        self.raw.lock().await;
742
743        RwLockWriteGuard {
744            mu: self,
745            // SAFETY:
746            // Safe because we have exclusive access to `self.value`.
747            value: unsafe { &mut *self.value.get() },
748        }
749    }
750
751    /// Acquires shared, immutable access to the resource protected by the `RwLock`, blocking the
752    /// current thread until it is able to do so. Upon returning there may be other threads that
753    /// also have immutable access to the resource but there will not be any threads that have
754    /// mutable access to the resource. When the returned `RwLockReadGuard` is dropped the thread
755    /// releases its access to the resource.
756    ///
757    /// Calling `read_lock()` while holding a `RwLockReadGuard` may deadlock. Calling `read_lock()`
758    /// while holding a `RwLockWriteGuard` will deadlock.
759    ///
760    /// Callers that are not in an async context may wish to use the `block_on` method to block the
761    /// thread until the `RwLock` is acquired.
762    #[inline]
763    pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
764        self.raw.read_lock().await;
765
766        RwLockReadGuard {
767            mu: self,
768            // SAFETY:
769            // Safe because we have shared read-only access to `self.value`.
770            value: unsafe { &*self.value.get() },
771        }
772    }
773
774    // Called from `Condvar::wait` when the thread wants to reacquire the lock.
775    #[inline]
776    pub(crate) async fn lock_from_cv(&self) -> RwLockWriteGuard<'_, T> {
777        self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
778
779        RwLockWriteGuard {
780            mu: self,
781            // SAFETY:
782            // Safe because we have exclusive access to `self.value`.
783            value: unsafe { &mut *self.value.get() },
784        }
785    }
786
787    // Like `lock_from_cv` but for acquiring a shared lock.
788    #[inline]
789    pub(crate) async fn read_lock_from_cv(&self) -> RwLockReadGuard<'_, T> {
790        // Threads that have waited in the Condvar's waiter list don't have to care if there is a
791        // writer waiting since they have already waited once.
792        self.raw
793            .lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
794            .await;
795
796        RwLockReadGuard {
797            mu: self,
798            // SAFETY:
799            // Safe because we have exclusive access to `self.value`.
800            value: unsafe { &*self.value.get() },
801        }
802    }
803
804    #[inline]
805    fn unlock(&self) {
806        self.raw.unlock();
807    }
808
809    #[inline]
810    fn read_unlock(&self) {
811        self.raw.read_unlock();
812    }
813
814    pub fn get_mut(&mut self) -> &mut T {
815        // SAFETY:
816        // Safe because the compiler statically guarantees that are no other references to `self`.
817        // This is also why we don't need to acquire the lock first.
818        unsafe { &mut *self.value.get() }
819    }
820}
821
// TODO(b/315998194): Add safety comment
// NOTE(review): sending a `RwLock<T>` to another thread sends the wrapped `T` along with it,
// which is why the impl is bounded by `T: Send`. Presumably the explicit impl is needed because
// `RawRwLock` is not automatically `Send` -- confirm against its definition before turning this
// note into a proper SAFETY comment.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
// TODO(b/315998194): Add safety comment
// NOTE(review): sharing `&RwLock<T>` between threads lets any of them obtain `&mut T` through
// `lock()`, which requires `T: Send`. However `read_lock()` also lets several holders have `&T`
// at the same time, and that normally requires `T: Sync` as well (`std::sync::RwLock` is bounded
// by `T: Send + Sync`). This impl does not ask for `Sync` -- confirm whether that is intentional
// before writing the SAFETY comment, since tightening the bound would affect callers.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl<T: ?Sized + Send> Sync for RwLock<T> {}
828
829impl<T: Default> Default for RwLock<T> {
830    fn default() -> Self {
831        Self::new(Default::default())
832    }
833}
834
835impl<T> From<T> for RwLock<T> {
836    fn from(source: T) -> Self {
837        Self::new(source)
838    }
839}
840
/// An RAII implementation of a "scoped exclusive lock" for a `RwLock`. When this structure is
/// dropped, the lock will be released. The resource protected by the `RwLock` can be accessed via
/// the `Deref` and `DerefMut` implementations of this structure.
pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
    // The `RwLock` this guard was created from; unlocked when the guard is dropped.
    mu: &'a RwLock<T>,
    // Mutable reference to the protected value, handed out through `Deref`/`DerefMut`.
    value: &'a mut T,
}
848
849impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
850    pub(crate) fn into_inner(self) -> &'a RwLock<T> {
851        self.mu
852    }
853
854    pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
855        &self.mu.raw
856    }
857}
858
859impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
860    type Target = T;
861
862    fn deref(&self) -> &Self::Target {
863        self.value
864    }
865}
866
867impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
868    fn deref_mut(&mut self) -> &mut Self::Target {
869        self.value
870    }
871}
872
873impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
874    fn drop(&mut self) {
875        self.mu.unlock()
876    }
877}
878
/// An RAII implementation of a "scoped shared lock" for a `RwLock`. When this structure is dropped,
/// the lock will be released. The resource protected by the `RwLock` can be accessed via the
/// `Deref` implementation of this structure.
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
    // The `RwLock` this guard was created from; read-unlocked when the guard is dropped.
    mu: &'a RwLock<T>,
    // Shared reference to the protected value, handed out through `Deref`.
    value: &'a T,
}
886
887impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
888    pub(crate) fn into_inner(self) -> &'a RwLock<T> {
889        self.mu
890    }
891
892    pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
893        &self.mu.raw
894    }
895}
896
897impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
898    type Target = T;
899
900    fn deref(&self) -> &Self::Target {
901        self.value
902    }
903}
904
905impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
906    fn drop(&mut self) {
907        self.mu.read_unlock()
908    }
909}
910
911// TODO(b/194338842): Fix tests for windows
912#[cfg(any(target_os = "android", target_os = "linux"))]
913#[cfg(test)]
914mod test {
915    use std::future::Future;
916    use std::mem;
917    use std::pin::Pin;
918    use std::rc::Rc;
919    use std::sync::atomic::AtomicUsize;
920    use std::sync::atomic::Ordering;
921    use std::sync::mpsc::channel;
922    use std::sync::mpsc::Sender;
923    use std::sync::Arc;
924    use std::task::Context;
925    use std::task::Poll;
926    use std::task::Waker;
927    use std::thread;
928    use std::time::Duration;
929
930    use futures::channel::oneshot;
931    use futures::pending;
932    use futures::select;
933    use futures::task::waker_ref;
934    use futures::task::ArcWake;
935    use futures::FutureExt;
936    use futures_executor::LocalPool;
937    use futures_executor::ThreadPool;
938    use futures_util::task::LocalSpawnExt;
939
940    use super::super::super::block_on;
941    use super::super::super::sync::Condvar;
942    use super::super::super::sync::SpinLock;
943    use super::*;
944
    // Test payload that deliberately derives neither `Copy` nor `Clone`, so the tests can only
    // reach it through the lock's guards or by consuming the lock.
    #[derive(Debug, Eq, PartialEq)]
    struct NonCopy(u32);
947
    // Dummy waker used when we want to manually drive futures. Waking it does nothing; the tests
    // that use it call `poll` on their futures directly.
    struct TestWaker;
    impl ArcWake for TestWaker {
        // Intentionally a no-op.
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }
953
954    #[test]
955    fn it_works() {
956        let mu = RwLock::new(NonCopy(13));
957
958        assert_eq!(*block_on(mu.lock()), NonCopy(13));
959    }
960
961    #[test]
962    fn smoke() {
963        let mu = RwLock::new(NonCopy(7));
964
965        mem::drop(block_on(mu.lock()));
966        mem::drop(block_on(mu.lock()));
967    }
968
969    #[test]
970    fn rw_smoke() {
971        let mu = RwLock::new(NonCopy(7));
972
973        mem::drop(block_on(mu.lock()));
974        mem::drop(block_on(mu.read_lock()));
975        mem::drop((block_on(mu.read_lock()), block_on(mu.read_lock())));
976        mem::drop(block_on(mu.lock()));
977    }
978
    // Runs a mix of exclusive and shared lockers on a single-threaded executor and checks that
    // they all run to completion.
    #[test]
    fn async_smoke() {
        // Takes the exclusive lock once. The guard is a temporary, so it is dropped (and the lock
        // released) at the end of the statement.
        async fn lock(mu: Rc<RwLock<NonCopy>>) {
            mu.lock().await;
        }

        // Takes a shared lock once and releases it immediately.
        async fn read_lock(mu: Rc<RwLock<NonCopy>>) {
            mu.read_lock().await;
        }

        // Takes a second shared lock while the first shared guard is still held.
        async fn double_read_lock(mu: Rc<RwLock<NonCopy>>) {
            let first = mu.read_lock().await;
            mu.read_lock().await;

            // Make sure first lives past the second read lock.
            first.as_raw_rwlock();
        }

        let mu = Rc::new(RwLock::new(NonCopy(7)));

        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        spawner
            .spawn_local(lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");
        spawner
            .spawn_local(read_lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");
        spawner
            .spawn_local(double_read_lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");
        spawner
            .spawn_local(lock(Rc::clone(&mu)))
            .expect("Failed to spawn future");

        // Drive every spawned task to completion on the current thread.
        ex.run();
    }
1017
1018    #[test]
1019    fn send() {
1020        let mu = RwLock::new(NonCopy(19));
1021
1022        thread::spawn(move || {
1023            let value = block_on(mu.lock());
1024            assert_eq!(*value, NonCopy(19));
1025        })
1026        .join()
1027        .unwrap();
1028    }
1029
1030    #[test]
1031    fn arc_nested() {
1032        // Tests nested rwlocks and access to underlying data.
1033        let mu = RwLock::new(1);
1034        let arc = Arc::new(RwLock::new(mu));
1035        thread::spawn(move || {
1036            let nested = block_on(arc.lock());
1037            let lock2 = block_on(nested.lock());
1038            assert_eq!(*lock2, 1);
1039        })
1040        .join()
1041        .unwrap();
1042    }
1043
    // The rwlock can still be locked from a destructor that runs while a thread is unwinding from
    // a panic, and the update made there is visible afterwards.
    #[test]
    fn arc_access_in_unwind() {
        let arc = Arc::new(RwLock::new(1));
        let arc2 = arc.clone();
        thread::spawn(move || {
            // Increments the shared counter from its destructor.
            struct Unwinder {
                i: Arc<RwLock<i32>>,
            }
            impl Drop for Unwinder {
                fn drop(&mut self) {
                    *block_on(self.i.lock()) += 1;
                }
            }
            // `_u` is dropped as the panic below unwinds this closure.
            let _u = Unwinder { i: arc2 };
            panic!();
        })
        .join()
        .expect_err("thread did not panic");
        // The initial value was 1, so the increment from the destructor makes it 2.
        let lock = block_on(arc.lock());
        assert_eq!(*lock, 2);
    }
1065
    // A `RwLock` can protect an unsized value (here a slice) when used behind a reference.
    #[test]
    fn unsized_value() {
        // The array-backed rwlock is coerced to `RwLock<[i32]>` through the reference.
        let rwlock: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
        {
            // The guard is a temporary that lives until the end of this inner scope.
            let b = &mut *block_on(rwlock.lock());
            b[0] = 4;
            b[2] = 5;
        }
        let expected: &[i32] = &[4, 2, 5];
        assert_eq!(&*block_on(rwlock.lock()), expected);
    }
1077    #[test]
1078    fn high_contention() {
1079        const THREADS: usize = 17;
1080        const ITERATIONS: usize = 103;
1081
1082        let mut threads = Vec::with_capacity(THREADS);
1083
1084        let mu = Arc::new(RwLock::new(0usize));
1085        for _ in 0..THREADS {
1086            let mu2 = mu.clone();
1087            threads.push(thread::spawn(move || {
1088                for _ in 0..ITERATIONS {
1089                    *block_on(mu2.lock()) += 1;
1090                }
1091            }));
1092        }
1093
1094        for t in threads.into_iter() {
1095            t.join().unwrap();
1096        }
1097
1098        assert_eq!(*block_on(mu.read_lock()), THREADS * ITERATIONS);
1099        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1100    }
1101
    // Like `high_contention`, but every increment races two lock attempts against each other so
    // that lock futures are regularly dropped before they complete.
    #[test]
    fn high_contention_with_cancel() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        // Increments whichever of the two counters it manages to lock first. The lock future for
        // the other rwlock is a temporary inside `select!` and is dropped without completing.
        async fn increment(mu: Arc<RwLock<usize>>, alt_mu: Arc<RwLock<usize>>, tx: Sender<()>) {
            for _ in 0..ITERATIONS {
                select! {
                    mut count = mu.lock().fuse() => *count += 1,
                    mut count = alt_mu.lock().fuse() => *count += 1,
                }
            }
            tx.send(()).expect("Failed to send completion signal");
        }

        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let mu = Arc::new(RwLock::new(0usize));
        let alt_mu = Arc::new(RwLock::new(0usize));

        let (tx, rx) = channel();
        for _ in 0..TASKS {
            ex.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone()));
        }

        // Wait for one completion signal per task; a timeout fails the test.
        for _ in 0..TASKS {
            if let Err(e) = rx.recv_timeout(Duration::from_secs(10)) {
                panic!("Error while waiting for threads to complete: {e}");
            }
        }

        // Every iteration incremented exactly one of the two counters.
        assert_eq!(
            *block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
            TASKS * ITERATIONS
        );
        // Neither rwlock may have any state bits left set.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
        assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
    }
1140
1141    #[test]
1142    fn single_thread_async() {
1143        const TASKS: usize = 17;
1144        const ITERATIONS: usize = 103;
1145
1146        // Async closures are unstable.
1147        async fn increment(mu: Rc<RwLock<usize>>) {
1148            for _ in 0..ITERATIONS {
1149                *mu.lock().await += 1;
1150            }
1151        }
1152
1153        let mut ex = LocalPool::new();
1154        let spawner = ex.spawner();
1155
1156        let mu = Rc::new(RwLock::new(0usize));
1157        for _ in 0..TASKS {
1158            spawner
1159                .spawn_local(increment(Rc::clone(&mu)))
1160                .expect("Failed to spawn task");
1161        }
1162
1163        ex.run();
1164
1165        assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1166        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1167    }
1168
1169    #[test]
1170    fn multi_thread_async() {
1171        const TASKS: usize = 17;
1172        const ITERATIONS: usize = 103;
1173
1174        // Async closures are unstable.
1175        async fn increment(mu: Arc<RwLock<usize>>, tx: Sender<()>) {
1176            for _ in 0..ITERATIONS {
1177                *mu.lock().await += 1;
1178            }
1179            tx.send(()).expect("Failed to send completion signal");
1180        }
1181
1182        let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1183
1184        let mu = Arc::new(RwLock::new(0usize));
1185        let (tx, rx) = channel();
1186        for _ in 0..TASKS {
1187            ex.spawn_ok(increment(Arc::clone(&mu), tx.clone()));
1188        }
1189
1190        for _ in 0..TASKS {
1191            rx.recv_timeout(Duration::from_secs(5))
1192                .expect("Failed to receive completion signal");
1193        }
1194        assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1195        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1196    }
1197
1198    #[test]
1199    fn get_mut() {
1200        let mut mu = RwLock::new(NonCopy(13));
1201        *mu.get_mut() = NonCopy(17);
1202
1203        assert_eq!(mu.into_inner(), NonCopy(17));
1204    }
1205
1206    #[test]
1207    fn into_inner() {
1208        let mu = RwLock::new(NonCopy(29));
1209        assert_eq!(mu.into_inner(), NonCopy(29));
1210    }
1211
1212    #[test]
1213    fn into_inner_drop() {
1214        struct NeedsDrop(Arc<AtomicUsize>);
1215        impl Drop for NeedsDrop {
1216            fn drop(&mut self) {
1217                self.0.fetch_add(1, Ordering::AcqRel);
1218            }
1219        }
1220
1221        let value = Arc::new(AtomicUsize::new(0));
1222        let needs_drop = RwLock::new(NeedsDrop(value.clone()));
1223        assert_eq!(value.load(Ordering::Acquire), 0);
1224
1225        {
1226            let inner = needs_drop.into_inner();
1227            assert_eq!(inner.0.load(Ordering::Acquire), 0);
1228        }
1229
1230        assert_eq!(value.load(Ordering::Acquire), 1);
1231    }
1232
1233    #[test]
1234    fn rw_arc() {
1235        const THREADS: isize = 7;
1236        const ITERATIONS: isize = 13;
1237
1238        let mu = Arc::new(RwLock::new(0isize));
1239        let mu2 = mu.clone();
1240
1241        let (tx, rx) = channel();
1242        thread::spawn(move || {
1243            let mut guard = block_on(mu2.lock());
1244            for _ in 0..ITERATIONS {
1245                let tmp = *guard;
1246                *guard = -1;
1247                thread::yield_now();
1248                *guard = tmp + 1;
1249            }
1250            tx.send(()).unwrap();
1251        });
1252
1253        let mut readers = Vec::with_capacity(10);
1254        for _ in 0..THREADS {
1255            let mu3 = mu.clone();
1256            let handle = thread::spawn(move || {
1257                let guard = block_on(mu3.read_lock());
1258                assert!(*guard >= 0);
1259            });
1260
1261            readers.push(handle);
1262        }
1263
1264        // Wait for the readers to finish their checks.
1265        for r in readers {
1266            r.join().expect("One or more readers saw a negative value");
1267        }
1268
1269        // Wait for the writer to finish.
1270        rx.recv_timeout(Duration::from_secs(5)).unwrap();
1271        assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1272        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1273    }
1274
    // Single-threaded executor version of `rw_arc`: a writer task holds the exclusive lock across
    // several suspension points while storing a transient negative value, and reader tasks must
    // never observe it.
    #[test]
    fn rw_single_thread_async() {
        // A Future that returns `Poll::Pending` the first time it is polled and `Poll::Ready` every
        // time after that.
        struct TestFuture {
            polled: bool,
            // Slot where the first poll publishes its waker for the helper thread to pick up.
            waker: Arc<SpinLock<Option<Waker>>>,
        }

        impl Future for TestFuture {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                if self.polled {
                    Poll::Ready(())
                } else {
                    self.polled = true;
                    *self.waker.lock() = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        }

        // Polls the shared slot until a waker shows up, then wakes it and returns.
        fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
            loop {
                if let Some(w) = waker.lock().take() {
                    w.wake();
                    return;
                }

                // This sleep cannot be moved into an else branch because we would end up holding
                // the lock while sleeping due to rust's drop ordering rules.
                thread::sleep(Duration::from_millis(10));
            }
        }

        // Holds the exclusive lock for all iterations. Each iteration stores -1, suspends on a
        // `TestFuture` that a helper thread wakes, and then stores the incremented value.
        async fn writer(mu: Rc<RwLock<isize>>) {
            let mut guard = mu.lock().await;
            for _ in 0..ITERATIONS {
                let tmp = *guard;
                *guard = -1;
                let waker = Arc::new(SpinLock::new(None));
                let waker2 = Arc::clone(&waker);
                thread::spawn(move || wake_future(waker2));
                let fut = TestFuture {
                    polled: false,
                    waker,
                };
                fut.await;
                *guard = tmp + 1;
            }
        }

        // Takes the shared lock and checks that the transient -1 is not visible.
        async fn reader(mu: Rc<RwLock<isize>>) {
            let guard = mu.read_lock().await;
            assert!(*guard >= 0);
        }

        const TASKS: isize = 7;
        const ITERATIONS: isize = 13;

        let mu = Rc::new(RwLock::new(0isize));
        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        // The writer is spawned before any of the readers.
        spawner
            .spawn_local(writer(Rc::clone(&mu)))
            .expect("Failed to spawn writer");

        for _ in 0..TASKS {
            spawner
                .spawn_local(reader(Rc::clone(&mu)))
                .expect("Failed to spawn reader");
        }

        ex.run();

        // The writer incremented the value once per iteration.
        assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1355
    // Thread-pool version of `rw_arc`: one writer task and several reader tasks share the rwlock,
    // and no reader may observe the writer's transient negative value.
    #[test]
    fn rw_multi_thread_async() {
        // Holds the exclusive lock for all iterations, storing -1 and then the incremented value
        // on each one. Signals completion only after the guard has been dropped.
        async fn writer(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
            let mut guard = mu.lock().await;
            for _ in 0..ITERATIONS {
                let tmp = *guard;
                *guard = -1;
                thread::yield_now();
                *guard = tmp + 1;
            }

            mem::drop(guard);
            tx.send(()).unwrap();
        }

        // Takes the shared lock, checks the value and signals completion after releasing the lock.
        async fn reader(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
            let guard = mu.read_lock().await;
            assert!(*guard >= 0);

            mem::drop(guard);
            tx.send(()).expect("Failed to send completion message");
        }

        const TASKS: isize = 7;
        const ITERATIONS: isize = 13;

        let mu = Arc::new(RwLock::new(0isize));
        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        // The writer and the readers report completion on separate channels.
        let (txw, rxw) = channel();
        ex.spawn_ok(writer(Arc::clone(&mu), txw));

        let (txr, rxr) = channel();
        for _ in 0..TASKS {
            ex.spawn_ok(reader(Arc::clone(&mu), txr.clone()));
        }

        // Wait for the readers to finish their checks.
        for _ in 0..TASKS {
            rxr.recv_timeout(Duration::from_secs(5))
                .expect("Failed to receive completion message from reader");
        }

        // Wait for the writer to finish.
        rxw.recv_timeout(Duration::from_secs(5))
            .expect("Failed to receive completion message from writer");

        assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1406
    // When the exclusive lock is released, every reader queued behind it should be able to take
    // the shared lock even though a writer is queued among them. The futures are polled by hand
    // so the rwlock's state bits can be checked after each step.
    #[test]
    fn wake_all_readers() {
        // Takes a shared lock and keeps holding it across one suspension point.
        async fn read(mu: Arc<RwLock<()>>) {
            let g = mu.read_lock().await;
            pending!();
            mem::drop(g);
        }

        // Takes the exclusive lock and releases it right away.
        async fn write(mu: Arc<RwLock<()>>) {
            mu.lock().await;
        }

        let mu = Arc::new(RwLock::new(()));
        // Four readers with one writer placed between them.
        let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
            Box::pin(read(mu.clone())),
            Box::pin(read(mu.clone())),
            Box::pin(read(mu.clone())),
            Box::pin(write(mu.clone())),
            Box::pin(read(mu.clone())),
        ];
        const NUM_READERS: usize = 4;

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Acquire the lock so that the futures cannot get it.
        let g = block_on(mu.lock());

        // First poll: nobody can get the lock, so every future must stay pending.
        for r in &mut futures {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // Drop the lock. This should allow all readers to make progress. Since they already waited
        // once they should ignore the WRITER_WAITING bit that is currently set.
        mem::drop(g);
        // Second poll: the readers acquire the lock but then park at `pending!()`, and the writer
        // still cannot get the lock, so every future is pending again.
        for r in &mut futures {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        // Check that all readers were able to acquire the lock.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            READ_LOCK * NUM_READERS
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // Index of the single future that is still pending after the third poll.
        let mut needs_poll = None;

        // All the readers can now finish but the writer needs to be polled again.
        for (i, r) in futures.iter_mut().enumerate() {
            match r.as_mut().poll(&mut cx) {
                Poll::Ready(()) => {}
                Poll::Pending => {
                    if needs_poll.is_some() {
                        panic!("More than one future unable to complete");
                    }
                    needs_poll = Some(i);
                }
            }
        }

        // Poll the remaining future once more; it must now complete.
        if futures[needs_poll.expect("Writer unexpectedly able to complete")]
            .as_mut()
            .poll(&mut cx)
            .is_pending()
        {
            panic!("Writer unable to complete");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1496
    // A waiter that keeps losing the race for the lock should eventually set LONG_WAIT, after
    // which the competing future can no longer take the lock ahead of it.
    #[test]
    fn long_wait() {
        // Re-acquires the lock on every poll until the flag is set. The guard is held across
        // `pending!()`, so the lock stays taken while this future is suspended.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        // Sets the flag once it manages to get the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // In every round `tight_loop` ends up holding the lock again before `mark_ready` is
        // polled, so `mark_ready` fails to acquire it each time.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        // `tight_loop` still holds the lock and `mark_ready` has now flagged a long wait.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // This time the tight loop will fail to acquire the lock.
        if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
            panic!("tight_loop unexpectedly ready");
        }

        // Which will finally allow the mark_ready function to make progress.
        if mark.as_mut().poll(&mut cx).is_pending() {
            panic!("mark_ready not able to make progress");
        }

        // Now the tight loop will finish.
        if tl.as_mut().poll(&mut cx).is_pending() {
            panic!("tight_loop not able to finish");
        }

        assert!(*block_on(mu.lock()));
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1554
    // Dropping the long-waiting future while the lock is still held by the other future must
    // clear LONG_WAIT and the waiter bits.
    #[test]
    fn cancel_long_wait_before_wake() {
        // Re-acquires the lock on every poll until the flag is set. The guard is held across
        // `pending!()`, so the lock stays taken while this future is suspended.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        // Sets the flag once it manages to get the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Let `mark_ready` lose the race for the lock until it flags a long wait.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // Now drop the mark_ready future, which should clear the LONG_WAIT bit.
        mem::drop(mark);
        // Only the lock held by `tight_loop` remains.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), LOCKED);

        // Dropping `tight_loop` drops the guard it is holding, which releases the lock.
        mem::drop(tl);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1601
    // Dropping the long-waiting future after the lock has been released to it (but before it is
    // polled again) must clear LONG_WAIT so that other lockers can make progress.
    #[test]
    fn cancel_long_wait_after_wake() {
        // Re-acquires the lock on every poll until the flag is set. The guard is held across
        // `pending!()`, so the lock stays taken while this future is suspended.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        // Sets the flag once it manages to get the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Let `mark_ready` lose the race for the lock until it flags a long wait.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // This time the tight loop will fail to acquire the lock.
        if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
            panic!("tight_loop unexpectedly ready");
        }

        // Now drop the mark_ready future, which should clear the LONG_WAIT bit.
        mem::drop(mark);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & LONG_WAIT, 0);

        // Since the lock is not held, we should be able to spawn a future to set the ready flag.
        block_on(mark_ready(mu.clone()));

        // Now the tight loop will finish.
        if tl.as_mut().poll(&mut cx).is_pending() {
            panic!("tight_loop not able to finish");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1660
    // Checks the DESIGNATED_WAKER bit: it is set when a waiter is woken on unlock, it does not
    // stop a newcomer from taking the free lock, and it is gone once all waiters have run.
    #[test]
    fn designated_waker() {
        // Increments the counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        let mu = Arc::new(RwLock::new(0));

        let mut futures = [
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
        ];

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Hold the lock so that the futures polled below have to wait.
        let count = block_on(mu.lock());

        // Poll 2 futures. Since neither will be able to acquire the lock, they should get added to
        // the waiter list.
        if let Poll::Ready(()) = futures[0].as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }
        if let Poll::Ready(()) = futures[1].as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING,
        );

        // Now drop the lock. This should set the DESIGNATED_WAKER bit and wake up the first future
        // in the wait list.
        mem::drop(count);

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING,
        );

        // Now poll the third future.  It should be able to acquire the lock immediately.
        if futures[2].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 1);

        // There should still be a waiter in the wait list and the DESIGNATED_WAKER bit should still
        // be set.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
            DESIGNATED_WAKER
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        // Now let the future that was woken up run.
        if futures[0].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 2);

        // Finally run the remaining waiter.
        if futures[1].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 3);

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1734
// Dropping a future after it has been woken, but before it is polled again, must leave the lock
// state word at 0.
#[test]
fn cancel_designated_waker() {
    async fn inc(mu: Arc<RwLock<usize>>) {
        let mut guard = mu.lock().await;
        *guard += 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let mut waiter = Box::pin(inc(mu.clone()));

    let test_waker = Arc::new(TestWaker);
    let waker = waker_ref(&test_waker);
    let mut ctx = Context::from_waker(&waker);

    // With the lock held, the future cannot make progress.
    let held = block_on(mu.lock());
    assert!(
        waiter.as_mut().poll(&mut ctx).is_pending(),
        "Future unexpectedly ready when lock is held"
    );

    // Releasing the lock wakes up the future.
    drop(held);

    // Cancel the future without polling it again. This should clear all the state in the rwlock.
    drop(waiter);

    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
1763
// Cancels a queued writer while the lock is still held: the cancelled future must not run, the
// remaining waiter must not be woken early, and only one increment may be observed at the end.
#[test]
fn cancel_before_wake() {
    async fn inc(mu: Arc<RwLock<usize>>) {
        let mut guard = mu.lock().await;
        *guard += 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let mut cancelled = Box::pin(inc(mu.clone()));
    let mut survivor = Box::pin(inc(mu.clone()));

    // Reads the raw state word of the lock.
    let state = || mu.raw.state.load(Ordering::Relaxed);

    let test_waker = Arc::new(TestWaker);
    let waker = waker_ref(&test_waker);
    let mut ctx = Context::from_waker(&waker);

    // Take the lock first.
    let held = block_on(mu.lock());

    // Both futures are polled while the lock is taken, so both get queued in the waiter list.
    assert!(
        cancelled.as_mut().poll(&mut ctx).is_pending(),
        "Future is unexpectedly ready"
    );
    assert!(
        survivor.as_mut().poll(&mut ctx).is_pending(),
        "Future is unexpectedly ready"
    );

    assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

    // Cancelling the first future should remove it from the waiter list without waking the
    // second one.
    drop(cancelled);

    // There should be no designated waker.
    assert_eq!(state() & DESIGNATED_WAKER, 0);

    // Since the waiter was a writer, we should clear the WRITER_WAITING bit.
    assert_eq!(state() & WRITER_WAITING, 0);

    assert!(
        survivor.as_mut().poll(&mut ctx).is_pending(),
        "Future is unexpectedly ready"
    );

    // Releasing the lock should let the surviving future make progress.
    drop(held);

    assert!(
        survivor.as_mut().poll(&mut ctx).is_ready(),
        "Future is not ready to make progress"
    );

    // Verify that we only incremented the count once.
    assert_eq!(*block_on(mu.lock()), 1);
    assert_eq!(state(), 0);
}
1826
// Cancels a queued writer after the lock has been released (i.e. after it was woken): the other
// queued future must then be able to finish, and only one increment may be observed at the end.
#[test]
fn cancel_after_wake() {
    async fn inc(mu: Arc<RwLock<usize>>) {
        let mut guard = mu.lock().await;
        *guard += 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let mut cancelled = Box::pin(inc(mu.clone()));
    let mut survivor = Box::pin(inc(mu.clone()));

    // Reads the raw state word of the lock.
    let state = || mu.raw.state.load(Ordering::Relaxed);

    let test_waker = Arc::new(TestWaker);
    let waker = waker_ref(&test_waker);
    let mut ctx = Context::from_waker(&waker);

    // Take the lock first.
    let held = block_on(mu.lock());

    // Both futures are polled while the lock is taken, so both get queued in the waiter list.
    assert!(
        cancelled.as_mut().poll(&mut ctx).is_pending(),
        "Future is unexpectedly ready"
    );
    assert!(
        survivor.as_mut().poll(&mut ctx).is_pending(),
        "Future is unexpectedly ready"
    );

    assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

    // Releasing the lock should mark the first future as ready to make progress.
    drop(held);

    // Cancelling the first future should make the second one ready to make progress.
    drop(cancelled);

    // Since there was still another waiter in the list we shouldn't have cleared the
    // DESIGNATED_WAKER bit.
    assert_eq!(state() & DESIGNATED_WAKER, DESIGNATED_WAKER);

    // Since the waiter was a writer, we should clear the WRITER_WAITING bit.
    assert_eq!(state() & WRITER_WAITING, 0);

    assert!(
        survivor.as_mut().poll(&mut ctx).is_ready(),
        "Future is not ready to make progress"
    );

    // Verify that we only incremented the count once.
    assert_eq!(*block_on(mu.lock()), 1);
    assert_eq!(state(), 0);
}
1888
// Races a lock attempt against a oneshot "timer" with `select!`. The lock is held for the whole
// race, so the timer branch must win; afterwards the abandoned lock attempt must no longer be
// counted as a waiter, and releasing the lock must bring the state word back to 0.
#[test]
fn timeout() {
    // Completes when `timer` fires; panics if the timer is cancelled or if the lock is acquired
    // first.
    async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<RwLock<()>>) {
        select! {
            res = timer.fuse() => {
                match res {
                    Ok(()) => {},
                    Err(e) => panic!("Timer unexpectedly canceled: {e}"),
                }
            }
            _ = mu.lock().fuse() => panic!("Successfully acquired lock"),
        }
    }

    let mu = Arc::new(RwLock::new(()));
    let (tx, rx) = oneshot::channel();

    let mut timeout = Box::pin(timed_lock(rx, mu.clone()));

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    // Acquire the lock so the lock branch of `timed_lock` cannot complete.
    let g = block_on(mu.lock());

    // Poll the future. Neither branch is ready yet.
    if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
        panic!("timed_lock unexpectedly ready");
    }

    // The pending lock attempt should be registered as a waiter.
    assert_eq!(
        mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
        HAS_WAITERS
    );

    // Signal the channel, which should cancel the lock.
    tx.send(()).expect("Failed to send wakeup");

    // Now the future should have completed without acquiring the lock.
    if timeout.as_mut().poll(&mut cx).is_pending() {
        panic!("timed_lock not ready after timeout");
    }

    // The rwlock state should not show any waiters.
    assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

    mem::drop(g);

    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
1940
// Checks that a queued writer sets `WRITER_WAITING` and that the bit keeps a reader that
// arrives afterwards from joining the readers already holding the lock.
#[test]
fn writer_waiting() {
    // Takes a read lock, yields once while holding it, then expects to see 0.
    async fn read_zero(mu: Arc<RwLock<usize>>) {
        let guard = mu.read_lock().await;
        pending!();

        assert_eq!(*guard, 0);
    }

    async fn inc(mu: Arc<RwLock<usize>>) {
        let mut guard = mu.lock().await;
        *guard += 1;
    }

    // Takes a read lock and expects to see the writer's increment.
    async fn read_one(mu: Arc<RwLock<usize>>) {
        let guard = mu.read_lock().await;

        assert_eq!(*guard, 1);
    }

    let mu = Arc::new(RwLock::new(0));

    let mut early_a = Box::pin(read_zero(mu.clone()));
    let mut early_b = Box::pin(read_zero(mu.clone()));

    let mut writer = Box::pin(inc(mu.clone()));
    let mut late = Box::pin(read_one(mu.clone()));

    // Reads the raw state word of the lock.
    let state = || mu.raw.state.load(Ordering::Relaxed);

    let test_waker = Arc::new(TestWaker);
    let waker = waker_ref(&test_waker);
    let mut ctx = Context::from_waker(&waker);

    // The two early readers each take a read lock and then yield.
    assert!(
        early_a.as_mut().poll(&mut ctx).is_pending(),
        "read_zero unexpectedly ready"
    );
    assert!(
        early_b.as_mut().poll(&mut ctx).is_pending(),
        "read_zero unexpectedly ready"
    );
    assert_eq!(state() & READ_MASK, 2 * READ_LOCK);

    // The writer cannot get in while the lock is read-locked.
    assert!(
        writer.as_mut().poll(&mut ctx).is_pending(),
        "inc unexpectedly ready"
    );
    assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

    // The WRITER_WAITING bit should prevent the next reader from acquiring the lock.
    assert!(
        late.as_mut().poll(&mut ctx).is_pending(),
        "read_one unexpectedly ready"
    );
    assert_eq!(state() & READ_MASK, 2 * READ_LOCK);

    // Drain everything in order: early readers, then the writer, then the late reader.
    assert!(
        early_a.as_mut().poll(&mut ctx).is_ready(),
        "read_zero unable to complete"
    );
    assert!(
        early_b.as_mut().poll(&mut ctx).is_ready(),
        "read_zero unable to complete"
    );
    assert!(
        writer.as_mut().poll(&mut ctx).is_ready(),
        "inc unable to complete"
    );
    assert!(
        late.as_mut().poll(&mut ctx).is_ready(),
        "read_one unable to complete"
    );

    assert_eq!(state(), 0);
}
2015
// Calls `Condvar::notify_one` with four readers and one writer waiting while the test itself
// holds the write lock. After the lock is released every reader and the writer must be able to
// finish, leaving the counter at 0.
#[test]
fn notify_one() {
    // Waits (under a read lock) until the counter is non-zero.
    async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.read_lock().await;
        while *guard == 0 {
            guard = cv.wait_read(guard).await;
        }
    }

    // Waits (under the write lock) until the counter is non-zero, then decrements it.
    async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.lock().await;
        while *guard == 0 {
            guard = cv.wait(guard).await;
        }

        *guard -= 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let test_waker = Arc::new(TestWaker);
    let waker = waker_ref(&test_waker);
    let mut ctx = Context::from_waker(&waker);

    let mut readers = [
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
    ];
    let mut writer = Box::pin(write(mu.clone(), cv.clone()));

    // Reads the raw state word of the lock.
    let state = || mu.raw.state.load(Ordering::Relaxed);

    // With the counter at 0 nobody can finish yet.
    for reader in readers.iter_mut() {
        assert!(
            reader.as_mut().poll(&mut ctx).is_pending(),
            "reader unexpectedly ready"
        );
    }
    assert!(
        writer.as_mut().poll(&mut ctx).is_pending(),
        "writer unexpectedly ready"
    );

    let mut held = block_on(mu.lock());
    *held = 1;

    // This should wake all readers + one writer.
    cv.notify_one();

    // Poll the readers and the writer so they add themselves to the rwlock's waiter list.
    for reader in readers.iter_mut() {
        assert!(
            reader.as_mut().poll(&mut ctx).is_pending(),
            "reader unexpectedly ready"
        );
    }

    assert!(
        writer.as_mut().poll(&mut ctx).is_pending(),
        "writer unexpectedly ready"
    );

    assert_eq!(state() & HAS_WAITERS, HAS_WAITERS);
    assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

    drop(held);

    // Both bits are expected to still be set right after the unlock.
    assert_eq!(
        state() & (HAS_WAITERS | WRITER_WAITING),
        HAS_WAITERS | WRITER_WAITING
    );

    for reader in readers.iter_mut() {
        assert!(
            reader.as_mut().poll(&mut ctx).is_ready(),
            "reader unable to complete"
        );
    }

    assert!(
        writer.as_mut().poll(&mut ctx).is_ready(),
        "writer unable to complete"
    );

    assert_eq!(*block_on(mu.read_lock()), 0);
}
2103
// Calls `Condvar::notify_all` after the lock has already been released. The notified futures
// must not show up as rwlock waiters until they are polled, and once polled each of them must
// run to completion, leaving the state word at 0.
#[test]
fn notify_when_unlocked() {
    // Waits (under the write lock) until the counter is non-zero, then decrements it.
    async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut count = mu.lock().await;

        while *count == 0 {
            count = cv.wait(count).await;
        }

        *count -= 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    let mut futures = [
        Box::pin(dec(mu.clone(), cv.clone())),
        Box::pin(dec(mu.clone(), cv.clone())),
        Box::pin(dec(mu.clone(), cv.clone())),
        Box::pin(dec(mu.clone(), cv.clone())),
    ];

    // The counter is 0, so none of the futures can finish on the first poll.
    for f in &mut futures {
        if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }
    }

    // Give every future one unit to consume. The guard is a temporary, so the lock is released
    // again before `notify_all` is called.
    *block_on(mu.lock()) = futures.len();
    cv.notify_all();

    // Since we haven't polled `futures` yet, the rwlock should not have any waiters.
    assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

    // Each future should now be able to take the lock, decrement, and finish.
    for f in &mut futures {
        if f.as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
    }
    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
2149
// Mixes four readers and one writer waiting on a condvar with an unrelated future waiting for
// the write lock. After `notify_all`, the readers must be able to share the lock while the
// writer stays blocked until they are done.
#[test]
fn notify_reader_writer() {
    async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.read_lock().await;
        while *guard == 0 {
            guard = cv.wait_read(guard).await;
        }

        // Yield once while holding the read lock, which should prevent the writer from waking
        // up.
        pending!();
    }

    async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.lock().await;
        while *guard == 0 {
            guard = cv.wait(guard).await;
        }

        *guard -= 1;
    }

    // Acquires the write lock and releases it immediately.
    async fn lock(mu: Arc<RwLock<usize>>) {
        drop(mu.lock().await);
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let test_waker = Arc::new(TestWaker);
    let waker = waker_ref(&test_waker);
    let mut ctx = Context::from_waker(&waker);

    let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(write(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
    ];
    const NUM_READERS: usize = 4;

    let mut locker = Box::pin(lock(mu.clone()));

    // Reads the raw state word of the lock.
    let state = || mu.raw.state.load(Ordering::Relaxed);

    // The counter is 0, so nothing can finish on the first pass.
    for fut in futures.iter_mut() {
        assert!(
            fut.as_mut().poll(&mut ctx).is_pending(),
            "future unexpectedly ready"
        );
    }

    assert_eq!(state(), 0);

    let mut held = block_on(mu.lock());
    *held = 1;

    // Now poll the lock function. Since the lock is held by us, it will get queued on the
    // waiter list.
    assert!(
        locker.as_mut().poll(&mut ctx).is_pending(),
        "lock() unexpectedly ready"
    );

    assert_eq!(
        state() & (HAS_WAITERS | WRITER_WAITING),
        HAS_WAITERS | WRITER_WAITING
    );

    // Wake up waiters while holding the lock.
    cv.notify_all();

    // Drop the lock.  This should wake up the lock function.
    drop(held);

    assert!(
        locker.as_mut().poll(&mut ctx).is_ready(),
        "lock() unable to complete"
    );

    // Since we haven't polled `futures` yet, the rwlock state should now be empty.
    assert_eq!(state(), 0);

    // Poll everything again. The readers should be able to make progress (but not complete) but
    // the writer should be blocked.
    for fut in futures.iter_mut() {
        assert!(
            fut.as_mut().poll(&mut ctx).is_pending(),
            "future unexpectedly ready"
        );
    }

    assert_eq!(state() & READ_MASK, READ_LOCK * NUM_READERS);

    // All the readers can now finish but the writer needs to be polled again. Remember the
    // index of the single future that is still pending.
    let mut stalled: Option<usize> = None;
    for (idx, fut) in futures.iter_mut().enumerate() {
        if fut.as_mut().poll(&mut ctx).is_pending() {
            assert!(stalled.is_none(), "More than one future unable to complete");
            stalled = Some(idx);
        }
    }

    let writer_idx = stalled.expect("Writer unexpectedly able to complete");
    assert!(
        futures[writer_idx].as_mut().poll(&mut ctx).is_ready(),
        "Writer unable to complete"
    );

    assert_eq!(*block_on(mu.lock()), 0);
    assert_eq!(state(), 0);
}
2267
// Calls `Condvar::notify_all` while the test holds a read lock. Every notified reader must be
// able to take its own read lock alongside ours, without anyone being queued as a waiter.
#[test]
fn notify_readers_with_read_lock() {
    async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.read_lock().await;
        while *guard == 0 {
            guard = cv.wait_read(guard).await;
        }

        // Yield once while holding the read lock.
        pending!();
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let test_waker = Arc::new(TestWaker);
    let waker = waker_ref(&test_waker);
    let mut ctx = Context::from_waker(&waker);

    let mut futures = [
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
    ];

    // Reads the raw state word of the lock.
    let state = || mu.raw.state.load(Ordering::Relaxed);

    // The counter is 0, so nothing can finish on the first pass.
    for fut in futures.iter_mut() {
        assert!(
            fut.as_mut().poll(&mut ctx).is_pending(),
            "future unexpectedly ready"
        );
    }

    // Increment the count and then grab a read lock.
    *block_on(mu.lock()) = 1;

    let shared = block_on(mu.read_lock());

    // Notify the condvar while holding the read lock. This should wake up all the waiters.
    cv.notify_all();

    // Since the lock is held in shared mode, all the readers should immediately be able to
    // acquire the read lock. They still return pending because each one yields once while
    // holding it.
    for fut in futures.iter_mut() {
        assert!(
            fut.as_mut().poll(&mut ctx).is_pending(),
            "future unexpectedly ready"
        );
    }
    assert_eq!(state() & HAS_WAITERS, 0);
    assert_eq!(state() & READ_MASK, READ_LOCK * (futures.len() + 1));

    drop(shared);

    for fut in futures.iter_mut() {
        assert!(
            fut.as_mut().poll(&mut ctx).is_ready(),
            "future unable to complete"
        );
    }

    assert_eq!(state(), 0);
}
2331}