use std::collections::HashMap;
use bytes::{Buf, Bytes};
use http::{Response, StatusCode, Uri};
use hyper::{body, Body};
use serde::Deserialize;
use snafu::ResultExt;
use vector_lib::config::proxy::ProxyConfig;
use vector_lib::config::LogNamespace;
use super::{
    request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder,
    InvalidHostSnafu, Request, VersionType,
};
use crate::{
    http::{HttpClient, MaybeAuth},
    sinks::{
        elasticsearch::{
            ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
            OpenSearchServiceType, ParseError,
        },
        util::auth::Auth,
        util::{http::RequestConfig, UriSerde},
        HealthcheckError,
    },
    tls::TlsSettings,
    transforms::metric_to_log::MetricToLog,
};
#[derive(Debug, Clone)]
pub struct ElasticsearchCommon {
    pub base_url: String,
    pub bulk_uri: Uri,
    pub auth: Option<Auth>,
    pub service_type: OpenSearchServiceType,
    pub mode: ElasticsearchCommonMode,
    pub request_builder: ElasticsearchRequestBuilder,
    pub tls_settings: TlsSettings,
    pub request: RequestConfig,
    pub query_params: HashMap<String, String>,
    pub metric_to_log: MetricToLog,
}
impl ElasticsearchCommon {
    pub async fn parse_config(
        config: &ElasticsearchConfig,
        endpoint: &str,
        proxy_config: &ProxyConfig,
        version: &mut Option<usize>,
    ) -> crate::Result<Self> {
        let uri = format!("{}/_test", endpoint);
        let uri = uri
            .parse::<Uri>()
            .with_context(|_| InvalidHostSnafu { host: endpoint })?;
        if uri.host().is_none() {
            return Err(ParseError::HostMustIncludeHostname {
                host: endpoint.to_string(),
            }
            .into());
        }
        let uri = endpoint.parse::<UriSerde>()?;
        let auth = match &config.auth {
            Some(ElasticsearchAuthConfig::Basic { user, password }) => {
                let auth = Some(crate::http::Auth::Basic {
                    user: user.clone(),
                    password: password.clone(),
                });
                let auth = auth.choose_one(&uri.auth)?.unwrap();
                Some(Auth::Basic(auth))
            }
            #[cfg(feature = "aws-core")]
            Some(ElasticsearchAuthConfig::Aws(aws)) => {
                let region = config
                    .aws
                    .as_ref()
                    .map(|config| config.region())
                    .ok_or(ParseError::RegionRequired)?
                    .ok_or(ParseError::RegionRequired)?;
                Some(Auth::Aws {
                    credentials_provider: aws
                        .credentials_provider(region.clone(), proxy_config, &config.tls)
                        .await?,
                    region,
                })
            }
            None => None,
        };
        if config.opensearch_service_type == OpenSearchServiceType::Serverless {
            match &config.auth {
                #[cfg(feature = "aws-core")]
                Some(ElasticsearchAuthConfig::Aws(_)) => (),
                _ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
            }
        }
        let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
        let mode = config.common_mode()?;
        let tower_request = config.request.tower.into_settings();
        if config.bulk.version.is_some() && config.bulk.version_type == VersionType::Internal {
            return Err(ParseError::ExternalVersionIgnoredWithInternalVersioning.into());
        }
        if config.bulk.version.is_some()
            && (config.bulk.version_type == VersionType::External
                || config.bulk.version_type == VersionType::ExternalGte)
            && config.id_key.is_none()
        {
            return Err(ParseError::ExternalVersioningWithoutDocumentID.into());
        }
        if config.bulk.version.is_none()
            && (config.bulk.version_type == VersionType::External
                || config.bulk.version_type == VersionType::ExternalGte)
        {
            return Err(ParseError::ExternalVersioningWithoutVersion.into());
        }
        let mut query_params = config.query.clone().unwrap_or_default();
        query_params.insert(
            "timeout".into(),
            format!("{}s", tower_request.timeout.as_secs()),
        );
        if let Some(pipeline) = &config.pipeline {
            if !pipeline.is_empty() {
                query_params.insert("pipeline".into(), pipeline.into());
            }
        }
        let bulk_url = {
            let mut query = url::form_urlencoded::Serializer::new(String::new());
            for (p, v) in &query_params {
                query.append_pair(&p[..], &v[..]);
            }
            format!("{}/_bulk?{}", base_url, query.finish())
        };
        let bulk_uri = bulk_url.parse::<Uri>().unwrap();
        let tls_settings = TlsSettings::from_options(&config.tls)?;
        let config = config.clone();
        let request = config.request;
        let metric_config = config.metrics.clone().unwrap_or_default();
        let metric_to_log = MetricToLog::new(
            metric_config.host_tag.as_deref(),
            metric_config.timezone.unwrap_or_default(),
            LogNamespace::Legacy,
            metric_config.metric_tag_values,
        );
        let service_type = config.opensearch_service_type;
        let version = if service_type == OpenSearchServiceType::Serverless {
            if config.api_version != ElasticsearchApiVersion::Auto {
                return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
            }
            8
        } else if let Some(version) = *version {
            version
        } else {
            let ver = match config.api_version {
                ElasticsearchApiVersion::V6 => 6,
                ElasticsearchApiVersion::V7 => 7,
                ElasticsearchApiVersion::V8 => 8,
                ElasticsearchApiVersion::Auto => {
                    match get_version(
                        &base_url,
                        &auth,
                        #[cfg(feature = "aws-core")]
                        &service_type,
                        &request,
                        &tls_settings,
                        proxy_config,
                    )
                    .await
                    {
                        Ok(version) => {
                            debug!(message = "Auto-detected Elasticsearch API version.", %version);
                            version
                        }
                        Err(error) => {
                            let assumed_version = if config.suppress_type_name { 6 } else { 8 };
                            debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
                                   %assumed_version,
                                   %config.suppress_type_name
                            );
                            warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
                                  %assumed_version,
                                  %error
                            );
                            assumed_version
                        }
                    }
                }
            };
            *version = Some(ver);
            ver
        };
        let doc_type = config.doc_type.clone();
        let suppress_type_name = if config.suppress_type_name {
            warn!(message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead.");
            config.suppress_type_name
        } else {
            version >= 7
        };
        let request_builder = ElasticsearchRequestBuilder {
            compression: config.compression,
            encoder: ElasticsearchEncoder {
                transformer: config.encoding.clone(),
                doc_type,
                suppress_type_name,
            },
        };
        Ok(Self {
            auth,
            service_type,
            base_url,
            bulk_uri,
            mode,
            request_builder,
            query_params,
            request,
            tls_settings,
            metric_to_log,
        })
    }
    pub async fn parse_many(
        config: &ElasticsearchConfig,
        proxy_config: &ProxyConfig,
    ) -> crate::Result<Vec<Self>> {
        let mut version = None;
        if let Some(endpoint) = config.endpoint.as_ref() {
            warn!(message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead.");
            if config.endpoints.is_empty() {
                Ok(vec![
                    Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
                ])
            } else {
                Err(ParseError::EndpointsExclusive.into())
            }
        } else if config.endpoints.is_empty() {
            Err(ParseError::EndpointRequired.into())
        } else {
            let mut commons = Vec::new();
            for endpoint in config.endpoints.iter() {
                commons
                    .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
            }
            Ok(commons)
        }
    }
    #[cfg(test)]
    pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
        let mut commons =
            Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
        assert_eq!(commons.len(), 1);
        Ok(commons.remove(0))
    }
    pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
        if self.service_type == OpenSearchServiceType::Serverless {
            warn!(message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck...");
            Ok(())
        } else {
            match get(
                &self.base_url,
                &self.auth,
                #[cfg(feature = "aws-core")]
                &self.service_type,
                &self.request,
                client,
                "/_cluster/health",
            )
            .await?
            .status()
            {
                StatusCode::OK => Ok(()),
                status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
            }
        }
    }
}
#[cfg(feature = "aws-core")]
pub async fn sign_request(
    service_type: &OpenSearchServiceType,
    request: &mut http::Request<Bytes>,
    credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
    region: &Option<aws_types::region::Region>,
) -> crate::Result<()> {
    crate::aws::sign_request(
        service_type.as_str(),
        request,
        credentials_provider,
        region,
        *service_type == OpenSearchServiceType::Serverless,
    )
    .await
}
async fn get_version(
    base_url: &str,
    auth: &Option<Auth>,
    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
    request: &RequestConfig,
    tls_settings: &TlsSettings,
    proxy_config: &ProxyConfig,
) -> crate::Result<usize> {
    #[derive(Deserialize)]
    struct Version {
        number: Option<String>,
    }
    #[derive(Deserialize)]
    struct ResponsePayload {
        version: Option<Version>,
    }
    let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
    let response = get(
        base_url,
        auth,
        #[cfg(feature = "aws-core")]
        service_type,
        request,
        client,
        "/",
    )
    .await
    .map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
    let (_, body) = response.into_parts();
    let mut body = body::aggregate(body).await?;
    let body = body.copy_to_bytes(body.remaining());
    let ResponsePayload { version } = serde_json::from_slice(&body)?;
    if let Some(version) = version {
        if let Some(number) = version.number {
            let v: Vec<&str> = number.split('.').collect();
            if !v.is_empty() {
                if let Ok(major_version) = v[0].parse::<usize>() {
                    return Ok(major_version);
                }
            }
        }
    }
    Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
}
async fn get(
    base_url: &str,
    auth: &Option<Auth>,
    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
    request: &RequestConfig,
    client: HttpClient,
    path: &str,
) -> crate::Result<Response<Body>> {
    let mut builder = Request::get(format!("{}{}", base_url, path));
    for (header, value) in &request.headers {
        builder = builder.header(&header[..], &value[..]);
    }
    let mut request = builder.body(Bytes::new())?;
    if let Some(auth) = auth {
        match auth {
            Auth::Basic(http_auth) => {
                http_auth.apply(&mut request);
            }
            #[cfg(feature = "aws-core")]
            Auth::Aws {
                credentials_provider: provider,
                region,
            } => {
                let region = region.clone();
                sign_request(service_type, &mut request, provider, &Some(region)).await?;
            }
        }
    }
    client
        .send(request.map(hyper::Body::from))
        .await
        .map_err(Into::into)
}