1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::pin::Pin;
use anyhow::Context;
use base::AsRawDescriptor;
use base::RawDescriptor;
use cros_async::AsyncWrapper;
use cros_async::Executor;
use futures::Future;
use futures::FutureExt;
use vmm_vhost::connection::Listener;
use vmm_vhost::unix::SocketListener;
use vmm_vhost::BackendServer;
use crate::virtio::vhost::user::device::handler::sys::linux::run_handler;
use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;
/// On Unix we can listen to a socket.
pub struct VhostUserListener(SocketListener);
impl VhostUserListener {
/// Creates a new regular vhost-user listener, listening on `path`.
///
/// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
/// listener to keep working after forking.
pub fn new_socket(
path: &str,
keep_rds: Option<&mut Vec<RawDescriptor>>,
) -> anyhow::Result<Self> {
let listener = SocketListener::new(path, true)?;
if let Some(rds) = keep_rds {
rds.push(listener.as_raw_descriptor());
}
Ok(VhostUserListener(listener))
}
}
/// Attaches to an already bound socket via `listener` and handles incoming messages from the
/// VMM, which are dispatched to the device backend via the `VhostUserDevice` trait methods.
async fn run_with_handler(
mut listener: SocketListener,
handler: Box<dyn vmm_vhost::Backend>,
ex: &Executor,
) -> anyhow::Result<()> {
listener.set_nonblocking(true)?;
loop {
// If the listener is not ready on the first call to `accept` and returns `None`, we
// temporarily convert it into an async I/O source and yield until it signals there is
// input data awaiting, before trying again.
match listener
.accept()
.context("failed to accept an incoming connection")?
{
Some(connection) => {
let req_handler = BackendServer::new(connection, handler);
return run_handler(req_handler, ex).await;
}
None => {
// Nobody is on the other end yet, wait until we get a connection.
let async_waiter = ex
.async_from(AsyncWrapper::new(listener))
.context("failed to create async waiter")?;
async_waiter.wait_readable().await?;
// Retrieve the listener back so we can use it again.
listener = async_waiter.into_source().into_inner();
}
}
}
}
impl VhostUserListenerTrait for VhostUserListener {
/// Create a vhost-user listener from a UNIX domain socket path.
///
/// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
/// listener to keep working after forking.
fn new(path: &str, keep_rds: Option<&mut Vec<RawDescriptor>>) -> anyhow::Result<Self> {
Self::new_socket(path, keep_rds)
}
fn run_req_handler<'e>(
self,
handler: Box<dyn vmm_vhost::Backend>,
ex: &'e Executor,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + 'e>> {
async { run_with_handler(self.0, handler, ex).await }.boxed_local()
}
fn take_parent_process_resources(&mut self) -> Option<Box<dyn std::any::Any>> {
self.0.take_resources_for_parent()
}
}