use std::{
    collections::HashMap,
    fmt,
    hash::Hash,
    marker::PhantomData,
    pin::Pin,
    task::{ready, Context, Poll},
};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Sink, Stream, TryFutureExt};
use pin_project::pin_project;
use tokio::{
    sync::oneshot,
    time::{sleep, Duration, Sleep},
};
use tower::{Service, ServiceBuilder};
use tracing::Instrument;
use vector_lib::internal_event::{
    CallError, CountByteSize, EventsSent, InternalEventHandle as _, Output,
};
pub use vector_lib::sink::StreamSink;
use super::{
    batch::{Batch, EncodedBatch, FinalizersBatch, PushResult, StatefulBatch},
    buffer::{Partition, PartitionBuffer, PartitionInnerBuffer},
    service::{Map, ServiceBuilderExt},
    EncodedEvent,
};
use crate::event::EventStatus;
#[pin_project]
#[derive(Debug)]
pub struct BatchSink<S, B>
where
    S: Service<B::Output>,
    B: Batch,
{
    #[pin]
    inner: PartitionBatchSink<
        Map<S, PartitionInnerBuffer<B::Output, ()>, B::Output>,
        PartitionBuffer<B, ()>,
        (),
    >,
}
impl<S, B> BatchSink<S, B>
where
    S: Service<B::Output>,
    S::Future: Send + 'static,
    S::Error: Into<crate::Error> + Send + 'static,
    S::Response: Response + Send + 'static,
    B: Batch,
{
    pub fn new(service: S, batch: B, timeout: Duration) -> Self {
        let service = ServiceBuilder::new()
            .map(|req: PartitionInnerBuffer<B::Output, ()>| req.into_parts().0)
            .service(service);
        let batch = PartitionBuffer::new(batch);
        let inner = PartitionBatchSink::new(service, batch, timeout);
        Self { inner }
    }
}
#[cfg(test)]
impl<S, B> BatchSink<S, B>
where
    S: Service<B::Output>,
    B: Batch,
{
    pub const fn get_ref(&self) -> &S {
        &self.inner.service.service.inner
    }
}
impl<S, B> Sink<EncodedEvent<B::Input>> for BatchSink<S, B>
where
    S: Service<B::Output>,
    S::Future: Send + 'static,
    S::Error: Into<crate::Error> + Send + 'static,
    S::Response: Response + Send + 'static,
    B: Batch,
{
    type Error = crate::Error;
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_ready(cx)
    }
    fn start_send(self: Pin<&mut Self>, item: EncodedEvent<B::Input>) -> Result<(), Self::Error> {
        self.project()
            .inner
            .start_send(item.map(|item| PartitionInnerBuffer::new(item, ())))
    }
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_flush(cx)
    }
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_close(cx)
    }
}
#[pin_project]
pub struct PartitionBatchSink<S, B, K>
where
    B: Batch,
    S: Service<B::Output>,
{
    service: ServiceSink<S, B::Output>,
    buffer: Option<(K, EncodedEvent<B::Input>)>,
    batch: StatefulBatch<FinalizersBatch<B>>,
    partitions: HashMap<K, StatefulBatch<FinalizersBatch<B>>>,
    timeout: Duration,
    lingers: HashMap<K, Pin<Box<Sleep>>>,
    in_flight: Option<HashMap<K, BoxFuture<'static, ()>>>,
    closing: bool,
}
impl<S, B, K> PartitionBatchSink<S, B, K>
where
    B: Batch,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    S: Service<B::Output>,
    S::Future: Send + 'static,
    S::Error: Into<crate::Error> + Send + 'static,
    S::Response: Response + Send + 'static,
{
    pub fn new(service: S, batch: B, timeout: Duration) -> Self {
        Self {
            service: ServiceSink::new(service),
            buffer: None,
            batch: StatefulBatch::from(FinalizersBatch::from(batch)),
            partitions: HashMap::new(),
            timeout,
            lingers: HashMap::new(),
            in_flight: None,
            closing: false,
        }
    }
    pub fn ordered(&mut self) {
        self.in_flight = Some(HashMap::new());
    }
}
impl<S, B, K> Sink<EncodedEvent<B::Input>> for PartitionBatchSink<S, B, K>
where
    B: Batch,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    S: Service<B::Output>,
    S::Future: Send + 'static,
    S::Error: Into<crate::Error> + Send + 'static,
    S::Response: Response + Send + 'static,
{
    type Error = crate::Error;
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.buffer.is_some() {
            match self.as_mut().poll_flush(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
                Poll::Pending => {
                    if self.buffer.is_some() {
                        return Poll::Pending;
                    }
                }
            }
        }
        Poll::Ready(Ok(()))
    }
    fn start_send(
        mut self: Pin<&mut Self>,
        item: EncodedEvent<B::Input>,
    ) -> Result<(), Self::Error> {
        let partition = item.item.partition();
        let batch = loop {
            if let Some(batch) = self.partitions.get_mut(&partition) {
                break batch;
            }
            let batch = self.batch.fresh();
            self.partitions.insert(partition.clone(), batch);
            let delay = sleep(self.timeout);
            self.lingers.insert(partition.clone(), Box::pin(delay));
        };
        if let PushResult::Overflow(item) = batch.push(item) {
            self.buffer = Some((partition, item));
        }
        Ok(())
    }
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        loop {
            if self.buffer.is_none() && self.partitions.is_empty() {
                ready!(self.service.poll_complete(cx));
                return Poll::Ready(Ok(()));
            }
            let this = self.as_mut().project();
            let mut partitions_ready = vec![];
            for (partition, batch) in this.partitions.iter() {
                if ((*this.closing && !batch.is_empty())
                    || batch.was_full()
                    || matches!(
                        this.lingers
                            .get_mut(partition)
                            .expect("linger should exists for poll_flush")
                            .poll_unpin(cx),
                        Poll::Ready(())
                    ))
                    && this
                        .in_flight
                        .as_mut()
                        .and_then(|map| map.get_mut(partition))
                        .map(|req| matches!(req.poll_unpin(cx), Poll::Ready(())))
                        .unwrap_or(true)
                {
                    partitions_ready.push(partition.clone());
                }
            }
            let mut batch_consumed = false;
            for partition in partitions_ready.iter() {
                let service_ready = match this.service.poll_ready(cx) {
                    Poll::Ready(Ok(())) => true,
                    Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
                    Poll::Pending => false,
                };
                if service_ready {
                    trace!("Service ready; Sending batch.");
                    let batch = this.partitions.remove(partition).unwrap();
                    this.lingers.remove(partition);
                    let batch = batch.finish();
                    let future = tokio::spawn(this.service.call(batch));
                    if let Some(map) = this.in_flight.as_mut() {
                        map.insert(partition.clone(), future.map(|_| ()).fuse().boxed());
                    }
                    batch_consumed = true;
                } else {
                    break;
                }
            }
            if batch_consumed {
                continue;
            }
            if let Some(in_flight) = this.in_flight.as_mut() {
                if in_flight.len() > this.partitions.len() {
                    let partitions = this.partitions;
                    in_flight.retain(|partition, req| {
                        partitions.contains_key(partition) || req.poll_unpin(cx).is_pending()
                    });
                }
            }
            if let Some((partition, item)) = self.buffer.take() {
                if self.partitions.contains_key(&partition) {
                    self.buffer = Some((partition, item));
                } else {
                    self.as_mut().start_send(item)?;
                    if self.buffer.is_some() {
                        unreachable!("Empty buffer overflowed.");
                    }
                    continue;
                }
            }
            ready!(self.service.poll_complete(cx));
            return Poll::Pending;
        }
    }
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        trace!("Closing partition batch sink.");
        self.closing = true;
        self.poll_flush(cx)
    }
}
impl<S, B, K> fmt::Debug for PartitionBatchSink<S, B, K>
where
    S: Service<B::Output> + fmt::Debug,
    B: Batch + fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartitionBatchSink")
            .field("service", &self.service)
            .field("batch", &self.batch)
            .field("timeout", &self.timeout)
            .finish()
    }
}
struct ServiceSink<S, Request> {
    service: S,
    in_flight: FuturesUnordered<oneshot::Receiver<()>>,
    next_request_id: usize,
    _pd: PhantomData<Request>,
}
impl<S, Request> ServiceSink<S, Request>
where
    S: Service<Request>,
    S::Future: Send + 'static,
    S::Error: Into<crate::Error> + Send + 'static,
    S::Response: Response + Send + 'static,
{
    fn new(service: S) -> Self {
        Self {
            service,
            in_flight: FuturesUnordered::new(),
            next_request_id: 0,
            _pd: PhantomData,
        }
    }
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        self.service.poll_ready(cx).map_err(Into::into)
    }
    fn call(&mut self, batch: EncodedBatch<Request>) -> BoxFuture<'static, ()> {
        let EncodedBatch {
            items,
            finalizers,
            count,
            json_byte_size,
            ..
        } = batch;
        let (tx, rx) = oneshot::channel();
        self.in_flight.push(rx);
        let request_id = self.next_request_id;
        self.next_request_id = request_id.wrapping_add(1);
        trace!(
            message = "Submitting service request.",
            in_flight_requests = self.in_flight.len()
        );
        let events_sent = register!(EventsSent::from(Output(None)));
        self.service
            .call(items)
            .err_into()
            .map(move |result| {
                let status = result_status(&result);
                finalizers.update_status(status);
                match status {
                    EventStatus::Delivered => {
                        events_sent.emit(CountByteSize(count, json_byte_size));
                        }
                    EventStatus::Rejected => {
                        let error = result.err().unwrap_or_else(|| "Response failed.".into());
                        emit!(CallError {
                            error,
                            request_id,
                            count,
                        });
                    }
                    _ => {} }
                _ = tx.send(());
            })
            .instrument(info_span!("request", %request_id).or_current())
            .boxed()
    }
    fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        while !self.in_flight.is_empty() {
            match ready!(Pin::new(&mut self.in_flight).poll_next(cx)) {
                Some(Ok(())) => {}
                Some(Err(_)) => panic!("ServiceSink service sender dropped."),
                None => break,
            }
        }
        Poll::Ready(())
    }
}
impl<S, Request> fmt::Debug for ServiceSink<S, Request>
where
    S: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ServiceSink")
            .field("service", &self.service)
            .finish()
    }
}
pub trait ServiceLogic: Clone {
    type Response: Response;
    fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus;
}
#[derive(Derivative)]
#[derivative(Clone)]
pub struct StdServiceLogic<R> {
    _pd: PhantomData<R>,
}
impl<R> Default for StdServiceLogic<R> {
    fn default() -> Self {
        Self { _pd: PhantomData }
    }
}
impl<R> ServiceLogic for StdServiceLogic<R>
where
    R: Response + Send,
{
    type Response = R;
    fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus {
        result_status(result)
    }
}
fn result_status<R: Response + Send>(result: &crate::Result<R>) -> EventStatus {
    match result {
        Ok(response) => {
            if response.is_successful() {
                trace!(message = "Response successful.", ?response);
                EventStatus::Delivered
            } else if response.is_transient() {
                error!(message = "Response wasn't successful.", ?response);
                EventStatus::Errored
            } else {
                error!(message = "Response failed.", ?response);
                EventStatus::Rejected
            }
        }
        Err(error) => {
            error!(message = "Request failed.", %error);
            EventStatus::Errored
        }
    }
}
pub trait Response: fmt::Debug {
    fn is_successful(&self) -> bool {
        true
    }
    fn is_transient(&self) -> bool {
        true
    }
}
impl Response for () {}
impl<'a> Response for &'a str {}
#[cfg(test)]
mod tests {
    use std::{
        convert::Infallible,
        sync::{atomic::AtomicUsize, atomic::Ordering::Relaxed, Arc, Mutex},
    };
    use bytes::Bytes;
    use futures::{future, stream, task::noop_waker_ref, SinkExt, StreamExt};
    use tokio::{task::yield_now, time::Instant};
    use vector_lib::{
        finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventFinalizers},
        json_size::JsonSize,
    };
    use super::*;
    use crate::{
        sinks::util::{BatchSettings, EncodedLength, VecBuffer},
        test_util::trace_init,
    };
    const TIMEOUT: Duration = Duration::from_secs(10);
    impl EncodedLength for usize {
        fn encoded_length(&self) -> usize {
            22
        }
    }
    async fn advance_time(duration: Duration) {
        tokio::time::pause();
        tokio::time::advance(duration).await;
        tokio::time::resume();
    }
    type Counter = Arc<AtomicUsize>;
    struct Request(usize, EventFinalizers);
    impl Request {
        fn new(value: usize, counter: &Counter) -> Self {
            let (batch, receiver) = BatchNotifier::new_with_receiver();
            let counter = Arc::clone(counter);
            tokio::spawn(async move {
                if receiver.await == BatchStatus::Delivered {
                    counter.fetch_add(value, Relaxed);
                }
            });
            Self(value, EventFinalizers::new(EventFinalizer::new(batch)))
        }
        fn encoded(value: usize, counter: &Counter) -> EncodedEvent<Self> {
            let mut item = Self::new(value, counter);
            let finalizers = std::mem::take(&mut item.1);
            EncodedEvent {
                item,
                finalizers,
                json_byte_size: JsonSize::zero(),
                byte_size: 0,
            }
        }
    }
    impl EncodedLength for Request {
        fn encoded_length(&self) -> usize {
            22
        }
    }
    #[tokio::test]
    async fn batch_sink_acking_sequential() {
        let ack_counter = Counter::default();
        let svc = tower::service_fn(|_| future::ok::<_, std::io::Error>(()));
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        buffered
            .sink_map_err(drop)
            .send_all(
                &mut stream::iter(1..=22).map(|item| Ok(Request::encoded(item, &ack_counter))),
            )
            .await
            .unwrap();
        assert_eq!(ack_counter.load(Relaxed), 22 * 23 / 2);
    }
    #[tokio::test]
    async fn batch_sink_acking_unordered() {
        let ack_counter = Counter::default();
        trace_init();
        let svc = tower::service_fn(|req: Vec<Request>| async move {
            let duration = match req[0].0 {
                1..=3 => Duration::from_secs(1),
                4 => Duration::from_secs(5),
                5 | 6 => Duration::from_secs(1),
                _ => unreachable!(),
            };
            sleep(duration).await;
            Ok::<(), Infallible>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 1;
        let mut sink = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        let mut cx = Context::from_waker(noop_waker_ref());
        for item in 1..=3 {
            assert!(matches!(
                sink.poll_ready_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
            assert!(matches!(
                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
                Ok(())
            ));
        }
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        assert_eq!(ack_counter.load(Relaxed), 0);
        yield_now().await;
        advance_time(Duration::from_secs(3)).await;
        yield_now().await;
        for _ in 1..=3 {
            assert!(matches!(
                sink.poll_flush_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
        }
        assert_eq!(ack_counter.load(Relaxed), 6);
        for item in 4..=6 {
            assert!(matches!(
                sink.poll_ready_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
            assert!(matches!(
                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
                Ok(())
            ));
        }
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        assert_eq!(ack_counter.load(Relaxed), 6);
        yield_now().await;
        advance_time(Duration::from_secs(2)).await;
        yield_now().await;
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        assert_eq!(ack_counter.load(Relaxed), 17);
        yield_now().await;
        advance_time(Duration::from_secs(5)).await;
        yield_now().await;
        for _ in 4..=6 {
            assert!(matches!(
                sink.poll_flush_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
        }
        assert_eq!(ack_counter.load(Relaxed), 21);
    }
    #[tokio::test]
    async fn batch_sink_buffers_messages_until_limit() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        buffered
            .sink_map_err(drop)
            .send_all(
                &mut stream::iter(0..22)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();
        let output = sent_requests.lock().unwrap();
        assert_eq!(
            &*output,
            &vec![
                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
                vec![20, 21]
            ]
        );
    }
    #[tokio::test]
    async fn batch_sink_flushes_below_min_on_close() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
            Ok(())
        ));
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
            Ok(())
        ));
        buffered.close().await.unwrap();
        let output = sent_requests.lock().unwrap();
        assert_eq!(&*output, &vec![vec![0, 1]]);
    }
    #[tokio::test]
    async fn batch_sink_expired_linger() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
            Ok(())
        ));
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
            Ok(())
        ));
        advance_time(TIMEOUT + Duration::from_secs(1)).await;
        let start = Instant::now();
        buffered.flush().await.unwrap();
        let elapsed = start.duration_since(start);
        assert!(elapsed < Duration::from_millis(200));
        let output = sent_requests.lock().unwrap();
        assert_eq!(&*output, &vec![vec![0, 1]]);
    }
    #[tokio::test]
    async fn partition_batch_sink_buffers_messages_until_limit() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(0..22)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();
        let output = sent_requests.lock().unwrap();
        assert_eq!(
            &*output,
            &vec![
                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
                vec![20, 21]
            ]
        );
    }
    #[tokio::test]
    async fn partition_batch_sink_buffers_by_partition_buffer_size_one() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 1;
        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        let input = vec![Partitions::A, Partitions::B];
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();
        let mut output = sent_requests.lock().unwrap();
        output[..].sort();
        assert_eq!(&*output, &vec![vec![Partitions::A], vec![Partitions::B]]);
    }
    #[tokio::test]
    async fn partition_batch_sink_buffers_by_partition_buffer_size_two() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 2;
        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        let input = vec![Partitions::A, Partitions::B, Partitions::A, Partitions::B];
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();
        let mut output = sent_requests.lock().unwrap();
        output[..].sort();
        assert_eq!(
            &*output,
            &vec![
                vec![Partitions::A, Partitions::A],
                vec![Partitions::B, Partitions::B]
            ]
        );
    }
    #[tokio::test]
    async fn partition_batch_sink_submits_after_linger() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(
            sink.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            sink.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
            Ok(())
        ));
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        advance_time(TIMEOUT + Duration::from_secs(1)).await;
        let start = Instant::now();
        sink.flush().await.unwrap();
        let elapsed = start.duration_since(start);
        assert!(elapsed < Duration::from_millis(200));
        let output = sent_requests.lock().unwrap();
        assert_eq!(&*output, &vec![vec![1]]);
    }
    #[tokio::test]
    async fn service_sink_doesnt_propagate_error() {
        let ack_counter = Counter::default();
        let svc = tower::service_fn(|req: Request| {
            if req.0 == 3 {
                future::err("bad")
            } else {
                future::ok("good")
            }
        });
        let mut sink = ServiceSink::new(svc);
        let req = |items: usize| {
            let mut req = Request::new(items, &ack_counter);
            let finalizers = std::mem::take(&mut req.1);
            EncodedBatch {
                items: req,
                finalizers,
                count: items,
                byte_size: 1,
                json_byte_size: JsonSize::new(1),
            }
        };
        let mut fut1 = sink.call(req(1));
        let mut fut2 = sink.call(req(2));
        assert_eq!(ack_counter.load(Relaxed), 0);
        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(fut1.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut2.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));
        yield_now().await;
        assert_eq!(ack_counter.load(Relaxed), 3);
        let mut fut3 = sink.call(req(3)); let mut fut4 = sink.call(req(4));
        assert!(matches!(fut3.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut4.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));
        yield_now().await;
        assert_eq!(ack_counter.load(Relaxed), 7);
    }
    #[tokio::test]
    async fn partition_batch_sink_ordering_per_partition() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));
        let mut delay = true;
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            if delay {
                delay = false;
                sleep(Duration::from_secs(1))
                    .map(move |_| {
                        sent_requests.lock().unwrap().push(req);
                        Result::<_, std::io::Error>::Ok(())
                    })
                    .boxed()
            } else {
                sent_requests.lock().unwrap().push(req);
                future::ok::<_, std::io::Error>(()).boxed()
            }
        });
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        sink.ordered();
        let input = (0..20).map(|i| (0, i)).chain((0..20).map(|i| (1, i)));
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();
        let output = sent_requests.lock().unwrap();
        assert_eq!(
            &*output,
            &vec![
                (0..10).map(|i| (1, i)).collect::<Vec<_>>(),
                (10..20).map(|i| (1, i)).collect(),
                (0..10).map(|i| (0, i)).collect(),
                (10..20).map(|i| (0, i)).collect(),
            ]
        );
    }
    #[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
    enum Partitions {
        A,
        B,
    }
    impl EncodedLength for Partitions {
        fn encoded_length(&self) -> usize {
            10 }
    }
    impl Partition<Bytes> for Partitions {
        fn partition(&self) -> Bytes {
            format!("{:?}", self).into()
        }
    }
    impl Partition<Bytes> for usize {
        fn partition(&self) -> Bytes {
            "key".into()
        }
    }
    impl Partition<Bytes> for (usize, usize) {
        fn partition(&self) -> Bytes {
            self.0.to_string().into()
        }
    }
    impl EncodedLength for (usize, usize) {
        fn encoded_length(&self) -> usize {
            16
        }
    }
}