#![allow(missing_docs)]
use std::{collections::HashMap, env, path::PathBuf};
use bollard::{
    container::{Config, CreateContainerOptions},
    errors::Error as DockerError,
    image::{CreateImageOptions, ListImagesOptions},
    models::HostConfig,
    Docker, API_DEFAULT_VERSION,
};
use futures::StreamExt;
use http::uri::Uri;
use snafu::Snafu;
use vector_lib::configurable::configurable_component;
const DEFAULT_TIMEOUT: u64 = 120;
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("URL has no host."))]
    NoHost,
}
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DockerTlsConfig {
    ca_file: PathBuf,
    crt_file: PathBuf,
    key_file: PathBuf,
}
pub fn docker(host: Option<String>, tls: Option<DockerTlsConfig>) -> crate::Result<Docker> {
    let host = host.or_else(|| env::var("DOCKER_HOST").ok());
    match host {
        None => Docker::connect_with_local_defaults().map_err(Into::into),
        Some(host) => {
            let scheme = host
                .parse::<Uri>()
                .ok()
                .and_then(|uri| uri.into_parts().scheme);
            match scheme.as_ref().map(|scheme| scheme.as_str()) {
                Some("http") | Some("tcp") => {
                    let host = get_authority(&host)?;
                    Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
                        .map_err(Into::into)
                }
                Some("https") => {
                    let host = get_authority(&host)?;
                    let tls = tls
                        .or_else(default_certs)
                        .ok_or(DockerError::NoHomePathError)?;
                    Docker::connect_with_ssl(
                        &host,
                        &tls.key_file,
                        &tls.crt_file,
                        &tls.ca_file,
                        DEFAULT_TIMEOUT,
                        API_DEFAULT_VERSION,
                    )
                    .map_err(Into::into)
                }
                Some("unix") | Some("npipe") | None => {
                    if cfg!(windows) {
                        warn!("Named pipes are currently not available on Windows, trying to connecting to Docker with default HTTP settings instead.");
                        Docker::connect_with_http_defaults().map_err(Into::into)
                    } else {
                        Docker::connect_with_local(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
                            .map_err(Into::into)
                    }
                }
                Some(scheme) => Err(format!("Unknown scheme: {}", scheme).into()),
            }
        }
    }
}
fn default_certs() -> Option<DockerTlsConfig> {
    let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
    let base = match from_env {
        Ok(path) => PathBuf::from(path),
        Err(_) => dirs_next::home_dir()?.join(".docker"),
    };
    Some(DockerTlsConfig {
        ca_file: base.join("ca.pem"),
        key_file: base.join("key.pem"),
        crt_file: base.join("cert.pem"),
    })
}
fn get_authority(url: &str) -> Result<String, Error> {
    url.parse::<Uri>()
        .ok()
        .and_then(|uri| uri.authority().map(<_>::to_string))
        .ok_or(Error::NoHost)
}
async fn pull_image(docker: &Docker, image: &str, tag: &str) {
    let mut filters = HashMap::new();
    filters.insert(
        String::from("reference"),
        vec![format!("{}:{}", image, tag)],
    );
    let options = Some(ListImagesOptions {
        filters,
        ..Default::default()
    });
    let images = docker.list_images(options).await.unwrap();
    if images.is_empty() {
        let options = Some(CreateImageOptions {
            from_image: image,
            tag,
            ..Default::default()
        });
        docker
            .create_image(options, None, None)
            .for_each(|item| async move {
                let info = item.unwrap();
                if let Some(error) = info.error {
                    panic!("{:?}", error);
                }
            })
            .await
    }
}
async fn remove_container(docker: &Docker, id: &str) {
    trace!("Stopping container.");
    _ = docker
        .stop_container(id, None)
        .await
        .map_err(|e| error!(%e));
    trace!("Removing container.");
    _ = docker
        .remove_container(id, None)
        .await
        .map_err(|e| error!(%e));
}
pub struct Container {
    image: &'static str,
    tag: &'static str,
    binds: Option<Vec<String>>,
    cmd: Option<Vec<String>>,
}
impl Container {
    pub const fn new(image: &'static str, tag: &'static str) -> Self {
        Self {
            image,
            tag,
            binds: None,
            cmd: None,
        }
    }
    pub fn bind(mut self, src: impl std::fmt::Display, dst: &str) -> Self {
        let bind = format!("{}:{}", src, dst);
        self.binds.get_or_insert_with(Vec::new).push(bind);
        self
    }
    pub fn cmd(mut self, option: &str) -> Self {
        self.cmd.get_or_insert_with(Vec::new).push(option.into());
        self
    }
    pub async fn run<T>(self, doit: impl futures::Future<Output = T>) -> T {
        let docker = docker(None, None).unwrap();
        pull_image(&docker, self.image, self.tag).await;
        let options = Some(CreateContainerOptions {
            name: format!("vector_test_{}", uuid::Uuid::new_v4()),
            platform: None,
        });
        let config = Config {
            image: Some(format!("{}:{}", &self.image, &self.tag)),
            cmd: self.cmd,
            host_config: Some(HostConfig {
                network_mode: Some(String::from("host")),
                extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]),
                binds: self.binds,
                ..Default::default()
            }),
            ..Default::default()
        };
        let container = docker.create_container(options, config).await.unwrap();
        docker
            .start_container::<String>(&container.id, None)
            .await
            .unwrap();
        let result = doit.await;
        remove_container(&docker, &container.id).await;
        result
    }
}