devices/virtio/video/
worker.rs

1// Copyright 2020 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Worker that runs in a virtio-video thread.
6
7use std::collections::VecDeque;
8use std::time::Duration;
9
10use base::clone_descriptor;
11use base::error;
12use base::info;
13use base::Event;
14use base::WaitContext;
15use cros_async::select3;
16use cros_async::AsyncWrapper;
17use cros_async::EventAsync;
18use cros_async::Executor;
19use cros_async::SelectResult;
20use futures::FutureExt;
21
22use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
23use crate::virtio::video::command::QueueType;
24use crate::virtio::video::command::VideoCmd;
25use crate::virtio::video::device::AsyncCmdResponse;
26use crate::virtio::video::device::AsyncCmdTag;
27use crate::virtio::video::device::Device;
28use crate::virtio::video::device::Token;
29use crate::virtio::video::device::VideoCmdResponseType;
30use crate::virtio::video::device::VideoEvtResponseType;
31use crate::virtio::video::event;
32use crate::virtio::video::event::EvtType;
33use crate::virtio::video::event::VideoEvt;
34use crate::virtio::video::response;
35use crate::virtio::video::response::Response;
36use crate::virtio::video::Error;
37use crate::virtio::video::Result;
38use crate::virtio::DescriptorChain;
39use crate::virtio::Queue;
40
41/// Worker that takes care of running the virtio video device.
42pub struct Worker {
43    /// VirtIO queue for Command queue
44    cmd_queue: Queue,
45    /// VirtIO queue for Event queue
46    event_queue: Queue,
47    /// Stores descriptor chains in which responses for asynchronous commands will be written
48    desc_map: AsyncCmdDescMap,
49}
50
51/// Pair of a descriptor chain and a response to be written.
52type WritableResp = (DescriptorChain, response::CmdResponse);
53
54impl Worker {
55    pub fn new(cmd_queue: Queue, event_queue: Queue) -> Self {
56        Self {
57            cmd_queue,
58            event_queue,
59            desc_map: Default::default(),
60        }
61    }
62
63    /// Writes responses into the command queue.
64    fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> {
65        if responses.is_empty() {
66            return Ok(());
67        }
68        while let Some((mut desc, response)) = responses.pop_front() {
69            if let Err(e) = response.write(&mut desc.writer) {
70                error!(
71                    "failed to write a command response for {:?}: {}",
72                    response, e
73                );
74            }
75            self.cmd_queue.add_used(desc);
76        }
77        self.cmd_queue.trigger_interrupt();
78        Ok(())
79    }
80
81    /// Writes a `VideoEvt` into the event queue.
82    fn write_event(&mut self, event: event::VideoEvt) -> Result<()> {
83        let mut desc = self
84            .event_queue
85            .pop()
86            .ok_or(Error::DescriptorNotAvailable)?;
87
88        event
89            .write(&mut desc.writer)
90            .map_err(|error| Error::WriteEventFailure { event, error })?;
91        self.event_queue.add_used(desc);
92        self.event_queue.trigger_interrupt();
93        Ok(())
94    }
95
96    /// Writes the `event_responses` into the command queue or the event queue according to
97    /// each response's type.
98    ///
99    /// # Arguments
100    ///
101    /// * `event_responses` - Responses to write
102    /// * `stream_id` - Stream session ID of the responses
103    fn write_event_responses(
104        &mut self,
105        event_responses: Vec<VideoEvtResponseType>,
106        stream_id: u32,
107    ) -> Result<()> {
108        let mut responses: VecDeque<WritableResp> = Default::default();
109        for event_response in event_responses {
110            match event_response {
111                VideoEvtResponseType::AsyncCmd(async_response) => {
112                    let AsyncCmdResponse {
113                        tag,
114                        response: cmd_result,
115                    } = async_response;
116                    match self.desc_map.remove(&tag) {
117                        Some(desc) => {
118                            let cmd_response = match cmd_result {
119                                Ok(r) => r,
120                                Err(e) => {
121                                    error!("returning async error response: {}", &e);
122                                    e.into()
123                                }
124                            };
125                            responses.push_back((desc, cmd_response))
126                        }
127                        None => match tag {
128                            // TODO(b/153406792): Drain is cancelled by clearing either of the
129                            // stream's queues. To work around a limitation in the VDA api, the
130                            // output queue is cleared synchronously without going through VDA.
131                            // Because of this, the cancellation response from VDA for the
132                            // input queue might fail to find the drain's AsyncCmdTag.
133                            AsyncCmdTag::Drain { stream_id: _ } => {
134                                info!("ignoring unknown drain response");
135                            }
136                            _ => {
137                                error!("dropping response for an untracked command: {:?}", tag);
138                            }
139                        },
140                    }
141                }
142                VideoEvtResponseType::Event(evt) => {
143                    self.write_event(evt)?;
144                }
145            }
146        }
147
148        if let Err(e) = self.write_responses(&mut responses) {
149            error!("Failed to write event responses: {:?}", e);
150            // Ignore result of write_event for a fatal error.
151            let _ = self.write_event(VideoEvt {
152                typ: EvtType::Error,
153                stream_id,
154            });
155            return Err(e);
156        }
157
158        Ok(())
159    }
160
161    /// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque`
162    /// of `WritableResp` to be sent to the guest.
163    ///
164    /// # Arguments
165    ///
166    /// * `device` - Instance of backend device
167    /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
168    ///   `wait_ctx`
169    /// * `desc` - `DescriptorChain` to handle
170    fn handle_command_desc(
171        &mut self,
172        device: &mut dyn Device,
173        wait_ctx: &WaitContext<Token>,
174        mut desc: DescriptorChain,
175    ) -> Result<VecDeque<WritableResp>> {
176        let mut responses: VecDeque<WritableResp> = Default::default();
177        let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?;
178
179        // If a destruction command comes, cancel pending requests.
180        // TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this
181        // into encoder/decoder.
182        let async_responses = match cmd {
183            VideoCmd::ResourceDestroyAll {
184                stream_id,
185                queue_type,
186            } => self
187                .desc_map
188                .create_cancellation_responses(&stream_id, Some(queue_type), None),
189            VideoCmd::StreamDestroy { stream_id } => self
190                .desc_map
191                .create_cancellation_responses(&stream_id, None, None),
192            VideoCmd::QueueClear {
193                stream_id,
194                queue_type: QueueType::Output,
195            } => {
196                // TODO(b/153406792): Due to a workaround for a limitation in the VDA api,
197                // clearing the output queue doesn't go through the same Async path as clearing
198                // the input queue. However, we still need to cancel the pending resources.
199                self.desc_map.create_cancellation_responses(
200                    &stream_id,
201                    Some(QueueType::Output),
202                    None,
203                )
204            }
205            _ => Default::default(),
206        };
207        for async_response in async_responses {
208            let AsyncCmdResponse {
209                tag,
210                response: cmd_result,
211            } = async_response;
212            let destroy_response = match cmd_result {
213                Ok(r) => r,
214                Err(e) => {
215                    error!("returning async error response: {}", &e);
216                    e.into()
217                }
218            };
219            match self.desc_map.remove(&tag) {
220                Some(destroy_desc) => {
221                    responses.push_back((destroy_desc, destroy_response));
222                }
223                None => error!("dropping response for an untracked command: {:?}", tag),
224            }
225        }
226
227        // Process the command by the device.
228        let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx);
229        match cmd_response {
230            VideoCmdResponseType::Sync(r) => {
231                responses.push_back((desc, r));
232            }
233            VideoCmdResponseType::Async(tag) => {
234                // If the command expects an asynchronous response,
235                // store `desc` to use it after the back-end device notifies the
236                // completion.
237                self.desc_map.insert(tag, desc);
238            }
239        }
240        if let Some((stream_id, event_responses)) = event_responses_with_id {
241            self.write_event_responses(event_responses, stream_id)?;
242        }
243
244        Ok(responses)
245    }
246
247    /// Handles each command in the command queue.
248    ///
249    /// # Arguments
250    ///
251    /// * `device` - Instance of backend device
252    /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
253    ///   `wait_ctx`
254    fn handle_command_queue(
255        &mut self,
256        device: &mut dyn Device,
257        wait_ctx: &WaitContext<Token>,
258    ) -> Result<()> {
259        while let Some(desc) = self.cmd_queue.pop() {
260            let mut resps = self.handle_command_desc(device, wait_ctx, desc)?;
261            self.write_responses(&mut resps)?;
262        }
263        Ok(())
264    }
265
266    /// Handles an event notified via an event.
267    ///
268    /// # Arguments
269    ///
270    /// * `device` - Instance of backend device
271    /// * `stream_id` - Stream session ID of the event
272    /// * `wait_ctx` - `device` may register a new `Token::Buffer` for a new stream session to
273    ///   `wait_ctx`
274    fn handle_event(
275        &mut self,
276        device: &mut dyn Device,
277        stream_id: u32,
278        wait_ctx: &WaitContext<Token>,
279    ) -> Result<()> {
280        if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx)
281        {
282            self.write_event_responses(event_responses, stream_id)?;
283        }
284        Ok(())
285    }
286
287    /// Handles a completed buffer barrier.
288    ///
289    /// # Arguments
290    ///
291    /// * `device` - Instance of backend device
292    /// * `stream_id` - Stream session ID of the event
293    /// * `wait_ctx` - `device` may deregister the completed `Token::BufferBarrier` from `wait_ctx`.
294    fn handle_buffer_barrier(
295        &mut self,
296        device: &mut dyn Device,
297        stream_id: u32,
298        wait_ctx: &WaitContext<Token>,
299    ) -> Result<()> {
300        if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) {
301            self.write_event_responses(event_responses, stream_id)?;
302        }
303        Ok(())
304    }
305
306    /// Runs the video device virtio queues in a blocking way.
307    ///
308    /// # Arguments
309    ///
310    /// * `device` - Instance of backend device
311    /// * `kill_evt` - `Event` notified to make `run` stop and return
312    pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> {
313        let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
314            (self.cmd_queue.event(), Token::CmdQueue),
315            (self.event_queue.event(), Token::EventQueue),
316            (kill_evt, Token::Kill),
317        ])
318        .map_err(Error::WaitContextCreationFailed)?;
319
320        loop {
321            let wait_events = wait_ctx.wait().map_err(Error::WaitError)?;
322
323            for wait_event in wait_events.iter().filter(|e| e.is_readable) {
324                match wait_event.token {
325                    Token::CmdQueue => {
326                        let _ = self.cmd_queue.event().wait();
327                        self.handle_command_queue(device.as_mut(), &wait_ctx)?;
328                    }
329                    Token::EventQueue => {
330                        let _ = self.event_queue.event().wait();
331                    }
332                    Token::Event { id } => {
333                        self.handle_event(device.as_mut(), id, &wait_ctx)?;
334                    }
335                    Token::BufferBarrier { id } => {
336                        self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?;
337                    }
338                    Token::Kill => return Ok(()),
339                }
340            }
341        }
342    }
343
344    /// Runs the video device virtio queues asynchronously.
345    ///
346    /// # Arguments
347    ///
348    /// * `device` - Instance of backend device
349    /// * `ex` - Instance of `Executor` of asynchronous operations
350    /// * `cmd_evt` - Driver-to-device kick event for the command queue
351    /// * `event_evt` - Driver-to-device kick event for the event queue
352    #[allow(dead_code)]
353    pub async fn run_async(
354        mut self,
355        mut device: Box<dyn Device>,
356        ex: Executor,
357        cmd_evt: Event,
358        event_evt: Event,
359    ) -> Result<()> {
360        let cmd_queue_evt =
361            EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
362        let event_queue_evt =
363            EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
364
365        // WaitContext to wait for the response from the encoder/decoder device.
366        let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?;
367        let device_evt = ex
368            .async_from(AsyncWrapper::new(
369                clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?,
370            ))
371            .map_err(Error::EventAsyncCreationFailed)?;
372
373        loop {
374            let (
375                cmd_queue_evt,
376                device_evt,
377                // Ignore driver-to-device kicks since the event queue is write-only for a device.
378                _event_queue_evt,
379            ) = select3(
380                cmd_queue_evt.next_val().boxed_local(),
381                device_evt.wait_readable().boxed_local(),
382                event_queue_evt.next_val().boxed_local(),
383            )
384            .await;
385
386            if let SelectResult::Finished(_) = cmd_queue_evt {
387                self.handle_command_queue(device.as_mut(), &device_wait_ctx)?;
388            }
389
390            if let SelectResult::Finished(_) = device_evt {
391                let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) {
392                    Ok(device_events) => device_events,
393                    Err(_) => {
394                        error!("failed to read a device event");
395                        continue;
396                    }
397                };
398                for device_event in device_events {
399                    // A Device must trigger only Token::Event. See [`Device::process_cmd()`].
400                    if let Token::Event { id } = device_event.token {
401                        self.handle_event(device.as_mut(), id, &device_wait_ctx)?;
402                    } else {
403                        error!(
404                            "invalid event is triggered by a device {:?}",
405                            device_event.token
406                        );
407                    }
408                }
409            }
410        }
411    }
412}