use std::{path::PathBuf, pin::Pin, time::Duration};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::{stream::BoxStream, SinkExt, StreamExt};
use snafu::{ResultExt, Snafu};
use tokio::{net::UnixStream, time::sleep};
use tokio_util::codec::Encoder;
use vector_lib::configurable::configurable_component;
use vector_lib::json_size::JsonSize;
use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
use crate::{
    codecs::Transformer,
    event::{Event, Finalizable},
    internal_events::{
        ConnectionOpen, OpenGauge, SocketMode, UnixSocketConnectionEstablished,
        UnixSocketOutgoingConnectionError, UnixSocketSendError,
    },
    sink_ext::VecSinkExt,
    sinks::{
        util::{
            retries::ExponentialBackoff,
            socket_bytes_sink::{BytesSink, ShutdownCheck},
            EncodedEvent, StreamSink,
        },
        Healthcheck, VectorSink,
    },
};
#[derive(Debug, Snafu)]
pub enum UnixError {
    #[snafu(display("Failed connecting to socket at path {}: {}", path.display(), source))]
    ConnectionError {
        source: tokio::io::Error,
        path: PathBuf,
    },
}
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixSinkConfig {
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: PathBuf,
}
impl UnixSinkConfig {
    pub const fn new(path: PathBuf) -> Self {
        Self { path }
    }
    pub fn build(
        &self,
        transformer: Transformer,
        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
            + Clone
            + Send
            + Sync
            + 'static,
    ) -> crate::Result<(VectorSink, Healthcheck)> {
        let connector = UnixConnector::new(self.path.clone());
        let sink = UnixSink::new(connector.clone(), transformer, encoder);
        Ok((
            VectorSink::from_event_streamsink(sink),
            Box::pin(async move { connector.healthcheck().await }),
        ))
    }
}
#[derive(Debug, Clone)]
struct UnixConnector {
    pub path: PathBuf,
}
impl UnixConnector {
    const fn new(path: PathBuf) -> Self {
        Self { path }
    }
    const fn fresh_backoff() -> ExponentialBackoff {
        ExponentialBackoff::from_millis(2)
            .factor(250)
            .max_delay(Duration::from_secs(60))
    }
    async fn connect(&self) -> Result<UnixStream, UnixError> {
        UnixStream::connect(&self.path)
            .await
            .context(ConnectionSnafu {
                path: self.path.clone(),
            })
    }
    async fn connect_backoff(&self) -> UnixStream {
        let mut backoff = Self::fresh_backoff();
        loop {
            match self.connect().await {
                Ok(stream) => {
                    emit!(UnixSocketConnectionEstablished { path: &self.path });
                    return stream;
                }
                Err(error) => {
                    emit!(UnixSocketOutgoingConnectionError { error });
                    sleep(backoff.next().unwrap()).await;
                }
            }
        }
    }
    async fn healthcheck(&self) -> crate::Result<()> {
        self.connect().await.map(|_| ()).map_err(Into::into)
    }
}
struct UnixSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    connector: UnixConnector,
    transformer: Transformer,
    encoder: E,
}
impl<E> UnixSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    pub const fn new(connector: UnixConnector, transformer: Transformer, encoder: E) -> Self {
        Self {
            connector,
            transformer,
            encoder,
        }
    }
    async fn connect(&mut self) -> BytesSink<UnixStream> {
        let stream = self.connector.connect_backoff().await;
        BytesSink::new(stream, |_| ShutdownCheck::Alive, SocketMode::Unix)
    }
}
#[async_trait]
impl<E> StreamSink<Event> for UnixSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let mut encoder = self.encoder.clone();
        let transformer = self.transformer.clone();
        let mut input = input
            .map(|mut event| {
                let byte_size = event.size_of();
                let json_byte_size = event.estimated_json_encoded_size_of();
                transformer.transform(&mut event);
                let finalizers = event.take_finalizers();
                let mut bytes = BytesMut::new();
                if encoder.encode(event, &mut bytes).is_ok() {
                    let item = bytes.freeze();
                    EncodedEvent {
                        item,
                        finalizers,
                        byte_size,
                        json_byte_size,
                    }
                } else {
                    EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
                }
            })
            .peekable();
        while Pin::new(&mut input).peek().await.is_some() {
            let mut sink = self.connect().await;
            let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
            let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
                Ok(()) => sink.close().await,
                Err(error) => Err(error),
            };
            if let Err(error) = result {
                emit!(UnixSocketSendError {
                    error: &error,
                    path: &self.connector.path
                });
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use tokio::net::UnixListener;
    use vector_lib::codecs::{encoding::Framer, NewlineDelimitedEncoder, TextSerializerConfig};
    use super::*;
    use crate::{
        codecs::Encoder,
        test_util::{
            components::{assert_sink_compliance, SINK_TAGS},
            random_lines_with_stream, CountReceiver,
        },
    };
    fn temp_uds_path(name: &str) -> PathBuf {
        tempfile::tempdir().unwrap().into_path().join(name)
    }
    #[tokio::test]
    async fn unix_sink_healthcheck() {
        let good_path = temp_uds_path("valid_uds");
        let _listener = UnixListener::bind(&good_path).unwrap();
        assert!(UnixSinkConfig::new(good_path)
            .build(
                Default::default(),
                Encoder::<()>::new(TextSerializerConfig::default().build().into())
            )
            .unwrap()
            .1
            .await
            .is_ok());
        let bad_path = temp_uds_path("no_one_listening");
        assert!(UnixSinkConfig::new(bad_path)
            .build(
                Default::default(),
                Encoder::<()>::new(TextSerializerConfig::default().build().into())
            )
            .unwrap()
            .1
            .await
            .is_err());
    }
    #[tokio::test]
    async fn basic_unix_sink() {
        let num_lines = 1000;
        let out_path = temp_uds_path("unix_test");
        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());
        let config = UnixSinkConfig::new(out_path);
        let (sink, _healthcheck) = config
            .build(
                Default::default(),
                Encoder::<Framer>::new(
                    NewlineDelimitedEncoder::default().into(),
                    TextSerializerConfig::default().build().into(),
                ),
            )
            .unwrap();
        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);
        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");
        receiver.connected().await;
        assert_eq!(input_lines, receiver.await);
    }
}