Return a single Option from EventCoalescer
This commit is contained in:
parent
ac5f8254a2
commit
a9fce19048
2 changed files with 132 additions and 98 deletions
|
@ -407,7 +407,7 @@ impl Telemetry {
|
||||||
let period_data = state.event_coalescer.log_event(environment);
|
let period_data = state.event_coalescer.log_event(environment);
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
if let (Some((start, end)), Some(environment)) = period_data {
|
if let Some((start, end, environment)) = period_data {
|
||||||
let event = Event::Edit {
|
let event = Event::Edit {
|
||||||
duration: end.timestamp_millis() - start.timestamp_millis(),
|
duration: end.timestamp_millis() - start.timestamp_millis(),
|
||||||
environment,
|
environment,
|
||||||
|
|
|
@ -4,25 +4,26 @@ use std::time;
|
||||||
const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);
|
const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);
|
||||||
const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
|
const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq)]
|
||||||
|
struct PeriodData {
|
||||||
|
environment: &'static str,
|
||||||
|
start: DateTime<Utc>,
|
||||||
|
end: Option<DateTime<Utc>>,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct EventCoalescer {
|
pub struct EventCoalescer {
|
||||||
environment: Option<&'static str>,
|
state: Option<PeriodData>,
|
||||||
period_start: Option<DateTime<Utc>>,
|
|
||||||
period_end: Option<DateTime<Utc>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventCoalescer {
|
impl EventCoalescer {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self { state: None }
|
||||||
environment: None,
|
|
||||||
period_start: None,
|
|
||||||
period_end: None,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn log_event(
|
pub fn log_event(
|
||||||
&mut self,
|
&mut self,
|
||||||
environment: &'static str,
|
environment: &'static str,
|
||||||
) -> (Option<(DateTime<Utc>, DateTime<Utc>)>, Option<&'static str>) {
|
) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
|
||||||
self.log_event_with_time(Utc::now(), environment)
|
self.log_event_with_time(Utc::now(), environment)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,41 +35,43 @@ impl EventCoalescer {
|
||||||
&mut self,
|
&mut self,
|
||||||
log_time: DateTime<Utc>,
|
log_time: DateTime<Utc>,
|
||||||
environment: &'static str,
|
environment: &'static str,
|
||||||
) -> (Option<(DateTime<Utc>, DateTime<Utc>)>, Option<&'static str>) {
|
) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
|
||||||
let coalesce_timeout = Duration::from_std(COALESCE_TIMEOUT).unwrap();
|
let coalesce_timeout = Duration::from_std(COALESCE_TIMEOUT).unwrap();
|
||||||
|
|
||||||
let Some(period_start) = self.period_start else {
|
let Some(state) = &mut self.state else {
|
||||||
self.period_start = Some(log_time);
|
self.state = Some(PeriodData {
|
||||||
self.environment = Some(environment);
|
start: log_time,
|
||||||
return (None, None);
|
end: None,
|
||||||
|
environment,
|
||||||
|
});
|
||||||
|
return None;
|
||||||
};
|
};
|
||||||
|
|
||||||
let period_end = self
|
let period_end = state
|
||||||
.period_end
|
.end
|
||||||
.unwrap_or(period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
|
.unwrap_or(state.start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
|
||||||
let within_timeout = log_time - period_end < coalesce_timeout;
|
let within_timeout = log_time - period_end < coalesce_timeout;
|
||||||
let environment_is_same = self.environment == Some(environment);
|
let environment_is_same = state.environment == environment;
|
||||||
let should_coaelesce = !within_timeout || !environment_is_same;
|
let should_coaelesce = !within_timeout || !environment_is_same;
|
||||||
|
|
||||||
if should_coaelesce {
|
if should_coaelesce {
|
||||||
let previous_environment = self.environment;
|
let previous_environment = state.environment;
|
||||||
|
let original_start = state.start;
|
||||||
|
|
||||||
self.period_start = Some(log_time);
|
state.start = log_time;
|
||||||
self.period_end = None;
|
state.end = None;
|
||||||
self.environment = Some(environment);
|
state.environment = environment;
|
||||||
|
|
||||||
return (
|
return Some((
|
||||||
Some((
|
original_start,
|
||||||
period_start,
|
if within_timeout { log_time } else { period_end },
|
||||||
if within_timeout { log_time } else { period_end },
|
|
||||||
)),
|
|
||||||
previous_environment,
|
previous_environment,
|
||||||
);
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
self.period_end = Some(log_time);
|
state.end = Some(log_time);
|
||||||
|
|
||||||
(None, None)
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,17 +86,20 @@ mod tests {
|
||||||
let environment_1 = "environment_1";
|
let environment_1 = "environment_1";
|
||||||
let mut event_coalescer = EventCoalescer::new();
|
let mut event_coalescer = EventCoalescer::new();
|
||||||
|
|
||||||
assert_eq!(event_coalescer.period_start, None);
|
assert_eq!(event_coalescer.state, None);
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, None);
|
|
||||||
|
|
||||||
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
||||||
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
||||||
|
|
||||||
assert_eq!(period_data, (None, None));
|
assert_eq!(period_data, None);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_start));
|
assert_eq!(
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
event_coalescer.state,
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_1));
|
Some(PeriodData {
|
||||||
|
start: period_start,
|
||||||
|
end: None,
|
||||||
|
environment: environment_1,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
|
let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
|
||||||
let mut period_end = period_start;
|
let mut period_end = period_start;
|
||||||
|
@ -103,10 +109,15 @@ mod tests {
|
||||||
period_end += within_timeout_adjustment;
|
period_end += within_timeout_adjustment;
|
||||||
let period_data = event_coalescer.log_event_with_time(period_end, environment_1);
|
let period_data = event_coalescer.log_event_with_time(period_end, environment_1);
|
||||||
|
|
||||||
assert_eq!(period_data, (None, None));
|
assert_eq!(period_data, None);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_start));
|
assert_eq!(
|
||||||
assert_eq!(event_coalescer.period_end, Some(period_end));
|
event_coalescer.state,
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_1));
|
Some(PeriodData {
|
||||||
|
start: period_start,
|
||||||
|
end: Some(period_end),
|
||||||
|
environment: environment_1,
|
||||||
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
|
let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
|
||||||
|
@ -114,13 +125,15 @@ mod tests {
|
||||||
let new_period_start = period_end + exceed_timeout_adjustment;
|
let new_period_start = period_end + exceed_timeout_adjustment;
|
||||||
let period_data = event_coalescer.log_event_with_time(new_period_start, environment_1);
|
let period_data = event_coalescer.log_event_with_time(new_period_start, environment_1);
|
||||||
|
|
||||||
|
assert_eq!(period_data, Some((period_start, period_end, environment_1)));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
period_data,
|
event_coalescer.state,
|
||||||
(Some((period_start, period_end)), Some(environment_1))
|
Some(PeriodData {
|
||||||
|
start: new_period_start,
|
||||||
|
end: None,
|
||||||
|
environment: environment_1,
|
||||||
|
})
|
||||||
);
|
);
|
||||||
assert_eq!(event_coalescer.period_start, Some(new_period_start));
|
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_1));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -128,39 +141,49 @@ mod tests {
|
||||||
let environment_1 = "environment_1";
|
let environment_1 = "environment_1";
|
||||||
let mut event_coalescer = EventCoalescer::new();
|
let mut event_coalescer = EventCoalescer::new();
|
||||||
|
|
||||||
assert_eq!(event_coalescer.period_start, None);
|
assert_eq!(event_coalescer.state, None);
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, None);
|
|
||||||
|
|
||||||
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
||||||
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
||||||
|
|
||||||
assert_eq!(period_data, (None, None));
|
assert_eq!(period_data, None);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_start));
|
assert_eq!(
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
event_coalescer.state,
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_1));
|
Some(PeriodData {
|
||||||
|
start: period_start,
|
||||||
|
end: None,
|
||||||
|
environment: environment_1,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
|
let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
|
||||||
let period_end = period_start + within_timeout_adjustment;
|
let period_end = period_start + within_timeout_adjustment;
|
||||||
let period_data = event_coalescer.log_event_with_time(period_end, environment_1);
|
let period_data = event_coalescer.log_event_with_time(period_end, environment_1);
|
||||||
|
|
||||||
assert_eq!(period_data, (None, None));
|
assert_eq!(period_data, None);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_start));
|
assert_eq!(
|
||||||
assert_eq!(event_coalescer.period_end, Some(period_end));
|
event_coalescer.state,
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_1));
|
Some(PeriodData {
|
||||||
|
start: period_start,
|
||||||
|
end: Some(period_end),
|
||||||
|
environment: environment_1,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
// Logging an event within the timeout but with a different environment should start a new period
|
// Logging an event within the timeout but with a different environment should start a new period
|
||||||
let period_end = period_end + within_timeout_adjustment;
|
let period_end = period_end + within_timeout_adjustment;
|
||||||
let environment_2 = "environment_2";
|
let environment_2 = "environment_2";
|
||||||
let period_data = event_coalescer.log_event_with_time(period_end, environment_2);
|
let period_data = event_coalescer.log_event_with_time(period_end, environment_2);
|
||||||
|
|
||||||
|
assert_eq!(period_data, Some((period_start, period_end, environment_1)));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
period_data,
|
event_coalescer.state,
|
||||||
(Some((period_start, period_end)), Some(environment_1))
|
Some(PeriodData {
|
||||||
|
start: period_end,
|
||||||
|
end: None,
|
||||||
|
environment: environment_2,
|
||||||
|
})
|
||||||
);
|
);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_end));
|
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_2));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -168,54 +191,62 @@ mod tests {
|
||||||
let environment_1 = "environment_1";
|
let environment_1 = "environment_1";
|
||||||
let mut event_coalescer = EventCoalescer::new();
|
let mut event_coalescer = EventCoalescer::new();
|
||||||
|
|
||||||
assert_eq!(event_coalescer.period_start, None);
|
assert_eq!(event_coalescer.state, None);
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, None);
|
|
||||||
|
|
||||||
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
||||||
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
||||||
|
|
||||||
assert_eq!(period_data, (None, None));
|
assert_eq!(period_data, None);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_start));
|
assert_eq!(
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
event_coalescer.state,
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_1));
|
Some(PeriodData {
|
||||||
|
start: period_start,
|
||||||
|
end: None,
|
||||||
|
environment: environment_1,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
|
let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
|
||||||
let period_end = period_start + within_timeout_adjustment;
|
let period_end = period_start + within_timeout_adjustment;
|
||||||
let environment_2 = "environment_2";
|
let environment_2 = "environment_2";
|
||||||
let period_data = event_coalescer.log_event_with_time(period_end, environment_2);
|
let period_data = event_coalescer.log_event_with_time(period_end, environment_2);
|
||||||
|
|
||||||
|
assert_eq!(period_data, Some((period_start, period_end, environment_1)));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
period_data,
|
event_coalescer.state,
|
||||||
(Some((period_start, period_end)), Some(environment_1))
|
Some(PeriodData {
|
||||||
|
start: period_end,
|
||||||
|
end: None,
|
||||||
|
environment: environment_2,
|
||||||
|
})
|
||||||
);
|
);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_end));
|
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_2));
|
|
||||||
}
|
}
|
||||||
// 0 20 40 60
|
// // 0 20 40 60
|
||||||
// |-------------------|-------------------|-------------------|-------------------
|
// // |-------------------|-------------------|-------------------|-------------------
|
||||||
// |--------|----------env change
|
// // |--------|----------env change
|
||||||
// |-------------------
|
// // |-------------------
|
||||||
// |period_start |period_end
|
// // |period_start |period_end
|
||||||
// |new_period_start
|
// // |new_period_start
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_switching_environment_while_exceeding_timeout() {
|
fn test_switching_environment_while_exceeding_timeout() {
|
||||||
let environment_1 = "environment_1";
|
let environment_1 = "environment_1";
|
||||||
let mut event_coalescer = EventCoalescer::new();
|
let mut event_coalescer = EventCoalescer::new();
|
||||||
|
|
||||||
assert_eq!(event_coalescer.period_start, None);
|
assert_eq!(event_coalescer.state, None);
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, None);
|
|
||||||
|
|
||||||
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
|
||||||
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
|
||||||
|
|
||||||
assert_eq!(period_data, (None, None));
|
assert_eq!(period_data, None);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_start));
|
assert_eq!(
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
event_coalescer.state,
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_1));
|
Some(PeriodData {
|
||||||
|
start: period_start,
|
||||||
|
end: None,
|
||||||
|
environment: environment_1,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
|
let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
|
||||||
let period_end = period_start + exceed_timeout_adjustment;
|
let period_end = period_start + exceed_timeout_adjustment;
|
||||||
|
@ -224,17 +255,20 @@ mod tests {
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
period_data,
|
period_data,
|
||||||
(
|
Some((
|
||||||
Some((
|
period_start,
|
||||||
period_start,
|
period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT,
|
||||||
period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT
|
environment_1
|
||||||
)),
|
))
|
||||||
Some(environment_1)
|
);
|
||||||
)
|
assert_eq!(
|
||||||
|
event_coalescer.state,
|
||||||
|
Some(PeriodData {
|
||||||
|
start: period_end,
|
||||||
|
end: None,
|
||||||
|
environment: environment_2,
|
||||||
|
})
|
||||||
);
|
);
|
||||||
assert_eq!(event_coalescer.period_start, Some(period_end));
|
|
||||||
assert_eq!(event_coalescer.period_end, None);
|
|
||||||
assert_eq!(event_coalescer.environment, Some(environment_2));
|
|
||||||
}
|
}
|
||||||
// 0 20 40 60
|
// 0 20 40 60
|
||||||
// |-------------------|-------------------|-------------------|-------------------
|
// |-------------------|-------------------|-------------------|-------------------
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue