use std::borrow::Cow;
use std::{collections::BTreeMap, convert::TryFrom, marker::PhantomData};
use lookup::lookup_v2::OwnedSegment;
use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix};
use snafu::Snafu;
use vrl::compiler::value::VrlValueConvert;
use vrl::compiler::{ProgramInfo, SecretTarget, Target};
use vrl::prelude::Collection;
use vrl::value::{Kind, ObjectMap, Value};
use super::{metric::TagValue, Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent};
use crate::config::{log_schema, LogNamespace};
use crate::schema::Definition;
/// Paths listed as "expected" in the error returned when a metric insert, remove or
/// mutable get is given an unsupported path.
const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .timestamp, .kind, .tags";
/// Paths listed as "expected" in the error returned when a metric read is given an
/// unsupported path; unlike the set list it also contains `.type`.
const VALID_METRIC_PATHS_GET: &str = ".name, .namespace, .timestamp, .kind, .tags, .type";
/// Number of components requested from `to_alternative_components` when classifying a
/// metric path. The longest accepted shape has two components (`tags`, `<field>`).
const MAX_METRIC_PATH_DEPTH: usize = 3;
/// Adapter that exposes an `Event` to VRL through the `Target` and `SecretTarget` traits.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum VrlTarget {
    /// A log event split into its root value and its metadata.
    LogEvent(Value, EventMetadata),
    /// A metric together with a `Value` view of its fields.
    Metric {
        /// The underlying metric; accepted inserts and removes are applied to it.
        metric: Metric,
        /// `Value` view built by `precompute_metric_value`; reads are served from it and
        /// inserts/removes are mirrored into it.
        value: Value,
        /// Passed to `set_metric_tag_values` to choose multi-value or single-value tag handling.
        multi_value_tags: bool,
    },
    /// A trace event: its root value (constructed as `Value::Object`) and its metadata.
    Trace(Value, EventMetadata),
}
/// The events produced by `VrlTarget::into_events`.
pub enum TargetEvents {
    /// A single event.
    One(Event),
    /// An iterator yielding one log event per element of an array root.
    Logs(TargetIter<LogEvent>),
    /// An iterator yielding one trace event per element of an array root.
    Traces(TargetIter<TraceEvent>),
}
/// Iterator over the elements of an array root, turning each element into an `Event`.
///
/// `T` only selects which `Iterator` implementation applies (`LogEvent` or `TraceEvent`);
/// no value of type `T` is stored.
pub struct TargetIter<T> {
    /// Remaining array elements.
    iter: std::vec::IntoIter<Value>,
    /// Metadata cloned into every produced event.
    metadata: EventMetadata,
    /// Marker tying the iterator to the event type it produces.
    _marker: PhantomData<T>,
    /// Namespace consulted by the log iterator when an element is not an object.
    log_namespace: LogNamespace,
}
/// Wraps `value` in a new log event built from `metadata`.
///
/// The value is handed to `maybe_insert` together with the message key target path
/// from the global log schema. Callers in this file use it for roots that are not
/// objects.
fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
    let message_path = log_schema().message_key_target_path();
    let mut event = LogEvent::new_with_metadata(metadata);
    event.maybe_insert(message_path, value);
    event
}
/// Yields one log event per remaining array element, each carrying a clone of the
/// shared metadata.
impl Iterator for TargetIter<LogEvent> {
    type Item = Event;
    /// Converts the next element into an `Event`.
    ///
    /// Under `LogNamespace::Legacy`, an element that is not an object is wrapped by
    /// `create_log_event`; every other element becomes the event root as-is.
    fn next(&mut self) -> Option<Self::Item> {
        let element = self.iter.next()?;
        let metadata = self.metadata.clone();
        let needs_wrapping = matches!(self.log_namespace, LogNamespace::Legacy)
            && !matches!(element, Value::Object(_));
        let log = if needs_wrapping {
            create_log_event(element, metadata)
        } else {
            LogEvent::from_parts(element, metadata)
        };
        Some(log.into())
    }
}
/// Yields one trace event per remaining array element, each carrying a clone of the
/// shared metadata.
impl Iterator for TargetIter<TraceEvent> {
    type Item = Event;
    /// Converts the next element into a trace `Event`.
    ///
    /// Object elements become the event root directly; anything else is first wrapped
    /// by `create_log_event`. The stored log namespace is not consulted here.
    fn next(&mut self) -> Option<Self::Item> {
        let element = self.iter.next()?;
        let metadata = self.metadata.clone();
        let log = if matches!(element, Value::Object(_)) {
            LogEvent::from_parts(element, metadata)
        } else {
            create_log_event(element, metadata)
        };
        Some(TraceEvent::from(log).into())
    }
}
impl VrlTarget {
    pub fn new(event: Event, info: &ProgramInfo, multi_value_metric_tags: bool) -> Self {
        match event {
            Event::Log(event) => {
                let (value, metadata) = event.into_parts();
                VrlTarget::LogEvent(value, metadata)
            }
            Event::Metric(metric) => {
                let value = precompute_metric_value(&metric, info);
                VrlTarget::Metric {
                    metric,
                    value,
                    multi_value_tags: multi_value_metric_tags,
                }
            }
            Event::Trace(event) => {
                let (fields, metadata) = event.into_parts();
                VrlTarget::Trace(Value::Object(fields), metadata)
            }
        }
    }
    pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition {
        let log_namespaces = input.log_namespaces().clone();
        let merged_arrays = merge_array_definitions(input);
        Definition::combine_log_namespaces(
            &log_namespaces,
            move_field_definitions_into_message(merged_arrays.clone()),
            merged_arrays,
        )
    }
    pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents {
        match self {
            VrlTarget::LogEvent(value, metadata) => match value {
                value @ Value::Object(_) => {
                    TargetEvents::One(LogEvent::from_parts(value, metadata).into())
                }
                Value::Array(values) => TargetEvents::Logs(TargetIter {
                    iter: values.into_iter(),
                    metadata,
                    _marker: PhantomData,
                    log_namespace,
                }),
                v => match log_namespace {
                    LogNamespace::Vector => {
                        TargetEvents::One(LogEvent::from_parts(v, metadata).into())
                    }
                    LogNamespace::Legacy => TargetEvents::One(create_log_event(v, metadata).into()),
                },
            },
            VrlTarget::Trace(value, metadata) => match value {
                value @ Value::Object(_) => {
                    let log = LogEvent::from_parts(value, metadata);
                    TargetEvents::One(TraceEvent::from(log).into())
                }
                Value::Array(values) => TargetEvents::Traces(TargetIter {
                    iter: values.into_iter(),
                    metadata,
                    _marker: PhantomData,
                    log_namespace,
                }),
                v => TargetEvents::One(create_log_event(v, metadata).into()),
            },
            VrlTarget::Metric { metric, .. } => TargetEvents::One(Event::Metric(metric)),
        }
    }
    fn metadata(&self) -> &EventMetadata {
        match self {
            VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
            VrlTarget::Metric { metric, .. } => metric.metadata(),
        }
    }
    fn metadata_mut(&mut self) -> &mut EventMetadata {
        match self {
            VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
            VrlTarget::Metric { metric, .. } => metric.metadata_mut(),
        }
    }
}
/// Rewrites the event kind so that kinds other than object and array are described as
/// an object holding them under the log schema's message key.
///
/// The definition is returned untouched when nothing remains after removing the object
/// and array kinds, or when the log schema has no message key. Otherwise the bytes,
/// integer, float, boolean, timestamp, regex and null kinds are stripped from the event
/// kind and the wrapping object is unioned in.
fn move_field_definitions_into_message(mut definition: Definition) -> Definition {
    let mut scalar_kinds = definition.event_kind().clone();
    scalar_kinds.remove_object();
    scalar_kinds.remove_array();
    if scalar_kinds.is_never() {
        return definition;
    }
    let message_key = match log_schema().message_key() {
        Some(key) => key,
        None => return definition,
    };
    let wrapped = Kind::object(Collection::from(BTreeMap::from([(
        message_key.to_string().into(),
        scalar_kinds,
    )])));
    let event_kind = definition.event_kind_mut();
    event_kind.remove_bytes();
    event_kind.remove_integer();
    event_kind.remove_float();
    event_kind.remove_boolean();
    event_kind.remove_timestamp();
    event_kind.remove_regex();
    event_kind.remove_null();
    *event_kind = event_kind.union(wrapped);
    definition
}
/// Replaces an array event kind with the reduced kind of its elements.
///
/// When the event kind has an array component, that component is removed and the
/// result of `reduced_kind()` on it is unioned into what remains. Definitions without
/// an array component are returned unchanged.
fn merge_array_definitions(mut definition: Definition) -> Definition {
    let element_kinds = match definition.event_kind().as_array() {
        Some(elements) => elements.reduced_kind(),
        None => return definition,
    };
    let event_kind = definition.event_kind_mut();
    event_kind.remove_array();
    *event_kind = event_kind.union(element_kinds);
    definition
}
/// Applies a VRL `value` to the tag `name` on `metric`.
///
/// With `multi_value_tags` set, `value` is read as an array (anything else counts as
/// empty): byte elements become tag values, nulls become bare tags, other elements are
/// dropped, and the resulting list replaces the tag. Without it, a bytes value replaces
/// the tag with its lossy UTF-8 text, a null sets a single bare tag, and any other
/// value leaves the metric untouched.
fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) {
    if !multi_value_tags {
        let single = value.try_bytes_utf8_lossy().map(Cow::into_owned);
        match single {
            Ok(text) => {
                metric.replace_tag(name, text);
            }
            Err(_) if value.is_null() => {
                metric.set_multi_value_tag(name, vec![TagValue::Bare]);
            }
            Err(_) => {}
        }
        return;
    }
    let mut tag_values = Vec::new();
    for element in value.as_array().unwrap_or(&[]) {
        match element {
            Value::Bytes(bytes) => {
                tag_values.push(TagValue::Value(String::from_utf8_lossy(bytes).to_string()));
            }
            Value::Null => tag_values.push(TagValue::Bare),
            _ => {}
        }
    }
    metric.set_multi_value_tag(name, tag_values);
}
/// `Target` implementation that lets VRL read and mutate the wrapped event.
///
/// Log and trace variants delegate event paths straight to their root `Value`. The
/// metric variant accepts only a fixed set of paths and applies each change to both the
/// `Metric` and its `Value` view. `PathPrefix::Metadata` paths always go to the event
/// metadata value.
impl Target for VrlTarget {
    /// Inserts `value` at `target_path`.
    ///
    /// # Errors
    ///
    /// For metrics, returns an error string when the path is the event root, when the
    /// path is not one of the supported metric paths, or when `value` cannot be
    /// converted to the type the field requires.
    fn target_insert(&mut self, target_path: &OwnedTargetPath, value: Value) -> Result<(), String> {
        let path = &target_path.path;
        match target_path.prefix {
            PathPrefix::Event => match self {
                VrlTarget::LogEvent(ref mut log, _) | VrlTarget::Trace(ref mut log, _) => {
                    log.insert(path, value);
                    Ok(())
                }
                VrlTarget::Metric {
                    ref mut metric,
                    value: metric_value,
                    multi_value_tags,
                } => {
                    // The root of a metric cannot be assigned.
                    if path.is_root() {
                        return Err(MetricPathError::SetPathError.to_string());
                    }
                    // Only the first alternative is used to classify the path.
                    if let Some(paths) = path
                        .to_alternative_components(MAX_METRIC_PATH_DEPTH)
                        .first()
                    {
                        match paths.as_slice() {
                            ["tags"] => {
                                // Validate before mutating so a non-object value leaves
                                // the existing tags in place.
                                let value =
                                    value.clone().try_object().map_err(|e| e.to_string())?;
                                metric.remove_tags();
                                for (field, value) in &value {
                                    set_metric_tag_values(
                                        field[..].into(),
                                        value,
                                        metric,
                                        *multi_value_tags,
                                    );
                                }
                            }
                            ["tags", field] => {
                                set_metric_tag_values(
                                    (*field).to_owned(),
                                    &value,
                                    metric,
                                    *multi_value_tags,
                                );
                            }
                            ["name"] => {
                                let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
                                metric.series.name.name =
                                    String::from_utf8_lossy(&value).into_owned();
                            }
                            ["namespace"] => {
                                let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
                                metric.series.name.namespace =
                                    Some(String::from_utf8_lossy(&value).into_owned());
                            }
                            ["timestamp"] => {
                                let value =
                                    value.clone().try_timestamp().map_err(|e| e.to_string())?;
                                metric.data.time.timestamp = Some(value);
                            }
                            ["kind"] => {
                                metric.data.kind = MetricKind::try_from(value.clone())?;
                            }
                            _ => {
                                return Err(MetricPathError::InvalidPath {
                                    path: &path.to_string(),
                                    expected: VALID_METRIC_PATHS_SET,
                                }
                                .to_string())
                            }
                        }
                        // Mirror the accepted assignment into the `Value` view that reads
                        // are served from.
                        metric_value.insert(path, value);
                        return Ok(());
                    }
                    // No alternative components at all: report the path as invalid.
                    Err(MetricPathError::InvalidPath {
                        path: &path.to_string(),
                        expected: VALID_METRIC_PATHS_SET,
                    }
                    .to_string())
                }
            },
            PathPrefix::Metadata => {
                self.metadata_mut()
                    .value_mut()
                    .insert(&target_path.path, value);
                Ok(())
            }
        }
    }
    /// Returns a reference to the value at `target_path`, if any.
    ///
    /// Metric event paths are resolved by `target_get_metric`, which is the only source
    /// of errors here.
    #[allow(clippy::redundant_closure_for_method_calls)] fn target_get(&self, target_path: &OwnedTargetPath) -> Result<Option<&Value>, String> {
        match target_path.prefix {
            PathPrefix::Event => match self {
                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
                    Ok(log.get(&target_path.path))
                }
                VrlTarget::Metric { value, .. } => target_get_metric(&target_path.path, value),
            },
            PathPrefix::Metadata => Ok(self.metadata().value().get(&target_path.path)),
        }
    }
    /// Returns a mutable reference to the value at `target_path`, if any.
    ///
    /// Metric event paths are resolved by `target_get_mut_metric` against the `Value`
    /// view, which is the only source of errors here.
    fn target_get_mut(
        &mut self,
        target_path: &OwnedTargetPath,
    ) -> Result<Option<&mut Value>, String> {
        match target_path.prefix {
            PathPrefix::Event => match self {
                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
                    Ok(log.get_mut(&target_path.path))
                }
                VrlTarget::Metric { value, .. } => target_get_mut_metric(&target_path.path, value),
            },
            PathPrefix::Metadata => Ok(self.metadata_mut().value_mut().get_mut(&target_path.path)),
        }
    }
    /// Removes the value at `target_path` and returns it, if any.
    ///
    /// `compact` is forwarded for log, trace and metadata removals. For metrics it is
    /// not used: the `Value` view is always updated with `false`.
    ///
    /// # Errors
    ///
    /// For metrics, returns an error string when the path is the event root or is not
    /// `namespace`, `timestamp`, `tags` or `tags.<field>`.
    fn target_remove(
        &mut self,
        target_path: &OwnedTargetPath,
        compact: bool,
    ) -> Result<Option<vrl::value::Value>, String> {
        match target_path.prefix {
            PathPrefix::Event => match self {
                VrlTarget::LogEvent(ref mut log, _) | VrlTarget::Trace(ref mut log, _) => {
                    Ok(log.remove(&target_path.path, compact))
                }
                VrlTarget::Metric {
                    ref mut metric,
                    value,
                    multi_value_tags: _,
                } => {
                    // Removing the root reuses the "cannot set root path" error.
                    if target_path.path.is_root() {
                        return Err(MetricPathError::SetPathError.to_string());
                    }
                    // Only the first alternative is used to classify the path.
                    if let Some(paths) = target_path
                        .path
                        .to_alternative_components(MAX_METRIC_PATH_DEPTH)
                        .first()
                    {
                        let removed_value = match paths.as_slice() {
                            ["namespace"] => metric.series.name.namespace.take().map(Into::into),
                            ["timestamp"] => metric.data.time.timestamp.take().map(Into::into),
                            ["tags"] => metric.series.tags.take().map(|map| {
                                map.into_iter_single()
                                    .map(|(k, v)| (k, v.into()))
                                    .collect::<vrl::value::Value>()
                            }),
                            ["tags", field] => metric.remove_tag(field).map(Into::into),
                            _ => {
                                return Err(MetricPathError::InvalidPath {
                                    path: &target_path.path.to_string(),
                                    expected: VALID_METRIC_PATHS_SET,
                                }
                                .to_string())
                            }
                        };
                        // Keep the `Value` view in step with the metric.
                        value.remove(&target_path.path, false);
                        return Ok(removed_value);
                    }
                    // No alternative components: nothing to remove.
                    Ok(None)
                }
            },
            PathPrefix::Metadata => Ok(self
                .metadata_mut()
                .value_mut()
                .remove(&target_path.path, compact)),
        }
    }
}
/// Secret access for VRL, backed by the secrets stored in the event metadata.
impl SecretTarget for VrlTarget {
    /// Looks up the secret stored under `key`.
    fn get_secret(&self, key: &str) -> Option<&str> {
        let secrets = self.metadata().secrets();
        secrets.get_secret(key)
    }
    /// Stores `value` as the secret for `key`.
    fn insert_secret(&mut self, key: &str, value: &str) {
        let secrets = self.metadata_mut().secrets_mut();
        secrets.insert_secret(key, value);
    }
    /// Removes the secret stored under `key`.
    fn remove_secret(&mut self, key: &str) {
        let secrets = self.metadata_mut().secrets_mut();
        secrets.remove_secret(key);
    }
}
fn target_get_metric<'a>(
    path: &OwnedValuePath,
    value: &'a Value,
) -> Result<Option<&'a Value>, String> {
    if path.is_root() {
        return Ok(Some(value));
    }
    let value = value.get(path);
    for paths in path.to_alternative_components(MAX_METRIC_PATH_DEPTH) {
        match paths.as_slice() {
            ["name"] | ["kind"] | ["type"] | ["tags", _] => return Ok(value),
            ["namespace"] | ["timestamp"] | ["tags"] => {
                if let Some(value) = value {
                    return Ok(Some(value));
                }
            }
            _ => {
                return Err(MetricPathError::InvalidPath {
                    path: &path.to_string(),
                    expected: VALID_METRIC_PATHS_GET,
                }
                .to_string())
            }
        }
    }
    Ok(None)
}
fn target_get_mut_metric<'a>(
    path: &OwnedValuePath,
    value: &'a mut Value,
) -> Result<Option<&'a mut Value>, String> {
    if path.is_root() {
        return Ok(Some(value));
    }
    let value = value.get_mut(path);
    for paths in path.to_alternative_components(MAX_METRIC_PATH_DEPTH) {
        match paths.as_slice() {
            ["name"] | ["kind"] | ["tags", _] => return Ok(value),
            ["namespace"] | ["timestamp"] | ["tags"] => {
                if let Some(value) = value {
                    return Ok(Some(value));
                }
            }
            _ => {
                return Err(MetricPathError::InvalidPath {
                    path: &path.to_string(),
                    expected: VALID_METRIC_PATHS_SET,
                }
                .to_string())
            }
        }
    }
    Ok(None)
}
/// Builds the `Value` view of `metric` that VRL reads are served from.
///
/// Only fields referenced by `info.target_queries` are materialised. Each query
/// contributes the top-level metric field named by its first path segment; a query of
/// the event root fills in every field not yet added and ends the scan. `namespace`,
/// `timestamp` and `tags` are only included when the metric has them.
fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value {
    let mut map = ObjectMap::default();
    // Track which fields were already copied so repeated queries do not redo the work.
    let mut set_name = false;
    let mut set_kind = false;
    let mut set_type = false;
    let mut set_namespace = false;
    let mut set_timestamp = false;
    let mut set_tags = false;
    for target_path in &info.target_queries {
        // A root query needs every field: add whatever is still missing, then stop.
        if target_path == &OwnedTargetPath::event_root() {
            if !set_name {
                map.insert("name".into(), metric.name().to_owned().into());
            }
            if !set_kind {
                map.insert("kind".into(), metric.kind().into());
            }
            if !set_type {
                map.insert("type".into(), metric.value().clone().into());
            }
            if !set_namespace {
                if let Some(namespace) = metric.namespace() {
                    map.insert("namespace".into(), namespace.to_owned().into());
                }
            }
            if !set_timestamp {
                if let Some(timestamp) = metric.timestamp() {
                    map.insert("timestamp".into(), timestamp.into());
                }
            }
            if !set_tags {
                if let Some(tags) = metric.tags().cloned() {
                    map.insert(
                        "tags".into(),
                        tags.into_iter_single()
                            .map(|(tag, value)| (tag.into(), value.into()))
                            .collect::<ObjectMap>()
                            .into(),
                    );
                }
            }
            break;
        }
        // Otherwise only the field named by the first segment is needed.
        if let Some(OwnedSegment::Field(field)) = target_path.path.segments.first() {
            match field.as_ref() {
                "name" if !set_name => {
                    set_name = true;
                    map.insert("name".into(), metric.name().to_owned().into());
                }
                "kind" if !set_kind => {
                    set_kind = true;
                    map.insert("kind".into(), metric.kind().into());
                }
                "type" if !set_type => {
                    set_type = true;
                    map.insert("type".into(), metric.value().clone().into());
                }
                // Optional fields: bind the value with `if let` instead of checking
                // `is_some()` in the guard and unwrapping in the body.
                "namespace" if !set_namespace => {
                    if let Some(namespace) = metric.namespace() {
                        set_namespace = true;
                        map.insert("namespace".into(), namespace.to_owned().into());
                    }
                }
                "timestamp" if !set_timestamp => {
                    if let Some(timestamp) = metric.timestamp() {
                        set_timestamp = true;
                        map.insert("timestamp".into(), timestamp.into());
                    }
                }
                "tags" if !set_tags => {
                    if let Some(tags) = metric.tags().cloned() {
                        set_tags = true;
                        map.insert(
                            "tags".into(),
                            tags.into_iter_single()
                                .map(|(tag, value)| (tag.into(), value.into()))
                                .collect::<ObjectMap>()
                                .into(),
                        );
                    }
                }
                _ => {}
            }
        }
    }
    map.into()
}
/// Errors for unsupported paths on a metric target; callers in this file convert them to
/// strings with `to_string()` before returning them.
#[derive(Debug, Snafu)]
enum MetricPathError<'a> {
    /// The event root of a metric was targeted. Returned by both insert and remove.
    #[snafu(display("cannot set root path"))]
    SetPathError,
    /// `path` is not a supported metric path; `expected` lists the supported ones.
    #[snafu(display("invalid path {}: expected one of {}", path, expected))]
    InvalidPath { path: &'a str, expected: &'a str },
}
#[cfg(test)]
mod test {
    use chrono::{offset::TimeZone, Utc};
    use lookup::owned_value_path;
    use similar_asserts::assert_eq;
    use vrl::btreemap;
    use vrl::value::kind::Index;
    use super::super::MetricValue;
    use super::*;
    use crate::metric_tags;
    /// Checks that `move_field_definitions_into_message` nests scalar kinds under the
    /// `message` field, both for a purely scalar kind and for one that already has an
    /// object with a `message` field.
    #[test]
    fn test_field_definitions_in_message() {
        // A bare bytes kind becomes an object holding bytes under `message`.
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]);
        assert_eq!(
            Definition::new_with_default_metadata(
                Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])),
                [LogNamespace::Legacy]
            ),
            move_field_definitions_into_message(definition)
        );
        // An existing integer `message` field is unioned with the moved bytes kind.
        let definition = Definition::new_with_default_metadata(
            Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(),
            [LogNamespace::Legacy],
        );
        assert_eq!(
            Definition::new_with_default_metadata(
                Kind::object(BTreeMap::from([(
                    "message".into(),
                    Kind::bytes().or_integer()
                )])),
                [LogNamespace::Legacy]
            ),
            move_field_definitions_into_message(definition)
        );
    }
    /// Checks that an array whose unknown elements are objects is merged down to that
    /// object kind by `merge_array_definitions`.
    #[test]
    fn test_merged_array_definitions_simple() {
        let object: BTreeMap<vrl::value::kind::Field, Kind> = [
            ("carrot".into(), Kind::bytes()),
            ("potato".into(), Kind::integer()),
        ]
        .into();
        // Input: an array of `{carrot: bytes, potato: integer}` objects.
        let kind = Kind::array(Collection::from_unknown(Kind::object(object)));
        let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
        // Expected: the element object kind on its own.
        let kind = Kind::object(BTreeMap::from([
            ("carrot".into(), Kind::bytes()),
            ("potato".into(), Kind::integer()),
        ]));
        let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
        let merged = merge_array_definitions(definition);
        assert_eq!(wanted, merged);
    }
    /// Checks `merge_array_definitions` on a kind that is bytes, an object and an array
    /// with differently typed elements: the element kinds are unioned into the event
    /// kind and the object fields of both objects become optional.
    #[test]
    fn test_merged_array_definitions_complex() {
        let object: BTreeMap<vrl::value::kind::Field, Kind> = [
            ("carrot".into(), Kind::bytes()),
            ("potato".into(), Kind::integer()),
        ]
        .into();
        // Array with an integer, a boolean and a `{peas: bytes}` object at fixed indices.
        let array: BTreeMap<Index, Kind> = [
            (Index::from(0), Kind::integer()),
            (Index::from(1), Kind::boolean()),
            (
                Index::from(2),
                Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])),
            ),
        ]
        .into();
        let mut kind = Kind::bytes();
        kind.add_object(object);
        kind.add_array(array);
        let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
        // Expected: no array left; scalars unioned in; every object field may be undefined.
        let mut kind = Kind::bytes();
        kind.add_integer();
        kind.add_boolean();
        kind.add_object(BTreeMap::from([
            ("carrot".into(), Kind::bytes().or_undefined()),
            ("potato".into(), Kind::integer().or_undefined()),
            ("peas".into(), Kind::bytes().or_undefined()),
        ]));
        let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
        let merged = merge_array_definitions(definition);
        assert_eq!(wanted, merged);
    }
    /// Table test for `target_get` on a log target.
    ///
    /// Each case is `(event fields, path, expected result)`.
    #[test]
    fn log_get() {
        let cases = vec![
            // Root of an empty event.
            (
                BTreeMap::new(),
                owned_value_path!(),
                Ok(Some(BTreeMap::new().into())),
            ),
            // Root of a non-empty event.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!(),
                Ok(Some(BTreeMap::from([("foo".into(), "bar".into())]).into())),
            ),
            // Existing top-level field.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!("foo"),
                Ok(Some("bar".into())),
            ),
            // Missing field.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!("bar"),
                Ok(None),
            ),
            // Field inside an array element.
            (
                btreemap! { "foo" => vec![btreemap! { "bar" => true }] },
                owned_value_path!("foo", 0, "bar"),
                Ok(Some(true.into())),
            ),
            // Field name containing a space.
            (
                btreemap! { "foo" => btreemap! { "bar baz" => btreemap! { "baz" => 2 } } },
                owned_value_path!("foo", r"bar baz", "baz"),
                Ok(Some(2.into())),
            ),
        ];
        for (value, path, expect) in cases {
            let value: ObjectMap = value;
            let info = ProgramInfo {
                fallible: false,
                abortable: false,
                target_queries: vec![],
                target_assignments: vec![],
            };
            let target = VrlTarget::new(Event::Log(LogEvent::from(value)), &info, false);
            let path = OwnedTargetPath::event(path);
            assert_eq!(
                Target::target_get(&target, &path).map(Option::<&Value>::cloned),
                expect
            );
        }
    }
    /// Table test for `target_insert` on a log target.
    ///
    /// Each case is `(initial fields, path, inserted value, expected event fields,
    /// expected insert result)`. After inserting, the test reads the same path back and
    /// compares the first event produced by `into_events` with the expected event.
    #[allow(clippy::too_many_lines)]
    #[test]
    fn log_insert() {
        let cases = vec![
            // Insert an object at index 0 of the root; the first emitted event is that object.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!(0),
                btreemap! { "baz" => "qux" }.into(),
                btreemap! { "baz" => "qux" },
                Ok(()),
            ),
            // Overwrite an existing field.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!("foo"),
                "baz".into(),
                btreemap! { "foo" => "baz" },
                Ok(()),
            ),
            // Deep path through a new array: earlier indices are filled with nulls.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!("foo", 2, "bar baz", "a", "b"),
                true.into(),
                btreemap! {
                    "foo" => vec![
                        Value::Null,
                        Value::Null,
                        btreemap! {
                            "bar baz" => btreemap! { "a" => btreemap! { "b" => true } },
                        }.into()
                    ]
                },
                Ok(()),
            ),
            // Index past the end of an existing array: the gap is padded with nulls.
            (
                btreemap! { "foo" => vec![0, 1, 2] },
                owned_value_path!("foo", 5),
                "baz".into(),
                btreemap! {
                    "foo" => vec![
                        0.into(),
                        1.into(),
                        2.into(),
                        Value::Null,
                        Value::Null,
                        Value::from("baz"),
                    ],
                },
                Ok(()),
            ),
            // Indexing into a non-array field replaces it with an array.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!("foo", 0),
                "baz".into(),
                btreemap! { "foo" => vec!["baz"] },
                Ok(()),
            ),
            // Index 0 of an empty array.
            (
                btreemap! { "foo" => Value::Array(vec![]) },
                owned_value_path!("foo", 0),
                "baz".into(),
                btreemap! { "foo" => vec!["baz"] },
                Ok(()),
            ),
            // Replace the only element.
            (
                btreemap! { "foo" => Value::Array(vec![0.into()]) },
                owned_value_path!("foo", 0),
                "baz".into(),
                btreemap! { "foo" => vec!["baz"] },
                Ok(()),
            ),
            // Replace the first of two elements.
            (
                btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
                owned_value_path!("foo", 0),
                "baz".into(),
                btreemap! { "foo" => Value::Array(vec!["baz".into(), 1.into()]) },
                Ok(()),
            ),
            // Replace the second of two elements.
            (
                btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
                owned_value_path!("foo", 1),
                "baz".into(),
                btreemap! { "foo" => Value::Array(vec![0.into(), "baz".into()]) },
                Ok(()),
            ),
        ];
        for (object, path, value, expect, result) in cases {
            let object: ObjectMap = object;
            let info = ProgramInfo {
                fallible: false,
                abortable: false,
                target_queries: vec![],
                target_assignments: vec![],
            };
            let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
            let expect = LogEvent::from(expect);
            let value: Value = value;
            let path = OwnedTargetPath::event(path);
            assert_eq!(
                Target::target_insert(&mut target, &path, value.clone()),
                result
            );
            // The inserted value must be readable at the same path.
            assert_eq!(
                Target::target_get(&target, &path).map(Option::<&Value>::cloned),
                Ok(Some(value))
            );
            // Only the first produced event is compared.
            assert_eq!(
                match target.into_events(LogNamespace::Legacy) {
                    TargetEvents::One(event) => vec![event],
                    TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
                    TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
                }
                .first()
                .cloned()
                .unwrap(),
                Event::Log(expect)
            );
        }
    }
    /// Table test for `target_remove` on a log target.
    ///
    /// Each case is `(initial fields, path, compact flag, expected root afterwards)`.
    /// The value returned by the removal must equal what `target_get` returned for the
    /// same path beforehand.
    #[test]
    fn log_remove() {
        let cases = vec![
            // Remove the only field.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!("foo"),
                false,
                Some(BTreeMap::new().into()),
            ),
            // Remove a path that does not exist: the event is unchanged.
            (
                BTreeMap::from([("foo".into(), "bar".into())]),
                owned_value_path!(r"foo bar", "foo"),
                false,
                Some(btreemap! { "foo" => "bar"}.into()),
            ),
            // Remove the root without compaction: an empty object remains.
            (
                btreemap! { "foo" => "bar", "baz" => "qux" },
                owned_value_path!(),
                false,
                Some(BTreeMap::new().into()),
            ),
            // Remove the root with compaction: same result.
            (
                btreemap! { "foo" => "bar", "baz" => "qux" },
                owned_value_path!(),
                true,
                Some(BTreeMap::new().into()),
            ),
            // Remove the only array element without compaction: the empty array stays.
            (
                btreemap! { "foo" => vec![0] },
                owned_value_path!("foo", 0),
                false,
                Some(btreemap! { "foo" => Value::Array(vec![]) }.into()),
            ),
            // Same removal with compaction: the emptied array is dropped too.
            (
                btreemap! { "foo" => vec![0] },
                owned_value_path!("foo", 0),
                true,
                Some(BTreeMap::new().into()),
            ),
            // Nested removal without compaction keeps the empty containers.
            (
                btreemap! {
                    "foo" => btreemap! { "bar baz" => vec![0] },
                    "bar" => "baz",
                },
                owned_value_path!("foo", r"bar baz", 0),
                false,
                Some(
                    btreemap! {
                        "foo" => btreemap! { "bar baz" => Value::Array(vec![]) },
                        "bar" => "baz",
                    }
                    .into(),
                ),
            ),
            // Nested removal with compaction drops every emptied container.
            (
                btreemap! {
                    "foo" => btreemap! { "bar baz" => vec![0] },
                    "bar" => "baz",
                },
                owned_value_path!("foo", r"bar baz", 0),
                true,
                Some(btreemap! { "bar" => "baz" }.into()),
            ),
        ];
        for (object, path, compact, expect) in cases {
            let info = ProgramInfo {
                fallible: false,
                abortable: false,
                target_queries: vec![],
                target_assignments: vec![],
            };
            let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
            let path = OwnedTargetPath::event(path);
            // Capture the current value so the removal's return value can be checked.
            let removed = Target::target_get(&target, &path).unwrap().cloned();
            assert_eq!(
                Target::target_remove(&mut target, &path, compact),
                Ok(removed)
            );
            assert_eq!(
                Target::target_get(&target, &OwnedTargetPath::event_root())
                    .map(Option::<&Value>::cloned),
                Ok(expect)
            );
        }
    }
    #[test]
    fn log_into_events() {
        use vrl::btreemap;

        // Each case is (value written to the event root, fields of the events expected
        // from `into_events` under the legacy log namespace).
        let cases = vec![
            // An object becomes a single event carrying the same fields.
            (
                Value::from(btreemap! {"foo" => "bar"}),
                vec![btreemap! {"foo" => "bar"}],
            ),
            // Non-object values are expected under the "message" key.
            (Value::from(1), vec![btreemap! {"message" => 1}]),
            (Value::from("2"), vec![btreemap! {"message" => "2"}]),
            (Value::from(true), vec![btreemap! {"message" => true}]),
            // An array is expected to yield one event per element.
            (
                Value::from(vec![
                    Value::from(1),
                    Value::from("2"),
                    Value::from(true),
                    Value::from(btreemap! {"foo" => "bar"}),
                ]),
                vec![
                    btreemap! {"message" => 1},
                    btreemap! {"message" => "2"},
                    btreemap! {"message" => true},
                    btreemap! {"foo" => "bar"},
                ],
            ),
        ];

        // The program info is identical for every case, so build it once.
        let info = ProgramInfo {
            fallible: false,
            abortable: false,
            target_queries: vec![],
            target_assignments: vec![],
        };

        for (root_value, expected_fields) in cases {
            let metadata = EventMetadata::default();
            let seed = Event::Log(LogEvent::new_with_metadata(metadata.clone()));
            let mut target = VrlTarget::new(seed, &info, false);

            // Replace the whole event root with the value under test.
            target
                .target_insert(&OwnedTargetPath::event_root(), root_value)
                .unwrap();

            // Flatten whichever variant comes back into a plain list of events.
            let actual: Vec<Event> = match target.into_events(LogNamespace::Legacy) {
                TargetEvents::One(event) => vec![event],
                TargetEvents::Logs(logs) => logs.collect(),
                TargetEvents::Traces(traces) => traces.collect(),
            };

            // Every expected event carries a copy of the metadata the target started with.
            let expected: Vec<Event> = expected_fields
                .into_iter()
                .map(|fields| Event::Log(LogEvent::from_map(fields, metadata.clone())))
                .collect();

            assert_eq!(actual, expected);
        }
    }
    #[test]
    fn metric_all_fields() {
        // One timestamp shared by the metric under test and the expected value.
        let timestamp = Utc
            .with_ymd_and_hms(2020, 12, 10, 12, 0, 0)
            .single()
            .expect("invalid timestamp");

        let metric = Metric::new(
            "zub",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.23 },
        )
        .with_namespace(Some("zoob"))
        .with_tags(Some(metric_tags!("tig" => "tog")))
        .with_timestamp(Some(timestamp));

        // Declare every readable metric field as queried by the program.
        let target_queries = vec![
            OwnedTargetPath::event(owned_value_path!("name")),
            OwnedTargetPath::event(owned_value_path!("namespace")),
            OwnedTargetPath::event(owned_value_path!("timestamp")),
            OwnedTargetPath::event(owned_value_path!("kind")),
            OwnedTargetPath::event(owned_value_path!("type")),
            OwnedTargetPath::event(owned_value_path!("tags")),
        ];
        let info = ProgramInfo {
            fallible: false,
            abortable: false,
            target_queries,
            target_assignments: vec![],
        };

        let target = VrlTarget::new(Event::Metric(metric), &info, false);

        // Reading the event root must expose all of the metric's fields as one object.
        let expected: Value = btreemap! {
            "name" => "zub",
            "namespace" => "zoob",
            "timestamp" => timestamp,
            "tags" => btreemap! { "tig" => "tog" },
            "kind" => "absolute",
            "type" => "counter",
        }
        .into();
        let actual = target
            .target_get(&OwnedTargetPath::event_root())
            .map(Option::<&Value>::cloned);

        assert_eq!(Ok(Some(expected)), actual);
    }
    #[test]
    fn metric_fields() {
        let metric = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.23 },
        )
        .with_tags(Some(metric_tags!("tig" => "tog")));

        let cases = vec![
            (
                owned_value_path!("name"), // Path
                Some(Value::from("name")), // Current value
                Value::from("namefoo"),    // New value
                false,                     // Test deletion
            ),
            (
                owned_value_path!("namespace"),
                None,
                "namespacefoo".into(),
                true,
            ),
            (
                owned_value_path!("timestamp"),
                None,
                Utc.with_ymd_and_hms(2020, 12, 8, 12, 0, 0)
                    .single()
                    .expect("invalid timestamp")
                    .into(),
                true,
            ),
            (
                owned_value_path!("kind"),
                Some(Value::from("absolute")),
                "incremental".into(),
                false,
            ),
            (
                owned_value_path!("tags", "thing"),
                None,
                "footag".into(),
                true,
            ),
        ];

        let target_queries = vec![
            OwnedTargetPath::event(owned_value_path!("name")),
            OwnedTargetPath::event(owned_value_path!("namespace")),
            OwnedTargetPath::event(owned_value_path!("timestamp")),
            OwnedTargetPath::event(owned_value_path!("kind")),
        ];
        let info = ProgramInfo {
            fallible: false,
            abortable: false,
            target_queries,
            target_assignments: vec![],
        };

        // A single target is shared by all cases, so they run against accumulated state.
        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);

        for (field, initial, replacement, removable) in cases {
            let field = OwnedTargetPath::event(field);

            // The field starts out holding its expected initial value, if any.
            let before = target.target_get(&field).map(Option::<&Value>::cloned);
            assert_eq!(Ok(initial), before);

            // Writing the new value succeeds and is visible on the next read.
            let inserted = target.target_insert(&field, replacement.clone());
            assert_eq!(Ok(()), inserted);
            let after = target.target_get(&field).map(Option::<&Value>::cloned);
            assert_eq!(Ok(Some(replacement.clone())), after);

            if !removable {
                continue;
            }

            // Removal hands back the value just written and leaves the field unset.
            let removed = target.target_remove(&field, true);
            assert_eq!(Ok(Some(replacement)), removed);
            let cleared = target.target_get(&field).map(Option::<&Value>::cloned);
            assert_eq!(Ok(None), cleared);
        }
    }
    #[test]
    fn metric_set_tags() {
        let metric = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.23 },
        )
        .with_tags(Some(metric_tags!("tig" => "tog")));
        let info = ProgramInfo {
            fallible: false,
            abortable: false,
            target_queries: vec![],
            target_assignments: vec![],
        };
        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);

        // Assigning an object to `.tags` must succeed. Checking the result here means a
        // failed insert is reported with its own error message instead of surfacing
        // later as a confusing tag mismatch.
        target
            .target_insert(
                &OwnedTargetPath::event(owned_value_path!("tags")),
                Value::Object(BTreeMap::from([("a".into(), "b".into())])),
            )
            .expect("inserting an object at `.tags` should succeed");

        let VrlTarget::Metric { metric, .. } = target else {
            panic!("must be a metric")
        };

        // The metric's tags must now be exactly the inserted set; the original
        // "tig" => "tog" tag must be gone.
        assert_eq!(metric.tags(), Some(&crate::metric_tags!("a" => "b")));
    }
    #[test]
    fn metric_invalid_paths() {
        let metric = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.23 },
        );

        // Paths the error messages are expected to list for reads and for writes/removals.
        // `.type` appears only in the read list.
        let readable = [
            ".name",
            ".namespace",
            ".timestamp",
            ".kind",
            ".tags",
            ".type",
        ]
        .join(", ");
        let writable = [".name", ".namespace", ".timestamp", ".kind", ".tags"].join(", ");

        let info = ProgramInfo {
            fallible: false,
            abortable: false,
            target_queries: vec![],
            target_assignments: vec![],
        };
        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);

        // An unknown top-level field is rejected by get, insert and remove alike.
        let zork = OwnedTargetPath::event(owned_value_path!("zork"));
        assert_eq!(
            target.target_get(&zork),
            Err(format!("invalid path zork: expected one of {}", readable))
        );
        assert_eq!(
            target.target_insert(&zork, "thing".into()),
            Err(format!("invalid path zork: expected one of {}", writable))
        );
        assert_eq!(
            target.target_remove(&zork, true),
            Err(format!("invalid path zork: expected one of {}", writable))
        );

        // A path that descends below an individual tag is rejected on read as well.
        let below_tag = OwnedTargetPath::event(owned_value_path!("tags", "foo", "flork"));
        assert_eq!(
            target.target_get(&below_tag),
            Err(format!(
                "invalid path tags.foo.flork: expected one of {}",
                readable
            ))
        );
    }
    #[test]
    fn test_metric_insert_get_multi_value_tag() {
        let metric = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.23 },
        );
        let info = ProgramInfo {
            fallible: false,
            abortable: false,
            target_queries: vec![],
            target_assignments: vec![],
        };
        // The final `true` turns on multi-value tag handling for this target.
        let mut target = VrlTarget::new(Event::Metric(metric), &info, true);

        let tags_path = OwnedTargetPath::event(owned_value_path!("tags"));
        let foo_path = OwnedTargetPath::event(owned_value_path!("tags", "foo"));

        // Several values for one tag, including an empty string and a null.
        let tag_values = Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()]);
        target.target_insert(&foo_path, tag_values.clone()).unwrap();

        // Reading `.tags` back must return the array exactly as it was inserted.
        let expected_tags = Value::Object(BTreeMap::from([("foo".into(), tag_values)]));
        let vrl_tags_value = target.target_get(&tags_path).unwrap().unwrap();
        assert_eq!(vrl_tags_value, &expected_tags);

        let VrlTarget::Metric { metric, .. } = target else {
            unreachable!()
        };

        // A single-value lookup on the metric yields the last value that was inserted.
        assert_eq!(metric.tag_value("foo"), Some("b".into()));
    }
}