use std::{
convert::Infallible,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
};
use async_graphql::{
http::{playground_source, GraphQLPlaygroundConfig, WebSocketProtocols},
Data, Request, Schema,
};
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
use hyper::{server::conn::AddrIncoming, service::make_service_fn, Server as HyperServer};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::tap::topology;
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};
use super::{handler, schema};
use crate::{
config::{self, api},
http::build_http_trace_layer,
internal_events::{SocketBindError, SocketMode},
};
/// Handle to an API server created by [`Server::start`].
///
/// Holds the sending half of the shutdown channel and the socket address that
/// was used to bind the listener.
pub struct Server {
/// Sending half of the shutdown channel. Nothing in the visible code sends on
/// it; `start` gives the matching receiver to the HTTP server's
/// graceful-shutdown future, so the server is expected to begin shutting down
/// once this sender is dropped together with the `Server`.
_shutdown: oneshot::Sender<()>,
/// Address taken from `config.api.address` and passed to the listener bind.
addr: SocketAddr,
}
impl Server {
    /// Builds the API routes, binds a TCP listener on `config.api.address` and
    /// spawns the HTTP server onto `handle`.
    ///
    /// The receiving half of a oneshot channel is handed to the server as its
    /// graceful-shutdown signal; the sending half is stored in the returned
    /// `Server`, so the server keeps running for as long as that value is alive.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the listener fails. A `SocketBindError`
    /// event is emitted before the error is propagated.
    ///
    /// # Panics
    ///
    /// Panics with "No socket address" if `config.api.address` is `None`.
    pub fn start(
        config: &config::Config,
        watch_rx: topology::WatchRx,
        running: Arc<AtomicBool>,
        handle: &Handle,
    ) -> crate::Result<Self> {
        let routes = make_routes(config.api, watch_rx, running);
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        // Stay inside the runtime context until this function returns; the
        // listener bind below happens while the guard is alive.
        // NOTE(review): presumably the bind needs an active Tokio reactor — confirm.
        let _runtime_guard = handle.enter();

        let addr = config.api.address.expect("No socket address");
        let incoming = AddrIncoming::bind(&addr).inspect_err(|error| {
            emit!(SocketBindError {
                mode: SocketMode::Tcp,
                error,
            });
        })?;

        // Every connection gets a fresh warp service wrapped in the HTTP trace
        // layer, all sharing the span that was current when `start` was called.
        let trace_span = Span::current();
        let make_service = make_service_fn(move |_conn| {
            let traced = ServiceBuilder::new()
                .layer(build_http_trace_layer(trace_span.clone()))
                .service(warp::service(routes.clone()));
            futures_util::future::ok::<_, Infallible>(traced)
        });

        let serve_until_shutdown = async move {
            // Completes when the sender is used or dropped; either outcome is
            // treated as the shutdown signal.
            let shutdown_signal = async move {
                shutdown_rx.await.ok();
            };

            HyperServer::builder(incoming)
                .serve(make_service)
                .with_graceful_shutdown(shutdown_signal)
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })
        };

        // Push the configuration into the component schema before the server
        // task is spawned.
        schema::components::update_config(config);

        // The task runs detached; its join handle is intentionally discarded.
        handle.spawn(serve_until_shutdown);

        Ok(Self {
            _shutdown: shutdown_tx,
            addr,
        })
    }

    /// Returns a copy of the socket address the server was asked to bind to.
    pub const fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Forwards `config` to `schema::components::update_config`.
    ///
    /// `self` is not used; the method only offers the update through the
    /// server handle.
    pub fn update_config(&self, config: &config::Config) {
        schema::components::update_config(config);
    }
}
/// Assembles the warp filter tree served by the API.
///
/// Routes, tried in this order:
/// - `health` — calls `handler::health` with a clone of the `running` flag.
/// - `graphql` — only when `api.graphql` is set; WebSocket requests go to the
///   GraphQL subscription handler, everything else to the HTTP query handler.
/// - `playground` — only when both `api.playground` and `api.graphql` are set;
///   serves the GraphQL playground HTML pointed at `/graphql`.
/// - anything else is rejected as not found.
///
/// The combined filter is wrapped in a CORS policy that allows any origin, the
/// listed request headers and the `POST`/`GET` methods.
///
/// `watch_tx` is (despite its name) a `topology::WatchRx`; a clone of it is
/// inserted into the GraphQL data of every WebSocket connection.
fn make_routes(
    api: api::Options,
    watch_tx: topology::WatchRx,
    running: Arc<AtomicBool>,
) -> BoxedFilter<(impl Reply,)> {
    let health_route = warp::path("health")
        .and(with_shared(running))
        .and_then(handler::health);

    // Two always-rejecting fallbacks, kept as separate bindings.
    // NOTE(review): presumably they are separate so each can be inferred with
    // the reply type of the branch it replaces — confirm before merging them.
    let graphql_disabled = warp::any().and_then(|| async { Err(warp::reject::not_found()) });
    let not_found = warp::any().and_then(|| async { Err(warp::reject::not_found()) });

    // WebSocket upgrade path for GraphQL subscriptions.
    // NOTE(review): the schema is built on every matched request; if
    // `schema::build_schema` has no per-call state it could be built once.
    let subscription_route = warp::ws().and(graphql_protocol()).map(
        move |ws: Ws, protocol: WebSocketProtocols| {
            let ws_schema = schema::build_schema().finish();
            let topology_rx = watch_tx.clone();

            let upgraded = ws.on_upgrade(move |socket| {
                // Per-connection GraphQL data carrying the topology watcher.
                let mut context = Data::default();
                context.insert(topology_rx);

                GraphQLWebSocket::new(socket, ws_schema, protocol)
                    .with_data(context)
                    .serve()
            });

            // Echo the negotiated sub-protocol back to the client.
            warp::reply::with_header(
                upgraded,
                "Sec-WebSocket-Protocol",
                protocol.sec_websocket_protocol(),
            )
        },
    );

    let graphql_route = if api.graphql {
        // Plain HTTP queries: execute the request against a schema built once.
        let query_route = async_graphql_warp::graphql(schema::build_schema().finish()).and_then(
            |(gql_schema, request): (Schema<_, _, _>, Request)| async move {
                Ok::<_, Infallible>(GraphQLResponse::from(gql_schema.execute(request).await))
            },
        );

        warp::path("graphql")
            .and(subscription_route.or(query_route))
            .boxed()
    } else {
        graphql_disabled.boxed()
    };

    let playground_route = if api.playground && api.graphql {
        warp::path("playground")
            .map(move || {
                let page = playground_source(
                    GraphQLPlaygroundConfig::new("/graphql").subscription_endpoint("/graphql"),
                );
                Response::builder()
                    .header("content-type", "text/html")
                    .body(page)
            })
            .boxed()
    } else {
        not_found.boxed()
    };

    let cors_policy = warp::cors()
        .allow_any_origin()
        .allow_headers(vec![
            "User-Agent",
            "Sec-Fetch-Mode",
            "Referer",
            "Origin",
            "Access-Control-Request-Method",
            "Access-Control-Allow-Origin",
            "Access-Control-Request-Headers",
            "Content-Type",
            "X-Apollo-Tracing",
            "Pragma",
            "Host",
            "Connection",
            "Cache-Control",
        ])
        .allow_methods(vec!["POST", "GET"]);

    health_route
        .or(graphql_route)
        .or(playground_route)
        .or(not_found)
        .with(cors_policy)
        .boxed()
}
/// Builds a filter that matches any request and extracts a fresh `Arc` handle
/// to `shared`, so the flag can be passed as an argument to a route handler.
fn with_shared(
    shared: Arc<AtomicBool>,
) -> impl Filter<Extract = (Arc<AtomicBool>,), Error = Infallible> + Clone {
    // Only the reference count is bumped; every handle points at the same flag.
    warp::any().map(move || Arc::clone(&shared))
}