use std::{collections::HashMap, time::Duration};
use futures::FutureExt;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::ConfigTargetPath;
use vrl::value::Kind;
use crate::{
    kafka::{KafkaAuthConfig, KafkaCompression},
    serde::json::to_string,
    sinks::{
        kafka::sink::{healthcheck, KafkaSink},
        prelude::*,
    },
};
/// Configuration for the `kafka` sink.
#[serde_as]
#[configurable_component(sink(
    "kafka",
    "Publish observability event data to Apache Kafka topics."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct KafkaSinkConfig {
    /// A comma-separated list of Kafka bootstrap servers, each in `host:port` form.
    ///
    /// Passed to librdkafka as `bootstrap.servers`.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    pub bootstrap_servers: String,
    /// The Kafka topic name to write events to.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "topic-1234",
        docs::examples = "logs-{{unit}}-%Y-%m-%d"
    ))]
    pub topic: Template,
    /// The topic name to use for the healthcheck.
    // NOTE(review): the healthcheck is implemented outside this file; how this option interacts
    // with `topic` (e.g. fallback when unset) should be confirmed there before documenting it.
    pub healthcheck_topic: Option<String>,
    /// The event path to read the Kafka record key from.
    // NOTE(review): the key lookup happens outside this file; behavior for a missing field is
    // not visible here — confirm against the sink implementation.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "user_id"))]
    #[configurable(metadata(docs::examples = ".my_topic"))]
    #[configurable(metadata(docs::examples = "%my_topic"))]
    pub key_field: Option<ConfigTargetPath>,
    // Encoding settings; `SinkConfig::input` uses `encoding.config().input_type()` to decide
    // which event types this sink accepts.
    #[configurable(derived)]
    pub encoding: EncodingConfig,
    // Batching settings. `to_rdkafka` translates `timeout_secs`, `max_events` and `max_bytes`
    // into the librdkafka options `queue.buffering.max.ms`, `batch.num.messages` and
    // `batch.size`, and rejects configs that also set those keys in `librdkafka_options`.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub batch: BatchConfig<NoDefaultsBatchSettings>,
    // Compression setting; passed to librdkafka as `compression.codec`.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub compression: KafkaCompression,
    // Authentication settings. Flattened, so its fields sit at the top level of the sink config.
    // Applied to the client config through `KafkaAuthConfig::apply` in `to_rdkafka`.
    #[configurable(derived)]
    #[serde(flatten)]
    pub auth: KafkaAuthConfig,
    /// Socket timeout, in milliseconds.
    ///
    /// Passed to librdkafka as `socket.timeout.ms`. Defaults to 60000.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    pub socket_timeout_ms: Duration,
    /// Message timeout, in milliseconds.
    ///
    /// Passed to librdkafka as `message.timeout.ms`. Defaults to 300000.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 150000, docs::examples = 450000))]
    #[serde(default = "default_message_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Message Timeout"))]
    #[configurable(metadata(docs::advanced))]
    pub message_timeout_ms: Duration,
    /// A map of options to pass directly to the underlying librdkafka client.
    ///
    /// These are applied after every other setting. Setting an option here that is also set
    /// through a `batch` setting is rejected as a configuration error.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    pub librdkafka_options: HashMap<String, String>,
    /// The event path to read the Kafka headers from.
    // The serde alias below means `headers_field` is also accepted as the name of this option.
    // NOTE(review): header extraction happens outside this file; behavior when unset is not
    // visible here — confirm against the sink implementation.
    #[configurable(metadata(docs::advanced))]
    #[serde(alias = "headers_field")] #[configurable(metadata(docs::examples = "headers"))]
    pub headers_key: Option<ConfigTargetPath>,
    // Acknowledgement settings, exposed through `SinkConfig::acknowledgements`.
    // Deserialized with `crate::serde::bool_or_struct` (presumably accepting either a bare bool
    // or the full struct — confirm in that helper) and omitted from serialized output when it
    // equals the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
/// Serde default for `socket_timeout_ms`: 60 000 milliseconds.
const fn default_socket_timeout_ms() -> Duration {
    const DEFAULT_MS: u64 = 60_000;
    Duration::from_millis(DEFAULT_MS)
}
/// Serde default for `message_timeout_ms`: 300 000 milliseconds.
const fn default_message_timeout_ms() -> Duration {
    const DEFAULT_MS: u64 = 300_000;
    Duration::from_millis(DEFAULT_MS)
}
/// Sample `librdkafka_options` map with three illustrative entries.
fn example_librdkafka_options() -> HashMap<String, String> {
    const SAMPLE_OPTIONS: [(&str, &str); 3] = [
        ("client.id", "${ENV_VAR}"),
        ("fetch.error.backoff.ms", "1000"),
        ("socket.send.buffer.bytes", "100"),
    ];

    SAMPLE_OPTIONS
        .iter()
        .map(|&(option, setting)| (option.to_string(), setting.to_string()))
        .collect()
}
impl KafkaSinkConfig {
    /// Builds the `rdkafka` [`ClientConfig`] described by this sink configuration.
    ///
    /// Settings are written in the following order:
    ///
    /// 1. `bootstrap.servers`, `socket.timeout.ms` and a fixed `statistics.interval.ms` of `1000`.
    /// 2. Whatever `self.auth.apply` writes.
    /// 3. `compression.codec` and `message.timeout.ms`.
    /// 4. The `batch` settings, translated to their librdkafka option names.
    /// 5. Every entry of `librdkafka_options`, verbatim.
    ///
    /// # Errors
    ///
    /// Returns an error if `self.auth.apply` fails, or if a `batch` setting and
    /// `librdkafka_options` both configure the same librdkafka option.
    pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &self.bootstrap_servers)
            .set(
                "socket.timeout.ms",
                self.socket_timeout_ms.as_millis().to_string(),
            )
            .set("statistics.interval.ms", "1000");

        self.auth.apply(&mut client_config)?;

        client_config
            .set("compression.codec", to_string(self.compression))
            .set(
                "message.timeout.ms",
                self.message_timeout_ms.as_millis().to_string(),
            );

        if let Some(value) = self.batch.timeout_secs {
            let key = "queue.buffering.max.ms";
            // The batch setting is in seconds while the librdkafka option is in milliseconds.
            // Convert once so the error message reports the value that would actually be set.
            let millis = (value * 1000.0).round().to_string();
            self.ensure_batch_option_unset("timeout_secs", key, &millis)?;
            debug!(
                librdkafka_option = key,
                batch_option = "timeout_secs",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, millis);
        }

        if let Some(value) = self.batch.max_events {
            let key = "batch.num.messages";
            let rendered = value.to_string();
            self.ensure_batch_option_unset("max_events", key, &rendered)?;
            debug!(
                librdkafka_option = key,
                batch_option = "max_events",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, rendered);
        }

        if let Some(value) = self.batch.max_bytes {
            let key = "batch.size";
            let rendered = value.to_string();
            self.ensure_batch_option_unset("max_bytes", key, &rendered)?;
            debug!(
                librdkafka_option = key,
                batch_option = "max_bytes",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, rendered);
        }

        // User-supplied options go last, after everything derived from the typed settings.
        for (key, value) in self.librdkafka_options.iter() {
            debug!(option = %key, value = %value, "Setting librdkafka option.");
            client_config.set(key.as_str(), value.as_str());
        }

        Ok(client_config)
    }

    /// Rejects configs where a `batch` setting and `librdkafka_options` set the same option.
    ///
    /// `batch_option` is the field name under `batch`, `key` is the librdkafka option it maps
    /// to, and `value` is the librdkafka value the batch setting would produce. Returns an
    /// error naming both conflicting settings when `librdkafka_options` already contains `key`.
    fn ensure_batch_option_unset(
        &self,
        batch_option: &str,
        key: &str,
        value: &str,
    ) -> crate::Result<()> {
        match self.librdkafka_options.get(key) {
            // The space before each `\` matters: a line continuation drops the newline and the
            // next line's leading whitespace, so without it the sentences run together.
            Some(existing) => Err(format!(
                "Batching setting `batch.{batch_option}` sets `librdkafka_options.{key}={value}`. \
                 The config already sets this as `librdkafka_options.{key}={existing}`. \
                 Please delete one."
            )
            .into()),
            None => Ok(()),
        }
    }
}
impl GenerateConfig for KafkaSinkConfig {
    /// Serializes a sample `kafka` sink configuration to TOML.
    ///
    /// The sample uses JSON encoding, no compression, the default timeouts and default values
    /// for every other optional setting. Panics if the sample cannot be built or serialized.
    fn generate_config() -> toml::Value {
        let sample_topic = Template::try_from(String::from("topic-1234")).unwrap();
        let sample_key_field = ConfigTargetPath::try_from(String::from("user_id")).unwrap();

        let sample = Self {
            bootstrap_servers: String::from("10.14.22.123:9092,10.14.23.332:9092"),
            topic: sample_topic,
            healthcheck_topic: None,
            key_field: Some(sample_key_field),
            encoding: JsonSerializerConfig::default().into(),
            batch: Default::default(),
            compression: KafkaCompression::None,
            auth: Default::default(),
            socket_timeout_ms: default_socket_timeout_ms(),
            message_timeout_ms: default_message_timeout_ms(),
            librdkafka_options: Default::default(),
            headers_key: None,
            acknowledgements: Default::default(),
        };

        toml::Value::try_from(sample).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SinkConfig for KafkaSinkConfig {
    /// Creates the Kafka sink and its healthcheck, each from its own clone of this config.
    ///
    /// Returns an error if `KafkaSink::new` fails.
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let kafka_sink = KafkaSink::new(self.clone())?;
        let health = healthcheck(self.clone()).boxed();
        let vector_sink = VectorSink::from_event_streamsink(kafka_sink);
        Ok((vector_sink, health))
    }

    /// Describes accepted input: the encoder's input types restricted to logs and metrics,
    /// with an optional `timestamp` meaning of timestamp kind as the schema requirement.
    fn input(&self) -> Input {
        let schema = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
        let log_or_metric = DataType::Log | DataType::Metric;
        let accepted = self.encoding.config().input_type() & log_or_metric;
        Input::new(accepted).with_schema_requirement(schema)
    }

    /// Returns the configured acknowledgement settings.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: building the sample config must not panic.
    #[test]
    fn generate_config() {
        let _sample: toml::Value = KafkaSinkConfig::generate_config();
    }
}