use ipnet::IpNet;
#[cfg(unix)]
use std::os::unix::{fs::PermissionsExt, io::AsRawFd};
use std::{
    convert::TryInto,
    fs,
    marker::{Send, Sync},
    net::SocketAddr,
    path::PathBuf,
    sync::{
        atomic::{AtomicU32, Ordering},
        Arc, Mutex,
    },
    thread,
    time::Duration,
};
use bytes::{Buf, Bytes, BytesMut};
use futures::{
    executor::block_on,
    future,
    sink::{Sink, SinkExt},
    stream::{self, StreamExt, TryStreamExt},
};
use futures_util::{future::BoxFuture, Future, FutureExt};
use listenfd::ListenFd;
use tokio::{
    self,
    io::{AsyncRead, AsyncWrite},
    net::{TcpStream, UnixListener},
    task::JoinHandle,
    time::sleep,
};
use tokio_stream::wrappers::UnixListenerStream;
use tokio_util::codec::{length_delimited, Framed};
use tracing::{field, Instrument, Span};
use vector_lib::{
    lookup::OwnedValuePath,
    tcp::TcpKeepaliveConfig,
    tls::{CertificateMetadata, MaybeTlsIncomingStream, MaybeTlsSettings},
};
use crate::{
    event::Event,
    internal_events::{
        ConnectionOpen, OpenGauge, SocketBindError, SocketMode, SocketReceiveError,
        TcpBytesReceived, TcpSocketError, TcpSocketTlsConnectionError, UnixSocketError,
        UnixSocketFileDeleteError,
    },
    shutdown::ShutdownSignal,
    sources::{
        util::{
            net::{try_bind_tcp_listener, MAX_IN_FLIGHT_EVENTS_TARGET},
            AfterReadExt,
        },
        Source,
    },
    SourceSender,
};
use super::net::{RequestLimiter, SocketListenAddr};
/// Upper bound, in bytes, that the length of an incoming control frame is checked against.
const FSTRM_CONTROL_FRAME_LENGTH_MAX: usize = 512;
/// Longest content-type field, in bytes, accepted inside a control frame.
const FSTRM_CONTROL_FIELD_CONTENT_TYPE_LENGTH_MAX: usize = 256;
/// Boxed, type-erased sink of raw frames that reports `std::io::Error` on failure.
///
/// [`FrameStreamReader`] writes its control-frame responses into a sink of this type.
pub type FrameStreamSink = Box<dyn Sink<Bytes, Error = std::io::Error> + Send + Unpin>;
/// Per-connection frame stream protocol handler.
///
/// Tracks the control handshake state of one connection, answers control frames through
/// `response_sink`, and hands data frames back to the caller once the stream has started.
pub struct FrameStreamReader {
    // Sink used to send control frames back to the peer.
    response_sink: Mutex<FrameStreamSink>,
    // Content type a peer must offer for the handshake to be accepted.
    expected_content_type: String,
    // Current position in the control-frame state machine.
    state: FrameStreamState,
}
/// Mutable protocol state kept for a single frame stream connection.
struct FrameStreamState {
    /// Set after an empty (escape) frame: the next frame is a control frame.
    expect_control_frame: bool,
    /// Where the connection currently is in the control handshake.
    control_state: ControlState,
    /// Whether control responses are sent back to the peer.
    is_bidirectional: bool,
}

impl FrameStreamState {
    /// Returns the state of a freshly accepted connection.
    ///
    /// The stream is assumed to be bidirectional until a START frame arrives without a
    /// preceding READY frame.
    const fn new() -> Self {
        Self {
            control_state: ControlState::Initial,
            expect_control_frame: false,
            is_bidirectional: true,
        }
    }
}

/// Phases of the control-frame handshake, in the order a connection moves through them.
#[derive(PartialEq, Debug)]
enum ControlState {
    /// No control frame has been processed yet.
    Initial,
    /// A READY frame was accepted; a START frame is expected next.
    GotReady,
    /// A START frame was accepted; data frames are being read.
    ReadingData,
    /// A STOP frame was accepted; no further frames are expected.
    Stopped,
}
/// Control frame types, identified on the wire by a big-endian `u32` in `0x01..=0x05`.
#[derive(Copy, Clone)]
enum ControlHeader {
    /// Sent by this side in response to a READY frame.
    Accept,
    /// Announces that data frames follow.
    Start,
    /// Announces that no more data frames follow.
    Stop,
    /// Opens a bidirectional handshake and carries the offered content types.
    Ready,
    /// Sent by this side in response to a STOP frame on a bidirectional stream.
    Finish,
}

impl ControlHeader {
    /// Decodes a wire value into a header type.
    ///
    /// Logs an error and returns `Err(())` for any value outside `0x01..=0x05`.
    fn from_u32(val: u32) -> Result<Self, ()> {
        let header = match val {
            0x01 => Self::Accept,
            0x02 => Self::Start,
            0x03 => Self::Stop,
            0x04 => Self::Ready,
            0x05 => Self::Finish,
            _ => {
                error!("Don't know header value {} (expected 0x01 - 0x05).", val);
                return Err(());
            }
        };
        Ok(header)
    }

    /// Returns the wire value of this header type.
    const fn to_u32(self) -> u32 {
        match self {
            Self::Accept => 0x01,
            Self::Start => 0x02,
            Self::Stop => 0x03,
            Self::Ready => 0x04,
            Self::Finish => 0x05,
        }
    }
}
/// Field types that may appear inside a control frame.
enum ControlField {
    /// A length-prefixed content-type string.
    ContentType,
}

impl ControlField {
    /// Decodes a wire value into a field type.
    ///
    /// Logs an error and returns `Err(())` for any value other than `0x01`.
    fn from_u32(val: u32) -> Result<Self, ()> {
        if val == 0x01 {
            return Ok(Self::ContentType);
        }
        error!("Don't know field type {} (expected 0x01).", val);
        Err(())
    }

    /// Returns the wire value of this field type.
    const fn to_u32(&self) -> u32 {
        match self {
            Self::ContentType => 0x01,
        }
    }
}
/// Consumes the first four bytes of `b` and returns them as a big-endian `u32`.
///
/// Logs an error and returns `Err(())`, leaving `b` untouched, when fewer than four
/// bytes remain.
fn advance_u32(b: &mut Bytes) -> Result<u32, ()> {
    if b.len() >= 4 {
        let head = b.split_to(4);
        // `head` is exactly four bytes long, so the conversion cannot fail.
        let word: [u8; 4] = head[..].try_into().unwrap();
        return Ok(u32::from_be_bytes(word));
    }
    error!("Malformed frame.");
    Err(())
}
impl FrameStreamReader {
    pub fn new(response_sink: FrameStreamSink, expected_content_type: String) -> Self {
        FrameStreamReader {
            response_sink: Mutex::new(response_sink),
            expected_content_type,
            state: FrameStreamState::new(),
        }
    }
    pub fn handle_frame(&mut self, frame: Bytes) -> Option<Bytes> {
        if frame.is_empty() {
            self.state.expect_control_frame = true;
            None
        } else if self.state.expect_control_frame {
            self.state.expect_control_frame = false;
            _ = self.handle_control_frame(frame);
            None
        } else {
            if self.state.control_state == ControlState::ReadingData {
                Some(frame) } else {
                error!(
                    "Received a data frame while in state {:?}.",
                    self.state.control_state
                );
                None
            }
        }
    }
    fn handle_control_frame(&mut self, mut frame: Bytes) -> Result<(), ()> {
        if frame.len() > FSTRM_CONTROL_FRAME_LENGTH_MAX {
            error!("Control frame is too long.");
        }
        let header = ControlHeader::from_u32(advance_u32(&mut frame)?)?;
        match self.state.control_state {
            ControlState::Initial => {
                match header {
                    ControlHeader::Ready => {
                        let content_type = self.process_fields(header, &mut frame)?.unwrap();
                        self.send_control_frame(Self::make_frame(
                            ControlHeader::Accept,
                            Some(content_type),
                        ));
                        self.state.control_state = ControlState::GotReady; }
                    ControlHeader::Start => {
                        _ = self.process_fields(header, &mut frame)?;
                        self.state.control_state = ControlState::ReadingData;
                        self.state.is_bidirectional = false; }
                    _ => error!("Got wrong control frame, expected READY."),
                }
            }
            ControlState::GotReady => {
                match header {
                    ControlHeader::Start => {
                        _ = self.process_fields(header, &mut frame)?;
                        self.state.control_state = ControlState::ReadingData;
                    }
                    _ => error!("Got wrong control frame, expected START."),
                }
            }
            ControlState::ReadingData => {
                match header {
                    ControlHeader::Stop => {
                        _ = self.process_fields(header, &mut frame)?;
                        if self.state.is_bidirectional {
                            self.send_control_frame(Self::make_frame(ControlHeader::Finish, None));
                        }
                        self.state.control_state = ControlState::Stopped; }
                    _ => error!("Got wrong control frame, expected STOP."),
                }
            }
            ControlState::Stopped => error!("Unexpected control frame, current state is STOPPED."),
        };
        Ok(())
    }
    fn process_fields(
        &mut self,
        header: ControlHeader,
        frame: &mut Bytes,
    ) -> Result<Option<String>, ()> {
        match header {
            ControlHeader::Ready => {
                let is_start_frame = false;
                let content_type = self.process_content_type(frame, is_start_frame)?;
                Ok(Some(content_type))
            }
            ControlHeader::Start => {
                if frame.is_empty() {
                    Ok(None)
                } else {
                    let is_start_frame = true;
                    let content_type = self.process_content_type(frame, is_start_frame)?;
                    Ok(Some(content_type))
                }
            }
            ControlHeader::Stop => {
                if !frame.is_empty() {
                    error!("Unexpected fields in STOP header.");
                    Err(())
                } else {
                    Ok(None)
                }
            }
            _ => {
                error!("Unexpected control header value {:?}.", header.to_u32());
                Err(())
            }
        }
    }
    fn process_content_type(&self, frame: &mut Bytes, is_start_frame: bool) -> Result<String, ()> {
        if frame.is_empty() {
            error!("No fields in control frame.");
            return Err(());
        }
        let mut content_types = vec![];
        while !frame.is_empty() {
            let field_val = advance_u32(frame)?;
            let field_type = ControlField::from_u32(field_val)?;
            match field_type {
                ControlField::ContentType => {
                    let field_len = advance_u32(frame)? as usize;
                    if field_len > FSTRM_CONTROL_FIELD_CONTENT_TYPE_LENGTH_MAX {
                        error!("Content-Type string is too long.");
                        return Err(());
                    }
                    let content_type = std::str::from_utf8(&frame[..field_len]).unwrap();
                    content_types.push(content_type.to_string());
                    frame.advance(field_len);
                }
            }
        }
        if is_start_frame && content_types.len() > 1 {
            error!(
                "START control frame can only have one content-type provided (got {}).",
                content_types.len()
            );
            return Err(());
        }
        for content_type in &content_types {
            if *content_type == self.expected_content_type {
                return Ok(content_type.clone());
            }
        }
        error!(
            "Content types did not match up. Expected {} got {:?}.",
            self.expected_content_type, content_types
        );
        Err(())
    }
    fn make_frame(header: ControlHeader, content_type: Option<String>) -> Bytes {
        let mut frame = BytesMut::new();
        frame.extend(header.to_u32().to_be_bytes());
        if let Some(s) = content_type {
            frame.extend(ControlField::ContentType.to_u32().to_be_bytes()); frame.extend((s.len() as u32).to_be_bytes()); frame.extend(s.as_bytes());
        }
        Bytes::from(frame)
    }
    fn send_control_frame(&mut self, frame: Bytes) {
        let empty_frame = Bytes::from(&b""[..]); let mut stream = stream::iter(vec![Ok(empty_frame), Ok(frame)]);
        if let Err(e) = block_on(self.response_sink.lock().unwrap().send_all(&mut stream)) {
            error!("Encountered error '{:#?}' while sending control frame.", e);
        }
    }
}
/// Source-specific behavior shared by every frame stream transport.
pub trait FrameHandler {
    /// Content type a peer must offer during the control handshake.
    fn content_type(&self) -> String;
    /// Maximum frame length passed to the length-delimited codec.
    fn max_frame_length(&self) -> usize;
    /// Converts one data frame into an event. `received_from` identifies the peer when
    /// known. Returning `None` means the frame produces no event.
    fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event>;
    /// When `true`, each data frame is handled in its own spawned task; otherwise
    /// frames are handled inline on the connection's task.
    fn multithreaded(&self) -> bool;
    /// Limit applied to concurrently running frame handling tasks in multithreaded
    /// mode. `0` disables the limit.
    fn max_frame_handling_tasks(&self) -> u32;
    // NOTE(review): the three key accessors below are not used in this part of the
    // file; presumably implementors use them when building events — confirm.
    fn host_key(&self) -> &Option<OwnedValuePath>;
    fn timestamp_key(&self) -> Option<&OwnedValuePath>;
    fn source_type_key(&self) -> Option<&OwnedValuePath>;
}
/// Extra configuration needed to serve a frame stream over a Unix domain socket.
pub trait UnixFrameHandler: FrameHandler {
    /// Filesystem path the listening socket is bound to.
    fn socket_path(&self) -> PathBuf;
    /// Optional permission bits applied to the socket file after binding.
    fn socket_file_mode(&self) -> Option<u32>;
    /// Optional value for the listener's `RcvBuf` socket option.
    fn socket_receive_buffer_size(&self) -> Option<usize>;
    /// Optional value for the listener's `SndBuf` socket option.
    fn socket_send_buffer_size(&self) -> Option<usize>;
}
/// Extra configuration needed to serve a frame stream over TCP (optionally with TLS).
pub trait TcpFrameHandler: FrameHandler {
    /// Address the TCP listener is bound to.
    fn address(&self) -> SocketListenAddr;
    /// Optional keepalive settings applied to each accepted socket.
    fn keepalive(&self) -> Option<TcpKeepaliveConfig>;
    /// Delay between the shutdown signal and the per-connection tripwire resolving.
    fn shutdown_timeout_secs(&self) -> Duration;
    /// TLS settings used when binding the listener.
    fn tls(&self) -> MaybeTlsSettings;
    // NOTE(review): not used in this part of the file; presumably the event path under
    // which client certificate metadata is stored — confirm against implementors.
    fn tls_client_metadata_key(&self) -> Option<OwnedValuePath>;
    /// Optional receive buffer size applied to each accepted socket.
    fn receive_buffer_bytes(&self) -> Option<usize>;
    // NOTE(review): not used in this part of the file — confirm where it is enforced.
    fn max_connection_duration_secs(&self) -> Option<u64>;
    /// Optional limit passed to the listener's `accept_stream_limited`.
    fn max_connections(&self) -> Option<u32>;
    /// Optional list of networks passed to the listener at bind time.
    fn allowed_origins(&self) -> Option<&[IpNet]>;
    /// Receives the peer certificate metadata (if any) once the TLS handshake is done.
    fn insert_tls_client_metadata(&mut self, metadata: Option<CertificateMetadata>);
}
/// Builds a [`Source`] that accepts frame stream connections over TCP.
///
/// The returned future binds a listener on `frame_handler.address()`, accepts
/// connections until `shutdown` fires, and spawns one `handle_stream` task per
/// connection. A bind failure is emitted as a `SocketBindError` and ends the source.
pub fn build_framestream_tcp_source(
    frame_handler: impl TcpFrameHandler + Send + Sync + Clone + 'static,
    shutdown: ShutdownSignal,
    out: SourceSender,
) -> crate::Result<Source> {
    let addr = frame_handler.address();
    let tls = frame_handler.tls();
    // `shutdown` and `out` are already owned here; these clones only shadow them.
    let shutdown = shutdown.clone();
    let out = out.clone();
    Ok(Box::pin(async move {
        let listenfd = ListenFd::from_env();
        let listener = try_bind_tcp_listener(
            addr,
            listenfd,
            &tls,
            frame_handler
                .allowed_origins()
                .map(|origins| origins.to_vec()),
        )
        .await
        .map_err(|error| {
            emit!(SocketBindError {
                mode: SocketMode::Tcp,
                error: &error,
            })
        })?;
        info!(
            message = "Listening.",
            addr = %listener
                .local_addr()
                .map(SocketListenAddr::SocketAddr)
                .unwrap_or(addr)
        );
        // The tripwire resolves `shutdown_timeout_secs` after the shutdown signal. It is
        // `shared()` so that every connection can hold its own clone.
        let tripwire = shutdown.clone();
        let shutdown_timeout_secs = frame_handler.shutdown_timeout_secs();
        let tripwire = async move {
            _ = tripwire.await;
            sleep(shutdown_timeout_secs).await;
        }
        .shared();
        let connection_gauge = OpenGauge::new();
        let shutdown_clone = shutdown.clone();
        let request_limiter =
            RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());
        listener
            .accept_stream_limited(frame_handler.max_connections())
            // Stop accepting new connections as soon as shutdown is signalled.
            .take_until(shutdown_clone)
            .for_each(move |(connection, tcp_connection_permit)| {
                // Per-connection copies of everything the connection task needs.
                let shutdown_signal = shutdown.clone();
                let tripwire = tripwire.clone();
                let out = out.clone();
                let connection_gauge = connection_gauge.clone();
                let request_limiter = request_limiter.clone();
                let frame_handler_clone = frame_handler.clone();
                async move {
                    let socket = match connection {
                        Ok(socket) => socket,
                        Err(error) => {
                            emit!(SocketReceiveError {
                                mode: SocketMode::Tcp,
                                error: &error
                            });
                            return;
                        }
                    };
                    let peer_addr = socket.peer_addr();
                    let span = info_span!("connection", %peer_addr);
                    let tripwire = tripwire
                        .map(move |_| {
                            info!(
                                message = "Resetting connection (still open after seconds).",
                                seconds = ?shutdown_timeout_secs
                            );
                        })
                        .boxed();
                    span.clone().in_scope(|| {
                        debug!(message = "Accepted a new connection.", peer_addr = %peer_addr);
                        let open_token =
                            connection_gauge.open(|count| emit!(ConnectionOpen { count }));
                        let fut = handle_stream(
                            frame_handler_clone,
                            shutdown_signal,
                            socket,
                            tripwire,
                            peer_addr,
                            out,
                            request_limiter,
                        );
                        // The gauge token and the connection permit are held until the
                        // connection future completes, then released.
                        tokio::spawn(
                            fut.map(move |()| {
                                drop(open_token);
                                drop(tcp_connection_permit);
                            })
                            .instrument(span.or_current()),
                        );
                    });
                }
            })
            .map(Ok)
            .await
    }))
}
// The argument list mirrors what the accept loop has to hand over per connection.
#[allow(clippy::too_many_arguments)]
/// Prepares one accepted TCP connection and starts frame processing on it.
///
/// Steps, in order:
/// 1. Run the TLS handshake, giving up if `shutdown_signal` fires first.
/// 2. Apply the optional keepalive and receive-buffer settings (failures only warn).
/// 3. Pass the peer certificate metadata to the frame handler.
/// 4. Hand the socket to `build_framestream_source`.
///
/// NOTE(review): `_tripwire` and `_request_limiter` are accepted but unused here, so
/// the shutdown-timeout reset and the request limit are not applied to frame stream
/// connections — confirm this is intended.
async fn handle_stream(
    mut frame_handler: impl TcpFrameHandler + Send + Sync + Clone + 'static,
    mut shutdown_signal: ShutdownSignal,
    mut socket: MaybeTlsIncomingStream<TcpStream>,
    _tripwire: BoxFuture<'static, ()>,
    peer_addr: SocketAddr,
    out: SourceSender,
    _request_limiter: RequestLimiter,
) {
    tokio::select! {
        result = socket.handshake() => {
            if let Err(error) = result {
                emit!(TcpSocketTlsConnectionError { error });
                return;
            }
        },
        _ = &mut shutdown_signal => {
            return;
        }
    };

    if let Some(keepalive) = frame_handler.keepalive() {
        if let Err(error) = socket.set_keepalive(keepalive) {
            warn!(message = "Failed configuring TCP keepalive.", %error);
        }
    }
    if let Some(receive_buffer_bytes) = frame_handler.receive_buffer_bytes() {
        if let Err(error) = socket.set_receive_buffer_bytes(receive_buffer_bytes) {
            warn!(message = "Failed configuring receive buffer size on TCP socket.", %error);
        }
    }

    // Report every read on the socket as a `TcpBytesReceived` event.
    let socket = socket.after_read(move |byte_size| {
        emit!(TcpBytesReceived {
            byte_size,
            peer_addr
        });
    });

    let certificate_metadata = socket
        .get_ref()
        .ssl_stream()
        .and_then(|stream| stream.ssl().peer_certificate())
        .map(CertificateMetadata::from);
    frame_handler.insert_tls_client_metadata(certificate_metadata);

    // The field has to be declared when the span is created: `Span::record` does
    // nothing for a field name the span does not already have.
    let span = info_span!("connection", peer_addr = field::debug(&peer_addr));
    let received_from: Option<Bytes> = Some(peer_addr.to_string().into());
    let active_parsing_task_nums = Arc::new(AtomicU32::new(0));

    build_framestream_source(
        frame_handler,
        socket,
        received_from,
        out,
        shutdown_signal,
        span,
        active_parsing_task_nums,
        move |error| {
            emit!(TcpSocketError {
                error: &error,
                peer_addr,
            });
        },
    );
}
pub fn build_framestream_unix_source(
    frame_handler: impl UnixFrameHandler + Send + Sync + Clone + 'static,
    shutdown: ShutdownSignal,
    out: SourceSender,
) -> crate::Result<Source> {
    let path = frame_handler.socket_path();
    match fs::metadata(&path) {
        Ok(_) => {
            info!(message = "Deleting file.", ?path);
            fs::remove_file(&path)?;
        }
        Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} Err(e) => {
            error!("Unable to get socket information; error = {:?}.", e);
            return Err(Box::new(e));
        }
    };
    let listener = UnixListener::bind(&path)?;
    if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() {
        _ = nix::sys::socket::setsockopt(
            listener.as_raw_fd(),
            nix::sys::socket::sockopt::RcvBuf,
            &(socket_receive_buffer_size),
        );
        let rcv_buf_size =
            nix::sys::socket::getsockopt(listener.as_raw_fd(), nix::sys::socket::sockopt::RcvBuf);
        info!(
            "Unix socket receive buffer size modified to {}.",
            rcv_buf_size.unwrap()
        );
    }
    if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() {
        _ = nix::sys::socket::setsockopt(
            listener.as_raw_fd(),
            nix::sys::socket::sockopt::SndBuf,
            &(socket_send_buffer_size),
        );
        let snd_buf_size =
            nix::sys::socket::getsockopt(listener.as_raw_fd(), nix::sys::socket::sockopt::SndBuf);
        info!(
            "Unix socket buffer send size modified to {}.",
            snd_buf_size.unwrap()
        );
    }
    if let Some(socket_permission) = frame_handler.socket_file_mode() {
        if !(448..=511).contains(&socket_permission) {
            return Err(format!(
                "Invalid Socket permission {:#o}. Must between 0o700 and 0o777.",
                socket_permission
            )
            .into());
        }
        match fs::set_permissions(&path, fs::Permissions::from_mode(socket_permission)) {
            Ok(_) => {
                info!("Socket permissions updated to {:#o}.", socket_permission);
            }
            Err(e) => {
                error!(
                    "Failed to update listener socket permissions; error = {:?}.",
                    e
                );
                return Err(Box::new(e));
            }
        };
    };
    let fut = async move {
        let active_parsing_task_nums = Arc::new(AtomicU32::new(0));
        info!(message = "Listening...", ?path, r#type = "unix");
        let mut stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
        while let Some(socket) = stream.next().await {
            let socket = match socket {
                Err(e) => {
                    error!("Failed to accept socket; error = {:?}.", e);
                    continue;
                }
                Ok(s) => s,
            };
            let peer_addr = socket.peer_addr().ok();
            let listen_path = path.clone();
            let active_task_nums_ = Arc::clone(&active_parsing_task_nums);
            let span = info_span!("connection");
            let path = if let Some(addr) = peer_addr {
                if let Some(path) = addr.as_pathname().map(|e| e.to_owned()) {
                    span.record("peer_path", field::debug(&path));
                    Some(path)
                } else {
                    None
                }
            } else {
                None
            };
            let received_from: Option<Bytes> =
                path.map(|p| p.to_string_lossy().into_owned().into());
            build_framestream_source(
                frame_handler.clone(),
                socket,
                received_from,
                out.clone(),
                shutdown.clone(),
                span,
                active_task_nums_,
                move |error| {
                    emit!(UnixSocketError {
                        error: &error,
                        path: &listen_path,
                    });
                },
            );
        }
        drop(stream);
        if let Err(error) = fs::remove_file(&path) {
            emit!(UnixSocketFileDeleteError { path: &path, error });
        }
        Ok(())
    };
    Ok(Box::pin(fut))
}
#[allow(clippy::too_many_arguments)]
/// Starts frame processing for one connected socket.
///
/// The socket is wrapped in a length-delimited codec and split in two: the write half
/// carries control responses, the read half is decoded by a [`FrameStreamReader`] until
/// `shutdown` resolves. Read errors go to `error_mapper` and the failed item is dropped.
///
/// Data frames are then either turned into events inline on one spawned task, or, when
/// `frame_handler.multithreaded()` is true, handed to one spawned task per frame.
fn build_framestream_source<T: Send + 'static>(
    frame_handler: impl FrameHandler + Send + Sync + Clone + 'static,
    socket: impl AsyncRead + AsyncWrite + Send + 'static,
    received_from: Option<Bytes>,
    out: SourceSender,
    shutdown: impl Future<Output = T> + Unpin + Send + 'static,
    span: Span,
    active_task_nums: Arc<AtomicU32>,
    error_mapper: impl FnMut(std::io::Error) + Send + 'static,
) {
    let expected_content_type = frame_handler.content_type();
    let mut event_sink = out.clone();

    let codec = length_delimited::Builder::new()
        .max_frame_length(frame_handler.max_frame_length())
        .new_codec();
    let (response_half, request_half) = Framed::new(socket, codec).split();

    let mut reader = FrameStreamReader::new(Box::new(response_half), expected_content_type);
    let worker = frame_handler.clone();

    // Only data frames survive this stage; control traffic is consumed by `reader`.
    let data_frames = request_half
        .take_until(shutdown)
        .map_err(error_mapper)
        .filter_map(move |item| {
            let data = item
                .ok()
                .and_then(|raw| reader.handle_frame(Bytes::from(raw)));
            future::ready(data)
        });

    if frame_handler.multithreaded() {
        let task = async move {
            data_frames
                .for_each(move |frame| {
                    let task_limit = worker.max_frame_handling_tasks();
                    // The returned handle is dropped, which detaches the task.
                    spawn_event_handling_tasks(
                        frame,
                        worker.clone(),
                        event_sink.clone(),
                        received_from.clone(),
                        Arc::clone(&active_task_nums),
                        task_limit,
                    );
                    future::ready(())
                })
                .await;
            info!("Finished sending.");
        };
        tokio::spawn(task.instrument(span.or_current()));
    } else {
        let mut events = data_frames.filter_map(move |frame| {
            future::ready(worker.handle_event(received_from.clone(), frame))
        });
        let task = async move {
            if let Err(e) = event_sink.send_event_stream(&mut events).await {
                error!("Error sending event: {:?}.", e);
            }
            info!("Finished sending.");
        };
        tokio::spawn(task.instrument(span.or_current()));
    }
}
/// Handles one data frame on its own spawned task.
///
/// Blocks the calling thread until a slot in the task quota is free (see
/// `wait_for_task_quota`), then spawns a task that converts `event_data` into an event
/// and sends it to `event_sink`. The slot is released when the task ends, including
/// when `handle_event` panics or the task is dropped before completion.
fn spawn_event_handling_tasks(
    event_data: Bytes,
    event_handler: impl FrameHandler + Send + Sync + 'static,
    mut event_sink: SourceSender,
    received_from: Option<Bytes>,
    active_task_nums: Arc<AtomicU32>,
    max_frame_handling_tasks: u32,
) -> JoinHandle<()> {
    // Gives the reserved slot back on drop, so that no exit path can leak it and
    // permanently shrink the quota.
    struct QuotaSlot(Arc<AtomicU32>);

    impl Drop for QuotaSlot {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::AcqRel);
        }
    }

    wait_for_task_quota(&active_task_nums, max_frame_handling_tasks);
    let slot = QuotaSlot(active_task_nums);

    tokio::spawn(async move {
        // Held for the whole task; released after the event has been sent.
        let _slot = slot;
        if let Some(evt) = event_handler.handle_event(received_from, event_data) {
            if event_sink.send_event(evt).await.is_err() {
                error!("Encountered error while sending event.");
            }
        }
    })
}
/// Reserves one task slot in `active_task_nums`, blocking the calling thread until a slot
/// is available.
///
/// `max_tasks == 0` disables the limit: the counter is incremented immediately.
/// Otherwise the thread sleeps in 3 ms steps while the counter is at or above `max_tasks`.
/// The check and the increment are separate atomic operations, so the limit is only exact
/// when a single caller reserves slots on a given counter.
fn wait_for_task_quota(active_task_nums: &Arc<AtomicU32>, max_tasks: u32) {
    // `>=` rather than `>`: reserving while the counter already equals `max_tasks` would
    // let `max_tasks + 1` tasks run at once.
    while max_tasks > 0 && active_task_nums.load(Ordering::Acquire) >= max_tasks {
        thread::sleep(Duration::from_millis(3));
    }
    active_task_nums.fetch_add(1, Ordering::AcqRel);
}
#[cfg(test)]
mod test {
    use futures_util::Stream;
    use std::net::SocketAddr;
    #[cfg(unix)]
    use std::{
        path::PathBuf,
        sync::{
            atomic::{AtomicU32, Ordering},
            Arc,
        },
        thread,
    };
    use tokio::net::TcpStream;
    use bytes::{buf::Buf, Bytes, BytesMut};
    use futures::{
        future,
        sink::{Sink, SinkExt},
        stream::{self, StreamExt},
    };
    use ipnet::IpNet;
    use tokio::{
        self,
        net::UnixStream,
        task::JoinHandle,
        time::{Duration, Instant},
    };
    use tokio_util::codec::{length_delimited, Framed};
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        tcp::TcpKeepaliveConfig,
        tls::{CertificateMetadata, MaybeTls},
    };
    use vector_lib::{
        lookup::{owned_value_path, path, OwnedValuePath},
        tls::MaybeTlsSettings,
    };
    use super::{
        build_framestream_tcp_source, build_framestream_unix_source, spawn_event_handling_tasks,
        ControlField, ControlHeader, FrameHandler, TcpFrameHandler, UnixFrameHandler,
    };
    use crate::{
        config::{log_schema, ComponentKey},
        event::{Event, LogEvent},
        shutdown::SourceShutdownCoordinator,
        sources::util::net::SocketListenAddr,
        test_util::{collect_n, collect_n_stream, next_addr},
        SourceSender,
    };
    /// Test double for `FrameHandler`: each trait getter returns the matching field, and
    /// `handle_event` additionally runs a caller-supplied hook.
    #[derive(Clone)]
    struct MockFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        /// Returned (cloned) by `content_type()`.
        content_type: String,
        /// Returned by `max_frame_length()`.
        max_frame_length: usize,
        /// Returned by `multithreaded()`.
        multithreaded: bool,
        /// Returned by `max_frame_handling_tasks()`.
        max_frame_handling_tasks: u32,
        /// Hook cloned and invoked on every `handle_event` call.
        extra_task_handling_routine: F,
        /// Returned by `host_key()`; also used as the legacy key for the host metadata.
        host_key: Option<OwnedValuePath>,
        /// Returned by `timestamp_key()`.
        timestamp_key: Option<OwnedValuePath>,
        /// Returned by `source_type_key()`.
        source_type_key: Option<OwnedValuePath>,
        /// Namespace used when inserting source metadata in `handle_event`.
        log_namespace: LogNamespace,
    }
    /// Unix-socket flavoured mock: wraps a `MockFrameHandler` and adds the values returned
    /// by the `UnixFrameHandler` getters.
    #[derive(Clone)]
    struct MockUnixFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        /// Inner mock that all `FrameHandler` methods delegate to.
        frame_handler: MockFrameHandler<F>,
        /// Returned (cloned) by `socket_path()`.
        socket_path: PathBuf,
        /// Returned by `socket_file_mode()`.
        socket_file_mode: Option<u32>,
        /// Returned by `socket_receive_buffer_size()`.
        socket_receive_buffer_size: Option<usize>,
        /// Returned by `socket_send_buffer_size()`.
        socket_send_buffer_size: Option<usize>,
    }
    /// TCP flavoured mock: wraps a `MockFrameHandler` and adds the values returned by the
    /// `TcpFrameHandler` getters.
    #[derive(Clone)]
    struct MockTcpFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        /// Inner mock that all `FrameHandler` methods delegate to.
        frame_handler: MockFrameHandler<F>,
        /// Returned by `address()`.
        address: SocketListenAddr,
        /// Returned by `keepalive()`.
        keepalive: Option<TcpKeepaliveConfig>,
        /// Returned by `shutdown_timeout_secs()`.
        shutdown_timeout_secs: Duration,
        /// Returned (cloned) by `tls()`.
        tls: MaybeTlsSettings,
        /// Returned (cloned) by `tls_client_metadata_key()`.
        tls_client_metadata_key: Option<OwnedValuePath>,
        /// Returned by `receive_buffer_bytes()`.
        receive_buffer_bytes: Option<usize>,
        /// Returned by `max_connection_duration_secs()`.
        max_connection_duration_secs: Option<u64>,
        /// Returned by `max_connections()`.
        max_connections: Option<u32>,
        /// Exposed as a slice by `allowed_origins()`.
        permit_origin: Option<Vec<IpNet>>,
    }
    impl<F> MockTcpFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        /// Builds a TCP mock listening on `addr` with plain (non-TLS) transport, a 30 second
        /// shutdown timeout and every optional TCP setting left unset except `permit_origin`.
        pub fn new(
            addr: SocketAddr,
            content_type: String,
            multithreaded: bool,
            extra_routine: F,
            permit_origin: Option<Vec<IpNet>>,
        ) -> Self {
            let frame_handler = MockFrameHandler::new(content_type, multithreaded, extra_routine);
            Self {
                frame_handler,
                address: addr.into(),
                permit_origin,
                tls: MaybeTls::Raw(()),
                shutdown_timeout_secs: Duration::from_secs(30),
                keepalive: None,
                tls_client_metadata_key: None,
                receive_buffer_bytes: None,
                max_connection_duration_secs: None,
                max_connections: None,
            }
        }
    }
    impl<F> MockUnixFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        /// Builds a Unix mock whose socket lives at `unix_test` inside a freshly created
        /// temporary directory; file mode and buffer sizes are left unset.
        pub fn new(content_type: String, multithreaded: bool, extra_routine: F) -> Self {
            let socket_dir = tempfile::tempdir().unwrap().into_path();
            let frame_handler = MockFrameHandler::new(content_type, multithreaded, extra_routine);
            Self {
                frame_handler,
                socket_path: socket_dir.join("unix_test"),
                socket_file_mode: None,
                socket_receive_buffer_size: None,
                socket_send_buffer_size: None,
            }
        }
    }
    impl<F> MockFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        /// Builds a mock with a 100 KiB frame limit, no task limit (`0`), the legacy log
        /// namespace and fixed host/timestamp/source-type keys.
        pub fn new(content_type: String, multithreaded: bool, extra_routine: F) -> Self {
            let max_frame_length = bytesize::kib(100u64) as usize;
            Self {
                content_type,
                multithreaded,
                extra_task_handling_routine: extra_routine,
                max_frame_length,
                max_frame_handling_tasks: 0,
                log_namespace: LogNamespace::Legacy,
                host_key: Some(owned_value_path!("test_framestream")),
                timestamp_key: Some(owned_value_path!("my_timestamp")),
                source_type_key: Some(owned_value_path!("source_type")),
            }
        }
    }
    impl<F> FrameHandler for MockFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        fn content_type(&self) -> String {
            self.content_type.to_owned()
        }

        fn max_frame_length(&self) -> usize {
            self.max_frame_length
        }

        /// Wraps `frame` in a log event tagged with source type `"framestream"`, records the
        /// sender as host metadata when one is given, then runs the extra routine.
        /// Always returns `Some`.
        fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
            let mut log = LogEvent::from(frame);
            let source_type_path = log_schema().source_type_key_target_path().unwrap();
            log.insert(source_type_path, "framestream");

            if let Some(host) = received_from {
                let legacy_key = self.host_key.as_ref().map(LegacyKey::Overwrite);
                self.log_namespace.insert_source_metadata(
                    "framestream",
                    &mut log,
                    legacy_key,
                    path!("host"),
                    host,
                );
            }

            // The routine is `FnOnce`, so a fresh clone is consumed on every call.
            let routine = self.extra_task_handling_routine.clone();
            routine();

            Some(log.into())
        }

        fn multithreaded(&self) -> bool {
            self.multithreaded
        }

        fn max_frame_handling_tasks(&self) -> u32 {
            self.max_frame_handling_tasks
        }

        fn host_key(&self) -> &Option<OwnedValuePath> {
            &self.host_key
        }

        fn timestamp_key(&self) -> Option<&OwnedValuePath> {
            self.timestamp_key.as_ref()
        }

        fn source_type_key(&self) -> Option<&OwnedValuePath> {
            self.source_type_key.as_ref()
        }
    }
    impl<F: Send + Sync + Clone + FnOnce() + 'static> FrameHandler for MockUnixFrameHandler<F> {
        fn content_type(&self) -> String {
            self.frame_handler.content_type()
        }
        fn max_frame_length(&self) -> usize {
            self.frame_handler.max_frame_length()
        }
        fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
            self.frame_handler.handle_event(received_from, frame)
        }
        fn multithreaded(&self) -> bool {
            self.frame_handler.multithreaded()
        }
        fn max_frame_handling_tasks(&self) -> u32 {
            self.frame_handler.max_frame_handling_tasks()
        }
        fn host_key(&self) -> &Option<OwnedValuePath> {
            self.frame_handler.host_key()
        }
        fn timestamp_key(&self) -> Option<&OwnedValuePath> {
            self.frame_handler.timestamp_key()
        }
        fn source_type_key(&self) -> Option<&OwnedValuePath> {
            self.frame_handler.source_type_key()
        }
    }
    // Plain getters over the Unix-specific fields of the mock.
    impl<F> UnixFrameHandler for MockUnixFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        fn socket_path(&self) -> PathBuf {
            self.socket_path.to_path_buf()
        }

        fn socket_file_mode(&self) -> Option<u32> {
            self.socket_file_mode
        }

        fn socket_receive_buffer_size(&self) -> Option<usize> {
            self.socket_receive_buffer_size
        }

        fn socket_send_buffer_size(&self) -> Option<usize> {
            self.socket_send_buffer_size
        }
    }
    // Every method forwards to the wrapped `MockFrameHandler`.
    impl<F> FrameHandler for MockTcpFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        fn content_type(&self) -> String {
            let Self { frame_handler, .. } = self;
            frame_handler.content_type()
        }

        fn max_frame_length(&self) -> usize {
            let Self { frame_handler, .. } = self;
            frame_handler.max_frame_length()
        }

        fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
            let Self { frame_handler, .. } = self;
            frame_handler.handle_event(received_from, frame)
        }

        fn multithreaded(&self) -> bool {
            let Self { frame_handler, .. } = self;
            frame_handler.multithreaded()
        }

        fn max_frame_handling_tasks(&self) -> u32 {
            let Self { frame_handler, .. } = self;
            frame_handler.max_frame_handling_tasks()
        }

        fn host_key(&self) -> &Option<OwnedValuePath> {
            let Self { frame_handler, .. } = self;
            frame_handler.host_key()
        }

        fn timestamp_key(&self) -> Option<&OwnedValuePath> {
            let Self { frame_handler, .. } = self;
            frame_handler.timestamp_key()
        }

        fn source_type_key(&self) -> Option<&OwnedValuePath> {
            let Self { frame_handler, .. } = self;
            frame_handler.source_type_key()
        }
    }
    // Plain getters over the TCP-specific fields of the mock.
    impl<F> TcpFrameHandler for MockTcpFrameHandler<F>
    where
        F: Send + Sync + Clone + FnOnce() + 'static,
    {
        fn address(&self) -> SocketListenAddr {
            self.address
        }

        fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
            self.keepalive
        }

        fn shutdown_timeout_secs(&self) -> Duration {
            self.shutdown_timeout_secs
        }

        fn tls(&self) -> MaybeTlsSettings {
            self.tls.clone()
        }

        fn tls_client_metadata_key(&self) -> Option<OwnedValuePath> {
            self.tls_client_metadata_key.clone()
        }

        fn receive_buffer_bytes(&self) -> Option<usize> {
            self.receive_buffer_bytes
        }

        fn max_connection_duration_secs(&self) -> Option<u64> {
            self.max_connection_duration_secs
        }

        fn max_connections(&self) -> Option<u32> {
            self.max_connections
        }

        /// Intentionally a no-op: the mock does not record client certificate metadata.
        fn insert_tls_client_metadata(&mut self, _: Option<CertificateMetadata>) {}

        fn allowed_origins(&self) -> Option<&[IpNet]> {
            self.permit_origin.as_deref()
        }
    }
    /// Starts a framestream TCP source on the tokio runtime and blocks the calling thread
    /// until a TCP connection to `addr` succeeds.
    ///
    /// Returns the source's join handle and the shutdown coordinator it was registered with.
    fn init_framestream_tcp(
        source_id: &str,
        addr: &SocketAddr,
        frame_handler: impl TcpFrameHandler + Send + Sync + Clone + 'static,
        pipeline: SourceSender,
    ) -> (JoinHandle<Result<(), ()>>, SourceShutdownCoordinator) {
        let key = ComponentKey::from(source_id);
        let mut coordinator = SourceShutdownCoordinator::default();
        let (shutdown_signal, _) = coordinator.register_source(&key, false);

        let join_handle = tokio::spawn(
            build_framestream_tcp_source(frame_handler, shutdown_signal, pipeline)
                .expect("Failed to build framestream tcp source."),
        );

        // Poll every 2 ms until the listener accepts a connection.
        loop {
            if std::net::TcpStream::connect(addr).is_ok() {
                break;
            }
            thread::sleep(Duration::from_millis(2));
        }

        (join_handle, coordinator)
    }
    /// Starts a framestream Unix-socket source on the tokio runtime and blocks the calling
    /// thread until a connection to the handler's socket path succeeds.
    ///
    /// Returns the socket path, the source's join handle and the shutdown coordinator.
    fn init_framestream_unix(
        source_id: &str,
        frame_handler: impl UnixFrameHandler + Send + Sync + Clone + 'static,
        pipeline: SourceSender,
    ) -> (
        PathBuf,
        JoinHandle<Result<(), ()>>,
        SourceShutdownCoordinator,
    ) {
        let key = ComponentKey::from(source_id);
        // Read the path before the handler is moved into the source.
        let socket_path = frame_handler.socket_path();
        let mut coordinator = SourceShutdownCoordinator::default();
        let (shutdown_signal, _) = coordinator.register_source(&key, false);

        let join_handle = tokio::spawn(
            build_framestream_unix_source(frame_handler, shutdown_signal, pipeline)
                .expect("Failed to build framestream unix source."),
        );

        // Poll every 2 ms until the socket accepts a connection.
        loop {
            if std::os::unix::net::UnixStream::connect(&socket_path).is_ok() {
                break;
            }
            thread::sleep(Duration::from_millis(2));
        }

        (socket_path, join_handle, coordinator)
    }
    /// Connects to `addr` and wraps the socket in a default length-delimited codec.
    /// Panics if the connection fails.
    async fn make_tcp_stream(
        addr: SocketAddr,
    ) -> Framed<TcpStream, length_delimited::LengthDelimitedCodec> {
        let connection = TcpStream::connect(&addr).await.unwrap();
        let codec = length_delimited::Builder::new().new_codec();
        Framed::new(connection, codec)
    }
    /// Connects to the Unix socket at `path` and wraps it in a default length-delimited
    /// codec. Panics if the connection fails.
    async fn make_unix_stream(
        path: PathBuf,
    ) -> Framed<UnixStream, length_delimited::LengthDelimitedCodec> {
        let connection = UnixStream::connect(&path).await.unwrap();
        let codec = length_delimited::Builder::new().new_codec();
        Framed::new(connection, codec)
    }
    async fn send_data_frames<S: Sink<Bytes, Error = std::io::Error> + Unpin>(
        sock_sink: &mut S,
        frames: Vec<Result<Bytes, std::io::Error>>,
    ) {
        let mut stream = stream::iter(frames.into_iter());
        _ = sock_sink.send_all(&mut stream).await;
    }
    async fn send_control_frame<S: Sink<Bytes, Error = std::io::Error> + Unpin>(
        sock_sink: &mut S,
        frame: Bytes,
    ) {
        send_data_frames(sock_sink, vec![Ok(Bytes::new()), Ok(frame)]).await; }
    /// Encodes a control frame that carries only `header`, as big-endian bytes.
    fn create_control_frame(header: ControlHeader) -> Bytes {
        let header_bytes = header.to_u32().to_be_bytes();
        Bytes::copy_from_slice(&header_bytes)
    }
    /// Encodes a control frame: the big-endian `header`, followed for each content type by
    /// the content-type field id, the payload length as a big-endian `u32`, and the payload.
    fn create_control_frame_with_content(
        header: ControlHeader,
        content_types: Vec<Bytes>,
    ) -> Bytes {
        let mut frame = BytesMut::new();
        frame.extend_from_slice(&header.to_u32().to_be_bytes());
        for content_type in &content_types {
            let field_id = ControlField::ContentType.to_u32().to_be_bytes();
            let length = (content_type.len() as u32).to_be_bytes();
            frame.extend_from_slice(&field_id);
            frame.extend_from_slice(&length);
            frame.extend_from_slice(content_type);
        }
        frame.freeze()
    }
    /// Asserts that `frame` is an Accept control frame advertising exactly
    /// `expected_content_type`.
    ///
    /// The three 4-byte big-endian words (Accept header, content-type field id, payload
    /// length) are consumed from the front of `frame`; the payload is left in place.
    fn assert_accept_frame(frame: &mut BytesMut, expected_content_type: Bytes) {
        let expected_words = [
            ControlHeader::Accept.to_u32(),
            ControlField::ContentType.to_u32(),
            expected_content_type.len() as u32,
        ];
        for word in expected_words {
            assert_eq!(&frame[..4], &word.to_be_bytes());
            frame.advance(4);
        }
        assert_eq!(&frame[..], &expected_content_type[..]);
    }
    /// Unix mock handler for content type `"test_content"` with a no-op extra routine.
    fn create_frame_handler(multithreaded: bool) -> impl UnixFrameHandler + Send + Sync + Clone {
        let content_type = String::from("test_content");
        MockUnixFrameHandler::new(content_type, multithreaded, || {})
    }
    /// TCP mock handler for content type `"test_content"` with a no-op extra routine.
    fn create_tcp_frame_handler(
        addr: SocketAddr,
        multithreaded: bool,
        permit_origin: Option<Vec<IpNet>>,
    ) -> impl TcpFrameHandler + Send + Sync + Clone {
        let content_type = String::from("test_content");
        MockTcpFrameHandler::new(addr, content_type, multithreaded, || {}, permit_origin)
    }
    /// Requests shutdown of `source_name` with a 10 second deadline and asserts that the
    /// coordinator reports success.
    async fn signal_shutdown(source_name: &str, shutdown: &mut SourceShutdownCoordinator) {
        let key = ComponentKey::from(source_name);
        let deadline = Instant::now() + Duration::from_secs(10);
        let succeeded = shutdown.shutdown_source(&key, deadline).await;
        assert!(succeeded);
    }
    /// Shared scenario for a bidirectional session: READY -> expect ACCEPT -> START ->
    /// two data frames -> STOP, then verifies both payloads arrived as events and shuts
    /// the source down.
    async fn test_normal_framestream<T, U, V>(
        source_name: &str,
        mut sock_sink: T,
        mut sock_stream: U,
        rx: V,
        mut shutdown: SourceShutdownCoordinator,
        source_handle: JoinHandle<Result<(), ()>>,
    ) where
        T: Sink<Bytes, Error = std::io::Error> + Unpin,
        U: Stream<Item = Result<BytesMut, std::io::Error>> + Unpin,
        V: Stream<Item = Event> + Unpin,
    {
        // Handshake: announce the content type and wait for the ACCEPT reply.
        let content_type = Bytes::from_static(b"test_content");
        let ready =
            create_control_frame_with_content(ControlHeader::Ready, vec![content_type.clone()]);
        send_control_frame(&mut sock_sink, ready).await;

        let mut reply = collect_n_stream(&mut sock_stream, 2).await;
        // The first frame of the reply is the empty frame preceding a control frame.
        assert_eq!(reply[0].as_ref().unwrap().len(), 0);
        assert_accept_frame(reply[1].as_mut().unwrap(), content_type);

        // Data phase.
        send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Start)).await;
        let payloads = vec![Ok(Bytes::from("hello")), Ok(Bytes::from("world"))];
        send_data_frames(&mut sock_sink, payloads).await;

        let events = collect_n(rx, 2).await;
        send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Stop)).await;

        // Arrival order is not asserted here, only presence.
        let message_key = log_schema().message_key().unwrap().to_string();
        for expected in ["hello", "world"] {
            assert!(events
                .iter()
                .any(|e| e.as_log()[&message_key] == expected.into()));
        }

        drop(sock_stream);
        signal_shutdown(source_name, &mut shutdown).await;
        _ = source_handle.await.unwrap();
    }
    /// Shared scenario: a READY frame offering two content types (an unknown one first)
    /// must be answered with an ACCEPT for `"test_content"`.
    async fn test_multiple_content_types<T, U>(
        source_name: &str,
        mut sock_sink: T,
        mut sock_stream: U,
        mut shutdown: SourceShutdownCoordinator,
        source_handle: JoinHandle<Result<(), ()>>,
    ) where
        T: Sink<Bytes, Error = std::io::Error> + Unpin,
        U: Stream<Item = Result<BytesMut, std::io::Error>> + Unpin,
    {
        let content_type = Bytes::from_static(b"test_content");
        let offered = vec![Bytes::from_static(b"test_content2"), content_type.clone()];
        let ready = create_control_frame_with_content(ControlHeader::Ready, offered);
        send_control_frame(&mut sock_sink, ready).await;

        let mut reply = collect_n_stream(&mut sock_stream, 2).await;
        assert_eq!(reply[0].as_ref().unwrap().len(), 0);
        assert_accept_frame(reply[1].as_mut().unwrap(), content_type);

        drop(sock_stream);
        signal_shutdown(source_name, &mut shutdown).await;
        _ = source_handle.await.unwrap();
    }
    /// Runs the normal scenario against a TCP source whose origin allow-list is empty
    /// (`Some(vec![])`); the scenario is expected to panic.
    #[tokio::test(flavor = "multi_thread")]
    #[should_panic]
    async fn blocked_framestream_tcp() {
        let source_name = "test_source";
        let addr = next_addr();
        let (tx, rx) = SourceSender::new_test();
        let handler = create_tcp_frame_handler(addr, false, Some(vec![]));
        let (source_handle, shutdown) = init_framestream_tcp(source_name, &addr, handler, tx);

        let framed = make_tcp_stream(addr).await;
        let (sock_sink, sock_stream) = framed.split();
        test_normal_framestream(
            source_name,
            sock_sink,
            sock_stream,
            rx,
            shutdown,
            source_handle,
        )
        .await;
    }
    /// Normal scenario over TCP with `multithreaded = false` and no origin restriction.
    #[tokio::test(flavor = "multi_thread")]
    async fn normal_framestream_singlethreaded_tcp() {
        let source_name = "test_source";
        let addr = next_addr();
        let (tx, rx) = SourceSender::new_test();
        let handler = create_tcp_frame_handler(addr, false, None);
        let (source_handle, shutdown) = init_framestream_tcp(source_name, &addr, handler, tx);

        let framed = make_tcp_stream(addr).await;
        let (sock_sink, sock_stream) = framed.split();
        test_normal_framestream(
            source_name,
            sock_sink,
            sock_stream,
            rx,
            shutdown,
            source_handle,
        )
        .await;
    }
    /// Normal scenario over a Unix socket with `multithreaded = false`.
    #[tokio::test(flavor = "multi_thread")]
    async fn normal_framestream_singlethreaded_unix() {
        let source_name = "test_source";
        let (tx, rx) = SourceSender::new_test();
        let handler = create_frame_handler(false);
        let (path, source_handle, shutdown) = init_framestream_unix(source_name, handler, tx);

        let framed = make_unix_stream(path).await;
        let (sock_sink, sock_stream) = framed.split();
        test_normal_framestream(
            source_name,
            sock_sink,
            sock_stream,
            rx,
            shutdown,
            source_handle,
        )
        .await;
    }
    /// Normal scenario over TCP with `multithreaded = true` and no origin restriction.
    #[tokio::test(flavor = "multi_thread")]
    async fn normal_framestream_multithreaded_tcp() {
        let source_name = "test_source";
        let addr = next_addr();
        let (tx, rx) = SourceSender::new_test();
        let handler = create_tcp_frame_handler(addr, true, None);
        let (source_handle, shutdown) = init_framestream_tcp(source_name, &addr, handler, tx);

        let framed = make_tcp_stream(addr).await;
        let (sock_sink, sock_stream) = framed.split();
        test_normal_framestream(
            source_name,
            sock_sink,
            sock_stream,
            rx,
            shutdown,
            source_handle,
        )
        .await;
    }
    /// Normal scenario over a Unix socket with `multithreaded = true`.
    #[tokio::test(flavor = "multi_thread")]
    async fn normal_framestream_multithreaded_unix() {
        let source_name = "test_source";
        let (tx, rx) = SourceSender::new_test();
        let handler = create_frame_handler(true);
        let (path, source_handle, shutdown) = init_framestream_unix(source_name, handler, tx);

        let framed = make_unix_stream(path).await;
        let (sock_sink, sock_stream) = framed.split();
        test_normal_framestream(
            source_name,
            sock_sink,
            sock_stream,
            rx,
            shutdown,
            source_handle,
        )
        .await;
    }
    /// Multiple-content-type handshake over TCP; the event receiver is dropped immediately
    /// because no data frames are sent.
    #[tokio::test(flavor = "multi_thread")]
    async fn multiple_content_types_tcp() {
        let source_name = "test_source";
        let addr = next_addr();
        let (tx, _) = SourceSender::new_test();
        let handler = create_tcp_frame_handler(addr, false, None);
        let (source_handle, shutdown) = init_framestream_tcp(source_name, &addr, handler, tx);

        let framed = make_tcp_stream(addr).await;
        let (sock_sink, sock_stream) = framed.split();
        test_multiple_content_types(source_name, sock_sink, sock_stream, shutdown, source_handle)
            .await;
    }
    /// Multiple-content-type handshake over a Unix socket; the event receiver is dropped
    /// immediately because no data frames are sent.
    #[tokio::test(flavor = "multi_thread")]
    async fn multiple_content_types_unix() {
        let source_name = "test_source";
        let (tx, _) = SourceSender::new_test();
        let handler = create_frame_handler(false);
        let (path, source_handle, shutdown) = init_framestream_unix(source_name, handler, tx);

        let framed = make_unix_stream(path).await;
        let (sock_sink, sock_stream) = framed.split();
        test_multiple_content_types(source_name, sock_sink, sock_stream, shutdown, source_handle)
            .await;
    }
    /// Sends a READY frame with an unknown content type followed by one with the expected
    /// content type, and asserts that the frames read back are an ACCEPT for
    /// `"test_content"`.
    #[tokio::test(flavor = "multi_thread")]
    async fn wrong_content_type() {
        let source_name = "test_source";
        let (tx, _) = SourceSender::new_test();
        let handler = create_frame_handler(false);
        let (path, source_handle, mut shutdown) =
            init_framestream_unix(source_name, handler, tx);
        let (mut sock_sink, mut sock_stream) = make_unix_stream(path).await.split();

        // First attempt: a content type the handler does not advertise.
        let wrong_ready = create_control_frame_with_content(
            ControlHeader::Ready,
            vec![Bytes::from_static(b"test_content2")],
        );
        send_control_frame(&mut sock_sink, wrong_ready).await;

        // Second attempt: the matching content type.
        let content_type = Bytes::from_static(b"test_content");
        let good_ready =
            create_control_frame_with_content(ControlHeader::Ready, vec![content_type.clone()]);
        send_control_frame(&mut sock_sink, good_ready).await;

        let mut reply = collect_n_stream(&mut sock_stream, 2).await;
        assert_eq!(reply[0].as_ref().unwrap().len(), 0);
        assert_accept_frame(reply[1].as_mut().unwrap(), content_type);

        drop(sock_stream);
        signal_shutdown(source_name, &mut shutdown).await;
        _ = source_handle.await.unwrap();
    }
    /// Sends two data frames before any handshake, then runs a regular session and asserts
    /// that the first two events received are "hello" and "world", in that order.
    #[tokio::test(flavor = "multi_thread")]
    async fn data_too_soon() {
        let source_name = "test_source";
        let (tx, rx) = SourceSender::new_test();
        let handler = create_frame_handler(false);
        let (path, source_handle, mut shutdown) =
            init_framestream_unix(source_name, handler, tx);
        let (mut sock_sink, mut sock_stream) = make_unix_stream(path).await.split();

        // Data sent ahead of the READY/START handshake.
        let premature = vec![Ok(Bytes::from("bad")), Ok(Bytes::from("data"))];
        send_data_frames(&mut sock_sink, premature).await;

        let content_type = Bytes::from_static(b"test_content");
        let ready =
            create_control_frame_with_content(ControlHeader::Ready, vec![content_type.clone()]);
        send_control_frame(&mut sock_sink, ready).await;

        let mut reply = collect_n_stream(&mut sock_stream, 2).await;
        assert_eq!(reply[0].as_ref().unwrap().len(), 0);
        assert_accept_frame(reply[1].as_mut().unwrap(), content_type);

        send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Start)).await;
        let payloads = vec![Ok(Bytes::from("hello")), Ok(Bytes::from("world"))];
        send_data_frames(&mut sock_sink, payloads).await;

        let events = collect_n(rx, 2).await;
        send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Stop)).await;

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(events[0].as_log()[&message_key], "hello".into());
        assert_eq!(events[1].as_log()[&message_key], "world".into());

        drop(sock_stream);
        signal_shutdown(source_name, &mut shutdown).await;
        _ = source_handle.await.unwrap();
    }
    /// Session without a READY/ACCEPT exchange: START carries the content type directly,
    /// and the read half of the socket is discarded up front.
    #[tokio::test(flavor = "multi_thread")]
    async fn unidirectional_framestream() {
        let source_name = "test_source";
        let (tx, rx) = SourceSender::new_test();
        let handler = create_frame_handler(false);
        let (path, source_handle, mut shutdown) =
            init_framestream_unix(source_name, handler, tx);
        let (mut sock_sink, _) = make_unix_stream(path).await.split();

        let start = create_control_frame_with_content(
            ControlHeader::Start,
            vec![Bytes::from_static(b"test_content")],
        );
        send_control_frame(&mut sock_sink, start).await;

        let payloads = vec![Ok(Bytes::from("hello")), Ok(Bytes::from("world"))];
        send_data_frames(&mut sock_sink, payloads).await;

        let events = collect_n(rx, 2).await;
        send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Stop)).await;

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(events[0].as_log()[&message_key], "hello".into());
        assert_eq!(events[1].as_log()[&message_key], "world".into());

        signal_shutdown(source_name, &mut shutdown).await;
        _ = source_handle.await.unwrap();
    }
    /// Pushes 200 events through `spawn_event_handling_tasks` with a limit of 20 tasks and
    /// checks that every event arrives, that no quota is left reserved afterwards, that more
    /// than one task was active at some point, and that the observed peak stays within one
    /// of the configured limit.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_spawn_event_handling_tasks() {
        let (out, rx) = SourceSender::new_test();
        let max_frame_handling_tasks = 20;
        let active_task_nums = Arc::new(AtomicU32::new(0));
        let max_task_nums_reached = Arc::new(AtomicU32::new(0));

        // Runs inside every `handle_event`: simulates 10 ms of blocking work, then records
        // the highest number of reserved task slots seen so far.
        let extra_routine = {
            let active = Arc::clone(&active_task_nums);
            let peak = Arc::clone(&max_task_nums_reached);
            move || {
                thread::sleep(Duration::from_millis(10));
                peak.fetch_max(active.load(Ordering::Acquire), Ordering::AcqRel);
            }
        };

        let total_events = max_frame_handling_tasks * 10;
        let mut join_handles = vec![tokio::spawn(async move {
            let events = collect_n(rx, total_events as usize).await;
            assert_eq!(total_events as usize, events.len(), "Missed events");
        })];
        for i in 0..total_events {
            join_handles.push(spawn_event_handling_tasks(
                Bytes::from(format!("event_{}", i)),
                MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()),
                out.clone(),
                None,
                Arc::clone(&active_task_nums),
                max_frame_handling_tasks,
            ));
        }

        // Surface panics from the spawned tasks (including the collector's assertion);
        // discarding the join results would let them pass silently.
        for joined in future::join_all(join_handles).await {
            joined.expect("A spawned task panicked");
        }

        let final_task_nums = active_task_nums.load(Ordering::Acquire);
        assert_eq!(
            0, final_task_nums,
            "There should be NO left-over tasks at the end"
        );

        let max_task_nums_reached_value = max_task_nums_reached.load(Ordering::Acquire);
        assert!(
            max_task_nums_reached_value > 1,
            "MultiThreaded mode does NOT work"
        );
        // Compared without subtracting: `peak - limit` underflows `u32` whenever the peak
        // stays below the limit.
        assert!(
            max_task_nums_reached_value <= max_frame_handling_tasks + 1,
            "Max number of tasks at any given time should NOT Exceed max_frame_handling_tasks too much"
        );
    }
}