use std::collections::HashMap;
use std::io::{self, Read};
use std::net::SocketAddr;
use std::time::Duration;
use base64::prelude::{Engine as _, BASE64_STANDARD};
use bytes::{Buf, Bytes, BytesMut};
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use rmp_serde::{decode, Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use tokio_util::codec::Decoder;
use vector_lib::codecs::{BytesDeserializerConfig, StreamDecodingError};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::ipallowlist::IpAllowlistConfig;
use vector_lib::lookup::lookup_v2::parse_value_path;
use vector_lib::lookup::{metadata_path, owned_value_path, path, OwnedValuePath};
use vector_lib::schema::Definition;
use vrl::value::kind::Collection;
use vrl::value::{Kind, Value};
use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
use crate::{
    config::{
        log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
        SourceContext, SourceOutput,
    },
    event::{Event, LogEvent},
    internal_events::{FluentMessageDecodeError, FluentMessageReceived},
    serde::bool_or_struct,
    tcp::TcpKeepaliveConfig,
    tls::{MaybeTlsSettings, TlsSourceConfig},
};
mod message;
use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentTimestamp};
#[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))]
#[derive(Clone, Debug)]
pub struct FluentConfig {
    #[configurable(derived)]
    address: SocketListenAddr,
    #[configurable(metadata(docs::type_unit = "connections"))]
    connection_limit: Option<u32>,
    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,
    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    receive_buffer_bytes: Option<usize>,
    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
impl GenerateConfig for FluentConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
            keepalive: None,
            permit_origin: None,
            tls: None,
            receive_buffer_bytes: None,
            acknowledgements: Default::default(),
            connection_limit: Some(2),
            log_namespace: None,
        })
        .unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "fluent")]
impl SourceConfig for FluentConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let source = FluentSource::new(log_namespace);
        let shutdown_secs = Duration::from_secs(30);
        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
        let tls_client_metadata_key = self
            .tls
            .as_ref()
            .and_then(|tls| tls.client_metadata_key.clone())
            .and_then(|k| k.path);
        let tls = MaybeTlsSettings::from_config(&tls_config, true)?;
        source.run(
            self.address,
            self.keepalive,
            shutdown_secs,
            tls,
            tls_client_metadata_key,
            self.receive_buffer_bytes,
            None,
            cx,
            self.acknowledgements,
            self.connection_limit,
            self.permit_origin.clone().map(Into::into),
            FluentConfig::NAME,
            log_namespace,
        )
    }
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let schema_definition = self.schema_definition(log_namespace);
        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }
    fn resources(&self) -> Vec<Resource> {
        vec![self.address.as_tcp_resource()]
    }
    fn can_acknowledge(&self) -> bool {
        true
    }
}
impl FluentConfig {
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let host_key = log_schema()
            .host_key()
            .cloned()
            .map(LegacyKey::InsertIfEmpty);
        let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite);
        let tls_client_metadata_path = self
            .tls
            .as_ref()
            .and_then(|tls| tls.client_metadata_key.as_ref())
            .and_then(|k| k.path.clone())
            .map(LegacyKey::Overwrite);
        let mut schema_definition = BytesDeserializerConfig
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                FluentConfig::NAME,
                host_key,
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                FluentConfig::NAME,
                tag_key,
                &owned_value_path!("tag"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                FluentConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_source_metadata(
                FluentConfig::NAME,
                None,
                &owned_value_path!("record"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                tls_client_metadata_path,
                &owned_value_path!("tls_client_metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }
        schema_definition
    }
}
#[derive(Debug, Clone)]
struct FluentSource {
    log_namespace: LogNamespace,
    legacy_host_key_path: Option<OwnedValuePath>,
}
impl FluentSource {
    fn new(log_namespace: LogNamespace) -> Self {
        Self {
            log_namespace,
            legacy_host_key_path: log_schema().host_key().cloned(),
        }
    }
}
impl TcpSource for FluentSource {
    type Error = DecodeError;
    type Item = FluentFrame;
    type Decoder = FluentDecoder;
    type Acker = FluentAcker;
    fn decoder(&self) -> Self::Decoder {
        FluentDecoder::new(self.log_namespace)
    }
    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
        for event in events {
            let log = event.as_mut_log();
            let legacy_host_key = self
                .legacy_host_key_path
                .as_ref()
                .map(LegacyKey::InsertIfEmpty);
            self.log_namespace.insert_source_metadata(
                FluentConfig::NAME,
                log,
                legacy_host_key,
                path!("host"),
                host.ip().to_string(),
            );
        }
    }
    fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker {
        FluentAcker::new(frame)
    }
}
#[derive(Debug)]
pub enum DecodeError {
    IO(io::Error),
    Decode(decode::Error),
    UnknownCompression(String),
    UnexpectedValue(rmpv::Value),
}
impl std::fmt::Display for DecodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DecodeError::IO(err) => write!(f, "{}", err),
            DecodeError::Decode(err) => write!(f, "{}", err),
            DecodeError::UnknownCompression(compression) => {
                write!(f, "unknown compression: {}", compression)
            }
            DecodeError::UnexpectedValue(value) => {
                write!(f, "unexpected msgpack value, ignoring: {}", value)
            }
        }
    }
}
impl StreamDecodingError for DecodeError {
    fn can_continue(&self) -> bool {
        match self {
            DecodeError::IO(_) => false,
            DecodeError::Decode(_) => true,
            DecodeError::UnknownCompression(_) => true,
            DecodeError::UnexpectedValue(_) => true,
        }
    }
}
impl From<io::Error> for DecodeError {
    fn from(e: io::Error) -> Self {
        DecodeError::IO(e)
    }
}
impl From<decode::Error> for DecodeError {
    fn from(e: decode::Error) -> Self {
        DecodeError::Decode(e)
    }
}
#[derive(Debug)]
struct FluentDecoder {
    log_namespace: LogNamespace,
}
impl FluentDecoder {
    const fn new(log_namespace: LogNamespace) -> Self {
        Self { log_namespace }
    }
    fn handle_message(
        &mut self,
        message: Result<FluentMessage, DecodeError>,
        byte_size: usize,
    ) -> Result<Option<(FluentFrame, usize)>, DecodeError> {
        let log_namespace = &self.log_namespace;
        match message? {
            FluentMessage::Message(tag, timestamp, record) => {
                let event = Event::from(FluentEvent {
                    tag,
                    timestamp,
                    record,
                    log_namespace,
                });
                let frame = FluentFrame {
                    events: smallvec![event],
                    chunk: None,
                };
                Ok(Some((frame, byte_size)))
            }
            FluentMessage::MessageWithOptions(tag, timestamp, record, options) => {
                let event = Event::from(FluentEvent {
                    tag,
                    timestamp,
                    record,
                    log_namespace,
                });
                let frame = FluentFrame {
                    events: smallvec![event],
                    chunk: options.chunk,
                };
                Ok(Some((frame, byte_size)))
            }
            FluentMessage::Forward(tag, entries) => {
                let events = entries
                    .into_iter()
                    .map(|FluentEntry(timestamp, record)| {
                        Event::from(FluentEvent {
                            tag: tag.clone(),
                            timestamp,
                            record,
                            log_namespace,
                        })
                    })
                    .collect();
                let frame = FluentFrame {
                    events,
                    chunk: None,
                };
                Ok(Some((frame, byte_size)))
            }
            FluentMessage::ForwardWithOptions(tag, entries, options) => {
                let events = entries
                    .into_iter()
                    .map(|FluentEntry(timestamp, record)| {
                        Event::from(FluentEvent {
                            tag: tag.clone(),
                            timestamp,
                            record,
                            log_namespace,
                        })
                    })
                    .collect();
                let frame = FluentFrame {
                    events,
                    chunk: options.chunk,
                };
                Ok(Some((frame, byte_size)))
            }
            FluentMessage::PackedForward(tag, bin) => {
                let mut buf = BytesMut::from(&bin[..]);
                let mut events = smallvec![];
                while let Some(FluentEntry(timestamp, record)) =
                    FluentEntryStreamDecoder.decode(&mut buf)?
                {
                    events.push(Event::from(FluentEvent {
                        tag: tag.clone(),
                        timestamp,
                        record,
                        log_namespace,
                    }));
                }
                let frame = FluentFrame {
                    events,
                    chunk: None,
                };
                Ok(Some((frame, byte_size)))
            }
            FluentMessage::PackedForwardWithOptions(tag, bin, options) => {
                let buf = match options.compressed.as_deref() {
                    Some("gzip") => {
                        let mut buf = Vec::new();
                        MultiGzDecoder::new(io::Cursor::new(bin.into_vec()))
                            .read_to_end(&mut buf)
                            .map(|_| buf)
                            .map_err(Into::into)
                    }
                    Some("text") | None => Ok(bin.into_vec()),
                    Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())),
                }?;
                let mut buf = BytesMut::from(&buf[..]);
                let mut events = smallvec![];
                while let Some(FluentEntry(timestamp, record)) =
                    FluentEntryStreamDecoder.decode(&mut buf)?
                {
                    events.push(Event::from(FluentEvent {
                        tag: tag.clone(),
                        timestamp,
                        record,
                        log_namespace,
                    }));
                }
                let frame = FluentFrame {
                    events,
                    chunk: options.chunk,
                };
                Ok(Some((frame, byte_size)))
            }
            FluentMessage::Heartbeat(rmpv::Value::Nil) => Ok(None),
            FluentMessage::Heartbeat(value) => Err(DecodeError::UnexpectedValue(value)),
        }
    }
}
impl Decoder for FluentDecoder {
    type Item = (FluentFrame, usize);
    type Error = DecodeError;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        loop {
            if src.is_empty() {
                return Ok(None);
            }
            let (byte_size, res) = {
                let mut des = Deserializer::new(io::Cursor::new(&src[..]));
                let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
                if let Err(DecodeError::Decode(
                    decode::Error::InvalidDataRead(ref custom)
                    | decode::Error::InvalidMarkerRead(ref custom),
                )) = res
                {
                    if custom.kind() == io::ErrorKind::UnexpectedEof {
                        return Ok(None);
                    }
                }
                (des.position() as usize, res)
            };
            src.advance(byte_size);
            let maybe_item = self.handle_message(res, byte_size).inspect_err(|error| {
                let base64_encoded_message = BASE64_STANDARD.encode(&src[..]);
                emit!(FluentMessageDecodeError {
                    error,
                    base64_encoded_message
                });
            })?;
            if let Some(item) = maybe_item {
                return Ok(Some(item));
            }
        }
    }
}
#[derive(Clone, Debug)]
struct FluentEntryStreamDecoder;
impl Decoder for FluentEntryStreamDecoder {
    type Item = FluentEntry;
    type Error = DecodeError;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }
        let (byte_size, res) = {
            let mut des = Deserializer::new(io::Cursor::new(&src[..]));
            let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
            if let Err(DecodeError::Decode(decode::Error::InvalidDataRead(ref custom))) = res {
                if custom.kind() == io::ErrorKind::UnexpectedEof {
                    return Ok(None);
                }
            }
            let byte_size = des.position();
            emit!(FluentMessageReceived { byte_size });
            (byte_size as usize, res)
        };
        src.advance(byte_size);
        res
    }
}
struct FluentAcker {
    chunks: Vec<String>,
}
impl FluentAcker {
    fn new(frames: &[FluentFrame]) -> Self {
        Self {
            chunks: frames.iter().filter_map(|f| f.chunk.clone()).collect(),
        }
    }
}
impl TcpSourceAcker for FluentAcker {
    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
        if self.chunks.is_empty() {
            return None;
        }
        let mut buf = Vec::new();
        let mut ser = Serializer::new(&mut buf);
        let mut ack_map = HashMap::new();
        for chunk in self.chunks {
            ack_map.clear();
            if let TcpSourceAck::Ack = ack {
                ack_map.insert("ack", chunk);
            };
            ack_map.serialize(&mut ser).unwrap();
        }
        Some(buf.into())
    }
}
#[derive(Debug, PartialEq)]
struct FluentEvent<'a> {
    tag: FluentTag,
    timestamp: FluentTimestamp,
    record: FluentRecord,
    log_namespace: &'a LogNamespace,
}
impl From<FluentEvent<'_>> for Event {
    fn from(frame: FluentEvent) -> Event {
        LogEvent::from(frame).into()
    }
}
struct FluentFrame {
    events: SmallVec<[Event; 1]>,
    chunk: Option<String>,
}
impl From<FluentFrame> for SmallVec<[Event; 1]> {
    fn from(frame: FluentFrame) -> Self {
        frame.events
    }
}
impl From<FluentEvent<'_>> for LogEvent {
    fn from(frame: FluentEvent) -> LogEvent {
        let FluentEvent {
            tag,
            timestamp,
            record,
            log_namespace,
        } = frame;
        let mut log = LogEvent::default();
        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(FluentConfig::NAME.as_bytes()),
        );
        match log_namespace {
            LogNamespace::Vector => {
                log.insert(metadata_path!(FluentConfig::NAME, "timestamp"), timestamp);
                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
            }
        }
        log_namespace.insert_source_metadata(
            FluentConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!("tag"))),
            path!("tag"),
            tag,
        );
        for (key, value) in record.into_iter() {
            let value: Value = value.into();
            log_namespace.insert_source_metadata(
                FluentConfig::NAME,
                &mut log,
                Some(LegacyKey::Overwrite(path!(key.as_str()))),
                path!("record", key.as_str()),
                value,
            );
        }
        log
    }
}
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::{DateTime, Utc};
    use rmp_serde::Serializer;
    use serde::Serialize;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        time::{error::Elapsed, timeout, Duration},
    };
    use tokio_util::codec::Decoder;
    use vector_lib::assert_event_data_eq;
    use vector_lib::lookup::OwnedTargetPath;
    use vector_lib::schema::Definition;
    use vrl::value::{kind::Collection, ObjectMap, Value};
    use super::{message::FluentMessageOptions, *};
    use crate::{
        config::{SourceConfig, SourceContext},
        event::EventStatus,
        test_util::{self, next_addr, trace_init, wait_for_tcp},
        SourceSender,
    };
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FluentConfig>();
    }
    fn mock_event(name: &str, timestamp: &str) -> Event {
        Event::Log(LogEvent::from(ObjectMap::from([
            ("message".into(), Value::from(name)),
            (
                log_schema().source_type_key().unwrap().to_string().into(),
                Value::from(FluentConfig::NAME),
            ),
            ("tag".into(), Value::from("tag.name")),
            (
                "timestamp".into(),
                Value::Timestamp(DateTime::parse_from_rfc3339(timestamp).unwrap().into()),
            ),
        ])))
    }
    #[test]
    fn decode_message_mode() {
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114,
        ];
        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_event_data_eq!(got.0[0], expected);
        assert_eq!(got.1, message.len());
    }
    #[test]
    fn decode_message_mode_with_options() {
        let message: Vec<u8> = vec![
            148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1,
        ];
        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected);
    }
    #[test]
    fn decode_forward_mode() {
        let message: Vec<u8> = vec![
            146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122,
        ];
        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }
    #[test]
    fn decode_forward_mode_with_options() {
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105,
            122, 101, 3,
        ];
        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }
    #[test]
    fn decode_packed_forward_mode() {
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230,
            249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236,
            230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 102, 111, 111,
        ];
        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }
    #[test]
    fn decode_compressed_packed_forward_mode() {
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168,
            96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83,
            23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0,
            53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100,
            164, 103, 122, 105, 112,
        ];
        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }
    fn decode_all(message: Vec<u8>) -> Result<(SmallVec<[Event; 1]>, usize), DecodeError> {
        let mut buf = BytesMut::from(&message[..]);
        let mut decoder = FluentDecoder::new(LogNamespace::default());
        let (frame, byte_size) = decoder.decode(&mut buf)?.unwrap();
        Ok((frame.into(), byte_size))
    }
    #[tokio::test]
    async fn ack_delivered_without_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Delivered, false).await;
        assert!(result.is_err()); assert!(output.is_empty());
    }
    #[tokio::test]
    async fn ack_delivered_with_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; assert_eq!(output[..expected.len()], expected);
    }
    #[tokio::test]
    async fn ack_failed_without_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Rejected, false).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        assert!(output.is_empty());
    }
    #[tokio::test]
    async fn ack_failed_with_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        let expected: Vec<u8> = vec![0x80]; assert_eq!(output, expected);
    }
    async fn check_acknowledgements(
        status: EventStatus,
        with_chunk: bool,
    ) -> (Result<Result<usize, std::io::Error>, Elapsed>, Bytes) {
        trace_init();
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let source = FluentConfig {
            address: address.into(),
            tls: None,
            keepalive: None,
            permit_origin: None,
            receive_buffer_bytes: None,
            acknowledgements: true.into(),
            connection_limit: None,
            log_namespace: None,
        }
        .build(SourceContext::new_test(sender, None))
        .await
        .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;
        let msg = uuid::Uuid::new_v4().to_string();
        let tag = uuid::Uuid::new_v4().to_string();
        let req = build_req(&tag, &[("field", &msg)], with_chunk);
        let sender = tokio::spawn(async move {
            let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
            socket.write_all(&req).await.unwrap();
            let mut output = BytesMut::new();
            (
                timeout(Duration::from_millis(250), socket.read_buf(&mut output)).await,
                output,
            )
        });
        let events = test_util::collect_n(recv, 1).await;
        let (result, output) = sender.await.unwrap();
        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(log.get("field").unwrap(), &msg.into());
        assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
        assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
        assert_eq!(log.get("tag").unwrap(), &tag.into());
        (result, output.into())
    }
    fn build_req(tag: &str, fields: &[(&str, &str)], with_chunk: bool) -> Vec<u8> {
        let mut record = FluentRecord::default();
        for (tag, value) in fields {
            record.insert((*tag).into(), rmpv::Value::String((*value).into()).into());
        }
        let chunk = with_chunk.then(|| BASE64_STANDARD.encode(uuid::Uuid::new_v4().as_bytes()));
        let req = FluentMessage::MessageWithOptions(
            tag.into(),
            FluentTimestamp::Unix(Utc::now()),
            record,
            FluentMessageOptions {
                chunk,
                ..Default::default()
            },
        );
        let mut buf = Vec::new();
        req.serialize(&mut Serializer::new(&mut buf)).unwrap();
        buf
    }
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = FluentConfig {
            address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
            tls: None,
            keepalive: None,
            permit_origin: None,
            receive_buffer_bytes: None,
            acknowledgements: false.into(),
            connection_limit: None,
            log_namespace: Some(true),
        };
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);
        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(&owned_value_path!("fluent", "tag"), Kind::bytes(), None)
                .with_metadata_field(
                    &owned_value_path!("fluent", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "record"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );
        assert_eq!(definitions, Some(expected_definition))
    }
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = FluentConfig {
            address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
            tls: None,
            keepalive: None,
            permit_origin: None,
            receive_buffer_bytes: None,
            acknowledgements: false.into(),
            connection_limit: None,
            log_namespace: None,
        };
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("tag"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .unknown_fields(Kind::bytes());
        assert_eq!(definitions, Some(expected_definition))
    }
}
#[cfg(all(test, feature = "fluent-integration-tests"))]
mod integration_tests {
    use std::{fs::File, io::Write, net::SocketAddr, time::Duration};
    use futures::Stream;
    use tokio::time::sleep;
    use vector_lib::event::{Event, EventStatus};
    use crate::{
        config::{SourceConfig, SourceContext},
        docker::Container,
        sources::fluent::FluentConfig,
        test_util::{
            collect_ready,
            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
            next_addr, next_addr_for_ip, random_string, wait_for_tcp,
        },
        SourceSender,
    };
    const FLUENT_BIT_IMAGE: &str = "fluent/fluent-bit";
    const FLUENT_BIT_TAG: &str = "1.7";
    const FLUENTD_IMAGE: &str = "fluent/fluentd";
    const FLUENTD_TAG: &str = "v1.12";
    fn make_file(name: &str, content: &str) -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        let mut file = File::create(dir.path().join(name)).unwrap();
        write!(&mut file, "{}", content).unwrap();
        dir
    }
    #[tokio::test]
    async fn fluentbit() {
        test_fluentbit(EventStatus::Delivered).await;
    }
    #[tokio::test]
    async fn fluentbit_rejection() {
        test_fluentbit(EventStatus::Rejected).await;
    }
    async fn test_fluentbit(status: EventStatus) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let test_address = next_addr();
            let (out, source_address) = source(status).await;
            let dir = make_file(
                "fluent-bit.conf",
                &format!(
                    r#"
[SERVICE]
    Grace      0
    Flush      1
    Daemon     off
[INPUT]
    Name       http
    Host       {listen_host}
    Port       {listen_port}
[OUTPUT]
    Name          forward
    Match         *
    Host          host.docker.internal
    Port          {send_port}
    Require_ack_response true
    "#,
                    listen_host = test_address.ip(),
                    listen_port = test_address.port(),
                    send_port = source_address.port(),
                ),
            );
            let msg = random_string(64);
            let body = serde_json::json!({ "message": msg });
            let events = Container::new(FLUENT_BIT_IMAGE, FLUENT_BIT_TAG)
                .bind(dir.path().display(), "/fluent-bit/etc")
                .run(async move {
                    wait_for_tcp(test_address).await;
                    reqwest::Client::new()
                        .post(format!("http://{}/", test_address))
                        .header("content-type", "application/json")
                        .body(body.to_string())
                        .send()
                        .await
                        .unwrap();
                    sleep(Duration::from_secs(2)).await;
                    collect_ready(out).await
                })
                .await;
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert_eq!(log["tag"], "http.0".into());
            assert_eq!(log["message"], msg.into());
            assert!(log.get("timestamp").is_some());
            assert!(log.get("host").is_some());
        })
        .await;
    }
    #[tokio::test]
    async fn fluentd() {
        test_fluentd(EventStatus::Delivered, "").await;
    }
    #[tokio::test]
    async fn fluentd_gzip() {
        test_fluentd(EventStatus::Delivered, "compress gzip").await;
    }
    #[tokio::test]
    async fn fluentd_rejection() {
        test_fluentd(EventStatus::Rejected, "").await;
    }
    async fn test_fluentd(status: EventStatus, options: &str) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let test_address = next_addr();
            let (out, source_address) = source(status).await;
            let config = format!(
                r#"
<source>
  @type http
  bind {http_host}
  port {http_port}
</source>
<match *>
  @type forward
  <server>
    name  local
    host  host.docker.internal
    port  {port}
  </server>
  <buffer>
    flush_mode immediate
  </buffer>
  require_ack_response true
  ack_response_timeout 1
  {options}
</match>
"#,
                http_host = test_address.ip(),
                http_port = test_address.port(),
                port = source_address.port(),
                options = options
            );
            let dir = make_file("fluent.conf", &config);
            let msg = random_string(64);
            let body = serde_json::json!({ "message": msg });
            let events = Container::new(FLUENTD_IMAGE, FLUENTD_TAG)
                .bind(dir.path().display(), "/fluentd/etc")
                .run(async move {
                    wait_for_tcp(test_address).await;
                    reqwest::Client::new()
                        .post(format!("http://{}/", test_address))
                        .header("content-type", "application/json")
                        .body(body.to_string())
                        .send()
                        .await
                        .unwrap();
                    sleep(Duration::from_secs(2)).await;
                    collect_ready(out).await
                })
                .await;
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].as_log()["tag"], "".into());
            assert_eq!(events[0].as_log()["message"], msg.into());
            assert!(events[0].as_log().get("timestamp").is_some());
            assert!(events[0].as_log().get("host").is_some());
        })
        .await;
    }
    async fn source(status: EventStatus) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
        tokio::spawn(async move {
            FluentConfig {
                address: address.into(),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
                log_namespace: None,
            }
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address)
    }
}