use vector_lib::config::{clone_input_definitions, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered};
use crate::{
    conditions::{AnyCondition, Condition},
    config::{
        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
        TransformOutput,
    },
    event::Event,
    internal_events::FilterEventsDropped,
    schema,
    transforms::{FunctionTransform, OutputBuffer, Transform},
};
#[configurable_component(transform("filter", "Filter events based on a set of conditions."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FilterConfig {
    #[configurable(derived)]
    condition: AnyCondition,
}
impl From<AnyCondition> for FilterConfig {
    fn from(condition: AnyCondition) -> Self {
        Self { condition }
    }
}
impl GenerateConfig for FilterConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(r#"condition = ".message = \"value\"""#).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "filter")]
impl TransformConfig for FilterConfig {
    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
        Ok(Transform::function(Filter::new(
            self.condition.build(&context.enrichment_tables)?,
        )))
    }
    fn input(&self) -> Input {
        Input::all()
    }
    fn outputs(
        &self,
        _enrichment_tables: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        vec![TransformOutput::new(
            DataType::all_bits(),
            clone_input_definitions(input_definitions),
        )]
    }
    fn enable_concurrency(&self) -> bool {
        true
    }
}
#[derive(Clone)]
pub struct Filter {
    condition: Condition,
    events_dropped: Registered<FilterEventsDropped>,
}
impl Filter {
    pub fn new(condition: Condition) -> Self {
        Self {
            condition,
            events_dropped: register!(FilterEventsDropped),
        }
    }
}
impl FunctionTransform for Filter {
    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
        let (result, event) = self.condition.check(event);
        if result {
            output.push(event);
        } else {
            self.events_dropped.emit(Count(1));
        }
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::ComponentKey;
    use vector_lib::event::{Metric, MetricKind, MetricValue};
    use super::*;
    use crate::config::schema::Definition;
    use crate::{
        conditions::ConditionConfig,
        event::{Event, LogEvent},
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::FilterConfig>();
    }
    #[tokio::test]
    async fn filter_basic() {
        assert_transform_compliance(async {
            let transform_config = FilterConfig::from(AnyCondition::from(ConditionConfig::IsLog));
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;
            let mut log = Event::from(LogEvent::from("message"));
            tx.send(log.clone()).await.unwrap();
            log.set_source_id(Arc::new(ComponentKey::from("in")));
            log.set_upstream_id(Arc::new(OutputId::from("transform")));
            log.metadata_mut()
                .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
            assert_eq!(out.recv().await.unwrap(), log);
            let metric = Event::from(Metric::new(
                "test metric",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            ));
            tx.send(metric).await.unwrap();
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}