devices/utils/
event_loop.rs1use std::collections::BTreeMap;
6use std::mem::drop;
7use std::sync::Arc;
8use std::sync::Weak;
9use std::thread;
10
11use base::error;
12use base::warn;
13use base::AsRawDescriptor;
14use base::Descriptor;
15use base::Event;
16use base::EventType;
17use base::WaitContext;
18use sync::Mutex;
19
20use super::error::Error;
21use super::error::Result;
22
/// Hook through which the event loop reports, and checks for, a failure.
///
/// The event loop thread calls `fail()` when waiting on its wait context returns an error, and
/// consults `failed()` before each wait and before registering or removing descriptors.
pub trait FailHandle: Send + Sync {
    /// Signals that a failure has occurred.
    fn fail(&self);
    /// Returns `true` once a failure has been signalled.
    fn failed(&self) -> bool;
}
30
31impl FailHandle for Option<Arc<dyn FailHandle>> {
32 fn fail(&self) {
33 match self {
34 Some(handle) => handle.fail(),
35 None => error!("event loop trying to fail without a fail handle"),
36 }
37 }
38
39 fn failed(&self) -> bool {
40 match self {
41 Some(handle) => handle.failed(),
42 None => false,
43 }
44 }
45}
46
/// Handle to an event loop thread that waits on a set of descriptors and invokes the handler
/// registered for each descriptor that becomes ready.
pub struct EventLoop {
    /// Optional failure hook, checked before descriptors are added, removed or paused.
    fail_handle: Option<Arc<dyn FailHandle>>,
    /// Wait context shared with the loop thread; tokens are the registered descriptors.
    poll_ctx: Arc<WaitContext<Descriptor>>,
    /// Handlers keyed by descriptor. Only weak references are stored, so the loop does not keep
    /// a handler alive.
    handlers: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>>,
    /// Signalled by `stop()`; the loop thread waits on a clone of this event and exits when it
    /// fires.
    stop_evt: Event,
}
55
/// Callback interface for descriptors registered with an `EventLoop`.
pub trait EventHandler: Send + Sync {
    /// Called on the event loop thread when the registered descriptor has an event.
    ///
    /// Returning an error makes the event loop log it and unregister the descriptor.
    fn on_event(&self) -> anyhow::Result<()>;
}
60
61impl EventLoop {
62 pub fn start(
64 name: String,
65 fail_handle: Option<Arc<dyn FailHandle>>,
66 ) -> Result<(EventLoop, thread::JoinHandle<()>)> {
67 let (self_stop_evt, stop_evt) = Event::new()
68 .and_then(|e| Ok((e.try_clone()?, e)))
69 .map_err(Error::CreateEvent)?;
70
71 let fd_callbacks: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>> =
72 Arc::new(Mutex::new(BTreeMap::new()));
73 let poll_ctx: WaitContext<Descriptor> = WaitContext::new()
74 .and_then(|pc| {
75 pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor()))
76 .and(Ok(pc))
77 })
78 .map_err(Error::CreateWaitContext)?;
79
80 let poll_ctx = Arc::new(poll_ctx);
81 let event_loop = EventLoop {
82 fail_handle: fail_handle.clone(),
83 poll_ctx: poll_ctx.clone(),
84 handlers: fd_callbacks.clone(),
85 stop_evt: self_stop_evt,
86 };
87
88 let handle = thread::Builder::new()
89 .name(name)
90 .spawn(move || {
91 loop {
92 if fail_handle.failed() {
93 error!("event loop already failed");
94 return;
95 }
96 let events = match poll_ctx.wait() {
97 Ok(events) => events,
98 Err(e) => {
99 error!("cannot wait on events {:?}", e);
100 fail_handle.fail();
101 return;
102 }
103 };
104 for event in &events {
105 let fd = event.token.as_raw_descriptor();
106 if fd == stop_evt.as_raw_descriptor() {
107 return;
108 }
109
110 let mut locked = fd_callbacks.lock();
111 let weak_handler = match locked.get(&Descriptor(fd)) {
112 Some(cb) => cb.clone(),
113 None => {
114 warn!("callback for fd {} already removed", fd);
115 continue;
116 }
117 };
118
119 let mut remove = event.is_hungup;
122
123 if let Some(handler) = weak_handler.upgrade() {
124 drop(locked);
126 if let Err(e) = handler.on_event() {
127 error!("removing event handler due to error: {:#}", e);
128 remove = true;
129 }
130 locked = fd_callbacks.lock();
131 } else {
132 remove = true;
134 }
135
136 if remove {
137 let _ = poll_ctx.delete(&event.token);
138 let _ = locked.remove(&Descriptor(fd));
139 }
140 }
141 }
142 })
143 .map_err(Error::StartThread)?;
144
145 Ok((event_loop, handle))
146 }
147
148 pub fn add_event(
155 &self,
156 descriptor: &dyn AsRawDescriptor,
157 event_type: EventType,
158 handler: Weak<dyn EventHandler>,
159 ) -> Result<()> {
160 if self.fail_handle.failed() {
161 return Err(Error::EventLoopAlreadyFailed);
162 }
163 self.handlers
164 .lock()
165 .insert(Descriptor(descriptor.as_raw_descriptor()), handler);
166 self.poll_ctx
167 .add_for_event(
168 descriptor,
169 event_type,
170 Descriptor(descriptor.as_raw_descriptor()),
171 )
172 .map_err(Error::WaitContextAddDescriptor)
173 }
174
175 pub fn remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
180 if self.fail_handle.failed() {
181 return Err(Error::EventLoopAlreadyFailed);
182 }
183 self.poll_ctx
184 .delete(descriptor)
185 .map_err(Error::WaitContextDeleteDescriptor)?;
186 self.handlers
187 .lock()
188 .remove(&Descriptor(descriptor.as_raw_descriptor()));
189 Ok(())
190 }
191
192 pub fn pause_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
195 if self.fail_handle.failed() {
196 return Err(Error::EventLoopAlreadyFailed);
197 }
198 self.poll_ctx
199 .delete(descriptor)
200 .map_err(Error::WaitContextDeleteDescriptor)?;
201 Ok(())
202 }
203
204 pub fn resume_event_for_descriptor(
209 &self,
210 descriptor: &dyn AsRawDescriptor,
211 event_type: EventType,
212 ) -> Result<()> {
213 let handler = self
214 .handlers
215 .lock()
216 .get(&Descriptor(descriptor.as_raw_descriptor()))
217 .ok_or(Error::EventLoopMissingHandler)?
218 .clone();
219 self.add_event(descriptor, event_type, handler)
220 }
221
222 pub fn stop(&self) {
224 match self.stop_evt.signal() {
225 Ok(_) => {}
226 Err(_) => {
227 error!("fail to send event loop stop event, it might already stopped");
228 }
229 }
230 }
231}
232
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Condvar;
    use std::sync::Mutex;

    use base::Event;

    use super::*;

    /// Handler that consumes its event, counts the invocation and wakes the test thread.
    struct EventLoopTestHandler {
        val: Mutex<u8>,
        cvar: Condvar,
        evt: Event,
    }

    impl EventHandler for EventLoopTestHandler {
        fn on_event(&self) -> anyhow::Result<()> {
            // Consume the signal so the event does not fire again.
            self.evt.wait().unwrap();
            *self.val.lock().unwrap() += 1;
            self.cvar.notify_one();
            Ok(())
        }
    }

    /// Registers a handler, signals its event once and checks the handler ran exactly once
    /// before the loop is stopped.
    #[test]
    fn event_loop_test() {
        let (l, j) = EventLoop::start("test".to_string(), None).unwrap();
        // Fail loudly if the event pair cannot be created: returning early here would make the
        // test pass without checking anything and leave the loop thread running.
        let evt = Event::new().expect("failed creating Event");
        let self_evt = evt.try_clone().expect("failed cloning Event");
        let h = Arc::new(EventLoopTestHandler {
            val: Mutex::new(0),
            cvar: Condvar::new(),
            evt,
        });
        let t: Arc<dyn EventHandler> = h.clone();
        l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t))
            .unwrap();
        self_evt.signal().unwrap();
        {
            // Wait until the handler has run on the event loop thread.
            let mut val = h.val.lock().unwrap();
            while *val < 1 {
                val = h.cvar.wait(val).unwrap();
            }
        }
        l.stop();
        j.join().unwrap();
        assert_eq!(*(h.val.lock().unwrap()), 1);
    }
}
287}