use opendal::{layers::LoggingLayer, services::Webhdfs, Operator};
use tower::ServiceBuilder;
use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
use vector_lib::configurable::configurable_component;
use vector_lib::{
    config::{AcknowledgementsConfig, DataType, Input},
    sink::VectorSink,
};
use crate::{
    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
    config::{GenerateConfig, SinkConfig, SinkContext},
    sinks::{
        opendal_common::*,
        util::{
            partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
            Compression,
        },
        Healthcheck,
    },
};
#[configurable_component(sink("webhdfs", "WebHDFS."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct WebHdfsConfig {
    #[serde(default)]
    pub root: String,
    #[serde(default)]
    #[configurable(metadata(docs::templateable))]
    pub prefix: String,
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://127.0.0.1:9870"))]
    pub endpoint: String,
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
impl GenerateConfig for WebHdfsConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            root: "/".to_string(),
            prefix: "%F/".to_string(),
            endpoint: "http://127.0.0.1:9870".to_string(),
            encoding: (
                Some(NewlineDelimitedEncoderConfig::new()),
                JsonSerializerConfig::default(),
            )
                .into(),
            compression: Compression::gzip_default(),
            batch: BatchConfig::default(),
            acknowledgements: Default::default(),
        })
        .unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "webhdfs")]
impl SinkConfig for WebHdfsConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let op = self.build_operator()?;
        let check_op = op.clone();
        let healthcheck = Box::pin(async move { Ok(check_op.check().await?) });
        let sink = self.build_processor(op)?;
        Ok((sink, healthcheck))
    }
    fn input(&self) -> Input {
        Input::new(self.encoding.config().1.input_type() & DataType::Log)
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
impl WebHdfsConfig {
    pub fn build_operator(&self) -> crate::Result<Operator> {
        let mut builder = Webhdfs::default();
        builder.root(&self.root);
        builder.endpoint(&self.endpoint);
        let op = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }
    pub fn build_processor(&self, op: Operator) -> crate::Result<VectorSink> {
        let batcher_settings = self.batch.into_batcher_settings()?;
        let transformer = self.encoding.transformer();
        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);
        let request_builder = OpenDalRequestBuilder {
            encoder: (transformer, encoder),
            compression: self.compression,
        };
        let svc = ServiceBuilder::new().service(OpenDalService::new(op));
        let sink = OpenDalSink::new(
            svc,
            request_builder,
            self.key_partitioner()?,
            batcher_settings,
        );
        Ok(VectorSink::from_event_streamsink(sink))
    }
    pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
        let prefix = self.prefix.clone().try_into()?;
        Ok(KeyPartitioner::new(prefix, None))
    }
}