use std::{
    collections::{BTreeMap, HashMap},
    convert::TryFrom,
};
use futures::{FutureExt, TryFutureExt};
use vector_lib::configurable::configurable_component;
use crate::{
    codecs::Transformer,
    config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
    event::{EventRef, LogEvent, Value},
    http::HttpClient,
    internal_events::TemplateRenderingError,
    sinks::{
        elasticsearch::{
            health::ElasticsearchHealthLogic,
            retry::ElasticsearchRetryLogic,
            service::{ElasticsearchService, HttpRequestBuilder},
            sink::ElasticsearchSink,
            ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
            ElasticsearchCommonMode, ElasticsearchMode, VersionType,
        },
        util::{
            http::RequestConfig, service::HealthConfig, BatchConfig, Compression,
            RealtimeSizeBasedDefaultBatchSettings,
        },
        Healthcheck, VectorSink,
    },
    template::Template,
    tls::TlsConfig,
    transforms::metric_to_log::MetricToLogConfig,
};
use vector_lib::lookup::event_path;
use vector_lib::lookup::lookup_v2::ConfigValuePath;
use vector_lib::schema::Requirement;
use vrl::value::Kind;
/// The field name for the timestamp required by data stream mode.
pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
/// The OpenSearch service type.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum OpenSearchServiceType {
    /// A managed OpenSearch cluster (service name `es`).
    Managed,
    /// OpenSearch Serverless (service name `aoss`).
    Serverless,
}
impl OpenSearchServiceType {
    pub const fn as_str(&self) -> &'static str {
        match self {
            OpenSearchServiceType::Managed => "es",
            OpenSearchServiceType::Serverless => "aoss",
        }
    }
}
impl Default for OpenSearchServiceType {
    fn default() -> Self {
        Self::Managed
    }
}
#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ElasticsearchConfig {
    /// The Elasticsearch endpoint to send logs to.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `endpoints` option should be used instead."
    )]
    pub endpoint: Option<String>,
    /// A list of Elasticsearch endpoints to send logs to.
    ///
    /// Each endpoint must contain an HTTP scheme, and may specify a hostname or IP
    /// address and port.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://10.24.32.122:9000"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    #[configurable(metadata(docs::examples = "https://user:password@example.com"))]
    pub endpoints: Vec<String>,
    #[serde(default = "default_doc_type")]
    #[configurable(metadata(docs::advanced))]
    pub doc_type: String,
    #[serde(default)]
    #[configurable(derived)]
    pub api_version: ElasticsearchApiVersion,
    /// Whether to send the `type` field to Elasticsearch.
    ///
    /// The `type` field was deprecated in Elasticsearch 7.x and removed in
    /// Elasticsearch 8.x. If enabled, the `doc_type` option is ignored.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `api_version` option should be used instead."
    )]
    pub suppress_type_name: bool,
    /// Whether to retry successful requests containing partial failures.
    ///
    /// To avoid duplicates in Elasticsearch, use this option in combination with `id_key`.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub request_retry_partial: bool,
    /// The name of the event field whose value is used as the document `_id`.
    ///
    /// If unset, Elasticsearch assigns document IDs automatically.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "id"))]
    #[configurable(metadata(docs::examples = "_id"))]
    pub id_key: Option<ConfigValuePath>,
    /// The name of the ingest pipeline to apply to outgoing documents.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "pipeline-name"))]
    pub pipeline: Option<String>,
    #[serde(default)]
    #[configurable(derived)]
    pub mode: ElasticsearchMode,
    #[serde(default)]
    #[configurable(derived)]
    pub compression: Compression,
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    pub encoding: Transformer,
    #[serde(default)]
    #[configurable(derived)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
    #[serde(default)]
    #[configurable(derived)]
    pub request: RequestConfig,
    #[configurable(derived)]
    pub auth: Option<ElasticsearchAuthConfig>,
    /// Custom parameters to add to the query string of each request sent to Elasticsearch.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_examples()"))]
    pub query: Option<HashMap<String, String>>,
    #[serde(default)]
    #[configurable(derived)]
    #[cfg(feature = "aws-core")]
    pub aws: Option<crate::aws::RegionOrEndpoint>,
    /// The type of OpenSearch service this sink targets: a managed cluster
    /// (`es`) or OpenSearch Serverless (`aoss`).
    #[serde(default)]
    pub opensearch_service_type: OpenSearchServiceType,
    #[serde(default)]
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,
    #[serde(default)]
    #[configurable(derived)]
    #[serde(rename = "distribution")]
    pub endpoint_health: Option<HealthConfig>,
    #[serde(alias = "normal", default)]
    #[configurable(derived)]
    pub bulk: BulkConfig,
    #[serde(default)]
    #[configurable(derived)]
    pub data_stream: Option<DataStreamConfig>,
    #[serde(default)]
    #[configurable(derived)]
    pub metrics: Option<MetricToLogConfig>,
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(derived)]
    pub acknowledgements: AcknowledgementsConfig,
}
fn default_doc_type() -> String {
    "_doc".to_owned()
}
fn query_examples() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([("X-Powered-By".to_owned(), "Vector".to_owned())])
}
impl Default for ElasticsearchConfig {
    fn default() -> Self {
        Self {
            endpoint: None,
            endpoints: vec![],
            doc_type: default_doc_type(),
            api_version: Default::default(),
            suppress_type_name: false,
            request_retry_partial: false,
            id_key: None,
            pipeline: None,
            mode: Default::default(),
            compression: Default::default(),
            encoding: Default::default(),
            batch: Default::default(),
            request: Default::default(),
            auth: None,
            query: None,
            #[cfg(feature = "aws-core")]
            aws: None,
            opensearch_service_type: Default::default(),
            tls: None,
            endpoint_health: None,
            bulk: BulkConfig::default(),
            data_stream: None,
            metrics: None,
            acknowledgements: Default::default(),
        }
    }
}
impl ElasticsearchConfig {
    /// Resolves the configured `mode` into its runtime representation.
    pub fn common_mode(&self) -> crate::Result<ElasticsearchCommonMode> {
        match self.mode {
            ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
                index: self.bulk.index.clone(),
                action: self.bulk.action.clone(),
                version: self.bulk.version.clone(),
                version_type: self.bulk.version_type,
            }),
            ElasticsearchMode::DataStream => Ok(ElasticsearchCommonMode::DataStream(
                self.data_stream.clone().unwrap_or_default(),
            )),
        }
    }
}
/// Elasticsearch bulk mode configuration.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct BulkConfig {
    /// The bulk action to use when indexing events.
    #[serde(default = "default_bulk_action")]
    #[configurable(metadata(docs::examples = "create"))]
    #[configurable(metadata(docs::examples = "{{ action }}"))]
    pub action: Template,
    /// The name of the index to write events to.
    #[serde(default = "default_index")]
    #[configurable(metadata(docs::examples = "application-{{ application_id }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "{{ index }}"))]
    pub index: Template,
    /// The version to assign to indexed documents.
    #[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "123"))]
    pub version: Option<Template>,
    /// The version type to use for indexed documents.
    #[serde(default = "default_version_type")]
    #[configurable(metadata(docs::examples = "internal"))]
    #[configurable(metadata(docs::examples = "external"))]
    pub version_type: VersionType,
}
fn default_bulk_action() -> Template {
    Template::try_from("index").expect("unable to parse template")
}
fn default_index() -> Template {
    Template::try_from("vector-%Y.%m.%d").expect("unable to parse template")
}
const fn default_version_type() -> VersionType {
    VersionType::Internal
}
impl Default for BulkConfig {
    fn default() -> Self {
        Self {
            action: default_bulk_action(),
            index: default_index(),
            version: Default::default(),
            version_type: default_version_type(),
        }
    }
}
/// Elasticsearch data stream mode configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case")]
pub struct DataStreamConfig {
    /// The data stream type used to construct the data stream at index time.
    #[serde(rename = "type", default = "DataStreamConfig::default_type")]
    #[configurable(metadata(docs::examples = "metrics"))]
    #[configurable(metadata(docs::examples = "synthetics"))]
    #[configurable(metadata(docs::examples = "{{ type }}"))]
    pub dtype: Template,
    /// The data stream dataset used to construct the data stream at index time.
    #[serde(default = "DataStreamConfig::default_dataset")]
    #[configurable(metadata(docs::examples = "generic"))]
    #[configurable(metadata(docs::examples = "nginx"))]
    #[configurable(metadata(docs::examples = "{{ service }}"))]
    pub dataset: Template,
    /// The data stream namespace used to construct the data stream at index time.
    #[serde(default = "DataStreamConfig::default_namespace")]
    #[configurable(metadata(docs::examples = "{{ environment }}"))]
    pub namespace: Template,
    /// Automatically routes events by deriving the data stream name from event fields.
    ///
    /// The data stream name takes the form `<type>-<dataset>-<namespace>`, where each
    /// value comes from the event's `data_stream` fields when present.
    #[serde(default = "DataStreamConfig::default_auto_routing")]
    pub auto_routing: bool,
    /// Automatically adds and syncs the `data_stream.*` event fields if they are
    /// missing from the event.
    #[serde(default = "DataStreamConfig::default_sync_fields")]
    pub sync_fields: bool,
}
impl Default for DataStreamConfig {
    fn default() -> Self {
        Self {
            dtype: Self::default_type(),
            dataset: Self::default_dataset(),
            namespace: Self::default_namespace(),
            auto_routing: Self::default_auto_routing(),
            sync_fields: Self::default_sync_fields(),
        }
    }
}
impl DataStreamConfig {
    fn default_type() -> Template {
        Template::try_from("logs").expect("couldn't build default type template")
    }
    fn default_dataset() -> Template {
        Template::try_from("generic").expect("couldn't build default dataset template")
    }
    fn default_namespace() -> Template {
        Template::try_from("default").expect("couldn't build default namespace template")
    }
    const fn default_auto_routing() -> bool {
        true
    }
    const fn default_sync_fields() -> bool {
        true
    }
    /// Renames the event's timestamp field to `@timestamp`, which data streams
    /// require; a field already named `@timestamp` is left as-is.
    pub fn remap_timestamp(&self, log: &mut LogEvent) {
        if let Some(timestamp_key) = log.timestamp_path().cloned() {
            if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
                return;
            }
            log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
        }
    }
    pub fn dtype<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
        self.dtype
            .render_string(event)
            .map_err(|error| {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("data_stream.type"),
                    drop_event: true,
                });
            })
            .ok()
    }
    pub fn dataset<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
        self.dataset
            .render_string(event)
            .map_err(|error| {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("data_stream.dataset"),
                    drop_event: true,
                });
            })
            .ok()
    }
    pub fn namespace<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
        self.namespace
            .render_string(event)
            .map_err(|error| {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("data_stream.namespace"),
                    drop_event: true,
                });
            })
            .ok()
    }
    /// Fills in the `data_stream.type`, `data_stream.dataset`, and
    /// `data_stream.namespace` event fields from the configured templates,
    /// without overwriting values the event already carries.
    pub fn sync_fields(&self, log: &mut LogEvent) {
        if !self.sync_fields {
            return;
        }
        let dtype = self.dtype(&*log);
        let dataset = self.dataset(&*log);
        let namespace = self.namespace(&*log);
        if log.as_map().is_none() {
            *log.value_mut() = Value::Object(BTreeMap::new());
        }
        let existing = log
            .as_map_mut()
            .expect("must be a map")
            .entry("data_stream".into())
            .or_insert_with(|| Value::Object(BTreeMap::new()))
            .as_object_mut_unwrap();
        if let Some(dtype) = dtype {
            existing
                .entry("type".into())
                .or_insert_with(|| dtype.into());
        }
        if let Some(dataset) = dataset {
            existing
                .entry("dataset".into())
                .or_insert_with(|| dataset.into());
        }
        if let Some(namespace) = namespace {
            existing
                .entry("namespace".into())
                .or_insert_with(|| namespace.into());
        }
    }
    /// Composes the index name as `<type>-<dataset>-<namespace>`, preferring the
    /// event's own `data_stream` fields over the templates when auto-routing is
    /// enabled.
    pub fn index(&self, log: &LogEvent) -> Option<String> {
        let (dtype, dataset, namespace) = if !self.auto_routing {
            (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
        } else {
            let data_stream = log
                .get(event_path!("data_stream"))
                .and_then(|ds| ds.as_object());
            let dtype = data_stream
                .and_then(|ds| ds.get("type"))
                .map(|value| value.to_string_lossy().into_owned())
                .or_else(|| self.dtype(log))?;
            let dataset = data_stream
                .and_then(|ds| ds.get("dataset"))
                .map(|value| value.to_string_lossy().into_owned())
                .or_else(|| self.dataset(log))?;
            let namespace = data_stream
                .and_then(|ds| ds.get("namespace"))
                .map(|value| value.to_string_lossy().into_owned())
                .or_else(|| self.namespace(log))?;
            (dtype, dataset, namespace)
        };
        let name = [dtype, dataset, namespace]
            .into_iter()
            .filter(|s| !s.is_empty())
            .collect::<Vec<_>>()
            .join("-");
        Some(name)
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "elasticsearch")]
impl SinkConfig for ElasticsearchConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
        let common = commons[0].clone();
        let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
        let request_limits = self.request.tower.into_settings();
        let health_config = self.endpoint_health.clone().unwrap_or_default();
        let services = commons
            .iter()
            .cloned()
            .map(|common| {
                let endpoint = common.base_url.clone();
                let http_request_builder = HttpRequestBuilder::new(&common, self);
                let service = ElasticsearchService::new(client.clone(), http_request_builder);
                (endpoint, service)
            })
            .collect::<Vec<_>>();
        let service = request_limits.distributed_service(
            ElasticsearchRetryLogic {
                retry_partial: self.request_retry_partial,
            },
            services,
            health_config,
            ElasticsearchHealthLogic,
            1,
        );
        let sink = ElasticsearchSink::new(&common, self, service)?;
        let stream = VectorSink::from_event_streamsink(sink);
        let healthcheck = futures::future::select_ok(
            commons
                .into_iter()
                .map(move |common| common.healthcheck(client.clone()).boxed()),
        )
        .map_ok(|((), _)| ())
        .boxed();
        Ok((stream, healthcheck))
    }
    fn input(&self) -> Input {
        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
        Input::new(DataType::Metric | DataType::Log).with_schema_requirement(requirements)
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ElasticsearchConfig>();
    }
    #[test]
    fn parse_aws_auth() {
        toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            auth.strategy = "aws"
            auth.assume_role = "role"
        "#,
        )
        .unwrap();
        toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            auth.strategy = "aws"
        "#,
        )
        .unwrap();
    }
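    #[test]
    fn parse_opensearch_service_type() {
        // A hedged sketch, not part of the original test suite: given the
        // `rename_all = "lowercase"` serde attribute on `OpenSearchServiceType`,
        // "serverless" should deserialize to `Serverless`, which `as_str` maps
        // to the "aoss" service name.
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            opensearch_service_type = "serverless"
        "#,
        )
        .unwrap();
        assert_eq!(
            config.opensearch_service_type,
            OpenSearchServiceType::Serverless
        );
        assert_eq!(config.opensearch_service_type.as_str(), "aoss");
    }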
    #[test]
    fn parse_mode() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            mode = "data_stream"
            data_stream.type = "synthetics"
        "#,
        )
        .unwrap();
        assert!(matches!(config.mode, ElasticsearchMode::DataStream));
        assert!(config.data_stream.is_some());
    }
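    #[test]
    fn data_stream_index_prefers_event_fields() {
        // A hedged sketch, not part of the original test suite: with the
        // default `auto_routing = true`, `data_stream.*` fields already on the
        // event should take precedence over the configured templates when the
        // `<type>-<dataset>-<namespace>` index name is composed.
        let config = DataStreamConfig::default();
        let mut log = LogEvent::default();
        log.insert(event_path!("data_stream", "type"), "synthetics");
        log.insert(event_path!("data_stream", "dataset"), "nginx");
        assert_eq!(
            config.index(&log),
            Some("synthetics-nginx-default".to_string())
        );
    }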
    #[test]
    fn parse_distribution() {
        toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = ["", ""]
            distribution.retry_initial_backoff_secs = 10
        "#,
        )
        .unwrap();
    }
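    #[test]
    fn data_stream_sync_fields_fills_missing() {
        // A hedged sketch, not part of the original test suite: `sync_fields`
        // should fill in missing `data_stream.*` fields from the default
        // templates while leaving values the event already carries untouched.
        let config = DataStreamConfig::default();
        let mut log = LogEvent::default();
        log.insert(event_path!("data_stream", "dataset"), "nginx");
        config.sync_fields(&mut log);
        assert_eq!(
            log.get(event_path!("data_stream", "type")),
            Some(&Value::from("logs"))
        );
        assert_eq!(
            log.get(event_path!("data_stream", "dataset")),
            Some(&Value::from("nginx"))
        );
        assert_eq!(
            log.get(event_path!("data_stream", "namespace")),
            Some(&Value::from("default"))
        );
    }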
    #[test]
    fn parse_version() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            api_version = "v7"
        "#,
        )
        .unwrap();
        assert_eq!(config.api_version, ElasticsearchApiVersion::V7);
    }
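    #[test]
    fn parse_bulk_templates() {
        // A hedged sketch, not part of the original test suite: both
        // `bulk.action` and `bulk.index` are `Template`s, so templated values
        // such as `{{ action }}` should parse.
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            bulk.action = "{{ action }}"
            bulk.index = "application-{{ application_id }}-%Y-%m-%d"
        "#,
        )
        .unwrap();
        assert_eq!(config.mode, ElasticsearchMode::Bulk);
        assert_eq!(
            config.bulk.action,
            Template::try_from("{{ action }}").unwrap()
        );
    }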
    #[test]
    fn parse_version_auto() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            api_version = "auto"
        "#,
        )
        .unwrap();
        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
    }
    #[test]
    fn parse_default_bulk() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
        "#,
        )
        .unwrap();
        assert_eq!(config.mode, ElasticsearchMode::Bulk);
        assert_eq!(config.bulk, BulkConfig::default());
    }
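    #[test]
    fn common_mode_defaults_to_bulk() {
        // A hedged sketch, not part of the original test suite: with no `mode`
        // configured, `common_mode()` should resolve to the Bulk variant built
        // from the default action and index templates.
        let config = ElasticsearchConfig::default();
        assert!(matches!(
            config.common_mode().unwrap(),
            ElasticsearchCommonMode::Bulk { .. }
        ));
    }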
}