use std::collections::VecDeque;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use anyhow::Context;
use base::error;
use base::Event;
use base::EventToken;
use base::FileSync;
use base::RawDescriptor;
use base::WaitContext;
use base::WorkerThread;
use sync::Mutex;
use crate::serial::sys::InStreamType;
use crate::serial_device::SerialInput;
use crate::serial_device::SerialOptions;
use crate::virtio::console::device::ConsoleDevice;
use crate::virtio::console::port::ConsolePort;
use crate::virtio::console::port::ConsolePortInfo;
use crate::virtio::console::Console;
use crate::virtio::ProtectionType;
use crate::SerialDevice;
impl SerialDevice for Console {
    fn new(
        protection_type: ProtectionType,
        _event: Event,
        input: Option<Box<dyn SerialInput>>,
        output: Option<Box<dyn io::Write + Send>>,
        _sync: Option<Box<dyn FileSync + Send>>,
        options: SerialOptions,
        keep_rds: Vec<RawDescriptor>,
    ) -> Console {
        Console::new(
            protection_type,
            input,
            output,
            keep_rds,
            options.pci_address,
            options.max_queue_sizes,
        )
    }
}
impl SerialDevice for ConsoleDevice {
    fn new(
        protection_type: ProtectionType,
        _event: Event,
        input: Option<Box<dyn SerialInput>>,
        output: Option<Box<dyn io::Write + Send>>,
        _sync: Option<Box<dyn FileSync + Send>>,
        options: SerialOptions,
        keep_rds: Vec<RawDescriptor>,
    ) -> ConsoleDevice {
        let info = ConsolePortInfo {
            name: options.name,
            console: options.console,
        };
        let port = ConsolePort::new(input, output, Some(info), keep_rds);
        ConsoleDevice::new_single_port(protection_type, port)
    }
}
impl SerialDevice for ConsolePort {
    fn new(
        _protection_type: ProtectionType,
        _event: Event,
        input: Option<Box<dyn SerialInput>>,
        output: Option<Box<dyn io::Write + Send>>,
        _sync: Option<Box<dyn FileSync + Send>>,
        options: SerialOptions,
        keep_rds: Vec<RawDescriptor>,
    ) -> ConsolePort {
        let info = ConsolePortInfo {
            name: options.name,
            console: options.console,
        };
        ConsolePort::new(input, output, Some(info), keep_rds)
    }
}
pub(in crate::virtio::console) fn spawn_input_thread(
    mut input: InStreamType,
    in_avail_evt: Event,
    input_buffer: Arc<Mutex<VecDeque<u8>>>,
) -> WorkerThread<InStreamType> {
    WorkerThread::start("v_console_input", move |kill_evt| {
        if !input_buffer.lock().is_empty() {
            in_avail_evt.signal().unwrap();
        }
        if let Err(e) = read_input(&mut input, &in_avail_evt, input_buffer, kill_evt) {
            error!("console input thread exited with error: {:#}", e);
        }
        input
    })
}
fn read_input(
    input: &mut InStreamType,
    thread_in_avail_evt: &Event,
    buffer: Arc<Mutex<VecDeque<u8>>>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    #[derive(EventToken)]
    enum Token {
        ConsoleEvent,
        Kill,
    }
    let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
        (&kill_evt, Token::Kill),
        (input.get_read_notifier(), Token::ConsoleEvent),
    ])
    .context("failed creating WaitContext")?;
    let mut kill_timeout = None;
    let mut rx_buf = [0u8; 1 << 12];
    'wait: loop {
        let events = wait_ctx.wait().context("Failed to wait for events")?;
        for event in events.iter() {
            match event.token {
                Token::Kill => {
                    if events.iter().all(|e| matches!(e.token, Token::Kill)) {
                        break 'wait;
                    }
                    const TIMEOUT_DURATION: Duration = Duration::from_millis(500);
                    match kill_timeout {
                        None => {
                            kill_timeout = Some(Instant::now() + TIMEOUT_DURATION);
                        }
                        Some(t) => {
                            if Instant::now() >= t {
                                error!(
                                    "failed to drain console input within {:?}, giving up",
                                    TIMEOUT_DURATION
                                );
                                break 'wait;
                            }
                        }
                    }
                }
                Token::ConsoleEvent => {
                    match input.read(&mut rx_buf) {
                        Ok(0) => break 'wait, Ok(size) => {
                            buffer.lock().extend(&rx_buf[0..size]);
                            thread_in_avail_evt.signal().unwrap();
                        }
                        Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                        Err(e) => {
                            return Err(e).context("failed to read console input");
                        }
                    }
                }
            }
        }
    }
    Ok(())
}