use std::{
collections::{hash_map::Entry, HashMap},
pin::Pin,
time::Duration,
};
use async_stream::stream;
use futures::{Stream, StreamExt};
use vector_lib::{config::LogNamespace, event::MetricValue};
use vector_lib::{
configurable::configurable_component,
event::metric::{Metric, MetricData, MetricKind, MetricSeries},
};
use crate::{
config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
event::{Event, EventMetadata},
internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
schema,
transforms::{TaskTransform, Transform},
};
/// Configuration for the `aggregate` transform.
#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AggregateConfig {
/// The interval between flushes, in milliseconds.
///
/// Metrics recorded within one interval are aggregated per series and emitted when the interval elapses.
// NOTE(review): the derived `Default` leaves this at `0`, not `default_interval_ms()`; the serde
// default below only applies when the field is omitted during deserialization.
#[serde(default = "default_interval_ms")]
#[configurable(metadata(docs::human_name = "Flush Interval"))]
pub interval_ms: u64,
/// The aggregation function applied to incoming metrics.
#[serde(default = "default_mode")]
#[configurable(derived)]
pub mode: AggregationMode,
}
// Selects how each incoming metric is folded into the per-series aggregation state.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
/// Default mode. Sums incremental metrics and keeps the latest value for absolute metrics.
#[default]
Auto,
/// Sums incremental metrics; absolute metrics are ignored.
Sum,
/// Keeps the latest value of absolute metrics; incremental metrics are ignored.
Latest,
/// Counts the metrics recorded for each series, for both incremental and absolute metrics.
Count,
/// Emits the latest absolute value minus the value retained from the previous flush; incremental metrics are ignored.
Diff,
/// Keeps the largest value of absolute gauge metrics; incremental metrics are ignored.
Max,
/// Keeps the smallest value of absolute gauge metrics; incremental metrics are ignored.
Min,
/// Emits the arithmetic mean of the absolute gauge values; incremental metrics are ignored.
Mean,
/// Emits the population standard deviation of the absolute gauge values; incremental metrics are ignored.
Stdev,
}
/// Serde default for `AggregateConfig::mode`: `AggregationMode::Auto`.
const fn default_mode() -> AggregationMode {
AggregationMode::Auto
}
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    const MILLIS_PER_SECOND: u64 = 1_000;
    10 * MILLIS_PER_SECOND
}
// NOTE(review): presumably implements config generation for `AggregateConfig` from its `Default`
// value — confirm against the macro definition.
impl_generate_config_from_default!(AggregateConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "aggregate")]
impl TransformConfig for AggregateConfig {
    /// Builds the aggregator from this configuration and wraps it as an event task transform.
    ///
    /// Any error returned by `Aggregate::new` is passed through unchanged.
    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
        let aggregate = Aggregate::new(self)?;
        Ok(Transform::event_task(aggregate))
    }

    /// The transform accepts metric events only.
    fn input(&self) -> Input {
        Input::metric()
    }

    /// Declares a single metric output, constructed with an empty map.
    fn outputs(
        &self,
        _: vector_lib::enrichment::TableRegistry,
        _: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        let metric_output = TransformOutput::new(DataType::Metric, HashMap::new());
        vec![metric_output]
    }
}
/// Aggregated metric data paired with the event metadata accumulated for it.
type MetricEntry = (MetricData, EventMetadata);
/// Stateful metric aggregator behind the `aggregate` transform.
///
/// Metrics are folded into per-series state by `record` and emitted by `flush_into`.
#[derive(Debug)]
pub struct Aggregate {
/// Period between timer-driven flushes.
interval: Duration,
/// Single-value state per series, written by every mode except `Mean` and `Stdev`.
map: HashMap<MetricSeries, MetricEntry>,
/// State retained from the previous flush; consulted in `Diff` mode to compute deltas.
prev_map: HashMap<MetricSeries, MetricEntry>,
/// Every absolute gauge sample recorded per series, written by `Mean` and `Stdev`.
multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
/// The aggregation mode taken from the configuration.
mode: AggregationMode,
}
impl Aggregate {
pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
Ok(Self {
interval: Duration::from_millis(config.interval_ms),
map: Default::default(),
prev_map: Default::default(),
multi_map: Default::default(),
mode: config.mode.clone(),
})
}
/// Folds one metric event into the aggregation state according to the configured mode.
///
/// Metrics a mode does not handle are dropped silently. `AggregateEventRecorded` is emitted
/// for every call, whether or not the metric was kept.
fn record(&mut self, event: Event) {
    let (series, data, metadata) = event.into_metric().into_parts();
    let is_absolute = matches!(data.kind, MetricKind::Absolute);
    match self.mode {
        // Incremental metrics are summed, absolute metrics overwrite the stored value.
        AggregationMode::Auto => {
            if is_absolute {
                self.map.insert(series, (data, metadata));
            } else {
                self.record_sum(series, data, metadata);
            }
        }
        AggregationMode::Sum => self.record_sum(series, data, metadata),
        // Only the most recent absolute value is kept; `Diff` subtracts at flush time.
        AggregationMode::Latest | AggregationMode::Diff => {
            if is_absolute {
                self.map.insert(series, (data, metadata));
            }
        }
        AggregationMode::Count => self.record_count(series, data, metadata),
        AggregationMode::Max | AggregationMode::Min => {
            self.record_comparison(series, data, metadata)
        }
        // Every absolute gauge sample is retained so the statistic can be computed at flush.
        AggregationMode::Mean | AggregationMode::Stdev => {
            if is_absolute && matches!(data.value, MetricValue::Gauge { .. }) {
                self.multi_map
                    .entry(series)
                    .or_default()
                    .push((data, metadata));
            }
        }
    }
    emit!(AggregateEventRecorded);
}
fn record_count(
&mut self,
series: MetricSeries,
mut data: MetricData,
metadata: EventMetadata,
) {
let mut count_data = data.clone();
let existing = self.map.entry(series).or_insert_with(|| {
*data.value_mut() = MetricValue::Counter { value: 0f64 };
(data.clone(), metadata.clone())
});
*count_data.value_mut() = MetricValue::Counter { value: 1f64 };
if existing.0.kind == data.kind && existing.0.update(&count_data) {
existing.1.merge(metadata);
} else {
emit!(AggregateUpdateFailed);
}
}
fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
match data.kind {
MetricKind::Incremental => match self.map.entry(series) {
Entry::Occupied(mut entry) => {
let existing = entry.get_mut();
if existing.0.kind == data.kind && existing.0.update(&data) {
existing.1.merge(metadata);
} else {
emit!(AggregateUpdateFailed);
*existing = (data, metadata);
}
}
Entry::Vacant(entry) => {
entry.insert((data, metadata));
}
},
MetricKind::Absolute => {}
}
}
/// Keeps the extreme (`Max` or `Min`) gauge value per series; incremental metrics are ignored.
///
/// The first absolute metric of a series is stored as is. Later metrics replace the stored
/// entry only when both values are gauges and the new one is strictly greater (`Max`) or
/// strictly smaller (`Min`). A kind mismatch emits `AggregateUpdateFailed` and replaces the entry.
fn record_comparison(
    &mut self,
    series: MetricSeries,
    data: MetricData,
    metadata: EventMetadata,
) {
    if matches!(data.kind, MetricKind::Incremental) {
        return;
    }
    let mut slot = match self.map.entry(series) {
        Entry::Vacant(slot) => {
            slot.insert((data, metadata));
            return;
        }
        Entry::Occupied(slot) => slot,
    };
    let existing = slot.get_mut();
    if existing.0.kind != data.kind {
        emit!(AggregateUpdateFailed);
        *existing = (data, metadata);
        return;
    }
    // Decide first, replace afterwards: the comparison borrows both the stored and new data.
    let should_update = match (existing.0.value(), data.value()) {
        (MetricValue::Gauge { value: current }, MetricValue::Gauge { value: candidate }) => {
            match self.mode {
                AggregationMode::Max => candidate > current,
                AggregationMode::Min => candidate < current,
                _ => false,
            }
        }
        _ => false,
    };
    if should_update {
        *existing = (data, metadata);
    }
}
/// Drains all aggregated state into `output` as metric events and emits `AggregateFlushed`.
///
/// * Single-value state (`map`) is emitted one metric per series. In `Diff` mode the value
///   retained from the previous flush is subtracted first when the kinds match; a refused
///   subtraction emits `AggregateUpdateFailed` and the metric is emitted as it stands. The
///   drained state is then retained for the next flush.
/// * Multi-sample state (`multi_map`) is reduced to the arithmetic mean (`Mean`) or the
///   population standard deviation (`Stdev`) of each series' gauge samples. A series whose
///   samples cannot be summed emits `AggregateUpdateFailed` and is dropped.
///
/// Only `Diff` mode reads the previous flush, so the other modes move their entries straight
/// into `output` without cloning the whole map.
fn flush_into(&mut self, output: &mut Vec<Event>) {
    let map = std::mem::take(&mut self.map);
    if matches!(self.mode, AggregationMode::Diff) {
        for (series, (data, metadata)) in &map {
            let mut metric = Metric::from_parts(series.clone(), data.clone(), metadata.clone());
            if let Some((prev_data, _)) = self.prev_map.get(series) {
                if metric.data().kind == prev_data.kind && !metric.subtract(prev_data) {
                    emit!(AggregateUpdateFailed);
                }
            }
            output.push(Event::Metric(metric));
        }
        // Keep the raw (un-subtracted) values as the baseline for the next flush.
        self.prev_map = map;
    } else {
        output.extend(map.into_iter().map(|(series, (data, metadata))| {
            Event::Metric(Metric::from_parts(series, data, metadata))
        }));
    }

    let sample_sets = std::mem::take(&mut self.multi_map);
    'series: for (series, entries) in sample_sets {
        // Seed the running sum with the first sample; skip series without samples.
        let (mut aggregate, mut merged_metadata, rest) = match entries.split_first() {
            Some(((data, metadata), rest)) => (data.clone(), metadata.clone(), rest),
            None => continue,
        };
        for (data, metadata) in rest {
            if !aggregate.update(data) {
                emit!(AggregateUpdateFailed);
                continue 'series;
            }
            merged_metadata.merge(metadata.clone());
        }

        // Turn the sum into the mean in place.
        let sample_count = entries.len() as f64;
        let mean = match aggregate.value_mut() {
            MetricValue::Gauge { value } => {
                *value /= sample_count;
                *value
            }
            _ => 0.0,
        };

        match self.mode {
            AggregationMode::Mean => {}
            AggregationMode::Stdev => {
                let variance = entries
                    .iter()
                    .filter_map(|(data, _)| match data.value() {
                        MetricValue::Gauge { value } => {
                            let diff = mean - value;
                            Some(diff * diff)
                        }
                        _ => None,
                    })
                    .sum::<f64>()
                    / sample_count;
                if let MetricValue::Gauge { value } = aggregate.value_mut() {
                    *value = variance.sqrt();
                }
            }
            // Other modes never populate `multi_map`; emit nothing if they somehow did.
            _ => continue,
        }
        output.push(Event::Metric(Metric::from_parts(
            series,
            aggregate,
            merged_metadata,
        )));
    }

    emit!(AggregateFlushed);
}
}
impl TaskTransform<Event> for Aggregate {
    /// Turns the input event stream into a stream of aggregated metrics.
    ///
    /// Each incoming event is recorded. State is flushed whenever the interval timer ticks and
    /// once more when the input stream ends, after which the output stream finishes.
    fn transform(
        mut self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        let mut flush_stream = tokio::time::interval(self.interval);
        Box::pin(stream! {
            let mut output = Vec::new();
            loop {
                let input_closed = tokio::select! {
                    _ = flush_stream.tick() => {
                        self.flush_into(&mut output);
                        false
                    },
                    maybe_event = input_rx.next() => match maybe_event {
                        Some(event) => {
                            self.record(event);
                            false
                        }
                        None => {
                            // Input is exhausted: emit whatever is still buffered.
                            self.flush_into(&mut output);
                            true
                        }
                    },
                };
                for event in output.drain(..) {
                    yield event;
                }
                if input_closed {
                    break;
                }
            }
        })
    }
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeSet, sync::Arc, task::Poll};
use futures::stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use vector_lib::config::ComponentKey;
use vrl::value::Kind;
use super::*;
use crate::schema::Definition;
use crate::{
event::{
metric::{MetricKind, MetricValue},
Event, Metric,
},
test_util::components::assert_transform_compliance,
transforms::test::create_topology,
};
// Runs the shared config-generation check against `AggregateConfig`.
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<AggregateConfig>();
}
/// Builds a metric event with the source id, upstream id, schema definition and source type
/// that the tests attach to every event, so expected and actual events compare equal.
fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
    let schema_definition = Arc::new(Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    ));
    let metric = Metric::new(name, kind, value);
    let mut event = Event::Metric(metric)
        .with_source_id(Arc::new(ComponentKey::from("in")))
        .with_upstream_id(Arc::new(OutputId::from("transform")));
    let metadata = event.metadata_mut();
    metadata.set_schema_definition(&schema_definition);
    metadata.set_source_type("unit_test_stream");
    event
}
// `Auto` mode sums incremental counters per series and drains its state on every flush.
#[test]
fn incremental_auto() {
    let counter = |name: &'static str, value: f64| {
        make_metric(name, MetricKind::Incremental, MetricValue::Counter { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let counter_a_1 = counter("counter_a", 42.0);
    let counter_a_2 = counter("counter_a", 43.0);
    let counter_a_summed = counter("counter_a", 85.0);

    // A single recorded counter comes back unchanged.
    agg.record(counter_a_1.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&counter_a_1, &out[0]);

    // The flush drained the state, so further flushes emit nothing.
    for _ in 0..2 {
        let mut out = Vec::new();
        agg.flush_into(&mut out);
        assert!(out.is_empty());
    }

    // Two counters of the same series are summed.
    agg.record(counter_a_1.clone());
    agg.record(counter_a_2);
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&counter_a_summed, &out[0]);

    // Different series are aggregated independently.
    let counter_b_1 = counter("counter_b", 44.0);
    agg.record(counter_a_1.clone());
    agg.record(counter_b_1.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(2, out.len());
    for event in out {
        let expected = match event.as_metric().series().name.name.as_str() {
            "counter_a" => &counter_a_1,
            "counter_b" => &counter_b_1,
            _ => panic!("Unexpected metric name in aggregate output"),
        };
        assert_eq!(expected, &event);
    }
}
// `Auto` mode keeps only the latest absolute gauge per series and drains on every flush.
#[test]
fn absolute_auto() {
    let gauge = |name: &'static str, value: f64| {
        make_metric(name, MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let gauge_a_1 = gauge("gauge_a", 42.0);
    let gauge_a_2 = gauge("gauge_a", 43.0);

    // A single recorded gauge comes back unchanged.
    agg.record(gauge_a_1.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_1, &out[0]);

    // The flush drained the state, so further flushes emit nothing.
    for _ in 0..2 {
        let mut out = Vec::new();
        agg.flush_into(&mut out);
        assert!(out.is_empty());
    }

    // The most recently recorded gauge wins.
    agg.record(gauge_a_1.clone());
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_2, &out[0]);

    // Different series are tracked independently.
    let gauge_b_1 = gauge("gauge_b", 44.0);
    agg.record(gauge_a_1.clone());
    agg.record(gauge_b_1.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(2, out.len());
    for event in out {
        let expected = match event.as_metric().series().name.name.as_str() {
            "gauge_a" => &gauge_a_1,
            "gauge_b" => &gauge_b_1,
            _ => panic!("Unexpected metric name in aggregate output"),
        };
        assert_eq!(expected, &event);
    }
}
// `Count` mode emits a counter holding the number of metrics recorded for the series.
#[test]
fn count_agg() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let count = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Counter { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Count,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let gauge_a_1 = gauge(42.0);
    let gauge_a_2 = gauge(43.0);
    let result_count = count(1.0);
    let result_count_2 = count(2.0);

    // One recorded metric counts as one.
    agg.record(gauge_a_1.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&result_count, &out[0]);

    // The flush drained the state, so further flushes emit nothing.
    for _ in 0..2 {
        let mut out = Vec::new();
        agg.flush_into(&mut out);
        assert!(out.is_empty());
    }

    // Two recorded metrics count as two, regardless of their values.
    agg.record(gauge_a_1.clone());
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&result_count_2, &out[0]);
}
// `Max` mode keeps the largest absolute gauge value seen within a flush interval.
#[test]
fn absolute_max() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Max,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let gauge_a_1 = gauge(112.0);
    let gauge_a_2 = gauge(89.0);

    // A lone sample is its own maximum.
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_2, &out[0]);

    // The flush drained the state, so further flushes emit nothing.
    for _ in 0..2 {
        let mut out = Vec::new();
        agg.flush_into(&mut out);
        assert!(out.is_empty());
    }

    // The larger value survives even though the smaller one was recorded last.
    agg.record(gauge_a_1.clone());
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_1, &out[0]);
}
// `Min` mode keeps the smallest absolute gauge value seen within a flush interval.
#[test]
fn absolute_min() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Min,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let gauge_a_1 = gauge(32.0);
    let gauge_a_2 = gauge(89.0);

    // A lone sample is its own minimum.
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_2, &out[0]);

    // The flush drained the state, so further flushes emit nothing.
    for _ in 0..2 {
        let mut out = Vec::new();
        agg.flush_into(&mut out);
        assert!(out.is_empty());
    }

    // The smaller value survives even though the larger one was recorded last.
    agg.record(gauge_a_1.clone());
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_1, &out[0]);
}
// `Diff` mode emits the latest absolute value minus the value of the immediately preceding flush.
#[test]
fn absolute_diff() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Diff,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let gauge_a_1 = gauge(32.0);
    let gauge_a_2 = gauge(82.0);
    let result = gauge(50.0);

    // With no previous flush there is nothing to subtract.
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_2, &out[0]);

    // Empty flushes emit nothing (and reset the baseline).
    for _ in 0..2 {
        let mut out = Vec::new();
        agg.flush_into(&mut out);
        assert!(out.is_empty());
    }

    // The baseline was reset by the empty flushes, so the value is emitted as is.
    agg.record(gauge_a_1.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_1, &out[0]);

    // 82 - 32 = 50 relative to the previous flush.
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&result, &out[0]);
}
// In `Diff` mode a value whose type differs from the previous flush is emitted unchanged.
#[test]
fn absolute_diff_conflicting_type() {
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Diff,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let gauge_a_1 = make_metric(
        "gauge_a",
        MetricKind::Absolute,
        MetricValue::Gauge { value: 32.0 },
    );
    // Same series, but a counter instead of a gauge.
    let gauge_a_2 = make_metric(
        "gauge_a",
        MetricKind::Absolute,
        MetricValue::Counter { value: 1.0 },
    );

    agg.record(gauge_a_1.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_1, &out[0]);

    // The gauge baseline cannot be subtracted from a counter, so the counter passes through.
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_2, &out[0]);
}
// `Mean` mode emits the arithmetic mean of the absolute gauge samples of a flush interval.
#[test]
fn absolute_mean() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Mean,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let gauge_a_1 = gauge(32.0);
    let gauge_a_2 = gauge(82.0);
    let gauge_a_3 = gauge(51.0);
    let mean_result = gauge(55.0);

    // The mean of a single sample is the sample itself.
    agg.record(gauge_a_2.clone());
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&gauge_a_2, &out[0]);

    // The flush drained the state, so further flushes emit nothing.
    for _ in 0..2 {
        let mut out = Vec::new();
        agg.flush_into(&mut out);
        assert!(out.is_empty());
    }

    // (32 + 82 + 51) / 3 = 55.
    for sample in [&gauge_a_1, &gauge_a_2, &gauge_a_3] {
        agg.record(sample.clone());
    }
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&mean_result, &out[0]);
}
// `Stdev` mode emits the population standard deviation of the absolute gauge samples.
#[test]
fn absolute_stdev() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Stdev,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    // Mean is 40; squared deviations sum to 700; 700 / 7 = 100; sqrt(100) = 10.
    let stdev_result = gauge(10.0);
    for value in [25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0] {
        agg.record(gauge(value));
    }

    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&stdev_result, &out[0]);
}
// When value types conflict within a series, the most recently recorded type replaces the
// accumulated state and aggregation continues from there.
#[test]
fn conflicting_value_type() {
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let counter = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 42.0 },
    );
    let values: BTreeSet<String> = ["a", "b"].into_iter().map(String::from).collect();
    let set = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Set { values },
    );
    let summed = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 84.0 },
    );

    // Counters first, then sets: the set replaces the counter total.
    for event in [&counter, &counter, &set, &set] {
        agg.record(event.clone());
    }
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&set, &out[0]);

    // Sets first, then counters: the counters replace the set and are summed.
    for event in [&set, &set, &counter, &counter] {
        agg.record(event.clone());
    }
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&summed, &out[0]);
}
// When metric kinds conflict within a series in `Auto` mode, the most recently recorded kind
// replaces the accumulated state and aggregation continues from there.
#[test]
fn conflicting_kinds() {
    let counter = |kind: MetricKind, value: f64| {
        make_metric("the-thing", kind, MetricValue::Counter { value })
    };
    let config = AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    };
    let mut agg = Aggregate::new(&config).unwrap();

    let incremental = counter(MetricKind::Incremental, 42.0);
    let absolute = counter(MetricKind::Absolute, 43.0);
    let summed = counter(MetricKind::Incremental, 84.0);

    // Incremental first, then absolute: the absolute value overwrites the running sum.
    for event in [&incremental, &incremental, &absolute, &absolute] {
        agg.record(event.clone());
    }
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&absolute, &out[0]);

    // Absolute first, then incremental: the incrementals replace it and are summed.
    for event in [&absolute, &absolute, &incremental, &incremental] {
        agg.record(event.clone());
    }
    let mut out = Vec::new();
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&summed, &out[0]);
}
#[tokio::test]
async fn transform_shutdown() {
let agg = toml::from_str::<AggregateConfig>(
r"
interval_ms = 999999
",
)
.unwrap()
.build(&TransformContext::default())
.await
.unwrap();
let agg = agg.into_task();
let counter_a_1 = make_metric(
"counter_a",
MetricKind::Incremental,
MetricValue::Counter { value: 42.0 },
);
let counter_a_2 = make_metric(
"counter_a",
MetricKind::Incremental,
MetricValue::Counter { value: 43.0 },
);
let counter_a_summed = make_metric(
"counter_a",
MetricKind::Incremental,
MetricValue::Counter { value: 85.0 },
);
let gauge_a_1 = make_metric(
"gauge_a",
MetricKind::Absolute,
MetricValue::Gauge { value: 42.0 },
);
let gauge_a_2 = make_metric(
"gauge_a",
MetricKind::Absolute,
MetricValue::Gauge { value: 43.0 },
);
let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
let in_stream = Box::pin(stream::iter(inputs));
let mut out_stream = agg.transform_events(in_stream);
let mut count = 0_u8;
while let Some(event) = out_stream.next().await {
count += 1;
match event.as_metric().series().name.name.as_str() {
"counter_a" => assert_eq!(counter_a_summed, event),
"gauge_a" => assert_eq!(gauge_a_2, event),
_ => panic!("Unexpected metric name in aggregate output"),
};
}
assert_eq!(2, count);
}
// Runs the transform inside a topology with paused time: nothing is emitted until the default
// flush interval has elapsed, then exactly one aggregated metric per series comes out.
#[tokio::test]
async fn transform_interval() {
    let counter = |value: f64| {
        make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    };
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };

    let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
    let counter_a_1 = counter(42.0);
    let counter_a_2 = counter(43.0);
    let counter_a_summed = counter(85.0);
    let gauge_a_1 = gauge(42.0);
    let gauge_a_2 = gauge(43.0);

    assert_transform_compliance(async {
        let (tx, rx) = mpsc::channel(10);
        let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
        let mut out = ReceiverStream::new(out);

        tokio::time::pause();

        // Nothing has been sent yet, so nothing can come out.
        assert_eq!(Poll::Pending, futures::poll!(out.next()));

        for event in [counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()] {
            tx.send(event).await.unwrap();
        }

        // Events are buffered until the interval elapses.
        assert_eq!(Poll::Pending, futures::poll!(out.next()));

        // Move past the default 10 second interval.
        tokio::time::advance(Duration::from_secs(11)).await;

        for _ in 0..2 {
            let event = out
                .next()
                .await
                .expect("Unexpectedly received None in output stream");
            match event.as_metric().series().name.name.as_str() {
                "counter_a" => assert_eq!(counter_a_summed, event),
                "gauge_a" => assert_eq!(gauge_a_2, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            };
        }

        // Both series were emitted; nothing else is pending.
        assert_eq!(Poll::Pending, futures::poll!(out.next()));

        drop(tx);
        topology.stop().await;
        assert_eq!(out.next().await, None);
    })
    .await;
}
}