1#![deny(missing_docs)]
6
7use std::fs::File;
8use std::io::Seek;
9use std::io::SeekFrom;
10use std::time::Duration;
11
12use anyhow::Context;
13use anyhow::Result;
14use base::Descriptor;
15use base::Event;
16use base::EventToken;
17use base::Timer;
18use base::TimerTrait;
19use base::WaitContext;
20use base::WorkerThread;
21
/// Truncates a file down to length zero in the background.
///
/// Construction starts a worker thread (named `truncate_worker`) that shrinks
/// the file one step at a time, each step driven by a repeating timer, instead
/// of truncating it in a single operation. Call [`FileTruncator::take_file`]
/// to stop the worker and get the file back; dropping the value also stops the
/// worker.
pub struct FileTruncator {
    /// Handle to the background worker, whose result is the file being
    /// truncated. Becomes `None` once the worker has been taken out by
    /// `take_file` or by `drop`.
    worker: Option<WorkerThread<Result<File>>>,
}
30
// Pacing of the background truncation: on every timer tick the worker removes
// at most `TRUNCATE_STEP_BYTES` from the end of the file, and the timer ticks
// once per `TRUNCATE_INTERVAL`.

/// Upper bound on how many bytes a single truncation step removes (64 MiB).
const TRUNCATE_STEP_BYTES: u64 = 64 << 20;

/// Period of the repeating timer that triggers each truncation step.
const TRUNCATE_INTERVAL: Duration = Duration::from_secs(5);
35
36fn truncate_worker(
37 mut timer: Box<dyn TimerTrait>,
38 mut file: File,
39 kill_evt: Event,
40) -> Result<File> {
41 #[derive(EventToken)]
42 enum Token {
43 Alarm,
44 Kill,
45 }
46
47 let mut len = file
48 .seek(SeekFrom::End(0))
49 .context("Failed to determine size")?;
50
51 let descriptor = Descriptor(timer.as_raw_descriptor());
52 let wait_ctx: WaitContext<Token> =
53 WaitContext::build_with(&[(&descriptor, Token::Alarm), (&kill_evt, Token::Kill)])
54 .context("worker context failed")?;
55
56 while len > 0 {
57 let events = wait_ctx.wait().context("wait failed")?;
58 for event in events.iter().filter(|e| e.is_readable) {
59 match event.token {
60 Token::Alarm => {
61 let _ = timer.mark_waited().context("failed to reset timer")?;
62 len = len.saturating_sub(TRUNCATE_STEP_BYTES);
63 file.set_len(len).context("failed to truncate file")?;
64 }
65 Token::Kill => {
66 file.set_len(0).context("failed to clear file")?;
67 return Ok(file);
68 }
69 }
70 }
71 }
72 Ok(file)
73}
74
75impl FileTruncator {
76 pub fn new(file: File) -> Result<Self> {
82 let timer = Timer::new().context("failed to create truncate timer")?;
83 Self::new_inner(Box::new(timer), file)
84 }
85
86 fn new_inner(mut timer: Box<dyn TimerTrait>, file: File) -> Result<Self> {
87 timer
88 .reset_repeating(TRUNCATE_INTERVAL)
89 .context("failed to arm timer")?;
90 Ok(Self {
91 worker: Some(WorkerThread::start(
92 "truncate_worker",
93 move |kill_evt| -> Result<File> { truncate_worker(timer, file, kill_evt) },
94 )),
95 })
96 }
97
98 pub fn take_file(mut self) -> Result<File> {
103 let file = self
104 .worker
105 .take()
106 .context("missing worker")?
107 .stop()
108 .context("worker failure")?;
109 Ok(file)
110 }
111}
112
113impl Drop for FileTruncator {
114 fn drop(&mut self) {
115 if let Some(worker) = self.worker.take() {
116 let _ = worker.stop();
117 }
118 }
119}
120
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use base::FakeClock;
    use base::FakeTimer;
    use sync::Mutex;

    use super::*;

    /// Returns the current length of `file`, measured by seeking to its end.
    fn current_len(file: &mut File) -> u64 {
        file.seek(SeekFrom::End(0)).unwrap()
    }

    /// Polls `file` (at most 100 times, 1 ms apart) until its length equals
    /// `len`, then asserts that the length matches.
    fn wait_for_target_length(file: &mut File, len: u64) {
        for _ in 0..100 {
            if current_len(file) == len {
                break;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
        assert_eq!(current_len(file), len);
    }

    /// Two timer ticks shrink a two-step file to one step and then to zero.
    #[test]
    fn test_full_truncate() {
        let interval_ns = TRUNCATE_INTERVAL.as_nanos() as u64;

        let mut file = tempfile::tempfile().unwrap();
        let clock = Arc::new(Mutex::new(FakeClock::new()));
        let timer = Box::new(FakeTimer::new(clock.clone()));

        file.set_len(2 * TRUNCATE_STEP_BYTES).unwrap();

        let truncator = FileTruncator::new_inner(timer, file.try_clone().unwrap()).unwrap();

        // First tick: one step removed.
        clock.lock().add_ns(interval_ns);
        wait_for_target_length(&mut file, TRUNCATE_STEP_BYTES);

        // Second tick: the rest is removed.
        clock.lock().add_ns(interval_ns);
        wait_for_target_length(&mut file, 0);

        let _ = truncator.take_file().unwrap();
    }

    /// Taking the file before any tick still leaves it empty.
    #[test]
    fn test_early_exit() {
        let mut file = tempfile::tempfile().unwrap();
        let clock = Arc::new(Mutex::new(FakeClock::new()));
        let timer = Box::new(FakeTimer::new(clock));

        file.set_len(2 * TRUNCATE_STEP_BYTES).unwrap();

        let truncator = FileTruncator::new_inner(timer, file.try_clone().unwrap()).unwrap();

        let _ = truncator.take_file().unwrap();
        assert_eq!(current_len(&mut file), 0);
    }
}
170}