use std::time::{Duration, Instant};
use chrono::Utc;
use futures::{
    future::{join_all, try_join_all},
    StreamExt,
};
use mongodb::{
    bson::{self, doc, from_document, Bson, Document},
    error::Error as MongoError,
    options::ClientOptions,
    Client,
};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::configurable::configurable_component;
use vector_lib::{metric_tags, ByteSizeOf, EstimatedJsonEncodedSizeOf};
use crate::{
    config::{SourceConfig, SourceContext, SourceOutput},
    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
    internal_events::{
        CollectionCompleted, EndpointBytesReceived, MongoDbMetricsBsonParseError,
        MongoDbMetricsEventsReceived, MongoDbMetricsRequestError, StreamClosedError,
    },
};
mod types;
use types::{CommandBuildInfo, CommandIsMaster, CommandServerStatus, NodeType};
use vector_lib::config::LogNamespace;
macro_rules! tags {
    ($tags:expr) => { $tags.clone() };
    ($tags:expr, $($key:expr => $value:expr),*) => {
        {
            let mut tags = $tags.clone();
            $(
                tags.replace($key.into(), $value.to_string());
            )*
            tags
        }
    };
}
macro_rules! counter {
    ($value:expr) => {
        MetricValue::Counter {
            value: $value as f64,
        }
    };
}
macro_rules! gauge {
    ($value:expr) => {
        MetricValue::Gauge {
            value: $value as f64,
        }
    };
}
#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("invalid endpoint: {}", source))]
    InvalidEndpoint { source: MongoError },
    #[snafu(display("invalid client options: {}", source))]
    InvalidClientOptions { source: MongoError },
}
#[derive(Debug)]
enum CollectError {
    Mongo(MongoError),
    Bson(bson::de::Error),
}
#[serde_as]
#[configurable_component(source("mongodb_metrics", "Collect metrics from the MongoDB database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct MongoDbMetricsConfig {
    #[configurable(metadata(docs::examples = "mongodb://localhost:27017"))]
    endpoints: Vec<String>,
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,
    #[serde(default = "default_namespace")]
    namespace: String,
}
#[derive(Debug)]
struct MongoDbMetrics {
    client: Client,
    endpoint: String,
    namespace: Option<String>,
    tags: MetricTags,
}
pub const fn default_scrape_interval_secs() -> Duration {
    Duration::from_secs(15)
}
pub fn default_namespace() -> String {
    "mongodb".to_string()
}
impl_generate_config_from_default!(MongoDbMetricsConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "mongodb_metrics")]
impl SourceConfig for MongoDbMetricsConfig {
    async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
        let sources = try_join_all(
            self.endpoints
                .iter()
                .map(|endpoint| MongoDbMetrics::new(endpoint, namespace.clone())),
        )
        .await?;
        let duration = self.scrape_interval_secs;
        let shutdown = cx.shutdown;
        Ok(Box::pin(async move {
            let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
            while interval.next().await.is_some() {
                let start = Instant::now();
                let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
                emit!(CollectionCompleted {
                    start,
                    end: Instant::now()
                });
                let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
                let count = metrics.len();
                if (cx.out.send_batch(metrics).await).is_err() {
                    emit!(StreamClosedError { count });
                    return Err(());
                }
            }
            Ok(())
        }))
    }
    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }
    fn can_acknowledge(&self) -> bool {
        false
    }
}
impl MongoDbMetrics {
    async fn new(endpoint: &str, namespace: Option<String>) -> Result<MongoDbMetrics, BuildError> {
        let mut client_options = ClientOptions::parse(endpoint)
            .await
            .context(InvalidEndpointSnafu)?;
        client_options.direct_connection = Some(true);
        let endpoint = sanitize_endpoint(endpoint, &client_options);
        let tags = metric_tags!(
            "endpoint" => endpoint.clone(),
            "host" => client_options.hosts[0].to_string(),
        );
        Ok(Self {
            client: Client::with_options(client_options).context(InvalidClientOptionsSnafu)?,
            endpoint,
            namespace,
            tags,
        })
    }
    async fn get_node_type(&self) -> Result<NodeType, CollectError> {
        let doc = self
            .client
            .database("admin")
            .run_command(doc! { "isMaster": 1 }, None)
            .await
            .map_err(CollectError::Mongo)?;
        let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;
        Ok(if msg.set_name.is_some() || msg.hosts.is_some() {
            NodeType::Replset
        } else if msg.msg.map(|msg| msg == "isdbgrid").unwrap_or(false) {
            NodeType::Mongos
        } else {
            NodeType::Mongod
        })
    }
    async fn get_build_info(&self) -> Result<CommandBuildInfo, CollectError> {
        let doc = self
            .client
            .database("admin")
            .run_command(doc! { "buildInfo": 1 }, None)
            .await
            .map_err(CollectError::Mongo)?;
        from_document(doc).map_err(CollectError::Bson)
    }
    async fn print_version(&self) -> Result<(), CollectError> {
        if tracing::level_enabled!(tracing::Level::DEBUG) {
            let node_type = self.get_node_type().await?;
            let build_info = self.get_build_info().await?;
            debug!(
                message = "Connected to server.", endpoint = %self.endpoint, node_type = ?node_type, server_version = ?serde_json::to_string(&build_info).unwrap()
            );
        }
        Ok(())
    }
    fn create_metric(&self, name: &str, value: MetricValue, tags: MetricTags) -> Metric {
        Metric::new(name, MetricKind::Absolute, value)
            .with_namespace(self.namespace.clone())
            .with_tags(Some(tags))
            .with_timestamp(Some(Utc::now()))
    }
    async fn collect(&self) -> Vec<Metric> {
        let (up_value, mut metrics) = match self.collect_server_status().await {
            Ok(metrics) => (1.0, metrics),
            Err(error) => {
                match error {
                    CollectError::Mongo(error) => emit!(MongoDbMetricsRequestError {
                        error,
                        endpoint: &self.endpoint,
                    }),
                    CollectError::Bson(error) => emit!(MongoDbMetricsBsonParseError {
                        error,
                        endpoint: &self.endpoint,
                    }),
                }
                (0.0, vec![])
            }
        };
        metrics.push(self.create_metric("up", gauge!(up_value), tags!(self.tags)));
        emit!(MongoDbMetricsEventsReceived {
            byte_size: metrics.estimated_json_encoded_size_of(),
            count: metrics.len(),
            endpoint: &self.endpoint,
        });
        metrics
    }
    async fn collect_server_status(&self) -> Result<Vec<Metric>, CollectError> {
        self.print_version().await?;
        let mut metrics = vec![];
        let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
        let db = self.client.database("admin");
        let doc = db
            .run_command(command, None)
            .await
            .map_err(CollectError::Mongo)?;
        let byte_size = document_size(&doc);
        emit!(EndpointBytesReceived {
            byte_size,
            protocol: "tcp",
            endpoint: &self.endpoint,
        });
        let status: CommandServerStatus = from_document(doc).map_err(CollectError::Bson)?;
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.regular),
            tags!(self.tags, "type" => "regular"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.warning),
            tags!(self.tags, "type" => "warning"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.msg),
            tags!(self.tags, "type" => "msg"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.user),
            tags!(self.tags, "type" => "user"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.rollovers),
            tags!(self.tags, "type" => "rollovers"),
        ));
        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.active),
            tags!(self.tags, "state" => "active"),
        ));
        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.available),
            tags!(self.tags, "state" => "available"),
        ));
        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.current),
            tags!(self.tags, "state" => "current"),
        ));
        if let Some(value) = status.extra_info.heap_usage_bytes {
            metrics.push(self.create_metric(
                "extra_info_heap_usage_bytes",
                gauge!(value),
                tags!(self.tags),
            ));
        }
        metrics.push(self.create_metric(
            "extra_info_page_faults",
            gauge!(status.extra_info.page_faults),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "instance_local_time",
            gauge!(status.instance.local_time.timestamp_millis() / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "instance_uptime_estimate_seconds_total",
            gauge!(status.instance.uptime_estimate),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "instance_uptime_seconds_total",
            gauge!(status.instance.uptime),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "memory",
            gauge!(status.memory.resident),
            tags!(self.tags, "type" => "resident"),
        ));
        metrics.push(self.create_metric(
            "memory",
            gauge!(status.memory.r#virtual),
            tags!(self.tags, "type" => "virtual"),
        ));
        if let Some(value) = status.memory.mapped {
            metrics.push(self.create_metric(
                "memory",
                gauge!(value),
                tags!(self.tags, "type" => "mapped"),
            ))
        }
        if let Some(value) = status.memory.mapped_with_journal {
            metrics.push(self.create_metric(
                "memory",
                gauge!(value),
                tags!(self.tags, "type" => "mapped_with_journal"),
            ))
        }
        metrics.push(self.create_metric(
            "mongod_global_lock_total_time_seconds",
            counter!(status.global_lock.total_time),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.total),
            tags!(self.tags, "type" => "total"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.readers),
            tags!(self.tags, "type" => "readers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.writers),
            tags!(self.tags, "type" => "writers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.total),
            tags!(self.tags, "type" => "total"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.readers),
            tags!(self.tags, "type" => "readers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.writers),
            tags!(self.tags, "type" => "writers"),
        ));
        for (r#type, lock) in status.locks {
            if let Some(modes) = lock.time_acquiring_micros {
                if let Some(value) = modes.read {
                    metrics.push(self.create_metric(
                        "mongod_locks_time_acquiring_global_seconds_total",
                        counter!(value),
                        tags!(self.tags, "type" => &r#type, "mode" => "read"),
                    ));
                }
                if let Some(value) = modes.write {
                    metrics.push(self.create_metric(
                        "mongod_locks_time_acquiring_global_seconds_total",
                        counter!(value),
                        tags!(self.tags, "type" => &r#type, "mode" => "write"),
                    ));
                }
            }
        }
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_timed_out_total",
            counter!(status.metrics.cursor.timed_out),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.no_timeout),
            tags!(self.tags, "state" => "no_timeout"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.pinned),
            tags!(self.tags, "state" => "pinned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.total),
            tags!(self.tags, "state" => "total"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.deleted),
            tags!(self.tags, "state" => "deleted"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.inserted),
            tags!(self.tags, "state" => "inserted"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.returned),
            tags!(self.tags, "state" => "returned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.updated),
            tags!(self.tags, "state" => "updated"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtime_num",
            gauge!(status.metrics.get_last_error.wtime.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtime_seconds_total",
            counter!(status.metrics.get_last_error.wtime.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtimeouts_total",
            counter!(status.metrics.get_last_error.wtimeouts),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_operation_total",
            counter!(status.metrics.operation.scan_and_order),
            tags!(self.tags, "type" => "scan_and_order"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_operation_total",
            counter!(status.metrics.operation.write_conflicts),
            tags!(self.tags, "type" => "write_conflicts"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_query_executor_total",
            counter!(status.metrics.query_executor.scanned),
            tags!(self.tags, "state" => "scanned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_query_executor_total",
            counter!(status.metrics.query_executor.scanned_objects),
            tags!(self.tags, "state" => "scanned_objects"),
        ));
        if let Some(doc) = status.metrics.query_executor.collection_scans {
            metrics.push(self.create_metric(
                "mongod_metrics_query_executor_total",
                counter!(doc.total),
                tags!(self.tags, "state" => "collection_scans"),
            ));
        }
        if let Some(record) = status.metrics.record {
            metrics.push(self.create_metric(
                "mongod_metrics_record_moves_total",
                counter!(record.moves),
                tags!(self.tags),
            ));
        }
        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_batches_num_total",
            counter!(status.metrics.repl.apply.batches.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_batches_seconds_total",
            counter!(status.metrics.repl.apply.batches.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_ops_total",
            counter!(status.metrics.repl.apply.ops),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_count",
            counter!(status.metrics.repl.buffer.count),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_max_size_bytes_total",
            counter!(status.metrics.repl.buffer.max_size_bytes),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_size_bytes",
            counter!(status.metrics.repl.buffer.size_bytes),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_queue",
            gauge!(status.metrics.repl.executor.queues.network_in_progress),
            tags!(self.tags, "type" => "network_in_progress"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_queue",
            gauge!(status.metrics.repl.executor.queues.sleepers),
            tags!(self.tags, "type" => "sleepers"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_unsignaled_events",
            gauge!(status.metrics.repl.executor.unsignaled_events),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_bytes_total",
            counter!(status.metrics.repl.network.bytes),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_getmores_num_total",
            counter!(status.metrics.repl.network.getmores.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_getmores_seconds_total",
            counter!(status.metrics.repl.network.getmores.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_ops_total",
            counter!(status.metrics.repl.network.ops),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_readers_created_total",
            counter!(status.metrics.repl.network.readers_created),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_ttl_deleted_documents_total",
            counter!(status.metrics.ttl.deleted_documents),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_ttl_passes_total",
            counter!(status.metrics.ttl.passes),
            tags!(self.tags),
        ));
        for (r#type, stat) in status.op_latencies {
            for bucket in stat.histogram {
                metrics.push(self.create_metric(
                    "mongod_op_latencies_histogram",
                    gauge!(bucket.count),
                    tags!(self.tags, "type" => &r#type, "micros" => bucket.micros.to_string()),
                ));
            }
            metrics.push(self.create_metric(
                "mongod_op_latencies_latency",
                gauge!(stat.latency),
                tags!(self.tags, "type" => &r#type),
            ));
            metrics.push(self.create_metric(
                "mongod_op_latencies_ops_total",
                gauge!(stat.ops),
                tags!(self.tags, "type" => &r#type),
            ));
        }
        metrics.push(self.create_metric(
            "mongod_storage_engine",
            gauge!(1),
            tags!(self.tags, "engine" => status.storage_engine.name),
        ));
        if let Some(stat) = status.wired_tiger {
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_read),
                tags!(self.tags, "type" => "blocks_read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.mapped_blocks_read),
                tags!(self.tags, "type" => "blocks_read_mapped"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_pre_loaded),
                tags!(self.tags, "type" => "blocks_pre_loaded"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_written),
                tags!(self.tags, "type" => "blocks_written"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.bytes_read),
                tags!(self.tags, "type" => "bytes_read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.mapped_bytes_read),
                tags!(self.tags, "type" => "bytes_read_mapped"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.bytes_written),
                tags!(self.tags, "type" => "bytes_written"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_total),
                tags!(self.tags, "type" => "total"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_dirty),
                tags!(self.tags, "type" => "dirty"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_internal_pages),
                tags!(self.tags, "type" => "internal_pages"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_leaf_pages),
                tags!(self.tags, "type" => "leaf_pages"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes_total",
                counter!(stat.cache.pages_read_into),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes_total",
                counter!(stat.cache.pages_written_from),
                tags!(self.tags, "type" => "written"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_evicted_total",
                counter!(stat.cache.evicted_modified),
                tags!(self.tags, "type" => "modified"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_evicted_total",
                counter!(stat.cache.evicted_unmodified),
                tags!(self.tags, "type" => "unmodified"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_max_bytes",
                gauge!(stat.cache.max_bytes),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_overhead_percent",
                gauge!(stat.cache.percent_overhead),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages",
                gauge!(stat.cache.pages_total),
                tags!(self.tags, "type" => "total"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages",
                gauge!(stat.cache.pages_dirty),
                tags!(self.tags, "type" => "dirty"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages_total",
                counter!(stat.cache.pages_read_into),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages_total",
                counter!(stat.cache.pages_written_from),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_available_tickets",
                gauge!(stat.concurrent_transactions.read.available),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_available_tickets",
                gauge!(stat.concurrent_transactions.write.available),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_out_tickets",
                gauge!(stat.concurrent_transactions.read.out),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_out_tickets",
                gauge!(stat.concurrent_transactions.write.out),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_total_tickets",
                gauge!(stat.concurrent_transactions.read.total_tickets),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_total_tickets",
                gauge!(stat.concurrent_transactions.write.total_tickets),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_bytes_total",
                counter!(stat.log.bytes_payload_data),
                tags!(self.tags, "type" => "payload"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_bytes_total",
                counter!(stat.log.bytes_written),
                tags!(self.tags, "type" => "written"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_writes),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_scans),
                tags!(self.tags, "type" => "scan"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_scans_double),
                tags!(self.tags, "type" => "scan_double"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_syncs),
                tags!(self.tags, "type" => "sync"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_sync_dirs),
                tags!(self.tags, "type" => "sync_dir"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_flushes),
                tags!(self.tags, "type" => "flush"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_scanned_total",
                counter!(stat.log.records_compressed),
                tags!(self.tags, "type" => "compressed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_scanned_total",
                counter!(stat.log.records_uncompressed),
                tags!(self.tags, "type" => "uncompressed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_total",
                counter!(stat.log.records_processed_log_scan),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_session_open_sessions",
                gauge!(stat.session.sessions),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds",
                gauge!(stat.transaction.checkpoint_min_ms / 1000),
                tags!(self.tags, "type" => "min"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds",
                gauge!(stat.transaction.checkpoint_max_ms / 1000),
                tags!(self.tags, "type" => "max"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds_total",
                counter!(stat.transaction.checkpoint_total_ms / 1000),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_running_checkpoints",
                gauge!(stat.transaction.checkpoints_running),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.begins),
                tags!(self.tags, "type" => "begins"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.checkpoints),
                tags!(self.tags, "type" => "checkpoints"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.committed),
                tags!(self.tags, "type" => "committed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.rolled_back),
                tags!(self.tags, "type" => "rolledback"),
            ));
        }
        metrics.push(self.create_metric(
            "network_bytes_total",
            counter!(status.network.bytes_in),
            tags!(self.tags, "state" => "bytes_in"),
        ));
        metrics.push(self.create_metric(
            "network_bytes_total",
            counter!(status.network.bytes_out),
            tags!(self.tags, "state" => "bytes_out"),
        ));
        metrics.push(self.create_metric(
            "network_metrics_num_requests_total",
            counter!(status.network.num_requests),
            tags!(self.tags),
        ));
        for (r#type, value) in status.opcounters {
            metrics.push(self.create_metric(
                "op_counters_repl_total",
                counter!(value),
                tags!(self.tags, "type" => r#type),
            ));
        }
        for (r#type, value) in status.opcounters_repl {
            metrics.push(self.create_metric(
                "op_counters_total",
                counter!(value),
                tags!(self.tags, "type" => r#type),
            ));
        }
        Ok(metrics)
    }
}
fn bson_size(value: &Bson) -> usize {
    match value {
        Bson::Double(value) => value.size_of(),
        Bson::String(value) => value.size_of(),
        Bson::Array(value) => value.iter().map(bson_size).sum(),
        Bson::Document(value) => document_size(value),
        Bson::Boolean(_) => std::mem::size_of::<bool>(),
        Bson::RegularExpression(value) => value.pattern.size_of(),
        Bson::JavaScriptCode(value) => value.size_of(),
        Bson::JavaScriptCodeWithScope(value) => value.code.size_of() + document_size(&value.scope),
        Bson::Int32(value) => value.size_of(),
        Bson::Int64(value) => value.size_of(),
        Bson::Timestamp(value) => value.time.size_of() + value.increment.size_of(),
        Bson::Binary(value) => value.bytes.size_of(),
        Bson::ObjectId(value) => value.bytes().size_of(),
        Bson::DateTime(_) => std::mem::size_of::<i64>(),
        Bson::Symbol(value) => value.size_of(),
        Bson::Decimal128(value) => value.bytes().size_of(),
        Bson::DbPointer(_) => {
            0
        }
        Bson::Null | Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
    }
}
fn document_size(doc: &Document) -> usize {
    doc.into_iter()
        .map(|(key, value)| key.size_of() + bson_size(value))
        .sum()
}
fn sanitize_endpoint(endpoint: &str, options: &ClientOptions) -> String {
    let mut endpoint = endpoint.to_owned();
    if options.credential.is_some() {
        let start = endpoint.find("://").unwrap() + 3;
        let pre_slash = match endpoint[start..].find('/') {
            Some(index) => {
                let mut segments = endpoint[start..].split_at(index);
                if segments.1.len() > 1 {
                    let lstart = start + segments.0.len() + 1;
                    let post_slash = &segments.1[1..];
                    if let Some(index) = post_slash.find('?') {
                        let segments = post_slash.split_at(index);
                        if segments.1.len() > 1 {
                            let options = segments.1[1..]
                                .split('&')
                                .filter(|pair| {
                                    let (key, _) = pair.split_at(pair.find('=').unwrap());
                                    !matches!(
                                        key.to_lowercase().as_str(),
                                        "authsource" | "authmechanism" | "authmechanismproperties"
                                    )
                                })
                                .collect::<Vec<_>>()
                                .join("&");
                            endpoint = format!(
                                "{}{}",
                                &endpoint[..lstart + segments.0.len() + 1],
                                &options
                            );
                        }
                    }
                    segments = endpoint[start..].split_at(index);
                }
                segments.0
            }
            None => &endpoint[start..],
        };
        let end = pre_slash.rfind('@').unwrap() + 1;
        endpoint = format!("{}{}", &endpoint[0..start], &endpoint[start + end..]);
    }
    endpoint
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MongoDbMetricsConfig>();
    }
    #[tokio::test]
    async fn sanitize_endpoint_test() {
        let endpoint = "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?authSource=admin&tls=true";
        let client_options = ClientOptions::parse(endpoint).await.unwrap();
        let endpoint = sanitize_endpoint(endpoint, &client_options);
        assert_eq!(&endpoint, "mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?tls=true");
    }
}
#[cfg(all(test, feature = "mongodb_metrics-integration-tests"))]
mod integration_tests {
    use futures::StreamExt;
    use tokio::time::{timeout, Duration};
    use super::*;
    use crate::{
        test_util::{
            components::{assert_source_compliance, PULL_SOURCE_TAGS},
            trace_init,
        },
        SourceSender,
    };
    fn primary_mongo_address() -> String {
        std::env::var("PRIMARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27017".into())
    }
    fn secondary_mongo_address() -> String {
        std::env::var("SECONDARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27019".into())
    }
    fn remove_creds(address: &str) -> String {
        let mut url = url::Url::parse(address).unwrap();
        url.set_password(None).unwrap();
        url.set_username("").unwrap();
        url.to_string()
    }
    async fn test_instance(endpoint: String) {
        assert_source_compliance(&PULL_SOURCE_TAGS, async {
            let host = ClientOptions::parse(endpoint.as_str()).await.unwrap().hosts[0].to_string();
            let namespace = "vector_mongodb";
            let (sender, mut recv) = SourceSender::new_test();
            let endpoints = vec![endpoint.clone()];
            tokio::spawn(async move {
                MongoDbMetricsConfig {
                    endpoints,
                    scrape_interval_secs: Duration::from_secs(15),
                    namespace: namespace.to_owned(),
                }
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
            });
            let event = timeout(Duration::from_secs(30), recv.next())
                .await
                .expect("fetch metrics timeout")
                .expect("failed to get metrics from a stream");
            let mut events = vec![event];
            loop {
                match timeout(Duration::from_millis(10), recv.next()).await {
                    Ok(Some(event)) => events.push(event),
                    Ok(None) => break,
                    Err(_) => break,
                }
            }
            let clean_endpoint = remove_creds(&endpoint);
            assert!(events.len() > 100);
            for event in events {
                let metric = event.into_metric();
                assert!(metric.namespace() == Some(namespace));
                let timestamp = metric.timestamp().expect("existed timestamp");
                assert!((timestamp - Utc::now()).num_seconds() < 1);
                let tags = metric.tags().expect("existed tags");
                assert_eq!(tags.get("endpoint"), Some(&clean_endpoint[..]));
                assert_eq!(tags.get("host"), Some(&host[..]));
            }
        })
        .await;
    }
    #[tokio::test]
    async fn fetch_metrics_mongod() {
        trace_init();
        test_instance(primary_mongo_address()).await;
    }
    #[tokio::test]
    async fn fetch_metrics_replset() {
        trace_init();
        test_instance(secondary_mongo_address()).await;
    }
}