use bytes::{Bytes, BytesMut};
use derivative::Derivative;
use tokio_util::codec::Decoder;
use vector_config::configurable_component;
use super::{BoxedFramingError, CharacterDelimitedDecoder};
/// Config used to build a `NewlineDelimitedDecoder`.
#[configurable_component]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NewlineDelimitedDecoderConfig {
/// Options for the newline delimited decoder.
// `serde(default)` lets this section be omitted on input; on output the field
// is skipped whenever `vector_core::serde::is_default` returns true for it.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub newline_delimited: NewlineDelimitedDecoderOptions,
}
/// Options for building a `NewlineDelimitedDecoder`.
#[configurable_component]
#[derive(Clone, Debug, Derivative, PartialEq, Eq)]
#[derivative(Default)]
pub struct NewlineDelimitedDecoderOptions {
/// The maximum frame length, in bytes.
///
/// When unset, the decoder is built without an explicit length limit.
// Grounding for the above: `NewlineDelimitedDecoderConfig::build` calls
// `NewlineDelimitedDecoder::new()` when this is `None`, and
// `new_with_max_length` otherwise. In this file's tests a limit of 3 accepts
// the frame `foo` and skips `barbara`, so the limit appears to apply to the
// frame contents without the trailing newline.
#[serde(skip_serializing_if = "vector_core::serde::is_default")]
pub max_length: Option<usize>,
}
impl NewlineDelimitedDecoderOptions {
    /// Creates options whose `max_length` is `Some(max_length)`.
    pub const fn new_with_max_length(max_length: usize) -> Self {
        let limit = Some(max_length);
        Self { max_length: limit }
    }
}
impl NewlineDelimitedDecoderConfig {
    /// Creates a config holding the default options.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a config whose options carry the given maximum length.
    pub const fn new_with_max_length(max_length: usize) -> Self {
        let newline_delimited = NewlineDelimitedDecoderOptions::new_with_max_length(max_length);
        Self { newline_delimited }
    }

    /// Builds a `NewlineDelimitedDecoder` from this config.
    ///
    /// A configured `max_length` is forwarded to
    /// `NewlineDelimitedDecoder::new_with_max_length`; otherwise the decoder is
    /// created with `NewlineDelimitedDecoder::new`.
    pub const fn build(&self) -> NewlineDelimitedDecoder {
        match self.newline_delimited.max_length {
            Some(max_length) => NewlineDelimitedDecoder::new_with_max_length(max_length),
            None => NewlineDelimitedDecoder::new(),
        }
    }
}
/// A decoder that frames bytes on the newline character (`b'\n'`).
///
/// This is a newtype around `CharacterDelimitedDecoder`; all decoding is
/// delegated to the wrapped decoder.
#[derive(Debug, Clone)]
pub struct NewlineDelimitedDecoder(CharacterDelimitedDecoder);
impl NewlineDelimitedDecoder {
    /// Creates a decoder wrapping `CharacterDelimitedDecoder::new(b'\n')`.
    pub const fn new() -> Self {
        let inner = CharacterDelimitedDecoder::new(b'\n');
        Self(inner)
    }

    /// Creates a decoder wrapping a `CharacterDelimitedDecoder` constructed
    /// with the `b'\n'` delimiter and the given `max_length`.
    pub const fn new_with_max_length(max_length: usize) -> Self {
        let inner = CharacterDelimitedDecoder::new_with_max_length(b'\n', max_length);
        Self(inner)
    }
}
impl Default for NewlineDelimitedDecoder {
fn default() -> Self {
Self::new()
}
}
impl Decoder for NewlineDelimitedDecoder {
    type Item = Bytes;
    type Error = BoxedFramingError;

    /// Forwards to the wrapped decoder's `decode`, returning its result unchanged.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let Self(inner) = self;
        inner.decode(src)
    }

    /// Forwards to the wrapped decoder's `decode_eof`, returning its result unchanged.
    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let Self(inner) = self;
        inner.decode_eof(src)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // `decode` yields each newline-terminated frame, then `None` for the
    // trailing fragment that has no newline.
    #[test]
    fn decode_bytes_with_newlines() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar"] {
            assert_eq!(framer.decode(&mut buffer).unwrap().unwrap(), expected);
        }
        assert_eq!(framer.decode(&mut buffer).unwrap(), None);
    }

    // With a trailing newline every frame is yielded, followed by `None`.
    #[test]
    fn decode_bytes_with_newlines_trailing() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar", "baz"] {
            assert_eq!(framer.decode(&mut buffer).unwrap().unwrap(), expected);
        }
        assert_eq!(framer.decode(&mut buffer).unwrap(), None);
    }

    // A frame longer than the configured maximum (`barbara`) is skipped.
    #[test]
    fn decode_bytes_with_newlines_and_max_length() {
        let mut buffer = BytesMut::from("foo\nbarbara\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new_with_max_length(3);

        for expected in ["foo", "baz"] {
            assert_eq!(framer.decode(&mut buffer).unwrap().unwrap(), expected);
        }
        assert_eq!(framer.decode(&mut buffer).unwrap(), None);
    }

    // `decode_eof` also yields the final fragment that has no newline.
    #[test]
    fn decode_eof_bytes_with_newlines() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar", "baz"] {
            assert_eq!(framer.decode_eof(&mut buffer).unwrap().unwrap(), expected);
        }
    }

    // With a trailing newline `decode_eof` yields every frame, then `None`.
    #[test]
    fn decode_eof_bytes_with_newlines_trailing() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar", "baz"] {
            assert_eq!(framer.decode_eof(&mut buffer).unwrap().unwrap(), expected);
        }
        assert_eq!(framer.decode_eof(&mut buffer).unwrap(), None);
    }

    // The maximum length is enforced on the `decode_eof` path as well.
    #[test]
    fn decode_eof_bytes_with_newlines_and_max_length() {
        let mut buffer = BytesMut::from("foo\nbarbara\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new_with_max_length(3);

        for expected in ["foo", "baz"] {
            assert_eq!(framer.decode_eof(&mut buffer).unwrap().unwrap(), expected);
        }
        assert_eq!(framer.decode_eof(&mut buffer).unwrap(), None);
    }
}