devices/virtio/vhost_user_backend/
net.rs

1// Copyright 2021 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod sys;
6
7use std::cell::OnceCell;
8
9use anyhow::anyhow;
10use anyhow::Context;
11use base::error;
12use base::AsRawDescriptors;
13use cros_async::EventAsync;
14use cros_async::Executor;
15use cros_async::IntoAsync;
16use cros_async::TaskHandle;
17use futures::channel::oneshot;
18use futures::pin_mut;
19use futures::select_biased;
20use futures::FutureExt;
21use net_util::TapT;
22use serde::Deserialize;
23use serde::Serialize;
24use snapshot::AnySnapshot;
25pub use sys::start_device as run_net_device;
26pub use sys::Options;
27use vm_memory::GuestMemory;
28use vmm_vhost::message::VhostUserProtocolFeatures;
29use zerocopy::IntoBytes;
30
31use crate::virtio;
32use crate::virtio::net::build_config;
33use crate::virtio::net::process_ctrl;
34use crate::virtio::net::process_tx;
35use crate::virtio::net::virtio_features_to_tap_offload;
36use crate::virtio::vhost_user_backend::handler::DeviceRequestHandler;
37use crate::virtio::vhost_user_backend::handler::Error as DeviceError;
38use crate::virtio::vhost_user_backend::handler::VhostUserDevice;
39use crate::virtio::vhost_user_backend::VhostUserDeviceBuilder;
40use crate::virtio::Queue;
41
42thread_local! {
43    pub(crate) static NET_EXECUTOR: OnceCell<Executor> = const { OnceCell::new() };
44}
45
46// TODO(b/188947559): Come up with better way to include these constants. Compiler errors happen
47// if they are kept in the trait.
48const MAX_QUEUE_NUM: usize = 3; /* rx, tx, ctrl */
49
50async fn run_tx_queue<T: TapT>(
51    mut queue: Queue,
52    mut tap: T,
53    kick_evt: EventAsync,
54    mut stop_rx: oneshot::Receiver<()>,
55) -> Queue {
56    let kick_evt_future = kick_evt.next_val().fuse();
57    pin_mut!(kick_evt_future);
58    loop {
59        select_biased! {
60            kick = kick_evt_future => {
61                kick_evt_future.set(kick_evt.next_val().fuse());
62                if let Err(e) = kick {
63                    error!("Failed to read kick event for tx queue: {}", e);
64                    break;
65                }
66            }
67            _ = stop_rx => {
68                break;
69            }
70        }
71
72        process_tx(&mut queue, &mut tap);
73    }
74    queue
75}
76
77async fn run_ctrl_queue<T: TapT>(
78    mut queue: Queue,
79    mut tap: T,
80    kick_evt: EventAsync,
81    acked_features: u64,
82    vq_pairs: u16,
83    mut stop_rx: oneshot::Receiver<()>,
84) -> Queue {
85    let kick_evt_future = kick_evt.next_val().fuse();
86    pin_mut!(kick_evt_future);
87    loop {
88        select_biased! {
89            kick = kick_evt_future => {
90                kick_evt_future.set(kick_evt.next_val().fuse());
91                if let Err(e) = kick {
92                    error!("Failed to read kick event for tx queue: {}", e);
93                    break;
94                }
95            }
96            _ = stop_rx => {
97                break;
98            }
99        }
100
101        if let Err(e) = process_ctrl(&mut queue, &mut tap, acked_features, vq_pairs) {
102            error!("Failed to process ctrl queue: {}", e);
103            break;
104        }
105    }
106    queue
107}
108
109pub struct NetBackend<T: TapT + IntoAsync> {
110    tap: T,
111    avail_features: u64,
112    acked_features: u64,
113    mtu: u16,
114    #[cfg(all(windows, feature = "slirp"))]
115    slirp_kill_event: base::Event,
116    workers: [Option<(TaskHandle<Queue>, oneshot::Sender<()>)>; MAX_QUEUE_NUM],
117}
118
119#[derive(Serialize, Deserialize)]
120pub struct NetBackendSnapshot {
121    acked_feature: u64,
122}
123
124impl<T: 'static> NetBackend<T>
125where
126    T: TapT + IntoAsync,
127{
128    fn max_vq_pairs() -> usize {
129        MAX_QUEUE_NUM / 2
130    }
131}
132
133impl<T: 'static> AsRawDescriptors for NetBackend<T>
134where
135    T: TapT + IntoAsync + AsRawDescriptors,
136{
137    fn as_raw_descriptors(&self) -> Vec<base::RawDescriptor> {
138        self.tap.as_raw_descriptors()
139    }
140}
141
142impl<T: 'static> VhostUserDevice for NetBackend<T>
143where
144    T: TapT + IntoAsync,
145{
146    fn max_queue_num(&self) -> usize {
147        MAX_QUEUE_NUM
148    }
149
150    fn features(&self) -> u64 {
151        self.avail_features
152    }
153
154    fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
155        self.acked_features |= value;
156
157        self.tap
158            .set_offload(virtio_features_to_tap_offload(self.acked_features))
159            .context("failed to set tap offload to match features")?;
160
161        Ok(())
162    }
163
164    fn protocol_features(&self) -> VhostUserProtocolFeatures {
165        VhostUserProtocolFeatures::CONFIG | VhostUserProtocolFeatures::DEVICE_STATE
166    }
167
168    fn read_config(&self, offset: u64, data: &mut [u8]) {
169        let config_space = build_config(Self::max_vq_pairs() as u16, self.mtu, None);
170        virtio::copy_config(data, 0, config_space.as_bytes(), offset);
171    }
172
173    fn reset(&mut self) {}
174
175    fn start_queue(
176        &mut self,
177        idx: usize,
178        queue: virtio::Queue,
179        mem: GuestMemory,
180    ) -> anyhow::Result<()> {
181        sys::start_queue(self, idx, queue, mem)
182    }
183
184    fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {
185        if let Some((task, stop_tx)) = self.workers.get_mut(idx).and_then(Option::take) {
186            if stop_tx.send(()).is_err() {
187                return Err(anyhow!("Failed to request stop for net queue future"));
188            }
189
190            // Wait for queue_task to be aborted.
191            let queue = NET_EXECUTOR
192                .with(|ex| {
193                    let ex = ex.get().expect("Executor not initialized");
194                    ex.run_until(task)
195                })
196                .context("Failed to resolve queue worker future")?;
197
198            Ok(queue)
199        } else {
200            Err(anyhow::Error::new(DeviceError::WorkerNotFound))
201        }
202    }
203
204    fn enter_suspended_state(&mut self) -> anyhow::Result<()> {
205        // No non-queue workers.
206        Ok(())
207    }
208
209    fn snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
210        AnySnapshot::to_any(NetBackendSnapshot {
211            acked_feature: self.acked_features,
212        })
213        .context("Failed to serialize NetBackendSnapshot")
214    }
215
216    fn restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
217        let net_backend_snapshot: NetBackendSnapshot =
218            AnySnapshot::from_any(data).context("Failed to deserialize NetBackendSnapshot")?;
219        self.acked_features = net_backend_snapshot.acked_feature;
220        Ok(())
221    }
222}
223
224impl<T> VhostUserDeviceBuilder for NetBackend<T>
225where
226    T: TapT + IntoAsync + 'static,
227{
228    fn build(self: Box<Self>, ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {
229        NET_EXECUTOR.with(|thread_ex| {
230            let _ = thread_ex.set(ex.clone());
231        });
232        let handler = DeviceRequestHandler::new(*self);
233
234        Ok(Box::new(handler))
235    }
236}