use std::{fs::remove_file, path::PathBuf, time::Duration};
use bytes::Bytes;
use futures::{FutureExt, StreamExt};
use tokio::{
    io::AsyncWriteExt,
    net::{UnixListener, UnixStream},
    time::sleep,
};
use tokio_stream::wrappers::UnixListenerStream;
use tokio_util::codec::FramedRead;
use tracing::{field, Instrument};
use vector_lib::codecs::StreamDecodingError;
use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
use vector_lib::EstimatedJsonEncodedSizeOf;
use super::AfterReadExt;
use crate::{
    async_read::VecAsyncReadExt,
    codecs::Decoder,
    event::Event,
    internal_events::{
        ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError,
        UnixSocketError, UnixSocketFileDeleteError,
    },
    shutdown::ShutdownSignal,
    sources::util::change_socket_permissions,
    sources::util::unix::UNNAMED_SOCKET_HOST,
    sources::Source,
    SourceSender,
};
pub fn build_unix_stream_source(
    listen_path: PathBuf,
    socket_file_mode: Option<u32>,
    decoder: Decoder,
    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
    shutdown: ShutdownSignal,
    out: SourceSender,
) -> crate::Result<Source> {
    Ok(Box::pin(async move {
        let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| {
            panic!(
                "Failed to bind to listener socket at path: {}. Err: {}",
                listen_path.to_string_lossy(),
                e
            )
        });
        info!(message = "Listening.", path = ?listen_path, r#type = "unix");
        change_socket_permissions(&listen_path, socket_file_mode)
            .expect("Failed to set socket permissions");
        let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
        let connection_open = OpenGauge::new();
        let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
        tokio::pin!(stream);
        while let Some(socket) = stream.next().await {
            let socket = match socket {
                Err(error) => {
                    error!(message = "Failed to accept socket.", %error);
                    continue;
                }
                Ok(socket) => socket,
            };
            let listen_path = listen_path.clone();
            let span = info_span!("connection");
            let received_from: Bytes = socket
                .peer_addr()
                .ok()
                .and_then(|addr| {
                    addr.as_pathname().map(|e| e.to_owned()).map({
                        |path| {
                            span.record("peer_path", field::debug(&path));
                            path.to_string_lossy().into_owned().into()
                        }
                    })
                })
                .unwrap_or_else(|| UNNAMED_SOCKET_HOST.into());
            let handle_events = handle_events.clone();
            let bytes_received = bytes_received.clone();
            let stream = socket
                .after_read(move |byte_size| {
                    bytes_received.emit(ByteSize(byte_size));
                })
                .allow_read_until(shutdown.clone().map(|_| ()));
            let mut stream = FramedRead::new(stream, decoder.clone());
            let connection_open = connection_open.clone();
            let mut out = out.clone();
            tokio::spawn(
                async move {
                    let _open_token = connection_open.open(|count| emit!(ConnectionOpen { count }));
                    while let Some(result) = stream.next().await {
                        match result {
                            Ok((mut events, _byte_size)) => {
                                emit!(SocketEventsReceived {
                                    mode: SocketMode::Unix,
                                    byte_size: events.estimated_json_encoded_size_of(),
                                    count: events.len(),
                                });
                                handle_events(&mut events, Some(received_from.clone()));
                                let count = events.len();
                                if (out.send_batch(events).await).is_err() {
                                    emit!(StreamClosedError { count });
                                }
                            }
                            Err(error) => {
                                emit!(UnixSocketError {
                                    error: &error,
                                    path: &listen_path
                                });
                                if !error.can_continue() {
                                    break;
                                }
                            }
                        }
                    }
                    info!("Finished sending.");
                    let socket: &mut UnixStream = stream.get_mut().get_mut().get_mut_ref();
                    if let Err(error) = socket.shutdown().await {
                        error!(message = "Failed shutting down socket.", %error);
                    }
                }
                .instrument(span.or_current()),
            );
        }
        while connection_open.any_open() {
            sleep(Duration::from_millis(10)).await;
        }
        if let Err(error) = remove_file(&listen_path) {
            emit!(UnixSocketFileDeleteError {
                path: &listen_path,
                error
            });
        }
        Ok(())
    }))
}