use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;
use async_task::Task;
use base::warn;
use base::AsRawDescriptor;
use base::AsRawDescriptors;
use base::RawDescriptor;
use futures::task::noop_waker;
use pin_utils::pin_mut;
use sync::Mutex;
use crate::queue::RunnableQueue;
use crate::waker::WeakWake;
use crate::AsyncError;
use crate::AsyncResult;
use crate::BlockingPool;
use crate::DetachedTasks;
use crate::ExecutorTrait;
use crate::IntoAsync;
use crate::IoSource;
use crate::TaskHandle;
pub trait Reactor: Send + Sync + Sized {
fn new() -> Result<Self>;
fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
fn on_thread_start(&self) {}
fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()>;
fn wake(&self);
fn new_source<F: AsRawDescriptor>(
&self,
ex: &Arc<RawExecutor<Self>>,
f: F,
) -> AsyncResult<IoSource<F>>;
fn wrap_task_handle<R>(task: RawTaskHandle<Self, R>) -> TaskHandle<R>;
}
const WAITING: i32 = 0x1d5b_c019u32 as i32;
const PROCESSING: i32 = 0xd474_77bcu32 as i32;
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
pub struct RawExecutor<Re: Reactor + 'static> {
pub reactor: Re,
queue: RunnableQueue,
blocking_pool: BlockingPool,
state: AtomicI32,
detached_tasks: Mutex<DetachedTasks>,
}
impl<Re: Reactor> RawExecutor<Re> {
pub fn new_with(reactor: Re) -> AsyncResult<Arc<Self>> {
Ok(Arc::new(RawExecutor {
reactor,
queue: RunnableQueue::new(),
blocking_pool: Default::default(),
state: AtomicI32::new(PROCESSING),
detached_tasks: Mutex::new(DetachedTasks::new()),
}))
}
pub fn new() -> AsyncResult<Arc<Self>> {
Self::new_with(Re::new().map_err(AsyncError::Io)?)
}
fn wake(&self) {
let oldstate = self.state.swap(WOKEN, Ordering::AcqRel);
if oldstate == WAITING {
self.reactor.wake();
}
}
fn run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output> {
self.reactor.on_thread_start();
pin_mut!(done);
loop {
self.state.store(PROCESSING, Ordering::Release);
for runnable in self.queue.iter() {
runnable.run();
}
if let Ok(mut tasks) = self.detached_tasks.try_lock() {
tasks.poll(cx);
}
if let Poll::Ready(val) = done.as_mut().poll(cx) {
return Ok(val);
}
let oldstate = self.state.compare_exchange(
PROCESSING,
WAITING,
Ordering::AcqRel,
Ordering::Acquire,
);
if let Err(oldstate) = oldstate {
debug_assert_eq!(oldstate, WOKEN);
continue;
}
self.reactor
.wait_for_work(|| self.state.store(PROCESSING, Ordering::Release))
.map_err(AsyncError::Io)?;
}
}
}
impl<Re: Reactor + 'static> ExecutorTrait for Arc<RawExecutor<Re>> {
fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
self.reactor.new_source(self, f)
}
fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
r.queue.push_back(runnable);
r.wake();
}
};
let (runnable, task) = async_task::spawn(f, schedule);
runnable.schedule();
Re::wrap_task_handle(RawTaskHandle {
task,
raw: Arc::downgrade(self),
})
}
fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
r.queue.push_back(runnable);
r.wake();
}
};
let (runnable, task) = async_task::spawn_local(f, schedule);
runnable.schedule();
Re::wrap_task_handle(RawTaskHandle {
task,
raw: Arc::downgrade(self),
})
}
fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.spawn(self.blocking_pool.spawn(f))
}
fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
let waker = super::waker::new_waker(Arc::downgrade(self));
let mut ctx = Context::from_waker(&waker);
self.run_internal(&mut ctx, f)
}
}
impl<Re: Reactor + AsRawDescriptors> AsRawDescriptors for RawExecutor<Re> {
fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
self.reactor.as_raw_descriptors()
}
}
impl<Re: Reactor> WeakWake for RawExecutor<Re> {
fn wake_by_ref(weak_self: &Weak<Self>) {
if let Some(arc_self) = weak_self.upgrade() {
RawExecutor::wake(&arc_self);
}
}
}
impl<Re: Reactor> Drop for RawExecutor<Re> {
fn drop(&mut self) {
let final_future = self.reactor.on_executor_drop();
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
if let Err(e) = self.run_internal(&mut cx, final_future) {
warn!("Failed to drive RawExecutor to completion: {}", e);
}
}
}
pub struct RawTaskHandle<Re: Reactor + 'static, R> {
task: Task<R>,
raw: Weak<RawExecutor<Re>>,
}
impl<Re: Reactor, R: Send + 'static> RawTaskHandle<Re, R> {
pub fn detach(self) {
if let Some(raw) = self.raw.upgrade() {
raw.detached_tasks.lock().push(self.task);
}
}
pub async fn cancel(self) -> Option<R> {
self.task.cancel().await
}
}
impl<Re: Reactor, R: 'static> Future for RawTaskHandle<Re, R> {
type Output = R;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context,
) -> std::task::Poll<Self::Output> {
Pin::new(&mut self.task).poll(cx)
}
}