swap/
controller.rs

1// Copyright 2022 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! crate for the vmm-swap feature.
6
7#![deny(missing_docs)]
8
9use std::fs::File;
10use std::fs::OpenOptions;
11use std::io::stderr;
12use std::io::stdout;
13use std::ops::Range;
14use std::os::unix::fs::OpenOptionsExt;
15use std::path::Path;
16use std::sync::LazyLock;
17use std::thread::Scope;
18use std::thread::ScopedJoinHandle;
19use std::time::Duration;
20use std::time::Instant;
21
22use anyhow::bail;
23use anyhow::Context;
24use base::debug;
25use base::error;
26use base::info;
27use base::linux::FileDataIterator;
28use base::syslog;
29use base::warn;
30use base::AsRawDescriptor;
31use base::AsRawDescriptors;
32use base::EventToken;
33use base::RawDescriptor;
34use base::SendTube;
35use base::SharedMemory;
36use base::Tube;
37use base::TubeError;
38use base::WaitContext;
39use jail::create_base_minijail;
40use jail::create_sandbox_minijail;
41use jail::fork::fork_process;
42use jail::fork::Child;
43use jail::JailConfig;
44use jail::SandboxConfig;
45use jail::MAX_OPEN_FILES_DEFAULT;
46use serde::Deserialize;
47use serde::Serialize;
48use sync::Mutex;
49use vm_memory::GuestMemory;
50
51use crate::file_truncator::FileTruncator;
52use crate::page_handler::Error as PageHandlerError;
53use crate::page_handler::MoveToStaging;
54use crate::page_handler::PageHandler;
55use crate::page_handler::MLOCK_BUDGET;
56use crate::pagesize::bytes_to_pages;
57use crate::pagesize::THP_SIZE;
58use crate::processes::freeze_child_processes;
59use crate::processes::ProcessesGuard;
60use crate::uffd_list::Token as UffdListToken;
61use crate::uffd_list::UffdList;
62use crate::userfaultfd::register_regions;
63use crate::userfaultfd::unregister_regions;
64use crate::userfaultfd::DeadUffdCheckerImpl;
65use crate::userfaultfd::Error as UffdError;
66use crate::userfaultfd::Factory as UffdFactory;
67use crate::userfaultfd::UffdEvent;
68use crate::userfaultfd::Userfaultfd;
69use crate::worker::BackgroundJobControl;
70use crate::worker::Worker;
71use crate::SwapMetrics;
72use crate::SwapState;
73use crate::SwapStateTransition;
74use crate::SwapStatus;
75
/// Upper bound, in bytes, on a single chunk of memory swapped out or in at once (2 MiB).
const MAX_SWAP_CHUNK_SIZE: usize = 1 << 21;
/// Upper bound on the number of pages trimmed at once.
const MAX_TRIM_PAGES: usize = 1 << 10;
80
81/// Returns count of pages active on the guest memory.
82fn count_resident_pages(guest_memory: &GuestMemory) -> usize {
83    let mut pages = 0;
84    for region in guest_memory.regions() {
85        let mut resident_bytes = 0u64;
86        for range in FileDataIterator::new(region.shm, region.shm_offset, region.size as u64) {
87            let range = match range {
88                Ok(r) => r,
89                Err(e) => {
90                    error!("failed to iterate data ranges: {e:?}");
91                    return 0;
92                }
93            };
94            resident_bytes += range.end - range.start;
95        }
96        let resident_bytes = match resident_bytes.try_into() {
97            Ok(n) => n,
98            Err(e) => {
99                error!("failed to load resident pages count: {:?}", e);
100                return 0;
101            }
102        };
103
104        pages += bytes_to_pages(resident_bytes);
105    }
106    pages
107}
108
/// Commands sent internally to the vmm-swap monitor process from the main process and other
/// processes.
///
/// Most of them originate from the `crosvm swap <command>` command line.
#[derive(Serialize, Deserialize)]
enum Command {
    /// Enable vmm-swap. Sent by [SwapController::enable], which then waits for a [SwapStatus]
    /// reply.
    Enable,
    /// Trim pages in the staging memory. Sent by [SwapController::trim]; no reply is awaited.
    Trim,
    /// Swap out the staging memory to the swap file. Sent by [SwapController::swap_out]; no
    /// reply is awaited.
    SwapOut,
    /// Disable vmm-swap. Sent by [SwapController::disable]; no reply is awaited.
    Disable {
        /// Passed through from [SwapController::disable]. When received while vmm-swap is
        /// already disabled, `false` makes the monitor process take the swap file back from a
        /// pending background truncation (see `monitor_process`).
        slow_file_cleanup: bool,
    },
    /// Ask the monitor process to exit. Sent when [SwapController] is dropped.
    Exit,
    /// Request the current [SwapStatus], which the monitor process sends back on the command
    /// tube.
    Status,
    /// A process that may touch the guest memory has been forked.
    ProcessForked {
        /// Userfaultfd created in the forked process, to be registered by the monitor process.
        #[serde(with = "base::with_as_descriptor")]
        uffd: Userfaultfd,
        /// Tube on which the monitor process replies `true` if `uffd` was registered and
        /// `false` otherwise.
        reply_tube: Tube,
    },
    /// All static devices have been forked; carries the number of static devices counted by
    /// the main process.
    StaticDeviceSetupComplete(u32),
}
130
/// [SwapController] provides APIs to control vmm-swap.
///
/// It owns the monitor process launched by [SwapController::launch] and the tube used to send
/// [Command]s to it. Dropping the controller tries to send [Command::Exit] and wait for the
/// monitor process to exit.
pub struct SwapController {
    // The forked monitor process. Taken (and waited for) when the controller is dropped.
    child_process: Option<Child>,
    // Factory for new userfaultfds; cloned into device helpers and uffd senders.
    uffd_factory: UffdFactory,
    // Main-process end of the command tube connected to the monitor process.
    command_tube: Tube,
    // Incremented by `PrepareFork::prepare_fork()` to count static devices; sent to the monitor
    // process by `on_static_devices_setup_complete()`.
    num_static_devices: u32,
    // Keep 1 page dummy mmap in the main process to make it present in all the descendant
    // processes.
    _dead_uffd_checker: DeadUffdCheckerImpl,
    // Keep the cloned [GuestMemory] in the main process not to free it before the monitor process
    // exits.
    _guest_memory: GuestMemory,
}
144
145impl SwapController {
146    /// Launch a monitor process for vmm-swap and return a controller.
147    ///
148    /// Pages on the [GuestMemory] are registered to userfaultfd to track pagefault events.
149    ///
150    /// # Arguments
151    ///
152    /// * `guest_memory` - fresh new [GuestMemory]. Any pages on the [GuestMemory] must not be
153    ///   touched.
154    /// * `swap_dir` - directory to store swap files.
155    pub fn launch(
156        guest_memory: GuestMemory,
157        swap_dir: &Path,
158        jail_config: Option<&JailConfig>,
159    ) -> anyhow::Result<Self> {
160        info!("vmm-swap is enabled. launch monitor process.");
161
162        let preserved_guest_memory = guest_memory.clone();
163
164        let uffd_factory = UffdFactory::new();
165        let uffd = uffd_factory.create().context("create userfaultfd")?;
166
167        // The swap file is created as `O_TMPFILE` from the specified directory. As benefits:
168        //
169        // * it has no chance to conflict.
170        // * it has a security benefit that no one (except root) can access the swap file.
171        // * it will be automatically deleted by the kernel when crosvm exits/dies or on reboot if
172        //   the device panics/hard-resets while crosvm is running.
173        let swap_file = OpenOptions::new()
174            .read(true)
175            .write(true)
176            .custom_flags(libc::O_TMPFILE | libc::O_EXCL)
177            .mode(0o000) // other processes with the same uid can't open the file
178            .open(swap_dir)?;
179        // The internal tube in which [Command]s sent from other processes than the monitor process
180        // to the monitor process. The response is `Status` only.
181        let (command_tube_main, command_tube_monitor) =
182            Tube::pair().context("create swap command tube")?;
183
184        // Allocate eventfd before creating sandbox.
185        let bg_job_control = BackgroundJobControl::new().context("create background job event")?;
186
187        let dead_uffd_checker = DeadUffdCheckerImpl::new().context("create dead uffd checker")?;
188
189        let mut keep_rds = vec![
190            stdout().as_raw_descriptor(),
191            stderr().as_raw_descriptor(),
192            uffd.as_raw_descriptor(),
193            swap_file.as_raw_descriptor(),
194            command_tube_monitor.as_raw_descriptor(),
195            bg_job_control.get_completion_event().as_raw_descriptor(),
196        ];
197
198        syslog::push_descriptors(&mut keep_rds);
199        cros_tracing::push_descriptors!(&mut keep_rds);
200        metrics::push_descriptors(&mut keep_rds);
201        keep_rds.extend(guest_memory.as_raw_descriptors());
202
203        keep_rds.extend(uffd_factory.as_raw_descriptors());
204
205        // Load and cache transparent hugepage size from sysfs before jumping into sandbox.
206        LazyLock::force(&THP_SIZE);
207
208        let mut jail = if let Some(jail_config) = jail_config {
209            let config = SandboxConfig::new(jail_config, "swap_monitor");
210            create_sandbox_minijail(&jail_config.pivot_root, MAX_OPEN_FILES_DEFAULT, &config)
211                .context("create sandbox jail")?
212        } else {
213            create_base_minijail(Path::new("/"), MAX_OPEN_FILES_DEFAULT)
214                .context("create minijail")?
215        };
216        jail.set_rlimit(
217            libc::RLIMIT_MEMLOCK as libc::c_int,
218            MLOCK_BUDGET as u64,
219            MLOCK_BUDGET as u64,
220        )
221        .context("error setting RLIMIT_MEMLOCK")?;
222
223        // Start a page fault monitoring process (this will be the first child process of the
224        // current process)
225        let child_process =
226            fork_process(jail, keep_rds, Some(String::from("swap monitor")), || {
227                if let Err(e) = monitor_process(
228                    command_tube_monitor,
229                    guest_memory,
230                    uffd,
231                    swap_file,
232                    bg_job_control,
233                    &dead_uffd_checker,
234                ) {
235                    if let Some(PageHandlerError::Userfaultfd(UffdError::UffdClosed)) =
236                        e.downcast_ref::<PageHandlerError>()
237                    {
238                        // Userfaultfd can cause UffdError::UffdClosed if the main process
239                        // unexpectedly while it is swapping in. This is not a bug of swap monitor,
240                        // but the other feature on the main process.
241                        // Note that UffdError::UffdClosed from other processes than the main
242                        // process are derived from PageHandler::handle_page_fault() only and
243                        // handled in the loop of handle_vmm_swap().
244                        error!(
245                            "page_fault_handler_thread exited with userfaultfd closed error: {:#}",
246                            e
247                        );
248                    } else if e.is::<TubeError>() {
249                        // Tube can cause TubeError if the main process unexpectedly dies. This is
250                        // not a bug of swap monitor, but the other feature on the main process.
251                        // Even if the tube itself is broken and the main process is alive, the main
252                        // process catch that the swap monitor process exits unexpectedly and
253                        // terminates itself.
254                        error!("page_fault_handler_thread exited with tube error: {:#}", e);
255                    } else {
256                        panic!("page_fault_handler_thread exited with error: {e:#}");
257                    }
258                }
259            })
260            .context("fork monitor process")?;
261
262        // send first status request to the monitor process and wait for the response until setup on
263        // the monitor process completes.
264        command_tube_main.send(&Command::Status)?;
265        match command_tube_main
266            .recv::<SwapStatus>()
267            .context("recv initial status")?
268            .state
269        {
270            SwapState::Ready => {
271                // The initial state of swap status is Ready and this is a signal that the
272                // monitoring process completes setup and is running.
273            }
274            status => {
275                bail!("initial state is not Ready, but {:?}", status);
276            }
277        };
278
279        Ok(Self {
280            child_process: Some(child_process),
281            uffd_factory,
282            command_tube: command_tube_main,
283            num_static_devices: 0,
284            _dead_uffd_checker: dead_uffd_checker,
285            _guest_memory: preserved_guest_memory,
286        })
287    }
288
289    /// Enable monitoring page faults and move guest memory to staging memory.
290    ///
291    /// The pages will be swapped in from the staging memory to the guest memory on page faults
292    /// until pages are written into the swap file by [Self::swap_out()].
293    ///
294    /// This waits until enabling vmm-swap finishes on the monitor process.
295    ///
296    /// The caller must guarantee that any contents on the guest memory is not updated during
297    /// enabling vmm-swap.
298    ///
299    /// # Note
300    ///
301    /// Enabling does not write pages to the swap file. User should call [Self::swap_out()]
302    /// after a suitable time.
303    ///
304    /// Just after enabling vmm-swap, some amount of pages are swapped in as soon as guest resumes.
305    /// By splitting the enable/swap_out operation and by delaying write to the swap file operation,
306    /// it has a benefit of reducing file I/O for hot pages.
307    pub fn enable(&self) -> anyhow::Result<()> {
308        self.command_tube
309            .send(&Command::Enable)
310            .context("send swap enable request")?;
311
312        let _ = self
313            .command_tube
314            .recv::<SwapStatus>()
315            .context("receive swap status")?;
316        Ok(())
317    }
318
319    /// Trim pages in the staging memory which are needless to be written back to the swap file.
320    ///
321    /// * zero pages
322    /// * pages which are the same as the pages in the swap file.
323    pub fn trim(&self) -> anyhow::Result<()> {
324        self.command_tube
325            .send(&Command::Trim)
326            .context("send swap trim request")?;
327        Ok(())
328    }
329
330    /// Swap out all the pages in the staging memory to the swap files.
331    ///
332    /// This returns as soon as it succeeds to send request to the monitor process.
333    ///
334    /// Users should call [Self::enable()] before this. See the comment of [Self::enable()] as well.
335    pub fn swap_out(&self) -> anyhow::Result<()> {
336        self.command_tube
337            .send(&Command::SwapOut)
338            .context("send swap out request")?;
339        Ok(())
340    }
341
342    /// Swap in all the guest memory and disable monitoring page faults.
343    ///
344    /// This returns as soon as it succeeds to send request to the monitor process.
345    pub fn disable(&self, slow_file_cleanup: bool) -> anyhow::Result<()> {
346        self.command_tube
347            .send(&Command::Disable { slow_file_cleanup })
348            .context("send swap disable request")?;
349        Ok(())
350    }
351
352    /// Return current swap status.
353    ///
354    /// This blocks until response from the monitor process arrives to the main process.
355    pub fn status(&self) -> anyhow::Result<SwapStatus> {
356        self.command_tube
357            .send(&Command::Status)
358            .context("send swap status request")?;
359        let status = self.command_tube.recv().context("receive swap status")?;
360        Ok(status)
361    }
362
363    /// Suspend device processes using `SIGSTOP` signal.
364    ///
365    /// When the returned `ProcessesGuard` is dropped, the devices resume.
366    ///
367    /// This must be called from the main process.
368    pub fn suspend_devices(&self) -> anyhow::Result<ProcessesGuard> {
369        // child_process become none on dropping SwapController.
370        freeze_child_processes(
371            self.child_process
372                .as_ref()
373                .expect("monitor process not exist")
374                .pid,
375        )
376    }
377
378    /// Notify the monitor process that all static devices are forked.
379    ///
380    /// Devices forked after this call are treated as dynamic devices which can die (e.g. hotplug
381    /// devices).
382    pub fn on_static_devices_setup_complete(&self) -> anyhow::Result<()> {
383        // This sends the number of static devices counted on the main process because device
384        // initializations are executed on child processes asynchronously.
385        self.command_tube
386            .send(&Command::StaticDeviceSetupComplete(self.num_static_devices))
387            .context("send command")
388    }
389
390    /// Create [SwapDeviceHelper].
391    pub fn create_device_helper(&self) -> anyhow::Result<SwapDeviceHelper> {
392        let uffd_factory = self
393            .uffd_factory
394            .try_clone()
395            .context("try clone uffd factory")?;
396        let command_tube = self
397            .command_tube
398            .try_clone_send_tube()
399            .context("try clone tube")?;
400        Ok(SwapDeviceHelper {
401            uffd_factory,
402            command_tube,
403        })
404    }
405}
406
407impl Drop for SwapController {
408    fn drop(&mut self) {
409        // Shutdown the monitor process.
410        // This blocks until the monitor process exits.
411        if let Err(e) = self.command_tube.send(&Command::Exit) {
412            error!(
413                "failed to sent exit command to vmm-swap monitor process: {:#}",
414                e
415            );
416            return;
417        }
418        if let Err(e) = self
419            .child_process
420            .take()
421            .expect("monitor process not exist")
422            .wait()
423        {
424            error!("failed to wait vmm-swap monitor process shutdown: {:#}", e);
425        }
426    }
427}
428
/// Create a new [SwapDeviceUffdSender] which is passed to the forked child process.
///
/// Implemented by [SwapController] for static devices (from the main process) and by
/// [SwapDeviceHelper] for child processes that fork devices later (e.g. hotplug devices).
pub trait PrepareFork {
    /// Create a new [SwapDeviceUffdSender].
    ///
    /// The returned sender is meant to be consumed in the forked child process by
    /// [SwapDeviceUffdSender::on_process_forked].
    fn prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender>;
}
434
435impl PrepareFork for SwapController {
436    /// Create a new [SwapDeviceUffdSender].
437    ///
438    /// This should be called from the main process because creating a [Tube]s requires seccomp
439    /// policy.
440    ///
441    /// This also counts the number of static devices which are created before booting.
442    fn prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender> {
443        let command_tube = self
444            .command_tube
445            .try_clone_send_tube()
446            .context("try clone tube")?;
447        self.num_static_devices += 1;
448        SwapDeviceUffdSender::new(command_tube, &self.uffd_factory)
449    }
450}
451
/// Helper to create [SwapDeviceUffdSender] from child processes (e.g. JailWarden for hotplug
/// devices).
///
/// Created by [SwapController::create_device_helper].
pub struct SwapDeviceHelper {
    // Clone of the controller's userfaultfd factory.
    uffd_factory: UffdFactory,
    // Send-only clone of the controller's command tube to the monitor process.
    command_tube: SendTube,
}
458
459impl PrepareFork for SwapDeviceHelper {
460    /// Create a new [SwapDeviceUffdSender].
461    fn prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender> {
462        let command_tube = self.command_tube.try_clone().context("try clone tube")?;
463        SwapDeviceUffdSender::new(command_tube, &self.uffd_factory)
464    }
465}
466
467impl AsRawDescriptors for SwapDeviceHelper {
468    fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
469        let mut rds = self.uffd_factory.as_raw_descriptors();
470        rds.push(self.command_tube.as_raw_descriptor());
471        rds
472    }
473}
474
/// Create a new userfaultfd and send it to the monitor process.
///
/// Obtained from [PrepareFork::prepare_fork] and consumed in the forked child process by
/// [SwapDeviceUffdSender::on_process_forked].
pub struct SwapDeviceUffdSender {
    // Creates the userfaultfd in the forked process.
    uffd_factory: UffdFactory,
    // Used to send [Command::ProcessForked] to the monitor process.
    command_tube: SendTube,
    // One end of a tube pair; handed to the monitor process as the reply tube.
    sender: Tube,
    // The other end of the pair; receives the registration result (a 60 second receive timeout
    // is set in `new`).
    receiver: Tube,
}
482
483impl SwapDeviceUffdSender {
484    fn new(command_tube: SendTube, uffd_factory: &UffdFactory) -> anyhow::Result<Self> {
485        let uffd_factory = uffd_factory.try_clone().context("try clone uffd factory")?;
486        let (sender, receiver) = Tube::pair().context("create tube")?;
487        receiver
488            .set_recv_timeout(Some(Duration::from_secs(60)))
489            .context("set recv timeout")?;
490        Ok(SwapDeviceUffdSender {
491            uffd_factory,
492            command_tube,
493            sender,
494            receiver,
495        })
496    }
497
498    /// Create a new userfaultfd and send it to the monitor process.
499    ///
500    /// This must be called as soon as a child process which may touch the guest memory is forked.
501    ///
502    /// Userfaultfd(2) originally has `UFFD_FEATURE_EVENT_FORK`. But it is not applicable to crosvm
503    /// since it does not support non-root user namespace.
504    pub fn on_process_forked(self) -> anyhow::Result<()> {
505        let uffd = self.uffd_factory.create().context("create userfaultfd")?;
506        // The fd for Userfaultfd in this process is dropped when it is sent via Tube, but the
507        // userfaultfd keeps alive in the monitor process which it is sent to.
508        self.command_tube
509            .send(&Command::ProcessForked {
510                uffd,
511                reply_tube: self.sender,
512            })
513            .context("send forked event")?;
514        // Wait to proceeds the child process logic until the userfaultfd is set up.
515        if !self.receiver.recv::<bool>().context("recv tube")? {
516            bail!("failed to register a new userfaultfd");
517        }
518        Ok(())
519    }
520}
521
522impl AsRawDescriptors for SwapDeviceUffdSender {
523    fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
524        let mut rds = self.uffd_factory.as_raw_descriptors();
525        rds.push(self.command_tube.as_raw_descriptor());
526        rds.push(self.sender.as_raw_descriptor());
527        rds.push(self.receiver.as_raw_descriptor());
528        rds
529    }
530}
531
/// Tokens identifying the event sources watched by the monitor process's [WaitContext].
#[derive(EventToken, Clone, Copy)]
enum Token {
    // Events on the userfaultfd stored at this index in the [UffdList].
    UffdEvents(u32),
    // A [Command] is readable on the command tube.
    Command,
    // The background job completion event was signaled.
    BackgroundJobCompleted,
}
538
539impl UffdListToken for Token {
540    fn uffd_token(idx: u32) -> Self {
541        Token::UffdEvents(idx)
542    }
543}
544
545fn regions_from_guest_memory(guest_memory: &GuestMemory) -> Vec<Range<usize>> {
546    guest_memory
547        .regions()
548        .map(|region| region.host_addr..(region.host_addr + region.size))
549        .collect()
550}
551
552/// The main thread of the monitor process.
553fn monitor_process(
554    command_tube: Tube,
555    guest_memory: GuestMemory,
556    uffd: Userfaultfd,
557    swap_file: File,
558    bg_job_control: BackgroundJobControl,
559    dead_uffd_checker: &DeadUffdCheckerImpl,
560) -> anyhow::Result<()> {
561    info!("monitor_process started");
562
563    let wait_ctx = WaitContext::build_with(&[
564        (&command_tube, Token::Command),
565        (
566            bg_job_control.get_completion_event(),
567            Token::BackgroundJobCompleted,
568        ),
569    ])
570    .context("create wait context")?;
571
572    let mut swap_file_opt = Some(swap_file);
573    let mut truncate_worker: Option<FileTruncator> = None;
574
575    let n_worker = num_cpus::get();
576    info!("start {} workers for staging memory move", n_worker);
577    // The worker threads are killed when the main thread of the monitor process dies.
578    let worker = Worker::new(n_worker, n_worker);
579
580    let mut uffd_list =
581        UffdList::new(uffd, dead_uffd_checker, &wait_ctx).context("create uffd list")?;
582    let mut state_transition = SwapStateTransition::default();
583    let mut try_gc_uffds = false;
584
585    loop {
586        let events = wait_ctx.wait().context("wait poll events")?;
587
588        for event in events.iter() {
589            match event.token {
590                Token::UffdEvents(id_uffd) => {
591                    let uffd = uffd_list
592                        .get(id_uffd)
593                        .with_context(|| format!("uffd is not found for idx: {id_uffd}"))?;
594                    // Userfaultfd does not work as level triggered but as edge triggered. We need
595                    // to read all the events in the userfaultfd here.
596                    while let Some(event) = uffd.read_event().context("read userfaultfd event")? {
597                        match event {
598                            UffdEvent::Remove { .. } => {
599                                // BUG(b/272620051): This is a bug of userfaultfd that
600                                // UFFD_EVENT_REMOVE can be read even after unregistering memory
601                                // from the userfaultfd.
602                                warn!("page remove event while vmm-swap disabled");
603                            }
604                            event => {
605                                bail!("unexpected uffd event: {:?}", event);
606                            }
607                        }
608                    }
609                }
610                Token::Command => match command_tube
611                    .recv::<Command>()
612                    .context("recv swap command")?
613                {
614                    Command::ProcessForked { uffd, reply_tube } => {
615                        debug!("new fork uffd: {:?}", uffd);
616                        let result = match uffd_list.register(uffd) {
617                            Ok(is_dynamic_uffd) => {
618                                try_gc_uffds = is_dynamic_uffd;
619                                true
620                            }
621                            Err(e) => {
622                                error!("failed to register uffd to list: {:?}", e);
623                                false
624                            }
625                        };
626                        if let Err(e) = reply_tube.send(&result) {
627                            error!("failed to response to new process: {:?}", e);
628                        }
629                    }
630                    Command::StaticDeviceSetupComplete(num_static_devices) => {
631                        info!("static device setup complete: n={}", num_static_devices);
632                        if !uffd_list.set_num_static_devices(num_static_devices) {
633                            bail!("failed to set num_static_devices");
634                        }
635                    }
636                    Command::Enable => {
637                        info!("enabling vmm-swap");
638
639                        let staging_shmem =
640                            SharedMemory::new("swap staging memory", guest_memory.memory_size())
641                                .context("create staging shmem")?;
642
643                        let regions = regions_from_guest_memory(&guest_memory);
644
645                        let swap_file = match (swap_file_opt.take(), truncate_worker.take()) {
646                            (Some(file), None) => file,
647                            (None, Some(worker)) => {
648                                worker.take_file().context("failed to get truncated swap")?
649                            }
650                            _ => bail!("Missing swap file"),
651                        };
652
653                        let page_handler = match PageHandler::create(
654                            &swap_file,
655                            &staging_shmem,
656                            &regions,
657                            worker.channel.clone(),
658                        ) {
659                            Ok(page_handler) => page_handler,
660                            Err(e) => {
661                                error!("failed to create swap handler: {:?}", e);
662                                continue;
663                            }
664                        };
665
666                        // TODO(b/272634283): Should just disable vmm-swap without crash.
667                        // SAFETY:
668                        // Safe because the regions are from guest memory and uffd_list contains all
669                        // the processes of crosvm.
670                        unsafe { register_regions(&regions, uffd_list.get_list()) }
671                            .context("register regions")?;
672
673                        // events may contain unprocessed entries, but those pending events will be
674                        // immediately re-created when handle_vmm_swap checks wait_ctx because
675                        // WaitContext is level triggered.
676                        drop(events);
677
678                        let mutex_transition = Mutex::new(state_transition);
679
680                        bg_job_control.reset()?;
681                        let swap_result = std::thread::scope(|scope| {
682                            let result = handle_vmm_swap(
683                                scope,
684                                &wait_ctx,
685                                &page_handler,
686                                &mut uffd_list,
687                                &guest_memory,
688                                &regions,
689                                &command_tube,
690                                &worker,
691                                &mutex_transition,
692                                &bg_job_control,
693                            );
694                            // Abort background jobs to unblock ScopedJoinHandle eariler on a
695                            // failure.
696                            bg_job_control.abort();
697                            result
698                        })?;
699                        if swap_result.should_exit {
700                            return Ok(());
701                        }
702                        state_transition = mutex_transition.into_inner();
703
704                        unregister_regions(&regions, uffd_list.get_list())
705                            .context("unregister regions")?;
706
707                        // Truncate the swap file to hold minimum resources while disabled.
708                        if swap_result.slow_file_cleanup {
709                            truncate_worker = Some(
710                                FileTruncator::new(swap_file)
711                                    .context("failed to start truncating")?,
712                            );
713                        } else {
714                            if let Err(e) = swap_file.set_len(0) {
715                                error!("failed to clear swap file: {:?}", e);
716                            };
717                            swap_file_opt = Some(swap_file);
718                        }
719
720                        info!("vmm-swap is disabled");
721                        // events are obsolete. Run `WaitContext::wait()` again
722                        break;
723                    }
724                    Command::Trim => {
725                        warn!("swap trim while disabled");
726                    }
727                    Command::SwapOut => {
728                        warn!("swap out while disabled");
729                    }
730                    Command::Disable { slow_file_cleanup } => {
731                        if !slow_file_cleanup {
732                            if let Some(worker) = truncate_worker.take() {
733                                swap_file_opt =
734                                    Some(worker.take_file().context("failed to truncate swap")?);
735                            }
736                        }
737                    }
738                    Command::Exit => {
739                        return Ok(());
740                    }
741                    Command::Status => {
742                        let metrics = SwapMetrics {
743                            resident_pages: count_resident_pages(&guest_memory) as u64,
744                            ..Default::default()
745                        };
746                        let status = SwapStatus {
747                            state: SwapState::Ready,
748                            metrics,
749                            state_transition,
750                        };
751                        command_tube.send(&status).context("send status response")?;
752                        debug!("swap status: {:?}", status);
753                    }
754                },
755                Token::BackgroundJobCompleted => {
756                    error!("unexpected background job completed event while swap is disabled");
757                    bg_job_control.reset()?;
758                }
759            };
760        }
761        if try_gc_uffds {
762            uffd_list.gc_dead_uffds().context("gc dead uffds")?;
763            try_gc_uffds = false;
764        }
765    }
766}
767
768enum State<'scope> {
769    SwapOutPending,
770    Trim(ScopedJoinHandle<'scope, anyhow::Result<()>>),
771    SwapOutInProgress {
772        started_time: Instant,
773    },
774    SwapOutCompleted,
775    SwapInInProgress {
776        join_handle: ScopedJoinHandle<'scope, anyhow::Result<()>>,
777        slow_file_cleanup: bool,
778    },
779    Failed,
780}
781
782impl From<&State<'_>> for SwapState {
783    fn from(state: &State<'_>) -> Self {
784        match state {
785            State::SwapOutPending => SwapState::Pending,
786            State::Trim(_) => SwapState::TrimInProgress,
787            State::SwapOutInProgress { .. } => SwapState::SwapOutInProgress,
788            State::SwapOutCompleted => SwapState::Active,
789            State::SwapInInProgress { .. } => SwapState::SwapInInProgress,
790            State::Failed => SwapState::Failed,
791        }
792    }
793}
794
795fn handle_enable_command<'scope>(
796    state: State,
797    bg_job_control: &BackgroundJobControl,
798    page_handler: &PageHandler,
799    guest_memory: &GuestMemory,
800    worker: &Worker<MoveToStaging>,
801    state_transition: &Mutex<SwapStateTransition>,
802) -> anyhow::Result<State<'scope>> {
803    match state {
804        State::SwapInInProgress { join_handle, .. } => {
805            info!("abort swap-in");
806            abort_background_job(join_handle, bg_job_control).context("abort swap-in")?;
807        }
808        State::Trim(join_handle) => {
809            info!("abort trim");
810            abort_background_job(join_handle, bg_job_control).context("abort trim")?;
811        }
812        _ => {}
813    }
814
815    info!("start moving memory to staging");
816    match move_guest_to_staging(page_handler, guest_memory, worker) {
817        Ok(new_state_transition) => {
818            info!(
819                "move {} pages to staging in {} ms",
820                new_state_transition.pages, new_state_transition.time_ms
821            );
822            *state_transition.lock() = new_state_transition;
823            Ok(State::SwapOutPending)
824        }
825        Err(e) => {
826            error!("failed to move memory to staging: {}", e);
827            *state_transition.lock() = SwapStateTransition::default();
828            Ok(State::Failed)
829        }
830    }
831}
832
833fn move_guest_to_staging(
834    page_handler: &PageHandler,
835    guest_memory: &GuestMemory,
836    worker: &Worker<MoveToStaging>,
837) -> anyhow::Result<SwapStateTransition> {
838    let start_time = std::time::Instant::now();
839
840    let mut pages = 0;
841
842    let result = guest_memory.regions().try_for_each(|region| {
843        // SAFETY:
844        // safe because:
845        // * all the regions are registered to all userfaultfd
846        // * no process access the guest memory
847        // * page fault events are handled by PageHandler
848        // * wait for all the copy completed within _processes_guard
849        pages += unsafe {
850            page_handler.move_to_staging(region.host_addr, region.shm, region.shm_offset)
851        }
852        .context("move to staging")? as u64;
853        Ok(())
854    });
855    worker.channel.wait_complete();
856
857    match result {
858        Ok(()) => {
859            let resident_pages = count_resident_pages(guest_memory);
860            if resident_pages > 0 {
861                error!(
862                    "active page is not zero just after swap out but {} pages",
863                    resident_pages
864                );
865            }
866            let time_ms = start_time.elapsed().as_millis().try_into()?;
867            Ok(SwapStateTransition { pages, time_ms })
868        }
869        Err(e) => Err(e),
870    }
871}
872
873fn abort_background_job<T>(
874    join_handle: ScopedJoinHandle<'_, anyhow::Result<T>>,
875    bg_job_control: &BackgroundJobControl,
876) -> anyhow::Result<T> {
877    bg_job_control.abort();
878    // Wait until the background job is aborted and the thread finishes.
879    let result = join_handle
880        .join()
881        .expect("panic on the background job thread");
882    bg_job_control.reset().context("reset swap in event")?;
883    result.context("failure on background job thread")
884}
885
/// Value returned by `handle_vmm_swap` when its event loop ends.
struct VmmSwapResult {
    /// `true` when the loop ended because `Command::Exit` was received.
    should_exit: bool,
    /// The `slow_file_cleanup` flag of the `Command::Disable` whose swap-in just finished.
    /// Always `false` when `should_exit` is `true`.
    slow_file_cleanup: bool,
}
890
891fn handle_vmm_swap<'scope, 'env>(
892    scope: &'scope Scope<'scope, 'env>,
893    wait_ctx: &WaitContext<Token>,
894    page_handler: &'env PageHandler<'env>,
895    uffd_list: &'env mut UffdList<Token, DeadUffdCheckerImpl>,
896    guest_memory: &GuestMemory,
897    regions: &[Range<usize>],
898    command_tube: &Tube,
899    worker: &Worker<MoveToStaging>,
900    state_transition: &'env Mutex<SwapStateTransition>,
901    bg_job_control: &'env BackgroundJobControl,
902) -> anyhow::Result<VmmSwapResult> {
903    let mut state = match move_guest_to_staging(page_handler, guest_memory, worker) {
904        Ok(transition) => {
905            info!(
906                "move {} pages to staging in {} ms",
907                transition.pages, transition.time_ms
908            );
909            *state_transition.lock() = transition;
910            State::SwapOutPending
911        }
912        Err(e) => {
913            error!("failed to move memory to staging: {}", e);
914            *state_transition.lock() = SwapStateTransition::default();
915            State::Failed
916        }
917    };
918    command_tube
919        .send(&SwapStatus::dummy())
920        .context("send enable finish signal")?;
921
922    let mut try_gc_uffds = false;
923    loop {
924        let events = match &state {
925            State::SwapOutInProgress { started_time } => {
926                let events = wait_ctx
927                    .wait_timeout(Duration::ZERO)
928                    .context("wait poll events")?;
929
930                // TODO(b/273129441): swap out on a background thread.
931                // Proceed swap out only when there is no page fault (or other) events.
932                if events.is_empty() {
933                    match page_handler.swap_out(MAX_SWAP_CHUNK_SIZE) {
934                        Ok(num_pages) => {
935                            let mut state_transition = state_transition.lock();
936                            state_transition.pages += num_pages as u64;
937                            state_transition.time_ms =
938                                started_time.elapsed().as_millis().try_into()?;
939                            if num_pages == 0 {
940                                info!(
941                                    "swap out all {} pages to file in {} ms",
942                                    state_transition.pages, state_transition.time_ms
943                                );
944                                state = State::SwapOutCompleted;
945                            }
946                        }
947                        Err(e) => {
948                            error!("failed to swap out: {:?}", e);
949                            state = State::Failed;
950                            *state_transition.lock() = SwapStateTransition::default();
951                        }
952                    }
953                    continue;
954                }
955
956                events
957            }
958            _ => wait_ctx.wait().context("wait poll events")?,
959        };
960
961        for event in events.iter() {
962            match event.token {
963                Token::UffdEvents(id_uffd) => {
964                    let uffd = uffd_list
965                        .get(id_uffd)
966                        .with_context(|| format!("uffd is not found for idx: {id_uffd}"))?;
967                    // Userfaultfd does not work as level triggered but as edge triggered. We need
968                    // to read all the events in the userfaultfd here.
969                    // TODO(kawasin): Use [userfaultfd::Uffd::read_events()] for performance.
970                    while let Some(event) = uffd.read_event().context("read userfaultfd event")? {
971                        match event {
972                            UffdEvent::Pagefault { addr, .. } => {
973                                match page_handler.handle_page_fault(uffd, addr as usize) {
974                                    Ok(()) => {}
975                                    Err(PageHandlerError::Userfaultfd(UffdError::UffdClosed)) => {
976                                        // Do nothing for the uffd. It will be garbage-collected
977                                        // when a new uffd is registered.
978                                        break;
979                                    }
980                                    Err(e) => {
981                                        bail!("failed to handle page fault: {:?}", e);
982                                    }
983                                }
984                            }
985                            UffdEvent::Remove { start, end } => {
986                                page_handler
987                                    .handle_page_remove(start as usize, end as usize)
988                                    .context("handle fault")?;
989                            }
990                            event => {
991                                bail!("unsupported UffdEvent: {:?}", event);
992                            }
993                        }
994                    }
995                }
996                Token::Command => match command_tube
997                    .recv::<Command>()
998                    .context("recv swap command")?
999                {
1000                    Command::ProcessForked { uffd, reply_tube } => {
1001                        debug!("new fork uffd: {:?}", uffd);
1002                        let result = if let Err(e) = {
1003                            // SAFETY: regions is generated from the guest memory
1004                            // SAFETY: the uffd is from a new process.
1005                            unsafe { register_regions(regions, std::array::from_ref(&uffd)) }
1006                        } {
1007                            error!("failed to setup uffd: {:?}", e);
1008                            false
1009                        } else {
1010                            match uffd_list.register(uffd) {
1011                                Ok(is_dynamic_uffd) => {
1012                                    try_gc_uffds = is_dynamic_uffd;
1013                                    true
1014                                }
1015                                Err(e) => {
1016                                    error!("failed to register uffd to list: {:?}", e);
1017                                    false
1018                                }
1019                            }
1020                        };
1021                        if let Err(e) = reply_tube.send(&result) {
1022                            error!("failed to response to new process: {:?}", e);
1023                        }
1024                    }
1025                    Command::StaticDeviceSetupComplete(num_static_devices) => {
1026                        info!("static device setup complete: n={}", num_static_devices);
1027                        if !uffd_list.set_num_static_devices(num_static_devices) {
1028                            bail!("failed to set num_static_devices");
1029                        }
1030                    }
1031                    Command::Enable => {
1032                        let result = handle_enable_command(
1033                            state,
1034                            bg_job_control,
1035                            page_handler,
1036                            guest_memory,
1037                            worker,
1038                            state_transition,
1039                        );
1040                        command_tube
1041                            .send(&SwapStatus::dummy())
1042                            .context("send enable finish signal")?;
1043                        state = result?;
1044                    }
1045                    Command::Trim => match &state {
1046                        State::SwapOutPending => {
1047                            *state_transition.lock() = SwapStateTransition::default();
1048                            let join_handle = scope.spawn(|| {
1049                                let mut ctx = page_handler.start_trim();
1050                                let job = bg_job_control.new_job();
1051                                let start_time = std::time::Instant::now();
1052
1053                                while !job.is_aborted() {
1054                                    if let Some(trimmed_pages) =
1055                                        ctx.trim_pages(MAX_TRIM_PAGES).context("trim pages")?
1056                                    {
1057                                        let mut state_transition = state_transition.lock();
1058                                        state_transition.pages += trimmed_pages as u64;
1059                                        state_transition.time_ms =
1060                                            start_time.elapsed().as_millis().try_into()?;
1061                                    } else {
1062                                        // Traversed all pages.
1063                                        break;
1064                                    }
1065                                }
1066
1067                                if job.is_aborted() {
1068                                    info!("trim is aborted");
1069                                } else {
1070                                    info!(
1071                                        "trimmed {} clean pages and {} zero pages",
1072                                        ctx.trimmed_clean_pages(),
1073                                        ctx.trimmed_zero_pages()
1074                                    );
1075                                }
1076                                Ok(())
1077                            });
1078
1079                            state = State::Trim(join_handle);
1080                            info!("start trimming staging memory");
1081                        }
1082                        state => {
1083                            warn!(
1084                                "swap trim is not ready. state: {:?}",
1085                                SwapState::from(state)
1086                            );
1087                        }
1088                    },
1089                    Command::SwapOut => match &state {
1090                        State::SwapOutPending => {
1091                            state = State::SwapOutInProgress {
1092                                started_time: std::time::Instant::now(),
1093                            };
1094                            *state_transition.lock() = SwapStateTransition::default();
1095                            info!("start swapping out");
1096                        }
1097                        state => {
1098                            warn!("swap out is not ready. state: {:?}", SwapState::from(state));
1099                        }
1100                    },
1101                    Command::Disable { slow_file_cleanup } => {
1102                        match state {
1103                            State::Trim(join_handle) => {
1104                                info!("abort trim");
1105                                abort_background_job(join_handle, bg_job_control)
1106                                    .context("abort trim")?;
1107                            }
1108                            State::SwapOutInProgress { .. } => {
1109                                info!("swap out is aborted");
1110                            }
1111                            State::SwapInInProgress { join_handle, .. } => {
1112                                info!("swap in is in progress");
1113                                state = State::SwapInInProgress {
1114                                    join_handle,
1115                                    slow_file_cleanup,
1116                                };
1117                                continue;
1118                            }
1119                            _ => {}
1120                        }
1121                        *state_transition.lock() = SwapStateTransition::default();
1122
1123                        let uffd = uffd_list.clone_main_uffd().context("clone main uffd")?;
1124                        let join_handle = scope.spawn(move || {
1125                            let mut ctx = page_handler.start_swap_in();
1126                            let job = bg_job_control.new_job();
1127                            let start_time = std::time::Instant::now();
1128                            while !job.is_aborted() {
1129                                match ctx.swap_in(&uffd, MAX_SWAP_CHUNK_SIZE) {
1130                                    Ok(num_pages) => {
1131                                        if num_pages == 0 {
1132                                            break;
1133                                        }
1134                                        let mut state_transition = state_transition.lock();
1135                                        state_transition.pages += num_pages as u64;
1136                                        state_transition.time_ms =
1137                                            start_time.elapsed().as_millis().try_into()?;
1138                                    }
1139                                    Err(e) => {
1140                                        bail!("failed to swap in: {:?}", e);
1141                                    }
1142                                }
1143                            }
1144                            if job.is_aborted() {
1145                                info!("swap in is aborted");
1146                            }
1147                            Ok(())
1148                        });
1149                        state = State::SwapInInProgress {
1150                            join_handle,
1151                            slow_file_cleanup,
1152                        };
1153
1154                        info!("start swapping in");
1155                    }
1156                    Command::Exit => {
1157                        match state {
1158                            State::SwapInInProgress { join_handle, .. } => {
1159                                // Wait until swap-in finishes.
1160                                if let Err(e) = join_handle.join() {
1161                                    bail!("failed to join swap in thread: {:?}", e);
1162                                }
1163                                return Ok(VmmSwapResult {
1164                                    should_exit: true,
1165                                    slow_file_cleanup: false,
1166                                });
1167                            }
1168                            State::Trim(join_handle) => {
1169                                abort_background_job(join_handle, bg_job_control)
1170                                    .context("abort trim")?;
1171                            }
1172                            _ => {}
1173                        }
1174                        let mut ctx = page_handler.start_swap_in();
1175                        // Swap-in all before exit.
1176                        while ctx
1177                            .swap_in(uffd_list.main_uffd(), MAX_SWAP_CHUNK_SIZE)
1178                            .context("swap in")?
1179                            > 0
1180                        {}
1181                        return Ok(VmmSwapResult {
1182                            should_exit: true,
1183                            slow_file_cleanup: false,
1184                        });
1185                    }
1186                    Command::Status => {
1187                        let mut metrics = SwapMetrics {
1188                            resident_pages: count_resident_pages(guest_memory) as u64,
1189                            ..Default::default()
1190                        };
1191                        page_handler.load_metrics(&mut metrics);
1192                        let status = SwapStatus {
1193                            state: (&state).into(),
1194                            metrics,
1195                            state_transition: *state_transition.lock(),
1196                        };
1197                        command_tube.send(&status).context("send status response")?;
1198                        debug!("swap status: {:?}", status);
1199                    }
1200                },
1201                Token::BackgroundJobCompleted => {
1202                    // Reset the completed event.
1203                    if !bg_job_control
1204                        .reset()
1205                        .context("reset background job event")?
1206                    {
1207                        // When the job is aborted and the event is comsumed by reset(), the token
1208                        // `Token::BackgroundJobCompleted` may remain in the `events`. Just ignore
1209                        // the obsolete token here.
1210                        continue;
1211                    }
1212                    match state {
1213                        State::SwapInInProgress {
1214                            join_handle,
1215                            slow_file_cleanup,
1216                        } => {
1217                            join_handle
1218                                .join()
1219                                .expect("panic on the background job thread")
1220                                .context("swap in finish")?;
1221                            let state_transition = state_transition.lock();
1222                            info!(
1223                                "swap in all {} pages in {} ms.",
1224                                state_transition.pages, state_transition.time_ms
1225                            );
1226                            return Ok(VmmSwapResult {
1227                                should_exit: false,
1228                                slow_file_cleanup,
1229                            });
1230                        }
1231                        State::Trim(join_handle) => {
1232                            join_handle
1233                                .join()
1234                                .expect("panic on the background job thread")
1235                                .context("trim finish")?;
1236                            let state_transition = state_transition.lock();
1237                            info!(
1238                                "trimmed {} pages in {} ms.",
1239                                state_transition.pages, state_transition.time_ms
1240                            );
1241                            state = State::SwapOutPending;
1242                        }
1243                        state => {
1244                            bail!(
1245                                "background job completed but the actual state is {:?}",
1246                                SwapState::from(&state)
1247                            );
1248                        }
1249                    }
1250                }
1251            };
1252        }
1253        if try_gc_uffds {
1254            uffd_list.gc_dead_uffds().context("gc dead uffds")?;
1255            try_gc_uffds = false;
1256        }
1257    }
1258}