1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::sync::Arc;

use anyhow::bail;
use anyhow::Context;
use base::info;
use base::warn;
#[cfg(windows)]
use base::CloseNotifier;
use base::Event;
use base::EventToken;
use base::EventType;
use base::ReadNotifier;
use base::WaitContext;
use sync::Mutex;
use vmm_vhost::BackendClient;
use vmm_vhost::Error as VhostError;

use crate::virtio::vhost_user_frontend::handler::BackendReqHandler;
use crate::virtio::Interrupt;
use crate::virtio::VIRTIO_MSI_NO_VECTOR;

pub struct Worker {
    pub kill_evt: Event,
    pub non_msix_evt: Event,
    pub backend_req_handler: Option<BackendReqHandler>,
    pub backend_client: Arc<Mutex<BackendClient>>,
}

impl Worker {
    pub fn run(&mut self, interrupt: Interrupt) -> anyhow::Result<()> {
        #[derive(EventToken)]
        enum Token {
            Kill,
            NonMsixEvt,
            Resample,
            ReqHandlerRead,
            #[cfg(target_os = "windows")]
            ReqHandlerClose,
            // monitor whether backend_client_fd is broken
            BackendCloseNotify,
        }
        let wait_ctx = WaitContext::build_with(&[
            (&self.non_msix_evt, Token::NonMsixEvt),
            (&self.kill_evt, Token::Kill),
        ])
        .context("failed to build WaitContext")?;

        if let Some(resample_evt) = interrupt.get_resample_evt() {
            wait_ctx
                .add(resample_evt, Token::Resample)
                .context("failed to add resample event to WaitContext")?;
        }

        if let Some(backend_req_handler) = self.backend_req_handler.as_mut() {
            wait_ctx
                .add(
                    backend_req_handler.get_read_notifier(),
                    Token::ReqHandlerRead,
                )
                .context("failed to add backend req handler to WaitContext")?;

            #[cfg(target_os = "windows")]
            wait_ctx
                .add(
                    backend_req_handler.get_close_notifier(),
                    Token::ReqHandlerClose,
                )
                .context("failed to add backend req handler close notifier to WaitContext")?;
        }

        #[cfg(any(target_os = "android", target_os = "linux"))]
        wait_ctx
            .add_for_event(
                self.backend_client.lock().get_read_notifier(),
                EventType::None,
                Token::BackendCloseNotify,
            )
            .context("failed to add backend client close notifier to WaitContext")?;
        #[cfg(target_os = "windows")]
        wait_ctx
            .add(
                self.backend_client.lock().get_close_notifier(),
                Token::BackendCloseNotify,
            )
            .context("failed to add backend client close notifier to WaitContext")?;

        'wait: loop {
            let events = wait_ctx.wait().context("WaitContext::wait() failed")?;
            for event in events {
                match event.token {
                    Token::Kill => {
                        break 'wait;
                    }
                    Token::NonMsixEvt => {
                        // The vhost-user protocol allows the backend to signal events, but for
                        // non-MSI-X devices, a device must also update the interrupt status mask.
                        // `non_msix_evt` proxies events from the vhost-user backend to update the
                        // status mask.
                        let _ = self.non_msix_evt.wait();

                        // The parameter vector of signal_used_queue is used only when msix is
                        // enabled.
                        interrupt.signal_used_queue(VIRTIO_MSI_NO_VECTOR);
                    }
                    Token::Resample => {
                        interrupt.interrupt_resample();
                    }
                    Token::ReqHandlerRead => {
                        let Some(backend_req_handler) = self.backend_req_handler.as_mut() else {
                            continue;
                        };

                        match backend_req_handler.handle_request() {
                            Ok(_) => (),
                            Err(VhostError::ClientExit) | Err(VhostError::Disconnect) => {
                                info!("backend req handler connection closed");
                                // Stop monitoring `backend_req_handler` as the client closed
                                // the connection.
                                let _ = wait_ctx.delete(backend_req_handler.get_read_notifier());
                                #[cfg(target_os = "windows")]
                                let _ = wait_ctx.delete(backend_req_handler.get_close_notifier());
                                self.backend_req_handler = None;
                            }
                            Err(e) => return Err(e).context("failed to handle vhost-user request"),
                        }
                    }
                    #[cfg(target_os = "windows")]
                    Token::ReqHandlerClose => {
                        let Some(backend_req_handler) = self.backend_req_handler.as_mut() else {
                            continue;
                        };

                        info!("backend req handler connection closed");
                        let _ = wait_ctx.delete(backend_req_handler.get_read_notifier());
                        let _ = wait_ctx.delete(backend_req_handler.get_close_notifier());
                        self.backend_req_handler = None;
                    }
                    Token::BackendCloseNotify => {
                        // For linux domain socket, the close notifier fd is same with read/write
                        // notifier We need check whether the event is caused by socket broken.
                        #[cfg(any(target_os = "android", target_os = "linux"))]
                        if !event.is_hungup {
                            warn!("event besides hungup should not be notified");
                            continue;
                        }
                        bail!("Backend device disconnected early");
                    }
                }
            }
        }

        Ok(())
    }
}