use crate::encoding::BuildError;
use bytes::BytesMut;
use chrono::SecondsFormat;
use csv_core::{WriteResult, Writer, WriterBuilder};
use lookup::lookup_v2::ConfigTargetPath;
use tokio_util::codec::Encoder;
use vector_config_macros::configurable_component;
use vector_core::{
    config::DataType,
    event::{Event, Value},
    schema,
};
/// The quoting strategy applied to fields when writing CSV.
///
/// Each variant is mapped one-to-one onto the `csv_core::QuoteStyle` variant of
/// the same name when the serializer is built.
#[configurable_component]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum QuoteStyle {
    /// Put quotes around every field.
    Always,
    /// Put quotes around a field only when its content requires it, for
    /// example when it contains the quote character or the delimiter.
    ///
    /// This is the default.
    #[default]
    Necessary,
    /// Put quotes around every field that is not numeric.
    NonNumeric,
    /// Never put quotes around a field, even if its content would need them.
    Never,
}
/// Configuration used to build a `CsvSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CsvSerializerConfig {
    /// The CSV-specific serializer options.
    pub csv: CsvSerializerOptions,
}
impl CsvSerializerConfig {
    /// Wraps the given CSV options in a serializer configuration.
    pub const fn new(csv: CsvSerializerOptions) -> Self {
        Self { csv }
    }

    /// Builds a `CsvSerializer` from a copy of this configuration.
    ///
    /// # Errors
    ///
    /// Returns an error when the configured field list is empty, because a
    /// record without columns cannot be written.
    pub fn build(&self) -> Result<CsvSerializer, BuildError> {
        if !self.csv.fields.is_empty() {
            return Ok(CsvSerializer::new(self.clone()));
        }
        Err("At least one CSV field must be specified".into())
    }

    /// The event data type this serializer accepts: logs only.
    pub fn input_type(&self) -> DataType {
        DataType::Log
    }

    /// The schema requirement for incoming events; this serializer imposes none.
    pub fn schema_requirement(&self) -> schema::Requirement {
        schema::Requirement::empty()
    }
}
/// Options that control how the CSV serializer formats each record.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CsvSerializerOptions {
    /// The field delimiter to use when writing CSV.
    ///
    /// Defaults to `,`.
    #[configurable(metadata(docs::type_override = "ascii_char"))]
    #[serde(
        default = "default_delimiter",
        with = "vector_core::serde::ascii_char",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    pub delimiter: u8,
    /// Whether a quote character inside a field is escaped by doubling it.
    ///
    /// Enabled by default. When disabled, the `escape` character is written
    /// in front of the quote instead.
    // NOTE(review): if `is_default` compares against `bool::default()` (`false`),
    // an explicit `false` would be skipped on serialization and read back as the
    // `true` default. `is_default` is not visible here, so confirm before relying
    // on config round-trips.
    #[serde(
        default = "default_double_quote",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    pub double_quote: bool,
    /// The escape character to use when writing CSV.
    ///
    /// Defaults to `"`.
    #[configurable(metadata(docs::type_override = "ascii_char"))]
    #[serde(
        default = "default_escape",
        with = "vector_core::serde::ascii_char",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    pub escape: u8,
    /// The quote character to use when writing CSV.
    ///
    /// Defaults to `"`.
    // Shares `default_escape` because the quote and escape characters have the
    // same default value.
    #[configurable(metadata(docs::type_override = "ascii_char"))]
    #[serde(
        default = "default_escape",
        with = "vector_core::serde::ascii_char",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    pub quote: u8,
    /// The quoting style to use when writing CSV.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub quote_style: QuoteStyle,
    /// Size, in bytes, of the serializer's internal scratch buffer.
    ///
    /// Defaults to 8 KiB. A value of zero is treated as one byte when the
    /// serializer is constructed.
    #[serde(default = "default_capacity")]
    pub capacity: usize,
    /// The event fields to write, in column order.
    ///
    /// A field may be listed more than once. Fields missing from an event are
    /// written as empty columns. At least one field must be configured.
    pub fields: Vec<ConfigTargetPath>,
}
/// Default field delimiter: an ASCII comma.
const fn default_delimiter() -> u8 {
    const COMMA: u8 = b',';
    COMMA
}
/// Default escape character: an ASCII double quote.
const fn default_escape() -> u8 {
    const DOUBLE_QUOTE: u8 = b'"';
    DOUBLE_QUOTE
}
/// Default for the `double_quote` option: enabled.
const fn default_double_quote() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
/// Default size of the internal scratch buffer: 8 KiB.
const fn default_capacity() -> usize {
    const KIB: usize = 1024;
    8 * KIB
}
impl Default for CsvSerializerOptions {
    /// Returns options populated from the per-field default helpers, with an
    /// empty field list.
    fn default() -> Self {
        // The quote and escape characters share one default helper.
        let quote_char = default_escape();
        Self {
            fields: Vec::new(),
            capacity: default_capacity(),
            quote_style: QuoteStyle::default(),
            quote: quote_char,
            escape: quote_char,
            double_quote: default_double_quote(),
            delimiter: default_delimiter(),
        }
    }
}
impl CsvSerializerOptions {
    fn csv_quote_style(&self) -> csv_core::QuoteStyle {
        match self.quote_style {
            QuoteStyle::Always => csv_core::QuoteStyle::Always,
            QuoteStyle::Necessary => csv_core::QuoteStyle::Necessary,
            QuoteStyle::NonNumeric => csv_core::QuoteStyle::NonNumeric,
            QuoteStyle::Never => csv_core::QuoteStyle::Never,
        }
    }
}
/// Serializer that writes the configured fields of a log event as one CSV record.
#[derive(Debug, Clone)]
pub struct CsvSerializer {
    // The `csv_core` writer, configured from the serializer options and reused
    // for every encoded event.
    writer: Box<Writer>,
    // Event fields to emit, in column order.
    fields: Vec<ConfigTargetPath>,
    // Fixed-size scratch space the writer emits into before the bytes are
    // copied to the caller's output buffer.
    internal_buffer: Vec<u8>,
}
impl CsvSerializer {
    /// Creates a serializer from the given configuration.
    ///
    /// The CSV writer is configured with the delimiter, quoting and escaping
    /// options, and a zero-filled scratch buffer of `capacity` bytes is
    /// allocated.
    pub fn new(config: CsvSerializerConfig) -> Self {
        let options = config.csv;

        let mut builder = WriterBuilder::new();
        builder
            .delimiter(options.delimiter)
            .double_quote(options.double_quote)
            .escape(options.escape)
            .quote_style(options.csv_quote_style())
            .quote(options.quote);

        // The scratch buffer must hold at least one byte, so a configured
        // capacity of zero is bumped to one.
        let buffer_len = options.capacity.max(1);

        Self {
            writer: Box::new(builder.build()),
            fields: options.fields,
            internal_buffer: vec![0; buffer_len],
        }
    }
}
impl Encoder<Event> for CsvSerializer {
    type Error = vector_common::Error;

    /// Encodes one log event as a single CSV record and appends it to `buffer`.
    ///
    /// The configured fields are written in order, separated by the configured
    /// delimiter. No record terminator is written. Byte values are written as
    /// lossily decoded UTF-8; integers, floats and booleans use their `Display`
    /// form; timestamps are written as RFC 3339 in UTC. Missing fields, nulls
    /// and every other value kind are written as empty columns.
    ///
    /// The writer emits into the fixed-size internal buffer. Whenever it
    /// reports `OutputFull`, the bytes produced so far are copied to `buffer`
    /// and the internal buffer is reused from the start.
    ///
    /// # Errors
    ///
    /// This implementation currently always returns `Ok(())`.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let log = event.into_log();
        // Bytes at the front of `internal_buffer` not yet copied to `buffer`.
        let mut used_buffer_bytes = 0;

        for (fields_written, field) in self.fields.iter().enumerate() {
            // Every field except the first is preceded by a delimiter.
            if fields_written > 0 {
                loop {
                    let (res, bytes_written) = self
                        .writer
                        .delimiter(&mut self.internal_buffer[used_buffer_bytes..]);
                    used_buffer_bytes += bytes_written;
                    match res {
                        WriteResult::InputEmpty => break,
                        WriteResult::OutputFull => {
                            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
                            used_buffer_bytes = 0;
                        }
                    }
                }
            }

            // Borrow where possible: valid UTF-8 byte values and the constant
            // cases need no allocation; only formatted values are owned.
            let field_value: std::borrow::Cow<'_, str> = match log.get(field) {
                Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes),
                Some(Value::Integer(int)) => int.to_string().into(),
                Some(Value::Float(float)) => float.to_string().into(),
                Some(Value::Boolean(flag)) => {
                    std::borrow::Cow::Borrowed(if *flag { "true" } else { "false" })
                }
                Some(Value::Timestamp(timestamp)) => timestamp
                    .to_rfc3339_opts(SecondsFormat::AutoSi, true)
                    .into(),
                // Null, unsupported value kinds and missing fields all become
                // an empty column.
                Some(_) | None => std::borrow::Cow::Borrowed(""),
            };

            // Feed the field to the writer in as many steps as the internal
            // buffer requires, dropping the consumed prefix each time.
            let mut remaining = field_value.as_bytes();
            loop {
                let (res, bytes_read, bytes_written) = self
                    .writer
                    .field(remaining, &mut self.internal_buffer[used_buffer_bytes..]);
                remaining = &remaining[bytes_read..];
                used_buffer_bytes += bytes_written;
                match res {
                    WriteResult::InputEmpty => break,
                    WriteResult::OutputFull => {
                        buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
                        used_buffer_bytes = 0;
                    }
                }
            }
        }

        // Finish the record so the writer can emit any pending closing quote.
        // NOTE(review): `writer` is reused across events. Whether `finish`
        // resets all per-record state when the last field was not quoted is not
        // visible here. Confirm against csv_core, because stale state could
        // affect the quoting decision for the next event's first field.
        loop {
            let (res, bytes_written) = self
                .writer
                .finish(&mut self.internal_buffer[used_buffer_bytes..]);
            used_buffer_bytes += bytes_written;
            match res {
                WriteResult::InputEmpty => break,
                WriteResult::OutputFull => {
                    buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
                    used_buffer_bytes = 0;
                }
            }
        }

        // Copy out whatever is still sitting in the internal buffer.
        if used_buffer_bytes > 0 {
            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{LogEvent, ObjectMap, Value};
    use super::*;
    /// Builds a log event from `(name, value)` string pairs and returns it
    /// together with the matching field paths, in input order.
    fn make_event_with_fields(field_data: Vec<(&str, &str)>) -> (Vec<ConfigTargetPath>, Event) {
        let mut paths: Vec<ConfigTargetPath> = Vec::new();
        let mut map = ObjectMap::new();

        for (name, value) in field_data {
            let path: ConfigTargetPath = name.into();
            paths.push(path);
            map.insert(name.into(), Value::from(value.to_string()));
        }

        (paths, Event::Log(LogEvent::from(map)))
    }
    /// Building from default options (no fields) must fail with a clear message.
    #[test]
    fn build_error_on_empty_fields() {
        let result = CsvSerializerConfig::new(CsvSerializerOptions::default()).build();
        let message = result.unwrap_err().to_string();
        assert_eq!(message, "At least one CSV field must be specified");
    }
    /// Encodes one event holding every supported value kind and checks the
    /// exact record produced with default options.
    #[test]
    fn serialize_fields() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            // Present in the event but not listed in `fields`, so it must not
            // appear in the output.
            "other" => Value::from("data"),
        }));
        let fields = vec![
            "foo".into(),
            "int".into(),
            "comma".into(),
            "float".into(),
            // Not present in the event: expected to produce an empty column.
            "missing".into(),
            "space".into(),
            "time".into(),
            "quote".into(),
            "bool".into(),
        ];
        let opts = CsvSerializerOptions {
            fields,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();
        serializer.encode(event, &mut bytes).unwrap();
        // The comma and quote fields are quoted, the embedded quotes are
        // doubled, and the timestamp is rendered in UTC.
        assert_eq!(
            bytes.freeze(),
            b"bar,123,\"abc,bcd\",3.1415925,,sp ace,2023-02-27T07:04:49.363Z,\"the \"\"quote\"\" should be escaped\",true".as_slice()
        );
    }
    /// Columns follow the configured field order, a field may repeat, and
    /// unlisted fields are dropped.
    #[test]
    fn serialize_order() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("value1"),
            "field2" => Value::from("value2"),
            "field3" => Value::from("value3"),
            "field4" => Value::from("value4"),
            "field5" => Value::from("value5"),
        }));
        let fields: Vec<ConfigTargetPath> = ["field1", "field5", "field5", "field3", "field2"]
            .iter()
            .map(|&name| name.into())
            .collect();

        let mut serializer = CsvSerializerConfig::new(CsvSerializerOptions {
            fields,
            ..Default::default()
        })
        .build()
        .unwrap();

        let mut out = BytesMut::new();
        serializer.encode(event, &mut out).unwrap();

        assert_eq!(
            out.freeze(),
            b"value1,value5,value5,value3,value2".as_slice()
        );
    }
    /// Encodes the same event under each quote style and checks the output.
    #[test]
    fn correct_quoting() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("hello world"),
            "field2" => Value::from(1),
            "field3" => Value::from("foo\"bar"),
            "field4" => Value::from("baz,bas"),
        }));
        let fields: Vec<ConfigTargetPath> = vec![
            "field1".into(),
            "field2".into(),
            "field3".into(),
            "field4".into(),
        ];

        // Builds a fresh serializer with the given quote style and returns the
        // encoded record.
        let encode_with = |quote_style: QuoteStyle| {
            let mut out = BytesMut::new();
            CsvSerializerConfig::new(CsvSerializerOptions {
                fields: fields.clone(),
                quote_style,
                ..Default::default()
            })
            .build()
            .unwrap()
            .encode(event.clone(), &mut out)
            .unwrap();
            out.freeze()
        };

        assert_eq!(
            encode_with(QuoteStyle::default()),
            b"hello world,1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
        assert_eq!(
            encode_with(QuoteStyle::Never),
            b"hello world,1,foo\"bar,baz,bas".as_slice()
        );
        assert_eq!(
            encode_with(QuoteStyle::Always),
            b"\"hello world\",\"1\",\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
        assert_eq!(
            encode_with(QuoteStyle::NonNumeric),
            b"\"hello world\",1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
    }
    /// A tab delimiter replaces the default comma between fields.
    #[test]
    fn custom_delimiter() {
        let (fields, event) =
            make_event_with_fields(vec![("field1", "value1"), ("field2", "value2")]);

        let mut serializer = CsvSerializerConfig::new(CsvSerializerOptions {
            fields,
            delimiter: b'\t',
            ..Default::default()
        })
        .build()
        .unwrap();

        let mut out = BytesMut::new();
        serializer.encode(event, &mut out).unwrap();

        assert_eq!(out.freeze(), b"value1\tvalue2".as_slice());
    }
    /// With quote doubling disabled, embedded quotes are prefixed with the
    /// configured escape character.
    #[test]
    fn custom_escape_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo\"bar")]);

        let mut serializer = CsvSerializerConfig::new(CsvSerializerOptions {
            fields,
            double_quote: false,
            escape: b'\\',
            ..Default::default()
        })
        .build()
        .unwrap();

        let mut out = BytesMut::new();
        serializer.encode(event, &mut out).unwrap();

        assert_eq!(out.freeze(), b"\"foo\\\"bar\"".as_slice());
    }
    /// A custom quote character wraps the field and is doubled where it occurs
    /// inside it; the default quote character is then left untouched.
    #[test]
    fn custom_quote_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo \" $ bar")]);

        let mut serializer = CsvSerializerConfig::new(CsvSerializerOptions {
            fields,
            quote: b'$',
            ..Default::default()
        })
        .build()
        .unwrap();

        let mut out = BytesMut::new();
        serializer.encode(event, &mut out).unwrap();

        assert_eq!(out.freeze(), b"$foo \" $$ bar$".as_slice());
    }
    /// A field longer than the internal buffer is still written in full,
    /// which exercises the flush-and-continue path.
    #[test]
    fn more_input_then_capacity() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo bar")]);

        // Three bytes of capacity against a seven-byte value.
        let mut serializer = CsvSerializerConfig::new(CsvSerializerOptions {
            fields,
            capacity: 3,
            ..Default::default()
        })
        .build()
        .unwrap();

        let mut out = BytesMut::new();
        serializer.encode(event, &mut out).unwrap();

        assert_eq!(out.freeze(), b"foo bar".as_slice());
    }
    /// Two events encoded through the same serializer are each quoted
    /// independently and appended back to back with no separator.
    #[test]
    fn multiple_events() {
        let (fields, first) = make_event_with_fields(vec![("field1", "foo,")]);
        let (_, second) = make_event_with_fields(vec![("field1", "\nbar")]);

        let mut serializer = CsvSerializerConfig::new(CsvSerializerOptions {
            fields,
            ..Default::default()
        })
        .build()
        .unwrap();

        let mut out = BytesMut::new();
        for event in [first, second] {
            serializer.encode(event, &mut out).unwrap();
        }

        assert_eq!(out.freeze(), b"\"foo,\"\"\nbar\"".as_slice());
    }
}