1use std::cell::RefCell;
6use std::collections::BTreeMap;
7use std::collections::BTreeSet;
8use std::io;
9use std::io::Write;
10use std::mem::size_of;
11#[cfg(windows)]
12use std::num::NonZeroU32;
13use std::rc::Rc;
14use std::result;
15use std::sync::atomic::AtomicU64;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Context;
21use base::debug;
22use base::error;
23use base::info;
24use base::warn;
25use base::AsRawDescriptor;
26use base::Error as SysError;
27use base::Event;
28use base::RawDescriptor;
29use base::Result as SysResult;
30use base::Timer;
31use base::Tube;
32use base::TubeError;
33use base::WorkerThread;
34use cros_async::sync::RwLock as AsyncRwLock;
35use cros_async::AsyncError;
36use cros_async::AsyncTube;
37use cros_async::EventAsync;
38use cros_async::Executor;
39use cros_async::ExecutorKind;
40use cros_async::TimerAsync;
41use data_model::Le16;
42use data_model::Le32;
43use data_model::Le64;
44use disk::AsyncDisk;
45use disk::DiskFile;
46use futures::channel::mpsc;
47use futures::channel::oneshot;
48use futures::pin_mut;
49use futures::stream::FuturesUnordered;
50use futures::stream::StreamExt;
51use futures::FutureExt;
52use remain::sorted;
53use snapshot::AnySnapshot;
54use thiserror::Error as ThisError;
55use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
56use vm_control::DiskControlCommand;
57use vm_control::DiskControlResult;
58use vm_memory::GuestMemory;
59use zerocopy::IntoBytes;
60
61use crate::virtio::async_utils;
62use crate::virtio::block::sys::*;
63use crate::virtio::block::DiskOption;
64use crate::virtio::copy_config;
65use crate::virtio::device_constants::block::virtio_blk_config;
66use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
67use crate::virtio::device_constants::block::virtio_blk_req_header;
68use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
69use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
70use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
71use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
72use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
73use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
74use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
75use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
76use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
77use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
78use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
79use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
80use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
81use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
82use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
83use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
84use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
85use crate::virtio::DescriptorChain;
86use crate::virtio::DeviceType;
87use crate::virtio::Interrupt;
88use crate::virtio::Queue;
89use crate::virtio::Reader;
90use crate::virtio::VirtioDevice;
91use crate::virtio::Writer;
92use crate::PciAddress;
93
// Default virtqueue geometry; both may be overridden via DiskOption.
const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

// A virtio-blk sector is always 512 bytes, independent of the disk's logical
// block size.
const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// Limits advertised to the guest in the config space for discard and
// write-zeroes requests.
const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;
108
109#[sorted]
110#[derive(ThisError, Debug)]
111enum ExecuteError {
112 #[error("failed to copy ID string: {0}")]
113 CopyId(io::Error),
114 #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
115 DiscardWriteZeroes {
116 ioerr: Option<disk::Error>,
117 sector: u64,
118 num_sectors: u32,
119 flags: u32,
120 },
121 #[error("failed to flush: {0}")]
122 Flush(disk::Error),
123 #[error("not enough space in descriptor chain to write status")]
124 MissingStatus,
125 #[error("out of range")]
126 OutOfRange,
127 #[error("failed to read message: {0}")]
128 Read(io::Error),
129 #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
130 ReadIo {
131 length: usize,
132 sector: u64,
133 desc_error: disk::Error,
134 },
135 #[error("read only; request_type={request_type}")]
136 ReadOnly { request_type: u32 },
137 #[error("failed to recieve command message: {0}")]
138 ReceivingCommand(TubeError),
139 #[error("failed to send command response: {0}")]
140 SendingResponse(TubeError),
141 #[error("couldn't reset the timer: {0}")]
142 TimerReset(base::Error),
143 #[error("too many segments: {0} > {0}")]
144 TooManySegments(usize, usize),
145 #[error("unsupported ({0})")]
146 Unsupported(u32),
147 #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
148 WriteIo {
149 length: usize,
150 sector: u64,
151 desc_error: disk::Error,
152 },
153 #[error("failed to write request status: {0}")]
154 WriteStatus(io::Error),
155}
156
/// Severity used when logging an `ExecuteError`; guest-triggerable I/O
/// failures are logged at debug level to avoid log spam.
enum LogLevel {
    Debug,
    Error,
}
161
162impl ExecuteError {
163 fn status(&self) -> u8 {
164 match self {
165 ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
166 ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
167 ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
168 ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
169 ExecuteError::OutOfRange => VIRTIO_BLK_S_IOERR,
170 ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
171 ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
172 ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
173 ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
174 ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
175 ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
176 ExecuteError::TooManySegments(_, _) => VIRTIO_BLK_S_IOERR,
177 ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
178 ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
179 ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
180 }
181 }
182
183 fn log_level(&self) -> LogLevel {
184 match self {
185 ExecuteError::ReadIo { .. }
187 | ExecuteError::WriteIo { .. }
188 | ExecuteError::Flush { .. }
189 | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
190 _ => LogLevel::Error,
192 }
193 }
194}
195
/// Errors from the background disk-flush task.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    #[error("failed to fdatasync the disk: {0}")]
    FdatasyncDisk(disk::Error),
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
}
206
/// Size, in bytes, of the fixed device ID buffer.
const ID_LEN: usize = 20;

/// Fixed-size buffer holding the device ID string returned to the guest by a
/// VIRTIO_BLK_T_GET_ID request.
type BlockId = [u8; ID_LEN];
214
/// Per-worker state describing the disk image being served.
struct DiskState {
    // Async handle to the disk image backing this device.
    disk_image: Box<dyn AsyncDisk>,
    // When true, write-type requests are rejected.
    read_only: bool,
    // When false, discard requests are silently ignored and resizes
    // re-allocate the full image.
    sparse: bool,
    // Device ID string reported via VIRTIO_BLK_T_GET_ID.
    id: BlockId,
    // State shared between all workers of this device.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}
225
/// State shared by all worker threads of a single block device.
struct WorkerSharedState {
    // Current disk size in bytes; updated on resize and read when the guest
    // reads the config space.
    disk_size: Arc<AtomicU64>,
}
230
231async fn process_one_request(
232 avail_desc: &mut DescriptorChain,
233 disk_state: &AsyncRwLock<DiskState>,
234 flush_timer: &RefCell<TimerAsync<Timer>>,
235 flush_timer_armed: &RefCell<bool>,
236) -> result::Result<usize, ExecuteError> {
237 let reader = &mut avail_desc.reader;
238 let writer = &mut avail_desc.writer;
239
240 let available_bytes = writer.available_bytes();
244 let status_offset = available_bytes
245 .checked_sub(1)
246 .ok_or(ExecuteError::MissingStatus)?;
247 let mut status_writer = writer.split_at(status_offset);
248
249 let status = match BlockAsync::execute_request(
250 reader,
251 writer,
252 disk_state,
253 flush_timer,
254 flush_timer_armed,
255 )
256 .await
257 {
258 Ok(()) => VIRTIO_BLK_S_OK,
259 Err(e) => {
260 match e.log_level() {
261 LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
262 LogLevel::Error => error!("failed executing disk request: {:#}", e),
263 }
264 e.status()
265 }
266 };
267
268 status_writer
269 .write_all(&[status])
270 .map_err(ExecuteError::WriteStatus)?;
271 Ok(available_bytes)
272}
273
274async fn process_one_chain(
276 queue: &RefCell<Queue>,
277 mut avail_desc: DescriptorChain,
278 disk_state: &AsyncRwLock<DiskState>,
279 flush_timer: &RefCell<TimerAsync<Timer>>,
280 flush_timer_armed: &RefCell<bool>,
281) {
282 let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
283 .await
284 {
285 Ok(len) => len,
286 Err(e) => {
287 error!("block: failed to handle request: {:#}", e);
288 0
289 }
290 };
291
292 let mut queue = queue.borrow_mut();
293 queue.add_used_with_bytes_written(avail_desc, len as u32);
294 queue.trigger_interrupt();
295}
296
/// Services one virtqueue: waits for kick events, pops descriptor chains, and
/// runs each request as a background task so multiple requests can be in
/// flight concurrently. Returns the `Queue` once `stop_rx` fires, after all
/// in-flight requests have completed.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        futures::select! {
            // Keep driving in-flight requests to completion.
            _ = background_tasks.next() => continue,
            res = evt_future => {
                // Queue up the next event read before handling this one.
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Drain all pending requests before handing the queue back.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        // Pop every available descriptor chain and process it concurrently.
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}
346
347async fn handle_command_tube(
348 command_tube: &Option<AsyncTube>,
349 interrupt: &RefCell<Option<Interrupt>>,
350 disk_state: Rc<AsyncRwLock<DiskState>>,
351) -> Result<(), ExecuteError> {
352 let command_tube = match command_tube {
353 Some(c) => c,
354 None => {
355 futures::future::pending::<()>().await;
356 return Ok(());
357 }
358 };
359 loop {
360 match command_tube.next().await {
361 Ok(command) => {
362 let resp = match command {
363 DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
364 };
365
366 let resp_clone = resp.clone();
367 command_tube
368 .send(resp_clone)
369 .await
370 .map_err(ExecuteError::SendingResponse)?;
371 if let DiskControlResult::Ok = resp {
372 if let Some(interrupt) = &*interrupt.borrow() {
373 interrupt.signal_config_changed();
374 }
375 }
376 }
377 Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
378 }
379 }
380}
381
382async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
383 let disk_state = disk_state.lock().await;
386 let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
388 let worker_shared_state = worker_shared_state.lock().await;
389
390 if disk_state.read_only {
391 error!("Attempted to resize read-only block device");
392 return DiskControlResult::Err(SysError::new(libc::EROFS));
393 }
394
395 info!("Resizing block device to {} bytes", new_size);
396
397 if let Err(e) = disk_state.disk_image.set_len(new_size) {
398 error!("Resizing disk failed! {:#}", e);
399 return DiskControlResult::Err(SysError::new(libc::EIO));
400 }
401
402 if !disk_state.sparse {
404 if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
405 error!("Allocating disk space after resize failed! {:#}", e);
406 return DiskControlResult::Err(SysError::new(libc::EIO));
407 }
408 }
409
410 if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
411 worker_shared_state
412 .disk_size
413 .store(new_disk_size, Ordering::Release);
414 }
415 DiskControlResult::Ok
416}
417
418async fn flush_disk(
420 disk_state: Rc<AsyncRwLock<DiskState>>,
421 timer: TimerAsync<Timer>,
422 armed: Rc<RefCell<bool>>,
423) -> Result<(), ControlError> {
424 loop {
425 timer.wait().await.map_err(ControlError::FlushTimer)?;
426 if !*armed.borrow() {
427 continue;
428 }
429
430 *armed.borrow_mut() = false;
433
434 disk_state
435 .read_lock()
436 .await
437 .disk_image
438 .fdatasync()
439 .await
440 .map_err(ControlError::FdatasyncDisk)?;
441 }
442}
443
/// Commands sent from the device to its worker task(s).
enum WorkerCmd {
    // Begin servicing `queue` as queue number `index`.
    StartQueue {
        index: usize,
        queue: Queue,
    },
    // Stop servicing queue `index`; replies with the `Queue`, or `None` if
    // that queue was not running.
    StopQueue {
        index: usize,
        response_tx: oneshot::Sender<Option<Queue>>,
    },
    // Drop all queue handlers without returning the queues; replies once
    // everything has been torn down.
    AbortQueues {
        response_tx: oneshot::Sender<()>,
    },
}
462
/// Main loop of a block worker: multiplexes queue handlers, the periodic
/// flush task, the resize control tube, and the kill event, while reacting to
/// `WorkerCmd`s arriving on `worker_rx`.
///
/// Returns when the kill event fires or when one of the long-lived tasks
/// fails; queue start/stop commands never terminate the loop by themselves.
async fn run_worker(
    ex: &Executor,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Interrupt used to signal resize config changes; set from the first
    // started queue and cleared when no queues remain.
    let control_interrupt = RefCell::new(None);
    let control = handle_command_tube(control_tube, &control_interrupt, disk_state.clone()).fuse();
    pin_mut!(control);

    // One clone of the timer is used by the request path to arm flushes...
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // ...and the other is waited on by the background flush task.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Running queue handlers, plus, for each, a closure that stops it and
    // yields its remote handle (which resolves to the Queue).
    let mut queue_handlers = FuturesUnordered::new();
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue}) => {
                        // Adopt the first queue's interrupt for resize
                        // config-change notifications.
                        if control_interrupt.borrow().is_none() {
                            *control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
                        }

                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            remote_handle
                        });

                        // If this index was already running, stop the old
                        // handler first, draining other handlers meanwhile.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // Keep polling the remaining handlers while
                                // waiting for the stopped one to return its
                                // Queue.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };

                                // Last queue gone: drop the interrupt so it
                                // can be replaced on the next start.
                                if queue_handlers.is_empty() {
                                    *control_interrupt.borrow_mut() = None;
                                }

                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }

                    }
                    Some(WorkerCmd::AbortQueues{response_tx}) => {
                        // Drop everything without retrieving the queues.
                        queue_handlers.clear();
                        queue_handler_stop_fns.clear();

                        *control_interrupt.borrow_mut() = None;

                        let _ = response_tx.send(());
                    }
                }
            }
        };
    }
}
601
/// Asynchronous virtio block device.
pub struct BlockAsync {
    // Boot priority for the fw_cfg bootorder file, if this disk is bootable.
    boot_index: Option<usize>,
    // Disk image; taken by the worker when it starts (unless one worker per
    // queue is used, in which case it is cloned).
    disk_image: Option<Box<dyn DiskFile>>,
    // Current disk size in bytes, shared with the workers.
    disk_size: Arc<AtomicU64>,
    // Feature bits offered to the guest.
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    // Maximum number of data segments per request, derived from queue size.
    seg_max: u32,
    block_size: u32,
    // Device ID string returned by VIRTIO_BLK_T_GET_ID.
    id: BlockId,
    // Tube for resize commands; taken by the worker when it starts.
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // Running workers keyed by queue index (always key 0 when a single
    // worker is shared by all queues).
    worker_threads: BTreeMap<usize, (WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)>,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // When true, each queue gets a dedicated worker thread.
    worker_per_queue: bool,
    // Indices of queues that are currently started.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}
638
639impl BlockAsync {
640 pub fn new(
642 base_features: u64,
643 disk_image: Box<dyn DiskFile>,
644 disk_option: &DiskOption,
645 control_tube: Option<Tube>,
646 queue_size: Option<u16>,
647 num_queues: Option<u16>,
648 ) -> SysResult<BlockAsync> {
649 let read_only = disk_option.read_only;
650 let sparse = disk_option.sparse;
651 let block_size = disk_option.block_size;
652 let packed_queue = disk_option.packed_queue;
653 let id = disk_option.id.unwrap_or_default();
654 let mut worker_per_queue = disk_option.multiple_workers;
655 if worker_per_queue && disk_image.try_clone().is_err() {
657 base::warn!("multiple workers requested, but not supported by disk image type");
658 worker_per_queue = false;
659 }
660 let executor_kind = disk_option.async_executor.unwrap_or_default();
661 let boot_index = disk_option.bootindex;
662 #[cfg(windows)]
663 let io_concurrency = disk_option.io_concurrency.get();
664
665 if block_size % SECTOR_SIZE as u32 != 0 {
666 error!(
667 "Block size {} is not a multiple of {}.",
668 block_size, SECTOR_SIZE,
669 );
670 return Err(SysError::new(libc::EINVAL));
671 }
672 let disk_size = disk_image.get_len()?;
673 if disk_size % block_size as u64 != 0 {
674 warn!(
675 "Disk size {} is not a multiple of block size {}; \
676 the remainder will not be visible to the guest.",
677 disk_size, block_size,
678 );
679 }
680 let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
681 let multi_queue = match num_queues {
682 0 => panic!("Number of queues cannot be zero for a block device"),
683 1 => false,
684 _ => true,
685 };
686 let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
687 if !q_size.is_power_of_two() {
688 error!("queue size {} is not a power of 2.", q_size);
689 return Err(SysError::new(libc::EINVAL));
690 }
691 let queue_sizes = vec![q_size; num_queues as usize];
692
693 let avail_features =
694 Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);
695
696 let seg_max = get_seg_max(q_size);
697
698 let disk_size = Arc::new(AtomicU64::new(disk_size));
699 let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
700 disk_size: disk_size.clone(),
701 }));
702
703 Ok(BlockAsync {
704 disk_image: Some(disk_image),
705 disk_size,
706 avail_features,
707 read_only,
708 sparse,
709 seg_max,
710 block_size,
711 id,
712 queue_sizes,
713 worker_threads: BTreeMap::new(),
714 shared_state,
715 worker_per_queue,
716 control_tube,
717 executor_kind,
718 activated_queues: BTreeSet::new(),
719 boot_index,
720 #[cfg(windows)]
721 io_concurrency,
722 pci_address: disk_option.pci_address,
723 })
724 }
725
726 fn build_avail_features(
728 base_features: u64,
729 read_only: bool,
730 sparse: bool,
731 multi_queue: bool,
732 packed_queue: bool,
733 ) -> u64 {
734 let mut avail_features = base_features;
735 if read_only {
736 avail_features |= 1 << VIRTIO_BLK_F_RO;
737 } else {
738 if sparse {
739 avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
740 }
741 avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
742 avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
743 }
744 avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
745 avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
746 if multi_queue {
747 avail_features |= 1 << VIRTIO_BLK_F_MQ;
748 }
749 if packed_queue {
750 avail_features |= 1 << VIRTIO_F_RING_PACKED;
751 }
752 avail_features
753 }
754
    /// Executes a single virtio-block request against the disk.
    ///
    /// Reads the request header from `reader`, dispatches on the request
    /// type, and performs the corresponding disk I/O. The caller writes the
    /// status byte back to the guest based on the returned result.
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Lock order: disk state first, then the shared worker state.
        let disk_state = disk_state.read_lock().await;
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        // Only reads and GET_ID are permitted on a read-only disk.
        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        // Checks that [io_start, io_start + io_length) fits within the disk,
        // rejecting arithmetic overflow.
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                // NOTE(review): checked_shl only rejects shift amounts >= 64,
                // so a huge sector value can wrap silently here; check_range
                // still bounds the resulting offset against disk_size.
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                // Arm the delayed-flush timer after a successful write.
                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset_oneshot(flush_delay)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                // Discard is advisory; silently succeed on non-sparse disks.
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    return Ok(());
                }

                let seg_count =
                    reader.available_bytes() / size_of::<virtio_blk_discard_write_zeroes>();
                let seg_max = if req_type == VIRTIO_BLK_T_DISCARD {
                    MAX_DISCARD_SEG as usize
                } else {
                    MAX_WRITE_ZEROES_SEG as usize
                };
                if seg_count > seg_max {
                    return Err(ExecuteError::TooManySegments(seg_count, seg_max));
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    // Only write-zeroes may carry the UNMAP flag.
                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Discard failures are deliberately ignored: the hint
                        // is best-effort.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            VIRTIO_BLK_T_FLUSH => {
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                // An explicit flush makes the pending timed flush redundant.
                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            VIRTIO_BLK_T_GET_ID => {
                writer
                    .write_all(&disk_state.id)
                    .map_err(ExecuteError::CopyId)?;
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }
939
    /// Builds the virtio-blk configuration space exposed to the guest.
    /// `capacity` is expressed in 512-byte sectors.
    fn build_config_space(
        disk_size: u64,
        seg_max: u32,
        block_size: u32,
        num_queues: u16,
    ) -> virtio_blk_config {
        virtio_blk_config {
            // If the disk size is not a multiple of the sector size, the
            // remainder is not exposed to the guest.
            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
            seg_max: Le32::from(seg_max),
            blk_size: Le32::from(block_size),
            num_queues: Le16::from(num_queues),
            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
            write_zeroes_may_unmap: 1,
            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
            ..Default::default()
        }
    }
962
    /// Returns the worker responsible for queue `idx`, spawning it first if
    /// it is not already running. With `worker_per_queue`, each queue index
    /// gets its own worker (and a clone of the disk image); otherwise all
    /// queues share worker 0, which takes ownership of the disk image.
    #[allow(clippy::map_entry)]
    fn start_worker(
        &mut self,
        idx: usize,
    ) -> anyhow::Result<&(WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)> {
        let key = if self.worker_per_queue { idx } else { 0 };
        if self.worker_threads.contains_key(&key) {
            return Ok(self.worker_threads.get(&key).unwrap());
        }

        let ex = self.create_executor();
        // The control tube moves into the first worker that starts.
        let control_tube = self.control_tube.take();
        let disk_image = if self.worker_per_queue {
            self.disk_image
                .as_ref()
                .context("Failed to ref a disk image")?
                .try_clone()
                .context("Failed to clone a disk image")?
        } else {
            self.disk_image
                .take()
                .context("Failed to take a disk image")?
        };
        let read_only = self.read_only;
        let sparse = self.sparse;
        let id = self.id;
        let worker_shared_state = self.shared_state.clone();

        let (worker_tx, worker_rx) = mpsc::unbounded();
        let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
            let async_control =
                control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

            let async_image = match disk_image.to_async_disk(&ex) {
                Ok(d) => d,
                Err(e) => panic!("Failed to create async disk {e:#}"),
            };

            let disk_state = Rc::new(AsyncRwLock::new(DiskState {
                disk_image: async_image,
                read_only,
                sparse,
                id,
                worker_shared_state,
            }));

            if let Err(err_string) = ex
                .run_until(async {
                    let r = run_worker(&ex, &disk_state, &async_control, worker_rx, kill_evt).await;
                    // Flush any buffered data before the worker exits.
                    if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                        error!("failed to flush disk image when stopping worker: {e:?}");
                    }
                    r
                })
                .expect("run_until failed")
            {
                error!("{:#}", err_string);
            }
        });
        match self.worker_threads.entry(key) {
            // Unreachable: contains_key(&key) returned false above.
            std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
            std::collections::btree_map::Entry::Vacant(e) => {
                Ok(e.insert((worker_thread, worker_tx)))
            }
        }
    }
1032
1033 pub fn start_queue(
1034 &mut self,
1035 idx: usize,
1036 queue: Queue,
1037 _mem: GuestMemory,
1038 ) -> anyhow::Result<()> {
1039 let (_, worker_tx) = self.start_worker(idx)?;
1040 worker_tx
1041 .unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
1042 .expect("worker channel closed early");
1043 self.activated_queues.insert(idx);
1044 Ok(())
1045 }
1046
1047 pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
1048 let (_, worker_tx) = self
1051 .worker_threads
1052 .get(if self.worker_per_queue { &idx } else { &0 })
1053 .context("worker not found")?;
1054 let (response_tx, response_rx) = oneshot::channel();
1055 worker_tx
1056 .unbounded_send(WorkerCmd::StopQueue {
1057 index: idx,
1058 response_tx,
1059 })
1060 .expect("worker channel closed early");
1061 let queue = cros_async::block_on(async {
1062 response_rx
1063 .await
1064 .expect("response_rx closed early")
1065 .context("queue not found")
1066 })?;
1067 self.activated_queues.remove(&idx);
1068 Ok(queue)
1069 }
1070}
1071
1072impl VirtioDevice for BlockAsync {
1073 fn keep_rds(&self) -> Vec<RawDescriptor> {
1074 let mut keep_rds = Vec::new();
1075
1076 if let Some(disk_image) = &self.disk_image {
1077 keep_rds.extend(disk_image.as_raw_descriptors());
1078 }
1079
1080 if let Some(control_tube) = &self.control_tube {
1081 keep_rds.push(control_tube.as_raw_descriptor());
1082 }
1083
1084 keep_rds
1085 }
1086
    /// Feature bits offered to the guest driver.
    fn features(&self) -> u64 {
        self.avail_features
    }
1090
    /// This device is a virtio block device.
    fn device_type(&self) -> DeviceType {
        DeviceType::Block
    }
1094
    /// Maximum size (in descriptors) of each virtqueue.
    fn queue_max_sizes(&self) -> &[u16] {
        &self.queue_sizes
    }
1098
1099 fn read_config(&self, offset: u64, data: &mut [u8]) {
1100 let config_space = {
1101 let disk_size = self.disk_size.load(Ordering::Acquire);
1102 Self::build_config_space(
1103 disk_size,
1104 self.seg_max,
1105 self.block_size,
1106 self.queue_sizes.len() as u16,
1107 )
1108 };
1109 copy_config(data, 0, config_space.as_bytes(), offset);
1110 }
1111
1112 fn activate(
1113 &mut self,
1114 mem: GuestMemory,
1115 _interrupt: Interrupt,
1116 queues: BTreeMap<usize, Queue>,
1117 ) -> anyhow::Result<()> {
1118 for (i, q) in queues {
1119 self.start_queue(i, q, mem.clone())?;
1120 }
1121 Ok(())
1122 }
1123
1124 fn reset(&mut self) -> anyhow::Result<()> {
1125 for (_, (_, worker_tx)) in self.worker_threads.iter_mut() {
1126 let (response_tx, response_rx) = oneshot::channel();
1127 worker_tx
1128 .unbounded_send(WorkerCmd::AbortQueues { response_tx })
1129 .expect("worker channel closed early");
1130 cros_async::block_on(async { response_rx.await.expect("response_rx closed early") });
1131 }
1132 self.activated_queues.clear();
1133 Ok(())
1134 }
1135
1136 fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
1137 let mut queues = BTreeMap::new();
1139 for index in self.activated_queues.clone() {
1140 queues.insert(index, self.stop_queue(index)?);
1141 }
1142 if queues.is_empty() {
1143 return Ok(None); }
1145 Ok(Some(queues))
1146 }
1147
1148 fn virtio_wake(
1149 &mut self,
1150 queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
1151 ) -> anyhow::Result<()> {
1152 if let Some((mem, _interrupt, queues)) = queues_state {
1153 for (i, q) in queues {
1154 self.start_queue(i, q, mem.clone())?
1155 }
1156 }
1157 Ok(())
1158 }
1159
    /// No device-specific state is snapshotted; the payload is empty.
    fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
        AnySnapshot::to_any(())
    }
1165
    /// Accepts only the empty payload produced by `virtio_snapshot`.
    fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
        let () = AnySnapshot::from_any(data)?;
        Ok(())
    }
1170
    /// Requested PCI address for this device, if one was configured.
    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }
1174
1175 fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
1176 self.boot_index
1177 .map(|s| (format!("scsi@{pci_slot}/disk@0,0").as_bytes().to_vec(), s))
1178 }
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183 use std::fs::File;
1184 use std::mem::size_of_val;
1185 use std::sync::atomic::AtomicU64;
1186
1187 use data_model::Le32;
1188 use data_model::Le64;
1189 use disk::SingleFileDisk;
1190 use hypervisor::ProtectionType;
1191 use tempfile::tempfile;
1192 use tempfile::TempDir;
1193 use vm_memory::GuestAddress;
1194
1195 use super::*;
1196 use crate::suspendable_virtio_tests;
1197 use crate::virtio::base_features;
1198 use crate::virtio::descriptor_utils::create_descriptor_chain;
1199 use crate::virtio::descriptor_utils::DescriptorType;
1200 use crate::virtio::QueueConfig;
1201
    #[test]
    fn read_size() {
        // A 0x1000-byte disk is 8 sectors of 512 bytes.
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut num_sectors = [0u8; 4];
        b.read_config(0, &mut num_sectors);
        // Low 32 bits of `capacity` (in sectors).
        assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
        let mut msw_sectors = [0u8; 4];
        b.read_config(4, &mut msw_sectors);
        // High 32 bits of `capacity`.
        assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
    }
1219
1220 #[test]
1221 fn read_block_size() {
1222 let f = tempfile().unwrap();
1223 f.set_len(0x1000).unwrap();
1224
1225 let features = base_features(ProtectionType::Unprotected);
1226 let disk_option = DiskOption {
1227 block_size: 4096,
1228 sparse: false,
1229 ..Default::default()
1230 };
1231 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1232 let mut blk_size = [0u8; 4];
1233 b.read_config(20, &mut blk_size);
1234 assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
1236 }
1237
1238 #[test]
1239 fn read_features() {
1240 let tempdir = TempDir::new().unwrap();
1241 let mut path = tempdir.path().to_owned();
1242 path.push("disk_image");
1243
1244 const DEVICE_FEATURE_BITS: u64 = 0xffffff;
1247
1248 {
1250 let f = File::create(&path).unwrap();
1251 let features = base_features(ProtectionType::Unprotected);
1252 let disk_option = DiskOption::default();
1253 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1254 assert_eq!(0x7244, b.features() & DEVICE_FEATURE_BITS);
1258 }
1259
1260 {
1262 let f = File::create(&path).unwrap();
1263 let features = base_features(ProtectionType::Unprotected);
1264 let disk_option = DiskOption {
1265 sparse: false,
1266 ..Default::default()
1267 };
1268 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1269 assert_eq!(0x5244, b.features() & DEVICE_FEATURE_BITS);
1272 }
1273
1274 {
1276 let f = File::create(&path).unwrap();
1277 let features = base_features(ProtectionType::Unprotected);
1278 let disk_option = DiskOption {
1279 read_only: true,
1280 ..Default::default()
1281 };
1282 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1283 assert_eq!(0x1064, b.features() & DEVICE_FEATURE_BITS);
1286 }
1287 }
1288
1289 #[test]
1290 fn check_pci_adress_configurability() {
1291 let f = tempfile().unwrap();
1292
1293 let features = base_features(ProtectionType::Unprotected);
1294 let disk_option = DiskOption {
1295 pci_address: Some(PciAddress {
1296 bus: 0,
1297 dev: 1,
1298 func: 1,
1299 }),
1300 ..Default::default()
1301 };
1302 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1303
1304 assert_eq!(b.pci_address(), disk_option.pci_address);
1305 }
1306
1307 #[test]
1308 fn check_runtime_blk_queue_configurability() {
1309 let tempdir = TempDir::new().unwrap();
1310 let mut path = tempdir.path().to_owned();
1311 path.push("disk_image");
1312 let features = base_features(ProtectionType::Unprotected);
1313
1314 let f = File::create(&path).unwrap();
1316 let disk_option = DiskOption::default();
1317 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1318 assert_eq!(
1319 [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
1320 b.queue_max_sizes()
1321 );
1322
1323 let f = File::create(&path).unwrap();
1325 let disk_option = DiskOption::default();
1326 let b = BlockAsync::new(
1327 features,
1328 Box::new(f),
1329 &disk_option,
1330 None,
1331 Some(128),
1332 Some(1),
1333 )
1334 .unwrap();
1335 assert_eq!([128; 1], b.queue_max_sizes());
1336 assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
1338 }
1339
1340 #[test]
1341 fn read_last_sector() {
1342 let ex = Executor::new().expect("creating an executor failed");
1343
1344 let f = tempfile().unwrap();
1345 let disk_size = 0x1000;
1346 f.set_len(disk_size).unwrap();
1347 let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
1348
1349 let mem = Rc::new(
1350 GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1351 .expect("Creating guest memory failed."),
1352 );
1353
1354 let req_hdr = virtio_blk_req_header {
1355 req_type: Le32::from(VIRTIO_BLK_T_IN),
1356 reserved: Le32::from(0),
1357 sector: Le64::from(7), };
1359 mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
1360 .expect("writing req failed");
1361
1362 let mut avail_desc = create_descriptor_chain(
1363 &mem,
1364 GuestAddress(0x100), GuestAddress(0x1000), vec![
1367 (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
1369 (DescriptorType::Writable, 512),
1371 (DescriptorType::Writable, 1),
1373 ],
1374 0,
1375 )
1376 .expect("create_descriptor_chain failed");
1377
1378 let timer = Timer::new().expect("Failed to create a timer");
1379 let flush_timer = Rc::new(RefCell::new(
1380 TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
1381 ));
1382 let flush_timer_armed = Rc::new(RefCell::new(false));
1383
1384 let disk_state = Rc::new(AsyncRwLock::new(DiskState {
1385 disk_image: Box::new(af),
1386 read_only: false,
1387 sparse: true,
1388 id: Default::default(),
1389 worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
1390 disk_size: Arc::new(AtomicU64::new(disk_size)),
1391 })),
1392 }));
1393
1394 let fut = process_one_request(
1395 &mut avail_desc,
1396 &disk_state,
1397 &flush_timer,
1398 &flush_timer_armed,
1399 );
1400
1401 ex.run_until(fut)
1402 .expect("running executor failed")
1403 .expect("execute failed");
1404
1405 let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
1406 let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
1407 assert_eq!(status, VIRTIO_BLK_S_OK);
1408 }
1409
1410 #[test]
1411 fn read_beyond_last_sector() {
1412 let f = tempfile().unwrap();
1413 let disk_size = 0x1000;
1414 f.set_len(disk_size).unwrap();
1415 let mem = Rc::new(
1416 GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1417 .expect("Creating guest memory failed."),
1418 );
1419
1420 let req_hdr = virtio_blk_req_header {
1421 req_type: Le32::from(VIRTIO_BLK_T_IN),
1422 reserved: Le32::from(0),
1423 sector: Le64::from(7), };
1425 mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
1426 .expect("writing req failed");
1427
1428 let mut avail_desc = create_descriptor_chain(
1429 &mem,
1430 GuestAddress(0x100), GuestAddress(0x1000), vec![
1433 (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
1435 (DescriptorType::Writable, 512 * 2),
1437 (DescriptorType::Writable, 1),
1439 ],
1440 0,
1441 )
1442 .expect("create_descriptor_chain failed");
1443
1444 let ex = Executor::new().expect("creating an executor failed");
1445
1446 let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
1447 let timer = Timer::new().expect("Failed to create a timer");
1448 let flush_timer = Rc::new(RefCell::new(
1449 TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
1450 ));
1451 let flush_timer_armed = Rc::new(RefCell::new(false));
1452 let disk_state = Rc::new(AsyncRwLock::new(DiskState {
1453 disk_image: Box::new(af),
1454 read_only: false,
1455 sparse: true,
1456 id: Default::default(),
1457 worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
1458 disk_size: Arc::new(AtomicU64::new(disk_size)),
1459 })),
1460 }));
1461
1462 let fut = process_one_request(
1463 &mut avail_desc,
1464 &disk_state,
1465 &flush_timer,
1466 &flush_timer_armed,
1467 );
1468
1469 ex.run_until(fut)
1470 .expect("running executor failed")
1471 .expect("execute failed");
1472
1473 let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
1474 let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
1475 assert_eq!(status, VIRTIO_BLK_S_IOERR);
1476 }
1477
1478 #[test]
1479 fn get_id() {
1480 let ex = Executor::new().expect("creating an executor failed");
1481
1482 let f = tempfile().unwrap();
1483 let disk_size = 0x1000;
1484 f.set_len(disk_size).unwrap();
1485
1486 let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1487 .expect("Creating guest memory failed.");
1488
1489 let req_hdr = virtio_blk_req_header {
1490 req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
1491 reserved: Le32::from(0),
1492 sector: Le64::from(0),
1493 };
1494 mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
1495 .expect("writing req failed");
1496
1497 let mut avail_desc = create_descriptor_chain(
1498 &mem,
1499 GuestAddress(0x100), GuestAddress(0x1000), vec![
1502 (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
1504 (DescriptorType::Writable, 20),
1506 (DescriptorType::Writable, 1),
1508 ],
1509 0,
1510 )
1511 .expect("create_descriptor_chain failed");
1512
1513 let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
1514 let timer = Timer::new().expect("Failed to create a timer");
1515 let flush_timer = Rc::new(RefCell::new(
1516 TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
1517 ));
1518 let flush_timer_armed = Rc::new(RefCell::new(false));
1519
1520 let id = b"a20-byteserialnumber";
1521
1522 let disk_state = Rc::new(AsyncRwLock::new(DiskState {
1523 disk_image: Box::new(af),
1524 read_only: false,
1525 sparse: true,
1526 id: *id,
1527 worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
1528 disk_size: Arc::new(AtomicU64::new(disk_size)),
1529 })),
1530 }));
1531
1532 let fut = process_one_request(
1533 &mut avail_desc,
1534 &disk_state,
1535 &flush_timer,
1536 &flush_timer_armed,
1537 );
1538
1539 ex.run_until(fut)
1540 .expect("running executor failed")
1541 .expect("execute failed");
1542
1543 let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
1544 let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
1545 assert_eq!(status, VIRTIO_BLK_S_OK);
1546
1547 let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
1548 let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
1549 assert_eq!(returned_id, *id);
1550 }
1551
1552 #[test]
1553 fn reset_and_reactivate_single_worker() {
1554 reset_and_reactivate(false, None);
1555 }
1556
1557 #[test]
1558 fn reset_and_reactivate_multiple_workers() {
1559 reset_and_reactivate(true, None);
1560 }
1561
1562 #[test]
1563 #[cfg(windows)]
1564 fn reset_and_reactivate_overrlapped_io() {
1565 reset_and_reactivate(
1566 false,
1567 Some(
1568 cros_async::sys::windows::ExecutorKindSys::Overlapped { concurrency: None }.into(),
1569 ),
1570 );
1571 }
1572
1573 fn reset_and_reactivate(
1574 enables_multiple_workers: bool,
1575 async_executor: Option<cros_async::ExecutorKind>,
1576 ) {
1577 let f = tempfile::NamedTempFile::new().unwrap();
1579 f.as_file().set_len(0x1000).unwrap();
1580 let path: tempfile::TempPath = f.into_temp_path();
1583
1584 let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1586 .expect("Creating guest memory failed.");
1587
1588 let (_control_tube, control_tube_device) = Tube::pair().unwrap();
1592
1593 let features = base_features(ProtectionType::Unprotected);
1595 let id = b"Block serial number\0";
1596 let disk_option = DiskOption {
1597 path: path.to_path_buf(),
1598 read_only: true,
1599 id: Some(*id),
1600 sparse: false,
1601 multiple_workers: enables_multiple_workers,
1602 async_executor,
1603 ..Default::default()
1604 };
1605 let disk_image = disk_option.open().unwrap();
1606 let mut b = BlockAsync::new(
1607 features,
1608 disk_image,
1609 &disk_option,
1610 Some(control_tube_device),
1611 None,
1612 None,
1613 )
1614 .unwrap();
1615
1616 let interrupt = Interrupt::new_for_test();
1617
1618 let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1620 q0.set_ready(true);
1621 let q0 = q0
1622 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1623 .expect("QueueConfig::activate");
1624
1625 let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1626 q1.set_ready(true);
1627 let q1 = q1
1628 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1629 .expect("QueueConfig::activate");
1630
1631 b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
1632 .expect("activate should succeed");
1633 if !enables_multiple_workers {
1635 assert!(
1636 b.disk_image.is_none(),
1637 "BlockAsync should not have a disk image"
1638 );
1639 }
1640 assert!(
1641 b.control_tube.is_none(),
1642 "BlockAsync should not have a control tube"
1643 );
1644 assert_eq!(
1645 b.worker_threads.len(),
1646 if enables_multiple_workers { 2 } else { 1 }
1647 );
1648
1649 assert!(b.reset().is_ok(), "reset should succeed");
1651 if !enables_multiple_workers {
1652 assert!(
1653 b.disk_image.is_none(),
1654 "BlockAsync should not have a disk image"
1655 );
1656 }
1657 assert!(
1658 b.control_tube.is_none(),
1659 "BlockAsync should not have a control tube"
1660 );
1661 assert_eq!(
1662 b.worker_threads.len(),
1663 if enables_multiple_workers { 2 } else { 1 }
1664 );
1665 assert_eq!(b.id, *b"Block serial number\0");
1666
1667 let interrupt = Interrupt::new_for_test();
1669 let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1670 q0.set_ready(true);
1671 let q0 = q0
1672 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1673 .expect("QueueConfig::activate");
1674
1675 let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1676 q1.set_ready(true);
1677 let q1 = q1
1678 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1679 .expect("QueueConfig::activate");
1680
1681 b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
1682 .expect("re-activate should succeed");
1683 }
1684
1685 #[test]
1686 fn resize_with_single_worker() {
1687 resize(false);
1688 }
1689
1690 #[test]
1691 fn resize_with_multiple_workers() {
1692 resize(true);
1694 }
1695
1696 fn resize(enables_multiple_workers: bool) {
1697 let original_size = 0x1000;
1699 let resized_size = 0x2000;
1700
1701 let f = tempfile().unwrap();
1703 f.set_len(original_size).unwrap();
1704 let disk_image: Box<dyn DiskFile> = Box::new(f);
1705 assert_eq!(disk_image.get_len().unwrap(), original_size);
1706
1707 let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1709 .expect("Creating guest memory failed.");
1710
1711 let (control_tube, control_tube_device) = Tube::pair().unwrap();
1713
1714 let features = base_features(ProtectionType::Unprotected);
1716 let disk_option = DiskOption {
1717 multiple_workers: enables_multiple_workers,
1718 ..Default::default()
1719 };
1720 let mut b = BlockAsync::new(
1721 features,
1722 disk_image.try_clone().unwrap(),
1723 &disk_option,
1724 Some(control_tube_device),
1725 None,
1726 None,
1727 )
1728 .unwrap();
1729
1730 let interrupt = Interrupt::new_for_test();
1731
1732 let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1734 q0.set_ready(true);
1735 let q0 = q0
1736 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1737 .expect("QueueConfig::activate");
1738
1739 let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1740 q1.set_ready(true);
1741 let q1 = q1
1742 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1743 .expect("QueueConfig::activate");
1744
1745 b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
1746 .expect("activate should succeed");
1747
1748 assert_eq!(
1750 b.disk_size.load(Ordering::Acquire),
1751 original_size,
1752 "disk_size should be the original size first"
1753 );
1754 let mut capacity = [0u8; 8];
1755 b.read_config(0, &mut capacity);
1756 assert_eq!(
1757 capacity,
1758 [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
1760 "read_config should read the original capacity first"
1761 );
1762
1763 control_tube
1765 .send(&DiskControlCommand::Resize {
1766 new_size: resized_size,
1767 })
1768 .unwrap();
1769 assert_eq!(
1770 control_tube.recv::<DiskControlResult>().unwrap(),
1771 DiskControlResult::Ok,
1772 "resize command should succeed"
1773 );
1774 assert_eq!(
1775 b.disk_size.load(Ordering::Acquire),
1776 resized_size,
1777 "disk_size should be resized to the new size"
1778 );
1779 assert_eq!(
1780 disk_image.get_len().unwrap(),
1781 resized_size,
1782 "underlying disk image should be resized to the new size"
1783 );
1784 let mut capacity = [0u8; 8];
1785 b.read_config(0, &mut capacity);
1786 assert_eq!(
1787 capacity,
1788 [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
1790 "read_config should read the resized capacity"
1791 );
1792 interrupt
1794 .get_interrupt_evt()
1795 .wait()
1796 .expect("interrupt should be signaled");
1797
1798 assert_eq!(
1799 interrupt.read_interrupt_status(),
1800 crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
1801 "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
1802 );
1803 }
1804
1805 #[test]
1806 fn run_worker_threads() {
1807 let f = tempfile().unwrap();
1809 f.set_len(0x1000).unwrap();
1810 let disk_image: Box<dyn DiskFile> = Box::new(f);
1811
1812 let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1814 .expect("Creating guest memory failed.");
1815
1816 let features = base_features(ProtectionType::Unprotected);
1818 let disk_option = DiskOption::default();
1819 let mut b = BlockAsync::new(
1820 features,
1821 disk_image.try_clone().unwrap(),
1822 &disk_option,
1823 None,
1824 None,
1825 None,
1826 )
1827 .unwrap();
1828
1829 let interrupt = Interrupt::new_for_test();
1831 let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1832 q0.set_ready(true);
1833 let q0 = q0
1834 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1835 .expect("QueueConfig::activate");
1836
1837 let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1838 q1.set_ready(true);
1839 let q1 = q1
1840 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1841 .expect("QueueConfig::activate");
1842
1843 b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
1844 .expect("activate should succeed");
1845
1846 assert_eq!(b.worker_threads.len(), 1, "1 threads should be spawned.");
1847 drop(b);
1848
1849 let features = base_features(ProtectionType::Unprotected);
1851 let disk_option = DiskOption {
1852 read_only: true,
1853 sparse: false,
1854 multiple_workers: true,
1855 ..DiskOption::default()
1856 };
1857 let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();
1858
1859 let interrupt = Interrupt::new_for_test();
1861 let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1862 q0.set_ready(true);
1863 let q0 = q0
1864 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1865 .expect("QueueConfig::activate");
1866
1867 let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
1868 q1.set_ready(true);
1869 let q1 = q1
1870 .activate(&mem, Event::new().unwrap(), interrupt.clone())
1871 .expect("QueueConfig::activate");
1872
1873 b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
1874 .expect("activate should succeed");
1875
1876 assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
1877 }
1878
1879 struct BlockContext {}
1880
1881 fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
1882 b.avail_features = !b.avail_features;
1883 }
1884
1885 fn create_device() -> (BlockContext, BlockAsync) {
1886 let f = tempfile().unwrap();
1888 f.set_len(0x1000).unwrap();
1889 let disk_image: Box<dyn DiskFile> = Box::new(f);
1890
1891 let features = base_features(ProtectionType::Unprotected);
1893 let id = b"Block serial number\0";
1894 let disk_option = DiskOption {
1895 read_only: true,
1896 id: Some(*id),
1897 sparse: false,
1898 multiple_workers: true,
1899 ..Default::default()
1900 };
1901 (
1902 BlockContext {},
1903 BlockAsync::new(
1904 features,
1905 disk_image.try_clone().unwrap(),
1906 &disk_option,
1907 None,
1908 None,
1909 None,
1910 )
1911 .unwrap(),
1912 )
1913 }
1914
1915 #[cfg(any(target_os = "android", target_os = "linux"))]
1916 suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
1917}