use std::collections::VecDeque;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use balloon_control::BalloonTubeCommand;
use balloon_control::BalloonTubeResult;
use base::error;
use base::Error as SysError;
use base::Tube;
use serde::Deserialize;
use serde::Serialize;
use crate::VmResponse;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum BalloonControlCommand {
    Adjust {
        num_bytes: u64,
        wait_for_success: bool,
    },
    Stats,
    WorkingSet,
    WorkingSetConfig {
        bins: Vec<u32>,
        refresh_threshold: u32,
        report_threshold: u32,
    },
}
fn do_send(tube: &Tube, cmd: &BalloonControlCommand) -> Option<VmResponse> {
    match *cmd {
        BalloonControlCommand::Adjust {
            num_bytes,
            wait_for_success,
        } => {
            match tube.send(&BalloonTubeCommand::Adjust {
                num_bytes,
                allow_failure: wait_for_success,
            }) {
                Ok(_) => {
                    if wait_for_success {
                        None
                    } else {
                        Some(VmResponse::Ok)
                    }
                }
                Err(_) => Some(VmResponse::Err(SysError::last())),
            }
        }
        BalloonControlCommand::WorkingSetConfig {
            ref bins,
            refresh_threshold,
            report_threshold,
        } => {
            match tube.send(&BalloonTubeCommand::WorkingSetConfig {
                bins: bins.clone(),
                refresh_threshold,
                report_threshold,
            }) {
                Ok(_) => Some(VmResponse::Ok),
                Err(_) => Some(VmResponse::Err(SysError::last())),
            }
        }
        BalloonControlCommand::Stats => match tube.send(&BalloonTubeCommand::Stats) {
            Ok(_) => None,
            Err(_) => Some(VmResponse::Err(SysError::last())),
        },
        BalloonControlCommand::WorkingSet => match tube.send(&BalloonTubeCommand::WorkingSet) {
            Ok(_) => None,
            Err(_) => Some(VmResponse::Err(SysError::last())),
        },
    }
}
pub struct BalloonTube {
    tube: Tube,
    pending_queue: VecDeque<(BalloonControlCommand, Option<usize>)>,
    pending_adjust_with_completion: Option<(u64, usize)>,
}
impl BalloonTube {
    pub fn new(tube: Tube) -> Self {
        BalloonTube {
            tube,
            pending_queue: VecDeque::new(),
            pending_adjust_with_completion: None,
        }
    }
    pub fn send_cmd(
        &mut self,
        cmd: BalloonControlCommand,
        key: Option<usize>,
    ) -> Option<(VmResponse, usize)> {
        match cmd {
            BalloonControlCommand::Adjust {
                wait_for_success: true,
                num_bytes,
            } => {
                let Some(key) = key else {
                    error!("Asked for completion without reply key");
                    return None;
                };
                let resp = self
                    .pending_adjust_with_completion
                    .take()
                    .map(|(_, key)| (VmResponse::ErrString("Adjust overriden".to_string()), key));
                if do_send(&self.tube, &cmd).is_some() {
                    unreachable!("Unexpected early reply");
                }
                self.pending_adjust_with_completion = Some((num_bytes, key));
                resp
            }
            _ => {
                if !self.pending_queue.is_empty() {
                    self.pending_queue.push_back((cmd, key));
                    return None;
                }
                let resp = do_send(&self.tube, &cmd);
                if resp.is_none() {
                    self.pending_queue.push_back((cmd, key));
                }
                match key {
                    None => None,
                    Some(key) => resp.map(|r| (r, key)),
                }
            }
        }
    }
    pub fn recv(&mut self) -> Result<Vec<(VmResponse, usize)>> {
        let res = self
            .tube
            .recv::<BalloonTubeResult>()
            .context("failed to read balloon tube")?;
        if let BalloonTubeResult::Adjusted { num_bytes: actual } = res {
            let Some((target, key)) = self.pending_adjust_with_completion else {
                bail!("Unexpected balloon adjust to {}", actual);
            };
            if actual != target {
                return Ok(vec![]);
            }
            self.pending_adjust_with_completion.take();
            return Ok(vec![(VmResponse::Ok, key)]);
        }
        let mut responses = vec![];
        if self.pending_queue.is_empty() {
            bail!("Unexpected balloon tube result {:?}", res)
        }
        let resp = match (
            &self.pending_queue.front().expect("entry disappeared").0,
            res,
        ) {
            (
                BalloonControlCommand::Stats,
                BalloonTubeResult::Stats {
                    stats,
                    balloon_actual,
                },
            ) => VmResponse::BalloonStats {
                stats,
                balloon_actual,
            },
            (
                BalloonControlCommand::WorkingSet,
                BalloonTubeResult::WorkingSet { ws, balloon_actual },
            ) => VmResponse::BalloonWS { ws, balloon_actual },
            (_, resp) => {
                bail!("Unexpected balloon tube result {:?}", resp);
            }
        };
        let key = self.pending_queue.pop_front().expect("entry disappeared").1;
        if let Some(key) = key {
            responses.push((resp, key))
        }
        while let Some((cmd, key)) = self.pending_queue.front() {
            match do_send(&self.tube, cmd) {
                Some(resp) => {
                    if let Some(key) = key {
                        responses.push((resp, *key));
                    }
                    self.pending_queue.pop_front();
                }
                None => break,
            }
        }
        Ok(responses)
    }
}
#[cfg(test)]
mod tests {
    use balloon_control::BalloonStats;
    use super::*;
    fn balloon_device_respond_stats(device: &Tube) {
        let BalloonTubeCommand::Stats = device.recv::<BalloonTubeCommand>().unwrap() else {
            panic!("unexpected command");
        };
        device
            .send(&BalloonTubeResult::Stats {
                stats: BalloonStats::default(),
                balloon_actual: 0,
            })
            .unwrap();
    }
    #[test]
    fn test_stat_command() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);
        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());
        balloon_device_respond_stats(&device);
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
    }
    #[test]
    fn test_multiple_stat_command() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);
        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());
        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xbadcafe));
        assert!(resp.is_none());
        balloon_device_respond_stats(&device);
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
        balloon_device_respond_stats(&device);
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xbadcafe);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
    }
    #[test]
    fn test_queued_stats_adjust_no_reply() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);
        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());
        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0,
                wait_for_success: false,
            },
            Some(0xbadcafe),
        );
        assert!(resp.is_none());
        balloon_device_respond_stats(&device);
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 2);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
        assert_eq!(resp[1].1, 0xbadcafe);
        assert!(matches!(resp[1].0, VmResponse::Ok));
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));
    }
    #[test]
    fn test_adjust_with_reply() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);
        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0xc0ffee,
                wait_for_success: true,
            },
            Some(0xc0ffee),
        );
        assert!(resp.is_none());
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));
        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0xbadcafe,
                wait_for_success: true,
            },
            Some(0xbadcafe),
        );
        assert!(matches!(resp, Some((VmResponse::ErrString(_), 0xc0ffee))));
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));
        device
            .send(&BalloonTubeResult::Adjusted {
                num_bytes: 0xc0ffee,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 0);
        device
            .send(&BalloonTubeResult::Adjusted {
                num_bytes: 0xbadcafe,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xbadcafe);
        assert!(matches!(resp[0].0, VmResponse::Ok));
    }
    #[test]
    fn test_stats_and_adjust_with_reply() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);
        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());
        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0xbadcafe,
                wait_for_success: true,
            },
            Some(0xbadcafe),
        );
        assert!(resp.is_none());
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Stats));
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));
        device
            .send(&BalloonTubeResult::Adjusted {
                num_bytes: 0xbadcafe,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xbadcafe);
        assert!(matches!(resp[0].0, VmResponse::Ok));
        device
            .send(&BalloonTubeResult::Stats {
                stats: BalloonStats::default(),
                balloon_actual: 0,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
    }
}