use std::time::Duration;
use futures::{FutureExt, StreamExt};
use http::Uri;
use hyper::{Body, Request};
use serde_with::serde_as;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::config::LogNamespace;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
};
use vector_lib::EstimatedJsonEncodedSizeOf;
use self::types::Stats;
use crate::{
    config::{SourceConfig, SourceContext, SourceOutput},
    http::HttpClient,
    internal_events::{
        EventStoreDbMetricsHttpError, EventStoreDbStatsParsingError, EventsReceived,
        StreamClosedError,
    },
    tls::TlsSettings,
};
pub mod types;
/// Configuration for the `eventstoredb_metrics` source.
#[serde_as]
#[configurable_component(source(
    "eventstoredb_metrics",
    "Receive metrics collected by EventStoreDB."
))]
#[derive(Clone, Debug, Default)]
pub struct EventStoreDbConfig {
    /// Endpoint to scrape stats from.
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "https://localhost:2113/stats"))]
    endpoint: String,
    /// The interval between scrapes, in seconds.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,
    /// Namespace to apply to the metrics emitted by this source.
    ///
    /// When not set, the namespace is chosen by the stats-to-metrics conversion.
    #[configurable(metadata(docs::examples = "eventstoredb"))]
    default_namespace: Option<String>,
}
/// Serde default for `scrape_interval_secs`: scrape every 15 seconds.
const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SCRAPE_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SCRAPE_INTERVAL_SECS)
}
/// Serde default for `endpoint`: the stats URL of an EventStoreDB on localhost.
pub fn default_endpoint() -> String {
    String::from("https://localhost:2113/stats")
}
// Invokes the project macro `impl_generate_config_from_default!` for this config type.
// NOTE(review): presumably derives the example-config generation from the type's
// `Default` impl — confirm against the macro definition.
impl_generate_config_from_default!(EventStoreDbConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "eventstoredb_metrics")]
impl SourceConfig for EventStoreDbConfig {
    /// Builds the source by handing owned copies of the configured values,
    /// together with the source context, to `eventstoredb`.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let endpoint = self.endpoint.clone();
        let interval = self.scrape_interval_secs;
        let namespace = self.default_namespace.clone();
        eventstoredb(endpoint, interval, namespace, cx)
    }

    /// This source exposes a single metrics output; the log namespace is not consulted.
    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        Vec::from([SourceOutput::new_metrics()])
    }

    /// Reports that this source cannot acknowledge events.
    fn can_acknowledge(&self) -> bool {
        false
    }
}
fn eventstoredb(
    endpoint: String,
    interval: Duration,
    namespace: Option<String>,
    mut cx: SourceContext,
) -> crate::Result<super::Source> {
    let mut ticks = IntervalStream::new(tokio::time::interval(interval)).take_until(cx.shutdown);
    let tls_settings = TlsSettings::from_options(&None)?;
    let client = HttpClient::new(tls_settings, &cx.proxy)?;
    let url: Uri = endpoint.as_str().parse()?;
    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
    let events_received = register!(EventsReceived);
    Ok(Box::pin(
        async move {
            while ticks.next().await.is_some() {
                let req = Request::get(&url)
                    .header("content-type", "application/json")
                    .body(Body::empty())
                    .expect("Building request should be infallible.");
                match client.send(req).await {
                    Err(error) => {
                        emit!(EventStoreDbMetricsHttpError {
                            error: error.into(),
                        });
                        continue;
                    }
                    Ok(resp) => {
                        let bytes = match hyper::body::to_bytes(resp.into_body()).await {
                            Ok(b) => b,
                            Err(error) => {
                                emit!(EventStoreDbMetricsHttpError {
                                    error: error.into(),
                                });
                                continue;
                            }
                        };
                        bytes_received.emit(ByteSize(bytes.len()));
                        match serde_json::from_slice::<Stats>(bytes.as_ref()) {
                            Err(error) => {
                                emit!(EventStoreDbStatsParsingError { error });
                                continue;
                            }
                            Ok(stats) => {
                                let metrics = stats.metrics(namespace.clone());
                                let count = metrics.len();
                                let byte_size = metrics.estimated_json_encoded_size_of();
                                events_received.emit(CountByteSize(count, byte_size));
                                if (cx.out.send_batch(metrics).await).is_err() {
                                    emit!(StreamClosedError { count });
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        }
        .map(Ok)
        .boxed(),
    ))
}
#[cfg(all(test, feature = "eventstoredb_metrics-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;
    use super::*;
    use crate::test_util::components::{run_and_assert_source_compliance, SOURCE_TAGS};

    /// Stats URL scraped by the integration test.
    const EVENTSTOREDB_SCRAPE_ADDRESS: &str = "http://eventstoredb:2113/stats";

    /// Runs the source with a one-second scrape interval for five seconds and
    /// checks that at least one event was produced.
    #[tokio::test]
    async fn scrape_something() {
        let run_for = Duration::from_secs(5);
        let source = EventStoreDbConfig {
            endpoint: String::from(EVENTSTOREDB_SCRAPE_ADDRESS),
            scrape_interval_secs: Duration::from_secs(1),
            default_namespace: None,
        };
        let scraped = run_and_assert_source_compliance(source, run_for, &SOURCE_TAGS).await;
        assert!(!scraped.is_empty());
    }
}