use std::collections::BTreeMap;
use databend_client::APIClient as DatabendAPIClient;
use futures::future::FutureExt;
use tower::ServiceBuilder;
use vector_lib::codecs::encoding::{Framer, FramingConfig};
use vector_lib::configurable::{component::GenerateConfig, configurable_component};
use crate::{
    codecs::{Encoder, EncodingConfig},
    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
    http::{Auth, MaybeAuth},
    sinks::{
        util::{
            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
            TowerRequestConfig, UriSerde,
        },
        Healthcheck, VectorSink,
    },
    tls::TlsConfig,
    vector_version,
};
use super::{
    compression::DatabendCompression,
    encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
    request_builder::DatabendRequestBuilder,
    service::{DatabendRetryLogic, DatabendService},
    sink::DatabendSink,
};
#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DatabendConfig {
    #[configurable(metadata(
        docs::examples = "databend://localhost:8000/default?sslmode=disable"
    ))]
    pub endpoint: UriSerde,
    #[configurable(
        deprecated = "This option has been deprecated, use arguments in the DSN instead."
    )]
    pub tls: Option<TlsConfig>,
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<String>,
    #[configurable(derived)]
    pub auth: Option<Auth>,
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: String,
    #[configurable(derived)]
    #[serde(default)]
    pub missing_field_as: DatabendMissingFieldAS,
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: DatabendEncodingConfig,
    #[configurable(derived)]
    #[serde(default)]
    pub compression: DatabendCompression,
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
impl GenerateConfig for DatabendConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(
            r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
            table = "default"
        "#,
        )
        .unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "databend")]
impl SinkConfig for DatabendConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let ua = format!("vector/{}", vector_version());
        let auth = self.auth.choose_one(&self.endpoint.auth)?;
        let authority = self
            .endpoint
            .uri
            .authority()
            .ok_or("Endpoint missing authority")?;
        let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
            Some("databend") => self.endpoint.to_string(),
            Some("http") => format!("databend://{}/?sslmode=disable", authority),
            Some("https") => format!("databend://{}", authority),
            None => {
                return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
            }
            Some(s) => {
                return Err(format!("Unsupported scheme for Databend endpoint: {}", s).into());
            }
        };
        let mut endpoint = url::Url::parse(&endpoint)?;
        match auth {
            Some(Auth::Basic { user, password }) => {
                let _ = endpoint.set_username(&user);
                let _ = endpoint.set_password(Some(password.inner()));
            }
            Some(Auth::Bearer { .. }) => {
                return Err("Bearer authentication is not supported currently".into());
            }
            None => {}
        }
        if let Some(database) = &self.database {
            endpoint.set_path(&format!("/{}", database));
        }
        let endpoint = endpoint.to_string();
        let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
        let healthcheck = select_one(health_client).boxed();
        let request_settings = self.request.into_settings();
        let batch_settings = self.batch.into_batcher_settings()?;
        let mut file_format_options = BTreeMap::new();
        let compression = match self.compression {
            DatabendCompression::Gzip => {
                file_format_options.insert("compression", "GZIP");
                Compression::gzip_default()
            }
            DatabendCompression::None => {
                file_format_options.insert("compression", "NONE");
                Compression::None
            }
        };
        let encoding: EncodingConfig = self.encoding.clone().into();
        let serializer = match self.encoding.config() {
            DatabendSerializerConfig::Json(_) => {
                file_format_options.insert("type", "NDJSON");
                file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
                encoding.build()?
            }
            DatabendSerializerConfig::Csv(_) => {
                file_format_options.insert("type", "CSV");
                file_format_options.insert("field_delimiter", ",");
                file_format_options.insert("record_delimiter", "\n");
                file_format_options.insert("skip_header", "0");
                encoding.build()?
            }
        };
        let framer = FramingConfig::NewlineDelimited.build();
        let transformer = encoding.transformer();
        let mut copy_options = BTreeMap::new();
        copy_options.insert("purge", "true");
        let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
        let service = DatabendService::new(
            client,
            self.table.clone(),
            file_format_options,
            copy_options,
        )?;
        let service = ServiceBuilder::new()
            .settings(request_settings, DatabendRetryLogic)
            .service(service);
        let encoder = Encoder::<Framer>::new(framer, serializer);
        let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
        let sink = DatabendSink::new(batch_settings, request_builder, service);
        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }
    fn input(&self) -> Input {
        Input::log()
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
async fn select_one(client: DatabendAPIClient) -> crate::Result<()> {
    client.query("SELECT 1").await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }
    #[test]
    fn parse_config() {
        let cfg = toml::from_str::<DatabendConfig>(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
        "#,
        )
        .unwrap();
        assert_eq!(
            cfg.endpoint.uri,
            "databend://localhost:8000/mydatabase?sslmode=disable"
        );
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Json(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::None));
    }
    #[test]
    fn parse_config_with_encoding_compression() {
        let cfg = toml::from_str::<DatabendConfig>(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
            encoding.codec = "csv"
            encoding.csv.fields = ["host", "timestamp", "message"]
            compression = "gzip"
        "#,
        )
        .unwrap();
        assert_eq!(
            cfg.endpoint.uri,
            "databend://localhost:8000/mydatabase?sslmode=disable"
        );
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Csv(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::Gzip));
    }
}