use bytes::{Bytes, BytesMut};
use smallvec::SmallVec;
use vector_lib::codecs::decoding::{
format::Deserializer as _, BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer,
NewlineDelimitedDecoder,
};
use vector_lib::config::LogNamespace;
use crate::{
event::Event,
internal_events::{DecoderDeserializeError, DecoderFramingError},
};
/// Decodes events from raw bytes by pairing a framer with a deserializer.
///
/// The framer is asked to cut the next frame out of the incoming buffer, and
/// each frame it yields is handed to the deserializer together with the
/// configured log namespace.
#[derive(Clone)]
pub struct Decoder {
/// Splits the incoming byte buffer into individual frames.
pub framer: Framer,
/// Parses a single frame into events.
pub deserializer: Deserializer,
/// Log namespace passed to the deserializer on every parse call.
pub log_namespace: LogNamespace,
}
impl Default for Decoder {
    /// Builds a decoder that frames on newlines, deserializes each frame with
    /// the bytes deserializer, and uses the legacy log namespace.
    fn default() -> Self {
        let framer = Framer::NewlineDelimited(NewlineDelimitedDecoder::new());
        let deserializer = Deserializer::Bytes(BytesDeserializer);
        let log_namespace = LogNamespace::Legacy;

        Self {
            framer,
            deserializer,
            log_namespace,
        }
    }
}
impl Decoder {
pub const fn new(framer: Framer, deserializer: Deserializer) -> Self {
Self {
framer,
deserializer,
log_namespace: LogNamespace::Legacy,
}
}
pub const fn with_log_namespace(mut self, log_namespace: LogNamespace) -> Self {
self.log_namespace = log_namespace;
self
}
fn handle_framing_result(
&mut self,
frame: Result<Option<Bytes>, BoxedFramingError>,
) -> Result<Option<(SmallVec<[Event; 1]>, usize)>, Error> {
let frame = frame.map_err(|error| {
emit!(DecoderFramingError { error: &error });
Error::FramingError(error)
})?;
frame
.map(|frame| self.deserializer_parse(frame))
.transpose()
}
pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
let byte_size = frame.len();
self.deserializer
.parse(frame, self.log_namespace)
.map(|events| (events, byte_size))
.map_err(|error| {
emit!(DecoderDeserializeError { error: &error });
Error::ParsingError(error)
})
}
}
impl tokio_util::codec::Decoder for Decoder {
    /// The events parsed from one frame, together with that frame's size in
    /// bytes.
    type Item = (SmallVec<[Event; 1]>, usize);
    type Error = Error;

    /// Asks the framer for the next frame in `buf` and, if one is available,
    /// deserializes it into events.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        // The framer's mutable borrow must end before `self` is borrowed
        // again, hence the intermediate binding.
        let framed = self.framer.decode(buf);
        self.handle_framing_result(framed)
    }

    /// Same as `decode`, but uses the framer's end-of-stream variant.
    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let framed = self.framer.decode_eof(buf);
        self.handle_framing_result(framed)
    }
}
#[cfg(test)]
mod tests {
    use super::Decoder;
    use bytes::Bytes;
    use futures::{stream, StreamExt};
    use tokio_util::{codec::FramedRead, io::StreamReader};
    use vector_lib::codecs::{
        decoding::{Deserializer, Framer},
        JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
    };
    use vrl::value::Value;

    /// Feeds three newline-terminated chunks through a `FramedRead` where the
    /// middle chunk is not valid JSON, and checks that the stream yields an
    /// event, then a continuable error, then another event.
    #[tokio::test]
    async fn framed_read_recover_from_error() {
        let chunks = ["{ \"foo\": 1 }\n", "invalid\n", "{ \"bar\": 2 }\n"];
        let input = stream::iter(chunks).map(|chunk| Ok::<_, std::io::Error>(Bytes::from(chunk)));

        let framer = Framer::NewlineDelimited(NewlineDelimitedDecoder::new());
        let deserializer = Deserializer::Json(JsonDeserializer::default());
        let mut frames = FramedRead::new(StreamReader::new(input), Decoder::new(framer, deserializer));

        // First chunk: valid JSON decodes into a log event.
        let (mut events, _) = frames.next().await.unwrap().unwrap();
        let first = events.pop().unwrap().into_log();
        assert_eq!(first.get("foo").unwrap(), &Value::from(1));

        // Second chunk: parsing fails, but the error allows the stream to continue.
        let failure = frames.next().await.unwrap().unwrap_err();
        assert!(failure.can_continue());

        // Third chunk: decoding resumes after the error.
        let (mut events, _) = frames.next().await.unwrap().unwrap();
        let third = events.pop().unwrap().into_log();
        assert_eq!(third.get("bar").unwrap(), &Value::from(2));
    }
}