devices/virtio/block/
asynchronous.rs

1// Copyright 2021 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::cell::RefCell;
6use std::collections::BTreeMap;
7use std::collections::BTreeSet;
8use std::io;
9use std::io::Write;
10use std::mem::size_of;
11#[cfg(windows)]
12use std::num::NonZeroU32;
13use std::rc::Rc;
14use std::result;
15use std::sync::atomic::AtomicU64;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Context;
21use base::debug;
22use base::error;
23use base::info;
24use base::warn;
25use base::AsRawDescriptor;
26use base::Error as SysError;
27use base::Event;
28use base::RawDescriptor;
29use base::Result as SysResult;
30use base::Timer;
31use base::Tube;
32use base::TubeError;
33use base::WorkerThread;
34use cros_async::sync::RwLock as AsyncRwLock;
35use cros_async::AsyncError;
36use cros_async::AsyncTube;
37use cros_async::EventAsync;
38use cros_async::Executor;
39use cros_async::ExecutorKind;
40use cros_async::TimerAsync;
41use data_model::Le16;
42use data_model::Le32;
43use data_model::Le64;
44use disk::AsyncDisk;
45use disk::DiskFile;
46use futures::channel::mpsc;
47use futures::channel::oneshot;
48use futures::pin_mut;
49use futures::stream::FuturesUnordered;
50use futures::stream::StreamExt;
51use futures::FutureExt;
52use remain::sorted;
53use snapshot::AnySnapshot;
54use thiserror::Error as ThisError;
55use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
56use vm_control::DiskControlCommand;
57use vm_control::DiskControlResult;
58use vm_memory::GuestMemory;
59use zerocopy::IntoBytes;
60
61use crate::virtio::async_utils;
62use crate::virtio::block::sys::*;
63use crate::virtio::block::DiskOption;
64use crate::virtio::copy_config;
65use crate::virtio::device_constants::block::virtio_blk_config;
66use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
67use crate::virtio::device_constants::block::virtio_blk_req_header;
68use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
69use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
70use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
71use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
72use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
73use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
74use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
75use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
76use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
77use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
78use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
79use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
80use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
81use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
82use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
83use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
84use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
85use crate::virtio::DescriptorChain;
86use crate::virtio::DeviceType;
87use crate::virtio::Interrupt;
88use crate::virtio::Queue;
89use crate::virtio::Reader;
90use crate::virtio::VirtioDevice;
91use crate::virtio::Writer;
92use crate::PciAddress;
93
// Default virtqueue size (descriptors per queue) when not overridden by the caller.
const DEFAULT_QUEUE_SIZE: u16 = 256;
// Default number of virtqueues when not overridden by the caller.
const DEFAULT_NUM_QUEUES: u16 = 16;

// A virtio-blk sector is always 512 bytes (1 << 9), independent of the
// device's advertised block size.
const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// Maximum sectors a single discard / write-zeroes request may cover.
const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;
108
109#[sorted]
110#[derive(ThisError, Debug)]
111enum ExecuteError {
112    #[error("failed to copy ID string: {0}")]
113    CopyId(io::Error),
114    #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
115    DiscardWriteZeroes {
116        ioerr: Option<disk::Error>,
117        sector: u64,
118        num_sectors: u32,
119        flags: u32,
120    },
121    #[error("failed to flush: {0}")]
122    Flush(disk::Error),
123    #[error("not enough space in descriptor chain to write status")]
124    MissingStatus,
125    #[error("out of range")]
126    OutOfRange,
127    #[error("failed to read message: {0}")]
128    Read(io::Error),
129    #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
130    ReadIo {
131        length: usize,
132        sector: u64,
133        desc_error: disk::Error,
134    },
135    #[error("read only; request_type={request_type}")]
136    ReadOnly { request_type: u32 },
137    #[error("failed to recieve command message: {0}")]
138    ReceivingCommand(TubeError),
139    #[error("failed to send command response: {0}")]
140    SendingResponse(TubeError),
141    #[error("couldn't reset the timer: {0}")]
142    TimerReset(base::Error),
143    #[error("too many segments: {0} > {0}")]
144    TooManySegments(usize, usize),
145    #[error("unsupported ({0})")]
146    Unsupported(u32),
147    #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
148    WriteIo {
149        length: usize,
150        sector: u64,
151        desc_error: disk::Error,
152    },
153    #[error("failed to write request status: {0}")]
154    WriteStatus(io::Error),
155}
156
/// Severity at which a failed request is logged; chosen by
/// `ExecuteError::log_level` so that expected disk I/O failures don't flood
/// the error log.
enum LogLevel {
    Debug,
    Error,
}
161
162impl ExecuteError {
163    fn status(&self) -> u8 {
164        match self {
165            ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
166            ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
167            ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
168            ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
169            ExecuteError::OutOfRange => VIRTIO_BLK_S_IOERR,
170            ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
171            ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
172            ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
173            ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
174            ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
175            ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
176            ExecuteError::TooManySegments(_, _) => VIRTIO_BLK_S_IOERR,
177            ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
178            ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
179            ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
180        }
181    }
182
183    fn log_level(&self) -> LogLevel {
184        match self {
185            // Log disk I/O errors at debug level to avoid flooding the logs.
186            ExecuteError::ReadIo { .. }
187            | ExecuteError::WriteIo { .. }
188            | ExecuteError::Flush { .. }
189            | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
190            // Log all other failures as errors.
191            _ => LogLevel::Error,
192        }
193    }
194}
195
/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    /// The periodic fdatasync of the disk image failed.
    #[error("failed to fdatasync the disk: {0}")]
    FdatasyncDisk(disk::Error),
    /// Waiting on the flush timer failed.
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
}
206
/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all `ID_LEN` bytes are
/// used, in which case the \0 terminator is omitted.
type BlockId = [u8; ID_LEN];
214
/// Tracks the state of an asynchronous disk.
struct DiskState {
    /// The async disk backend serving guest I/O.
    disk_image: Box<dyn AsyncDisk>,
    /// When true, write-style requests are rejected and resize fails with EROFS.
    read_only: bool,
    /// When false, `resize` explicitly allocates the new space after set_len.
    sparse: bool,
    /// Device ID string (see `BlockId`).
    id: BlockId,
    /// A DiskState is owned by each worker's executor and cannot be shared by workers, thus
    /// `worker_shared_state` holds the state shared by workers in Arc.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}
225
/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
    /// Current disk size in bytes; stored (Release) by `resize` and loaded by
    /// request execution for range checking.
    disk_size: Arc<AtomicU64>,
}
230
231async fn process_one_request(
232    avail_desc: &mut DescriptorChain,
233    disk_state: &AsyncRwLock<DiskState>,
234    flush_timer: &RefCell<TimerAsync<Timer>>,
235    flush_timer_armed: &RefCell<bool>,
236) -> result::Result<usize, ExecuteError> {
237    let reader = &mut avail_desc.reader;
238    let writer = &mut avail_desc.writer;
239
240    // The last byte of the buffer is virtio_blk_req::status.
241    // Split it into a separate Writer so that status_writer is the final byte and
242    // the original writer is left with just the actual block I/O data.
243    let available_bytes = writer.available_bytes();
244    let status_offset = available_bytes
245        .checked_sub(1)
246        .ok_or(ExecuteError::MissingStatus)?;
247    let mut status_writer = writer.split_at(status_offset);
248
249    let status = match BlockAsync::execute_request(
250        reader,
251        writer,
252        disk_state,
253        flush_timer,
254        flush_timer_armed,
255    )
256    .await
257    {
258        Ok(()) => VIRTIO_BLK_S_OK,
259        Err(e) => {
260            match e.log_level() {
261                LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
262                LogLevel::Error => error!("failed executing disk request: {:#}", e),
263            }
264            e.status()
265        }
266    };
267
268    status_writer
269        .write_all(&[status])
270        .map_err(ExecuteError::WriteStatus)?;
271    Ok(available_bytes)
272}
273
274/// Process one descriptor chain asynchronously.
275async fn process_one_chain(
276    queue: &RefCell<Queue>,
277    mut avail_desc: DescriptorChain,
278    disk_state: &AsyncRwLock<DiskState>,
279    flush_timer: &RefCell<TimerAsync<Timer>>,
280    flush_timer_armed: &RefCell<bool>,
281) {
282    let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
283        .await
284    {
285        Ok(len) => len,
286        Err(e) => {
287            error!("block: failed to handle request: {:#}", e);
288            0
289        }
290    };
291
292    let mut queue = queue.borrow_mut();
293    queue.add_used_with_bytes_written(avail_desc, len as u32);
294    queue.trigger_interrupt();
295}
296
// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
//
// Returns the `Queue` back to the caller when `stop_rx` fires, after first
// draining all already-popped descriptor chains so the queue is left in a
// consistent state.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in the completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                // Re-arm the pinned event future in place before handling the result.
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        // Drain every currently-available descriptor chain; each request runs
        // concurrently as its own background future.
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}
346
347async fn handle_command_tube(
348    command_tube: &Option<AsyncTube>,
349    interrupt: &RefCell<Option<Interrupt>>,
350    disk_state: Rc<AsyncRwLock<DiskState>>,
351) -> Result<(), ExecuteError> {
352    let command_tube = match command_tube {
353        Some(c) => c,
354        None => {
355            futures::future::pending::<()>().await;
356            return Ok(());
357        }
358    };
359    loop {
360        match command_tube.next().await {
361            Ok(command) => {
362                let resp = match command {
363                    DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
364                };
365
366                let resp_clone = resp.clone();
367                command_tube
368                    .send(resp_clone)
369                    .await
370                    .map_err(ExecuteError::SendingResponse)?;
371                if let DiskControlResult::Ok = resp {
372                    if let Some(interrupt) = &*interrupt.borrow() {
373                        interrupt.signal_config_changed();
374                    }
375                }
376            }
377            Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
378        }
379    }
380}
381
/// Handles a disk-resize request: sets the new image length, allocates the
/// space for non-sparse images, and publishes the resulting size to all
/// workers via the shared atomic.
async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
    // Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
    // the state while resizing.
    let disk_state = disk_state.lock().await;
    // Hold the shared-state lock exclusively so other worker threads can't do IO while resizing.
    let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
    let worker_shared_state = worker_shared_state.lock().await;

    if disk_state.read_only {
        error!("Attempted to resize read-only block device");
        return DiskControlResult::Err(SysError::new(libc::EROFS));
    }

    info!("Resizing block device to {} bytes", new_size);

    if let Err(e) = disk_state.disk_image.set_len(new_size) {
        error!("Resizing disk failed! {:#}", e);
        return DiskControlResult::Err(SysError::new(libc::EIO));
    }

    // Allocate new space if the disk image is not sparse.
    if !disk_state.sparse {
        if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
            error!("Allocating disk space after resize failed! {:#}", e);
            return DiskControlResult::Err(SysError::new(libc::EIO));
        }
    }

    // Publish the actual post-resize length (re-queried from the image) so
    // request range checks in every worker see the new size.
    if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
        worker_shared_state
            .disk_size
            .store(new_disk_size, Ordering::Release);
    }
    DiskControlResult::Ok
}
417
/// Periodically flushes the disk when the given timer fires.
///
/// `armed` is expected to be set elsewhere when there may be unflushed writes
/// (the setter is not in this view); if it is clear when the timer fires,
/// the sync is skipped.
async fn flush_disk(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    timer: TimerAsync<Timer>,
    armed: Rc<RefCell<bool>>,
) -> Result<(), ControlError> {
    loop {
        timer.wait().await.map_err(ControlError::FlushTimer)?;
        if !*armed.borrow() {
            continue;
        }

        // Reset armed before calling fdatasync to guarantee that IO requests that started after we
        // call fdatasync will be committed eventually.
        *armed.borrow_mut() = false;

        // A read lock suffices: we only need the image reference and to
        // exclude a concurrent resize, not to mutate the state.
        disk_state
            .read_lock()
            .await
            .disk_image
            .fdatasync()
            .await
            .map_err(ControlError::FdatasyncDisk)?;
    }
}
443
/// Commands sent to a running block worker over its mpsc channel.
enum WorkerCmd {
    /// Begin servicing `queue` as virtqueue number `index`.
    StartQueue {
        index: usize,
        queue: Queue,
    },
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
    // Stop all queues without recovering the queues' state and without completing any queued up
    // work.
    AbortQueues {
        // Once the queues are stopped, a `()` value will be sent back over `response_tx`.
        response_tx: oneshot::Sender<()>,
    },
}
462
// The main worker thread. Initializes the asynchronous worker tasks and passes them to the executor
// to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks. It's
// because the state can be read from the virtqueue task while the control task is processing a
// resizing command.
async fn run_worker(
    ex: &Executor,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control_interrupt = RefCell::new(None);
    let control = handle_command_tube(control_tube, &control_interrupt, disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            // Drive in-flight queue handler futures; they only complete when stopped.
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue}) => {
                        // The first queue started supplies the interrupt used by the
                        // control task to signal config changes.
                        if control_interrupt.borrow().is_none() {
                            *control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
                        }

                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };

                                // If this is the last queue, drop references to the interrupt so
                                // that, when queues are started up again, we'll use the new
                                // interrupt passed with the first queue.
                                if queue_handlers.is_empty() {
                                    *control_interrupt.borrow_mut() = None;
                                }

                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }

                    }
                    Some(WorkerCmd::AbortQueues{response_tx}) => {
                        // Drop all handler futures without waiting for in-flight
                        // work; queue state is intentionally not recovered.
                        queue_handlers.clear();
                        queue_handler_stop_fns.clear();

                        *control_interrupt.borrow_mut() = None;

                        let _ = response_tx.send(());
                    }
                }
            }
        };
    }
}
601
/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // Boot index used by the main crate to determine boot order.
    // NOTE(review): an earlier comment said this field needs to be public for
    // the main crate, but it is private here — confirm how it is accessed.
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    // Current disk size in bytes; the same Arc is held by `shared_state`.
    disk_size: Arc<AtomicU64>,
    // Virtio feature bits offered to the guest (see `build_avail_features`).
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    // Maximum segments per request, derived from the queue size via `get_seg_max`.
    seg_max: u32,
    // Block size of the disk; validated to be a multiple of SECTOR_SIZE.
    block_size: u32,
    // Device ID string (see `BlockId`).
    id: BlockId,
    control_tube: Option<Tube>,
    // Size of each virtqueue, one entry per queue.
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    //
    // Once a thread is started, we never stop it, except when `BlockAsync` itself is dropped. That
    // is because we cannot easily convert the `AsyncDisk` back to a `DiskFile` when backed by
    // Overlapped I/O on Windows because the file becomes permanently associated with the IOCP
    // instance of the async executor.
    worker_threads: BTreeMap<usize, (WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)>,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}
638
639impl BlockAsync {
640    /// Create a new virtio block device that operates on the given AsyncDisk.
641    pub fn new(
642        base_features: u64,
643        disk_image: Box<dyn DiskFile>,
644        disk_option: &DiskOption,
645        control_tube: Option<Tube>,
646        queue_size: Option<u16>,
647        num_queues: Option<u16>,
648    ) -> SysResult<BlockAsync> {
649        let read_only = disk_option.read_only;
650        let sparse = disk_option.sparse;
651        let block_size = disk_option.block_size;
652        let packed_queue = disk_option.packed_queue;
653        let id = disk_option.id.unwrap_or_default();
654        let mut worker_per_queue = disk_option.multiple_workers;
655        // Automatically disable multiple workers if the disk image can't be cloned.
656        if worker_per_queue && disk_image.try_clone().is_err() {
657            base::warn!("multiple workers requested, but not supported by disk image type");
658            worker_per_queue = false;
659        }
660        let executor_kind = disk_option.async_executor.unwrap_or_default();
661        let boot_index = disk_option.bootindex;
662        #[cfg(windows)]
663        let io_concurrency = disk_option.io_concurrency.get();
664
665        if block_size % SECTOR_SIZE as u32 != 0 {
666            error!(
667                "Block size {} is not a multiple of {}.",
668                block_size, SECTOR_SIZE,
669            );
670            return Err(SysError::new(libc::EINVAL));
671        }
672        let disk_size = disk_image.get_len()?;
673        if disk_size % block_size as u64 != 0 {
674            warn!(
675                "Disk size {} is not a multiple of block size {}; \
676                 the remainder will not be visible to the guest.",
677                disk_size, block_size,
678            );
679        }
680        let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
681        let multi_queue = match num_queues {
682            0 => panic!("Number of queues cannot be zero for a block device"),
683            1 => false,
684            _ => true,
685        };
686        let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
687        if !q_size.is_power_of_two() {
688            error!("queue size {} is not a power of 2.", q_size);
689            return Err(SysError::new(libc::EINVAL));
690        }
691        let queue_sizes = vec![q_size; num_queues as usize];
692
693        let avail_features =
694            Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);
695
696        let seg_max = get_seg_max(q_size);
697
698        let disk_size = Arc::new(AtomicU64::new(disk_size));
699        let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
700            disk_size: disk_size.clone(),
701        }));
702
703        Ok(BlockAsync {
704            disk_image: Some(disk_image),
705            disk_size,
706            avail_features,
707            read_only,
708            sparse,
709            seg_max,
710            block_size,
711            id,
712            queue_sizes,
713            worker_threads: BTreeMap::new(),
714            shared_state,
715            worker_per_queue,
716            control_tube,
717            executor_kind,
718            activated_queues: BTreeSet::new(),
719            boot_index,
720            #[cfg(windows)]
721            io_concurrency,
722            pci_address: disk_option.pci_address,
723        })
724    }
725
726    /// Returns the feature flags given the specified attributes.
727    fn build_avail_features(
728        base_features: u64,
729        read_only: bool,
730        sparse: bool,
731        multi_queue: bool,
732        packed_queue: bool,
733    ) -> u64 {
734        let mut avail_features = base_features;
735        if read_only {
736            avail_features |= 1 << VIRTIO_BLK_F_RO;
737        } else {
738            if sparse {
739                avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
740            }
741            avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
742            avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
743        }
744        avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
745        avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
746        if multi_queue {
747            avail_features |= 1 << VIRTIO_BLK_F_MQ;
748        }
749        if packed_queue {
750            avail_features |= 1 << VIRTIO_F_RING_PACKED;
751        }
752        avail_features
753    }
754
755    // Execute a single block device request.
756    // `writer` includes the data region only; the status byte is not included.
757    // It is up to the caller to convert the result of this function into a status byte
758    // and write it to the expected location in guest memory.
759    async fn execute_request(
760        reader: &mut Reader,
761        writer: &mut Writer,
762        disk_state: &AsyncRwLock<DiskState>,
763        flush_timer: &RefCell<TimerAsync<Timer>>,
764        flush_timer_armed: &RefCell<bool>,
765    ) -> result::Result<(), ExecuteError> {
766        // Acquire immutable access to prevent tasks from resizing disk.
767        let disk_state = disk_state.read_lock().await;
768        // Acquire immutable access to prevent other worker threads from resizing disk.
769        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;
770
771        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;
772
773        let req_type = req_header.req_type.to_native();
774        let sector = req_header.sector.to_native();
775
776        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
777            return Err(ExecuteError::ReadOnly {
778                request_type: req_type,
779            });
780        }
781
782        /// Check that a request accesses only data within the disk's current size.
783        /// All parameters are in units of bytes.
784        fn check_range(
785            io_start: u64,
786            io_length: u64,
787            disk_size: u64,
788        ) -> result::Result<(), ExecuteError> {
789            let io_end = io_start
790                .checked_add(io_length)
791                .ok_or(ExecuteError::OutOfRange)?;
792            if io_end > disk_size {
793                Err(ExecuteError::OutOfRange)
794            } else {
795                Ok(())
796            }
797        }
798
799        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
800        match req_type {
801            VIRTIO_BLK_T_IN => {
802                let data_len = writer.available_bytes();
803                if data_len == 0 {
804                    return Ok(());
805                }
806                let offset = sector
807                    .checked_shl(u32::from(SECTOR_SHIFT))
808                    .ok_or(ExecuteError::OutOfRange)?;
809                check_range(offset, data_len as u64, disk_size)?;
810                let disk_image = &disk_state.disk_image;
811                writer
812                    .write_all_from_at_fut(&**disk_image, data_len, offset)
813                    .await
814                    .map_err(|desc_error| ExecuteError::ReadIo {
815                        length: data_len,
816                        sector,
817                        desc_error,
818                    })?;
819            }
820            VIRTIO_BLK_T_OUT => {
821                let data_len = reader.available_bytes();
822                if data_len == 0 {
823                    return Ok(());
824                }
825                let offset = sector
826                    .checked_shl(u32::from(SECTOR_SHIFT))
827                    .ok_or(ExecuteError::OutOfRange)?;
828                check_range(offset, data_len as u64, disk_size)?;
829                let disk_image = &disk_state.disk_image;
830                reader
831                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
832                    .await
833                    .map_err(|desc_error| ExecuteError::WriteIo {
834                        length: data_len,
835                        sector,
836                        desc_error,
837                    })?;
838
839                if !*flush_timer_armed.borrow() {
840                    *flush_timer_armed.borrow_mut() = true;
841
842                    let flush_delay = Duration::from_secs(60);
843                    flush_timer
844                        .borrow_mut()
845                        .reset_oneshot(flush_delay)
846                        .map_err(ExecuteError::TimerReset)?;
847                }
848            }
849            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
850                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
851                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
852                    return Ok(());
853                }
854
855                let seg_count =
856                    reader.available_bytes() / size_of::<virtio_blk_discard_write_zeroes>();
857                let seg_max = if req_type == VIRTIO_BLK_T_DISCARD {
858                    MAX_DISCARD_SEG as usize
859                } else {
860                    MAX_WRITE_ZEROES_SEG as usize
861                };
862                if seg_count > seg_max {
863                    return Err(ExecuteError::TooManySegments(seg_count, seg_max));
864                }
865
866                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
867                    let seg: virtio_blk_discard_write_zeroes =
868                        reader.read_obj().map_err(ExecuteError::Read)?;
869
870                    let sector = seg.sector.to_native();
871                    let num_sectors = seg.num_sectors.to_native();
872                    let flags = seg.flags.to_native();
873
874                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
875                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
876                    } else {
877                        0
878                    };
879
880                    if (flags & !valid_flags) != 0 {
881                        return Err(ExecuteError::DiscardWriteZeroes {
882                            ioerr: None,
883                            sector,
884                            num_sectors,
885                            flags,
886                        });
887                    }
888
889                    let offset = sector
890                        .checked_shl(u32::from(SECTOR_SHIFT))
891                        .ok_or(ExecuteError::OutOfRange)?;
892                    let length = u64::from(num_sectors)
893                        .checked_shl(u32::from(SECTOR_SHIFT))
894                        .ok_or(ExecuteError::OutOfRange)?;
895                    check_range(offset, length, disk_size)?;
896
897                    if req_type == VIRTIO_BLK_T_DISCARD {
898                        // Since Discard is just a hint and some filesystems may not implement
899                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
900                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
901                    } else {
902                        disk_state
903                            .disk_image
904                            .write_zeroes_at(offset, length)
905                            .await
906                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
907                                ioerr: Some(e),
908                                sector,
909                                num_sectors,
910                                flags,
911                            })?;
912                    }
913                }
914            }
915            VIRTIO_BLK_T_FLUSH => {
916                disk_state
917                    .disk_image
918                    .fdatasync()
919                    .await
920                    .map_err(ExecuteError::Flush)?;
921
922                if *flush_timer_armed.borrow() {
923                    flush_timer
924                        .borrow_mut()
925                        .clear()
926                        .map_err(ExecuteError::TimerReset)?;
927                    *flush_timer_armed.borrow_mut() = false;
928                }
929            }
930            VIRTIO_BLK_T_GET_ID => {
931                writer
932                    .write_all(&disk_state.id)
933                    .map_err(ExecuteError::CopyId)?;
934            }
935            t => return Err(ExecuteError::Unsupported(t)),
936        };
937        Ok(())
938    }
939
940    /// Builds and returns the config structure used to specify block features.
941    fn build_config_space(
942        disk_size: u64,
943        seg_max: u32,
944        block_size: u32,
945        num_queues: u16,
946    ) -> virtio_blk_config {
947        virtio_blk_config {
948            // If the image is not a multiple of the sector size, the tail bits are not exposed.
949            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
950            seg_max: Le32::from(seg_max),
951            blk_size: Le32::from(block_size),
952            num_queues: Le16::from(num_queues),
953            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
954            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
955            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
956            write_zeroes_may_unmap: 1,
957            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
958            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
959            ..Default::default()
960        }
961    }
962
    /// Get the worker for a queue, starting it if necessary.
    // NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
    #[allow(clippy::map_entry)]
    fn start_worker(
        &mut self,
        idx: usize,
    ) -> anyhow::Result<&(WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)> {
        // One worker per queue, or a single shared worker keyed by 0.
        let key = if self.worker_per_queue { idx } else { 0 };
        if self.worker_threads.contains_key(&key) {
            return Ok(self.worker_threads.get(&key).unwrap());
        }

        let ex = self.create_executor();
        // The control tube is moved into whichever worker starts first; later workers get None.
        let control_tube = self.control_tube.take();
        // Per-queue workers each get their own clone of the disk image; a single shared worker
        // takes the image outright.
        let disk_image = if self.worker_per_queue {
            self.disk_image
                .as_ref()
                .context("Failed to ref a disk image")?
                .try_clone()
                .context("Failed to clone a disk image")?
        } else {
            self.disk_image
                .take()
                .context("Failed to take a disk image")?
        };
        let read_only = self.read_only;
        let sparse = self.sparse;
        let id = self.id;
        let worker_shared_state = self.shared_state.clone();

        // Channel used to deliver WorkerCmds (start/stop/abort queue) to the worker.
        let (worker_tx, worker_rx) = mpsc::unbounded();
        let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
            let async_control =
                control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

            let async_image = match disk_image.to_async_disk(&ex) {
                Ok(d) => d,
                Err(e) => panic!("Failed to create async disk {e:#}"),
            };

            let disk_state = Rc::new(AsyncRwLock::new(DiskState {
                disk_image: async_image,
                read_only,
                sparse,
                id,
                worker_shared_state,
            }));

            if let Err(err_string) = ex
                .run_until(async {
                    let r = run_worker(&ex, &disk_state, &async_control, worker_rx, kill_evt).await;
                    // Flush any in-memory disk image state to file.
                    if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                        error!("failed to flush disk image when stopping worker: {e:?}");
                    }
                    r
                })
                .expect("run_until failed")
            {
                error!("{:#}", err_string);
            }
        });
        // The early return above guarantees the key is still vacant here.
        match self.worker_threads.entry(key) {
            std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
            std::collections::btree_map::Entry::Vacant(e) => {
                Ok(e.insert((worker_thread, worker_tx)))
            }
        }
    }
1032
1033    pub fn start_queue(
1034        &mut self,
1035        idx: usize,
1036        queue: Queue,
1037        _mem: GuestMemory,
1038    ) -> anyhow::Result<()> {
1039        let (_, worker_tx) = self.start_worker(idx)?;
1040        worker_tx
1041            .unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
1042            .expect("worker channel closed early");
1043        self.activated_queues.insert(idx);
1044        Ok(())
1045    }
1046
1047    pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
1048        // TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
1049        // simplify `virtio_sleep` and/or `reset` methods.
1050        let (_, worker_tx) = self
1051            .worker_threads
1052            .get(if self.worker_per_queue { &idx } else { &0 })
1053            .context("worker not found")?;
1054        let (response_tx, response_rx) = oneshot::channel();
1055        worker_tx
1056            .unbounded_send(WorkerCmd::StopQueue {
1057                index: idx,
1058                response_tx,
1059            })
1060            .expect("worker channel closed early");
1061        let queue = cros_async::block_on(async {
1062            response_rx
1063                .await
1064                .expect("response_rx closed early")
1065                .context("queue not found")
1066        })?;
1067        self.activated_queues.remove(&idx);
1068        Ok(queue)
1069    }
1070}
1071
impl VirtioDevice for BlockAsync {
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        // Raw descriptors that must stay open for the device to keep working: the disk
        // image's backing descriptors and the disk-control tube, when present.
        let mut keep_rds = Vec::new();

        if let Some(disk_image) = &self.disk_image {
            keep_rds.extend(disk_image.as_raw_descriptors());
        }

        if let Some(control_tube) = &self.control_tube {
            keep_rds.push(control_tube.as_raw_descriptor());
        }

        keep_rds
    }

    fn features(&self) -> u64 {
        // Feature bits were computed when the device was constructed.
        self.avail_features
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Block
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.queue_sizes
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        // Rebuild the config space on each read so `capacity` reflects the current
        // (possibly resized) disk size.
        let config_space = {
            let disk_size = self.disk_size.load(Ordering::Acquire);
            Self::build_config_space(
                disk_size,
                self.seg_max,
                self.block_size,
                self.queue_sizes.len() as u16,
            )
        };
        copy_config(data, 0, config_space.as_bytes(), offset);
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        _interrupt: Interrupt,
        queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        // Hand each queue to a worker; workers are started lazily by `start_queue`.
        for (i, q) in queues {
            self.start_queue(i, q, mem.clone())?;
        }
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        // Tell every worker to abort its queues and block until each one acknowledges
        // before reporting the device as reset.
        for (_, (_, worker_tx)) in self.worker_threads.iter_mut() {
            let (response_tx, response_rx) = oneshot::channel();
            worker_tx
                .unbounded_send(WorkerCmd::AbortQueues { response_tx })
                .expect("worker channel closed early");
            cros_async::block_on(async { response_rx.await.expect("response_rx closed early") });
        }
        self.activated_queues.clear();
        Ok(())
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        // Reclaim the queues from workers.
        let mut queues = BTreeMap::new();
        for index in self.activated_queues.clone() {
            queues.insert(index, self.stop_queue(index)?);
        }
        if queues.is_empty() {
            return Ok(None); // Not activated.
        }
        Ok(Some(queues))
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        // Restart the queues reclaimed by `virtio_sleep`; `None` means the device was
        // never activated, so there is nothing to restore.
        if let Some((mem, _interrupt, queues)) = queues_state {
            for (i, q) in queues {
                self.start_queue(i, q, mem.clone())?
            }
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
        // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
        // handled at a higher layer.
        AnySnapshot::to_any(())
    }

    fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
        // Nothing to restore beyond checking the snapshot is the expected empty payload.
        let () = AnySnapshot::from_any(data)?;
        Ok(())
    }

    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }

    fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
        // Firmware boot-order entry for this disk, emitted only if a boot index was configured.
        self.boot_index
            .map(|s| (format!("scsi@{pci_slot}/disk@0,0").as_bytes().to_vec(), s))
    }
}
1180
1181#[cfg(test)]
1182mod tests {
1183    use std::fs::File;
1184    use std::mem::size_of_val;
1185    use std::sync::atomic::AtomicU64;
1186
1187    use data_model::Le32;
1188    use data_model::Le64;
1189    use disk::SingleFileDisk;
1190    use hypervisor::ProtectionType;
1191    use tempfile::tempfile;
1192    use tempfile::TempDir;
1193    use vm_memory::GuestAddress;
1194
1195    use super::*;
1196    use crate::suspendable_virtio_tests;
1197    use crate::virtio::base_features;
1198    use crate::virtio::descriptor_utils::create_descriptor_chain;
1199    use crate::virtio::descriptor_utils::DescriptorType;
1200    use crate::virtio::QueueConfig;
1201
1202    #[test]
1203    fn read_size() {
1204        let f = tempfile().unwrap();
1205        f.set_len(0x1000).unwrap();
1206
1207        let features = base_features(ProtectionType::Unprotected);
1208        let disk_option = DiskOption::default();
1209        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1210        let mut num_sectors = [0u8; 4];
1211        b.read_config(0, &mut num_sectors);
1212        // size is 0x1000, so num_sectors is 8 (4096/512).
1213        assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
1214        let mut msw_sectors = [0u8; 4];
1215        b.read_config(4, &mut msw_sectors);
1216        // size is 0x1000, so msw_sectors is 0.
1217        assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
1218    }
1219
1220    #[test]
1221    fn read_block_size() {
1222        let f = tempfile().unwrap();
1223        f.set_len(0x1000).unwrap();
1224
1225        let features = base_features(ProtectionType::Unprotected);
1226        let disk_option = DiskOption {
1227            block_size: 4096,
1228            sparse: false,
1229            ..Default::default()
1230        };
1231        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1232        let mut blk_size = [0u8; 4];
1233        b.read_config(20, &mut blk_size);
1234        // blk_size should be 4096 (0x1000).
1235        assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
1236    }
1237
    #[test]
    fn read_features() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");

        // Feature bits 0-23 and 50-127 are specific for the device type, but
        // at the moment crosvm only supports 64 bits of feature bits.
        const DEVICE_FEATURE_BITS: u64 = 0xffffff;

        // read-write block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption::default();
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ
            assert_eq!(0x7244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-write block device, non-sparse
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                sparse: false,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable non-sparse device should set VIRTIO_BLK_F_FLUSH
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ, but not VIRTIO_BLK_F_DISCARD (the disk is not sparse).
            // 0x5244 = bits 2, 6, 9, 12, 14; VIRTIO_BLK_F_RO (bit 5) is NOT set.
            assert_eq!(0x5244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-only block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                read_only: true,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // read-only device should set VIRTIO_BLK_F_RO
            // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
            assert_eq!(0x1064, b.features() & DEVICE_FEATURE_BITS);
        }
    }
1288
1289    #[test]
1290    fn check_pci_adress_configurability() {
1291        let f = tempfile().unwrap();
1292
1293        let features = base_features(ProtectionType::Unprotected);
1294        let disk_option = DiskOption {
1295            pci_address: Some(PciAddress {
1296                bus: 0,
1297                dev: 1,
1298                func: 1,
1299            }),
1300            ..Default::default()
1301        };
1302        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1303
1304        assert_eq!(b.pci_address(), disk_option.pci_address);
1305    }
1306
1307    #[test]
1308    fn check_runtime_blk_queue_configurability() {
1309        let tempdir = TempDir::new().unwrap();
1310        let mut path = tempdir.path().to_owned();
1311        path.push("disk_image");
1312        let features = base_features(ProtectionType::Unprotected);
1313
1314        // Default case
1315        let f = File::create(&path).unwrap();
1316        let disk_option = DiskOption::default();
1317        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1318        assert_eq!(
1319            [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
1320            b.queue_max_sizes()
1321        );
1322
1323        // Single queue of size 128
1324        let f = File::create(&path).unwrap();
1325        let disk_option = DiskOption::default();
1326        let b = BlockAsync::new(
1327            features,
1328            Box::new(f),
1329            &disk_option,
1330            None,
1331            Some(128),
1332            Some(1),
1333        )
1334        .unwrap();
1335        assert_eq!([128; 1], b.queue_max_sizes());
1336        // Single queue device should not set VIRTIO_BLK_F_MQ
1337        assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
1338    }
1339
    #[test]
    fn read_last_sector() {
        // A one-sector read of the final sector of the disk should complete with
        // VIRTIO_BLK_S_OK.
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");

        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        // Buffers are laid out contiguously from 0x1000:
        // [request header][512-byte data buffer][1-byte status].
        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (1 sector of data)
                (DescriptorType::Writable, 512),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Default::default(),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        // The status byte follows the header and the 512-byte data buffer.
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);
    }
1409
    #[test]
    fn read_beyond_last_sector() {
        // A two-sector read starting at the last valid sector overlaps the end of the
        // disk, so the request must fail with VIRTIO_BLK_S_IOERR.
        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (2 sectors of data - overlap the end of the disk).
                (DescriptorType::Writable, 512 * 2),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let ex = Executor::new().expect("creating an executor failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Default::default(),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        // `process_one_request` succeeds (the chain is processed) but the status byte
        // written to guest memory reports the I/O error.
        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_IOERR);
    }
1477
1478    #[test]
1479    fn get_id() {
1480        let ex = Executor::new().expect("creating an executor failed");
1481
1482        let f = tempfile().unwrap();
1483        let disk_size = 0x1000;
1484        f.set_len(disk_size).unwrap();
1485
1486        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1487            .expect("Creating guest memory failed.");
1488
1489        let req_hdr = virtio_blk_req_header {
1490            req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
1491            reserved: Le32::from(0),
1492            sector: Le64::from(0),
1493        };
1494        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
1495            .expect("writing req failed");
1496
1497        let mut avail_desc = create_descriptor_chain(
1498            &mem,
1499            GuestAddress(0x100),  // Place descriptor chain at 0x100.
1500            GuestAddress(0x1000), // Describe buffer at 0x1000.
1501            vec![
1502                // Request header
1503                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
1504                // I/O buffer (20 bytes for serial)
1505                (DescriptorType::Writable, 20),
1506                // Request status
1507                (DescriptorType::Writable, 1),
1508            ],
1509            0,
1510        )
1511        .expect("create_descriptor_chain failed");
1512
1513        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
1514        let timer = Timer::new().expect("Failed to create a timer");
1515        let flush_timer = Rc::new(RefCell::new(
1516            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
1517        ));
1518        let flush_timer_armed = Rc::new(RefCell::new(false));
1519
1520        let id = b"a20-byteserialnumber";
1521
1522        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
1523            disk_image: Box::new(af),
1524            read_only: false,
1525            sparse: true,
1526            id: *id,
1527            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
1528                disk_size: Arc::new(AtomicU64::new(disk_size)),
1529            })),
1530        }));
1531
1532        let fut = process_one_request(
1533            &mut avail_desc,
1534            &disk_state,
1535            &flush_timer,
1536            &flush_timer_armed,
1537        );
1538
1539        ex.run_until(fut)
1540            .expect("running executor failed")
1541            .expect("execute failed");
1542
1543        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
1544        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
1545        assert_eq!(status, VIRTIO_BLK_S_OK);
1546
1547        let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
1548        let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
1549        assert_eq!(returned_id, *id);
1550    }
1551
    #[test]
    fn reset_and_reactivate_single_worker() {
        // Exercise reset/reactivate with one worker thread shared by all queues.
        reset_and_reactivate(false, None);
    }
1556
    #[test]
    fn reset_and_reactivate_multiple_workers() {
        // Exercise reset/reactivate with a dedicated worker thread per queue.
        reset_and_reactivate(true, None);
    }
1561
    #[test]
    #[cfg(windows)]
    fn reset_and_reactivate_overrlapped_io() {
        // Exercise reset/reactivate with the Windows overlapped-I/O executor.
        // NOTE(review): "overrlapped" typo kept to avoid renaming the test.
        reset_and_reactivate(
            false,
            Some(
                cros_async::sys::windows::ExecutorKindSys::Overlapped { concurrency: None }.into(),
            ),
        );
    }
1572
    /// Shared body for the `reset_and_reactivate_*` tests: activates a
    /// `BlockAsync`, asserts the runtime resources moved into the worker(s),
    /// resets the device, and verifies it can be activated a second time.
    ///
    /// `enables_multiple_workers` selects one worker per queue instead of a
    /// single shared worker; `async_executor`, if set, overrides the executor
    /// kind used by the worker thread(s).
    fn reset_and_reactivate(
        enables_multiple_workers: bool,
        async_executor: Option<cros_async::ExecutorKind>,
    ) {
        // Create an empty disk image
        let f = tempfile::NamedTempFile::new().unwrap();
        f.as_file().set_len(0x1000).unwrap();
        // Close the file so that it is possible for the disk implementation to take exclusive
        // access when opening it.
        let path: tempfile::TempPath = f.into_temp_path();

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube.
        // NOTE: We don't want to drop the vmm half of the tube. That would cause the worker thread
        // to fail immediately, which isn't what we want to test in this case.
        let (_control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            path: path.to_path_buf(),
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            async_executor,
            ..Default::default()
        };
        let disk_image = disk_option.open().unwrap();
        let mut b = BlockAsync::new(
            features,
            disk_image,
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");
        // assert resources are consumed
        if !enables_multiple_workers {
            // In single-worker mode the disk image is moved into the worker;
            // with multiple workers each worker clones its own handle instead.
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );

        // reset and assert resources are still not back (should be in the worker thread)
        assert!(b.reset().is_ok(), "reset should succeed");
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );
        // The serial number configured via `DiskOption::id` must survive reset.
        assert_eq!(b.id, *b"Block serial number\0");

        // re-activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("re-activate should succeed");
    }
1684
    /// Resize handling with a single shared worker thread.
    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }
1689
    #[test]
    fn resize_with_multiple_workers() {
        // Test resize handled by one worker affect the whole state
        resize(true);
    }
1695
    /// Shared body for the `resize_with_*` tests: sends a
    /// `DiskControlCommand::Resize` over the control tube and verifies that the
    /// device size, the underlying image, the virtio config space, and the
    /// config-changed interrupt all reflect the new size.
    fn resize(enables_multiple_workers: bool) {
        // disk image size constants
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube; the vmm half stays here so we can drive the
        // resize command, the device half goes into the BlockAsync.
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // assert the original size first
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        // Capacity (in 512-byte sectors) lives at offset 0 of the virtio-blk
        // config space as a little-endian u64.
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

        // assert resize works
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
        // Wait until the blk signals the interrupt
        interrupt
            .get_interrupt_evt()
            .wait()
            .expect("interrupt should be signaled");

        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }
1804
1805    #[test]
1806    fn run_worker_threads() {
1807        // Create an empty duplicable disk image
1808        let f = tempfile().unwrap();
1809        f.set_len(0x1000).unwrap();
1810        let disk_image: Box<dyn DiskFile> = Box::new(f);
1811
1812        // Create an empty guest memory
1813        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1814            .expect("Creating guest memory failed.");
1815
1816        // Create a BlockAsync to test with single worker thread
1817        let features = base_features(ProtectionType::Unprotected);
1818        let disk_option = DiskOption::default();
1819        let mut b = BlockAsync::new(
1820            features,
1821            disk_image.try_clone().unwrap(),
1822            &disk_option,
1823            None,
1824            None,
1825            None,
1826        )
1827        .unwrap();
1828
1829        // activate with queues of an arbitrary size.
1830        let interrupt = Interrupt::new_for_test();
1831        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1832        q0.set_ready(true);
1833        let q0 = q0
1834            .activate(&mem, Event::new().unwrap(), interrupt.clone())
1835            .expect("QueueConfig::activate");
1836
1837        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1838        q1.set_ready(true);
1839        let q1 = q1
1840            .activate(&mem, Event::new().unwrap(), interrupt.clone())
1841            .expect("QueueConfig::activate");
1842
1843        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
1844            .expect("activate should succeed");
1845
1846        assert_eq!(b.worker_threads.len(), 1, "1 threads should be spawned.");
1847        drop(b);
1848
1849        // Create a BlockAsync to test with multiple worker threads
1850        let features = base_features(ProtectionType::Unprotected);
1851        let disk_option = DiskOption {
1852            read_only: true,
1853            sparse: false,
1854            multiple_workers: true,
1855            ..DiskOption::default()
1856        };
1857        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();
1858
1859        // activate should succeed
1860        let interrupt = Interrupt::new_for_test();
1861        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1862        q0.set_ready(true);
1863        let q0 = q0
1864            .activate(&mem, Event::new().unwrap(), interrupt.clone())
1865            .expect("QueueConfig::activate");
1866
1867        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1868        q1.set_ready(true);
1869        let q1 = q1
1870            .activate(&mem, Event::new().unwrap(), interrupt.clone())
1871            .expect("QueueConfig::activate");
1872
1873        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
1874            .expect("activate should succeed");
1875
1876        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
1877    }
1878
1879    struct BlockContext {}
1880
1881    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
1882        b.avail_features = !b.avail_features;
1883    }
1884
1885    fn create_device() -> (BlockContext, BlockAsync) {
1886        // Create an empty disk image
1887        let f = tempfile().unwrap();
1888        f.set_len(0x1000).unwrap();
1889        let disk_image: Box<dyn DiskFile> = Box::new(f);
1890
1891        // Create a BlockAsync to test
1892        let features = base_features(ProtectionType::Unprotected);
1893        let id = b"Block serial number\0";
1894        let disk_option = DiskOption {
1895            read_only: true,
1896            id: Some(*id),
1897            sparse: false,
1898            multiple_workers: true,
1899            ..Default::default()
1900        };
1901        (
1902            BlockContext {},
1903            BlockAsync::new(
1904                features,
1905                disk_image.try_clone().unwrap(),
1906                &disk_option,
1907                None,
1908                None,
1909                None,
1910            )
1911            .unwrap(),
1912        )
1913    }
1914
    // Generates the shared suspend/resume test suite for this device
    // (Linux/Android only), named `asyncblock`, built via `create_device` and
    // perturbed via `modify_device`. NOTE(review): the meaning of the `2`
    // argument is defined by the macro — see `suspendable_virtio_tests!`.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
1917}