1#![allow(dead_code)]
7
8use std::collections::HashMap;
9use std::fmt::Write;
10use std::sync::atomic::AtomicU32;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use thiserror::Error as ThisError;
17
18use crate::error;
19use crate::Event;
20use crate::EventToken;
21use crate::Timer;
22use crate::TimerTrait;
23use crate::WaitContext;
24use crate::WorkerThread;
25
26pub struct PeriodicLogger {
28 name: String,
30 interval: Duration,
32 counters: Arc<RwLock<HashMap<String, AtomicU32>>>,
34 worker_thread: Option<WorkerThread<()>>,
36}
37
38impl PeriodicLogger {
39 pub fn new(name: String, interval: Duration) -> Self {
40 PeriodicLogger {
41 name,
42 interval,
43 counters: Arc::new(RwLock::new(HashMap::new())),
44 worker_thread: None,
45 }
46 }
47
48 pub fn add_counter_item(&self, name: String) -> Result<(), PeriodicLoggerError> {
50 let mut counters_write_lock = self
52 .counters
53 .write()
54 .map_err(|e| PeriodicLoggerError::WriteLockError(e.to_string()))?;
55
56 if counters_write_lock.contains_key(&name) {
57 return Err(PeriodicLoggerError::CounterAlreadyExist(name));
58 }
59
60 counters_write_lock.insert(name, AtomicU32::new(0));
61 Ok(())
62 }
63
64 pub fn increment_counter(&self, name: String, amount: u32) -> Result<(), PeriodicLoggerError> {
66 match self.counters.read() {
67 Ok(counters_map) => {
68 if let Some(atomic_counter) = counters_map.get(&name) {
69 atomic_counter.fetch_add(amount, Ordering::Relaxed);
70 Ok(())
71 } else {
72 Err(PeriodicLoggerError::CounterDoesNotExist(name))
73 }
74 }
75 Err(e) => Err(PeriodicLoggerError::ReadLockError(e.to_string())),
76 }
77 }
78
79 pub fn start_logging_thread(&mut self) -> Result<(), PeriodicLoggerError> {
82 if self.worker_thread.is_some() {
83 return Err(PeriodicLoggerError::ThreadAlreadyStarted);
84 }
85
86 let cloned_counter = self.counters.clone();
87 let interval_copy = self.interval;
88 let name_copy = self.name.clone();
89 self.worker_thread = Some(WorkerThread::start(
90 format!("PeriodicLogger_{}", self.name),
91 move |kill_evt| {
92 if let Err(e) = Self::run_logger(kill_evt, name_copy, interval_copy, cloned_counter)
93 {
94 error!("PeriodicLogger worker failed: {e:#}");
95 }
96 },
97 ));
98
99 Ok(())
100 }
101
102 fn run_logger(
103 kill_evt: Event,
104 name_copy: String,
105 interval_copy: Duration,
106 cloned_counter: Arc<RwLock<HashMap<String, AtomicU32>>>,
107 ) -> Result<(), PeriodicLoggerError> {
108 #[derive(EventToken)]
109 enum Token {
110 Exit,
111 PeriodicLog,
112 }
113
114 let mut timer = Timer::new().map_err(PeriodicLoggerError::TimerNewError)?;
115 timer
116 .reset_repeating(interval_copy)
117 .map_err(PeriodicLoggerError::TimerResetError)?;
118
119 let wait_ctx =
120 WaitContext::build_with(&[(&kill_evt, Token::Exit), (&timer, Token::PeriodicLog)])
121 .map_err(PeriodicLoggerError::WaitContextBuildError)?;
122
123 'outer: loop {
124 let events = wait_ctx.wait().expect("wait failed");
125 for event in events.iter().filter(|e| e.is_readable) {
126 match event.token {
127 Token::Exit => {
128 break 'outer;
129 }
130 Token::PeriodicLog => {
131 timer.mark_waited().unwrap();
132
133 let counter_map = cloned_counter
134 .read()
135 .map_err(|e| PeriodicLoggerError::ReadLockError(e.to_string()))?;
136
137 let mut logged_string = format!("{name_copy} {interval_copy:?}:");
138 for (counter_name, counter_value) in counter_map.iter() {
139 let value = counter_value.swap(0, Ordering::Relaxed);
140 let _ = write!(logged_string, "\n {counter_name}: {value}");
141 }
142
143 crate::info!("{}", logged_string);
145 }
146 }
147 }
148 }
149 Ok(())
150 }
151}
152
153#[derive(Debug, ThisError, PartialEq)]
154pub enum PeriodicLoggerError {
155 #[error("Periodic logger thread already started.")]
156 ThreadAlreadyStarted,
157 #[error("Failed to acquire write lock: {0}")]
158 WriteLockError(String),
159 #[error("Failed to acquire read lock: {0}")]
160 ReadLockError(String),
161 #[error("Counter already exists: {0}")]
162 CounterAlreadyExist(String),
163 #[error("Counter does not exist: {0}")]
164 CounterDoesNotExist(String),
165 #[error("Failed to build WaitContext: {0}")]
166 WaitContextBuildError(crate::Error),
167 #[error("Failed to wait on WaitContext: {0}")]
168 WaitContextWaitError(crate::Error),
169 #[error("Failed to reset Timer: {0}")]
170 TimerResetError(crate::Error),
171 #[error("Failed initialize Timer: {0}")]
172 TimerNewError(crate::Error),
173}
174
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Successive increments on a registered counter accumulate.
    #[test]
    fn periodic_add() {
        let logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));
        logger.add_counter_item("counter_1".to_string()).unwrap();
        for amount in [2, 5] {
            logger
                .increment_counter("counter_1".to_string(), amount)
                .unwrap();
        }

        let registered = logger.counters.read().unwrap().len();
        assert_eq!(registered, 1);

        let total = logger
            .counters
            .read()
            .unwrap()
            .get("counter_1")
            .unwrap()
            .load(Ordering::Relaxed);
        assert_eq!(total, 7);
    }

    /// A second start request is rejected once a worker exists.
    #[test]
    fn worker_thread_cannot_start_twice() {
        let mut logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));

        let first_start = logger.start_logging_thread();
        assert!(first_start.is_ok());

        let second_start = logger.start_logging_thread();
        assert!(second_start.is_err());
    }

    /// Registering the same counter name twice reports `CounterAlreadyExist`.
    #[test]
    fn add_same_counter_item_twice_return_err() {
        let logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));

        let first_add = logger.add_counter_item("counter_1".to_string());
        assert!(first_add.is_ok());

        let duplicate = PeriodicLoggerError::CounterAlreadyExist("counter_1".to_string());
        assert_eq!(
            logger.add_counter_item("counter_1".to_string()),
            Err(duplicate)
        );
    }

    /// Runs the worker for real. Ignored by default; it sleeps for five seconds
    /// and makes no assertions beyond the setup calls succeeding.
    #[ignore]
    #[test]
    fn periodic_logger_smoke_test() {
        let mut logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));
        logger.add_counter_item("counter_1".to_string()).unwrap();

        logger.start_logging_thread().unwrap();
        logger
            .increment_counter("counter_1".to_string(), 5)
            .unwrap();

        // Longer than the 3 s interval, so the timer can fire while the test runs.
        thread::sleep(Duration::from_secs(5));
    }
}