// devices/virtio/video/utils.rs

// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5// Not all video backends make use of the tools in this module.
6#![allow(dead_code)]
7
8use std::collections::btree_map::Entry;
9use std::collections::BTreeMap;
10use std::collections::VecDeque;
11use std::time::Duration;
12
13use base::AsRawDescriptor;
14use base::Event;
15use base::EventExt;
16use sync::Mutex;
17use thiserror::Error as ThisError;
18
19use crate::virtio::video::resource::GuestResource;
20
21/// Manages a pollable queue of events to be sent to the decoder or encoder.
22pub struct EventQueue<T> {
23    /// Pipe used to signal available events.
24    event: Event,
25    /// FIFO of all pending events.
26    pending_events: VecDeque<T>,
27}
28
29impl<T> EventQueue<T> {
30    /// Create a new event queue.
31    pub fn new() -> base::Result<Self> {
32        Ok(Self {
33            // Use semaphore semantics so `eventfd` can be `read` as many times as it has been
34            // `write`n to without blocking.
35            event: Event::new()?,
36            pending_events: Default::default(),
37        })
38    }
39
40    /// Add `event` to the queue.
41    pub fn queue_event(&mut self, event: T) -> base::Result<()> {
42        self.pending_events.push_back(event);
43        self.event.write_count(1)?;
44        Ok(())
45    }
46
47    /// Read the next event, blocking until an event becomes available.
48    pub fn dequeue_event(&mut self) -> base::Result<T> {
49        // Wait until at least one event is written, if necessary.
50        let cpt = self.event.read_count()?;
51        let event = match self.pending_events.pop_front() {
52            Some(event) => event,
53            None => panic!("event signaled but no pending event - this is a bug."),
54        };
55        // If we have more than one event pending, write the remainder back into the event so it
56        // keeps signalling.
57        if cpt > 1 {
58            self.event.write_count(cpt - 1)?;
59        }
60
61        Ok(event)
62    }
63
64    /// Remove all the posted events for which `predicate` returns `false`.
65    pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) {
66        if !self.pending_events.is_empty() {
67            let _ = self
68                .event
69                .wait_timeout(Duration::from_millis(0))
70                .expect("wait_timeout failure");
71        }
72
73        self.pending_events.retain(predicate);
74
75        let num_pending_events = self.pending_events.len();
76        if num_pending_events > 0 {
77            self.event
78                .write_count(num_pending_events as u64)
79                .expect("write failure");
80        }
81    }
82
83    /// Returns the number of events currently pending on this queue, i.e. the number of times
84    /// `dequeue_event` can be called without blocking.
85    #[cfg(test)]
86    pub fn len(&self) -> usize {
87        self.pending_events.len()
88    }
89}
90
91impl<T> AsRawDescriptor for EventQueue<T> {
92    fn as_raw_descriptor(&self) -> base::RawDescriptor {
93        self.event.as_raw_descriptor()
94    }
95}
96
97/// An `EventQueue` that is `Sync`, `Send`, and non-mut - i.e. that can easily be passed across
98/// threads and wrapped into a `Rc` or `Arc`.
99pub struct SyncEventQueue<T>(Mutex<EventQueue<T>>);
100
101impl<T> From<EventQueue<T>> for SyncEventQueue<T> {
102    fn from(queue: EventQueue<T>) -> Self {
103        Self(Mutex::new(queue))
104    }
105}
106
107impl<T> SyncEventQueue<T> {
108    /// Add `event` to the queue.
109    pub fn queue_event(&self, event: T) -> base::Result<()> {
110        self.0.lock().queue_event(event)
111    }
112
113    /// Read the next event, blocking until an event becomes available.
114    pub fn dequeue_event(&self) -> base::Result<T> {
115        self.0.lock().dequeue_event()
116    }
117
118    /// Remove all the posted events for which `predicate` returns `false`.
119    pub fn retain<P: FnMut(&T) -> bool>(&self, predicate: P) {
120        self.0.lock().retain(predicate)
121    }
122
123    /// Returns the number of events currently pending on this queue, i.e. the number of times
124    /// `dequeue_event` can be called without blocking.
125    #[cfg(test)]
126    pub fn len(&self) -> usize {
127        self.0.lock().len()
128    }
129}
130
131impl<T> AsRawDescriptor for SyncEventQueue<T> {
132    fn as_raw_descriptor(&self) -> base::RawDescriptor {
133        self.0.lock().as_raw_descriptor()
134    }
135}
136
137/// Queue of all the output buffers provided by crosvm.
138pub struct OutputQueue {
139    // Max number of output buffers that can be imported into this queue.
140    num_buffers: usize,
141    // Maps picture IDs to the corresponding guest resource.
142    buffers: BTreeMap<u32, GuestResource>,
143    // Picture IDs of output buffers we can write into.
144    ready_buffers: VecDeque<u32>,
145}
146
147#[derive(Debug, ThisError)]
148pub enum OutputBufferImportError {
149    #[error("maximum number of imported buffers ({0}) already reached")]
150    MaxBuffersReached(usize),
151    #[error("a buffer with picture ID {0} is already imported")]
152    AlreadyImported(u32),
153}
154
155#[derive(Debug, ThisError)]
156pub enum OutputBufferReuseError {
157    #[error("no buffer with picture ID {0} is imported at the moment")]
158    NotYetImported(u32),
159    #[error("buffer with picture ID {0} is already ready for use")]
160    AlreadyUsed(u32),
161}
162
163impl OutputQueue {
164    /// Creates a new output queue capable of containing `num_buffers` buffers.
165    pub fn new(num_buffers: usize) -> Self {
166        Self {
167            num_buffers,
168            buffers: Default::default(),
169            ready_buffers: Default::default(),
170        }
171    }
172
173    /// Import a buffer, i.e. associate the buffer's `resource` to a given `picture_buffer_id`, and
174    /// make the buffer ready for use.
175    ///
176    /// A buffer with a given `picture_buffer_id` can only be imported once.
177    pub fn import_buffer(
178        &mut self,
179        picture_buffer_id: u32,
180        resource: GuestResource,
181    ) -> Result<(), OutputBufferImportError> {
182        if self.buffers.len() >= self.num_buffers {
183            return Err(OutputBufferImportError::MaxBuffersReached(self.num_buffers));
184        }
185
186        match self.buffers.entry(picture_buffer_id) {
187            Entry::Vacant(o) => {
188                o.insert(resource);
189            }
190            Entry::Occupied(_) => {
191                return Err(OutputBufferImportError::AlreadyImported(picture_buffer_id));
192            }
193        }
194
195        self.ready_buffers.push_back(picture_buffer_id);
196
197        Ok(())
198    }
199
200    /// Mark the previously-imported buffer with ID `picture_buffer_id` as ready for being used.
201    pub fn reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError> {
202        if !self.buffers.contains_key(&picture_buffer_id) {
203            return Err(OutputBufferReuseError::NotYetImported(picture_buffer_id));
204        }
205
206        if self.ready_buffers.contains(&picture_buffer_id) {
207            return Err(OutputBufferReuseError::AlreadyUsed(picture_buffer_id));
208        }
209
210        self.ready_buffers.push_back(picture_buffer_id);
211
212        Ok(())
213    }
214
215    /// Get a buffer ready to be decoded into, if any is available.
216    pub fn try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)> {
217        let picture_buffer_id = self.ready_buffers.pop_front()?;
218        // Unwrapping is safe here because our interface guarantees that ids in `ready_buffers` are
219        // valid keys for `buffers`.
220        Some((
221            picture_buffer_id,
222            self.buffers
223                .get_mut(&picture_buffer_id)
224                .expect("expected buffer not present in queue"),
225        ))
226    }
227
228    pub fn clear_ready_buffers(&mut self) {
229        self.ready_buffers.clear();
230    }
231}
232
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use base::EventToken;
    use base::WaitContext;

    use super::*;
    use crate::virtio::video::error::VideoError;
    use crate::virtio::video::error::VideoResult;
    use crate::virtio::video::format::Rect;

    /// This is the same as DecoderEvent but copied here so that the test can be compiled
    /// without depending on the "video-decoder" feature.
    #[derive(Debug)]
    pub enum TestEvent {
        #[allow(dead_code)]
        ProvidePictureBuffers {
            min_num_buffers: u32,
            width: i32,
            height: i32,
            visible_rect: Rect,
        },
        PictureReady {
            picture_buffer_id: i32,
            timestamp: u64,
            visible_rect: Rect,
        },
        NotifyEndOfBitstreamBuffer(u32),
        #[allow(dead_code)]
        NotifyError(VideoError),
        #[allow(dead_code)]
        FlushCompleted(VideoResult<()>),
        #[allow(dead_code)]
        ResetCompleted(VideoResult<()>),
    }

    /// Test basic queue/dequeue functionality of `EventQueue`: events come out in the order
    /// they were queued and `len` tracks the number of pending events.
    #[test]
    fn event_queue() {
        let mut event_queue = EventQueue::new().unwrap();

        assert_eq!(
            event_queue.queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1)),
            Ok(())
        );
        assert_eq!(event_queue.len(), 1);
        assert_eq!(
            event_queue.queue_event(TestEvent::PictureReady {
                picture_buffer_id: 0,
                timestamp: 42,
                visible_rect: Rect {
                    left: 0,
                    top: 0,
                    right: 320,
                    bottom: 240,
                },
            }),
            Ok(())
        );
        assert_eq!(event_queue.len(), 2);

        assert!(matches!(
            event_queue.dequeue_event(),
            Ok(TestEvent::NotifyEndOfBitstreamBuffer(1))
        ));
        assert_eq!(event_queue.len(), 1);
        assert!(matches!(
            event_queue.dequeue_event(),
            Ok(TestEvent::PictureReady {
                picture_buffer_id: 0,
                timestamp: 42,
                visible_rect: Rect {
                    left: 0,
                    top: 0,
                    right: 320,
                    bottom: 240,
                }
            })
        ));
        assert_eq!(event_queue.len(), 0);
    }

    /// Test polling of the descriptor exposed by `EventQueue`: it must be signaled exactly
    /// while the queue holds at least one event.
    #[test]
    fn decoder_event_queue_polling() {
        #[derive(EventToken)]
        enum Token {
            Event,
        }

        let mut event_queue = EventQueue::new().unwrap();
        let wait_context = WaitContext::build_with(&[(&event_queue, Token::Event)]).unwrap();

        // The queue is empty, so its event should not signal.
        assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0);

        // The event should signal as long as the queue is not empty.
        event_queue
            .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1))
            .unwrap();
        assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
        event_queue
            .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(2))
            .unwrap();
        assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
        event_queue
            .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(3))
            .unwrap();
        assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);

        event_queue.dequeue_event().unwrap();
        assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
        event_queue.dequeue_event().unwrap();
        assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
        event_queue.dequeue_event().unwrap();

        // The queue is empty again, so its event should not signal.
        assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0);
    }
}
353}