pub mod format;
pub mod framing;
use std::fmt::Debug;
use bytes::BytesMut;
pub use format::{
    AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
    CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
    JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer,
    LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
    NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig,
    ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer,
    TextSerializerConfig,
};
pub use framing::{
    BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
    CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
    LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
};
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
/// A boxed, thread-safe (`Send + Sync`), `'static` dynamic error.
///
/// Judging by the name, this is meant for failures that happen while building an
/// encoder from its configuration; it is the same type that
/// `SerializerConfig::build` returns as its error.
pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// An error produced by one of the two encoding stages defined in this module.
#[derive(Debug)]
pub enum Error {
    /// A framing failure; wraps the boxed error type used by `Framer`'s encoder impl.
    FramingError(BoxedFramingError),
    /// A serialization failure; wraps the error type used by `Serializer`'s encoder impl.
    SerializingError(vector_common::Error),
}
impl std::fmt::Display for Error {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::FramingError(error) => write!(formatter, "FramingError({})", error),
            Self::SerializingError(error) => write!(formatter, "SerializingError({})", error),
        }
    }
}
// Marks `Error` as a standard error type. No methods are overridden, so the
// trait's defaults apply (in particular, `source()` is not customised here).
impl std::error::Error for Error {}
impl From<std::io::Error> for Error {
    fn from(error: std::io::Error) -> Self {
        Self::FramingError(Box::new(error))
    }
}
/// Configuration selecting which framer to build.
///
/// Serialized as an internally tagged enum: the `method` field holds the
/// snake_case variant name (for example `character_delimited`).
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Builds a `BytesEncoder`; takes no options.
    Bytes,
    /// Builds a `CharacterDelimitedEncoder` from the wrapped configuration.
    CharacterDelimited(CharacterDelimitedEncoderConfig),
    /// Builds a `LengthDelimitedEncoder` from the wrapped configuration.
    LengthDelimited(LengthDelimitedEncoderConfig),
    /// Builds a `NewlineDelimitedEncoder`; takes no options.
    NewlineDelimited,
}
impl From<BytesEncoderConfig> for FramingConfig {
    fn from(_: BytesEncoderConfig) -> Self {
        Self::Bytes
    }
}
impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
    fn from(config: CharacterDelimitedEncoderConfig) -> Self {
        Self::CharacterDelimited(config)
    }
}
impl From<LengthDelimitedEncoderConfig> for FramingConfig {
    fn from(config: LengthDelimitedEncoderConfig) -> Self {
        Self::LengthDelimited(config)
    }
}
impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
    fn from(_: NewlineDelimitedEncoderConfig) -> Self {
        Self::NewlineDelimited
    }
}
impl FramingConfig {
    /// Builds the [`Framer`] variant that corresponds to this configuration.
    ///
    /// The option-less variants build their encoder from the matching unit
    /// config value; the others delegate to the wrapped config's `build`.
    pub fn build(&self) -> Framer {
        match self {
            Self::Bytes => {
                let encoder = BytesEncoderConfig.build();
                Framer::Bytes(encoder)
            }
            Self::CharacterDelimited(options) => {
                let encoder = options.build();
                Framer::CharacterDelimited(encoder)
            }
            Self::LengthDelimited(options) => {
                let encoder = options.build();
                Framer::LengthDelimited(encoder)
            }
            Self::NewlineDelimited => {
                let encoder = NewlineDelimitedEncoderConfig.build();
                Framer::NewlineDelimited(encoder)
            }
        }
    }
}
/// A framer: one of the built-in framing encoders or a boxed custom one.
///
/// Its `tokio_util::codec::Encoder<()>` impl forwards to whichever encoder
/// the active variant wraps.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Wraps a `BytesEncoder`.
    Bytes(BytesEncoder),
    /// Wraps a `CharacterDelimitedEncoder`.
    CharacterDelimited(CharacterDelimitedEncoder),
    /// Wraps a `LengthDelimitedEncoder`.
    LengthDelimited(LengthDelimitedEncoder),
    /// Wraps a `NewlineDelimitedEncoder`.
    NewlineDelimited(NewlineDelimitedEncoder),
    /// Wraps a boxed framer supplied by the caller.
    Boxed(BoxedFramer),
}
impl From<BytesEncoder> for Framer {
    fn from(encoder: BytesEncoder) -> Self {
        Self::Bytes(encoder)
    }
}
impl From<CharacterDelimitedEncoder> for Framer {
    fn from(encoder: CharacterDelimitedEncoder) -> Self {
        Self::CharacterDelimited(encoder)
    }
}
impl From<LengthDelimitedEncoder> for Framer {
    fn from(encoder: LengthDelimitedEncoder) -> Self {
        Self::LengthDelimited(encoder)
    }
}
impl From<NewlineDelimitedEncoder> for Framer {
    fn from(encoder: NewlineDelimitedEncoder) -> Self {
        Self::NewlineDelimited(encoder)
    }
}
impl From<BoxedFramer> for Framer {
    fn from(encoder: BoxedFramer) -> Self {
        Self::Boxed(encoder)
    }
}
impl tokio_util::codec::Encoder<()> for Framer {
    type Error = BoxedFramingError;
    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
        match self {
            Framer::Bytes(framer) => framer.encode((), buffer),
            Framer::CharacterDelimited(framer) => framer.encode((), buffer),
            Framer::LengthDelimited(framer) => framer.encode((), buffer),
            Framer::NewlineDelimited(framer) => framer.encode((), buffer),
            Framer::Boxed(framer) => framer.encode((), buffer),
        }
    }
}
/// Configuration selecting which serializer to build.
///
/// Serialized as an internally tagged enum: the `codec` field holds the
/// snake_case variant name (for example `native_json`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Builds an `AvroSerializer`.
    Avro {
        /// Avro-specific options; `build` reads the schema from here.
        avro: AvroSerializerOptions,
    },
    /// Builds a `CefSerializer` from the wrapped configuration.
    Cef(
        CefSerializerConfig,
    ),
    /// Builds a `CsvSerializer` from the wrapped configuration.
    Csv(CsvSerializerConfig),
    /// Builds a `GelfSerializer`; takes no options.
    Gelf,
    /// Builds a `JsonSerializer` from the wrapped configuration.
    Json(JsonSerializerConfig),
    /// Builds a `LogfmtSerializer`; takes no options.
    Logfmt,
    /// Builds a `NativeSerializer`; takes no options.
    Native,
    /// Builds a `NativeJsonSerializer`; takes no options.
    NativeJson,
    /// Builds a `ProtobufSerializer` from the wrapped configuration.
    Protobuf(ProtobufSerializerConfig),
    /// Builds a `RawMessageSerializer`; takes no options.
    RawMessage,
    /// Builds a `TextSerializer` from the wrapped configuration.
    Text(TextSerializerConfig),
}
impl From<AvroSerializerConfig> for SerializerConfig {
    fn from(config: AvroSerializerConfig) -> Self {
        Self::Avro { avro: config.avro }
    }
}
impl From<CefSerializerConfig> for SerializerConfig {
    fn from(config: CefSerializerConfig) -> Self {
        Self::Cef(config)
    }
}
impl From<CsvSerializerConfig> for SerializerConfig {
    fn from(config: CsvSerializerConfig) -> Self {
        Self::Csv(config)
    }
}
impl From<GelfSerializerConfig> for SerializerConfig {
    fn from(_: GelfSerializerConfig) -> Self {
        Self::Gelf
    }
}
impl From<JsonSerializerConfig> for SerializerConfig {
    fn from(config: JsonSerializerConfig) -> Self {
        Self::Json(config)
    }
}
impl From<LogfmtSerializerConfig> for SerializerConfig {
    fn from(_: LogfmtSerializerConfig) -> Self {
        Self::Logfmt
    }
}
impl From<NativeSerializerConfig> for SerializerConfig {
    fn from(_: NativeSerializerConfig) -> Self {
        Self::Native
    }
}
impl From<NativeJsonSerializerConfig> for SerializerConfig {
    fn from(_: NativeJsonSerializerConfig) -> Self {
        Self::NativeJson
    }
}
impl From<ProtobufSerializerConfig> for SerializerConfig {
    fn from(config: ProtobufSerializerConfig) -> Self {
        Self::Protobuf(config)
    }
}
impl From<RawMessageSerializerConfig> for SerializerConfig {
    fn from(_: RawMessageSerializerConfig) -> Self {
        Self::RawMessage
    }
}
impl From<TextSerializerConfig> for SerializerConfig {
    fn from(config: TextSerializerConfig) -> Self {
        Self::Text(config)
    }
}
impl SerializerConfig {
    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
        match self {
            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
                AvroSerializerConfig::new(avro.schema.clone()).build()?,
            )),
            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
            SerializerConfig::Gelf => Ok(Serializer::Gelf(GelfSerializerConfig::new().build())),
            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
            SerializerConfig::NativeJson => {
                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
            }
            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
            SerializerConfig::RawMessage => {
                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
            }
            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
        }
    }
    pub fn default_stream_framing(&self) -> FramingConfig {
        match self {
            SerializerConfig::Avro { .. }
            | SerializerConfig::Native
            | SerializerConfig::Protobuf(_) => {
                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
            }
            SerializerConfig::Cef(_)
            | SerializerConfig::Csv(_)
            | SerializerConfig::Json(_)
            | SerializerConfig::Logfmt
            | SerializerConfig::NativeJson
            | SerializerConfig::RawMessage
            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
            SerializerConfig::Gelf => {
                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
            }
        }
    }
    pub fn input_type(&self) -> DataType {
        match self {
            SerializerConfig::Avro { avro } => {
                AvroSerializerConfig::new(avro.schema.clone()).input_type()
            }
            SerializerConfig::Cef(config) => config.input_type(),
            SerializerConfig::Csv(config) => config.input_type(),
            SerializerConfig::Gelf { .. } => GelfSerializerConfig::input_type(),
            SerializerConfig::Json(config) => config.input_type(),
            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
            SerializerConfig::Native => NativeSerializerConfig.input_type(),
            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
            SerializerConfig::Protobuf(config) => config.input_type(),
            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
            SerializerConfig::Text(config) => config.input_type(),
        }
    }
    pub fn schema_requirement(&self) -> schema::Requirement {
        match self {
            SerializerConfig::Avro { avro } => {
                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
            }
            SerializerConfig::Cef(config) => config.schema_requirement(),
            SerializerConfig::Csv(config) => config.schema_requirement(),
            SerializerConfig::Gelf { .. } => GelfSerializerConfig::schema_requirement(),
            SerializerConfig::Json(config) => config.schema_requirement(),
            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
            SerializerConfig::Protobuf(config) => config.schema_requirement(),
            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
            SerializerConfig::Text(config) => config.schema_requirement(),
        }
    }
}
/// A serializer: one of the built-in event serializers.
///
/// Its `tokio_util::codec::Encoder<Event>` impl forwards to whichever
/// serializer the active variant wraps.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Wraps an `AvroSerializer`.
    Avro(AvroSerializer),
    /// Wraps a `CefSerializer`.
    Cef(CefSerializer),
    /// Wraps a `CsvSerializer`.
    Csv(CsvSerializer),
    /// Wraps a `GelfSerializer`.
    Gelf(GelfSerializer),
    /// Wraps a `JsonSerializer`.
    Json(JsonSerializer),
    /// Wraps a `LogfmtSerializer`.
    Logfmt(LogfmtSerializer),
    /// Wraps a `NativeSerializer`.
    Native(NativeSerializer),
    /// Wraps a `NativeJsonSerializer`.
    NativeJson(NativeJsonSerializer),
    /// Wraps a `ProtobufSerializer`.
    Protobuf(ProtobufSerializer),
    /// Wraps a `RawMessageSerializer`.
    RawMessage(RawMessageSerializer),
    /// Wraps a `TextSerializer`.
    Text(TextSerializer),
}
impl Serializer {
    /// Reports whether [`Serializer::to_json_value`] can produce a value for
    /// this serializer.
    ///
    /// Only the GELF, JSON and native-JSON serializers return `true`. The match
    /// is kept exhaustive so that adding a variant forces a decision here.
    pub fn supports_json(&self) -> bool {
        match self {
            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
            Serializer::Avro(_)
            | Serializer::Cef(_)
            | Serializer::Csv(_)
            | Serializer::Logfmt(_)
            | Serializer::Text(_)
            | Serializer::Native(_)
            | Serializer::Protobuf(_)
            | Serializer::RawMessage(_) => false,
        }
    }

    /// Converts `event` into a `serde_json::Value` via the wrapped serializer.
    ///
    /// # Errors
    ///
    /// Returns the wrapped serializer's error if its conversion fails. Also
    /// returns an error with the message `Serializer does not support JSON`
    /// when called on a serializer for which [`Serializer::supports_json`] is
    /// `false`; this case is reported through the existing `Result` instead of
    /// aborting the caller with a panic.
    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
        match self {
            Serializer::Gelf(serializer) => serializer.to_json_value(event),
            Serializer::Json(serializer) => serializer.to_json_value(event),
            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
            Serializer::Avro(_)
            | Serializer::Cef(_)
            | Serializer::Csv(_)
            | Serializer::Logfmt(_)
            | Serializer::Text(_)
            | Serializer::Native(_)
            | Serializer::Protobuf(_)
            | Serializer::RawMessage(_) => {
                // NOTE(review): relies on `vector_common::Error` being a boxed
                // `dyn Error + Send + Sync`, which converts from `&str`.
                Err("Serializer does not support JSON".into())
            }
        }
    }
}
impl From<AvroSerializer> for Serializer {
    fn from(serializer: AvroSerializer) -> Self {
        Self::Avro(serializer)
    }
}
impl From<CefSerializer> for Serializer {
    fn from(serializer: CefSerializer) -> Self {
        Self::Cef(serializer)
    }
}
impl From<CsvSerializer> for Serializer {
    fn from(serializer: CsvSerializer) -> Self {
        Self::Csv(serializer)
    }
}
impl From<GelfSerializer> for Serializer {
    fn from(serializer: GelfSerializer) -> Self {
        Self::Gelf(serializer)
    }
}
impl From<JsonSerializer> for Serializer {
    fn from(serializer: JsonSerializer) -> Self {
        Self::Json(serializer)
    }
}
impl From<LogfmtSerializer> for Serializer {
    fn from(serializer: LogfmtSerializer) -> Self {
        Self::Logfmt(serializer)
    }
}
impl From<NativeSerializer> for Serializer {
    fn from(serializer: NativeSerializer) -> Self {
        Self::Native(serializer)
    }
}
impl From<NativeJsonSerializer> for Serializer {
    fn from(serializer: NativeJsonSerializer) -> Self {
        Self::NativeJson(serializer)
    }
}
impl From<ProtobufSerializer> for Serializer {
    fn from(serializer: ProtobufSerializer) -> Self {
        Self::Protobuf(serializer)
    }
}
impl From<RawMessageSerializer> for Serializer {
    fn from(serializer: RawMessageSerializer) -> Self {
        Self::RawMessage(serializer)
    }
}
impl From<TextSerializer> for Serializer {
    fn from(serializer: TextSerializer) -> Self {
        Self::Text(serializer)
    }
}
impl tokio_util::codec::Encoder<Event> for Serializer {
    type Error = vector_common::Error;
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        match self {
            Serializer::Avro(serializer) => serializer.encode(event, buffer),
            Serializer::Cef(serializer) => serializer.encode(event, buffer),
            Serializer::Csv(serializer) => serializer.encode(event, buffer),
            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
            Serializer::Json(serializer) => serializer.encode(event, buffer),
            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
            Serializer::Native(serializer) => serializer.encode(event, buffer),
            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
            Serializer::Text(serializer) => serializer.encode(event, buffer),
        }
    }
}