use std::collections::HashMap;
use vrl::value::Kind;
use super::{healthcheck::healthcheck, sink::LokiSink};
use crate::{
    http::{Auth, HttpClient, MaybeAuth},
    schema,
    sinks::{prelude::*, util::UriSerde},
};
const fn default_compression() -> Compression {
    Compression::Snappy
}
fn default_loki_path() -> String {
    "/loki/api/v1/push".to_string()
}
#[configurable_component(sink("loki", "Deliver log event data to the Loki aggregation system."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LokiConfig {
    #[configurable(metadata(docs::examples = "http://localhost:3100"))]
    pub endpoint: UriSerde,
    #[serde(default = "default_loki_path")]
    pub path: String,
    #[configurable(derived)]
    pub encoding: EncodingConfig,
    #[configurable(metadata(
        docs::examples = "some_tenant_id",
        docs::examples = "{{ event_field }}",
    ))]
    pub tenant_id: Option<Template>,
    #[configurable(metadata(docs::examples = "loki_labels_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "A Loki label."))]
    pub labels: HashMap<Template, Template>,
    #[serde(default = "crate::serde::default_false")]
    pub remove_label_fields: bool,
    #[configurable(metadata(docs::examples = "loki_structured_metadata_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "Loki structured metadata."))]
    #[serde(default)]
    pub structured_metadata: HashMap<Template, Template>,
    #[serde(default = "crate::serde::default_false")]
    pub remove_structured_metadata_fields: bool,
    #[serde(default = "crate::serde::default_true")]
    pub remove_timestamp: bool,
    #[serde(default = "default_compression")]
    pub compression: Compression,
    #[configurable(derived)]
    #[serde(default)]
    pub out_of_order_action: OutOfOrderAction,
    #[configurable(derived)]
    pub auth: Option<Auth>,
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<LokiDefaultBatchSettings>,
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
fn loki_labels_examples() -> HashMap<String, String> {
    let mut examples = HashMap::new();
    examples.insert("source".to_string(), "vector".to_string());
    examples.insert(
        "\"pod_labels_*\"".to_string(),
        "{{ kubernetes.pod_labels }}".to_string(),
    );
    examples.insert("\"*\"".to_string(), "{{ metadata }}".to_string());
    examples.insert(
        "{{ event_field }}".to_string(),
        "{{ some_other_event_field }}".to_string(),
    );
    examples
}
fn loki_structured_metadata_examples() -> HashMap<String, String> {
    let mut examples = HashMap::new();
    examples.insert("source".to_string(), "vector".to_string());
    examples.insert(
        "\"pod_labels_*\"".to_string(),
        "{{ kubernetes.pod_labels }}".to_string(),
    );
    examples.insert("\"*\"".to_string(), "{{ metadata }}".to_string());
    examples.insert(
        "{{ event_field }}".to_string(),
        "{{ some_other_event_field }}".to_string(),
    );
    examples
}
#[derive(Clone, Copy, Debug, Default)]
pub struct LokiDefaultBatchSettings;
impl SinkBatchSettings for LokiDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = Some(100_000);
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    const TIMEOUT_SECS: f64 = 1.0;
}
#[configurable_component]
#[derive(Copy, Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "snake_case")]
pub enum OutOfOrderAction {
    #[derivative(Default)]
    Accept,
    RewriteTimestamp,
    Drop,
}
impl GenerateConfig for LokiConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(
            r#"endpoint = "http://localhost:3100"
            encoding.codec = "json"
            labels = {}"#,
        )
        .unwrap()
    }
}
impl LokiConfig {
    pub(super) fn build_client(&self, cx: SinkContext) -> crate::Result<HttpClient> {
        let tls = TlsSettings::from_options(&self.tls)?;
        let client = HttpClient::new(tls, cx.proxy())?;
        Ok(client)
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "loki")]
impl SinkConfig for LokiConfig {
    async fn build(
        &self,
        cx: SinkContext,
    ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
        if self.labels.is_empty() {
            return Err("`labels` must include at least one label.".into());
        }
        for label in self.labels.keys() {
            if !valid_label_name(label) {
                return Err(format!("Invalid label name {:?}", label.get_ref()).into());
            }
        }
        let client = self.build_client(cx)?;
        let config = LokiConfig {
            auth: self.auth.choose_one(&self.endpoint.auth)?,
            ..self.clone()
        };
        let sink = LokiSink::new(config.clone(), client.clone())?;
        let healthcheck = healthcheck(config, client).boxed();
        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }
    fn input(&self) -> Input {
        let requirement =
            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
        Input::new(self.encoding.config().input_type() & DataType::Log)
            .with_schema_requirement(requirement)
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
pub fn valid_label_name(label: &Template) -> bool {
    label.is_dynamic() || {
        let mut label_trim = label.get_ref().trim();
        if let Some(without_opening_end) = label_trim.strip_suffix('*') {
            label_trim = without_opening_end
        }
        let mut label_chars = label_trim.chars();
        if let Some(ch) = label_chars.next() {
            (ch.is_ascii_alphabetic() || ch == '_')
                && label_chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
        } else {
            label.get_ref().trim() == "*"
        }
    }
}
#[cfg(test)]
mod tests {
    use std::convert::TryInto;
    use super::valid_label_name;
    #[test]
    fn valid_label_names() {
        assert!(valid_label_name(&"name".try_into().unwrap()));
        assert!(valid_label_name(&" name ".try_into().unwrap()));
        assert!(valid_label_name(&"bee_bop".try_into().unwrap()));
        assert!(valid_label_name(&"a09b".try_into().unwrap()));
        assert!(valid_label_name(&"abc_*".try_into().unwrap()));
        assert!(valid_label_name(&"_*".try_into().unwrap()));
        assert!(valid_label_name(&"*".try_into().unwrap()));
        assert!(!valid_label_name(&"0ab".try_into().unwrap()));
        assert!(!valid_label_name(&"".try_into().unwrap()));
        assert!(!valid_label_name(&" ".try_into().unwrap()));
        assert!(valid_label_name(&"{{field}}".try_into().unwrap()));
    }
}