use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::time::Duration;
use indexmap::IndexMap;
use serde_with::serde_as;
use vrl::path::{parse_target_path, PathPrefix};
use vrl::prelude::{Collection, KeyString, Kind};
use vector_lib::configurable::configurable_component;
use crate::conditions::AnyCondition;
use crate::config::{
    schema, DataType, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
    TransformOutput,
};
use crate::schema::Definition;
use crate::transforms::reduce::merge_strategy::MergeStrategy;
use crate::transforms::{reduce::transform::Reduce, Transform};
/// Configuration for the `reduce` transform.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`). `Default` is
/// derived through `derivative`, using the per-field default expressions declared below.
#[serde_as]
#[configurable_component(transform(
"reduce",
"Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct ReduceConfig {
    /// Expiry duration, read and written as a whole number of milliseconds.
    ///
    /// Falls back to `default_expire_after_ms()` (30000 ms) when omitted.
    // NOTE(review): presumably the idle time after which a pending group is considered
    // complete — the consumer (`Reduce`) is not visible here; confirm.
    #[serde(default = "default_expire_after_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_expire_after_ms()"))]
    #[configurable(metadata(docs::human_name = "Expire After"))]
    pub expire_after_ms: Duration,
    /// Optional "end every" period, read and written as a whole number of milliseconds.
    ///
    /// Defaults to `None`.
    // NOTE(review): presumably forces a flush of each group every time this period elapses —
    // confirm against `Reduce`.
    #[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
    #[derivative(Default(value = "Option::None"))]
    #[configurable(metadata(docs::human_name = "End-Every Period"))]
    pub end_every_period_ms: Option<Duration>,
    /// Flush period, read and written as a whole number of milliseconds.
    ///
    /// Falls back to `default_flush_period_ms()` (1000 ms) when omitted.
    // NOTE(review): presumably the interval at which expired groups are checked for and
    // flushed — confirm against `Reduce`.
    #[serde(default = "default_flush_period_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_flush_period_ms()"))]
    #[configurable(metadata(docs::human_name = "Flush Period"))]
    pub flush_period_ms: Duration,
    /// Optional, non-zero event count limit.
    // NOTE(review): presumably the maximum number of events merged into one group — confirm.
    pub max_events: Option<NonZeroUsize>,
    /// List of field names used for grouping; empty when omitted.
    // NOTE(review): presumably events with equal values for these fields are reduced
    // together — the grouping logic lives outside this file; confirm.
    #[serde(default)]
    #[configurable(metadata(
        docs::examples = "request_id",
        docs::examples = "user_id",
        docs::examples = "transaction_id",
    ))]
    pub group_by: Vec<String>,
    /// Map of field path to the merge strategy applied to that field; empty when omitted.
    ///
    /// Entries keep their insertion order. When the output schema is computed, each key is
    /// parsed as a target path and keys that fail to parse are skipped.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An individual merge strategy."
    ))]
    pub merge_strategies: IndexMap<KeyString, MergeStrategy>,
    /// Optional condition associated with the end of a group of events.
    // NOTE(review): presumably a matching event closes and flushes the current group —
    // confirm against `Reduce`.
    pub ends_when: Option<AnyCondition>,
    /// Optional condition associated with the start of a group of events.
    // NOTE(review): presumably a matching event flushes the previous group and opens a new
    // one — confirm against `Reduce`.
    pub starts_when: Option<AnyCondition>,
}
/// Default for `ReduceConfig::expire_after_ms`: 30 000 milliseconds.
const fn default_expire_after_ms() -> Duration {
    // Kept in milliseconds to match the unit the field is (de)serialized in.
    const EXPIRE_AFTER_MILLIS: u64 = 30_000;
    Duration::from_millis(EXPIRE_AFTER_MILLIS)
}
/// Default for `ReduceConfig::flush_period_ms`: 1 000 milliseconds.
const fn default_flush_period_ms() -> Duration {
    // Kept in milliseconds to match the unit the field is (de)serialized in.
    const FLUSH_PERIOD_MILLIS: u64 = 1_000;
    Duration::from_millis(FLUSH_PERIOD_MILLIS)
}
// NOTE(review): the macro is defined outside this file; judging by its name it presumably
// derives the generated example configuration for `ReduceConfig` from its `Default` value —
// confirm against the macro definition.
impl_generate_config_from_default!(ReduceConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "reduce")]
impl TransformConfig for ReduceConfig {
    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
        Reduce::new(self, &context.enrichment_tables).map(Transform::event_task)
    }
    fn input(&self) -> Input {
        Input::log()
    }
    fn outputs(
        &self,
        _: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        let merged_definition: Definition = input_definitions
            .iter()
            .map(|(_output, definition)| definition.clone())
            .reduce(Definition::merge)
            .unwrap_or_else(Definition::any);
        let mut schema_definition = merged_definition;
        for (key, merge_strategy) in self.merge_strategies.iter() {
            let key = if let Ok(key) = parse_target_path(key) {
                key
            } else {
                continue;
            };
            let input_kind = match key.prefix {
                PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
                PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
            };
            let new_kind = match merge_strategy {
                MergeStrategy::Discard | MergeStrategy::Retain => {
                    input_kind.clone()
                }
                MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => {
                    match (input_kind.contains_integer(), input_kind.contains_float()) {
                        (true, true) => Kind::float().or_integer(),
                        (true, false) => Kind::integer(),
                        (false, true) => Kind::float(),
                        (false, false) => Kind::undefined(),
                    }
                }
                MergeStrategy::Array => {
                    let unknown_kind = input_kind.clone();
                    Kind::array(Collection::empty().with_unknown(unknown_kind))
                }
                MergeStrategy::Concat => {
                    let mut new_kind = Kind::never();
                    if input_kind.contains_bytes() {
                        new_kind.add_bytes();
                    }
                    if let Some(array) = input_kind.as_array() {
                        let array_elements = array.reduced_kind().union(input_kind.without_array());
                        new_kind.add_array(Collection::empty().with_unknown(array_elements));
                    }
                    new_kind
                }
                MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => {
                    if input_kind.contains_bytes() {
                        Kind::bytes()
                    } else {
                        Kind::undefined()
                    }
                }
                MergeStrategy::ShortestArray | MergeStrategy::LongestArray => {
                    if let Some(array) = input_kind.as_array() {
                        Kind::array(array.clone())
                    } else {
                        Kind::undefined()
                    }
                }
                MergeStrategy::FlatUnique => {
                    let mut array_elements = input_kind.without_array().without_object();
                    if let Some(array) = input_kind.as_array() {
                        array_elements = array_elements.union(array.reduced_kind());
                    }
                    if let Some(object) = input_kind.as_object() {
                        array_elements = array_elements.union(object.reduced_kind());
                    }
                    Kind::array(Collection::empty().with_unknown(array_elements))
                }
            };
            let new_kind = if input_kind.contains_undefined() {
                new_kind.or_undefined()
            } else {
                new_kind
            };
            schema_definition = schema_definition.with_field(&key, new_kind, None);
        }
        let mut output_definitions = HashMap::new();
        for (output, _input) in input_definitions {
            output_definitions.insert(output.clone(), schema_definition.clone());
        }
        vec![TransformOutput::new(DataType::Log, output_definitions)]
    }
}
// Unit tests for the `reduce` transform configuration.
#[cfg(test)]
mod test {
    use super::*;
    // Runs the shared `test_generate_config` helper against `ReduceConfig`.
    // NOTE(review): the helper is defined in `crate::test_util`; presumably it checks that the
    // generated example configuration is valid — confirm there.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ReduceConfig>();
    }
}