//! Limiters that decide when a batch is full, by item count and by a user-defined item size.
use vector_common::byte_size_of::ByteSizeOf;
use crate::batcher::data::BatchData;
/// Decides whether items of type `T` may still be added to a batch of type `B`.
///
/// The limiter is consulted before an item is pushed (`is_batch_full`, then
/// `item_fits_in_batch`), told about every item that is actually pushed (`push_item`),
/// and cleared when the batch is taken (`reset`).
pub trait BatchLimiter<T, B> {
    /// Per-item data computed by `item_fits_in_batch` and handed back to `push_item`.
    type ItemMetadata;
    /// Return true if it is not possible for another item to fit in the batch
    fn is_batch_full(&self, batch: &B) -> bool;
    /// Checks whether `item` fits in `batch`, returning the verdict together with
    /// metadata about the item.
    ///
    /// It is safe to assume that `is_batch_full` would return `false` before this is called.
    /// You can return arbitrary metadata for an item that will be given back when the item
    /// is actually pushed onto the batch. This is useful if there is an expensive calculation
    /// to determine the "size" of the item.
    fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata);
    /// Add a single item to the batch using the metadata that was calculated by `item_fits_in_batch`
    fn push_item(&mut self, metadata: Self::ItemMetadata);
    /// Reset internal state from a batch being taken.
    fn reset(&mut self);
}
/// Batch limiter state that bounds a batch by total item "size" and by item count.
///
/// `I` is the calculator used to obtain the size of each item.
pub struct SizeLimit<I> {
    /// The total "size" of all items in a batch. Size is intentionally
    /// vague here since it is user defined, and can vary.
    ///
    /// To ensure any individual event can be placed in a batch, the first element in a batch is not
    /// subject to this limit.
    pub batch_size_limit: usize,
    /// Total number of items that will be placed in a single batch.
    ///
    /// To ensure any individual event can be placed in a batch, the first element in a batch is not
    /// subject to this limit.
    pub batch_item_limit: usize,
    /// Running total of the sizes of the items pushed since the last reset.
    pub current_size: usize,
    /// Computes the "size" of an individual item.
    pub item_size_calculator: I,
}
/// `BatchLimiter` implementation driven by an item count and an accumulated item size.
///
/// The item count is read from the batch itself (`batch.len()`), while the accumulated
/// size is tracked in `current_size`, which is advanced by `push_item` and cleared by
/// `reset`.
impl<T, B, I> BatchLimiter<T, B> for SizeLimit<I>
where
    B: BatchData<T>,
    I: ItemBatchSize<T>,
{
    /// The size of the item as reported by `item_size_calculator`.
    type ItemMetadata = usize;

    /// Returns `true` once the batch holds at least `batch_item_limit` items or the
    /// accumulated size has reached `batch_size_limit`.
    fn is_batch_full(&self, batch: &B) -> bool {
        batch.len() >= self.batch_item_limit || self.current_size >= self.batch_size_limit
    }

    /// Computes the size of `item` and reports whether adding it keeps the accumulated
    /// size within `batch_size_limit`.
    ///
    /// An item offered to an empty batch always fits, whatever its size. The computed
    /// size is returned as metadata so that `push_item` does not have to recompute it.
    fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata) {
        let item_size = self.item_size_calculator.size(item);
        if batch.len() == 0 {
            // make sure any individual item can always fit in a batch
            return (true, item_size);
        }
        // A sum that does not fit in `usize` cannot be within the limit. Plain `+` would
        // panic in debug builds and wrap in release builds, where the wrapped value could
        // make an oversized item look like it fits.
        let fits = match self.current_size.checked_add(item_size) {
            Some(total) => total <= self.batch_size_limit,
            None => false,
        };
        (fits, item_size)
    }

    /// Records that an item of size `item_size` was pushed onto the batch.
    fn push_item(&mut self, item_size: usize) {
        // The first item of a batch is accepted regardless of its size, so the running
        // total is not bounded by `batch_size_limit`. Saturate instead of overflowing so
        // the batch keeps being reported as full.
        self.current_size = self.current_size.saturating_add(item_size);
    }

    /// Clears the accumulated size; called when the batch is taken.
    fn reset(&mut self) {
        self.current_size = 0;
    }
}
/// Computes the "size" of an item of type `T`.
pub trait ItemBatchSize<T> {
    /// The size of an individual item in a batch.
    fn size(&self, item: &T) -> usize;
}
/// Item size calculator that defers to the item's `ByteSizeOf` implementation.
pub struct ByteSizeOfItemSize;

impl<T> ItemBatchSize<T> for ByteSizeOfItemSize
where
    T: ByteSizeOf,
{
    /// Returns the value reported by `ByteSizeOf::size_of` for `item`.
    fn size(&self, item: &T) -> usize {
        <T as ByteSizeOf>::size_of(item)
    }
}
/// Lets any `Fn(&T) -> usize` (closure or function) serve as an item size calculator.
impl<T, F: Fn(&T) -> usize> ItemBatchSize<T> for F {
    /// Invokes the callable on `item` and returns its result.
    fn size(&self, item: &T) -> usize {
        self(item)
    }
}