use std::{
    collections::{hash_map::Entry, HashMap},
    pin::Pin,
    time::Duration,
};
use async_stream::stream;
use futures::{Stream, StreamExt};
use vector_lib::{config::LogNamespace, event::MetricValue};
use vector_lib::{
    configurable::configurable_component,
    event::metric::{Metric, MetricData, MetricKind, MetricSeries},
};
use crate::{
    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
    event::{Event, EventMetadata},
    internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
    schema,
    transforms::{TaskTransform, Transform},
};
#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AggregateConfig {
    #[serde(default = "default_interval_ms")]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub interval_ms: u64,
    #[serde(default = "default_mode")]
    #[configurable(derived)]
    pub mode: AggregationMode,
}
// Selects how `Aggregate::record` folds incoming metrics into its per-series state.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
    #[default]
    Auto,
    /// Sums incremental metrics, ignores absolute metrics.
    Sum,
    /// Keeps the latest value for absolute metrics, ignores incremental metrics.
    Latest,
    /// Counts the metrics received per series, for both incremental and absolute metrics.
    Count,
    /// Emits the latest absolute value minus the value from the previous flush, ignores
    /// incremental metrics.
    Diff,
    /// Keeps the largest gauge value seen for absolute metrics, ignores incremental metrics.
    Max,
    /// Keeps the smallest gauge value seen for absolute metrics, ignores incremental metrics.
    Min,
    /// Emits the mean of absolute gauge values, ignores incremental metrics.
    Mean,
    /// Emits the standard deviation of absolute gauge values, ignores incremental metrics.
    Stdev,
}
/// Serde default for `AggregateConfig::mode`.
const fn default_mode() -> AggregationMode {
    AggregationMode::Auto
}
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, in milliseconds.
const fn default_interval_ms() -> u64 {
    10_000
}
// Presumably implements config generation from `AggregateConfig::default()`; the macro is
// defined elsewhere in the crate — verify there.
impl_generate_config_from_default!(AggregateConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "aggregate")]
impl TransformConfig for AggregateConfig {
    /// Builds the aggregator from this configuration and wraps it as an event task transform.
    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
        let aggregate = Aggregate::new(self)?;
        Ok(Transform::event_task(aggregate))
    }

    /// Only metric events are accepted as input.
    fn input(&self) -> Input {
        Input::metric()
    }

    /// Declares a single metric output with no schema definitions attached.
    fn outputs(
        &self,
        _tables: vector_lib::enrichment::TableRegistry,
        _inputs: &[(OutputId, schema::Definition)],
        _namespace: LogNamespace,
    ) -> Vec<TransformOutput> {
        let metrics_output = TransformOutput::new(DataType::Metric, HashMap::new());
        vec![metrics_output]
    }
}
/// Aggregated data for one series together with the event metadata merged into it.
type MetricEntry = (MetricData, EventMetadata);
/// Stateful metric aggregator: events are folded in by `record` and emitted by `flush_into`.
#[derive(Debug)]
pub struct Aggregate {
    /// Period between timer-driven flushes.
    interval: Duration,
    /// Current per-series aggregate for every mode except `Mean` and `Stdev`.
    map: HashMap<MetricSeries, MetricEntry>,
    /// Values from the previous flush that `Diff` mode subtracts from the current ones.
    prev_map: HashMap<MetricSeries, MetricEntry>,
    /// Every gauge sample recorded per series since the last flush (`Mean` / `Stdev` modes).
    multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    /// Aggregation function applied by `record` and `flush_into`.
    mode: AggregationMode,
}
impl Aggregate {
    /// Creates an empty aggregator from `config`.
    ///
    /// # Errors
    ///
    /// Returns an error when `config.interval_ms` is zero. The flush timer is created
    /// with `tokio::time::interval`, which panics on a zero period, so the invalid
    /// value is rejected here instead of crashing the task later.
    pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
        if config.interval_ms == 0 {
            return Err("`interval_ms` must be greater than zero".into());
        }

        Ok(Self {
            interval: Duration::from_millis(config.interval_ms),
            map: Default::default(),
            prev_map: Default::default(),
            multi_map: Default::default(),
            mode: config.mode.clone(),
        })
    }
    /// Folds one metric event into the aggregation state according to the configured mode.
    ///
    /// Events whose kind (or, for `Mean`/`Stdev`, value type) the mode does not handle are
    /// dropped. `AggregateEventRecorded` is emitted for every call either way.
    fn record(&mut self, event: Event) {
        let (series, data, metadata) = event.into_metric().into_parts();
        // `MetricKind` has exactly two variants, so "not absolute" means incremental.
        let is_absolute = matches!(data.kind, MetricKind::Absolute);

        match self.mode {
            AggregationMode::Sum => self.record_sum(series, data, metadata),
            AggregationMode::Auto if !is_absolute => self.record_sum(series, data, metadata),
            // Latest absolute value wins; incremental metrics are ignored by `Latest`/`Diff`.
            AggregationMode::Auto | AggregationMode::Latest | AggregationMode::Diff => {
                if is_absolute {
                    self.map.insert(series, (data, metadata));
                }
            }
            AggregationMode::Count => self.record_count(series, data, metadata),
            AggregationMode::Max | AggregationMode::Min => {
                self.record_comparison(series, data, metadata)
            }
            // Keep every absolute gauge sample; the statistic is computed at flush time.
            AggregationMode::Mean | AggregationMode::Stdev => {
                if is_absolute && matches!(data.value, MetricValue::Gauge { .. }) {
                    self.multi_map
                        .entry(series)
                        .or_default()
                        .push((data, metadata));
                }
            }
        }

        emit!(AggregateEventRecorded);
    }
    /// Adds one to the per-series event counter (`Count` mode).
    ///
    /// The first event of a series seeds the entry with a zero counter that keeps that
    /// event's kind. An event whose kind differs from the stored entry is not counted and
    /// `AggregateUpdateFailed` is emitted.
    fn record_count(
        &mut self,
        series: MetricSeries,
        mut data: MetricData,
        metadata: EventMetadata,
    ) {
        // The increment is a copy of the incoming data whose value is a counter of one.
        let mut increment = data.clone();
        *increment.value_mut() = MetricValue::Counter { value: 1f64 };

        let slot = self.map.entry(series).or_insert_with(|| {
            *data.value_mut() = MetricValue::Counter { value: 0f64 };
            (data.clone(), metadata.clone())
        });

        let counted = slot.0.kind == data.kind && slot.0.update(&increment);
        if !counted {
            emit!(AggregateUpdateFailed);
        } else {
            slot.1.merge(metadata);
        }
    }
    /// Accumulates an incremental metric into its series entry; absolute metrics are ignored.
    ///
    /// When the stored entry cannot absorb the new data (different kind, or `update`
    /// reports failure), `AggregateUpdateFailed` is emitted and the entry is replaced by
    /// the new data.
    fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
        if matches!(data.kind, MetricKind::Absolute) {
            return;
        }

        match self.map.entry(series) {
            Entry::Vacant(slot) => {
                slot.insert((data, metadata));
            }
            Entry::Occupied(mut slot) => {
                let current = slot.get_mut();
                let merged = current.0.kind == data.kind && current.0.update(&data);
                if merged {
                    current.1.merge(metadata);
                } else {
                    emit!(AggregateUpdateFailed);
                    *current = (data, metadata);
                }
            }
        }
    }
    /// Keeps the extreme gauge value per series (`Max` / `Min` modes).
    ///
    /// Incremental metrics are ignored. The first absolute metric of a series is stored
    /// as-is. Later ones replace the stored entry only when both values are gauges and the
    /// new value is strictly greater (`Max`) or strictly smaller (`Min`). A kind mismatch
    /// with the stored entry emits `AggregateUpdateFailed` and replaces the entry.
    fn record_comparison(
        &mut self,
        series: MetricSeries,
        data: MetricData,
        metadata: EventMetadata,
    ) {
        if matches!(data.kind, MetricKind::Incremental) {
            return;
        }

        match self.map.entry(series) {
            Entry::Vacant(slot) => {
                slot.insert((data, metadata));
            }
            Entry::Occupied(mut slot) => {
                let current = slot.get_mut();
                if current.0.kind != data.kind {
                    emit!(AggregateUpdateFailed);
                    *current = (data, metadata);
                    return;
                }

                let replace = match (current.0.value(), data.value()) {
                    (
                        MetricValue::Gauge { value: stored },
                        MetricValue::Gauge { value: candidate },
                    ) => match self.mode {
                        AggregationMode::Max => candidate > stored,
                        AggregationMode::Min => candidate < stored,
                        _ => false,
                    },
                    // Non-gauge values are never compared.
                    _ => false,
                };
                if replace {
                    *current = (data, metadata);
                }
            }
        }
    }
    fn flush_into(&mut self, output: &mut Vec<Event>) {
        let map = std::mem::take(&mut self.map);
        for (series, entry) in map.clone().into_iter() {
            let mut metric = Metric::from_parts(series, entry.0, entry.1);
            if matches!(self.mode, AggregationMode::Diff) {
                if let Some(prev_entry) = self.prev_map.get(metric.series()) {
                    if metric.data().kind == prev_entry.0.kind && !metric.subtract(&prev_entry.0) {
                        emit!(AggregateUpdateFailed);
                    }
                }
            }
            output.push(Event::Metric(metric));
        }
        let multi_map = std::mem::take(&mut self.multi_map);
        'outer: for (series, entries) in multi_map.into_iter() {
            if entries.is_empty() {
                continue;
            }
            let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
            for (data, metadata) in entries.iter().skip(1) {
                if !final_sum.update(data) {
                    emit!(AggregateUpdateFailed);
                    continue 'outer;
                }
                final_metadata.merge(metadata.clone());
            }
            let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
                *value /= entries.len() as f64;
                *value
            } else {
                0.0
            };
            let final_mean = final_sum.clone();
            match self.mode {
                AggregationMode::Mean => {
                    let metric = Metric::from_parts(series, final_mean, final_metadata);
                    output.push(Event::Metric(metric));
                }
                AggregationMode::Stdev => {
                    let variance = entries
                        .iter()
                        .filter_map(|(data, _)| {
                            if let MetricValue::Gauge { value } = data.value() {
                                let diff = final_mean_value - value;
                                Some(diff * diff)
                            } else {
                                None
                            }
                        })
                        .sum::<f64>()
                        / entries.len() as f64;
                    let mut final_stdev = final_mean;
                    if let MetricValue::Gauge { value } = final_stdev.value_mut() {
                        *value = variance.sqrt()
                    }
                    let metric = Metric::from_parts(series, final_stdev, final_metadata);
                    output.push(Event::Metric(metric));
                }
                _ => (),
            }
        }
        self.prev_map = map;
        emit!(AggregateFlushed);
    }
}
/// Runs the aggregator as a streaming task.
impl TaskTransform<Event> for Aggregate {
    /// Consumes `input_rx`, recording every event, and yields the aggregated events each
    /// time the flush timer ticks and once more when the input stream ends.
    fn transform(
        mut self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        // NOTE(review): per the tokio docs the interval's first tick is due immediately,
        // so an early flush may happen right after start — confirm that is acceptable.
        let mut flush_stream = tokio::time::interval(self.interval);
        Box::pin(stream! {
            // Reused buffer: filled by `flush_into`, drained at the end of every iteration.
            let mut output = Vec::new();
            let mut done = false;
            while !done {
                tokio::select! {
                    // Timer tick: emit whatever has been aggregated so far.
                    _ = flush_stream.tick() => {
                        self.flush_into(&mut output);
                    },
                    maybe_event = input_rx.next() => {
                        match maybe_event {
                            // Input closed: flush the remaining state, then stop.
                            None => {
                                self.flush_into(&mut output);
                                done = true;
                            }
                            Some(event) => self.record(event),
                        }
                    }
                };
                for event in output.drain(..) {
                    yield event;
                }
            }
        })
    }
}
#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, sync::Arc, task::Poll};
    use futures::stream;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::ComponentKey;
    use vrl::value::Kind;
    use super::*;
    use crate::schema::Definition;
    use crate::{
        event::{
            metric::{MetricKind, MetricValue},
            Event, Metric,
        },
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };
    // Runs the shared `test_generate_config` check (defined elsewhere) for `AggregateConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AggregateConfig>();
    }
    /// Builds a metric event with fixed source/upstream ids, a legacy-namespace schema
    /// definition and the `unit_test_stream` source type, so that events built from the same
    /// arguments compare equal.
    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
        let source_id = Arc::new(ComponentKey::from("in"));
        let upstream_id = Arc::new(OutputId::from("transform"));
        let schema_definition = Arc::new(Definition::new_with_default_metadata(
            Kind::any_object(),
            [LogNamespace::Legacy],
        ));

        let mut event = Event::Metric(Metric::new(name, kind, value))
            .with_source_id(source_id)
            .with_upstream_id(upstream_id);
        let metadata = event.metadata_mut();
        metadata.set_schema_definition(&schema_definition);
        metadata.set_source_type("unit_test_stream");
        event
    }
    // `Auto` mode with incremental counters: values of the same series are summed per flush.
    #[test]
    fn incremental_auto() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();
        let counter_a_1 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let counter_a_2 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 43.0 },
        );
        let counter_a_summed = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 85.0 },
        );
        // A single recorded event is flushed unchanged.
        agg.record(counter_a_1.clone());
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&counter_a_1, &out[0]);
        // Flushing with nothing recorded produces nothing, repeatedly.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        // Two events of the same series are summed into one output.
        agg.record(counter_a_1.clone());
        agg.record(counter_a_2);
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&counter_a_summed, &out[0]);
        let counter_b_1 = make_metric(
            "counter_b",
            MetricKind::Incremental,
            MetricValue::Counter { value: 44.0 },
        );
        // Different series are kept apart; output order is not asserted.
        agg.record(counter_a_1.clone());
        agg.record(counter_b_1.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(2, out.len());
        for event in out {
            match event.as_metric().series().name.name.as_str() {
                "counter_a" => assert_eq!(counter_a_1, event),
                "counter_b" => assert_eq!(counter_b_1, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            }
        }
    }
    // `Auto` mode with absolute gauges: the most recently recorded value of a series wins.
    #[test]
    fn absolute_auto() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );
        // A single recorded event is flushed unchanged.
        agg.record(gauge_a_1.clone());
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_1, &out[0]);
        // Flushing with nothing recorded produces nothing, repeatedly.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        // The later of two values for the same series is the one flushed.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);
        let gauge_b_1 = make_metric(
            "gauge_b",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 44.0 },
        );
        // Different series are kept apart; output order is not asserted.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_b_1.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(2, out.len());
        for event in out {
            match event.as_metric().series().name.name.as_str() {
                "gauge_a" => assert_eq!(gauge_a_1, event),
                "gauge_b" => assert_eq!(gauge_b_1, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            }
        }
    }
    // `Count` mode: the output is a counter holding the number of events recorded per series.
    #[test]
    fn count_agg() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Count,
        })
        .unwrap();
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );
        // Expected outputs keep the input's name and kind but carry a counter value.
        let result_count = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        let result_count_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 2.0 },
        );
        // One event recorded: count is 1.
        agg.record(gauge_a_1.clone());
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&result_count, &out[0]);
        // Flushing with nothing recorded produces nothing, repeatedly.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        // Two events recorded: count is 2, regardless of their gauge values.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&result_count_2, &out[0]);
    }
    // `Max` mode: the largest gauge value recorded for a series is the one flushed.
    #[test]
    fn absolute_max() {
        let config = AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Max,
        };
        let mut agg = Aggregate::new(&config).unwrap();

        let gauge = |value: f64| {
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value },
            )
        };
        let high = gauge(112.0);
        let low = gauge(89.0);
        let mut flushed = Vec::new();

        // A lone sample is flushed unchanged.
        agg.record(low.clone());
        agg.flush_into(&mut flushed);
        assert_eq!(1, flushed.len());
        assert_eq!(&low, &flushed[0]);

        // Two flushes with nothing recorded produce nothing.
        for _ in 0..2 {
            flushed.clear();
            agg.flush_into(&mut flushed);
            assert_eq!(0, flushed.len());
        }

        // The higher value survives even though the lower one arrives later.
        agg.record(high.clone());
        agg.record(low.clone());
        flushed.clear();
        agg.flush_into(&mut flushed);
        assert_eq!(1, flushed.len());
        assert_eq!(&high, &flushed[0]);
    }
    // `Min` mode: the smallest gauge value recorded for a series is the one flushed.
    #[test]
    fn absolute_min() {
        let config = AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Min,
        };
        let mut agg = Aggregate::new(&config).unwrap();

        let gauge = |value: f64| {
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value },
            )
        };
        let low = gauge(32.0);
        let high = gauge(89.0);
        let mut flushed = Vec::new();

        // A lone sample is flushed unchanged.
        agg.record(high.clone());
        agg.flush_into(&mut flushed);
        assert_eq!(1, flushed.len());
        assert_eq!(&high, &flushed[0]);

        // Two flushes with nothing recorded produce nothing.
        for _ in 0..2 {
            flushed.clear();
            agg.flush_into(&mut flushed);
            assert_eq!(0, flushed.len());
        }

        // The lower value is kept when a higher one arrives afterwards.
        agg.record(low.clone());
        agg.record(high.clone());
        flushed.clear();
        agg.flush_into(&mut flushed);
        assert_eq!(1, flushed.len());
        assert_eq!(&low, &flushed[0]);
    }
    // `Diff` mode: each flush emits the current value minus the value from the previous flush.
    #[test]
    fn absolute_diff() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Diff,
        })
        .unwrap();
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 32.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 82.0 },
        );
        // 82 - 32
        let result = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 50.0 },
        );
        // No previous value yet: the first sample is flushed unchanged.
        agg.record(gauge_a_2.clone());
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);
        // Flushing with nothing recorded produces nothing, repeatedly.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        // The preceding flush was empty, so there is nothing to subtract from this sample.
        agg.record(gauge_a_1.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_1, &out[0]);
        // Now the previous flush held 32, so 82 is emitted as the difference 50.
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&result, &out[0]);
    }
    // `Diff` mode when the value type of a series changes between two flushes: the test
    // expects the new value to be flushed unchanged.
    #[test]
    fn absolute_diff_conflicting_type() {
        let config = AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Diff,
        };
        let mut agg = Aggregate::new(&config).unwrap();

        let as_gauge = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 32.0 },
        );
        // Same series name and kind, but a counter instead of a gauge.
        let as_counter = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        let mut flushed = Vec::new();

        // The first sample has no predecessor and is flushed as-is.
        agg.record(as_gauge.clone());
        flushed.clear();
        agg.flush_into(&mut flushed);
        assert_eq!(1, flushed.len());
        assert_eq!(&as_gauge, &flushed[0]);

        // The previous flush held a gauge; the counter is expected to come out untouched.
        agg.record(as_counter.clone());
        flushed.clear();
        agg.flush_into(&mut flushed);
        assert_eq!(1, flushed.len());
        assert_eq!(&as_counter, &flushed[0]);
    }
    // `Mean` mode: the arithmetic mean of all gauge samples recorded per series is flushed.
    #[test]
    fn absolute_mean() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Mean,
        })
        .unwrap();
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 32.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 82.0 },
        );
        let gauge_a_3 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 51.0 },
        );
        // (32 + 82 + 51) / 3
        let mean_result = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 55.0 },
        );
        // The mean of a single sample is the sample itself.
        agg.record(gauge_a_2.clone());
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);
        // Flushing with nothing recorded produces nothing, repeatedly.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());
        // Three samples are reduced to their mean.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        agg.record(gauge_a_3.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&mean_result, &out[0]);
    }
    // `Stdev` mode: the population standard deviation of the recorded gauge samples is flushed.
    #[test]
    fn absolute_stdev() {
        let config = AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Stdev,
        };
        let mut agg = Aggregate::new(&config).unwrap();

        // Mean 40; squared deviations sum to 700; 700 / 7 = 100; sqrt(100) = 10.
        let samples = [25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0];
        let expected = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 10.0 },
        );

        for value in samples {
            agg.record(make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value },
            ));
        }

        let mut flushed = Vec::new();
        agg.flush_into(&mut flushed);
        assert_eq!(1, flushed.len());
        assert_eq!(&expected, &flushed[0]);
    }
    // `Auto` mode when a series switches value type (counter <-> set) within one interval:
    // the test expects the accumulation to restart from the first event of the new type.
    #[test]
    fn conflicting_value_type() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();
        let counter = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let mut values = BTreeSet::<String>::new();
        values.insert("a".into());
        values.insert("b".into());
        let set = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Set { values },
        );
        // Two counters of 42 summed.
        let summed = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 84.0 },
        );
        // Counters first, then sets: only the set is expected in the output.
        agg.record(counter.clone());
        agg.record(counter.clone());
        agg.record(set.clone());
        agg.record(set.clone());
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&set, &out[0]);
        // Sets first, then counters: only the summed counters are expected in the output.
        agg.record(set.clone());
        agg.record(set);
        agg.record(counter.clone());
        agg.record(counter);
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&summed, &out[0]);
    }
    // `Auto` mode when a series switches between incremental and absolute within one
    // interval: whichever kind arrived last determines the flushed value.
    #[test]
    fn conflicting_kinds() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();
        let incremental = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let absolute = make_metric(
            "the-thing",
            MetricKind::Absolute,
            MetricValue::Counter { value: 43.0 },
        );
        // Two incremental counters of 42 summed.
        let summed = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 84.0 },
        );
        // Incremental first, then absolute: the absolute value overwrites the running sum.
        agg.record(incremental.clone());
        agg.record(incremental.clone());
        agg.record(absolute.clone());
        agg.record(absolute.clone());
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&absolute, &out[0]);
        // Absolute first, then incremental: the sum restarts from the first incremental event.
        agg.record(absolute.clone());
        agg.record(absolute);
        agg.record(incremental.clone());
        agg.record(incremental);
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&summed, &out[0]);
    }
    // End of input must trigger a final flush: the configured interval is far longer than
    // the test, and exactly the two aggregated series are expected on the output stream.
    #[tokio::test]
    async fn transform_shutdown() {
        let agg = toml::from_str::<AggregateConfig>(
            r"
interval_ms = 999999
",
        )
        .unwrap()
        .build(&TransformContext::default())
        .await
        .unwrap();
        let agg = agg.into_task();
        let counter_a_1 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let counter_a_2 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 43.0 },
        );
        let counter_a_summed = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 85.0 },
        );
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );
        // A finite input stream: it ends after these four events.
        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
        let in_stream = Box::pin(stream::iter(inputs));
        let mut out_stream = agg.transform_events(in_stream);
        let mut count = 0_u8;
        // Counters are summed and the last gauge wins; output order is not asserted.
        while let Some(event) = out_stream.next().await {
            count += 1;
            match event.as_metric().series().name.name.as_str() {
                "counter_a" => assert_eq!(counter_a_summed, event),
                "gauge_a" => assert_eq!(gauge_a_2, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            };
        }
        assert_eq!(2, count);
    }
    // Timer-driven flush inside a test topology: nothing is emitted until the (default)
    // interval has elapsed on the paused tokio clock, then both aggregates appear.
    #[tokio::test]
    async fn transform_interval() {
        // An empty config exercises the serde defaults.
        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
        let counter_a_1 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let counter_a_2 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 43.0 },
        );
        let counter_a_summed = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 85.0 },
        );
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(10);
            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
            let mut out = ReceiverStream::new(out);
            // Freeze the clock so that time only moves when the test advances it.
            tokio::time::pause();
            // Nothing has been sent yet, so nothing may come out.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));
            tx.send(counter_a_1).await.unwrap();
            tx.send(counter_a_2).await.unwrap();
            tx.send(gauge_a_1).await.unwrap();
            tx.send(gauge_a_2.clone()).await.unwrap();
            // Events were sent but the interval has not elapsed: still no output.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));
            // Move past the default 10 second interval to trigger a flush.
            tokio::time::advance(Duration::from_secs(11)).await;
            let mut count = 0_u8;
            while count < 2 {
                if let Some(event) = out.next().await {
                    match event.as_metric().series().name.name.as_str() {
                        "counter_a" => assert_eq!(counter_a_summed, event),
                        "gauge_a" => assert_eq!(gauge_a_2, event),
                        _ => panic!("Unexpected metric name in aggregate output"),
                    };
                    count += 1;
                } else {
                    panic!("Unexpectedly received None in output stream");
                }
            }
            // Exactly two events were flushed; the stream is idle again.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));
            // Closing the input and stopping the topology ends the output stream.
            drop(tx);
            topology.stop().await;
            assert_eq!(out.next().await, None);
        })
        .await;
    }
}