use std::{
    collections::HashMap,
    io::{BufRead, BufReader},
    net::SocketAddr,
    str::FromStr,
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use smallvec::SmallVec;
use tokio_util::codec::Decoder as _;
use vector_lib::codecs::{
    decoding::{DeserializerConfig, FramingConfig},
    StreamDecodingError,
};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{lookup_v2::parse_value_path, owned_value_path, path};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    schema::Definition,
};
use vrl::value::{kind::Collection, Kind};
use warp::http::{HeaderMap, StatusCode};
use crate::{
    codecs::{Decoder, DecodingConfig},
    config::{
        log_schema, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
        SourceContext, SourceOutput,
    },
    event::{Event, LogEvent},
    http::KeepaliveConfig,
    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    sources::{
        http_server::{build_param_matcher, remove_duplicates, HttpConfigParamKind},
        util::{
            http::{add_query_parameters, HttpMethod},
            ErrorMessage, HttpSource, HttpSourceAuthConfig,
        },
    },
    tls::TlsEnableableConfig,
};
/// Configuration for the `heroku_logs` source.
#[configurable_component(source(
    "heroku_logs",
    "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."
))]
#[derive(Clone, Debug)]
pub struct LogplexConfig {
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,
    /// A list of URL query parameters to include in the log event.
    ///
    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
    ///
    /// Specifying "*" results in all query parameters being included in the log event.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,
    #[configurable(derived)]
    auth: Option<HttpSourceAuthConfig>,
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
impl LogplexConfig {
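    /// Builds the schema [`Definition`] for events from this source: the decoder's own
    /// definition plus the standard source metadata and the `timestamp`, `host`, `app_name`,
    /// `proc_id`, and `query_parameters` fields added by this source.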
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                log_schema()
                    .host_key()
                    .cloned()
                    .map(LegacyKey::InsertIfEmpty),
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
                &owned_value_path!("app_name"),
                Kind::bytes(),
                Some("service"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
                &owned_value_path!("proc_id"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("query_parameters"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );
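        // The legacy namespace writes enrichment fields (such as query parameters) directly
        // onto the event root, so arbitrary additional fields must be allowed there.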
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }
        schema_definition
    }
}
impl Default for LogplexConfig {
    fn default() -> Self {
        Self {
            address: "0.0.0.0:80".parse().unwrap(),
            query_parameters: Vec::new(),
            tls: None,
            auth: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            acknowledgements: SourceAcknowledgementsConfig::default(),
            log_namespace: None,
            keepalive: KeepaliveConfig::default(),
        }
    }
}
impl GenerateConfig for LogplexConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(LogplexConfig::default()).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "heroku_logs")]
impl SourceConfig for LogplexConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;
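        // Pre-compile the configured query parameter names (exact names or wildcard
        // patterns) into matchers that are applied to every request.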
        let source = LogplexSource {
            query_parameters: build_param_matcher(&remove_duplicates(
                self.query_parameters.clone(),
                "query_parameters",
            ))?,
            decoder,
            log_namespace,
        };
        source.run(
            self.address,
            "events",
            HttpMethod::Post,
            StatusCode::OK,
            true,
            &self.tls,
            &self.auth,
            cx,
            self.acknowledgements,
            self.keepalive.clone(),
        )
    }
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_def,
        )]
    }
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }
    fn can_acknowledge(&self) -> bool {
        true
    }
}
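/// Runtime state for the Logplex HTTP source: compiled query-parameter matchers,
/// the framing/decoding pipeline, and the resolved log namespace.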
#[derive(Clone, Default)]
struct LogplexSource {
    query_parameters: Vec<HttpConfigParamKind>,
    decoder: Decoder,
    log_namespace: LogNamespace,
}
impl LogplexSource {
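    /// Decodes a drain request body into events after validating the `Logplex-Msg-Count`,
    /// `Logplex-Frame-Id`, and `Logplex-Drain-Token` headers. The request is rejected if the
    /// number of parsed events does not match the advertised message count.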
    fn decode_message(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
    ) -> Result<Vec<Event>, ErrorMessage> {
        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
            Ok(v) => v,
            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
        };
        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;
        emit!(HerokuLogplexRequestReceived {
            msg_count,
            frame_id,
            drain_token
        });
        let events = self.body_to_events(body);
        if events.len() != msg_count {
            let error_msg = format!(
                "Parsed event count does not match message count header: {} vs {}",
                events.len(),
                msg_count
            );
            if cfg!(test) {
                panic!("{}", error_msg);
            }
            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
        }
        Ok(events)
    }
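    /// Splits the request body into lines, skipping empty lines and lines that fail to read,
    /// and decodes each remaining line into zero or more events.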
    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
        let rdr = BufReader::new(body.reader());
        rdr.lines()
            .filter_map(|res| {
                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
                    .ok()
            })
            .filter(|s| !s.is_empty())
            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
            .collect()
    }
}
impl HttpSource for LogplexSource {
    fn build_events(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _full_path: &str,
    ) -> Result<Vec<Event>, ErrorMessage> {
        self.decode_message(body, header_map)
    }
    fn enrich_events(
        &self,
        events: &mut [Event],
        _request_path: &str,
        _headers_config: &HeaderMap,
        query_parameters: &HashMap<String, String>,
        _source_ip: Option<&SocketAddr>,
    ) {
        add_query_parameters(
            events,
            &self.query_parameters,
            query_parameters,
            self.log_namespace,
            LogplexConfig::NAME,
        );
    }
}
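/// Looks up a required header, returning a 400 error if it is missing or cannot be
/// represented as a string.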
fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
    if let Some(header_value) = header_map.get(name) {
        header_value
            .to_str()
            .map_err(|e| header_error_message(name, &e.to_string()))
    } else {
        Err(header_error_message(name, "Header does not exist"))
    }
}
fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
    ErrorMessage::new(
        StatusCode::BAD_REQUEST,
        format!("Invalid request header {:?}: {:?}", name, msg),
    )
}
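/// Parses a single line in Heroku's Logplex drain format, a syslog-style frame of the form
/// `<frame length> <pri/version> <timestamp> <hostname> <app_name> <proc_id> <msg_id> <message>`.
/// Lines that do not split into exactly eight space-separated parts are forwarded as raw
/// messages.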
fn line_to_events(
    mut decoder: Decoder,
    log_namespace: LogNamespace,
    line: String,
) -> SmallVec<[Event; 1]> {
    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();
    let mut events = SmallVec::<[Event; 1]>::new();
    if parts.len() == 8 {
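        // parts: [0] frame length, [1] syslog pri/version, [2] timestamp, [3] hostname,
        // [4] app name, [5] proc id, [6] msg id, [7] message (everything after the 7th space).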
        let timestamp = parts[2];
        let hostname = parts[3];
        let app_name = parts[4];
        let proc_id = parts[5];
        let message = parts[7];
        let mut buffer = BytesMut::new();
        buffer.put(message.as_bytes());
        let legacy_host_key = log_schema().host_key().cloned();
        let legacy_app_key = parse_value_path("app_name").ok();
        let legacy_proc_key = parse_value_path("proc_id").ok();
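        // Run the message portion through the configured decoder; a framing configuration
        // may yield multiple events from a single line, so drain it until EOF.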
        loop {
            match decoder.decode_eof(&mut buffer) {
                Ok(Some((decoded, _byte_size))) => {
                    for mut event in decoded {
                        if let Event::Log(ref mut log) = event {
                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
                                log_namespace.insert_vector_metadata(
                                    log,
                                    log_schema().timestamp_key(),
                                    path!("timestamp"),
                                    ts,
                                );
                            }
                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("host"),
                                hostname.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("app_name"),
                                app_name.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("proc_id"),
                                proc_id.to_owned(),
                            );
                        }
                        events.push(event);
                    }
                }
                Ok(None) => break,
                Err(error) => {
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    } else {
        warn!(
            message = "Line didn't match expected logplex format, so raw message is forwarded.",
            fields = parts.len(),
            internal_log_rate_limit = true
        );
        events.push(LogEvent::from_str_legacy(line).into())
    };
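    // Attach the standard source metadata (source type, ingest timestamp) to every event.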
    let now = Utc::now();
    for event in &mut events {
        if let Event::Log(log) = event {
            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
        }
    }
    events
}
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use chrono::{DateTime, Utc};
    use futures::Stream;
    use similar_asserts::assert_eq;
    use vector_lib::lookup::{owned_value_path, OwnedTargetPath};
    use vector_lib::{
        config::LogNamespace,
        event::{Event, EventStatus, Value},
        schema::Definition,
    };
    use vrl::value::{kind::Collection, Kind};
    use super::{HttpSourceAuthConfig, LogplexConfig};
    use crate::{
        config::{log_schema, SourceConfig, SourceContext},
        serde::{default_decoding, default_framing_message_based},
        test_util::{
            components::{assert_source_compliance, HTTP_PUSH_SOURCE_TAGS},
            next_addr, random_string, spawn_collect_n, wait_for_tcp,
        },
        SourceSender,
    };
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogplexConfig>();
    }
    async fn source(
        auth: Option<HttpSourceAuthConfig>,
        query_parameters: Vec<String>,
        status: EventStatus,
        acknowledgements: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let context = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            LogplexConfig {
                address,
                query_parameters,
                tls: None,
                auth,
                framing: default_framing_message_based(),
                decoding: default_decoding(),
                acknowledgements: acknowledgements.into(),
                log_namespace: None,
                keepalive: Default::default(),
            }
            .build(context)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address)
    }
    async fn send(
        address: SocketAddr,
        body: &str,
        auth: Option<HttpSourceAuthConfig>,
        query: &str,
    ) -> u16 {
        let len = body.lines().count();
        let mut req = reqwest::Client::new().post(format!("http://{}/events?{}", address, query));
        if let Some(auth) = auth {
            req = req.basic_auth(auth.username, Some(auth.password.inner()));
        }
        req.header("Logplex-Msg-Count", len)
            .header("Logplex-Frame-Id", "frame-foo")
            .header("Logplex-Drain-Token", "drain-bar")
            .body(body.to_owned())
            .send()
            .await
            .unwrap()
            .status()
            .as_u16()
    }
    fn make_auth() -> HttpSourceAuthConfig {
        HttpSourceAuthConfig {
            username: random_string(16),
            password: random_string(16).into(),
        }
    }
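    // A single Heroku router log line in Logplex drain format: a leading octet count and
    // syslog header followed by a logfmt-style message.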
    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;
    #[tokio::test]
    async fn logplex_handles_router_log() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();
            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["appname".to_string(), "absent".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;
            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;
            let event = events.remove(0);
            let log = event.as_log();
            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
            assert_eq!(log["absent"], Value::Null);
        }).await;
    }
    #[tokio::test]
    async fn logplex_query_parameters_wildcard() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();
            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["*".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;
            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;
            let event = events.remove(0);
            let log = event.as_log();
            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
        }).await;
    }
    #[tokio::test]
    async fn logplex_handles_failures() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();
            let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;
            let events = spawn_collect_n(
                async move {
                    assert_eq!(
                        400,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;
            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
        })
        .await;
    }
    #[tokio::test]
    async fn logplex_ignores_disabled_acknowledgements() {
        let auth = make_auth();
        let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;
        let events = spawn_collect_n(
            async move {
                assert_eq!(
                    200,
                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                )
            },
            rx,
            SAMPLE_BODY.lines().count(),
        )
        .await;
        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
    }
    #[tokio::test]
    async fn logplex_auth_failure() {
        let (_rx, addr) = source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;
        assert_eq!(
            401,
            send(
                addr,
                SAMPLE_BODY,
                Some(make_auth()),
                "appname=lumberjack-store"
            )
            .await
        );
    }
    #[test]
    fn logplex_handles_normal_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();
        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }
    #[test]
    fn logplex_handles_malformed_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "what am i doing here";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();
        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }
    #[test]
    fn logplex_doesnt_blow_up_on_bad_framing() {
        let log_namespace = LogNamespace::Legacy;
        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();
        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogplexConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);
        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
                    Kind::bytes(),
                    Some("service"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );
        assert_eq!(definitions, Some(expected_definition))
    }
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogplexConfig::default();
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .with_event_field(
            &owned_value_path!("app_name"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
        .unknown_fields(Kind::bytes());
        assert_eq!(definitions, Some(expected_definition))
    }
}