use bytes::{BufMut, BytesMut};
use syslog::{Facility, Formatter3164, LogFormat, Severity};
use vector_lib::configurable::configurable_component;
use vrl::value::Kind;
use crate::{
    codecs::{Encoder, EncodingConfig, Transformer},
    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
    event::Event,
    internal_events::TemplateRenderingError,
    schema,
    sinks::util::{tcp::TcpSinkConfig, UriSerde},
    tcp::TcpKeepaliveConfig,
    template::Template,
    tls::TlsEnableableConfig,
};
#[configurable_component(sink("papertrail", "Deliver log events to Papertrail from SolarWinds."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PapertrailConfig {
    #[configurable(metadata(docs::examples = "logs.papertrailapp.com:12345"))]
    endpoint: UriSerde,
    #[configurable(derived)]
    encoding: EncodingConfig,
    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,
    send_buffer_bytes: Option<usize>,
    #[configurable(metadata(docs::examples = "{{ process }}", docs::examples = "my-process",))]
    #[serde(default = "default_process")]
    process: Template,
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
fn default_process() -> Template {
    Template::try_from("vector").unwrap()
}
impl GenerateConfig for PapertrailConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(
            r#"endpoint = "logs.papertrailapp.com:12345"
            encoding.codec = "json""#,
        )
        .unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "papertrail")]
impl SinkConfig for PapertrailConfig {
    async fn build(
        &self,
        _cx: SinkContext,
    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
        let host = self
            .endpoint
            .uri
            .host()
            .map(str::to_string)
            .ok_or_else(|| "A host is required for endpoint".to_string())?;
        let port = self
            .endpoint
            .uri
            .port_u16()
            .ok_or_else(|| "A port is required for endpoint".to_string())?;
        let address = format!("{}:{}", host, port);
        let tls = Some(
            self.tls
                .clone()
                .unwrap_or_else(TlsEnableableConfig::enabled),
        );
        let pid = std::process::id();
        let process = self.process.clone();
        let sink_config = TcpSinkConfig::new(address, self.keepalive, tls, self.send_buffer_bytes);
        let transformer = self.encoding.transformer();
        let serializer = self.encoding.build()?;
        let encoder = Encoder::<()>::new(serializer);
        sink_config.build(
            Transformer::default(),
            PapertrailEncoder {
                pid,
                process,
                transformer,
                encoder,
            },
        )
    }
    fn input(&self) -> Input {
        let requirement = schema::Requirement::empty().optional_meaning("host", Kind::bytes());
        Input::new(self.encoding.config().input_type() & DataType::Log)
            .with_schema_requirement(requirement)
    }
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
#[derive(Debug, Clone)]
struct PapertrailEncoder {
    pid: u32,
    process: Template,
    transformer: Transformer,
    encoder: Encoder<()>,
}
impl tokio_util::codec::Encoder<Event> for PapertrailEncoder {
    type Error = vector_lib::codecs::encoding::Error;
    fn encode(
        &mut self,
        mut event: Event,
        buffer: &mut bytes::BytesMut,
    ) -> Result<(), Self::Error> {
        let host = event
            .as_mut_log()
            .get_host()
            .map(|host| host.to_string_lossy().into_owned());
        let process = self
            .process
            .render_string(&event)
            .map_err(|error| {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("process"),
                    drop_event: false,
                })
            })
            .ok()
            .unwrap_or_else(|| String::from("vector"));
        let formatter = Formatter3164 {
            facility: Facility::LOG_USER,
            hostname: host,
            process,
            pid: self.pid,
        };
        self.transformer.transform(&mut event);
        let mut bytes = BytesMut::new();
        self.encoder.encode(event, &mut bytes)?;
        let message = String::from_utf8_lossy(&bytes);
        formatter
            .format(&mut buffer.writer(), Severity::LOG_INFO, message)
            .map_err(|error| Self::Error::SerializingError(format!("{}", error).into()))?;
        buffer.put_u8(b'\n');
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use serde::Deserialize;
    use std::convert::TryFrom;
    use bytes::BytesMut;
    use futures::{future::ready, stream};
    use tokio_util::codec::Encoder as _;
    use vector_lib::codecs::JsonSerializerConfig;
    use vector_lib::event::{Event, LogEvent};
    use crate::test_util::{
        components::{run_and_assert_sink_compliance, SINK_TAGS},
        http::{always_200_response, spawn_blackhole_http_server},
    };
    use super::*;
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PapertrailConfig>();
    }
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;
        let config = PapertrailConfig::generate_config().to_string();
        let mut config = PapertrailConfig::deserialize(toml::de::ValueDeserializer::new(&config))
            .expect("config should be valid");
        config.endpoint = mock_endpoint.into();
        config.tls = Some(TlsEnableableConfig::default());
        let context = SinkContext::default();
        let (sink, _healthcheck) = config.build(context).await.unwrap();
        let event = Event::Log(LogEvent::from("simple message"));
        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
    }
    #[test]
    fn encode_event_apply_rules() {
        let mut evt = Event::Log(LogEvent::from("vector"));
        evt.as_mut_log().insert("magic", "key");
        evt.as_mut_log().insert("process", "foo");
        let mut encoder = PapertrailEncoder {
            pid: 0,
            process: Template::try_from("{{ process }}").unwrap(),
            transformer: Transformer::new(None, Some(vec!["magic".into()]), None).unwrap(),
            encoder: Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        };
        let mut bytes = BytesMut::new();
        encoder.encode(evt, &mut bytes).unwrap();
        let bytes = bytes.freeze();
        let msg = bytes.slice(String::from_utf8_lossy(&bytes).find(": ").unwrap() + 2..bytes.len());
        let value: serde_json::Value = serde_json::from_slice(&msg).unwrap();
        let value = value.as_object().unwrap();
        assert!(!value.contains_key("magic"));
        assert_eq!(value.get("process").unwrap().as_str(), Some("foo"));
    }
}