pub mod format;
pub mod framing;
use std::fmt::Debug;
use bytes::BytesMut;
pub use format::{
AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer,
LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig,
ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer,
TextSerializerConfig,
};
pub use framing::{
BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
};
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
/// A boxed, type-erased error that is `Send + Sync + 'static`.
///
/// Structurally the same type that `SerializerConfig::build` returns on failure.
pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Error type covering the two failure sources modelled in this module: framing and
/// serializing.
#[derive(Debug)]
pub enum Error {
/// A framing failure; wraps the boxed framing error.
FramingError(BoxedFramingError),
/// A serialization failure; wraps the underlying `vector_common::Error`.
SerializingError(vector_common::Error),
}
impl std::fmt::Display for Error {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::FramingError(error) => write!(formatter, "FramingError({})", error),
Self::SerializingError(error) => write!(formatter, "SerializingError({})", error),
}
}
}
/// Marks [`Error`] as a standard error type. No methods are overridden, so the trait's
/// defaults apply.
impl std::error::Error for Error {}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Self::FramingError(Box::new(error))
}
}
/// Configuration selecting which framer `FramingConfig::build` constructs.
///
/// Per the serde attributes below, the enum is internally tagged by a `method` field whose
/// values are the snake_case variant names.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
/// Builds a [`BytesEncoder`]; carries no options.
Bytes,
/// Builds a [`CharacterDelimitedEncoder`] from the wrapped configuration.
CharacterDelimited(CharacterDelimitedEncoderConfig),
/// Builds a [`LengthDelimitedEncoder`] from the wrapped configuration.
LengthDelimited(LengthDelimitedEncoderConfig),
/// Builds a [`NewlineDelimitedEncoder`]; carries no options.
NewlineDelimited,
}
impl From<BytesEncoderConfig> for FramingConfig {
fn from(_: BytesEncoderConfig) -> Self {
Self::Bytes
}
}
impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
fn from(config: CharacterDelimitedEncoderConfig) -> Self {
Self::CharacterDelimited(config)
}
}
impl From<LengthDelimitedEncoderConfig> for FramingConfig {
fn from(config: LengthDelimitedEncoderConfig) -> Self {
Self::LengthDelimited(config)
}
}
impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
fn from(_: NewlineDelimitedEncoderConfig) -> Self {
Self::NewlineDelimited
}
}
impl FramingConfig {
    /// Constructs the [`Framer`] variant that corresponds to this configuration.
    ///
    /// The data-less variants (`Bytes`, `NewlineDelimited`) build their encoder from the
    /// matching unit config value; the other variants build from the wrapped configuration.
    pub fn build(&self) -> Framer {
        match self {
            Self::Bytes => {
                let encoder = BytesEncoderConfig.build();
                Framer::Bytes(encoder)
            }
            Self::CharacterDelimited(options) => {
                let encoder = options.build();
                Framer::CharacterDelimited(encoder)
            }
            Self::LengthDelimited(options) => {
                let encoder = options.build();
                Framer::LengthDelimited(encoder)
            }
            Self::NewlineDelimited => {
                let encoder = NewlineDelimitedEncoderConfig.build();
                Framer::NewlineDelimited(encoder)
            }
        }
    }
}
/// A framer: one of the concrete framing encoders, or a boxed one.
///
/// The `Encoder<()>` implementation for this type forwards to the wrapped value.
#[derive(Debug, Clone)]
pub enum Framer {
/// Wraps a [`BytesEncoder`].
Bytes(BytesEncoder),
/// Wraps a [`CharacterDelimitedEncoder`].
CharacterDelimited(CharacterDelimitedEncoder),
/// Wraps a [`LengthDelimitedEncoder`].
LengthDelimited(LengthDelimitedEncoder),
/// Wraps a [`NewlineDelimitedEncoder`].
NewlineDelimited(NewlineDelimitedEncoder),
/// Wraps a [`BoxedFramer`], i.e. a framer that is not one of the variants above.
Boxed(BoxedFramer),
}
impl From<BytesEncoder> for Framer {
fn from(encoder: BytesEncoder) -> Self {
Self::Bytes(encoder)
}
}
impl From<CharacterDelimitedEncoder> for Framer {
fn from(encoder: CharacterDelimitedEncoder) -> Self {
Self::CharacterDelimited(encoder)
}
}
impl From<LengthDelimitedEncoder> for Framer {
fn from(encoder: LengthDelimitedEncoder) -> Self {
Self::LengthDelimited(encoder)
}
}
impl From<NewlineDelimitedEncoder> for Framer {
fn from(encoder: NewlineDelimitedEncoder) -> Self {
Self::NewlineDelimited(encoder)
}
}
impl From<BoxedFramer> for Framer {
fn from(encoder: BoxedFramer) -> Self {
Self::Boxed(encoder)
}
}
impl tokio_util::codec::Encoder<()> for Framer {
type Error = BoxedFramingError;
fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
match self {
Framer::Bytes(framer) => framer.encode((), buffer),
Framer::CharacterDelimited(framer) => framer.encode((), buffer),
Framer::LengthDelimited(framer) => framer.encode((), buffer),
Framer::NewlineDelimited(framer) => framer.encode((), buffer),
Framer::Boxed(framer) => framer.encode((), buffer),
}
}
}
/// Configuration selecting which serializer `SerializerConfig::build` constructs.
///
/// Per the serde attributes below, the enum is internally tagged by a `codec` field whose
/// values are the snake_case variant names.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
/// Builds an [`AvroSerializer`].
Avro {
/// Avro options; the `schema` field is what gets passed to `AvroSerializerConfig::new`.
avro: AvroSerializerOptions,
},
/// Builds a [`CefSerializer`] from the wrapped configuration.
Cef(
CefSerializerConfig,
),
/// Builds a [`CsvSerializer`] from the wrapped configuration.
Csv(CsvSerializerConfig),
/// Builds a [`GelfSerializer`]; carries no options.
Gelf,
/// Builds a [`JsonSerializer`] from the wrapped configuration.
Json(JsonSerializerConfig),
/// Builds a [`LogfmtSerializer`]; carries no options.
Logfmt,
/// Builds a [`NativeSerializer`]; carries no options.
Native,
/// Builds a [`NativeJsonSerializer`]; carries no options.
NativeJson,
/// Builds a [`ProtobufSerializer`] from the wrapped configuration.
Protobuf(ProtobufSerializerConfig),
/// Builds a [`RawMessageSerializer`]; carries no options.
RawMessage,
/// Builds a [`TextSerializer`] from the wrapped configuration.
Text(TextSerializerConfig),
}
impl From<AvroSerializerConfig> for SerializerConfig {
fn from(config: AvroSerializerConfig) -> Self {
Self::Avro { avro: config.avro }
}
}
impl From<CefSerializerConfig> for SerializerConfig {
fn from(config: CefSerializerConfig) -> Self {
Self::Cef(config)
}
}
impl From<CsvSerializerConfig> for SerializerConfig {
fn from(config: CsvSerializerConfig) -> Self {
Self::Csv(config)
}
}
impl From<GelfSerializerConfig> for SerializerConfig {
fn from(_: GelfSerializerConfig) -> Self {
Self::Gelf
}
}
impl From<JsonSerializerConfig> for SerializerConfig {
fn from(config: JsonSerializerConfig) -> Self {
Self::Json(config)
}
}
impl From<LogfmtSerializerConfig> for SerializerConfig {
fn from(_: LogfmtSerializerConfig) -> Self {
Self::Logfmt
}
}
impl From<NativeSerializerConfig> for SerializerConfig {
fn from(_: NativeSerializerConfig) -> Self {
Self::Native
}
}
impl From<NativeJsonSerializerConfig> for SerializerConfig {
fn from(_: NativeJsonSerializerConfig) -> Self {
Self::NativeJson
}
}
impl From<ProtobufSerializerConfig> for SerializerConfig {
fn from(config: ProtobufSerializerConfig) -> Self {
Self::Protobuf(config)
}
}
impl From<RawMessageSerializerConfig> for SerializerConfig {
fn from(_: RawMessageSerializerConfig) -> Self {
Self::RawMessage
}
}
impl From<TextSerializerConfig> for SerializerConfig {
fn from(config: TextSerializerConfig) -> Self {
Self::Text(config)
}
}
impl SerializerConfig {
    /// Builds the [`Serializer`] described by this configuration.
    ///
    /// The Avro serializer is built from a fresh `AvroSerializerConfig` created from a clone
    /// of the configured schema.
    ///
    /// # Errors
    ///
    /// Propagates the build error of the Avro, CEF, CSV, and Protobuf serializer
    /// configurations. The remaining codecs are constructed without a fallible step.
    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let serializer = match self {
            Self::Avro { avro } => {
                let config = AvroSerializerConfig::new(avro.schema.clone());
                Serializer::Avro(config.build()?)
            }
            Self::Cef(options) => Serializer::Cef(options.build()?),
            Self::Csv(options) => Serializer::Csv(options.build()?),
            Self::Gelf => Serializer::Gelf(GelfSerializerConfig::new().build()),
            Self::Json(options) => Serializer::Json(options.build()),
            Self::Logfmt => Serializer::Logfmt(LogfmtSerializerConfig.build()),
            Self::Native => Serializer::Native(NativeSerializerConfig.build()),
            Self::NativeJson => Serializer::NativeJson(NativeJsonSerializerConfig.build()),
            Self::Protobuf(options) => Serializer::Protobuf(options.build()?),
            Self::RawMessage => Serializer::RawMessage(RawMessageSerializerConfig.build()),
            Self::Text(options) => Serializer::Text(options.build()),
        };
        Ok(serializer)
    }

    /// Returns the framing configuration used by default for this codec on stream-style
    /// outputs.
    ///
    /// * Avro, Native and Protobuf: length-delimited framing with default options.
    /// * GELF: character-delimited framing created with the argument `0`.
    /// * Everything else: newline-delimited framing.
    pub fn default_stream_framing(&self) -> FramingConfig {
        match self {
            Self::Gelf => {
                // NOTE(review): `0` is presumably the NUL byte used as delimiter; confirm
                // against `CharacterDelimitedEncoderConfig::new`.
                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
            }
            Self::Avro { .. } | Self::Native | Self::Protobuf(_) => {
                let options = LengthDelimitedEncoderConfig::default();
                FramingConfig::LengthDelimited(options)
            }
            Self::Cef(_)
            | Self::Csv(_)
            | Self::Json(_)
            | Self::Logfmt
            | Self::NativeJson
            | Self::RawMessage
            | Self::Text(_) => FramingConfig::NewlineDelimited,
        }
    }

    /// Returns the [`DataType`] reported by the serializer configuration that corresponds to
    /// this variant.
    pub fn input_type(&self) -> DataType {
        match self {
            Self::Avro { avro } => {
                let config = AvroSerializerConfig::new(avro.schema.clone());
                config.input_type()
            }
            Self::Cef(options) => options.input_type(),
            Self::Csv(options) => options.input_type(),
            Self::Gelf => GelfSerializerConfig::input_type(),
            Self::Json(options) => options.input_type(),
            Self::Logfmt => LogfmtSerializerConfig.input_type(),
            Self::Native => NativeSerializerConfig.input_type(),
            Self::NativeJson => NativeJsonSerializerConfig.input_type(),
            Self::Protobuf(options) => options.input_type(),
            Self::RawMessage => RawMessageSerializerConfig.input_type(),
            Self::Text(options) => options.input_type(),
        }
    }

    /// Returns the [`schema::Requirement`] reported by the serializer configuration that
    /// corresponds to this variant.
    pub fn schema_requirement(&self) -> schema::Requirement {
        match self {
            Self::Avro { avro } => {
                let config = AvroSerializerConfig::new(avro.schema.clone());
                config.schema_requirement()
            }
            Self::Cef(options) => options.schema_requirement(),
            Self::Csv(options) => options.schema_requirement(),
            Self::Gelf => GelfSerializerConfig::schema_requirement(),
            Self::Json(options) => options.schema_requirement(),
            Self::Logfmt => LogfmtSerializerConfig.schema_requirement(),
            Self::Native => NativeSerializerConfig.schema_requirement(),
            Self::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
            Self::Protobuf(options) => options.schema_requirement(),
            Self::RawMessage => RawMessageSerializerConfig.schema_requirement(),
            Self::Text(options) => options.schema_requirement(),
        }
    }
}
/// A serializer: one of the concrete format serializers.
///
/// The `Encoder<Event>` implementation for this type forwards to the wrapped value.
#[derive(Debug, Clone)]
pub enum Serializer {
/// Wraps an [`AvroSerializer`].
Avro(AvroSerializer),
/// Wraps a [`CefSerializer`].
Cef(CefSerializer),
/// Wraps a [`CsvSerializer`].
Csv(CsvSerializer),
/// Wraps a [`GelfSerializer`].
Gelf(GelfSerializer),
/// Wraps a [`JsonSerializer`].
Json(JsonSerializer),
/// Wraps a [`LogfmtSerializer`].
Logfmt(LogfmtSerializer),
/// Wraps a [`NativeSerializer`].
Native(NativeSerializer),
/// Wraps a [`NativeJsonSerializer`].
NativeJson(NativeJsonSerializer),
/// Wraps a [`ProtobufSerializer`].
Protobuf(ProtobufSerializer),
/// Wraps a [`RawMessageSerializer`].
RawMessage(RawMessageSerializer),
/// Wraps a [`TextSerializer`].
Text(TextSerializer),
}
impl Serializer {
    /// Reports whether [`Serializer::to_json_value`] may be called on this serializer.
    ///
    /// Returns `true` only for the `Gelf`, `Json` and `NativeJson` variants. The match is kept
    /// exhaustive so that adding a variant forces a decision here.
    pub fn supports_json(&self) -> bool {
        match self {
            Self::Avro(_)
            | Self::Cef(_)
            | Self::Csv(_)
            | Self::Logfmt(_)
            | Self::Text(_)
            | Self::Native(_)
            | Self::Protobuf(_)
            | Self::RawMessage(_) => false,
            Self::Gelf(_) | Self::Json(_) | Self::NativeJson(_) => true,
        }
    }

    /// Converts `event` into a `serde_json::Value` using the wrapped serializer.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped serializer's `to_json_value` produces.
    ///
    /// # Panics
    ///
    /// Panics when the serializer is not `Gelf`, `Json` or `NativeJson`. Check
    /// [`Serializer::supports_json`] before calling.
    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
        match self {
            Self::Gelf(inner) => inner.to_json_value(event),
            Self::Json(inner) => inner.to_json_value(event),
            Self::NativeJson(inner) => inner.to_json_value(event),
            Self::Avro(_)
            | Self::Cef(_)
            | Self::Csv(_)
            | Self::Logfmt(_)
            | Self::Text(_)
            | Self::Native(_)
            | Self::Protobuf(_)
            | Self::RawMessage(_) => panic!("Serializer does not support JSON"),
        }
    }
}
impl From<AvroSerializer> for Serializer {
fn from(serializer: AvroSerializer) -> Self {
Self::Avro(serializer)
}
}
impl From<CefSerializer> for Serializer {
fn from(serializer: CefSerializer) -> Self {
Self::Cef(serializer)
}
}
impl From<CsvSerializer> for Serializer {
fn from(serializer: CsvSerializer) -> Self {
Self::Csv(serializer)
}
}
impl From<GelfSerializer> for Serializer {
fn from(serializer: GelfSerializer) -> Self {
Self::Gelf(serializer)
}
}
impl From<JsonSerializer> for Serializer {
fn from(serializer: JsonSerializer) -> Self {
Self::Json(serializer)
}
}
impl From<LogfmtSerializer> for Serializer {
fn from(serializer: LogfmtSerializer) -> Self {
Self::Logfmt(serializer)
}
}
impl From<NativeSerializer> for Serializer {
fn from(serializer: NativeSerializer) -> Self {
Self::Native(serializer)
}
}
impl From<NativeJsonSerializer> for Serializer {
fn from(serializer: NativeJsonSerializer) -> Self {
Self::NativeJson(serializer)
}
}
impl From<ProtobufSerializer> for Serializer {
fn from(serializer: ProtobufSerializer) -> Self {
Self::Protobuf(serializer)
}
}
impl From<RawMessageSerializer> for Serializer {
fn from(serializer: RawMessageSerializer) -> Self {
Self::RawMessage(serializer)
}
}
impl From<TextSerializer> for Serializer {
fn from(serializer: TextSerializer) -> Self {
Self::Text(serializer)
}
}
/// Forwards event serialization to whichever serializer this `Serializer` wraps.
impl tokio_util::codec::Encoder<Event> for Serializer {
    type Error = vector_common::Error;

    /// Calls `encode` on the wrapped serializer with `event` and `buffer`, returning its
    /// result unchanged.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        match self {
            Self::Avro(inner) => inner.encode(event, buffer),
            Self::Cef(inner) => inner.encode(event, buffer),
            Self::Csv(inner) => inner.encode(event, buffer),
            Self::Gelf(inner) => inner.encode(event, buffer),
            Self::Json(inner) => inner.encode(event, buffer),
            Self::Logfmt(inner) => inner.encode(event, buffer),
            Self::Native(inner) => inner.encode(event, buffer),
            Self::NativeJson(inner) => inner.encode(event, buffer),
            Self::Protobuf(inner) => inner.encode(event, buffer),
            Self::RawMessage(inner) => inner.encode(event, buffer),
            Self::Text(inner) => inner.encode(event, buffer),
        }
    }
}