mod ddsketch;
mod label_filter;
mod recency;
mod recorder;
mod storage;
use std::{sync::OnceLock, time::Duration};
use chrono::Utc;
use metrics::Key;
use metrics_tracing_context::TracingContextLayer;
use metrics_util::layers::Layer;
use snafu::Snafu;
pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
use self::{label_filter::VectorLabelFilter, recorder::Registry, recorder::VectorRecorder};
use crate::event::{Metric, MetricValue};
type Result<T> = std::result::Result<T, Error>;
#[derive(Clone, Copy, Debug, PartialEq, Snafu)]
pub enum Error {
    #[snafu(display("Recorder already initialized."))]
    AlreadyInitialized,
    #[snafu(display("Metrics system was not initialized."))]
    NotInitialized,
    #[snafu(display("Timeout value of {} must be positive.", timeout))]
    TimeoutMustBePositive { timeout: f64 },
}
static CONTROLLER: OnceLock<Controller> = OnceLock::new();
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
pub struct Controller {
    recorder: VectorRecorder,
}
fn metrics_enabled() -> bool {
    !matches!(std::env::var("DISABLE_INTERNAL_METRICS_CORE"), Ok(x) if x == "true")
}
fn tracing_context_layer_enabled() -> bool {
    !matches!(std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION"), Ok(x) if x == "true")
}
fn init(recorder: VectorRecorder) -> Result<()> {
    if !metrics_enabled() {
        metrics::set_global_recorder(metrics::NoopRecorder)
            .map_err(|_| Error::AlreadyInitialized)?;
        info!(message = "Internal metrics core is disabled.");
        return Ok(());
    }
    if tracing_context_layer_enabled() {
        metrics::set_global_recorder(
            TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
        )
        .map_err(|_| Error::AlreadyInitialized)?;
    } else {
        metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
    }
    let controller = Controller { recorder };
    CONTROLLER
        .set(controller)
        .map_err(|_| Error::AlreadyInitialized)?;
    Ok(())
}
pub fn init_global() -> Result<()> {
    init(VectorRecorder::new_global())
}
pub fn init_test() {
    if init(VectorRecorder::new_test()).is_err() {
        while CONTROLLER.get().is_none() {}
    }
}
impl Controller {
    pub fn reset(&self) {
        self.recorder.with_registry(Registry::clear);
    }
    pub fn get() -> Result<&'static Self> {
        CONTROLLER.get().ok_or(Error::NotInitialized)
    }
    pub fn set_expiry(&self, timeout: Option<f64>) -> Result<()> {
        if let Some(timeout) = timeout {
            if timeout <= 0.0 {
                return Err(Error::TimeoutMustBePositive { timeout });
            }
        }
        self.recorder
            .with_registry(|registry| registry.set_expiry(timeout.map(Duration::from_secs_f64)));
        Ok(())
    }
    pub fn capture_metrics(&self) -> Vec<Metric> {
        let timestamp = Utc::now();
        let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
        #[allow(clippy::cast_precision_loss)]
        let value = (metrics.len() + 2) as f64;
        metrics.push(Metric::from_metric_kv(
            &CARDINALITY_KEY,
            MetricValue::Gauge { value },
            timestamp,
        ));
        metrics.push(Metric::from_metric_kv(
            &CARDINALITY_COUNTER_KEY,
            MetricValue::Counter { value },
            timestamp,
        ));
        metrics
    }
}
#[macro_export]
macro_rules! update_counter {
    ($label:literal, $value:expr) => {{
        use ::std::sync::atomic::{AtomicU64, Ordering};
        static PREVIOUS_VALUE: AtomicU64 = AtomicU64::new(0);
        let new_value = $value;
        let mut previous_value = PREVIOUS_VALUE.load(Ordering::Relaxed);
        loop {
            if new_value <= previous_value {
                break;
            }
            match PREVIOUS_VALUE.compare_exchange_weak(
                previous_value,
                new_value,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                Err(value) => previous_value = value,
                Ok(_) => {
                    let delta = new_value - previous_value;
                    counter!($label).increment(delta);
                    break;
                }
            }
        }
    }};
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::MetricKind;
    const IDLE_TIMEOUT: f64 = 0.5;
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }
    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            controller.reset();
            for idx in 0..cardinality {
                metrics::counter!("test", "idx" => idx.to_string()).increment(1);
            }
            let metrics = controller.capture_metrics();
            assert_eq!(metrics.len(), cardinality + 2);
            #[allow(clippy::cast_precision_loss)]
            let value = metrics.len() as f64;
            for metric in metrics {
                match metric.name() {
                    CARDINALITY_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Gauge { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    CARDINALITY_COUNTER_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Counter { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    _ => {}
                }
            }
        }
    }
    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();
        let counter = metrics::counter!("test7");
        assert_eq!(controller.capture_metrics().len(), 3);
        counter.increment(1);
        assert_eq!(controller.capture_metrics().len(), 3);
        let gauge = metrics::gauge!("test8");
        assert_eq!(controller.capture_metrics().len(), 4);
        gauge.set(1.0);
        assert_eq!(controller.capture_metrics().len(), 4);
    }
    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller.set_expiry(Some(IDLE_TIMEOUT)).unwrap();
        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller.set_expiry(Some(IDLE_TIMEOUT)).unwrap();
        metrics::counter!("test4", "tag" => "value1").increment(1);
        metrics::counter!("test4", "tag" => "value2").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller.set_expiry(Some(IDLE_TIMEOUT)).unwrap();
        let a = metrics::counter!("test5");
        metrics::counter!("test6").increment(5);
        assert_eq!(controller.capture_metrics().len(), 4);
        a.increment(1);
        assert_eq!(controller.capture_metrics().len(), 4);
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        assert_eq!(controller.capture_metrics().len(), 3);
        a.increment(1);
        let metrics = controller.capture_metrics();
        assert_eq!(metrics.len(), 3);
        let metric = metrics
            .into_iter()
            .find(|metric| metric.name() == "test5")
            .expect("Test metric is not present");
        match metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }
}