devices/virtio/vhost_user_backend/
net.rs1pub mod sys;
6
7use std::cell::OnceCell;
8
9use anyhow::anyhow;
10use anyhow::Context;
11use base::error;
12use base::AsRawDescriptors;
13use cros_async::EventAsync;
14use cros_async::Executor;
15use cros_async::IntoAsync;
16use cros_async::TaskHandle;
17use futures::channel::oneshot;
18use futures::pin_mut;
19use futures::select_biased;
20use futures::FutureExt;
21use net_util::TapT;
22use serde::Deserialize;
23use serde::Serialize;
24use snapshot::AnySnapshot;
25pub use sys::start_device as run_net_device;
26pub use sys::Options;
27use vm_memory::GuestMemory;
28use vmm_vhost::message::VhostUserProtocolFeatures;
29use zerocopy::IntoBytes;
30
31use crate::virtio;
32use crate::virtio::net::build_config;
33use crate::virtio::net::process_ctrl;
34use crate::virtio::net::process_tx;
35use crate::virtio::net::virtio_features_to_tap_offload;
36use crate::virtio::vhost_user_backend::handler::DeviceRequestHandler;
37use crate::virtio::vhost_user_backend::handler::Error as DeviceError;
38use crate::virtio::vhost_user_backend::handler::VhostUserDevice;
39use crate::virtio::vhost_user_backend::VhostUserDeviceBuilder;
40use crate::virtio::Queue;
41
42thread_local! {
43 pub(crate) static NET_EXECUTOR: OnceCell<Executor> = const { OnceCell::new() };
44}
45
46const MAX_QUEUE_NUM: usize = 3; async fn run_tx_queue<T: TapT>(
51 mut queue: Queue,
52 mut tap: T,
53 kick_evt: EventAsync,
54 mut stop_rx: oneshot::Receiver<()>,
55) -> Queue {
56 let kick_evt_future = kick_evt.next_val().fuse();
57 pin_mut!(kick_evt_future);
58 loop {
59 select_biased! {
60 kick = kick_evt_future => {
61 kick_evt_future.set(kick_evt.next_val().fuse());
62 if let Err(e) = kick {
63 error!("Failed to read kick event for tx queue: {}", e);
64 break;
65 }
66 }
67 _ = stop_rx => {
68 break;
69 }
70 }
71
72 process_tx(&mut queue, &mut tap);
73 }
74 queue
75}
76
77async fn run_ctrl_queue<T: TapT>(
78 mut queue: Queue,
79 mut tap: T,
80 kick_evt: EventAsync,
81 acked_features: u64,
82 vq_pairs: u16,
83 mut stop_rx: oneshot::Receiver<()>,
84) -> Queue {
85 let kick_evt_future = kick_evt.next_val().fuse();
86 pin_mut!(kick_evt_future);
87 loop {
88 select_biased! {
89 kick = kick_evt_future => {
90 kick_evt_future.set(kick_evt.next_val().fuse());
91 if let Err(e) = kick {
92 error!("Failed to read kick event for tx queue: {}", e);
93 break;
94 }
95 }
96 _ = stop_rx => {
97 break;
98 }
99 }
100
101 if let Err(e) = process_ctrl(&mut queue, &mut tap, acked_features, vq_pairs) {
102 error!("Failed to process ctrl queue: {}", e);
103 break;
104 }
105 }
106 queue
107}
108
109pub struct NetBackend<T: TapT + IntoAsync> {
110 tap: T,
111 avail_features: u64,
112 acked_features: u64,
113 mtu: u16,
114 #[cfg(all(windows, feature = "slirp"))]
115 slirp_kill_event: base::Event,
116 workers: [Option<(TaskHandle<Queue>, oneshot::Sender<()>)>; MAX_QUEUE_NUM],
117}
118
119#[derive(Serialize, Deserialize)]
120pub struct NetBackendSnapshot {
121 acked_feature: u64,
122}
123
124impl<T: 'static> NetBackend<T>
125where
126 T: TapT + IntoAsync,
127{
128 fn max_vq_pairs() -> usize {
129 MAX_QUEUE_NUM / 2
130 }
131}
132
133impl<T: 'static> AsRawDescriptors for NetBackend<T>
134where
135 T: TapT + IntoAsync + AsRawDescriptors,
136{
137 fn as_raw_descriptors(&self) -> Vec<base::RawDescriptor> {
138 self.tap.as_raw_descriptors()
139 }
140}
141
142impl<T: 'static> VhostUserDevice for NetBackend<T>
143where
144 T: TapT + IntoAsync,
145{
146 fn max_queue_num(&self) -> usize {
147 MAX_QUEUE_NUM
148 }
149
150 fn features(&self) -> u64 {
151 self.avail_features
152 }
153
154 fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
155 self.acked_features |= value;
156
157 self.tap
158 .set_offload(virtio_features_to_tap_offload(self.acked_features))
159 .context("failed to set tap offload to match features")?;
160
161 Ok(())
162 }
163
164 fn protocol_features(&self) -> VhostUserProtocolFeatures {
165 VhostUserProtocolFeatures::CONFIG | VhostUserProtocolFeatures::DEVICE_STATE
166 }
167
168 fn read_config(&self, offset: u64, data: &mut [u8]) {
169 let config_space = build_config(Self::max_vq_pairs() as u16, self.mtu, None);
170 virtio::copy_config(data, 0, config_space.as_bytes(), offset);
171 }
172
173 fn reset(&mut self) {}
174
175 fn start_queue(
176 &mut self,
177 idx: usize,
178 queue: virtio::Queue,
179 mem: GuestMemory,
180 ) -> anyhow::Result<()> {
181 sys::start_queue(self, idx, queue, mem)
182 }
183
184 fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {
185 if let Some((task, stop_tx)) = self.workers.get_mut(idx).and_then(Option::take) {
186 if stop_tx.send(()).is_err() {
187 return Err(anyhow!("Failed to request stop for net queue future"));
188 }
189
190 let queue = NET_EXECUTOR
192 .with(|ex| {
193 let ex = ex.get().expect("Executor not initialized");
194 ex.run_until(task)
195 })
196 .context("Failed to resolve queue worker future")?;
197
198 Ok(queue)
199 } else {
200 Err(anyhow::Error::new(DeviceError::WorkerNotFound))
201 }
202 }
203
204 fn enter_suspended_state(&mut self) -> anyhow::Result<()> {
205 Ok(())
207 }
208
209 fn snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
210 AnySnapshot::to_any(NetBackendSnapshot {
211 acked_feature: self.acked_features,
212 })
213 .context("Failed to serialize NetBackendSnapshot")
214 }
215
216 fn restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
217 let net_backend_snapshot: NetBackendSnapshot =
218 AnySnapshot::from_any(data).context("Failed to deserialize NetBackendSnapshot")?;
219 self.acked_features = net_backend_snapshot.acked_feature;
220 Ok(())
221 }
222}
223
224impl<T> VhostUserDeviceBuilder for NetBackend<T>
225where
226 T: TapT + IntoAsync + 'static,
227{
228 fn build(self: Box<Self>, ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {
229 NET_EXECUTOR.with(|thread_ex| {
230 let _ = thread_ex.set(ex.clone());
231 });
232 let handler = DeviceRequestHandler::new(*self);
233
234 Ok(Box::new(handler))
235 }
236}