use crate::config::schema;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use heim::{disk::Partition, units::information::byte};
use indexmap::IndexMap;
use std::{collections::HashMap, path::PathBuf};
use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
use super::{
    builder::ConfigBuilder, transform::get_transform_output_ids, ComponentKey, Config, OutputId,
    Resource,
};
/// Ensures a provider is not combined with explicitly configured components.
///
/// # Errors
///
/// Returns a single-element error list when `config.provider` is set and at least one source,
/// transform, or sink is also defined.
pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
    let has_components =
        !(config.sources.is_empty() && config.transforms.is_empty() && config.sinks.is_empty());

    // Only the combination "provider + components" is rejected.
    if config.provider.is_none() || !has_components {
        return Ok(());
    }

    Err(vec![String::from(
        "No sources/transforms/sinks are allowed if provider config is present.",
    )])
}
/// Validates that no component identifier contains a `.` character.
///
/// # Errors
///
/// Returns one message per offending component key, in iteration order.
pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
    let mut errors = Vec::new();

    for key in names {
        if key.id().contains('.') {
            errors.push(format!(
                "Component name \"{}\" should not contain a \".\"",
                key.id()
            ));
        }
    }

    if errors.is_empty() {
        return Ok(());
    }
    Err(errors)
}
/// Performs structural checks on the configuration.
///
/// The following problems are reported:
/// - no sources or no sinks defined (skipped when `config.allow_empty` is set),
/// - the same component key used by more than one source/transform/sink,
/// - a sink or transform without any inputs,
/// - a sink or transform listing the same input more than once.
///
/// # Errors
///
/// Returns every problem found as a human-readable message.
pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
    let mut errors = vec![];

    if !config.allow_empty {
        if config.sources.is_empty() {
            errors.push(String::from("No sources defined in the config."));
        }
        if config.sinks.is_empty() {
            errors.push(String::from("No sinks defined in the config."));
        }
    }

    // Record, for every key, which kinds of component claim it.
    let every_key = config
        .sources
        .keys()
        .map(|key| ("source", key))
        .chain(config.transforms.keys().map(|key| ("transform", key)))
        .chain(config.sinks.keys().map(|key| ("sink", key)));
    let mut kinds_by_key: HashMap<&ComponentKey, Vec<&'static str>> = HashMap::new();
    for (kind, key) in every_key {
        kinds_by_key.entry(key).or_default().push(kind);
    }

    for (key, kinds) in kinds_by_key {
        if kinds.len() > 1 {
            errors.push(format!(
                "More than one component with name \"{}\" ({}).",
                key,
                kinds.join(", ")
            ));
        }
    }

    // Sinks are examined before transforms.
    let consumers = config
        .sinks
        .iter()
        .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()))
        .chain(
            config
                .transforms
                .iter()
                .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone())),
        );

    for (kind, key, inputs) in consumers {
        if inputs.is_empty() {
            errors.push(format!("{} \"{}\" has no inputs", capitalize(kind), key));
        }

        // Count how often each input is listed for this component.
        let mut occurrences = HashMap::new();
        for input in inputs {
            *occurrences.entry(input).or_insert(0usize) += 1;
        }

        for (input, count) in occurrences {
            if count > 1 {
                errors.push(format!(
                    "{} \"{}\" has input \"{}\" duplicated {} times",
                    capitalize(kind),
                    key,
                    input,
                    count,
                ));
            }
        }
    }

    if errors.is_empty() {
        return Ok(());
    }
    Err(errors)
}
/// Detects resources that are claimed by more than one source or sink.
///
/// The resources declared by every source and sink are handed to [`Resource::conflicts`]; each
/// conflict it reports becomes one error message.
///
/// # Errors
///
/// Returns one message per conflicting resource.
pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
    let claims = config
        .sources
        .iter()
        .map(|(id, source)| (id, source.inner.resources()))
        .chain(
            config
                .sinks
                .iter()
                .map(|(id, sink)| (id, sink.resources(id))),
        );

    let conflicts = Resource::conflicts(claims);
    if conflicts.is_empty() {
        return Ok(());
    }

    let errors = conflicts
        .into_iter()
        .map(|(resource, components)| {
            format!(
                "Resource `{}` is claimed by multiple components: {:?}",
                resource, components
            )
        })
        .collect();
    Err(errors)
}
/// Validates the outputs declared by sources and transforms.
///
/// - A source or transform must not expose an output port named [`DEFAULT_OUTPUT`].
/// - Each transform's own `validate` hook is run against `schema::Definition::any()`, and any
///   messages it returns are reported with the transform key as prefix.
///
/// # Errors
///
/// Returns every problem found as a human-readable message.
pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
    let mut errors = Vec::new();

    for (key, source) in config.sources.iter() {
        let declared = source.inner.outputs(config.schema.log_namespace());
        // An output without a port is compared as the empty string.
        let uses_reserved_name = declared
            .iter()
            .any(|output| output.port.as_deref().unwrap_or("") == DEFAULT_OUTPUT);

        if uses_reserved_name {
            errors.push(format!(
                "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
            ));
        }
    }

    for (key, transform) in config.transforms.iter() {
        let definition = schema::Definition::any();
        if let Err(problems) = transform.inner.validate(&definition) {
            for msg in problems {
                errors.push(format!("Transform {key} {msg}"));
            }
        }

        let uses_reserved_name = get_transform_output_ids(
            transform.inner.as_ref(),
            key.clone(),
            config.schema.log_namespace(),
        )
        .any(|id| matches!(id.port, Some(port) if port == DEFAULT_OUTPUT));

        if uses_reserved_name {
            errors.push(format!(
                "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
            ));
        }
    }

    if errors.is_empty() {
        return Ok(());
    }
    Err(errors)
}
/// Checks that the configured disk buffers fit on the mountpoints that hold them.
///
/// Every disk-backed buffer stage of every sink is assigned to the first known mountpoint that
/// is a prefix of its (canonicalized, when possible) data directory. For each mountpoint, the
/// sum of the buffers' maximum sizes is compared against the mountpoint's total capacity.
///
/// The check is best-effort: if no disk buffers are configured, or the partitions cannot be
/// queried (a warning is logged), `Ok(())` is returned. Buffers for which no mountpoint matches
/// are skipped with a warning.
///
/// # Errors
///
/// Returns one message per mountpoint whose capacity is exceeded.
pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
    let global_data_dir = config.global.data_dir.clone();

    // Gather the disk usage description of every disk-backed buffer stage.
    let mut disk_buffers = Vec::new();
    for (id, sink) in config.sinks() {
        for stage in sink.buffer.stages().iter() {
            if let Some(usage) = stage.disk_usage(global_data_dir.clone(), id) {
                disk_buffers.push(usage);
            }
        }
    }

    if disk_buffers.is_empty() {
        return Ok(());
    }

    // Query all partitions first; fall back to physical partitions only if that fails.
    let partition_query = heim::disk::partitions()
        .and_then(|partitions| {
            partitions
                .try_collect::<Vec<_>>()
                .and_then(process_partitions)
        })
        .or_else(|_| {
            heim::disk::partitions_physical().and_then(|partitions| {
                partitions
                    .try_collect::<Vec<_>>()
                    .and_then(process_partitions)
            })
        });

    let mut mountpoints = match partition_query.await {
        Ok(found) => found,
        Err(error) => {
            warn!(
                cause = %error,
                message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
            );
            return Ok(());
        }
    };

    // Descending path order, so a nested mountpoint is tried before the mountpoint containing it.
    mountpoints.sort_by(|left, _, right, _| right.cmp(left));

    // Group the buffers by the mountpoint their data directory lives on.
    let mut buffers_by_mountpoint: HashMap<PathBuf, Vec<DiskUsage>> = HashMap::new();
    for usage in disk_buffers {
        // Fall back to the configured path when it cannot be canonicalized.
        let canonicalized_data_dir = usage
            .data_dir()
            .canonicalize()
            .unwrap_or_else(|_| usage.data_dir().to_path_buf());

        let matched = mountpoints
            .keys()
            .find(|candidate| canonicalized_data_dir.starts_with(candidate));

        if let Some(mountpoint) = matched {
            buffers_by_mountpoint
                .entry(mountpoint.clone())
                .or_default()
                .push(usage);
        } else {
            warn!(
                buffer_id = usage.id().id(),
                data_dir = usage.data_dir().to_string_lossy().as_ref(),
                canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
                message = "Found no matching mountpoint for buffer data directory.",
            );
        }
    }

    let mut errors = Vec::new();
    for (mountpoint, buffers) in buffers_by_mountpoint {
        let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
        // Keys of the grouping map were cloned from `mountpoints`, so the lookup cannot miss.
        let mountpoint_total_capacity = mountpoints
            .get(&mountpoint)
            .copied()
            .expect("mountpoint must exist");

        if buffer_max_size_total <= mountpoint_total_capacity {
            continue;
        }

        let component_ids = buffers
            .iter()
            .map(|usage| usage.id().id())
            .collect::<Vec<_>>();
        errors.push(format!(
            "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
            mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
        ));
    }

    if errors.is_empty() {
        return Ok(());
    }
    Err(errors)
}
/// Builds a map from each partition's mountpoint path to its total size in bytes.
///
/// Disk usage is queried one partition at a time, in the order given; the first failed query
/// aborts the whole operation and its error is returned.
async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
    stream::iter(partitions)
        .then(|partition| {
            let mount_path = partition.mount_point().to_path_buf();
            heim::disk::usage(mount_path.clone()).map(|lookup| {
                lookup.map(|usage| (mount_path, usage.total().get::<byte>()))
            })
        })
        .try_collect::<IndexMap<_, _>>()
        .await
}
pub fn warnings(config: &Config) -> Vec<String> {
    let mut warnings = vec![];
    let source_ids = config.sources.iter().flat_map(|(key, source)| {
        source
            .inner
            .outputs(config.schema.log_namespace())
            .iter()
            .map(|output| {
                if let Some(port) = &output.port {
                    ("source", OutputId::from((key, port.clone())))
                } else {
                    ("source", OutputId::from(key))
                }
            })
            .collect::<Vec<_>>()
    });
    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
        get_transform_output_ids(
            transform.inner.as_ref(),
            key.clone(),
            config.schema.log_namespace(),
        )
        .map(|output| ("transform", output))
        .collect::<Vec<_>>()
    });
    for (input_type, id) in transform_ids.chain(source_ids) {
        if !config
            .transforms
            .iter()
            .any(|(_, transform)| transform.inputs.contains(&id))
            && !config
                .sinks
                .iter()
                .any(|(_, sink)| sink.inputs.contains(&id))
        {
            warnings.push(format!(
                "{} \"{}\" has no consumers",
                capitalize(input_type),
                id
            ));
        }
    }
    warnings
}
/// Returns a copy of `s` with its first character upper-cased when that character is ASCII.
///
/// Empty strings and strings starting with a non-ASCII character are returned unchanged.
fn capitalize(s: &str) -> String {
    let mut rest = s.chars();
    match rest.next() {
        Some(first) if first.is_ascii() => {
            let mut out = String::with_capacity(s.len());
            out.push(first.to_ascii_uppercase());
            out.push_str(rest.as_str());
            out
        }
        _ => s.to_owned(),
    }
}