use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt};
use indoc::indoc;
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;
use vrl::event_path;
use super::Region;
use crate::{
    codecs::Transformer,
    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
    event::EventArray,
    sinks::{
        elasticsearch::{BulkConfig, ElasticsearchApiVersion, ElasticsearchConfig},
        util::{
            http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
            StreamSink, TowerRequestConfig,
        },
        Healthcheck, VectorSink,
    },
    template::Template,
};
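
/// Configuration for the `sematext_logs` sink.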
#[configurable_component(sink("sematext_logs", "Publish log events to Sematext."))]
#[derive(Clone, Debug)]
pub struct SematextLogsConfig {
    #[serde(default = "super::default_region")]
    #[configurable(derived)]
    region: Region,
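
    /// The endpoint to send data to.
    ///
    /// Setting this option overrides the `region` option.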
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    endpoint: Option<String>,
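
    /// The token that is used to write to Sematext.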
    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
    #[configurable(metadata(docs::examples = "some-sematext-token"))]
    token: SensitiveString,
    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
impl GenerateConfig for SematextLogsConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(indoc! {r#"
            token = "${SEMATEXT_TOKEN}"
        "#})
        .unwrap()
    }
}
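
// Default Sematext Logsene receiver endpoints for the US and EU regions.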
const US_ENDPOINT: &str = "https://logsene-receiver.sematext.com";
const EU_ENDPOINT: &str = "https://logsene-receiver.eu.sematext.com";
#[async_trait::async_trait]
#[typetag::serde(name = "sematext_logs")]
impl SinkConfig for SematextLogsConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
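        // An explicitly configured endpoint takes precedence over the region.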
        let endpoint = match (&self.endpoint, &self.region) {
            (Some(endpoint), _) => endpoint.clone(),
            (None, Region::Us) => US_ENDPOINT.to_owned(),
            (None, Region::Eu) => EU_ENDPOINT.to_owned(),
        };
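
        // The Sematext logs receiver speaks the Elasticsearch bulk API, so this
        // sink delegates to the Elasticsearch sink and uses the write token as
        // the index name.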
        let (sink, healthcheck) = ElasticsearchConfig {
            endpoints: vec![endpoint],
            compression: Compression::None,
            doc_type: "logs".to_string(),
            bulk: BulkConfig {
                index: Template::try_from(self.token.inner())
                    .expect("unable to parse token as Template"),
                ..Default::default()
            },
            batch: self.batch,
            request: RequestConfig {
                tower: self.request,
                ..Default::default()
            },
            encoding: self.encoding.clone(),
            api_version: ElasticsearchApiVersion::V6,
            ..Default::default()
        }
        .build(cx)
        .await?;
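
        // Wrap the Elasticsearch stream so event fields are renamed to what
        // Sematext expects before they are sent.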
        let stream = sink.into_stream();
        let mapped_stream = MapTimestampStream { inner: stream };
        Ok((VectorSink::Stream(Box::new(mapped_stream)), healthcheck))
    }
    fn input(&self) -> Input {
        Input::log()
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
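
/// Wraps the underlying Elasticsearch stream sink so that events can be
/// remapped before they are forwarded to it.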
struct MapTimestampStream {
    inner: Box<dyn StreamSink<EventArray> + Send>,
}
#[async_trait]
impl StreamSink<EventArray> for MapTimestampStream {
    async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
        let mapped_input = input.map(map_timestamp).boxed();
        self.inner.run(mapped_input).await
    }
}
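
/// Maps the event's `timestamp` and `host` fields to the `@timestamp` and
/// `os.host` fields expected by Sematext.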
fn map_timestamp(mut events: EventArray) -> EventArray {
    match &mut events {
        EventArray::Logs(logs) => {
            for log in logs {
                if let Some(path) = log.timestamp_path().cloned().as_ref() {
                    log.rename_key(path, event_path!("@timestamp"));
                }
                if let Some(path) = log.host_path().cloned().as_ref() {
                    log.rename_key(path, event_path!("os.host"));
                }
            }
        }
        _ => unreachable!("This sink only accepts logs"),
    }
    events
}
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use indoc::indoc;
    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{self, HTTP_SINK_TAGS},
            next_addr, random_lines_with_stream,
        },
    };
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SematextLogsConfig>();
    }
    #[tokio::test]
    async fn smoke() {
        let (mut config, cx) = load_sink::<SematextLogsConfig>(indoc! {r#"
            token = "mylogtoken"
        "#})
        .unwrap();
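        // Make sure the config builds with the default region before pointing
        // it at the local test server.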
        _ = config.build(cx.clone()).await.unwrap();
        let addr = next_addr();
        config.endpoint = Some(format!("http://{}", addr));
        let (sink, _) = config.build(cx).await.unwrap();
        let (mut rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);
        let (expected, events) = random_lines_with_stream(100, 10, None);
        components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
        let output = rx.next().await.unwrap();
        let json = serde_json::Deserializer::from_slice(&output.1[..])
            .into_iter::<serde_json::Value>()
            .map(|v| v.expect("decoding json"));
        let mut expected_message_idx = 0;
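        // The Elasticsearch bulk body alternates action lines (even indices,
        // whose `_index` carries the token) with document lines (odd indices).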
        for (i, val) in json.enumerate() {
            if i % 2 == 0 {
                let token = val
                    .get("index")
                    .unwrap()
                    .get("_index")
                    .unwrap()
                    .as_str()
                    .unwrap();
                assert_eq!(token, "mylogtoken");
            } else {
                let message = val.get("message").unwrap().as_str().unwrap();
                assert_eq!(message, &expected[expected_message_idx]);
                expected_message_idx += 1;
            }
        }
    }
}