use bytes::Bytes;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use lookup::{event_path, owned_value_path};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, TimestampSecondsWithFrac};
use smallvec::{smallvec, SmallVec};
use std::collections::HashMap;
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::{
    config::{log_schema, DataType},
    event::Event,
    event::LogEvent,
    schema,
};
use vrl::value::kind::Collection;
use vrl::value::{Kind, Value};
use super::{default_lossy, Deserializer};
use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct GelfDeserializerConfig {
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub gelf: GelfDeserializerOptions,
}
impl GelfDeserializerConfig {
    pub fn new(options: GelfDeserializerOptions) -> Self {
        Self { gelf: options }
    }
    pub fn build(&self) -> GelfDeserializer {
        GelfDeserializer {
            lossy: self.gelf.lossy,
        }
    }
    pub fn output_type(&self) -> DataType {
        DataType::Log
    }
    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
        schema::Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [log_namespace],
        )
        .with_event_field(&owned_value_path!(VERSION), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(HOST), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(SHORT_MESSAGE), Kind::bytes(), None)
        .optional_field(&owned_value_path!(FULL_MESSAGE), Kind::bytes(), None)
        .optional_field(&owned_value_path!(TIMESTAMP), Kind::timestamp(), None)
        .optional_field(&owned_value_path!(LEVEL), Kind::integer(), None)
        .optional_field(&owned_value_path!(FACILITY), Kind::bytes(), None)
        .optional_field(&owned_value_path!(LINE), Kind::integer(), None)
        .optional_field(&owned_value_path!(FILE), Kind::bytes(), None)
        .unknown_fields(Kind::bytes().or_integer().or_float())
    }
}
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializerOptions {
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,
}
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializer {
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,
}
impl GelfDeserializer {
    pub fn new(lossy: bool) -> GelfDeserializer {
        GelfDeserializer { lossy }
    }
    fn message_to_event(&self, parsed: &GelfMessage) -> vector_common::Result<Event> {
        let mut log = LogEvent::from_str_legacy(parsed.short_message.to_string());
        if parsed.version != GELF_VERSION {
            return Err(format!(
                "{} does not match GELF spec version ({})",
                VERSION, GELF_VERSION
            )
            .into());
        }
        log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
        log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());
        if let Some(full_message) = &parsed.full_message {
            log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
        }
        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
            if let Some(timestamp) = parsed.timestamp {
                log.insert(timestamp_key, timestamp);
                } else {
                log.insert(timestamp_key, Utc::now());
            }
        }
        if let Some(level) = parsed.level {
            log.insert(&GELF_TARGET_PATHS.level, level);
        }
        if let Some(facility) = &parsed.facility {
            log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
        }
        if let Some(line) = parsed.line {
            log.insert(
                &GELF_TARGET_PATHS.line,
                Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
            );
        }
        if let Some(file) = &parsed.file {
            log.insert(&GELF_TARGET_PATHS.file, file.to_string());
        }
        if let Some(add) = &parsed.additional_fields {
            for (key, val) in add.iter() {
                if key == "_id" {
                    continue;
                }
                if !key.starts_with('_') {
                    return Err(format!(
                        "'{}' field is invalid. \
                                       Additional field names must be prefixed with an underscore.",
                        key
                    )
                    .into());
                }
                if !VALID_FIELD_REGEX.is_match(key) {
                    return Err(format!("'{}' field contains invalid characters. Field names may \
                                       contain only letters, numbers, underscores, dashes and dots.", key).into());
                }
                if val.is_string() || val.is_number() {
                    let vector_val: Value = val.into();
                    log.insert(event_path!(key.as_str()), vector_val);
                } else {
                    let type_ = match val {
                        serde_json::Value::Null => "null",
                        serde_json::Value::Bool(_) => "boolean",
                        serde_json::Value::Number(_) => "number",
                        serde_json::Value::String(_) => "string",
                        serde_json::Value::Array(_) => "array",
                        serde_json::Value::Object(_) => "object",
                    };
                    return Err(format!("The value type for field {} is an invalid type ({}). Additional field values \
                                       should be either strings or numbers.", key, type_).into());
                }
            }
        }
        Ok(Event::Log(log))
    }
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
struct GelfMessage {
    version: String,
    host: String,
    short_message: String,
    full_message: Option<String>,
    #[serde_as(as = "Option<TimestampSecondsWithFrac<f64>>")]
    timestamp: Option<DateTime<Utc>>,
    level: Option<u8>,
    facility: Option<String>,
    line: Option<f64>,
    file: Option<String>,
    #[serde(flatten)]
    additional_fields: Option<HashMap<String, serde_json::Value>>,
}
impl Deserializer for GelfDeserializer {
    fn parse(
        &self,
        bytes: Bytes,
        _log_namespace: LogNamespace,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let parsed: GelfMessage = match self.lossy {
            true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
            false => serde_json::from_slice(&bytes),
        }?;
        let event = self.message_to_event(&parsed)?;
        Ok(smallvec![event])
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use lookup::event_path;
    use serde_json::json;
    use similar_asserts::assert_eq;
    use smallvec::SmallVec;
    use vector_core::{config::log_schema, event::Event};
    use vrl::value::Value;
    fn deserialize_gelf_input(
        input: &serde_json::Value,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let config = GelfDeserializerConfig::default();
        let deserializer = config.build();
        let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
        deserializer.parse(buffer, LogNamespace::Legacy)
    }
    #[test]
    fn gelf_deserialize_correctness() {
        let add_on_int_in = "_an.add-field_int";
        let add_on_str_in = "_an.add-field_str";
        let input = json!({
            VERSION: "1.1",
            HOST: "example.org",
            SHORT_MESSAGE: "A short message that helps you identify what is going on",
            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
            TIMESTAMP: 1385053862.3072,
            LEVEL: 1,
            FACILITY: "foo",
            LINE: 42,
            FILE: "/tmp/bar",
            add_on_int_in: 2001.1002,
            add_on_str_in: "A Space Odyssey",
        });
        let events = deserialize_gelf_input(&input).unwrap();
        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
            log.get(VERSION),
            Some(&Value::Bytes(Bytes::from_static(b"1.1")))
        );
        assert_eq!(
            log.get(HOST),
            Some(&Value::Bytes(Bytes::from_static(b"example.org")))
        );
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap()),
            Some(&Value::Bytes(Bytes::from_static(
                b"A short message that helps you identify what is going on"
            )))
        );
        assert_eq!(
            log.get(FULL_MESSAGE),
            Some(&Value::Bytes(Bytes::from_static(
                b"Backtrace here\n\nmore stuff"
            )))
        );
        let dt = DateTime::from_timestamp(1385053862, 307_200_000).expect("invalid timestamp");
        assert_eq!(log.get(TIMESTAMP), Some(&Value::Timestamp(dt)));
        assert_eq!(log.get(LEVEL), Some(&Value::Integer(1)));
        assert_eq!(
            log.get(FACILITY),
            Some(&Value::Bytes(Bytes::from_static(b"foo")))
        );
        assert_eq!(
            log.get(LINE),
            Some(&Value::Float(ordered_float::NotNan::new(42.0).unwrap()))
        );
        assert_eq!(
            log.get(FILE),
            Some(&Value::Bytes(Bytes::from_static(b"/tmp/bar")))
        );
        assert_eq!(
            log.get(event_path!(add_on_int_in)),
            Some(&Value::Float(
                ordered_float::NotNan::new(2001.1002).unwrap()
            ))
        );
        assert_eq!(
            log.get(event_path!(add_on_str_in)),
            Some(&Value::Bytes(Bytes::from_static(b"A Space Odyssey")))
        );
    }
    #[test]
    fn gelf_deserializing_edge_cases() {
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(log.contains(log_schema().message_key_target_path().unwrap()));
        }
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
                "_id": "S3creTz",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(!log.contains(event_path!("_id")));
        }
    }
    #[test]
    fn gelf_deserializing_err() {
        fn validate_err(input: &serde_json::Value) {
            assert!(deserialize_gelf_input(input).is_err());
        }
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_bad%key": "raboof",
        }));
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "bad-key": "raboof",
        }));
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
        }));
        validate_err(&json!({
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));
        validate_err(&json!({
            HOST: 42,
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LEVEL: "baz",
        }));
    }
}