devices/utils/
event_loop.rs

1// Copyright 2018 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::collections::BTreeMap;
6use std::mem::drop;
7use std::sync::Arc;
8use std::sync::Weak;
9use std::thread;
10
11use base::error;
12use base::warn;
13use base::AsRawDescriptor;
14use base::Descriptor;
15use base::Event;
16use base::EventType;
17use base::WaitContext;
18use sync::Mutex;
19
20use super::error::Error;
21use super::error::Result;
22
/// Hook through which an unrecoverable error is reported and later observed.
///
/// The event loop in this file calls [`FailHandle::fail`] when waiting on its descriptors
/// fails, and consults [`FailHandle::failed`] before every wait and before changing the set of
/// registered descriptors. Implementors perform whatever clean up is appropriate in `fail`.
pub trait FailHandle: Send + Sync {
    /// Reports that an unrecoverable error happened.
    fn fail(&self);
    /// Returns true once a failure has already been reported.
    fn failed(&self) -> bool;
}
30
31impl FailHandle for Option<Arc<dyn FailHandle>> {
32    fn fail(&self) {
33        match self {
34            Some(handle) => handle.fail(),
35            None => error!("event loop trying to fail without a fail handle"),
36        }
37    }
38
39    fn failed(&self) -> bool {
40        match self {
41            Some(handle) => handle.failed(),
42            None => false,
43        }
44    }
45}
46
47/// EventLoop is an event loop blocked on a set of fds. When a monitered events is triggered,
48/// event loop will invoke the mapped handler.
49pub struct EventLoop {
50    fail_handle: Option<Arc<dyn FailHandle>>,
51    poll_ctx: Arc<WaitContext<Descriptor>>,
52    handlers: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>>,
53    stop_evt: Event,
54}
55
56/// Interface for event handler.
57pub trait EventHandler: Send + Sync {
58    fn on_event(&self) -> anyhow::Result<()>;
59}
60
61impl EventLoop {
62    /// Start an event loop. An optional fail handle could be passed to the event loop.
63    pub fn start(
64        name: String,
65        fail_handle: Option<Arc<dyn FailHandle>>,
66    ) -> Result<(EventLoop, thread::JoinHandle<()>)> {
67        let (self_stop_evt, stop_evt) = Event::new()
68            .and_then(|e| Ok((e.try_clone()?, e)))
69            .map_err(Error::CreateEvent)?;
70
71        let fd_callbacks: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>> =
72            Arc::new(Mutex::new(BTreeMap::new()));
73        let poll_ctx: WaitContext<Descriptor> = WaitContext::new()
74            .and_then(|pc| {
75                pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor()))
76                    .and(Ok(pc))
77            })
78            .map_err(Error::CreateWaitContext)?;
79
80        let poll_ctx = Arc::new(poll_ctx);
81        let event_loop = EventLoop {
82            fail_handle: fail_handle.clone(),
83            poll_ctx: poll_ctx.clone(),
84            handlers: fd_callbacks.clone(),
85            stop_evt: self_stop_evt,
86        };
87
88        let handle = thread::Builder::new()
89            .name(name)
90            .spawn(move || {
91                loop {
92                    if fail_handle.failed() {
93                        error!("event loop already failed");
94                        return;
95                    }
96                    let events = match poll_ctx.wait() {
97                        Ok(events) => events,
98                        Err(e) => {
99                            error!("cannot wait on events {:?}", e);
100                            fail_handle.fail();
101                            return;
102                        }
103                    };
104                    for event in &events {
105                        let fd = event.token.as_raw_descriptor();
106                        if fd == stop_evt.as_raw_descriptor() {
107                            return;
108                        }
109
110                        let mut locked = fd_callbacks.lock();
111                        let weak_handler = match locked.get(&Descriptor(fd)) {
112                            Some(cb) => cb.clone(),
113                            None => {
114                                warn!("callback for fd {} already removed", fd);
115                                continue;
116                            }
117                        };
118
119                        // If the file descriptor is hung up, remove it after calling the handler
120                        // one final time.
121                        let mut remove = event.is_hungup;
122
123                        if let Some(handler) = weak_handler.upgrade() {
124                            // Drop lock before triggering the event.
125                            drop(locked);
126                            if let Err(e) = handler.on_event() {
127                                error!("removing event handler due to error: {:#}", e);
128                                remove = true;
129                            }
130                            locked = fd_callbacks.lock();
131                        } else {
132                            // If the handler is already gone, we remove the fd.
133                            remove = true;
134                        }
135
136                        if remove {
137                            let _ = poll_ctx.delete(&event.token);
138                            let _ = locked.remove(&Descriptor(fd));
139                        }
140                    }
141                }
142            })
143            .map_err(Error::StartThread)?;
144
145        Ok((event_loop, handle))
146    }
147
148    /// Add a new event to event loop. The event handler will be invoked when `event` happens on
149    /// `descriptor`.
150    ///
151    /// If the same `descriptor` is added multiple times, the old handler will be replaced.
152    /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered,
153    /// the event will be removed.
154    pub fn add_event(
155        &self,
156        descriptor: &dyn AsRawDescriptor,
157        event_type: EventType,
158        handler: Weak<dyn EventHandler>,
159    ) -> Result<()> {
160        if self.fail_handle.failed() {
161            return Err(Error::EventLoopAlreadyFailed);
162        }
163        self.handlers
164            .lock()
165            .insert(Descriptor(descriptor.as_raw_descriptor()), handler);
166        self.poll_ctx
167            .add_for_event(
168                descriptor,
169                event_type,
170                Descriptor(descriptor.as_raw_descriptor()),
171            )
172            .map_err(Error::WaitContextAddDescriptor)
173    }
174
175    /// Removes event for this `descriptor`. This function is safe to call even when the
176    /// `descriptor` is not actively being polled because it's been paused.
177    ///
178    /// EventLoop does not guarantee all events for `descriptor` is handled.
179    pub fn remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
180        if self.fail_handle.failed() {
181            return Err(Error::EventLoopAlreadyFailed);
182        }
183        self.poll_ctx
184            .delete(descriptor)
185            .map_err(Error::WaitContextDeleteDescriptor)?;
186        self.handlers
187            .lock()
188            .remove(&Descriptor(descriptor.as_raw_descriptor()));
189        Ok(())
190    }
191
192    /// Pauses polling on the given `descriptor`. It keeps a reference to the `descriptor` and its
193    /// handler so it can be resumed by calling `resume_event_for_descriptor()`.
194    pub fn pause_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
195        if self.fail_handle.failed() {
196            return Err(Error::EventLoopAlreadyFailed);
197        }
198        self.poll_ctx
199            .delete(descriptor)
200            .map_err(Error::WaitContextDeleteDescriptor)?;
201        Ok(())
202    }
203
204    /// Resumes polling on the given `descriptor` with the previously-provided handler. If
205    /// `descriptor` was not paused beforehand, this function does nothing. If `descriptor` does
206    /// not exist in the event loop, it returns an error.
207    /// `event_type` does not need to match the previously registered event type.
208    pub fn resume_event_for_descriptor(
209        &self,
210        descriptor: &dyn AsRawDescriptor,
211        event_type: EventType,
212    ) -> Result<()> {
213        let handler = self
214            .handlers
215            .lock()
216            .get(&Descriptor(descriptor.as_raw_descriptor()))
217            .ok_or(Error::EventLoopMissingHandler)?
218            .clone();
219        self.add_event(descriptor, event_type, handler)
220    }
221
222    /// Stops this event loop asynchronously. Previous events might not be handled.
223    pub fn stop(&self) {
224        match self.stop_evt.signal() {
225            Ok(_) => {}
226            Err(_) => {
227                error!("fail to send event loop stop event, it might already stopped");
228            }
229        }
230    }
231}
232
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Condvar;
    use std::sync::Mutex;

    use base::Event;

    use super::*;

    /// Handler that counts its invocations and wakes up the test after each one.
    struct EventLoopTestHandler {
        /// Number of `on_event` calls so far.
        val: Mutex<u8>,
        /// Notified after every increment of `val`.
        cvar: Condvar,
        /// The event registered with the event loop.
        evt: Event,
    }

    impl EventHandler for EventLoopTestHandler {
        fn on_event(&self) -> anyhow::Result<()> {
            self.evt.wait().unwrap();
            *self.val.lock().unwrap() += 1;
            self.cvar.notify_one();
            Ok(())
        }
    }

    /// Registers a handler, signals its event once and checks that the handler ran exactly once
    /// before the loop is stopped.
    #[test]
    fn event_loop_test() {
        let (l, j) = EventLoop::start("test".to_string(), None).unwrap();
        // A setup failure must fail the test instead of letting it pass without testing anything.
        let (self_evt, evt) = Event::new()
            .and_then(|e| Ok((e.try_clone()?, e)))
            .expect("failed creating Event pair");
        let h = Arc::new(EventLoopTestHandler {
            val: Mutex::new(0),
            cvar: Condvar::new(),
            evt,
        });
        let t: Arc<dyn EventHandler> = h.clone();
        l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t))
            .unwrap();
        self_evt.signal().unwrap();
        {
            // Block until the loop thread has invoked the handler.
            let mut val = h.val.lock().unwrap();
            while *val < 1 {
                val = h.cvar.wait(val).unwrap();
            }
        }
        l.stop();
        j.join().unwrap();
        assert_eq!(*(h.val.lock().unwrap()), 1);
    }
}
287}