use std::{fmt, sync::Arc};
use super::request_builder::HecLogsRequestBuilder;
use crate::{
    internal_events::SplunkEventTimestampInvalidType,
    internal_events::SplunkEventTimestampMissing,
    sinks::{
        prelude::*,
        splunk_hec::common::{
            render_template_string, request::HecRequest, EndpointTarget, INDEX_FIELD,
            SOURCETYPE_FIELD, SOURCE_FIELD,
        },
        util::processed_event::ProcessedEvent,
    },
};
use vector_lib::{
    config::{log_schema, LogNamespace},
    lookup::{event_path, lookup_v2::OptionalTargetPath, OwnedValuePath, PathPrefix},
    schema::meaning,
};
use vrl::path::OwnedTargetPath;
pub struct HecLogsSink<S> {
    pub service: S,
    pub request_builder: HecLogsRequestBuilder,
    pub batch_settings: BatcherSettings,
    pub sourcetype: Option<Template>,
    pub source: Option<Template>,
    pub index: Option<Template>,
    pub indexed_fields: Vec<OwnedValuePath>,
    pub host_key: Option<OptionalTargetPath>,
    pub timestamp_nanos_key: Option<String>,
    pub timestamp_key: Option<OptionalTargetPath>,
    pub endpoint_target: EndpointTarget,
    pub auto_extract_timestamp: bool,
}
pub struct HecLogData<'a> {
    pub sourcetype: Option<&'a Template>,
    pub source: Option<&'a Template>,
    pub index: Option<&'a Template>,
    pub indexed_fields: &'a [OwnedValuePath],
    pub host_key: Option<OptionalTargetPath>,
    pub timestamp_nanos_key: Option<&'a String>,
    pub timestamp_key: Option<OptionalTargetPath>,
    pub endpoint_target: EndpointTarget,
    pub auto_extract_timestamp: bool,
}
impl<S> HecLogsSink<S>
where
    S: Service<HecRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let data = HecLogData {
            sourcetype: self.sourcetype.as_ref(),
            source: self.source.as_ref(),
            index: self.index.as_ref(),
            indexed_fields: self.indexed_fields.as_slice(),
            host_key: self.host_key.clone(),
            timestamp_nanos_key: self.timestamp_nanos_key.as_ref(),
            timestamp_key: self.timestamp_key.clone(),
            endpoint_target: self.endpoint_target,
            auto_extract_timestamp: self.auto_extract_timestamp,
        };
        let batch_settings = self.batch_settings;
        input
            .map(move |event| process_log(event, &data))
            .batched_partitioned(
                if self.endpoint_target == EndpointTarget::Raw {
                    EventPartitioner::new(
                        self.sourcetype.clone(),
                        self.source.clone(),
                        self.index.clone(),
                        self.host_key.clone(),
                    )
                } else {
                    EventPartitioner::new(None, None, None, None)
                },
                || batch_settings.as_byte_size_config(),
            )
            .request_builder(
                default_request_builder_concurrency_limit(),
                self.request_builder,
            )
            .filter_map(|request| async move {
                match request {
                    Err(e) => {
                        error!("Failed to build HEC Logs request: {:?}.", e);
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(self.service)
            .run()
            .await
    }
}
#[async_trait]
impl<S> StreamSink<Event> for HecLogsSink<S>
where
    S: Service<HecRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
pub(super) struct Partitioned {
    pub(super) token: Option<Arc<str>>,
    pub(super) source: Option<String>,
    pub(super) sourcetype: Option<String>,
    pub(super) index: Option<String>,
    pub(super) host: Option<String>,
}
#[derive(Default)]
struct EventPartitioner {
    pub sourcetype: Option<Template>,
    pub source: Option<Template>,
    pub index: Option<Template>,
    pub host_key: Option<OptionalTargetPath>,
}
impl EventPartitioner {
    const fn new(
        sourcetype: Option<Template>,
        source: Option<Template>,
        index: Option<Template>,
        host_key: Option<OptionalTargetPath>,
    ) -> Self {
        Self {
            sourcetype,
            source,
            index,
            host_key,
        }
    }
}
impl Partitioner for EventPartitioner {
    type Item = HecProcessedEvent;
    type Key = Option<Partitioned>;
    fn partition(&self, item: &Self::Item) -> Self::Key {
        let emit_err = |error, field| {
            emit!(TemplateRenderingError {
                error,
                field: Some(field),
                drop_event: false,
            })
        };
        let source = self.source.as_ref().and_then(|source| {
            source
                .render_string(&item.event)
                .map_err(|error| emit_err(error, SOURCE_FIELD))
                .ok()
        });
        let sourcetype = self.sourcetype.as_ref().and_then(|sourcetype| {
            sourcetype
                .render_string(&item.event)
                .map_err(|error| emit_err(error, SOURCETYPE_FIELD))
                .ok()
        });
        let index = self.index.as_ref().and_then(|index| {
            index
                .render_string(&item.event)
                .map_err(|error| emit_err(error, INDEX_FIELD))
                .ok()
        });
        let host = user_or_namespaced_path(
            &item.event,
            self.host_key.as_ref(),
            meaning::HOST,
            log_schema().host_key_target_path(),
        )
        .and_then(|path| item.event.get(&path))
        .and_then(|value| value.as_str().map(|s| s.to_string()));
        Some(Partitioned {
            token: item.event.metadata().splunk_hec_token(),
            source,
            sourcetype,
            index,
            host,
        })
    }
}
#[derive(PartialEq, Default, Clone, Debug)]
pub struct HecLogsProcessedEventMetadata {
    pub sourcetype: Option<String>,
    pub source: Option<String>,
    pub index: Option<String>,
    pub host: Option<Value>,
    pub timestamp: Option<f64>,
    pub fields: LogEvent,
    pub endpoint_target: EndpointTarget,
}
impl ByteSizeOf for HecLogsProcessedEventMetadata {
    fn allocated_bytes(&self) -> usize {
        self.sourcetype.allocated_bytes()
            + self.source.allocated_bytes()
            + self.index.allocated_bytes()
            + self.host.allocated_bytes()
            + self.fields.allocated_bytes()
    }
}
pub type HecProcessedEvent = ProcessedEvent<LogEvent, HecLogsProcessedEventMetadata>;
fn user_or_namespaced_path(
    log: &LogEvent,
    user_key: Option<&OptionalTargetPath>,
    semantic: &str,
    legacy_path: Option<&OwnedTargetPath>,
) -> Option<OwnedTargetPath> {
    match user_key {
        Some(maybe_key) => maybe_key.path.clone(),
        None => match log.namespace() {
            LogNamespace::Vector => log.find_key_by_meaning(semantic).cloned(),
            LogNamespace::Legacy => legacy_path.cloned(),
        },
    }
}
pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent {
    let mut log = event.into_log();
    let sourcetype = data
        .sourcetype
        .and_then(|sourcetype| render_template_string(sourcetype, &log, SOURCETYPE_FIELD));
    let source = data
        .source
        .and_then(|source| render_template_string(source, &log, SOURCE_FIELD));
    let index = data
        .index
        .and_then(|index| render_template_string(index, &log, INDEX_FIELD));
    let host = user_or_namespaced_path(
        &log,
        data.host_key.as_ref(),
        meaning::HOST,
        log_schema().host_key_target_path(),
    )
    .and_then(|path| log.get(&path))
    .cloned();
    let timestamp = if EndpointTarget::Event == data.endpoint_target && !data.auto_extract_timestamp
    {
        user_or_namespaced_path(
            &log,
            data.timestamp_key.as_ref(),
            meaning::TIMESTAMP,
            log_schema().timestamp_key_target_path(),
        )
        .and_then(|timestamp_path| {
            match log.remove(×tamp_path) {
                Some(Value::Timestamp(ts)) => {
                    if let Some(key) = data.timestamp_nanos_key {
                        log.try_insert(event_path!(key), ts.timestamp_subsec_nanos() % 1_000_000);
                    }
                    Some((ts.timestamp_millis() as f64) / 1000f64)
                }
                Some(value) => {
                    emit!(SplunkEventTimestampInvalidType {
                        r#type: value.kind_str()
                    });
                    None
                }
                None => {
                    emit!(SplunkEventTimestampMissing {});
                    None
                }
            }
        })
    } else {
        None
    };
    let fields = data
        .indexed_fields
        .iter()
        .filter_map(|field| {
            log.get((PathPrefix::Event, field))
                .map(|value| (field.to_string(), value.clone()))
        })
        .collect::<LogEvent>();
    let metadata = HecLogsProcessedEventMetadata {
        sourcetype,
        source,
        index,
        host,
        timestamp,
        fields,
        endpoint_target: data.endpoint_target,
    };
    ProcessedEvent {
        event: log,
        metadata,
    }
}
impl EventCount for HecProcessedEvent {
    fn event_count(&self) -> usize {
        1
    }
}