base/
periodic_logger.rs

1// Copyright 2024 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// TODO(b/318439696): Remove once it is used
6#![allow(dead_code)]
7
8use std::collections::HashMap;
9use std::fmt::Write;
10use std::sync::atomic::AtomicU32;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use thiserror::Error as ThisError;
17
18use crate::error;
19use crate::Event;
20use crate::EventToken;
21use crate::Timer;
22use crate::TimerTrait;
23use crate::WaitContext;
24use crate::WorkerThread;
25
26/// Utility class that helps count and log high frequency events periodically.
27pub struct PeriodicLogger {
28    // Name that is printed out to differentiate between other `PeriodicLogger`s
29    name: String,
30    // Interval to log
31    interval: Duration,
32    // Map of event counters that are periodically logged
33    counters: Arc<RwLock<HashMap<String, AtomicU32>>>,
34    // The periodic logger thread
35    worker_thread: Option<WorkerThread<()>>,
36}
37
38impl PeriodicLogger {
39    pub fn new(name: String, interval: Duration) -> Self {
40        PeriodicLogger {
41            name,
42            interval,
43            counters: Arc::new(RwLock::new(HashMap::new())),
44            worker_thread: None,
45        }
46    }
47
48    /// Add a new event item to be counted.
49    pub fn add_counter_item(&self, name: String) -> Result<(), PeriodicLoggerError> {
50        // This write lock will likely be acquired infrequently.
51        let mut counters_write_lock = self
52            .counters
53            .write()
54            .map_err(|e| PeriodicLoggerError::WriteLockError(e.to_string()))?;
55
56        if counters_write_lock.contains_key(&name) {
57            return Err(PeriodicLoggerError::CounterAlreadyExist(name));
58        }
59
60        counters_write_lock.insert(name, AtomicU32::new(0));
61        Ok(())
62    }
63
64    /// Increment event counter by an `amount`
65    pub fn increment_counter(&self, name: String, amount: u32) -> Result<(), PeriodicLoggerError> {
66        match self.counters.read() {
67            Ok(counters_map) => {
68                if let Some(atomic_counter) = counters_map.get(&name) {
69                    atomic_counter.fetch_add(amount, Ordering::Relaxed);
70                    Ok(())
71                } else {
72                    Err(PeriodicLoggerError::CounterDoesNotExist(name))
73                }
74            }
75            Err(e) => Err(PeriodicLoggerError::ReadLockError(e.to_string())),
76        }
77    }
78
79    /// Starts a thread that will log the count of events within a `self.interval` time period.
80    /// All counters will be reset to 0 after logging.
81    pub fn start_logging_thread(&mut self) -> Result<(), PeriodicLoggerError> {
82        if self.worker_thread.is_some() {
83            return Err(PeriodicLoggerError::ThreadAlreadyStarted);
84        }
85
86        let cloned_counter = self.counters.clone();
87        let interval_copy = self.interval;
88        let name_copy = self.name.clone();
89        self.worker_thread = Some(WorkerThread::start(
90            format!("PeriodicLogger_{}", self.name),
91            move |kill_evt| {
92                if let Err(e) = Self::run_logger(kill_evt, name_copy, interval_copy, cloned_counter)
93                {
94                    error!("PeriodicLogger worker failed: {e:#}");
95                }
96            },
97        ));
98
99        Ok(())
100    }
101
102    fn run_logger(
103        kill_evt: Event,
104        name_copy: String,
105        interval_copy: Duration,
106        cloned_counter: Arc<RwLock<HashMap<String, AtomicU32>>>,
107    ) -> Result<(), PeriodicLoggerError> {
108        #[derive(EventToken)]
109        enum Token {
110            Exit,
111            PeriodicLog,
112        }
113
114        let mut timer = Timer::new().map_err(PeriodicLoggerError::TimerNewError)?;
115        timer
116            .reset_repeating(interval_copy)
117            .map_err(PeriodicLoggerError::TimerResetError)?;
118
119        let wait_ctx =
120            WaitContext::build_with(&[(&kill_evt, Token::Exit), (&timer, Token::PeriodicLog)])
121                .map_err(PeriodicLoggerError::WaitContextBuildError)?;
122
123        'outer: loop {
124            let events = wait_ctx.wait().expect("wait failed");
125            for event in events.iter().filter(|e| e.is_readable) {
126                match event.token {
127                    Token::Exit => {
128                        break 'outer;
129                    }
130                    Token::PeriodicLog => {
131                        timer.mark_waited().unwrap();
132
133                        let counter_map = cloned_counter
134                            .read()
135                            .map_err(|e| PeriodicLoggerError::ReadLockError(e.to_string()))?;
136
137                        let mut logged_string = format!("{name_copy} {interval_copy:?}:");
138                        for (counter_name, counter_value) in counter_map.iter() {
139                            let value = counter_value.swap(0, Ordering::Relaxed);
140                            let _ = write!(logged_string, "\n    {counter_name}: {value}");
141                        }
142
143                        // Log all counters
144                        crate::info!("{}", logged_string);
145                    }
146                }
147            }
148        }
149        Ok(())
150    }
151}
152
153#[derive(Debug, ThisError, PartialEq)]
154pub enum PeriodicLoggerError {
155    #[error("Periodic logger thread already started.")]
156    ThreadAlreadyStarted,
157    #[error("Failed to acquire write lock: {0}")]
158    WriteLockError(String),
159    #[error("Failed to acquire read lock: {0}")]
160    ReadLockError(String),
161    #[error("Counter already exists: {0}")]
162    CounterAlreadyExist(String),
163    #[error("Counter does not exist: {0}")]
164    CounterDoesNotExist(String),
165    #[error("Failed to build WaitContext: {0}")]
166    WaitContextBuildError(crate::Error),
167    #[error("Failed to wait on WaitContext: {0}")]
168    WaitContextWaitError(crate::Error),
169    #[error("Failed to reset Timer: {0}")]
170    TimerResetError(crate::Error),
171    #[error("Failed initialize Timer: {0}")]
172    TimerNewError(crate::Error),
173}
174
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Increments applied to a registered counter accumulate.
    #[test]
    fn periodic_add() {
        let periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));
        periodic_logger
            .add_counter_item("counter_1".to_string())
            .unwrap();
        periodic_logger
            .increment_counter("counter_1".to_string(), 2)
            .unwrap();
        periodic_logger
            .increment_counter("counter_1".to_string(), 5)
            .unwrap();

        let counters = periodic_logger.counters.read().unwrap();
        assert_eq!(counters.len(), 1);
        assert_eq!(
            counters.get("counter_1").unwrap().load(Ordering::Relaxed),
            7
        );
    }

    /// A second start attempt is rejected with the dedicated error variant.
    #[test]
    fn worker_thread_cannot_start_twice() {
        let mut periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));
        assert_eq!(periodic_logger.start_logging_thread(), Ok(()));
        assert_eq!(
            periodic_logger.start_logging_thread(),
            Err(PeriodicLoggerError::ThreadAlreadyStarted)
        );
    }

    /// Registering the same counter name twice fails and reports the name.
    #[test]
    fn add_same_counter_item_twice_return_err() {
        let periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));
        assert!(periodic_logger
            .add_counter_item("counter_1".to_string())
            .is_ok());
        assert_eq!(
            periodic_logger.add_counter_item("counter_1".to_string()),
            Err(PeriodicLoggerError::CounterAlreadyExist(
                "counter_1".to_string()
            ))
        );
    }

    /// Incrementing a counter that was never registered fails and reports the name.
    #[test]
    fn increment_missing_counter_return_err() {
        let periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));
        assert_eq!(
            periodic_logger.increment_counter("counter_1".to_string(), 1),
            Err(PeriodicLoggerError::CounterDoesNotExist(
                "counter_1".to_string()
            ))
        );
        // A failed increment must not create the counter as a side effect.
        assert!(periodic_logger.counters.read().unwrap().is_empty());
    }

    /// Ignored because this is intended to be run locally
    #[ignore]
    #[test]
    fn periodic_logger_smoke_test() {
        let mut periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3));
        periodic_logger
            .add_counter_item("counter_1".to_string())
            .unwrap();

        periodic_logger.start_logging_thread().unwrap();
        periodic_logger
            .increment_counter("counter_1".to_string(), 5)
            .unwrap();

        // Sleep past one 3 second interval so that at least one report is emitted.
        thread::sleep(Duration::from_secs(5));
    }
}