use std::{
    collections::HashMap,
    convert::Infallible,
    io::Read,
    net::{Ipv4Addr, SocketAddr},
    sync::Arc,
    time::Duration,
};
use bytes::{Buf, Bytes};
use chrono::{DateTime, TimeZone, Utc};
use flate2::read::MultiGzDecoder;
use futures::FutureExt;
use http::StatusCode;
use hyper::{service::make_service_fn, Server};
use serde::Serialize;
use serde_json::{
    de::{Read as JsonRead, StrRead},
    Deserializer, Value as JsonValue,
};
use snafu::Snafu;
use tokio::net::TcpStream;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::lookup::{self, event_path, owned_value_path};
use vector_lib::sensitive_string::SensitiveString;
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    event::BatchNotifier,
    schema::meaning,
    EstimatedJsonEncodedSizeOf,
};
use vector_lib::{configurable::configurable_component, tls::MaybeTlsIncomingStream};
use vrl::path::OwnedTargetPath;
use vrl::value::{kind::Collection, Kind};
use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply};
use self::{
    acknowledgements::{
        HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
        IndexerAcknowledgement,
    },
    splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
};
use crate::{
    config::{log_schema, DataType, Resource, SourceConfig, SourceContext, SourceOutput},
    event::{Event, LogEvent, Value},
    http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer},
    internal_events::{
        EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
    },
    serde::bool_or_struct,
    source_sender::ClosedError,
    tls::{MaybeTlsSettings, TlsEnableableConfig},
    SourceSender,
};
mod acknowledgements;
pub const CHANNEL: &str = "splunk_channel";
pub const INDEX: &str = "splunk_index";
pub const SOURCE: &str = "splunk_source";
pub const SOURCETYPE: &str = "splunk_sourcetype";
const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    #[serde(default = "default_socket_address")]
    pub address: SocketAddr,
    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
    token: Option<SensitiveString>,
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    valid_tokens: Option<Vec<SensitiveString>>,
    store_hec_token: bool,
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,
    #[configurable(derived)]
    #[serde(deserialize_with = "bool_or_struct")]
    acknowledgements: HecAcknowledgementsConfig,
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
impl_generate_config_from_default!(SplunkConfig);
impl Default for SplunkConfig {
    fn default() -> Self {
        SplunkConfig {
            address: default_socket_address(),
            token: None,
            valid_tokens: None,
            tls: None,
            acknowledgements: Default::default(),
            store_hec_token: false,
            log_namespace: None,
            keepalive: Default::default(),
        }
    }
}
fn default_socket_address() -> SocketAddr {
    SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 8088)
}
#[async_trait::async_trait]
#[typetag::serde(name = "splunk_hec")]
impl SourceConfig for SplunkConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let tls = MaybeTlsSettings::from_config(&self.tls, true)?;
        let shutdown = cx.shutdown.clone();
        let out = cx.out.clone();
        let source = SplunkSource::new(self, tls.http_protocol_name(), cx);
        let event_service = source.event_service(out.clone());
        let raw_service = source.raw_service(out);
        let health_service = source.health_service();
        let ack_service = source.ack_service();
        let options = SplunkSource::options();
        let services = path!("services" / "collector" / ..)
            .and(
                event_service
                    .or(raw_service)
                    .unify()
                    .or(health_service)
                    .unify()
                    .or(ack_service)
                    .unify()
                    .or(options)
                    .unify(),
            )
            .or_else(finish_err);
        let listener = tls.bind(&self.address).await?;
        let keepalive_settings = self.keepalive.clone();
        Ok(Box::pin(async move {
            let span = Span::current();
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(services.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });
            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;
            Ok(())
        }))
    }
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let schema_definition = match log_namespace {
            LogNamespace::Legacy => {
                let definition = vector_lib::schema::Definition::empty_legacy_namespace()
                    .with_event_field(
                        &owned_value_path!("line"),
                        Kind::object(Collection::empty())
                            .or_array(Collection::empty())
                            .or_undefined(),
                        None,
                    );
                if let Some(message_key) = log_schema().message_key() {
                    definition.with_event_field(
                        message_key,
                        Kind::bytes().or_undefined(),
                        Some(meaning::MESSAGE),
                    )
                } else {
                    definition
                }
            }
            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
                Kind::bytes().or_object(Collection::empty()),
                [log_namespace],
            )
            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
        }
        .with_standard_vector_source_metadata()
        .with_source_metadata(
            SplunkConfig::NAME,
            log_schema()
                .host_key()
                .cloned()
                .map(LegacyKey::InsertIfEmpty),
            &owned_value_path!("host"),
            Kind::bytes(),
            Some(meaning::HOST),
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(CHANNEL))),
            &owned_value_path!("channel"),
            Kind::bytes(),
            None,
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(INDEX))),
            &owned_value_path!("index"),
            Kind::bytes(),
            None,
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))),
            &owned_value_path!("source"),
            Kind::bytes(),
            Some(meaning::SERVICE),
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(SOURCETYPE))),
            &owned_value_path!("sourcetype"),
            Kind::bytes(),
            None,
        );
        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }
    fn can_acknowledge(&self) -> bool {
        true
    }
}
struct SplunkSource {
    valid_credentials: Vec<String>,
    protocol: &'static str,
    idx_ack: Option<Arc<IndexerAcknowledgement>>,
    store_hec_token: bool,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
}
impl SplunkSource {
    fn new(config: &SplunkConfig, protocol: &'static str, cx: SourceContext) -> Self {
        let log_namespace = cx.log_namespace(config.log_namespace);
        let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
        let shutdown = cx.shutdown;
        let valid_tokens = config
            .valid_tokens
            .iter()
            .flatten()
            .chain(config.token.iter());
        let idx_ack = acknowledgements.then(|| {
            Arc::new(IndexerAcknowledgement::new(
                config.acknowledgements.clone(),
                shutdown,
            ))
        });
        SplunkSource {
            valid_credentials: valid_tokens
                .map(|token| format!("Splunk {}", token.inner()))
                .collect(),
            protocol,
            idx_ack,
            store_hec_token: config.store_hec_token,
            log_namespace,
            events_received: register!(EventsReceived),
        }
    }
    fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
        let splunk_channel = splunk_channel_header
            .and(splunk_channel_query_param)
            .map(|header: Option<String>, query_param| header.or(query_param));
        let protocol = self.protocol;
        let idx_ack = self.idx_ack.clone();
        let store_hec_token = self.store_hec_token;
        let log_namespace = self.log_namespace;
        let events_received = self.events_received.clone();
        warp::post()
            .and(
                path!("event")
                    .or(path!("event" / "1.0"))
                    .or(warp::path::end()),
            )
            .and(self.authorization())
            .and(splunk_channel)
            .and(warp::addr::remote())
            .and(warp::header::optional::<String>("X-Forwarded-For"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and(warp::path::full())
            .and_then(
                move |_,
                      token: Option<String>,
                      channel: Option<String>,
                      remote: Option<SocketAddr>,
                      remote_addr: Option<String>,
                      gzip: bool,
                      body: Bytes,
                      path: warp::path::FullPath| {
                    let mut out = out.clone();
                    let idx_ack = idx_ack.clone();
                    let events_received = events_received.clone();
                    async move {
                        if idx_ack.is_some() && channel.is_none() {
                            return Err(Rejection::from(ApiError::MissingChannel));
                        }
                        let mut data = Vec::new();
                        let (byte_size, body) = if gzip {
                            MultiGzDecoder::new(body.reader())
                                .read_to_end(&mut data)
                                .map_err(|_| Rejection::from(ApiError::BadRequest))?;
                            (data.len(), String::from_utf8_lossy(data.as_slice()))
                        } else {
                            (body.len(), String::from_utf8_lossy(body.as_ref()))
                        };
                        emit!(HttpBytesReceived {
                            byte_size,
                            http_path: path.as_str(),
                            protocol,
                        });
                        let (batch, receiver) =
                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
                        let maybe_ack_id = match (idx_ack, receiver, channel.clone()) {
                            (Some(idx_ack), Some(receiver), Some(channel_id)) => {
                                match idx_ack.get_ack_id_from_channel(channel_id, receiver).await {
                                    Ok(ack_id) => Some(ack_id),
                                    Err(rej) => return Err(rej),
                                }
                            }
                            _ => None,
                        };
                        let mut error = None;
                        let mut events = Vec::new();
                        let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
                            deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
                            channel,
                            remote,
                            remote_addr,
                            batch,
                            token: token.filter(|_| store_hec_token).map(Into::into),
                            log_namespace,
                            events_received,
                        }
                        .into();
                        for result in iter {
                            match result {
                                Ok(event) => events.push(event),
                                Err(err) => {
                                    error = Some(err);
                                    break;
                                }
                            }
                        }
                        if !events.is_empty() {
                            if let Err(ClosedError) = out.send_batch(events).await {
                                return Err(Rejection::from(ApiError::ServerShutdown));
                            }
                        }
                        if let Some(error) = error {
                            Err(error)
                        } else {
                            Ok(maybe_ack_id)
                        }
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }
    fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
        let protocol = self.protocol;
        let idx_ack = self.idx_ack.clone();
        let store_hec_token = self.store_hec_token;
        let events_received = self.events_received.clone();
        let log_namespace = self.log_namespace;
        warp::post()
            .and(path!("raw" / "1.0").or(path!("raw")))
            .and(self.authorization())
            .and(SplunkSource::required_channel())
            .and(warp::addr::remote())
            .and(warp::header::optional::<String>("X-Forwarded-For"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and(warp::path::full())
            .and_then(
                move |_,
                      token: Option<String>,
                      channel_id: String,
                      remote: Option<SocketAddr>,
                      xff: Option<String>,
                      gzip: bool,
                      body: Bytes,
                      path: warp::path::FullPath| {
                    let mut out = out.clone();
                    let idx_ack = idx_ack.clone();
                    let events_received = events_received.clone();
                    emit!(HttpBytesReceived {
                        byte_size: body.len(),
                        http_path: path.as_str(),
                        protocol,
                    });
                    async move {
                        let (batch, receiver) =
                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
                        let maybe_ack_id = match (idx_ack, receiver) {
                            (Some(idx_ack), Some(receiver)) => Some(
                                idx_ack
                                    .get_ack_id_from_channel(channel_id.clone(), receiver)
                                    .await?,
                            ),
                            _ => None,
                        };
                        let mut event = raw_event(
                            body,
                            gzip,
                            channel_id,
                            remote,
                            xff,
                            batch,
                            log_namespace,
                            &events_received,
                        )?;
                        if let Some(token) = token.filter(|_| store_hec_token) {
                            event.metadata_mut().set_splunk_hec_token(token.into());
                        }
                        let res = out.send_event(event).await;
                        res.map(|_| maybe_ack_id)
                            .map_err(|_| Rejection::from(ApiError::ServerShutdown))
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }
    fn health_service(&self) -> BoxedFilter<(Response,)> {
        warp::get()
            .and(path!("health" / "1.0").or(path!("health")))
            .map(move |_| {
                http::Response::builder()
                    .header(http::header::CONTENT_TYPE, "application/json")
                    .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
                    .expect("static response")
            })
            .boxed()
    }
    fn ack_service(&self) -> BoxedFilter<(Response,)> {
        let idx_ack = self.idx_ack.clone();
        warp::post()
            .and(path!("ack"))
            .and(self.authorization())
            .and(SplunkSource::required_channel())
            .and(warp::body::json())
            .and_then(move |_, channel_id: String, body: HecAckStatusRequest| {
                let idx_ack = idx_ack.clone();
                async move {
                    if let Some(idx_ack) = idx_ack {
                        let ack_statuses = idx_ack
                            .get_acks_status_from_channel(channel_id, &body.acks)
                            .await?;
                        Ok(
                            warp::reply::json(&HecAckStatusResponse { acks: ack_statuses })
                                .into_response(),
                        )
                    } else {
                        Err(Rejection::from(ApiError::AckIsDisabled))
                    }
                }
            })
            .boxed()
    }
    fn options() -> BoxedFilter<(Response,)> {
        let post = warp::options()
            .and(
                path!("event")
                    .or(path!("event" / "1.0"))
                    .or(path!("raw" / "1.0"))
                    .or(path!("raw")),
            )
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
        let get = warp::options()
            .and(path!("health").or(path!("health" / "1.0")))
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
        post.or(get).unify().boxed()
    }
    fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
        let valid_credentials = self.valid_credentials.clone();
        warp::header::optional("Authorization")
            .and_then(move |token: Option<String>| {
                let valid_credentials = valid_credentials.clone();
                async move {
                    match (token, valid_credentials.is_empty()) {
                        (token, true) => {
                            Ok(token
                                .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
                        }
                        (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
                            token
                                .strip_prefix("Splunk ")
                                .map(Into::into)
                                .unwrap_or(token),
                        )),
                        (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
                        (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
                    }
                }
            })
            .boxed()
    }
    fn gzip(&self) -> BoxedFilter<(bool,)> {
        warp::header::optional::<String>("Content-Encoding")
            .and_then(|encoding: Option<String>| async move {
                match encoding {
                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
                    None => Ok(false),
                }
            })
            .boxed()
    }
    fn required_channel() -> BoxedFilter<(String,)> {
        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
        splunk_channel_header
            .and(splunk_channel_query_param)
            .and_then(|header: Option<String>, query_param| async move {
                header
                    .or(query_param)
                    .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
            })
            .boxed()
    }
}
struct EventIterator<'de, R: JsonRead<'de>> {
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    events: usize,
    channel: Option<Value>,
    time: Time,
    extractors: [DefaultExtractor; 4],
    batch: Option<BatchNotifier>,
    token: Option<Arc<str>>,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
}
struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    channel: Option<String>,
    batch: Option<BatchNotifier>,
    token: Option<Arc<str>>,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
    remote: Option<SocketAddr>,
    remote_addr: Option<String>,
}
impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
    fn from(f: EventIteratorGenerator<'de, R>) -> Self {
        Self {
            deserializer: f.deserializer,
            events: 0,
            channel: f.channel.map(Value::from),
            time: Time::Now(Utc::now()),
            extractors: [
                DefaultExtractor::new_with(
                    "host",
                    log_schema().host_key().cloned().into(),
                    f.remote_addr
                        .or_else(|| f.remote.map(|addr| addr.to_string()))
                        .map(Value::from),
                    f.log_namespace,
                ),
                DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace),
                DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace),
                DefaultExtractor::new(
                    "sourcetype",
                    OptionalValuePath::new(SOURCETYPE),
                    f.log_namespace,
                ),
            ],
            batch: f.batch,
            token: f.token,
            log_namespace: f.log_namespace,
            events_received: f.events_received,
        }
    }
}
impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
    fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
        let mut log = match self.log_namespace {
            LogNamespace::Vector => self.build_log_vector(&mut json)?,
            LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
        };
        self.log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            &owned_value_path!("source_type"),
            SplunkConfig::NAME,
        );
        let channel_path = owned_value_path!(CHANNEL);
        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
            self.log_namespace.insert_source_metadata(
                SplunkConfig::NAME,
                &mut log,
                Some(LegacyKey::Overwrite(&channel_path)),
                lookup::path!(CHANNEL),
                guid,
            );
        } else if let Some(guid) = self.channel.as_ref() {
            self.log_namespace.insert_source_metadata(
                SplunkConfig::NAME,
                &mut log,
                Some(LegacyKey::Overwrite(&channel_path)),
                lookup::path!(CHANNEL),
                guid.clone(),
            );
        }
        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
            for (key, value) in object {
                self.log_namespace.insert_source_metadata(
                    SplunkConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
                    lookup::path!(key.as_str()),
                    value,
                );
            }
        }
        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
            Some(JsonValue::Number(time)) => Some(Some(time)),
            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
            _ => None,
        };
        match parsed_time {
            None => (),
            Some(Some(t)) => {
                if let Some(t) = t.as_u64() {
                    let time = parse_timestamp(t as i64)
                        .ok_or(ApiError::InvalidDataFormat { event: self.events })?;
                    self.time = Time::Provided(time);
                } else if let Some(t) = t.as_f64() {
                    self.time = Time::Provided(
                        Utc.timestamp_opt(
                            t.floor() as i64,
                            (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
                        )
                        .single()
                        .expect("invalid timestamp"),
                    );
                } else {
                    return Err(ApiError::InvalidDataFormat { event: self.events }.into());
                }
            }
            Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
        }
        let timestamp = match self.time.clone() {
            Time::Provided(time) => time,
            Time::Now(time) => time,
        };
        self.log_namespace.insert_source_metadata(
            SplunkConfig::NAME,
            &mut log,
            log_schema().timestamp_key().map(LegacyKey::Overwrite),
            lookup::path!("timestamp"),
            timestamp,
        );
        for de in self.extractors.iter_mut() {
            de.extract(&mut log, &mut json);
        }
        if let Some(token) = &self.token {
            log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
        }
        if let Some(batch) = self.batch.clone() {
            log = log.with_batch_notifier(&batch);
        }
        self.events += 1;
        Ok(log.into())
    }
    fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
        match json.get("event") {
            Some(event) => {
                let event: Value = event.into();
                let mut log = LogEvent::from(event);
                self.events_received
                    .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
                self.log_namespace.insert_vector_metadata(
                    &mut log,
                    log_schema().timestamp_key(),
                    lookup::path!("ingest_timestamp"),
                    chrono::Utc::now(),
                );
                Ok(log)
            }
            None => Err(ApiError::MissingEventField { event: self.events }.into()),
        }
    }
    fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
        let mut log = LogEvent::default();
        match json.get_mut("event") {
            Some(event) => match event.take() {
                JsonValue::String(string) => {
                    if string.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }
                    log.maybe_insert(log_schema().message_key_target_path(), string);
                }
                JsonValue::Object(mut object) => {
                    if object.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }
                    if let Some(line) = object.remove("line") {
                        match line {
                            JsonValue::Array(_) | JsonValue::Object(_) => {
                                log.insert(event_path!("line"), line);
                            }
                            _ => {
                                log.maybe_insert(log_schema().message_key_target_path(), line);
                            }
                        }
                    }
                    for (key, value) in object {
                        log.insert(event_path!(key.as_str()), value);
                    }
                }
                _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
            },
            None => return Err(ApiError::MissingEventField { event: self.events }.into()),
        };
        self.events_received
            .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
        Ok(log)
    }
}
impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
    type Item = Result<Event, Rejection>;
    fn next(&mut self) -> Option<Self::Item> {
        match self.deserializer.next() {
            Some(Ok(json)) => Some(self.build_event(json)),
            None => {
                if self.events == 0 {
                    Some(Err(ApiError::NoData.into()))
                } else {
                    None
                }
            }
            Some(Err(error)) => {
                emit!(SplunkHecRequestBodyInvalidError {
                    error: error.into()
                });
                Some(Err(
                    ApiError::InvalidDataFormat { event: self.events }.into()
                ))
            }
        }
    }
}
fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
    const SEC_CUTOFF: i64 = 13569465600;
    const MILLISEC_CUTOFF: i64 = 253402300800000;
    if t < 0 {
        return None;
    }
    let ts = if t < SEC_CUTOFF {
        Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
    } else if t < MILLISEC_CUTOFF {
        Utc.timestamp_millis_opt(t)
            .single()
            .expect("invalid timestamp")
    } else {
        Utc.timestamp_nanos(t)
    };
    Some(ts)
}
struct DefaultExtractor {
    field: &'static str,
    to_field: OptionalValuePath,
    value: Option<Value>,
    log_namespace: LogNamespace,
}
impl DefaultExtractor {
    const fn new(
        field: &'static str,
        to_field: OptionalValuePath,
        log_namespace: LogNamespace,
    ) -> Self {
        DefaultExtractor {
            field,
            to_field,
            value: None,
            log_namespace,
        }
    }
    fn new_with(
        field: &'static str,
        to_field: OptionalValuePath,
        value: impl Into<Option<Value>>,
        log_namespace: LogNamespace,
    ) -> Self {
        DefaultExtractor {
            field,
            to_field,
            value: value.into(),
            log_namespace,
        }
    }
    fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
        if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
            self.value = Some(new_value.into());
        }
        if let Some(index) = self.value.as_ref() {
            if let Some(metadata_key) = self.to_field.path.as_ref() {
                self.log_namespace.insert_source_metadata(
                    SplunkConfig::NAME,
                    log,
                    Some(LegacyKey::Overwrite(metadata_key)),
                    &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
                    index.clone(),
                )
            }
        }
    }
}
#[derive(Clone, Debug)]
enum Time {
    Now(DateTime<Utc>),
    Provided(DateTime<Utc>),
}
#[allow(clippy::too_many_arguments)]
fn raw_event(
    bytes: Bytes,
    gzip: bool,
    channel: String,
    remote: Option<SocketAddr>,
    xff: Option<String>,
    batch: Option<BatchNotifier>,
    log_namespace: LogNamespace,
    events_received: &Registered<EventsReceived>,
) -> Result<Event, Rejection> {
    let message: Value = if gzip {
        let mut data = Vec::new();
        match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
            Ok(0) => return Err(ApiError::NoData.into()),
            Ok(_) => Value::from(Bytes::from(data)),
            Err(error) => {
                emit!(SplunkHecRequestBodyInvalidError { error });
                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
            }
        }
    } else {
        bytes.into()
    };
    let mut log = match log_namespace {
        LogNamespace::Vector => LogEvent::from(message),
        LogNamespace::Legacy => {
            let mut log = LogEvent::default();
            log.maybe_insert(log_schema().message_key_target_path(), message);
            log
        }
    };
    events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
    log_namespace.insert_source_metadata(
        SplunkConfig::NAME,
        &mut log,
        Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
        lookup::path!(CHANNEL),
        channel,
    );
    let host = if let Some(remote_address) = xff {
        Some(remote_address)
    } else {
        remote.map(|remote| remote.to_string())
    };
    if let Some(host) = host {
        log_namespace.insert_source_metadata(
            SplunkConfig::NAME,
            &mut log,
            log_schema().host_key().map(LegacyKey::InsertIfEmpty),
            lookup::path!("host"),
            host,
        );
    }
    log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now());
    if let Some(batch) = batch {
        log = log.with_batch_notifier(&batch);
    }
    Ok(Event::from(log))
}
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    MissingAuthorization,
    InvalidAuthorization,
    UnsupportedEncoding,
    MissingChannel,
    NoData,
    InvalidDataFormat { event: usize },
    ServerShutdown,
    EmptyEventField { event: usize },
    MissingEventField { event: usize },
    BadRequest,
    ServiceUnavailable,
    AckIsDisabled,
}
impl warp::reject::Reject for ApiError {}
mod splunk_response {
    use serde::Serialize;
    pub enum HecStatusCode {
        Success = 0,
        TokenIsRequired = 2,
        InvalidAuthorization = 3,
        NoData = 5,
        InvalidDataFormat = 6,
        ServerIsBusy = 9,
        DataChannelIsMissing = 10,
        EventFieldIsRequired = 12,
        EventFieldCannotBeBlank = 13,
        AckIsDisabled = 14,
    }
    #[derive(Serialize)]
    pub enum HecResponseMetadata {
        #[serde(rename = "ackId")]
        AckId(u64),
        #[serde(rename = "invalid-event-number")]
        InvalidEventNumber(usize),
    }
    #[derive(Serialize)]
    pub struct HecResponse {
        text: &'static str,
        code: u8,
        #[serde(skip_serializing_if = "Option::is_none", flatten)]
        pub metadata: Option<HecResponseMetadata>,
    }
    impl HecResponse {
        pub const fn new(code: HecStatusCode) -> Self {
            let text = match code {
                HecStatusCode::Success => "Success",
                HecStatusCode::TokenIsRequired => "Token is required",
                HecStatusCode::InvalidAuthorization => "Invalid authorization",
                HecStatusCode::NoData => "No data",
                HecStatusCode::InvalidDataFormat => "Invalid data format",
                HecStatusCode::DataChannelIsMissing => "Data channel is missing",
                HecStatusCode::EventFieldIsRequired => "Event field is required",
                HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
                HecStatusCode::ServerIsBusy => "Server is busy",
                HecStatusCode::AckIsDisabled => "Ack is disabled",
            };
            Self {
                text,
                code: code as u8,
                metadata: None,
            }
        }
        pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
            self.metadata = Some(metadata);
            self
        }
    }
    pub const INVALID_AUTHORIZATION: HecResponse =
        HecResponse::new(HecStatusCode::InvalidAuthorization);
    pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
    pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
    pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
    pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
    pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
    pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
}
fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
    let body = if let Some(ack_id) = maybe_ack_id {
        HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
    } else {
        splunk_response::SUCCESS
    };
    response_json(StatusCode::OK, body)
}
async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
    if let Some(&error) = rejection.find::<ApiError>() {
        emit!(SplunkHecRequestError { error });
        Ok((match error {
            ApiError::MissingAuthorization => {
                response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
            }
            ApiError::InvalidAuthorization => response_json(
                StatusCode::UNAUTHORIZED,
                splunk_response::INVALID_AUTHORIZATION,
            ),
            ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
            ApiError::MissingChannel => {
                response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
            }
            ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
            ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
            ApiError::InvalidDataFormat { event } => response_json(
                StatusCode::BAD_REQUEST,
                HecResponse::new(HecStatusCode::InvalidDataFormat)
                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
            ),
            ApiError::EmptyEventField { event } => response_json(
                StatusCode::BAD_REQUEST,
                HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
            ),
            ApiError::MissingEventField { event } => response_json(
                StatusCode::BAD_REQUEST,
                HecResponse::new(HecStatusCode::EventFieldIsRequired)
                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
            ),
            ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
            ApiError::ServiceUnavailable => response_json(
                StatusCode::SERVICE_UNAVAILABLE,
                splunk_response::SERVER_IS_BUSY,
            ),
            ApiError::AckIsDisabled => {
                response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
            }
        },))
    } else {
        Err(rejection)
    }
}
fn empty_response(code: StatusCode) -> Response {
    let mut res = Response::default();
    *res.status_mut() = code;
    res
}
fn response_json(code: StatusCode, body: impl Serialize) -> Response {
    warp::reply::with_status(warp::reply::json(&body), code).into_response()
}
#[cfg(feature = "sinks-splunk_hec")]
#[cfg(test)]
mod tests {
    use std::{net::SocketAddr, num::NonZeroU64};
    use chrono::{TimeZone, Utc};
    use futures_util::Stream;
    use http::Uri;
    use reqwest::{RequestBuilder, Response};
    use serde::Deserialize;
    use vector_lib::codecs::{
        decoding::DeserializerConfig, BytesDecoderConfig, JsonSerializerConfig,
        TextSerializerConfig,
    };
    use vector_lib::sensitive_string::SensitiveString;
    use vector_lib::{event::EventStatus, schema::Definition};
    use vrl::path::PathPrefix;
    use super::*;
    use crate::{
        codecs::{DecodingConfig, EncodingConfig},
        components::validation::prelude::*,
        config::{log_schema, SinkConfig, SinkContext, SourceConfig, SourceContext},
        event::{Event, LogEvent},
        sinks::{
            splunk_hec::logs::config::HecLogsSinkConfig,
            util::{BatchConfig, Compression, TowerRequestConfig},
            Healthcheck, VectorSink,
        },
        sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
        test_util::{
            collect_n,
            components::{
                assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
                HTTP_PUSH_SOURCE_TAGS,
            },
            next_addr, wait_for_tcp,
        },
        SourceSender,
    };
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SplunkConfig>();
    }
    const TOKEN: &str = "token";
    const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
    async fn source(
        acknowledgements: Option<HecAcknowledgementsConfig>,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
    }
    async fn source_with(
        token: Option<SensitiveString>,
        valid_tokens: Option<&[&str]>,
        acknowledgements: Option<HecAcknowledgementsConfig>,
        store_hec_token: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let address = next_addr();
        let valid_tokens =
            valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
        let cx = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            SplunkConfig {
                address,
                token,
                valid_tokens,
                tls: None,
                acknowledgements: acknowledgements.unwrap_or_default(),
                store_hec_token,
                log_namespace: None,
                keepalive: Default::default(),
            }
            .build(cx)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address)
    }
    async fn sink(
        address: SocketAddr,
        encoding: EncodingConfig,
        compression: Compression,
    ) -> (VectorSink, Healthcheck) {
        HecLogsSinkConfig {
            default_token: TOKEN.to_owned().into(),
            endpoint: format!("http://{}", address),
            host_key: None,
            indexed_fields: vec![],
            index: None,
            sourcetype: None,
            source: None,
            encoding,
            compression,
            batch: BatchConfig::default(),
            request: TowerRequestConfig::default(),
            tls: None,
            acknowledgements: Default::default(),
            timestamp_nanos_key: None,
            timestamp_key: None,
            auto_extract_timestamp: None,
            endpoint_target: Default::default(),
        }
        .build(SinkContext::default())
        .await
        .unwrap()
    }
    async fn start(
        encoding: EncodingConfig,
        compression: Compression,
        acknowledgements: Option<HecAcknowledgementsConfig>,
    ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
        let (source, address) = source(acknowledgements).await;
        let (sink, health) = sink(address, encoding, compression).await;
        assert!(health.await.is_ok());
        (sink, source)
    }
    async fn channel_n(
        messages: Vec<impl Into<String> + Send + 'static>,
        sink: VectorSink,
        source: impl Stream<Item = Event> + Unpin,
    ) -> Vec<Event> {
        let n = messages.len();
        tokio::spawn(async move {
            sink.run_events(
                messages
                    .into_iter()
                    .map(|s| Event::Log(LogEvent::from(s.into()))),
            )
            .await
            .unwrap();
        });
        let events = collect_n(source, n).await;
        assert_eq!(n, events.len());
        events
    }
    #[derive(Clone, Copy, Debug)]
    enum Channel<'a> {
        Header(&'a str),
        QueryParam(&'a str),
    }
    #[derive(Default)]
    struct SendWithOpts<'a> {
        channel: Option<Channel<'a>>,
        forwarded_for: Option<String>,
    }
    async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
        let channel = Channel::Header("channel");
        let options = SendWithOpts {
            channel: Some(channel),
            forwarded_for: None,
        };
        send_with(address, api, message, TOKEN, &options).await
    }
    fn build_request(
        address: SocketAddr,
        api: &str,
        message: &str,
        token: &str,
        opts: &SendWithOpts<'_>,
    ) -> RequestBuilder {
        let mut b = reqwest::Client::new()
            .post(format!("http://{}/{}", address, api))
            .header("Authorization", format!("Splunk {}", token));
        b = match opts.channel {
            Some(c) => match c {
                Channel::Header(v) => b.header("x-splunk-request-channel", v),
                Channel::QueryParam(v) => b.query(&[("channel", v)]),
            },
            None => b,
        };
        b = match &opts.forwarded_for {
            Some(f) => b.header("X-Forwarded-For", f),
            None => b,
        };
        b.body(message.to_owned())
    }
    async fn send_with<'a>(
        address: SocketAddr,
        api: &str,
        message: &str,
        token: &str,
        opts: &SendWithOpts<'_>,
    ) -> u16 {
        let b = build_request(address, api, message, token, opts);
        b.send().await.unwrap().status().as_u16()
    }
    async fn send_with_response<'a>(
        address: SocketAddr,
        api: &str,
        message: &str,
        token: &str,
        opts: &SendWithOpts<'_>,
    ) -> Response {
        let b = build_request(address, api, message, token, opts);
        b.send().await.unwrap()
    }
    #[tokio::test]
    async fn no_compression_text_event() {
        let message = "gzip_text_event";
        let (sink, source) = start(
            TextSerializerConfig::default().into(),
            Compression::None,
            None,
        )
        .await;
        let event = channel_n(vec![message], sink, source).await.remove(0);
        assert_eq!(
            event.as_log()[log_schema().message_key().unwrap().to_string()],
            message.into()
        );
        assert!(event.as_log().get_timestamp().is_some());
        assert_eq!(
            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
            "splunk_hec".into()
        );
        assert!(event.metadata().splunk_hec_token().is_none());
    }
    #[tokio::test]
    async fn one_simple_text_event() {
        let message = "one_simple_text_event";
        let (sink, source) = start(
            TextSerializerConfig::default().into(),
            Compression::gzip_default(),
            None,
        )
        .await;
        let event = channel_n(vec![message], sink, source).await.remove(0);
        assert_eq!(
            event.as_log()[log_schema().message_key().unwrap().to_string()],
            message.into()
        );
        assert!(event.as_log().get_timestamp().is_some());
        assert_eq!(
            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
            "splunk_hec".into()
        );
        assert!(event.metadata().splunk_hec_token().is_none());
    }
    #[tokio::test]
    async fn multiple_simple_text_event() {
        let n = 200;
        let (sink, source) = start(
            TextSerializerConfig::default().into(),
            Compression::None,
            None,
        )
        .await;
        let messages = (0..n)
            .map(|i| format!("multiple_simple_text_event_{}", i))
            .collect::<Vec<_>>();
        let events = channel_n(messages.clone(), sink, source).await;
        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                msg.into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
            assert!(event.metadata().splunk_hec_token().is_none());
        }
    }
    #[tokio::test]
    async fn one_simple_json_event() {
        let message = "one_simple_json_event";
        let (sink, source) = start(
            JsonSerializerConfig::default().into(),
            Compression::gzip_default(),
            None,
        )
        .await;
        let event = channel_n(vec![message], sink, source).await.remove(0);
        assert_eq!(
            event.as_log()[log_schema().message_key().unwrap().to_string()],
            message.into()
        );
        assert!(event.as_log().get_timestamp().is_some());
        assert_eq!(
            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
            "splunk_hec".into()
        );
        assert!(event.metadata().splunk_hec_token().is_none());
    }
    #[tokio::test]
    async fn multiple_simple_json_event() {
        let n = 200;
        let (sink, source) = start(
            JsonSerializerConfig::default().into(),
            Compression::gzip_default(),
            None,
        )
        .await;
        let messages = (0..n)
            .map(|i| format!("multiple_simple_json_event{}", i))
            .collect::<Vec<_>>();
        let events = channel_n(messages.clone(), sink, source).await;
        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                msg.into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
            assert!(event.metadata().splunk_hec_token().is_none());
        }
    }
    #[tokio::test]
    async fn json_event() {
        let (sink, source) = start(
            JsonSerializerConfig::default().into(),
            Compression::gzip_default(),
            None,
        )
        .await;
        let mut log = LogEvent::default();
        log.insert("greeting", "hello");
        log.insert("name", "bob");
        sink.run_events(vec![log.into()]).await.unwrap();
        let event = collect_n(source, 1).await.remove(0).into_log();
        assert_eq!(event["greeting"], "hello".into());
        assert_eq!(event["name"], "bob".into());
        assert!(event.get_timestamp().is_some());
        assert_eq!(
            event[log_schema().source_type_key().unwrap().to_string()],
            "splunk_hec".into()
        );
        assert!(event.metadata().splunk_hec_token().is_none());
    }
    #[tokio::test]
    async fn json_invalid_path_event() {
        let (sink, source) = start(
            JsonSerializerConfig::default().into(),
            Compression::gzip_default(),
            None,
        )
        .await;
        let mut log = LogEvent::default();
        log.insert(event_path!("(greeting | thing"), "hello");
        sink.run_events(vec![log.into()]).await.unwrap();
        let event = collect_n(source, 1).await.remove(0).into_log();
        assert_eq!(
            event.get(event_path!("(greeting | thing")),
            Some(&Value::from("hello"))
        );
    }
    #[tokio::test]
    async fn line_to_message() {
        let (sink, source) = start(
            JsonSerializerConfig::default().into(),
            Compression::gzip_default(),
            None,
        )
        .await;
        let mut event = LogEvent::default();
        event.insert("line", "hello");
        sink.run_events(vec![event.into()]).await.unwrap();
        let event = collect_n(source, 1).await.remove(0);
        assert_eq!(
            event.as_log()[log_schema().message_key().unwrap().to_string()],
            "hello".into()
        );
        assert!(event.metadata().splunk_hec_token().is_none());
    }
    #[tokio::test]
    async fn raw() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "raw";
            let (source, address) = source(None).await;
            assert_eq!(200, post(address, "services/collector/raw", message).await);
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                message.into()
            );
            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
            assert!(event.metadata().splunk_hec_token().is_none());
        })
        .await;
    }
    #[tokio::test]
    async fn root() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"{ "event": { "message": "root"} }"#;
            let (source, address) = source(None).await;
            assert_eq!(200, post(address, "services/collector", message).await);
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "root".into()
            );
            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
            assert!(event.metadata().splunk_hec_token().is_none());
        })
        .await;
    }
    #[tokio::test]
    async fn channel_header() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "raw";
            let (source, address) = source(None).await;
            let opts = SendWithOpts {
                channel: Some(Channel::Header("guid")),
                forwarded_for: None,
            };
            assert_eq!(
                200,
                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
        })
        .await;
    }
    #[tokio::test]
    async fn xff_header_raw() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "raw";
            let (source, address) = source(None).await;
            let opts = SendWithOpts {
                channel: Some(Channel::Header("guid")),
                forwarded_for: Some(String::from("10.0.0.1")),
            };
            assert_eq!(
                200,
                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
                "10.0.0.1".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn xff_header_event_with_host_field() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
            let (source, address) = source(None).await;
            let opts = SendWithOpts {
                channel: Some(Channel::Header("guid")),
                forwarded_for: Some(String::from("10.0.0.1")),
            };
            assert_eq!(
                200,
                send_with(address, "services/collector/event", message, TOKEN, &opts).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
                "10.1.0.2".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn xff_header_event_without_host_field() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"{"event":"first", "color": "blue"}"#;
            let (source, address) = source(None).await;
            let opts = SendWithOpts {
                channel: Some(Channel::Header("guid")),
                forwarded_for: Some(String::from("10.0.0.1")),
            };
            assert_eq!(
                200,
                send_with(address, "services/collector/event", message, TOKEN, &opts).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
                "10.0.0.1".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn channel_query_param() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "raw";
            let (source, address) = source(None).await;
            let opts = SendWithOpts {
                channel: Some(Channel::QueryParam("guid")),
                forwarded_for: None,
            };
            assert_eq!(
                200,
                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
        })
        .await;
    }
    #[tokio::test]
    async fn no_data() {
        let (_source, address) = source(None).await;
        assert_eq!(400, post(address, "services/collector/event", "").await);
    }
    #[tokio::test]
    async fn invalid_token() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async {
            let (_source, address) = source(None).await;
            let opts = SendWithOpts {
                channel: Some(Channel::Header("channel")),
                forwarded_for: None,
            };
            assert_eq!(
                401,
                send_with(address, "services/collector/event", "", "nope", &opts).await
            );
        })
        .await;
    }
    #[tokio::test]
    async fn health_ignores_token() {
        let (_source, address) = source(None).await;
        let res = reqwest::Client::new()
            .get(format!("http://{}/services/collector/health", address))
            .header("Authorization", format!("Splunk {}", "invalid token"))
            .send()
            .await
            .unwrap();
        assert_eq!(200, res.status().as_u16());
    }
    #[tokio::test]
    async fn health() {
        let (_source, address) = source(None).await;
        let res = reqwest::Client::new()
            .get(format!("http://{}/services/collector/health", address))
            .send()
            .await
            .unwrap();
        assert_eq!(200, res.status().as_u16());
    }
    #[tokio::test]
    async fn secondary_token() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"{"event":"first", "color": "blue"}"#;
            let (_source, address) = source_with(None, Some(VALID_TOKENS), None, false).await;
            let options = SendWithOpts {
                channel: None,
                forwarded_for: None,
            };
            assert_eq!(
                200,
                send_with(
                    address,
                    "services/collector/event",
                    message,
                    VALID_TOKENS.get(1).unwrap(),
                    &options
                )
                .await
            );
        })
        .await;
    }
    #[tokio::test]
    async fn event_service_token_passthrough_enabled() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "passthrough_token_enabled";
            let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
            let (sink, health) = sink(
                address,
                TextSerializerConfig::default().into(),
                Compression::gzip_default(),
            )
            .await;
            assert!(health.await.is_ok());
            let event = channel_n(vec![message], sink, source).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                message.into()
            );
            assert_eq!(
                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
                TOKEN
            );
        })
        .await;
    }
    #[tokio::test]
    async fn raw_service_token_passthrough_enabled() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "raw";
            let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
            assert_eq!(200, post(address, "services/collector/raw", message).await);
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                message.into()
            );
            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
            assert_eq!(
                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
                TOKEN
            );
        })
        .await;
    }
    #[tokio::test]
    async fn no_authorization() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "no_authorization";
            let (source, address) = source_with(None, None, None, false).await;
            let (sink, health) = sink(
                address,
                TextSerializerConfig::default().into(),
                Compression::gzip_default(),
            )
            .await;
            assert!(health.await.is_ok());
            let event = channel_n(vec![message], sink, source).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                message.into()
            );
            assert!(event.metadata().splunk_hec_token().is_none());
        })
        .await;
    }
    #[tokio::test]
    async fn no_authorization_token_passthrough_enabled() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "no_authorization";
            let (source, address) = source_with(None, None, None, true).await;
            let (sink, health) = sink(
                address,
                TextSerializerConfig::default().into(),
                Compression::gzip_default(),
            )
            .await;
            assert!(health.await.is_ok());
            let event = channel_n(vec![message], sink, source).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                message.into()
            );
            assert_eq!(
                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
                TOKEN
            );
        })
        .await;
    }
    #[tokio::test]
    async fn partial() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"{"event":"first"}{"event":"second""#;
            let (source, address) = source(None).await;
            assert_eq!(
                400,
                post(address, "services/collector/event", message).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "first".into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn handles_newlines() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"
{"event":"first"}
        "#;
            let (source, address) = source(None).await;
            assert_eq!(
                200,
                post(address, "services/collector/event", message).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "first".into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn handles_spaces() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#" {"event":"first"} "#;
            let (source, address) = source(None).await;
            assert_eq!(
                200,
                post(address, "services/collector/event", message).await
            );
            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "first".into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
        })
        .await;
    }
    #[tokio::test]
    async fn handles_non_utf8() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
        let (source, address) = source(None).await;
        let b = reqwest::Client::new()
            .post(format!(
                "http://{}/{}",
                address, "services/collector/event"
            ))
            .header("Authorization", format!("Splunk {}", TOKEN))
            .body::<&[u8]>(message);
        assert_eq!(200, b.send().await.unwrap().status().as_u16());
        let event = collect_n(source, 1).await.remove(0);
        assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
        assert_eq!(event.as_log()["number"], 2.into());
        assert_eq!(event.as_log()["bool"], true.into());
        assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
        assert_eq!(
            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
            "splunk_hec".into()
        );
    }).await;
    }
    #[tokio::test]
    async fn default() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
        let (source, address) = source(None).await;
        assert_eq!(
            200,
            post(address, "services/collector/event", message).await
        );
        let events = collect_n(source, 3).await;
        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "first".into()
        );
        assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
        assert_eq!(
            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
            "second".into()
        );
        assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
        assert_eq!(
            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
            "third".into()
        );
        assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
    }).await;
    }
    #[test]
    fn parse_timestamps() {
        let cases = vec![
            Utc::now(),
            Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
                .single()
                .expect("invalid timestamp"),
            Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
                .single()
                .expect("invalid timestamp"),
            Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
                .single()
                .expect("invalid timestamp"),
        ];
        for case in cases {
            let sec = case.timestamp();
            let millis = case.timestamp_millis();
            let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");
            assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
            assert_eq!(
                parse_timestamp(millis).unwrap().timestamp_millis(),
                case.timestamp_millis()
            );
            assert_eq!(
                parse_timestamp(nano)
                    .unwrap()
                    .timestamp_nanos_opt()
                    .unwrap(),
                case.timestamp_nanos_opt().expect("Timestamp out of range")
            );
        }
        assert!(parse_timestamp(-1).is_none());
    }
    #[tokio::test]
    async fn host_test() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "for the host";
            let (sink, source) = start(
                TextSerializerConfig::default().into(),
                Compression::gzip_default(),
                None,
            )
            .await;
            let event = channel_n(vec![message], sink, source).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                message.into()
            );
            assert!(event
                .as_log()
                .get((PathPrefix::Event, log_schema().host_key().unwrap()))
                .is_none());
        })
        .await;
    }
    #[derive(Deserialize)]
    struct HecAckEventResponse {
        text: String,
        code: u8,
        #[serde(rename = "ackId")]
        ack_id: u64,
    }
    #[tokio::test]
    async fn ack_json_event() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (source, address) = source(Some(ack_config)).await;
        let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        let event_res = send_with_response(
            address,
            "services/collector/event",
            event_message,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        assert_eq!("Success", event_res.text.as_str());
        assert_eq!(0, event_res.code);
        _ = collect_n(source, 1).await;
        let ack_message = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
    }
    #[tokio::test]
    async fn ack_raw_event() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (source, address) = source(Some(ack_config)).await;
        let event_message = "raw event message";
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        let event_res = send_with_response(
            address,
            "services/collector/raw",
            event_message,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        assert_eq!("Success", event_res.text.as_str());
        assert_eq!(0, event_res.code);
        _ = collect_n(source, 1).await;
        let ack_message = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
    }
    #[tokio::test]
    async fn ack_repeat_ack_query() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (source, address) = source(Some(ack_config)).await;
        let event_message = "raw event message";
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        let event_res = send_with_response(
            address,
            "services/collector/raw",
            event_message,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        _ = collect_n(source, 1).await;
        let ack_message = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
    }
    #[tokio::test]
    async fn ack_exceed_max_number_of_ack_channels() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let (_source, address) = source(Some(ack_config)).await;
        let mut opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        assert_eq!(
            200,
            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
        );
        opts.channel = Some(Channel::Header("other-guid"));
        assert_eq!(
            503,
            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
        );
        assert_eq!(
            503,
            send_with(
                address,
                "services/collector/event",
                r#"{"event":"first"}"#,
                TOKEN,
                &opts
            )
            .await
        );
    }
    #[tokio::test]
    async fn ack_exceed_max_pending_acks_per_channel() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let (source, address) = source(Some(ack_config)).await;
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        for _ in 0..5 {
            send_with(
                address,
                "services/collector/event",
                r#"{"event":"first"}"#,
                TOKEN,
                &opts,
            )
            .await;
        }
        for _ in 0..5 {
            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
        }
        let event_res = send_with_response(
            address,
            "services/collector/event",
            r#"{"event":"this will be acked"}"#,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        _ = collect_n(source, 11).await;
        let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
            acks: (0..10).collect::<Vec<u64>>(),
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message_dropped.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
        let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message_acked.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
    }
    #[tokio::test]
    async fn event_service_acknowledgements_enabled_channel_required() {
        let message = r#"{"event":"first", "color": "blue"}"#;
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (_, address) = source(Some(ack_config)).await;
        let opts = SendWithOpts {
            channel: None,
            forwarded_for: None,
        };
        assert_eq!(
            400,
            send_with(address, "services/collector/event", message, TOKEN, &opts).await
        );
    }
    #[tokio::test]
    async fn ack_service_acknowledgements_disabled() {
        let message = r#" {"acks":[0]} "#;
        let (_, address) = source(None).await;
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        assert_eq!(
            400,
            send_with(address, "services/collector/ack", message, TOKEN, &opts).await
        );
    }
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = SplunkConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definition = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()).or_bytes(),
            [LogNamespace::Vector],
        )
        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
        .with_metadata_field(
            &owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "host"),
            Kind::bytes(),
            Some("host"),
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "index"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "source"),
            Kind::bytes(),
            Some("service"),
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "channel"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "sourcetype"),
            Kind::bytes(),
            None,
        );
        assert_eq!(definition, Some(expected_definition));
    }
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = SplunkConfig::default();
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes().or_undefined(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("line"),
            Kind::array(Collection::empty())
                .or_object(Collection::empty())
                .or_undefined(),
            None,
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("splunk_channel"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("splunk_index"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("splunk_source"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("splunk_sourcetype"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);
        assert_eq!(definitions, Some(expected_definition));
    }
    impl ValidatableComponent for SplunkConfig {
        fn validation_configuration() -> ValidationConfiguration {
            let config = Self {
                address: default_socket_address(),
                ..Default::default()
            };
            let listen_addr_http = format!("http://{}/services/collector/event", config.address);
            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
            let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
            let framing = BytesDecoderConfig::new().into();
            let decoding = DeserializerConfig::Json(Default::default());
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
                    X_SPLUNK_REQUEST_CHANNEL.to_string(),
                    "channel".to_string(),
                )])),
                DecodingConfig::new(framing, decoding, false.into()),
            );
            ValidationConfiguration::from_source(
                Self::NAME,
                log_namespace,
                vec![ComponentTestCaseConfig::from_source(
                    config,
                    None,
                    Some(external_resource),
                )],
            )
        }
    }
    register_validatable_component!(SplunkConfig);
}