use http::StatusCode;
use serde::Deserialize;
use crate::{
    http::HttpError,
    sinks::{
        elasticsearch::service::ElasticsearchResponse,
        util::retries::{RetryAction, RetryLogic},
    },
};
#[derive(Deserialize, Debug)]
struct EsResultResponse {
    items: Vec<EsResultItem>,
}
impl EsResultResponse {
    fn parse(body: &str) -> Result<Self, String> {
        serde_json::from_str::<EsResultResponse>(body).map_err(|json_error| {
            format!(
                "some messages failed, could not parse response, error: {}",
                json_error
            )
        })
    }
    fn iter_status(&self) -> impl Iterator<Item = (StatusCode, Option<&EsErrorDetails>)> {
        self.items.iter().filter_map(|item| {
            item.result()
                .status
                .and_then(|status| StatusCode::from_u16(status).ok())
                .map(|status| (status, item.result().error.as_ref()))
        })
    }
    fn get_error_reason(&self, body: &str) -> String {
        match self
            .items
            .iter()
            .find_map(|item| item.result().error.as_ref())
        {
            Some(error) => format!("error type: {}, reason: {}", error.err_type, error.reason),
            None => format!("error response: {}", body),
        }
    }
}
#[derive(Deserialize, Debug)]
enum EsResultItem {
    #[serde(rename = "index")]
    Index(EsIndexResult),
    #[serde(rename = "create")]
    Create(EsIndexResult),
}
impl EsResultItem {
    #[allow(clippy::missing_const_for_fn)] fn result(&self) -> &EsIndexResult {
        match self {
            EsResultItem::Index(r) => r,
            EsResultItem::Create(r) => r,
        }
    }
}
#[derive(Deserialize, Debug)]
struct EsIndexResult {
    status: Option<u16>,
    error: Option<EsErrorDetails>,
}
#[derive(Deserialize, Debug)]
struct EsErrorDetails {
    reason: String,
    #[serde(rename = "type")]
    err_type: String,
}
#[derive(Clone)]
pub struct ElasticsearchRetryLogic {
    pub retry_partial: bool,
}
impl RetryLogic for ElasticsearchRetryLogic {
    type Error = HttpError;
    type Response = ElasticsearchResponse;
    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
        true
    }
    fn should_retry_response(&self, response: &ElasticsearchResponse) -> RetryAction {
        let status = response.http_response.status();
        match status {
            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
            StatusCode::NOT_IMPLEMENTED => {
                RetryAction::DontRetry("endpoint not implemented".into())
            }
            _ if status.is_server_error() => RetryAction::Retry(
                format!(
                    "{}: {}",
                    status,
                    String::from_utf8_lossy(response.http_response.body())
                )
                .into(),
            ),
            _ if status.is_client_error() => {
                let body = String::from_utf8_lossy(response.http_response.body());
                RetryAction::DontRetry(format!("client-side error, {}: {}", status, body).into())
            }
            _ if status.is_success() => {
                let body = String::from_utf8_lossy(response.http_response.body());
                if body.contains("\"errors\":true") {
                    match EsResultResponse::parse(&body) {
                        Ok(resp) => {
                            if self.retry_partial {
                                if let Some((status, error)) =
                                    resp.iter_status().find(|(status, _)| {
                                        *status == StatusCode::TOO_MANY_REQUESTS
                                            || status.is_server_error()
                                    })
                                {
                                    let msg = if let Some(error) = error {
                                        format!(
                                            "partial error, status: {}, error type: {}, reason: {}",
                                            status, error.err_type, error.reason
                                        )
                                    } else {
                                        format!("partial error, status: {}", status)
                                    };
                                    return RetryAction::Retry(msg.into());
                                }
                            }
                            RetryAction::DontRetry(resp.get_error_reason(&body).into())
                        }
                        Err(msg) => RetryAction::DontRetry(msg.into()),
                    }
                } else {
                    RetryAction::Successful
                }
            }
            _ => RetryAction::DontRetry(format!("response status: {}", status).into()),
        }
    }
}
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::Response;
    use similar_asserts::assert_eq;
    use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};
    use super::*;
    use crate::event::EventStatus;
    #[test]
    fn handles_error_response() {
        let json = "{\"took\":185,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"test-hgw28jv10u\",\"_type\":\"log_lines\",\"_id\":\"3GhQLXEBE62DvOOUKdFH\",\"status\":400,\"error\":{\"type\":\"illegal_argument_exception\",\"reason\":\"mapper [message] of different type, current_type [long], merged_type [text]\"}}}]}";
        let response = Response::builder()
            .status(StatusCode::OK)
            .body(Bytes::from(json))
            .unwrap();
        let logic = ElasticsearchRetryLogic {
            retry_partial: false,
        };
        assert!(matches!(
            logic.should_retry_response(&ElasticsearchResponse {
                http_response: response,
                event_status: EventStatus::Rejected,
                events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
            }),
            RetryAction::DontRetry(_)
        ));
    }
    #[test]
    fn handles_partial_error_response() {
        let json = "{\"took\":34,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"test-asjkf1234\",\"_type\":\"log_lines\",\"_id\":\"4Z3QLYEBT52RtoOEKz2H\",\"status\":429}}]}";
        let response = Response::builder()
            .status(StatusCode::OK)
            .body(Bytes::from(json))
            .unwrap();
        let logic = ElasticsearchRetryLogic {
            retry_partial: true,
        };
        assert!(matches!(
            logic.should_retry_response(&ElasticsearchResponse {
                http_response: response,
                event_status: EventStatus::Errored,
                events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
            }),
            RetryAction::Retry(_)
        ));
    }
    #[test]
    fn get_index_error_reason() {
        let json = "{\"took\":185,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"test-hgw28jv10u\",\"_type\":\"log_lines\",\"_id\":\"3GhQLXEBE62DvOOUKdFH\",\"status\":400,\"error\":{\"type\":\"illegal_argument_exception\",\"reason\":\"mapper [message] of different type, current_type [long], merged_type [text]\"}}}]}";
        let reason = match EsResultResponse::parse(json) {
            Ok(resp) => resp.get_error_reason(json),
            Err(msg) => msg,
        };
        assert_eq!(reason, "error type: illegal_argument_exception, reason: mapper [message] of different type, current_type [long], merged_type [text]");
    }
    #[test]
    fn get_create_error_reason() {
        let json = "{\"took\":3,\"errors\":true,\"items\":[{\"create\":{\"_index\":\"test-hgw28jv10u\",\"_type\":\"_doc\",\"_id\":\"aBLq1HcBWD7eBWkW2nj4\",\"status\":400,\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"object mapping for [host] tried to parse field [host] as object, but found a concrete value\"}}}]}";
        let reason = match EsResultResponse::parse(json) {
            Ok(resp) => resp.get_error_reason(json),
            Err(msg) => msg,
        };
        assert_eq!(reason, "error type: mapper_parsing_exception, reason: object mapping for [host] tried to parse field [host] as object, but found a concrete value");
    }
}