use std::{convert::TryInto, future, path::PathBuf, time::Duration};
use bytes::Bytes;
use chrono::Utc;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use regex::bytes::Regex;
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::{sync::oneshot, task::spawn_blocking};
use tracing::{Instrument, Span};
use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
use vector_lib::configurable::configurable_component;
use vector_lib::file_source::{
    calculate_ignore_before,
    paths_provider::glob::{Glob, MatchOptions},
    Checkpointer, FileFingerprint, FileServer, FingerprintStrategy, Fingerprinter, Line, ReadFrom,
    ReadFromConfig,
};
use vector_lib::finalizer::OrderedFinalizer;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    EstimatedJsonEncodedSizeOf,
};
use vrl::value::Kind;
use super::util::{EncodingConfig, MultilineConfig};
use crate::{
    config::{
        log_schema, DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
        SourceOutput,
    },
    encoding_transcode::{Decoder, Encoder},
    event::{BatchNotifier, BatchStatus, LogEvent},
    internal_events::{
        FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
        FileSourceInternalEventsEmitter, StreamClosedError,
    },
    line_agg::{self, LineAgg},
    serde::bool_or_struct,
    shutdown::ShutdownSignal,
    SourceSender,
};
/// Errors raised while validating the `file` source configuration at build time.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The deprecated `message_start_indicator` option could not be compiled as a regex.
    #[snafu(display(
        "message_start_indicator {:?} is not a valid regex: {}",
        indicator,
        source
    ))]
    InvalidMessageStartIndicator {
        /// The pattern as it was supplied in the configuration.
        indicator: String,
        /// The error reported by the regex compiler for that pattern.
        source: regex::Error,
    },
}
/// Configuration for the `file` source.
#[serde_as]
#[configurable_component(source("file", "Collect logs from files."))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileConfig {
    /// File patterns to include, handed to the glob-based paths provider.
    ///
    /// At least one pattern is required: the source refuses to start when this list is empty.
    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
    pub include: Vec<PathBuf>,
    /// File patterns to exclude, handed to the glob-based paths provider alongside `include`.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
    pub exclude: Vec<PathBuf>,
    /// Legacy-namespace event field that receives the path of the file a line was read from.
    ///
    /// Defaults to `file`.
    #[serde(default = "default_file_key")]
    #[configurable(metadata(docs::examples = "path"))]
    pub file_key: OptionalValuePath,
    /// Deprecated switch superseded by `ignore_checkpoints` and `read_from`.
    ///
    /// Setting it logs a deprecation warning. When it is `true` and `ignore_checkpoints` is
    /// unset, checkpoints are ignored.
    #[configurable(
        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
    )]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub start_at_beginning: Option<bool>,
    /// Whether the file server should ignore existing checkpoints.
    ///
    /// When unset this resolves to `false`, unless the deprecated `start_at_beginning` is `true`.
    #[serde(default)]
    pub ignore_checkpoints: Option<bool>,
    // Starting read position handed to the file server; defaults to the beginning of the file.
    #[serde(default = "default_read_from")]
    #[configurable(derived)]
    pub read_from: ReadFromConfig,
    /// Age threshold, in seconds, passed to `calculate_ignore_before` to derive the file
    /// server's `ignore_before` cutoff.
    ///
    /// NOTE(review): presumably files last modified before the cutoff are skipped — confirm
    /// against the file server implementation.
    #[serde(alias = "ignore_older", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
    pub ignore_older_secs: Option<u64>,
    /// Maximum size of a line, in bytes.
    ///
    /// Passed to the file server and also used as the fingerprinter's maximum line length.
    /// Defaults to 100 KiB.
    #[serde(default = "default_max_line_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_line_bytes: usize,
    /// Legacy-namespace event field that receives the local hostname.
    ///
    /// When unset, the host key of the global log schema is used instead.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,
    /// Directory in which the source keeps its state, such as file checkpoints.
    ///
    /// When unset, the directory is resolved from the global configuration.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,
    /// Legacy-namespace event field that receives the offset at which the event's line starts.
    ///
    /// When unset, no legacy key is supplied for the offset.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "offset"))]
    pub offset_key: Option<OptionalValuePath>,
    /// Cooldown, configured in milliseconds, passed to the file server as
    /// `glob_minimum_cooldown`.
    ///
    /// Defaults to 1000 ms.
    /// NOTE(review): presumably the minimum delay between two glob passes — confirm.
    #[serde(
        alias = "glob_minimum_cooldown",
        default = "default_glob_minimum_cooldown_ms"
    )]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    pub glob_minimum_cooldown_ms: Duration,
    // Strategy used to identify files; converted into the fingerprinter's strategy at startup.
    #[configurable(derived)]
    #[serde(alias = "fingerprinting", default)]
    fingerprint: FingerprintConfig,
    /// Forwarded unchanged to the fingerprinter's `ignore_not_found` flag.
    ///
    /// NOTE(review): presumably suppresses errors for files that vanish before they can be
    /// fingerprinted — confirm against the fingerprinter.
    #[serde(default)]
    pub ignore_not_found: bool,
    /// Deprecated regex marking the start of a new message.
    ///
    /// It is validated at build time and only drives legacy line aggregation when `multiline`
    /// is unset.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub message_start_indicator: Option<String>,
    /// Deprecated timeout used together with `message_start_indicator` for legacy line
    /// aggregation.
    ///
    /// Defaults to 1000. NOTE(review): the unit is presumably milliseconds — confirm.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default = "default_multi_line_timeout")]
    pub multi_line_timeout: u64,
    // Multiline aggregation settings; these take precedence over `message_start_indicator`.
    #[configurable(derived)]
    #[serde(default)]
    pub multiline: Option<MultilineConfig>,
    /// Byte budget passed to the file server as `max_read_bytes`.
    ///
    /// Defaults to 2048 bytes.
    #[serde(default = "default_max_read_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_read_bytes: usize,
    /// Forwarded unchanged to the file server's `oldest_first` flag.
    #[serde(default)]
    pub oldest_first: bool,
    /// Wait time, in seconds, converted to a `Duration` and passed to the file server as
    /// `remove_after`.
    ///
    /// When unset, no removal delay is configured.
    #[serde(alias = "remove_after", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 0))]
    #[configurable(metadata(docs::examples = 5))]
    #[configurable(metadata(docs::examples = 60))]
    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
    pub remove_after_secs: Option<u64>,
    /// Character sequence that separates lines.
    ///
    /// Defaults to `"\n"`. When `encoding` is set, the delimiter is encoded into that charset
    /// before it is given to the file server.
    #[serde(default = "default_line_delimiter")]
    #[configurable(metadata(docs::examples = "\r\n"))]
    pub line_delimiter: String,
    // Charset of the files being read; lines are decoded from it to UTF-8 when set.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: Option<EncodingConfig>,
    // End-to-end acknowledgement settings; accepts either a bare boolean or a struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
    /// Per-source log namespace setting, merged with the global log namespace.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
    // Controls for internal metrics, e.g. whether they carry a `file` tag.
    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,
    /// Duration passed to the file server as `rotate_wait`, configured in seconds under the
    /// name `rotate_wait_secs`.
    ///
    /// Defaults to `u64::MAX / 2` seconds.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    pub rotate_wait: Duration,
}
/// Default for `max_line_bytes`: 100 KiB.
fn default_max_line_bytes() -> usize {
    let hundred_kib = bytesize::kib(100u64);
    hundred_kib as usize
}
/// Default for `file_key`: the single-segment path `file`.
fn default_file_key() -> OptionalValuePath {
    let file_path = owned_value_path!("file");
    OptionalValuePath::from(file_path)
}
/// Default for `read_from`: start reading at the beginning of the file.
const fn default_read_from() -> ReadFromConfig {
    const DEFAULT_POSITION: ReadFromConfig = ReadFromConfig::Beginning;
    DEFAULT_POSITION
}
/// Default for `glob_minimum_cooldown_ms`: one second (1000 ms).
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}
/// Default for the deprecated `multi_line_timeout` option.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Default for `max_read_bytes`: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
/// Default for `line_delimiter`: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}
/// Default for `rotate_wait`: half of the largest number of seconds a `u64` can hold.
const fn default_rotate_wait() -> Duration {
    // Shifting right by one bit halves the value, exactly like `u64::MAX / 2`.
    Duration::from_secs(u64::MAX >> 1)
}
/// Configuration for how the file source identifies files.
///
/// Deserialized as an internally tagged enum: the `strategy` key selects the variant.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
))]
pub enum FingerprintConfig {
    /// Identify files by a checksum (`strategy = "checksum"`).
    Checksum {
        /// Legacy byte count, also accepted under the name `fingerprint_bytes`.
        ///
        /// Setting it triggers a warning when the strategy is built; when unset, 256 is used.
        #[serde(alias = "fingerprint_bytes")]
        #[configurable(metadata(docs::hidden))]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        bytes: Option<usize>,
        /// Number of header bytes passed to the checksum strategy as `ignored_header_bytes`.
        ///
        /// Defaults to 0.
        /// NOTE(review): presumably these bytes are skipped before the checksum is
        /// computed — confirm against the fingerprinter.
        #[serde(default = "default_ignored_header_bytes")]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        ignored_header_bytes: usize,
        /// Number of lines passed to the checksum strategy as `lines`.
        ///
        /// Defaults to 1.
        #[serde(default = "default_lines")]
        #[configurable(metadata(docs::type_unit = "lines"))]
        lines: usize,
    },
    /// Identify files by device and inode (`strategy = "device_and_inode"`).
    #[serde(rename = "device_and_inode")]
    DevInode,
}
impl Default for FingerprintConfig {
    fn default() -> Self {
        Self::Checksum {
            bytes: None,
            ignored_header_bytes: 0,
            lines: default_lines(),
        }
    }
}
/// Default for `fingerprint.ignored_header_bytes`: no header bytes.
const fn default_ignored_header_bytes() -> usize {
    const NO_HEADER_BYTES: usize = 0;
    NO_HEADER_BYTES
}
/// Default for `fingerprint.lines`: a single line.
const fn default_lines() -> usize {
    const SINGLE_LINE: usize = 1;
    SINGLE_LINE
}
impl From<FingerprintConfig> for FingerprintStrategy {
    /// Converts the user-facing fingerprint configuration into the strategy
    /// consumed by the fingerprinter.
    ///
    /// A configured legacy `bytes` value is passed through after logging a
    /// warning; when it is absent, 256 is used.
    fn from(config: FingerprintConfig) -> FingerprintStrategy {
        // Byte count used when the legacy `bytes` option is not configured.
        const FALLBACK_BYTES: usize = 256;

        let (legacy_bytes, ignored_header_bytes, lines) = match config {
            FingerprintConfig::DevInode => return FingerprintStrategy::DevInode,
            FingerprintConfig::Checksum {
                bytes,
                ignored_header_bytes,
                lines,
            } => (bytes, ignored_header_bytes, lines),
        };

        if legacy_bytes.is_some() {
            warn!(message = "The `fingerprint.bytes` option will be used to convert old file fingerprints created by vector < v0.11.0, but are not supported for new file fingerprints. The first line will be used instead.");
        }

        FingerprintStrategy::Checksum {
            bytes: legacy_bytes.unwrap_or(FALLBACK_BYTES),
            ignored_header_bytes,
            lines,
        }
    }
}
/// Checkpoint data registered with the acknowledgement finalizer for a single event.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the event's line was read from.
    pub(crate) file_id: FileFingerprint,
    /// End offset of the line; written as the file's checkpoint once the event is delivered.
    pub(crate) offset: u64,
}
impl Default for FileConfig {
    fn default() -> Self {
        Self {
            include: vec![PathBuf::from("/var/log/**/*.log")],
            exclude: vec![],
            file_key: default_file_key(),
            start_at_beginning: None,
            ignore_checkpoints: None,
            read_from: default_read_from(),
            ignore_older_secs: None,
            max_line_bytes: default_max_line_bytes(),
            fingerprint: FingerprintConfig::default(),
            ignore_not_found: false,
            host_key: None,
            offset_key: None,
            data_dir: None,
            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
            message_start_indicator: None,
            multi_line_timeout: default_multi_line_timeout(), multiline: None,
            max_read_bytes: default_max_read_bytes(),
            oldest_first: false,
            remove_after_secs: None,
            line_delimiter: default_line_delimiter(),
            encoding: None,
            acknowledgements: Default::default(),
            log_namespace: None,
            internal_metrics: Default::default(),
            rotate_wait: default_rotate_wait(),
        }
    }
}
// NOTE(review): macro defined elsewhere in the crate; judging by its name it derives
// example-config generation for `FileConfig` from its `Default` implementation — confirm.
impl_generate_config_from_default!(FileConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "file")]
impl SourceConfig for FileConfig {
    /// Validates the configuration and builds the running source.
    ///
    /// # Errors
    ///
    /// Fails when the data directory cannot be resolved or created, when the
    /// `multiline` settings cannot be converted into an aggregation config, or
    /// when `message_start_indicator` is not a valid regex.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let data_dir = cx
            .globals
            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;

        // Both aggregation options are checked up front so that `file_source`
        // can later unwrap the very same conversions.
        #[allow(clippy::suspicious_else_formatting)]
        {
            if let Some(multiline) = self.multiline.as_ref() {
                let _: line_agg::Config = multiline.try_into()?;
            }
            if let Some(indicator) = self.message_start_indicator.as_ref() {
                Regex::new(indicator)
                    .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
            }
        }

        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
        let log_namespace = cx.log_namespace(self.log_namespace);

        let source = file_source(
            self,
            data_dir,
            cx.shutdown,
            cx.out,
            acknowledgements,
            log_namespace,
        );
        Ok(source)
    }

    /// Describes the single log output of this source, including the schema
    /// of the `host`, `offset` and `path` source metadata.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);

        // Legacy keys: where each piece of metadata lands in the legacy namespace.
        let host_key = self
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path
            .map(LegacyKey::Overwrite);
        let offset_key = self
            .offset_key
            .as_ref()
            .and_then(|key| key.path.clone())
            .map(LegacyKey::Overwrite);
        let file_key = self.file_key.path.clone().map(LegacyKey::Overwrite);

        // Paths under which the same metadata is stored in the Vector namespace.
        let host_path = owned_value_path!("host");
        let offset_path = owned_value_path!("offset");
        let file_path = owned_value_path!("path");

        let schema_definition = BytesDeserializerConfig
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                host_key,
                &host_path,
                Kind::bytes().or_undefined(),
                Some("host"),
            )
            .with_source_metadata(Self::NAME, offset_key, &offset_path, Kind::integer(), None)
            .with_source_metadata(Self::NAME, file_key, &file_path, Kind::bytes(), None);

        let output = SourceOutput::new_maybe_logs(DataType::Log, schema_definition);
        vec![output]
    }

    /// This source supports end-to-end acknowledgements.
    fn can_acknowledge(&self) -> bool {
        true
    }
}
/// Builds the future that runs the file source.
///
/// The returned future starts the file server on a blocking thread, forwards the lines it
/// reads (optionally decoded and aggregated into multi-line messages) to `out` as log events,
/// and keeps file checkpoints up to date.
///
/// * `config` - source configuration; everything needed later is cloned or copied out of it.
/// * `data_dir` - directory handed to the checkpointer and the file server.
/// * `shutdown` - global shutdown signal for the file server.
/// * `out` - sender the produced events are written to.
/// * `acknowledgements` - when `true`, a checkpoint is only advanced after the corresponding
///   event has been reported as delivered.
/// * `log_namespace` - namespace used when building events.
///
/// If `config.include` is empty an error is logged and an already-failed future is returned.
///
/// # Panics
///
/// Panics if the glob patterns are rejected by the paths provider. Inside the returned future,
/// it also panics if the `multiline` or `message_start_indicator` settings are invalid; callers
/// are expected to have validated them, as `SourceConfig::build` does.
pub fn file_source(
    config: &FileConfig,
    data_dir: PathBuf,
    shutdown: ShutdownSignal,
    mut out: SourceSender,
    acknowledgements: bool,
    log_namespace: LogNamespace,
) -> super::Source {
    // The `include` option must contain at least one pattern.
    if config.include.is_empty() {
        error!(message = "`include` configuration option must contain at least one file pattern.");
        return Box::pin(future::ready(Err(())));
    }
    // Each exclude path is rebuilt from its individual components before being used as a
    // pattern.
    let exclude_patterns = config
        .exclude
        .iter()
        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
        .collect::<Vec<PathBuf>>();
    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
    // Fold the deprecated `start_at_beginning` option into the two options that replaced it.
    let (ignore_checkpoints, read_from) = reconcile_position_options(
        config.start_at_beginning,
        config.ignore_checkpoints,
        Some(config.read_from),
    );
    let emitter = FileSourceInternalEventsEmitter {
        include_file_metric_tag: config.internal_metrics.include_file_tag,
    };
    let paths_provider = Glob::new(
        &config.include,
        &exclude_patterns,
        MatchOptions::default(),
        emitter.clone(),
    )
    .expect("invalid glob patterns");
    let encoding_charset = config.encoding.clone().map(|e| e.charset);
    // The delimiter is configured as UTF-8. When a file encoding is set, it is encoded into
    // that charset before being handed to the file server.
    let line_delimiter_as_bytes = match encoding_charset {
        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
        None => Bytes::from(config.line_delimiter.clone()),
    };
    let checkpointer = Checkpointer::new(&data_dir);
    let file_server = FileServer {
        paths_provider,
        max_read_bytes: config.max_read_bytes,
        ignore_checkpoints,
        read_from,
        ignore_before,
        max_line_bytes: config.max_line_bytes,
        line_delimiter: line_delimiter_as_bytes,
        data_dir,
        glob_minimum_cooldown,
        fingerprinter: Fingerprinter {
            strategy: config.fingerprint.clone().into(),
            max_line_length: config.max_line_bytes,
            ignore_not_found: config.ignore_not_found,
        },
        oldest_first: config.oldest_first,
        remove_after: config.remove_after_secs.map(Duration::from_secs),
        emitter,
        handle: tokio::runtime::Handle::current(),
        rotate_wait: config.rotate_wait,
    };
    // Metadata attached to every event; the host key falls back to the global log schema.
    let event_metadata = EventMetadata {
        host_key: config
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path,
        hostname: crate::get_hostname().ok(),
        file_key: config.file_key.clone().path,
        offset_key: config.offset_key.clone().and_then(|k| k.path),
    };
    // Copy out of `config` everything the returned future needs to own.
    let include = config.include.clone();
    let exclude = config.exclude.clone();
    let multiline_config = config.multiline.clone();
    let message_start_indicator = config.message_start_indicator.clone();
    let multi_line_timeout = config.multi_line_timeout;
    let (finalizer, shutdown_checkpointer) = if acknowledgements {
        // With acknowledgements enabled, checkpoints are advanced by a background task that
        // consumes the finalizer's ack stream, and only for batches reported as delivered.
        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
        // A dedicated one-shot signal is sent once the ack stream has ended; it is what the
        // file server receives as its checkpointer shutdown signal.
        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
        let checkpoints = checkpointer.view();
        tokio::spawn(async move {
            while let Some((status, entry)) = ack_stream.next().await {
                if status == BatchStatus::Delivered {
                    checkpoints.update(entry.file_id, entry.offset);
                }
            }
            send_shutdown.send(())
        });
        (Some(finalizer), shutdown2.map(|_| ()).boxed())
    } else {
        // Without acknowledgements, the checkpointer shutdown signal is a clone of the global
        // shutdown signal.
        (None, shutdown.clone().map(|_| ()).boxed())
    };
    let checkpoints = checkpointer.view();
    let include_file_metric_tag = config.internal_metrics.include_file_tag;
    Box::pin(async move {
        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
        let mut encoding_decoder = encoding_charset.map(Decoder::new);
        // The file server sends batches of lines over a bounded channel; the batches are
        // flattened into a stream of individual lines.
        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
        let rx = rx
            .map(futures::stream::iter)
            .flatten()
            .map(move |mut line| {
                emit!(FileBytesReceived {
                    byte_size: line.text.len(),
                    file: &line.filename,
                    include_file_metric_tag,
                });
                // Decode the line from the configured charset to UTF-8, if one is set.
                line.text = match encoding_decoder.as_mut() {
                    Some(d) => d.decode_to_utf8(line.text),
                    None => line.text,
                };
                line
            });
        // `multiline` takes precedence over the deprecated `message_start_indicator`. Both
        // `unwrap` calls below rely on the settings having been validated beforehand, as
        // `SourceConfig::build` does.
        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
            if let Some(ref multiline_config) = multiline_config {
                wrap_with_line_agg(
                    rx,
                    multiline_config.try_into().unwrap(), )
            } else if let Some(msi) = message_start_indicator {
                wrap_with_line_agg(
                    rx,
                    line_agg::Config::for_legacy(
                        Regex::new(&msi).unwrap(), multi_line_timeout,
                    ),
                )
            } else {
                Box::new(rx)
            };
        let span = Span::current();
        let mut messages = messages.map(move |line| {
            let mut event = create_event(
                line.text,
                line.start_offset,
                &line.filename,
                &event_metadata,
                log_namespace,
                include_file_metric_tag,
            );
            if let Some(finalizer) = &finalizer {
                // Acknowledgements enabled: attach a batch notifier and let the finalizer
                // task advance the checkpoint once the event's status is known.
                let (batch, receiver) = BatchNotifier::new_with_receiver();
                event = event.with_batch_notifier(&batch);
                let entry = FinalizerEntry {
                    file_id: line.file_id,
                    offset: line.end_offset,
                };
                finalizer.add(entry, receiver);
            } else {
                // No acknowledgements: advance the checkpoint as soon as the event is built.
                checkpoints.update(line.file_id, line.end_offset);
            }
            event
        });
        // Events are forwarded downstream by a separate task.
        tokio::spawn(async move {
            match out
                .send_event_stream(&mut messages)
                .instrument(span.or_current())
                .await
            {
                Ok(()) => {
                    debug!("Finished sending.");
                }
                Err(_) => {
                    // Report the lower bound of the stream's size hint as the event count.
                    let (count, _) = messages.size_hint();
                    emit!(StreamClosedError { count });
                }
            }
        });
        let span = info_span!("file_server");
        // The file server is run on a blocking thread.
        spawn_blocking(move || {
            let _enter = span.enter();
            let result = file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer);
            emit!(FileOpen { count: 0 });
            // Any error returned by the file server becomes a panic here. The failed join is
            // then logged by the `map_err` below.
            result.unwrap();
        })
        .map_err(|error| error!(message="File server unexpectedly stopped.", %error))
        .await
    })
}
/// Merges the deprecated `start_at_beginning` option with `ignore_checkpoints`
/// and `read_from`, which replaced it.
///
/// Explicit values for the newer options always win. The deprecated option
/// only supplies fallbacks: when it is `Some(true)`, checkpoints are ignored
/// and reading starts at the beginning; otherwise checkpoints are honoured and
/// the default read position is used. A warning is logged whenever the
/// deprecated option is set at all.
///
/// Returns `(ignore_checkpoints, read_from)`.
fn reconcile_position_options(
    start_at_beginning: Option<bool>,
    ignore_checkpoints: Option<bool>,
    read_from: Option<ReadFromConfig>,
) -> (bool, ReadFrom) {
    if start_at_beginning.is_some() {
        warn!(message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead.");
    }

    let legacy_from_start = matches!(start_at_beginning, Some(true));

    let resolved_ignore = ignore_checkpoints.unwrap_or(legacy_from_start);
    let resolved_position: ReadFrom = match read_from {
        Some(requested) => requested.into(),
        None if legacy_from_start => ReadFrom::Beginning,
        None => ReadFrom::default(),
    };

    (resolved_ignore, resolved_position)
}
/// Wraps a stream of lines with line aggregation driven by `config`.
///
/// Each line is fed to the aggregator keyed by its filename, with the file
/// id and offsets carried along as context. An emitted line keeps the
/// filename, file id and start offset delivered with it. Its end offset comes
/// from the last-line context when one is present, and from its own context
/// otherwise.
fn wrap_with_line_agg(
    rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
    config: line_agg::Config,
) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
    // Shape expected by the aggregator: (key, payload, context).
    let keyed_lines = rx.map(|line| {
        let offsets = (line.file_id, line.start_offset, line.end_offset);
        (line.filename, line.text, offsets)
    });

    let aggregated = LineAgg::new(keyed_lines, line_agg::Logic::new(config)).map(
        |(filename, text, (file_id, start_offset, first_end_offset), last_line)| {
            let end_offset = match last_line {
                Some((_, _, last_end_offset)) => last_end_offset,
                None => first_end_offset,
            };
            Line {
                text,
                filename,
                file_id,
                start_offset,
                end_offset,
            }
        },
    );

    Box::new(aggregated)
}
/// Per-source metadata attached to every event created from a line.
struct EventMetadata {
    /// Legacy-namespace key under which the hostname is stored, if any.
    host_key: Option<OwnedValuePath>,
    /// Local hostname; when `None`, no host metadata is inserted.
    hostname: Option<String>,
    /// Legacy-namespace key under which the file path is stored, if any.
    file_key: Option<OwnedValuePath>,
    /// Legacy-namespace key under which the line offset is stored, if any.
    offset_key: Option<OwnedValuePath>,
}
/// Builds the log event for one line read from `file`.
///
/// The line becomes the event's message. The source type and ingest
/// timestamp are added as Vector metadata, then the hostname (only when
/// known), the offset and the file path are added as source metadata, each
/// with its optional legacy key taken from `meta`. A `FileEventsReceived`
/// internal event is emitted before the event is returned.
fn create_event(
    line: Bytes,
    offset: u64,
    file: &str,
    meta: &EventMetadata,
    log_namespace: LogNamespace,
    include_file_metric_tag: bool,
) -> LogEvent {
    let mut event = BytesDeserializer.parse_single(line, log_namespace);

    let source_type = Bytes::from_static(FileConfig::NAME.as_bytes());
    log_namespace.insert_vector_metadata(
        &mut event,
        log_schema().source_type_key(),
        path!("source_type"),
        source_type,
    );
    log_namespace.insert_vector_metadata(
        &mut event,
        log_schema().timestamp_key(),
        path!("ingest_timestamp"),
        Utc::now(),
    );

    // The insertion order below (host, offset, path) is kept deliberately.
    if let Some(hostname) = meta.hostname.as_ref() {
        log_namespace.insert_source_metadata(
            FileConfig::NAME,
            &mut event,
            meta.host_key.as_ref().map(LegacyKey::Overwrite),
            path!("host"),
            hostname.clone(),
        );
    }
    log_namespace.insert_source_metadata(
        FileConfig::NAME,
        &mut event,
        meta.offset_key.as_ref().map(LegacyKey::Overwrite),
        path!("offset"),
        offset,
    );
    log_namespace.insert_source_metadata(
        FileConfig::NAME,
        &mut event,
        meta.file_key.as_ref().map(LegacyKey::Overwrite),
        path!("path"),
        file,
    );

    let byte_size = event.estimated_json_encoded_size_of();
    emit!(FileEventsReceived {
        count: 1,
        file,
        byte_size,
        include_file_metric_tag,
    });

    event
}
#[cfg(test)]
mod tests {
    use std::{
        collections::HashSet,
        fs::{self, File},
        future::Future,
        io::{Seek, Write},
    };
    use encoding_rs::UTF_16LE;
    use similar_asserts::assert_eq;
    use tempfile::tempdir;
    use tokio::time::{sleep, timeout, Duration};
    use vector_lib::schema::Definition;
    use vrl::value::kind::Collection;
    use super::*;
    use crate::{
        config::Config,
        event::{Event, EventStatus, Value},
        shutdown::ShutdownSignal,
        sources::file,
        test_util::components::{assert_source_compliance, FILE_SOURCE_TAGS},
    };
    use vrl::value;
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileConfig>();
    }
    fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
        file::FileConfig {
            fingerprint: FingerprintConfig::Checksum {
                bytes: Some(8),
                ignored_header_bytes: 0,
                lines: 1,
            },
            data_dir: Some(dir.path().to_path_buf()),
            glob_minimum_cooldown_ms: Duration::from_millis(100),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            ..Default::default()
        }
    }
    async fn sleep_500_millis() {
        sleep(Duration::from_millis(500)).await;
    }
    #[test]
    fn parse_config() {
        let config: FileConfig = toml::from_str(
            r#"
            include = [ "/var/log/**/*.log" ]
            file_key = "file"
            glob_minimum_cooldown_ms = 1000
            multi_line_timeout = 1000
            max_read_bytes = 2048
            line_delimiter = "\n"
        "#,
        )
        .unwrap();
        assert_eq!(config, FileConfig::default());
        assert_eq!(
            config.fingerprint,
            FingerprintConfig::Checksum {
                bytes: None,
                ignored_header_bytes: 0,
                lines: 1
            }
        );
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        [fingerprint]
        strategy = "device_and_inode"
        "#,
        )
        .unwrap();
        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        [fingerprint]
        strategy = "checksum"
        bytes = 128
        ignored_header_bytes = 512
        "#,
        )
        .unwrap();
        assert_eq!(
            config.fingerprint,
            FingerprintConfig::Checksum {
                bytes: Some(128),
                ignored_header_bytes: 512,
                lines: 1
            }
        );
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        [encoding]
        charset = "utf-16le"
        "#,
        )
        .unwrap();
        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        read_from = "beginning"
        "#,
        )
        .unwrap();
        assert_eq!(config.read_from, ReadFromConfig::Beginning);
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        read_from = "end"
        "#,
        )
        .unwrap();
        assert_eq!(config.read_from, ReadFromConfig::End);
    }
    #[test]
    fn resolve_data_dir() {
        let global_dir = tempdir().unwrap();
        let local_dir = tempdir().unwrap();
        let mut config = Config::default();
        config.global.data_dir = global_dir.into_path().into();
        let res = config
            .global
            .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
            .unwrap();
        assert_eq!(res, local_dir.path());
        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
        assert_eq!(res, config.global.data_dir.unwrap());
    }
    #[test]
    fn output_schema_definition_vector_namespace() {
        let definitions = FileConfig::default()
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);
        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                    .with_meaning(OwnedTargetPath::event_root(), "message")
                    .with_metadata_field(
                        &owned_value_path!("vector", "source_type"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "ingest_timestamp"),
                        Kind::timestamp(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("file", "host"),
                        Kind::bytes().or_undefined(),
                        Some("host")
                    )
                    .with_metadata_field(
                        &owned_value_path!("file", "offset"),
                        Kind::integer(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
            )
        )
    }
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let definitions = FileConfig::default()
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);
        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(
                    Kind::object(Collection::empty()),
                    [LogNamespace::Legacy]
                )
                .with_event_field(
                    &owned_value_path!("message"),
                    Kind::bytes(),
                    Some("message")
                )
                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
                .with_event_field(
                    &owned_value_path!("host"),
                    Kind::bytes().or_undefined(),
                    Some("host")
                )
                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
            )
        )
    }
    #[test]
    fn create_event_legacy_namespace() {
        let line = Bytes::from("hello world");
        let file = "some_file.rs";
        let offset: u64 = 0;
        let meta = EventMetadata {
            host_key: Some(owned_value_path!("host")),
            hostname: Some("Some.Machine".to_string()),
            file_key: Some(owned_value_path!("file")),
            offset_key: Some(owned_value_path!("offset")),
        };
        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
        assert_eq!(log["file"], "some_file.rs".into());
        assert_eq!(log["host"], "Some.Machine".into());
        assert_eq!(log["offset"], 0.into());
        assert_eq!(*log.get_message().unwrap(), "hello world".into());
        assert_eq!(*log.get_source_type().unwrap(), "file".into());
        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
    }
    #[test]
    fn create_event_custom_fields_legacy_namespace() {
        let line = Bytes::from("hello world");
        let file = "some_file.rs";
        let offset: u64 = 0;
        let meta = EventMetadata {
            host_key: Some(owned_value_path!("hostname")),
            hostname: Some("Some.Machine".to_string()),
            file_key: Some(owned_value_path!("file_path")),
            offset_key: Some(owned_value_path!("off")),
        };
        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
        assert_eq!(log["file_path"], "some_file.rs".into());
        assert_eq!(log["hostname"], "Some.Machine".into());
        assert_eq!(log["off"], 0.into());
        assert_eq!(*log.get_message().unwrap(), "hello world".into());
        assert_eq!(*log.get_source_type().unwrap(), "file".into());
        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
    }
    /// Vector namespace: the line is expected to be the event value itself, and
    /// the source details are expected in event metadata under the `vector`
    /// and `FileConfig::NAME` prefixes. The configured keys are all set to
    /// `ignored`, and nothing is asserted under that name.
    #[test]
    fn create_event_vector_namespace() {
        let source_meta = EventMetadata {
            host_key: Some(owned_value_path!("ignored")),
            hostname: Some("Some.Machine".to_string()),
            file_key: Some(owned_value_path!("ignored")),
            offset_key: Some(owned_value_path!("ignored")),
        };

        let event = create_event(
            Bytes::from("hello world"),
            0_u64,
            "some_file.rs",
            &source_meta,
            LogNamespace::Vector,
            false,
        );

        assert_eq!(event.value(), &value!("hello world"));

        // All remaining assertions read from the event's metadata value.
        let event_meta = event.metadata().value();

        // Entries under the `vector` prefix.
        assert_eq!(
            event_meta.get(path!("vector", "source_type")).unwrap(),
            &value!("file")
        );
        assert!(event_meta
            .get(path!("vector", "ingest_timestamp"))
            .unwrap()
            .is_timestamp());

        // Entries under the source's own name.
        assert_eq!(
            event_meta.get(path!(FileConfig::NAME, "host")).unwrap(),
            &value!("Some.Machine")
        );
        assert_eq!(
            event_meta.get(path!(FileConfig::NAME, "offset")).unwrap(),
            &value!(0)
        );
        assert_eq!(
            event_meta.get(path!(FileConfig::NAME, "path")).unwrap(),
            &value!("some_file.rs")
        );
    }
    /// Writes `n` interleaved lines to two files and checks that each file's
    /// lines arrive in order and carry the path of the file they came from.
    #[tokio::test]
    async fn file_happy_path() {
        let n = 5;
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let hello_path = dir.path().join("file1");
        let goodbye_path = dir.path().join("file2");

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut hello_file = File::create(&hello_path).unwrap();
            let mut goodbye_file = File::create(&goodbye_path).unwrap();
            sleep_500_millis().await;
            for i in 0..n {
                writeln!(&mut hello_file, "hello {}", i).unwrap();
                writeln!(&mut goodbye_file, "goodbye {}", i).unwrap();
            }
            sleep_500_millis().await;
        })
        .await;

        // Next line number expected from each file.
        let mut hello_i = 0;
        let mut goodbye_i = 0;
        for event in events {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();

            // Pick the expectations for whichever file this line belongs to.
            let (expected_line, expected_path, counter) = if line.starts_with("hello") {
                (format!("hello {}", hello_i), &hello_path, &mut hello_i)
            } else {
                (
                    format!("goodbye {}", goodbye_i),
                    &goodbye_path,
                    &mut goodbye_i,
                )
            };

            assert_eq!(line, expected_line);
            assert_eq!(
                event.as_log()["file"].to_string_lossy(),
                expected_path.to_str().unwrap()
            );
            *counter += 1;
        }

        // Every line from both files must have been seen.
        assert_eq!(hello_i, n);
        assert_eq!(goodbye_i, n);
    }
    /// Checks that empty lines are emitted as events: one non-empty line
    /// followed by `n` empty ones must yield `n + 1` events.
    #[tokio::test]
    async fn file_read_empty_lines() {
        let n = 5;
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut log_file = File::create(&log_path).unwrap();
            sleep_500_millis().await;

            writeln!(&mut log_file, "line for checkpointing").unwrap();
            for _ in 0..n {
                // A bare `writeln!` emits just the newline, i.e. an empty line.
                writeln!(&mut log_file).unwrap();
            }
            sleep_500_millis().await;
        })
        .await;

        assert_eq!(events.len(), n + 1);
    }
    /// Truncates a file in place mid-run and checks that the lines written
    /// before and after the truncation all arrive, in order, from the same path.
    #[tokio::test]
    async fn file_truncate() {
        let n = 5;
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut log_file = File::create(&log_path).unwrap();
            sleep_500_millis().await;

            for i in 0..n {
                writeln!(&mut log_file, "pretrunc {}", i).unwrap();
            }
            sleep_500_millis().await;

            // Empty the file and move the write cursor back to the start so the
            // next writes begin at offset zero.
            log_file.set_len(0).unwrap();
            log_file.seek(std::io::SeekFrom::Start(0)).unwrap();
            sleep_500_millis().await;

            for i in 0..n {
                writeln!(&mut log_file, "posttrunc {}", i).unwrap();
            }
            sleep_500_millis().await;
        })
        .await;

        // The first `n` events are the pre-truncation lines; everything after
        // that is expected to be post-truncation lines, numbered from zero again.
        for (position, event) in events.into_iter().enumerate() {
            assert_eq!(
                event.as_log()["file"].to_string_lossy(),
                log_path.to_str().unwrap()
            );
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();

            let line_number = position % n;
            let expected = if position < n {
                format!("pretrunc {}", line_number)
            } else {
                format!("posttrunc {}", line_number)
            };
            assert_eq!(line, expected);
        }
    }
    /// Rotates a file mid-run (rename it away, then create a fresh file at the
    /// original path) and checks that the lines written before and after the
    /// rotation all arrive in order, attributed to the original path.
    #[tokio::test]
    async fn file_rotate() {
        let n = 5;
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let path = dir.path().join("file");
        // The archive must be a *different* path: renaming a file onto itself is
        // a no-op, which would reduce this test to truncating the file in place.
        let archive_path = dir.path().join("file.old");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file = File::create(&path).unwrap();
            sleep_500_millis().await;

            for i in 0..n {
                writeln!(&mut file, "prerot {}", i).unwrap();
            }
            sleep_500_millis().await;

            // Rotate: move the current file aside and start a new one in its place.
            fs::rename(&path, &archive_path).expect("could not rename");
            let mut file = File::create(&path).unwrap();
            sleep_500_millis().await;

            for i in 0..n {
                writeln!(&mut file, "postrot {}", i).unwrap();
            }
            sleep_500_millis().await;
        })
        .await;

        // `i` is the next expected line number within the current phase; after
        // `n` lines the expectation switches from "prerot" to "postrot".
        let mut i = 0;
        let mut pre_rot = true;
        for event in received {
            assert_eq!(
                event.as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            if pre_rot {
                assert_eq!(line, format!("prerot {}", i));
            } else {
                assert_eq!(line, format!("postrot {}", i));
            }
            i += 1;
            if i == n {
                i = 0;
                pre_rot = false;
            }
        }
    }
    /// Combines two include globs with one exclude glob: `a.txt`, `b.txt` and
    /// `a.log` must each deliver all `n` lines in order, while `a.ignore.txt`
    /// must deliver nothing.
    #[tokio::test]
    async fn file_multiple_paths() {
        let n = 5;
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
            exclude: vec![dir.path().join("a.*.txt")],
            ..test_default_file_config(&dir)
        };
        let a_txt = dir.path().join("a.txt");
        let b_txt = dir.path().join("b.txt");
        let a_log = dir.path().join("a.log");
        let a_ignore_txt = dir.path().join("a.ignore.txt");

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file1 = File::create(&a_txt).unwrap();
            let mut file2 = File::create(&b_txt).unwrap();
            let mut file3 = File::create(&a_log).unwrap();
            let mut file4 = File::create(&a_ignore_txt).unwrap();
            sleep_500_millis().await;

            // Each line is "<file number> <line number>" so the receiver can
            // tell which file an event came from.
            for i in 0..n {
                writeln!(&mut file1, "1 {}", i).unwrap();
                writeln!(&mut file2, "2 {}", i).unwrap();
                writeln!(&mut file3, "3 {}", i).unwrap();
                writeln!(&mut file4, "4 {}", i).unwrap();
            }
            sleep_500_millis().await;
        })
        .await;

        // Next expected line number for files 1, 2 and 3.
        let mut next_line = [0; 3];
        for event in events {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            let mut fields = line.split(' ');
            let mut next_number = || fields.next().unwrap().parse::<usize>().unwrap();

            let file = next_number();
            assert_ne!(file, 4);
            let i = next_number();
            assert_eq!(next_line[file - 1], i);
            next_line[file - 1] += 1;
        }
        assert_eq!(next_line, [n as usize; 3]);
    }
    /// Checks that an exclude glob is honoured for files in a nested directory:
    /// `a.log.1` must deliver all `n` lines, `test.log.1` must deliver nothing.
    ///
    /// The include/exclude patterns and the file paths deliberately contain a
    /// doubled separator (`a//b`), while the directory itself is created as
    /// `a/b`.
    #[tokio::test]
    async fn file_exclude_paths() {
        let n = 5;
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("a//b/*.log.*")],
            exclude: vec![dir.path().join("a//b/test.log.*")],
            ..test_default_file_config(&dir)
        };
        let path1 = dir.path().join("a//b/a.log.1");
        let path2 = dir.path().join("a//b/test.log.1");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
            let mut file1 = File::create(&path1).unwrap();
            let mut file2 = File::create(&path2).unwrap();
            sleep_500_millis().await;

            // Each line is "<file number> <line number>".
            for i in 0..n {
                writeln!(&mut file1, "1 {}", i).unwrap();
                writeln!(&mut file2, "2 {}", i).unwrap();
            }
            sleep_500_millis().await;
        })
        .await;

        // Only file 1 may produce events; `is[0]` is its next expected line number.
        let mut is = [0; 1];
        for event in received {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            let mut split = line.split(' ');
            let file = split.next().unwrap().parse::<usize>().unwrap();
            // File 2 is the excluded one. Checking for it explicitly reports a
            // clear failure instead of an out-of-bounds index panic below.
            assert_ne!(file, 2, "received a line from the excluded file");
            let i = split.next().unwrap().parse::<usize>().unwrap();
            assert_eq!(is[file - 1], i);
            is[file - 1] += 1;
        }
        assert_eq!(is, [n as usize; 1]);
    }
    /// Runs the shared `file_key` scenarios in `Acks` mode.
    #[tokio::test]
    async fn file_key_acknowledged() {
        file_key(Acks).await;
    }
    /// Runs the shared `file_key` scenarios in `NoAcks` mode.
    #[tokio::test]
    async fn file_key_no_acknowledge() {
        file_key(NoAcks).await;
    }
    /// Shared body of the `file_key_*` tests, parameterised by acking mode.
    ///
    /// Each scenario uses its own temporary directory, writes a single line and
    /// checks how the file's path is attached to the one resulting event:
    ///
    /// 1. default config: the path is found under `file`;
    /// 2. `file_key` set to `source`: the path is found under `source`;
    /// 3. default config: the event's keys are exactly the default file key plus
    ///    the log schema's host, message, timestamp and source type keys.
    async fn file_key(acks: AckingMode) {
        // Scenario 1: default file key.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };
            let log_path = dir.path().join("file");

            let events = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
                let mut log_file = File::create(&log_path).unwrap();
                sleep_500_millis().await;
                writeln!(&mut log_file, "hello there").unwrap();
                sleep_500_millis().await;
            })
            .await;

            assert_eq!(events.len(), 1);
            assert_eq!(
                events[0].as_log()["file"].to_string_lossy(),
                log_path.to_str().unwrap()
            );
        }

        // Scenario 2: custom file key.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                file_key: OptionalValuePath::from(owned_value_path!("source")),
                ..test_default_file_config(&dir)
            };
            let log_path = dir.path().join("file");

            let events = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
                let mut log_file = File::create(&log_path).unwrap();
                sleep_500_millis().await;
                writeln!(&mut log_file, "hello there").unwrap();
                sleep_500_millis().await;
            })
            .await;

            assert_eq!(events.len(), 1);
            assert_eq!(
                events[0].as_log()["source"].to_string_lossy(),
                log_path.to_str().unwrap()
            );
        }

        // Scenario 3: the complete set of keys on a default-config event.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };
            let log_path = dir.path().join("file");

            let events = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
                let mut log_file = File::create(&log_path).unwrap();
                sleep_500_millis().await;
                writeln!(&mut log_file, "hello there").unwrap();
                sleep_500_millis().await;
            })
            .await;

            assert_eq!(events.len(), 1);
            let actual_keys = events[0].as_log().keys().unwrap().collect::<HashSet<_>>();
            assert_eq!(
                actual_keys,
                vec![
                    default_file_key()
                        .path
                        .expect("file key to exist")
                        .to_string()
                        .into(),
                    log_schema().host_key().unwrap().to_string().into(),
                    log_schema().message_key().unwrap().to_string().into(),
                    log_schema().timestamp_key().unwrap().to_string().into(),
                    log_schema().source_type_key().unwrap().to_string().into()
                ]
                .into_iter()
                .collect::<HashSet<_>>()
            );
        }
    }
    /// Runs the shared `file_start_position_server_restart` scenario in `Acks`
    /// mode (Linux only).
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn file_start_position_server_restart_acknowledged() {
        file_start_position_server_restart(Acks).await;
    }
    /// Runs the shared `file_start_position_server_restart` scenario in `NoAcks`
    /// mode (Linux only).
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn file_start_position_server_restart_no_acknowledge() {
        file_start_position_server_restart(NoAcks).await;
    }
    /// Shared body of the `file_start_position_server_restart_*` tests.
    ///
    /// Runs the source three times against the same file and directory:
    ///
    /// 1. first run: the pre-existing line and the line appended during the run
    ///    are both expected;
    /// 2. second run, same config: only the newly appended line is expected;
    /// 3. third run, with `ignore_checkpoints: Some(true)` and
    ///    `read_from: Beginning`: every line in the file is expected.
    #[cfg(target_os = "linux")]
    async fn file_start_position_server_restart(acking: AckingMode) {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");
        let mut log_file = File::create(&log_path).unwrap();

        // Content that exists before the source starts for the first time.
        writeln!(&mut log_file, "zeroth line").unwrap();
        sleep_500_millis().await;

        // First run.
        {
            let events = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut log_file, "first line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let messages = extract_messages_string(events);
            assert_eq!(messages, vec!["zeroth line", "first line"]);
        }

        // Second run with the same configuration.
        {
            let events = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut log_file, "second line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let messages = extract_messages_string(events);
            assert_eq!(messages, vec!["second line"]);
        }

        // Third run, configured to ignore checkpoints and read from the beginning.
        {
            let rewind_config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ignore_checkpoints: Some(true),
                read_from: ReadFromConfig::Beginning,
                ..test_default_file_config(&dir)
            };
            let events = run_file_source(
                &rewind_config,
                false,
                acking,
                LogNamespace::Legacy,
                async {
                    sleep_500_millis().await;
                    writeln!(&mut log_file, "third line").unwrap();
                    sleep_500_millis().await;
                },
            )
            .await;

            let messages = extract_messages_string(events);
            assert_eq!(
                messages,
                vec!["zeroth line", "first line", "second line", "third line"]
            );
        }
    }
    /// Runs the source twice in `Unfinalized` mode over a file that never
    /// changes and expects the same line to be delivered both times.
    #[tokio::test]
    async fn file_start_position_server_restart_unfinalized() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");
        let mut log_file = File::create(&log_path).unwrap();
        writeln!(&mut log_file, "the line").unwrap();
        sleep_500_millis().await;

        // First run.
        let first_run = run_file_source(
            &config,
            false,
            Unfinalized,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        assert_eq!(extract_messages_string(first_run), vec!["the line"]);

        // Second run: the line is expected again.
        let second_run = run_file_source(
            &config,
            false,
            Unfinalized,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        assert_eq!(extract_messages_string(second_run), vec!["the line"]);
    }
    /// Stops the source part-way through a 4000-line file and restarts it.
    ///
    /// The first, short run must not deliver the whole file; the line counts of
    /// the two runs must add up to exactly the number of lines written.
    #[tokio::test]
    async fn file_duplicate_processing_after_restart() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");
        let mut log_file = File::create(&log_path).unwrap();

        let line_count = 4000;
        for i in 0..line_count {
            writeln!(&mut log_file, "Here's a line for you: {}", i).unwrap();
        }
        sleep_500_millis().await;

        // First run: only given 500 ms.
        let first_run = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        let first_lines = extract_messages_string(first_run);
        assert!(first_lines.len() < line_count);

        // Second run: given five seconds to deliver the rest.
        let second_run = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            sleep(Duration::from_secs(5)),
        )
        .await;
        let second_lines = extract_messages_string(second_run);

        assert_eq!(first_lines.len() + second_lines.len(), line_count);
    }
    /// Runs the shared restart-with-rotation scenario in `Acks` mode.
    #[tokio::test]
    async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
        file_start_position_server_restart_with_file_rotation(Acks).await;
    }
    /// Runs the shared restart-with-rotation scenario in `NoAcks` mode.
    #[tokio::test]
    async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
        file_start_position_server_restart_with_file_rotation(NoAcks).await;
    }
    /// Shared body of the restart-with-rotation tests.
    ///
    /// The file is renamed to `file.old` between two runs of the source. The
    /// second run must deliver only the line written to the new `file`, not the
    /// line that now lives in `file.old` (which still matches the `*` include).
    async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let live_path = dir.path().join("file");
        let rotated_path = dir.path().join("file.old");

        // First run: create the file and write one line.
        {
            let events = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
                let mut log_file = File::create(&live_path).unwrap();
                sleep_500_millis().await;
                writeln!(&mut log_file, "first line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let messages = extract_messages_string(events);
            assert_eq!(messages, vec!["first line"]);
        }

        // Rotate while the source is not running.
        fs::rename(&live_path, &rotated_path).expect("could not rename");

        // Second run: a fresh file appears at the original path.
        {
            let events = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
                let mut log_file = File::create(&live_path).unwrap();
                sleep_500_millis().await;
                writeln!(&mut log_file, "second line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let messages = extract_messages_string(events);
            assert_eq!(messages, vec!["second line"]);
        }
    }
    /// Exercises `ignore_older_secs: Some(5)` with two files.
    ///
    /// `before` has its timestamps set 8 seconds into the past and `after` 2
    /// seconds into the past. The line already present in `before` is expected
    /// to be skipped, while `after` is expected to be read in full; lines
    /// appended later are expected from both files.
    #[cfg(unix)]
    #[tokio::test]
    async fn file_start_position_ignore_old_files() {
        use std::{
            os::unix::io::AsRawFd,
            time::{Duration, SystemTime},
        };

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ignore_older_secs: Some(5),
            ..test_default_file_config(&dir)
        };

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let before_path = dir.path().join("before");
            let mut before_file = File::create(&before_path).unwrap();
            let after_path = dir.path().join("after");
            let mut after_file = File::create(&after_path).unwrap();

            writeln!(&mut before_file, "first line").unwrap();
            writeln!(&mut after_file, "_first line").unwrap();

            {
                // Builds the two-element (access time, modification time) array
                // that `futimes` takes, with both entries set `age_secs`
                // seconds before now.
                let times_aged_by = |age_secs: u64| {
                    let stamp = SystemTime::now() - Duration::from_secs(age_secs);
                    let time = libc::timeval {
                        // `time_t` differs between platforms, hence the inferred cast.
                        tv_sec: stamp
                            .duration_since(SystemTime::UNIX_EPOCH)
                            .unwrap()
                            .as_secs() as _,
                        tv_usec: 0,
                    };
                    [time, time]
                };
                let before_times = times_aged_by(8);
                let after_times = times_aged_by(2);

                // SAFETY: both descriptors belong to `File`s that are still
                // open, and each pointer refers to a live two-element `timeval`
                // array, which is what `futimes` reads.
                let (before_rc, after_rc) = unsafe {
                    (
                        libc::futimes(before_file.as_raw_fd(), before_times.as_ptr()),
                        libc::futimes(after_file.as_raw_fd(), after_times.as_ptr()),
                    )
                };
                // If the timestamps were not applied the rest of the test is
                // meaningless, so fail here rather than in the assertions below.
                assert_eq!(before_rc, 0, "futimes failed for the `before` file");
                assert_eq!(after_rc, 0, "futimes failed for the `after` file");
            }

            sleep_500_millis().await;
            writeln!(&mut before_file, "second line").unwrap();
            writeln!(&mut after_file, "_second line").unwrap();
            sleep_500_millis().await;
        })
        .await;

        // Split the received messages by originating file.
        let before_lines = received
            .iter()
            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
            .map(|event| {
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
            })
            .collect::<Vec<_>>();
        let after_lines = received
            .iter()
            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
            .map(|event| {
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
            })
            .collect::<Vec<_>>();

        assert_eq!(before_lines, vec!["second line"]);
        assert_eq!(after_lines, vec!["_first line", "_second line"]);
    }
    /// With `max_line_bytes: 10`, only lines of at most ten bytes are expected
    /// to be delivered; longer ones (including one spanning many read buffers,
    /// and one that is the last line of a write burst) are expected to be
    /// dropped without affecting the lines that follow.
    #[tokio::test]
    async fn file_max_line_bytes() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            max_line_bytes: 10,
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut log_file = File::create(&log_path).unwrap();
            sleep_500_millis().await;

            // First burst, ending on an over-long line.
            writeln!(&mut log_file, "short").unwrap();
            writeln!(&mut log_file, "this is too long").unwrap();
            writeln!(&mut log_file, "11 eleven11").unwrap();
            let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
            writeln!(&mut log_file, "{}", super_long).unwrap();
            writeln!(&mut log_file, "exactly 10").unwrap();
            writeln!(&mut log_file, "it can end on a line that's too long").unwrap();
            sleep_500_millis().await;
            sleep_500_millis().await;

            // Second burst, starting with another over-long line.
            writeln!(&mut log_file, "and then continue").unwrap();
            writeln!(&mut log_file, "last short").unwrap();
            sleep_500_millis().await;
            sleep_500_millis().await;
        })
        .await;

        let messages = extract_messages_value(events);
        assert_eq!(
            messages,
            vec!["short".into(), "exactly 10".into(), "last short".into()]
        );
    }
    /// Multi-line aggregation configured through the legacy
    /// `message_start_indicator` / `multi_line_timeout` options.
    ///
    /// Lines are written in bursts separated by 500 ms pauses, far longer than
    /// the configured timeout of 25, and the expected events show how lines are
    /// grouped within and across those bursts.
    #[tokio::test]
    async fn test_multi_line_aggregation_legacy() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            message_start_indicator: Some("INFO".into()),
            multi_line_timeout: 25,
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut log_file = File::create(&log_path).unwrap();
            sleep_500_millis().await;

            // Burst 1.
            writeln!(&mut log_file, "leftover foo").unwrap();
            writeln!(&mut log_file, "INFO hello").unwrap();
            writeln!(&mut log_file, "INFO goodbye").unwrap();
            writeln!(&mut log_file, "part of goodbye").unwrap();
            sleep_500_millis().await;

            // Burst 2.
            writeln!(&mut log_file, "INFO hi again").unwrap();
            writeln!(&mut log_file, "and some more").unwrap();
            writeln!(&mut log_file, "INFO hello").unwrap();
            sleep_500_millis().await;

            // Burst 3: "too slow" arrives after the pause and is expected as
            // its own event rather than as part of the preceding "INFO hello".
            writeln!(&mut log_file, "too slow").unwrap();
            writeln!(&mut log_file, "INFO doesn't have").unwrap();
            writeln!(&mut log_file, "to be INFO in").unwrap();
            writeln!(&mut log_file, "the middle").unwrap();
            sleep_500_millis().await;
        })
        .await;

        let messages = extract_messages_value(events);
        assert_eq!(
            messages,
            vec![
                "leftover foo".into(),
                "INFO hello".into(),
                "INFO goodbye\npart of goodbye".into(),
                "INFO hi again\nand some more".into(),
                "INFO hello".into(),
                "too slow".into(),
                "INFO doesn't have".into(),
                "to be INFO in\nthe middle".into(),
            ]
        );
    }
    /// Multi-line aggregation configured through `MultilineConfig` (start and
    /// condition pattern `INFO`, `HaltBefore` mode, 25 ms timeout).
    ///
    /// The input and the expected events are the same as in
    /// `test_multi_line_aggregation_legacy`.
    #[tokio::test]
    async fn test_multi_line_aggregation() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            multiline: Some(MultilineConfig {
                start_pattern: "INFO".to_owned(),
                condition_pattern: "INFO".to_owned(),
                mode: line_agg::Mode::HaltBefore,
                timeout_ms: Duration::from_millis(25),
            }),
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut log_file = File::create(&log_path).unwrap();
            sleep_500_millis().await;

            // Burst 1.
            writeln!(&mut log_file, "leftover foo").unwrap();
            writeln!(&mut log_file, "INFO hello").unwrap();
            writeln!(&mut log_file, "INFO goodbye").unwrap();
            writeln!(&mut log_file, "part of goodbye").unwrap();
            sleep_500_millis().await;

            // Burst 2.
            writeln!(&mut log_file, "INFO hi again").unwrap();
            writeln!(&mut log_file, "and some more").unwrap();
            writeln!(&mut log_file, "INFO hello").unwrap();
            sleep_500_millis().await;

            // Burst 3: written 500 ms after the previous line, well past the
            // 25 ms timeout, so "too slow" is expected as a separate event.
            writeln!(&mut log_file, "too slow").unwrap();
            writeln!(&mut log_file, "INFO doesn't have").unwrap();
            writeln!(&mut log_file, "to be INFO in").unwrap();
            writeln!(&mut log_file, "the middle").unwrap();
            sleep_500_millis().await;
        })
        .await;

        let messages = extract_messages_value(events);
        assert_eq!(
            messages,
            vec![
                "leftover foo".into(),
                "INFO hello".into(),
                "INFO goodbye\npart of goodbye".into(),
                "INFO hi again\nand some more".into(),
                "INFO hello".into(),
                "too slow".into(),
                "INFO doesn't have".into(),
                "to be INFO in\nthe middle".into(),
            ]
        );
    }
    /// Checks the `offset` field of aggregated multi-line events across a
    /// restart.
    ///
    /// The first run must yield one aggregated event at offset 0. After a
    /// restart, the event for the newly appended line must carry an offset
    /// equal to the number of bytes written before the first run.
    #[tokio::test]
    async fn test_multi_line_checkpointing() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
            multiline: Some(MultilineConfig {
                start_pattern: "INFO".to_owned(),
                condition_pattern: "INFO".to_owned(),
                mode: line_agg::Mode::HaltBefore,
                timeout_ms: Duration::from_millis(25),
            }),
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");
        let mut log_file = File::create(&log_path).unwrap();

        writeln!(&mut log_file, "INFO hello").unwrap();
        writeln!(&mut log_file, "part of hello").unwrap();

        // First run: both lines are expected as a single event at offset 0.
        let first_run = run_file_source(
            &config,
            false,
            Acks,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        assert_eq!(first_run[0].as_log()["offset"], 0.into());
        let first_messages = extract_messages_string(first_run);
        assert_eq!(first_messages, vec!["INFO hello\npart of hello"]);

        // Second run: a new line is appended while the source is running.
        let second_run = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
            writeln!(&mut log_file, "INFO goodbye").unwrap();
        })
        .await;

        // The aggregated message plus one byte is exactly what the two
        // `writeln!` calls above put into the file.
        let resume_offset = first_messages[0].len() + 1;
        assert_eq!(second_run[0].as_log()["offset"], resume_offset.into());
        let second_messages = extract_messages_string(second_run);
        assert_eq!(second_messages, vec!["INFO goodbye"]);
    }
    /// With `max_read_bytes: 1` and `oldest_first: false`, the lines of two
    /// pre-populated files are expected to arrive interleaved, one line from
    /// each file in turn, starting with the older file.
    #[tokio::test]
    async fn test_fair_reads() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            max_read_bytes: 1,
            oldest_first: false,
            ..test_default_file_config(&dir)
        };

        // The older file is created first; the newer one 500 ms later.
        let older_path = dir.path().join("z_older_file");
        let mut older_file = File::create(&older_path).unwrap();
        sleep_500_millis().await;
        let newer_path = dir.path().join("a_newer_file");
        let mut newer_file = File::create(&newer_path).unwrap();

        writeln!(&mut older_file, "hello i am the old file").unwrap();
        writeln!(&mut older_file, "i have been around a while").unwrap();
        writeln!(&mut older_file, "you can read newer files at the same time").unwrap();

        writeln!(&mut newer_file, "and i am the new file").unwrap();
        writeln!(&mut newer_file, "this should be interleaved with the old one").unwrap();
        writeln!(&mut newer_file, "which is fine because we want fairness").unwrap();

        sleep_500_millis().await;

        let events = run_file_source(
            &config,
            false,
            NoAcks,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;

        let messages = extract_messages_value(events);
        assert_eq!(
            messages,
            vec![
                "hello i am the old file".into(),
                "and i am the new file".into(),
                "i have been around a while".into(),
                "this should be interleaved with the old one".into(),
                "you can read newer files at the same time".into(),
                "which is fine because we want fairness".into(),
            ]
        );
    }
    /// With `max_read_bytes: 1` and `oldest_first: true`, every line of the
    /// older file is expected before any line of the newer file.
    #[tokio::test]
    async fn test_oldest_first() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            max_read_bytes: 1,
            oldest_first: true,
            ..test_default_file_config(&dir)
        };

        // The older file is created first; the newer one 500 ms later.
        let older_path = dir.path().join("z_older_file");
        let mut older_file = File::create(&older_path).unwrap();
        sleep_500_millis().await;
        let newer_path = dir.path().join("a_newer_file");
        let mut newer_file = File::create(&newer_path).unwrap();

        writeln!(&mut older_file, "hello i am the old file").unwrap();
        writeln!(&mut older_file, "i have been around a while").unwrap();
        writeln!(&mut older_file, "you should definitely read all of me first").unwrap();

        writeln!(&mut newer_file, "i'm new").unwrap();
        writeln!(&mut newer_file, "hopefully you read all the old stuff first").unwrap();
        writeln!(&mut newer_file, "because otherwise i'm not going to make sense").unwrap();

        sleep_500_millis().await;

        let events = run_file_source(
            &config,
            false,
            NoAcks,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;

        let messages = extract_messages_value(events);
        assert_eq!(
            messages,
            vec![
                "hello i am the old file".into(),
                "i have been around a while".into(),
                "you should definitely read all of me first".into(),
                "i'm new".into(),
                "hopefully you read all the old stuff first".into(),
                "because otherwise i'm not going to make sense".into(),
            ]
        );
    }
    /// Checks that a line written in two chunks — first without a trailing
    /// newline, then completed half a second later — is emitted as one message.
    ///
    /// This test is compiled out on macOS.
    #[cfg(not(target_os = "macos"))]
    #[tokio::test]
    async fn test_split_reads() {
        let dir = tempdir().unwrap();
        let pattern = dir.path().join("*");
        let config = file::FileConfig {
            include: vec![pattern],
            max_read_bytes: 1,
            ..test_default_file_config(&dir)
        };

        // One complete line exists before the source is started.
        let mut file = File::create(dir.path().join("file")).unwrap();
        writeln!(&mut file, "hello i am a normal line").unwrap();
        sleep_500_millis().await;

        // Runs while the source is active: write half a line, pause, then
        // finish the line with its newline.
        let writer = async {
            sleep_500_millis().await;
            write!(&mut file, "i am not a full line").unwrap();
            sleep_500_millis().await;
            writeln!(&mut file, " until now").unwrap();
            sleep_500_millis().await;
        };

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, writer).await;
        let messages = extract_messages_value(events);

        let expected = vec![
            "hello i am a normal line".into(),
            "i am not a full line until now".into(),
        ];
        assert_eq!(messages, expected);
    }
    /// Checks that the fixture `tests/data/gzipped.log` is read back as its
    /// five plain-text lines.
    #[tokio::test]
    async fn test_gzipped_file() {
        let dir = tempdir().unwrap();
        let fixture = PathBuf::from("tests/data/gzipped.log");
        let config = file::FileConfig {
            include: vec![fixture],
            // NOTE(review): the reason for this specific limit is not evident
            // from the test itself — confirm before changing it.
            max_line_bytes: 100,
            ..test_default_file_config(&dir)
        };

        let events = run_file_source(
            &config,
            false,
            NoAcks,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        let messages = extract_messages_value(events);

        let expected = vec![
            "this is a simple file".into(),
            "i have been compressed".into(),
            "in order to make me smaller".into(),
            "but you can still read me".into(),
            "hooray".into(),
        ];
        assert_eq!(messages, expected);
    }
    /// Checks that the fixture `tests/data/utf-16le.log` is read back as the
    /// expected five messages when the source is configured with the
    /// `UTF_16LE` charset.
    #[tokio::test]
    async fn test_non_utf8_encoded_file() {
        let dir = tempdir().unwrap();
        let fixture = PathBuf::from("tests/data/utf-16le.log");
        let config = file::FileConfig {
            include: vec![fixture],
            encoding: Some(EncodingConfig { charset: UTF_16LE }),
            ..test_default_file_config(&dir)
        };

        let events = run_file_source(
            &config,
            false,
            NoAcks,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        let messages = extract_messages_value(events);

        let expected = vec![
            "hello i am a file".into(),
            "i can unicode".into(),
            "but i do so in 16 bits".into(),
            "and when i byte".into(),
            "i become little-endian".into(),
        ];
        assert_eq!(messages, expected);
    }
    /// Checks that with `line_delimiter` set to `"\r\n"` each CRLF-terminated
    /// line is emitted as its own message, without the delimiter.
    #[tokio::test]
    async fn test_non_default_line_delimiter() {
        let dir = tempdir().unwrap();
        let pattern = dir.path().join("*");
        let config = file::FileConfig {
            include: vec![pattern],
            line_delimiter: "\r\n".to_owned(),
            ..test_default_file_config(&dir)
        };
        let log_path = dir.path().join("file");

        // Runs while the source is active: create the file, pause, then write
        // four CRLF-terminated lines and pause again before shutdown.
        let writer = async {
            let mut file = File::create(&log_path).unwrap();
            sleep_500_millis().await;

            write!(&mut file, "hello i am a line\r\n").unwrap();
            write!(&mut file, "and i am too\r\n").unwrap();
            write!(&mut file, "CRLF is how we end\r\n").unwrap();
            write!(&mut file, "please treat us well\r\n").unwrap();

            sleep_500_millis().await;
        };

        let events = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, writer).await;
        let messages = extract_messages_value(events);

        let expected = vec![
            "hello i am a line".into(),
            "and i am too".into(),
            "CRLF is how we end".into(),
            "please treat us well".into(),
        ];
        assert_eq!(messages, expected);
    }
    /// Checks that with `remove_after_secs` set and acknowledgements enabled,
    /// all written lines are received and the file is gone from disk afterwards.
    #[tokio::test]
    async fn remove_file() {
        let line_count = 5;
        let grace_secs = 1;

        let dir = tempdir().unwrap();
        let pattern = dir.path().join("*");
        let config = file::FileConfig {
            include: vec![pattern],
            remove_after_secs: Some(grace_secs),
            ..test_default_file_config(&dir)
        };
        let path = dir.path().join("file");

        let writer = async {
            let mut file = File::create(&path).unwrap();
            sleep_500_millis().await;

            for line_no in 0..line_count {
                writeln!(&mut file, "{}", line_no).unwrap();
            }
            // Close our handle before waiting for the removal.
            drop(file);

            // Poll up to ten times, sleeping one second longer than the
            // configured `remove_after_secs` each round, until the file can no
            // longer be opened.
            for _ in 0..10 {
                sleep(Duration::from_secs(grace_secs + 1)).await;
                if File::open(&path).is_err() {
                    break;
                }
            }
        };

        let events = run_file_source(&config, false, Acks, LogNamespace::Legacy, writer).await;
        assert_eq!(events.len(), line_count);

        // Opening must fail specifically because the file no longer exists.
        if let Err(error) = File::open(&path) {
            assert_eq!(error.kind(), std::io::ErrorKind::NotFound);
        } else {
            panic!("File wasn't removed");
        }
    }
    /// Selects how `run_file_source` sets up acknowledgements for a test run.
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum AckingMode {
        /// The source is started with `acks = false` and a plain
        /// `SourceSender::new_test()` sender.
        NoAcks,
        /// The source is started with `acks = true`, but still with the plain
        /// `SourceSender::new_test()` sender; events are collected until a
        /// five-second sleep elapses instead of under a hard timeout.
        Unfinalized,
        /// The source is started with `acks = true` and a sender created via
        /// `SourceSender::new_test_finalize(EventStatus::Delivered)`.
        Acks,
    }
    use vector_lib::lookup::OwnedTargetPath;
    use AckingMode::*;
    async fn run_file_source(
        config: &FileConfig,
        wait_shutdown: bool,
        acking_mode: AckingMode,
        log_namespace: LogNamespace,
        inner: impl Future<Output = ()>,
    ) -> Vec<Event> {
        assert_source_compliance(&FILE_SOURCE_TAGS, async move {
            let (tx, rx) = if acking_mode == Acks {
                let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
                (tx, rx.boxed())
            } else {
                let (tx, rx) = SourceSender::new_test();
                (tx, rx.boxed())
            };
            let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
            let data_dir = config.data_dir.clone().unwrap();
            let acks = !matches!(acking_mode, NoAcks);
            tokio::spawn(file::file_source(
                config,
                data_dir,
                shutdown,
                tx,
                acks,
                log_namespace,
            ));
            inner.await;
            drop(trigger_shutdown);
            let result = if acking_mode == Unfinalized {
                rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
                    .collect::<Vec<_>>()
                    .await
            } else {
                timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
                    .await
                    .expect(
                        "Unclosed channel: may indicate file-server could not shutdown gracefully.",
                    )
            };
            if wait_shutdown {
                shutdown_done.await;
            }
            result
        })
        .await
    }
    fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
        received
            .into_iter()
            .map(Event::into_log)
            .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
            .collect()
    }
    fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
        received
            .into_iter()
            .map(Event::into_log)
            .map(|log| log.get_message().unwrap().clone())
            .collect()
    }
}