1use std::fs::File;
6use std::io;
7use std::io::BufRead;
8use std::io::BufReader;
9use std::io::Cursor;
10use std::io::Read;
11use std::io::Write;
12use std::mem::size_of;
13use std::os::unix::fs::FileExt;
14use std::os::unix::io::AsRawFd;
15use std::sync::Arc;
16
17use base::Protection;
18
19use crate::filesystem::FileSystem;
20use crate::filesystem::ZeroCopyReader;
21use crate::filesystem::ZeroCopyWriter;
22use crate::server::Mapper;
23use crate::server::Reader;
24use crate::server::Server;
25use crate::server::Writer;
26use crate::sys;
27use crate::Error;
28use crate::Result;
29
30struct DevFuseReader {
31 reader: BufReader<File>,
34}
35
36impl DevFuseReader {
37 pub fn new(reader: BufReader<File>) -> Self {
38 DevFuseReader { reader }
39 }
40
41 fn drain(&mut self) {
42 self.reader.consume(self.reader.buffer().len());
43 }
44}
45
46impl Read for DevFuseReader {
47 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
48 self.reader.read(buf)
49 }
50}
51
52impl Reader for DevFuseReader {}
53
54impl ZeroCopyReader for DevFuseReader {
55 fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
56 let buf = self.reader.fill_buf()?;
57 let end = std::cmp::min(count, buf.len());
58 let written = f.write_at(&buf[..end], off)?;
59 self.reader.consume(written);
60 Ok(written)
61 }
62}
63
64struct DevFuseWriter {
65 dev_fuse: File,
67
68 write_buf: Cursor<Vec<u8>>,
71}
72
73impl DevFuseWriter {
74 pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
75 debug_assert_eq!(write_buf.position(), 0);
76
77 DevFuseWriter {
78 dev_fuse,
79 write_buf,
80 }
81 }
82}
83
84impl Write for DevFuseWriter {
85 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
86 self.write_buf.write(buf)
87 }
88
89 fn flush(&mut self) -> io::Result<()> {
90 self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
91 self.write_buf.set_position(0);
92 self.write_buf.get_mut().clear();
93 Ok(())
94 }
95}
96
97impl Writer for DevFuseWriter {
98 type ClosureWriter = Self;
99
100 fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
101 where
102 F: Fn(&mut Self) -> io::Result<usize>,
103 {
104 let original = self.write_buf.position();
106 self.write_buf.set_position(offset as u64);
107 let r = f(self);
108 self.write_buf.set_position(original);
109 r
110 }
111
112 fn has_sufficient_buffer(&self, size: u32) -> bool {
113 (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
114 }
115}
116
117impl ZeroCopyWriter for DevFuseWriter {
118 fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
119 let pos = self.write_buf.position() as usize;
120 let end = pos + count;
121 let buf = self.write_buf.get_mut();
122
123 let old_end = buf.len();
124 buf.resize(end, 0);
125 let read = f.read_at(&mut buf[pos..end], off)?;
126
127 let new_end = pos + read;
128 debug_assert!(new_end >= old_end);
129 buf.truncate(new_end);
130 self.write_buf.set_position(new_end as u64);
131 Ok(read)
132 }
133}
134
135struct DevFuseMapper;
136
137impl DevFuseMapper {
138 fn new() -> Self {
139 Self {}
140 }
141}
142
143impl Mapper for DevFuseMapper {
144 fn map(
145 &self,
146 _mem_offset: u64,
147 _size: usize,
148 _fd: &dyn AsRawFd,
149 _file_offset: u64,
150 _prot: Protection,
151 ) -> io::Result<()> {
152 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
153 }
154
155 fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
156 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
157 }
158}
159
160pub fn start_message_loop<F: FileSystem + Sync>(
171 dev_fuse: File,
172 input_buffer_size: u32,
173 output_buffer_size: u32,
174 fs: F,
175) -> Result<()> {
176 let server = Server::new(fs);
177 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
178}
179
180fn do_start_message_loop<F: FileSystem + Sync>(
181 dev_fuse: File,
182 input_buffer_size: u32,
183 output_buffer_size: u32,
184 server: &Server<F>,
185) -> Result<()> {
186 let mut dev_fuse_reader = {
187 let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
188 let buf_reader = BufReader::with_capacity(
189 input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
190 rfile,
191 );
192 DevFuseReader::new(buf_reader)
193 };
194 let mut dev_fuse_writer = {
195 let wfile = dev_fuse;
196 let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
197 DevFuseWriter::new(wfile, write_buf)
198 };
199 let dev_fuse_mapper = DevFuseMapper::new();
200 loop {
201 server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
202
203 dev_fuse_reader.drain();
206 }
207}
208
209pub mod internal {
211 use crossbeam_utils::thread;
212
213 use super::*;
214
215 pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
225 dev_fuse: File,
226 input_buffer_size: u32,
227 output_buffer_size: u32,
228 thread_numbers: usize,
229 fs: F,
230 ) -> Result<()> {
231 let result = thread::scope(|s| {
232 let server = Arc::new(Server::new(fs));
233 for _ in 0..thread_numbers {
234 let dev_fuse = dev_fuse
235 .try_clone()
236 .map_err(Error::EndpointSetup)
237 .expect("Failed to clone /dev/fuse FD");
238 let server = server.clone();
239 s.spawn(move |_| {
240 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
241 });
242 }
243 });
244
245 unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
246 }
247}