use std::collections::HashMap;
use std::collections::VecDeque;
use std::fs::File;
use std::io::Error as IOError;
use std::io::ErrorKind as IOErrorKind;
use std::io::Seek;
use std::io::SeekFrom;
use std::path::Path;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvError;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use base::error;
use base::AsRawDescriptor;
use base::Error as BaseError;
use base::Event;
use base::EventToken;
use base::FromRawDescriptor;
use base::IntoRawDescriptor;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::MmapError;
use base::RawDescriptor;
use base::SafeDescriptor;
use base::ScmSocket;
use base::UnixSeqpacket;
use base::VolatileMemory;
use base::VolatileMemoryError;
use base::VolatileSlice;
use base::WaitContext;
use base::WorkerThread;
use remain::sorted;
use serde::Deserialize;
use serde::Serialize;
use sync::Mutex;
use thiserror::Error as ThisError;
use zerocopy::FromBytes;
use zerocopy::Immutable;
use zerocopy::IntoBytes;
use zerocopy::KnownLayout;
use crate::virtio::snd::constants::*;
use crate::virtio::snd::layout::*;
use crate::virtio::snd::vios_backend::streams::StreamState;
/// Result type used by this module; the error side is always this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
    #[error("Error memory mapping client_shm: {0}")]
    BaseMmapError(BaseError),
    #[error("Sender was dropped without sending buffer status, the recv thread may have exited")]
    BufferStatusSenderLost(RecvError),
    #[error("Command failed with status {0}")]
    CommandFailed(u32),
    #[error("Error duplicating file descriptor: {0}")]
    DupError(BaseError),
    #[error("Failed to create Recv event: {0}")]
    EventCreateError(BaseError),
    #[error("Failed to dup Recv event: {0}")]
    EventDupError(BaseError),
    #[error("Failed to signal event: {0}")]
    EventWriteError(BaseError),
    #[error("Failed to get size of tx shared memory: {0}")]
    FileSizeError(IOError),
    #[error("Error accessing guest's shared memory: {0}")]
    GuestMmapError(MmapError),
    #[error("No jack with id {0}")]
    InvalidJackId(u32),
    #[error("No stream with id {0}")]
    InvalidStreamId(u32),
    #[error("IO buffer operation failed: status = {0}")]
    IOBufferError(u32),
    #[error("No PCM streams available")]
    NoStreamsAvailable,
    #[error("Insuficient space for the new buffer in the queue's buffer area")]
    OutOfSpace,
    #[error("Platform not supported")]
    PlatformNotSupported,
    #[error("{0}")]
    ProtocolError(ProtocolErrorKind),
    #[error("Failed to connect to VioS server {1}: {0:?}")]
    ServerConnectionError(IOError, PathBuf),
    #[error("Failed to communicate with VioS server: {0:?}")]
    ServerError(IOError),
    #[error("Failed to communicate with VioS server: {0:?}")]
    ServerIOError(IOError),
    #[error("Error accessing VioS server's shared memory: {0}")]
    ServerMmapError(MmapError),
    #[error("Failed to duplicate UnixSeqpacket: {0}")]
    UnixSeqpacketDupError(IOError),
    #[error("Unsupported frame rate: {0}")]
    UnsupportedFrameRate(u32),
    #[error("Error accessing volatile memory: {0}")]
    VolatileMemoryError(VolatileMemoryError),
    #[error("Failed to create Recv thread's WaitContext: {0}")]
    WaitContextCreateError(BaseError),
    #[error("Error waiting for events")]
    WaitError(BaseError),
    #[error("Invalid operation for stream direction: {0}")]
    WrongDirection(u8),
    #[error("Set saved params should only be used while restoring the device")]
    WrongSetParams,
}
#[derive(ThisError, Debug)]
pub enum ProtocolErrorKind {
    #[error("The server sent a config of the wrong size: {0}")]
    UnexpectedConfigSize(usize),
    #[error("Received {1} file descriptors from the server, expected {0}")]
    UnexpectedNumberOfFileDescriptors(usize, usize), #[error("Server's version ({0}) doesn't match client's")]
    VersionMismatch(u32),
    #[error("Received a msg with an unexpected size: expected {0}, received {1}")]
    UnexpectedMessageSize(usize, usize), }
/// Client side of a connection to a VioS sound server.
///
/// Commands go over a control socket, audio data is exchanged through two shared-memory buffer
/// queues, and an optional background thread receives buffer status messages and events.
pub struct VioSClient {
    // Configuration received from the server when the connection was established.
    config: VioSConfig,
    // Jack, stream and channel-map info cached from the server (or loaded by `restore`).
    jacks: Vec<virtio_snd_jack_info>,
    streams: Vec<virtio_snd_pcm_info>,
    chmaps: Vec<virtio_snd_chmap_info>,
    // Socket on which commands are sent and their status replies are read.
    control_socket: Mutex<UnixSeqpacket>,
    // Socket from which the background thread reads events.
    event_socket: UnixSeqpacket,
    // Buffer queue used by `inject_audio_data`.
    tx: IoBufferQueue,
    // Buffer queue used by `request_audio_data`.
    rx: IoBufferQueue,
    // Events queued by the background thread and drained by `pop_event`.
    events: Arc<Mutex<VecDeque<virtio_snd_event>>>,
    // Signaled by the background thread after it queues an event.
    event_notifier: Event,
    // Per-queue map from a buffer's offset to the sender that delivers that buffer's status.
    tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    // Flags shared with the background thread.
    recv_thread_state: Arc<Mutex<ThreadFlags>>,
    // Handle of the background thread; `None` while it is not running.
    recv_thread: Mutex<Option<WorkerThread<()>>>,
    // Parameters set for each stream id, kept so they can be snapshotted and re-applied.
    params: HashMap<u32, virtio_snd_pcm_set_params>,
}
/// Serializable subset of a [`VioSClient`]'s state, produced by `VioSClient::snapshot` and
/// consumed by `VioSClient::restore`.
#[derive(Serialize, Deserialize)]
pub struct VioSClientSnapshot {
    // Must equal the live client's config for `restore` to succeed.
    config: VioSConfig,
    jacks: Vec<virtio_snd_jack_info>,
    streams: Vec<virtio_snd_pcm_info>,
    chmaps: Vec<virtio_snd_chmap_info>,
    // Stream parameters keyed by stream id.
    params: HashMap<u32, virtio_snd_pcm_set_params>,
}
impl VioSClient {
    pub fn try_new<P: AsRef<Path>>(server: P) -> Result<VioSClient> {
        let client_socket = ScmSocket::try_from(
            UnixSeqpacket::connect(server.as_ref())
                .map_err(|e| Error::ServerConnectionError(e, server.as_ref().into()))?,
        )
        .map_err(|e| Error::ServerConnectionError(e, server.as_ref().into()))?;
        let mut config: VioSConfig = Default::default();
        const NUM_FDS: usize = 5;
        let (recv_size, mut safe_fds) = client_socket
            .recv_with_fds(config.as_mut_bytes(), NUM_FDS)
            .map_err(Error::ServerError)?;
        if recv_size != std::mem::size_of::<VioSConfig>() {
            return Err(Error::ProtocolError(
                ProtocolErrorKind::UnexpectedConfigSize(recv_size),
            ));
        }
        if config.version != VIOS_VERSION {
            return Err(Error::ProtocolError(ProtocolErrorKind::VersionMismatch(
                config.version,
            )));
        }
        fn pop<T: FromRawDescriptor>(
            safe_fds: &mut Vec<SafeDescriptor>,
            expected: usize,
            received: usize,
        ) -> Result<T> {
            unsafe {
                Ok(T::from_raw_descriptor(
                    safe_fds
                        .pop()
                        .ok_or(Error::ProtocolError(
                            ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(
                                expected, received,
                            ),
                        ))?
                        .into_raw_descriptor(),
                ))
            }
        }
        let fd_count = safe_fds.len();
        let rx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
        let tx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
        let rx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
        let tx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
        let event_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
        if !safe_fds.is_empty() {
            return Err(Error::ProtocolError(
                ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(NUM_FDS, fd_count),
            ));
        }
        let tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let recv_thread_state = Arc::new(Mutex::new(ThreadFlags {
            reporting_events: false,
        }));
        let mut client = VioSClient {
            config,
            jacks: Vec::new(),
            streams: Vec::new(),
            chmaps: Vec::new(),
            control_socket: Mutex::new(client_socket.into_inner()),
            event_socket,
            tx: IoBufferQueue::new(tx_socket, tx_shm_file)?,
            rx: IoBufferQueue::new(rx_socket, rx_shm_file)?,
            events: Arc::new(Mutex::new(VecDeque::new())),
            event_notifier: Event::new().map_err(Error::EventCreateError)?,
            tx_subscribers,
            rx_subscribers,
            recv_thread_state,
            recv_thread: Mutex::new(None),
            params: HashMap::new(),
        };
        client.request_and_cache_info()?;
        Ok(client)
    }
    /// Returns the number of jacks stated in the server's config.
    pub fn num_jacks(&self) -> u32 {
        self.config.jacks
    }
    /// Returns the number of PCM streams stated in the server's config.
    pub fn num_streams(&self) -> u32 {
        self.config.streams
    }
    /// Returns the number of channel maps stated in the server's config.
    pub fn num_chmaps(&self) -> u32 {
        self.config.chmaps
    }
    /// Returns a copy of the cached info for the jack at index `idx`, or `None` when `idx` is
    /// outside the cached list.
    pub fn jack_info(&self, idx: u32) -> Option<virtio_snd_jack_info> {
        self.jacks.get(idx as usize).copied()
    }
    /// Returns a copy of the cached info for the stream at index `idx`, or `None` when `idx` is
    /// outside the cached list.
    pub fn stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info> {
        // The info struct is `Copy` (it is produced by `request_info`, which requires it), so
        // `copied` is used like in the jack and chmap getters.
        self.streams.get(idx as usize).copied()
    }
    /// Returns a copy of the cached info for the channel map at index `idx`, or `None` when
    /// `idx` is outside the cached list.
    pub fn chmap_info(&self, idx: u32) -> Option<virtio_snd_chmap_info> {
        self.chmaps.get(idx as usize).copied()
    }
    pub fn start_bg_thread(&self) -> Result<()> {
        if self.recv_thread.lock().is_some() {
            return Ok(());
        }
        let tx_socket = self.tx.try_clone_socket()?;
        let rx_socket = self.rx.try_clone_socket()?;
        let event_socket = self
            .event_socket
            .try_clone()
            .map_err(Error::UnixSeqpacketDupError)?;
        let mut opt = self.recv_thread.lock();
        if opt.is_none() {
            let tx_subscribers = self.tx_subscribers.clone();
            let rx_subscribers = self.rx_subscribers.clone();
            let event_notifier = self
                .event_notifier
                .try_clone()
                .map_err(Error::EventDupError)?;
            let events = self.events.clone();
            let recv_thread_state = self.recv_thread_state.clone();
            *opt = Some(WorkerThread::start("shm_vios", move |kill_event| {
                if let Err(e) = run_recv_thread(
                    kill_event,
                    tx_subscribers,
                    rx_subscribers,
                    event_notifier,
                    events,
                    recv_thread_state,
                    tx_socket,
                    rx_socket,
                    event_socket,
                ) {
                    error!("virtio-snd shm_vios worker failed: {e:#}");
                }
            }));
        }
        Ok(())
    }
    /// Stops the background receive thread if one is running; otherwise does nothing.
    ///
    /// Always returns `Ok(())`.
    pub fn stop_bg_thread(&self) -> Result<()> {
        // The slot stays locked until the worker has been stopped.
        let mut worker_slot = self.recv_thread.lock();
        if let Some(worker) = worker_slot.take() {
            worker.stop();
        }
        Ok(())
    }
    pub fn get_event_notifier(&self) -> Result<Event> {
        self.recv_thread_state.lock().reporting_events = true;
        self.event_notifier
            .try_clone()
            .map_err(Error::EventDupError)
    }
    /// Removes and returns the oldest queued event, or `None` if the queue is empty.
    pub fn pop_event(&self) -> Option<virtio_snd_event> {
        self.events.lock().pop_front()
    }
    /// Sends a jack remap command for `jack_id` with the given `association` and `sequence`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidJackId` if `jack_id` is not below the configured number of jacks, or any
    /// error from sending the command and reading its status.
    pub fn remap_jack(&self, jack_id: u32, association: u32, sequence: u32) -> Result<()> {
        if jack_id >= self.config.jacks {
            return Err(Error::InvalidJackId(jack_id));
        }
        let hdr = virtio_snd_jack_hdr {
            hdr: virtio_snd_hdr {
                code: VIRTIO_SND_R_JACK_REMAP.into(),
            },
            jack_id: jack_id.into(),
        };
        let request = virtio_snd_jack_remap {
            hdr,
            association: association.into(),
            sequence: sequence.into(),
        };
        let socket = self.control_socket.lock();
        send_cmd(&socket, request)
    }
    pub fn set_stream_parameters(
        &mut self,
        stream_id: u32,
        params: VioSStreamParams,
    ) -> Result<()> {
        self.streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        let raw_params: virtio_snd_pcm_set_params = (stream_id, params).into();
        let _ = self.params.insert(stream_id, raw_params);
        let control_socket_lock = self.control_socket.lock();
        send_cmd(&control_socket_lock, raw_params)
    }
    /// Records already-encoded stream parameters and sends them to the server.
    ///
    /// The stream id is taken from the header of `raw_params`. Like `set_stream_parameters`,
    /// the parameters are recorded before the command is sent, so they stay recorded even if
    /// the command fails.
    ///
    /// # Errors
    ///
    /// Returns `InvalidStreamId` if there is no cached stream with that id, or any error from
    /// sending the command and reading its status.
    pub fn set_stream_parameters_raw(
        &mut self,
        raw_params: virtio_snd_pcm_set_params,
    ) -> Result<()> {
        let stream_id = raw_params.hdr.stream_id.to_native();
        // Validate the id before recording anything, so that parameters for a stream that does
        // not exist never end up in `self.params` (and from there in snapshots).
        self.streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        let _ = self.params.insert(stream_id, raw_params);
        let control_socket_lock = self.control_socket.lock();
        send_cmd(&control_socket_lock, raw_params)
    }
    /// Sends a `VIRTIO_SND_R_PCM_PREPARE` command for `stream_id` (see `common_stream_op`).
    pub fn prepare_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_PREPARE)
    }
    /// Sends a `VIRTIO_SND_R_PCM_RELEASE` command for `stream_id` (see `common_stream_op`).
    pub fn release_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_RELEASE)
    }
    /// Sends a `VIRTIO_SND_R_PCM_START` command for `stream_id` (see `common_stream_op`).
    pub fn start_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_START)
    }
    /// Sends a `VIRTIO_SND_R_PCM_STOP` command for `stream_id` (see `common_stream_op`).
    pub fn stop_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_STOP)
    }
    /// Sends `size` bytes of audio to an output stream.
    ///
    /// A buffer of `size` bytes is allocated in the tx queue and handed to `callback`, which is
    /// expected to fill it. The buffer is then announced to the server and this call blocks
    /// until the buffer's status is delivered.
    ///
    /// Returns the latency reported with the status together with `callback`'s return value.
    ///
    /// # Errors
    ///
    /// * `InvalidStreamId` if there is no cached stream with that id.
    /// * `WrongDirection` if the stream's direction is not `VIRTIO_SND_D_OUTPUT`.
    /// * Any error from allocating, mapping or sending the buffer.
    /// * `BufferStatusSenderLost` or `IOBufferError` from waiting for the status.
    pub fn inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>(
        &self,
        stream_id: u32,
        size: usize,
        callback: Cb,
    ) -> Result<(u32, R)> {
        let stream = self
            .streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        if stream.direction != VIRTIO_SND_D_OUTPUT {
            return Err(Error::WrongDirection(VIRTIO_SND_D_OUTPUT));
        }
        let dst_offset = self.tx.allocate_buffer(size)?;
        let buffer_slice = self.tx.buffer_at(dst_offset, size)?;
        let ret = callback(buffer_slice);
        // Subscribe before sending so the status cannot arrive before anyone is registered.
        let (sender, receiver) = channel::<BufferReleaseMsg>();
        self.tx_subscribers.lock().insert(dst_offset, sender);
        if let Err(e) = self.tx.send_buffer(stream_id, dst_offset, size) {
            // Nothing was sent, so no status will ever arrive for this buffer; do not leave a
            // stale subscriber behind.
            self.tx_subscribers.lock().remove(&dst_offset);
            return Err(e);
        }
        let (_, latency) = await_status(receiver)?;
        Ok((latency, ret))
    }
    pub fn request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>(
        &self,
        stream_id: u32,
        size: usize,
        callback: Cb,
    ) -> Result<(u32, R)> {
        if self
            .streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?
            .direction
            != VIRTIO_SND_D_INPUT
        {
            return Err(Error::WrongDirection(VIRTIO_SND_D_INPUT));
        }
        let src_offset = self.rx.allocate_buffer(size)?;
        let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();
        self.rx_subscribers.lock().insert(src_offset, sender);
        self.rx.send_buffer(stream_id, src_offset, size)?;
        let (recv_size, latency) = await_status(receiver)?;
        let buffer_slice = self.rx.buffer_at(src_offset, recv_size)?;
        Ok((latency, callback(&buffer_slice)))
    }
    /// Returns the raw descriptors held by this client: the control socket, the event socket,
    /// the event notifier, and then those of the tx and rx queues.
    pub fn keep_rds(&self) -> Vec<RawDescriptor> {
        let mut descriptors = vec![
            self.control_socket.lock().as_raw_descriptor(),
            self.event_socket.as_raw_descriptor(),
            self.event_notifier.as_raw_descriptor(),
        ];
        descriptors.extend(self.tx.keep_rds());
        descriptors.extend(self.rx.keep_rds());
        descriptors
    }
    fn common_stream_op(&self, stream_id: u32, op: u32) -> Result<()> {
        self.streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        let msg = virtio_snd_pcm_hdr {
            hdr: virtio_snd_hdr { code: op.into() },
            stream_id: stream_id.into(),
        };
        let control_socket_lock = self.control_socket.lock();
        send_cmd(&control_socket_lock, msg)
    }
    /// Queries and caches jack, stream and channel-map info, in that order, stopping at the
    /// first failure.
    fn request_and_cache_info(&mut self) -> Result<()> {
        self.request_and_cache_jacks_info()?;
        self.request_and_cache_streams_info()?;
        self.request_and_cache_chmaps_info()
    }
    fn request_info<T: IntoBytes + FromBytes + Default + Copy + Clone>(
        &self,
        req_code: u32,
        count: usize,
    ) -> Result<Vec<T>> {
        let info_size = std::mem::size_of::<T>();
        let status_size = std::mem::size_of::<virtio_snd_hdr>();
        let req = virtio_snd_query_info {
            hdr: virtio_snd_hdr {
                code: req_code.into(),
            },
            start_id: 0u32.into(),
            count: (count as u32).into(),
            size: (std::mem::size_of::<virtio_snd_query_info>() as u32).into(),
        };
        let control_socket_lock = self.control_socket.lock();
        seq_socket_send(&control_socket_lock, req.as_bytes())?;
        let reply = control_socket_lock
            .recv_as_vec()
            .map_err(Error::ServerIOError)?;
        let mut status: virtio_snd_hdr = Default::default();
        status
            .as_mut_bytes()
            .copy_from_slice(&reply[0..status_size]);
        if status.code.to_native() != VIRTIO_SND_S_OK {
            return Err(Error::CommandFailed(status.code.to_native()));
        }
        if reply.len() != status_size + count * info_size {
            return Err(Error::ProtocolError(
                ProtocolErrorKind::UnexpectedMessageSize(count * info_size, reply.len()),
            ));
        }
        Ok(reply[status_size..]
            .chunks(info_size)
            .map(|info_buffer| T::read_from_bytes(info_buffer).unwrap())
            .collect())
    }
    /// Queries and caches the info of all configured jacks; no query is sent when the config
    /// states zero jacks.
    fn request_and_cache_jacks_info(&mut self) -> Result<()> {
        let jack_count = self.config.jacks as usize;
        if jack_count > 0 {
            self.jacks = self.request_info(VIRTIO_SND_R_JACK_INFO, jack_count)?;
        }
        Ok(())
    }
    /// Queries and caches the info of all configured PCM streams; no query is sent when the
    /// config states zero streams.
    fn request_and_cache_streams_info(&mut self) -> Result<()> {
        let stream_count = self.config.streams as usize;
        if stream_count > 0 {
            self.streams = self.request_info(VIRTIO_SND_R_PCM_INFO, stream_count)?;
        }
        Ok(())
    }
    /// Queries and caches the info of all configured channel maps; no query is sent when the
    /// config states zero channel maps.
    fn request_and_cache_chmaps_info(&mut self) -> Result<()> {
        let chmap_count = self.config.chmaps as usize;
        if chmap_count > 0 {
            self.chmaps = self.request_info(VIRTIO_SND_R_CHMAP_INFO, chmap_count)?;
        }
        Ok(())
    }
    /// Returns a snapshot holding copies of the config, the cached jack, stream and channel-map
    /// info, and the recorded per-stream parameters.
    pub fn snapshot(&self) -> VioSClientSnapshot {
        VioSClientSnapshot {
            config: self.config,
            jacks: self.jacks.clone(),
            streams: self.streams.clone(),
            chmaps: self.chmaps.clone(),
            params: self.params.clone(),
        }
    }
    pub fn restore(&mut self, data: VioSClientSnapshot) -> anyhow::Result<()> {
        anyhow::ensure!(
            data.config == self.config,
            "config doesn't match on restore: expected: {:?}, got: {:?}",
            data.config,
            self.config
        );
        self.jacks = data.jacks;
        self.streams = data.streams;
        self.chmaps = data.chmaps;
        self.params = data.params;
        Ok(())
    }
    /// Brings the stream `stream_id` on the server back to `state`.
    ///
    /// Recorded parameters for the stream, if any, are sent first. A `Prepared` stream is then
    /// prepared; a `Started` stream is prepared and started, where a failure to prepare is only
    /// logged. Any other state needs no further command.
    ///
    /// # Errors
    ///
    /// Returns the error from re-applying the parameters, or from the final prepare/start
    /// command.
    pub fn restore_stream(&mut self, stream_id: u32, state: StreamState) -> Result<()> {
        let saved_params = self.params.get(&stream_id).copied();
        if let Some(params) = saved_params {
            self.set_stream_parameters_raw(params)?;
        }
        match state {
            StreamState::Prepared => self.prepare_stream(stream_id),
            StreamState::Started => {
                let prepared = self.prepare_stream(stream_id);
                if let Err(e) = prepared {
                    error!("failed to prepare stream: {}", e);
                }
                self.start_stream(stream_id)
            }
            _ => Ok(()),
        }
    }
}
/// Flags shared between a `VioSClient` and its background receive thread.
#[derive(Clone, Copy)]
struct ThreadFlags {
    // When false, the receive thread discards the events it reads instead of queueing them.
    reporting_events: bool,
}
/// Identifies which source woke up the background receive thread.
#[derive(EventToken)]
enum Token {
    // The thread's kill event was signaled.
    Notification,
    // The tx socket has a buffer status message.
    TxBufferMsg,
    // The rx socket has a buffer status message.
    RxBufferMsg,
    // The event socket has an event.
    EventMsg,
}
fn recv_buffer_status_msg(
    socket: &UnixSeqpacket,
    subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
) -> Result<()> {
    let mut msg: IoStatusMsg = Default::default();
    let size = socket
        .recv(msg.as_mut_bytes())
        .map_err(Error::ServerIOError)?;
    if size != std::mem::size_of::<IoStatusMsg>() {
        return Err(Error::ProtocolError(
            ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<IoStatusMsg>(), size),
        ));
    }
    let mut status = msg.status.status.into();
    if status == u32::MAX {
        status -= 1;
    }
    let latency = msg.status.latency_bytes.into();
    let offset = msg.buffer_offset as usize;
    let consumed_len = msg.consumed_len as usize;
    let promise_opt = subscribers.lock().remove(&offset);
    match promise_opt {
        None => error!(
            "Received an unexpected buffer status message: {}. This is a BUG!!",
            offset
        ),
        Some(sender) => {
            if let Err(e) = sender.send(BufferReleaseMsg {
                status,
                latency,
                consumed_len,
            }) {
                error!("Failed to notify waiting thread: {:?}", e);
            }
        }
    }
    Ok(())
}
fn recv_event(socket: &UnixSeqpacket) -> Result<virtio_snd_event> {
    let mut msg: virtio_snd_event = Default::default();
    let size = socket
        .recv(msg.as_mut_bytes())
        .map_err(Error::ServerIOError)?;
    if size != std::mem::size_of::<virtio_snd_event>() {
        return Err(Error::ProtocolError(
            ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<virtio_snd_event>(), size),
        ));
    }
    Ok(msg)
}
fn run_recv_thread(
    kill_event: Event,
    tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    event_notifier: Event,
    event_queue: Arc<Mutex<VecDeque<virtio_snd_event>>>,
    state: Arc<Mutex<ThreadFlags>>,
    tx_socket: UnixSeqpacket,
    rx_socket: UnixSeqpacket,
    event_socket: UnixSeqpacket,
) -> Result<()> {
    let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
        (&tx_socket, Token::TxBufferMsg),
        (&rx_socket, Token::RxBufferMsg),
        (&event_socket, Token::EventMsg),
        (&kill_event, Token::Notification),
    ])
    .map_err(Error::WaitContextCreateError)?;
    let mut running = true;
    while running {
        let events = wait_ctx.wait().map_err(Error::WaitError)?;
        for evt in events {
            match evt.token {
                Token::TxBufferMsg => recv_buffer_status_msg(&tx_socket, &tx_subscribers)?,
                Token::RxBufferMsg => recv_buffer_status_msg(&rx_socket, &rx_subscribers)?,
                Token::EventMsg => {
                    let evt = recv_event(&event_socket)?;
                    let state_cpy = *state.lock();
                    if state_cpy.reporting_events {
                        event_queue.lock().push_back(evt);
                        event_notifier.signal().map_err(Error::EventWriteError)?;
                    } }
                Token::Notification => {
                    if let Err(e) = kill_event.wait() {
                        error!("Failed to consume notification from recv thread: {:?}", e);
                    }
                    running = false;
                }
            }
        }
    }
    Ok(())
}
fn await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)> {
    let BufferReleaseMsg {
        status,
        latency,
        consumed_len,
    } = promise.recv().map_err(Error::BufferStatusSenderLost)?;
    if status == VIRTIO_SND_S_OK {
        Ok((consumed_len, latency))
    } else {
        Err(Error::IOBufferError(status))
    }
}
/// A shared-memory area for audio buffers plus the socket used to announce buffers placed in it.
struct IoBufferQueue {
    // Socket on which buffer transfer messages are sent.
    socket: UnixSeqpacket,
    // File backing the shared memory; kept so its descriptor can be reported by `keep_rds`.
    file: File,
    // Mapping of the whole file.
    mmap: MemoryMapping,
    // Size of the file, in bytes, measured when the queue was created.
    size: usize,
    // Offset at which the next buffer is allocated.
    next: Mutex<usize>,
}
impl IoBufferQueue {
    fn new(socket: UnixSeqpacket, mut file: File) -> Result<IoBufferQueue> {
        let size = file.seek(SeekFrom::End(0)).map_err(Error::FileSizeError)? as usize;
        let mmap = MemoryMappingBuilder::new(size)
            .from_file(&file)
            .build()
            .map_err(Error::ServerMmapError)?;
        Ok(IoBufferQueue {
            socket,
            file,
            mmap,
            size,
            next: Mutex::new(0),
        })
    }
    /// Reserves `size` bytes in the buffer area and returns the offset of the reservation.
    ///
    /// Buffers are handed out one after another; when the space left after the previous buffer
    /// is too small, allocation starts over at offset 0. Nothing checks whether a buffer being
    /// handed out again is still in use.
    ///
    /// # Errors
    ///
    /// Returns `OutOfSpace` if `size` is larger than the whole buffer area.
    fn allocate_buffer(&self, size: usize) -> Result<usize> {
        if size > self.size {
            return Err(Error::OutOfSpace);
        }
        let mut cursor = self.next.lock();
        let remaining = self.size - *cursor;
        let offset = if size <= remaining { *cursor } else { 0 };
        *cursor = offset + size;
        Ok(offset)
    }
    /// Returns a volatile slice over `len` bytes of the mapping starting at `offset`.
    ///
    /// # Errors
    ///
    /// Returns `VolatileMemoryError` if the mapping rejects the requested range.
    fn buffer_at(&self, offset: usize, len: usize) -> Result<VolatileSlice> {
        self.mmap
            .get_slice(offset, len)
            .map_err(Error::VolatileMemoryError)
    }
    /// Returns a duplicate of the queue's socket.
    ///
    /// # Errors
    ///
    /// Returns `UnixSeqpacketDupError` if the socket cannot be duplicated.
    fn try_clone_socket(&self) -> Result<UnixSeqpacket> {
        self.socket
            .try_clone()
            .map_err(Error::UnixSeqpacketDupError)
    }
    /// Sends a transfer message announcing the buffer of `size` bytes at `offset` for
    /// `stream_id` on the queue's socket.
    ///
    /// # Errors
    ///
    /// Returns `ServerIOError` if sending fails.
    fn send_buffer(&self, stream_id: u32, offset: usize, size: usize) -> Result<()> {
        let msg = IoTransferMsg::new(stream_id, offset, size);
        seq_socket_send(&self.socket, msg.as_bytes())
    }
    /// Returns the raw descriptors of the queue's backing file and socket, in that order.
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        vec![
            self.file.as_raw_descriptor(),
            self.socket.as_raw_descriptor(),
        ]
    }
}
/// Stream parameters in native representation.
///
/// Combined with a stream id, they convert into a `virtio_snd_pcm_set_params` request whose
/// fields of the same names are filled from these values.
pub struct VioSStreamParams {
    pub buffer_bytes: u32,
    pub period_bytes: u32,
    pub features: u32,
    pub channels: u8,
    pub format: u8,
    pub rate: u8,
}
impl From<(u32, VioSStreamParams)> for virtio_snd_pcm_set_params {
    fn from(val: (u32, VioSStreamParams)) -> Self {
        virtio_snd_pcm_set_params {
            hdr: virtio_snd_pcm_hdr {
                hdr: virtio_snd_hdr {
                    code: VIRTIO_SND_R_PCM_SET_PARAMS.into(),
                },
                stream_id: val.0.into(),
            },
            buffer_bytes: val.1.buffer_bytes.into(),
            period_bytes: val.1.period_bytes.into(),
            features: val.1.features.into(),
            channels: val.1.channels,
            format: val.1.format,
            rate: val.1.rate,
            padding: 0u8,
        }
    }
}
/// Sends the bytes of `data` as one command on `control_socket`, then reads the status reply.
///
/// # Errors
///
/// Returns the error from sending, or whatever `recv_cmd_status` returns for the reply.
fn send_cmd<T: Immutable + IntoBytes>(control_socket: &UnixSeqpacket, data: T) -> Result<()> {
    seq_socket_send(control_socket, data.as_bytes())?;
    recv_cmd_status(control_socket)
}
fn recv_cmd_status(control_socket: &UnixSeqpacket) -> Result<()> {
    let mut status: virtio_snd_hdr = Default::default();
    control_socket
        .recv(status.as_mut_bytes())
        .map_err(Error::ServerIOError)?;
    if status.code.to_native() == VIRTIO_SND_S_OK {
        Ok(())
    } else {
        Err(Error::CommandFailed(status.code.to_native()))
    }
}
fn seq_socket_send(socket: &UnixSeqpacket, data: &[u8]) -> Result<()> {
    loop {
        let send_res = socket.send(data);
        if let Err(e) = send_res {
            match e.kind() {
                IOErrorKind::Interrupted => continue,
                _ => return Err(Error::ServerIOError(e)),
            }
        }
        break;
    }
    Ok(())
}
/// Protocol version this client implements; the server's config must carry the same version.
const VIOS_VERSION: u32 = 2;
/// Configuration message received from the server when the connection is established.
#[repr(C)]
#[derive(
    Copy,
    Clone,
    Default,
    FromBytes,
    Immutable,
    IntoBytes,
    KnownLayout,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Debug,
)]
struct VioSConfig {
    // Protocol version of the server; compared against `VIOS_VERSION`.
    version: u32,
    // Number of jacks, PCM streams and channel maps the server exposes.
    jacks: u32,
    streams: u32,
    chmaps: u32,
}
/// Status of a buffer, passed from the background receive thread to the thread waiting for it.
struct BufferReleaseMsg {
    // Status code taken from the server's status message.
    status: u32,
    // Latency taken from the server's status message.
    latency: u32,
    // Consumed length taken from the server's status message.
    consumed_len: usize,
}
/// Message sent on a queue's socket to announce a buffer placed in its shared memory.
#[repr(C)]
#[derive(Copy, Clone, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct IoTransferMsg {
    // Carries the id of the stream the buffer belongs to.
    io_xfer: virtio_snd_pcm_xfer,
    // Offset of the buffer within the shared memory.
    buffer_offset: u32,
    // Length of the buffer in bytes.
    buffer_len: u32,
}
impl IoTransferMsg {
    /// Builds a transfer message for the buffer of `buffer_len` bytes at `buffer_offset`,
    /// belonging to stream `stream_id`.
    fn new(stream_id: u32, buffer_offset: usize, buffer_len: usize) -> IoTransferMsg {
        IoTransferMsg {
            io_xfer: virtio_snd_pcm_xfer {
                stream_id: stream_id.into(),
            },
            // NOTE(review): these casts silently truncate values above `u32::MAX` — confirm the
            // shared memory area can never be that large.
            buffer_offset: buffer_offset as u32,
            buffer_len: buffer_len as u32,
        }
    }
}
/// Message received on a queue's socket reporting the status of a previously announced buffer.
#[repr(C)]
#[derive(Copy, Clone, Default, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct IoStatusMsg {
    // Status code and latency for the buffer.
    status: virtio_snd_pcm_status,
    // Offset of the buffer; used to find the subscriber waiting for this status.
    buffer_offset: u32,
    // Length reported by the server for the buffer; forwarded to the waiting thread.
    consumed_len: u32,
}