1use std::fmt;
10use std::fmt::Debug;
11use std::ops::Add;
12use std::ops::Div;
13use std::ops::Range;
14use std::ops::Sub;
15use std::sync::Arc;
16use std::time::Instant;
17
18use anyhow::anyhow;
19use anyhow::Result;
20use base::info;
21use sync::Mutex;
22
/// Extreme values of a numeric type.
///
/// Other code in this file uses `absolute_min()` as the starting value of
/// running sums and maxima, and `absolute_max()` as the starting value of
/// running minima.
pub trait Limits {
    /// Returns the lower extreme of the implementing type.
    fn absolute_min() -> Self;

    /// Returns the upper extreme of the implementing type.
    fn absolute_max() -> Self;
}

/// For `u64` the limits are the type's own `MIN` and `MAX` constants.
impl Limits for u64 {
    fn absolute_max() -> u64 {
        Self::MAX
    }

    fn absolute_min() -> u64 {
        Self::MIN
    }
}
37
/// Read-only summary statistics over samples of type `T`.
///
/// The implementations in this file return `None` from every `Option`
/// accessor while no sample has been recorded.
pub trait SummaryStats<T> {
    /// Number of samples recorded.
    fn count(&self) -> u64;

    /// Sum of all recorded samples.
    fn sum(&self) -> Option<T>;

    /// Smallest recorded sample.
    fn min(&self) -> Option<T>;

    /// Largest recorded sample.
    fn max(&self) -> Option<T>;

    /// Sum of the recorded samples divided by their count.
    fn average(&self) -> Option<T>;
}
59
/// Capabilities a sample type needs to be used by the statistics types in
/// this file: limits, division by a `u64`, addition, subtraction, cloning,
/// total ordering and debug formatting.
pub trait NumberType:
    Limits + Div<u64, Output = Self> + Add<Output = Self> + Clone + Ord + PartialOrd + Debug + Sub<Self>
{
    /// Converts the value to `f64` (used by `stddev` in this file).
    fn as_f64(&self) -> f64;
}

impl NumberType for u64 {
    fn as_f64(&self) -> f64 {
        // Plain `as` cast; values that are not exactly representable in an
        // `f64` are rounded.
        *self as f64
    }
}
71
/// Running count, sum, minimum and maximum of the values passed to `add`.
///
/// The `Default` value has `count == 0`, `sum` and `max` set to
/// `T::absolute_min()` and `min` set to `T::absolute_max()`.
#[derive(Eq, PartialEq)]
pub struct SimpleStat<T: NumberType> {
    /// Number of values added so far.
    count: u64,
    /// Sum of the values added so far.
    sum: T,
    /// Smallest value added so far.
    min: T,
    /// Largest value added so far.
    max: T,
}
82
/// A sample that can be reduced to a single numeric value of type `T`.
///
/// `DetailedHistogram` buckets samples by `value()` and can optionally keep
/// the full sample alongside.
pub trait Details<T: NumberType>: Debug {
    /// The numeric value used for bucketing and statistics.
    fn value(&self) -> T;
}

/// Every number type is its own detail: the value is a clone of the number.
impl<T: NumberType> Details<T> for T {
    fn value(&self) -> T {
        self.clone()
    }
}

/// A `u64` range is valued by its length, `end - start`.
impl Details<u64> for Range<u64> {
    fn value(&self) -> u64 {
        // NOTE(review): underflows if `end < start`; callers are presumably
        // expected to pass well-formed ranges — confirm.
        self.end - self.start
    }
}
102
103impl<T: NumberType> Debug for SimpleStat<T> {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 if self.count == 0 {
106 f.debug_struct("SimpleStat")
107 .field("count", &self.count)
108 .finish()
109 } else {
110 f.debug_struct("SimpleStat")
111 .field("count", &self.count)
112 .field("sum", &self.sum)
113 .field("min", &self.min)
114 .field("max", &self.max)
115 .field("average", &self.average().unwrap())
116 .finish()
117 }
118 }
119}
120
121impl<T: NumberType> SimpleStat<T> {
122 pub fn add(&mut self, value: T) {
123 self.count += 1;
124 self.sum = self.sum.clone() + value.clone();
125 if self.max < value {
126 self.max = value.clone();
127 }
128 if self.min > value {
129 self.min = value;
130 }
131 }
132}
133
134impl<T: NumberType> Default for SimpleStat<T> {
135 fn default() -> Self {
136 Self {
137 count: 0,
138 sum: T::absolute_min(),
139 min: T::absolute_max(),
140 max: T::absolute_min(),
141 }
142 }
143}
144
145impl<T: NumberType> SummaryStats<T> for SimpleStat<T> {
146 fn count(&self) -> u64 {
147 self.count
148 }
149
150 fn sum(&self) -> Option<T> {
151 if self.count == 0 {
152 return None;
153 }
154 Some(self.sum.clone())
155 }
156
157 fn min(&self) -> Option<T> {
158 if self.count == 0 {
159 return None;
160 }
161 Some(self.min.clone())
162 }
163
164 fn max(&self) -> Option<T> {
165 if self.count == 0 {
166 return None;
167 }
168 Some(self.max.clone())
169 }
170
171 fn average(&self) -> Option<T> {
172 if self.count == 0 {
173 return None;
174 }
175 Some(self.sum.clone() / self.count)
176 }
177}
178
179fn median<T: NumberType, D: Details<T>>(values: &[D]) -> T {
182 let mut sorted: Vec<T> = values.iter().map(|v| v.value()).collect();
183 sorted.sort();
184 sorted.get(sorted.len() / 2).unwrap().clone()
185}
186
187fn stddev<T: NumberType, D: Details<T>>(values: &[D], simple_stat: &SimpleStat<T>) -> f64 {
189 let avg = simple_stat.sum().unwrap().as_f64() / simple_stat.count() as f64;
190 (values
191 .iter()
192 .map(|value| {
193 let diff = avg - (value.value().as_f64());
194 diff * diff
195 })
196 .sum::<f64>()
197 / simple_stat.count as f64)
198 .sqrt()
199}
200
201#[derive(Debug)]
203struct Bucket<T: NumberType> {
204 simple_stat: SimpleStat<T>,
205 range: Range<T>,
206}
207
208impl<T: NumberType> Bucket<T> {
209 fn new(range: Range<T>) -> Self {
210 Self {
211 simple_stat: SimpleStat::default(),
212 range,
213 }
214 }
215
216 fn add(&mut self, value: T) {
217 self.simple_stat.add(value);
218 }
219}
220
/// A histogram over values of type `T`, fed with samples of type `D`.
///
/// Each sample is reduced to `D::value()` and counted in the bucket whose
/// range contains that value. When constructed with details, the samples
/// themselves are kept as well.
pub struct DetailedHistogram<T: NumberType, D: Details<T>> {
    /// Buckets in the order of the ranges given at construction.
    buckets: Vec<Bucket<T>>,
    /// Every accepted sample in insertion order, or `None` when details are
    /// not being collected.
    values: Option<Vec<D>>,
}
227
228impl<T: NumberType, D: Details<T>> Debug for DetailedHistogram<T, D> {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 let mut dbg = f.debug_struct("DetailedHistogram");
231 let simple_stat = self.simple_stat();
232 dbg.field("simple_stats", &simple_stat);
233 if simple_stat.count > 0 {
234 if let Some(values) = &self.values {
235 dbg.field("median", &median(values));
236 dbg.field("std_dev", &stddev(values, &simple_stat));
237 dbg.field("values", values);
238 }
239 }
240 dbg.field("buckets", &self.buckets);
241 dbg.finish()
242 }
243}
244
245impl<T: NumberType, D: Details<T>> DetailedHistogram<T, D> {
246 fn new_internal(ranges: &[Range<T>], details: bool) -> Result<Self> {
247 let mut last = T::absolute_min();
248 let mut buckets = vec![];
249 for r in ranges {
250 if r.start > r.end {
251 return Err(anyhow!("invalid range {:?}", r));
252 }
253
254 if r.start < last {
255 return Err(anyhow!("Ranges overlap {:?} ", r));
256 }
257 last = r.end.clone();
258 buckets.push(Bucket::new(r.clone()));
259 }
260 let values = if details { Some(vec![]) } else { None };
261
262 Ok(Self { buckets, values })
263 }
264
265 pub fn new(ranges: &[Range<T>]) -> Result<Self> {
267 Self::new_internal(ranges, false)
268 }
269
270 #[cfg(feature = "experimental")]
274 pub fn new_with_details(ranges: &[Range<T>]) -> Result<Self> {
275 Self::new_internal(ranges, true)
276 }
277
278 pub fn add(&mut self, value: D) -> Result<()> {
280 for b in &mut self.buckets {
281 if value.value() >= b.range.start && value.value() < b.range.end {
282 b.add(value.value());
283 if let Some(values) = &mut self.values {
284 values.push(value);
285 }
286 return Ok(());
287 }
288 }
289 Err(anyhow!(
290 "value does not fit in any buckets: {:?}",
291 value.value()
292 ))
293 }
294
295 pub fn simple_stat(&self) -> SimpleStat<T> {
297 let count = self.count();
298 if count == 0 {
299 SimpleStat::default()
300 } else {
301 SimpleStat {
302 count: self.count(),
303 sum: self.sum().unwrap(),
304 min: self.min().unwrap(),
305 max: self.max().unwrap(),
306 }
307 }
308 }
309}
310
311impl<T: NumberType, D: Details<T>> SummaryStats<T> for DetailedHistogram<T, D> {
312 fn count(&self) -> u64 {
313 let mut count = 0;
314 for b in &self.buckets {
315 count += b.simple_stat.count();
316 }
317 count
318 }
319
320 fn sum(&self) -> Option<T> {
321 let mut sum = T::absolute_min();
322 let mut ret = None;
323 for b in &self.buckets {
324 if let Some(v) = b.simple_stat.sum() {
325 sum = sum.clone() + v;
326 ret = Some(sum.clone())
327 }
328 }
329 ret
330 }
331
332 fn min(&self) -> Option<T> {
333 for b in &self.buckets {
334 let min = b.simple_stat.min();
335 if min.is_some() {
336 return min;
337 }
338 }
339 None
340 }
341
342 fn max(&self) -> Option<T> {
343 for b in self.buckets.iter().rev() {
344 let max = b.simple_stat.max();
345 if max.is_some() {
346 return max;
347 }
348 }
349 None
350 }
351
352 fn average(&self) -> Option<T> {
353 let mut count = 0;
354 let mut sum = T::absolute_min();
355 for b in &self.buckets {
356 if b.simple_stat.count != 0 {
357 sum = sum + b.simple_stat.sum().unwrap();
358 count += b.simple_stat.count();
359 }
360 }
361 if count != 0 {
362 Some(sum / count)
363 } else {
364 None
365 }
366 }
367}
368
/// A `DetailedHistogram` whose samples are the numeric values themselves.
pub type Histogram<T> = DetailedHistogram<T, T>;
372
/// Guard that calls a function with a stored value when it is dropped.
pub struct CallOnDrop<V, F>
where
    F: ?Sized + Fn(&V),
{
    /// Value handed (by reference) to `update_value` on drop.
    init_value: V,
    /// Callback run exactly once, from `Drop`.
    update_value: F,
}

impl<V, F> CallOnDrop<V, F>
where
    F: Fn(&V),
{
    /// Stores `init_value` and the callback; nothing runs until the guard
    /// is dropped.
    pub fn new(init_value: V, update_value: F) -> Self {
        CallOnDrop {
            init_value,
            update_value,
        }
    }
}

impl<V, F> Drop for CallOnDrop<V, F>
where
    F: ?Sized + Fn(&V),
{
    /// Invokes the callback with a reference to the stored value.
    fn drop(&mut self) {
        (self.update_value)(&self.init_value);
    }
}
394
395pub fn timed_scope(
396 histogram: Arc<Mutex<DetailedHistogram<u64, u64>>>,
397) -> CallOnDrop<
398 (Arc<Mutex<DetailedHistogram<u64, u64>>>, Instant),
399 fn(&(Arc<Mutex<DetailedHistogram<u64, u64>>>, Instant)),
400> {
401 CallOnDrop::new((histogram, Instant::now()), |(histogram, x)| {
402 if histogram.lock().add(x.elapsed().as_nanos() as u64).is_err() {
403 info!("Error adding timed scope stat");
404 }
405 })
406}
407
/// Latency and transfer-size histograms for one kind of operation.
#[derive(Debug)]
pub struct BytesLatencyStats {
    /// Histogram of operation latencies. The collector in this file records
    /// elapsed time in nanoseconds.
    pub latency: DetailedHistogram<u64, u64>,
    /// Histogram of transfer sizes; each sample is an I/O range whose length
    /// (`end - start`) is the bucketed value.
    pub bytes_transferred: DetailedHistogram<u64, Range<u64>>,
}
418
419impl BytesLatencyStats {
420 pub fn new_with_buckets(latency_buckets: &[Range<u64>], bytes_buckets: &[Range<u64>]) -> Self {
421 Self {
422 latency: DetailedHistogram::new(latency_buckets).unwrap(),
423 bytes_transferred: DetailedHistogram::new(bytes_buckets).unwrap(),
424 }
425 }
426}
427
/// Maps an operation kind to the statistics that should be updated for it.
pub trait GetStatsForOp<OperationType> {
    /// Returns the mutable stats entry that belongs to `op`.
    fn get_stats_for_op(&mut self, op: OperationType) -> &mut BytesLatencyStats;
}
431
/// State captured when a measured operation starts; consumed when the guard
/// returned by `collect_scoped_byte_latency_stat` is dropped.
#[cfg(any(test, feature = "collect"))]
pub struct OpInfo<Stats, OperationType> {
    /// Shared stats container to update.
    stats: Arc<Mutex<Stats>>,
    /// I/O range of the operation; recorded in the bytes-transferred histogram.
    io_range: Range<u64>,
    /// Operation kind, used to select the stats entry.
    operation: OperationType,
    /// When the operation started.
    start_time: Instant,
}
441
/// Starts measuring one operation and returns a guard that records it when
/// dropped.
///
/// On drop the guard locks `stats`, selects the entry for `operation`, adds
/// the nanoseconds elapsed since this call to its latency histogram and adds
/// `io_range` to its bytes-transferred histogram. A value rejected by either
/// histogram is only logged.
#[cfg(any(test, feature = "collect"))]
pub fn collect_scoped_byte_latency_stat<Stats, OperationType>(
    stats: Arc<Mutex<Stats>>,
    io_range: Range<u64>,
    operation: OperationType,
) -> CallOnDrop<OpInfo<Stats, OperationType>, fn(&OpInfo<Stats, OperationType>)>
where
    Stats: GetStatsForOp<OperationType> + Debug,
    OperationType: Copy + Clone + Debug,
{
    let op_info = OpInfo {
        stats,
        io_range,
        operation,
        start_time: Instant::now(),
    };

    CallOnDrop::new(op_info, |op| {
        // The lock is held while both histograms are updated.
        let mut locked = op.stats.lock();
        let target = locked.get_stats_for_op(op.operation);

        let latency_recorded = target
            .latency
            .add(op.start_time.elapsed().as_nanos() as u64);
        if latency_recorded.is_err() {
            info!("Error adding disk IO latency stat");
        }

        let bytes_recorded = target.bytes_transferred.add(op.io_range.clone());
        if bytes_recorded.is_err() {
            info!("Error adding disk IO bytes transferred stat");
        }
    })
}
487
/// Empty placeholder returned by `collect_scoped_byte_latency_stat` when
/// neither tests nor the `collect` feature are enabled.
#[cfg(all(not(test), not(feature = "collect")))]
pub struct OpInfo {}
490
/// No-op variant used when neither tests nor the `collect` feature are
/// enabled: the arguments are ignored and nothing is recorded.
#[cfg(all(not(test), not(feature = "collect")))]
pub fn collect_scoped_byte_latency_stat<
    Stats: GetStatsForOp<OperationType> + Debug,
    OperationType: Copy + Clone + Debug,
>(
    _stats: Arc<Mutex<Stats>>,
    _io_range: Range<u64>,
    _operation: OperationType,
) -> OpInfo {
    OpInfo {}
}
502
503#[cfg(test)]
504mod tests {
505
506 use std::time::Duration;
507
508 use super::*;
509
/// A freshly created stat has no samples, so every accessor reports `None`.
#[test]
fn simple_stat_init() {
    let empty = SimpleStat::<u64>::default();
    assert_eq!(empty.count, 0);
    assert_eq!(empty.max(), None);
    assert_eq!(empty.min(), None);
    assert_eq!(empty.average(), None);
    assert_eq!(empty.sum(), None);
}
519
/// Adds values one at a time and checks every accessor after each addition.
#[test]
fn simple_stat_updates() {
    let mut stat = SimpleStat::<u64>::default();

    // Each row: the value to add, then the expected
    // (count, max, min, average, sum) afterwards.
    let steps: [(u64, u64, u64, u64, u64, u64); 4] = [
        (10, 1, 10, 10, 10, 10),
        (2, 2, 10, 2, 6, 12),
        (1, 3, 10, 1, 4, 13),
        (0, 4, 10, 0, 3, 13),
    ];

    for (value, count, max, min, average, sum) in steps {
        stat.add(value);
        assert_eq!(stat.count, count);
        assert_eq!(stat.max(), Some(max));
        assert_eq!(stat.min(), Some(min));
        assert_eq!(stat.average(), Some(average));
        assert_eq!(stat.sum(), Some(sum));
    }
}
548
549 fn bucket_check(bucket: &Bucket<u64>, values: &[u64]) {
550 let mut stats = SimpleStat::default();
551 for v in values {
552 stats.add(*v);
553 }
554 assert_eq!(bucket.simple_stat.count(), stats.count());
555 assert_eq!(bucket.simple_stat.sum(), stats.sum());
556 assert_eq!(bucket.simple_stat.min(), stats.min());
557 assert_eq!(bucket.simple_stat.max(), stats.max());
558 assert_eq!(bucket.simple_stat.average(), stats.average());
559 }
560
/// Samples land in the right buckets, the overall summary matches, and no
/// samples are retained by a plain histogram.
#[test]
fn histogram_without_details() {
    let mut hist = Histogram::new(&[0..10, 10..100, 100..200]).unwrap();

    let mut expected = SimpleStat::default();
    assert_eq!(hist.simple_stat(), expected);

    for sample in [0, 20, 199, 50, 9, 5, 120] {
        hist.add(sample).unwrap();
        expected.add(sample);
    }

    bucket_check(&hist.buckets[0], &[0, 9, 5]);
    bucket_check(&hist.buckets[1], &[20, 50]);
    bucket_check(&hist.buckets[2], &[199, 120]);
    assert_eq!(hist.buckets.len(), 3);
    assert_eq!(hist.simple_stat(), expected);
    assert_eq!(hist.values, None);
}
581
/// Same as `histogram_without_details`, but with an empty bucket at each end
/// so the overall min/max must skip over empty buckets.
#[test]
fn histogram_without_details_empty_first_last_buckets() {
    let mut hist = Histogram::new(&[0..4, 4..10, 10..100, 100..200, 200..300]).unwrap();

    let mut expected = SimpleStat::default();
    assert_eq!(hist.simple_stat(), expected);

    for sample in [4, 20, 199, 50, 9, 5, 120] {
        hist.add(sample).unwrap();
        expected.add(sample);
    }

    bucket_check(&hist.buckets[1], &[4, 9, 5]);
    bucket_check(&hist.buckets[2], &[20, 50]);
    bucket_check(&hist.buckets[3], &[199, 120]);
    assert_eq!(hist.buckets.len(), 5);
    assert_eq!(hist.simple_stat(), expected);
    assert_eq!(hist.values, None);
}
602
/// Test sample made of two numbers; its histogram value is their difference.
#[cfg(feature = "experimental")]
#[derive(Clone, Debug, PartialEq)]
struct MyDetails(u64, u64);

#[cfg(feature = "experimental")]
impl Details<u64> for MyDetails {
    fn value(&self) -> u64 {
        // Second field minus first field.
        self.1 - self.0
    }
}
613
/// Fixture samples whose values (second minus first) are
/// 4, 20, 199, 50, 9, 5 and 120, in that order.
#[cfg(feature = "experimental")]
fn test_detailed_values() -> Vec<MyDetails> {
    let pairs: [(u64, u64); 7] = [
        (0, 4),
        (1, 21),
        (2, 201),
        (3, 53),
        (10, 19),
        (5, 10),
        (120, 240),
    ];
    pairs
        .iter()
        .map(|&(first, second)| MyDetails(first, second))
        .collect()
}
626
/// A detailed histogram buckets samples by their value and retains every
/// sample in insertion order.
#[cfg(feature = "experimental")]
#[test]
fn histogram_with_details() {
    let mut histogram =
        DetailedHistogram::new_with_details(&[0..10, 10..100, 100..200]).unwrap();

    let mut simple_stats = SimpleStat::default();
    assert_eq!(histogram.simple_stat(), simple_stats);

    let values = test_detailed_values();

    for v in &values {
        simple_stats.add(v.value());
        histogram.add(v.clone()).unwrap();
    }

    // `bucket_check` takes the bucket by reference.
    bucket_check(&histogram.buckets[0], &[4, 9, 5]);
    bucket_check(&histogram.buckets[1], &[20, 50]);
    bucket_check(&histogram.buckets[2], &[199, 120]);
    assert_eq!(histogram.buckets.len(), 3);
    assert_eq!(histogram.simple_stat(), simple_stats);
    assert_eq!(histogram.values, Some(values));
}
650
/// Same as `histogram_with_details`, but with an empty bucket at each end.
#[cfg(feature = "experimental")]
#[test]
fn histogram_with_details_empty_first_last_buckets() {
    let mut histogram =
        DetailedHistogram::new_with_details(&[0..4, 4..10, 10..100, 100..200, 200..300])
            .unwrap();

    let mut simple_stats = SimpleStat::default();
    assert_eq!(histogram.simple_stat(), simple_stats);
    let values = test_detailed_values();

    for v in &values {
        simple_stats.add(v.value());
        histogram.add(v.clone()).unwrap();
    }

    // `bucket_check` takes the bucket by reference.
    bucket_check(&histogram.buckets[0], &[]);
    bucket_check(&histogram.buckets[4], &[]);
    bucket_check(&histogram.buckets[1], &[4, 9, 5]);
    bucket_check(&histogram.buckets[2], &[20, 50]);
    bucket_check(&histogram.buckets[3], &[199, 120]);
    assert_eq!(histogram.buckets.len(), 5);
    assert_eq!(histogram.simple_stat(), simple_stats);
    assert_eq!(histogram.values, Some(values));
}
676
/// Pins the pretty (`{:#?}`) debug output of a plain histogram.
#[test]
fn histogram_debug_fmt() {
    let range = 0..200;
    let mut histogram = Histogram::new(&[range]).unwrap();

    let mut simple_stats = SimpleStat::default();
    assert_eq!(histogram.simple_stat(), simple_stats);
    let values = [0, 20, 199];

    for v in values {
        histogram.add(v).unwrap();
        simple_stats.add(v);
    }
    // Pretty debug output nests by four spaces per level; the expected text
    // must carry that indentation to match.
    assert_eq!(
        format!("{histogram:#?}"),
        r#"DetailedHistogram {
    simple_stats: SimpleStat {
        count: 3,
        sum: 219,
        min: 0,
        max: 199,
        average: 73,
    },
    buckets: [
        Bucket {
            simple_stat: SimpleStat {
                count: 3,
                sum: 219,
                min: 0,
                max: 199,
                average: 73,
            },
            range: 0..200,
        },
    ],
}"#
    );
}
715
/// Pins the pretty (`{:#?}`) debug output of a histogram that retains its
/// samples, including the median and standard deviation fields.
#[cfg(feature = "experimental")]
#[test]
fn detailed_histogram_debug_fmt() {
    let mut histogram = DetailedHistogram::new_with_details(&[0..200]).unwrap();

    let mut simple_stats = SimpleStat::default();
    assert_eq!(histogram.simple_stat(), simple_stats);
    let values = test_detailed_values();

    for v in &values {
        histogram.add(v.clone()).unwrap();
        simple_stats.add(v.value());
    }
    // Pretty debug output nests by four spaces per level; the expected text
    // must carry that indentation to match.
    assert_eq!(
        format!("{:#?}", histogram),
        r#"DetailedHistogram {
    simple_stats: SimpleStat {
        count: 7,
        sum: 407,
        min: 4,
        max: 199,
        average: 58,
    },
    median: 20,
    std_dev: 69.03297053153779,
    values: [
        MyDetails(
            0,
            4,
        ),
        MyDetails(
            1,
            21,
        ),
        MyDetails(
            2,
            201,
        ),
        MyDetails(
            3,
            53,
        ),
        MyDetails(
            10,
            19,
        ),
        MyDetails(
            5,
            10,
        ),
        MyDetails(
            120,
            240,
        ),
    ],
    buckets: [
        Bucket {
            simple_stat: SimpleStat {
                count: 7,
                sum: 407,
                min: 4,
                max: 199,
                average: 58,
            },
            range: 0..200,
        },
    ],
}"#
    );
}
786
/// Dropping the guard from `timed_scope` records exactly one sample.
#[test]
fn add_on_drop() {
    let whole_range = 0..u64::MAX;
    let shared = Arc::new(Mutex::new(DetailedHistogram::new(&[whole_range]).unwrap()));

    // The guard is dropped right away, which records the elapsed time.
    drop(timed_scope(shared.clone()));

    assert_eq!(shared.lock().count(), 1);
    assert!(shared.lock().sum().unwrap() > 1);
}
799
/// Scoped collection updates only the stats entry of the operation that was
/// measured, recording one latency sample and the length of the I/O range.
#[test]
fn disk_io_stat() {
    #[derive(Debug)]
    struct DiskIOStats {
        read: BytesLatencyStats,
        write: BytesLatencyStats,
    }

    #[derive(Copy, Clone, Debug)]
    enum DiskOperationType {
        Read,
        Write,
    }

    impl GetStatsForOp<DiskOperationType> for DiskIOStats {
        fn get_stats_for_op(&mut self, op: DiskOperationType) -> &mut BytesLatencyStats {
            match op {
                DiskOperationType::Read => &mut self.read,
                DiskOperationType::Write => &mut self.write,
            }
        }
    }

    let stats = Arc::new(Mutex::new(DiskIOStats {
        read: BytesLatencyStats::new_with_buckets(
            &[0..100, 100..u64::MAX],
            &[0..100, 100..u64::MAX],
        ),
        write: BytesLatencyStats::new_with_buckets(
            &[0..100, 100..u64::MAX],
            &[0..100, 100..u64::MAX],
        ),
    }));

    {
        // Bind the guard to a name so it lives until the end of the block;
        // `let _ = ...` would drop it immediately and leave the sleep outside
        // the measured scope.
        let _scope =
            collect_scoped_byte_latency_stat(stats.clone(), 100..1000, DiskOperationType::Read);
        std::thread::sleep(Duration::from_millis(10));
    }
    assert_eq!(stats.lock().read.latency.count(), 1);
    assert_eq!(stats.lock().read.bytes_transferred.sum(), Some(900));
    assert_eq!(stats.lock().write.latency.count(), 0);

    {
        let _scope = collect_scoped_byte_latency_stat(
            stats.clone(),
            200..1000,
            DiskOperationType::Write,
        );
        std::thread::sleep(Duration::from_millis(10));
    }
    assert_eq!(stats.lock().write.latency.count(), 1);
    assert_eq!(stats.lock().write.bytes_transferred.sum(), Some(800));
    assert_eq!(stats.lock().read.latency.count(), 1);
    assert_eq!(stats.lock().read.bytes_transferred.sum(), Some(900));
}
856}