use async_trait::async_trait;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use indoc::indoc;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup;
use vector_lib::lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath, OptionalValuePath};
use vector_lib::sensitive_string::SensitiveString;
use vector_lib::sink::StreamSink;
use super::{
    config_host_key,
    logs::{HumioLogsConfig, HOST},
};
use crate::{
    config::{
        AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, TransformContext,
    },
    event::{Event, EventArray, EventContainer},
    sinks::{
        splunk_hec::common::SplunkHecDefaultBatchSettings,
        util::{BatchConfig, Compression, TowerRequestConfig},
        Healthcheck, VectorSink,
    },
    template::Template,
    tls::TlsConfig,
    transforms::{
        metric_to_log::{MetricToLog, MetricToLogConfig},
        FunctionTransform, OutputBuffer,
    },
};
#[configurable_component(sink("humio_metrics", "Deliver metric event data to Humio."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HumioMetricsConfig {
    #[serde(flatten)]
    transform: MetricToLogConfig,
    #[configurable(metadata(
        docs::examples = "${HUMIO_TOKEN}",
        docs::examples = "A94A8FE5CCB19BA61C4C08"
    ))]
    token: SensitiveString,
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(
        docs::examples = "http://127.0.0.1",
        docs::examples = "https://example.com",
    ))]
    pub(super) endpoint: String,
    source: Option<Template>,
    #[configurable(metadata(
        docs::examples = "json",
        docs::examples = "none",
        docs::examples = "{{ event_type }}"
    ))]
    event_type: Option<Template>,
    #[serde(default = "config_host_key")]
    host_key: OptionalValuePath,
    #[serde(default)]
    indexed_fields: Vec<ConfigValuePath>,
    #[serde(default)]
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    index: Option<Template>,
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<SplunkHecDefaultBatchSettings>,
    #[configurable(derived)]
    tls: Option<TlsConfig>,
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
fn default_endpoint() -> String {
    HOST.to_string()
}
impl GenerateConfig for HumioMetricsConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(indoc! {r#"
                host_key = "hostname"
                token = "${HUMIO_TOKEN}"
            "#})
        .unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "humio_metrics")]
impl SinkConfig for HumioMetricsConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let transform = self
            .transform
            .build_transform(&TransformContext::new_with_globals(cx.globals.clone()));
        let sink = HumioLogsConfig {
            token: self.token.clone(),
            endpoint: self.endpoint.clone(),
            source: self.source.clone(),
            encoding: JsonSerializerConfig::default().into(),
            event_type: self.event_type.clone(),
            host_key: OptionalTargetPath::from(
                vrl::path::PathPrefix::Event,
                self.host_key.path.clone(),
            ),
            indexed_fields: self.indexed_fields.clone(),
            index: self.index.clone(),
            compression: self.compression,
            request: self.request,
            batch: self.batch,
            tls: self.tls.clone(),
            timestamp_nanos_key: None,
            acknowledgements: Default::default(),
            timestamp_key: OptionalTargetPath::from(
                vrl::path::PathPrefix::Event,
                Some(lookup::owned_value_path!("timestamp")),
            ),
        };
        let (sink, healthcheck) = sink.clone().build(cx).await?;
        let sink = HumioMetricsSink {
            inner: sink,
            transform,
        };
        Ok((VectorSink::Stream(Box::new(sink)), healthcheck))
    }
    fn input(&self) -> Input {
        Input::metric()
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
pub struct HumioMetricsSink {
    inner: VectorSink,
    transform: MetricToLog,
}
#[async_trait]
impl StreamSink<EventArray> for HumioMetricsSink {
    async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
        let mut transform = self.transform;
        self.inner
            .run(input.map(move |events| {
                let mut buf = OutputBuffer::with_capacity(events.len());
                for event in events.into_events() {
                    transform.transform(&mut buf, event);
                }
                let events = buf.into_events().map(Event::into_log).collect::<Vec<_>>();
                events.into()
            }))
            .await
    }
}
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Utc};
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;
    use super::*;
    use crate::{
        event::{
            metric::{MetricKind, MetricValue, StatisticKind},
            Event, Metric,
        },
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            self,
            components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
        },
    };
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HumioMetricsConfig>();
    }
    #[test]
    fn test_endpoint_field() {
        let (config, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            endpoint = "https://localhost:9200/"
        "#})
        .unwrap();
        assert_eq!("https://localhost:9200/".to_string(), config.endpoint);
        let (config, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            host = "https://localhost:9200/"
        "#})
        .unwrap();
        assert_eq!("https://localhost:9200/".to_string(), config.endpoint);
    }
    #[tokio::test]
    async fn smoke_json() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
        "#})
        .unwrap();
        let addr = test_util::next_addr();
        config.endpoint = format!("http://{}", addr);
        let (sink, _) = config.build(cx).await.unwrap();
        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);
        let metrics = vec![
            Event::from(
                Metric::new(
                    "metric1",
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 42.0 },
                )
                .with_tags(Some(metric_tags!("os.host" => "somehost")))
                .with_timestamp(Some(
                    Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 1)
                        .single()
                        .expect("invalid timestamp"),
                )),
            ),
            Event::from(
                Metric::new(
                    "metric2",
                    MetricKind::Absolute,
                    MetricValue::Distribution {
                        samples: vector_lib::samples![1.0 => 100, 2.0 => 200, 3.0 => 300],
                        statistic: StatisticKind::Histogram,
                    },
                )
                .with_tags(Some(metric_tags!("os.host" => "somehost")))
                .with_timestamp(Some(
                    Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 2)
                        .single()
                        .expect("invalid timestamp"),
                )),
            ),
        ];
        let len = metrics.len();
        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;
        let output = rx.take(len).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"os.host":"somehost"}},"fields":{},"time":1597784401.0}"#,
            output[0].1
        );
        assert_eq!(
            r#"{"event":{"distribution":{"samples":[{"rate":100,"value":1.0},{"rate":200,"value":2.0},{"rate":300,"value":3.0}],"statistic":"histogram"},"kind":"absolute","name":"metric2","tags":{"os.host":"somehost"}},"fields":{},"time":1597784402.0}"#,
            output[1].1
        );
    }
    #[tokio::test]
    async fn multi_value_tags() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            metric_tag_values = "full"
        "#})
        .unwrap();
        let addr = test_util::next_addr();
        config.endpoint = format!("http://{}", addr);
        let (sink, _) = config.build(cx).await.unwrap();
        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);
        let metrics = vec![Event::from(
            Metric::new(
                "metric1",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_tags(Some(metric_tags!(
                "code" => "200",
                "code" => "success"
            )))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 1)
                    .single()
                    .expect("invalid timestamp"),
            )),
        )];
        let len = metrics.len();
        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;
        let output = rx.take(len).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"code":["200","success"]}},"fields":{},"time":1597784401.0}"#,
            output[0].1
        );
    }
}