use std::{
    error, fmt,
    num::{ParseFloatError, ParseIntError},
    str::Utf8Error,
    sync::LazyLock,
};
use regex::Regex;
use crate::{
    event::metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
    sources::util::extract_tag_key_and_value,
};
static WHITESPACE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\s+").unwrap());
static NONALPHANUM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap());
#[derive(Clone)]
pub struct Parser {
    sanitize: bool,
}
impl Parser {
    pub const fn new(sanitize_keys: bool) -> Self {
        Self {
            sanitize: sanitize_keys,
        }
    }
    pub fn parse(&self, packet: &str) -> Result<Metric, ParseError> {
        let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
        if key_and_body.len() != 2 {
            return Err(ParseError::Malformed(
                "should be key and body with ':' separator",
            ));
        }
        let (key, body) = (key_and_body[0], key_and_body[1]);
        let parts = body.split('|').collect::<Vec<_>>();
        if parts.len() < 2 {
            return Err(ParseError::Malformed(
                "body should have at least two pipe separated components",
            ));
        }
        let name = sanitize_key(key, self.sanitize);
        let metric_type = parts[1];
        let sampling = parts.get(2).filter(|s| s.starts_with('@'));
        let sample_rate = if let Some(s) = sampling {
            1.0 / sanitize_sampling(parse_sampling(s)?)
        } else {
            1.0
        };
        let tags = if sampling.is_none() {
            parts.get(2)
        } else {
            parts.get(3)
        };
        let tags = tags.filter(|s| s.starts_with('#'));
        let tags = tags.map(parse_tags).transpose()?;
        let metric = match metric_type {
            "c" => {
                let val: f64 = parts[0].parse()?;
                Metric::new(
                    name,
                    MetricKind::Incremental,
                    MetricValue::Counter {
                        value: val * sample_rate,
                    },
                )
                .with_tags(tags)
            }
            unit @ "h" | unit @ "ms" | unit @ "d" => {
                let val: f64 = parts[0].parse()?;
                Metric::new(
                name, MetricKind::Incremental, MetricValue::Distribution {
                    samples: vector_lib::samples![convert_to_base_units(unit, val) => sample_rate as u32],
                    statistic: convert_to_statistic(unit),
                },
            ).with_tags(tags)
            }
            "g" => {
                let value = if parts[0]
                    .chars()
                    .next()
                    .map(|c| c.is_ascii_digit())
                    .ok_or(ParseError::Malformed("empty first body component"))?
                {
                    parts[0].parse()?
                } else {
                    parts[0][1..].parse()?
                };
                match parse_direction(parts[0])? {
                    None => Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
                        .with_tags(tags),
                    Some(sign) => Metric::new(
                        name,
                        MetricKind::Incremental,
                        MetricValue::Gauge {
                            value: value * sign,
                        },
                    )
                    .with_tags(tags),
                }
            }
            "s" => Metric::new(
                name,
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec![parts[0].into()].into_iter().collect(),
                },
            )
            .with_tags(tags),
            other => return Err(ParseError::UnknownMetricType(other.into())),
        };
        Ok(metric)
    }
}
fn parse_sampling(input: &str) -> Result<f64, ParseError> {
    if !input.starts_with('@') || input.len() < 2 {
        return Err(ParseError::Malformed(
            "expected non empty '@'-prefixed sampling component",
        ));
    }
    let num: f64 = input[1..].parse()?;
    if num.is_sign_positive() {
        Ok(num)
    } else {
        Err(ParseError::Malformed("sample rate can't be negative"))
    }
}
fn parse_tags(input: &&str) -> Result<MetricTags, ParseError> {
    if !input.starts_with('#') || input.len() < 2 {
        return Err(ParseError::Malformed(
            "expected non empty '#'-prefixed tags component",
        ));
    }
    Ok(input[1..]
        .split(',')
        .map(extract_tag_key_and_value)
        .collect())
}
fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
    match input
        .chars()
        .next()
        .ok_or(ParseError::Malformed("empty body component"))?
    {
        '+' => Ok(Some(1.0)),
        '-' => Ok(Some(-1.0)),
        c if c.is_ascii_digit() => Ok(None),
        _other => Err(ParseError::Malformed("invalid gauge value prefix")),
    }
}
fn sanitize_key(key: &str, sanitize: bool) -> String {
    if !sanitize {
        key.to_owned()
    } else {
        let s = key.replace('/', "'-");
        let s = WHITESPACE.replace_all(&s, "_");
        let s = NONALPHANUM.replace_all(&s, "");
        s.into()
    }
}
fn sanitize_sampling(sampling: f64) -> f64 {
    if sampling == 0.0 {
        1.0
    } else {
        sampling
    }
}
fn convert_to_base_units(unit: &str, val: f64) -> f64 {
    match unit {
        "ms" => val / 1000.0,
        _ => val,
    }
}
fn convert_to_statistic(unit: &str) -> StatisticKind {
    match unit {
        "d" => StatisticKind::Summary,
        _ => StatisticKind::Histogram,
    }
}
#[derive(Debug, PartialEq, Eq)]
pub enum ParseError {
    InvalidUtf8(Utf8Error),
    Malformed(&'static str),
    UnknownMetricType(String),
    InvalidInteger(ParseIntError),
    InvalidFloat(ParseFloatError),
}
impl fmt::Display for ParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Statsd parse error: {:?}", self)
    }
}
vector_lib::impl_event_data_eq!(ParseError);
impl error::Error for ParseError {}
impl From<ParseIntError> for ParseError {
    fn from(e: ParseIntError) -> ParseError {
        ParseError::InvalidInteger(e)
    }
}
impl From<ParseFloatError> for ParseError {
    fn from(e: ParseFloatError) -> ParseError {
        ParseError::InvalidFloat(e)
    }
}
#[cfg(test)]
mod test {
    use vector_lib::assert_event_data_eq;
    use vector_lib::{event::metric::TagValue, metric_tags};
    use super::{sanitize_key, sanitize_sampling, ParseError, Parser};
    use crate::event::metric::{Metric, MetricKind, MetricValue, StatisticKind};
    const SANITIZING_PARSER: Parser = Parser::new(true);
    fn parse(packet: &str) -> Result<Metric, ParseError> {
        SANITIZING_PARSER.parse(packet)
    }
    const NON_SANITIZING_PARSER: Parser = Parser::new(false);
    fn unsanitized_parse(packet: &str) -> Result<Metric, ParseError> {
        NON_SANITIZING_PARSER.parse(packet)
    }
    #[test]
    fn basic_counter() {
        assert_event_data_eq!(
            parse("foo:1|c"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )),
        );
    }
    #[test]
    fn tagged_counter() {
        assert_event_data_eq!(
            parse("foo/how@ever baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo-however_baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }
    #[test]
    fn tagged_not_sanitized_counter() {
        assert_event_data_eq!(
            unsanitized_parse("foo/bar@baz baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo/bar@baz baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }
    #[test]
    fn enhanced_tags() {
        assert_event_data_eq!(
            parse("foo:1|c|#tag1,tag2:valueA,tag2:valueB,tag3:value,tag3,tag4:"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "valueA",
                "tag2" => "valueB",
                "tag3" => "value",
                "tag3" => TagValue::Bare,
                "tag4" => "",
            )))),
        );
    }
    #[test]
    fn sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0.1"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 20.0 },
            )),
        );
    }
    #[test]
    fn zero_sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 2.0 },
            )),
        );
    }
    #[test]
    fn sampled_timer() {
        assert_event_data_eq!(
            parse("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![0.320 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }
    #[test]
    fn sampled_tagged_histogram() {
        assert_event_data_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }
    #[test]
    fn sampled_distribution() {
        assert_event_data_eq!(
            parse("glork:320|d|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Summary
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }
    #[test]
    fn simple_gauge() {
        assert_event_data_eq!(
            parse("gaugor:333|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 333.0 },
            )),
        );
    }
    #[test]
    fn signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: -4.0 },
            )),
        );
        assert_event_data_eq!(
            parse("gaugor:+10|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: 10.0 },
            )),
        );
    }
    #[test]
    fn sets() {
        assert_event_data_eq!(
            parse("uniques:765|s"),
            Ok(Metric::new(
                "uniques",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["765".into()].into_iter().collect()
                },
            )),
        );
    }
    #[test]
    fn sanitizing_keys() {
        assert_eq!("foo-bar-baz", sanitize_key("foo/bar/baz", true));
        assert_eq!("foo/bar/baz", sanitize_key("foo/bar/baz", false));
        assert_eq!("foo_bar_baz", sanitize_key("foo bar  baz", true));
        assert_eq!("foo.__bar_.baz", sanitize_key("foo. @& bar_$!#.baz", true));
        assert_eq!(
            "foo. @& bar_$!#.baz",
            sanitize_key("foo. @& bar_$!#.baz", false)
        );
    }
    #[test]
    fn sanitizing_sampling() {
        assert_eq!(1.0, sanitize_sampling(0.0));
        assert_eq!(2.5, sanitize_sampling(2.5));
        assert_eq!(-5.0, sanitize_sampling(-5.0));
    }
}