use bytes::BytesMut;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::{
    encoding::{Error, Framer, Serializer},
    CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
};
use crate::{
    event::Event,
    internal_events::{EncoderFramingError, EncoderSerializeError},
};
#[derive(Debug, Clone)]
pub struct Encoder<Framer>
where
    Framer: Clone,
{
    framer: Framer,
    serializer: Serializer,
}
impl Default for Encoder<Framer> {
    fn default() -> Self {
        Self {
            framer: NewlineDelimitedEncoder::default().into(),
            serializer: TextSerializerConfig::default().build().into(),
        }
    }
}
impl Default for Encoder<()> {
    fn default() -> Self {
        Self {
            framer: (),
            serializer: TextSerializerConfig::default().build().into(),
        }
    }
}
impl<Framer> Encoder<Framer>
where
    Framer: Clone,
{
    pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
        let len = buffer.len();
        let mut payload = buffer.split_off(len);
        self.serialize_at_start(event, &mut payload)?;
        buffer.unsplit(payload);
        Ok(())
    }
    fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
        self.serializer.encode(event, buffer).map_err(|error| {
            emit!(EncoderSerializeError { error: &error });
            Error::SerializingError(error)
        })
    }
}
impl Encoder<Framer> {
    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
        Self { framer, serializer }
    }
    pub const fn framer(&self) -> &Framer {
        &self.framer
    }
    pub const fn serializer(&self) -> &Serializer {
        &self.serializer
    }
    pub const fn batch_prefix(&self) -> &[u8] {
        match (&self.framer, &self.serializer) {
            (
                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
                Serializer::Json(_) | Serializer::NativeJson(_),
            ) => b"[",
            _ => &[],
        }
    }
    pub const fn batch_suffix(&self) -> &[u8] {
        match (&self.framer, &self.serializer) {
            (
                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
                Serializer::Json(_) | Serializer::NativeJson(_),
            ) => b"]",
            _ => &[],
        }
    }
    pub const fn content_type(&self) -> &'static str {
        match (&self.serializer, &self.framer) {
            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
                "application/x-ndjson"
            }
            (
                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
            ) => "application/json",
            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
            (
                Serializer::Avro(_)
                | Serializer::Cef(_)
                | Serializer::Csv(_)
                | Serializer::Gelf(_)
                | Serializer::Json(_)
                | Serializer::Logfmt(_)
                | Serializer::NativeJson(_)
                | Serializer::RawMessage(_)
                | Serializer::Text(_),
                _,
            ) => "text/plain",
        }
    }
}
impl Encoder<()> {
    pub const fn new(serializer: Serializer) -> Self {
        Self {
            framer: (),
            serializer,
        }
    }
    pub const fn serializer(&self) -> &Serializer {
        &self.serializer
    }
}
impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
    type Error = Error;
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let len = buffer.len();
        let mut payload = buffer.split_off(len);
        self.serialize_at_start(event, &mut payload)?;
        self.framer.encode((), &mut payload).map_err(|error| {
            emit!(EncoderFramingError { error: &error });
            Error::FramingError(error)
        })?;
        buffer.unsplit(payload);
        Ok(())
    }
}
impl tokio_util::codec::Encoder<Event> for Encoder<()> {
    type Error = Error;
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let len = buffer.len();
        let mut payload = buffer.split_off(len);
        self.serialize_at_start(event, &mut payload)?;
        buffer.unsplit(payload);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures_util::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_lib::codecs::encoding::BoxedFramingError;
    use vector_lib::event::LogEvent;
    use super::*;
    #[derive(Debug, Clone)]
    struct ParenEncoder;
    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }
    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;
        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            let inner = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(inner);
            dst.put_u8(b')');
            Ok(())
        }
    }
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>(T, usize, usize)
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>;
    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self(encoder, 0, n)
        }
    }
    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;
        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            self.0.encode((), dst)?;
            let result = if self.1 == self.2 {
                Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "error")) as _)
            } else {
                Ok(())
            };
            self.1 += 1;
            result
        }
    }
    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ParenEncoder::new())),
            TextSerializerConfig::default().build().into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("foo")),
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
        ])
        .map(Ok);
        let sink = Vec::new();
        let mut framed = FramedWrite::new(sink, encoder);
        source.forward(&mut framed).await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)(bar)(baz)");
    }
    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ParenEncoder::new())),
            TextSerializerConfig::default().build().into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
            Event::Log(LogEvent::from("bat")),
        ])
        .map(Ok);
        let sink = Vec::from("(foo)");
        let mut framed = FramedWrite::new(sink, encoder);
        source.forward(&mut framed).await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)(bar)(baz)(bat)");
    }
    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
            TextSerializerConfig::default().build().into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("foo")),
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
        ])
        .map(Ok);
        let sink = Vec::new();
        let mut framed = FramedWrite::new(sink, encoder);
        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)");
    }
    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
            TextSerializerConfig::default().build().into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
            Event::Log(LogEvent::from("bat")),
        ])
        .map(Ok);
        let sink = Vec::from("(foo)");
        let mut framed = FramedWrite::new(sink, encoder);
        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)(bar)");
    }
}