1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::pin::Pin;

use anyhow::Context;
use base::AsRawDescriptor;
use base::RawDescriptor;
use cros_async::AsyncWrapper;
use cros_async::Executor;
use futures::Future;
use futures::FutureExt;
use vmm_vhost::connection::Listener;
use vmm_vhost::unix::SocketListener;
use vmm_vhost::BackendServer;

use crate::virtio::vhost::user::device::handler::sys::linux::run_handler;
use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;

/// Vhost-user listener for Unix platforms.
///
/// On Unix we can listen to a socket, so this is a thin newtype around a [`SocketListener`]
/// from which incoming vhost-user connections are accepted.
pub struct VhostUserListener(SocketListener);

impl VhostUserListener {
    /// Creates a new regular vhost-user listener, listening on `path`.
    ///
    /// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
    /// listener to keep working after forking.
    pub fn new_socket(
        path: &str,
        keep_rds: Option<&mut Vec<RawDescriptor>>,
    ) -> anyhow::Result<Self> {
        let listener = SocketListener::new(path, true)?;
        if let Some(rds) = keep_rds {
            rds.push(listener.as_raw_descriptor());
        }

        Ok(VhostUserListener(listener))
    }
}

/// Serves a single vhost-user connection accepted from the already bound `listener`.
///
/// The listener is switched to non-blocking mode and polled with `accept`. While no peer is
/// pending, the listener is temporarily handed to `ex` as an async I/O source so that this task
/// can yield until the socket becomes readable. Once a connection is accepted, it is paired
/// with `handler` in a [`BackendServer`] and driven by `run_handler`.
///
/// # Errors
///
/// Returns an error if the listener cannot be made non-blocking, if accepting a connection
/// fails, if the async waiter cannot be created or awaited, or if `run_handler` fails.
async fn run_with_handler(
    mut listener: SocketListener,
    handler: Box<dyn vmm_vhost::Backend>,
    ex: &Executor,
) -> anyhow::Result<()> {
    listener.set_nonblocking(true)?;

    let connection = loop {
        // A non-blocking `accept` yields `None` when nobody is on the other end yet.
        let pending = listener
            .accept()
            .context("failed to accept an incoming connection")?;
        if let Some(connection) = pending {
            break connection;
        }

        // Wrap the listener into an async source and sleep until it signals that there is
        // input awaiting, i.e. a peer is trying to connect.
        let readable = ex
            .async_from(AsyncWrapper::new(listener))
            .context("failed to create async waiter")?;
        readable.wait_readable().await?;

        // Unwrap the listener again so the next iteration can retry `accept` on it.
        listener = readable.into_source().into_inner();
    };

    run_handler(BackendServer::new(connection, handler), ex).await
}

impl VhostUserListenerTrait for VhostUserListener {
    /// Create a vhost-user listener from a UNIX domain socket path.
    ///
    /// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
    /// listener to keep working after forking.
    fn new(path: &str, keep_rds: Option<&mut Vec<RawDescriptor>>) -> anyhow::Result<Self> {
        Self::new_socket(path, keep_rds)
    }

    fn run_req_handler<'e>(
        self,
        handler: Box<dyn vmm_vhost::Backend>,
        ex: &'e Executor,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + 'e>> {
        async { run_with_handler(self.0, handler, ex).await }.boxed_local()
    }

    fn take_parent_process_resources(&mut self) -> Option<Box<dyn std::any::Any>> {
        self.0.take_resources_for_parent()
    }
}