#[cfg(all(
    test,
    feature = "sources-demo_logs",
    feature = "transforms-remap",
    feature = "transforms-route",
    feature = "transforms-filter",
    feature = "transforms-reduce",
    feature = "sinks-console"
))]
mod tests;
mod unit_test_components;

use std::{
    collections::{BTreeMap, HashMap, HashSet},
    sync::Arc,
};
use futures_util::{stream::FuturesUnordered, StreamExt};
use indexmap::IndexMap;
use ordered_float::NotNan;
use tokio::sync::{
    oneshot::{self, Receiver},
    Mutex,
};
use uuid::Uuid;
use vrl::{
    compiler::{state::RuntimeState, Context, TargetValue, TimeZone},
    diagnostic::Formatter,
    value,
};
pub use self::unit_test_components::{
    UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
    UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
};
use super::{compiler::expand_globs, graph::Graph, transform::get_transform_output_ids, OutputId};
use crate::{
    conditions::Condition,
    config::{
        self, loading, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
        TestDefinition, TestInput, TestInputValue, TestOutput,
    },
    event::{Event, EventMetadata, LogEvent, Value},
    signal,
    topology::{builder::TopologyPieces, RunningTopology},
};
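
/// A single built unit test: the assembled test config and topology pieces,
/// plus the receivers on which each test sink reports its results.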
pub struct UnitTest {
    pub name: String,
    config: Config,
    pieces: TopologyPieces,
    test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}
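
/// The outcome of running one unit test; an empty `errors` list means the
/// test passed.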
pub struct UnitTestResult {
    pub errors: Vec<String>,
}

impl UnitTest {
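    /// Runs the test topology until its sources are finished, then collects
    /// any check failures reported by the test sinks.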
    pub async fn run(self) -> UnitTestResult {
        let diff = config::ConfigDiff::initial(&self.config);
        let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
            .await
            .unwrap();
        topology.sources_finished().await;
        let _stop_complete = topology.stop();
        let mut in_flight = self
            .test_result_rxs
            .into_iter()
            .collect::<FuturesUnordered<_>>();
        let mut errors = Vec::new();
        while let Some(partial_result) = in_flight.next().await {
            let partial_result = partial_result.expect(
                "An unexpected error occurred while executing unit tests. Please try again.",
            );
            errors.extend(partial_result.test_errors);
        }
        UnitTestResult { errors }
    }
}
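
/// Initializes the global log schema from the configuration at the given paths.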
fn init_log_schema_from_paths(
    config_paths: &[ConfigPath],
    deny_if_set: bool,
) -> Result<(), Vec<String>> {
    let builder = config::loading::load_builder_from_paths(config_paths)?;
    vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
    Ok(())
}
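
/// Loads the configuration from the given paths, resolving any configured
/// secrets, and builds the unit tests it defines.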
pub async fn build_unit_tests_main(
    paths: &[ConfigPath],
    signal_handler: &mut signal::SignalHandler,
) -> Result<Vec<UnitTest>, Vec<String>> {
    init_log_schema_from_paths(paths, false)?;
    let mut secrets_backends_loader = loading::load_secret_backends_from_paths(paths)?;
    let config_builder = if secrets_backends_loader.has_secrets_to_retrieve() {
        let resolved_secrets = secrets_backends_loader
            .retrieve(&mut signal_handler.subscribe())
            .await
            .map_err(|e| vec![e])?;
        loading::load_builder_from_paths_with_secrets(paths, resolved_secrets)?
    } else {
        loading::load_builder_from_paths(paths)?
    };
    build_unit_tests(config_builder).await
}
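
/// Builds a runnable [`UnitTest`] for every test definition in the
/// configuration, collecting per-test build errors by test name.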
pub async fn build_unit_tests(
    mut config_builder: ConfigBuilder,
) -> Result<Vec<UnitTest>, Vec<String>> {
    // Unit tests never use the user-defined sources and sinks; each test
    // supplies its own synthetic test sources and sinks instead.
    config_builder.sources = Default::default();
    config_builder.sinks = Default::default();
    let test_definitions = std::mem::take(&mut config_builder.tests);
    let mut tests = Vec::new();
    let mut build_errors = Vec::new();
    let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
    for mut test_definition in test_definitions {
        let test_name = test_definition.name.clone();
        // Fold the legacy singular `input` field into `inputs` so both forms
        // are handled uniformly below.
        let legacy_input = std::mem::take(&mut test_definition.input);
        if let Some(input) = legacy_input {
            test_definition.inputs.push(input);
        }
        match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
            Ok(test) => tests.push(test),
            Err(errors) => {
                let mut test_error = errors.join("\n");
                test_error = test_error.replace('\n', "\n  ");
                test_error.insert_str(0, &format!("Failed to build test '{}':\n  ", test_name));
                build_errors.push(test_error);
            }
        }
    }
    if build_errors.is_empty() {
        Ok(tests)
    } else {
        Err(build_errors)
    }
}
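
/// State shared across test builds: the transforms that test events can be
/// inserted at, the synthetic source and sink ids generated for them, and the
/// template source configs that get hydrated with test events.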
pub struct UnitTestBuildMetadata {
    available_insert_targets: HashSet<ComponentKey>,
    source_ids: HashMap<ComponentKey, String>,
    template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
    sink_ids: HashMap<OutputId, String>,
}

impl UnitTestBuildMetadata {
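    /// Generates a unique synthetic source id per transform, registers those
    /// sources as extra inputs of the transforms, and generates a sink id for
    /// every transform output.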
    pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
        // A random suffix keeps the synthetic source and sink ids from
        // colliding with user-defined component names.
        let random_id = Uuid::new_v4().to_string();
        let available_insert_targets = config_builder
            .transforms
            .keys()
            .cloned()
            .collect::<HashSet<_>>();
        let source_ids = available_insert_targets
            .iter()
            .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
            .collect::<HashMap<_, _>>();
        let mut template_sources = IndexMap::new();
        // Wire each transform up to its synthetic test source so events can
        // be inserted at any transform.
        for (key, transform) in config_builder.transforms.iter_mut() {
            let test_source_id = source_ids
                .get(key)
                .expect("Missing test source for a transform")
                .clone();
            transform.inputs.extend(Some(test_source_id));
            template_sources.insert(key.clone(), UnitTestSourceConfig::default());
        }
        let builder = config_builder.clone();
        let available_extract_targets = builder
            .transforms
            .iter()
            .flat_map(|(key, transform)| {
                get_transform_output_ids(
                    transform.inner.as_ref(),
                    key.clone(),
                    builder.schema.log_namespace(),
                )
            })
            .collect::<HashSet<_>>();
        let sink_ids = available_extract_targets
            .iter()
            .map(|key| {
                (
                    key.clone(),
                    format!(
                        "{}-{}-{}",
                        key.to_string().replace('.', "-"),
                        "sink",
                        random_id
                    ),
                )
            })
            .collect::<HashMap<_, _>>();
        Ok(Self {
            available_insert_targets,
            source_ids,
            template_sources,
            sink_ids,
        })
    }
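
    /// Turns the test's inputs into concrete source configurations, each
    /// pre-loaded with the events to insert at its target transform.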
    pub fn hydrate_into_sources(
        &self,
        inputs: &[TestInput],
    ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
        let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
        let mut template_sources = self.template_sources.clone();
        Ok(inputs
            .into_iter()
            .map(|(insert_at, events)| {
                let mut source_config =
                    template_sources
                        .shift_remove(&insert_at)
                        .unwrap_or_else(|| {
                            panic!(
                                "Invalid input: cannot insert at {:?}",
                                insert_at.to_string()
                            )
                        });
                source_config.events.extend(events);
                let id: &str = self
                    .source_ids
                    .get(&insert_at)
                    .expect("Corresponding source must exist")
                    .as_ref();
                (ComponentKey::from(id), SourceOuter::new(source_config))
            })
            .collect::<IndexMap<_, _>>())
    }
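
    /// Turns the test's `outputs` and `no_outputs_from` targets into sink
    /// configurations, returning one result receiver per sink alongside the
    /// sinks themselves.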
    pub fn hydrate_into_sinks(
        &self,
        test_name: &str,
        outputs: &[TestOutput],
        no_outputs_from: &[OutputId],
    ) -> Result<
        (
            Vec<Receiver<UnitTestSinkResult>>,
            IndexMap<ComponentKey, SinkOuter<String>>,
        ),
        Vec<String>,
    > {
        if outputs.is_empty() && no_outputs_from.is_empty() {
            return Err(vec![
                "unit test must contain at least one of `outputs` or `no_outputs_from`."
                    .to_string(),
            ]);
        }
        let outputs = build_outputs(outputs)?;
        let mut template_sinks = IndexMap::new();
        let mut test_result_rxs = Vec::new();
        // One test sink per set of extraction targets, asserting the
        // configured checks against the events it receives.
        for (ids, checks) in outputs {
            let (tx, rx) = oneshot::channel();
            let sink_ids = ids.clone();
            let sink_config = UnitTestSinkConfig {
                test_name: test_name.to_string(),
                transform_ids: ids.iter().map(|id| id.to_string()).collect(),
                result_tx: Arc::new(Mutex::new(Some(tx))),
                check: UnitTestSinkCheck::Checks(checks),
            };
            test_result_rxs.push(rx);
            template_sinks.insert(sink_ids, sink_config);
        }
        // One test sink per `no_outputs_from` target, asserting that no
        // events reach it.
        for id in no_outputs_from {
            let (tx, rx) = oneshot::channel();
            let sink_config = UnitTestSinkConfig {
                test_name: test_name.to_string(),
                transform_ids: vec![id.to_string()],
                result_tx: Arc::new(Mutex::new(Some(tx))),
                check: UnitTestSinkCheck::NoOutputs,
            };
            test_result_rxs.push(rx);
            template_sinks.insert(vec![id.clone()], sink_config);
        }
        let sinks = template_sinks
            .into_iter()
            .map(|(transform_ids, sink_config)| {
                let transform_ids_str = transform_ids
                    .iter()
                    .map(|s| s.to_string())
                    .collect::<Vec<_>>();
                let sink_ids = transform_ids
                    .iter()
                    .map(|transform_id| {
                        self.sink_ids
                            .get(transform_id)
                            .expect("Sink does not exist")
                            .as_str()
                    })
                    .collect::<Vec<_>>();
                let sink_id = sink_ids.join(",");
                (
                    ComponentKey::from(sink_id),
                    SinkOuter::new(transform_ids_str, sink_config),
                )
            })
            .collect::<IndexMap<_, _>>();
        Ok((test_result_rxs, sinks))
    }
}
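
/// Walks the graph from each test source and collects every component that
/// lies on a path to a sink, erroring for sources with no such path.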
fn get_relevant_test_components(
    sources: &[&ComponentKey],
    graph: &Graph,
) -> Result<HashSet<String>, Vec<String>> {
    graph.check_for_cycles().map_err(|error| vec![error])?;
    let mut errors = Vec::new();
    let mut components = HashSet::new();
    for source in sources {
        let paths = graph.paths_to_sink_from(source);
        if paths.is_empty() {
            // Recover the user-facing transform name from the synthetic
            // '<transform>-source-<uuid>' id for the error message.
            errors.push(format!(
                "Unable to complete topology between input target '{}' and output target(s)",
                source
                    .to_string()
                    .rsplit_once("-source-")
                    .unwrap_or(("", ""))
                    .0
            ));
        } else {
            for path in paths {
                components.extend(path.into_iter().map(|key| key.to_string()));
            }
        }
    }
    if errors.is_empty() {
        Ok(components)
    } else {
        Err(errors)
    }
}
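
/// Builds a single runnable unit test: hydrates its test sources and sinks,
/// prunes components that are not under test, and assembles the resulting
/// topology.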
async fn build_unit_test(
    metadata: &UnitTestBuildMetadata,
    test: TestDefinition<String>,
    mut config_builder: ConfigBuilder,
) -> Result<UnitTest, Vec<String>> {
    let transform_only_config = config_builder.clone();
    let transform_only_graph = Graph::new_unchecked(
        &transform_only_config.sources,
        &transform_only_config.transforms,
        &transform_only_config.sinks,
        transform_only_config.schema,
    );
    let test = test.resolve_outputs(&transform_only_graph)?;
    let sources = metadata.hydrate_into_sources(&test.inputs)?;
    let (test_result_rxs, sinks) =
        metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
    // Swap in the synthetic test sources and sinks, then expand any glob
    // patterns in component inputs so the graph below sees concrete ids.
    config_builder.sources = sources;
    config_builder.sinks = sinks;
    expand_globs(&mut config_builder);
    let graph = Graph::new_unchecked(
        &config_builder.sources,
        &config_builder.transforms,
        &config_builder.sinks,
        config_builder.schema,
    );
    // Keep only the components that lie on a path from a test source to a
    // test sink; everything else is irrelevant to this test.
    let mut valid_components = get_relevant_test_components(
        config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
        &graph,
    )?;
    // Expanded transforms are keyed as '<parent>.<output>'; keep the parent
    // names too so the filtering below does not drop them.
    let unexpanded_transforms = valid_components
        .iter()
        .filter_map(|component| {
            component
                .split_once('.')
                .map(|(original_name, _)| original_name.to_string())
        })
        .collect::<Vec<_>>();
    valid_components.extend(unexpanded_transforms);
    config_builder.transforms = config_builder
        .transforms
        .into_iter()
        .filter(|(key, _)| valid_components.contains(&key.to_string()))
        .collect();
    let graph = Graph::new_unchecked(
        &config_builder.sources,
        &config_builder.transforms,
        &config_builder.sinks,
        config_builder.schema,
    );
    let valid_inputs = graph.input_map()?;
    // Drop any transform inputs that refer to components removed above.
    for (_, transform) in config_builder.transforms.iter_mut() {
        let inputs = std::mem::take(&mut transform.inputs);
        transform.inputs = inputs
            .into_iter()
            .filter(|input| valid_inputs.contains_key(input))
            .collect();
    }
    // Terminate any transform outputs that nothing else consumes with a
    // shared no-op sink.
    if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
        config_builder
            .sinks
            .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
    }
    let config = config_builder.build()?;
    let diff = config::ConfigDiff::initial(&config);
    let pieces = TopologyPieces::build(&config, &diff, HashMap::new(), Default::default()).await?;
    Ok(UnitTest {
        name: test.name,
        config,
        pieces,
        test_result_rxs,
    })
}
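
/// Returns a no-op sink consuming every transform output that no other
/// transform or sink uses as an input, or `None` if there are none.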
fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
    let config = config.clone();
    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
        get_transform_output_ids(
            transform.inner.as_ref(),
            key.clone(),
            config.schema.log_namespace(),
        )
        .map(|output| output.to_string())
        .collect::<Vec<_>>()
    });
    let mut loose_end_outputs = Vec::new();
    for id in transform_ids {
        if !config
            .transforms
            .iter()
            .any(|(_, transform)| transform.inputs.contains(&id))
            && !config
                .sinks
                .iter()
                .any(|(_, sink)| sink.inputs.contains(&id))
        {
            loose_end_outputs.push(id);
        }
    }
    if loose_end_outputs.is_empty() {
        None
    } else {
        let noop_sink = UnitTestSinkConfig {
            test_name: "".to_string(),
            transform_ids: vec![],
            result_tx: Arc::new(Mutex::new(None)),
            check: UnitTestSinkCheck::NoOp,
        };
        Some(SinkOuter::new(loose_end_outputs, noop_sink))
    }
}
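
/// Builds the events for each test input, grouped by the transform they are
/// inserted at, and validates that every insertion target exists.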
fn build_and_validate_inputs(
    test_inputs: &[TestInput],
    available_insert_targets: &HashSet<ComponentKey>,
) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
    let mut inputs = HashMap::new();
    let mut errors = Vec::new();
    if test_inputs.is_empty() {
        errors.push("must specify at least one input.".to_string());
        return Err(errors);
    }
    for (index, input) in test_inputs.iter().enumerate() {
        if available_insert_targets.contains(&input.insert_at) {
            match build_input_event(input) {
                Ok(input_event) => {
                    inputs
                        .entry(input.insert_at.clone())
                        .and_modify(|events: &mut Vec<Event>| {
                            events.push(input_event.clone());
                        })
                        .or_insert_with(|| vec![input_event]);
                }
                Err(error) => errors.push(error),
            }
        } else {
            errors.push(format!(
                "inputs[{}]: unable to locate target transform '{}'",
                index, input.insert_at
            ))
        }
    }
    if errors.is_empty() {
        Ok(inputs)
    } else {
        Err(errors)
    }
}
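
/// Builds the check conditions for each test output, grouped by the set of
/// output targets they extract events from.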
fn build_outputs(
    test_outputs: &[TestOutput],
) -> Result<IndexMap<Vec<OutputId>, Vec<Vec<Condition>>>, Vec<String>> {
    let mut outputs: IndexMap<Vec<OutputId>, Vec<Vec<Condition>>> = IndexMap::new();
    let mut errors = Vec::new();
    for output in test_outputs {
        let mut conditions = Vec::new();
        for (index, condition) in output
            .conditions
            .clone()
            .unwrap_or_default()
            .iter()
            .enumerate()
        {
            match condition.build(&Default::default()) {
                Ok(condition) => conditions.push(condition),
                Err(error) => errors.push(format!(
                    "failed to create test condition '{}': {}",
                    index, error
                )),
            }
        }
        outputs
            .entry(output.extract_from.clone().to_vec())
            .and_modify(|existing_conditions| existing_conditions.push(conditions.clone()))
            .or_insert(vec![conditions.clone()]);
    }
    if errors.is_empty() {
        Ok(outputs)
    } else {
        Err(errors)
    }
}
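
/// Builds a single test event from an input definition of type 'raw', 'vrl',
/// 'log', or 'metric'.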
fn build_input_event(input: &TestInput) -> Result<Event, String> {
    match input.type_str.as_ref() {
        "raw" => match input.value.as_ref() {
            Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
            None => Err("input type 'raw' requires the field 'value'".to_string()),
        },
        "vrl" => {
            if let Some(source) = &input.source {
                // Compile and run the VRL program against an empty event; the
                // resulting value and metadata become the test input event.
                let fns = vrl::stdlib::all();
                let result = vrl::compiler::compile(source, &fns)
                    .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
                let mut target = TargetValue {
                    value: value!({}),
                    metadata: value::Value::Object(BTreeMap::new()),
                    secrets: value::Secrets::default(),
                };
                let mut state = RuntimeState::default();
                let timezone = TimeZone::default();
                let mut ctx = Context::new(&mut target, &mut state, &timezone);
                result
                    .program
                    .resolve(&mut ctx)
                    .map(|_| {
                        Event::Log(LogEvent::from_parts(
                            target.value.clone(),
                            EventMetadata::default_with_value(target.metadata.clone()),
                        ))
                    })
                    .map_err(|e| e.to_string())
            } else {
                Err("input type 'vrl' requires the field 'source'".to_string())
            }
        }
        "log" => {
            if let Some(log_fields) = &input.log_fields {
                let mut event = LogEvent::from_str_legacy("");
                for (path, value) in log_fields {
                    let value: Value = match value {
                        TestInputValue::String(s) => Value::from(s.to_owned()),
                        TestInputValue::Boolean(b) => Value::from(*b),
                        TestInputValue::Integer(i) => Value::from(*i),
                        TestInputValue::Float(f) => Value::from(
                            NotNan::new(*f).map_err(|_| "NaN value not supported".to_string())?,
                        ),
                    };
                    event
                        .parse_path_and_insert(path, value)
                        .map_err(|e| e.to_string())?;
                }
                Ok(event.into())
            } else {
                Err("input type 'log' requires the field 'log_fields'".to_string())
            }
        }
        "metric" => {
            if let Some(metric) = &input.metric {
                Ok(Event::Metric(metric.clone()))
            } else {
                Err("input type 'metric' requires the field 'metric'".to_string())
            }
        }
        _ => Err(format!(
            "unrecognized input type '{}', expected one of: 'raw', 'vrl', 'log' or 'metric'",
            input.type_str
        )),
    }
}