1use std::cmp::Ordering;
6use std::cmp::Reverse;
7use std::ops::Sub;
8use std::time::Duration;
9
10use anyhow::anyhow;
11use anyhow::bail;
12use anyhow::Result;
13use base::warn;
14
15fn abs_diff<T: Sub<Output = T> + Ord>(x: T, y: T) -> T {
16 if x < y {
17 y - x
18 } else {
19 x - y
20 }
21}
22
23#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
24pub struct CoreOffset {
25 pub core: usize,
26 pub offset: i128,
27}
28
29impl Ord for CoreOffset {
30 fn cmp(&self, other: &Self) -> Ordering {
32 (self.offset, self.core).cmp(&(other.offset, other.core))
33 }
34}
35
36impl PartialOrd for CoreOffset {
37 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
38 Some(self.cmp(other))
39 }
40}
41
42#[derive(Default, Debug, Clone, Eq, PartialEq)]
43pub struct CoreGroup {
44 pub cores: Vec<CoreOffset>,
45}
46
47impl CoreGroup {
48 fn size(&self) -> usize {
49 self.cores.len()
50 }
51
52 fn add(&self, core: CoreOffset, limit: u128) -> Result<Self> {
53 let diff_from_min = abs_diff(self.cores.iter().min().unwrap().offset, core.offset) as u128;
54 let diff_from_max = abs_diff(self.cores.iter().max().unwrap().offset, core.offset) as u128;
55 let can_add = diff_from_min < limit && diff_from_max < limit;
56
57 if can_add {
58 let mut new = self.clone();
59 new.cores.push(core);
60 Ok(new)
61 } else {
62 Err(anyhow!(
63 "offset {} not within {} of all members of core group",
64 core.offset,
65 limit
66 ))
67 }
68 }
69}
70
71#[derive(Default, Debug, Clone, Eq, PartialEq)]
72pub struct CoreGrouping {
73 groups: Vec<CoreGroup>,
74}
75
76impl Ord for CoreGrouping {
77 fn cmp(&self, other: &Self) -> Ordering {
78 (Reverse(self.largest_group().size()), self.groups.len())
80 .cmp(&(Reverse(other.largest_group().size()), other.groups.len()))
81 }
82}
83
84impl PartialOrd for CoreGrouping {
85 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
86 Some(self.cmp(other))
87 }
88}
89
90impl CoreGrouping {
91 pub(super) fn new(groups: Vec<CoreGroup>) -> Result<Self> {
92 if groups.is_empty() {
94 return Err(anyhow!("Cannot create an empty CoreGrouping."));
95 }
96 Ok(CoreGrouping { groups })
97 }
98
99 pub fn size(&self) -> usize {
100 self.groups.len()
101 }
102
103 pub fn largest_group_index(&self) -> usize {
104 self.groups
107 .iter()
108 .enumerate()
109 .map(|(i, g)| (Reverse(g.size()), i))
110 .min()
111 .unwrap_or((Reverse(0), 0))
112 .1
113 }
114
115 pub fn largest_group(&self) -> &CoreGroup {
116 &self.groups[self.largest_group_index()]
117 }
118
119 pub fn core_grouping_bitmask(&self) -> u64 {
120 if self.size() == 0 {
122 return 0;
123 }
124
125 let mut bitmask = 0u64;
126 let largest_group_index = self.largest_group_index();
127 for (i, group) in self.groups.iter().enumerate() {
128 if i == largest_group_index {
130 continue;
131 }
132
133 for core in &group.cores {
135 if core.core > 63 {
136 warn!("Core grouping bitmask cannot contain core {}", core.core);
137 continue;
138 }
139 bitmask |= 1 << core.core;
140 }
141 }
142
143 bitmask
144 }
145
146 fn add_group(&mut self, group: CoreGroup) {
147 self.groups.push(group)
148 }
149
150 fn add_core_to_last_group(&self, core: CoreOffset, in_sync_threshold: u128) -> Option<Self> {
151 let last_group = self.groups.len() - 1;
152 self.groups[last_group]
153 .add(core, in_sync_threshold)
154 .map(|new_group| {
155 let mut new_grouping = self.clone();
156 new_grouping.groups[last_group] = new_group;
157 new_grouping
158 })
159 .ok()
160 }
161}
162
163pub(super) fn group_core_offsets(
182 offsets: &[(usize, i128)],
183 in_sync_threshold: Duration,
184 tsc_frequency: u64,
185) -> Result<CoreGrouping> {
186 if offsets.is_empty() {
187 bail!("Per-core offsets cannot be empty");
188 }
189
190 let in_sync_threshold_ticks =
192 in_sync_threshold.as_nanos() * tsc_frequency as u128 / 1_000_000_000u128;
193
194 let mut cores: Vec<CoreOffset> = offsets
195 .iter()
196 .map(|(i, offset)| CoreOffset {
197 core: *i,
198 offset: *offset,
199 })
200 .collect();
201
202 cores.sort();
205
206 let mut grouping_options: Vec<CoreGrouping> = vec![CoreGrouping::new(vec![CoreGroup {
207 cores: vec![cores[0]],
208 }])?];
209 for core in &cores[1..] {
210 let mut best = grouping_options[0].clone();
211 best.add_group(CoreGroup { cores: vec![*core] });
212
213 let mut next_grouping_options = vec![best];
214
215 for grouping_option in &grouping_options {
216 if let Some(new_grouping) =
217 grouping_option.add_core_to_last_group(*core, in_sync_threshold_ticks)
218 {
219 next_grouping_options.push(new_grouping);
220 }
221 }
222
223 next_grouping_options.sort();
224 grouping_options = next_grouping_options;
225 }
226
227 Ok(grouping_options[0].clone())
228}
229
230#[cfg(test)]
231mod tests {
232 use super::super::TscState;
233 use super::*;
234
235 #[test]
236 fn test_simple_offset_grouping() {
237 let offsets = vec![(0, 10), (1, 10), (2, 10), (3, 1000)];
238 let state = TscState::new(1_000_000_000, offsets, Duration::from_nanos(1))
239 .expect("TscState::new should not fail for this test");
240
241 let group0 = CoreGroup {
242 cores: vec![
243 CoreOffset {
244 core: 0,
245 offset: 10,
246 },
247 CoreOffset {
248 core: 1,
249 offset: 10,
250 },
251 CoreOffset {
252 core: 2,
253 offset: 10,
254 },
255 ],
256 };
257 let group1 = CoreGroup {
258 cores: vec![CoreOffset {
259 core: 3,
260 offset: 1000,
261 }],
262 };
263 assert_eq!(
264 state.core_grouping,
265 CoreGrouping::new(vec![group0.clone(), group1])
266 .expect("CoreGrouping::new should not fail here")
267 );
268
269 assert_eq!(state.core_grouping.largest_group().clone(), group0);
270 assert_eq!(state.core_grouping.core_grouping_bitmask(), 0b1000u64);
271 }
272
273 #[test]
274 fn test_ambiguous_offset_grouping() {
275 let offsets = vec![(0, 10), (1, 20), (2, 30), (3, 40), (4, 50)];
280 let state = TscState::new(1_000_000_000, offsets, Duration::from_nanos(20))
281 .expect("TscState::new should not fail for this test");
282
283 let group0 = CoreGroup {
284 cores: vec![
285 CoreOffset {
286 core: 0,
287 offset: 10,
288 },
289 CoreOffset {
290 core: 1,
291 offset: 20,
292 },
293 ],
294 };
295 let group1 = CoreGroup {
296 cores: vec![
297 CoreOffset {
298 core: 2,
299 offset: 30,
300 },
301 CoreOffset {
302 core: 3,
303 offset: 40,
304 },
305 ],
306 };
307 let group2 = CoreGroup {
308 cores: vec![CoreOffset {
309 core: 4,
310 offset: 50,
311 }],
312 };
313 assert_eq!(
314 state.core_grouping,
315 CoreGrouping::new(vec![group0.clone(), group1, group2])
316 .expect("CoreGrouping::new should not fail here")
317 );
318
319 assert_eq!(state.core_grouping.largest_group().clone(), group0);
321 assert_eq!(state.core_grouping.core_grouping_bitmask(), 0b11100u64);
322 }
323
324 #[test]
325 fn test_worst_case_grouping() {
326 let offsets = (0..129).map(|i| (i, 0)).collect();
330 TscState::new(1_000_000_000, offsets, Duration::from_nanos(1))
331 .expect("TscState::new should not fail for this test");
332 }
333}