use std::{collections::HashMap, future::ready, task::Poll};
use bytes::{Bytes, BytesMut};
use futures::{future::BoxFuture, stream, SinkExt};
use tower::Service;
use vector_lib::configurable::configurable_component;
use vector_lib::{
    event::metric::{MetricSketch, MetricTags, Quantile},
    ByteSizeOf, EstimatedJsonEncodedSizeOf,
};
use crate::{
    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
    event::{
        metric::{Metric, MetricValue, Sample, StatisticKind},
        Event, KeyString,
    },
    http::HttpClient,
    internal_events::InfluxdbEncodingError,
    sinks::{
        influxdb::{
            encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field,
            InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion,
        },
        util::{
            buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
            encode_namespace,
            http::{HttpBatchService, HttpRetryLogic},
            statistic::{validate_quantiles, DistributionStatistic},
            BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
        },
        Healthcheck, VectorSink,
    },
    tls::{TlsConfig, TlsSettings},
};
/// Tower service that encodes a batch of metrics as InfluxDB line protocol and
/// hands the resulting body to the wrapped HTTP batch service.
#[derive(Clone)]
struct InfluxDbSvc {
    // Sink configuration; the default namespace, extra tags and quantiles are read while encoding.
    config: InfluxDbConfig,
    // Protocol version forwarded to the line-protocol encoder on every call.
    protocol_version: ProtocolVersion,
    // Underlying HTTP service that receives the encoded request body.
    inner: HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>>,
}
/// Marker type carrying the default batch settings for the `influxdb_metrics` sink.
#[derive(Clone, Copy, Debug, Default)]
pub struct InfluxDbDefaultBatchSettings;
impl SinkBatchSettings for InfluxDbDefaultBatchSettings {
    // Default cap on events per batch.
    const MAX_EVENTS: Option<usize> = Some(20);
    // No default byte limit.
    const MAX_BYTES: Option<usize> = None;
    // Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 1.0;
}
#[configurable_component(sink("influxdb_metrics", "Deliver metric event data to InfluxDB."))]
// Configuration for the `influxdb_metrics` sink.
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct InfluxDbConfig {
    // Namespace applied to metrics that do not carry their own; it is joined to
    // the metric name with a `.` when encoding. Also accepted as `namespace`.
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: Option<String>,
    // Base endpoint of the InfluxDB instance; the write URI is derived from it.
    #[configurable(metadata(docs::examples = "http://localhost:8086/"))]
    pub endpoint: String,
    // InfluxDB v1 specific settings, flattened into this configuration.
    #[serde(flatten)]
    pub influxdb1_settings: Option<InfluxDb1Settings>,
    // InfluxDB v2 specific settings, flattened into this configuration.
    #[serde(flatten)]
    pub influxdb2_settings: Option<InfluxDb2Settings>,
    // Batching behaviour; defaults come from `InfluxDbDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<InfluxDbDefaultBatchSettings>,
    // Request (tower) settings for outgoing HTTP calls.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
    // Additional key/value tags merged into every encoded measurement.
    #[configurable(metadata(docs::additional_props_description = "A tag key/value pair."))]
    #[configurable(metadata(docs::examples = "example_tags()"))]
    pub tags: Option<HashMap<String, String>>,
    // Optional TLS configuration for the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,
    // Quantiles computed for distribution metrics whose statistic is `Summary`.
    #[serde(default = "default_summary_quantiles")]
    pub quantiles: Vec<f64>,
    // Acknowledgement settings; deserialized from either a bool or a struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
/// Returns the quantiles used when the configuration does not specify any:
/// 0.5, 0.75, 0.9, 0.95 and 0.99.
pub fn default_summary_quantiles() -> Vec<f64> {
    const DEFAULT_QUANTILES: [f64; 5] = [0.5, 0.75, 0.9, 0.95, 0.99];
    DEFAULT_QUANTILES.to_vec()
}
/// Returns the sample tag map (`region = "us-west-1"`) referenced by the
/// documentation metadata of the `tags` option.
pub fn example_tags() -> HashMap<String, String> {
    let mut sample = HashMap::new();
    sample.insert(String::from("region"), String::from("us-west-1"));
    sample
}
// NOTE(review): presumably wires up config generation for this sink from
// `InfluxDbConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(InfluxDbConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "influxdb_metrics")]
impl SinkConfig for InfluxDbConfig {
    /// Builds the sink and its healthcheck.
    ///
    /// # Errors
    ///
    /// Fails if the TLS settings or HTTP client cannot be created, if the
    /// healthcheck cannot be constructed, if the configured quantiles are
    /// rejected by `validate_quantiles`, or if `InfluxDbSvc::new` fails.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let tls_settings = TlsSettings::from_options(&self.tls)?;
        let client = HttpClient::new(tls_settings, cx.proxy())?;
        // Clone only the fields the healthcheck takes by value, rather than
        // deep-cloning the entire configuration once per argument.
        let healthcheck = healthcheck(
            self.endpoint.clone(),
            self.influxdb1_settings.clone(),
            self.influxdb2_settings.clone(),
            client.clone(),
        )?;
        validate_quantiles(&self.quantiles)?;
        let sink = InfluxDbSvc::new(self.clone(), client)?;
        Ok((sink, healthcheck))
    }
    /// This sink only accepts metric events.
    fn input(&self) -> Input {
        Input::metric()
    }
    /// Returns the acknowledgement settings configured for this sink.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
impl InfluxDbSvc {
    /// Assembles the complete sink: resolves the InfluxDB settings, builds the
    /// HTTP batch service, and wraps it in a batching sink that normalizes each
    /// incoming metric before buffering it.
    ///
    /// # Errors
    ///
    /// Returns an error if the InfluxDB settings cannot be resolved, if the
    /// batch configuration is invalid, or if the write URI cannot be built.
    pub fn new(config: InfluxDbConfig, client: HttpClient) -> crate::Result<VectorSink> {
        let influx_settings = influxdb_settings(
            config.influxdb1_settings.clone(),
            config.influxdb2_settings.clone(),
        )?;
        let auth_token = influx_settings.token();
        let protocol_version = influx_settings.protocol_version();
        let batch_settings = config.batch.into_batch_settings()?;
        let request_settings = config.request.into_settings();
        let write_uri = influx_settings.write_uri(config.endpoint.clone())?;

        let service = InfluxDbSvc {
            inner: HttpBatchService::new(
                client,
                create_build_request(write_uri, auth_token.inner()),
            ),
            protocol_version,
            config,
        };

        // The normalizer is stateful, so it is moved into the mapping closure
        // and lives for as long as the sink does.
        let mut normalizer = MetricNormalizer::<InfluxMetricNormalize>::default();
        let sink = request_settings
            .batch_sink(
                HttpRetryLogic,
                service,
                MetricsBuffer::new(batch_settings.size),
                batch_settings.timeout,
            )
            .with_flat_map(move |event: Event| {
                // Sizes are taken before the event is consumed by `into_metric`.
                let byte_size = event.size_of();
                let json_size = event.estimated_json_encoded_size_of();
                let normalized = normalizer.normalize(event.into_metric());
                stream::iter(
                    normalized.map(|metric| Ok(EncodedEvent::new(metric, byte_size, json_size))),
                )
            })
            .sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error));
        #[allow(deprecated)]
        Ok(VectorSink::from_event_sink(sink))
    }
}
impl Service<Vec<Metric>> for InfluxDbSvc {
    type Response = http::Response<Bytes>;
    type Error = crate::Error;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Readiness is delegated to the wrapped HTTP batch service.
    fn poll_ready(&mut self, cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Encodes the batch as line protocol and sends the frozen buffer through
    /// the wrapped HTTP batch service.
    fn call(&mut self, items: Vec<Metric>) -> Self::Future {
        let settings = &self.config;
        let payload = encode_events(
            self.protocol_version,
            items,
            settings.default_namespace.as_deref(),
            settings.tags.as_ref(),
            &settings.quantiles,
        )
        .freeze();
        self.inner.call(payload)
    }
}
/// Returns a closure that turns an encoded body into a `POST` request against
/// `uri`, with a `text/plain` content type and a `Token <token>` authorization
/// header.
fn create_build_request(
    uri: http::Uri,
    token: &str,
) -> impl Fn(Bytes) -> BoxFuture<'static, crate::Result<hyper::Request<Bytes>>> + Sync + Send + 'static
{
    // Build the header value once; the closure clones it per request.
    let mut auth = String::from("Token ");
    auth.push_str(token);
    move |body| {
        let request: crate::Result<hyper::Request<Bytes>> = hyper::Request::post(uri.clone())
            .header("Content-Type", "text/plain")
            .header("Authorization", auth.clone())
            .body(body)
            .map_err(Into::into);
        Box::pin(ready(request))
    }
}
/// Combines the metric's own tags with the configured tags.
///
/// Configured tags are applied on top of the metric's tags. Returns `None`
/// only when neither side provides any tags.
fn merge_tags(event: &Metric, tags: Option<&HashMap<String, String>>) -> Option<MetricTags> {
    /// Yields owned copies of every configured key/value pair.
    fn owned_pairs(
        configured: &HashMap<String, String>,
    ) -> impl Iterator<Item = (String, String)> + '_ {
        configured
            .iter()
            .map(|(key, value)| (key.clone(), value.clone()))
    }

    match event.tags().cloned() {
        None => tags.map(|configured| owned_pairs(configured).collect()),
        Some(mut merged) => {
            if let Some(configured) = tags {
                merged.extend(owned_pairs(configured));
            }
            Some(merged)
        }
    }
}
/// Normalization policy for this sink: counters are converted to incremental
/// values, gauges to absolute values, and every other metric passes through
/// untouched.
#[derive(Default)]
pub struct InfluxMetricNormalize;
impl MetricNormalize for InfluxMetricNormalize {
    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
        // Only the value type decides the treatment; the metric kind is irrelevant here.
        match metric.value() {
            MetricValue::Counter { .. } => state.make_incremental(metric),
            MetricValue::Gauge { .. } => state.make_absolute(metric),
            _ => Some(metric),
        }
    }
}
/// Encodes a batch of metrics into a single line-protocol buffer.
///
/// Each metric becomes one line. The measurement name is the metric name
/// prefixed with its namespace (or `default_namespace` when it has none),
/// and a `metric_type` tag is always set. Metrics that fail to encode emit an
/// `InfluxdbEncodingError` and are left out of the output. The final byte of a
/// non-empty buffer (the trailing line separator) is removed.
fn encode_events(
    protocol_version: ProtocolVersion,
    events: Vec<Metric>,
    default_namespace: Option<&str>,
    tags: Option<&HashMap<String, String>>,
    quantiles: &[f64],
) -> BytesMut {
    let batch_len = events.len();
    let mut buffer = BytesMut::new();
    for metric in events {
        let measurement =
            encode_namespace(metric.namespace().or(default_namespace), '.', metric.name());
        let timestamp = encode_timestamp(metric.timestamp());
        let merged = merge_tags(&metric, tags);
        let (metric_type, fields) = get_type_and_fields(metric.value(), quantiles);
        let mut line_tags = merged.unwrap_or_default();
        line_tags.replace("metric_type".to_owned(), metric_type.to_owned());
        let outcome = influx_line_protocol(
            protocol_version,
            &measurement,
            Some(line_tags),
            fields,
            timestamp,
            &mut buffer,
        );
        if let Err(error_message) = outcome {
            // NOTE(review): `count` is the size of the whole batch, reported on
            // every failing metric — confirm this is the intended accounting.
            emit!(InfluxdbEncodingError {
                error_message,
                count: batch_len,
            });
        }
    }
    // Drop the last byte when anything was written.
    if let Some(trimmed_len) = buffer.len().checked_sub(1) {
        buffer.truncate(trimmed_len);
    }
    buffer
}
fn get_type_and_fields(
    value: &MetricValue,
    quantiles: &[f64],
) -> (&'static str, Option<HashMap<KeyString, Field>>) {
    match value {
        MetricValue::Counter { value } => ("counter", Some(to_fields(*value))),
        MetricValue::Gauge { value } => ("gauge", Some(to_fields(*value))),
        MetricValue::Set { values } => ("set", Some(to_fields(values.len() as f64))),
        MetricValue::AggregatedHistogram {
            buckets,
            count,
            sum,
        } => {
            let mut fields: HashMap<KeyString, Field> = buckets
                .iter()
                .map(|sample| {
                    (
                        format!("bucket_{}", sample.upper_limit).into(),
                        Field::UnsignedInt(sample.count),
                    )
                })
                .collect();
            fields.insert("count".into(), Field::UnsignedInt(*count));
            fields.insert("sum".into(), Field::Float(*sum));
            ("histogram", Some(fields))
        }
        MetricValue::AggregatedSummary {
            quantiles,
            count,
            sum,
        } => {
            let mut fields: HashMap<KeyString, Field> = quantiles
                .iter()
                .map(|quantile| {
                    (
                        format!("quantile_{}", quantile.quantile).into(),
                        Field::Float(quantile.value),
                    )
                })
                .collect();
            fields.insert("count".into(), Field::UnsignedInt(*count));
            fields.insert("sum".into(), Field::Float(*sum));
            ("summary", Some(fields))
        }
        MetricValue::Distribution { samples, statistic } => {
            let quantiles = match statistic {
                StatisticKind::Histogram => &[0.95] as &[_],
                StatisticKind::Summary => quantiles,
            };
            let fields = encode_distribution(samples, quantiles);
            ("distribution", fields)
        }
        MetricValue::Sketch { sketch } => match sketch {
            MetricSketch::AgentDDSketch(ddsketch) => {
                let mut fields = [0.5, 0.75, 0.9, 0.99]
                    .iter()
                    .map(|q| {
                        let quantile = Quantile {
                            quantile: *q,
                            value: ddsketch.quantile(*q).unwrap_or(0.0),
                        };
                        (
                            quantile.to_percentile_string().into(),
                            Field::Float(quantile.value),
                        )
                    })
                    .collect::<HashMap<KeyString, _>>();
                fields.insert(
                    "count".into(),
                    Field::UnsignedInt(u64::from(ddsketch.count())),
                );
                fields.insert(
                    "min".into(),
                    Field::Float(ddsketch.min().unwrap_or(f64::MAX)),
                );
                fields.insert(
                    "max".into(),
                    Field::Float(ddsketch.max().unwrap_or(f64::MIN)),
                );
                fields.insert("sum".into(), Field::Float(ddsketch.sum().unwrap_or(0.0)));
                fields.insert("avg".into(), Field::Float(ddsketch.avg().unwrap_or(0.0)));
                ("sketch", Some(fields))
            }
        },
    }
}
fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option<HashMap<KeyString, Field>> {
    let statistic = DistributionStatistic::from_samples(samples, quantiles)?;
    Some(
        [
            ("min".into(), Field::Float(statistic.min)),
            ("max".into(), Field::Float(statistic.max)),
            ("median".into(), Field::Float(statistic.median)),
            ("avg".into(), Field::Float(statistic.avg)),
            ("sum".into(), Field::Float(statistic.sum)),
            ("count".into(), Field::Float(statistic.count as f64)),
        ]
        .into_iter()
        .chain(
            statistic
                .quantiles
                .iter()
                .map(|&(p, val)| (format!("quantile_{:.2}", p).into(), Field::Float(val))),
        )
        .collect(),
    )
}
fn to_fields(value: f64) -> HashMap<KeyString, Field> {
    [("value".into(), Field::Float(value))]
        .into_iter()
        .collect()
}
#[cfg(test)]
mod tests {
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use super::*;
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        sinks::influxdb::test_util::{assert_fields, split_line_protocol, tags, ts},
    };
    // Runs the shared generated-config check against `InfluxDbConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<InfluxDbConfig>();
    }
    // A config using the `namespace` alias and an inline `tags` table must deserialize.
    #[test]
    fn test_config_with_tags() {
        let config = indoc! {r#"
            namespace = "vector"
            endpoint = "http://localhost:9999"
            tags = {region="us-west-1"}
        "#};
        toml::from_str::<InfluxDbConfig>(config).unwrap();
    }
    // Counters encode as a single `value` field; the metric's own namespace
    // ("ns") wins over the default namespace ("vector").
    #[test]
    fn test_encode_counter() {
        let events = vec![
            Metric::new(
                "total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.5 },
            )
            .with_namespace(Some("ns"))
            .with_timestamp(Some(ts())),
            Metric::new(
                "check",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];
        let line_protocols = encode_events(ProtocolVersion::V2, events, Some("vector"), None, &[]);
        // Two lines separated by a newline, with no trailing newline.
        assert_eq!(
            line_protocols,
            "ns.total,metric_type=counter value=1.5 1542182950000000011\n\
            ns.check,metric_type=counter,normal_tag=value,true_tag=true value=1 1542182950000000011"
        );
    }
    // Gauges encode as a single `value` field, including negative values.
    #[test]
    fn test_encode_gauge() {
        let events = vec![Metric::new(
            "meter",
            MetricKind::Incremental,
            MetricValue::Gauge { value: -1.5 },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()))];
        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        assert_eq!(
            line_protocols,
            "ns.meter,metric_type=gauge,normal_tag=value,true_tag=true value=-1.5 1542182950000000011"
        );
    }
    // Sets encode as the number of distinct members (two here).
    #[test]
    fn test_encode_set() {
        let events = vec![Metric::new(
            "users",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["alice".into(), "bob".into()].into_iter().collect(),
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()))];
        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        assert_eq!(
            line_protocols,
            "ns.users,metric_type=set,normal_tag=value,true_tag=true value=2 1542182950000000011"
        );
    }
    // Aggregated histogram under protocol V1: bucket and count fields are
    // expected with the `i` integer suffix.
    #[test]
    fn test_encode_histogram_v1() {
        let events = vec![Metric::new(
            "requests",
            MetricKind::Absolute,
            MetricValue::AggregatedHistogram {
                buckets: vector_lib::buckets![1.0 => 1, 2.1 => 2, 3.0 => 3],
                count: 6,
                sum: 12.5,
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()))];
        let line_protocols = encode_events(ProtocolVersion::V1, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);
        // Split into (measurement, tags, fields, timestamp) and check each part.
        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol1.0);
        assert_eq!(
            "metric_type=histogram,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "bucket_1=1i",
                "bucket_2.1=2i",
                "bucket_3=3i",
                "count=6i",
                "sum=12.5",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }
    // Aggregated histogram under protocol V2: bucket and count fields are
    // expected with the `u` unsigned suffix.
    #[test]
    fn test_encode_histogram() {
        let events = vec![Metric::new(
            "requests",
            MetricKind::Absolute,
            MetricValue::AggregatedHistogram {
                buckets: vector_lib::buckets![1.0 => 1, 2.1 => 2, 3.0 => 3],
                count: 6,
                sum: 12.5,
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()))];
        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);
        // Split into (measurement, tags, fields, timestamp) and check each part.
        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol1.0);
        assert_eq!(
            "metric_type=histogram,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "bucket_1=1u",
                "bucket_2.1=2u",
                "bucket_3=3u",
                "count=6u",
                "sum=12.5",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }
    // Aggregated summary under protocol V1: one `quantile_<q>` field per
    // quantile, with the count expected as `6i`.
    #[test]
    fn test_encode_summary_v1() {
        let events = vec![Metric::new(
            "requests_sum",
            MetricKind::Absolute,
            MetricValue::AggregatedSummary {
                quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
                count: 6,
                sum: 12.0,
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()))];
        let line_protocols = encode_events(ProtocolVersion::V1, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);
        // Split into (measurement, tags, fields, timestamp) and check each part.
        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests_sum", line_protocol1.0);
        assert_eq!(
            "metric_type=summary,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "count=6i",
                "quantile_0.01=1.5",
                "quantile_0.5=2",
                "quantile_0.99=3",
                "sum=12",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }
    // Aggregated summary under protocol V2: one `quantile_<q>` field per
    // quantile, with the count expected as `6u`.
    #[test]
    fn test_encode_summary() {
        let events = vec![Metric::new(
            "requests_sum",
            MetricKind::Absolute,
            MetricValue::AggregatedSummary {
                quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
                count: 6,
                sum: 12.0,
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()))];
        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);
        // Split into (measurement, tags, fields, timestamp) and check each part.
        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests_sum", line_protocol1.0);
        assert_eq!(
            "metric_type=summary,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "count=6u",
                "quantile_0.01=1.5",
                "quantile_0.5=2",
                "quantile_0.99=3",
                "sum=12",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }
    // Histogram-statistic distributions: three metrics (weighted samples, a
    // dense 0..20 range, and a sparse range with growing rates) each produce
    // min/max/median/avg/sum/count plus a single `quantile_0.95` field.
    #[test]
    fn test_encode_distribution() {
        let events = vec![
            Metric::new(
                "requests",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
            Metric::new(
                "dense_stats",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: (0..20)
                        .map(|v| Sample {
                            value: f64::from(v),
                            rate: 1,
                        })
                        .collect(),
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_timestamp(Some(ts())),
            Metric::new(
                "sparse_stats",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: (1..5)
                        .map(|v| Sample {
                            value: f64::from(v),
                            rate: v,
                        })
                        .collect(),
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_timestamp(Some(ts())),
        ];
        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 3);
        // First metric: weighted samples with tags.
        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol1.0);
        assert_eq!(
            "metric_type=distribution,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "avg=1.875",
                "count=8",
                "max=3",
                "median=2",
                "min=1",
                "quantile_0.95=3",
                "sum=15",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
        // Second metric: values 0..20, each with rate 1, no tags.
        let line_protocol2 = split_line_protocol(line_protocols[1]);
        assert_eq!("ns.dense_stats", line_protocol2.0);
        assert_eq!("metric_type=distribution", line_protocol2.1);
        assert_fields(
            line_protocol2.2.to_string(),
            [
                "avg=9.5",
                "count=20",
                "max=19",
                "median=9",
                "min=0",
                "quantile_0.95=18",
                "sum=190",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol2.3);
        // Third metric: values 1..5 where each value's rate equals the value.
        let line_protocol3 = split_line_protocol(line_protocols[2]);
        assert_eq!("ns.sparse_stats", line_protocol3.0);
        assert_eq!("metric_type=distribution", line_protocol3.1);
        assert_fields(
            line_protocol3.2.to_string(),
            [
                "avg=3",
                "count=10",
                "max=4",
                "median=3",
                "min=1",
                "quantile_0.95=4",
                "sum=30",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol3.3);
    }
    // A distribution without any samples yields no fields, so nothing is encoded.
    #[test]
    fn test_encode_distribution_empty_stats() {
        let empty_distribution = Metric::new(
            "requests",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vec![],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()));
        let encoded = encode_events(
            ProtocolVersion::V2,
            vec![empty_distribution],
            None,
            None,
            &[],
        );
        assert_eq!(encoded.len(), 0);
    }
    // Samples whose rates are all zero behave like an empty distribution:
    // nothing is encoded.
    #[test]
    fn test_encode_distribution_zero_counts_stats() {
        let zero_rate_distribution = Metric::new(
            "requests",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 0, 2.0 => 0],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()));
        let encoded = encode_events(
            ProtocolVersion::V2,
            vec![zero_rate_distribution],
            None,
            None,
            &[],
        );
        assert_eq!(encoded.len(), 0);
    }
    // Summary-statistic distributions use the configured quantiles (here the
    // defaults), each emitted as `quantile_<p>` with two decimal places.
    #[test]
    fn test_encode_distribution_summary() {
        let events = vec![Metric::new(
            "requests",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                statistic: StatisticKind::Summary,
            },
        )
        .with_namespace(Some("ns"))
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()))];
        let line_protocols = encode_events(
            ProtocolVersion::V2,
            events,
            None,
            None,
            &default_summary_quantiles(),
        );
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);
        // Split into (measurement, tags, fields, timestamp) and check each part.
        let line_protocol = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol.0);
        assert_eq!(
            "metric_type=distribution,normal_tag=value,true_tag=true",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "avg=1.875",
                "count=8",
                "max=3",
                "median=2",
                "min=1",
                "sum=15",
                "quantile_0.50=2",
                "quantile_0.75=2",
                "quantile_0.90=3",
                "quantile_0.95=3",
                "quantile_0.99=3",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol.3);
    }
    // Configured tags are added to every line, alongside any tags the metric
    // already has; the metric's namespace ("vector") is used instead of the
    // default namespace ("ns").
    #[test]
    fn test_encode_with_some_tags() {
        crate::test_util::trace_init();
        let events = vec![
            Metric::new(
                "cpu",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 2.5 },
            )
            .with_namespace(Some("vector"))
            .with_timestamp(Some(ts())),
            Metric::new(
                "mem",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 1000.0 },
            )
            .with_namespace(Some("vector"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];
        // Sink-level tags, as they would come from the `tags` config option.
        let mut tags = HashMap::new();
        tags.insert("host".to_owned(), "local".to_owned());
        tags.insert("datacenter".to_owned(), "us-east".to_owned());
        let line_protocols = encode_events(
            ProtocolVersion::V1,
            events,
            Some("ns"),
            Some(tags).as_ref(),
            &[],
        );
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 2);
        // Metric without its own tags: only configured tags plus `metric_type`.
        assert_eq!(
            line_protocols[0],
            "vector.cpu,datacenter=us-east,host=local,metric_type=gauge value=2.5 1542182950000000011"
        );
        // Metric with its own tags: both sets appear on the line.
        assert_eq!(
            line_protocols[1],
            "vector.mem,datacenter=us-east,host=local,metric_type=gauge,normal_tag=value,true_tag=true value=1000 1542182950000000011"
        );
    }
}
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use chrono::{SecondsFormat, Utc};
    use futures::stream;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;
    use crate::{
        config::{SinkConfig, SinkContext},
        event::{
            metric::{Metric, MetricKind, MetricValue},
            Event,
        },
        http::HttpClient,
        sinks::influxdb::{
            metrics::{default_summary_quantiles, InfluxDbConfig, InfluxDbSvc},
            test_util::{
                address_v1, address_v2, cleanup_v1, format_timestamp, onboarding_v1, onboarding_v2,
                query_v1, BUCKET, ORG, TOKEN,
            },
            InfluxDb1Settings, InfluxDb2Settings,
        },
        test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
        tls::{self, TlsConfig},
    };
    // Runs the v1 insertion scenario against the TLS endpoint, trusting the test CA.
    #[tokio::test]
    async fn inserts_metrics_v1_over_https() {
        let url = address_v1(true);
        let tls = TlsConfig {
            ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
            ..Default::default()
        };
        insert_metrics_v1(url.as_str(), Some(tls)).await
    }
    // Runs the v1 insertion scenario against the plain HTTP endpoint, without TLS.
    #[tokio::test]
    async fn inserts_metrics_v1_over_http() {
        let url = address_v1(false);
        insert_metrics_v1(url.as_str(), None).await
    }
    // Shared scenario for the v1 integration tests: builds the sink against
    // `url`, sends ten events through it, then queries InfluxDB to check that
    // every event was stored with the expected columns and values.
    async fn insert_metrics_v1(url: &str, tls: Option<TlsConfig>) {
        crate::test_util::trace_init();
        let database = onboarding_v1(url).await;
        let cx = SinkContext::default();
        let config = InfluxDbConfig {
            endpoint: url.to_string(),
            influxdb1_settings: Some(InfluxDb1Settings {
                consistency: None,
                database: database.clone(),
                retention_policy_name: Some("autogen".to_string()),
                username: None,
                password: None,
            }),
            influxdb2_settings: None,
            batch: Default::default(),
            request: Default::default(),
            tls,
            quantiles: default_summary_quantiles(),
            tags: None,
            default_namespace: None,
            acknowledgements: Default::default(),
        };
        // NOTE(review): `create_event` is defined outside this view; the
        // assertions below assume it yields namespaced, timestamped counters
        // tagged with `production` and `region` — confirm.
        let events: Vec<_> = (0..10).map(create_event).collect();
        let (sink, _) = config.build(cx).await.expect("error when building config");
        run_and_assert_sink_compliance(sink, stream::iter(events.clone()), &HTTP_SINK_TAGS).await;
        // One series is expected per event sent.
        let res = query_v1_json(url, &format!("show series on {}", database)).await;
        assert_eq!(
            res["results"][0]["series"][0]["values"]
                .as_array()
                .unwrap()
                .len(),
            events.len()
        );
        // Each event must be readable back with matching name, tags, value and timestamp.
        for event in events {
            let metric = event.into_metric();
            let name = format!("{}.{}", metric.namespace().unwrap(), metric.name());
            let value = match metric.value() {
                MetricValue::Counter { value } => *value,
                _ => unreachable!(),
            };
            let timestamp = format_timestamp(metric.timestamp().unwrap(), SecondsFormat::Nanos);
            let res =
                query_v1_json(url, &format!("select * from {}..\"{}\"", database, name)).await;
            assert_eq!(
                res,
                serde_json::json! {
                    {"results": [{
                        "statement_id": 0,
                        "series": [{
                            "name": name,
                            "columns": ["time", "metric_type", "production", "region", "value"],
                            "values": [[timestamp, "counter", "true", "us-west-1", value as isize]]
                        }]
                    }]}
                }
            );
        }
        // Drop the temporary database created by `onboarding_v1`.
        cleanup_v1(url, &database).await;
    }
    async fn query_v1_json(url: &str, query: &str) -> serde_json::Value {
        let string = query_v1(url, query)
            .await
            .text()
            .await
            .expect("Fetching text from InfluxDB query failed");
        serde_json::from_str(&string).expect("Error when parsing InfluxDB response JSON")
    }
    #[tokio::test]
    async fn influxdb2_metrics_put_data() {
        crate::test_util::trace_init();
        let endpoint = address_v2();
        onboarding_v2(&endpoint).await;
        let cx = SinkContext::default();
        let config = InfluxDbConfig {
            endpoint,
            influxdb1_settings: None,
            influxdb2_settings: Some(InfluxDb2Settings {
                org: ORG.to_string(),
                bucket: BUCKET.to_string(),
                token: TOKEN.to_string().into(),
            }),
            quantiles: default_summary_quantiles(),
            batch: Default::default(),
            request: Default::default(),
            tags: None,
            tls: None,
            default_namespace: None,
            acknowledgements: Default::default(),
        };
        let metric = format!(
            "counter-{}",
            Utc::now()
                .timestamp_nanos_opt()
                .expect("Timestamp out of range")
        );
        let mut events = Vec::new();
        for i in 0..10 {
            let event = Event::Metric(
                Metric::new(
                    metric.clone(),
                    MetricKind::Incremental,
                    MetricValue::Counter { value: i as f64 },
                )
                .with_namespace(Some("ns"))
                .with_tags(Some(metric_tags!(
                    "region" => "us-west-1",
                    "production" => "true",
                ))),
            );
            events.push(event);
        }
        let client = HttpClient::new(None, cx.proxy()).unwrap();
        let sink = InfluxDbSvc::new(config, client).unwrap();
        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
        let mut body = std::collections::HashMap::new();
        body.insert("query", format!("from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"ns.{}\")", metric));
        body.insert("type", "flux".to_owned());
        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();
        let res = client
            .post(format!("{}/api/v2/query?org=my-org", address_v2()))
            .json(&body)
            .header("accept", "application/json")
            .header("Authorization", "Token my-token")
            .send()
            .await
            .unwrap();
        let string = res.text().await.unwrap();
        let lines = string.split('\n').collect::<Vec<&str>>();
        let header = lines[0].split(',').collect::<Vec<&str>>();
        let record = lines[1].split(',').collect::<Vec<&str>>();
        assert_eq!(
            record[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "counter"
        );
        assert_eq!(
            record[header
                .iter()
                .position(|&r| r.trim() == "production")
                .unwrap()]
            .trim(),
            "true"
        );
        assert_eq!(
            record[header.iter().position(|&r| r.trim() == "region").unwrap()].trim(),
            "us-west-1"
        );
        assert_eq!(
            record[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            format!("ns.{}", metric)
        );
        assert_eq!(
            record[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "value"
        );
        assert_eq!(
            record[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "45"
        );
    }
    /// Builds an incremental counter event named `counter-{i}` with value `i`, namespace `ns`,
    /// the `region`/`production` tags, and the current UTC time as its timestamp.
    fn create_event(i: i32) -> Event {
        let name = format!("counter-{}", i);
        let value = MetricValue::Counter {
            value: f64::from(i),
        };
        let tags = metric_tags!(
            "region" => "us-west-1",
            "production" => "true",
        );
        let metric = Metric::new(name, MetricKind::Incremental, value)
            .with_namespace(Some("ns"))
            .with_tags(Some(tags))
            .with_timestamp(Some(Utc::now()));
        Event::Metric(metric)
    }
}