use std::cell::RefCell;
use std::collections::HashMap;
use async_trait::async_trait;
use dyn_clone::DynClone;
use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
use vector_config_common::attributes::CustomAttribute;
use vector_config_common::schema::{SchemaGenerator, SchemaObject};
use vector_config_macros::configurable_component;
use vector_lib::{
    config::{
        AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
        SourceOutput,
    },
    source::Source,
};
use super::{dot_graph::GraphConfig, schema, ComponentKey, ProxyConfig, Resource};
use crate::{extra_context::ExtraContext, shutdown::ShutdownSignal, SourceSender};
pub type BoxedSource = Box<dyn SourceConfig>;
impl Configurable for BoxedSource {
    fn referenceable_name() -> Option<&'static str> {
        Some("vector::sources::Sources")
    }
    fn metadata() -> Metadata {
        let mut metadata = Metadata::default();
        metadata.set_description("Configurable sources in Vector.");
        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
        metadata
    }
    fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
        vector_lib::configurable::component::SourceDescription::generate_schemas(gen)
    }
}
impl<T: SourceConfig + 'static> From<T> for BoxedSource {
    fn from(value: T) -> Self {
        Box::new(value)
    }
}
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,
    #[serde(default, skip)]
    pub sink_acknowledgements: bool,
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub(crate) inner: BoxedSource,
}
impl SourceOuter {
    pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
        Self {
            proxy: Default::default(),
            graph: Default::default(),
            sink_acknowledgements: false,
            inner: inner.into(),
        }
    }
}
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;
    fn resources(&self) -> Vec<Resource> {
        Vec::new()
    }
    fn can_acknowledge(&self) -> bool;
}
dyn_clone::clone_trait_object!(SourceConfig);
pub struct SourceContext {
    pub key: ComponentKey,
    pub globals: GlobalOptions,
    pub shutdown: ShutdownSignal,
    pub out: SourceSender,
    pub proxy: ProxyConfig,
    pub acknowledgements: bool,
    pub schema: schema::Options,
    pub schema_definitions: HashMap<Option<String>, schema::Definition>,
    pub extra_context: ExtraContext,
}
impl SourceContext {
    #[cfg(any(test, feature = "test-utils"))]
    pub fn new_shutdown(
        key: &ComponentKey,
        out: SourceSender,
    ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
        let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
        let (shutdown_signal, _) = shutdown.register_source(key, false);
        (
            Self {
                key: key.clone(),
                globals: GlobalOptions::default(),
                shutdown: shutdown_signal,
                out,
                proxy: Default::default(),
                acknowledgements: false,
                schema_definitions: HashMap::default(),
                schema: Default::default(),
                extra_context: Default::default(),
            },
            shutdown,
        )
    }
    #[cfg(any(test, feature = "test-utils"))]
    pub fn new_test(
        out: SourceSender,
        schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
    ) -> Self {
        Self {
            key: ComponentKey::from("default"),
            globals: GlobalOptions::default(),
            shutdown: ShutdownSignal::noop(),
            out,
            proxy: Default::default(),
            acknowledgements: false,
            schema_definitions: schema_definitions.unwrap_or_default(),
            schema: Default::default(),
            extra_context: Default::default(),
        }
    }
    pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
        let config = AcknowledgementsConfig::from(config);
        if config.enabled() {
            warn!(
                message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
                component_id = self.key.id(),
            );
        }
        config
            .merge_default(&self.globals.acknowledgements)
            .merge_default(&self.acknowledgements.into())
            .enabled()
    }
    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
        namespace
            .or(self.schema.log_namespace)
            .unwrap_or(false)
            .into()
    }
}