crosvm/crosvm/sys/linux/
pci_hotplug_manager.rs

1// Copyright 2023 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A high-level manager for hotplug PCI devices.
6
7// TODO(b/243767476): Support aarch64.
8use std::cmp::Ordering;
9use std::collections::BTreeMap;
10use std::collections::HashMap;
11use std::collections::VecDeque;
12use std::sync::mpsc;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use anyhow::bail;
17use anyhow::Context;
18use anyhow::Error;
19use arch::RunnableLinuxVm;
20use arch::VcpuArch;
21use arch::VmArch;
22use base::AsRawDescriptor;
23use base::Event;
24use base::EventToken;
25use base::RawDescriptor;
26use base::WaitContext;
27use base::WorkerThread;
28use devices::BusDevice;
29use devices::HotPlugBus;
30use devices::HotPlugKey;
31use devices::IrqEventSource;
32use devices::IrqLevelEvent;
33use devices::PciAddress;
34use devices::PciInterruptPin;
35use devices::PciRootCommand;
36use devices::ResourceCarrier;
37use log::error;
38use resources::SystemAllocator;
39#[cfg(feature = "swap")]
40use swap::SwapDeviceHelper;
41use sync::Mutex;
42use vm_memory::GuestMemory;
43
44use crate::crosvm::sys::linux::JailWarden;
45use crate::crosvm::sys::linux::JailWardenImpl;
46use crate::crosvm::sys::linux::PermissiveJailWarden;
47use crate::Config;
48
49pub type Result<T> = std::result::Result<T, Error>;
50
/// PciHotPlugManager manages hotplug ports, and handles PCI device hot plug and hot removal.
///
/// The manager keeps its own per-port bookkeeping (`port_stubs`, `bus_address_map`) and talks to
/// a `PciHotPlugWorker` through `worker_client`.
pub struct PciHotPlugManager {
    /// Map of managed ports, keyed by the PCI address of the port itself.
    port_stubs: BTreeMap<PciAddress, PortManagerStub>,
    /// Map of downstream bus number to the upstream PCI address of the port owning that bus.
    bus_address_map: BTreeMap<u8, PciAddress>,
    /// JailWarden for jailing hotplug devices.
    jail_warden: Box<dyn JailWarden>,
    /// Client on the manager side of PciHotPlugWorker. `None` until `set_rootbus_controller` has
    /// been called.
    worker_client: Option<WorkerClient>,
}
62
/// WorkerClient is a wrapper of the worker methods.
///
/// A command is queued on `command_sender`, announced by signaling `control_evt`, and answered
/// on `response_receiver`.
struct WorkerClient {
    /// Event signaled after a control command has been sent, so the worker wakes up.
    control_evt: Event,
    /// Control channel to the worker.
    command_sender: mpsc::Sender<WorkerCommand>,
    /// Response channel from the worker.
    response_receiver: mpsc::Receiver<WorkerResponse>,
    /// Handle of the thread running the worker. Never read; held for the lifetime of the client.
    _worker_thread: WorkerThread<()>,
}
73
74impl WorkerClient {
75    /// Constructs PciHotPlugWorker with its client.
76    fn new(rootbus_controller: mpsc::Sender<PciRootCommand>) -> Result<Self> {
77        let (command_sender, command_receiver) = mpsc::channel();
78        let (response_sender, response_receiver) = mpsc::channel();
79        let control_evt = Event::new()?;
80        let control_evt_cpy = control_evt.try_clone()?;
81        let worker_thread = WorkerThread::start("pcihp_mgr_workr", move |kill_evt| {
82            if let Err(e) = PciHotPlugWorker::new(
83                rootbus_controller,
84                command_receiver,
85                response_sender,
86                control_evt_cpy,
87                &kill_evt,
88            )
89            .and_then(move |mut worker| worker.run(kill_evt))
90            {
91                error!("PciHotPlugManager worker failed: {e:#}");
92            }
93        });
94        Ok(WorkerClient {
95            control_evt,
96            command_sender,
97            response_receiver,
98            _worker_thread: worker_thread,
99        })
100    }
101
102    /// Sends worker command, and wait for its response.
103    fn send_worker_command(&self, command: WorkerCommand) -> Result<WorkerResponse> {
104        self.command_sender.send(command)?;
105        self.control_evt.signal()?;
106        Ok(self.response_receiver.recv()?)
107    }
108}
109
/// PortManagerStub is the manager-side copy of a port.
struct PortManagerStub {
    /// Index of the downstream bus, as reported by the port's `get_secondary_bus_number`.
    downstream_bus: u8,
    /// Map of hotplugged devices, and system resources that can be released when device is
    /// removed. Starts out empty when the port is added.
    devices: HashMap<PciAddress, RecoverableResource>,
}
118
/// System resources that can be released when a hotplugged device is removed.
struct RecoverableResource {
    // NOTE(review): presumably the IRQ number assigned to the device in `hotplug_device`; the
    // code that fills this struct is not visible here — confirm.
    irq_num: u32,
    // NOTE(review): presumably the level-triggered IRQ event paired with `irq_num` — confirm.
    irq_evt: IrqLevelEvent,
}
124
/// Control commands to worker.
///
/// Commands are dispatched by `PciHotPlugWorker::handle_manager_command`, which replies with a
/// `WorkerResponse`.
enum WorkerCommand {
    /// Add port to the worker. Carries the PCI address of the port and its worker-side stub.
    AddPort(PciAddress, PortWorkerStub),
    /// Get the state of the port at the given PCI address.
    GetPortState(PciAddress),
    /// Get an empty port for hotplug. Returns the least port sorted by PortKey.
    GetEmptyPort,
    /// Signals hot plug on port. Changes an empty port to occupied.
    SignalHotPlug(SignalHotPlugCommand),
    /// Signals hot unplug on port. Changes an occupied port to empty.
    SignalHotUnplug(PciAddress),
}
138
/// A device queued for hot plug, as seen by the worker.
#[derive(Clone)]
struct GuestDeviceStub {
    /// Address the device is added at; sent with `PciRootCommand::Add` and removed by it later.
    pci_addr: PciAddress,
    /// Key the device is registered under on the hotplug port (`add_hotplug_device`).
    key: HotPlugKey,
    /// The device itself, handed to the root bus controller on hot plug.
    device: Arc<Mutex<dyn BusDevice>>,
}
145
/// Payload of `WorkerCommand::SignalHotPlug`. Built through `SignalHotPlugCommand::new`, which
/// rejects an empty device list.
#[derive(Clone)]
struct SignalHotPlugCommand {
    /// The upstream address of the hotplug port.
    upstream_address: PciAddress,
    /// The array of guest devices on the port.
    guest_devices: Vec<GuestDeviceStub>,
}
153
154impl SignalHotPlugCommand {
155    fn new(upstream_address: PciAddress, guest_devices: Vec<GuestDeviceStub>) -> Result<Self> {
156        if guest_devices.is_empty() {
157            bail!("No guest devices");
158        }
159        Ok(Self {
160            upstream_address,
161            guest_devices,
162        })
163    }
164}
165
/// PortWorkerStub is the worker-side copy of a port.
#[derive(Clone)]
struct PortWorkerStub {
    /// The downstream base address of the port. Needed to send plug and unplug signal.
    base_address: PciAddress,
    /// Currently attached devices that should be removed. Filled by `send_hot_plug_signal` and
    /// drained by `send_hot_unplug_signal`.
    attached_devices: Vec<PciAddress>,
    /// Devices to be added each time send_hot_plug_signal is called. One queue entry is consumed
    /// per call, oldest first.
    devices_to_add: VecDeque<Vec<GuestDeviceStub>>,
    /// The hotplug port.
    port: Arc<Mutex<dyn HotPlugBus>>,
}
178
179impl PortWorkerStub {
180    fn new(port: Arc<Mutex<dyn HotPlugBus>>, downstream_bus: u8) -> Result<Self> {
181        let base_address = PciAddress::new(0, downstream_bus.into(), 0, 0)?;
182        Ok(Self {
183            base_address,
184            devices_to_add: VecDeque::new(),
185            attached_devices: Vec::new(),
186            port,
187        })
188    }
189
190    fn add_hotplug_devices(&mut self, devices: Vec<GuestDeviceStub>) -> Result<()> {
191        if devices.is_empty() {
192            bail!("No guest devices");
193        }
194        self.devices_to_add.push_back(devices);
195        Ok(())
196    }
197
198    fn cancel_queued_add(&mut self) -> Result<()> {
199        self.devices_to_add
200            .pop_back()
201            .context("No guest device add queued")?;
202        Ok(())
203    }
204
205    fn send_hot_plug_signal(
206        &mut self,
207        rootbus_controller: &mpsc::Sender<PciRootCommand>,
208    ) -> Result<Event> {
209        let mut port_lock = self.port.lock();
210        let devices = self
211            .devices_to_add
212            .pop_front()
213            .context("Missing devices to add")?;
214        for device in devices {
215            rootbus_controller.send(PciRootCommand::Add(device.pci_addr, device.device))?;
216            self.attached_devices.push(device.pci_addr);
217            port_lock.add_hotplug_device(device.key, device.pci_addr);
218        }
219        port_lock
220            .hot_plug(self.base_address)?
221            .context("hotplug bus does not support command complete notification")
222    }
223
224    fn send_hot_unplug_signal(
225        &mut self,
226        rootbus_controller: &mpsc::Sender<PciRootCommand>,
227    ) -> Result<Event> {
228        for pci_addr in self.attached_devices.drain(..) {
229            rootbus_controller.send(PciRootCommand::Remove(pci_addr))?;
230        }
231        self.port
232            .lock()
233            .hot_unplug(self.base_address)?
234            .context("hotplug bus does not support command complete notification")
235    }
236}
237
/// Control response from worker.
///
/// Equality is implemented by hand because `InvalidCommand` carries an `Error`; see the
/// `PartialEq` impl for how each variant is compared.
#[derive(Debug)]
enum WorkerResponse {
    /// AddPort success.
    AddPortOk,
    /// GetEmptyPort success, use port at PciAddress.
    GetEmptyPortOk(PciAddress),
    /// GetPortState success. The "steps behind" field shall be considered expired, and the guest
    /// is "less than or equal to" n steps behind.
    GetPortStateOk(PortState),
    /// SignalHotPlug or SignalHotUnplug success.
    SignalOk,
    /// Command fail because it is not valid.
    InvalidCommand(Error),
}
253
254impl PartialEq for WorkerResponse {
255    fn eq(&self, other: &Self) -> bool {
256        match (self, other) {
257            (Self::GetEmptyPortOk(l0), Self::GetEmptyPortOk(r0)) => l0 == r0,
258            (Self::GetPortStateOk(l0), Self::GetPortStateOk(r0)) => l0 == r0,
259            (Self::InvalidCommand(_), Self::InvalidCommand(_)) => true,
260            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
261        }
262    }
263}
264
/// Tokens the worker's wait context reports. The `RawDescriptor` payloads are the keys of the
/// matching entries in `PciHotPlugWorker::event_map`.
#[derive(Debug, EventToken)]
enum Token {
    /// The kill event of the worker thread was triggered.
    Kill,
    /// The manager signaled that a command is waiting on the command channel.
    ManagerCommand,
    /// The ready notification of a port was triggered.
    PortReady(RawDescriptor),
    /// The event returned for a hot plug signal was triggered.
    PlugComplete(RawDescriptor),
    /// The event returned for a hot unplug signal was triggered.
    UnplugComplete(RawDescriptor),
}
273
/// PciHotPlugWorker is a worker that handles the asynchrony of slot states between crosvm and the
/// guest OS. It is responsible for scheduling the PCIe slot control signals and handle its result.
struct PciHotPlugWorker {
    /// Events currently registered in `wait_ctx` for ports, keyed by raw descriptor, together
    /// with the address of the port each one belongs to.
    event_map: BTreeMap<RawDescriptor, (Event, PciAddress)>,
    /// Current state of every port, keyed by port address.
    port_state_map: BTreeMap<PciAddress, PortState>,
    /// Ports ordered by `PortKey`. The key embeds the port state, so a port is re-inserted
    /// whenever its state changes (see `set_port_state`).
    port_map: BTreeMap<PortKey, PortWorkerStub>,
    /// Event the manager signals after sending a command.
    manager_evt: Event,
    /// Wait context over the manager event, the kill event, and the per-port events.
    wait_ctx: WaitContext<Token>,
    /// Commands from the manager.
    command_receiver: mpsc::Receiver<WorkerCommand>,
    /// Responses to the manager.
    response_sender: mpsc::Sender<WorkerResponse>,
    /// Channel used to add devices to and remove devices from the root bus.
    rootbus_controller: mpsc::Sender<PciRootCommand>,
}
286
287impl PciHotPlugWorker {
288    fn new(
289        rootbus_controller: mpsc::Sender<PciRootCommand>,
290        command_receiver: mpsc::Receiver<WorkerCommand>,
291        response_sender: mpsc::Sender<WorkerResponse>,
292        manager_evt: Event,
293        kill_evt: &Event,
294    ) -> Result<Self> {
295        let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
296            (&manager_evt, Token::ManagerCommand),
297            (kill_evt, Token::Kill),
298        ])?;
299        Ok(Self {
300            event_map: BTreeMap::new(),
301            port_state_map: BTreeMap::new(),
302            port_map: BTreeMap::new(),
303            manager_evt,
304            wait_ctx,
305            command_receiver,
306            response_sender,
307            rootbus_controller,
308        })
309    }
310
311    /// Starts the worker. Runs until received kill request, or an error that the worker is in an
312    /// invalid state.
313    fn run(&mut self, kill_evt: Event) -> Result<()> {
314        'wait: loop {
315            let events = self.wait_ctx.wait()?;
316            for triggered_event in events.iter().filter(|e| e.is_readable) {
317                match triggered_event.token {
318                    Token::ManagerCommand => {
319                        self.manager_evt.wait()?;
320                        self.handle_manager_command()?;
321                    }
322                    Token::PortReady(descriptor) => {
323                        let (event, pci_address) = self
324                            .event_map
325                            .remove(&descriptor)
326                            .context("Cannot find event")?;
327                        event.wait()?;
328                        self.wait_ctx.delete(&event)?;
329                        self.handle_port_ready(pci_address)?;
330                    }
331                    Token::PlugComplete(descriptor) => {
332                        let (event, pci_address) = self
333                            .event_map
334                            .remove(&descriptor)
335                            .context("Cannot find event")?;
336                        event.wait()?;
337                        self.wait_ctx.delete(&event)?;
338                        self.handle_plug_complete(pci_address)?;
339                    }
340                    Token::UnplugComplete(descriptor) => {
341                        let (event, pci_address) = self
342                            .event_map
343                            .remove(&descriptor)
344                            .context("Cannot find event")?;
345                        self.wait_ctx.delete(&event)?;
346                        self.handle_unplug_complete(pci_address)?;
347                    }
348                    Token::Kill => {
349                        let _ = kill_evt.wait();
350                        break 'wait;
351                    }
352                }
353            }
354        }
355        Ok(())
356    }
357
358    fn handle_manager_command(&mut self) -> Result<()> {
359        let response = match self.command_receiver.recv()? {
360            WorkerCommand::AddPort(pci_address, port) => self.handle_add_port(pci_address, port),
361            WorkerCommand::GetPortState(pci_address) => self.handle_get_port_state(pci_address),
362            WorkerCommand::GetEmptyPort => self.handle_get_empty_port(),
363            WorkerCommand::SignalHotPlug(hotplug_command) => {
364                self.handle_plug_request(hotplug_command)
365            }
366            WorkerCommand::SignalHotUnplug(pci_address) => self.handle_unplug_request(pci_address),
367        }?;
368        Ok(self.response_sender.send(response)?)
369    }
370
371    /// Handles add port: Initiate port in EmptyNotReady state.
372    fn handle_add_port(
373        &mut self,
374        pci_address: PciAddress,
375        port: PortWorkerStub,
376    ) -> Result<WorkerResponse> {
377        if self.port_state_map.contains_key(&pci_address) {
378            return Ok(WorkerResponse::InvalidCommand(anyhow!(
379                "Conflicting upstream PCI address"
380            )));
381        }
382        let port_state = PortState::EmptyNotReady;
383        let port_ready_event = port.port.lock().get_ready_notification()?;
384        self.wait_ctx.add(
385            &port_ready_event,
386            Token::PortReady(port_ready_event.as_raw_descriptor()),
387        )?;
388        self.event_map.insert(
389            port_ready_event.as_raw_descriptor(),
390            (port_ready_event, pci_address),
391        );
392        self.port_state_map.insert(pci_address, port_state);
393        self.port_map.insert(
394            PortKey {
395                port_state,
396                pci_address,
397            },
398            port,
399        );
400        Ok(WorkerResponse::AddPortOk)
401    }
402
403    /// Handles get port state: returns the PortState.
404    fn handle_get_port_state(&self, pci_address: PciAddress) -> Result<WorkerResponse> {
405        match self.get_port_state(pci_address) {
406            Ok(ps) => Ok(WorkerResponse::GetPortStateOk(ps)),
407            Err(e) => Ok(WorkerResponse::InvalidCommand(e)),
408        }
409    }
410
411    /// Handle getting empty port: Find the most empty port, or return error if all are occupied.
412    fn handle_get_empty_port(&self) -> Result<WorkerResponse> {
413        let most_empty_port = match self.port_map.first_key_value() {
414            Some(p) => p.0,
415            None => return Ok(WorkerResponse::InvalidCommand(anyhow!("No ports added"))),
416        };
417        match most_empty_port.port_state {
418            PortState::Empty(_) | PortState::EmptyNotReady => {
419                Ok(WorkerResponse::GetEmptyPortOk(most_empty_port.pci_address))
420            }
421            PortState::Occupied(_) | PortState::OccupiedNotReady => {
422                Ok(WorkerResponse::InvalidCommand(anyhow!("No empty port")))
423            }
424        }
425    }
426
427    /// Handles plug request: Moves PortState from EmptyNotReady to OccupiedNotReady, Empty(n) to
428    /// Occupied(n+1), and schedules the next plug event if n == 0.
429    fn handle_plug_request(
430        &mut self,
431        hotplug_command: SignalHotPlugCommand,
432    ) -> Result<WorkerResponse> {
433        let pci_address = hotplug_command.upstream_address;
434        let next_state = match self.get_port_state(pci_address) {
435            Ok(PortState::Empty(n)) => {
436                self.get_port_mut(pci_address)?
437                    .add_hotplug_devices(hotplug_command.guest_devices)?;
438                if n == 0 {
439                    self.schedule_plug_event(pci_address)?;
440                }
441                PortState::Occupied(n + 1)
442            }
443            Ok(PortState::EmptyNotReady) => {
444                self.get_port_mut(pci_address)?
445                    .add_hotplug_devices(hotplug_command.guest_devices)?;
446                PortState::OccupiedNotReady
447            }
448            Ok(PortState::Occupied(_)) | Ok(PortState::OccupiedNotReady) => {
449                return Ok(WorkerResponse::InvalidCommand(anyhow!(
450                    "Attempt to plug into an occupied port"
451                )))
452            }
453            Err(e) => return Ok(WorkerResponse::InvalidCommand(e)),
454        };
455        self.set_port_state(pci_address, next_state)?;
456        Ok(WorkerResponse::SignalOk)
457    }
458
459    /// Handles unplug request: Moves PortState from OccupiedNotReady to EmptyNotReady, Occupied(n)
460    /// to Empty(n % 2 + 1), and schedules the next unplug event if n == 0.
461    ///
462    /// n % 2 + 1: When unplug request is made, it either schedule the unplug event
463    /// (n == 0 => 1 or n == 1 => 2), or cancels the corresponding plug event that has not started
464    /// (n == 2 => 1 or n == 3 => 2). Staring at the mapping, it maps n to either 1 or 2 of opposite
465    /// oddity. n % 2 + 1 is a good shorthand instead of the individual mappings.
466    fn handle_unplug_request(&mut self, pci_address: PciAddress) -> Result<WorkerResponse> {
467        let next_state = match self.get_port_state(pci_address) {
468            Ok(PortState::Occupied(n)) => {
469                if n >= 2 {
470                    self.get_port_mut(pci_address)?.cancel_queued_add()?;
471                }
472                if n == 0 {
473                    self.schedule_unplug_event(pci_address)?;
474                }
475                PortState::Empty(n % 2 + 1)
476            }
477            Ok(PortState::OccupiedNotReady) => PortState::EmptyNotReady,
478            Ok(PortState::Empty(_)) | Ok(PortState::EmptyNotReady) => {
479                return Ok(WorkerResponse::InvalidCommand(anyhow!(
480                    "Attempt to unplug from an empty port"
481                )))
482            }
483            Err(e) => return Ok(WorkerResponse::InvalidCommand(e)),
484        };
485        self.set_port_state(pci_address, next_state)?;
486        Ok(WorkerResponse::SignalOk)
487    }
488
489    /// Handles port ready: Moves PortState from EmptyNotReady to Empty(0), OccupiedNotReady to
490    /// Occupied(1), and schedules the next event if port is occupied
491    fn handle_port_ready(&mut self, pci_address: PciAddress) -> Result<()> {
492        let next_state = match self.get_port_state(pci_address)? {
493            PortState::EmptyNotReady => PortState::Empty(0),
494            PortState::OccupiedNotReady => {
495                self.schedule_plug_event(pci_address)?;
496                PortState::Occupied(1)
497            }
498            PortState::Empty(_) | PortState::Occupied(_) => {
499                bail!("Received port ready on an already enabled port");
500            }
501        };
502        self.set_port_state(pci_address, next_state)
503    }
504
505    /// Handles plug complete: Moves PortState from Any(n) to Any(n-1), and schedules the next
506    /// unplug event unless n == 1. (Any is either Empty or Occupied.)
507    fn handle_plug_complete(&mut self, pci_address: PciAddress) -> Result<()> {
508        let (n, next_state) = match self.get_port_state(pci_address)? {
509            // Note: n - 1 >= 0 as otherwise there would be no pending events.
510            PortState::Empty(n) => (n, PortState::Empty(n - 1)),
511            PortState::Occupied(n) => (n, PortState::Occupied(n - 1)),
512            PortState::EmptyNotReady | PortState::OccupiedNotReady => {
513                bail!("Received plug completed on a not enabled port");
514            }
515        };
516        if n > 1 {
517            self.schedule_unplug_event(pci_address)?;
518        }
519        self.set_port_state(pci_address, next_state)
520    }
521
522    /// Handles unplug complete: Moves PortState from Any(n) to Any(n-1), and schedules the next
523    /// plug event unless n == 1. (Any is either Empty or Occupied.)
524    fn handle_unplug_complete(&mut self, pci_address: PciAddress) -> Result<()> {
525        let (n, next_state) = match self.get_port_state(pci_address)? {
526            // Note: n - 1 >= 0 as otherwise there would be no pending events.
527            PortState::Empty(n) => (n, PortState::Empty(n - 1)),
528            PortState::Occupied(n) => (n, PortState::Occupied(n - 1)),
529            PortState::EmptyNotReady | PortState::OccupiedNotReady => {
530                bail!("Received unplug completed on a not enabled port");
531            }
532        };
533        if n > 1 {
534            self.schedule_plug_event(pci_address)?;
535        }
536        self.set_port_state(pci_address, next_state)
537    }
538
539    fn get_port_state(&self, pci_address: PciAddress) -> Result<PortState> {
540        Ok(*self
541            .port_state_map
542            .get(&pci_address)
543            .with_context(|| format!("Cannot find port state on {pci_address}"))?)
544    }
545
546    fn set_port_state(&mut self, pci_address: PciAddress, port_state: PortState) -> Result<()> {
547        let old_port_state = self.get_port_state(pci_address)?;
548        let port = self
549            .port_map
550            .remove(&PortKey {
551                port_state: old_port_state,
552                pci_address,
553            })
554            .context("Cannot find port")?;
555        self.port_map.insert(
556            PortKey {
557                port_state,
558                pci_address,
559            },
560            port,
561        );
562        self.port_state_map.insert(pci_address, port_state);
563        Ok(())
564    }
565
566    fn schedule_plug_event(&mut self, pci_address: PciAddress) -> Result<()> {
567        let rootbus_controller = self.rootbus_controller.clone();
568        let plug_event = self
569            .get_port_mut(pci_address)?
570            .send_hot_plug_signal(&rootbus_controller)?;
571        self.wait_ctx.add(
572            &plug_event,
573            Token::PlugComplete(plug_event.as_raw_descriptor()),
574        )?;
575        self.event_map
576            .insert(plug_event.as_raw_descriptor(), (plug_event, pci_address));
577        Ok(())
578    }
579
580    fn schedule_unplug_event(&mut self, pci_address: PciAddress) -> Result<()> {
581        let rootbus_controller = self.rootbus_controller.clone();
582        let unplug_event = self
583            .get_port_mut(pci_address)?
584            .send_hot_unplug_signal(&rootbus_controller)?;
585        self.wait_ctx.add(
586            &unplug_event,
587            Token::UnplugComplete(unplug_event.as_raw_descriptor()),
588        )?;
589        self.event_map.insert(
590            unplug_event.as_raw_descriptor(),
591            (unplug_event, pci_address),
592        );
593        Ok(())
594    }
595
596    fn get_port_mut(&mut self, pci_address: PciAddress) -> Result<&mut PortWorkerStub> {
597        let port_state = self.get_port_state(pci_address)?;
598        self.port_map
599            .get_mut(&PortKey {
600                port_state,
601                pci_address,
602            })
603            .context("PciHotPlugWorker is in invalid state")
604    }
605}
606
/// PortState indicates the state of the port.
///
/// The initial PortState is EmptyNotReady (EmpNR). 9 PortStates are possible, and transitions
/// between the states are only possible by the following 3 groups of functions:
/// handle_port_ready(R): guest notification of port ready to accept hot plug events.
/// handle_plug_request(P) and handle_unplug_request(U): host initiated requests.
/// handle_plug_complete(PC) and handle_unplug_complete(UC): guest notification of event completion.
/// When a port is not ready, PC and UC are not expected as no events are scheduled.
/// The state transition is as follows:
///    Emp0<-UC--Emp1<-PC--Emp2            |
///  ^     \    ^    \^   ^    \^          |
/// /       P  /      P\ /      P\         |
/// |        \/        \\        \\        |
/// |        /\        /\\        \\       |
/// R       U  \      U  \U        \U      |
/// |      /    v    /    v\        v\     |
/// |  Occ0<-PC--Occ1<-UC--Occ2<-PC--Occ3  |
/// |              ^                       |
/// \              R                       |
///   EmpNR<-P,U->OccNR                    |
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PortState {
    /// Port is empty on crosvm. The state on the guest OS is n steps behind.
    Empty(u8),
    /// Port is empty on crosvm. The port is not enabled on the guest OS yet.
    EmptyNotReady,
    /// Port is occupied on crosvm. The state on the guest OS is n steps behind.
    Occupied(u8),
    /// Port is occupied on crosvm. The port is not enabled on the guest OS yet.
    OccupiedNotReady,
}

impl PortState {
    /// Rank of the variant in the "most empty" ordering, ignoring the step count.
    fn variant_order_index(&self) -> u8 {
        match *self {
            Self::Empty(_) => 0,
            Self::EmptyNotReady => 1,
            Self::Occupied(_) => 2,
            Self::OccupiedNotReady => 3,
        }
    }
}

/// Ordering on PortState defined by "most empty".
impl Ord for PortState {
    fn cmp(&self, other: &Self) -> Ordering {
        // The variant decides first: Empty < EmptyNotReady < Occupied < OccupiedNotReady.
        let by_variant = self.variant_order_index().cmp(&other.variant_order_index());
        // Within the same variant, fewer steps behind sorts first.
        by_variant.then_with(|| match (self, other) {
            (Self::Empty(lhs), Self::Empty(rhs)) | (Self::Occupied(lhs), Self::Occupied(rhs)) => {
                lhs.cmp(rhs)
            }
            _ => Ordering::Equal,
        })
    }
}

impl PartialOrd for PortState {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
676
/// PortKey is a unique identifier of ports with an ordering defined on it.
///
/// Ports are ordered by whose downstream device would be discovered first by the guest OS.
/// Empty ports without pending events are ordered before those with pending events. When multiple
/// empty ports without pending events are available, they are ordered by PCI enumeration.
///
/// The ordering is derived, so keys compare by `port_state` first and by `pci_address` second;
/// the field order below is therefore significant.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct PortKey {
    /// Current state of the port; the primary sort criterion.
    port_state: PortState,
    /// PCI address of the port; breaks ties between ports in the same state.
    pci_address: PciAddress,
}
687
688impl PciHotPlugManager {
689    /// Constructs PciHotPlugManager.
690    ///
691    /// Constructor uses forking, therefore has to be called early, before crosvm enters a
692    /// multi-threaded context.
693    pub fn new(
694        guest_memory: GuestMemory,
695        config: &Config,
696        #[cfg(feature = "swap")] swap_device_helper: Option<SwapDeviceHelper>,
697    ) -> Result<Self> {
698        let jail_warden: Box<dyn JailWarden> = match config.jail_config {
699            Some(_) => Box::new(
700                JailWardenImpl::new(
701                    guest_memory,
702                    config,
703                    #[cfg(feature = "swap")]
704                    swap_device_helper,
705                )
706                .context("jail warden construction")?,
707            ),
708            None => Box::new(
709                PermissiveJailWarden::new(
710                    guest_memory,
711                    config,
712                    #[cfg(feature = "swap")]
713                    swap_device_helper,
714                )
715                .context("jail warden construction")?,
716            ),
717        };
718        Ok(Self {
719            jail_warden,
720            port_stubs: BTreeMap::new(),
721            bus_address_map: BTreeMap::new(),
722            worker_client: None,
723        })
724    }
725
726    /// Starts PciHotPlugManager. Required before any other commands.
727    ///
728    /// PciHotPlugManager::new must be called in a single-threaded context as it forks.
729    /// However, rootbus_controller is only available after VM boots when crosvm is multi-threaded.
730    ///
731    /// TODO(293801301): Remove unused after aarch64 support
732    #[allow(unused)]
733    pub fn set_rootbus_controller(
734        &mut self,
735        rootbus_controller: mpsc::Sender<PciRootCommand>,
736    ) -> Result<()> {
737        // Spins the PciHotPlugWorker.
738        self.worker_client = Some(WorkerClient::new(rootbus_controller)?);
739        Ok(())
740    }
741
742    /// Adds a hotplug capable port to manage.
743    ///
744    /// PciHotPlugManager assumes exclusive control for adding and removing devices to this port.
745    /// TODO(293801301): Remove unused_variables after aarch64 support
746    #[allow(unused)]
747    pub fn add_port(&mut self, port: Arc<Mutex<dyn HotPlugBus>>) -> Result<()> {
748        let worker_client = self
749            .worker_client
750            .as_ref()
751            .context("No worker thread. Is set_rootbus_controller not called?")?;
752        let port_lock = port.lock();
753        // Rejects hotplug bus with downstream devices.
754        if !port_lock.is_empty() {
755            bail!("invalid hotplug bus");
756        }
757        let pci_address = port_lock
758            .get_address()
759            .context("Hotplug bus PCI address missing")?;
760        // Reject hotplug buses not on rootbus, since otherwise the order of enumeration depends on
761        // the topology of PCI.
762        if pci_address.bus != 0 {
763            bail!("hotplug port on non-root bus not supported");
764        }
765        let downstream_bus = port_lock
766            .get_secondary_bus_number()
767            .context("cannot get downstream bus")?;
768        drop(port_lock);
769        if let Some(prev_address) = self.bus_address_map.insert(downstream_bus, pci_address) {
770            bail!(
771                "Downstream bus of new port is conflicting with previous port at {}",
772                &prev_address
773            );
774        }
775        self.port_stubs.insert(
776            pci_address,
777            PortManagerStub {
778                downstream_bus,
779                devices: HashMap::new(),
780            },
781        );
782        match worker_client.send_worker_command(WorkerCommand::AddPort(
783            pci_address,
784            PortWorkerStub::new(port, downstream_bus)?,
785        ))? {
786            WorkerResponse::AddPortOk => Ok(()),
787            WorkerResponse::InvalidCommand(e) => Err(e),
788            r => bail!("Unexpected response from worker: {:?}", &r),
789        }
790    }
791
792    /// hotplugs up to 8 PCI devices as "functions of a device" (in PCI Bus Device Function sense).
793    ///
794    /// returns the bus number of the bus on success.
795    pub fn hotplug_device<V: VmArch, Vcpu: VcpuArch>(
796        &mut self,
797        resource_carriers: Vec<ResourceCarrier>,
798        linux: &mut RunnableLinuxVm<V, Vcpu>,
799        resources: &mut SystemAllocator,
800    ) -> Result<u8> {
801        let worker_client = self
802            .worker_client
803            .as_ref()
804            .context("No worker thread. Is set_rootbus_controller not called?")?;
805        if resource_carriers.len() > 8 || resource_carriers.is_empty() {
806            bail!("PCI function count has to be 1 to 8 inclusive");
807        }
808        let pci_address = match worker_client.send_worker_command(WorkerCommand::GetEmptyPort)? {
809            WorkerResponse::GetEmptyPortOk(p) => Ok(p),
810            WorkerResponse::InvalidCommand(e) => Err(e),
811            r => bail!("Unexpected response from worker: {:?}", &r),
812        }?;
813        let port_stub = self
814            .port_stubs
815            .get_mut(&pci_address)
816            .context("Cannot find port")?;
817        let downstream_bus = port_stub.downstream_bus;
818        let mut devices = Vec::new();
819        for (func_num, mut resource_carrier) in resource_carriers.into_iter().enumerate() {
820            let device_address = PciAddress::new(0, downstream_bus as u32, 0, func_num as u32)?;
821            let hotplug_key = HotPlugKey::GuestDevice {
822                guest_addr: device_address,
823            };
824            resource_carrier.allocate_address(device_address, resources)?;
825            let irq_evt = IrqLevelEvent::new()?;
826            let (pin, irq_num) = match downstream_bus % 4 {
827                0 => (PciInterruptPin::IntA, 0),
828                1 => (PciInterruptPin::IntB, 1),
829                2 => (PciInterruptPin::IntC, 2),
830                _ => (PciInterruptPin::IntD, 3),
831            };
832            resource_carrier.assign_irq(irq_evt.try_clone()?, pin, irq_num);
833            let (proxy_device, pid) = self
834                .jail_warden
835                .make_proxy_device(resource_carrier)
836                .context("make proxy device")?;
837            let device_id = proxy_device.lock().device_id();
838            let device_name = proxy_device.lock().debug_label();
839            linux.irq_chip.as_irq_chip_mut().register_level_irq_event(
840                irq_num,
841                &irq_evt,
842                IrqEventSource {
843                    device_id,
844                    queue_id: 0,
845                    device_name: device_name.clone(),
846                },
847            )?;
848            let pid: u32 = pid.try_into().context("fork fail")?;
849            if pid > 0 {
850                linux.pid_debug_label_map.insert(pid, device_name);
851            }
852            devices.push(GuestDeviceStub {
853                pci_addr: device_address,
854                key: hotplug_key,
855                device: proxy_device,
856            });
857            port_stub
858                .devices
859                .insert(device_address, RecoverableResource { irq_num, irq_evt });
860        }
861        // Ask worker to schedule hotplug signal.
862        match worker_client.send_worker_command(WorkerCommand::SignalHotPlug(
863            SignalHotPlugCommand::new(pci_address, devices)?,
864        ))? {
865            WorkerResponse::SignalOk => Ok(downstream_bus),
866            WorkerResponse::InvalidCommand(e) => Err(e),
867            r => bail!("Unexpected response from worker: {:?}", &r),
868        }
869    }
870
871    /// Removes all hotplugged devices on the hotplug bus.
872    pub fn remove_hotplug_device<V: VmArch, Vcpu: VcpuArch>(
873        &mut self,
874        bus: u8,
875        linux: &mut RunnableLinuxVm<V, Vcpu>,
876        resources: &mut SystemAllocator,
877    ) -> Result<()> {
878        let worker_client = self
879            .worker_client
880            .as_ref()
881            .context("No worker thread. Is set_rootbus_controller not called?")?;
882        let pci_address = self
883            .bus_address_map
884            .get(&bus)
885            .with_context(|| format!("Port {} is not known", &bus))?;
886        match worker_client.send_worker_command(WorkerCommand::GetPortState(*pci_address))? {
887            WorkerResponse::GetPortStateOk(PortState::Occupied(_)) => {}
888            WorkerResponse::GetPortStateOk(PortState::Empty(_)) => {
889                bail!("Port {} is empty", &bus)
890            }
891            WorkerResponse::InvalidCommand(e) => {
892                return Err(e);
893            }
894            wr => bail!("Unexpected response from worker: {:?}", &wr),
895        };
896        // Performs a surprise removal. That is, not waiting for hot removal completion before
897        // deleting the resources.
898        match worker_client.send_worker_command(WorkerCommand::SignalHotUnplug(*pci_address))? {
899            WorkerResponse::SignalOk => {}
900            WorkerResponse::InvalidCommand(e) => {
901                return Err(e);
902            }
903            wr => bail!("Unexpected response from worker: {:?}", &wr),
904        }
905        // Remove all devices on the hotplug bus.
906        let port_stub = self
907            .port_stubs
908            .get_mut(pci_address)
909            .with_context(|| format!("Port {} is not known", &bus))?;
910        for (downstream_address, recoverable_resource) in port_stub.devices.drain() {
911            // port_stub.port does not have remove_hotplug_device method, as devices are removed
912            // when hot_unplug is called.
913            resources.release_pci(downstream_address);
914            linux.irq_chip.unregister_level_irq_event(
915                recoverable_resource.irq_num,
916                &recoverable_resource.irq_evt,
917            )?;
918        }
919        Ok(())
920    }
921}
922
923#[cfg(test)]
924mod tests {
925    use std::thread;
926    use std::time::Duration;
927
928    use devices::MockDevice;
929
930    use super::*;
931
932    /// A MockPort that only supports hot_plug and hot_unplug commands, and signaling command
933    /// complete manually, which is sufficient for PciHotPlugWorker unit test.
934    struct MockPort {
935        cc_event: Event,
936        downstream_bus: u8,
937        ready_events: Vec<Event>,
938    }
939
940    impl MockPort {
941        fn new(downstream_bus: u8) -> Self {
942            Self {
943                cc_event: Event::new().unwrap(),
944                downstream_bus,
945                ready_events: Vec::new(),
946            }
947        }
948
949        fn signal_cc(&self) {
950            self.cc_event.reset().unwrap();
951            self.cc_event.signal().unwrap();
952        }
953
954        fn signal_ready(&mut self) {
955            for event in self.ready_events.drain(..) {
956                event.reset().unwrap();
957                event.signal().unwrap();
958            }
959        }
960    }
961
962    impl HotPlugBus for MockPort {
963        fn hot_plug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>> {
964            self.cc_event = Event::new().unwrap();
965            Ok(Some(self.cc_event.try_clone().unwrap()))
966        }
967
968        fn hot_unplug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>> {
969            self.cc_event = Event::new().unwrap();
970            Ok(Some(self.cc_event.try_clone().unwrap()))
971        }
972
973        fn get_ready_notification(&mut self) -> anyhow::Result<Event> {
974            let event = Event::new()?;
975            self.ready_events.push(event.try_clone()?);
976            Ok(event)
977        }
978
979        fn is_match(&self, _host_addr: PciAddress) -> Option<u8> {
980            None
981        }
982
983        fn get_address(&self) -> Option<PciAddress> {
984            None
985        }
986
987        fn get_secondary_bus_number(&self) -> Option<u8> {
988            Some(self.downstream_bus)
989        }
990
991        fn add_hotplug_device(&mut self, _hotplug_key: HotPlugKey, _guest_addr: PciAddress) {}
992
993        fn get_hotplug_device(&self, _hotplug_key: HotPlugKey) -> Option<PciAddress> {
994            None
995        }
996
997        fn is_empty(&self) -> bool {
998            true
999        }
1000
1001        fn get_hotplug_key(&self) -> Option<HotPlugKey> {
1002            None
1003        }
1004    }
1005
1006    fn new_port(downstream_bus: u8) -> Arc<Mutex<MockPort>> {
1007        Arc::new(Mutex::new(MockPort::new(downstream_bus)))
1008    }
1009
1010    fn poll_until_with_timeout<F>(f: F, timeout: Duration) -> bool
1011    where
1012        F: Fn() -> bool,
1013    {
1014        for _ in 0..timeout.as_millis() {
1015            if f() {
1016                return true;
1017            }
1018            thread::sleep(Duration::from_millis(1));
1019        }
1020        false
1021    }
1022
1023    #[test]
1024    fn worker_empty_port_ordering() {
1025        let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1026        let client = WorkerClient::new(rootbus_controller).unwrap();
1027        // Port A: upstream 00:01.1, downstream 2.
1028        let upstream_addr_a = PciAddress {
1029            bus: 0,
1030            dev: 1,
1031            func: 1,
1032        };
1033        let bus_a = 2;
1034        let downstream_addr_a = PciAddress {
1035            bus: bus_a,
1036            dev: 0,
1037            func: 0,
1038        };
1039        let hotplug_key_a = HotPlugKey::GuestDevice {
1040            guest_addr: downstream_addr_a,
1041        };
1042        let device_a = GuestDeviceStub {
1043            pci_addr: downstream_addr_a,
1044            key: hotplug_key_a,
1045            device: Arc::new(Mutex::new(MockDevice::new())),
1046        };
1047        let hotplug_command_a =
1048            SignalHotPlugCommand::new(upstream_addr_a, [device_a].to_vec()).unwrap();
1049        let port_a = new_port(bus_a);
1050        // Port B: upstream 00:01.0, downstream 3.
1051        let upstream_addr_b = PciAddress {
1052            bus: 0,
1053            dev: 1,
1054            func: 0,
1055        };
1056        let bus_b = 3;
1057        let downstream_addr_b = PciAddress {
1058            bus: bus_b,
1059            dev: 0,
1060            func: 0,
1061        };
1062        let hotplug_key_b = HotPlugKey::GuestDevice {
1063            guest_addr: downstream_addr_b,
1064        };
1065        let device_b = GuestDeviceStub {
1066            pci_addr: downstream_addr_b,
1067            key: hotplug_key_b,
1068            device: Arc::new(Mutex::new(MockDevice::new())),
1069        };
1070        let hotplug_command_b =
1071            SignalHotPlugCommand::new(upstream_addr_b, [device_b].to_vec()).unwrap();
1072        let port_b = new_port(bus_b);
1073        // Port C: upstream 00:02.0, downstream 4.
1074        let upstream_addr_c = PciAddress {
1075            bus: 0,
1076            dev: 2,
1077            func: 0,
1078        };
1079        let bus_c = 4;
1080        let downstream_addr_c = PciAddress {
1081            bus: bus_c,
1082            dev: 0,
1083            func: 0,
1084        };
1085        let hotplug_key_c = HotPlugKey::GuestDevice {
1086            guest_addr: downstream_addr_c,
1087        };
1088        let device_c = GuestDeviceStub {
1089            pci_addr: downstream_addr_c,
1090            key: hotplug_key_c,
1091            device: Arc::new(Mutex::new(MockDevice::new())),
1092        };
1093        let hotplug_command_c =
1094            SignalHotPlugCommand::new(upstream_addr_c, [device_c].to_vec()).unwrap();
1095        let port_c = new_port(bus_c);
1096        assert_eq!(
1097            WorkerResponse::AddPortOk,
1098            client
1099                .send_worker_command(WorkerCommand::AddPort(
1100                    upstream_addr_a,
1101                    PortWorkerStub::new(port_a.clone(), bus_a).unwrap()
1102                ))
1103                .unwrap()
1104        );
1105        assert_eq!(
1106            WorkerResponse::AddPortOk,
1107            client
1108                .send_worker_command(WorkerCommand::AddPort(
1109                    upstream_addr_b,
1110                    PortWorkerStub::new(port_b.clone(), bus_b).unwrap()
1111                ))
1112                .unwrap()
1113        );
1114        assert_eq!(
1115            WorkerResponse::AddPortOk,
1116            client
1117                .send_worker_command(WorkerCommand::AddPort(
1118                    upstream_addr_c,
1119                    PortWorkerStub::new(port_c.clone(), bus_c).unwrap()
1120                ))
1121                .unwrap()
1122        );
1123        port_a.lock().signal_ready();
1124        assert!(poll_until_with_timeout(
1125            || client
1126                .send_worker_command(WorkerCommand::GetPortState(upstream_addr_a))
1127                .unwrap()
1128                == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1129            Duration::from_millis(500)
1130        ));
1131        port_b.lock().signal_ready();
1132        assert!(poll_until_with_timeout(
1133            || client
1134                .send_worker_command(WorkerCommand::GetPortState(upstream_addr_b))
1135                .unwrap()
1136                == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1137            Duration::from_millis(500)
1138        ));
1139        port_c.lock().signal_ready();
1140        assert!(poll_until_with_timeout(
1141            || client
1142                .send_worker_command(WorkerCommand::GetPortState(upstream_addr_c))
1143                .unwrap()
1144                == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1145            Duration::from_millis(500)
1146        ));
1147        // All ports empty and in sync. Should get port B.
1148        assert_eq!(
1149            WorkerResponse::GetEmptyPortOk(upstream_addr_b),
1150            client
1151                .send_worker_command(WorkerCommand::GetEmptyPort)
1152                .unwrap()
1153        );
1154        assert_eq!(
1155            WorkerResponse::SignalOk,
1156            client
1157                .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_b))
1158                .unwrap()
1159        );
1160        // Should get port A.
1161        assert_eq!(
1162            WorkerResponse::GetEmptyPortOk(upstream_addr_a),
1163            client
1164                .send_worker_command(WorkerCommand::GetEmptyPort)
1165                .unwrap()
1166        );
1167        assert_eq!(
1168            WorkerResponse::SignalOk,
1169            client
1170                .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_a))
1171                .unwrap()
1172        );
1173        // Should get port C.
1174        assert_eq!(
1175            WorkerResponse::GetEmptyPortOk(upstream_addr_c),
1176            client
1177                .send_worker_command(WorkerCommand::GetEmptyPort)
1178                .unwrap()
1179        );
1180        assert_eq!(
1181            WorkerResponse::SignalOk,
1182            client
1183                .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_c))
1184                .unwrap()
1185        );
1186        // Should get an error since no port is empty.
1187        if let WorkerResponse::InvalidCommand(_) = client
1188            .send_worker_command(WorkerCommand::GetEmptyPort)
1189            .unwrap()
1190        {
1191            // Assert result is of Error type.
1192        } else {
1193            unreachable!();
1194        }
1195        // Remove device from port A, immediately it should be available.
1196        assert_eq!(
1197            WorkerResponse::SignalOk,
1198            client
1199                .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr_a))
1200                .unwrap()
1201        );
1202        assert_eq!(
1203            WorkerResponse::GetEmptyPortOk(upstream_addr_a),
1204            client
1205                .send_worker_command(WorkerCommand::GetEmptyPort)
1206                .unwrap()
1207        );
1208        // Moreover, it should be 2 steps behind.
1209        assert_eq!(
1210            WorkerResponse::GetPortStateOk(PortState::Empty(2)),
1211            client
1212                .send_worker_command(WorkerCommand::GetPortState(upstream_addr_a))
1213                .unwrap()
1214        );
1215    }
1216
1217    #[test]
1218    fn worker_port_state_transitions() {
1219        let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1220        let client = WorkerClient::new(rootbus_controller).unwrap();
1221        let upstream_addr = PciAddress {
1222            bus: 0,
1223            dev: 1,
1224            func: 1,
1225        };
1226        let bus = 2;
1227        let downstream_addr = PciAddress {
1228            bus,
1229            dev: 0,
1230            func: 0,
1231        };
1232        let hotplug_key = HotPlugKey::GuestDevice {
1233            guest_addr: downstream_addr,
1234        };
1235        let device = GuestDeviceStub {
1236            pci_addr: downstream_addr,
1237            key: hotplug_key,
1238            device: Arc::new(Mutex::new(MockDevice::new())),
1239        };
1240        let hotplug_command = SignalHotPlugCommand::new(upstream_addr, [device].to_vec()).unwrap();
1241        let port = new_port(bus);
1242        assert_eq!(
1243            WorkerResponse::AddPortOk,
1244            client
1245                .send_worker_command(WorkerCommand::AddPort(
1246                    upstream_addr,
1247                    PortWorkerStub::new(port.clone(), bus).unwrap()
1248                ))
1249                .unwrap()
1250        );
1251        port.lock().signal_ready();
1252        assert!(poll_until_with_timeout(
1253            || client
1254                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1255                .unwrap()
1256                == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1257            Duration::from_millis(500)
1258        ));
1259        assert_eq!(
1260            WorkerResponse::SignalOk,
1261            client
1262                .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1263                .unwrap()
1264        );
1265        assert!(poll_until_with_timeout(
1266            || client
1267                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1268                .unwrap()
1269                == WorkerResponse::GetPortStateOk(PortState::Occupied(1)),
1270            Duration::from_millis(500)
1271        ));
1272        assert_eq!(
1273            WorkerResponse::SignalOk,
1274            client
1275                .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr))
1276                .unwrap()
1277        );
1278        assert!(poll_until_with_timeout(
1279            || client
1280                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1281                .unwrap()
1282                == WorkerResponse::GetPortStateOk(PortState::Empty(2)),
1283            Duration::from_millis(500)
1284        ));
1285        assert_eq!(
1286            WorkerResponse::SignalOk,
1287            client
1288                .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1289                .unwrap()
1290        );
1291        assert!(poll_until_with_timeout(
1292            || client
1293                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1294                .unwrap()
1295                == WorkerResponse::GetPortStateOk(PortState::Occupied(3)),
1296            Duration::from_millis(500)
1297        ));
1298        port.lock().signal_cc();
1299        assert!(poll_until_with_timeout(
1300            || client
1301                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1302                .unwrap()
1303                == WorkerResponse::GetPortStateOk(PortState::Occupied(2)),
1304            Duration::from_millis(500)
1305        ));
1306        assert_eq!(
1307            WorkerResponse::SignalOk,
1308            client
1309                .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr))
1310                .unwrap()
1311        );
1312        // Moves from Occupied(2) to Empty(1) since it is redundant to unplug a device that is yet
1313        // to be plugged in.
1314        assert!(poll_until_with_timeout(
1315            || client
1316                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1317                .unwrap()
1318                == WorkerResponse::GetPortStateOk(PortState::Empty(1)),
1319            Duration::from_millis(500)
1320        ));
1321        port.lock().signal_cc();
1322        assert!(poll_until_with_timeout(
1323            || client
1324                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1325                .unwrap()
1326                == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1327            Duration::from_millis(500)
1328        ));
1329    }
1330
1331    #[test]
1332    fn worker_port_early_plug_state_transitions() {
1333        let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1334        let client = WorkerClient::new(rootbus_controller).unwrap();
1335        let upstream_addr = PciAddress {
1336            bus: 0,
1337            dev: 1,
1338            func: 1,
1339        };
1340        let bus = 2;
1341        let downstream_addr = PciAddress {
1342            bus,
1343            dev: 0,
1344            func: 0,
1345        };
1346        let hotplug_key = HotPlugKey::GuestDevice {
1347            guest_addr: downstream_addr,
1348        };
1349        let device = GuestDeviceStub {
1350            pci_addr: downstream_addr,
1351            key: hotplug_key,
1352            device: Arc::new(Mutex::new(MockDevice::new())),
1353        };
1354        let hotplug_command = SignalHotPlugCommand::new(upstream_addr, [device].to_vec()).unwrap();
1355        let port = new_port(bus);
1356        assert_eq!(
1357            WorkerResponse::AddPortOk,
1358            client
1359                .send_worker_command(WorkerCommand::AddPort(
1360                    upstream_addr,
1361                    PortWorkerStub::new(port.clone(), bus).unwrap()
1362                ))
1363                .unwrap()
1364        );
1365        assert!(poll_until_with_timeout(
1366            || client
1367                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1368                .unwrap()
1369                == WorkerResponse::GetPortStateOk(PortState::EmptyNotReady),
1370            Duration::from_millis(500)
1371        ));
1372        assert_eq!(
1373            WorkerResponse::SignalOk,
1374            client
1375                .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1376                .unwrap()
1377        );
1378        assert!(poll_until_with_timeout(
1379            || client
1380                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1381                .unwrap()
1382                == WorkerResponse::GetPortStateOk(PortState::OccupiedNotReady),
1383            Duration::from_millis(500)
1384        ));
1385        port.lock().signal_ready();
1386        assert!(poll_until_with_timeout(
1387            || client
1388                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1389                .unwrap()
1390                == WorkerResponse::GetPortStateOk(PortState::Occupied(1)),
1391            Duration::from_millis(500)
1392        ));
1393        port.lock().signal_cc();
1394        assert!(poll_until_with_timeout(
1395            || client
1396                .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1397                .unwrap()
1398                == WorkerResponse::GetPortStateOk(PortState::Occupied(0)),
1399            Duration::from_millis(500)
1400        ));
1401    }
1402}