use std::time::Duration;
use futures::StreamExt;
use serde_with::serde_as;
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
};
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf};
use crate::{
    config::{log_schema, SourceConfig, SourceContext, SourceOutput},
    internal_events::{EventsReceived, StreamClosedError},
    metrics::Controller,
    shutdown::ShutdownSignal,
    SourceSender,
};
#[serde_as]
#[configurable_component(source(
    "internal_metrics",
    "Expose internal metrics emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct InternalMetricsConfig {
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(default = "default_scrape_interval")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub scrape_interval_secs: Duration,
    #[configurable(derived)]
    pub tags: TagsConfig,
    #[serde(default = "default_namespace")]
    pub namespace: String,
}
impl Default for InternalMetricsConfig {
    fn default() -> Self {
        Self {
            scrape_interval_secs: default_scrape_interval(),
            tags: TagsConfig::default(),
            namespace: default_namespace(),
        }
    }
}
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields, default)]
pub struct TagsConfig {
    pub host_key: Option<OptionalValuePath>,
    #[configurable(metadata(docs::examples = "pid"))]
    pub pid_key: Option<String>,
}
fn default_scrape_interval() -> Duration {
    Duration::from_secs_f64(1.0)
}
fn default_namespace() -> String {
    "vector".to_owned()
}
impl_generate_config_from_default!(InternalMetricsConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "internal_metrics")]
impl SourceConfig for InternalMetricsConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        if self.scrape_interval_secs.is_zero() {
            warn!(
                "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
            );
        }
        let interval = self.scrape_interval_secs;
        let namespace = self.namespace.clone();
        let host_key = self
            .tags
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into());
        let pid_key = self
            .tags
            .pid_key
            .as_deref()
            .and_then(|tag| (!tag.is_empty()).then(|| tag.to_owned()));
        Ok(Box::pin(
            InternalMetrics {
                namespace,
                host_key,
                pid_key,
                controller: Controller::get()?,
                interval,
                out: cx.out,
                shutdown: cx.shutdown,
            }
            .run(),
        ))
    }
    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }
    fn can_acknowledge(&self) -> bool {
        false
    }
}
struct InternalMetrics<'a> {
    namespace: String,
    host_key: OptionalValuePath,
    pid_key: Option<String>,
    controller: &'a Controller,
    interval: time::Duration,
    out: SourceSender,
    shutdown: ShutdownSignal,
}
impl<'a> InternalMetrics<'a> {
    async fn run(mut self) -> Result<(), ()> {
        let events_received = register!(EventsReceived);
        let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
        let mut interval =
            IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
        while interval.next().await.is_some() {
            let hostname = crate::get_hostname();
            let pid = std::process::id().to_string();
            let metrics = self.controller.capture_metrics();
            let count = metrics.len();
            let byte_size = metrics.size_of();
            let json_size = metrics.estimated_json_encoded_size_of();
            bytes_received.emit(ByteSize(byte_size));
            events_received.emit(CountByteSize(count, json_size));
            let batch = metrics.into_iter().map(|mut metric| {
                if self.namespace != "vector" {
                    metric = metric.with_namespace(Some(self.namespace.clone()));
                }
                if let Some(host_key) = &self.host_key.path {
                    if let Ok(hostname) = &hostname {
                        metric.replace_tag(host_key.to_string(), hostname.to_owned());
                    }
                }
                if let Some(pid_key) = &self.pid_key {
                    metric.replace_tag(pid_key.to_owned(), pid.clone());
                }
                metric
            });
            if (self.out.send_batch(batch).await).is_err() {
                emit!(StreamClosedError { count });
                return Err(());
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use metrics::{counter, gauge, histogram};
    use vector_lib::{metric_tags, metrics::Controller};
    use super::*;
    use crate::{
        event::{
            metric::{Metric, MetricValue},
            Event,
        },
        test_util::{
            self,
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
        },
    };
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<InternalMetricsConfig>();
    }
    #[test]
    fn captures_internal_metrics() {
        test_util::trace_init();
        std::thread::sleep(std::time::Duration::from_millis(300));
        gauge!("foo").set(1.0);
        gauge!("foo").set(2.0);
        counter!("bar").increment(3);
        counter!("bar").increment(4);
        histogram!("baz").record(5.0);
        histogram!("baz").record(6.0);
        histogram!("quux", "host" => "foo").record(8.0);
        histogram!("quux", "host" => "foo").record(8.1);
        let controller = Controller::get().expect("no controller");
        std::thread::sleep(std::time::Duration::from_millis(300));
        let output = controller
            .capture_metrics()
            .into_iter()
            .map(|metric| (metric.name().to_string(), metric))
            .collect::<BTreeMap<String, Metric>>();
        assert_eq!(&MetricValue::Gauge { value: 2.0 }, output["foo"].value());
        assert_eq!(&MetricValue::Counter { value: 7.0 }, output["bar"].value());
        match &output["baz"].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                assert_eq!(buckets[9].count, 2);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 11.0);
            }
            _ => panic!("wrong type"),
        }
        match &output["quux"].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                assert_eq!(buckets[9].count, 1);
                assert_eq!(buckets[10].count, 1);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 16.1);
            }
            _ => panic!("wrong type"),
        }
        let labels = metric_tags!("host" => "foo");
        assert_eq!(Some(&labels), output["quux"].tags());
    }
    async fn event_from_config(config: InternalMetricsConfig) -> Event {
        let mut events = run_and_assert_source_compliance(
            config,
            time::Duration::from_millis(100),
            &SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
        events.remove(0)
    }
    #[tokio::test]
    async fn default_namespace() {
        let event = event_from_config(InternalMetricsConfig::default()).await;
        assert_eq!(event.as_metric().namespace(), Some("vector"));
    }
    #[tokio::test]
    async fn sets_tags() {
        let event = event_from_config(InternalMetricsConfig {
            tags: TagsConfig {
                host_key: Some(OptionalValuePath::new("my_host_key")),
                pid_key: Some(String::from("my_pid_key")),
            },
            ..Default::default()
        })
        .await;
        let metric = event.as_metric();
        assert!(metric.tag_value("my_host_key").is_some());
        assert!(metric.tag_value("my_pid_key").is_some());
    }
    #[tokio::test]
    async fn only_host_tags_by_default() {
        let event = event_from_config(InternalMetricsConfig::default()).await;
        let metric = event.as_metric();
        assert!(metric.tag_value("host").is_some());
        assert!(metric.tag_value("pid").is_none());
    }
    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";
        let config = InternalMetricsConfig {
            namespace: namespace.to_owned(),
            ..InternalMetricsConfig::default()
        };
        let event = event_from_config(config).await;
        assert_eq!(event.as_metric().namespace(), Some(namespace));
    }
}