use std::path::PathBuf;
use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use prost_reflect::{DynamicMessage, MessageDescriptor};
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::event::LogEvent;
use vector_core::{
    config::{log_schema, DataType, LogNamespace},
    event::Event,
    schema,
};
use vrl::value::Kind;
use super::Deserializer;
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ProtobufDeserializerConfig {
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub protobuf: ProtobufDeserializerOptions,
}
impl ProtobufDeserializerConfig {
    pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
        ProtobufDeserializer::try_from(self)
    }
    pub fn output_type(&self) -> DataType {
        DataType::Log
    }
    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
        match log_namespace {
            LogNamespace::Legacy => {
                let mut definition =
                    schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
                if let Some(timestamp_key) = log_schema().timestamp_key() {
                    definition = definition.try_with_field(
                        timestamp_key,
                        Kind::any().or_timestamp(),
                        Some("timestamp"),
                    );
                }
                definition
            }
            LogNamespace::Vector => {
                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
            }
        }
    }
}
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct ProtobufDeserializerOptions {
    pub desc_file: PathBuf,
    pub message_type: String,
}
#[derive(Debug, Clone)]
pub struct ProtobufDeserializer {
    message_descriptor: MessageDescriptor,
}
impl ProtobufDeserializer {
    pub fn new(message_descriptor: MessageDescriptor) -> Self {
        Self { message_descriptor }
    }
}
impl Deserializer for ProtobufDeserializer {
    fn parse(
        &self,
        bytes: Bytes,
        log_namespace: LogNamespace,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let dynamic_message = DynamicMessage::decode(self.message_descriptor.clone(), bytes)
            .map_err(|error| format!("Error parsing protobuf: {:?}", error))?;
        let proto_vrl =
            vrl::protobuf::proto_to_value(&prost_reflect::Value::Message(dynamic_message), None)?;
        let mut event = Event::Log(LogEvent::from(proto_vrl));
        let event = match log_namespace {
            LogNamespace::Vector => event,
            LogNamespace::Legacy => {
                let timestamp = Utc::now();
                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
                    let log = event.as_mut_log();
                    if !log.contains(timestamp_key) {
                        log.insert(timestamp_key, timestamp);
                    }
                }
                event
            }
        };
        Ok(smallvec![event])
    }
}
impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
    type Error = vector_common::Error;
    fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
        let message_descriptor = vrl::protobuf::get_message_descriptor(
            &config.protobuf.desc_file,
            &config.protobuf.message_type,
        )?;
        Ok(Self::new(message_descriptor))
    }
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::{env, fs};
    use vector_core::config::log_schema;
    use super::*;
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }
    fn parse_and_validate(
        protobuf_bin_message: String,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let input = Bytes::from(protobuf_bin_message);
        let message_descriptor =
            vrl::protobuf::get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);
        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();
            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }
            assert_eq!(events.next(), None);
        }
    }
    #[test]
    fn deserialize_protobuf() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };
        parse_and_validate(
            fs::read_to_string(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }
    #[test]
    fn deserialize_protobuf3() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone3.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };
        parse_and_validate(
            fs::read_to_string(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }
    #[test]
    fn deserialize_empty_buffer() {
        let protobuf_bin_message = "".to_string();
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert!(!log.contains("name"));
            assert!(!log.contains("id"));
            assert!(!log.contains("email"));
            assert!(!log.contains("phones"));
        };
        parse_and_validate(
            protobuf_bin_message,
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }
    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = vrl::protobuf::get_message_descriptor(
            &test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.Person",
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);
        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }
}