use std::collections::{HashMap, HashSet};
use bytes::{Bytes, BytesMut};
use futures::SinkExt;
use http::{Request, Uri};
use indoc::indoc;
use vrl::event_path;
use vrl::path::OwnedValuePath;
use vrl::value::Kind;
use vector_lib::config::log_schema;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::lookup::PathPrefix;
use vector_lib::schema;
use super::{
    encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field,
    InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion,
};
use crate::{
    codecs::Transformer,
    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
    event::{Event, KeyString, MetricTags, Value},
    http::HttpClient,
    internal_events::InfluxdbEncodingError,
    sinks::{
        util::{
            http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
            BatchConfig, Buffer, Compression, SinkBatchSettings, TowerRequestConfig,
        },
        Healthcheck, VectorSink,
    },
    tls::{TlsConfig, TlsSettings},
};
#[derive(Clone, Copy, Debug, Default)]
pub struct InfluxDbLogsDefaultBatchSettings;
impl SinkBatchSettings for InfluxDbLogsDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    const TIMEOUT_SECS: f64 = 1.0;
}
#[configurable_component(sink("influxdb_logs", "Deliver log event data to InfluxDB."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct InfluxDbLogsConfig {
    #[configurable(
        deprecated = "This field is deprecated, and `measurement` should be used instead."
    )]
    #[configurable(metadata(docs::examples = "service"))]
    pub namespace: Option<String>,
    #[configurable(metadata(docs::examples = "vector-logs"))]
    pub measurement: Option<String>,
    #[configurable(metadata(docs::examples = "http://localhost:8086"))]
    pub endpoint: String,
    #[serde(default)]
    #[configurable(metadata(docs::examples = "field1"))]
    #[configurable(metadata(docs::examples = "parent.child_field"))]
    pub tags: Vec<KeyString>,
    #[serde(flatten)]
    pub influxdb1_settings: Option<InfluxDb1Settings>,
    #[serde(flatten)]
    pub influxdb2_settings: Option<InfluxDb2Settings>,
    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<InfluxDbLogsDefaultBatchSettings>,
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,
    #[configurable(metadata(docs::examples = "text"))]
    pub message_key: Option<OptionalValuePath>,
    #[configurable(metadata(docs::examples = "source"))]
    pub source_type_key: Option<OptionalValuePath>,
}
#[derive(Debug)]
struct InfluxDbLogsSink {
    uri: Uri,
    token: String,
    protocol_version: ProtocolVersion,
    measurement: String,
    tags: HashSet<KeyString>,
    transformer: Transformer,
    host_key: OwnedValuePath,
    message_key: OwnedValuePath,
    source_type_key: OwnedValuePath,
}
impl GenerateConfig for InfluxDbLogsConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(indoc! {r#"
            endpoint = "http://localhost:8086/"
            namespace = "my-namespace"
            tags = []
            org = "my-org"
            bucket = "my-bucket"
            token = "${INFLUXDB_TOKEN}"
        "#})
        .unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "influxdb_logs")]
impl SinkConfig for InfluxDbLogsConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let measurement = self.get_measurement()?;
        let tags: HashSet<KeyString> = self.tags.iter().cloned().collect();
        let tls_settings = TlsSettings::from_options(&self.tls)?;
        let client = HttpClient::new(tls_settings, cx.proxy())?;
        let healthcheck = self.healthcheck(client.clone())?;
        let batch = self.batch.into_batch_settings()?;
        let request = self.request.into_settings();
        let settings = influxdb_settings(
            self.influxdb1_settings.clone(),
            self.influxdb2_settings.clone(),
        )
        .unwrap();
        let endpoint = self.endpoint.clone();
        let uri = settings.write_uri(endpoint).unwrap();
        let token = settings.token();
        let protocol_version = settings.protocol_version();
        let host_key = self
            .host_key
            .as_ref()
            .and_then(|k| k.path.clone())
            .or_else(|| log_schema().host_key().cloned())
            .expect("global log_schema.host_key to be valid path");
        let message_key = self
            .message_key
            .as_ref()
            .and_then(|k| k.path.clone())
            .or_else(|| log_schema().message_key().cloned())
            .expect("global log_schema.message_key to be valid path");
        let source_type_key = self
            .source_type_key
            .as_ref()
            .and_then(|k| k.path.clone())
            .or_else(|| log_schema().source_type_key().cloned())
            .expect("global log_schema.source_type_key to be valid path");
        let sink = InfluxDbLogsSink {
            uri,
            token: token.inner().to_owned(),
            protocol_version,
            measurement,
            tags,
            transformer: self.encoding.clone(),
            host_key,
            message_key,
            source_type_key,
        };
        let sink = BatchedHttpSink::new(
            sink,
            Buffer::new(batch.size, Compression::None),
            request,
            batch.timeout,
            client,
        )
        .sink_map_err(|error| error!(message = "Fatal influxdb_logs sink error.", %error));
        #[allow(deprecated)]
        Ok((VectorSink::from_event_sink(sink), healthcheck))
    }
    fn input(&self) -> Input {
        let requirements = schema::Requirement::empty()
            .optional_meaning("message", Kind::bytes())
            .optional_meaning("host", Kind::bytes())
            .optional_meaning("timestamp", Kind::timestamp());
        Input::log().with_schema_requirement(requirements)
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
struct InfluxDbLogsEncoder {
    protocol_version: ProtocolVersion,
    measurement: String,
    tags: HashSet<KeyString>,
    transformer: Transformer,
    host_key: OwnedValuePath,
    message_key: OwnedValuePath,
    source_type_key: OwnedValuePath,
}
impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
    fn encode_event(&mut self, event: Event) -> Option<BytesMut> {
        let mut log = event.into_log();
        if let Some(message_path) = log.message_path().cloned().as_ref() {
            log.rename_key(message_path, (PathPrefix::Event, &self.message_key));
        }
        if let Some(host_path) = log.host_path().cloned().as_ref() {
            self.tags.replace(host_path.path.to_string().into());
            log.rename_key(host_path, (PathPrefix::Event, &self.host_key));
        }
        if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
            self.tags.replace(source_type_path.path.to_string().into());
            log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key));
        }
        self.tags.replace("metric_type".into());
        log.insert(event_path!("metric_type"), "logs");
        let timestamp = encode_timestamp(match log.remove_timestamp() {
            Some(Value::Timestamp(ts)) => Some(ts),
            _ => None,
        });
        let log = {
            let mut event = Event::from(log);
            self.transformer.transform(&mut event);
            event.into_log()
        };
        let mut tags = MetricTags::default();
        let mut fields: HashMap<KeyString, Field> = HashMap::new();
        log.convert_to_fields().for_each(|(key, value)| {
            if self.tags.contains(&key[..]) {
                tags.replace(key.into(), value.to_string_lossy().into_owned());
            } else {
                fields.insert(key, to_field(value));
            }
        });
        let mut output = BytesMut::new();
        if let Err(error_message) = influx_line_protocol(
            self.protocol_version,
            &self.measurement,
            Some(tags),
            Some(fields),
            timestamp,
            &mut output,
        ) {
            emit!(InfluxdbEncodingError {
                error_message,
                count: 1
            });
            return None;
        };
        Some(output)
    }
}
impl HttpSink for InfluxDbLogsSink {
    type Input = BytesMut;
    type Output = BytesMut;
    type Encoder = InfluxDbLogsEncoder;
    fn build_encoder(&self) -> Self::Encoder {
        InfluxDbLogsEncoder {
            protocol_version: self.protocol_version,
            measurement: self.measurement.clone(),
            tags: self.tags.clone(),
            transformer: self.transformer.clone(),
            host_key: self.host_key.clone(),
            message_key: self.message_key.clone(),
            source_type_key: self.source_type_key.clone(),
        }
    }
    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
        Request::post(&self.uri)
            .header("Content-Type", "text/plain")
            .header("Authorization", format!("Token {}", &self.token))
            .body(events.freeze())
            .map_err(Into::into)
    }
}
impl InfluxDbLogsConfig {
    fn get_measurement(&self) -> Result<String, &'static str> {
        match (self.measurement.as_ref(), self.namespace.as_ref()) {
            (Some(measure), Some(_)) => {
                warn!("Option `namespace` has been superseded by `measurement`.");
                Ok(measure.clone())
            }
            (Some(measure), None) => Ok(measure.clone()),
            (None, Some(namespace)) => {
                warn!(
                    "Option `namespace` has been deprecated. Use `measurement` instead. \
                       For example, you can use `measurement=<namespace>.vector` for the \
                       same effect."
                );
                Ok(format!("{}.vector", namespace))
            }
            (None, None) => Err("The `measurement` option is required."),
        }
    }
    fn healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
        let config = self.clone();
        let healthcheck = healthcheck(
            config.endpoint,
            config.influxdb1_settings,
            config.influxdb2_settings,
            client,
        )?;
        Ok(healthcheck)
    }
}
fn to_field(value: &Value) -> Field {
    match value {
        Value::Integer(num) => Field::Int(*num),
        Value::Float(num) => Field::Float(num.into_inner()),
        Value::Boolean(b) => Field::Bool(*b),
        _ => Field::String(value.to_string_lossy().into_owned()),
    }
}
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Utc};
    use futures::{channel::mpsc, stream, StreamExt};
    use http::{request::Parts, StatusCode};
    use indoc::indoc;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
    use vector_lib::lookup::owned_value_path;
    use crate::{
        sinks::{
            influxdb::test_util::{assert_fields, split_line_protocol, ts},
            util::test::{build_test_server_status, load_sink},
        },
        test_util::{
            components::{
                run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS,
                HTTP_SINK_TAGS,
            },
            next_addr,
        },
    };
    use super::*;
    type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<InfluxDbLogsConfig>();
    }
    #[test]
    fn test_config_without_tags() {
        let config = indoc! {r#"
            namespace = "vector-logs"
            endpoint = "http://localhost:9999"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#};
        toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
    }
    #[test]
    fn test_config_measurement_from_namespace() {
        let config = indoc! {r#"
            namespace = "ns"
            endpoint = "http://localhost:9999"
        "#};
        let sink_config = toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
        assert_eq!("ns.vector", sink_config.get_measurement().unwrap());
    }
    #[test]
    fn test_encode_event_apply_rules() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("timestamp", ts());
        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["metric_type", "host"].to_vec(),
        );
        sink.transformer
            .set_except_fields(Some(vec!["host".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());
        assert_eq!("1542182950000000011\n", line_protocol.3);
        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!(
            "host=aws.cloud.eur", line_protocol.1,
            "metric_type tag should be excluded"
        );
        assert_fields(line_protocol.2, ["message=\"hello\""].to_vec());
    }
    #[test]
    fn test_encode_event_v1() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");
        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());
        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011\n", line_protocol.3);
    }
    #[test]
    fn test_encode_event() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");
        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());
        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011\n", line_protocol.3);
    }
    #[test]
    fn test_encode_event_without_tags() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("value", 100);
        event.as_mut_log().insert("timestamp", ts());
        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            [].to_vec(),
        );
        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event).unwrap();
        let line = std::str::from_utf8(&bytes).unwrap();
        assert!(
            line.starts_with("vector "),
            "measurement (without tags) should ends with space ' '"
        );
        let line_protocol = split_line_protocol(line);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("", line_protocol.1, "tags should be empty");
        assert_fields(
            line_protocol.2,
            ["value=100i", "message=\"hello\""].to_vec(),
        );
        assert_eq!("1542182950000000011\n", line_protocol.3);
    }
    #[test]
    fn test_encode_nested_fields() {
        let mut event = LogEvent::default();
        event.insert("a", 1);
        event.insert("nested.field", "2");
        event.insert("nested.bool", true);
        event.insert("nested.array[0]", "example-value");
        event.insert("nested.array[2]", "another-value");
        event.insert("nested.array[3]", 15);
        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event.into()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(
            line_protocol.2,
            [
                "a=1i",
                "nested.array[0]=\"example-value\"",
                "nested.array[1]=\"<null>\"",
                "nested.array[2]=\"another-value\"",
                "nested.array[3]=15i",
                "nested.bool=true",
                "nested.field=\"2\"",
            ]
            .to_vec(),
        );
    }
    #[test]
    fn test_add_tag() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("source_type", "file");
        event.as_mut_log().insert("as_a_tag", 10);
        event.as_mut_log().insert("timestamp", ts());
        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["as_a_tag", "not_exists_field", "source_type", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "as_a_tag=10,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());
        assert_eq!("1542182950000000011\n", line_protocol.3);
    }
    #[tokio::test]
    async fn smoke_v1() {
        let rx = smoke_test(
            r#"database = "my-database""#,
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;
        let query = receive_response(rx).await;
        assert!(query.contains("db=my-database"));
        assert!(query.contains("precision=ns"));
    }
    #[tokio::test]
    async fn smoke_v1_failure() {
        smoke_test(
            r#"database = "my-database""#,
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }
    #[tokio::test]
    async fn smoke_v2() {
        let rx = smoke_test(
            indoc! {r#"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#},
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;
        let query = receive_response(rx).await;
        assert!(query.contains("org=my-org"));
        assert!(query.contains("bucket=my-bucket"));
        assert!(query.contains("precision=ns"));
    }
    #[tokio::test]
    async fn smoke_v2_failure() {
        smoke_test(
            indoc! {r#"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#},
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }
    async fn smoke_test(
        config: &str,
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> Receiver {
        let config = format!(
            indoc! {r#"
            measurement = "vector"
            endpoint = "http://localhost:9999"
            {}
        "#},
            config
        );
        let (mut config, cx) = load_sink::<InfluxDbLogsConfig>(&config).unwrap();
        _ = config.build(cx.clone()).await.unwrap();
        let addr = next_addr();
        let host = format!("http://{}", addr);
        config.endpoint = host;
        let (sink, _) = config.build(cx).await.unwrap();
        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let lines = std::iter::repeat(())
            .map(move |_| "message_value")
            .take(5)
            .collect::<Vec<_>>();
        let mut events = Vec::new();
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.to_string()).with_batch_notifier(&batch);
            event.insert(format!("key{}", i).as_str(), format!("value{}", i));
            let timestamp = Utc
                .with_ymd_and_hms(1970, 1, 1, 0, 0, (i as u32) + 1)
                .single()
                .expect("invalid timestamp");
            event.insert("timestamp", timestamp);
            event.insert("source_type", "file");
            events.push(Event::Log(event));
        }
        drop(batch);
        if batch_status == BatchStatus::Delivered {
            run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
        } else {
            run_and_assert_sink_error(sink, stream::iter(events), &COMPONENT_ERROR_TAGS).await;
        }
        assert_eq!(receiver.try_recv(), Ok(batch_status));
        rx
    }
    async fn receive_response(mut rx: Receiver) -> String {
        let output = rx.next().await.unwrap();
        let request = &output.0;
        let query = request.uri.query().unwrap();
        let body = std::str::from_utf8(&output.1[..]).unwrap();
        let mut lines = body.lines();
        assert_eq!(5, lines.clone().count());
        assert_line_protocol(0, lines.next());
        query.into()
    }
    fn assert_line_protocol(i: i64, value: Option<&str>) {
        let line_protocol = split_line_protocol(value.unwrap());
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs,source_type=file", line_protocol.1);
        assert_fields(
            line_protocol.2.to_string(),
            [
                &*format!("key{}=\"value{}\"", i, i),
                "message=\"message_value\"",
            ]
            .to_vec(),
        );
        assert_eq!(((i + 1) * 1000000000).to_string(), line_protocol.3);
    }
    fn create_sink(
        uri: &str,
        token: &str,
        protocol_version: ProtocolVersion,
        measurement: &str,
        tags: Vec<&str>,
    ) -> InfluxDbLogsSink {
        let uri = uri.parse::<Uri>().unwrap();
        let token = token.to_string();
        let measurement = measurement.to_string();
        let tags: HashSet<_> = tags.into_iter().map(|tag| tag.into()).collect();
        InfluxDbLogsSink {
            uri,
            token,
            protocol_version,
            measurement,
            tags,
            transformer: Default::default(),
            host_key: owned_value_path!("host"),
            message_key: owned_value_path!("message"),
            source_type_key: owned_value_path!("source_type"),
        }
    }
}
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::sync::Arc;
    use chrono::Utc;
    use futures::stream;
    use vrl::value;
    use vector_lib::codecs::BytesDeserializerConfig;
    use vector_lib::config::{LegacyKey, LogNamespace};
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
    use vector_lib::lookup::{owned_value_path, path};
    use crate::{
        config::SinkContext,
        sinks::influxdb::{
            logs::InfluxDbLogsConfig,
            test_util::{address_v2, onboarding_v2, BUCKET, ORG, TOKEN},
            InfluxDb2Settings,
        },
        test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
    };
    use super::*;
    #[tokio::test]
    async fn influxdb2_logs_put_data() {
        let endpoint = address_v2();
        onboarding_v2(&endpoint).await;
        let now = Utc::now();
        let measure = format!(
            "vector-{}",
            now.timestamp_nanos_opt().expect("Timestamp out of range")
        );
        let cx = SinkContext::default();
        let config = InfluxDbLogsConfig {
            namespace: None,
            measurement: Some(measure.clone()),
            endpoint: endpoint.clone(),
            tags: Default::default(),
            influxdb1_settings: None,
            influxdb2_settings: Some(InfluxDb2Settings {
                org: ORG.to_string(),
                bucket: BUCKET.to_string(),
                token: TOKEN.to_string().into(),
            }),
            encoding: Default::default(),
            batch: Default::default(),
            request: Default::default(),
            tls: None,
            acknowledgements: Default::default(),
            host_key: None,
            message_key: None,
            source_type_key: None,
        };
        let (sink, _) = config.build(cx).await.unwrap();
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
        event1.insert("host", "aws.cloud.eur");
        event1.insert("source_type", "file");
        let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
        event2.insert("host", "aws.cloud.eur");
        event2.insert("source_type", "file");
        let mut namespaced_log =
            LogEvent::from(value!("namespaced message")).with_batch_notifier(&batch);
        LogNamespace::Vector.insert_source_metadata(
            "file",
            &mut namespaced_log,
            Some(LegacyKey::Overwrite(path!("host"))),
            path!("host"),
            "aws.cloud.eur",
        );
        LogNamespace::Vector.insert_standard_vector_source_metadata(
            &mut namespaced_log,
            "file",
            now,
        );
        let schema = BytesDeserializerConfig
            .schema_definition(LogNamespace::Vector)
            .with_metadata_field(
                &owned_value_path!("file", "host"),
                Kind::bytes(),
                Some("host"),
            );
        namespaced_log
            .metadata_mut()
            .set_schema_definition(&Arc::new(schema));
        drop(batch);
        let events = vec![
            Event::Log(event1),
            Event::Log(event2),
            Event::Log(namespaced_log),
        ];
        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
        let mut body = std::collections::HashMap::new();
        body.insert("query", format!("from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"{}\")", measure.clone()));
        body.insert("type", "flux".to_owned());
        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();
        let res = client
            .post(format!("{}/api/v2/query?org=my-org", endpoint))
            .json(&body)
            .header("accept", "application/json")
            .header("Authorization", "Token my-token")
            .send()
            .await
            .unwrap();
        let string = res.text().await.unwrap();
        let lines = string.split('\n').collect::<Vec<&str>>();
        let header = lines[0].split(',').collect::<Vec<&str>>();
        let record1 = lines[1].split(',').collect::<Vec<&str>>();
        let record2 = lines[2].split(',').collect::<Vec<&str>>();
        let record_ns = lines[3].split(',').collect::<Vec<&str>>();
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "message_1"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "message_2"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "namespaced message"
        );
    }
}