1use std::collections::VecDeque;
8use std::time::Duration;
9
10use base::clone_descriptor;
11use base::error;
12use base::info;
13use base::Event;
14use base::WaitContext;
15use cros_async::select3;
16use cros_async::AsyncWrapper;
17use cros_async::EventAsync;
18use cros_async::Executor;
19use cros_async::SelectResult;
20use futures::FutureExt;
21
22use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
23use crate::virtio::video::command::QueueType;
24use crate::virtio::video::command::VideoCmd;
25use crate::virtio::video::device::AsyncCmdResponse;
26use crate::virtio::video::device::AsyncCmdTag;
27use crate::virtio::video::device::Device;
28use crate::virtio::video::device::Token;
29use crate::virtio::video::device::VideoCmdResponseType;
30use crate::virtio::video::device::VideoEvtResponseType;
31use crate::virtio::video::event;
32use crate::virtio::video::event::EvtType;
33use crate::virtio::video::event::VideoEvt;
34use crate::virtio::video::response;
35use crate::virtio::video::response::Response;
36use crate::virtio::video::Error;
37use crate::virtio::video::Result;
38use crate::virtio::DescriptorChain;
39use crate::virtio::Queue;
40
41pub struct Worker {
43 cmd_queue: Queue,
45 event_queue: Queue,
47 desc_map: AsyncCmdDescMap,
49}
50
51type WritableResp = (DescriptorChain, response::CmdResponse);
53
54impl Worker {
55 pub fn new(cmd_queue: Queue, event_queue: Queue) -> Self {
56 Self {
57 cmd_queue,
58 event_queue,
59 desc_map: Default::default(),
60 }
61 }
62
63 fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> {
65 if responses.is_empty() {
66 return Ok(());
67 }
68 while let Some((mut desc, response)) = responses.pop_front() {
69 if let Err(e) = response.write(&mut desc.writer) {
70 error!(
71 "failed to write a command response for {:?}: {}",
72 response, e
73 );
74 }
75 self.cmd_queue.add_used(desc);
76 }
77 self.cmd_queue.trigger_interrupt();
78 Ok(())
79 }
80
81 fn write_event(&mut self, event: event::VideoEvt) -> Result<()> {
83 let mut desc = self
84 .event_queue
85 .pop()
86 .ok_or(Error::DescriptorNotAvailable)?;
87
88 event
89 .write(&mut desc.writer)
90 .map_err(|error| Error::WriteEventFailure { event, error })?;
91 self.event_queue.add_used(desc);
92 self.event_queue.trigger_interrupt();
93 Ok(())
94 }
95
96 fn write_event_responses(
104 &mut self,
105 event_responses: Vec<VideoEvtResponseType>,
106 stream_id: u32,
107 ) -> Result<()> {
108 let mut responses: VecDeque<WritableResp> = Default::default();
109 for event_response in event_responses {
110 match event_response {
111 VideoEvtResponseType::AsyncCmd(async_response) => {
112 let AsyncCmdResponse {
113 tag,
114 response: cmd_result,
115 } = async_response;
116 match self.desc_map.remove(&tag) {
117 Some(desc) => {
118 let cmd_response = match cmd_result {
119 Ok(r) => r,
120 Err(e) => {
121 error!("returning async error response: {}", &e);
122 e.into()
123 }
124 };
125 responses.push_back((desc, cmd_response))
126 }
127 None => match tag {
128 AsyncCmdTag::Drain { stream_id: _ } => {
134 info!("ignoring unknown drain response");
135 }
136 _ => {
137 error!("dropping response for an untracked command: {:?}", tag);
138 }
139 },
140 }
141 }
142 VideoEvtResponseType::Event(evt) => {
143 self.write_event(evt)?;
144 }
145 }
146 }
147
148 if let Err(e) = self.write_responses(&mut responses) {
149 error!("Failed to write event responses: {:?}", e);
150 let _ = self.write_event(VideoEvt {
152 typ: EvtType::Error,
153 stream_id,
154 });
155 return Err(e);
156 }
157
158 Ok(())
159 }
160
161 fn handle_command_desc(
171 &mut self,
172 device: &mut dyn Device,
173 wait_ctx: &WaitContext<Token>,
174 mut desc: DescriptorChain,
175 ) -> Result<VecDeque<WritableResp>> {
176 let mut responses: VecDeque<WritableResp> = Default::default();
177 let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?;
178
179 let async_responses = match cmd {
183 VideoCmd::ResourceDestroyAll {
184 stream_id,
185 queue_type,
186 } => self
187 .desc_map
188 .create_cancellation_responses(&stream_id, Some(queue_type), None),
189 VideoCmd::StreamDestroy { stream_id } => self
190 .desc_map
191 .create_cancellation_responses(&stream_id, None, None),
192 VideoCmd::QueueClear {
193 stream_id,
194 queue_type: QueueType::Output,
195 } => {
196 self.desc_map.create_cancellation_responses(
200 &stream_id,
201 Some(QueueType::Output),
202 None,
203 )
204 }
205 _ => Default::default(),
206 };
207 for async_response in async_responses {
208 let AsyncCmdResponse {
209 tag,
210 response: cmd_result,
211 } = async_response;
212 let destroy_response = match cmd_result {
213 Ok(r) => r,
214 Err(e) => {
215 error!("returning async error response: {}", &e);
216 e.into()
217 }
218 };
219 match self.desc_map.remove(&tag) {
220 Some(destroy_desc) => {
221 responses.push_back((destroy_desc, destroy_response));
222 }
223 None => error!("dropping response for an untracked command: {:?}", tag),
224 }
225 }
226
227 let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx);
229 match cmd_response {
230 VideoCmdResponseType::Sync(r) => {
231 responses.push_back((desc, r));
232 }
233 VideoCmdResponseType::Async(tag) => {
234 self.desc_map.insert(tag, desc);
238 }
239 }
240 if let Some((stream_id, event_responses)) = event_responses_with_id {
241 self.write_event_responses(event_responses, stream_id)?;
242 }
243
244 Ok(responses)
245 }
246
247 fn handle_command_queue(
255 &mut self,
256 device: &mut dyn Device,
257 wait_ctx: &WaitContext<Token>,
258 ) -> Result<()> {
259 while let Some(desc) = self.cmd_queue.pop() {
260 let mut resps = self.handle_command_desc(device, wait_ctx, desc)?;
261 self.write_responses(&mut resps)?;
262 }
263 Ok(())
264 }
265
266 fn handle_event(
275 &mut self,
276 device: &mut dyn Device,
277 stream_id: u32,
278 wait_ctx: &WaitContext<Token>,
279 ) -> Result<()> {
280 if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx)
281 {
282 self.write_event_responses(event_responses, stream_id)?;
283 }
284 Ok(())
285 }
286
287 fn handle_buffer_barrier(
295 &mut self,
296 device: &mut dyn Device,
297 stream_id: u32,
298 wait_ctx: &WaitContext<Token>,
299 ) -> Result<()> {
300 if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) {
301 self.write_event_responses(event_responses, stream_id)?;
302 }
303 Ok(())
304 }
305
306 pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> {
313 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
314 (self.cmd_queue.event(), Token::CmdQueue),
315 (self.event_queue.event(), Token::EventQueue),
316 (kill_evt, Token::Kill),
317 ])
318 .map_err(Error::WaitContextCreationFailed)?;
319
320 loop {
321 let wait_events = wait_ctx.wait().map_err(Error::WaitError)?;
322
323 for wait_event in wait_events.iter().filter(|e| e.is_readable) {
324 match wait_event.token {
325 Token::CmdQueue => {
326 let _ = self.cmd_queue.event().wait();
327 self.handle_command_queue(device.as_mut(), &wait_ctx)?;
328 }
329 Token::EventQueue => {
330 let _ = self.event_queue.event().wait();
331 }
332 Token::Event { id } => {
333 self.handle_event(device.as_mut(), id, &wait_ctx)?;
334 }
335 Token::BufferBarrier { id } => {
336 self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?;
337 }
338 Token::Kill => return Ok(()),
339 }
340 }
341 }
342 }
343
344 #[allow(dead_code)]
353 pub async fn run_async(
354 mut self,
355 mut device: Box<dyn Device>,
356 ex: Executor,
357 cmd_evt: Event,
358 event_evt: Event,
359 ) -> Result<()> {
360 let cmd_queue_evt =
361 EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
362 let event_queue_evt =
363 EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
364
365 let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?;
367 let device_evt = ex
368 .async_from(AsyncWrapper::new(
369 clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?,
370 ))
371 .map_err(Error::EventAsyncCreationFailed)?;
372
373 loop {
374 let (
375 cmd_queue_evt,
376 device_evt,
377 _event_queue_evt,
379 ) = select3(
380 cmd_queue_evt.next_val().boxed_local(),
381 device_evt.wait_readable().boxed_local(),
382 event_queue_evt.next_val().boxed_local(),
383 )
384 .await;
385
386 if let SelectResult::Finished(_) = cmd_queue_evt {
387 self.handle_command_queue(device.as_mut(), &device_wait_ctx)?;
388 }
389
390 if let SelectResult::Finished(_) = device_evt {
391 let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) {
392 Ok(device_events) => device_events,
393 Err(_) => {
394 error!("failed to read a device event");
395 continue;
396 }
397 };
398 for device_event in device_events {
399 if let Token::Event { id } = device_event.token {
401 self.handle_event(device.as_mut(), id, &device_wait_ctx)?;
402 } else {
403 error!(
404 "invalid event is triggered by a device {:?}",
405 device_event.token
406 );
407 }
408 }
409 }
410 }
411 }
412}