use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;
use async_task::Task;
use base::warn;
use base::AsRawDescriptor;
use base::AsRawDescriptors;
use base::RawDescriptor;
use futures::task::noop_waker;
use pin_utils::pin_mut;
use sync::Mutex;
use crate::queue::RunnableQueue;
use crate::waker::WeakWake;
use crate::AsyncError;
use crate::AsyncResult;
use crate::BlockingPool;
use crate::DetachedTasks;
use crate::ExecutorTrait;
use crate::IntoAsync;
use crate::IoSource;
use crate::TaskHandle;
pub trait Reactor: Send + Sync + Sized {
    fn new() -> Result<Self>;
    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
    fn on_thread_start(&self) {}
    fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()>;
    fn wake(&self);
    fn new_source<F: AsRawDescriptor>(
        &self,
        ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>>;
    fn wrap_task_handle<R>(task: RawTaskHandle<Self, R>) -> TaskHandle<R>;
}
const WAITING: i32 = 0x1d5b_c019u32 as i32;
const PROCESSING: i32 = 0xd474_77bcu32 as i32;
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
pub struct RawExecutor<Re: Reactor + 'static> {
    pub reactor: Re,
    queue: RunnableQueue,
    blocking_pool: BlockingPool,
    state: AtomicI32,
    detached_tasks: Mutex<DetachedTasks>,
}
impl<Re: Reactor> RawExecutor<Re> {
    pub fn new_with(reactor: Re) -> AsyncResult<Arc<Self>> {
        Ok(Arc::new(RawExecutor {
            reactor,
            queue: RunnableQueue::new(),
            blocking_pool: Default::default(),
            state: AtomicI32::new(PROCESSING),
            detached_tasks: Mutex::new(DetachedTasks::new()),
        }))
    }
    pub fn new() -> AsyncResult<Arc<Self>> {
        Self::new_with(Re::new().map_err(AsyncError::Io)?)
    }
    fn wake(&self) {
        let oldstate = self.state.swap(WOKEN, Ordering::AcqRel);
        if oldstate == WAITING {
            self.reactor.wake();
        }
    }
    fn run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output> {
        self.reactor.on_thread_start();
        pin_mut!(done);
        loop {
            self.state.store(PROCESSING, Ordering::Release);
            for runnable in self.queue.iter() {
                runnable.run();
            }
            if let Ok(mut tasks) = self.detached_tasks.try_lock() {
                tasks.poll(cx);
            }
            if let Poll::Ready(val) = done.as_mut().poll(cx) {
                return Ok(val);
            }
            let oldstate = self.state.compare_exchange(
                PROCESSING,
                WAITING,
                Ordering::AcqRel,
                Ordering::Acquire,
            );
            if let Err(oldstate) = oldstate {
                debug_assert_eq!(oldstate, WOKEN);
                continue;
            }
            self.reactor
                .wait_for_work(|| self.state.store(PROCESSING, Ordering::Release))
                .map_err(AsyncError::Io)?;
        }
    }
}
impl<Re: Reactor + 'static> ExecutorTrait for Arc<RawExecutor<Re>> {
    fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
        self.reactor.new_source(self, f)
    }
    fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn(f, schedule);
        runnable.schedule();
        Re::wrap_task_handle(RawTaskHandle {
            task,
            raw: Arc::downgrade(self),
        })
    }
    fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn_local(f, schedule);
        runnable.schedule();
        Re::wrap_task_handle(RawTaskHandle {
            task,
            raw: Arc::downgrade(self),
        })
    }
    fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.spawn(self.blocking_pool.spawn(f))
    }
    fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
        let waker = super::waker::new_waker(Arc::downgrade(self));
        let mut ctx = Context::from_waker(&waker);
        self.run_internal(&mut ctx, f)
    }
}
impl<Re: Reactor + AsRawDescriptors> AsRawDescriptors for RawExecutor<Re> {
    fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
        self.reactor.as_raw_descriptors()
    }
}
impl<Re: Reactor> WeakWake for RawExecutor<Re> {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            RawExecutor::wake(&arc_self);
        }
    }
}
impl<Re: Reactor> Drop for RawExecutor<Re> {
    fn drop(&mut self) {
        let final_future = self.reactor.on_executor_drop();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        if let Err(e) = self.run_internal(&mut cx, final_future) {
            warn!("Failed to drive RawExecutor to completion: {}", e);
        }
    }
}
pub struct RawTaskHandle<Re: Reactor + 'static, R> {
    task: Task<R>,
    raw: Weak<RawExecutor<Re>>,
}
impl<Re: Reactor, R: Send + 'static> RawTaskHandle<Re, R> {
    pub fn detach(self) {
        if let Some(raw) = self.raw.upgrade() {
            raw.detached_tasks.lock().push(self.task);
        }
    }
    pub async fn cancel(self) -> Option<R> {
        self.task.cancel().await
    }
}
impl<Re: Reactor, R: 'static> Future for RawTaskHandle<Re, R> {
    type Output = R;
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context,
    ) -> std::task::Poll<Self::Output> {
        Pin::new(&mut self.task).poll(cx)
    }
}