use chrono::Utc;
use futures::{stream, StreamExt};
use vector_lib::codecs::BytesDeserializerConfig;
use vector_lib::config::log_schema;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::lookup::{owned_value_path, path, OwnedValuePath};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    schema::Definition,
};
use vrl::value::Kind;
use crate::{
    config::{DataType, SourceConfig, SourceContext, SourceOutput},
    event::{EstimatedJsonEncodedSizeOf, Event},
    internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
    shutdown::ShutdownSignal,
    trace::TraceSubscription,
    SourceSender,
};
#[configurable_component(source(
    "internal_logs",
    "Expose internal log messages emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct InternalLogsConfig {
    host_key: Option<OptionalValuePath>,
    #[serde(default = "default_pid_key")]
    pid_key: OptionalValuePath,
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
fn default_pid_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("pid"))
}
impl_generate_config_from_default!(InternalLogsConfig);
impl Default for InternalLogsConfig {
    fn default() -> InternalLogsConfig {
        InternalLogsConfig {
            host_key: None,
            pid_key: default_pid_key(),
            log_namespace: None,
        }
    }
}
impl InternalLogsConfig {
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let host_key = self
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path
            .map(LegacyKey::Overwrite);
        let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
        BytesDeserializerConfig
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                InternalLogsConfig::NAME,
                host_key,
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            )
            .with_source_metadata(
                InternalLogsConfig::NAME,
                pid_key,
                &owned_value_path!("pid"),
                Kind::integer(),
                None,
            )
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "internal_logs")]
impl SourceConfig for InternalLogsConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let host_key = self
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path;
        let pid_key = self.pid_key.clone().path;
        let subscription = TraceSubscription::subscribe();
        let log_namespace = cx.log_namespace(self.log_namespace);
        Ok(Box::pin(run(
            host_key,
            pid_key,
            subscription,
            cx.out,
            cx.shutdown,
            log_namespace,
        )))
    }
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition =
            self.schema_definition(global_log_namespace.merge(self.log_namespace));
        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }
    fn can_acknowledge(&self) -> bool {
        false
    }
}
async fn run(
    host_key: Option<OwnedValuePath>,
    pid_key: Option<OwnedValuePath>,
    mut subscription: TraceSubscription,
    mut out: SourceSender,
    shutdown: ShutdownSignal,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    let hostname = crate::get_hostname();
    let pid = std::process::id();
    let buffered_events = subscription.buffered_events().await;
    let mut rx = stream::iter(buffered_events.into_iter().flatten())
        .chain(subscription.into_stream())
        .take_until(shutdown);
    while let Some(mut log) = rx.next().await {
        let byte_size = log.estimated_json_encoded_size_of().get();
        let json_byte_size = log.estimated_json_encoded_size_of();
        emit!(InternalLogsBytesReceived { byte_size });
        emit!(InternalLogsEventsReceived {
            count: 1,
            byte_size: json_byte_size,
        });
        if let Ok(hostname) = &hostname {
            let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
            log_namespace.insert_source_metadata(
                InternalLogsConfig::NAME,
                &mut log,
                legacy_host_key,
                path!("host"),
                hostname.to_owned(),
            );
        }
        let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
        log_namespace.insert_source_metadata(
            InternalLogsConfig::NAME,
            &mut log,
            legacy_pid_key,
            path!("pid"),
            pid,
        );
        log_namespace.insert_standard_vector_source_metadata(
            &mut log,
            InternalLogsConfig::NAME,
            Utc::now(),
        );
        if (out.send_event(Event::from(log)).await).is_err() {
            emit!(StreamClosedError { count: 1 });
            return Err(());
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{sleep, Duration};
    use vector_lib::event::Value;
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;
    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{assert_source_compliance, SOURCE_TAGS},
        },
        trace,
    };
    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }
    #[tokio::test]
    async fn receives_logs() {
        trace::init(false, false, "debug", 10);
        trace::reset_early_buffer();
        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }
    async fn run_test() {
        let test_id: u8 = rand::random();
        let start = chrono::Utc::now();
        error!(message = "Before source started without span.", %test_id);
        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        let _enter = span.enter();
        error!(message = "Before source started.", %test_id);
        let rx = start_source().await;
        error!(message = "After source started.", %test_id);
        {
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _enter = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }
        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        let test_id = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id));
        let end = chrono::Utc::now();
        assert_eq!(events.len(), 4);
        assert_eq!(
            events[0].as_log()["message"],
            "Before source started without span.".into()
        );
        assert_eq!(
            events[1].as_log()["message"],
            "Before source started.".into()
        );
        assert_eq!(
            events[2].as_log()["message"],
            "After source started.".into()
        );
        assert_eq!(events[3].as_log()["message"], "In a nested span.".into());
        for (i, event) in events.iter().enumerate() {
            let log = event.as_log();
            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!(timestamp >= start);
            assert!(timestamp <= end);
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());
            if i == 0 {
                assert!(log.get("vector.component_id").is_none());
                assert!(log.get("vector.component_kind").is_none());
                assert!(log.get("vector.component_type").is_none());
            } else if i < 3 {
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "source".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
            } else {
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "bar".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
                assert_eq!(log["vector.component_new_field"], "baz".into());
                assert_eq!(log["vector.component_numerical_field"], 1.into());
                assert!(log.get("vector.ignored_field").is_none());
            }
        }
    }
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (tx, rx) = SourceSender::new_test();
        let source = InternalLogsConfig::default()
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        rx
    }
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);
        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );
        assert_eq!(definitions, Some(expected_definition))
    }
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let mut config = InternalLogsConfig::default();
        let pid_key = "pid_a_pid_a_pid_pid_pid";
        config.pid_key = OptionalValuePath::from(owned_value_path!(pid_key));
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );
        assert_eq!(definitions, Some(expected_definition))
    }
}