use crate::encoding::BuildError;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AvroSerializerConfig {
    pub avro: AvroSerializerOptions,
}
impl AvroSerializerConfig {
    pub const fn new(schema: String) -> Self {
        Self {
            avro: AvroSerializerOptions { schema },
        }
    }
    pub fn build(&self) -> Result<AvroSerializer, BuildError> {
        let schema = apache_avro::Schema::parse_str(&self.avro.schema)
            .map_err(|error| format!("Failed building Avro serializer: {}", error))?;
        Ok(AvroSerializer { schema })
    }
    pub fn input_type(&self) -> DataType {
        DataType::Log
    }
    pub fn schema_requirement(&self) -> schema::Requirement {
        schema::Requirement::empty()
    }
}
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroSerializerOptions {
    #[configurable(metadata(
        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#
    ))]
    #[configurable(metadata(docs::human_name = "Schema JSON"))]
    pub schema: String,
}
#[derive(Debug, Clone)]
pub struct AvroSerializer {
    schema: apache_avro::Schema,
}
impl AvroSerializer {
    pub const fn new(schema: apache_avro::Schema) -> Self {
        Self { schema }
    }
}
impl Encoder<Event> for AvroSerializer {
    type Error = vector_common::Error;
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let log = event.into_log();
        let value = apache_avro::to_value(log)?;
        let value = value.resolve(&self.schema)?;
        let bytes = apache_avro::to_avro_datum(&self.schema, value)?;
        buffer.put_slice(&bytes);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use indoc::indoc;
    use vector_core::event::{LogEvent, Value};
    use vrl::btreemap;
    use super::*;
    #[test]
    fn serialize_avro() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }));
        let schema = indoc! {r#"
            {
                "type": "record",
                "name": "Log",
                "fields": [
                    {
                        "name": "foo",
                        "type": ["string"]
                    }
                ]
            }
        "#}
        .to_owned();
        let config = AvroSerializerConfig::new(schema);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();
        serializer.encode(event, &mut bytes).unwrap();
        assert_eq!(bytes.freeze(), b"\0\x06bar".as_slice());
    }
}