use std::fmt;
use vector_lib::lookup::lookup_v2::ConfigValuePath;
use vrl::path::PathPrefix;
use crate::{
    sinks::{
        elasticsearch::{
            encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder,
            service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode,
        },
        prelude::*,
    },
    transforms::metric_to_log::MetricToLog,
};
use super::{
    encoder::{DocumentMetadata, DocumentVersion, DocumentVersionType},
    ElasticsearchCommon, ElasticsearchConfig, VersionType,
};
/// Key that pairs a target index name with a bulk action.
///
/// Derives `Eq` and `Hash`, so values can be used as hash-map keys.
// NOTE(review): not referenced in this part of the file; presumably used to
// group events by destination — confirm against the callers.
#[derive(Clone, Eq, Hash, PartialEq)]
pub struct PartitionKey {
    /// Index name component of the key.
    pub index: String,
    /// Bulk action component of the key.
    pub bulk_action: BulkAction,
}
/// Stream sink that turns incoming events into Elasticsearch requests and
/// drives them through the service `S`.
pub struct ElasticsearchSink<S> {
    /// Batching settings; their byte-size configuration is handed to the batching stage.
    pub batch_settings: BatcherSettings,
    /// Builds requests out of batches of processed events.
    pub request_builder: ElasticsearchRequestBuilder,
    /// Transformer applied to every log event while it is being processed.
    pub transformer: Transformer,
    /// Service the built requests are driven through.
    pub service: S,
    /// Converts incoming metric events into log events.
    pub metric_to_log: MetricToLog,
    /// Supplies the index, bulk action, version information and optional
    /// data stream configuration for each event.
    pub mode: ElasticsearchCommonMode,
    /// Optional path of the event field that is removed from the event and
    /// used as the document id.
    pub id_key_field: Option<ConfigValuePath>,
}
impl<S> ElasticsearchSink<S> {
    pub fn new(
        common: &ElasticsearchCommon,
        config: &ElasticsearchConfig,
        service: S,
    ) -> crate::Result<Self> {
        let batch_settings = config.batch.into_batcher_settings()?;
        Ok(ElasticsearchSink {
            batch_settings,
            request_builder: common.request_builder.clone(),
            transformer: config.encoding.clone(),
            service,
            metric_to_log: common.metric_to_log.clone(),
            mode: common.mode.clone(),
            id_key_field: config.id_key.clone(),
        })
    }
}
impl<S> ElasticsearchSink<S>
where
    S: Service<ElasticsearchRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and drives `input` to completion.
    ///
    /// Pipeline stages, in order:
    /// 1. metrics are converted to logs via `metric_to_log`, logs pass
    ///    through unchanged, and traces are discarded;
    /// 2. every log is turned into a `ProcessedEvent` by `process_log`;
    ///    events for which it returns `None` are dropped;
    /// 3. processed events are batched using the byte-size batch configuration;
    /// 4. batches are built into requests; a failed build emits
    ///    `SinkRequestBuildError` and that request is skipped;
    /// 5. the remaining requests are driven through the service.
    pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let mode = self.mode;
        let id_key_field = self.id_key_field.as_ref();
        // The sink is consumed by this call, so the transformer is moved out
        // of it instead of being cloned.
        let transformer = self.transformer;
        input
            .scan(self.metric_to_log, |metric_to_log, event| {
                // The outer `Some` keeps the scan running; the inner `Option`
                // says whether this particular event yields a log.
                future::ready(Some(match event {
                    Event::Metric(metric) => metric_to_log.transform_one(metric),
                    Event::Log(log) => Some(log),
                    // Traces are discarded here without emitting anything.
                    Event::Trace(_) => None,
                }))
            })
            // Strip out the events that produced no log.
            .filter_map(future::ready)
            .filter_map(move |log| {
                future::ready(process_log(log, &mode, id_key_field, &transformer))
            })
            .batched(self.batch_settings.as_byte_size_config())
            .request_builder(
                default_request_builder_concurrency_limit(),
                self.request_builder,
            )
            .filter_map(|request| async move {
                match request {
                    Err(error) => {
                        emit!(SinkRequestBuildError { error });
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(self.service)
            .run()
            .await
    }
}
/// Converts a single log event into a `ProcessedEvent`.
///
/// Steps, in order:
/// - the index and bulk action are resolved from `mode`; if either is
///   unavailable the event is rejected and `None` is returned;
/// - when `mode` carries a data stream configuration, its `sync_fields` and
///   `remap_timestamp` are applied to the event;
/// - when `id_key_field` is given, that field is removed from the event. Its
///   value becomes the document id only if it is a bytes value (decoded
///   lossily as UTF-8); any other value is still removed but not used;
/// - the id, version type and version are combined into the document metadata
///   (see the match below);
/// - finally `transformer` is applied to the event.
///
/// # Returns
///
/// `Some(ProcessedEvent)` on success, `None` when the index or bulk action
/// could not be resolved.
pub(super) fn process_log(
    mut log: LogEvent,
    mode: &ElasticsearchCommonMode,
    id_key_field: Option<&ConfigValuePath>,
    transformer: &Transformer,
) -> Option<ProcessedEvent> {
    let index = mode.index(&log)?;
    let bulk_action = mode.bulk_action(&log)?;
    if let Some(cfg) = mode.as_data_stream_config() {
        cfg.sync_fields(&mut log);
        cfg.remap_timestamp(&mut log);
    }
    let id = match id_key_field.and_then(|key| log.remove((PathPrefix::Event, key))) {
        Some(Value::Bytes(key)) => Some(String::from_utf8_lossy(&key).into_owned()),
        _ => None,
    };
    // `id` is not needed after this point, so it is moved into the match
    // rather than cloned. The whole tuple is still evaluated eagerly, i.e. the
    // version is looked up even when there is no id.
    let document_metadata = match (id, mode.version_type(), mode.version(&log)) {
        (None, _, _) => DocumentMetadata::WithoutId,
        // A version is only attached when both the type and the value exist.
        (Some(id), None, _) | (Some(id), _, None) => DocumentMetadata::Id(id),
        (Some(id), Some(version_type), Some(version)) => match version_type {
            // Internal versioning sends the id alone.
            VersionType::Internal => DocumentMetadata::Id(id),
            VersionType::External => DocumentMetadata::IdAndVersion(
                id,
                DocumentVersion {
                    kind: DocumentVersionType::External,
                    value: version,
                },
            ),
            VersionType::ExternalGte => DocumentMetadata::IdAndVersion(
                id,
                DocumentVersion {
                    kind: DocumentVersionType::ExternalGte,
                    value: version,
                },
            ),
        },
    };
    // The transformer operates on `Event`, so wrap the log, transform it and
    // unwrap it again.
    let log = {
        let mut event = Event::from(log);
        transformer.transform(&mut event);
        event.into_log()
    };
    Some(ProcessedEvent {
        index,
        bulk_action,
        log,
        document_metadata,
    })
}
/// `StreamSink` implementation for the Elasticsearch sink; all work is
/// delegated to the inherent `run_inner` method.
#[async_trait]
impl<S> StreamSink<Event> for ElasticsearchSink<S>
where
    S: Service<ElasticsearchRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and processes `input` by forwarding to `run_inner`,
    /// returning its result unchanged.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}