use std::collections::HashSet;
use crate::event::{LogEvent, Value};
use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use vector_lib::configurable::configurable_component;
use vrl::path::OwnedTargetPath;
/// Strategies for merging the values of a field across events.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum MergeStrategy {
    /// Keep the first value seen and discard all later ones.
    Discard,
    /// Keep the last value seen, ignoring `null` values so an earlier non-null value is not lost.
    Retain,
    /// Sum all numeric values.
    Sum,
    /// Keep the maximum numeric value seen.
    Max,
    /// Keep the minimum numeric value seen.
    Min,
    /// Append each value to an array.
    Array,
    /// Concatenate string values, delimited with a space.
    ///
    /// When the first value is an array, later values are appended to that array instead.
    Concat,
    /// Concatenate string values, delimited with a newline.
    ConcatNewline,
    /// Concatenate string values without a delimiter.
    ConcatRaw,
    /// Keep the shortest array seen.
    ShortestArray,
    /// Keep the longest array seen.
    LongestArray,
    /// Collect all unique values into an array, flattening arrays and object values one level.
    FlatUnique,
}
/// Merger that keeps the value it was created with and ignores every later value.
#[derive(Debug, Clone)]
struct DiscardMerger {
    v: Value,
}

impl DiscardMerger {
    /// Stores `initial`, which is the value that will eventually be written out.
    const fn new(initial: Value) -> Self {
        Self { v: initial }
    }
}

impl ReduceValueMerger for DiscardMerger {
    /// Drops the incoming value; this never fails.
    fn add(&mut self, _incoming: Value) -> Result<(), String> {
        Ok(())
    }

    /// Writes the initial value into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let Self { v: kept } = *self;
        event.insert(path, kept);
        Ok(())
    }
}
/// Merger that keeps the most recent non-null value.
#[derive(Debug, Clone)]
struct RetainMerger {
    v: Value,
}

impl RetainMerger {
    /// Starts from `initial`; it is kept until a non-null value replaces it.
    #[allow(clippy::missing_const_for_fn)]
    fn new(initial: Value) -> Self {
        Self { v: initial }
    }
}

impl ReduceValueMerger for RetainMerger {
    /// Replaces the stored value unless the incoming one is `null`.
    fn add(&mut self, v: Value) -> Result<(), String> {
        match v {
            // A null must not overwrite what has been retained so far.
            Value::Null => {}
            latest => self.v = latest,
        }
        Ok(())
    }

    /// Writes the retained value into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let Self { v: retained } = *self;
        event.insert(path, retained);
        Ok(())
    }
}
/// Merger that concatenates string values, optionally placing a separator between them.
#[derive(Debug, Clone)]
struct ConcatMerger {
    v: BytesMut,
    join_by: Option<Vec<u8>>,
}

impl ConcatMerger {
    /// Starts the buffer with a copy of `v`; `join_by` is stored as its UTF-8 encoding.
    fn new(v: Bytes, join_by: Option<char>) -> Self {
        let separator = join_by.map(|sep| String::from(sep).into_bytes());
        let buffer = BytesMut::from(&v[..]);
        Self {
            v: buffer,
            join_by: separator,
        }
    }
}

impl ReduceValueMerger for ConcatMerger {
    /// Appends the separator (if any) followed by `v`; only string values are accepted.
    fn add(&mut self, v: Value) -> Result<(), String> {
        match v {
            Value::Bytes(chunk) => {
                if let Some(separator) = &self.join_by {
                    self.v.extend_from_slice(separator);
                }
                self.v.extend_from_slice(&chunk);
                Ok(())
            }
            other => Err(format!(
                "expected string value, found: '{}'",
                other.to_string_lossy()
            )),
        }
    }

    /// Writes the concatenated bytes into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let joined = self.v.freeze();
        event.insert(path, Value::Bytes(joined));
        Ok(())
    }
}
/// Merger that grows an array: incoming arrays are appended element by element, any other
/// value is appended as a single element.
#[derive(Debug, Clone)]
struct ConcatArrayMerger {
    v: Vec<Value>,
}

impl ConcatArrayMerger {
    /// Starts from the elements of the first array seen.
    const fn new(v: Vec<Value>) -> Self {
        Self { v }
    }
}

impl ReduceValueMerger for ConcatArrayMerger {
    /// Appends `v` to the accumulated array; this never fails.
    fn add(&mut self, v: Value) -> Result<(), String> {
        match v {
            // The incoming array is owned, so move its elements instead of cloning each one.
            Value::Array(items) => self.v.extend(items),
            single => self.v.push(single),
        }
        Ok(())
    }

    /// Writes the accumulated array into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        event.insert(path, Value::Array(self.v));
        Ok(())
    }
}
/// Merger that collects every value, in arrival order, into one array.
#[derive(Debug, Clone)]
struct ArrayMerger {
    v: Vec<Value>,
}

impl ArrayMerger {
    /// Starts the array with `first` as its only element.
    fn new(first: Value) -> Self {
        Self {
            v: Vec::from([first]),
        }
    }
}

impl ReduceValueMerger for ArrayMerger {
    /// Appends `next` as a new element, even when it is itself an array; this never fails.
    fn add(&mut self, next: Value) -> Result<(), String> {
        self.v.push(next);
        Ok(())
    }

    /// Writes the collected array into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let Self { v: collected } = *self;
        event.insert(path, Value::Array(collected));
        Ok(())
    }
}
/// Merger that keeps the longest array seen; on a tie the earlier array wins.
#[derive(Debug, Clone)]
struct LongestArrayMerger {
    v: Vec<Value>,
}

impl LongestArrayMerger {
    /// Starts with `initial` as the longest array so far.
    const fn new(initial: Vec<Value>) -> Self {
        Self { v: initial }
    }
}

impl ReduceValueMerger for LongestArrayMerger {
    /// Replaces the stored array when `v` is a strictly longer array; rejects non-arrays.
    fn add(&mut self, v: Value) -> Result<(), String> {
        match v {
            Value::Array(candidate) => {
                if self.v.len() < candidate.len() {
                    self.v = candidate;
                }
                Ok(())
            }
            other => Err(format!(
                "expected array value, found: '{}'",
                other.to_string_lossy()
            )),
        }
    }

    /// Writes the longest array into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let Self { v: longest } = *self;
        event.insert(path, Value::Array(longest));
        Ok(())
    }
}
/// Merger that keeps the shortest array seen; on a tie the earlier array wins.
#[derive(Debug, Clone)]
struct ShortestArrayMerger {
    v: Vec<Value>,
}

impl ShortestArrayMerger {
    /// Starts with `initial` as the shortest array so far.
    const fn new(initial: Vec<Value>) -> Self {
        Self { v: initial }
    }
}

impl ReduceValueMerger for ShortestArrayMerger {
    /// Replaces the stored array when `v` is a strictly shorter array; rejects non-arrays.
    fn add(&mut self, v: Value) -> Result<(), String> {
        match v {
            Value::Array(candidate) => {
                if self.v.len() > candidate.len() {
                    self.v = candidate;
                }
                Ok(())
            }
            other => Err(format!(
                "expected array value, found: '{}'",
                other.to_string_lossy()
            )),
        }
    }

    /// Writes the shortest array into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let Self { v: shortest } = *self;
        event.insert(path, Value::Array(shortest));
        Ok(())
    }
}
/// Merger that gathers the set of distinct values seen.
///
/// Values are held in a `HashSet`, so the order of the emitted array is unspecified.
#[derive(Debug, Clone)]
struct FlatUniqueMerger {
    v: HashSet<Value>,
}

/// Adds `v` to `h`, flattening one level: an object contributes its field values and an array
/// contributes its elements; any other value is inserted as-is.
#[allow(clippy::mutable_key_type)]
fn insert_value(h: &mut HashSet<Value>, v: Value) {
    match v {
        Value::Object(fields) => h.extend(fields.into_iter().map(|(_, field)| field)),
        Value::Array(items) => h.extend(items),
        scalar => {
            h.insert(scalar);
        }
    }
}

impl FlatUniqueMerger {
    /// Builds the set from the first value, flattened the same way as later values.
    #[allow(clippy::mutable_key_type)]
    fn new(first: Value) -> Self {
        let mut unique = HashSet::new();
        insert_value(&mut unique, first);
        Self { v: unique }
    }
}

impl ReduceValueMerger for FlatUniqueMerger {
    /// Adds the (flattened) incoming value to the set; this never fails.
    fn add(&mut self, incoming: Value) -> Result<(), String> {
        insert_value(&mut self.v, incoming);
        Ok(())
    }

    /// Writes the distinct values as an array into `event` at `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let distinct: Vec<Value> = self.v.into_iter().collect();
        event.insert(path, Value::Array(distinct));
        Ok(())
    }
}
/// Merger that remembers the first timestamp and the most recently added one.
#[derive(Debug, Clone)]
struct TimestampWindowMerger {
    started: DateTime<Utc>,
    latest: DateTime<Utc>,
}

impl TimestampWindowMerger {
    /// Opens the window with `first` as both its start and its latest timestamp.
    const fn new(first: DateTime<Utc>) -> Self {
        Self {
            started: first,
            latest: first,
        }
    }
}

impl ReduceValueMerger for TimestampWindowMerger {
    /// Records `v` as the latest timestamp (no ordering check); rejects non-timestamps.
    fn add(&mut self, v: Value) -> Result<(), String> {
        match v {
            Value::Timestamp(ts) => {
                self.latest = ts;
                Ok(())
            }
            other => Err(format!(
                "expected timestamp value, found: {}",
                other.to_string_lossy()
            )),
        }
    }

    /// Writes the latest timestamp to `<path>_end` and the first timestamp to `path`.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let end_path = format!("{}_end", path);
        event.insert(end_path.as_str(), Value::Timestamp(self.latest));
        event.insert(path, Value::Timestamp(self.started));
        Ok(())
    }
}
/// Numeric state held by the sum/max/min mergers: an integer or a non-NaN float.
#[derive(Debug, Clone)]
enum NumberMergerValue {
    Int(i64),
    Float(NotNan<f64>),
}

impl From<i64> for NumberMergerValue {
    fn from(int: i64) -> Self {
        Self::Int(int)
    }
}

impl From<NotNan<f64>> for NumberMergerValue {
    fn from(float: NotNan<f64>) -> Self {
        Self::Float(float)
    }
}
/// Merger that keeps a running sum of the numeric values it is given.
///
/// The sum stays an integer while only integers are added and becomes a float as soon as a
/// float is involved on either side.
#[derive(Debug, Clone)]
struct AddNumbersMerger {
    v: NumberMergerValue,
}

impl AddNumbersMerger {
    /// Starts the sum at `v`.
    const fn new(v: NumberMergerValue) -> Self {
        Self { v }
    }

    /// Adds two floats, returning an error instead of panicking when the result is NaN
    /// (for example `inf + -inf`).
    fn float_sum(lhs: f64, rhs: f64) -> Result<NumberMergerValue, String> {
        NotNan::new(lhs + rhs)
            .map(NumberMergerValue::Float)
            .map_err(|_| format!("sum of {} and {} is not a number", lhs, rhs))
    }
}

impl ReduceValueMerger for AddNumbersMerger {
    /// Adds `v` to the running sum.
    ///
    /// # Errors
    ///
    /// Returns an error, leaving the running sum untouched, when `v` is not numeric, when an
    /// integer addition would overflow `i64`, or when a float addition would produce NaN.
    fn add(&mut self, v: Value) -> Result<(), String> {
        let sum = match v {
            Value::Integer(i) => match self.v {
                // Checked addition: a plain `+` would panic in debug builds and silently wrap
                // in release builds.
                NumberMergerValue::Int(j) => i
                    .checked_add(j)
                    .map(NumberMergerValue::Int)
                    .ok_or_else(|| format!("integer overflow while adding {} to {}", i, j))?,
                NumberMergerValue::Float(j) => Self::float_sum(i as f64, j.into_inner())?,
            },
            Value::Float(f) => match self.v {
                NumberMergerValue::Int(j) => Self::float_sum(f.into_inner(), j as f64)?,
                NumberMergerValue::Float(j) => Self::float_sum(f.into_inner(), j.into_inner())?,
            },
            _ => {
                return Err(format!(
                    "expected numeric value, found: '{}'",
                    v.to_string_lossy()
                ));
            }
        };
        self.v = sum;
        Ok(())
    }

    /// Writes the sum into `event` at `path`, as an integer or a float depending on the state.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let total = match self.v {
            NumberMergerValue::Float(f) => Value::Float(f),
            NumberMergerValue::Int(i) => Value::Integer(i),
        };
        event.insert(path, total);
        Ok(())
    }
}
/// Merger that keeps the largest numeric value seen.
///
/// Integers are compared with floats by converting the integer to `f64`; once a float is
/// involved in a replacement the stored value is a float.
#[derive(Debug, Clone)]
struct MaxNumberMerger {
    v: NumberMergerValue,
}

impl MaxNumberMerger {
    /// Starts with `initial` as the maximum so far.
    const fn new(initial: NumberMergerValue) -> Self {
        Self { v: initial }
    }
}

impl ReduceValueMerger for MaxNumberMerger {
    /// Replaces the stored value when `v` is strictly greater; rejects non-numeric values.
    fn add(&mut self, v: Value) -> Result<(), String> {
        // `Some(next)` when the incoming value beats the current maximum.
        let replacement = match (v, &self.v) {
            (Value::Integer(incoming), NumberMergerValue::Int(current)) => {
                (incoming > *current).then_some(NumberMergerValue::Int(incoming))
            }
            (Value::Integer(incoming), NumberMergerValue::Float(current)) => {
                let incoming = NotNan::new(incoming as f64).unwrap();
                (incoming > *current).then_some(NumberMergerValue::Float(incoming))
            }
            (Value::Float(incoming), current) => {
                let current = match current {
                    NumberMergerValue::Int(int) => NotNan::new(*int as f64).unwrap(),
                    NumberMergerValue::Float(float) => *float,
                };
                (incoming > current).then_some(NumberMergerValue::Float(incoming))
            }
            (other, _) => {
                return Err(format!(
                    "expected numeric value, found: '{}'",
                    other.to_string_lossy()
                ));
            }
        };
        if let Some(next) = replacement {
            self.v = next;
        }
        Ok(())
    }

    /// Writes the maximum into `event` at `path`, as an integer or a float depending on the state.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let maximum = match self.v {
            NumberMergerValue::Int(int) => Value::Integer(int),
            NumberMergerValue::Float(float) => Value::Float(float),
        };
        event.insert(path, maximum);
        Ok(())
    }
}
/// Merger that keeps the smallest numeric value seen.
///
/// Integers are compared with floats by converting the integer to `f64`; once a float is
/// involved in a replacement the stored value is a float.
#[derive(Debug, Clone)]
struct MinNumberMerger {
    v: NumberMergerValue,
}

impl MinNumberMerger {
    /// Starts with `initial` as the minimum so far.
    const fn new(initial: NumberMergerValue) -> Self {
        Self { v: initial }
    }
}

impl ReduceValueMerger for MinNumberMerger {
    /// Replaces the stored value when `v` is strictly smaller; rejects non-numeric values.
    fn add(&mut self, v: Value) -> Result<(), String> {
        // `Some(next)` when the incoming value is below the current minimum.
        let replacement = match (v, &self.v) {
            (Value::Integer(incoming), NumberMergerValue::Int(current)) => {
                (incoming < *current).then_some(NumberMergerValue::Int(incoming))
            }
            (Value::Integer(incoming), NumberMergerValue::Float(current)) => {
                let incoming = NotNan::new(incoming as f64).unwrap();
                (incoming < *current).then_some(NumberMergerValue::Float(incoming))
            }
            (Value::Float(incoming), current) => {
                let current = match current {
                    NumberMergerValue::Int(int) => NotNan::new(*int as f64).unwrap(),
                    NumberMergerValue::Float(float) => *float,
                };
                (incoming < current).then_some(NumberMergerValue::Float(incoming))
            }
            (other, _) => {
                return Err(format!(
                    "expected numeric value, found: '{}'",
                    other.to_string_lossy()
                ));
            }
        };
        if let Some(next) = replacement {
            self.v = next;
        }
        Ok(())
    }

    /// Writes the minimum into `event` at `path`, as an integer or a float depending on the state.
    fn insert_into(
        self: Box<Self>,
        path: &OwnedTargetPath,
        event: &mut LogEvent,
    ) -> Result<(), String> {
        let minimum = match self.v {
            NumberMergerValue::Int(int) => Value::Integer(int),
            NumberMergerValue::Float(float) => Value::Float(float),
        };
        event.insert(path, minimum);
        Ok(())
    }
}
/// Accumulates the values seen for one field and finally writes the merged result into an event.
pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync {
    /// Folds `v` into the accumulated state.
    ///
    /// Returns an error message when `v` cannot be merged by this merger.
    fn add(&mut self, v: Value) -> Result<(), String>;
    /// Consumes the merger and writes its merged result into the event `v` at `path`.
    fn insert_into(self: Box<Self>, path: &OwnedTargetPath, v: &mut LogEvent)
        -> Result<(), String>;
}
impl From<Value> for Box<dyn ReduceValueMerger> {
    fn from(v: Value) -> Self {
        match v {
            Value::Integer(i) => Box::new(AddNumbersMerger::new(i.into())),
            Value::Float(f) => Box::new(AddNumbersMerger::new(f.into())),
            Value::Timestamp(ts) => Box::new(TimestampWindowMerger::new(ts)),
            Value::Object(_) => Box::new(DiscardMerger::new(v)),
            Value::Null => Box::new(DiscardMerger::new(v)),
            Value::Boolean(_) => Box::new(DiscardMerger::new(v)),
            Value::Bytes(_) => Box::new(DiscardMerger::new(v)),
            Value::Regex(_) => Box::new(DiscardMerger::new(v)),
            Value::Array(_) => Box::new(DiscardMerger::new(v)),
        }
    }
}
/// Builds the merger for strategy `m`, seeded with the first value `v`.
///
/// # Errors
///
/// Returns a message when the kind of `v` is not supported by the strategy: numeric strategies
/// need an integer or float, `Concat` needs a string or array, `ConcatNewline`/`ConcatRaw` need
/// a string, and the array-length strategies need an array.
pub(crate) fn get_value_merger(
    v: Value,
    m: &MergeStrategy,
) -> Result<Box<dyn ReduceValueMerger>, String> {
    let merger: Box<dyn ReduceValueMerger> = match (m, v) {
        // Numeric strategies.
        (MergeStrategy::Sum, Value::Integer(i)) => Box::new(AddNumbersMerger::new(i.into())),
        (MergeStrategy::Sum, Value::Float(f)) => Box::new(AddNumbersMerger::new(f.into())),
        (MergeStrategy::Max, Value::Integer(i)) => Box::new(MaxNumberMerger::new(i.into())),
        (MergeStrategy::Max, Value::Float(f)) => Box::new(MaxNumberMerger::new(f.into())),
        (MergeStrategy::Min, Value::Integer(i)) => Box::new(MinNumberMerger::new(i.into())),
        (MergeStrategy::Min, Value::Float(f)) => Box::new(MinNumberMerger::new(f.into())),
        (MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min, other) => {
            return Err(format!(
                "expected number value, found: '{}'",
                other.to_string_lossy()
            ));
        }

        // Concatenation strategies.
        (MergeStrategy::Concat, Value::Bytes(b)) => Box::new(ConcatMerger::new(b, Some(' '))),
        (MergeStrategy::Concat, Value::Array(a)) => Box::new(ConcatArrayMerger::new(a)),
        (MergeStrategy::Concat, other) => {
            return Err(format!(
                "expected string or array value, found: '{}'",
                other.to_string_lossy()
            ));
        }
        (MergeStrategy::ConcatNewline, Value::Bytes(b)) => {
            Box::new(ConcatMerger::new(b, Some('\n')))
        }
        (MergeStrategy::ConcatRaw, Value::Bytes(b)) => Box::new(ConcatMerger::new(b, None)),
        (MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw, other) => {
            return Err(format!(
                "expected string value, found: '{}'",
                other.to_string_lossy()
            ));
        }

        // Array-length strategies.
        (MergeStrategy::ShortestArray, Value::Array(a)) => {
            Box::new(ShortestArrayMerger::new(a))
        }
        (MergeStrategy::LongestArray, Value::Array(a)) => Box::new(LongestArrayMerger::new(a)),
        (MergeStrategy::ShortestArray | MergeStrategy::LongestArray, other) => {
            return Err(format!(
                "expected array value, found: '{}'",
                other.to_string_lossy()
            ));
        }

        // Strategies that accept any kind of value.
        (MergeStrategy::Array, first) => Box::new(ArrayMerger::new(first)),
        (MergeStrategy::Discard, first) => Box::new(DiscardMerger::new(first)),
        (MergeStrategy::Retain, first) => Box::new(RetainMerger::new(first)),
        (MergeStrategy::FlatUnique, first) => Box::new(FlatUniqueMerger::new(first)),
    };
    Ok(merger)
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::LogEvent;
    use serde_json::json;
    use vrl::owned_event_path;
    /// Checks, for every kind of initial value, exactly which strategies accept it.
    ///
    /// Every value is checked against every strategy, so no combination can be skipped by a
    /// copy-paste slip.
    #[test]
    fn initial_values() {
        const EVERY_STRATEGY: [MergeStrategy; 12] = [
            MergeStrategy::Discard,
            MergeStrategy::Retain,
            MergeStrategy::Sum,
            MergeStrategy::Max,
            MergeStrategy::Min,
            MergeStrategy::Array,
            MergeStrategy::Concat,
            MergeStrategy::ConcatNewline,
            MergeStrategy::ConcatRaw,
            MergeStrategy::ShortestArray,
            MergeStrategy::LongestArray,
            MergeStrategy::FlatUnique,
        ];
        // Strategies that accept any initial value.
        const UNIVERSAL: [MergeStrategy; 4] = [
            MergeStrategy::Discard,
            MergeStrategy::Retain,
            MergeStrategy::Array,
            MergeStrategy::FlatUnique,
        ];
        const NUMERIC: [MergeStrategy; 3] =
            [MergeStrategy::Sum, MergeStrategy::Max, MergeStrategy::Min];
        const STRING: [MergeStrategy; 3] = [
            MergeStrategy::Concat,
            MergeStrategy::ConcatNewline,
            MergeStrategy::ConcatRaw,
        ];
        const ARRAY: [MergeStrategy; 3] = [
            MergeStrategy::Concat,
            MergeStrategy::ShortestArray,
            MergeStrategy::LongestArray,
        ];

        /// Asserts that `value` is accepted by the universal strategies plus `extra`, and
        /// rejected by every other strategy.
        fn assert_accepted_by(value: Value, extra: &[MergeStrategy]) {
            for strategy in &EVERY_STRATEGY {
                let expected = UNIVERSAL.contains(strategy) || extra.contains(strategy);
                let accepted = get_value_merger(value.clone(), strategy).is_ok();
                assert_eq!(
                    accepted, expected,
                    "unexpected result for initial value {:?} with strategy {:?}",
                    value, strategy
                );
            }
        }

        assert_accepted_by("foo".into(), &STRING);
        assert_accepted_by(42.into(), &NUMERIC);
        assert_accepted_by(4.2.into(), &NUMERIC);
        assert_accepted_by(true.into(), &[]);
        assert_accepted_by(Utc::now().into(), &[]);
        assert_accepted_by(json!([]).into(), &ARRAY);
        assert_accepted_by(json!({}).into(), &[]);
        assert_accepted_by(json!(null).into(), &[]);
    }
    /// Checks the merged result of two values under each strategy.
    #[test]
    fn merging_values() {
        /// Counts how often `wanted` occurs in the merged array, panicking when the merged
        /// value is not an array of integers.
        fn occurrences(merged: &Value, wanted: i64) -> usize {
            match merged {
                Value::Array(items) => items
                    .iter()
                    .filter(|item| match item {
                        Value::Integer(found) => *found == wanted,
                        _ => panic!("Bad value"),
                    })
                    .count(),
                _ => panic!("Not array"),
            }
        }

        // (initial, additional, strategy, expected merged value)
        let successes: Vec<(Value, Value, MergeStrategy, Value)> = vec![
            ("foo".into(), "bar".into(), MergeStrategy::Discard, "foo".into()),
            ("foo".into(), "bar".into(), MergeStrategy::Retain, "bar".into()),
            (
                "foo".into(),
                "bar".into(),
                MergeStrategy::Array,
                json!(["foo", "bar"]).into(),
            ),
            ("foo".into(), "bar".into(), MergeStrategy::Concat, "foo bar".into()),
            (
                "foo".into(),
                "bar".into(),
                MergeStrategy::ConcatNewline,
                "foo\nbar".into(),
            ),
            ("foo".into(), "bar".into(), MergeStrategy::ConcatRaw, "foobar".into()),
            (21.into(), 21.into(), MergeStrategy::Sum, 42.into()),
            (41.into(), 42.into(), MergeStrategy::Max, 42.into()),
            (42.into(), 41.into(), MergeStrategy::Max, 42.into()),
            (42.into(), 43.into(), MergeStrategy::Min, 42.into()),
            (43.into(), 42.into(), MergeStrategy::Min, 42.into()),
            (2.1.into(), 2.1.into(), MergeStrategy::Sum, 4.2.into()),
            (4.1.into(), 4.2.into(), MergeStrategy::Max, 4.2.into()),
            (4.2.into(), 4.1.into(), MergeStrategy::Max, 4.2.into()),
            (4.2.into(), 4.3.into(), MergeStrategy::Min, 4.2.into()),
            (4.3.into(), 4.2.into(), MergeStrategy::Min, 4.2.into()),
            (
                json!([4_i64]).into(),
                json!([2_i64]).into(),
                MergeStrategy::Concat,
                json!([4_i64, 2_i64]).into(),
            ),
            (
                json!([]).into(),
                42_i64.into(),
                MergeStrategy::Concat,
                json!([42_i64]).into(),
            ),
            (
                json!([34_i64]).into(),
                json!([42_i64, 43_i64]).into(),
                MergeStrategy::ShortestArray,
                json!([34_i64]).into(),
            ),
            (
                json!([34_i64]).into(),
                json!([42_i64, 43_i64]).into(),
                MergeStrategy::LongestArray,
                json!([42_i64, 43_i64]).into(),
            ),
        ];
        for (initial, additional, strategy, expected) in successes {
            assert_eq!(
                merge(initial, additional, &strategy),
                Ok(expected),
                "strategy: {:?}",
                strategy
            );
        }

        // A string started with `Concat` only accepts further strings.
        let rejected_by_concat: Vec<Value> = vec![
            42.into(),
            4.2.into(),
            true.into(),
            Utc::now().into(),
            json!({}).into(),
            json!([]).into(),
            json!(null).into(),
        ];
        for additional in rejected_by_concat {
            assert!(merge("foo".into(), additional, &MergeStrategy::Concat).is_err());
        }

        // `FlatUnique` emits each distinct value once; the output order is unspecified, so
        // count occurrences instead of comparing whole arrays.
        let merged = merge(34_i64.into(), 43_i64.into(), &MergeStrategy::FlatUnique).unwrap();
        assert_eq!(occurrences(&merged, 34), 1);
        assert_eq!(occurrences(&merged, 43), 1);

        // Feeding the resulting array back in with an already-present value adds nothing.
        let merged = merge(merged, 34_i32.into(), &MergeStrategy::FlatUnique).unwrap();
        assert_eq!(occurrences(&merged, 34), 1);
        assert_eq!(occurrences(&merged, 43), 1);
    }
    /// Runs one merge: seeds a merger with `initial`, adds `additional`, and returns the value
    /// the merger writes to the `out` field of an empty event.
    fn merge(initial: Value, additional: Value, strategy: &MergeStrategy) -> Result<Value, String> {
        let target = owned_event_path!("out");
        let mut event = LogEvent::default();

        let mut merger = get_value_merger(initial, strategy)?;
        merger.add(additional)?;
        merger.insert_into(&target, &mut event)?;

        let merged = event.remove(&target).unwrap();
        Ok(merged)
    }
}