use std::{
    collections::HashMap,
    convert::{TryFrom, TryInto},
    fmt::Debug,
    iter::FromIterator,
    mem::size_of,
    num::NonZeroUsize,
    sync::{Arc, LazyLock},
};
use crate::event::util::log::all_fields_skip_array_elements;
use bytes::Bytes;
use chrono::Utc;
use crossbeam_utils::atomic::AtomicCell;
use lookup::{lookup_v2::TargetPath, metadata_path, path, PathPrefix};
use serde::{Deserialize, Serialize, Serializer};
use vector_common::{
    byte_size_of::ByteSizeOf,
    internal_event::{OptionalTag, TaggedEventsSent},
    json_size::{JsonSize, NonZeroJsonSize},
    request_metadata::GetEventCountTags,
    EventDataEq,
};
use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError};
use vrl::{event_path, owned_value_path};
use super::{
    estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
    finalization::{BatchNotifier, EventFinalizer},
    metadata::EventMetadata,
    util, EventFinalizers, Finalizable, KeyString, ObjectMap, Value,
};
use crate::config::LogNamespace;
use crate::config::{log_schema, telemetry};
use crate::event::util::log::{all_fields, all_metadata_fields};
use crate::event::MaybeAsLogMut;
/// Lazily built metadata path `vector.source_type`.
///
/// Stored as an `Option` so callers can obtain an `Option<&OwnedTargetPath>`
/// directly through `as_ref()`; the value is always `Some`.
static VECTOR_SOURCE_TYPE_PATH: LazyLock<Option<OwnedTargetPath>> = LazyLock::new(|| {
    let source_type = owned_value_path!("vector", "source_type");
    Some(OwnedTargetPath::metadata(source_type))
});
/// Payload of a `LogEvent`: the field data together with lazily filled size caches.
#[derive(Debug, Deserialize)]
struct Inner {
    /// The event's field data; flattened so it (de)serializes as the event body itself.
    #[serde(flatten)]
    fields: Value,
    /// Cached result of `ByteSizeOf::size_of`; `None` until first computed and
    /// again after `invalidate`. Never deserialized.
    #[serde(skip)]
    size_cache: AtomicCell<Option<NonZeroUsize>>,
    /// Cached estimated JSON-encoded size; `None` until first computed and
    /// again after `invalidate`. Never deserialized.
    #[serde(skip)]
    json_encoded_size_cache: AtomicCell<Option<NonZeroJsonSize>>,
}
impl Inner {
    /// Drops both cached sizes so the next size query recomputes them.
    fn invalidate(&self) {
        let Self {
            size_cache,
            json_encoded_size_cache,
            ..
        } = self;
        size_cache.store(None);
        json_encoded_size_cache.store(None);
    }

    /// Borrows the underlying field value.
    fn as_value(&self) -> &Value {
        let Self { fields, .. } = self;
        fields
    }
}
impl ByteSizeOf for Inner {
    /// Returns the in-memory size of this value, computing and caching it on first use.
    fn size_of(&self) -> usize {
        if let Some(cached) = self.size_cache.load() {
            return cached.into();
        }

        // `size_of::<Self>()` is non-zero on its own, so the sum below is non-zero too.
        let total = size_of::<Self>() + self.allocated_bytes();
        let total = NonZeroUsize::new(total).expect("Size cannot be zero");
        self.size_cache.store(Some(total));
        total.into()
    }

    /// Returns the bytes allocated by the field data.
    fn allocated_bytes(&self) -> usize {
        self.fields.allocated_bytes()
    }
}
impl EstimatedJsonEncodedSizeOf for Inner {
    /// Returns the estimated JSON-encoded size of the fields, computing and
    /// caching it on first use.
    ///
    /// # Panics
    ///
    /// Panics if the estimate for the fields is zero.
    fn estimated_json_encoded_size_of(&self) -> JsonSize {
        if let Some(cached) = self.json_encoded_size_cache.load() {
            return cached.into();
        }

        let estimate = self.fields.estimated_json_encoded_size_of();
        let estimate = NonZeroJsonSize::new(estimate).expect("Size cannot be zero");
        self.json_encoded_size_cache.store(Some(estimate));
        estimate.into()
    }
}
impl Clone for Inner {
    fn clone(&self) -> Self {
        Self {
            fields: self.fields.clone(),
            size_cache: None.into(),
            json_encoded_size_cache: None.into(),
        }
    }
}
impl Default for Inner {
    fn default() -> Self {
        Self {
            fields: Value::Object(Default::default()),
            size_cache: Default::default(),
            json_encoded_size_cache: Default::default(),
        }
    }
}
impl From<Value> for Inner {
    fn from(fields: Value) -> Self {
        Self {
            fields,
            size_cache: Default::default(),
            json_encoded_size_cache: Default::default(),
        }
    }
}
impl PartialEq for Inner {
    /// Compares only the field data; the size caches do not take part in equality.
    fn eq(&self, other: &Self) -> bool {
        self.fields == other.fields
    }
}
/// A log event: a `Value` payload held behind an `Arc`, plus per-event metadata.
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct LogEvent {
    /// Field data and its size caches. Shared between clones of the event and
    /// made unique on mutation (see `value_mut`, which uses `Arc::make_mut`).
    #[serde(flatten)]
    inner: Arc<Inner>,
    /// Metadata attached to the event; not populated by deserialization.
    #[serde(skip)]
    metadata: EventMetadata,
}
impl LogEvent {
    /// Builds an event from a message string using the global log schema.
    ///
    /// The message is stored at the schema's message key (if one is configured)
    /// and the current UTC time at the schema's timestamp key (if one is configured).
    pub fn from_str_legacy(msg: impl Into<String>) -> Self {
        let mut event = LogEvent::default();
        event.maybe_insert(log_schema().message_key_target_path(), msg.into());
        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
            event.insert(timestamp_key, Utc::now());
        }
        event
    }

    /// Like [`LogEvent::from_str_legacy`], decoding `msg` as UTF-8 and replacing
    /// invalid sequences lossily.
    pub fn from_bytes_legacy(msg: &Bytes) -> Self {
        let text = String::from_utf8_lossy(msg.as_ref()).into_owned();
        Self::from_str_legacy(text)
    }

    /// Borrows the event's value.
    pub fn value(&self) -> &Value {
        self.inner.as_value()
    }

    /// Mutably borrows the event's value.
    ///
    /// Makes the inner data unique (cloning it if it is shared) and clears the
    /// cached sizes, since the caller is about to change the data.
    pub fn value_mut(&mut self) -> &mut Value {
        let inner = Arc::make_mut(&mut self.inner);
        inner.invalidate();
        &mut inner.fields
    }

    /// Borrows the event's metadata.
    pub fn metadata(&self) -> &EventMetadata {
        &self.metadata
    }

    /// Mutably borrows the event's metadata.
    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
        &mut self.metadata
    }

    /// Reports which namespace the event uses: `Vector` when the metadata
    /// contains a `vector` entry, `Legacy` otherwise.
    pub fn namespace(&self) -> LogNamespace {
        let has_vector_metadata = self.contains((PathPrefix::Metadata, path!("vector")));
        if has_vector_metadata {
            LogNamespace::Vector
        } else {
            LogNamespace::Legacy
        }
    }
}
impl ByteSizeOf for LogEvent {
    /// Sums the (cached) size of the inner payload and the bytes allocated by the metadata.
    fn allocated_bytes(&self) -> usize {
        let Self { inner, metadata } = self;
        inner.size_of() + metadata.allocated_bytes()
    }
}
impl Finalizable for LogEvent {
    /// Takes the finalizers out of the event's metadata.
    fn take_finalizers(&mut self) -> EventFinalizers {
        let Self { metadata, .. } = self;
        metadata.take_finalizers()
    }
}
impl EstimatedJsonEncodedSizeOf for LogEvent {
    /// Returns the estimated JSON-encoded size of the event's fields; metadata
    /// is not included.
    fn estimated_json_encoded_size_of(&self) -> JsonSize {
        let Self { inner, .. } = self;
        inner.estimated_json_encoded_size_of()
    }
}
impl GetEventCountTags for LogEvent {
    /// Builds the telemetry tags for this event.
    ///
    /// Each tag is `OptionalTag::Ignored` unless the matching telemetry option
    /// is enabled. `source` comes from the metadata's source id; `service`
    /// comes from the field with the `service` meaning, converted lossily to a string.
    fn get_tags(&self) -> TaggedEventsSent {
        let emit_source = telemetry().tags().emit_source;
        let source = if emit_source {
            self.metadata().source_id().cloned().into()
        } else {
            OptionalTag::Ignored
        };

        let emit_service = telemetry().tags().emit_service;
        let service = if emit_service {
            self.get_by_meaning("service")
                .map(|value| value.to_string_lossy().into_owned())
                .into()
        } else {
            OptionalTag::Ignored
        };

        TaggedEventsSent { source, service }
    }
}
impl LogEvent {
    #[must_use]
    pub fn new_with_metadata(metadata: EventMetadata) -> Self {
        Self {
            inner: Default::default(),
            metadata,
        }
    }
    pub fn from_parts(value: Value, metadata: EventMetadata) -> Self {
        Self {
            inner: Arc::new(value.into()),
            metadata,
        }
    }
    pub fn from_map(map: ObjectMap, metadata: EventMetadata) -> Self {
        let inner = Arc::new(Inner::from(Value::Object(map)));
        Self { inner, metadata }
    }
    pub fn into_parts(mut self) -> (Value, EventMetadata) {
        self.value_mut();
        let value = Arc::try_unwrap(self.inner)
            .unwrap_or_else(|_| unreachable!("inner fields already cloned after owning"))
            .fields;
        let metadata = self.metadata;
        (value, metadata)
    }
    #[must_use]
    pub fn with_batch_notifier(mut self, batch: &BatchNotifier) -> Self {
        self.metadata = self.metadata.with_batch_notifier(batch);
        self
    }
    #[must_use]
    pub fn with_batch_notifier_option(mut self, batch: &Option<BatchNotifier>) -> Self {
        self.metadata = self.metadata.with_batch_notifier_option(batch);
        self
    }
    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
        self.metadata.add_finalizer(finalizer);
    }
    pub fn parse_path_and_get_value(
        &self,
        path: impl AsRef<str>,
    ) -> Result<Option<&Value>, PathParseError> {
        parse_target_path(path.as_ref()).map(|path| self.get(&path))
    }
    #[allow(clippy::needless_pass_by_value)] pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
        match key.prefix() {
            PathPrefix::Event => self.inner.fields.get(key.value_path()),
            PathPrefix::Metadata => self.metadata.value().get(key.value_path()),
        }
    }
    pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
        self.metadata().dropped_field(&meaning).or_else(|| {
            self.metadata()
                .schema_definition()
                .meaning_path(meaning.as_ref())
                .and_then(|path| self.get(path))
        })
    }
    pub fn get_mut_by_meaning(&mut self, meaning: impl AsRef<str>) -> Option<&mut Value> {
        Arc::clone(self.metadata.schema_definition())
            .meaning_path(meaning.as_ref())
            .and_then(|path| self.get_mut(path))
    }
    pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
        self.metadata()
            .schema_definition()
            .meaning_path(meaning.as_ref())
    }
    #[allow(clippy::needless_pass_by_value)] pub fn get_mut<'a>(&mut self, path: impl TargetPath<'a>) -> Option<&mut Value> {
        match path.prefix() {
            PathPrefix::Event => self.value_mut().get_mut(path.value_path()),
            PathPrefix::Metadata => self.metadata.value_mut().get_mut(path.value_path()),
        }
    }
    #[allow(clippy::needless_pass_by_value)] pub fn contains<'a>(&self, path: impl TargetPath<'a>) -> bool {
        match path.prefix() {
            PathPrefix::Event => self.value().contains(path.value_path()),
            PathPrefix::Metadata => self.metadata.value().contains(path.value_path()),
        }
    }
    pub fn parse_path_and_insert(
        &mut self,
        path: impl AsRef<str>,
        value: impl Into<Value>,
    ) -> Result<Option<Value>, PathParseError> {
        let target_path = parse_target_path(path.as_ref())?;
        Ok(self.insert(&target_path, value))
    }
    #[allow(clippy::needless_pass_by_value)] pub fn insert<'a>(
        &mut self,
        path: impl TargetPath<'a>,
        value: impl Into<Value>,
    ) -> Option<Value> {
        match path.prefix() {
            PathPrefix::Event => self.value_mut().insert(path.value_path(), value.into()),
            PathPrefix::Metadata => self
                .metadata
                .value_mut()
                .insert(path.value_path(), value.into()),
        }
    }
    pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
        if let Some(path) = path {
            self.insert(path, value);
        }
    }
    pub fn try_insert<'a>(&mut self, path: impl TargetPath<'a>, value: impl Into<Value>) {
        if !self.contains(path.clone()) {
            self.insert(path, value);
        }
    }
    pub fn rename_key<'a>(&mut self, from: impl TargetPath<'a>, to: impl TargetPath<'a>) {
        if let Some(val) = self.remove(from) {
            self.insert(to, val);
        }
    }
    pub fn remove<'a>(&mut self, path: impl TargetPath<'a>) -> Option<Value> {
        self.remove_prune(path, false)
    }
    #[allow(clippy::needless_pass_by_value)] pub fn remove_prune<'a>(&mut self, path: impl TargetPath<'a>, prune: bool) -> Option<Value> {
        match path.prefix() {
            PathPrefix::Event => self.value_mut().remove(path.value_path(), prune),
            PathPrefix::Metadata => self.metadata.value_mut().remove(path.value_path(), prune),
        }
    }
    pub fn keys(&self) -> Option<impl Iterator<Item = KeyString> + '_> {
        match &self.inner.fields {
            Value::Object(map) => Some(util::log::keys(map)),
            _ => None,
        }
    }
    pub fn all_event_fields(
        &self,
    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
        self.as_map().map(all_fields)
    }
    pub fn all_event_fields_skip_array_elements(
        &self,
    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
        self.as_map().map(all_fields_skip_array_elements)
    }
    pub fn all_metadata_fields(
        &self,
    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
        match self.metadata.value() {
            Value::Object(metadata_map) => Some(all_metadata_fields(metadata_map)),
            _ => None,
        }
    }
    pub fn convert_to_fields(&self) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
        if let Some(map) = self.as_map() {
            util::log::all_fields(map)
        } else {
            util::log::all_fields_non_object_root(self.value())
        }
    }
    pub fn convert_to_fields_unquoted(
        &self,
    ) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
        if let Some(map) = self.as_map() {
            util::log::all_fields_unquoted(map)
        } else {
            util::log::all_fields_non_object_root(self.value())
        }
    }
    pub fn is_empty_object(&self) -> bool {
        if let Some(map) = self.as_map() {
            map.is_empty()
        } else {
            false
        }
    }
    pub fn as_map(&self) -> Option<&ObjectMap> {
        match self.value() {
            Value::Object(map) => Some(map),
            _ => None,
        }
    }
    pub fn as_map_mut(&mut self) -> Option<&mut ObjectMap> {
        match self.value_mut() {
            Value::Object(map) => Some(map),
            _ => None,
        }
    }
    pub fn merge(&mut self, mut incoming: LogEvent, fields: &[impl AsRef<str>]) {
        for field in fields {
            let field_path = event_path!(field.as_ref());
            let Some(incoming_val) = incoming.remove(field_path) else {
                continue;
            };
            match self.get_mut(field_path) {
                None => {
                    self.insert(field_path, incoming_val);
                }
                Some(current_val) => current_val.merge(incoming_val),
            }
        }
        self.metadata.merge(incoming.metadata);
    }
}
impl LogEvent {
    pub fn message_path(&self) -> Option<&OwnedTargetPath> {
        match self.namespace() {
            LogNamespace::Vector => self.find_key_by_meaning("message"),
            LogNamespace::Legacy => log_schema().message_key_target_path(),
        }
    }
    pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
        match self.namespace() {
            LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
            LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
        }
    }
    pub fn host_path(&self) -> Option<&OwnedTargetPath> {
        match self.namespace() {
            LogNamespace::Vector => self.find_key_by_meaning("host"),
            LogNamespace::Legacy => log_schema().host_key_target_path(),
        }
    }
    pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
        match self.namespace() {
            LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
            LogNamespace::Legacy => log_schema().source_type_key_target_path(),
        }
    }
    pub fn get_message(&self) -> Option<&Value> {
        match self.namespace() {
            LogNamespace::Vector => self.get_by_meaning("message"),
            LogNamespace::Legacy => log_schema()
                .message_key_target_path()
                .and_then(|key| self.get(key)),
        }
    }
    pub fn get_timestamp(&self) -> Option<&Value> {
        match self.namespace() {
            LogNamespace::Vector => self.get_by_meaning("timestamp"),
            LogNamespace::Legacy => log_schema()
                .timestamp_key_target_path()
                .and_then(|key| self.get(key)),
        }
    }
    pub fn remove_timestamp(&mut self) -> Option<Value> {
        self.timestamp_path()
            .cloned()
            .and_then(|key| self.remove(&key))
    }
    pub fn get_host(&self) -> Option<&Value> {
        match self.namespace() {
            LogNamespace::Vector => self.get_by_meaning("host"),
            LogNamespace::Legacy => log_schema()
                .host_key_target_path()
                .and_then(|key| self.get(key)),
        }
    }
    pub fn get_source_type(&self) -> Option<&Value> {
        match self.namespace() {
            LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
            LogNamespace::Legacy => log_schema()
                .source_type_key_target_path()
                .and_then(|key| self.get(key)),
        }
    }
}
impl MaybeAsLogMut for LogEvent {
    /// A `LogEvent` is always a log, so this always returns `Some(self)`.
    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
        Some(self)
    }
}
impl EventDataEq for LogEvent {
    /// Two events are data-equal when their fields are equal and their
    /// metadata is data-equal; the metadata is only compared if the fields match.
    fn event_data_eq(&self, other: &Self) -> bool {
        if self.value() != other.value() {
            return false;
        }
        self.metadata.event_data_eq(&other.metadata)
    }
}
/// Convenience conversions into `LogEvent`, available only to tests and the
/// `test` feature.
#[cfg(any(test, feature = "test"))]
mod test_utils {
    use super::{log_schema, Bytes, LogEvent, Utc};

    impl From<Bytes> for LogEvent {
        /// Stores `message` at the log schema's message key and the current UTC
        /// time at its timestamp key, for whichever of the two keys are configured.
        fn from(message: Bytes) -> Self {
            let mut event = LogEvent::default();
            event.maybe_insert(log_schema().message_key_target_path(), message);
            if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
                event.insert(timestamp_key, Utc::now());
            }
            event
        }
    }

    impl From<&str> for LogEvent {
        /// Copies `message` into an owned string and converts that.
        fn from(message: &str) -> Self {
            LogEvent::from(message.to_owned())
        }
    }

    impl From<String> for LogEvent {
        /// Converts `message` to `Bytes` and then into an event.
        fn from(message: String) -> Self {
            LogEvent::from(Bytes::from(message))
        }
    }
}
impl From<Value> for LogEvent {
    /// Wraps `value` in an event with default metadata.
    fn from(value: Value) -> Self {
        let metadata = EventMetadata::default();
        Self::from_parts(value, metadata)
    }
}
impl From<ObjectMap> for LogEvent {
    /// Wraps `map` as an object value in an event with default metadata.
    fn from(map: ObjectMap) -> Self {
        let fields = Value::Object(map);
        Self::from_parts(fields, EventMetadata::default())
    }
}
impl From<HashMap<KeyString, Value>> for LogEvent {
    /// Collects `map` into an object value and wraps it in an event with
    /// default metadata.
    fn from(map: HashMap<KeyString, Value>) -> Self {
        let fields: ObjectMap = map.into_iter().collect();
        Self::from_parts(Value::Object(fields), EventMetadata::default())
    }
}
impl TryFrom<serde_json::Value> for LogEvent {
    type Error = crate::Error;
    fn try_from(map: serde_json::Value) -> Result<Self, Self::Error> {
        match map {
            serde_json::Value::Object(fields) => Ok(LogEvent::from(
                fields
                    .into_iter()
                    .map(|(k, v)| (k.into(), v.into()))
                    .collect::<ObjectMap>(),
            )),
            _ => Err(crate::Error::from(
                "Attempted to convert non-Object JSON into a LogEvent.",
            )),
        }
    }
}
impl TryInto<serde_json::Value> for LogEvent {
    type Error = crate::Error;

    /// Serializes the event's fields (not its metadata) into a JSON value.
    ///
    /// # Errors
    ///
    /// Returns the serialization error if the fields cannot be represented as JSON.
    fn try_into(self) -> Result<serde_json::Value, Self::Error> {
        let json = serde_json::to_value(self.value())?;
        Ok(json)
    }
}
/// Test-only indexing by path string.
///
/// # Panics
///
/// Panics if the key cannot be parsed as a path or no value is stored there.
#[cfg(any(test, feature = "test"))]
impl<T> std::ops::Index<T> for LogEvent
where
    T: AsRef<str>,
{
    type Output = Value;

    fn index(&self, key: T) -> &Value {
        match self.parse_path_and_get_value(key.as_ref()) {
            Ok(Some(value)) => value,
            // Parse failures and missing keys are reported the same way.
            Ok(None) | Err(_) => panic!("Key is not found: {:?}", key.as_ref()),
        }
    }
}
impl<K, V> Extend<(K, V)> for LogEvent
where
    K: AsRef<str>,
    V: Into<Value>,
{
    /// Inserts every `(path, value)` pair whose key parses as a target path.
    ///
    /// Pairs with an unparsable key are silently skipped.
    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
        for (key, value) in iter {
            let Ok(path) = parse_target_path(key.as_ref()) else {
                continue;
            };
            self.insert(&path, value.into());
        }
    }
}
impl<K: AsRef<str>, V: Into<Value>> FromIterator<(K, V)> for LogEvent {
    /// Builds a default event and extends it with the given `(path, value)` pairs.
    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
        let mut event = Self::default();
        Extend::extend(&mut event, iter);
        event
    }
}
impl Serialize for LogEvent {
    /// Serializes only the event's value; metadata is not written.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let fields = self.value();
        fields.serialize(serializer)
    }
}
/// Target paths used when converting a `tracing::Event` into a `LogEvent`.
struct TracingTargetPaths {
    /// Where the conversion time is stored.
    pub(crate) timestamp: OwnedTargetPath,
    /// Where the kind of tracing metadata (`event` / `span`) is stored.
    pub(crate) kind: OwnedTargetPath,
    /// Where the originating module path is stored.
    pub(crate) module_path: OwnedTargetPath,
    /// Where the tracing level is stored.
    pub(crate) level: OwnedTargetPath,
    /// Where the tracing target is stored.
    pub(crate) target: OwnedTargetPath,
}
/// Lazily built event paths for tracing conversion: `timestamp` at the root and
/// `kind`, `level`, `module_path` and `target` under `metadata`.
static TRACING_TARGET_PATHS: LazyLock<TracingTargetPaths> = LazyLock::new(|| {
    let timestamp = OwnedTargetPath::event(owned_value_path!("timestamp"));
    let kind = OwnedTargetPath::event(owned_value_path!("metadata", "kind"));
    let level = OwnedTargetPath::event(owned_value_path!("metadata", "level"));
    let module_path = OwnedTargetPath::event(owned_value_path!("metadata", "module_path"));
    let target = OwnedTargetPath::event(owned_value_path!("metadata", "target"));

    TracingTargetPaths {
        timestamp,
        kind,
        module_path,
        level,
        target,
    }
});
impl From<&tracing::Event<'_>> for LogEvent {
    fn from(event: &tracing::Event<'_>) -> Self {
        let now = chrono::Utc::now();
        let mut maker = LogEvent::default();
        event.record(&mut maker);
        let mut log = maker;
        log.insert(&TRACING_TARGET_PATHS.timestamp, now);
        let meta = event.metadata();
        log.insert(
            &TRACING_TARGET_PATHS.kind,
            if meta.is_event() {
                Value::Bytes("event".to_string().into())
            } else if meta.is_span() {
                Value::Bytes("span".to_string().into())
            } else {
                Value::Null
            },
        );
        log.insert(&TRACING_TARGET_PATHS.level, meta.level().to_string());
        log.insert(
            &TRACING_TARGET_PATHS.module_path,
            meta.module_path()
                .map_or(Value::Null, |mp| Value::Bytes(mp.to_string().into())),
        );
        log.insert(&TRACING_TARGET_PATHS.target, meta.target().to_string());
        log
    }
}
impl tracing::field::Visit for LogEvent {
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.insert(event_path!(field.name()), value.to_string());
    }
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn Debug) {
        self.insert(event_path!(field.name()), format!("{value:?}"));
    }
    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.insert(event_path!(field.name()), value);
    }
    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        let field_path = event_path!(field.name());
        let converted: Result<i64, _> = value.try_into();
        match converted {
            Ok(value) => self.insert(field_path, value),
            Err(_) => self.insert(field_path, value.to_string()),
        };
    }
    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.insert(event_path!(field.name()), value);
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::open_fixture;
    use lookup::event_path;
    use uuid::Version;
    use vrl::{btreemap, value};

    /// The two-field object shared by the `rename_key_*` tests.
    fn one_two() -> Value {
        value!({
            one: 1,
            two: 2
        })
    }

    // Renaming a key to itself leaves the event unchanged.
    #[test]
    fn rename_key_flat_equiv_exists() {
        let fields = one_two();

        let mut event = LogEvent::from_parts(fields.clone(), EventMetadata::default());
        event.rename_key(event_path!("one"), event_path!("one"));
        let (actual, _) = event.into_parts();

        assert_eq!(fields, actual);
    }

    // Renaming a missing key to itself leaves the event unchanged.
    #[test]
    fn rename_key_flat_equiv_not_exists() {
        let fields = one_two();

        let mut event = LogEvent::from_parts(fields.clone(), EventMetadata::default());
        event.rename_key(event_path!("three"), event_path!("three"));
        let (actual, _) = event.into_parts();

        assert_eq!(fields, actual);
    }

    // Renaming a missing key to another missing key leaves the event unchanged.
    #[test]
    fn rename_key_flat_not_exists() {
        let fields = one_two();

        let mut event = LogEvent::from_parts(fields.clone(), EventMetadata::default());
        event.rename_key(event_path!("three"), event_path!("four"));
        let (actual, _) = event.into_parts();

        assert_eq!(fields, actual);
    }

    // Renaming to a fresh key moves the value.
    #[test]
    fn rename_key_flat_no_overlap() {
        let fields = one_two();

        let mut expected = fields.clone();
        let one = expected.remove("one", true).unwrap();
        expected.insert("three", one);

        let mut event = LogEvent::from_parts(fields, EventMetadata::default());
        event.rename_key(event_path!("one"), event_path!("three"));
        let (actual, _) = event.into_parts();

        assert_eq!(expected, actual);
    }

    // Renaming onto an existing key overwrites that key's value.
    #[test]
    fn rename_key_flat_overlap() {
        let fields = one_two();

        let mut expected = fields.clone();
        let one = expected.remove("one", true).unwrap();
        expected.insert("two", one);

        let mut event = LogEvent::from_parts(fields, EventMetadata::default());
        event.rename_key(event_path!("one"), event_path!("two"));
        let (actual, _) = event.into_parts();

        assert_eq!(expected, actual);
    }

    #[test]
    fn insert() {
        let mut event = LogEvent::default();

        let previous = event.insert("foo", "foo");

        assert_eq!(event.get("foo"), Some(&"foo".into()));
        assert_eq!(previous, None);
    }

    #[test]
    fn insert_existing() {
        let mut event = LogEvent::default();
        event.insert("foo", "foo");

        let previous = event.insert("foo", "bar");

        assert_eq!(event.get("foo"), Some(&"bar".into()));
        assert_eq!(previous, Some("foo".into()));
    }

    #[test]
    fn try_insert() {
        let mut event = LogEvent::default();

        event.try_insert("foo", "foo");

        assert_eq!(event.get("foo"), Some(&"foo".into()));
    }

    #[test]
    fn try_insert_existing() {
        let mut event = LogEvent::default();
        event.insert("foo", "foo");

        event.try_insert("foo", "bar");

        assert_eq!(event.get("foo"), Some(&"foo".into()));
    }

    // A string path with a dot addresses a nested field, not a flat key.
    #[test]
    fn try_insert_dotted() {
        let mut event = LogEvent::default();

        event.try_insert("foo.bar", "foo");

        assert_eq!(event.get("foo.bar"), Some(&"foo".into()));
        assert_eq!(event.get(event_path!("foo.bar")), None);
    }

    #[test]
    fn try_insert_existing_dotted() {
        let mut event = LogEvent::default();
        event.insert("foo.bar", "foo");

        event.try_insert("foo.bar", "bar");

        assert_eq!(event.get("foo.bar"), Some(&"foo".into()));
        assert_eq!(event.get(event_path!("foo.bar")), None);
    }

    #[test]
    fn try_insert_flat() {
        let mut event = LogEvent::default();

        event.try_insert(event_path!("foo"), "foo");

        assert_eq!(event.get(event_path!("foo")), Some(&"foo".into()));
    }

    #[test]
    fn try_insert_flat_existing() {
        let mut event = LogEvent::default();
        event.insert(event_path!("foo"), "foo");

        event.try_insert(event_path!("foo"), "bar");

        assert_eq!(event.get(event_path!("foo")), Some(&"foo".into()));
    }

    // `event_path!` with a dotted segment addresses a single flat key.
    #[test]
    fn try_insert_flat_dotted() {
        let mut event = LogEvent::default();

        event.try_insert(event_path!("foo.bar"), "foo");

        assert_eq!(event.get(event_path!("foo.bar")), Some(&"foo".into()));
        assert_eq!(event.get("foo.bar"), None);
    }

    #[test]
    fn try_insert_flat_existing_dotted() {
        let mut event = LogEvent::default();
        event.insert(event_path!("foo.bar"), "foo");

        event.try_insert(event_path!("foo.bar"), "bar");

        assert_eq!(event.get(event_path!("foo.bar")), Some(&"foo".into()));
        assert_eq!(event.get("foo.bar"), None);
    }

    // Every JSON fixture must survive a round trip through `LogEvent`.
    #[test]
    fn json_value_to_vector_log_event_to_json_value() {
        const FIXTURE_ROOT: &str = "tests/data/fixtures/log_event";

        for entry in std::fs::read_dir(FIXTURE_ROOT).unwrap() {
            let Ok(fixture_file) = entry else {
                panic!("This test should never read Err'ing test fixtures.");
            };

            let path = fixture_file.path();
            tracing::trace!(?path, "Opening.");
            let serde_value = open_fixture(&path).unwrap();

            let vector_value = LogEvent::try_from(serde_value.clone()).unwrap();
            let serde_value_again: serde_json::Value = vector_value.try_into().unwrap();

            assert_eq!(serde_value, serde_value_again);
        }
    }

    /// Merges `incoming` into `current` and checks the result against `expected`.
    fn assert_merge_value(
        current: impl Into<Value>,
        incoming: impl Into<Value>,
        expected: impl Into<Value>,
    ) {
        let mut merged: Value = current.into();
        merged.merge(incoming.into());

        let expected: Value = expected.into();
        assert_eq!(merged, expected);
    }

    #[test]
    fn merge_value_works_correctly() {
        // Two byte strings are concatenated.
        assert_merge_value("hello ", "world", "hello world");

        // In every other combination the incoming value wins.
        assert_merge_value(true, false, false);
        assert_merge_value(false, true, true);

        assert_merge_value("my_val", true, true);
        assert_merge_value(true, "my_val", "my_val");

        assert_merge_value(1, 2, 2);
    }

    #[test]
    fn merge_event_combines_values_accordingly() {
        let fields_to_merge = vec![
            "merge".to_string(),
            "merge_a".to_string(),
            "merge_b".to_string(),
            "merge_c".to_string(),
        ];

        let current = {
            let mut log = LogEvent::default();

            // Selected for merging: concatenated with the incoming `merge`.
            log.insert("merge", "hello ");
            // Not selected for merging: stays as is.
            log.insert("do_not_merge", "my_first_value");

            // Selected, but absent from `incoming`: stays as is.
            log.insert("merge_a", true);
            // Selected: replaced by the incoming `merge_b`.
            log.insert("merge_b", 123i64);

            // Not selected for merging: stay as is.
            log.insert("a", true);
            log.insert("b", 123i64);

            log
        };

        let incoming = {
            let mut log = LogEvent::default();

            // Selected for merging: appended to the current `merge`.
            log.insert("merge", "world");
            // Not selected for merging: ignored.
            log.insert("do_not_merge", "my_second_value");

            // Selected: replaces the current `merge_b`.
            log.insert("merge_b", 456i64);
            // Selected and missing from `current`: inserted.
            log.insert("merge_c", false);

            // Not selected for merging: ignored.
            log.insert("b", 456i64);
            log.insert("c", true);

            log
        };

        let mut merged = current;
        merged.merge(incoming, &fields_to_merge);

        let expected = {
            let mut log = LogEvent::default();
            log.insert("merge", "hello world");
            log.insert("do_not_merge", "my_first_value");
            log.insert("a", true);
            log.insert("b", 123i64);
            log.insert("merge_a", true);
            log.insert("merge_b", 456i64);
            log.insert("merge_c", false);
            log
        };

        vector_common::assert_event_data_eq!(merged, expected);
    }

    #[test]
    fn event_fields_iter() {
        let mut log = LogEvent::default();
        log.insert("a", 0);
        // Inserting below `a` replaces the scalar stored there, so only `a.b` is listed.
        log.insert("a.b", 1);
        log.insert("c", 2);

        let actual: Vec<(KeyString, Value)> = log
            .all_event_fields()
            .unwrap()
            .map(|(key, value)| (key, value.clone()))
            .collect();

        let expected: Vec<(KeyString, Value)> =
            vec![("a.b".into(), 1.into()), ("c".into(), 2.into())];
        assert_eq!(actual, expected);
    }

    #[test]
    fn metadata_fields_iter() {
        let mut log = LogEvent::default();
        log.insert("%a", 0);
        log.insert("%a.b", 1);
        log.insert("%c", 2);

        let actual: Vec<(KeyString, Value)> = log
            .all_metadata_fields()
            .unwrap()
            .map(|(key, value)| (key, value.clone()))
            .collect();

        let expected: Vec<(KeyString, Value)> =
            vec![("%a.b".into(), 1.into()), ("%c".into(), 2.into())];
        assert_eq!(actual, expected);
    }

    #[test]
    fn skip_array_elements() {
        let log = LogEvent::from(Value::from(btreemap! {
            "arr" => [1],
            "obj" => btreemap! {
                "arr" => [1,2,3]
            },
        }));

        let actual: Vec<(KeyString, Value)> = log
            .all_event_fields_skip_array_elements()
            .unwrap()
            .map(|(key, value)| (key, value.clone()))
            .collect();

        // Arrays are reported whole rather than element by element.
        let expected: Vec<(KeyString, Value)> = vec![
            ("arr".into(), [1].into()),
            ("obj.arr".into(), [1, 2, 3].into()),
        ];
        assert_eq!(actual, expected);
    }

    #[test]
    fn metadata_set_unique_uuid_v7_source_event_id() {
        let first = LogEvent::default();
        let first_version = first
            .metadata()
            .source_event_id()
            .expect("source_event_id should be auto-generated for new events")
            .get_version();
        assert_eq!(first_version, Some(Version::SortRand));

        // A second event must get a different id.
        let second = LogEvent::default();
        assert_ne!(
            first.metadata().source_event_id(),
            second.metadata().source_event_id()
        );
    }
}