devices/virtio/vhost_user_backend/net/sys/linux.rs

1// Copyright 2022 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::net::Ipv4Addr;
6use std::str::FromStr;
7use std::thread;
8
9use anyhow::anyhow;
10use anyhow::bail;
11use anyhow::Context;
12use argh::FromArgs;
13use base::error;
14use base::info;
15use base::validate_raw_descriptor;
16use base::warn;
17use base::RawDescriptor;
18use cros_async::EventAsync;
19use cros_async::Executor;
20use cros_async::IntoAsync;
21use cros_async::IoSource;
22use futures::channel::oneshot;
23use futures::select_biased;
24use futures::FutureExt;
25use hypervisor::ProtectionType;
26use net_util::sys::linux::Tap;
27use net_util::MacAddress;
28use net_util::TapT;
29use virtio_sys::virtio_net;
30use vm_memory::GuestMemory;
31use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
32
33use crate::virtio;
34use crate::virtio::net::process_mrg_rx;
35use crate::virtio::net::process_rx;
36use crate::virtio::net::validate_and_configure_tap;
37use crate::virtio::net::NetError;
38use crate::virtio::net::PendingBuffer;
39use crate::virtio::vhost_user_backend::connection::sys::VhostUserListener;
40use crate::virtio::vhost_user_backend::connection::VhostUserConnectionTrait;
41use crate::virtio::vhost_user_backend::handler::VhostUserDevice;
42use crate::virtio::vhost_user_backend::net::run_ctrl_queue;
43use crate::virtio::vhost_user_backend::net::run_tx_queue;
44use crate::virtio::vhost_user_backend::net::NetBackend;
45use crate::virtio::vhost_user_backend::net::NET_EXECUTOR;
46use crate::virtio::Queue;
47
48struct TapConfig {
49    host_ip: Ipv4Addr,
50    netmask: Ipv4Addr,
51    mac: MacAddress,
52}
53
54impl FromStr for TapConfig {
55    type Err = anyhow::Error;
56
57    fn from_str(arg: &str) -> Result<Self, Self::Err> {
58        let args: Vec<&str> = arg.split(',').collect();
59        if args.len() != 3 {
60            bail!("TAP config must consist of 3 parts but {}", args.len());
61        }
62
63        let host_ip: Ipv4Addr = args[0]
64            .parse()
65            .map_err(|e| anyhow!("invalid IP address: {}", e))?;
66        let netmask: Ipv4Addr = args[1]
67            .parse()
68            .map_err(|e| anyhow!("invalid net mask: {}", e))?;
69        let mac: MacAddress = args[2]
70            .parse()
71            .map_err(|e| anyhow!("invalid MAC address: {}", e))?;
72
73        Ok(Self {
74            host_ip,
75            netmask,
76            mac,
77        })
78    }
79}
80
81impl<T: 'static> NetBackend<T>
82where
83    T: TapT + IntoAsync,
84{
85    fn new_from_config(config: &TapConfig, mrg_rxbuf: bool) -> anyhow::Result<Self> {
86        // Create a tap device.
87        let tap = T::new(true /* vnet_hdr */, false /* multi_queue */)
88            .context("failed to create tap device")?;
89        tap.set_ip_addr(config.host_ip)
90            .context("failed to set IP address")?;
91        tap.set_netmask(config.netmask)
92            .context("failed to set netmask")?;
93        tap.set_mac_address(config.mac)
94            .context("failed to set MAC address")?;
95
96        Self::new(tap, mrg_rxbuf)
97    }
98
99    fn new_from_name(name: &str, mrg_rxbuf: bool) -> anyhow::Result<Self> {
100        let tap = T::new_with_name(name.as_bytes(), true, false).map_err(NetError::TapOpen)?;
101        Self::new(tap, mrg_rxbuf)
102    }
103
104    pub fn new_from_tap_fd(tap_fd: RawDescriptor, mrg_rxbuf: bool) -> anyhow::Result<Self> {
105        let tap_fd = validate_raw_descriptor(tap_fd).context("failed to validate tap fd")?;
106        // SAFETY:
107        // Safe because we ensure that we get a unique handle to the fd.
108        let tap = unsafe { T::from_raw_descriptor(tap_fd).context("failed to create tap device")? };
109
110        Self::new(tap, mrg_rxbuf)
111    }
112
113    pub fn new(tap: T, mrg_rxbuf: bool) -> anyhow::Result<Self> {
114        let vq_pairs = Self::max_vq_pairs();
115        validate_and_configure_tap(&tap, vq_pairs as u16)
116            .context("failed to validate and configure tap")?;
117
118        let mut avail_features = virtio::base_features(ProtectionType::Unprotected)
119            | 1 << virtio_net::VIRTIO_NET_F_GUEST_CSUM
120            | 1 << virtio_net::VIRTIO_NET_F_CSUM
121            | 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
122            | 1 << virtio_net::VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
123            | 1 << virtio_net::VIRTIO_NET_F_GUEST_TSO4
124            | 1 << virtio_net::VIRTIO_NET_F_GUEST_UFO
125            | 1 << virtio_net::VIRTIO_NET_F_HOST_TSO4
126            | 1 << virtio_net::VIRTIO_NET_F_HOST_UFO
127            | 1 << virtio_net::VIRTIO_NET_F_MTU
128            | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
129
130        if mrg_rxbuf {
131            avail_features |= 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF;
132        }
133
134        let mtu = tap.mtu()?;
135
136        Ok(Self {
137            tap,
138            avail_features,
139            acked_features: 0,
140            mtu,
141            workers: Default::default(),
142        })
143    }
144}
145
/// Rx queue worker: repeatedly feeds data from `tap` into `queue` until told to stop or an
/// error occurs, then hands `queue` back to the caller.
///
/// Each iteration waits for `tap` to become readable, unless data from a previous iteration
/// is still pending. It then calls `process_mrg_rx` (when `mrg_rxbuf` is set) or `process_rx`.
/// If those report `NetError::RxDescriptorsExhausted`, the worker waits for `kick_evt` before
/// retrying.
///
/// The loop exits when `stop_rx` resolves, when waiting on `tap` or `kick_evt` fails, or when
/// processing returns any other error (all errors are logged).
async fn run_rx_queue<T: TapT>(
    mut queue: Queue,
    mut tap: IoSource<T>,
    kick_evt: EventAsync,
    mut stop_rx: oneshot::Receiver<()>,
    mrg_rxbuf: bool,
) -> Queue {
    // Only created when mergeable rx buffers are in use. It outlives individual loop
    // iterations, and its `length` decides below whether to wait for the tap.
    let mut pending_buffer = if mrg_rxbuf {
        Some(PendingBuffer::new())
    } else {
        None
    };
    loop {
        // A non-zero pending length means there is still data to deliver (presumably left
        // over from the previous `process_mrg_rx` call), so don't block on the tap.
        let pending_length = pending_buffer
            .as_ref()
            .map_or(0, |pending_buffer| pending_buffer.length);
        if pending_length == 0 {
            // `select_biased!` polls arms in the order written, so readiness of the tap is
            // checked before the stop signal.
            select_biased! {
                // `tap.wait_readable()` requires an immutable reference to `tap`, but `process_rx`
                // requires a mutable reference to `tap`, so this future needs to be recreated on
                // every iteration. If more arms are added that doesn't break out of the loop, then
                // this future could be recreated too many times.
                rx = tap.wait_readable().fuse() => {
                    if let Err(e) = rx {
                        error!("Failed to wait for tap device to become readable: {}", e);
                        break;
                    }
                }
                _ = stop_rx => {
                    break;
                }
            }
        }
        let res = match pending_buffer.as_mut() {
            Some(pending_buffer) => process_mrg_rx(&mut queue, tap.as_source_mut(), pending_buffer),
            None => process_rx(&mut queue, tap.as_source_mut()),
        };

        match res {
            Ok(()) => {}
            // No free rx descriptors: wait for the queue to be kicked (or for a stop request)
            // and then go round the loop again.
            Err(NetError::RxDescriptorsExhausted) => {
                select_biased! {
                    kick_evt = kick_evt.next_val().fuse() => {
                        if let Err(e) = kick_evt {
                            error!("Failed to read kick event for rx queue: {}", e);
                            break;
                        }
                    },
                    _ = stop_rx => {
                        break;
                    }
                };
            }
            Err(e) => {
                error!("Failed to process rx queue: {}", e);
                break;
            }
        }
    }
    queue
}
207
208/// Platform specific impl of VhostUserDevice::start_queue.
209pub(in crate::virtio::vhost_user_backend::net) fn start_queue<T: 'static + IntoAsync + TapT>(
210    backend: &mut NetBackend<T>,
211    idx: usize,
212    queue: virtio::Queue,
213    _mem: GuestMemory,
214) -> anyhow::Result<()> {
215    if backend.workers[idx].is_some() {
216        warn!("Starting new queue handler without stopping old handler");
217        backend.stop_queue(idx)?;
218    }
219
220    NET_EXECUTOR.with(|ex| {
221        // Safe because the executor is initialized in main() below.
222        let ex = ex.get().expect("Executor not initialized");
223
224        let kick_evt = queue
225            .event()
226            .try_clone()
227            .context("failed to clone queue event")?;
228        let kick_evt =
229            EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
230        let tap = backend
231            .tap
232            .try_clone()
233            .context("failed to clone tap device")?;
234        let worker_tuple = match idx {
235            0 => {
236                let tap = ex
237                    .async_from(tap)
238                    .context("failed to create async tap device")?;
239
240                let mrg_rxbuf =
241                    (backend.acked_features & 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF) != 0;
242                let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
243                (
244                    ex.spawn_local(run_rx_queue(queue, tap, kick_evt, stop_rx, mrg_rxbuf)),
245                    stop_tx,
246                )
247            }
248            1 => {
249                let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
250                (
251                    ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),
252                    stop_tx,
253                )
254            }
255            2 => {
256                let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
257                (
258                    ex.spawn_local(run_ctrl_queue(
259                        queue,
260                        tap,
261                        kick_evt,
262                        backend.acked_features,
263                        1, /* vq_pairs */
264                        stop_rx,
265                    )),
266                    stop_tx,
267                )
268            }
269            _ => bail!("attempted to start unknown queue: {}", idx),
270        };
271
272        backend.workers[idx] = Some(worker_tuple);
273        Ok(())
274    })
275}
276
277#[derive(FromArgs)]
278#[argh(subcommand, name = "net")]
279/// Net device
280pub struct Options {
281    #[argh(option, arg_name = "SOCKET_PATH,IP_ADDR,NET_MASK,MAC_ADDR")]
282    /// TAP device config. (e.g. "path/to/sock,10.0.2.2,255.255.255.0,12:34:56:78:9a:bc")
283    device: Vec<String>,
284    #[argh(option, arg_name = "SOCKET_PATH,TAP_FD")]
285    /// TAP FD with a socket path"
286    tap_fd: Vec<String>,
287    #[argh(option, arg_name = "SOCKET_PATH,TAP_NAME")]
288    /// TAP NAME with a socket path
289    tap_name: Vec<String>,
290    #[argh(switch, arg_name = "MRG_RXBUF")]
291    /// whether enable MRG_RXBUF feature.
292    mrg_rxbuf: bool,
293}
294
/// How each backend built by `start_device` is connected; currently only a socket path.
enum Connection {
    /// Path handed to `VhostUserListener::new` in `start_device`.
    Socket(String),
}
298
299fn new_backend_from_device_arg(
300    arg: &str,
301    mrg_rxbuf: bool,
302) -> anyhow::Result<(String, NetBackend<Tap>)> {
303    let pos = match arg.find(',') {
304        Some(p) => p,
305        None => {
306            bail!("device must take comma-separated argument");
307        }
308    };
309    let conn = &arg[0..pos];
310    let cfg = &arg[pos + 1..]
311        .parse::<TapConfig>()
312        .context("failed to parse tap config")?;
313    let backend = NetBackend::<Tap>::new_from_config(cfg, mrg_rxbuf)
314        .context("failed to create NetBackend")?;
315    Ok((conn.to_string(), backend))
316}
317
318fn new_backend_from_tap_name(
319    arg: &str,
320    mrg_rxbuf: bool,
321) -> anyhow::Result<(String, NetBackend<Tap>)> {
322    let pos = match arg.find(',') {
323        Some(p) => p,
324        None => {
325            bail!("device must take comma-separated argument");
326        }
327    };
328    let conn = &arg[0..pos];
329    let tap_name = &arg[pos + 1..];
330
331    let backend = NetBackend::<Tap>::new_from_name(tap_name, mrg_rxbuf)
332        .context("failed to create NetBackend")?;
333    Ok((conn.to_string(), backend))
334}
335
336fn new_backend_from_tapfd_arg(
337    arg: &str,
338    mrg_rxbuf: bool,
339) -> anyhow::Result<(String, NetBackend<Tap>)> {
340    let pos = match arg.find(',') {
341        Some(p) => p,
342        None => {
343            bail!("'tap-fd' flag must take comma-separated argument");
344        }
345    };
346    let conn = &arg[0..pos];
347    let tap_fd = &arg[pos + 1..]
348        .parse::<i32>()
349        .context("failed to parse tap-fd")?;
350    let backend = NetBackend::<Tap>::new_from_tap_fd(*tap_fd, mrg_rxbuf)
351        .context("failed to create NetBackend")?;
352    Ok((conn.to_string(), backend))
353}
354
355/// Starts a vhost-user net device.
356/// Returns an error if the given `args` is invalid or the device fails to run.
357pub fn start_device(opts: Options) -> anyhow::Result<()> {
358    let num_devices = opts.device.len() + opts.tap_fd.len() + opts.tap_name.len();
359
360    if num_devices == 0 {
361        bail!("no device option was passed");
362    }
363
364    let mut devices: Vec<(Connection, NetBackend<Tap>)> = Vec::with_capacity(num_devices);
365
366    // vhost-user
367    for arg in opts.device.iter() {
368        devices.push(
369            new_backend_from_device_arg(arg, opts.mrg_rxbuf)
370                .map(|(s, backend)| (Connection::Socket(s), backend))?,
371        );
372    }
373
374    for arg in opts.tap_name.iter() {
375        devices.push(
376            new_backend_from_tap_name(arg, opts.mrg_rxbuf)
377                .map(|(s, backend)| (Connection::Socket(s), backend))?,
378        );
379    }
380    for arg in opts.tap_fd.iter() {
381        devices.push(
382            new_backend_from_tapfd_arg(arg, opts.mrg_rxbuf)
383                .map(|(s, backend)| (Connection::Socket(s), backend))?,
384        );
385    }
386
387    let mut threads = Vec::with_capacity(num_devices);
388
389    for (conn, backend) in devices {
390        let ex = Executor::new().context("failed to create executor")?;
391
392        match conn {
393            Connection::Socket(socket) => {
394                threads.push(thread::spawn(move || {
395                    NET_EXECUTOR.with(|thread_ex| {
396                        let _ = thread_ex.set(ex.clone());
397                    });
398                    let listener = VhostUserListener::new(&socket)?;
399                    // run_until() returns an Result<Result<..>> which the ? operator lets us
400                    // flatten.
401                    ex.run_until(listener.run_backend(backend, &ex))?
402                }));
403            }
404        };
405    }
406
407    info!("vhost-user net device ready, loop threads started.");
408    for t in threads {
409        match t.join() {
410            Ok(r) => r?,
411            Err(e) => bail!("thread panicked: {:?}", e),
412        }
413    }
414    Ok(())
415}