use std::time::Duration;
use chrono::Local;
use futures_util::future::join_all;
use tokio::sync::{mpsc, oneshot};
use vector_lib::api_client::{connect_subscription_client, Client};
use super::{
    dashboard::{init_dashboard, is_tty},
    metrics,
    state::{self, ConnectionStatus, EventType},
};
/// Delay, in milliseconds, slept before retrying after a failed component
/// initialization or WebSocket connection attempt. The same value is attached
/// to the `ConnectionStatus::Disconnected` event sent when a session ends.
const RECONNECT_DELAY: u64 = 5000;
/// Entry point for the command: validates the environment, then hands off to
/// [`top`] with the dashboard title `"Vector"`.
///
/// Returns:
/// - `exitcode::IOERR` when the terminal is not a TTY,
/// - `exitcode::UNAVAILABLE` when the API health check fails,
/// - otherwise, whatever [`top`] returns.
pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
    // The dashboard cannot be drawn without an interactive terminal.
    if !is_tty() {
        #[allow(clippy::print_stderr)]
        {
            eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
        }
        return exitcode::IOERR;
    }

    let url = opts.url();
    let client = Client::new(url.clone());

    // Happy path: the API answered the health check, so start the dashboard.
    if client.healthcheck().await.is_ok() {
        return top(opts, client, "Vector").await;
    }

    // The API did not respond; explain how to enable it and bail out.
    #[allow(clippy::print_stderr)]
    {
        eprintln!(
            indoc::indoc! {"
            Vector API server isn't reachable ({}).
            Have you enabled the API?
            To enable the API, add the following to your Vector config file:
            [api]
                enabled = true"},
            url
        );
    }
    exitcode::UNAVAILABLE
}
pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode {
    let (tx, rx) = tokio::sync::mpsc::channel(20);
    let state_rx = state::updater(rx).await;
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx));
    match init_dashboard(
        dashboard_title,
        opts.url().as_str(),
        opts,
        state_rx,
        shutdown_rx,
    )
    .await
    {
        Ok(_) => {
            connection.abort();
            exitcode::OK
        }
        Err(err) => {
            #[allow(clippy::print_stderr)]
            {
                eprintln!("[top] Encountered shutdown error: {}", err);
            }
            connection.abort();
            exitcode::IOERR
        }
    }
}
/// Background loop that keeps the dashboard state fed from the API.
///
/// Each iteration:
/// 1. fetches component state via `metrics::init_components` and sends it as
///    `EventType::InitializeState`,
/// 2. opens a subscription client on the WebSocket URL from `opts`,
/// 3. starts the metric subscriptions and reports `ConnectionStatus::Connected`,
/// 4. waits for everything returned by `metrics::subscribe` to finish, then
///    reports `ConnectionStatus::Disconnected`.
///
/// A failure in step 1 or 2 sleeps for `RECONNECT_DELAY` milliseconds and
/// restarts the iteration. When `opts.no_reconnect` is set, the end of a
/// session signals `shutdown_tx` and exits the loop; that flag is only checked
/// after a session has ended, so failures in steps 1 and 2 are retried
/// regardless of it.
///
/// Send errors on `tx` and `shutdown_tx` are deliberately ignored.
async fn subscription(
    opts: super::Opts,
    client: Client,
    tx: mpsc::Sender<EventType>,
    shutdown_tx: oneshot::Sender<()>,
) {
    let ws_url = opts.web_socket_url();
    let retry_pause = Duration::from_millis(RECONNECT_DELAY);

    loop {
        // State is rebuilt from scratch at the start of every (re)connection.
        let initial_state = match metrics::init_components(&client).await {
            Ok(components) => components,
            Err(_) => {
                tokio::time::sleep(retry_pause).await;
                continue;
            }
        };
        _ = tx.send(EventType::InitializeState(initial_state)).await;

        let subscription_client = match connect_subscription_client(ws_url.clone()).await {
            Ok(connected_client) => connected_client,
            Err(_) => {
                tokio::time::sleep(retry_pause).await;
                continue;
            }
        };

        let subscriptions =
            metrics::subscribe(subscription_client, tx.clone(), opts.interval as i64);

        let connected_event =
            EventType::ConnectionUpdated(ConnectionStatus::Connected(Local::now()));
        _ = tx.send(connected_event).await;

        // Block until every subscription has finished.
        // NOTE(review): presumably they only finish when the underlying
        // connection drops — confirm against `metrics::subscribe`.
        _ = join_all(subscriptions).await;

        let disconnected_event =
            EventType::ConnectionUpdated(ConnectionStatus::Disconnected(RECONNECT_DELAY));
        _ = tx.send(disconnected_event).await;

        if opts.no_reconnect {
            _ = shutdown_tx.send(());
            break;
        }
        // Otherwise loop straight back to re-initialization; no sleep happens
        // here, only on a failed attempt above.
    }
}