use std::{
    convert::Infallible,
    net::SocketAddr,
    sync::{atomic::AtomicBool, Arc},
};
use async_graphql::{
    http::{playground_source, GraphQLPlaygroundConfig, WebSocketProtocols},
    Data, Request, Schema,
};
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
use hyper::{server::conn::AddrIncoming, service::make_service_fn, Server as HyperServer};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::tap::topology;
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};
use super::{handler, schema};
use crate::{
    config::{self, api},
    http::build_http_trace_layer,
    internal_events::{SocketBindError, SocketMode},
};
/// Handle to an API server that has been spawned by [`Server::start`].
///
/// The spawned server task waits on the receiving half of a oneshot channel inside its
/// graceful-shutdown future; this struct owns the sending half, so dropping the `Server` drops
/// the sender and lets that future complete.
pub struct Server {
    /// Sending half of the shutdown channel. Nothing in this file ever sends on it; it is held
    /// only so that dropping it signals the server task.
    _shutdown: oneshot::Sender<()>,
    /// Address read from `config.api.address` at start-up and handed to the listener bind.
    addr: SocketAddr,
}
impl Server {
    /// Start the API server.
    ///
    /// Builds the routes, binds a TCP listener on `config.api.address`, and spawns a hyper
    /// server for those routes onto `handle`. The spawned task is asked to shut down gracefully
    /// once the returned `Server` is dropped, because dropping it drops the oneshot sender the
    /// task is waiting on.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener cannot be bound; a `SocketBindError` event is emitted
    /// first.
    ///
    /// # Panics
    ///
    /// Panics with "No socket address" when `config.api.address` is `None`.
    pub fn start(
        config: &config::Config,
        watch_rx: topology::WatchRx,
        running: Arc<AtomicBool>,
        handle: &Handle,
    ) -> crate::Result<Self> {
        let routes = make_routes(config.api, watch_rx, running);

        // Enter the runtime context for the remainder of this function.
        // NOTE(review): the bind and service construction below are assumed to need an entered
        // Tokio runtime, so keep this guard ahead of them.
        let _runtime_guard = handle.enter();

        let addr = config.api.address.expect("No socket address");
        let listener = AddrIncoming::bind(&addr).inspect_err(|bind_error| {
            emit!(SocketBindError {
                mode: SocketMode::Tcp,
                error: bind_error,
            });
        })?;

        // Every connection gets its own service: the shared routes wrapped in an HTTP trace
        // layer tied to the span that was current when the server was started.
        let trace_span = Span::current();
        let make_service = make_service_fn(move |_conn| {
            let traced_routes = ServiceBuilder::new()
                .layer(build_http_trace_layer(trace_span.clone()))
                .service(warp::service(routes.clone()));
            futures_util::future::ok::<_, Infallible>(traced_routes)
        });

        // The receiver resolves (with an error that is discarded) when the sender stored in
        // `Self` is dropped, which completes the graceful-shutdown future.
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let serve = async move {
            let outcome = HyperServer::builder(listener)
                .serve(make_service)
                .with_graceful_shutdown(async {
                    shutdown_rx.await.ok();
                })
                .await;

            outcome.map_err(|err| {
                error!("An error occurred: {:?}.", err);
            })
        };

        // Update the component schema with the config before the server task is spawned.
        schema::components::update_config(config);

        // Run the server in the background; its join handle is intentionally not kept.
        handle.spawn(serve);

        Ok(Self {
            _shutdown: shutdown_tx,
            addr,
        })
    }

    /// Return the socket address the server was started with.
    ///
    /// This is the value taken from `config.api.address` and passed to the listener bind; it is
    /// not read back from the listener.
    pub const fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Update the configuration of a running server.
    ///
    /// `self` is not used; this simply forwards to `schema::components::update_config` so
    /// callers do not have to reach into that sub-module themselves.
    pub fn update_config(&self, config: &config::Config) {
        schema::components::update_config(config);
    }
}
fn make_routes(
    api: api::Options,
    watch_tx: topology::WatchRx,
    running: Arc<AtomicBool>,
) -> BoxedFilter<(impl Reply,)> {
    let health = warp::path("health")
        .and(with_shared(running))
        .and_then(handler::health);
    let not_found_graphql = warp::any().and_then(|| async { Err(warp::reject::not_found()) });
    let not_found = warp::any().and_then(|| async { Err(warp::reject::not_found()) });
    let graphql_subscription_handler =
        warp::ws()
            .and(graphql_protocol())
            .map(move |ws: Ws, protocol: WebSocketProtocols| {
                let schema = schema::build_schema().finish();
                let watch_tx = watch_tx.clone();
                let reply = ws.on_upgrade(move |socket| {
                    let mut data = Data::default();
                    data.insert(watch_tx);
                    GraphQLWebSocket::new(socket, schema, protocol)
                        .with_data(data)
                        .serve()
                });
                warp::reply::with_header(
                    reply,
                    "Sec-WebSocket-Protocol",
                    protocol.sec_websocket_protocol(),
                )
            });
    let graphql_handler = if api.graphql {
        warp::path("graphql")
            .and(graphql_subscription_handler.or(
                async_graphql_warp::graphql(schema::build_schema().finish()).and_then(
                    |(schema, request): (Schema<_, _, _>, Request)| async move {
                        Ok::<_, Infallible>(GraphQLResponse::from(schema.execute(request).await))
                    },
                ),
            ))
            .boxed()
    } else {
        not_found_graphql.boxed()
    };
    let graphql_playground = if api.playground && api.graphql {
        warp::path("playground")
            .map(move || {
                Response::builder()
                    .header("content-type", "text/html")
                    .body(playground_source(
                        GraphQLPlaygroundConfig::new("/graphql").subscription_endpoint("/graphql"),
                    ))
            })
            .boxed()
    } else {
        not_found.boxed()
    };
    health
        .or(graphql_handler)
        .or(graphql_playground)
        .or(not_found)
        .with(
            warp::cors()
                .allow_any_origin()
                .allow_headers(vec![
                    "User-Agent",
                    "Sec-Fetch-Mode",
                    "Referer",
                    "Origin",
                    "Access-Control-Request-Method",
                    "Access-Control-Allow-Origin",
                    "Access-Control-Request-Headers",
                    "Content-Type",
                    "X-Apollo-Tracing", "Pragma",
                    "Host",
                    "Connection",
                    "Cache-Control",
                ])
                .allow_methods(vec!["POST", "GET"]),
        )
        .boxed()
}
/// Builds a filter that matches every request and extracts a fresh clone of `shared`.
///
/// Lets a route handler receive the shared `Arc<AtomicBool>` as an argument; the filter itself
/// never rejects.
fn with_shared(
    shared: Arc<AtomicBool>,
) -> impl Filter<Extract = (Arc<AtomicBool>,), Error = Infallible> + Clone {
    // Only the reference count is bumped per request; the underlying flag is shared.
    let hand_out = move || Arc::clone(&shared);
    warp::any().map(hand_out)
}