devices/virtio/vhost_user_backend/net/sys/
linux.rs1use std::net::Ipv4Addr;
6use std::str::FromStr;
7use std::thread;
8
9use anyhow::anyhow;
10use anyhow::bail;
11use anyhow::Context;
12use argh::FromArgs;
13use base::error;
14use base::info;
15use base::validate_raw_descriptor;
16use base::warn;
17use base::RawDescriptor;
18use cros_async::EventAsync;
19use cros_async::Executor;
20use cros_async::IntoAsync;
21use cros_async::IoSource;
22use futures::channel::oneshot;
23use futures::select_biased;
24use futures::FutureExt;
25use hypervisor::ProtectionType;
26use net_util::sys::linux::Tap;
27use net_util::MacAddress;
28use net_util::TapT;
29use virtio_sys::virtio_net;
30use vm_memory::GuestMemory;
31use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
32
33use crate::virtio;
34use crate::virtio::net::process_mrg_rx;
35use crate::virtio::net::process_rx;
36use crate::virtio::net::validate_and_configure_tap;
37use crate::virtio::net::NetError;
38use crate::virtio::net::PendingBuffer;
39use crate::virtio::vhost_user_backend::connection::sys::VhostUserListener;
40use crate::virtio::vhost_user_backend::connection::VhostUserConnectionTrait;
41use crate::virtio::vhost_user_backend::handler::VhostUserDevice;
42use crate::virtio::vhost_user_backend::net::run_ctrl_queue;
43use crate::virtio::vhost_user_backend::net::run_tx_queue;
44use crate::virtio::vhost_user_backend::net::NetBackend;
45use crate::virtio::vhost_user_backend::net::NET_EXECUTOR;
46use crate::virtio::Queue;
47
48struct TapConfig {
49 host_ip: Ipv4Addr,
50 netmask: Ipv4Addr,
51 mac: MacAddress,
52}
53
54impl FromStr for TapConfig {
55 type Err = anyhow::Error;
56
57 fn from_str(arg: &str) -> Result<Self, Self::Err> {
58 let args: Vec<&str> = arg.split(',').collect();
59 if args.len() != 3 {
60 bail!("TAP config must consist of 3 parts but {}", args.len());
61 }
62
63 let host_ip: Ipv4Addr = args[0]
64 .parse()
65 .map_err(|e| anyhow!("invalid IP address: {}", e))?;
66 let netmask: Ipv4Addr = args[1]
67 .parse()
68 .map_err(|e| anyhow!("invalid net mask: {}", e))?;
69 let mac: MacAddress = args[2]
70 .parse()
71 .map_err(|e| anyhow!("invalid MAC address: {}", e))?;
72
73 Ok(Self {
74 host_ip,
75 netmask,
76 mac,
77 })
78 }
79}
80
81impl<T: 'static> NetBackend<T>
82where
83 T: TapT + IntoAsync,
84{
85 fn new_from_config(config: &TapConfig, mrg_rxbuf: bool) -> anyhow::Result<Self> {
86 let tap = T::new(true , false )
88 .context("failed to create tap device")?;
89 tap.set_ip_addr(config.host_ip)
90 .context("failed to set IP address")?;
91 tap.set_netmask(config.netmask)
92 .context("failed to set netmask")?;
93 tap.set_mac_address(config.mac)
94 .context("failed to set MAC address")?;
95
96 Self::new(tap, mrg_rxbuf)
97 }
98
99 fn new_from_name(name: &str, mrg_rxbuf: bool) -> anyhow::Result<Self> {
100 let tap = T::new_with_name(name.as_bytes(), true, false).map_err(NetError::TapOpen)?;
101 Self::new(tap, mrg_rxbuf)
102 }
103
104 pub fn new_from_tap_fd(tap_fd: RawDescriptor, mrg_rxbuf: bool) -> anyhow::Result<Self> {
105 let tap_fd = validate_raw_descriptor(tap_fd).context("failed to validate tap fd")?;
106 let tap = unsafe { T::from_raw_descriptor(tap_fd).context("failed to create tap device")? };
109
110 Self::new(tap, mrg_rxbuf)
111 }
112
113 pub fn new(tap: T, mrg_rxbuf: bool) -> anyhow::Result<Self> {
114 let vq_pairs = Self::max_vq_pairs();
115 validate_and_configure_tap(&tap, vq_pairs as u16)
116 .context("failed to validate and configure tap")?;
117
118 let mut avail_features = virtio::base_features(ProtectionType::Unprotected)
119 | 1 << virtio_net::VIRTIO_NET_F_GUEST_CSUM
120 | 1 << virtio_net::VIRTIO_NET_F_CSUM
121 | 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
122 | 1 << virtio_net::VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
123 | 1 << virtio_net::VIRTIO_NET_F_GUEST_TSO4
124 | 1 << virtio_net::VIRTIO_NET_F_GUEST_UFO
125 | 1 << virtio_net::VIRTIO_NET_F_HOST_TSO4
126 | 1 << virtio_net::VIRTIO_NET_F_HOST_UFO
127 | 1 << virtio_net::VIRTIO_NET_F_MTU
128 | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
129
130 if mrg_rxbuf {
131 avail_features |= 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF;
132 }
133
134 let mtu = tap.mtu()?;
135
136 Ok(Self {
137 tap,
138 avail_features,
139 acked_features: 0,
140 mtu,
141 workers: Default::default(),
142 })
143 }
144}
145
146async fn run_rx_queue<T: TapT>(
147 mut queue: Queue,
148 mut tap: IoSource<T>,
149 kick_evt: EventAsync,
150 mut stop_rx: oneshot::Receiver<()>,
151 mrg_rxbuf: bool,
152) -> Queue {
153 let mut pending_buffer = if mrg_rxbuf {
154 Some(PendingBuffer::new())
155 } else {
156 None
157 };
158 loop {
159 let pending_length = pending_buffer
160 .as_ref()
161 .map_or(0, |pending_buffer| pending_buffer.length);
162 if pending_length == 0 {
163 select_biased! {
164 rx = tap.wait_readable().fuse() => {
169 if let Err(e) = rx {
170 error!("Failed to wait for tap device to become readable: {}", e);
171 break;
172 }
173 }
174 _ = stop_rx => {
175 break;
176 }
177 }
178 }
179 let res = match pending_buffer.as_mut() {
180 Some(pending_buffer) => process_mrg_rx(&mut queue, tap.as_source_mut(), pending_buffer),
181 None => process_rx(&mut queue, tap.as_source_mut()),
182 };
183
184 match res {
185 Ok(()) => {}
186 Err(NetError::RxDescriptorsExhausted) => {
187 select_biased! {
188 kick_evt = kick_evt.next_val().fuse() => {
189 if let Err(e) = kick_evt {
190 error!("Failed to read kick event for rx queue: {}", e);
191 break;
192 }
193 },
194 _ = stop_rx => {
195 break;
196 }
197 };
198 }
199 Err(e) => {
200 error!("Failed to process rx queue: {}", e);
201 break;
202 }
203 }
204 }
205 queue
206}
207
208pub(in crate::virtio::vhost_user_backend::net) fn start_queue<T: 'static + IntoAsync + TapT>(
210 backend: &mut NetBackend<T>,
211 idx: usize,
212 queue: virtio::Queue,
213 _mem: GuestMemory,
214) -> anyhow::Result<()> {
215 if backend.workers[idx].is_some() {
216 warn!("Starting new queue handler without stopping old handler");
217 backend.stop_queue(idx)?;
218 }
219
220 NET_EXECUTOR.with(|ex| {
221 let ex = ex.get().expect("Executor not initialized");
223
224 let kick_evt = queue
225 .event()
226 .try_clone()
227 .context("failed to clone queue event")?;
228 let kick_evt =
229 EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
230 let tap = backend
231 .tap
232 .try_clone()
233 .context("failed to clone tap device")?;
234 let worker_tuple = match idx {
235 0 => {
236 let tap = ex
237 .async_from(tap)
238 .context("failed to create async tap device")?;
239
240 let mrg_rxbuf =
241 (backend.acked_features & 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF) != 0;
242 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
243 (
244 ex.spawn_local(run_rx_queue(queue, tap, kick_evt, stop_rx, mrg_rxbuf)),
245 stop_tx,
246 )
247 }
248 1 => {
249 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
250 (
251 ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),
252 stop_tx,
253 )
254 }
255 2 => {
256 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
257 (
258 ex.spawn_local(run_ctrl_queue(
259 queue,
260 tap,
261 kick_evt,
262 backend.acked_features,
263 1, stop_rx,
265 )),
266 stop_tx,
267 )
268 }
269 _ => bail!("attempted to start unknown queue: {}", idx),
270 };
271
272 backend.workers[idx] = Some(worker_tuple);
273 Ok(())
274 })
275}
276
277#[derive(FromArgs)]
278#[argh(subcommand, name = "net")]
279pub struct Options {
281 #[argh(option, arg_name = "SOCKET_PATH,IP_ADDR,NET_MASK,MAC_ADDR")]
282 device: Vec<String>,
284 #[argh(option, arg_name = "SOCKET_PATH,TAP_FD")]
285 tap_fd: Vec<String>,
287 #[argh(option, arg_name = "SOCKET_PATH,TAP_NAME")]
288 tap_name: Vec<String>,
290 #[argh(switch, arg_name = "MRG_RXBUF")]
291 mrg_rxbuf: bool,
293}
294
/// How a backend is exposed to the vhost-user frontend.
enum Connection {
    /// Listen on the socket at this path.
    Socket(String),
}
298
299fn new_backend_from_device_arg(
300 arg: &str,
301 mrg_rxbuf: bool,
302) -> anyhow::Result<(String, NetBackend<Tap>)> {
303 let pos = match arg.find(',') {
304 Some(p) => p,
305 None => {
306 bail!("device must take comma-separated argument");
307 }
308 };
309 let conn = &arg[0..pos];
310 let cfg = &arg[pos + 1..]
311 .parse::<TapConfig>()
312 .context("failed to parse tap config")?;
313 let backend = NetBackend::<Tap>::new_from_config(cfg, mrg_rxbuf)
314 .context("failed to create NetBackend")?;
315 Ok((conn.to_string(), backend))
316}
317
318fn new_backend_from_tap_name(
319 arg: &str,
320 mrg_rxbuf: bool,
321) -> anyhow::Result<(String, NetBackend<Tap>)> {
322 let pos = match arg.find(',') {
323 Some(p) => p,
324 None => {
325 bail!("device must take comma-separated argument");
326 }
327 };
328 let conn = &arg[0..pos];
329 let tap_name = &arg[pos + 1..];
330
331 let backend = NetBackend::<Tap>::new_from_name(tap_name, mrg_rxbuf)
332 .context("failed to create NetBackend")?;
333 Ok((conn.to_string(), backend))
334}
335
336fn new_backend_from_tapfd_arg(
337 arg: &str,
338 mrg_rxbuf: bool,
339) -> anyhow::Result<(String, NetBackend<Tap>)> {
340 let pos = match arg.find(',') {
341 Some(p) => p,
342 None => {
343 bail!("'tap-fd' flag must take comma-separated argument");
344 }
345 };
346 let conn = &arg[0..pos];
347 let tap_fd = &arg[pos + 1..]
348 .parse::<i32>()
349 .context("failed to parse tap-fd")?;
350 let backend = NetBackend::<Tap>::new_from_tap_fd(*tap_fd, mrg_rxbuf)
351 .context("failed to create NetBackend")?;
352 Ok((conn.to_string(), backend))
353}
354
355pub fn start_device(opts: Options) -> anyhow::Result<()> {
358 let num_devices = opts.device.len() + opts.tap_fd.len() + opts.tap_name.len();
359
360 if num_devices == 0 {
361 bail!("no device option was passed");
362 }
363
364 let mut devices: Vec<(Connection, NetBackend<Tap>)> = Vec::with_capacity(num_devices);
365
366 for arg in opts.device.iter() {
368 devices.push(
369 new_backend_from_device_arg(arg, opts.mrg_rxbuf)
370 .map(|(s, backend)| (Connection::Socket(s), backend))?,
371 );
372 }
373
374 for arg in opts.tap_name.iter() {
375 devices.push(
376 new_backend_from_tap_name(arg, opts.mrg_rxbuf)
377 .map(|(s, backend)| (Connection::Socket(s), backend))?,
378 );
379 }
380 for arg in opts.tap_fd.iter() {
381 devices.push(
382 new_backend_from_tapfd_arg(arg, opts.mrg_rxbuf)
383 .map(|(s, backend)| (Connection::Socket(s), backend))?,
384 );
385 }
386
387 let mut threads = Vec::with_capacity(num_devices);
388
389 for (conn, backend) in devices {
390 let ex = Executor::new().context("failed to create executor")?;
391
392 match conn {
393 Connection::Socket(socket) => {
394 threads.push(thread::spawn(move || {
395 NET_EXECUTOR.with(|thread_ex| {
396 let _ = thread_ex.set(ex.clone());
397 });
398 let listener = VhostUserListener::new(&socket)?;
399 ex.run_until(listener.run_backend(backend, &ex))?
402 }));
403 }
404 };
405 }
406
407 info!("vhost-user net device ready, loop threads started.");
408 for t in threads {
409 match t.join() {
410 Ok(r) => r?,
411 Err(e) => bail!("thread panicked: {:?}", e),
412 }
413 }
414 Ok(())
415}