devices/virtio/vhost_user_backend/
wl.rs

1// Copyright 2021 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::cell::RefCell;
6use std::collections::BTreeMap;
7use std::path::PathBuf;
8use std::rc::Rc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12
13use anyhow::bail;
14use anyhow::Context;
15use argh::FromArgs;
16use base::clone_descriptor;
17use base::error;
18use base::warn;
19use base::RawDescriptor;
20use base::SafeDescriptor;
21use base::Tube;
22use base::UnixSeqpacket;
23use cros_async::AsyncWrapper;
24use cros_async::EventAsync;
25use cros_async::Executor;
26use cros_async::IoSource;
27use hypervisor::ProtectionType;
28#[cfg(feature = "gbm")]
29use rutabaga_gfx::RutabagaGralloc;
30#[cfg(feature = "gbm")]
31use rutabaga_gfx::RutabagaGrallocBackendFlags;
32use snapshot::AnySnapshot;
33use vm_memory::GuestMemory;
34use vmm_vhost::message::VhostUserProtocolFeatures;
35use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
36
37use crate::virtio::base_features;
38use crate::virtio::device_constants::wl::NUM_QUEUES;
39use crate::virtio::device_constants::wl::VIRTIO_WL_F_SEND_FENCES;
40use crate::virtio::device_constants::wl::VIRTIO_WL_F_TRANS_FLAGS;
41use crate::virtio::device_constants::wl::VIRTIO_WL_F_USE_SHMEM;
42use crate::virtio::vhost_user_backend::handler::Error as DeviceError;
43use crate::virtio::vhost_user_backend::handler::VhostBackendReqConnection;
44use crate::virtio::vhost_user_backend::handler::VhostUserDevice;
45use crate::virtio::vhost_user_backend::handler::WorkerState;
46use crate::virtio::vhost_user_backend::BackendConnection;
47use crate::virtio::wl;
48use crate::virtio::Queue;
49use crate::virtio::SharedMemoryRegion;
50
51async fn run_out_queue(
52    queue: Rc<RefCell<Queue>>,
53    kick_evt: EventAsync,
54    wlstate: Rc<RefCell<wl::WlState>>,
55) {
56    loop {
57        if let Err(e) = kick_evt.next_val().await {
58            error!("Failed to read kick event for out queue: {}", e);
59            break;
60        }
61
62        wl::process_out_queue(&mut queue.borrow_mut(), &mut wlstate.borrow_mut());
63    }
64}
65
66async fn run_in_queue(
67    queue: Rc<RefCell<Queue>>,
68    kick_evt: EventAsync,
69    wlstate: Rc<RefCell<wl::WlState>>,
70    wlstate_ctx: IoSource<AsyncWrapper<SafeDescriptor>>,
71) {
72    loop {
73        if let Err(e) = wlstate_ctx.wait_readable().await {
74            error!(
75                "Failed to wait for inner WaitContext to become readable: {}",
76                e
77            );
78            break;
79        }
80
81        if wl::process_in_queue(&mut queue.borrow_mut(), &mut wlstate.borrow_mut())
82            == Err(wl::DescriptorsExhausted)
83        {
84            if let Err(e) = kick_evt.next_val().await {
85                error!("Failed to read kick event for in queue: {}", e);
86                break;
87            }
88        }
89    }
90}
91
92struct WlBackend {
93    ex: Executor,
94    wayland_paths: Option<BTreeMap<String, PathBuf>>,
95    resource_bridge: Option<Tube>,
96    use_transition_flags: bool,
97    use_send_vfd_v2: bool,
98    use_shmem: bool,
99    features: u64,
100    acked_features: u64,
101    wlstate: Option<Rc<RefCell<wl::WlState>>>,
102    workers: [Option<WorkerState<Rc<RefCell<Queue>>, ()>>; NUM_QUEUES],
103    backend_req_conn: Option<VhostBackendReqConnection>,
104}
105
106impl WlBackend {
107    fn new(
108        ex: &Executor,
109        wayland_paths: BTreeMap<String, PathBuf>,
110        resource_bridge: Option<Tube>,
111    ) -> WlBackend {
112        let features = base_features(ProtectionType::Unprotected)
113            | 1 << VIRTIO_WL_F_TRANS_FLAGS
114            | 1 << VIRTIO_WL_F_SEND_FENCES
115            | 1 << VIRTIO_WL_F_USE_SHMEM
116            | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
117        WlBackend {
118            ex: ex.clone(),
119            wayland_paths: Some(wayland_paths),
120            resource_bridge,
121            use_transition_flags: false,
122            use_send_vfd_v2: false,
123            use_shmem: false,
124            features,
125            acked_features: 0,
126            wlstate: None,
127            workers: Default::default(),
128            backend_req_conn: None,
129        }
130    }
131}
132
133impl VhostUserDevice for WlBackend {
134    fn max_queue_num(&self) -> usize {
135        NUM_QUEUES
136    }
137
138    fn features(&self) -> u64 {
139        self.features
140    }
141
142    fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
143        self.acked_features |= value;
144
145        if value & (1 << VIRTIO_WL_F_TRANS_FLAGS) != 0 {
146            self.use_transition_flags = true;
147        }
148        if value & (1 << VIRTIO_WL_F_SEND_FENCES) != 0 {
149            self.use_send_vfd_v2 = true;
150        }
151        if value & (1 << VIRTIO_WL_F_USE_SHMEM) != 0 {
152            self.use_shmem = true;
153        }
154
155        Ok(())
156    }
157
158    fn protocol_features(&self) -> VhostUserProtocolFeatures {
159        VhostUserProtocolFeatures::BACKEND_REQ | VhostUserProtocolFeatures::SHARED_MEMORY_REGIONS
160    }
161
162    fn read_config(&self, _offset: u64, _dst: &mut [u8]) {}
163
164    fn start_queue(&mut self, idx: usize, queue: Queue, _mem: GuestMemory) -> anyhow::Result<()> {
165        if self.workers[idx].is_some() {
166            warn!("Starting new queue handler without stopping old handler");
167            self.stop_queue(idx)?;
168        }
169
170        let kick_evt = queue
171            .event()
172            .try_clone()
173            .context("failed to clone queue event")?;
174        let kick_evt = EventAsync::new(kick_evt, &self.ex)
175            .context("failed to create EventAsync for kick_evt")?;
176
177        if !self.use_shmem {
178            bail!("Incompatible driver: vhost-user-wl requires shmem support");
179        }
180
181        // We use this de-structuring let binding to separate borrows so that the compiler doesn't
182        // think we're borrowing all of `self` in the closure below.
183        let WlBackend {
184            ref mut wayland_paths,
185            ref mut resource_bridge,
186            ref use_transition_flags,
187            ref use_send_vfd_v2,
188            ..
189        } = self;
190
191        #[cfg(feature = "gbm")]
192        let gralloc = RutabagaGralloc::new(RutabagaGrallocBackendFlags::new())
193            .context("Failed to initailize gralloc")?;
194        let wlstate = match &self.wlstate {
195            None => {
196                let mapper = self
197                    .backend_req_conn
198                    .as_ref()
199                    .context("No backend request connection found")?
200                    .shmem_mapper()
201                    .context("Shared memory mapper not available")?;
202
203                let wlstate = Rc::new(RefCell::new(wl::WlState::new(
204                    wayland_paths.take().expect("WlState already initialized"),
205                    mapper,
206                    *use_transition_flags,
207                    *use_send_vfd_v2,
208                    resource_bridge.take(),
209                    #[cfg(feature = "gbm")]
210                    gralloc,
211                    None, /* address_offset */
212                )));
213                self.wlstate = Some(wlstate.clone());
214                wlstate
215            }
216            Some(state) => state.clone(),
217        };
218        let queue = Rc::new(RefCell::new(queue));
219        let queue_task = match idx {
220            0 => {
221                let wlstate_ctx = clone_descriptor(wlstate.borrow().wait_ctx())
222                    .map(AsyncWrapper::new)
223                    .context("failed to clone inner WaitContext for WlState")
224                    .and_then(|ctx| {
225                        self.ex
226                            .async_from(ctx)
227                            .context("failed to create async WaitContext")
228                    })?;
229
230                self.ex
231                    .spawn_local(run_in_queue(queue.clone(), kick_evt, wlstate, wlstate_ctx))
232            }
233            1 => self
234                .ex
235                .spawn_local(run_out_queue(queue.clone(), kick_evt, wlstate)),
236            _ => bail!("attempted to start unknown queue: {}", idx),
237        };
238        self.workers[idx] = Some(WorkerState { queue_task, queue });
239        Ok(())
240    }
241
242    fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
243        if let Some(worker) = self.workers.get_mut(idx).and_then(Option::take) {
244            // Wait for queue_task to be aborted.
245            let _ = self.ex.run_until(worker.queue_task.cancel());
246
247            let queue = match Rc::try_unwrap(worker.queue) {
248                Ok(queue_cell) => queue_cell.into_inner(),
249                Err(_) => panic!("failed to recover queue from worker"),
250            };
251
252            Ok(queue)
253        } else {
254            Err(anyhow::Error::new(DeviceError::WorkerNotFound))
255        }
256    }
257
258    fn reset(&mut self) {
259        for worker in self.workers.iter_mut().filter_map(Option::take) {
260            let _ = self.ex.run_until(worker.queue_task.cancel());
261        }
262    }
263
264    fn get_shared_memory_region(&self) -> Option<SharedMemoryRegion> {
265        Some(SharedMemoryRegion {
266            id: wl::WL_SHMEM_ID,
267            length: wl::WL_SHMEM_SIZE,
268        })
269    }
270
271    fn set_backend_req_connection(&mut self, conn: VhostBackendReqConnection) {
272        if self.backend_req_conn.is_some() {
273            warn!("connection already established. Overwriting");
274        }
275
276        self.backend_req_conn = Some(conn);
277    }
278
279    fn enter_suspended_state(&mut self) -> anyhow::Result<()> {
280        // No non-queue workers.
281        Ok(())
282    }
283
284    fn snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
285        bail!("snapshot not implemented for vhost-user wl");
286    }
287
288    fn restore(&mut self, _data: AnySnapshot) -> anyhow::Result<()> {
289        bail!("snapshot not implemented for vhost-user wl");
290    }
291}
292
/// Parses a Wayland socket argument of the form `PATH[,name=NAME]`.
///
/// Returns `(name, path)`, where `name` is empty when no `name=` option is given. If `name=`
/// appears more than once, the last occurrence wins.
///
/// # Errors
///
/// Returns a human-readable message if the path is empty, if an option is not of the form
/// `kind=value`, or if the option kind is not recognized.
pub fn parse_wayland_sock(value: &str) -> Result<(String, PathBuf), String> {
    let mut components = value.split(',');

    // `split` always yields at least one (possibly empty) item, so an absent path shows up as
    // an empty string rather than as `None`.
    let path = match components.next() {
        Some(p) if !p.is_empty() => PathBuf::from(p),
        _ => return Err("missing socket path".to_string()),
    };

    let mut name = "";
    for option in components {
        let (kind, setting) = option
            .split_once('=')
            .ok_or_else(|| format!("option must be of the form `kind=value`: {option}"))?;
        match kind {
            "name" => name = setting,
            _ => return Err(format!("unrecognized option: {kind}")),
        }
    }

    Ok((name.to_string(), path))
}
314
// Command line options of the `wl` subcommand, consumed by `run_wl_device`.
//
// NOTE: with argh's `FromArgs` derive the `///` comments below double as `--help` text, so
// they are user-facing strings; maintainer notes use plain `//` comments instead.
#[derive(FromArgs)]
#[argh(subcommand, name = "wl")]
/// Wayland device
pub struct Options {
    // Deprecated spelling of `--socket-path`; forwarded together with `socket_path` and `fd`
    // to `BackendConnection::from_opts`.
    #[argh(option, arg_name = "PATH", hidden_help)]
    /// deprecated - please use --socket-path instead
    socket: Option<String>,
    #[argh(option, arg_name = "PATH")]
    /// path to the vhost-user socket to bind to.
    /// If this flag is set, --fd cannot be specified.
    socket_path: Option<String>,
    #[argh(option, arg_name = "FD")]
    /// file descriptor of a connected vhost-user socket.
    /// If this flag is set, --socket-path cannot be specified.
    fd: Option<RawDescriptor>,

    // Each occurrence is parsed by `parse_wayland_sock` into a `(name, path)` pair; the pairs
    // are later collected into a map keyed by name.
    #[argh(option, from_str_fn(parse_wayland_sock), arg_name = "PATH[,name=NAME]")]
    /// path to one or more Wayland sockets. The unnamed socket is used for
    /// displaying virtual screens while the named ones are used for IPC
    wayland_sock: Vec<(String, PathBuf)>,
    // When set, `run_wl_device` connects to this path as a `UnixSeqpacket` before starting.
    #[argh(option, arg_name = "PATH")]
    /// path to the GPU resource bridge
    resource_bridge: Option<String>,
}
339
340/// Starts a vhost-user wayland device.
341/// Returns an error if the given `args` is invalid or the device fails to run.
342pub fn run_wl_device(opts: Options) -> anyhow::Result<()> {
343    let Options {
344        wayland_sock,
345        socket,
346        socket_path,
347        fd,
348        resource_bridge,
349    } = opts;
350
351    let wayland_paths: BTreeMap<_, _> = wayland_sock.into_iter().collect();
352
353    let resource_bridge = resource_bridge
354        .map(|p| -> anyhow::Result<Tube> {
355            let deadline = Instant::now() + Duration::from_secs(5);
356            loop {
357                match UnixSeqpacket::connect(&p) {
358                    Ok(s) => return Ok(Tube::try_from(s).unwrap()),
359                    Err(e) => {
360                        if Instant::now() < deadline {
361                            thread::sleep(Duration::from_millis(50));
362                        } else {
363                            return Err(anyhow::Error::new(e));
364                        }
365                    }
366                }
367            }
368        })
369        .transpose()
370        .context("failed to connect to resource bridge socket")?;
371
372    let ex = Executor::new().context("failed to create executor")?;
373
374    let conn = BackendConnection::from_opts(socket.as_deref(), socket_path.as_deref(), fd)?;
375
376    let backend = WlBackend::new(&ex, wayland_paths, resource_bridge);
377    // run_until() returns an Result<Result<..>> which the ? operator lets us flatten.
378    ex.run_until(conn.run_backend(backend, &ex))?
379}