use std::{marker::PhantomData, num::NonZeroUsize, time::Duration};
use derivative::Derivative;
use serde_with::serde_as;
use snafu::Snafu;
use vector_lib::configurable::configurable_component;
use vector_lib::json_size::JsonSize;
use vector_lib::stream::BatcherSettings;
use super::EncodedEvent;
use crate::{event::EventFinalizers, internal_events::LargeEventDroppedError};
/// Errors reported while validating a batch configuration or converting it into batch settings.
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum BatchError {
    /// `max_bytes` was set, but the sink rejected it via `disallow_max_bytes`.
    #[snafu(display("This sink does not allow setting `max_bytes`"))]
    BytesNotAllowed,
    /// The effective `max_bytes` was zero.
    #[snafu(display("`max_bytes` must be greater than zero"))]
    InvalidMaxBytes,
    /// The effective `max_events` was zero.
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
    /// The effective `timeout_secs` was not greater than zero, or no timeout was available when
    /// the configuration was converted into settings.
    #[snafu(display("`timeout_secs` must be greater than zero"))]
    InvalidTimeout,
    /// `max_bytes` was larger than the `limit` passed to `limit_max_bytes`.
    #[snafu(display("provided `max_bytes` exceeds the maximum limit of {}", limit))]
    MaxBytesExceeded { limit: usize },
    /// `max_events` was larger than the `limit` passed to `limit_max_events`.
    #[snafu(display("provided `max_events` exceeds the maximum limit of {}", limit))]
    MaxEventsExceeded { limit: usize },
}
/// Compile-time default batch limits for a sink.
///
/// `BatchConfig` falls back to these values for any field the user leaves unset.
pub trait SinkBatchSettings {
    /// Default maximum number of events per batch; `None` means no default event limit.
    const MAX_EVENTS: Option<usize>;
    /// Default maximum batch size in bytes; `None` means no default byte limit.
    const MAX_BYTES: Option<usize>;
    /// Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64;
}
/// Default batch settings limited by event count: at most 1000 events per batch, no byte limit,
/// and a 1 second timeout.
#[derive(Clone, Copy, Debug, Default)]
pub struct RealtimeEventBasedDefaultBatchSettings;
impl SinkBatchSettings for RealtimeEventBasedDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = Some(1000);
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
/// Default batch settings limited by size: at most 10,000,000 bytes per batch, no event limit,
/// and a 1 second timeout.
#[derive(Clone, Copy, Debug, Default)]
pub struct RealtimeSizeBasedDefaultBatchSettings;
impl SinkBatchSettings for RealtimeSizeBasedDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(10_000_000);
    const TIMEOUT_SECS: f64 = 1.0;
}
/// Default batch settings limited by size with a long timeout: at most 10,000,000 bytes per
/// batch, no event limit, and a 300 second timeout.
#[derive(Clone, Copy, Debug, Default)]
pub struct BulkSizeBasedDefaultBatchSettings;
impl SinkBatchSettings for BulkSizeBasedDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(10_000_000);
    const TIMEOUT_SECS: f64 = 300.0;
}
/// Default batch settings with no size limits: neither an event nor a byte limit, and a
/// 1 second timeout.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoDefaultsBatchSettings;
impl SinkBatchSettings for NoDefaultsBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
/// Type-state marker for a `BatchConfig` returned by `BatchConfig::<D, Unmerged>::validate`,
/// i.e. one whose unset fields have been filled from the sink defaults and checked.
#[derive(Clone, Copy, Debug, Default)]
pub struct Merged;
/// Type-state marker for a `BatchConfig` that has not yet been merged with the sink defaults and
/// validated. This is the default state parameter of `BatchConfig`.
#[derive(Clone, Copy, Debug, Default)]
pub struct Unmerged;
/// Event batching behavior.
///
/// `D` supplies the sink-specific defaults; `S` tracks whether those defaults have been merged
/// in and validated (`Unmerged` until `validate` is called, `Merged` afterwards).
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default)]
pub struct BatchConfig<D: SinkBatchSettings + Clone, S = Unmerged>
where
    S: Clone,
{
    /// The maximum size of a batch, in bytes.
    ///
    /// Must be greater than zero when set.
    #[serde(default = "default_max_bytes::<D>")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,
    /// The maximum number of events in a batch.
    ///
    /// Must be greater than zero when set.
    #[serde(default = "default_max_events::<D>")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub max_events: Option<usize>,
    /// The batch timeout, in seconds.
    ///
    /// Must be greater than zero.
    #[serde(default = "default_timeout::<D>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Timeout"))]
    pub timeout_secs: Option<f64>,
    // Zero-sized marker binding the config to the sink defaults type `D`.
    #[serde(skip)]
    _d: PhantomData<D>,
    // Zero-sized marker recording the merge state (`Unmerged` or `Merged`).
    #[serde(skip)]
    _s: PhantomData<S>,
}
/// Serde default for `BatchConfig::max_bytes`: the sink's compile-time `MAX_BYTES`.
const fn default_max_bytes<D>() -> Option<usize>
where
    D: SinkBatchSettings,
{
    D::MAX_BYTES
}
/// Serde default for `BatchConfig::max_events`: the sink's compile-time `MAX_EVENTS`.
const fn default_max_events<D>() -> Option<usize>
where
    D: SinkBatchSettings,
{
    D::MAX_EVENTS
}
/// Serde default for `BatchConfig::timeout_secs`: the sink's compile-time `TIMEOUT_SECS`,
/// always wrapped in `Some`.
const fn default_timeout<D>() -> Option<f64>
where
    D: SinkBatchSettings,
{
    Some(D::TIMEOUT_SECS)
}
impl<D: SinkBatchSettings + Clone> BatchConfig<D, Unmerged> {
    pub fn validate(self) -> Result<BatchConfig<D, Merged>, BatchError> {
        let config = BatchConfig {
            max_bytes: self.max_bytes.or(D::MAX_BYTES),
            max_events: self.max_events.or(D::MAX_EVENTS),
            timeout_secs: self.timeout_secs.or(Some(D::TIMEOUT_SECS)),
            _d: PhantomData,
            _s: PhantomData,
        };
        match (config.max_bytes, config.max_events, config.timeout_secs) {
            (Some(0), _, _) => Err(BatchError::InvalidMaxBytes),
            (_, Some(0), _) => Err(BatchError::InvalidMaxEvents),
            (_, _, Some(timeout)) if timeout <= 0.0 => Err(BatchError::InvalidTimeout),
            _ => Ok(config),
        }
    }
    pub fn into_batch_settings<T: Batch>(self) -> Result<BatchSettings<T>, BatchError> {
        let config = self.validate()?;
        config.into_batch_settings()
    }
    pub fn into_batcher_settings(self) -> Result<BatcherSettings, BatchError> {
        let config = self.validate()?;
        config.into_batcher_settings()
    }
}
impl<D: SinkBatchSettings + Clone> BatchConfig<D, Merged> {
    /// Returns the configuration unchanged.
    ///
    /// Exists so that `validate` can be called regardless of the merge state.
    pub const fn validate(self) -> Result<BatchConfig<D, Merged>, BatchError> {
        Ok(self)
    }

    /// Rejects the configuration if `max_bytes` is set.
    ///
    /// # Errors
    ///
    /// Returns [`BatchError::BytesNotAllowed`] when `max_bytes` is `Some`.
    pub const fn disallow_max_bytes(self) -> Result<Self, BatchError> {
        match self.max_bytes {
            Some(_) => Err(BatchError::BytesNotAllowed),
            None => Ok(self),
        }
    }

    /// Rejects the configuration if `max_bytes` is strictly greater than `limit`.
    ///
    /// # Errors
    ///
    /// Returns [`BatchError::MaxBytesExceeded`] carrying `limit` when the check fails.
    pub const fn limit_max_bytes(self, limit: usize) -> Result<Self, BatchError> {
        match self.max_bytes {
            Some(n) if n > limit => Err(BatchError::MaxBytesExceeded { limit }),
            _ => Ok(self),
        }
    }

    /// Rejects the configuration if `max_events` is strictly greater than `limit`.
    ///
    /// # Errors
    ///
    /// Returns [`BatchError::MaxEventsExceeded`] carrying `limit` when the check fails.
    pub const fn limit_max_events(self, limit: usize) -> Result<Self, BatchError> {
        match self.max_events {
            Some(n) if n > limit => Err(BatchError::MaxEventsExceeded { limit }),
            _ => Ok(self),
        }
    }

    /// Converts an optional timeout in seconds into a [`Duration`] without panicking.
    ///
    /// A missing timeout and a value that cannot be represented as a `Duration` (negative, NaN,
    /// infinite, or overflowing) are both reported as [`BatchError::InvalidTimeout`].
    fn timeout_duration(timeout_secs: Option<f64>) -> Result<Duration, BatchError> {
        let secs = timeout_secs.ok_or(BatchError::InvalidTimeout)?;
        // The fields are public, so the value may have been changed after validation;
        // `Duration::from_secs_f64` would panic on the inputs rejected here.
        Duration::try_from_secs_f64(secs).map_err(|_| BatchError::InvalidTimeout)
    }

    /// Converts the configuration into [`BatchSettings`] for batch type `T`.
    ///
    /// `T::get_settings_defaults` is given the chance to adjust the configuration first. Unset
    /// size limits become `usize::MAX`.
    ///
    /// # Errors
    ///
    /// Returns any error from `T::get_settings_defaults`, or [`BatchError::InvalidTimeout`] if
    /// the timeout is missing or cannot be represented as a [`Duration`].
    pub fn into_batch_settings<T: Batch>(self) -> Result<BatchSettings<T>, BatchError> {
        let adjusted = T::get_settings_defaults(self)?;
        let timeout = Self::timeout_duration(adjusted.timeout_secs)?;
        Ok(BatchSettings {
            size: BatchSize {
                bytes: adjusted.max_bytes.unwrap_or(usize::MAX),
                events: adjusted.max_events.unwrap_or(usize::MAX),
                _type_marker: PhantomData,
            },
            timeout,
        })
    }

    /// Converts the configuration into [`BatcherSettings`].
    ///
    /// A size limit that is unset or zero becomes `NonZeroUsize::MAX`, i.e. effectively
    /// unlimited.
    ///
    /// # Errors
    ///
    /// Returns [`BatchError::InvalidTimeout`] if the timeout is missing or cannot be represented
    /// as a [`Duration`].
    pub fn into_batcher_settings(self) -> Result<BatcherSettings, BatchError> {
        let max_bytes = self
            .max_bytes
            .and_then(NonZeroUsize::new)
            .unwrap_or(NonZeroUsize::MAX);
        let max_events = self
            .max_events
            .and_then(NonZeroUsize::new)
            .unwrap_or(NonZeroUsize::MAX);
        let timeout = Self::timeout_duration(self.timeout_secs)?;
        Ok(BatcherSettings::new(timeout, max_bytes, max_events))
    }
}
/// Reinterprets a merged configuration as an unmerged one for a (possibly different) defaults
/// type `D2`, carrying the three limit fields across unchanged.
impl<D1, D2> From<BatchConfig<D1, Merged>> for BatchConfig<D2, Unmerged>
where
    D1: SinkBatchSettings + Clone,
    D2: SinkBatchSettings + Clone,
{
    fn from(config: BatchConfig<D1, Merged>) -> Self {
        let BatchConfig {
            max_bytes,
            max_events,
            timeout_secs,
            ..
        } = config;
        Self {
            max_bytes,
            max_events,
            timeout_secs,
            _d: PhantomData,
            _s: PhantomData,
        }
    }
}
/// Size limits of a batch of type `B`.
///
/// `Clone` and `Copy` are derived with empty bounds, so they do not require anything of `B`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
#[derivative(Copy(bound = ""))]
pub struct BatchSize<B> {
    /// Maximum batch size in bytes.
    pub bytes: usize,
    /// Maximum number of events in a batch.
    pub events: usize,
    // Zero-sized marker binding the limits to the batch type `B`.
    _type_marker: PhantomData<B>,
}
impl<B> BatchSize<B> {
    pub const fn const_default() -> Self {
        BatchSize {
            bytes: usize::MAX,
            events: usize::MAX,
            _type_marker: PhantomData,
        }
    }
}
impl<B> Default for BatchSize<B> {
    fn default() -> Self {
        BatchSize::const_default()
    }
}
/// Resolved batching parameters for a batch of type `B`: its size limits and its timeout.
///
/// `Clone` and `Copy` are derived with empty bounds, so they do not require anything of `B`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
#[derivative(Copy(bound = ""))]
pub struct BatchSettings<B> {
    /// Byte and event limits of a batch.
    pub size: BatchSize<B>,
    /// Batch timeout.
    pub timeout: Duration,
}
/// Default settings: 10,000,000 bytes per batch, no event limit, and a 1 second timeout.
impl<B> Default for BatchSettings<B> {
    fn default() -> Self {
        let size = BatchSize {
            bytes: 10_000_000,
            events: usize::MAX,
            _type_marker: PhantomData,
        };
        let timeout = Duration::from_secs(1);
        Self { size, timeout }
    }
}
/// Reports an event that is too large to be batched.
///
/// Emits a `LargeEventDroppedError` carrying `length` and `max_length`, then returns
/// `PushResult::Ok(false)`, so the event is not handed back to the caller as an overflow.
pub(super) fn err_event_too_large<T>(length: usize, max_length: usize) -> PushResult<T> {
    emit!(LargeEventDroppedError { length, max_length });
    PushResult::Ok(false)
}
/// Outcome of pushing an item into a [`Batch`].
#[must_use]
#[derive(Debug, Eq, PartialEq)]
pub enum PushResult<T> {
    /// The item was consumed by the batch. The flag is `true` when the batch reports itself as
    /// full after this push (`StatefulBatch` stops accepting items once it sees `Ok(true)`).
    Ok(bool),
    /// The item was not accepted and is handed back to the caller.
    Overflow(T),
}
/// A container that accumulates items of type `Input` and is eventually turned into an `Output`.
pub trait Batch: Sized {
    /// Type of the items pushed into the batch.
    type Input;
    /// Type produced when the batch is finished.
    type Output;
    /// Lets the batch type adjust or reject a merged batch configuration before it is converted
    /// into settings. The default implementation returns `config` unchanged.
    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
        config: BatchConfig<D, Merged>,
    ) -> Result<BatchConfig<D, Merged>, BatchError> {
        Ok(config)
    }
    /// Attempts to add `item` to the batch; see [`PushResult`] for the possible outcomes.
    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input>;
    /// Reports whether the batch is empty.
    fn is_empty(&self) -> bool;
    /// Creates a new batch from this one.
    // NOTE(review): presumably the new batch is empty and keeps this batch's settings — confirm
    // against the implementors.
    fn fresh(&self) -> Self;
    /// Consumes the batch and produces its output.
    fn finish(self) -> Self::Output;
    /// Reports the number of items in the batch.
    fn num_items(&self) -> usize;
}
/// Output of a finished `FinalizersBatch`: the inner batch output plus the bookkeeping
/// accumulated for the items it accepted.
#[derive(Debug)]
pub struct EncodedBatch<I> {
    /// Output of the wrapped batch.
    pub items: I,
    /// Finalizers merged from every accepted item.
    pub finalizers: EventFinalizers,
    /// Number of accepted items.
    pub count: usize,
    /// Sum of the `byte_size` of every accepted item.
    pub byte_size: usize,
    /// Sum of the `json_byte_size` of every accepted item.
    pub json_byte_size: JsonSize,
}
/// Wraps a batch and tracks the finalizers, item count, and sizes of the `EncodedEvent`s that
/// the wrapped batch accepted.
#[derive(Clone, Debug)]
pub struct FinalizersBatch<B> {
    // The wrapped batch that stores the actual items.
    inner: B,
    // Finalizers merged from every accepted item.
    finalizers: EventFinalizers,
    // Number of accepted items.
    count: usize,
    // Running sum of the accepted items' `byte_size`.
    byte_size: usize,
    // Running sum of the accepted items' `json_byte_size`.
    json_byte_size: JsonSize,
}
/// Wraps `inner` with empty finalizers and all counters at zero.
impl<B> From<B> for FinalizersBatch<B>
where
    B: Batch,
{
    fn from(inner: B) -> Self {
        let finalizers = Default::default();
        let json_byte_size = JsonSize::zero();
        Self {
            inner,
            finalizers,
            count: 0,
            byte_size: 0,
            json_byte_size,
        }
    }
}
impl<B: Batch> Batch for FinalizersBatch<B> {
    type Input = EncodedEvent<B::Input>;
    type Output = EncodedBatch<B::Output>;

    /// Defers to the wrapped batch type; the wrapper adds no adjustments of its own.
    fn get_settings_defaults<D>(
        config: BatchConfig<D, Merged>,
    ) -> Result<BatchConfig<D, Merged>, BatchError>
    where
        D: SinkBatchSettings + Clone,
    {
        B::get_settings_defaults(config)
    }

    /// Pushes the event's payload into the wrapped batch.
    ///
    /// The finalizers and counters are only updated when the wrapped batch returns
    /// `PushResult::Ok`. On overflow the event is reassembled and handed back unchanged.
    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
        let EncodedEvent {
            item: payload,
            finalizers,
            byte_size,
            json_byte_size,
        } = item;

        let full = match self.inner.push(payload) {
            PushResult::Ok(full) => full,
            PushResult::Overflow(rejected) => {
                return PushResult::Overflow(EncodedEvent {
                    item: rejected,
                    finalizers,
                    byte_size,
                    json_byte_size,
                });
            }
        };

        self.finalizers.merge(finalizers);
        self.count += 1;
        self.byte_size += byte_size;
        self.json_byte_size += json_byte_size;
        PushResult::Ok(full)
    }

    /// Empty exactly when the wrapped batch is empty.
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Creates a wrapper around a fresh inner batch with empty finalizers and zeroed counters.
    fn fresh(&self) -> Self {
        let inner = self.inner.fresh();
        Self {
            inner,
            finalizers: Default::default(),
            count: 0,
            byte_size: 0,
            json_byte_size: JsonSize::zero(),
        }
    }

    /// Finishes the wrapped batch and bundles its output with the accumulated bookkeeping.
    fn finish(self) -> Self::Output {
        let Self {
            inner,
            finalizers,
            count,
            byte_size,
            json_byte_size,
        } = self;
        EncodedBatch {
            items: inner.finish(),
            finalizers,
            count,
            byte_size,
            json_byte_size,
        }
    }

    /// Item count as reported by the wrapped batch.
    fn num_items(&self) -> usize {
        self.inner.num_items()
    }
}
/// Wraps a batch and remembers whether it has reported itself full or overflowed, so that later
/// pushes are rejected without reaching the wrapped batch.
#[derive(Clone, Debug)]
pub struct StatefulBatch<B> {
    // The wrapped batch.
    inner: B,
    // Set once a push into `inner` returned `Overflow` or `Ok(true)`.
    was_full: bool,
}
/// Wraps `inner` with the "was full" flag cleared.
impl<B> From<B> for StatefulBatch<B>
where
    B: Batch,
{
    fn from(inner: B) -> Self {
        let was_full = false;
        Self { inner, was_full }
    }
}
impl<B> StatefulBatch<B> {
    /// Reports whether a previous push filled or overflowed the wrapped batch.
    pub const fn was_full(&self) -> bool {
        self.was_full
    }

    /// Consumes the wrapper and returns the wrapped batch, discarding the "was full" flag.
    // NOTE(review): the lint is suppressed without a stated reason; presumably a `const fn`
    // cannot move the generic field out of `self` here — confirm.
    #[allow(clippy::missing_const_for_fn)]
    pub fn into_inner(self) -> B {
        let Self { inner, .. } = self;
        inner
    }
}
impl<B: Batch> Batch for StatefulBatch<B> {
    type Input = B::Input;
    type Output = B::Output;

    /// Defers to the wrapped batch type; the wrapper adds no adjustments of its own.
    fn get_settings_defaults<D>(
        config: BatchConfig<D, Merged>,
    ) -> Result<BatchConfig<D, Merged>, BatchError>
    where
        D: SinkBatchSettings + Clone,
    {
        B::get_settings_defaults(config)
    }

    /// Pushes `item` into the wrapped batch unless the batch was already marked full.
    ///
    /// Once the wrapped batch returns `Overflow` or `Ok(true)`, every later push is answered
    /// with `Overflow` without touching the wrapped batch.
    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
        if self.was_full {
            return PushResult::Overflow(item);
        }

        let outcome = self.inner.push(item);
        self.was_full = matches!(outcome, PushResult::Overflow(_) | PushResult::Ok(true));
        outcome
    }

    /// A batch that was marked full is never reported as empty, even if the wrapped batch holds
    /// no items.
    fn is_empty(&self) -> bool {
        !self.was_full && self.inner.is_empty()
    }

    /// Creates a wrapper around a fresh inner batch with the "was full" flag cleared.
    fn fresh(&self) -> Self {
        let inner = self.inner.fresh();
        Self {
            inner,
            was_full: false,
        }
    }

    /// Finishes the wrapped batch.
    fn finish(self) -> Self::Output {
        let Self { inner, .. } = self;
        inner.finish()
    }

    /// Item count as reported by the wrapped batch.
    fn num_items(&self) -> usize {
        self.inner.num_items()
    }
}