use bytes::{Bytes, BytesMut};
use derivative::Derivative;
use tokio_util::codec::Decoder;
use vector_config::configurable_component;
use crate::common::length_delimited::LengthDelimitedCoderOptions;
use super::BoxedFramingError;
/// Configuration used to build a [`LengthDelimitedDecoder`].
#[configurable_component]
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct LengthDelimitedDecoderConfig {
    /// Options for the length-delimited decoder.
    // Left out of serialized output whenever `vector_core::serde::is_default`
    // returns true for the value.
    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
    pub length_delimited: LengthDelimitedCoderOptions,
}
impl LengthDelimitedDecoderConfig {
    /// Creates a [`LengthDelimitedDecoder`] from the `length_delimited`
    /// options stored in this configuration.
    pub fn build(&self) -> LengthDelimitedDecoder {
        let options = &self.length_delimited;
        LengthDelimitedDecoder::new(options)
    }
}
/// A decoder for byte frames that are preceded by a length header.
///
/// This is a newtype around [`tokio_util::codec::LengthDelimitedCodec`], which
/// performs the actual framing.
#[derive(Debug, Clone)]
pub struct LengthDelimitedDecoder(tokio_util::codec::LengthDelimitedCodec);
impl LengthDelimitedDecoder {
    /// Creates a new `LengthDelimitedDecoder` wrapping the codec returned by
    /// `config.build_codec()`.
    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
        let codec = config.build_codec();
        Self(codec)
    }
}
impl Default for LengthDelimitedDecoder {
    /// Creates a decoder wrapping a codec obtained from
    /// `LengthDelimitedCodec::new()`.
    fn default() -> Self {
        use tokio_util::codec::LengthDelimitedCodec;

        let codec = LengthDelimitedCodec::new();
        Self(codec)
    }
}
impl Decoder for LengthDelimitedDecoder {
    type Item = Bytes;
    type Error = BoxedFramingError;

    /// Attempts to decode the next frame from `src` by delegating to the
    /// wrapped codec.
    ///
    /// A frame produced by the codec is frozen into immutable [`Bytes`]; an
    /// error from the codec is converted into a [`BoxedFramingError`].
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let codec = &mut self.0;
        match codec.decode(src) {
            Ok(frame) => Ok(frame.map(BytesMut::freeze)),
            Err(error) => Err(error.into()),
        }
    }

    /// Same as [`decode`](Self::decode), but delegates to the wrapped codec's
    /// `decode_eof`, the variant used once no more input will arrive.
    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let codec = &mut self.0;
        match codec.decode_eof(src) {
            Ok(frame) => Ok(frame.map(BytesMut::freeze)),
            Err(error) => Err(error.into()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A single frame with the default 4-byte header is yielded once, after
    // which the buffer is drained.
    #[test]
    fn decode_frame() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo");
        let mut framer = LengthDelimitedDecoder::default();

        let frame = framer.decode(&mut buf).unwrap().unwrap();
        assert_eq!(frame, "foo");

        let rest = framer.decode(&mut buf).unwrap();
        assert_eq!(rest, None);
    }

    // The header width can be narrowed to two bytes.
    #[test]
    fn decode_frame_2byte_length() {
        let mut buf = BytesMut::from("\x00\x03foo");
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            ..Default::default()
        };
        let mut framer = LengthDelimitedDecoder::new(&options);

        let frame = framer.decode(&mut buf).unwrap().unwrap();
        assert_eq!(frame, "foo");

        let rest = framer.decode(&mut buf).unwrap();
        assert_eq!(rest, None);
    }

    // The header may be encoded in little-endian byte order.
    #[test]
    fn decode_frame_little_endian() {
        let mut buf = BytesMut::from("\x03\x00\x00\x00foo");
        let options = LengthDelimitedCoderOptions {
            length_field_is_big_endian: false,
            ..Default::default()
        };
        let mut framer = LengthDelimitedDecoder::new(&options);

        let frame = framer.decode(&mut buf).unwrap().unwrap();
        assert_eq!(frame, "foo");

        let rest = framer.decode(&mut buf).unwrap();
        assert_eq!(rest, None);
    }

    // A two-byte header located two bytes into the frame.
    #[test]
    fn decode_frame_2byte_length_with_offset() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo");
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            length_field_offset: 2,
            ..Default::default()
        };
        let mut framer = LengthDelimitedDecoder::new(&options);

        let frame = framer.decode(&mut buf).unwrap().unwrap();
        assert_eq!(frame, "foo");

        let rest = framer.decode(&mut buf).unwrap();
        assert_eq!(rest, None);
    }

    // `decode` yields nothing (and does not fail) when the payload is shorter
    // than the header announces.
    #[test]
    fn decode_frame_ignore_unexpected_eof() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03fo");
        let mut framer = LengthDelimitedDecoder::default();

        let pending = framer.decode(&mut buf).unwrap();
        assert_eq!(pending, None);
    }

    // `decode` yields nothing for trailing bytes that are too short to form a
    // header.
    #[test]
    fn decode_frame_ignore_exceeding_bytes_without_header() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03fooo");
        let mut framer = LengthDelimitedDecoder::default();

        let frame = framer.decode(&mut buf).unwrap().unwrap();
        assert_eq!(frame, "foo");

        let rest = framer.decode(&mut buf).unwrap();
        assert_eq!(rest, None);
    }

    // `decode` yields nothing when the input is too short to contain a header.
    #[test]
    fn decode_frame_ignore_missing_header() {
        let mut buf = BytesMut::from("foo");
        let mut framer = LengthDelimitedDecoder::default();

        let pending = framer.decode(&mut buf).unwrap();
        assert_eq!(pending, None);
    }

    // Back-to-back frames are yielded one per call, in order.
    #[test]
    fn decode_frames() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
        let mut framer = LengthDelimitedDecoder::default();

        let first = framer.decode(&mut buf).unwrap().unwrap();
        assert_eq!(first, "foo");

        let second = framer.decode(&mut buf).unwrap().unwrap();
        assert_eq!(second, "bar");

        let rest = framer.decode(&mut buf).unwrap();
        assert_eq!(rest, None);
    }

    // `decode_eof` yields a complete frame and then reports a drained buffer.
    #[test]
    fn decode_eof_frame() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo");
        let mut framer = LengthDelimitedDecoder::default();

        let frame = framer.decode_eof(&mut buf).unwrap().unwrap();
        assert_eq!(frame, "foo");

        let rest = framer.decode_eof(&mut buf).unwrap();
        assert_eq!(rest, None);
    }

    // Unlike `decode`, `decode_eof` fails on a truncated payload.
    #[test]
    fn decode_eof_frame_unexpected_eof() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03fo");
        let mut framer = LengthDelimitedDecoder::default();

        let outcome = framer.decode_eof(&mut buf);
        assert!(outcome.is_err());
    }

    // Unlike `decode`, `decode_eof` fails on leftover bytes that follow a
    // complete frame.
    #[test]
    fn decode_eof_frame_exceeding_bytes_without_header() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03fooo");
        let mut framer = LengthDelimitedDecoder::default();

        let frame = framer.decode_eof(&mut buf).unwrap().unwrap();
        assert_eq!(frame, "foo");

        let outcome = framer.decode_eof(&mut buf);
        assert!(outcome.is_err());
    }

    // Unlike `decode`, `decode_eof` fails when the input is too short to
    // contain a header.
    #[test]
    fn decode_eof_frame_missing_header() {
        let mut buf = BytesMut::from("foo");
        let mut framer = LengthDelimitedDecoder::default();

        let outcome = framer.decode_eof(&mut buf);
        assert!(outcome.is_err());
    }

    // `decode_eof` yields back-to-back frames one per call, in order.
    #[test]
    fn decode_eof_frames() {
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
        let mut framer = LengthDelimitedDecoder::default();

        let first = framer.decode_eof(&mut buf).unwrap().unwrap();
        assert_eq!(first, "foo");

        let second = framer.decode_eof(&mut buf).unwrap().unwrap();
        assert_eq!(second, "bar");

        let rest = framer.decode_eof(&mut buf).unwrap();
        assert_eq!(rest, None);
    }
}