use std::{
    collections::{HashMap, HashSet},
    io::SeekFrom,
    path::PathBuf,
    process::Stdio,
    str::FromStr,
    sync::{Arc, LazyLock},
    time::Duration,
};
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{poll, stream::BoxStream, task::Poll, StreamExt};
use nix::{
    sys::signal::{kill, Signal},
    unistd::Pid,
};
use serde_json::{Error as JsonError, Value as JsonValue};
use snafu::{ResultExt, Snafu};
use tokio::{
    fs::{File, OpenOptions},
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
    process::{Child, Command},
    sync::{Mutex, MutexGuard},
    time::sleep,
};
use tokio_util::codec::FramedRead;
use vector_lib::codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{metadata_path, owned_value_path, path};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    schema::Definition,
    EstimatedJsonEncodedSizeOf,
};
use vector_lib::{
    finalizer::OrderedFinalizer,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
};
use vrl::event_path;
use vrl::value::{kind::Collection, Kind, Value};
use crate::{
    config::{
        log_schema, DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
        SourceOutput,
    },
    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
    internal_events::{
        EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
        JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
        StreamClosedError,
    },
    serde::bool_or_struct,
    shutdown::ShutdownSignal,
    SourceSender,
};
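// Batching/backoff tuning constants and the journald field names used below.
// Fields prefixed with `__` (for example `__CURSOR`) are address fields
// synthesized by journalctl rather than data stored in the journal entry.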
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";
const BACKOFF_DURATION: Duration = Duration::from_secs(1);
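// By default, the `journalctl` command is resolved from the process search path.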
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| "journalctl".into());
#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}
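/// A map from journal field name to the set of values that count as a match.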
type Matches = HashMap<String, HashSet<String>>;
#[configurable_component(source("journald", "Collect logs from JournalD."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    /// Only include entries that appeared after Vector started.
    ///
    /// Passes `--since=now` to `journalctl` when no checkpoint cursor exists.
    #[serde(default)]
    pub since_now: bool,
    /// Only include entries from the current boot.
    ///
    /// Passes `--boot` to `journalctl`.
    #[serde(default = "crate::serde::default_true")]
    pub current_boot_only: bool,
    /// The list of systemd unit names to monitor.
    ///
    /// If empty, all units are accepted. Names without a `.` have `.service` appended.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,
    /// The list of systemd unit names to exclude from monitoring.
    ///
    /// Names without a `.` have `.service` appended. Listing a unit in both
    /// `include_units` and `exclude_units` is a configuration error.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,
    /// Journal field/value pairs that an entry must match to be included.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,
    /// Journal field/value pairs that cause a matching entry to be excluded.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,
    /// The directory used to persist the journal checkpoint position.
    ///
    /// By default, the global `data_dir` option is used.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,
    /// Additional command-line arguments to pass to `journalctl`.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,
    /// The maximum number of events to collect into a batch before forwarding.
    #[serde(default = "default_batch_size")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,
    /// Overrides the path to the `journalctl` binary.
    ///
    /// By default, `journalctl` is resolved from the process search path.
    #[serde(default)]
    pub journalctl_path: Option<PathBuf>,
    /// Overrides the journal directory, passed to `journalctl` as `--directory`.
    #[serde(default)]
    pub journal_directory: Option<PathBuf>,
    /// The journal namespace to read from, passed to `journalctl` as `--namespace`.
    #[serde(default)]
    pub journal_namespace: Option<String>,
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
    /// Whether to remap the numeric `PRIORITY` field to its syslog level name.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,
    /// The namespace to use for logs, overriding the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
    /// Whether to carry the journal cursor (`__CURSOR`) through as an event field.
    #[serde(default = "crate::serde::default_false")]
    emit_cursor: bool,
}
const fn default_batch_size() -> usize {
    16
}
fn matches_examples() -> HashMap<String, Vec<String>> {
    HashMap::<_, _>::from_iter([
        (
            "_SYSTEMD_UNIT".to_owned(),
            vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
        ),
        ("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]),
    ])
}
impl JournaldConfig {
    fn merged_include_matches(&self) -> Matches {
        Self::merge_units(&self.include_matches, &self.include_units)
    }
    fn merged_exclude_matches(&self) -> Matches {
        Self::merge_units(&self.exclude_matches, &self.exclude_units)
    }
    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
        let mut matches = matches.clone();
        for unit in units {
            let entry = matches.entry(String::from(SYSTEMD_UNIT));
            entry.or_default().insert(fixup_unit(unit));
        }
        matches
    }
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let schema_definition = match log_namespace {
            LogNamespace::Vector => Definition::new_with_default_metadata(
                Kind::bytes().or_null(),
                [LogNamespace::Vector],
            ),
            LogNamespace::Legacy => Definition::new_with_default_metadata(
                Kind::object(Collection::empty()),
                [LogNamespace::Legacy],
            ),
        };
        let mut schema_definition = schema_definition
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            );
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }
        schema_definition
    }
}
impl Default for JournaldConfig {
    fn default() -> Self {
        Self {
            since_now: false,
            current_boot_only: true,
            include_units: vec![],
            exclude_units: vec![],
            include_matches: Default::default(),
            exclude_matches: Default::default(),
            data_dir: None,
            batch_size: default_batch_size(),
            journalctl_path: None,
            journal_directory: None,
            journal_namespace: None,
            extra_args: vec![],
            acknowledgements: Default::default(),
            remap_priority: false,
            log_namespace: None,
            emit_cursor: false,
        }
    }
}
impl_generate_config_from_default!(JournaldConfig);
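/// A single journal record: field names mapped to their decoded string values.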
type Record = HashMap<String, String>;
#[async_trait::async_trait]
#[typetag::serde(name = "journald")]
impl SourceConfig for JournaldConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        if self.remap_priority {
            warn!("DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead.");
        }
        let data_dir = cx
            .globals
            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
        if let Some(unit) = self
            .include_units
            .iter()
            .find(|unit| self.exclude_units.contains(unit))
        {
            let unit = unit.into();
            return Err(BuildError::DuplicatedUnit { unit }.into());
        }
        let include_matches = self.merged_include_matches();
        let exclude_matches = self.merged_exclude_matches();
        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
            return Err(BuildError::DuplicatedMatches { field, value }.into());
        }
        let mut checkpoint_path = data_dir;
        checkpoint_path.push(CHECKPOINT_FILENAME);
        let journalctl_path = self
            .journalctl_path
            .clone()
            .unwrap_or_else(|| JOURNALCTL.clone());
        let starter = StartJournalctl::new(
            journalctl_path,
            self.journal_directory.clone(),
            self.journal_namespace.clone(),
            self.current_boot_only,
            self.since_now,
            self.extra_args.clone(),
        );
        let batch_size = self.batch_size;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
        let log_namespace = cx.log_namespace(self.log_namespace);
        Ok(Box::pin(
            JournaldSource {
                include_matches,
                exclude_matches,
                checkpoint_path,
                batch_size,
                remap_priority: self.remap_priority,
                out: cx.out,
                acknowledgements,
                starter,
                log_namespace,
                emit_cursor: self.emit_cursor,
            }
            .run_shutdown(cx.shutdown),
        ))
    }
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition =
            self.schema_definition(global_log_namespace.merge(self.log_namespace));
        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }
    fn can_acknowledge(&self) -> bool {
        true
    }
}
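/// Runtime state for the journald source, assembled from `JournaldConfig`.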
struct JournaldSource {
    include_matches: Matches,
    exclude_matches: Matches,
    checkpoint_path: PathBuf,
    batch_size: usize,
    remap_priority: bool,
    out: SourceSender,
    acknowledgements: bool,
    starter: StartJournalctl,
    log_namespace: LogNamespace,
    emit_cursor: bool,
}
impl JournaldSource {
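    /// Opens (or creates) the checkpoint file and then runs the source until
    /// shutdown. Failing to open the checkpoint file aborts the source.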
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;
        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );
        self.run(checkpointer, finalizer, shutdown).await;
        Ok(())
    }
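    /// Supervision loop: (re)starts `journalctl` from the last checkpointed
    /// cursor and waits `BACKOFF_DURATION` between restarts until shutdown.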
    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }
            info!("Starting journalctl.");
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stream, running)) => {
                    if !self.run_stream(stream, &finalizer, shutdown.clone()).await {
                        return;
                    }
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }
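    /// Forwards batches from a single `journalctl` invocation. Returns `false`
    /// if the source should stop entirely (shutdown or closed output stream)
    /// and `true` if `journalctl` should be restarted.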
    async fn run_stream<'a>(
        &'a mut self,
        mut stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);
        let batch_size = self.batch_size;
        loop {
            let mut batch = Batch::new(self);
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => return false,
                    item = stream.next() => item,
                };
                if !batch.handle_next(item) {
                    return true;
                }
            }
            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);
            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        }
    }
}
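/// Accumulates up to `batch_size` decoded records and tracks the most recent
/// cursor seen, so the cursor can be checkpointed once the batch is delivered.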
struct Batch<'a> {
    events: Vec<LogEvent>,
    record_size: usize,
    exiting: Option<bool>,
    batch: Option<BatchNotifier>,
    receiver: Option<BatchStatusReceiver>,
    source: &'a mut JournaldSource,
    cursor: Option<String>,
}
impl<'a> Batch<'a> {
    fn new(source: &'a mut JournaldSource) -> Self {
        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
        Self {
            events: Vec::new(),
            record_size: 0,
            exiting: None,
            batch,
            receiver,
            source,
            cursor: None,
        }
    }
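    /// Handles one item from the journal stream. Returns `false` when the
    /// stream has ended or produced a framing error, finishing the batch.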
    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
        match result {
            None => {
                warn!("Journalctl process stopped.");
                self.exiting = Some(true);
                false
            }
            Some(Err(error)) => {
                emit!(JournaldReadError { error });
                false
            }
            Some(Ok(bytes)) => {
                match decode_record(&bytes, self.source.remap_priority) {
                    Ok(mut record) => {
                        if self.source.emit_cursor {
                            if let Some(tmp) = record.get(CURSOR) {
                                self.cursor = Some(tmp.clone());
                            }
                        } else if let Some(tmp) = record.remove(CURSOR) {
                            self.cursor = Some(tmp);
                        }
                        if !filter_matches(
                            &record,
                            &self.source.include_matches,
                            &self.source.exclude_matches,
                        ) {
                            self.record_size += bytes.len();
                            let mut event = create_log_event_from_record(
                                record,
                                &self.batch,
                                self.source.log_namespace,
                            );
                            enrich_log_event(&mut event, self.source.log_namespace);
                            self.events.push(event);
                        }
                    }
                    Err(error) => {
                        emit!(JournaldInvalidRecordError {
                            error,
                            text: String::from_utf8_lossy(&bytes).into_owned()
                        });
                    }
                }
                true
            }
        }
    }
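    /// Emits telemetry, sends the batch downstream, and passes the latest
    /// cursor to the finalizer. Returns `Some(true)` to restart `journalctl`,
    /// `Some(false)` to stop the source, or `None` to keep reading.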
    async fn finish(
        mut self,
        finalizer: &Finalizer,
        bytes_received: &'a Registered<BytesReceived>,
        events_received: &'a Registered<EventsReceived>,
    ) -> Option<bool> {
        drop(self.batch);
        if self.record_size > 0 {
            bytes_received.emit(ByteSize(self.record_size));
        }
        if !self.events.is_empty() {
            let count = self.events.len();
            let byte_size = self.events.estimated_json_encoded_size_of();
            events_received.emit(CountByteSize(count, byte_size));
            match self.source.out.send_batch(self.events).await {
                Ok(_) => {
                    if let Some(cursor) = self.cursor {
                        finalizer.finalize(cursor, self.receiver).await;
                    }
                }
                Err(_) => {
                    emit!(StreamClosedError { count });
                    self.exiting = Some(false);
                }
            }
        }
        self.exiting
    }
}
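/// A framed stream of newline-delimited JSON records from journalctl's stdout.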
type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
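/// The options needed to (re)spawn a `journalctl` child process.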
struct StartJournalctl {
    path: PathBuf,
    journal_dir: Option<PathBuf>,
    journal_namespace: Option<String>,
    current_boot_only: bool,
    since_now: bool,
    extra_args: Vec<String>,
}
impl StartJournalctl {
    const fn new(
        path: PathBuf,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        }
    }
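    /// Builds the `journalctl` command line. With the default options and no
    /// checkpoint this yields, for example:
    /// `journalctl --follow --all --show-cursor --output=json --boot --since=2000-01-01`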
    fn make_command(&self, checkpoint: Option<&str>) -> Command {
        let mut command = Command::new(&self.path);
        command.stdout(Stdio::piped());
        command.arg("--follow");
        command.arg("--all");
        command.arg("--show-cursor");
        command.arg("--output=json");
        if let Some(dir) = &self.journal_dir {
            command.arg(format!("--directory={}", dir.display()));
        }
        if let Some(namespace) = &self.journal_namespace {
            command.arg(format!("--namespace={}", namespace));
        }
        if self.current_boot_only {
            command.arg("--boot");
        }
        if let Some(cursor) = checkpoint {
            command.arg(format!("--after-cursor={}", cursor));
        } else if self.since_now {
            command.arg("--since=now");
        } else {
            command.arg("--since=2000-01-01");
        }
        if !self.extra_args.is_empty() {
            command.args(&self.extra_args);
        }
        command
    }
    fn start(
        &mut self,
        checkpoint: Option<&str>,
    ) -> crate::Result<(JournalStream, RunningJournalctl)> {
        let mut command = self.make_command(checkpoint);
        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
        let stream = FramedRead::new(
            child.stdout.take().unwrap(),
            CharacterDelimitedDecoder::new(b'\n'),
        )
        .boxed();
        Ok((stream, RunningJournalctl(child)))
    }
}
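/// Handle to the spawned `journalctl` child. Dropping it sends SIGTERM so the
/// child process does not outlive the source.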
struct RunningJournalctl(Child);
impl Drop for RunningJournalctl {
    fn drop(&mut self) {
        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
        }
    }
}
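/// Moves the host and timestamp fields into their namespaced locations and
/// stamps the event with `source_type` metadata. The timestamp is taken from
/// `_SOURCE_REALTIME_TIMESTAMP` when present, falling back to
/// `__REALTIME_TIMESTAMP`; both are microseconds since the Unix epoch.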
fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(host) = log
                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
                .and_then(|meta| meta.get(HOSTNAME))
            {
                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
            }
        }
        LogNamespace::Legacy => {
            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
                log_namespace.insert_source_metadata(
                    JournaldConfig::NAME,
                    log,
                    log_schema().host_key().map(LegacyKey::Overwrite),
                    path!("host"),
                    host,
                );
            }
        }
    }
    let timestamp_value = match log_namespace {
        LogNamespace::Vector => log
            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
            .and_then(|meta| {
                meta.get(SOURCE_TIMESTAMP)
                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
            }),
        LogNamespace::Legacy => log
            .get(event_path!(SOURCE_TIMESTAMP))
            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
    };
    let timestamp = timestamp_value
        .filter(|&ts| ts.is_bytes())
        .and_then(|ts| {
            String::from_utf8_lossy(ts.as_bytes().unwrap())
                .parse::<u64>()
                .ok()
        })
        .map(|ts| {
            chrono::Utc
                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
                .single()
                .expect("invalid timestamp")
        });
    match log_namespace {
        LogNamespace::Vector => {
            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            if let Some(ts) = timestamp {
                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
            }
        }
        LogNamespace::Legacy => {
            if let Some(ts) = timestamp {
                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
            }
        }
    }
    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        JournaldConfig::NAME,
    );
}
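/// Converts a decoded journal record into a `LogEvent`. Under the Vector
/// namespace, `MESSAGE` becomes the event value and the remaining fields are
/// stored as source metadata; under the Legacy namespace, the whole record
/// becomes the event and `MESSAGE` is moved to the schema's message key.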
fn create_log_event_from_record(
    mut record: Record,
    batch: &Option<BatchNotifier>,
    log_namespace: LogNamespace,
) -> LogEvent {
    match log_namespace {
        LogNamespace::Vector => {
            let message_value = record
                .remove(MESSAGE)
                .map(|msg| Value::Bytes(Bytes::from(msg)))
                .unwrap_or(Value::Null);
            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
            record.iter().for_each(|(key, value)| {
                log.metadata_mut()
                    .value_mut()
                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
            });
            log
        }
        LogNamespace::Legacy => {
            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
            if let Some(message) = log.remove(event_path!(MESSAGE)) {
                log.maybe_insert(log_schema().message_key_target_path(), message);
            }
            log
        }
    }
}
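/// Appends `.service` to unit names that do not carry an explicit unit type.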
fn fixup_unit(unit: &str) -> String {
    if unit.contains('.') {
        unit.into()
    } else {
        format!("{}.service", unit)
    }
}
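/// Parses one line of `journalctl --output=json` into a string-to-string map.
/// journald serializes fields holding non-UTF-8 or otherwise unprintable
/// values as byte arrays, so array values are decoded back into strings first.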
fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
    let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
    if let Some(record) = record.as_object_mut() {
        for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
            *value = decode_array(value.as_array().expect("already validated"));
        }
    }
    if remap {
        record.get_mut("PRIORITY").map(remap_priority);
    }
    serde_json::from_value(record)
}
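/// Converts an array-valued journal field to a string: via lossy UTF-8 if it
/// is an array of bytes, otherwise by re-serializing the array as JSON text.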
fn decode_array(array: &[JsonValue]) -> JsonValue {
    decode_array_as_bytes(array).unwrap_or_else(|| {
        let ser = serde_json::to_string(array).expect("already deserialized");
        JsonValue::String(ser)
    })
}
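/// Interprets the array as a byte string, so `[194, 191]` becomes `"¿"`.
/// Returns `None` if any element is not an integer in `0..=255`.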
fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
    array
        .iter()
        .map(|item| {
            item.as_u64().and_then(|num| match num {
                num if num <= u8::MAX as u64 => Some(num as u8),
                _ => None,
            })
        })
        .collect::<Option<Vec<u8>>>()
        .map(|array| String::from_utf8_lossy(&array).into())
}
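/// Rewrites journald's numeric `PRIORITY` value (a string such as `"3"`) into
/// the corresponding syslog level name (`"ERR"`).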
fn remap_priority(priority: &mut JsonValue) {
    if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
        let text = match num {
            0 => "EMERG",
            1 => "ALERT",
            2 => "CRIT",
            3 => "ERR",
            4 => "WARNING",
            5 => "NOTICE",
            6 => "INFO",
            7 => "DEBUG",
            _ => "UNKNOWN",
        };
        *priority = JsonValue::String(text.into());
    }
}
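/// Decides whether a record should be filtered *out*: returns `true` when the
/// record misses the include matches or hits the exclude matches.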
fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
    match (includes.is_empty(), excludes.is_empty()) {
        (true, true) => false,
        (false, true) => !contains_match(record, includes),
        (true, false) => contains_match(record, excludes),
        (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
    }
}
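/// Returns `true` if any field/value pair in the record appears in `matches`.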
fn contains_match(record: &Record, matches: &Matches) -> bool {
    let f = move |(field, value)| {
        matches
            .get(field)
            .map(|x| x.contains(value))
            .unwrap_or(false)
    };
    record.iter().any(f)
}
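/// Finds a field/value pair present in both match sets, if any.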
fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
    for (a_key, a_values) in a_matches {
        if let Some(b_values) = b_matches.get(a_key.as_str()) {
            if let Some(value) = a_values.intersection(b_values).next() {
                return Some((a_key.into(), value.into()));
            }
        }
    }
    None
}
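/// Persists the cursor either immediately after a batch is sent (`Sync`) or
/// only once the batch is acknowledged downstream (`Async`).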
enum Finalizer {
    Sync(SharedCheckpointer),
    Async(OrderedFinalizer<String>),
}
impl Finalizer {
    fn new(
        acknowledgements: bool,
        checkpointer: SharedCheckpointer,
        shutdown: ShutdownSignal,
    ) -> Self {
        if acknowledgements {
            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
            tokio::spawn(async move {
                while let Some((status, cursor)) = ack_stream.next().await {
                    if status == BatchStatus::Delivered {
                        checkpointer.lock().await.set(cursor).await;
                    }
                }
            });
            Self::Async(finalizer)
        } else {
            Self::Sync(checkpointer)
        }
    }
    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
        match (self, receiver) {
            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
            _ => {
                unreachable!("Cannot have async finalization without a receiver in journald source")
            }
        }
    }
}
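/// Persists the journald cursor to a file. Writes always start at offset zero
/// without truncating, and `get` reads only up to the first newline, so stale
/// bytes left over from a longer previous cursor are ignored.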
struct Checkpointer {
    file: File,
    filename: PathBuf,
}
impl Checkpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&filename)
            .await?;
        Ok(Checkpointer { file, filename })
    }
    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.write_all(format!("{}\n", token).as_bytes()).await
    }
    async fn get(&mut self) -> Result<Option<String>, io::Error> {
        let mut buf = Vec::<u8>::new();
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.read_to_end(&mut buf).await?;
        match buf.len() {
            0 => Ok(None),
            _ => {
                let text = String::from_utf8_lossy(&buf);
                match text.find('\n') {
                    Some(nl) => Ok(Some(String::from(&text[..nl]))),
                    None => Ok(None),
                }
            }
        }
    }
}
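/// A `Checkpointer` that also caches the most recent cursor in memory so it
/// can be passed to `journalctl --after-cursor` on restart.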
struct StatefulCheckpointer {
    checkpointer: Checkpointer,
    cursor: Option<String>,
}
impl StatefulCheckpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let mut checkpointer = Checkpointer::new(filename).await?;
        let cursor = checkpointer.get().await?;
        Ok(Self {
            checkpointer,
            cursor,
        })
    }
    async fn set(&mut self, token: impl Into<String>) {
        let token = token.into();
        if let Err(error) = self.checkpointer.set(&token).await {
            emit!(JournaldCheckpointSetError {
                error,
                filename: self
                    .checkpointer
                    .filename
                    .to_str()
                    .unwrap_or("unknown")
                    .to_string(),
            });
        }
        self.cursor = Some(token);
    }
}
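/// A clonable, mutex-guarded handle to the checkpointer, shared between the
/// source and the acknowledgement task.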
#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
impl SharedCheckpointer {
    fn new(c: StatefulCheckpointer) -> Self {
        Self(Arc::new(Mutex::new(c)))
    }
    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
        self.0.lock().await
    }
}
#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;
    use super::*;
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }
    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let mut filename = tempdir.path().to_path_buf();
        filename.push(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");
        assert!(checkpointer.get().await.unwrap().is_none());
        checkpointer
            .set("first test")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "first test");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {:?}", filename));
        assert!(contents.starts_with("first test\n"));
        checkpointer
            .set("second")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "second");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {:?}", filename));
        assert!(contents.starts_with("second\n"));
    }
}
#[cfg(test)]
mod tests {
    use std::{fs, path::Path};
    use tempfile::tempdir;
    use tokio::time::{sleep, timeout, Duration, Instant};
    use vrl::value::{kind::Collection, Value};
    use super::*;
    use crate::{
        config::ComponentKey, event::Event, event::EventStatus,
        test_util::components::assert_source_compliance,
    };
    const TEST_COMPONENT: &str = "journald-test";
    const TEST_JOURNALCTL: &str = "tests/data/journalctl";
    async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
        let include_matches = create_unit_matches(iunits.to_vec());
        let exclude_matches = create_unit_matches(xunits.to_vec());
        run_journal(include_matches, exclude_matches, cursor, false).await
    }
    async fn run_journal(
        include_matches: Matches,
        exclude_matches: Matches,
        checkpoint: Option<&str>,
        emit_cursor: bool,
    ) -> Vec<Event> {
        assert_source_compliance(&["protocol"], async move {
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
            let tempdir = tempdir().unwrap();
            let tempdir = tempdir.path().to_path_buf();
            if let Some(cursor) = checkpoint {
                let mut checkpoint_path = tempdir.clone();
                checkpoint_path.push(TEST_COMPONENT);
                fs::create_dir(&checkpoint_path).unwrap();
                checkpoint_path.push(CHECKPOINT_FILENAME);
                let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
                    .await
                    .expect("Creating checkpointer failed!");
                checkpointer
                    .set(cursor)
                    .await
                    .expect("Could not set checkpoint");
            }
            let (cx, shutdown) =
                SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
            let config = JournaldConfig {
                journalctl_path: Some(TEST_JOURNALCTL.into()),
                include_matches,
                exclude_matches,
                data_dir: Some(tempdir),
                remap_priority: true,
                acknowledgements: false.into(),
                emit_cursor,
                ..Default::default()
            };
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });
            sleep(Duration::from_millis(100)).await;
            shutdown
                .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
                .await;
            timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
        })
        .await
    }
    fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
        let units: HashSet<String> = units.into_iter().map(Into::into).collect();
        let mut map = HashMap::new();
        if !units.is_empty() {
            map.insert(String::from(SYSTEMD_UNIT), units);
        }
        map
    }
    fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
        let mut matches: Matches = HashMap::new();
        for (field, value) in conditions {
            matches
                .entry(field.into())
                .or_default()
                .insert(value.into());
        }
        matches
    }
    #[tokio::test]
    async fn reads_journal() {
        let received = run_with_units(&[], &[], None).await;
        assert_eq!(received.len(), 8);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
            "journald".into()
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
        assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
        assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
        assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
    }
    #[tokio::test]
    async fn includes_units() {
        let received = run_with_units(&["unit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
    }
    #[tokio::test]
    async fn excludes_units() {
        let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
        assert_eq!(received.len(), 6);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Missing timestamp".into())
        );
        assert_eq!(
            message(&received[2]),
            Value::Bytes("Different timestamps".into())
        );
    }
    #[tokio::test]
    async fn emits_cursor() {
        let received = run_journal(Matches::new(), Matches::new(), None, true).await;
        assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
        assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
        assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
    }
    #[tokio::test]
    async fn includes_matches() {
        let matches = create_matches(vec![("PRIORITY", "ERR")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 2);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("Different timestamps".into())
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Non-ASCII in other field".into())
        );
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }
    #[tokio::test]
    async fn includes_kernel() {
        let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 1);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
        assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
    }
    #[tokio::test]
    async fn excludes_matches() {
        let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
        let received = run_journal(HashMap::new(), matches, None, false).await;
        assert_eq!(received.len(), 5);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
    }
    #[tokio::test]
    async fn handles_checkpoint() {
        let received = run_with_units(&[], &[], Some("1")).await;
        assert_eq!(received.len(), 7);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
    }
    #[tokio::test]
    async fn parses_array_messages() {
        let received = run_with_units(&["badunit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
    }
    #[tokio::test]
    async fn parses_array_fields() {
        let received = run_with_units(&["syslog.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_RAW"],
            Value::Bytes("¿World?".into())
        );
    }
    #[tokio::test]
    async fn parses_string_sequences() {
        let received = run_with_units(&["NetworkManager.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_FACILITY"],
            Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
        );
    }
    #[tokio::test]
    async fn handles_missing_timestamp() {
        let received = run_with_units(&["stdout"], &[], None).await;
        assert_eq!(received.len(), 2);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }
    #[tokio::test]
    async fn handles_acknowledgements() {
        let (tx, mut rx) = SourceSender::new_test();
        let tempdir = tempdir().unwrap();
        let tempdir = tempdir.path().to_path_buf();
        let mut checkpoint_path = tempdir.clone();
        checkpoint_path.push(TEST_COMPONENT);
        fs::create_dir(&checkpoint_path).unwrap();
        checkpoint_path.push(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
            .await
            .expect("Creating checkpointer failed!");
        let config = JournaldConfig {
            journalctl_path: Some(TEST_JOURNALCTL.into()),
            data_dir: Some(tempdir),
            remap_priority: true,
            acknowledgements: true.into(),
            ..Default::default()
        };
        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
        let source = config.build(cx).await.unwrap();
        tokio::spawn(async move { source.await.unwrap() });
        assert_eq!(checkpointer.get().await.unwrap(), None);
        tokio::time::sleep(Duration::from_millis(100)).await;
        let mut count = 0;
        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
            assert_eq!(checkpointer.get().await.unwrap(), None);
            event.metadata().update_status(EventStatus::Delivered);
            count += 1;
        }
        assert_eq!(count, 8);
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
    }
    #[test]
    fn filter_matches_works_correctly() {
        let empty: Matches = HashMap::new();
        let includes = create_unit_matches(vec!["one", "two"]);
        let excludes = create_unit_matches(vec!["foo", "bar"]);
        let zero = HashMap::new();
        assert!(!filter_matches(&zero, &empty, &empty));
        assert!(filter_matches(&zero, &includes, &empty));
        assert!(!filter_matches(&zero, &empty, &excludes));
        assert!(filter_matches(&zero, &includes, &excludes));
        let mut one = HashMap::new();
        one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
        assert!(!filter_matches(&one, &empty, &empty));
        assert!(!filter_matches(&one, &includes, &empty));
        assert!(!filter_matches(&one, &empty, &excludes));
        assert!(!filter_matches(&one, &includes, &excludes));
        let mut two = HashMap::new();
        two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
        assert!(!filter_matches(&two, &empty, &empty));
        assert!(filter_matches(&two, &includes, &empty));
        assert!(filter_matches(&two, &empty, &excludes));
        assert!(filter_matches(&two, &includes, &excludes));
    }
    #[test]
    fn merges_units_and_matches_option() {
        let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
        let include_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]);
        let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
        let exclude_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]);
        let journald_config = JournaldConfig {
            include_units,
            include_matches,
            exclude_units,
            exclude_matches,
            ..Default::default()
        };
        let hashset =
            |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };
        let matches = journald_config.merged_include_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["one.service", "two.service", "three.service"])
        );
        let units = matches.get("_TRANSPORT").unwrap();
        assert_eq!(units, &hashset(&["kernel"]));
        let matches = journald_config.merged_exclude_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["foo.service", "bar.service", "baz.service"])
        );
        let units = matches.get("PRIORITY").unwrap();
        assert_eq!(units, &hashset(&["DEBUG"]));
    }
    #[test]
    fn find_duplicate_match_works_correctly() {
        let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
        assert_eq!(field, "_TRANSPORT");
        assert_eq!(value, "kernel");
        let empty = HashMap::new();
        let actual = find_duplicate_match(&empty, &empty);
        assert!(actual.is_none());
        let actual = find_duplicate_match(&include_matches, &empty);
        assert!(actual.is_none());
        let actual = find_duplicate_match(&empty, &exclude_matches);
        assert!(actual.is_none());
    }
    #[test]
    fn command_options() {
        let path = PathBuf::from("journalctl");
        let journal_dir = None;
        let journal_namespace = None;
        let current_boot_only = false;
        let cursor = None;
        let since_now = false;
        let extra_args = vec![];
        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{:?}", command);
        assert!(!cmd_line.contains("--directory="));
        assert!(!cmd_line.contains("--namespace="));
        assert!(!cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--since=2000-01-01"));
        let journal_dir = None;
        let journal_namespace = None;
        let since_now = true;
        let extra_args = vec![];
        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{:?}", command);
        assert!(cmd_line.contains("--since=now"));
        let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
        let journal_namespace = Some(String::from("my_namespace"));
        let current_boot_only = true;
        let cursor = Some("2021-01-01");
        let extra_args = vec!["--merge".to_string()];
        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{:?}", command);
        assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
        assert!(cmd_line.contains("--namespace=my_namespace"));
        assert!(cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--after-cursor="));
        assert!(cmd_line.contains("--merge"));
    }
    fn create_command(
        path: &Path,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        cursor: Option<&str>,
        extra_args: Vec<String>,
    ) -> Command {
        StartJournalctl::new(
            path.into(),
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        )
        .make_command(cursor)
    }
    fn message(event: &Event) -> Value {
        event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
    }
    fn timestamp(event: &Event) -> Value {
        event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
    }
    fn cursor(event: &Event) -> Value {
        event.as_log()[CURSOR].clone()
    }
    /// Builds a timestamp `Value` from seconds and nanoseconds since the Unix epoch.
    fn value_ts(secs: i64, nsecs: u32) -> Value {
        Value::Timestamp(
            chrono::Utc
                .timestamp_opt(secs, nsecs)
                .single()
                .expect("invalid timestamp"),
        )
    }
    fn priority(event: &Event) -> Value {
        event.as_log()["PRIORITY"].clone()
    }
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = JournaldConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);
        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );
        assert_eq!(definitions, Some(expected_definition))
    }
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = JournaldConfig::default();
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .unknown_fields(Kind::bytes());
        assert_eq!(definitions, Some(expected_definition))
    }
    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
        let record = r#"{
            "PRIORITY":"6",
            "SYSLOG_FACILITY":"3",
            "SYSLOG_IDENTIFIER":"ntpd",
            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
            "_CAP_EFFECTIVE":"3fffffffff",
            "_CMDLINE":"ntpd: [priv]",
            "_COMM":"ntpd",
            "_EXE":"/usr/sbin/ntpd",
            "_GID":"0",
            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
            "_PID":"2156",
            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
            "_SYSTEMD_SLICE":"system.slice",
            "_SYSTEMD_UNIT":"ntpd.service",
            "_TRANSPORT":"stdout",
            "_UID":"0",
            "__MONOTONIC_TIMESTAMP":"98694000446",
            "__REALTIME_TIMESTAMP":"1564173027000443",
            "host":"my-host.local",
            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
            "source_type":"journald"
        }"#;
        let json: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
        event.as_mut_log().insert("timestamp", chrono::Utc::now());
        let definitions = config.outputs(namespace).remove(0).schema_definition(true);
        definitions.unwrap().assert_valid_for_event(&event);
    }
    #[test]
    fn matches_schema_legacy() {
        let config = JournaldConfig::default();
        matches_schema(&config, LogNamespace::Legacy)
    }
}