use crate::components::validation::{
    component_names::*, ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent,
};
use vector_lib::event::{Event, Metric, MetricKind};
use super::{ComponentMetricType, Validator};
#[derive(Default)]
pub struct ComponentSpecValidator;
impl Validator for ComponentSpecValidator {
    fn name(&self) -> &'static str {
        "component_spec"
    }
    fn check_validation(
        &self,
        component_type: ComponentType,
        expectation: TestCaseExpectation,
        inputs: &[TestEvent],
        outputs: &[Event],
        telemetry_events: &[Event],
        runner_metrics: &RunnerMetrics,
    ) -> Result<Vec<String>, Vec<String>> {
        let expect_received_events = inputs
            .iter()
            .filter(|te| !te.should_fail() || te.should_reject())
            .count() as u64;
        for input in inputs {
            info!("Validator observed input event: {:?}", input);
        }
        for output in outputs {
            info!("Validator observed output event: {:?}", output);
        }
        match expectation {
            TestCaseExpectation::Success => {
                if inputs.len() != outputs.len() {
                    return Err(vec![format!(
                        "Sent {} inputs but received {} outputs.",
                        inputs.len(),
                        outputs.len()
                    )]);
                }
            }
            TestCaseExpectation::Failure => {
                if !outputs.is_empty() {
                    return Err(vec![format!(
                        "Received {} outputs but none were expected.",
                        outputs.len()
                    )]);
                }
            }
            TestCaseExpectation::PartialSuccess => {
                if inputs.len() == outputs.len() {
                    return Err(vec![
                        "Received an output event for every input, when only some outputs were expected.".to_string()
                    ]);
                }
            }
        }
        let mut run_out = vec![
            format!(
                "sent {} inputs and received {} outputs",
                inputs.len(),
                outputs.len()
            ),
            format!("received {} telemetry events", telemetry_events.len()),
        ];
        let out = validate_telemetry(
            component_type,
            telemetry_events,
            runner_metrics,
            expect_received_events,
        )?;
        run_out.extend(out);
        Ok(run_out)
    }
}
fn validate_telemetry(
    component_type: ComponentType,
    telemetry_events: &[Event],
    runner_metrics: &RunnerMetrics,
    expect_received_events: u64,
) -> Result<Vec<String>, Vec<String>> {
    let mut out: Vec<String> = Vec::new();
    let mut errs: Vec<String> = Vec::new();
    let metric_types = [
        ComponentMetricType::EventsReceived,
        ComponentMetricType::EventsReceivedBytes,
        ComponentMetricType::ReceivedBytesTotal,
        ComponentMetricType::SentEventsTotal,
        ComponentMetricType::SentEventBytesTotal,
        ComponentMetricType::SentBytesTotal,
        ComponentMetricType::ErrorsTotal,
        ComponentMetricType::DiscardedEventsTotal,
    ];
    metric_types.iter().for_each(|metric_type| {
        match validate_metric(
            telemetry_events,
            runner_metrics,
            metric_type,
            component_type,
            expect_received_events,
        ) {
            Err(e) => errs.extend(e),
            Ok(m) => out.extend(m),
        }
    });
    if errs.is_empty() {
        Ok(out)
    } else {
        Err(errs)
    }
}
fn validate_metric(
    telemetry_events: &[Event],
    runner_metrics: &RunnerMetrics,
    metric_type: &ComponentMetricType,
    component_type: ComponentType,
    expect_received_events: u64,
) -> Result<Vec<String>, Vec<String>> {
    let component_id = match component_type {
        ComponentType::Source => TEST_SOURCE_NAME,
        ComponentType::Transform => TEST_TRANSFORM_NAME,
        ComponentType::Sink => TEST_SINK_NAME,
    };
    let expected = match metric_type {
        ComponentMetricType::EventsReceived => {
            runner_metrics.sent_events_total
        }
        ComponentMetricType::EventsReceivedBytes => {
            runner_metrics.sent_event_bytes_total
        }
        ComponentMetricType::ReceivedBytesTotal => {
            if component_type == ComponentType::Sink {
                0 } else {
                runner_metrics.sent_bytes_total
            }
        }
        ComponentMetricType::SentEventsTotal => {
            runner_metrics.received_events_total
        }
        ComponentMetricType::SentBytesTotal => {
            if component_type == ComponentType::Source {
                0 } else {
                runner_metrics.received_bytes_total
            }
        }
        ComponentMetricType::SentEventBytesTotal => {
            runner_metrics.received_event_bytes_total
        }
        ComponentMetricType::ErrorsTotal => runner_metrics.errors_total,
        ComponentMetricType::DiscardedEventsTotal => runner_metrics.discarded_events_total,
    };
    compare_actual_to_expected(
        telemetry_events,
        metric_type,
        component_id,
        expected,
        expect_received_events,
    )
}
fn filter_events_by_metric_and_component<'a>(
    telemetry_events: &'a [Event],
    metric: &ComponentMetricType,
    component_id: &'a str,
) -> Vec<&'a Metric> {
    info!(
        "Filter looking for metric {} {}",
        metric.to_string(),
        component_id
    );
    let metrics: Vec<&Metric> = telemetry_events
        .iter()
        .flat_map(|e| {
            if let vector_lib::event::Event::Metric(m) = e {
                Some(m)
            } else {
                None
            }
        })
        .filter(|&m| {
            if m.name() == metric.to_string() {
                debug!("{}", m);
                if let Some(tags) = m.tags() {
                    if tags.get("component_id").unwrap_or("") == component_id {
                        return true;
                    }
                }
            }
            false
        })
        .collect();
    info!("{}: {} metrics found.", metric.to_string(), metrics.len());
    metrics
}
fn sum_counters(
    metric_name: &ComponentMetricType,
    metrics: &[&Metric],
) -> Result<u64, Vec<String>> {
    let mut sum: f64 = 0.0;
    let mut errs = Vec::new();
    for m in metrics {
        match m.value() {
            vector_lib::event::MetricValue::Counter { value } => {
                if let MetricKind::Absolute = m.data().kind {
                    sum = *value;
                } else {
                    sum += *value;
                }
            }
            _ => errs.push(format!("{}: metric value is not a counter", metric_name,)),
        }
    }
    if errs.is_empty() {
        Ok(sum as u64)
    } else {
        Err(errs)
    }
}
fn compare_actual_to_expected(
    telemetry_events: &[Event],
    metric_type: &ComponentMetricType,
    component_id: &str,
    expected: u64,
    expect_received_events: u64,
) -> Result<Vec<String>, Vec<String>> {
    let mut errs: Vec<String> = Vec::new();
    let metrics =
        filter_events_by_metric_and_component(telemetry_events, metric_type, component_id);
    let actual = sum_counters(metric_type, &metrics)?;
    info!("{metric_type}: expected {expected}, actual {actual}.");
    if actual != expected &&
        (metric_type != &ComponentMetricType::EventsReceivedBytes
            || (actual != (expected + (expect_received_events * 2))))
    {
        errs.push(format!(
            "{metric_type}: expected {expected}, actual {actual}",
        ));
    }
    if !errs.is_empty() {
        return Err(errs);
    }
    Ok(vec![format!("{}: {}", metric_type, actual)])
}