use std::{fs::remove_file, path::PathBuf};
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use tokio::net::UnixDatagram;
use tokio_util::codec::FramedRead;
use tracing::field;
use vector_lib::codecs::StreamDecodingError;
use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
use vector_lib::EstimatedJsonEncodedSizeOf;
use crate::{
    codecs::Decoder,
    event::Event,
    internal_events::{
        SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
        UnixSocketFileDeleteError,
    },
    shutdown::ShutdownSignal,
    sources::util::change_socket_permissions,
    sources::util::unix::UNNAMED_SOCKET_HOST,
    sources::Source,
    SourceSender,
};
pub fn build_unix_datagram_source(
    listen_path: PathBuf,
    socket_file_mode: Option<u32>,
    max_length: usize,
    decoder: Decoder,
    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
    shutdown: ShutdownSignal,
    out: SourceSender,
) -> crate::Result<Source> {
    Ok(Box::pin(async move {
        let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
        info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
        change_socket_permissions(&listen_path, socket_file_mode)
            .expect("Failed to set socket permissions");
        let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;
        if let Err(error) = remove_file(&listen_path) {
            emit!(UnixSocketFileDeleteError {
                path: &listen_path,
                error
            });
        }
        result
    }))
}
/// Receive loop for the Unix datagram source.
///
/// Each iteration resizes the receive buffer to `max_length` bytes and then
/// waits for either a datagram or the shutdown signal.
///
/// For every received datagram:
///
/// * the number of received bytes is reported through `BytesReceived`,
/// * the payload is decoded with a clone of `decoder`,
/// * each decoded batch is reported via `SocketEventsReceived`, passed to
///   `handle_events` together with the sender's address (the peer's socket
///   path, or `UNNAMED_SOCKET_HOST` for an unnamed peer), and forwarded to
///   `out`.
///
/// A failed `send_batch` emits `StreamClosedError` but does not stop the loop.
/// Decoder errors emit `SocketReceiveError`; decoding of the current datagram
/// stops only when the error reports that it cannot continue.
///
/// # Errors
///
/// Returns `Err(())` when receiving from the socket fails (after emitting
/// `SocketReceiveError`). Returns `Ok(())` once `shutdown` resolves.
async fn listen(
    socket: UnixDatagram,
    max_length: usize,
    decoder: Decoder,
    mut shutdown: ShutdownSignal,
    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
    mut out: SourceSender,
) -> Result<(), ()> {
    let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
    let mut buf = BytesMut::with_capacity(max_length);

    loop {
        // `split_to` below removes the consumed bytes, so restore the full
        // receive window before every read.
        buf.resize(max_length, 0);

        tokio::select! {
            recv = socket.recv_from(&mut buf) => {
                let (byte_size, address) = match recv {
                    Ok(received) => received,
                    Err(error) => {
                        let error = vector_lib::codecs::decoding::Error::FramingError(error.into());
                        emit!(SocketReceiveError {
                            mode: SocketMode::Unix,
                            error: &error
                        });
                        return Err(());
                    }
                };

                // NOTE(review): this span is only ever passed to `record`; it is
                // never entered or attached to a future in this function, and
                // `peer_path` is not declared when the span is created —
                // confirm whether the recorded value is ever surfaced.
                let span = info_span!("datagram");
                let received_from: Option<Bytes> = if address.is_unnamed() {
                    // The peer is not bound to a path, so a fixed placeholder
                    // host value is used instead.
                    span.record("peer_path", field::debug(UNNAMED_SOCKET_HOST));
                    Some(UNNAMED_SOCKET_HOST.into())
                } else {
                    let peer_path = address.as_pathname().map(|p| p.to_owned());
                    if let Some(path) = peer_path.as_ref() {
                        span.record("peer_path", field::debug(path));
                    }
                    peer_path.map(|p| p.to_string_lossy().into_owned().into())
                };

                bytes_received.emit(ByteSize(byte_size));

                let payload = buf.split_to(byte_size);
                let mut stream = FramedRead::new(payload.as_ref(), decoder.clone());

                while let Some(decoded) = stream.next().await {
                    match decoded {
                        Ok((mut events, _byte_size)) => {
                            emit!(SocketEventsReceived {
                                mode: SocketMode::Unix,
                                byte_size: events.estimated_json_encoded_size_of(),
                                count: events.len()
                            });

                            handle_events(&mut events, received_from.clone());

                            let count = events.len();
                            if out.send_batch(events).await.is_err() {
                                emit!(StreamClosedError { count });
                            }
                        }
                        Err(error) => {
                            emit!(SocketReceiveError {
                                mode: SocketMode::Unix,
                                error: &error
                            });
                            if !error.can_continue() {
                                break;
                            }
                        }
                    }
                }
            }
            _ = &mut shutdown => return Ok(()),
        }
    }
}