devices/virtio/vhost_user_backend/connection/sys/linux/
listener.rs1use std::pin::Pin;
6
7use anyhow::Context;
8use base::AsRawDescriptor;
9use base::RawDescriptor;
10use cros_async::AsyncWrapper;
11use cros_async::Executor;
12use futures::Future;
13use futures::FutureExt;
14use vmm_vhost::connection::Listener;
15use vmm_vhost::unix::SocketListener;
16use vmm_vhost::BackendServer;
17
18use crate::virtio::vhost_user_backend::connection::VhostUserConnectionTrait;
19use crate::virtio::vhost_user_backend::handler::sys::linux::run_handler;
20
21pub struct VhostUserListener(SocketListener);
23
24impl VhostUserListener {
25 pub fn new(path: &str) -> anyhow::Result<Self> {
27 let listener = SocketListener::new(path, true)?;
28
29 Ok(VhostUserListener(listener))
30 }
31}
32
33impl AsRawDescriptor for VhostUserListener {
34 fn as_raw_descriptor(&self) -> RawDescriptor {
35 self.0.as_raw_descriptor()
36 }
37}
38
39async fn run_with_handler(
42 mut listener: SocketListener,
43 handler: Box<dyn vmm_vhost::Backend>,
44 ex: &Executor,
45) -> anyhow::Result<()> {
46 listener.set_nonblocking(true)?;
47
48 loop {
49 match listener
53 .accept()
54 .context("failed to accept an incoming connection")?
55 {
56 Some(connection) => {
57 let req_handler = BackendServer::new(connection, handler);
58 return run_handler(req_handler, ex).await;
59 }
60 None => {
61 let async_waiter = ex
63 .async_from(AsyncWrapper::new(listener))
64 .context("failed to create async waiter")?;
65 async_waiter.wait_readable().await?;
66
67 listener = async_waiter.into_source().into_inner();
69 }
70 }
71 }
72}
73
74impl VhostUserConnectionTrait for VhostUserListener {
75 fn run_req_handler<'e>(
76 self,
77 handler: Box<dyn vmm_vhost::Backend>,
78 ex: &'e Executor,
79 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + 'e>> {
80 async { run_with_handler(self.0, handler, ex).await }.boxed_local()
81 }
82
83 fn take_parent_process_resources(&mut self) -> Option<Box<dyn std::any::Any>> {
84 self.0.take_resources_for_parent()
85 }
86}