fuse/
worker.rs

1// Copyright 2020 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::fs::File;
6use std::io;
7use std::io::BufRead;
8use std::io::BufReader;
9use std::io::Cursor;
10use std::io::Read;
11use std::io::Write;
12use std::mem::size_of;
13use std::os::unix::fs::FileExt;
14use std::os::unix::io::AsRawFd;
15use std::sync::Arc;
16
17use base::Protection;
18
19use crate::filesystem::FileSystem;
20use crate::filesystem::ZeroCopyReader;
21use crate::filesystem::ZeroCopyWriter;
22use crate::server::Mapper;
23use crate::server::Reader;
24use crate::server::Server;
25use crate::server::Writer;
26use crate::sys;
27use crate::Error;
28use crate::Result;
29
30struct DevFuseReader {
31    // File representing /dev/fuse for reading, with sufficient buffer to accommodate a FUSE read
32    // transaction.
33    reader: BufReader<File>,
34}
35
36impl DevFuseReader {
37    pub fn new(reader: BufReader<File>) -> Self {
38        DevFuseReader { reader }
39    }
40
41    fn drain(&mut self) {
42        self.reader.consume(self.reader.buffer().len());
43    }
44}
45
46impl Read for DevFuseReader {
47    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
48        self.reader.read(buf)
49    }
50}
51
52impl Reader for DevFuseReader {}
53
54impl ZeroCopyReader for DevFuseReader {
55    fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
56        let buf = self.reader.fill_buf()?;
57        let end = std::cmp::min(count, buf.len());
58        let written = f.write_at(&buf[..end], off)?;
59        self.reader.consume(written);
60        Ok(written)
61    }
62}
63
64struct DevFuseWriter {
65    // File representing /dev/fuse for writing.
66    dev_fuse: File,
67
68    // An internal buffer to allow generating data and header out of order, such that they can be
69    // flushed at once. This is wrapped by a cursor for tracking the current written position.
70    write_buf: Cursor<Vec<u8>>,
71}
72
73impl DevFuseWriter {
74    pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
75        debug_assert_eq!(write_buf.position(), 0);
76
77        DevFuseWriter {
78            dev_fuse,
79            write_buf,
80        }
81    }
82}
83
84impl Write for DevFuseWriter {
85    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
86        self.write_buf.write(buf)
87    }
88
89    fn flush(&mut self) -> io::Result<()> {
90        self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
91        self.write_buf.set_position(0);
92        self.write_buf.get_mut().clear();
93        Ok(())
94    }
95}
96
97impl Writer for DevFuseWriter {
98    type ClosureWriter = Self;
99
100    fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
101    where
102        F: Fn(&mut Self) -> io::Result<usize>,
103    {
104        // Restore the cursor for idempotent.
105        let original = self.write_buf.position();
106        self.write_buf.set_position(offset as u64);
107        let r = f(self);
108        self.write_buf.set_position(original);
109        r
110    }
111
112    fn has_sufficient_buffer(&self, size: u32) -> bool {
113        (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
114    }
115}
116
117impl ZeroCopyWriter for DevFuseWriter {
118    fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
119        let pos = self.write_buf.position() as usize;
120        let end = pos + count;
121        let buf = self.write_buf.get_mut();
122
123        let old_end = buf.len();
124        buf.resize(end, 0);
125        let read = f.read_at(&mut buf[pos..end], off)?;
126
127        let new_end = pos + read;
128        debug_assert!(new_end >= old_end);
129        buf.truncate(new_end);
130        self.write_buf.set_position(new_end as u64);
131        Ok(read)
132    }
133}
134
135struct DevFuseMapper;
136
137impl DevFuseMapper {
138    fn new() -> Self {
139        Self {}
140    }
141}
142
143impl Mapper for DevFuseMapper {
144    fn map(
145        &self,
146        _mem_offset: u64,
147        _size: usize,
148        _fd: &dyn AsRawFd,
149        _file_offset: u64,
150        _prot: Protection,
151    ) -> io::Result<()> {
152        Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
153    }
154
155    fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
156        Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
157    }
158}
159
160/// Start the FUSE message handling loop. Returns when an error happens.
161///
162/// # Arguments
163///
164/// * `dev_fuse` - A `File` object of /dev/fuse
165/// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
166/// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
167///   enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
168///
169/// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
170pub fn start_message_loop<F: FileSystem + Sync>(
171    dev_fuse: File,
172    input_buffer_size: u32,
173    output_buffer_size: u32,
174    fs: F,
175) -> Result<()> {
176    let server = Server::new(fs);
177    do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
178}
179
180fn do_start_message_loop<F: FileSystem + Sync>(
181    dev_fuse: File,
182    input_buffer_size: u32,
183    output_buffer_size: u32,
184    server: &Server<F>,
185) -> Result<()> {
186    let mut dev_fuse_reader = {
187        let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
188        let buf_reader = BufReader::with_capacity(
189            input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
190            rfile,
191        );
192        DevFuseReader::new(buf_reader)
193    };
194    let mut dev_fuse_writer = {
195        let wfile = dev_fuse;
196        let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
197        DevFuseWriter::new(wfile, write_buf)
198    };
199    let dev_fuse_mapper = DevFuseMapper::new();
200    loop {
201        server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
202
203        // Since we're reusing the buffer to avoid repeated allocation, drain the possible
204        // residual from the buffer.
205        dev_fuse_reader.drain();
206    }
207}
208
209// TODO: Remove worker and this namespace from public
210pub mod internal {
211    use crossbeam_utils::thread;
212
213    use super::*;
214
215    /// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
216    ///
217    /// # Arguments
218    ///
219    /// * `dev_fuse` - A `File` object of /dev/fuse
220    /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
221    /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
222    ///
223    /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
224    pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
225        dev_fuse: File,
226        input_buffer_size: u32,
227        output_buffer_size: u32,
228        thread_numbers: usize,
229        fs: F,
230    ) -> Result<()> {
231        let result = thread::scope(|s| {
232            let server = Arc::new(Server::new(fs));
233            for _ in 0..thread_numbers {
234                let dev_fuse = dev_fuse
235                    .try_clone()
236                    .map_err(Error::EndpointSetup)
237                    .expect("Failed to clone /dev/fuse FD");
238                let server = server.clone();
239                s.spawn(move |_| {
240                    do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
241                });
242            }
243        });
244
245        unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
246    }
247}