pub mod tcp;
pub mod udp;
#[cfg(unix)]
mod unix;
use vector_lib::codecs::decoding::DeserializerConfig;
use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
use vrl::value::{kind::Collection, Kind};
use crate::{
    codecs::DecodingConfig,
    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
    sources::util::net::TcpSource,
    tls::MaybeTlsSettings,
};
#[configurable_component(source("socket", "Collect logs over a socket."))]
#[derive(Clone, Debug)]
pub struct SocketConfig {
    #[serde(flatten)]
    pub mode: Mode,
}
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
#[allow(clippy::large_enum_variant)] pub enum Mode {
    Tcp(tcp::TcpConfig),
    Udp(udp::UdpConfig),
    #[cfg(unix)]
    UnixDatagram(unix::UnixConfig),
    #[cfg(unix)]
    #[serde(alias = "unix")]
    UnixStream(unix::UnixConfig),
}
impl SocketConfig {
    pub fn new_tcp(tcp_config: tcp::TcpConfig) -> Self {
        tcp_config.into()
    }
    pub fn make_basic_tcp_config(addr: std::net::SocketAddr) -> Self {
        tcp::TcpConfig::from_address(addr.into()).into()
    }
    fn decoding(&self) -> DeserializerConfig {
        match &self.mode {
            Mode::Tcp(config) => config.decoding().clone(),
            Mode::Udp(config) => config.decoding().clone(),
            #[cfg(unix)]
            Mode::UnixDatagram(config) => config.decoding().clone(),
            #[cfg(unix)]
            Mode::UnixStream(config) => config.decoding().clone(),
        }
    }
    fn log_namespace(&self, global_log_namespace: LogNamespace) -> LogNamespace {
        match &self.mode {
            Mode::Tcp(config) => global_log_namespace.merge(config.log_namespace),
            Mode::Udp(config) => global_log_namespace.merge(config.log_namespace),
            #[cfg(unix)]
            Mode::UnixDatagram(config) => global_log_namespace.merge(config.log_namespace),
            #[cfg(unix)]
            Mode::UnixStream(config) => global_log_namespace.merge(config.log_namespace),
        }
    }
}
impl From<tcp::TcpConfig> for SocketConfig {
    fn from(config: tcp::TcpConfig) -> Self {
        SocketConfig {
            mode: Mode::Tcp(config),
        }
    }
}
impl From<udp::UdpConfig> for SocketConfig {
    fn from(config: udp::UdpConfig) -> Self {
        SocketConfig {
            mode: Mode::Udp(config),
        }
    }
}
impl GenerateConfig for SocketConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(
            r#"mode = "tcp"
            address = "0.0.0.0:9000""#,
        )
        .unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "socket")]
impl SourceConfig for SocketConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        match self.mode.clone() {
            Mode::Tcp(config) => {
                let log_namespace = cx.log_namespace(config.log_namespace);
                let decoding = config.decoding().clone();
                let decoder = DecodingConfig::new(
                    config
                        .framing
                        .clone()
                        .unwrap_or_else(|| decoding.default_stream_framing()),
                    decoding,
                    log_namespace,
                )
                .build()?;
                let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
                let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
                let tls_client_metadata_key = config
                    .tls()
                    .as_ref()
                    .and_then(|tls| tls.client_metadata_key.clone())
                    .and_then(|k| k.path);
                let tls = MaybeTlsSettings::from_config(&tls_config, true)?;
                tcp.run(
                    config.address(),
                    config.keepalive(),
                    config.shutdown_timeout_secs(),
                    tls,
                    tls_client_metadata_key,
                    config.receive_buffer_bytes(),
                    config.max_connection_duration_secs(),
                    cx,
                    false.into(),
                    config.connection_limit,
                    config.permit_origin.map(Into::into),
                    SocketConfig::NAME,
                    log_namespace,
                )
            }
            Mode::Udp(config) => {
                let log_namespace = cx.log_namespace(config.log_namespace);
                let decoding = config.decoding().clone();
                let framing = config
                    .framing()
                    .clone()
                    .unwrap_or_else(|| decoding.default_message_based_framing());
                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
                Ok(udp::udp(
                    config,
                    decoder,
                    cx.shutdown,
                    cx.out,
                    log_namespace,
                ))
            }
            #[cfg(unix)]
            Mode::UnixDatagram(config) => {
                let log_namespace = cx.log_namespace(config.log_namespace);
                let decoding = config.decoding.clone();
                let framing = config
                    .framing
                    .clone()
                    .unwrap_or_else(|| decoding.default_message_based_framing());
                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
                unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace)
            }
            #[cfg(unix)]
            Mode::UnixStream(config) => {
                let log_namespace = cx.log_namespace(config.log_namespace);
                let decoding = config.decoding().clone();
                let decoder = DecodingConfig::new(
                    config
                        .framing
                        .clone()
                        .unwrap_or_else(|| decoding.default_stream_framing()),
                    decoding,
                    log_namespace,
                )
                .build()?;
                unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
            }
        }
    }
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = self.log_namespace(global_log_namespace);
        let schema_definition = self
            .decoding()
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata();
        let schema_definition = match &self.mode {
            Mode::Tcp(config) => {
                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
                let tls_client_metadata_path = config
                    .tls()
                    .as_ref()
                    .and_then(|tls| tls.client_metadata_key.as_ref())
                    .and_then(|k| k.path.clone())
                    .map(LegacyKey::Overwrite);
                schema_definition
                    .with_source_metadata(
                        Self::NAME,
                        legacy_host_key,
                        &owned_value_path!("host"),
                        Kind::bytes(),
                        Some("host"),
                    )
                    .with_source_metadata(
                        Self::NAME,
                        legacy_port_key,
                        &owned_value_path!("port"),
                        Kind::integer(),
                        None,
                    )
                    .with_source_metadata(
                        Self::NAME,
                        tls_client_metadata_path,
                        &owned_value_path!("tls_client_metadata"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
                            .or_undefined(),
                        None,
                    )
            }
            Mode::Udp(config) => {
                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
                schema_definition
                    .with_source_metadata(
                        Self::NAME,
                        legacy_host_key,
                        &owned_value_path!("host"),
                        Kind::bytes(),
                        None,
                    )
                    .with_source_metadata(
                        Self::NAME,
                        legacy_port_key,
                        &owned_value_path!("port"),
                        Kind::integer(),
                        None,
                    )
            }
            #[cfg(unix)]
            Mode::UnixDatagram(config) => {
                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
                schema_definition.with_source_metadata(
                    Self::NAME,
                    legacy_host_key,
                    &owned_value_path!("host"),
                    Kind::bytes(),
                    None,
                )
            }
            #[cfg(unix)]
            Mode::UnixStream(config) => {
                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
                schema_definition.with_source_metadata(
                    Self::NAME,
                    legacy_host_key,
                    &owned_value_path!("host"),
                    Kind::bytes(),
                    None,
                )
            }
        };
        vec![SourceOutput::new_maybe_logs(
            self.decoding().output_type(),
            schema_definition,
        )]
    }
    fn resources(&self) -> Vec<Resource> {
        match self.mode.clone() {
            Mode::Tcp(tcp) => vec![tcp.address().as_tcp_resource()],
            Mode::Udp(udp) => vec![udp.address().as_udp_resource()],
            #[cfg(unix)]
            Mode::UnixDatagram(_) => vec![],
            #[cfg(unix)]
            Mode::UnixStream(_) => vec![],
        }
    }
    fn can_acknowledge(&self) -> bool {
        false
    }
}
pub(crate) fn default_host_key() -> OptionalValuePath {
    log_schema().host_key().cloned().into()
}
#[cfg(test)]
mod test {
    use approx::assert_relative_eq;
    use std::{
        collections::HashMap,
        net::{SocketAddr, UdpSocket},
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread,
    };
    use bytes::{BufMut, Bytes, BytesMut};
    use futures::{stream, StreamExt};
    use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
    use serde_json::json;
    use tokio::io::AsyncReadExt;
    use tokio::net::TcpStream;
    use tokio::{
        task::JoinHandle,
        time::{timeout, Duration, Instant},
    };
    #[cfg(unix)]
    use vector_lib::codecs::{
        decoding::CharacterDelimitedDecoderOptions, CharacterDelimitedDecoderConfig,
    };
    use vector_lib::codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig};
    use vector_lib::event::EventContainer;
    use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
    use vrl::value::ObjectMap;
    use vrl::{btreemap, value};
    #[cfg(unix)]
    use {
        super::{unix::UnixConfig, Mode},
        crate::sources::util::unix::UNNAMED_SOCKET_HOST,
        crate::test_util::wait_for,
        futures::{SinkExt, Stream},
        std::future::ready,
        std::os::unix::fs::PermissionsExt,
        std::path::PathBuf,
        tokio::{
            io::AsyncWriteExt,
            net::{UnixDatagram, UnixStream},
            task::yield_now,
        },
        tokio_util::codec::{FramedWrite, LinesCodec},
    };
    use super::{tcp::TcpConfig, udp::UdpConfig, SocketConfig};
    use crate::{
        config::{log_schema, ComponentKey, GlobalOptions, SourceConfig, SourceContext},
        event::{Event, LogEvent},
        shutdown::{ShutdownSignal, SourceShutdownCoordinator},
        sinks::util::tcp::TcpSinkConfig,
        sources::util::net::SocketListenAddr,
        test_util::{
            collect_n, collect_n_limited,
            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
            next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
        },
        tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
        SourceSender,
    };
    fn get_gelf_payload(message: &str) -> String {
        serde_json::to_string(&json!({
            "version": "1.1",
            "host": "example.org",
            "short_message": message,
            "timestamp": 1234567890.123,
            "level": 6,
            "_foo": "bar",
        }))
        .unwrap()
    }
    fn create_gelf_chunk(
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
        payload: &[u8],
    ) -> Bytes {
        const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f];
        let mut chunk = BytesMut::new();
        chunk.put_slice(&GELF_MAGIC);
        chunk.put_u64(message_id);
        chunk.put_u8(sequence_number);
        chunk.put_u8(total_chunks);
        chunk.put(payload);
        chunk.freeze()
    }
    fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> {
        let message_id = rand::random();
        let payload = get_gelf_payload(short_message);
        let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>();
        let total_chunks = payload_chunks.len();
        assert!(total_chunks <= 128, "too many gelf chunks");
        let mut chunks = payload_chunks
            .into_iter()
            .enumerate()
            .map(|(i, payload_chunk)| {
                create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk)
            })
            .collect::<Vec<_>>();
        chunks.shuffle(rng);
        chunks
    }
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SocketConfig>();
    }
    #[tokio::test]
    async fn tcp_it_includes_host() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let addr = next_addr();
            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(addr).await;
            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
                .await
                .unwrap();
            let event = rx.next().await.unwrap();
            assert_eq!(event.as_log()["host"], addr.ip().to_string().into());
            assert_eq!(event.as_log()["port"], addr.port().into());
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_it_includes_vector_namespaced_fields() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let addr = next_addr();
            let mut conf = TcpConfig::from_address(addr.into());
            conf.set_log_namespace(Some(true));
            let server = SocketConfig::from(conf)
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(addr).await;
            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
                .await
                .unwrap();
            let event = rx.next().await.unwrap();
            let log = event.as_log();
            let event_meta = log.metadata().value();
            assert_eq!(log.value(), &"test".into());
            assert_eq!(
                event_meta.get(path!("vector", "source_type")).unwrap(),
                &value!(SocketConfig::NAME)
            );
            assert_eq!(
                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
                &value!(addr.ip().to_string())
            );
            assert_eq!(
                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
                &value!(addr.port())
            );
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_splits_on_newline() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let addr = next_addr();
            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(addr).await;
            send_lines(addr, vec!["foo\nbar".to_owned()].into_iter())
                .await
                .unwrap();
            let events = collect_n(rx, 2).await;
            assert_eq!(events.len(), 2);
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "foo".into()
            );
            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                "bar".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_it_includes_source_type() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let addr = next_addr();
            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(addr).await;
            send_lines(addr, vec!["test".to_owned()].into_iter())
                .await
                .unwrap();
            let event = rx.next().await.unwrap();
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "socket".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_continue_after_long_line() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let addr = next_addr();
            let mut config = TcpConfig::from_address(addr.into());
            config.set_framing(Some(
                NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
            ));
            let server = SocketConfig::from(config)
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            let lines = vec![
                "short".to_owned(),
                "this is too long".to_owned(),
                "more short".to_owned(),
            ];
            wait_for_tcp(addr).await;
            send_lines(addr, lines.into_iter()).await.unwrap();
            let event = rx.next().await.unwrap();
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "short".into()
            );
            let event = rx.next().await.unwrap();
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "more short".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_with_tls() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let addr = next_addr();
            let mut config = TcpConfig::from_address(addr.into());
            config.set_tls(Some(TlsSourceConfig {
                tls_config: TlsEnableableConfig {
                    enabled: Some(true),
                    options: TlsConfig {
                        verify_certificate: Some(true),
                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
                        ..Default::default()
                    },
                },
                client_metadata_key: Some(OptionalValuePath::from(owned_value_path!("tls_peer"))),
            }));
            let server = SocketConfig::from(config)
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            let lines = vec!["one line".to_owned(), "another line".to_owned()];
            wait_for_tcp(addr).await;
            send_lines_tls(
                addr,
                "localhost".into(),
                lines.into_iter(),
                std::path::Path::new(tls::TEST_PEM_CA_PATH),
                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
            )
            .await
            .unwrap();
            let event = rx.next().await.unwrap();
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "one line".into()
            );
            let tls_meta: ObjectMap = btreemap!(
                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
            );
            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
            let event = rx.next().await.unwrap();
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "another line".into()
            );
            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_with_tls_vector_namespace() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let addr = next_addr();
            let mut config = TcpConfig::from_address(addr.into());
            config.set_tls(Some(TlsSourceConfig {
                tls_config: TlsEnableableConfig {
                    enabled: Some(true),
                    options: TlsConfig {
                        verify_certificate: Some(true),
                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
                        ..Default::default()
                    },
                },
                client_metadata_key: None,
            }));
            config.log_namespace = Some(true);
            let server = SocketConfig::from(config)
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            let lines = vec!["one line".to_owned(), "another line".to_owned()];
            wait_for_tcp(addr).await;
            send_lines_tls(
                addr,
                "localhost".into(),
                lines.into_iter(),
                std::path::Path::new(tls::TEST_PEM_CA_PATH),
                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
            )
            .await
            .unwrap();
            let event = rx.next().await.unwrap();
            let log = event.as_log();
            let event_meta = log.metadata().value();
            assert_eq!(log.value(), &"one line".into());
            let tls_meta: ObjectMap = btreemap!(
                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
            );
            assert_eq!(
                event_meta
                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
                    .unwrap(),
                &value!(tls_meta.clone())
            );
            let event = rx.next().await.unwrap();
            let log = event.as_log();
            let event_meta = log.metadata().value();
            assert_eq!(log.value(), &"another line".into());
            assert_eq!(
                event_meta
                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
                    .unwrap(),
                &value!(tls_meta.clone())
            );
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_shutdown_simple() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let source_id = ComponentKey::from("tcp_shutdown_simple");
            let (tx, mut rx) = SourceSender::new_test();
            let addr = next_addr();
            let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);
            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
                .build(cx)
                .await
                .unwrap();
            let source_handle = tokio::spawn(server);
            wait_for_tcp(addr).await;
            send_lines(addr, vec!["test".to_owned()].into_iter())
                .await
                .unwrap();
            let event = rx.next().await.unwrap();
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );
            let deadline = Instant::now() + Duration::from_secs(10);
            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
            let shutdown_success = shutdown_complete.await;
            assert!(shutdown_success);
            _ = source_handle.await.unwrap();
        })
        .await;
    }
    #[tokio::test]
    async fn tcp_shutdown_infinite_stream() {
        let addr = next_addr();
        let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000);
        let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
        let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);
        let mut source_config = TcpConfig::from_address(addr.into());
        source_config.set_shutdown_timeout_secs(1);
        let source_task = SocketConfig::from(source_config)
            .build(source_cx)
            .await
            .unwrap();
        let source_handle = tokio::spawn(source_task);
        wait_for_tcp(addr).await;
        let message = random_string(512);
        let message_bytes = Bytes::from(message.clone());
        #[derive(Clone, Debug)]
        struct Serializer {
            bytes: Bytes,
        }
        impl tokio_util::codec::Encoder<Event> for Serializer {
            type Error = vector_lib::codecs::encoding::Error;
            fn encode(&mut self, _: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
                buffer.put(self.bytes.as_ref());
                buffer.put_u8(b'\n');
                Ok(())
            }
        }
        let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port()));
        let encoder = Serializer {
            bytes: message_bytes,
        };
        let (sink, _healthcheck) = sink_config.build(Default::default(), encoder).unwrap();
        tokio::spawn(async move {
            let input = stream::repeat_with(|| LogEvent::default().into()).boxed();
            sink.run(input).await.unwrap();
        });
        let events = collect_n_limited(source_rx, 100)
            .await
            .into_iter()
            .collect::<Vec<_>>();
        assert_eq!(100, events.len());
        let message_key = log_schema().message_key().unwrap().to_string();
        let expected_message = message.clone().into();
        for event in events.into_iter().flat_map(EventContainer::into_events) {
            assert_eq!(event.as_log()[message_key.as_str()], expected_message);
        }
        let shutdown_timeout_limit = Duration::from_secs(10);
        let deadline = Instant::now() + shutdown_timeout_limit;
        let shutdown_complete = shutdown.shutdown_source(&source_key, deadline);
        let shutdown_result = timeout(shutdown_timeout_limit, shutdown_complete).await;
        assert_eq!(shutdown_result, Ok(true));
        let source_result = source_handle.await.expect("source task should not panic");
        assert_eq!(source_result, Ok(()));
    }
    #[tokio::test]
    async fn tcp_connection_close_after_max_duration() {
        let (tx, _) = SourceSender::new_test();
        let addr = next_addr();
        let mut source_config = TcpConfig::from_address(addr.into());
        source_config.set_max_connection_duration_secs(Some(1));
        let source_task = SocketConfig::from(source_config)
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        drop(tokio::spawn(source_task));
        wait_for_tcp(addr).await;
        let mut stream: TcpStream = TcpStream::connect(addr)
            .await
            .expect("stream should be able to connect");
        let start = Instant::now();
        let timeout = tokio::time::sleep(Duration::from_millis(1200));
        let mut buffer = [0u8; 10];
        tokio::select! {
             _ = timeout => {
                 panic!("timed out waiting for stream to close")
             },
             read_result = stream.read(&mut buffer) => {
                 match read_result {
                    Ok(0) => assert_relative_eq!(start.elapsed().as_secs_f64(), 1.0, epsilon = 0.3),
                    Ok(_) => panic!("unexpectedly read data from stream"),
                    Err(e) => panic!("{:}", e)
                 }
             }
        }
    }
    fn send_lines_udp(addr: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
        send_packets_udp(addr, lines.into_iter().map(|line| line.into()))
    }
    fn send_packets_udp(addr: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
        let bind = next_addr();
        let socket = UdpSocket::bind(bind)
            .map_err(|error| panic!("{:}", error))
            .ok()
            .unwrap();
        for packet in packets {
            assert_eq!(
                socket
                    .send_to(&packet, addr)
                    .map_err(|error| panic!("{:}", error))
                    .ok()
                    .unwrap(),
                packet.len()
            );
            thread::sleep(Duration::from_millis(1));
        }
        thread::sleep(Duration::from_millis(10));
        bind
    }
    async fn init_udp_with_shutdown(
        sender: SourceSender,
        source_id: &ComponentKey,
        shutdown: &mut SourceShutdownCoordinator,
    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
        let (shutdown_signal, _) = shutdown.register_source(source_id, false);
        init_udp_inner(sender, source_id, shutdown_signal, None, false).await
    }
    async fn init_udp(sender: SourceSender, use_log_namespace: bool) -> SocketAddr {
        let (addr, _handle) = init_udp_inner(
            sender,
            &ComponentKey::from("default"),
            ShutdownSignal::noop(),
            None,
            use_log_namespace,
        )
        .await;
        addr
    }
    async fn init_udp_with_config(sender: SourceSender, config: UdpConfig) -> SocketAddr {
        let (addr, _handle) = init_udp_inner(
            sender,
            &ComponentKey::from("default"),
            ShutdownSignal::noop(),
            Some(config),
            false,
        )
        .await;
        addr
    }
    async fn init_udp_inner(
        sender: SourceSender,
        source_key: &ComponentKey,
        shutdown_signal: ShutdownSignal,
        config: Option<UdpConfig>,
        use_vector_namespace: bool,
    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
        let (address, mut config) = match config {
            Some(config) => match config.address() {
                SocketListenAddr::SocketAddr(addr) => (addr, config),
                _ => panic!("listen address should not be systemd FD offset in tests"),
            },
            None => {
                let address = next_addr();
                (address, UdpConfig::from_address(address.into()))
            }
        };
        let config = if use_vector_namespace {
            config.set_log_namespace(Some(true));
            config
        } else {
            config
        };
        let server = SocketConfig::from(config)
            .build(SourceContext {
                key: source_key.clone(),
                globals: GlobalOptions::default(),
                shutdown: shutdown_signal,
                out: sender,
                proxy: Default::default(),
                acknowledgements: false,
                schema: Default::default(),
                schema_definitions: HashMap::default(),
                extra_context: Default::default(),
            })
            .await
            .unwrap();
        let source_handle = tokio::spawn(server);
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        (address, source_handle)
    }
    #[tokio::test]
    async fn udp_message() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = init_udp(tx, false).await;
            send_lines_udp(address, vec!["test".to_string()]);
            let events = collect_n(rx, 1).await;
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn udp_message_preserves_newline() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = init_udp(tx, false).await;
            send_lines_udp(address, vec!["foo\nbar".to_string()]);
            let events = collect_n(rx, 1).await;
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "foo\nbar".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn udp_multiple_packets() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = init_udp(tx, false).await;
            send_lines_udp(address, vec!["test".to_string(), "test2".to_string()]);
            let events = collect_n(rx, 2).await;
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );
            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                "test2".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn udp_max_length() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = next_addr();
            let mut config = UdpConfig::from_address(address.into());
            config.max_length = 11;
            let address = init_udp_with_config(tx, config).await;
            send_lines_udp(
                address,
                vec![
                    "short line".to_string(),
                    "test with a long line".to_string(),
                    "a short un".to_string(),
                ],
            );
            let events = collect_n(rx, 2).await;
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "short line".into()
            );
            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                "a short un".into()
            );
        })
        .await;
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn udp_max_length_delimited() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = next_addr();
            let mut config = UdpConfig::from_address(address.into());
            config.max_length = 10;
            config.framing = Some(
                CharacterDelimitedDecoderConfig {
                    character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
                }
                .into(),
            );
            let address = init_udp_with_config(tx, config).await;
            send_lines_udp(
                address,
                vec!["test with, long line".to_string(), "short one".to_string()],
            );
            let events = collect_n(rx, 2).await;
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "test with".into()
            );
            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                "short one".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn udp_decodes_chunked_gelf_messages() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = next_addr();
            let mut config = UdpConfig::from_address(address.into());
            config.decoding = GelfDeserializerConfig::default().into();
            let address = init_udp_with_config(tx, config).await;
            let seed = 42;
            let mut rng = SmallRng::seed_from_u64(seed);
            let max_size = 300;
            let big_message = "This is a very large message".repeat(500);
            let another_big_message = "This is another very large message".repeat(500);
            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
            let mut another_chunks =
                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
            chunks.append(&mut another_chunks);
            chunks.shuffle(&mut rng);
            send_packets_udp(address, chunks);
            let events = collect_n(rx, 2).await;
            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                big_message.into()
            );
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                another_big_message.into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn udp_it_includes_host() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = init_udp(tx, false).await;
            let from = send_lines_udp(address, vec!["test".to_string()]);
            let events = collect_n(rx, 1).await;
            assert_eq!(events[0].as_log()["host"], from.ip().to_string().into());
            assert_eq!(events[0].as_log()["port"], from.port().into());
        })
        .await;
    }
    #[tokio::test]
    async fn udp_it_includes_vector_namespaced_fields() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = init_udp(tx, true).await;
            let from = send_lines_udp(address, vec!["test".to_string()]);
            let events = collect_n(rx, 1).await;
            let log = events[0].as_log();
            let event_meta = log.metadata().value();
            assert_eq!(log.value(), &"test".into());
            assert_eq!(
                event_meta.get(path!("vector", "source_type")).unwrap(),
                &value!(SocketConfig::NAME)
            );
            assert_eq!(
                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
                &value!(from.ip().to_string())
            );
            assert_eq!(
                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
                &value!(from.port())
            );
        })
        .await;
    }
    #[tokio::test]
    async fn udp_it_includes_source_type() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let address = init_udp(tx, false).await;
            _ = send_lines_udp(address, vec!["test".to_string()]);
            let events = collect_n(rx, 1).await;
            assert_eq!(
                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
                "socket".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn udp_shutdown_simple() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let source_id = ComponentKey::from("udp_shutdown_simple");
            let mut shutdown = SourceShutdownCoordinator::default();
            let (address, source_handle) =
                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
            send_lines_udp(address, vec!["test".to_string()]);
            let events = collect_n(rx, 1).await;
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );
            let deadline = Instant::now() + Duration::from_secs(10);
            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
            let shutdown_success = shutdown_complete.await;
            assert!(shutdown_success);
            _ = source_handle.await.unwrap();
        })
        .await;
    }
    #[tokio::test]
    async fn udp_shutdown_infinite_stream() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let source_id = ComponentKey::from("udp_shutdown_infinite_stream");
            let mut shutdown = SourceShutdownCoordinator::default();
            let (address, source_handle) =
                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
            let run_pump_atomic_sender = Arc::new(AtomicBool::new(true));
            let run_pump_atomic_receiver = Arc::clone(&run_pump_atomic_sender);
            let pump_handle = std::thread::spawn(move || {
                send_lines_udp(
                    address,
                    std::iter::repeat("test".to_string())
                        .take_while(move |_| run_pump_atomic_receiver.load(Ordering::Relaxed)),
                );
            });
            let events = collect_n(rx, 100).await;
            assert_eq!(100, events.len());
            for event in events {
                assert_eq!(
                    event.as_log()[log_schema().message_key().unwrap().to_string()],
                    "test".into()
                );
            }
            let deadline = Instant::now() + Duration::from_secs(10);
            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
            let shutdown_success = shutdown_complete.await;
            assert!(shutdown_success);
            _ = source_handle.await.unwrap();
            run_pump_atomic_sender.store(false, Ordering::Relaxed);
            assert!(pump_handle.join().is_ok());
        })
        .await;
    }
    #[cfg(unix)]
    async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf {
        init_unix_inner(sender, stream, use_vector_namespace, None).await
    }
    #[cfg(unix)]
    async fn init_unix_with_config(
        sender: SourceSender,
        stream: bool,
        use_vector_namespace: bool,
        config: UnixConfig,
    ) -> PathBuf {
        init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await
    }
    #[cfg(unix)]
    async fn init_unix_inner(
        sender: SourceSender,
        stream: bool,
        use_vector_namespace: bool,
        config: Option<UnixConfig>,
    ) -> PathBuf {
        let mut config = config.unwrap_or_else(|| {
            UnixConfig::new(tempfile::tempdir().unwrap().into_path().join("unix_test"))
        });
        let in_path = config.path.clone();
        if use_vector_namespace {
            config.log_namespace = Some(true);
        }
        let mode = if stream {
            Mode::UnixStream(config)
        } else {
            Mode::UnixDatagram(config)
        };
        let server = SocketConfig { mode }
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap();
        tokio::spawn(server);
        while if stream {
            std::os::unix::net::UnixStream::connect(&in_path).is_err()
        } else {
            let socket = std::os::unix::net::UnixDatagram::unbound().unwrap();
            socket.connect(&in_path).is_err()
        } {
            yield_now().await;
        }
        in_path
    }
    #[cfg(unix)]
    async fn unix_send_lines(stream: bool, path: PathBuf, lines: &[&str]) {
        match stream {
            false => send_lines_unix_datagram(path, lines).await,
            true => send_lines_unix_stream(path, lines).await,
        }
    }
    #[cfg(unix)]
    async fn unix_message(
        message: &str,
        stream: bool,
        use_vector_namespace: bool,
    ) -> (PathBuf, impl Stream<Item = Event>) {
        let (tx, rx) = SourceSender::new_test();
        let path = init_unix(tx, stream, use_vector_namespace).await;
        let path_clone = path.clone();
        unix_send_lines(stream, path, &[message]).await;
        (path_clone, rx)
    }
    #[cfg(unix)]
    async fn unix_multiple_packets(stream: bool) {
        let (tx, rx) = SourceSender::new_test();
        let path = init_unix(tx, stream, false).await;
        unix_send_lines(stream, path, &["test", "test2"]).await;
        let events = collect_n(rx, 2).await;
        assert_eq!(2, events.len());
        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "test".into()
        );
        assert_eq!(
            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
            "test2".into()
        );
    }
    #[cfg(unix)]
    fn parses_unix_config(mode: &str) -> SocketConfig {
        toml::from_str::<SocketConfig>(&format!(
            r#"
               mode = "{}"
               path = "/does/not/exist"
            "#,
            mode
        ))
        .unwrap()
    }
    #[cfg(unix)]
    fn parses_unix_config_file_mode(mode: &str) -> SocketConfig {
        toml::from_str::<SocketConfig>(&format!(
            r#"
               mode = "{}"
               path = "/does/not/exist"
               socket_file_mode = 0o777
            "#,
            mode
        ))
        .unwrap()
    }
    #[cfg(unix)]
    async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) {
        let packets = lines.iter().map(|line| Bytes::from(line.to_string()));
        send_packets_unix_datagram(path, packets).await;
    }
    #[cfg(unix)]
    async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) {
        let socket = UnixDatagram::unbound().unwrap();
        socket.connect(path).unwrap();
        for packet in packets {
            socket.send(&packet).await.unwrap();
        }
        socket.shutdown(std::net::Shutdown::Both).unwrap();
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_datagram_message() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (_, rx) = unix_message("test", false, false).await;
            let events = collect_n(rx, 1).await;
            assert_eq!(events.len(), 1);
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );
            assert_eq!(
                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
                "socket".into()
            );
            assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into());
        })
        .await;
    }
    #[ignore]
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_datagram_socket_test() {
        use tempfile::tempdir;
        use tokio::net::UnixDatagram;
        let tmp = tempdir().unwrap();
        let tx_path = tmp.path().join("tx");
        let tx_type = "bound";
        let tx = if tx_type == "bound" {
            UnixDatagram::bind(&tx_path).unwrap()
        } else {
            UnixDatagram::unbound().unwrap()
        };
        let rx_path = tmp.path().join("rx");
        let rx = UnixDatagram::bind(&rx_path).unwrap();
        tx.connect(&rx_path).unwrap();
        let bytes = b"hello world";
        tx.send(bytes).await.unwrap();
        let mut buf = vec![0u8; 24];
        let (size, _) = rx.recv_from(&mut buf).await.unwrap();
        let dgram = &buf[..size];
        assert_eq!(dgram, bytes);
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_datagram_chunked_gelf_messages() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test");
            let mut config = UnixConfig::new(in_path.clone());
            config.decoding = GelfDeserializerConfig::default().into();
            let path = init_unix_with_config(tx, false, false, config).await;
            let seed = 42;
            let mut rng = SmallRng::seed_from_u64(seed);
            let max_size = 20;
            let big_message = "This is a very large message".repeat(5);
            let another_big_message = "This is another very large message".repeat(5);
            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
            let mut another_chunks =
                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
            chunks.append(&mut another_chunks);
            chunks.shuffle(&mut rng);
            send_packets_unix_datagram(path, chunks).await;
            let events = collect_n(rx, 2).await;
            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                big_message.into()
            );
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                another_big_message.into()
            );
        })
        .await;
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_datagram_message_with_vector_namespace() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (_, rx) = unix_message("test", false, true).await;
            let events = collect_n(rx, 1).await;
            let log = events[0].as_log();
            let event_meta = log.metadata().value();
            assert_eq!(log.value(), &"test".into());
            assert_eq!(events.len(), 1);
            assert_eq!(
                event_meta.get(path!("vector", "source_type")).unwrap(),
                &value!(SocketConfig::NAME)
            );
            assert_eq!(
                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
                &value!(UNNAMED_SOCKET_HOST)
            );
        })
        .await;
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_datagram_message_preserves_newline() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (_, rx) = unix_message("foo\nbar", false, false).await;
            let events = collect_n(rx, 1).await;
            assert_eq!(events.len(), 1);
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "foo\nbar".into()
            );
            assert_eq!(
                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
                "socket".into()
            );
        })
        .await;
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_datagram_multiple_packets() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            unix_multiple_packets(false).await
        })
        .await;
    }
    #[cfg(unix)]
    #[test]
    fn parses_unix_datagram_config() {
        let config = parses_unix_config("unix_datagram");
        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
    }
    #[cfg(unix)]
    #[test]
    fn parses_unix_datagram_perms() {
        let config = parses_unix_config_file_mode("unix_datagram");
        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_datagram_permissions() {
        let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test");
        let (tx, _) = SourceSender::new_test();
        let mut config = UnixConfig::new(in_path.clone());
        config.socket_file_mode = Some(0o555);
        let mode = Mode::UnixDatagram(config);
        let server = SocketConfig { mode }
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(server);
        wait_for(|| {
            match std::fs::metadata(&in_path) {
                Ok(meta) => {
                    match meta.permissions().mode() {
                        0o140555 => ready(true),
                        _ => ready(false),
                    }
                }
                Err(_) => ready(false),
            }
        })
        .await;
    }
    #[cfg(unix)]
    async fn send_lines_unix_stream(path: PathBuf, lines: &[&str]) {
        let socket = UnixStream::connect(path).await.unwrap();
        let mut sink = FramedWrite::new(socket, LinesCodec::new());
        let lines = lines.iter().map(|s| Ok(s.to_string()));
        let lines = lines.collect::<Vec<_>>();
        sink.send_all(&mut stream::iter(lines)).await.unwrap();
        let mut socket = sink.into_inner();
        socket.shutdown().await.unwrap();
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_stream_message() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (_, rx) = unix_message("test", true, false).await;
            let events = collect_n(rx, 1).await;
            assert_eq!(1, events.len());
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );
            assert_eq!(
                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
                "socket".into()
            );
        })
        .await;
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_stream_message_with_vector_namespace() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (_, rx) = unix_message("test", true, true).await;
            let events = collect_n(rx, 1).await;
            let log = events[0].as_log();
            let event_meta = log.metadata().value();
            assert_eq!(log.value(), &"test".into());
            assert_eq!(1, events.len());
            assert_eq!(
                event_meta.get(path!("vector", "source_type")).unwrap(),
                &value!(SocketConfig::NAME)
            );
            assert_eq!(
                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
                &value!(UNNAMED_SOCKET_HOST)
            );
        })
        .await;
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_stream_message_splits_on_newline() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (_, rx) = unix_message("foo\nbar", true, false).await;
            let events = collect_n(rx, 2).await;
            assert_eq!(events.len(), 2);
            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "foo".into()
            );
            assert_eq!(
                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
                "socket".into()
            );
            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                "bar".into()
            );
            assert_eq!(
                events[1].as_log()[log_schema().source_type_key().unwrap().to_string()],
                "socket".into()
            );
        })
        .await;
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_stream_multiple_packets() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            unix_multiple_packets(true).await
        })
        .await;
    }
    #[cfg(unix)]
    #[test]
    fn parses_new_unix_stream_config() {
        let config = parses_unix_config("unix_stream");
        assert!(matches!(config.mode, Mode::UnixStream { .. }));
    }
    #[cfg(unix)]
    #[test]
    fn parses_new_unix_datagram_perms() {
        let config = parses_unix_config_file_mode("unix_stream");
        assert!(matches!(config.mode, Mode::UnixStream { .. }));
    }
    #[cfg(unix)]
    #[test]
    fn parses_old_unix_stream_config() {
        let config = parses_unix_config("unix");
        assert!(matches!(config.mode, Mode::UnixStream { .. }));
    }
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_stream_permissions() {
        let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test");
        let (tx, _) = SourceSender::new_test();
        let mut config = UnixConfig::new(in_path.clone());
        config.socket_file_mode = Some(0o421);
        let mode = Mode::UnixStream(config);
        let server = SocketConfig { mode }
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(server);
        wait_for(|| {
            match std::fs::metadata(&in_path) {
                Ok(meta) => {
                    match meta.permissions().mode() {
                        0o140421 => ready(true),
                        _ => ready(false),
                    }
                }
                Err(_) => ready(false),
            }
        })
        .await;
    }
}