use std::{
    collections::HashMap,
    io::{Error, ErrorKind},
    path::PathBuf,
    process::ExitStatus,
};
use chrono::Utc;
use futures::StreamExt;
use smallvec::SmallVec;
use snafu::Snafu;
use tokio::{
    io::{AsyncRead, BufReader},
    process::Command,
    sync::mpsc::{channel, Sender},
    time::{self, sleep, Duration, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::FramedRead;
use vector_lib::codecs::{
    decoding::{DeserializerConfig, FramingConfig},
    StreamDecodingError,
};
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
use vector_lib::{config::LegacyKey, EstimatedJsonEncodedSizeOf};
use vrl::path::OwnedValuePath;
use vrl::value::Kind;
use crate::{
    codecs::{Decoder, DecodingConfig},
    config::{SourceConfig, SourceContext, SourceOutput},
    event::Event,
    internal_events::{
        ExecChannelClosedError, ExecCommandExecuted, ExecEventsReceived, ExecFailedError,
        ExecFailedToSignalChild, ExecFailedToSignalChildError, ExecTimeoutError, StreamClosedError,
    },
    serde::default_decoding,
    shutdown::ShutdownSignal,
    SourceSender,
};
use vector_lib::config::{log_schema, LogNamespace};
use vector_lib::lookup::{owned_value_path, path};
#[cfg(test)]
mod tests;
/// Configuration for the `exec` source.
#[configurable_component(source("exec", "Collect output from a process running on the host."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ExecConfig {
    // Selects between the scheduled and the streaming runner.
    #[configurable(derived)]
    pub mode: Mode,
    // Options used in scheduled mode; defaults apply when absent.
    #[configurable(derived)]
    pub scheduled: Option<ScheduledConfig>,
    // Options used in streaming mode; defaults apply when absent.
    #[configurable(derived)]
    pub streaming: Option<StreamingConfig>,
    /// The command to run, plus any arguments required.
    ///
    /// The first element is the program and the remaining elements are passed to it as arguments.
    #[configurable(metadata(docs::examples = "echo", docs::examples = "Hello World!"))]
    pub command: Vec<String>,
    /// Custom environment variables to set or update when running the command.
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "An environment variable."))]
    #[configurable(metadata(docs::examples = "environment_examples()"))]
    pub environment: Option<HashMap<String, String>>,
    /// Whether or not to clear the environment before setting custom environment variables.
    #[serde(default = "default_clear_environment")]
    pub clear_environment: bool,
    /// The directory in which to run the command.
    pub working_directory: Option<PathBuf>,
    /// Whether or not the output from stderr should be included when generating events.
    #[serde(default = "default_include_stderr")]
    pub include_stderr: bool,
    /// The maximum buffer size, in bytes. Must be greater than zero.
    // NOTE(review): in this file the value is only checked by `validate`; confirm where the
    // limit is actually enforced.
    #[serde(default = "default_maximum_buffer_size")]
    pub maximum_buffer_size_bytes: usize,
    // Framing override; when absent the decoder's default stream framing is used.
    #[configurable(derived)]
    framing: Option<FramingConfig>,
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,
    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
/// Mode of operation for running the command.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum Mode {
    /// The command is run on a schedule.
    Scheduled,
    /// The command is run until it exits, potentially being restarted.
    Streaming,
}
/// Configuration options for scheduled commands.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ScheduledConfig {
    /// The interval, in seconds, between scheduled command runs.
    ///
    /// The same value is used as the time limit for each individual run.
    #[serde(default = "default_exec_interval_secs")]
    exec_interval_secs: u64,
}
/// Configuration options for streaming commands.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StreamingConfig {
    /// Whether or not the command should be rerun if the command exits.
    #[serde(default = "default_respawn_on_exit")]
    respawn_on_exit: bool,
    /// The amount of time, in seconds, before rerunning a streaming command that exited.
    #[serde(default = "default_respawn_interval_secs")]
    #[configurable(metadata(docs::human_name = "Respawn Interval"))]
    respawn_interval_secs: u64,
}
/// Errors reported by `ExecConfig::validate` for an unusable configuration.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum ExecConfigError {
    /// `command` contains no elements, so there is no program to run.
    #[snafu(display("A non-empty list for command must be provided"))]
    CommandEmpty,
    /// `maximum_buffer_size_bytes` was set to zero.
    #[snafu(display("The maximum buffer size must be greater than zero"))]
    ZeroBuffer,
}
impl Default for ExecConfig {
    /// Builds the example configuration: a scheduled `echo "Hello World!"` using the default
    /// interval, with every other option at its serde default.
    fn default() -> Self {
        let scheduled = ScheduledConfig {
            exec_interval_secs: default_exec_interval_secs(),
        };
        let command = vec![String::from("echo"), String::from("Hello World!")];

        Self {
            mode: Mode::Scheduled,
            scheduled: Some(scheduled),
            streaming: None,
            command,
            environment: None,
            clear_environment: default_clear_environment(),
            working_directory: None,
            include_stderr: default_include_stderr(),
            maximum_buffer_size_bytes: default_maximum_buffer_size(),
            framing: None,
            decoding: default_decoding(),
            log_namespace: None,
        }
    }
}
/// Default for `maximum_buffer_size_bytes`: one million bytes.
const fn default_maximum_buffer_size() -> usize {
    1_000_000
}
/// Default for `scheduled.exec_interval_secs`: run the command every 60 seconds.
const fn default_exec_interval_secs() -> u64 {
    60
}
/// Default for `streaming.respawn_interval_secs`: wait 5 seconds before respawning.
const fn default_respawn_interval_secs() -> u64 {
    5
}
/// Default for `streaming.respawn_on_exit`: streaming commands are respawned after exiting.
const fn default_respawn_on_exit() -> bool {
    true
}
/// Default for `clear_environment`: the environment is not cleared.
const fn default_clear_environment() -> bool {
    false
}
/// Default for `include_stderr`: stderr output is captured alongside stdout.
const fn default_include_stderr() -> bool {
    true
}
/// Returns the sample environment map referenced by the documentation metadata of the
/// `environment` option.
fn environment_examples() -> HashMap<String, String> {
    const EXAMPLES: [(&str, &str); 3] = [
        ("LANG", "es_ES.UTF-8"),
        ("TZ", "Etc/UTC"),
        ("PATH", "/bin:/usr/bin:/usr/local/bin"),
    ];

    EXAMPLES
        .iter()
        .map(|&(name, value)| (name.to_owned(), value.to_owned()))
        .collect()
}
/// Returns the hostname reported by `crate::get_hostname`, or `None` if that lookup fails.
///
/// The lookup error is discarded, so a failed lookup just yields `None`.
fn get_hostname() -> Option<String> {
    crate::get_hostname().ok()
}
/// Stream label attached to events decoded from the child's stdout.
const STDOUT: &str = "stdout";
/// Stream label attached to events decoded from the child's stderr.
const STDERR: &str = "stderr";
/// Metadata key under which the originating stream label is stored.
const STREAM_KEY: &str = "stream";
/// Metadata key under which the child's process id is stored.
const PID_KEY: &str = "pid";
/// Metadata key under which the configured command is stored.
const COMMAND_KEY: &str = "command";
// NOTE(review): presumably generates the example configuration from `ExecConfig::default()` —
// confirm against the macro definition.
impl_generate_config_from_default!(ExecConfig);
impl ExecConfig {
    /// Checks the invariants the runners rely on: a non-empty `command` and a non-zero
    /// `maximum_buffer_size_bytes`. The command check takes precedence.
    fn validate(&self) -> Result<(), ExecConfigError> {
        if self.command.is_empty() {
            return Err(ExecConfigError::CommandEmpty);
        }
        if self.maximum_buffer_size_bytes == 0 {
            return Err(ExecConfigError::ZeroBuffer);
        }
        Ok(())
    }

    /// Renders the command and its arguments as a single space-separated string.
    fn command_line(&self) -> String {
        self.command.join(" ")
    }

    /// Interval between scheduled runs, falling back to the default when no `scheduled`
    /// section is configured.
    const fn exec_interval_secs_or_default(&self) -> u64 {
        if let Some(scheduled) = &self.scheduled {
            scheduled.exec_interval_secs
        } else {
            default_exec_interval_secs()
        }
    }

    /// Whether a streaming command is respawned after it exits, falling back to the default
    /// when no `streaming` section is configured.
    const fn respawn_on_exit_or_default(&self) -> bool {
        if let Some(streaming) = &self.streaming {
            streaming.respawn_on_exit
        } else {
            default_respawn_on_exit()
        }
    }

    /// Delay before respawning a streaming command, falling back to the default when no
    /// `streaming` section is configured.
    const fn respawn_interval_secs_or_default(&self) -> u64 {
        if let Some(streaming) = &self.streaming {
            streaming.respawn_interval_secs
        } else {
            default_respawn_interval_secs()
        }
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "exec")]
impl SourceConfig for ExecConfig {
    /// Validates the configuration, builds the decoder and returns the future that drives the
    /// source in the configured mode.
    ///
    /// # Errors
    ///
    /// Fails when `validate` rejects the configuration or when the decoder cannot be built.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        self.validate()?;
        let hostname = get_hostname();
        let log_namespace = cx.log_namespace(self.log_namespace);
        // An explicit framing option wins; otherwise use the decoder's default stream framing.
        let framing = self
            .framing
            .clone()
            .unwrap_or_else(|| self.decoding.default_stream_framing());
        let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
        match &self.mode {
            Mode::Scheduled => {
                let exec_interval_secs = self.exec_interval_secs_or_default();
                Ok(Box::pin(run_scheduled(
                    self.clone(),
                    hostname,
                    exec_interval_secs,
                    decoder,
                    cx.shutdown,
                    cx.out,
                    log_namespace,
                )))
            }
            Mode::Streaming => {
                let respawn_on_exit = self.respawn_on_exit_or_default();
                let respawn_interval_secs = self.respawn_interval_secs_or_default();
                Ok(Box::pin(run_streaming(
                    self.clone(),
                    hostname,
                    respawn_on_exit,
                    respawn_interval_secs,
                    decoder,
                    cx.shutdown,
                    cx.out,
                    log_namespace,
                )))
            }
        }
    }

    /// Describes the single output of this source, including the `host`, `stream`, `pid` and
    /// `command` metadata that `handle_event` attaches to log events.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        // Pass the optional per-source override straight through so that an unset
        // `log_namespace` falls back to the global setting, matching what `build` does via
        // `cx.log_namespace`. Forcing `Some(false)` here made the advertised schema legacy
        // even when events were emitted under the global (Vector) namespace.
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(
                    log_schema()
                        .host_key()
                        .map_or(OwnedValuePath::root(), |key| key.clone()),
                )),
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!(STREAM_KEY))),
                &owned_value_path!(STREAM_KEY),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!(PID_KEY))),
                &owned_value_path!(PID_KEY),
                Kind::integer().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!(COMMAND_KEY))),
                &owned_value_path!(COMMAND_KEY),
                Kind::bytes(),
                None,
            );
        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    /// This source does not support end-to-end acknowledgements.
    fn can_acknowledge(&self) -> bool {
        false
    }
}
async fn run_scheduled(
    config: ExecConfig,
    hostname: Option<String>,
    exec_interval_secs: u64,
    decoder: Decoder,
    shutdown: ShutdownSignal,
    out: SourceSender,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    debug!("Starting scheduled exec runs.");
    let schedule = Duration::from_secs(exec_interval_secs);
    let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown.clone());
    while interval.next().await.is_some() {
        let timeout = tokio::time::timeout(
            schedule,
            run_command(
                config.clone(),
                hostname.clone(),
                decoder.clone(),
                shutdown.clone(),
                out.clone(),
                log_namespace,
            ),
        )
        .await;
        match timeout {
            Ok(output) => {
                if let Err(command_error) = output {
                    emit!(ExecFailedError {
                        command: config.command_line().as_str(),
                        error: command_error,
                    });
                }
            }
            Err(error) => {
                emit!(ExecTimeoutError {
                    command: config.command_line().as_str(),
                    elapsed_seconds: schedule.as_secs(),
                    error,
                });
            }
        }
    }
    debug!("Finished scheduled exec runs.");
    Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn run_streaming(
    config: ExecConfig,
    hostname: Option<String>,
    respawn_on_exit: bool,
    respawn_interval_secs: u64,
    decoder: Decoder,
    mut shutdown: ShutdownSignal,
    out: SourceSender,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    if respawn_on_exit {
        let duration = Duration::from_secs(respawn_interval_secs);
        loop {
            let output = run_command(
                config.clone(),
                hostname.clone(),
                decoder.clone(),
                shutdown.clone(),
                out.clone(),
                log_namespace,
            )
            .await;
            if let Err(command_error) = output {
                emit!(ExecFailedError {
                    command: config.command_line().as_str(),
                    error: command_error,
                });
            }
            tokio::select! {
                _ = &mut shutdown => break, _ = sleep(duration) => debug!("Restarting streaming process."),
            }
        }
    } else {
        let output = run_command(
            config.clone(),
            hostname,
            decoder,
            shutdown,
            out,
            log_namespace,
        )
        .await;
        if let Err(command_error) = output {
            emit!(ExecFailedError {
                command: config.command_line().as_str(),
                error: command_error,
            });
        }
    }
    Ok(())
}
/// Spawns the configured command once and forwards its decoded output downstream.
///
/// Reader tasks decode stdout (and stderr when `include_stderr` is set) and hand batches over a
/// channel. This function enriches each batch with source metadata and sends it to `out`.
/// The loop ends when both readers are done, when the downstream is closed, or when shutdown
/// is requested but the child could not be signalled.
///
/// Returns the exit status if the child had already exited when the loop ended, and `None`
/// otherwise (including when the status could not be obtained).
///
/// # Errors
///
/// Fails when the process cannot be spawned or its stdout/stderr pipes cannot be taken.
async fn run_command(
    config: ExecConfig,
    hostname: Option<String>,
    decoder: Decoder,
    mut shutdown: ShutdownSignal,
    mut out: SourceSender,
    log_namespace: LogNamespace,
) -> Result<Option<ExitStatus>, Error> {
    debug!("Starting command run.");
    let mut command = build_command(&config);
    // Taken right before spawning so the reported duration approximates the process lifetime.
    let start = Instant::now();
    let mut child = command.spawn()?;
    let (sender, mut receiver) = channel(1024);
    if config.include_stderr {
        let stderr = child.stderr.take().ok_or_else(|| {
            Error::new(ErrorKind::Other, "Unable to take stderr of spawned process")
        })?;
        let stderr_reader = BufReader::new(stderr);
        spawn_reader_thread(stderr_reader, decoder.clone(), STDERR, sender.clone());
    }
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| Error::new(ErrorKind::Other, "Unable to take stdout of spawned process"))?;
    let stdout_reader = BufReader::new(stdout);
    let pid = child.id();
    // The last `sender` is moved into this reader, so the channel closes once every reader
    // task has finished.
    spawn_reader_thread(stdout_reader, decoder.clone(), STDOUT, sender);
    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
    // The configuration is immutable for the whole run, so render the command line once
    // instead of re-joining it for every batch.
    let command_line = config.command_line();
    // NOTE(review): after a successful signal the loop keeps draining output until the channel
    // closes, so `shutdown` is polled again on later iterations — confirm that re-polling a
    // completed `ShutdownSignal` is intended.
    'outer: loop {
        tokio::select! {
            _ = &mut shutdown => {
                // Could not signal the child: stop draining and exit early.
                if !shutdown_child(&mut child, &command).await {
                    break 'outer;
                }
            }
            v = receiver.recv() => {
                match v {
                    // Every reader task has finished and dropped its sender.
                    None => break 'outer,
                    Some(((mut events, byte_size), stream)) => {
                        bytes_received.emit(ByteSize(byte_size));
                        let count = events.len();
                        emit!(ExecEventsReceived {
                            count,
                            command: command_line.as_str(),
                            byte_size: events.estimated_json_encoded_size_of(),
                        });
                        // One label per batch; every event in it came from the same stream.
                        let data_stream = Some(stream.to_string());
                        for event in &mut events {
                            handle_event(&config, &hostname, &data_stream, pid, event, log_namespace);
                        }
                        if (out.send_batch(events).await).is_err() {
                            emit!(StreamClosedError { count });
                            break 'outer;
                        }
                    },
                }
            }
        }
    }
    let elapsed = start.elapsed();
    // Non-blocking check: the child may still be running, in which case no status is reported.
    let result = match child.try_wait() {
        Ok(Some(exit_status)) => {
            handle_exit_status(&config, exit_status.code(), elapsed);
            Ok(Some(exit_status))
        }
        Ok(None) => {
            handle_exit_status(&config, None, elapsed);
            Ok(None)
        }
        Err(error) => {
            error!(message = "Unable to obtain exit status.", %error);
            handle_exit_status(&config, None, elapsed);
            Ok(None)
        }
    };
    debug!("Finished command run.");
    result
}
/// Emits an `ExecCommandExecuted` internal event for a finished command run.
///
/// `exit_status` is the exit code when one is known (callers pass `None` when the status is
/// unavailable), and `exec_duration` is the time measured by the caller for the run.
fn handle_exit_status(config: &ExecConfig, exit_status: Option<i32>, exec_duration: Duration) {
    emit!(ExecCommandExecuted {
        command: config.command_line().as_str(),
        exit_status,
        exec_duration,
    });
}
#[cfg(unix)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    match child.id().map(i32::try_from) {
        Some(Ok(pid)) => {
            if let Err(error) = nix::sys::signal::kill(
                nix::unistd::Pid::from_raw(pid),
                nix::sys::signal::Signal::SIGTERM,
            ) {
                emit!(ExecFailedToSignalChildError {
                    command,
                    error: ExecFailedToSignalChild::SignalError(error)
                });
                false
            } else {
                true
            }
        }
        Some(Err(err)) => {
            emit!(ExecFailedToSignalChildError {
                command,
                error: ExecFailedToSignalChild::FailedToMarshalPid(err)
            });
            false
        }
        None => {
            emit!(ExecFailedToSignalChildError {
                command,
                error: ExecFailedToSignalChild::NoPid
            });
            false
        }
    }
}
/// Terminates the child by killing it.
///
/// Returns `true` when the kill succeeded; otherwise emits an `ExecFailedToSignalChildError`
/// carrying the I/O error and returns `false`.
#[cfg(windows)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    if let Err(io_error) = child.kill().await {
        emit!(ExecFailedToSignalChildError {
            command,
            error: ExecFailedToSignalChild::IoError(io_error)
        });
        return false;
    }
    true
}
/// Translates the configuration into a ready-to-spawn `Command`.
///
/// The first element of `config.command` is the program and the rest are its arguments. The
/// child is killed when its handle is dropped, stdout is always piped, stderr is piped only
/// when `include_stderr` is set, and stdin is closed.
///
/// # Panics
///
/// Panics if `config.command` is empty; `ExecConfig::validate` rejects such configurations.
fn build_command(config: &ExecConfig) -> Command {
    use std::process::Stdio;

    let program = &config.command[0];
    // An empty argument slice is a no-op for `args`, so no length check is needed.
    let arguments = &config.command[1..];

    let mut command = Command::new(program);
    command.args(arguments).kill_on_drop(true);

    // Clearing must happen before the custom variables are applied.
    if config.clear_environment {
        command.env_clear();
    }
    if let Some(variables) = config.environment.as_ref() {
        command.envs(variables);
    }
    if let Some(directory) = config.working_directory.as_ref() {
        command.current_dir(directory);
    }

    let stderr = if config.include_stderr {
        Stdio::piped()
    } else {
        Stdio::null()
    };
    command
        .stdout(Stdio::piped())
        .stderr(stderr)
        .stdin(Stdio::null());
    command
}
/// Attaches source metadata to a decoded log event.
///
/// Adds the standard Vector source metadata, then — each only when a value is available — the
/// originating stream, the child's pid and the hostname, and finally the configured command.
/// Events that are not logs are left unchanged.
fn handle_event(
    config: &ExecConfig,
    hostname: &Option<String>,
    data_stream: &Option<String>,
    pid: Option<u32>,
    event: &mut Event,
    log_namespace: LogNamespace,
) {
    let log = match event {
        Event::Log(log) => log,
        _ => return,
    };

    log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());

    if let Some(stream_name) = data_stream {
        log_namespace.insert_source_metadata(
            ExecConfig::NAME,
            log,
            Some(LegacyKey::InsertIfEmpty(path!(STREAM_KEY))),
            path!(STREAM_KEY),
            stream_name.clone(),
        );
    }

    if let Some(process_id) = pid {
        log_namespace.insert_source_metadata(
            ExecConfig::NAME,
            log,
            Some(LegacyKey::InsertIfEmpty(path!(PID_KEY))),
            path!(PID_KEY),
            i64::from(process_id),
        );
    }

    if let Some(host) = hostname {
        log_namespace.insert_source_metadata(
            ExecConfig::NAME,
            log,
            log_schema().host_key().map(LegacyKey::InsertIfEmpty),
            path!("host"),
            host.clone(),
        );
    }

    // The command is always recorded, as the full list of program and arguments.
    log_namespace.insert_source_metadata(
        ExecConfig::NAME,
        log,
        Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
        path!(COMMAND_KEY),
        config.command.clone(),
    );
}
/// Spawns a detached task that decodes `reader` and forwards every decoded batch, tagged with
/// `origin`, to `sender`.
///
/// The task stops when the input ends, when the receiving side of the channel is gone
/// (reported as `ExecChannelClosedError`), or when the decoder reports an error it cannot
/// continue from.
fn spawn_reader_thread<R>(
    reader: BufReader<R>,
    decoder: Decoder,
    origin: &'static str,
    sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>,
) where
    R: AsyncRead + Unpin + Send + 'static,
{
    // The join handle is dropped on purpose: the task runs detached.
    drop(tokio::spawn(async move {
        debug!("Start capturing {} command output.", origin);
        let mut frames = FramedRead::new(reader, decoder);
        while let Some(frame) = frames.next().await {
            let keep_reading = match frame {
                Ok(decoded) => {
                    let delivered = sender.send((decoded, origin)).await.is_ok();
                    if !delivered {
                        emit!(ExecChannelClosedError);
                    }
                    delivered
                }
                // Nothing is reported here; only the decoder's verdict on whether the stream
                // is still usable matters.
                Err(error) => error.can_continue(),
            };
            if !keep_reading {
                break;
            }
        }
        debug!("Finished capturing {} command output.", origin);
    }));
}