vm_control/
balloon_tube.rs

1// Copyright 2023 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Balloon related control APIs.
6
7use std::collections::VecDeque;
8
9use anyhow::bail;
10use anyhow::Context;
11use anyhow::Result;
12use balloon_control::BalloonTubeCommand;
13use balloon_control::BalloonTubeResult;
14use base::error;
15use base::Error as SysError;
16use base::Tube;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::VmResponse;
21
/// Balloon commands that are sent on the crosvm control socket.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum BalloonControlCommand {
    /// Set the size of the VM's balloon.
    Adjust {
        /// Requested balloon size, in bytes.
        num_bytes: u64,
        /// When `true`, no response is produced at send time; the response is
        /// deferred until the balloon tube reports an `Adjusted` result whose
        /// size equals `num_bytes`. When `false`, `VmResponse::Ok` is produced
        /// as soon as the command has been sent on the tube.
        wait_for_success: bool,
    },
    /// Request balloon statistics. Answered with `VmResponse::BalloonStats`
    /// once the balloon tube delivers a `Stats` result.
    Stats,
    /// Request the working set. Answered with `VmResponse::BalloonWS` once the
    /// balloon tube delivers a `WorkingSet` result.
    WorkingSet,
    /// Working set configuration, forwarded unchanged as
    /// `BalloonTubeCommand::WorkingSetConfig`. Acknowledged with
    /// `VmResponse::Ok` as soon as it has been sent on the tube.
    // NOTE(review): the meaning and units of the fields below are defined by
    // the receiver of `BalloonTubeCommand::WorkingSetConfig`, not by this
    // file -- confirm there before relying on them.
    WorkingSetConfig {
        bins: Vec<u32>,
        refresh_threshold: u32,
        report_threshold: u32,
    },
}
38
39fn do_send(tube: &Tube, cmd: &BalloonControlCommand) -> Option<VmResponse> {
40    match *cmd {
41        BalloonControlCommand::Adjust {
42            num_bytes,
43            wait_for_success,
44        } => {
45            match tube.send(&BalloonTubeCommand::Adjust {
46                num_bytes,
47                allow_failure: wait_for_success,
48            }) {
49                Ok(_) => {
50                    if wait_for_success {
51                        None
52                    } else {
53                        Some(VmResponse::Ok)
54                    }
55                }
56                Err(_) => Some(VmResponse::Err(SysError::last())),
57            }
58        }
59        BalloonControlCommand::WorkingSetConfig {
60            ref bins,
61            refresh_threshold,
62            report_threshold,
63        } => {
64            match tube.send(&BalloonTubeCommand::WorkingSetConfig {
65                bins: bins.clone(),
66                refresh_threshold,
67                report_threshold,
68            }) {
69                Ok(_) => Some(VmResponse::Ok),
70                Err(_) => Some(VmResponse::Err(SysError::last())),
71            }
72        }
73        BalloonControlCommand::Stats => match tube.send(&BalloonTubeCommand::Stats) {
74            Ok(_) => None,
75            Err(_) => Some(VmResponse::Err(SysError::last())),
76        },
77        BalloonControlCommand::WorkingSet => match tube.send(&BalloonTubeCommand::WorkingSet) {
78            Ok(_) => None,
79            Err(_) => Some(VmResponse::Err(SysError::last())),
80        },
81    }
82}
83
/// Utility for multiplexing a balloon tube between multiple control tubes. Commands
/// are sent and processed serially.
pub struct BalloonTube {
    /// Tube that commands are written to and results are read from.
    tube: Tube,
    /// Commands whose response is still outstanding, each paired with the
    /// optional key its response is reported under. Only the front entry has
    /// been sent on `tube`; entries behind it are sent by `recv` once the
    /// front entry's result has arrived.
    pending_queue: VecDeque<(BalloonControlCommand, Option<usize>)>,
    /// Target size in bytes and reply key of the most recent `Adjust` sent
    /// with `wait_for_success: true` that has not yet been matched by an
    /// `Adjusted` result. Tracked separately from `pending_queue`.
    pending_adjust_with_completion: Option<(u64, usize)>,
}
91
92impl BalloonTube {
93    pub fn new(tube: Tube) -> Self {
94        BalloonTube {
95            tube,
96            pending_queue: VecDeque::new(),
97            pending_adjust_with_completion: None,
98        }
99    }
100
101    /// Sends or queues the given command to this tube. Associates the
102    /// response with the given key.
103    pub fn send_cmd(
104        &mut self,
105        cmd: BalloonControlCommand,
106        key: Option<usize>,
107    ) -> Option<(VmResponse, usize)> {
108        match cmd {
109            BalloonControlCommand::Adjust {
110                wait_for_success: true,
111                num_bytes,
112            } => {
113                let Some(key) = key else {
114                    error!("Asked for completion without reply key");
115                    return None;
116                };
117                let resp = self
118                    .pending_adjust_with_completion
119                    .take()
120                    .map(|(_, key)| (VmResponse::ErrString("Adjust overriden".to_string()), key));
121                if do_send(&self.tube, &cmd).is_some() {
122                    unreachable!("Unexpected early reply");
123                }
124                self.pending_adjust_with_completion = Some((num_bytes, key));
125                resp
126            }
127            _ => {
128                if !self.pending_queue.is_empty() {
129                    self.pending_queue.push_back((cmd, key));
130                    return None;
131                }
132                let resp = do_send(&self.tube, &cmd);
133                if resp.is_none() {
134                    self.pending_queue.push_back((cmd, key));
135                }
136                match key {
137                    None => None,
138                    Some(key) => resp.map(|r| (r, key)),
139                }
140            }
141        }
142    }
143
144    /// Receives responses from the balloon tube, and returns them with
145    /// their assoicated keys.
146    pub fn recv(&mut self) -> Result<Vec<(VmResponse, usize)>> {
147        let res = self
148            .tube
149            .recv::<BalloonTubeResult>()
150            .context("failed to read balloon tube")?;
151        if let BalloonTubeResult::Adjusted { num_bytes: actual } = res {
152            let Some((target, key)) = self.pending_adjust_with_completion else {
153                bail!("Unexpected balloon adjust to {}", actual);
154            };
155            if actual != target {
156                return Ok(vec![]);
157            }
158            self.pending_adjust_with_completion.take();
159            return Ok(vec![(VmResponse::Ok, key)]);
160        }
161        let mut responses = vec![];
162        if self.pending_queue.is_empty() {
163            bail!("Unexpected balloon tube result {:?}", res)
164        }
165        let resp = match (
166            &self.pending_queue.front().expect("entry disappeared").0,
167            res,
168        ) {
169            (
170                BalloonControlCommand::Stats,
171                BalloonTubeResult::Stats {
172                    stats,
173                    balloon_actual,
174                },
175            ) => VmResponse::BalloonStats {
176                stats,
177                balloon_actual,
178            },
179            (
180                BalloonControlCommand::WorkingSet,
181                BalloonTubeResult::WorkingSet { ws, balloon_actual },
182            ) => VmResponse::BalloonWS { ws, balloon_actual },
183            (_, resp) => {
184                bail!("Unexpected balloon tube result {:?}", resp);
185            }
186        };
187        let key = self.pending_queue.pop_front().expect("entry disappeared").1;
188        if let Some(key) = key {
189            responses.push((resp, key))
190        }
191        while let Some((cmd, key)) = self.pending_queue.front() {
192            match do_send(&self.tube, cmd) {
193                Some(resp) => {
194                    if let Some(key) = key {
195                        responses.push((resp, *key));
196                    }
197                    self.pending_queue.pop_front();
198                }
199                None => break,
200            }
201        }
202        Ok(responses)
203    }
204}
205
#[cfg(test)]
mod tests {
    use balloon_control::BalloonStats;

    use super::*;

    /// Builds the `BalloonTube` under test together with the device-side end
    /// of its tube.
    fn connected_pair() -> (BalloonTube, Tube) {
        let (host, device) = Tube::pair().unwrap();
        (BalloonTube::new(host), device)
    }

    /// Shorthand for an `Adjust` control command.
    fn adjust_cmd(num_bytes: u64, wait_for_success: bool) -> BalloonControlCommand {
        BalloonControlCommand::Adjust {
            num_bytes,
            wait_for_success,
        }
    }

    /// Device side: sends a `Stats` result carrying default statistics.
    fn device_send_stats(device: &Tube) {
        let reply = BalloonTubeResult::Stats {
            stats: BalloonStats::default(),
            balloon_actual: 0,
        };
        device.send(&reply).unwrap();
    }

    /// Device side: consumes one command, which must be `Stats`, and answers it.
    fn balloon_device_respond_stats(device: &Tube) {
        match device.recv::<BalloonTubeCommand>().unwrap() {
            BalloonTubeCommand::Stats => {}
            _ => panic!("unexpected command"),
        }
        device_send_stats(device);
    }

    /// Device side: consumes one command and checks that it is an `Adjust`.
    fn device_expect_adjust(device: &Tube) {
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));
    }

    /// Device side: reports that the balloon reached `num_bytes`.
    fn device_report_adjusted(device: &Tube, num_bytes: u64) {
        device
            .send(&BalloonTubeResult::Adjusted { num_bytes })
            .unwrap();
    }

    /// Checks that `responses` holds exactly one entry, reported under `key`,
    /// and hands back its payload for further inspection.
    fn only_response(responses: Vec<(VmResponse, usize)>, key: usize) -> VmResponse {
        assert_eq!(responses.len(), 1);
        let (payload, reported_key) = responses.into_iter().next().unwrap();
        assert_eq!(reported_key, key);
        payload
    }

    // A single stats request is answered under its own key.
    #[test]
    fn test_stat_command() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(immediate.is_none());

        balloon_device_respond_stats(&device);

        let payload = only_response(balloon_tube.recv().unwrap(), 0xc0ffee);
        assert!(matches!(payload, VmResponse::BalloonStats { .. }));
    }

    // Two stats requests are served one after the other, in submission order.
    #[test]
    fn test_multiple_stat_command() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(immediate.is_none());
        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xbadcafe));
        assert!(immediate.is_none());

        balloon_device_respond_stats(&device);

        let payload = only_response(balloon_tube.recv().unwrap(), 0xc0ffee);
        assert!(matches!(payload, VmResponse::BalloonStats { .. }));

        balloon_device_respond_stats(&device);

        let payload = only_response(balloon_tube.recv().unwrap(), 0xbadcafe);
        assert!(matches!(payload, VmResponse::BalloonStats { .. }));
    }

    // A fire-and-forget adjust queued behind a stats request is sent, and
    // acknowledged, as soon as the stats result arrives.
    #[test]
    fn test_queued_stats_adjust_no_reply() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(immediate.is_none());
        let immediate = balloon_tube.send_cmd(adjust_cmd(0, false), Some(0xbadcafe));
        assert!(immediate.is_none());

        balloon_device_respond_stats(&device);

        let responses = balloon_tube.recv().unwrap();
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[0].1, 0xc0ffee);
        assert!(matches!(responses[0].0, VmResponse::BalloonStats { .. }));
        assert_eq!(responses[1].1, 0xbadcafe);
        assert!(matches!(responses[1].0, VmResponse::Ok));

        device_expect_adjust(&device);
    }

    // A second completion-tracked adjust overrides the first; only a report
    // matching the newest target completes it.
    #[test]
    fn test_adjust_with_reply() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(adjust_cmd(0xc0ffee, true), Some(0xc0ffee));
        assert!(immediate.is_none());
        device_expect_adjust(&device);

        let immediate = balloon_tube.send_cmd(adjust_cmd(0xbadcafe, true), Some(0xbadcafe));
        assert!(matches!(
            immediate,
            Some((VmResponse::ErrString(_), 0xc0ffee))
        ));
        device_expect_adjust(&device);

        // A report for the overridden target produces no response.
        device_report_adjusted(&device, 0xc0ffee);
        let responses = balloon_tube.recv().unwrap();
        assert_eq!(responses.len(), 0);

        device_report_adjusted(&device, 0xbadcafe);
        let payload = only_response(balloon_tube.recv().unwrap(), 0xbadcafe);
        assert!(matches!(payload, VmResponse::Ok));
    }

    // A completion-tracked adjust is sent without waiting for an outstanding
    // stats request and may complete before it.
    #[test]
    fn test_stats_and_adjust_with_reply() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(immediate.is_none());

        let immediate = balloon_tube.send_cmd(adjust_cmd(0xbadcafe, true), Some(0xbadcafe));
        assert!(immediate.is_none());

        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Stats));
        device_expect_adjust(&device);

        device_report_adjusted(&device, 0xbadcafe);
        let payload = only_response(balloon_tube.recv().unwrap(), 0xbadcafe);
        assert!(matches!(payload, VmResponse::Ok));

        device_send_stats(&device);
        let payload = only_response(balloon_tube.recv().unwrap(), 0xc0ffee);
        assert!(matches!(payload, VmResponse::BalloonStats { .. }));
    }
}