use std::{
    path::{Path, PathBuf},
    time::Duration,
};
use std::{
    sync::mpsc::{channel, Receiver},
    thread,
};
use notify::{recommended_watcher, EventKind, RecursiveMode};
use crate::Error;
const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
pub enum WatcherConfig {
    RecommendedWatcher,
    PollWatcher(u64),
}
enum Watcher {
    RecommendedWatcher(notify::RecommendedWatcher),
    PollWatcher(notify::PollWatcher),
}
impl Watcher {
    fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
        for path in config_paths {
            self.watch(path, RecursiveMode::Recursive)?;
        }
        Ok(())
    }
    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
        use notify::Watcher as NotifyWatcher;
        match self {
            Watcher::RecommendedWatcher(watcher) => {
                watcher.watch(path, recursive_mode)?;
            }
            Watcher::PollWatcher(watcher) => {
                watcher.watch(path, recursive_mode)?;
            }
        }
        Ok(())
    }
}
pub fn spawn_thread<'a>(
    watcher_conf: WatcherConfig,
    signal_tx: crate::signal::SignalTx,
    config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
    delay: impl Into<Option<Duration>>,
) -> Result<(), Error> {
    let config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
    let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
    let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
    info!("Watching configuration files.");
    thread::spawn(move || loop {
        if let Some((mut watcher, receiver)) = watcher.take() {
            while let Ok(Ok(event)) = receiver.recv() {
                if matches!(
                    event.kind,
                    EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
                ) {
                    debug!(message = "Configuration file change detected.", event = ?event);
                    while receiver.recv_timeout(delay).is_ok() {}
                    debug!(message = "Consumed file change events for delay.", delay = ?delay);
                    if let Err(error) = watcher.add_paths(&config_paths) {
                        error!(message = "Failed to read files to watch.", %error);
                        break;
                    }
                    debug!(message = "Reloaded paths.");
                    info!("Configuration file changed.");
                    _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
                        error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
                    });
                } else {
                    debug!(message = "Ignoring event.", event = ?event)
                }
            }
        }
        thread::sleep(RETRY_TIMEOUT);
        watcher = create_watcher(&watcher_conf, &config_paths)
            .map_err(|error| error!(message = "Failed to create file watcher.", %error))
            .ok();
        if watcher.is_some() {
            info!("Speculating that configuration files have changed.");
            _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
                error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
            });
        }
    });
    Ok(())
}
fn create_watcher(
    watcher_conf: &WatcherConfig,
    config_paths: &[PathBuf],
) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
    info!("Creating configuration file watcher.");
    let (sender, receiver) = channel();
    let mut watcher = match watcher_conf {
        WatcherConfig::RecommendedWatcher => {
            let recommended_watcher = recommended_watcher(sender)?;
            Watcher::RecommendedWatcher(recommended_watcher)
        }
        WatcherConfig::PollWatcher(interval) => {
            let config =
                notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
            let poll_watcher = notify::PollWatcher::new(sender, config)?;
            Watcher::PollWatcher(poll_watcher)
        }
    };
    watcher.add_paths(config_paths)?;
    Ok((watcher, receiver))
}
#[cfg(all(test, unix, not(target_os = "macos")))] mod tests {
    use super::*;
    use crate::{
        signal::SignalRx,
        test_util::{temp_dir, temp_file, trace_init},
    };
    use std::{fs::File, io::Write, time::Duration};
    use tokio::sync::broadcast;
    async fn test(file: &mut File, timeout: Duration, mut receiver: SignalRx) -> bool {
        file.write_all(&[0]).unwrap();
        file.sync_all().unwrap();
        matches!(
            tokio::time::timeout(timeout, receiver.recv()).await,
            Ok(Ok(crate::signal::SignalTo::ReloadFromDisk))
        )
    }
    #[tokio::test]
    async fn file_directory_update() {
        trace_init();
        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let file_path = dir.join("vector.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        std::fs::create_dir(&dir).unwrap();
        let mut file = File::create(&file_path).unwrap();
        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();
        if !test(&mut file, delay * 5, signal_rx).await {
            panic!("Test timed out");
        }
    }
    #[tokio::test]
    async fn file_update() {
        trace_init();
        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let mut file = File::create(&file_path).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();
        if !test(&mut file, delay * 5, signal_rx).await {
            panic!("Test timed out");
        }
    }
    #[tokio::test]
    #[cfg(unix)]
    async fn sym_file_update() {
        trace_init();
        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let sym_file = temp_file();
        let mut file = File::create(&file_path).unwrap();
        std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();
        if !test(&mut file, delay * 5, signal_rx).await {
            panic!("Test timed out");
        }
    }
    #[tokio::test]
    async fn recursive_directory_file_update() {
        trace_init();
        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let sub_dir = dir.join("sources");
        let file_path = sub_dir.join("input.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        std::fs::create_dir_all(&sub_dir).unwrap();
        let mut file = File::create(&file_path).unwrap();
        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();
        if !test(&mut file, delay * 5, signal_rx).await {
            panic!("Test timed out");
        }
    }
}