cros_async/sync/
waiter.rs1use std::cell::UnsafeCell;
6use std::future::Future;
7use std::mem;
8use std::pin::Pin;
9use std::ptr::NonNull;
10use std::sync::atomic::AtomicBool;
11use std::sync::atomic::AtomicU8;
12use std::sync::atomic::Ordering;
13use std::sync::Arc;
14use std::task::Context;
15use std::task::Poll;
16use std::task::Waker;
17
18use intrusive_collections::intrusive_adapter;
19use intrusive_collections::linked_list::LinkedList;
20use intrusive_collections::linked_list::LinkedListOps;
21use intrusive_collections::DefaultLinkOps;
22use intrusive_collections::LinkOps;
23
24use super::super::sync::SpinLock;
25
/// Intrusive doubly-linked-list link whose "linked" flag is an atomic.
///
/// The neighbour pointers live in `UnsafeCell`s so they can be rewritten through a shared
/// reference; the `linked` flag records whether the link is currently claimed.
///
/// The type is aligned to 128 bytes.
// NOTE(review): the large alignment presumably keeps each link on its own cache line(s) to
// avoid false sharing -- confirm against the original design notes.
#[repr(align(128))]
pub struct AtomicLink {
    /// Previous link in the list, or `None`.
    prev: UnsafeCell<Option<NonNull<AtomicLink>>>,
    /// Next link in the list, or `None`.
    next: UnsafeCell<Option<NonNull<AtomicLink>>>,
    /// `true` while the link is claimed.
    linked: AtomicBool,
}

impl AtomicLink {
    /// Builds a link that is not claimed and has no neighbours.
    fn new() -> AtomicLink {
        Self {
            prev: UnsafeCell::new(None),
            next: UnsafeCell::new(None),
            linked: AtomicBool::new(false),
        }
    }

    /// Reports whether the link is currently flagged as linked.
    ///
    /// The flag is read with `Ordering::Relaxed`, so the answer is only a snapshot.
    fn is_linked(&self) -> bool {
        let flag = &self.linked;
        flag.load(Ordering::Relaxed)
    }
}
48
/// Makes `AtomicLinkOps` the default link-operations type for `AtomicLink`.
impl DefaultLinkOps for AtomicLink {
    /// Operations type used to manipulate `AtomicLink`s.
    type Ops = AtomicLinkOps;

    /// Value of the operations type to start from; `AtomicLinkOps` is a unit struct.
    const NEW: Self::Ops = AtomicLinkOps;
}
54
// `AtomicLink` is not automatically `Send`/`Sync` because of its `UnsafeCell` and `NonNull`
// fields, so both are asserted manually here.
//
// NOTE(review): in this file the `prev`/`next` cells are only touched through the `unsafe`
// `LinkedListOps` methods on `AtomicLinkOps`, and the `linked` flag is an atomic. Soundness
// presumably relies on callers of those methods providing external synchronization and having
// claimed the link via `acquire_link` first -- confirm against the users of `WaiterList`.
unsafe impl Send for AtomicLink {}
unsafe impl Sync for AtomicLink {}
62
/// Stateless operations type that implements the link and linked-list operation traits for
/// `AtomicLink`.
#[derive(Copy, Clone, Default)]
pub struct AtomicLinkOps;
65
66#[allow(clippy::undocumented_unsafe_blocks)]
68unsafe impl LinkOps for AtomicLinkOps {
69 type LinkPtr = NonNull<AtomicLink>;
70
71 unsafe fn acquire_link(&mut self, ptr: Self::LinkPtr) -> bool {
72 !ptr.as_ref().linked.swap(true, Ordering::Acquire)
73 }
74
75 unsafe fn release_link(&mut self, ptr: Self::LinkPtr) {
76 ptr.as_ref().linked.store(false, Ordering::Release)
77 }
78}
79
80#[allow(clippy::undocumented_unsafe_blocks)]
82unsafe impl LinkedListOps for AtomicLinkOps {
83 unsafe fn next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
84 *ptr.as_ref().next.get()
85 }
86
87 unsafe fn prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
88 *ptr.as_ref().prev.get()
89 }
90
91 unsafe fn set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>) {
92 *ptr.as_ref().next.get() = next;
93 }
94
95 unsafe fn set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>) {
96 *ptr.as_ref().prev.get() = prev;
97 }
98}
99
/// The kind of access a `Waiter` is queued for.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so values can be logged
/// and compared directly.
// NOTE(review): presumably `Shared` is reader-style access that can be granted to several
// waiters at once and `Exclusive` is writer-style access -- confirm against the lock
// implementation that consumes `Waiter::kind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Kind {
    /// The waiter wants shared access.
    Shared,
    /// The waiter wants exclusive access.
    Exclusive,
}
105
/// Internal state machine of a `Waiter`, stored behind the waiter's `SpinLock`.
enum State {
    /// Starting state, set by `Waiter::new` and restored by `Waiter::reset`. No waker is
    /// registered yet.
    Init,
    /// A `WaitFuture` has been polled and returned `Poll::Pending`; holds the waker that
    /// `Waiter::wake` will invoke.
    Waiting(Waker),
    /// `Waiter::wake` has been called; the next poll of the `WaitFuture` returns
    /// `Poll::Ready`.
    Woken,
    /// The `WaitFuture` has returned `Poll::Ready`; polling it again panics.
    Finished,
    /// Placeholder swapped in by `WaitFuture::poll` while it inspects the previous state.
    /// Finding this value stored at the start of a poll, or in `WaitFuture`'s `Drop`, panics.
    Processing,
}
113
/// What, if anything, a `Waiter` is currently waiting for.
///
/// The explicit discriminants are the values stored in `Waiter`'s `AtomicU8` field, so they must
/// stay in sync with the decoding done in `Waiter::is_waiting_for`.
///
/// The enum is `Clone`/`Copy` so a value can be passed to `Waiter::set_waiting_for` or
/// `Waiter::reset` without giving up the caller's copy.
// NOTE(review): the variants presumably mirror which waiter list (none/temporary, a mutex's, or
// a condvar's) the waiter is linked into -- confirm against the lock implementations.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WaitingFor {
    /// Not waiting for anything; `Waiter::wake` debug-asserts this value.
    None = 0,
    /// Waiting for a mutex.
    Mutex = 1,
    /// Waiting for a condition variable.
    Condvar = 2,
}
127
/// A single waiter that can be linked into a `WaiterList` and awaited through `Waiter::wait`.
pub struct Waiter {
    /// Intrusive list link; this is the field `WaiterAdapter` uses to chain waiters together.
    link: AtomicLink,
    /// Wake-up state machine, guarded by a spin lock.
    state: SpinLock<State>,
    /// Callback invoked when a `WaitFuture` for this waiter is dropped before it finished.
    /// Arguments are `cancel_data`, the waiter itself, and `true` if the waiter had already
    /// been woken (and `false` otherwise).
    cancel: fn(usize, &Waiter, bool),
    /// Opaque value handed back to `cancel` as its first argument.
    cancel_data: usize,
    /// Kind of access this waiter is queued for.
    kind: Kind,
    /// `WaitingFor` value encoded as its `u8` discriminant.
    waiting_for: AtomicU8,
}
137
138impl Waiter {
139 pub fn new(
158 kind: Kind,
159 cancel: fn(usize, &Waiter, bool),
160 cancel_data: usize,
161 waiting_for: WaitingFor,
162 ) -> Waiter {
163 Waiter {
164 link: AtomicLink::new(),
165 state: SpinLock::new(State::Init),
166 cancel,
167 cancel_data,
168 kind,
169 waiting_for: AtomicU8::new(waiting_for as u8),
170 }
171 }
172
173 pub fn kind(&self) -> Kind {
175 self.kind
176 }
177
178 pub fn is_linked(&self) -> bool {
180 self.link.is_linked()
181 }
182
183 pub fn is_waiting_for(&self) -> WaitingFor {
185 match self.waiting_for.load(Ordering::Acquire) {
186 0 => WaitingFor::None,
187 1 => WaitingFor::Mutex,
188 2 => WaitingFor::Condvar,
189 v => panic!("Unknown value for `WaitingFor`: {v}"),
190 }
191 }
192
193 pub fn set_waiting_for(&self, waiting_for: WaitingFor) {
196 self.waiting_for.store(waiting_for as u8, Ordering::Release);
197 }
198
199 pub fn reset(&self, waiting_for: WaitingFor) {
202 debug_assert!(!self.is_linked(), "Cannot reset `Waiter` while linked");
203 self.set_waiting_for(waiting_for);
204
205 let mut state = self.state.lock();
206 if let State::Waiting(waker) = mem::replace(&mut *state, State::Init) {
207 mem::drop(state);
208 mem::drop(waker);
209 }
210 }
211
212 pub fn wait(&self) -> WaitFuture<'_> {
214 WaitFuture { waiter: self }
215 }
216
217 pub fn wake(&self) {
220 debug_assert!(!self.is_linked(), "Cannot wake `Waiter` while linked");
221 debug_assert_eq!(self.is_waiting_for(), WaitingFor::None);
222
223 let mut state = self.state.lock();
224
225 if let State::Waiting(waker) = mem::replace(&mut *state, State::Woken) {
226 mem::drop(state);
227 waker.wake();
228 }
229 }
230}
231
/// Future returned by `Waiter::wait`; it resolves after `Waiter::wake` has been called on the
/// borrowed waiter.
pub struct WaitFuture<'w> {
    /// The waiter whose state this future polls.
    waiter: &'w Waiter,
}
235
236impl Future for WaitFuture<'_> {
237 type Output = ();
238
239 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
240 let mut state = self.waiter.state.lock();
241
242 match mem::replace(&mut *state, State::Processing) {
243 State::Init => {
244 *state = State::Waiting(cx.waker().clone());
245
246 Poll::Pending
247 }
248 State::Waiting(old_waker) => {
249 *state = State::Waiting(cx.waker().clone());
250 mem::drop(state);
251 mem::drop(old_waker);
252
253 Poll::Pending
254 }
255 State::Woken => {
256 *state = State::Finished;
257 Poll::Ready(())
258 }
259 State::Finished => {
260 panic!("Future polled after returning Poll::Ready");
261 }
262 State::Processing => {
263 panic!("Unexpected waker state");
264 }
265 }
266 }
267}
268
269impl Drop for WaitFuture<'_> {
270 fn drop(&mut self) {
271 let state = self.waiter.state.lock();
272
273 match *state {
274 State::Finished => {}
275 State::Processing => panic!("Unexpected waker state"),
276 State::Woken => {
277 mem::drop(state);
278
279 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, true);
281 }
282 _ => {
283 mem::drop(state);
284
285 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, false);
287 }
288 }
289 }
290}
291
// Generates `WaiterAdapter`, the intrusive-collections adapter for `Waiter` objects held in
// `Arc<Waiter>` pointers and chained through their `link: AtomicLink` field.
intrusive_adapter!(pub WaiterAdapter = Arc<Waiter>: Waiter { link: AtomicLink });
293
/// Intrusive linked list of `Waiter`s, using `WaiterAdapter` to locate each waiter's link.
pub type WaiterList = LinkedList<WaiterAdapter>;