use std::net::SocketAddr;
use std::time::Duration;
use std::{
    collections::{BTreeMap, VecDeque},
    convert::TryFrom,
    io::{self, Read},
};
use vector_lib::ipallowlist::IpAllowlistConfig;
use bytes::{Buf, Bytes, BytesMut};
use flate2::read::ZlibDecoder;
use smallvec::{smallvec, SmallVec};
use snafu::{ResultExt, Snafu};
use tokio_util::codec::Decoder;
use vector_lib::codecs::{BytesDeserializerConfig, StreamDecodingError};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{event_path, metadata_path, owned_value_path, path, OwnedValuePath};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    schema::Definition,
};
use vrl::value::kind::Collection;
use vrl::value::{KeyString, Kind};
use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
use crate::{
    config::{
        log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
        SourceContext, SourceOutput,
    },
    event::{Event, LogEvent, Value},
    serde::bool_or_struct,
    tcp::TcpKeepaliveConfig,
    tls::{MaybeTlsSettings, TlsSourceConfig},
    types,
};
#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
#[derive(Clone, Debug)]
pub struct LogstashConfig {
    #[configurable(derived)]
    address: SocketListenAddr,
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    keepalive: Option<TcpKeepaliveConfig>,
    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,
    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    #[configurable(metadata(docs::advanced))]
    receive_buffer_bytes: Option<usize>,
    #[configurable(metadata(docs::type_unit = "connections"))]
    #[configurable(metadata(docs::advanced))]
    connection_limit: Option<u32>,
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
impl LogstashConfig {
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let host_key = log_schema()
            .host_key()
            .cloned()
            .map(LegacyKey::InsertIfEmpty);
        let tls_client_metadata_path = self
            .tls
            .as_ref()
            .and_then(|tls| tls.client_metadata_key.as_ref())
            .and_then(|k| k.path.clone())
            .map(LegacyKey::Overwrite);
        BytesDeserializerConfig
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                LogstashConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                LogstashConfig::NAME,
                host_key,
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                Self::NAME,
                tls_client_metadata_path,
                &owned_value_path!("tls_client_metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
    }
}
impl Default for LogstashConfig {
    fn default() -> Self {
        Self {
            address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
            keepalive: None,
            permit_origin: None,
            tls: None,
            receive_buffer_bytes: None,
            acknowledgements: Default::default(),
            connection_limit: None,
            log_namespace: None,
        }
    }
}
impl GenerateConfig for LogstashConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(LogstashConfig::default()).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "logstash")]
impl SourceConfig for LogstashConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let source = LogstashSource {
            timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
            legacy_host_key_path: log_schema().host_key().cloned(),
            log_namespace,
        };
        let shutdown_secs = Duration::from_secs(30);
        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
        let tls_client_metadata_key = self
            .tls
            .as_ref()
            .and_then(|tls| tls.client_metadata_key.clone())
            .and_then(|k| k.path);
        let tls = MaybeTlsSettings::from_config(&tls_config, true)?;
        source.run(
            self.address,
            self.keepalive,
            shutdown_secs,
            tls,
            tls_client_metadata_key,
            self.receive_buffer_bytes,
            None,
            cx,
            self.acknowledgements,
            self.connection_limit,
            self.permit_origin.clone().map(Into::into),
            LogstashConfig::NAME,
            log_namespace,
        )
    }
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            self.schema_definition(global_log_namespace.merge(self.log_namespace)),
        )]
    }
    fn resources(&self) -> Vec<Resource> {
        vec![self.address.as_tcp_resource()]
    }
    fn can_acknowledge(&self) -> bool {
        true
    }
}
#[derive(Debug, Clone)]
struct LogstashSource {
    timestamp_converter: types::Conversion,
    log_namespace: LogNamespace,
    legacy_host_key_path: Option<OwnedValuePath>,
}
impl TcpSource for LogstashSource {
    type Error = DecodeError;
    type Item = LogstashEventFrame;
    type Decoder = LogstashDecoder;
    type Acker = LogstashAcker;
    fn decoder(&self) -> Self::Decoder {
        LogstashDecoder::new()
    }
    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
        let now = chrono::Utc::now();
        for event in events {
            let log = event.as_mut_log();
            self.log_namespace.insert_vector_metadata(
                log,
                log_schema().source_type_key(),
                path!("source_type"),
                Bytes::from_static(LogstashConfig::NAME.as_bytes()),
            );
            let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
                self.timestamp_converter
                    .convert::<Value>(timestamp.coerce_to_bytes())
                    .ok()
            });
            match self.log_namespace {
                LogNamespace::Vector => {
                    if let Some(timestamp) = log_timestamp {
                        log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
                    }
                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
                }
                LogNamespace::Legacy => {
                    if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
                        log.insert(
                            timestamp_key,
                            log_timestamp.unwrap_or_else(|| Value::from(now)),
                        );
                    }
                }
            }
            self.log_namespace.insert_source_metadata(
                LogstashConfig::NAME,
                log,
                self.legacy_host_key_path
                    .as_ref()
                    .map(LegacyKey::InsertIfEmpty),
                path!("host"),
                host.ip().to_string(),
            );
        }
    }
    fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
        LogstashAcker::new(frames)
    }
}
struct LogstashAcker {
    sequence_number: u32,
    protocol_version: Option<LogstashProtocolVersion>,
}
impl LogstashAcker {
    fn new(frames: &[LogstashEventFrame]) -> Self {
        let mut sequence_number = 0;
        let mut protocol_version = None;
        for frame in frames {
            sequence_number = std::cmp::max(sequence_number, frame.sequence_number);
            protocol_version = Some(frame.protocol);
        }
        Self {
            sequence_number,
            protocol_version,
        }
    }
}
impl TcpSourceAcker for LogstashAcker {
    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
        match (ack, self.protocol_version) {
            (TcpSourceAck::Ack, Some(protocol_version)) => {
                let mut bytes: Vec<u8> = Vec::with_capacity(6);
                bytes.push(protocol_version.into());
                bytes.push(LogstashFrameType::Ack.into());
                bytes.extend(self.sequence_number.to_be_bytes().iter());
                Some(Bytes::from(bytes))
            }
            _ => None,
        }
    }
}
#[derive(Debug)]
enum LogstashDecoderReadState {
    ReadProtocol,
    ReadType(LogstashProtocolVersion),
    ReadFrame(LogstashProtocolVersion, LogstashFrameType),
    PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
}
#[derive(Debug)]
struct LogstashDecoder {
    state: LogstashDecoderReadState,
}
impl LogstashDecoder {
    const fn new() -> Self {
        Self {
            state: LogstashDecoderReadState::ReadProtocol,
        }
    }
}
#[derive(Debug, Snafu)]
pub enum DecodeError {
    #[snafu(display("i/o error: {}", source))]
    IO { source: io::Error },
    #[snafu(display("Unknown logstash protocol version: {}", version))]
    UnknownProtocolVersion { version: char },
    #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
    UnknownFrameType { frame_type: char },
    #[snafu(display("Failed to decode JSON frame: {}", source))]
    JsonFrameFailedDecode { source: serde_json::Error },
    #[snafu(display("Failed to decompress compressed frame: {}", source))]
    DecompressionFailed { source: io::Error },
}
impl StreamDecodingError for DecodeError {
    fn can_continue(&self) -> bool {
        use DecodeError::*;
        match self {
            IO { .. } => false,
            UnknownProtocolVersion { .. } => false,
            UnknownFrameType { .. } => false,
            JsonFrameFailedDecode { .. } => true,
            DecompressionFailed { .. } => true,
        }
    }
}
impl From<io::Error> for DecodeError {
    fn from(source: io::Error) -> Self {
        DecodeError::IO { source }
    }
}
#[derive(Debug, Clone, Copy)]
enum LogstashProtocolVersion {
    V1, V2, }
impl From<LogstashProtocolVersion> for u8 {
    fn from(frame_type: LogstashProtocolVersion) -> u8 {
        use LogstashProtocolVersion::*;
        match frame_type {
            V1 => b'1',
            V2 => b'2',
        }
    }
}
impl TryFrom<u8> for LogstashProtocolVersion {
    type Error = DecodeError;
    fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
        use LogstashProtocolVersion::*;
        match frame_type {
            b'1' => Ok(V1),
            b'2' => Ok(V2),
            version => Err(DecodeError::UnknownProtocolVersion {
                version: version as char,
            }),
        }
    }
}
#[derive(Debug, Clone, Copy)]
enum LogstashFrameType {
    Ack,        WindowSize, Data,       Json,       Compressed, }
impl From<LogstashFrameType> for u8 {
    fn from(frame_type: LogstashFrameType) -> u8 {
        use LogstashFrameType::*;
        match frame_type {
            Ack => b'A',
            WindowSize => b'W',
            Data => b'D',
            Json => b'J',
            Compressed => b'C',
        }
    }
}
impl TryFrom<u8> for LogstashFrameType {
    type Error = DecodeError;
    fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
        use LogstashFrameType::*;
        match frame_type {
            b'A' => Ok(Ack),
            b'W' => Ok(WindowSize),
            b'D' => Ok(Data),
            b'J' => Ok(Json),
            b'C' => Ok(Compressed),
            frame_type => Err(DecodeError::UnknownFrameType {
                frame_type: frame_type as char,
            }),
        }
    }
}
#[derive(Debug)]
struct LogstashEventFrame {
    protocol: LogstashProtocolVersion,
    sequence_number: u32,
    fields: BTreeMap<KeyString, serde_json::Value>,
}
impl Decoder for LogstashDecoder {
    type Item = (LogstashEventFrame, usize);
    type Error = DecodeError;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        loop {
            self.state = match self.state {
                LogstashDecoderReadState::PendingFrames(ref mut frames) => {
                    match frames.pop_front() {
                        Some(frame) => return Ok(Some(frame)),
                        None => LogstashDecoderReadState::ReadProtocol,
                    }
                }
                LogstashDecoderReadState::ReadProtocol => {
                    if src.remaining() < 1 {
                        return Ok(None);
                    }
                    use LogstashProtocolVersion::*;
                    match LogstashProtocolVersion::try_from(src.get_u8())? {
                        V1 => LogstashDecoderReadState::ReadType(V1),
                        V2 => LogstashDecoderReadState::ReadType(V2),
                    }
                }
                LogstashDecoderReadState::ReadType(protocol) => {
                    if src.remaining() < 1 {
                        return Ok(None);
                    }
                    use LogstashFrameType::*;
                    match LogstashFrameType::try_from(src.get_u8())? {
                        WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
                        Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
                        Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
                        Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
                        Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
                    }
                }
                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
                    if src.remaining() < 4 {
                        return Ok(None);
                    }
                    let _window_size = src.get_u32();
                    LogstashDecoderReadState::ReadProtocol
                }
                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
                    if src.remaining() < 4 {
                        return Ok(None);
                    }
                    let _sequence_number = src.get_u32();
                    LogstashDecoderReadState::ReadProtocol
                }
                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
                    let Some(frame) = decode_data_frame(protocol, src) else {
                        return Ok(None);
                    };
                    LogstashDecoderReadState::PendingFrames([frame].into())
                }
                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
                    let Some(frame) = decode_json_frame(protocol, src)? else {
                        return Ok(None);
                    };
                    LogstashDecoderReadState::PendingFrames([frame].into())
                }
                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
                    let Some(frames) = decode_compressed_frame(src)? else {
                        return Ok(None);
                    };
                    LogstashDecoderReadState::PendingFrames(frames)
                }
            };
        }
    }
}
fn decode_data_frame(
    protocol: LogstashProtocolVersion,
    src: &mut BytesMut,
) -> Option<(LogstashEventFrame, usize)> {
    let mut rest = src.as_ref();
    if rest.remaining() < 8 {
        return None;
    }
    let sequence_number = rest.get_u32();
    let pair_count = rest.get_u32();
    if pair_count == 0 {
        return None; }
    let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
    for _ in 0..pair_count {
        let (key, value, right) = decode_pair(rest)?;
        rest = right;
        fields.insert(
            String::from_utf8_lossy(key).into(),
            String::from_utf8_lossy(value).into(),
        );
    }
    let byte_size = bytes_remaining(src, rest);
    src.advance(byte_size);
    Some((
        LogstashEventFrame {
            protocol,
            sequence_number,
            fields,
        },
        byte_size,
    ))
}
fn decode_pair(mut rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
    if rest.remaining() < 4 {
        return None;
    }
    let key_length = rest.get_u32() as usize;
    if rest.remaining() < key_length {
        return None;
    }
    let (key, right) = rest.split_at(key_length);
    rest = right;
    if rest.remaining() < 4 {
        return None;
    }
    let value_length = rest.get_u32() as usize;
    if rest.remaining() < value_length {
        return None;
    }
    let (value, right) = rest.split_at(value_length);
    Some((key, value, right))
}
fn decode_json_frame(
    protocol: LogstashProtocolVersion,
    src: &mut BytesMut,
) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
    let mut rest = src.as_ref();
    if rest.remaining() < 8 {
        return Ok(None);
    }
    let sequence_number = rest.get_u32();
    let payload_size = rest.get_u32() as usize;
    if rest.remaining() < payload_size {
        return Ok(None);
    }
    let (slice, right) = rest.split_at(payload_size);
    rest = right;
    let fields: BTreeMap<KeyString, serde_json::Value> =
        serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
    let byte_size = bytes_remaining(src, rest);
    src.advance(byte_size);
    Ok(Some((
        LogstashEventFrame {
            protocol,
            sequence_number,
            fields,
        },
        byte_size,
    )))
}
fn decode_compressed_frame(
    src: &mut BytesMut,
) -> Result<Option<VecDeque<(LogstashEventFrame, usize)>>, DecodeError> {
    let mut rest = src.as_ref();
    if rest.remaining() < 4 {
        return Ok(None);
    }
    let payload_size = rest.get_u32() as usize;
    if rest.remaining() < payload_size {
        src.reserve(payload_size);
        return Ok(None);
    }
    let (slice, right) = rest.split_at(payload_size);
    rest = right;
    let mut buf = Vec::new();
    let res = ZlibDecoder::new(io::Cursor::new(slice))
        .read_to_end(&mut buf)
        .context(DecompressionFailedSnafu)
        .map(|_| BytesMut::from(&buf[..]));
    let byte_size = bytes_remaining(src, rest);
    src.advance(byte_size);
    let mut buf = res?;
    let mut decoder = LogstashDecoder::new();
    let mut frames = VecDeque::new();
    while let Some(s) = decoder.decode(&mut buf)? {
        frames.push_back(s);
    }
    Ok(Some(frames))
}
fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
    let remaining = rest.remaining();
    src.remaining() - remaining
}
impl From<LogstashEventFrame> for Event {
    fn from(frame: LogstashEventFrame) -> Self {
        Event::Log(LogEvent::from(
            frame
                .fields
                .into_iter()
                .map(|(key, value)| (key, Value::from(value)))
                .collect::<BTreeMap<_, _>>(),
        ))
    }
}
impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
    fn from(frame: LogstashEventFrame) -> Self {
        smallvec![frame.into()]
    }
}
#[cfg(test)]
mod test {
    use bytes::BufMut;
    use futures::Stream;
    use rand::{thread_rng, Rng};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;
    use super::*;
    use crate::{
        event::EventStatus,
        test_util::{
            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
            next_addr, spawn_collect_n, wait_for_tcp,
        },
        SourceSender,
    };
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogstashConfig>();
    }
    #[tokio::test]
    async fn test_delivered() {
        test_protocol(EventStatus::Delivered, true).await;
    }
    #[tokio::test]
    async fn test_failed() {
        test_protocol(EventStatus::Rejected, false).await;
    }
    async fn start_logstash(
        status: EventStatus,
    ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let source = LogstashConfig {
            address: address.into(),
            tls: None,
            permit_origin: None,
            keepalive: None,
            receive_buffer_bytes: None,
            acknowledgements: true.into(),
            connection_limit: None,
            log_namespace: None,
        }
        .build(SourceContext::new_test(sender, None))
        .await
        .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;
        (address, recv)
    }
    async fn test_protocol(status: EventStatus, sends_ack: bool) {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (address, recv) = start_logstash(status).await;
            spawn_collect_n(
                send_req(address, &[("message", "Hello, world!")], sends_ack),
                recv,
                1,
            )
            .await
        })
        .await;
        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
            log.get("message").unwrap().to_string_lossy(),
            "Hello, world!".to_string()
        );
        assert_eq!(
            log.get("source_type").unwrap().to_string_lossy(),
            "logstash".to_string()
        );
        assert!(log.get("host").is_some());
        assert!(log.get("timestamp").is_some());
    }
    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
        let mut req = BytesMut::new();
        req.put_u8(b'2');
        req.put_u8(b'D');
        req.put_u32(seq);
        req.put_u32(pairs.len() as u32);
        for (key, value) in pairs {
            req.put_u32(key.len() as u32);
            req.put(key.as_bytes());
            req.put_u32(value.len() as u32);
            req.put(value.as_bytes());
        }
        req.into()
    }
    #[test]
    fn v1_decoder_does_not_panic() {
        let seq = thread_rng().gen_range(1..u32::MAX);
        let req = encode_req(seq, &[("message", "Hello, World!")]);
        for i in 0..req.len() - 1 {
            assert!(
                decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i]))
                    .is_none()
            );
        }
    }
    async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
        let seq = thread_rng().gen_range(1..u32::MAX);
        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
        let req = encode_req(seq, pairs);
        socket.write_all(&req).await.unwrap();
        let mut output = BytesMut::new();
        socket.read_buf(&mut output).await.unwrap();
        if sends_ack {
            assert_eq!(output.get_u8(), b'2');
            assert_eq!(output.get_u8(), b'A');
            assert_eq!(output.get_u32(), seq);
        }
        assert_eq!(output.len(), 0);
    }
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogstashConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);
        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );
        assert_eq!(definitions, Some(expected_definition))
    }
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogstashConfig::default();
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));
        assert_eq!(definitions, Some(expected_definition))
    }
}
#[cfg(all(test, feature = "logstash-integration-tests"))]
mod integration_tests {
    use std::time::Duration;
    use futures::Stream;
    use tokio::time::timeout;
    use super::*;
    use crate::{
        config::SourceContext,
        event::EventStatus,
        test_util::{
            collect_n,
            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
            wait_for_tcp,
        },
        tls::{TlsConfig, TlsEnableableConfig},
        SourceSender,
    };
    fn heartbeat_address() -> String {
        std::env::var("HEARTBEAT_ADDRESS")
            .expect("Address of Beats Heartbeat service must be specified.")
    }
    #[tokio::test]
    async fn beats_heartbeat() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let out = source(heartbeat_address(), None).await;
            timeout(Duration::from_secs(60), collect_n(out, 1))
                .await
                .unwrap()
        })
        .await;
        assert!(!events.is_empty());
        let log = events[0].as_log();
        assert_eq!(
            log.get("@metadata.beat"),
            Some(String::from("heartbeat").into()).as_ref()
        );
        assert_eq!(log.get("summary.up"), Some(1.into()).as_ref());
        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }
    fn logstash_address() -> String {
        std::env::var("LOGSTASH_ADDRESS")
            .expect("Listen address for `logstash` source must be specified.")
    }
    #[tokio::test]
    async fn logstash() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let out = source(
                logstash_address(),
                Some(TlsEnableableConfig {
                    enabled: Some(true),
                    options: TlsConfig {
                        crt_file: Some("tests/data/host.docker.internal.crt".into()),
                        key_file: Some("tests/data/host.docker.internal.key".into()),
                        ..Default::default()
                    },
                }),
            )
            .await;
            timeout(Duration::from_secs(60), collect_n(out, 1))
                .await
                .unwrap()
        })
        .await;
        assert!(!events.is_empty());
        let log = events[0].as_log();
        assert!(log
            .get("line")
            .unwrap()
            .to_string_lossy()
            .contains("Hello World"));
        assert!(log.get("host").is_some());
    }
    async fn source(
        address: String,
        tls: Option<TlsEnableableConfig>,
    ) -> impl Stream<Item = Event> + Unpin {
        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let address: SocketAddr = address.parse().unwrap();
        let tls_config = TlsSourceConfig {
            client_metadata_key: None,
            tls_config: tls.unwrap_or_default(),
        };
        tokio::spawn(async move {
            LogstashConfig {
                address: address.into(),
                tls: Some(tls_config),
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
                log_namespace: None,
            }
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        recv
    }
}