1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
// Copyright 2017 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::cmp::min;
use std::fs::File;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::null_mut;
use std::time::Duration;
use libc::c_int;
use libc::epoll_create1;
use libc::epoll_ctl;
use libc::epoll_event;
use libc::epoll_wait;
use libc::ENOENT;
use libc::EPOLLHUP;
use libc::EPOLLIN;
use libc::EPOLLOUT;
use libc::EPOLLRDHUP;
use libc::EPOLL_CLOEXEC;
use libc::EPOLL_CTL_ADD;
use libc::EPOLL_CTL_DEL;
use libc::EPOLL_CTL_MOD;
use smallvec::SmallVec;
use super::errno_result;
use super::Result;
use crate::handle_eintr_errno;
use crate::AsRawDescriptor;
use crate::EventToken;
use crate::EventType;
use crate::FromRawDescriptor;
use crate::RawDescriptor;
use crate::TriggeredEvent;
const EVENT_CONTEXT_MAX_EVENTS: usize = 16;
impl From<EventType> for u32 {
fn from(et: EventType) -> u32 {
let v = match et {
EventType::None => 0,
EventType::Read => EPOLLIN,
EventType::Write => EPOLLOUT,
EventType::ReadWrite => EPOLLIN | EPOLLOUT,
};
v as u32
}
}
/// Used to poll multiple objects that have file descriptors.
///
/// See [`crate::WaitContext`] for an example that uses the cross-platform wrapper.
pub struct EventContext<T> {
epoll_ctx: File,
// Needed to satisfy usage of T
tokens: PhantomData<[T]>,
}
impl<T: EventToken> EventContext<T> {
/// Creates a new `EventContext`.
pub fn new() -> Result<EventContext<T>> {
// SAFETY:
// Safe because we check the return value.
let epoll_fd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
if epoll_fd < 0 {
return errno_result();
}
Ok(EventContext {
// SAFETY:
// Safe because epoll_fd is valid.
epoll_ctx: unsafe { File::from_raw_descriptor(epoll_fd) },
tokens: PhantomData,
})
}
/// Creates a new `EventContext` and adds the slice of `fd` and `token` tuples to the new
/// context.
///
/// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will
/// return the error instead of the new context.
pub fn build_with(fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<EventContext<T>> {
let ctx = EventContext::new()?;
ctx.add_many(fd_tokens)?;
Ok(ctx)
}
/// Adds the given slice of `fd` and `token` tuples to this context.
///
/// This is equivalent to calling `add` with each `fd` and `token`. If there are any errors,
/// this method will stop adding `fd`s and return the first error, leaving this context in a
/// undefined state.
pub fn add_many(&self, fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<()> {
for (fd, token) in fd_tokens {
self.add(*fd, T::from_raw_token(token.as_raw_token()))?;
}
Ok(())
}
/// Adds the given `fd` to this context and associates the given `token` with the `fd`'s
/// readable events.
///
/// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
/// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
/// FD number) added to this context, events will not be reported by `wait` anymore.
pub fn add(&self, fd: &dyn AsRawDescriptor, token: T) -> Result<()> {
self.add_for_event(fd, EventType::Read, token)
}
/// Adds the given `descriptor` to this context, watching for the specified events and
/// associates the given 'token' with those events.
///
/// A `descriptor` can only be added once and does not need to be kept open. If the `descriptor`
/// is dropped and there were no duplicated file descriptors (i.e. adding the same descriptor
/// with a different FD number) added to this context, events will not be reported by `wait`
/// anymore.
pub fn add_for_event(
&self,
descriptor: &dyn AsRawDescriptor,
event_type: EventType,
token: T,
) -> Result<()> {
let mut evt = epoll_event {
events: event_type.into(),
u64: token.as_raw_token(),
};
// SAFETY:
// Safe because we give a valid epoll FD and FD to watch, as well as a valid epoll_event
// structure. Then we check the return value.
let ret = unsafe {
epoll_ctl(
self.epoll_ctx.as_raw_descriptor(),
EPOLL_CTL_ADD,
descriptor.as_raw_descriptor(),
&mut evt,
)
};
if ret < 0 {
return errno_result();
};
Ok(())
}
/// If `fd` was previously added to this context, the watched events will be replaced with
/// `event_type` and the token associated with it will be replaced with the given `token`.
pub fn modify(&self, fd: &dyn AsRawDescriptor, event_type: EventType, token: T) -> Result<()> {
let mut evt = epoll_event {
events: event_type.into(),
u64: token.as_raw_token(),
};
// SAFETY:
// Safe because we give a valid epoll FD and FD to modify, as well as a valid epoll_event
// structure. Then we check the return value.
let ret = unsafe {
epoll_ctl(
self.epoll_ctx.as_raw_descriptor(),
EPOLL_CTL_MOD,
fd.as_raw_descriptor(),
&mut evt,
)
};
if ret < 0 {
return errno_result();
};
Ok(())
}
/// Deletes the given `fd` from this context. If the `fd` is not being polled by this context,
/// the call is silently dropped without errors.
///
/// If an `fd`'s token shows up in the list of hangup events, it should be removed using this
/// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`.
/// Failure to do so will cause the `wait` method to always return immediately, causing ~100%
/// CPU load.
pub fn delete(&self, fd: &dyn AsRawDescriptor) -> Result<()> {
// SAFETY:
// Safe because we give a valid epoll FD and FD to stop watching. Then we check the return
// value.
let ret = unsafe {
epoll_ctl(
self.epoll_ctx.as_raw_descriptor(),
EPOLL_CTL_DEL,
fd.as_raw_descriptor(),
null_mut(),
)
};
// If epoll_ctl returns ENOENT it means the fd is not part of the current polling set so
// there is nothing to delete.
if ret < 0 && ret != ENOENT {
return errno_result();
};
Ok(())
}
/// Waits for any events to occur in FDs that were previously added to this context.
///
/// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading
/// for readable events and not closing for hungup events), subsequent calls to `wait` will
/// return immediately. The consequence of not handling an event perpetually while calling
/// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to
/// ~100% usage.
pub fn wait(&self) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
self.wait_timeout(Duration::new(i64::MAX as u64, 0))
}
/// Like `wait` except will only block for a maximum of the given `timeout`.
///
/// This may return earlier than `timeout` with zero events if the duration indicated exceeds
/// system limits.
pub fn wait_timeout(&self, timeout: Duration) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
let mut epoll_events: [MaybeUninit<epoll_event>; EVENT_CONTEXT_MAX_EVENTS] =
// SAFETY:
// `MaybeUnint<T>` has the same layout as plain `T` (`epoll_event` in our case).
// We submit an uninitialized array to the `epoll_wait` system call, which returns how many
// elements it initialized, and then we convert only the initialized `MaybeUnint` values
// into `epoll_event` structures after the call.
unsafe { MaybeUninit::uninit().assume_init() };
let timeout_millis = if timeout.as_secs() as i64 == i64::MAX {
// We make the convenient assumption that 2^63 seconds is an effectively unbounded time
// frame. This is meant to mesh with `wait` calling us with no timeout.
-1
} else {
// In cases where we the number of milliseconds would overflow an i32, we substitute the
// maximum timeout which is ~24.8 days.
let millis = timeout
.as_secs()
.checked_mul(1_000)
.and_then(|ms| ms.checked_add(u64::from(timeout.subsec_nanos()) / 1_000_000))
.unwrap_or(i32::MAX as u64);
min(i32::MAX as u64, millis) as i32
};
let ret = {
let max_events = epoll_events.len() as c_int;
// SAFETY:
// Safe because we give an epoll context and a properly sized epoll_events array
// pointer, which we trust the kernel to fill in properly. The `transmute` is safe,
// since `MaybeUnint<T>` has the same layout as `T`, and the `epoll_wait` syscall will
// initialize as many elements of the `epoll_events` array as it returns.
unsafe {
handle_eintr_errno!(epoll_wait(
self.epoll_ctx.as_raw_descriptor(),
std::mem::transmute(&mut epoll_events[0]),
max_events,
timeout_millis
))
}
};
if ret < 0 {
return errno_result();
}
let count = ret as usize;
let events = epoll_events[0..count]
.iter()
.map(|e| {
// SAFETY:
// Converting `MaybeUninit<epoll_event>` into `epoll_event` is safe here, since we
// are only iterating over elements that the `epoll_wait` system call initialized.
let e = unsafe { e.assume_init() };
TriggeredEvent {
token: T::from_raw_token(e.u64),
is_readable: e.events & (EPOLLIN as u32) != 0,
is_writable: e.events & (EPOLLOUT as u32) != 0,
is_hungup: e.events & ((EPOLLHUP | EPOLLRDHUP) as u32) != 0,
}
})
.collect();
Ok(events)
}
}
impl<T: EventToken> AsRawDescriptor for EventContext<T> {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.epoll_ctx.as_raw_descriptor()
}
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use base_event_token_derive::EventToken;
use super::*;
use crate::Event;
#[test]
fn event_context() {
let evt1 = Event::new().unwrap();
let evt2 = Event::new().unwrap();
evt1.signal().unwrap();
evt2.signal().unwrap();
let ctx: EventContext<u32> = EventContext::build_with(&[(&evt1, 1), (&evt2, 2)]).unwrap();
let mut evt_count = 0;
while evt_count < 2 {
for event in ctx.wait().unwrap().iter().filter(|e| e.is_readable) {
evt_count += 1;
match event.token {
1 => {
evt1.wait().unwrap();
ctx.delete(&evt1).unwrap();
}
2 => {
evt2.wait().unwrap();
ctx.delete(&evt2).unwrap();
}
_ => panic!("unexpected token"),
};
}
}
assert_eq!(evt_count, 2);
}
#[test]
fn event_context_overflow() {
const EVT_COUNT: usize = EVENT_CONTEXT_MAX_EVENTS * 2 + 1;
let ctx: EventContext<usize> = EventContext::new().unwrap();
let mut evts = Vec::with_capacity(EVT_COUNT);
for i in 0..EVT_COUNT {
let evt = Event::new().unwrap();
evt.signal().unwrap();
ctx.add(&evt, i).unwrap();
evts.push(evt);
}
let mut evt_count = 0;
while evt_count < EVT_COUNT {
for event in ctx.wait().unwrap().iter().filter(|e| e.is_readable) {
evts[event.token].wait().unwrap();
evt_count += 1;
}
}
}
#[test]
fn event_context_timeout() {
let ctx: EventContext<u32> = EventContext::new().unwrap();
let dur = Duration::from_millis(10);
let start_inst = Instant::now();
ctx.wait_timeout(dur).unwrap();
assert!(start_inst.elapsed() >= dur);
}
#[test]
#[allow(dead_code)]
fn event_token_derive() {
#[derive(EventToken)]
enum EmptyToken {}
#[derive(PartialEq, Debug, EventToken)]
enum Token {
Alpha,
Beta,
// comments
Gamma(u32),
Delta { index: usize },
Omega,
}
assert_eq!(
Token::from_raw_token(Token::Alpha.as_raw_token()),
Token::Alpha
);
assert_eq!(
Token::from_raw_token(Token::Beta.as_raw_token()),
Token::Beta
);
assert_eq!(
Token::from_raw_token(Token::Gamma(55).as_raw_token()),
Token::Gamma(55)
);
assert_eq!(
Token::from_raw_token(Token::Delta { index: 100 }.as_raw_token()),
Token::Delta { index: 100 }
);
assert_eq!(
Token::from_raw_token(Token::Omega.as_raw_token()),
Token::Omega
);
}
}