use bytes::Bytes;
use chrono::{DateTime, Datelike, Utc};
use derivative::Derivative;
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath};
use smallvec::{smallvec, SmallVec};
use std::borrow::Cow;
use syslog_loose::{IncompleteDate, Message, ProcId, Protocol, Variant};
use vector_config::configurable_component;
use vector_core::config::{LegacyKey, LogNamespace};
use vector_core::{
    config::{log_schema, DataType},
    event::{Event, LogEvent, ObjectMap, Value},
    schema,
};
use vrl::value::{kind::Collection, Kind};
use super::{default_lossy, Deserializer};
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct SyslogDeserializerConfig {
    #[serde(skip)]
    source: Option<&'static str>,
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub syslog: SyslogDeserializerOptions,
}
impl SyslogDeserializerConfig {
    pub fn new(options: SyslogDeserializerOptions) -> Self {
        Self {
            source: None,
            syslog: options,
        }
    }
    pub fn from_source(source: &'static str) -> Self {
        Self {
            source: Some(source),
            ..Default::default()
        }
    }
    pub const fn build(&self) -> SyslogDeserializer {
        SyslogDeserializer {
            source: self.source,
            lossy: self.syslog.lossy,
        }
    }
    pub fn output_type(&self) -> DataType {
        DataType::Log
    }
    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
        match (log_namespace, self.source) {
            (LogNamespace::Legacy, _) => {
                let mut definition = schema::Definition::empty_legacy_namespace()
                    .with_event_field(
                        log_schema().message_key().expect("valid message key"),
                        Kind::bytes(),
                        Some("message"),
                    );
                if let Some(timestamp_key) = log_schema().timestamp_key() {
                    definition = definition.optional_field(
                        timestamp_key,
                        Kind::timestamp(),
                        Some("timestamp"),
                    )
                }
                definition = definition
                    .optional_field(&owned_value_path!("hostname"), Kind::bytes(), Some("host"))
                    .optional_field(
                        &owned_value_path!("severity"),
                        Kind::bytes(),
                        Some("severity"),
                    )
                    .optional_field(&owned_value_path!("facility"), Kind::bytes(), None)
                    .optional_field(&owned_value_path!("version"), Kind::integer(), None)
                    .optional_field(
                        &owned_value_path!("appname"),
                        Kind::bytes(),
                        Some("service"),
                    )
                    .optional_field(&owned_value_path!("msgid"), Kind::bytes(), None)
                    .optional_field(
                        &owned_value_path!("procid"),
                        Kind::integer().or_bytes(),
                        None,
                    )
                    .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())));
                if self.source.is_some() {
                    definition.optional_field(&owned_value_path!("source_ip"), Kind::bytes(), None)
                } else {
                    definition
                }
            }
            (LogNamespace::Vector, None) => {
                schema::Definition::new_with_default_metadata(
                    Kind::object(Collection::empty()),
                    [log_namespace],
                )
                .with_event_field(
                    &owned_value_path!("message"),
                    Kind::bytes(),
                    Some("message"),
                )
                .optional_field(
                    &owned_value_path!("timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .optional_field(&owned_value_path!("hostname"), Kind::bytes(), Some("host"))
                .optional_field(
                    &owned_value_path!("severity"),
                    Kind::bytes(),
                    Some("severity"),
                )
                .optional_field(&owned_value_path!("facility"), Kind::bytes(), None)
                .optional_field(&owned_value_path!("version"), Kind::integer(), None)
                .optional_field(
                    &owned_value_path!("appname"),
                    Kind::bytes(),
                    Some("service"),
                )
                .optional_field(&owned_value_path!("msgid"), Kind::bytes(), None)
                .optional_field(
                    &owned_value_path!("procid"),
                    Kind::integer().or_bytes(),
                    None,
                )
                .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
            }
            (LogNamespace::Vector, Some(source)) => {
                schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
                    .with_meaning(OwnedTargetPath::event_root(), "message")
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("timestamp"),
                        Kind::timestamp(),
                        Some("timestamp"),
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("hostname"),
                        Kind::bytes().or_undefined(),
                        Some("host"),
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("source_ip"),
                        Kind::bytes().or_undefined(),
                        None,
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("severity"),
                        Kind::bytes().or_undefined(),
                        Some("severity"),
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("facility"),
                        Kind::bytes().or_undefined(),
                        None,
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("version"),
                        Kind::integer().or_undefined(),
                        None,
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("appname"),
                        Kind::bytes().or_undefined(),
                        Some("service"),
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("msgid"),
                        Kind::bytes().or_undefined(),
                        None,
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("procid"),
                        Kind::integer().or_bytes().or_undefined(),
                        None,
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("structured_data"),
                        Kind::object(Collection::from_unknown(Kind::object(
                            Collection::from_unknown(Kind::bytes()),
                        ))),
                        None,
                    )
                    .with_source_metadata(
                        source,
                        None,
                        &owned_value_path!("tls_client_metadata"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
                            .or_undefined(),
                        None,
                    )
            }
        }
    }
}
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct SyslogDeserializerOptions {
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,
}
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct SyslogDeserializer {
    pub source: Option<&'static str>,
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,
}
impl Deserializer for SyslogDeserializer {
    fn parse(
        &self,
        bytes: Bytes,
        log_namespace: LogNamespace,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let line: Cow<str> = match self.lossy {
            true => String::from_utf8_lossy(&bytes),
            false => Cow::from(std::str::from_utf8(&bytes)?),
        };
        let line = line.trim();
        let parsed =
            syslog_loose::parse_message_with_year_exact(line, resolve_year, Variant::Either)?;
        let log = match (self.source, log_namespace) {
            (Some(source), LogNamespace::Vector) => {
                let mut log = LogEvent::from(Value::Bytes(Bytes::from(parsed.msg.to_string())));
                insert_metadata_fields_from_syslog(&mut log, source, parsed, log_namespace);
                log
            }
            _ => {
                let mut log = LogEvent::from(Value::Object(ObjectMap::new()));
                insert_fields_from_syslog(&mut log, parsed, log_namespace);
                log
            }
        };
        Ok(smallvec![Event::from(log)])
    }
}
fn resolve_year((month, _date, _hour, _min, _sec): IncompleteDate) -> i32 {
    let now = Utc::now();
    if now.month() == 1 && month == 12 {
        now.year() - 1
    } else {
        now.year()
    }
}
fn insert_metadata_fields_from_syslog(
    log: &mut LogEvent,
    source: &'static str,
    parsed: Message<&str>,
    log_namespace: LogNamespace,
) {
    if let Some(timestamp) = parsed.timestamp {
        let timestamp = DateTime::<Utc>::from(timestamp);
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("timestamp"),
            timestamp,
        );
    }
    if let Some(host) = parsed.hostname {
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("hostname"),
            host.to_string(),
        );
    }
    if let Some(severity) = parsed.severity {
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("severity"),
            severity.as_str().to_owned(),
        );
    }
    if let Some(facility) = parsed.facility {
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("facility"),
            facility.as_str().to_owned(),
        );
    }
    if let Protocol::RFC5424(version) = parsed.protocol {
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("version"),
            version as i64,
        );
    }
    if let Some(app_name) = parsed.appname {
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("appname"),
            app_name.to_owned(),
        );
    }
    if let Some(msg_id) = parsed.msgid {
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("msgid"),
            msg_id.to_owned(),
        );
    }
    if let Some(procid) = parsed.procid {
        let value: Value = match procid {
            ProcId::PID(pid) => pid.into(),
            ProcId::Name(name) => name.to_string().into(),
        };
        log_namespace.insert_source_metadata(
            source,
            log,
            None::<LegacyKey<&OwnedValuePath>>,
            &owned_value_path!("procid"),
            value,
        );
    }
    let mut sdata = ObjectMap::new();
    for element in parsed.structured_data.into_iter() {
        let mut data = ObjectMap::new();
        for (name, value) in element.params() {
            data.insert(name.to_string().into(), value.into());
        }
        sdata.insert(element.id.into(), data.into());
    }
    log_namespace.insert_source_metadata(
        source,
        log,
        None::<LegacyKey<&OwnedValuePath>>,
        &owned_value_path!("structured_data"),
        sdata,
    );
}
fn insert_fields_from_syslog(
    log: &mut LogEvent,
    parsed: Message<&str>,
    log_namespace: LogNamespace,
) {
    match log_namespace {
        LogNamespace::Legacy => {
            log.maybe_insert(log_schema().message_key_target_path(), parsed.msg);
        }
        LogNamespace::Vector => {
            log.insert(event_path!("message"), parsed.msg);
        }
    }
    if let Some(timestamp) = parsed.timestamp {
        let timestamp = DateTime::<Utc>::from(timestamp);
        match log_namespace {
            LogNamespace::Legacy => {
                log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
            }
            LogNamespace::Vector => {
                log.insert(event_path!("timestamp"), timestamp);
            }
        };
    }
    if let Some(host) = parsed.hostname {
        log.insert(event_path!("hostname"), host.to_string());
    }
    if let Some(severity) = parsed.severity {
        log.insert(event_path!("severity"), severity.as_str().to_owned());
    }
    if let Some(facility) = parsed.facility {
        log.insert(event_path!("facility"), facility.as_str().to_owned());
    }
    if let Protocol::RFC5424(version) = parsed.protocol {
        log.insert(event_path!("version"), version as i64);
    }
    if let Some(app_name) = parsed.appname {
        log.insert(event_path!("appname"), app_name.to_owned());
    }
    if let Some(msg_id) = parsed.msgid {
        log.insert(event_path!("msgid"), msg_id.to_owned());
    }
    if let Some(procid) = parsed.procid {
        let value: Value = match procid {
            ProcId::PID(pid) => pid.into(),
            ProcId::Name(name) => name.to_string().into(),
        };
        log.insert(event_path!("procid"), value);
    }
    for element in parsed.structured_data.into_iter() {
        let mut sdata = ObjectMap::new();
        for (name, value) in element.params() {
            sdata.insert(name.to_string().into(), value.into());
        }
        log.insert(event_path!(element.id), sdata);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use vector_core::config::{init_log_schema, log_schema, LogSchema};
    #[test]
    fn deserialize_syslog_legacy_namespace() {
        init();
        let input =
            Bytes::from("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - MSG");
        let deserializer = SyslogDeserializer::default();
        let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "MSG".into()
        );
        assert!(
            events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()
        );
    }
    #[test]
    fn deserialize_syslog_vector_namespace() {
        init();
        let input =
            Bytes::from("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - MSG");
        let deserializer = SyslogDeserializer::default();
        let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].as_log()["message"], "MSG".into());
        assert!(events[0].as_log()["timestamp"].is_timestamp());
    }
    fn init() {
        let mut schema = LogSchema::default();
        schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
            "legacy_message"
        ))));
        schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
            "legacy_timestamp"
        ))));
        init_log_schema(schema, false);
    }
}