// devices/virtio/video/utils.rs
#![allow(dead_code)]
7
8use std::collections::btree_map::Entry;
9use std::collections::BTreeMap;
10use std::collections::VecDeque;
11use std::time::Duration;
12
13use base::AsRawDescriptor;
14use base::Event;
15use base::EventExt;
16use sync::Mutex;
17use thiserror::Error as ThisError;
18
19use crate::virtio::video::resource::GuestResource;
20
21pub struct EventQueue<T> {
23 event: Event,
25 pending_events: VecDeque<T>,
27}
28
29impl<T> EventQueue<T> {
30 pub fn new() -> base::Result<Self> {
32 Ok(Self {
33 event: Event::new()?,
36 pending_events: Default::default(),
37 })
38 }
39
40 pub fn queue_event(&mut self, event: T) -> base::Result<()> {
42 self.pending_events.push_back(event);
43 self.event.write_count(1)?;
44 Ok(())
45 }
46
47 pub fn dequeue_event(&mut self) -> base::Result<T> {
49 let cpt = self.event.read_count()?;
51 let event = match self.pending_events.pop_front() {
52 Some(event) => event,
53 None => panic!("event signaled but no pending event - this is a bug."),
54 };
55 if cpt > 1 {
58 self.event.write_count(cpt - 1)?;
59 }
60
61 Ok(event)
62 }
63
64 pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) {
66 if !self.pending_events.is_empty() {
67 let _ = self
68 .event
69 .wait_timeout(Duration::from_millis(0))
70 .expect("wait_timeout failure");
71 }
72
73 self.pending_events.retain(predicate);
74
75 let num_pending_events = self.pending_events.len();
76 if num_pending_events > 0 {
77 self.event
78 .write_count(num_pending_events as u64)
79 .expect("write failure");
80 }
81 }
82
83 #[cfg(test)]
86 pub fn len(&self) -> usize {
87 self.pending_events.len()
88 }
89}
90
91impl<T> AsRawDescriptor for EventQueue<T> {
92 fn as_raw_descriptor(&self) -> base::RawDescriptor {
93 self.event.as_raw_descriptor()
94 }
95}
96
97pub struct SyncEventQueue<T>(Mutex<EventQueue<T>>);
100
101impl<T> From<EventQueue<T>> for SyncEventQueue<T> {
102 fn from(queue: EventQueue<T>) -> Self {
103 Self(Mutex::new(queue))
104 }
105}
106
107impl<T> SyncEventQueue<T> {
108 pub fn queue_event(&self, event: T) -> base::Result<()> {
110 self.0.lock().queue_event(event)
111 }
112
113 pub fn dequeue_event(&self) -> base::Result<T> {
115 self.0.lock().dequeue_event()
116 }
117
118 pub fn retain<P: FnMut(&T) -> bool>(&self, predicate: P) {
120 self.0.lock().retain(predicate)
121 }
122
123 #[cfg(test)]
126 pub fn len(&self) -> usize {
127 self.0.lock().len()
128 }
129}
130
131impl<T> AsRawDescriptor for SyncEventQueue<T> {
132 fn as_raw_descriptor(&self) -> base::RawDescriptor {
133 self.0.lock().as_raw_descriptor()
134 }
135}
136
137pub struct OutputQueue {
139 num_buffers: usize,
141 buffers: BTreeMap<u32, GuestResource>,
143 ready_buffers: VecDeque<u32>,
145}
146
147#[derive(Debug, ThisError)]
148pub enum OutputBufferImportError {
149 #[error("maximum number of imported buffers ({0}) already reached")]
150 MaxBuffersReached(usize),
151 #[error("a buffer with picture ID {0} is already imported")]
152 AlreadyImported(u32),
153}
154
155#[derive(Debug, ThisError)]
156pub enum OutputBufferReuseError {
157 #[error("no buffer with picture ID {0} is imported at the moment")]
158 NotYetImported(u32),
159 #[error("buffer with picture ID {0} is already ready for use")]
160 AlreadyUsed(u32),
161}
162
163impl OutputQueue {
164 pub fn new(num_buffers: usize) -> Self {
166 Self {
167 num_buffers,
168 buffers: Default::default(),
169 ready_buffers: Default::default(),
170 }
171 }
172
173 pub fn import_buffer(
178 &mut self,
179 picture_buffer_id: u32,
180 resource: GuestResource,
181 ) -> Result<(), OutputBufferImportError> {
182 if self.buffers.len() >= self.num_buffers {
183 return Err(OutputBufferImportError::MaxBuffersReached(self.num_buffers));
184 }
185
186 match self.buffers.entry(picture_buffer_id) {
187 Entry::Vacant(o) => {
188 o.insert(resource);
189 }
190 Entry::Occupied(_) => {
191 return Err(OutputBufferImportError::AlreadyImported(picture_buffer_id));
192 }
193 }
194
195 self.ready_buffers.push_back(picture_buffer_id);
196
197 Ok(())
198 }
199
200 pub fn reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError> {
202 if !self.buffers.contains_key(&picture_buffer_id) {
203 return Err(OutputBufferReuseError::NotYetImported(picture_buffer_id));
204 }
205
206 if self.ready_buffers.contains(&picture_buffer_id) {
207 return Err(OutputBufferReuseError::AlreadyUsed(picture_buffer_id));
208 }
209
210 self.ready_buffers.push_back(picture_buffer_id);
211
212 Ok(())
213 }
214
215 pub fn try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)> {
217 let picture_buffer_id = self.ready_buffers.pop_front()?;
218 Some((
221 picture_buffer_id,
222 self.buffers
223 .get_mut(&picture_buffer_id)
224 .expect("expected buffer not present in queue"),
225 ))
226 }
227
228 pub fn clear_ready_buffers(&mut self) {
229 self.ready_buffers.clear();
230 }
231}
232
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use base::EventToken;
    use base::WaitContext;

    use super::*;
    use crate::virtio::video::error::VideoError;
    use crate::virtio::video::error::VideoResult;
    use crate::virtio::video::format::Rect;

    /// Event type used as the payload of the queues exercised by these tests.
    #[derive(Debug)]
    pub enum TestEvent {
        #[allow(dead_code)]
        ProvidePictureBuffers {
            min_num_buffers: u32,
            width: i32,
            height: i32,
            visible_rect: Rect,
        },
        PictureReady {
            picture_buffer_id: i32,
            timestamp: u64,
            visible_rect: Rect,
        },
        NotifyEndOfBitstreamBuffer(u32),
        #[allow(dead_code)]
        NotifyError(VideoError),
        #[allow(dead_code)]
        FlushCompleted(VideoResult<()>),
        #[allow(dead_code)]
        ResetCompleted(VideoResult<()>),
    }

    /// Queues two events and checks that they are returned in FIFO order, with the reported
    /// length tracking every step.
    #[test]
    fn event_queue() {
        let mut queue = EventQueue::new().unwrap();

        let first_queued = queue.queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1));
        assert_eq!(first_queued, Ok(()));
        assert_eq!(queue.len(), 1);

        let picture = TestEvent::PictureReady {
            picture_buffer_id: 0,
            timestamp: 42,
            visible_rect: Rect {
                left: 0,
                top: 0,
                right: 320,
                bottom: 240,
            },
        };
        let second_queued = queue.queue_event(picture);
        assert_eq!(second_queued, Ok(()));
        assert_eq!(queue.len(), 2);

        let first_out = queue.dequeue_event();
        assert!(matches!(
            first_out,
            Ok(TestEvent::NotifyEndOfBitstreamBuffer(1))
        ));
        assert_eq!(queue.len(), 1);

        let second_out = queue.dequeue_event();
        assert!(matches!(
            second_out,
            Ok(TestEvent::PictureReady {
                picture_buffer_id: 0,
                timestamp: 42,
                visible_rect: Rect {
                    left: 0,
                    top: 0,
                    right: 320,
                    bottom: 240,
                }
            })
        ));
        assert_eq!(queue.len(), 0);
    }

    /// Checks that the queue's descriptor polls as triggered exactly while events are pending.
    #[test]
    fn decoder_event_queue_polling() {
        #[derive(EventToken)]
        enum Token {
            Event,
        }

        let mut queue = EventQueue::new().unwrap();
        let wait_context = WaitContext::build_with(&[(&queue, Token::Event)]).unwrap();
        // Number of triggered tokens reported by a non-blocking poll.
        let triggered = || wait_context.wait_timeout(Duration::ZERO).unwrap().len();

        // Nothing queued yet, so nothing is signalled.
        assert_eq!(triggered(), 0);

        // However many events are queued, the single token is reported once.
        for id in 1..=3 {
            queue
                .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(id))
                .unwrap();
            assert_eq!(triggered(), 1);
        }

        // The token stays triggered until the last event has been dequeued.
        for expected in [1, 1, 0] {
            queue.dequeue_event().unwrap();
            assert_eq!(triggered(), expected);
        }
    }
}