use std::time::SystemTime;
use bytes::Bytes;
use futures::{FutureExt, SinkExt};
use http::{Request, StatusCode, Uri};
use serde_json::json;
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;
use vrl::event_path;
use vrl::value::{Kind, Value};
use crate::{
    codecs::Transformer,
    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
    event::Event,
    http::{Auth, HttpClient},
    schema,
    sinks::util::{
        http::{HttpEventEncoder, HttpSink, PartitionHttpSink},
        BatchConfig, BoxedRawValue, JsonArrayBuffer, PartitionBuffer, PartitionInnerBuffer,
        RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, UriSerde,
    },
    template::{Template, TemplateRenderingError},
};
/// Path of the ingestion route; appended to the configured endpoint when request URIs are built.
const PATH: &str = "/logs/ingest";
/// Configuration for the deprecated `logdna` sink.
///
/// This is a thin newtype around [`MezmoConfig`]; the sink was renamed to `mezmo` and this
/// wrapper keeps the old component name working.
#[configurable_component(sink("logdna", "Deliver log event data to LogDNA."))]
#[configurable(metadata(
    deprecated = "The `logdna` sink has been renamed. Please use `mezmo` instead."
))]
#[derive(Clone, Debug)]
pub struct LogdnaConfig(MezmoConfig);
impl GenerateConfig for LogdnaConfig {
    fn generate_config() -> toml::Value {
        <MezmoConfig as GenerateConfig>::generate_config()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "logdna")]
impl SinkConfig for LogdnaConfig {
    async fn build(
        &self,
        cx: SinkContext,
    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
        warn!("DEPRECATED: The `logdna` sink has been renamed. Please use `mezmo` instead.");
        self.0.build(cx).await
    }
    fn input(&self) -> Input {
        self.0.input()
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        self.0.acknowledgements()
    }
}
/// Configuration for the `mezmo` (formerly `logdna`) sink.
#[configurable_component(sink("mezmo", "Deliver log event data to Mezmo."))]
#[derive(Clone, Debug)]
pub struct MezmoConfig {
    /// The ingestion API key.
    ///
    /// It is sent as the user name of HTTP basic authentication, with an empty password.
    #[configurable(metadata(docs::examples = "${LOGDNA_API_KEY}"))]
    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
    api_key: SensitiveString,
    /// The HTTP endpoint to send logs to.
    ///
    /// The ingestion path is appended to this value. The legacy name `host` is accepted as an
    /// alias, and the default is `https://logs.mezmo.com`.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "http://example.com"))]
    endpoint: UriSerde,
    /// The hostname that is attached to each batch of events.
    ///
    /// This is a template rendered per event; events are partitioned by the rendered value,
    /// which is sent as the `hostname` query parameter.
    #[configurable(metadata(docs::examples = "${HOSTNAME}"))]
    #[configurable(metadata(docs::examples = "my-local-machine"))]
    hostname: Template,
    /// The MAC address that is attached to each batch of events (`mac` query parameter).
    #[configurable(metadata(docs::examples = "my-mac-address"))]
    #[configurable(metadata(docs::human_name = "MAC Address"))]
    mac: Option<String>,
    /// The IP address that is attached to each batch of events (`ip` query parameter).
    #[configurable(metadata(docs::examples = "0.0.0.0"))]
    #[configurable(metadata(docs::human_name = "IP Address"))]
    ip: Option<String>,
    /// The tags that are attached to each batch of events.
    ///
    /// Each tag is a template rendered per event; the rendered tags are joined with `,` and
    /// sent as the `tags` query parameter.
    #[configurable(metadata(docs::examples = "tag1"))]
    #[configurable(metadata(docs::examples = "tag2"))]
    tags: Option<Vec<Template>>,
    /// Transformations applied to each event before it is encoded.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,
    /// The default app that is set for events that do not contain a `file` or `app` field.
    #[serde(default = "default_app")]
    #[configurable(metadata(docs::examples = "my-app"))]
    default_app: String,
    /// The default environment that is set for events that do not contain an `env` field.
    #[serde(default = "default_env")]
    #[configurable(metadata(docs::examples = "staging"))]
    default_env: String,
    /// Batching settings; they provide the buffer size and the batch timeout used by the sink.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
    /// Settings for outgoing HTTP requests, converted into the sink's request settings.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,
    /// End-to-end acknowledgement settings; accepted either as a boolean or as a struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
/// Default ingestion endpoint (`https://logs.mezmo.com`) without any authentication attached.
fn default_endpoint() -> UriSerde {
    let uri = Uri::from_static("https://logs.mezmo.com");
    UriSerde { uri, auth: None }
}
/// Default value for the `default_app` option.
fn default_app() -> String {
    String::from("vector")
}
/// Default value for the `default_env` option.
fn default_env() -> String {
    String::from("production")
}
impl GenerateConfig for MezmoConfig {
    /// Produces a minimal example configuration containing only `hostname` and `api_key`.
    fn generate_config() -> toml::Value {
        let example = r#"hostname = "hostname"
            api_key = "${LOGDNA_API_KEY}""#;
        toml::from_str(example).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "mezmo")]
impl SinkConfig for MezmoConfig {
    async fn build(
        &self,
        cx: SinkContext,
    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
        let request_settings = self.request.into_settings();
        let batch_settings = self.batch.into_batch_settings()?;
        let client = HttpClient::new(None, cx.proxy())?;
        let sink = PartitionHttpSink::new(
            self.clone(),
            PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
            request_settings,
            batch_settings.timeout,
            client.clone(),
        )
        .sink_map_err(|error| error!(message = "Fatal mezmo sink error.", %error));
        let healthcheck = healthcheck(self.clone(), client).boxed();
        #[allow(deprecated)]
        Ok((super::VectorSink::from_event_sink(sink), healthcheck))
    }
    fn input(&self) -> Input {
        let requirement = schema::Requirement::empty()
            .optional_meaning("timestamp", Kind::timestamp())
            .optional_meaning("message", Kind::bytes());
        Input::log().with_schema_requirement(requirement)
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
/// Key used to partition events into separate requests.
///
/// Both values are sent as query parameters of the request, so events that render to different
/// values are kept in different batches.
#[derive(Hash, Eq, PartialEq, Clone)]
pub struct PartitionKey {
    /// Rendered `hostname` template.
    hostname: String,
    /// Rendered `tags` templates, or `None` when no tags are configured.
    tags: Option<Vec<String>>,
}
/// Encodes events into the JSON line objects sent to the ingestion endpoint, paired with the
/// partition key they belong to.
pub struct MezmoEventEncoder {
    /// Template rendered per event to obtain the partition hostname.
    hostname: Template,
    /// Optional templates rendered per event to obtain the partition tags.
    tags: Option<Vec<Template>>,
    /// Transformations applied to each event before encoding.
    transformer: Transformer,
    /// App name used when an event has neither an `app` nor a `file` field.
    default_app: String,
    /// Environment used when an event has no `env` field.
    default_env: String,
}
impl MezmoEventEncoder {
    /// Renders the `hostname` and `tags` templates against `event`.
    ///
    /// On failure, returns the rendering error together with the name of the offending field:
    /// `Some("hostname")` for the hostname template, `None` for a tag template. Rendering
    /// stops at the first failing template.
    fn render_key(
        &self,
        event: &Event,
    ) -> Result<PartitionKey, (Option<&str>, TemplateRenderingError)> {
        let hostname = match self.hostname.render_string(event) {
            Ok(rendered) => rendered,
            Err(error) => return Err((Some("hostname"), error)),
        };

        let tags = match &self.tags {
            None => None,
            Some(templates) => {
                // Collecting into a `Result` short-circuits on the first template that fails.
                let rendered = templates
                    .iter()
                    .map(|template| template.render_string(event))
                    .collect::<Result<Vec<_>, _>>();
                match rendered {
                    Ok(rendered) => Some(rendered),
                    Err(error) => return Err((None, error)),
                }
            }
        };

        Ok(PartitionKey { hostname, tags })
    }
}
impl HttpEventEncoder<PartitionInnerBuffer<serde_json::Value, PartitionKey>> for MezmoEventEncoder {
    /// Converts an event into a JSON line object tagged with its partition key.
    ///
    /// Returns `None` (after emitting a `TemplateRenderingError` internal event) when the
    /// partition key cannot be rendered. Otherwise the object carries `line`, `timestamp`,
    /// `env`, and `app` and/or `file`; whatever remains in the log is nested under `meta`.
    fn encode_event(
        &mut self,
        mut event: Event,
    ) -> Option<PartitionInnerBuffer<serde_json::Value, PartitionKey>> {
        let key = match self.render_key(&event) {
            Ok(key) => key,
            Err((field, error)) => {
                emit!(crate::internal_events::TemplateRenderingError {
                    error,
                    field,
                    drop_event: true,
                });
                return None;
            }
        };

        self.transformer.transform(&mut event);
        let mut log = event.into_log();

        // The paths are cloned so that the log can be mutated while removing the values.
        let message_path = log.message_path().cloned();
        let line = match message_path.as_ref().and_then(|path| log.remove(path)) {
            Some(message) => message,
            None => String::from("").into(),
        };

        let timestamp_path = log.timestamp_path().cloned();
        let timestamp: Value = match timestamp_path.and_then(|path| log.remove(&path)) {
            Some(timestamp) => timestamp,
            None => chrono::Utc::now().into(),
        };

        let env = log.remove(event_path!("env"));
        let app = log.remove(event_path!("app"));
        let file = log.remove(event_path!("file"));
        let needs_default_env = env.is_none();
        let needs_default_app = app.is_none() && file.is_none();

        let mut object = serde_json::map::Map::new();
        object.insert("line".to_owned(), json!(line));
        object.insert("timestamp".to_owned(), json!(timestamp));

        // Values present on the event come first, in `env`, `app`, `file` order.
        for (name, value) in [("env", env), ("app", app), ("file", file)] {
            if let Some(value) = value {
                object.insert(name.to_owned(), json!(value));
            }
        }

        // Configured defaults fill in whatever the event did not provide.
        if needs_default_env {
            object.insert("env".to_owned(), json!(self.default_env));
        }
        if needs_default_app {
            object.insert("app".to_owned(), json!(self.default_app.as_str()));
        }

        if !log.is_empty_object() {
            object.insert("meta".into(), json!(&log));
        }

        Some(PartitionInnerBuffer::new(
            serde_json::Value::Object(object),
            key,
        ))
    }
}
impl HttpSink for MezmoConfig {
    type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
    type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;
    type Encoder = MezmoEventEncoder;

    /// Creates an encoder carrying the templates, transformer and defaults of this config.
    fn build_encoder(&self) -> Self::Encoder {
        MezmoEventEncoder {
            hostname: self.hostname.clone(),
            tags: self.tags.clone(),
            transformer: self.encoding.clone(),
            default_app: self.default_app.clone(),
            default_env: self.default_env.clone(),
        }
    }

    /// Builds the `POST` request for one batch of encoded events.
    ///
    /// The partition key's hostname and tags, the current time in milliseconds, and the
    /// optional `mac` / `ip` settings are sent as query parameters. The events are sent as
    /// `{"lines": [...]}` and the API key is applied as basic-auth user name.
    ///
    /// # Errors
    ///
    /// Returns an error, instead of panicking, if the system clock is set before the Unix
    /// epoch, if the body cannot be serialized, or if the request cannot be constructed.
    async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Bytes>> {
        let (events, key) = output.into_parts();
        let mut query = url::form_urlencoded::Serializer::new(String::new());

        // A misconfigured clock is an environment problem, so report it rather than panic.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_err(|error| format!("System clock is set before the Unix epoch: {}", error))?
            .as_millis();

        query.append_pair("hostname", &key.hostname);
        query.append_pair("now", &now.to_string());

        if let Some(mac) = &self.mac {
            query.append_pair("mac", mac);
        }

        if let Some(ip) = &self.ip {
            query.append_pair("ip", ip);
        }

        if let Some(tags) = &key.tags {
            let tags = tags.join(",");
            query.append_pair("tags", &tags);
        }

        let query = query.finish();

        let body = crate::serde::json::to_bytes(&json!({
            "lines": events,
        }))?
        .freeze();

        let uri = self.build_uri(&query);

        let mut request = Request::builder()
            .uri(uri)
            .method("POST")
            .header("Content-Type", "application/json")
            .body(body)?;

        let auth = Auth::Basic {
            user: self.api_key.inner().to_string(),
            password: SensitiveString::default(),
        };

        auth.apply(&mut request);

        Ok(request)
    }
}
impl MezmoConfig {
    /// Builds the ingestion URI: the configured endpoint, followed by `PATH` and `?query`.
    ///
    /// # Panics
    ///
    /// Panics if the assembled string is not a valid URI.
    fn build_uri(&self, query: &str) -> Uri {
        // `Uri`'s `Display` renders an empty path as `/`, so a bare endpoint such as
        // `https://logs.mezmo.com` is printed with a trailing slash. Trim it (and any
        // user-supplied trailing slash) so that appending `PATH` does not yield `//logs/ingest`.
        let endpoint = self.endpoint.uri.to_string();
        let base = endpoint.trim_end_matches('/');

        let uri = format!("{}{}?{}", base, PATH, query);

        uri.parse::<http::Uri>()
            .expect("This should be a valid uri")
    }
}
/// Sends an empty `POST` to the ingestion URI and inspects the response status.
///
/// Fails on any 5xx response and on `403 Forbidden`; every other status is treated as healthy.
/// Transport errors from the client are propagated.
// NOTE(review): the request is sent without the API key, yet the 403 message refers to the
// token — confirm whether authentication should be applied here.
async fn healthcheck(config: MezmoConfig, client: HttpClient) -> crate::Result<()> {
    let request = Request::post(config.build_uri(""))
        .body(hyper::Body::empty())
        .unwrap();

    let status = client.send(request).await?.status();

    if status.is_server_error() {
        Err("Server returned a server error".into())
    } else if status == StatusCode::FORBIDDEN {
        Err("Token is not valid, 403 returned.".into())
    } else {
        Ok(())
    }
}
// Unit and smoke tests for the `mezmo` sink: config generation, event encoding, and
// end-to-end delivery against a local test HTTP server.
#[cfg(test)]
mod tests {
    use futures::{channel::mpsc, StreamExt};
    use futures_util::stream;
    use http::{request::Parts, StatusCode};
    use serde_json::json;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server_status, load_sink},
        test_util::{
            components::{assert_sink_compliance, HTTP_SINK_TAGS},
            next_addr, random_lines,
        },
    };
    // Runs the shared generate-config test helper for `MezmoConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MezmoConfig>();
    }
    // Checks how `app`, `file` and `env` are taken from the event or filled from defaults.
    #[test]
    fn encode_event() {
        let (config, _cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            hostname = "vector"
            default_env = "acceptance"
            codec.except_fields = ["magic"]
        "#,
        )
        .unwrap();
        let mut encoder = config.build_encoder();
        // event1: explicit `app`, plus a `magic` field listed in `except_fields`.
        let mut event1 = Event::Log(LogEvent::from("hello world"));
        event1.as_mut_log().insert("app", "notvector");
        event1.as_mut_log().insert("magic", "vector");
        // event2: explicit `file`, no `app`.
        let mut event2 = Event::Log(LogEvent::from("hello world"));
        event2.as_mut_log().insert("file", "log.txt");
        // event3: neither `app`, `file` nor `env`; defaults are expected.
        let event3 = Event::Log(LogEvent::from("hello world"));
        // event4: explicit `env` that must win over `default_env`.
        let mut event4 = Event::Log(LogEvent::from("hello world"));
        event4.as_mut_log().insert("env", "staging");
        let event1_out = encoder.encode_event(event1).unwrap().into_parts().0;
        let event1_out = event1_out.as_object().unwrap();
        let event2_out = encoder.encode_event(event2).unwrap().into_parts().0;
        let event2_out = event2_out.as_object().unwrap();
        let event3_out = encoder.encode_event(event3).unwrap().into_parts().0;
        let event3_out = event3_out.as_object().unwrap();
        let event4_out = encoder.encode_event(event4).unwrap().into_parts().0;
        let event4_out = event4_out.as_object().unwrap();
        assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
        assert_eq!(event2_out.get("file").unwrap(), &json!("log.txt"));
        assert_eq!(event3_out.get("app").unwrap(), &json!("vector"));
        assert_eq!(event3_out.get("env").unwrap(), &json!("acceptance"));
        assert_eq!(event4_out.get("env").unwrap(), &json!("staging"));
    }
    /// Runs the sink against a local test server that answers with `status_code`, sending ten
    /// random lines alternating between two hostnames, and asserts that the batch notifier
    /// reports `batch_status`.
    ///
    /// Returns the hostnames, the lines expected per hostname partition, and the receiver on
    /// which the test server publishes the requests it saw.
    async fn smoke_start(
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> (
        Vec<&'static str>,
        Vec<Vec<String>>,
        mpsc::Receiver<(Parts, bytes::Bytes)>,
    ) {
        let (mut config, cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            ip = "127.0.0.1"
            mac = "some-mac-addr"
            hostname = "{{ hostname }}"
            tags = ["test","maybeanothertest"]
        "#,
        )
        .unwrap();
        // Make sure the config builds with the default endpoint before it is overridden.
        _ = config.build(cx.clone()).await.unwrap();
        let addr = next_addr();
        // Point the sink at the local test server.
        let endpoint = UriSerde {
            uri: format!("http://{}", addr).parse::<http::Uri>().unwrap(),
            auth: None,
        };
        config.endpoint = endpoint;
        let (sink, _) = config.build(cx).await.unwrap();
        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);
        let lines = random_lines(100).take(10).collect::<Vec<_>>();
        let mut events = Vec::new();
        let hosts = vec!["host0", "host1"];
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let mut partitions = vec![Vec::new(), Vec::new()];
        // Alternate events between the two hosts and remember which line went where.
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.as_str()).with_batch_notifier(&batch);
            let p = i % 2;
            event.insert("hostname", hosts[p]);
            partitions[p].push(line.into());
            events.push(Event::Log(event));
        }
        // Release the local notifier handle so that only the events keep the batch alive.
        drop(batch);
        let events = stream::iter(events).map(Into::into);
        sink.run(events).await.expect("Running sink failed");
        assert_eq!(receiver.try_recv(), Ok(batch_status));
        (hosts, partitions, rx)
    }
    // A 403 response must reject the batch; nothing is expected to be pending on the
    // request channel afterwards.
    #[tokio::test]
    async fn smoke_fails() {
        let (_hosts, _partitions, mut rx) =
            smoke_start(StatusCode::FORBIDDEN, BatchStatus::Rejected).await;
        assert!(matches!(rx.try_next(), Err(mpsc::TryRecvError { .. })));
    }
    // A 200 response must deliver the batch: one request per hostname partition, each with
    // the expected query parameters and line contents.
    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (hosts, partitions, mut rx) =
                smoke_start(StatusCode::OK, BatchStatus::Delivered).await;
            for _ in 0..partitions.len() {
                let output = rx.next().await.unwrap();
                let request = &output.0;
                let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();
                let query = request.uri.query().unwrap();
                // Identify which partition this request belongs to from its `hostname` parameter.
                let (p, host) = hosts
                    .iter()
                    .enumerate()
                    .find(|(_, host)| query.contains(&format!("hostname={}", host)))
                    .expect("invalid hostname");
                let lines = &partitions[p];
                assert!(query.contains("ip=127.0.0.1"));
                assert!(query.contains("mac=some-mac-addr"));
                // The tags are joined with `,`, which is percent-encoded as `%2C`.
                assert!(query.contains("tags=test%2Cmaybeanothertest"));
                let output = body
                    .as_object()
                    .unwrap()
                    .get("lines")
                    .unwrap()
                    .as_array()
                    .unwrap();
                for (i, line) in output.iter().enumerate() {
                    let line = line.as_object().unwrap();
                    assert_eq!(line.get("app").unwrap(), &json!("vector"));
                    assert_eq!(line.get("env").unwrap(), &json!("production"));
                    assert_eq!(line.get("line").unwrap(), &json!(lines[i]));
                    // The `hostname` field stays on the event and therefore ends up in `meta`.
                    assert_eq!(
                        line.get("meta").unwrap(),
                        &json!({
                            "hostname": host,
                        })
                    );
                }
            }
        })
        .await;
    }
}