use std::{borrow::Cow, sync::Arc};
use bytes::Bytes;
use futures_util::future::BoxFuture;
use http::{Request, StatusCode, Uri};
use hyper::Body;
use snafu::{ResultExt, Snafu};
use vector_lib::lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
use vector_lib::{config::proxy::ProxyConfig, event::EventRef};
use super::{
    request::HecRequest,
    service::{HttpRequestBuilder, MetadataFields},
    EndpointTarget,
};
use crate::{
    http::HttpClient,
    internal_events::TemplateRenderingError,
    sinks::{
        self,
        util::{http::HttpBatchService, SinkBatchSettings},
        UriParseSnafu,
    },
    template::Template,
    tls::{TlsConfig, TlsSettings},
};
#[derive(Clone, Copy, Debug, Default)]
pub struct SplunkHecDefaultBatchSettings;
impl SinkBatchSettings for SplunkHecDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    const TIMEOUT_SECS: f64 = 1.0;
}
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    #[snafu(display("Invalid HEC token"))]
    InvalidToken,
    #[snafu(display("Queues are full"))]
    QueuesFull,
}
pub fn create_client(
    tls: &Option<TlsConfig>,
    proxy_config: &ProxyConfig,
) -> crate::Result<HttpClient> {
    let tls_settings = TlsSettings::from_options(tls)?;
    Ok(HttpClient::new(tls_settings, proxy_config)?)
}
pub fn build_http_batch_service(
    client: HttpClient,
    http_request_builder: Arc<HttpRequestBuilder>,
    endpoint_target: EndpointTarget,
    auto_extract_timestamp: bool,
) -> HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HecRequest> {
    HttpBatchService::new(client, move |req: HecRequest| {
        let request_builder = Arc::clone(&http_request_builder);
        let future: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
            Box::pin(async move {
                request_builder.build_request(
                    req.body,
                    match endpoint_target {
                        EndpointTarget::Event => "/services/collector/event",
                        EndpointTarget::Raw => "/services/collector/raw",
                    },
                    req.passthrough_token,
                    MetadataFields {
                        source: req.source,
                        sourcetype: req.sourcetype,
                        index: req.index,
                        host: req.host,
                    },
                    auto_extract_timestamp,
                )
            });
        future
    })
}
pub async fn build_healthcheck(
    endpoint: String,
    token: String,
    client: HttpClient,
) -> crate::Result<()> {
    let uri = build_uri(endpoint.as_str(), "/services/collector/health/1.0", None)
        .context(UriParseSnafu)?;
    let request = Request::get(uri)
        .header("Authorization", format!("Splunk {}", token))
        .body(Body::empty())
        .unwrap();
    let response = client.send(request).await?;
    match response.status() {
        StatusCode::OK => Ok(()),
        StatusCode::BAD_REQUEST => Err(HealthcheckError::InvalidToken.into()),
        StatusCode::SERVICE_UNAVAILABLE => Err(HealthcheckError::QueuesFull.into()),
        other => Err(sinks::HealthcheckError::UnexpectedStatus { status: other }.into()),
    }
}
pub fn build_uri(
    host: &str,
    path: &str,
    query: impl IntoIterator<Item = (&'static str, String)>,
) -> Result<Uri, http::uri::InvalidUri> {
    let mut uri = format!("{}{}", host.trim_end_matches('/'), path);
    let mut first = true;
    for (key, value) in query.into_iter() {
        if first {
            uri.push('?');
            first = false;
        } else {
            uri.push('&');
        }
        uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
            key,
            percent_encoding::NON_ALPHANUMERIC,
        )));
        uri.push('=');
        uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
            &value,
            percent_encoding::NON_ALPHANUMERIC,
        )));
    }
    uri.parse::<Uri>()
}
pub fn config_host_key() -> OptionalValuePath {
    OptionalValuePath {
        path: crate::config::log_schema().host_key().cloned(),
    }
}
pub fn config_timestamp_key_target_path() -> OptionalTargetPath {
    OptionalTargetPath {
        path: crate::config::log_schema()
            .timestamp_key_target_path()
            .cloned(),
    }
}
pub fn render_template_string<'a>(
    template: &Template,
    event: impl Into<EventRef<'a>>,
    field_name: &str,
) -> Option<String> {
    template
        .render_string(event)
        .map_err(|error| {
            emit!(TemplateRenderingError {
                error,
                field: Some(field_name),
                drop_event: false
            });
        })
        .ok()
}
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::{HeaderValue, Uri};
    use vector_lib::config::proxy::ProxyConfig;
    use wiremock::{
        matchers::{header, method, path},
        Mock, MockServer, ResponseTemplate,
    };
    use crate::sinks::{
        splunk_hec::common::{
            build_healthcheck, build_uri, create_client,
            service::{HttpRequestBuilder, MetadataFields},
            EndpointTarget, HOST_FIELD, SOURCE_FIELD,
        },
        util::Compression,
    };
    #[tokio::test]
    async fn test_build_healthcheck_200_response_returns_ok() {
        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/services/collector/health/1.0"))
            .and(header("Authorization", "Splunk token"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&mock_server)
            .await;
        let client = create_client(&None, &ProxyConfig::default()).unwrap();
        let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
        assert!(healthcheck.await.is_ok())
    }
    #[tokio::test]
    async fn test_build_healthcheck_400_response_returns_error() {
        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/services/collector/health/1.0"))
            .and(header("Authorization", "Splunk token"))
            .respond_with(ResponseTemplate::new(400))
            .mount(&mock_server)
            .await;
        let client = create_client(&None, &ProxyConfig::default()).unwrap();
        let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
        assert_eq!(
            &healthcheck.await.unwrap_err().to_string(),
            "Invalid HEC token"
        );
    }
    #[tokio::test]
    async fn test_build_healthcheck_503_response_returns_error() {
        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/services/collector/health/1.0"))
            .and(header("Authorization", "Splunk token"))
            .respond_with(ResponseTemplate::new(503))
            .mount(&mock_server)
            .await;
        let client = create_client(&None, &ProxyConfig::default()).unwrap();
        let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
        assert_eq!(
            &healthcheck.await.unwrap_err().to_string(),
            "Queues are full"
        );
    }
    #[tokio::test]
    async fn test_build_healthcheck_500_response_returns_error() {
        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/services/collector/health/1.0"))
            .and(header("Authorization", "Splunk token"))
            .respond_with(ResponseTemplate::new(500))
            .mount(&mock_server)
            .await;
        let client = create_client(&None, &ProxyConfig::default()).unwrap();
        let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
        assert_eq!(
            &healthcheck.await.unwrap_err().to_string(),
            "Unexpected status: 500 Internal Server Error"
        );
    }
    #[tokio::test]
    async fn test_build_request_compression_none_returns_expected_request() {
        let endpoint = "http://localhost:8888";
        let token = "token";
        let compression = Compression::None;
        let events = Bytes::from("events");
        let http_request_builder = HttpRequestBuilder::new(
            String::from(endpoint),
            EndpointTarget::default(),
            String::from(token),
            compression,
        );
        let request = http_request_builder
            .build_request(
                events.clone(),
                "/services/collector/event",
                None,
                MetadataFields::default(),
                false,
            )
            .unwrap();
        assert_eq!(
            request.uri(),
            &Uri::from_static("http://localhost:8888/services/collector/event")
        );
        assert_eq!(
            request.headers().get("Content-Type"),
            Some(&HeaderValue::from_static("application/json"))
        );
        assert_eq!(
            request.headers().get("Authorization"),
            Some(&HeaderValue::from_static("Splunk token"))
        );
        assert_eq!(request.headers().get("Content-Encoding"), None);
        assert_eq!(request.body(), &events)
    }
    #[tokio::test]
    async fn test_build_request_compression_gzip_returns_expected_request() {
        let endpoint = "http://localhost:8888";
        let token = "token";
        let compression = Compression::gzip_default();
        let events = Bytes::from("events");
        let http_request_builder = HttpRequestBuilder::new(
            String::from(endpoint),
            EndpointTarget::default(),
            String::from(token),
            compression,
        );
        let request = http_request_builder
            .build_request(
                events.clone(),
                "/services/collector/event",
                None,
                MetadataFields::default(),
                false,
            )
            .unwrap();
        assert_eq!(
            request.uri(),
            &Uri::from_static("http://localhost:8888/services/collector/event")
        );
        assert_eq!(
            request.headers().get("Content-Type"),
            Some(&HeaderValue::from_static("application/json"))
        );
        assert_eq!(
            request.headers().get("Authorization"),
            Some(&HeaderValue::from_static("Splunk token"))
        );
        assert_eq!(
            request.headers().get("Content-Encoding"),
            Some(&HeaderValue::from_static("gzip"))
        );
        assert_eq!(request.body(), &events)
    }
    #[tokio::test]
    async fn test_build_request_uri_invalid_uri_returns_error() {
        let endpoint = "invalid";
        let token = "token";
        let compression = Compression::gzip_default();
        let events = Bytes::from("events");
        let http_request_builder = HttpRequestBuilder::new(
            String::from(endpoint),
            EndpointTarget::default(),
            String::from(token),
            compression,
        );
        let err = http_request_builder
            .build_request(
                events,
                "/services/collector/event",
                None,
                MetadataFields::default(),
                false,
            )
            .unwrap_err();
        assert_eq!(err.to_string(), "URI parse error: invalid format")
    }
    #[test]
    fn test_build_uri() {
        let query = [
            (HOST_FIELD, "zork flork".to_string()),
            (SOURCE_FIELD, "zam".to_string()),
        ];
        let uri = build_uri("http://sproink.com", "/thing/thang", query).unwrap();
        assert_eq!(
            "http://sproink.com/thing/thang?host=zork%20flork&source=zam"
                .parse::<Uri>()
                .unwrap(),
            uri
        );
    }
}
#[cfg(all(test, feature = "splunk-integration-tests"))]
mod integration_tests {
    use std::net::SocketAddr;
    use http::StatusCode;
    use tokio::time::Duration;
    use vector_lib::config::proxy::ProxyConfig;
    use warp::Filter;
    use super::{
        build_healthcheck, create_client,
        integration_test_helpers::{get_token, splunk_hec_address},
    };
    use crate::{
        assert_downcast_matches, sinks::splunk_hec::common::HealthcheckError,
        test_util::retry_until,
    };
    #[tokio::test]
    async fn splunk_healthcheck_ok() {
        let client = create_client(&None, &ProxyConfig::default()).unwrap();
        let address = splunk_hec_address();
        let token = get_token().await;
        retry_until(
            || build_healthcheck(address.clone(), token.clone(), client.clone()),
            Duration::from_millis(500),
            Duration::from_secs(30),
        )
        .await;
    }
    #[tokio::test]
    async fn splunk_healthcheck_server_not_listening() {
        let client = create_client(&None, &ProxyConfig::default()).unwrap();
        let healthcheck = build_healthcheck(
            "http://localhost:1111/".to_string(),
            get_token().await,
            client,
        );
        healthcheck.await.unwrap_err();
    }
    #[tokio::test]
    async fn splunk_healthcheck_server_unavailable() {
        let client = create_client(&None, &ProxyConfig::default()).unwrap();
        let healthcheck = build_healthcheck(
            "http://localhost:5503/".to_string(),
            get_token().await,
            client,
        );
        let unhealthy = warp::any()
            .map(|| warp::reply::with_status("i'm sad", StatusCode::SERVICE_UNAVAILABLE));
        let server = warp::serve(unhealthy).bind("0.0.0.0:5503".parse::<SocketAddr>().unwrap());
        tokio::spawn(server);
        assert_downcast_matches!(
            healthcheck.await.unwrap_err(),
            HealthcheckError,
            HealthcheckError::QueuesFull
        );
    }
}
#[cfg(all(test, feature = "splunk-integration-tests"))]
pub mod integration_test_helpers {
    use serde_json::Value as JsonValue;
    use tokio::time::Duration;
    use crate::test_util::retry_until;
    const USERNAME: &str = "admin";
    const PASSWORD: &str = "password";
    pub fn splunk_hec_address() -> String {
        std::env::var("SPLUNK_HEC_ADDRESS").unwrap_or_else(|_| "http://localhost:8088".into())
    }
    pub fn splunk_api_address() -> String {
        std::env::var("SPLUNK_API_ADDRESS").unwrap_or_else(|_| "https://localhost:8089".into())
    }
    pub async fn get_token() -> String {
        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();
        let res = retry_until(
            || {
                client
                    .get(format!(
                        "{}/services/data/inputs/http?output_mode=json",
                        splunk_api_address()
                    ))
                    .basic_auth(USERNAME, Some(PASSWORD))
                    .send()
            },
            Duration::from_millis(500),
            Duration::from_secs(30),
        )
        .await;
        let json: JsonValue = res.json().await.unwrap();
        let entries = json["entry"].as_array().unwrap().clone();
        if entries.is_empty() {
            panic!("You don't have any HTTP Event Collector inputs set up in Splunk");
        }
        entries[0]["content"]["token"].as_str().unwrap().to_owned()
    }
}