use vector_lib::{
    config::{clone_input_definitions, LogNamespace},
    configurable::configurable_component,
};
use crate::{
    config::{
        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
        TransformOutput,
    },
    schema,
    transforms::Transform,
};
use super::{
    common::{default_cache_config, fill_default_fields_match, CacheConfig, FieldMatchConfig},
    transform::Dedupe,
};
/// Configuration for the `dedupe` transform.
#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DedupeConfig {
    // Field selection used to decide whether two events are duplicates. When unset,
    // `build` resolves it through `fill_default_fields_match`.
    #[configurable(derived)]
    #[serde(default)]
    pub fields: Option<FieldMatchConfig>,
    // Cache settings; `build` passes `cache.num_events` to `Dedupe::new`.
    #[configurable(derived)]
    #[serde(default = "default_cache_config")]
    pub cache: CacheConfig,
}
impl GenerateConfig for DedupeConfig {
    /// Renders the example configuration: no explicit field selection and the
    /// default cache settings.
    fn generate_config() -> toml::Value {
        let example = Self {
            cache: default_cache_config(),
            fields: None,
        };
        toml::Value::try_from(example).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "dedupe")]
impl TransformConfig for DedupeConfig {
    /// Builds the dedupe transform as an event task.
    ///
    /// The cache is sized by `cache.num_events`. The field selection is whatever
    /// `fill_default_fields_match` resolves from the (optional) configured `fields`.
    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
        let cache_size = self.cache.num_events;
        let field_match = fill_default_fields_match(self.fields.as_ref());
        let dedupe = Dedupe::new(cache_size, field_match);
        Ok(Transform::event_task(dedupe))
    }

    /// Accepts log events only.
    fn input(&self) -> Input {
        Input::log()
    }

    /// Declares a single log output whose schema definitions are the input
    /// definitions, cloned unchanged.
    fn outputs(
        &self,
        _: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        let definitions = clone_input_definitions(input_definitions);
        vec![TransformOutput::new(DataType::Log, definitions)]
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::ComponentKey;
    use vector_lib::config::OutputId;
    use vector_lib::lookup::lookup_v2::ConfigTargetPath;
    use crate::config::schema::Definition;
    use crate::{
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            test::create_topology,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Builds a cache configuration holding `num_events` entries.
    ///
    /// # Panics
    ///
    /// Panics if `num_events` is zero; every test uses a positive cache size.
    fn cache_config(num_events: usize) -> CacheConfig {
        CacheConfig {
            num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
        }
    }

    /// Builds a config that compares events on exactly the given `fields`.
    fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: cache_config(num_events),
            fields: Some(FieldMatchConfig::MatchFields(fields)),
        }
    }

    /// Builds a config that ignores `message`, `timestamp`, and every path in
    /// `given_fields` when comparing events.
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);
        DedupeConfig {
            cache: cache_config(num_events),
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
        }
    }

    /// Stamps `event` with the metadata that the event carries when it comes out of
    /// the test topology (source `in`, upstream `transform`, legacy-namespace schema
    /// definition), so it can be compared against the received output.
    fn expected_output(mut event: Event) -> Event {
        event.set_source_id(Arc::new(ComponentKey::from("in")));
        event.set_upstream_id(Arc::new(OutputId::from("transform")));
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
        event
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// `first_path` is compared by `transform_config`; `second_path` is not.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Differs from `event1` only in the compared path.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Differs from `event1` only in the path that is not compared.
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event1));

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event2));

            // `event3` duplicates `event1` and must not be forwarded.
            tx.send(event3).await.unwrap();
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// The same value stored under different field names must not be a duplicate.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event1));

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event2));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Events with identical fields inserted in a different order are duplicates.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event1));

            // `event2` must be dropped as a duplicate of `event1`.
            tx.send(event2).await.unwrap();
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// Expects a single-entry cache: once `event2` has been seen, `event1` is
    /// forwarded again.
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            // Keep the stamped form: it is both the expected output and what is resent below.
            let event1 = expected_output(event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event2));

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// A string `"123"` and an integer `123` must not be treated as duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event1));

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event2));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Like `type_matching`, but with the differing value nested inside an object.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event1));

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event2));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// A field explicitly set to null must not be a duplicate of the field being absent.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);
            let event2 = Event::Log(LogEvent::from("message"));

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event1));

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, expected_output(event2));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}