use std::collections::HashMap;
use std::sync::Mutex;
use std::{
    collections::BTreeMap,
    fs::File,
    io::{self, Read},
    path::PathBuf,
};
use snafu::{ResultExt, Snafu};
use vector_lib::codecs::MetricTagValues;
use vector_lib::compile_vrl;
use vector_lib::config::LogNamespace;
use vector_lib::configurable::configurable_component;
use vector_lib::enrichment::TableRegistry;
use vector_lib::lookup::{metadata_path, owned_value_path, PathPrefix};
use vector_lib::schema::Definition;
use vector_lib::TimeZone;
use vector_vrl_functions::set_semantic_meaning::MeaningList;
use vrl::compiler::runtime::{Runtime, Terminate};
use vrl::compiler::state::ExternalEnv;
use vrl::compiler::{CompileConfig, ExpressionError, Program, TypeState, VrlRuntime};
use vrl::diagnostic::{DiagnosticMessage, Formatter, Note};
use vrl::path;
use vrl::path::ValuePath;
use vrl::value::{Kind, Value};
use crate::config::OutputId;
use crate::{
    config::{
        log_schema, ComponentKey, DataType, Input, TransformConfig, TransformContext,
        TransformOutput,
    },
    event::{Event, TargetEvents, VrlTarget},
    internal_events::{RemapMappingAbort, RemapMappingError},
    schema,
    transforms::{SyncTransform, Transform, TransformOutputsBuf},
    Result,
};
/// Name of the named output port that receives rerouted (dropped) events.
const DROPPED: &str = "dropped";
/// Key of a cached compilation: the enrichment tables and the merged schema
/// definition the program was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// Successful cached compilation: the compiled program, its formatted
/// warnings, and the meanings collected during compilation.
type CacheValue = (Program, String, MeaningList);
/// Configuration for the `remap` transform.
///
/// Exactly one of `source`, `file`, or `files` must be provided; any other
/// combination is rejected when the program is compiled.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The VRL program to execute for each event, given inline.
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,
    /// Path of a file whose contents are used as the VRL program.
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,
    /// Paths of files whose contents are concatenated, in order and each
    /// followed by a newline, to form the VRL program.
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,
    /// Selects how metric tag values are presented to the program
    /// (`Single` or `Full`); forwarded to the VRL target as a flag.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,
    /// Timezone handed to the program at runtime. When unset, the global
    /// timezone option is used instead.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,
    /// Whether an event is dropped when the program terminates with a
    /// runtime error. When `false`, the original event is forwarded to the
    /// default output.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,
    /// Whether an event is dropped when the program aborts. When `false`,
    /// the original event is forwarded to the default output.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,
    /// Whether events that would be dropped are instead annotated with the
    /// drop details and sent to the `dropped` output.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,
    /// VRL runtime used to execute the program.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,
    /// Cache of compilation results (successes and formatted failures),
    /// looked up by enrichment tables and schema definition. It is never
    /// (de)serialized, is left out of `Debug` output, and starts empty on
    /// every clone.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
impl Clone for RemapConfig {
    /// Copies every user-facing setting; the compilation cache is not
    /// carried over, so the clone starts with an empty one.
    fn clone(&self) -> Self {
        // Destructure exhaustively so that a newly added field cannot be
        // forgotten here without a compile error.
        let Self {
            source,
            file,
            files,
            metric_tag_values,
            timezone,
            drop_on_error,
            drop_on_abort,
            reroute_dropped,
            runtime,
            cache: _,
        } = self;
        Self {
            source: source.clone(),
            file: file.clone(),
            files: files.clone(),
            metric_tag_values: *metric_tag_values,
            timezone: *timezone,
            drop_on_error: *drop_on_error,
            drop_on_abort: *drop_on_abort,
            reroute_dropped: *reroute_dropped,
            runtime: *runtime,
            cache: Mutex::new(Vec::new()),
        }
    }
}
impl RemapConfig {
    fn compile_vrl_program(
        &self,
        enrichment_tables: TableRegistry,
        merged_schema_definition: schema::Definition,
    ) -> Result<(Program, String, MeaningList)> {
        if let Some((_, res)) = self
            .cache
            .lock()
            .expect("Data poisoned")
            .iter()
            .find(|v| v.0 .0 == enrichment_tables && v.0 .1 == merged_schema_definition)
        {
            return res.clone().map_err(Into::into);
        }
        let source = match (&self.source, &self.file, &self.files) {
            (Some(source), None, None) => source.to_owned(),
            (None, Some(path), None) => Self::read_file(path)?,
            (None, None, Some(paths)) => {
                let mut combined_source = String::new();
                for path in paths {
                    let content = Self::read_file(path)?;
                    combined_source.push_str(&content);
                    combined_source.push('\n');
                }
                combined_source
            }
            _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
        };
        let mut functions = vrl::stdlib::all();
        functions.append(&mut vector_lib::enrichment::vrl_functions());
        functions.append(&mut vector_vrl_functions::all());
        let state = TypeState {
            local: Default::default(),
            external: ExternalEnv::new_with_kind(
                merged_schema_definition.event_kind().clone(),
                merged_schema_definition.metadata_kind().clone(),
            ),
        };
        let mut config = CompileConfig::default();
        config.set_custom(enrichment_tables.clone());
        config.set_custom(MeaningList::default());
        let res = compile_vrl(&source, &functions, &state, config)
            .map_err(|diagnostics| Formatter::new(&source, diagnostics).colored().to_string())
            .map(|result| {
                (
                    result.program,
                    Formatter::new(&source, result.warnings).to_string(),
                    result.config.get_custom::<MeaningList>().unwrap().clone(),
                )
            });
        self.cache
            .lock()
            .expect("Data poisoned")
            .push(((enrichment_tables, merged_schema_definition), res.clone()));
        res.map_err(Into::into)
    }
    fn read_file(path: &PathBuf) -> Result<String> {
        let mut buffer = String::new();
        File::open(path)
            .with_context(|_| FileOpenFailedSnafu { path })?
            .read_to_string(&mut buffer)
            .with_context(|_| FileReadFailedSnafu { path })?;
        Ok(buffer)
    }
}
// Invokes the crate's `impl_generate_config_from_default!` macro for
// `RemapConfig` (presumably it derives the generated example config from the
// type's `Default` impl — confirm against the macro definition).
impl_generate_config_from_default!(RemapConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "remap")]
impl TransformConfig for RemapConfig {
    /// Builds the synchronous remap transform, logging any compilation
    /// warnings produced by the VRL program.
    async fn build(&self, context: &TransformContext) -> Result<Transform> {
        let (remap, warnings) = match self.runtime {
            VrlRuntime::Ast => Remap::new_ast(self.clone(), context)?,
        };
        let transform = Transform::synchronous(remap);
        if !warnings.is_empty() {
            warn!(message = "VRL compilation warning.", %warnings);
        }
        Ok(transform)
    }

    /// The transform accepts every input.
    fn input(&self) -> Input {
        Input::all()
    }

    /// Describes the transform's outputs and their schema definitions.
    ///
    /// The program is compiled against the merge of all input definitions.
    /// For each input, the default output's definition comes from the
    /// program's final type state (or `Kind::never()` if compilation
    /// failed), and the dropped output's definition is the input definition
    /// extended with the drop-annotation fields. The dropped output is only
    /// returned when `reroute_dropped` is set.
    fn outputs(
        &self,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        let merged_definition: Definition = input_definitions
            .iter()
            .map(|(_, definition)| definition.clone())
            .reduce(Definition::merge)
            .unwrap_or_else(Definition::any);

        // `None` means the program failed to compile.
        let compiled = match self.compile_vrl_program(enrichment_tables, merged_definition) {
            Ok((program, _warnings, meaning_list)) => {
                Some((program.final_type_info().state, meaning_list.0))
            }
            Err(_) => None,
        };

        let mut default_definitions = HashMap::new();
        let mut dropped_definitions = HashMap::new();

        for (output_id, input_definition) in input_definitions {
            let namespaces = input_definition.log_namespaces();

            let default_definition = match compiled.clone() {
                Some((state, meanings)) => {
                    let mut definition = Definition::new(
                        state.external.target_kind().clone(),
                        state.external.metadata_kind().clone(),
                        namespaces.clone(),
                    );
                    // Carry over input meanings on a best-effort basis ...
                    for (id, path) in input_definition.meanings() {
                        let _ = definition.try_with_meaning(path.clone(), id);
                    }
                    // ... then apply the meanings set by the program itself.
                    for (id, path) in meanings {
                        definition = definition.with_meaning(path, &id);
                    }
                    definition
                }
                None => Definition::new_with_default_metadata(Kind::never(), namespaces.clone()),
            };

            // Legacy namespace: drop details live under the metadata key of the event.
            let legacy_dropped = input_definition.clone().with_event_field(
                log_schema().metadata_key().expect("valid metadata key"),
                Kind::object(BTreeMap::from([
                    ("reason".into(), Kind::bytes()),
                    ("message".into(), Kind::bytes()),
                    ("component_id".into(), Kind::bytes()),
                    ("component_type".into(), Kind::bytes()),
                    ("component_kind".into(), Kind::bytes()),
                ])),
                Some("metadata"),
            );
            // Vector namespace: drop details are metadata fields.
            let vector_dropped = input_definition
                .clone()
                .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None);
            let dropped_definition =
                Definition::combine_log_namespaces(namespaces, legacy_dropped, vector_dropped);

            default_definitions.insert(
                output_id.clone(),
                VrlTarget::modify_schema_definition_for_into_events(default_definition),
            );
            dropped_definitions.insert(
                output_id.clone(),
                VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
            );
        }

        let mut outputs = vec![TransformOutput::new(
            DataType::all_bits(),
            default_definitions,
        )];
        if self.reroute_dropped {
            outputs.push(
                TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
            );
        }
        outputs
    }

    /// The transform opts in to concurrent execution.
    fn enable_concurrency(&self) -> bool {
        true
    }
}
/// The `remap` transform: runs a compiled VRL program against each event
/// using the given runner.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of the component this transform belongs to, if any; recorded in
    /// the drop annotation as `component_id`.
    component_key: Option<ComponentKey>,
    /// The compiled VRL program.
    program: Program,
    /// Timezone passed to the program on every run.
    timezone: TimeZone,
    /// Copied from `RemapConfig::drop_on_error`.
    drop_on_error: bool,
    /// Copied from `RemapConfig::drop_on_abort`.
    drop_on_abort: bool,
    /// Copied from `RemapConfig::reroute_dropped`.
    reroute_dropped: bool,
    /// Executes the program against a target.
    runner: Runner,
    /// Copied from `RemapConfig::metric_tag_values`.
    metric_tag_values: MetricTagValues,
}
/// Abstraction over how a compiled VRL program is executed against a target.
pub trait VrlRunner {
    /// Runs `program` against `target` using `timezone`.
    ///
    /// Returns the value the program resolved to, or the `Terminate` that
    /// stopped it.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
/// `VrlRunner` backed by the VRL `Runtime`.
#[derive(Debug)]
pub struct AstRunner {
    /// Runtime used to resolve programs; it is cleared after every run.
    pub runtime: Runtime,
}
impl Clone for AstRunner {
    fn clone(&self) -> Self {
        Self {
            runtime: Runtime::default(),
        }
    }
}
impl VrlRunner for AstRunner {
    /// Resolves `program` against `target`, then clears the runtime before
    /// handing back the outcome, whether it succeeded or terminated.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate> {
        let runtime = &mut self.runtime;
        let outcome = runtime.resolve(target, program, timezone);
        // Clear on both success and failure so the next event starts clean.
        runtime.clear();
        outcome
    }
}
impl Remap<AstRunner> {
    /// Compiles the program from `config` using the enrichment tables and
    /// merged schema definition in `context`, and builds an AST-runner
    /// transform.
    ///
    /// Returns the transform together with the formatted compilation
    /// warnings.
    ///
    /// # Errors
    ///
    /// Propagates any error from compiling the program.
    pub fn new_ast(
        config: RemapConfig,
        context: &TransformContext,
    ) -> crate::Result<(Self, String)> {
        let tables = context.enrichment_tables.clone();
        let definition = context.merged_schema_definition.clone();
        let (program, warnings, _meanings) = config.compile_vrl_program(tables, definition)?;

        let runner = AstRunner {
            runtime: Runtime::default(),
        };
        let remap = Self::new(config, context, program, runner)?;
        Ok((remap, warnings))
    }
}
impl<Runner> Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Assembles a transform from its configuration, context, compiled
    /// program and runner.
    ///
    /// The timezone comes from `config.timezone`, falling back to the
    /// global timezone in `context`.
    fn new(
        config: RemapConfig,
        context: &TransformContext,
        program: Program,
        runner: Runner,
    ) -> crate::Result<Self> {
        let timezone = match config.timezone {
            Some(configured) => configured,
            None => context.globals.timezone(),
        };
        Ok(Self {
            component_key: context.key.clone(),
            program,
            timezone,
            drop_on_error: config.drop_on_error,
            drop_on_abort: config.drop_on_abort,
            reroute_dropped: config.reroute_dropped,
            runner,
            metric_tag_values: config.metric_tag_values,
        })
    }

    /// Test-only access to the runner.
    #[cfg(test)]
    const fn runner(&self) -> &Runner {
        &self.runner
    }

    /// Builds the JSON object that describes why an event was dropped.
    ///
    /// The message is the last user-provided error note, if there is one,
    /// and the error's own text otherwise.
    fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
        let notes = error.notes();
        let message = notes
            .iter()
            .rev()
            .find(|note| matches!(note, Note::UserErrorMessage(_)))
            .map(|note| note.to_string())
            .unwrap_or_else(|| error.to_string());
        serde_json::json!({
                "reason": reason,
                "message": message,
                "component_id": self.component_key,
                "component_type": "remap",
                "component_kind": "transform",
        })
    }

    /// Attaches the drop details to `event` in the place appropriate for
    /// its type:
    ///
    /// - logs: under `<metadata key>.dropped` (legacy namespace, only if a
    ///   metadata key is configured) or under the `vector.dropped` metadata
    ///   path (Vector namespace);
    /// - metrics: as `<metadata key>.dropped.*` tags (no `message` tag is
    ///   written);
    /// - traces: at the metadata key target path, via `maybe_insert`.
    fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
        match event {
            Event::Log(log) => {
                let namespace = log.namespace();
                match namespace {
                    LogNamespace::Legacy => {
                        if let Some(metadata_key) = log_schema().metadata_key() {
                            log.insert(
                                (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
                                self.dropped_data(reason, error),
                            );
                        }
                    }
                    LogNamespace::Vector => {
                        log.insert(
                            metadata_path!("vector", "dropped"),
                            self.dropped_data(reason, error),
                        );
                    }
                }
            }
            Event::Metric(metric) => {
                if let Some(metadata_key) = log_schema().metadata_key() {
                    let component_id = self
                        .component_key
                        .as_ref()
                        .map(ToString::to_string)
                        .unwrap_or_default();
                    metric.replace_tag(format!("{}.dropped.reason", metadata_key), reason.into());
                    metric.replace_tag(
                        format!("{}.dropped.component_id", metadata_key),
                        component_id,
                    );
                    metric.replace_tag(
                        format!("{}.dropped.component_type", metadata_key),
                        "remap".into(),
                    );
                    metric.replace_tag(
                        format!("{}.dropped.component_kind", metadata_key),
                        "transform".into(),
                    );
                }
            }
            Event::Trace(trace) => {
                trace.maybe_insert(log_schema().metadata_key_target_path(), || {
                    self.dropped_data(reason, error).into()
                });
            }
        }
    }

    /// Runs the compiled program against `target` with the configured
    /// timezone.
    fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
        let Self {
            runner,
            program,
            timezone,
            ..
        } = self;
        runner.run(target, program, timezone)
    }
}
impl<Runner> SyncTransform for Remap<Runner>
where
    Runner: VrlRunner + Clone + Send + Sync,
{
    /// Runs the program against `event` and routes the result.
    ///
    /// - On success, every event produced by the target goes to the default
    ///   output.
    /// - On abort/error, the *original* event is forwarded to the default
    ///   output when the matching `drop_on_*` flag is off; otherwise it is
    ///   either annotated and sent to the `dropped` output (when
    ///   `reroute_dropped` is set) or discarded.
    ///
    /// The abort/error internal events are only emitted when rerouting is
    /// disabled.
    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
        // A copy of the input is only needed if the program can fail in a way
        // that requires emitting the original event afterwards.
        let keep_on_error = self.reroute_dropped || !self.drop_on_error;
        let keep_on_abort = self.reroute_dropped || !self.drop_on_abort;
        let needs_original = (self.program.info().fallible && keep_on_error)
            || (self.program.info().abortable && keep_on_abort);
        let original_event = needs_original.then(|| event.clone());

        let log_namespace = event
            .maybe_as_log()
            .map_or(LogNamespace::Legacy, |log| log.namespace());
        let full_tag_values = matches!(self.metric_tag_values, MetricTagValues::Full);
        let mut target = VrlTarget::new(event, self.program.info(), full_tag_values);

        let outcome = self.run_vrl(&mut target);
        let termination = match outcome {
            Ok(_) => {
                match target.into_events(log_namespace) {
                    TargetEvents::One(event) => push_default(event, output),
                    TargetEvents::Logs(events) => {
                        for event in events {
                            push_default(event, output);
                        }
                    }
                    TargetEvents::Traces(events) => {
                        for event in events {
                            push_default(event, output);
                        }
                    }
                }
                return;
            }
            Err(termination) => termination,
        };

        let (reason, error, should_drop) = match termination {
            Terminate::Abort(error) => {
                if !self.reroute_dropped {
                    emit!(RemapMappingAbort {
                        event_dropped: self.drop_on_abort,
                    });
                }
                ("abort", error, self.drop_on_abort)
            }
            Terminate::Error(error) => {
                if !self.reroute_dropped {
                    emit!(RemapMappingError {
                        error: error.to_string(),
                        event_dropped: self.drop_on_error,
                    });
                }
                ("error", error, self.drop_on_error)
            }
        };

        if should_drop {
            if self.reroute_dropped {
                let mut event = original_event.expect("event will be set");
                self.annotate_dropped(&mut event, reason, error);
                push_dropped(event, output);
            }
        } else {
            let event = original_event.expect("event will be set");
            push_default(event, output);
        }
    }
}
/// Pushes `event` to the output buffer without an output name (`None`),
/// i.e. to the default output.
#[inline]
fn push_default(event: Event, output: &mut TransformOutputsBuf) {
    output.push(None, event)
}
/// Pushes `event` to the output named by `DROPPED`.
#[inline]
fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
    output.push(Some(DROPPED), event);
}
/// Errors raised while resolving the VRL program text for the transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    /// Zero or more than one of `source`, `file`, and `files` was configured.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,
    /// The program file at `path` could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    /// The program file at `path` was opened but could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;
    use indoc::{formatdoc, indoc};
    use vector_lib::{config::GlobalOptions, event::EventMetadata, metric_tags};
    use vrl::value::kind::Collection;
    use vrl::{btreemap, event_path};
    use super::*;
    use crate::metrics::Controller;
    use crate::{
        config::{build_unit_tests, ConfigBuilder},
        event::{
            metric::{MetricKind, MetricValue},
            LogEvent, Metric, Value,
        },
        schema,
        test_util::components::{
            assert_transform_compliance, init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS,
        },
        transforms::test::create_topology,
        transforms::OutputBuffer,
    };
    use chrono::DateTime;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::enrichment::TableRegistry;
    /// Legacy-namespace definition with a single integer-or-bytes event
    /// field that carries the `default` meaning.
    fn test_default_schema_definition() -> schema::Definition {
        let field = owned_value_path!("a default field");
        let kind = Kind::integer().or_bytes();
        schema::Definition::empty_legacy_namespace().with_event_field(
            &field,
            kind,
            Some("default"),
        )
    }
    /// Legacy-namespace definition with a single boolean-or-null event
    /// field that carries the `dropped` meaning.
    fn test_dropped_schema_definition() -> schema::Definition {
        let field = owned_value_path!("a dropped field");
        let kind = Kind::boolean().or_null();
        schema::Definition::empty_legacy_namespace().with_event_field(
            &field,
            kind,
            Some("dropped"),
        )
    }
    /// Builds an AST-runner transform from `config` in a test context whose
    /// default and `dropped` ports each have one `source` schema definition.
    /// Compilation warnings are discarded.
    fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
        let schema_definitions = HashMap::from([
            (
                None,
                [("source".into(), test_default_schema_definition())].into(),
            ),
            (
                Some(DROPPED.to_owned()),
                [("source".into(), test_dropped_schema_definition())].into(),
            ),
        ]);
        let context = TransformContext::new_test(schema_definitions);
        let (transform, _warnings) = Remap::new_ast(config, &context)?;
        Ok(transform)
    }
    /// Runs the shared `test_generate_config` helper against `RemapConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RemapConfig>();
    }
    /// With no program configured at all, building the transform fails with
    /// the "exactly one of" error.
    #[test]
    fn config_missing_source_and_file() {
        let empty = RemapConfig {
            source: None,
            file: None,
            ..Default::default()
        };
        let failure = remap(empty).unwrap_err();
        assert_eq!(
            failure.to_string(),
            "must provide exactly one of `source` or `file` or `files` configuration"
        );
    }
    /// Setting both `source` and `file` is rejected with the "exactly one
    /// of" error.
    #[test]
    fn config_both_source_and_file() {
        let ambiguous = RemapConfig {
            source: Some("".to_owned()),
            file: Some("".into()),
            ..Default::default()
        };
        let failure = remap(ambiguous).unwrap_err();
        assert_eq!(
            failure.to_string(),
            "must provide exactly one of `source` or `file` or `files` configuration"
        );
    }
    /// Returns the lossy string form of `field` in a log event; panics if
    /// the field is absent.
    fn get_field_string(event: &Event, field: &str) -> String {
        let value = event.as_log().get(field).unwrap();
        value.to_string_lossy().into_owned()
    }
    /// A value seen while processing one event must not leak into the next:
    /// the runtime is empty before and after every run, and the second event
    /// (which has no `sentinel`) gets a null `foo`.
    #[test]
    fn check_remap_doesnt_share_state_between_events() {
        let config = RemapConfig {
            source: Some(".foo = .sentinel".to_string()),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();
        assert!(transform.runner().runtime.is_empty());

        let mut with_sentinel = LogEvent::from("event1");
        with_sentinel.insert("sentinel", "bar");
        let first = transform_one(&mut transform, Event::from(with_sentinel)).unwrap();
        assert_eq!(get_field_string(&first, "message"), "event1");
        assert_eq!(get_field_string(&first, "foo"), "bar");
        assert!(transform.runner().runtime.is_empty());

        let without_sentinel = Event::from(LogEvent::from("event2"));
        let second = transform_one(&mut transform, without_sentinel).unwrap();
        assert_eq!(get_field_string(&second, "message"), "event2");
        assert_eq!(second.as_log().get("foo"), Some(&Value::Null));
        assert!(transform.runner().runtime.is_empty());
    }
    /// In the Vector namespace a program may replace the whole event with a
    /// raw string; the reported output schema must then be plain bytes.
    #[test]
    fn remap_return_raw_string_vector_namespace() {
        let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());

        let mut metadata = EventMetadata::default()
            .with_schema_definition(&Arc::new(initial_definition.clone()));
        metadata
            .value_mut()
            .insert(&owned_value_path!("vector"), BTreeMap::new());
        let mut log = LogEvent::new_with_metadata(metadata);
        log.insert("copy_from", "buz");
        let input = Event::from(log);

        let config = RemapConfig {
            source: Some(r#"  . = "root string";"#.to_string()),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };

        let mut transform = remap(config.clone()).unwrap();
        let transformed = transform_one(&mut transform, input).unwrap();
        assert_eq!(get_field_string(&transformed, "."), "root string");

        let mut outputs = config.outputs(
            TableRegistry::default(),
            &[(OutputId::dummy(), initial_definition)],
            LogNamespace::Vector,
        );
        assert_eq!(outputs.len(), 1);
        let only_output = outputs.pop().unwrap();
        assert_eq!(only_output.port, None);

        let actual = only_output.schema_definitions(true)[&OutputId::dummy()].clone();
        let expected = Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        assert_eq!(actual, expected);
    }
    /// Assignments add new fields (including a copy of an existing one)
    /// while leaving the original fields intact.
    #[test]
    fn check_remap_adds() {
        let program = r#"  .foo = "bar"
  .bar = "baz"
  .copy = .copy_from
"#;
        let config = RemapConfig {
            source: Some(program.to_string()),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };

        let mut log = LogEvent::from("augment me");
        log.insert("copy_from", "buz");
        let input = Event::from(log);

        let mut transform = remap(config).unwrap();
        let output = transform_one(&mut transform, input).unwrap();

        let expectations = [
            ("message", "augment me"),
            ("copy_from", "buz"),
            ("foo", "bar"),
            ("bar", "baz"),
            ("copy", "buz"),
        ];
        for (field, expected) in expectations {
            assert_eq!(get_field_string(&output, field), expected);
        }
    }
    /// Assigning an array of objects to the root yields one output event per
    /// array element, in order.
    #[test]
    fn check_remap_emits_multiple() {
        let mut log = LogEvent::from("augment me");
        log.insert(
            "events",
            vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
        );
        let input = Event::from(log);

        let config = RemapConfig {
            source: Some(
                indoc! {r"
                . = .events
            "}
                .to_owned(),
            ),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();

        let outputs = collect_outputs(&mut transform, input);
        assert_eq!(2, outputs.primary.len());

        let mut emitted = outputs.primary.into_events();
        let first = emitted.next().unwrap();
        assert_eq!(get_field_string(&first, "message"), "foo");
        let second = emitted.next().unwrap();
        assert_eq!(get_field_string(&second, "message"), "bar");
    }
    /// With `drop_on_error` off, a runtime error forwards the original event
    /// untouched: no assignment made by the program survives.
    #[test]
    fn check_remap_error() {
        let mut log = LogEvent::from("augment me");
        log.insert("bar", "is a string");
        let input = Event::from(log);

        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                .foo = "foo"
                .not_an_int = int!(.bar)
                .baz = 12
            "#}),
            file: None,
            drop_on_error: false,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();

        let forwarded = transform_one(&mut transform, input).unwrap();
        let forwarded = forwarded.as_log();
        assert_eq!(forwarded.get("bar"), Some(&Value::from("is a string")));
        assert!(forwarded.get("foo").is_none());
        assert!(forwarded.get("baz").is_none());
    }
    /// With `drop_on_error` on (and no rerouting), a runtime error produces
    /// no output event.
    #[test]
    fn check_remap_error_drop() {
        let mut log = LogEvent::from("augment me");
        log.insert("bar", "is a string");
        let input = Event::from(log);

        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                .foo = "foo"
                .not_an_int = int!(.bar)
                .baz = 12
            "#}),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();

        let result = transform_one(&mut transform, input);
        assert!(result.is_none())
    }
    /// An infallible program applies all of its assignments and keeps the
    /// existing fields.
    #[test]
    fn check_remap_error_infallible() {
        let mut log = LogEvent::from("augment me");
        log.insert("bar", "is a string");
        let input = Event::from(log);

        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                .foo = "foo"
                .baz = 12
            "#}),
            file: None,
            drop_on_error: false,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();

        let output = transform_one(&mut transform, input).unwrap();
        let output = output.as_log();
        assert_eq!(output.get("foo"), Some(&Value::from("foo")));
        assert_eq!(output.get("bar"), Some(&Value::from("is a string")));
        assert_eq!(output.get("baz"), Some(&Value::from(12)));
    }
    /// With `drop_on_abort` off, `abort` forwards the original event
    /// untouched: assignments made before the abort are not kept.
    #[test]
    fn check_remap_abort() {
        let mut log = LogEvent::from("augment me");
        log.insert("bar", "is a string");
        let input = Event::from(log);

        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                .foo = "foo"
                abort
                .baz = 12
            "#}),
            file: None,
            drop_on_error: false,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();

        let forwarded = transform_one(&mut transform, input).unwrap();
        let forwarded = forwarded.as_log();
        assert_eq!(forwarded.get("bar"), Some(&Value::from("is a string")));
        assert!(forwarded.get("foo").is_none());
        assert!(forwarded.get("baz").is_none());
    }
    /// With `drop_on_abort` on (and no rerouting), `abort` produces no
    /// output event.
    #[test]
    fn check_remap_abort_drop() {
        let mut log = LogEvent::from("augment me");
        log.insert("bar", "is a string");
        let input = Event::from(log);

        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                .foo = "foo"
                abort
                .baz = 12
            "#}),
            file: None,
            drop_on_error: false,
            drop_on_abort: true,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();

        let result = transform_one(&mut transform, input);
        assert!(result.is_none())
    }
    /// A program can rewrite a metric's name, namespace, kind and tags; the
    /// value and metadata are preserved.
    #[test]
    fn check_remap_metric() {
        let input = Event::Metric(Metric::new(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        ));
        let metadata = input.metadata().clone();

        let program = r#".tags.host = "zoobub"
                       .name = "zork"
                       .namespace = "zerk"
                       .kind = "incremental""#;
        let config = RemapConfig {
            source: Some(program.to_string()),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut transform = remap(config).unwrap();

        let actual = transform_one(&mut transform, input).unwrap();
        let expected = Event::Metric(
            Metric::new_with_metadata(
                "zork",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_namespace(Some("zerk"))
            .with_tags(Some(metric_tags! {
                "host" => "zoobub",
            })),
        );
        assert_eq!(actual, expected);
    }
    /// The transform config sets no `timezone`, so the zone-less timestamp is
    /// expected to come out at the `-08:00` offset of the global
    /// `America/Los_Angeles` setting.
    #[test]
    fn remap_timezone_fallback() {
        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
            "#}),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: true,
            ..Default::default()
        };
        let globals = GlobalOptions {
            timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
            ..Default::default()
        };
        let context = TransformContext {
            key: Some(ComponentKey::from("remapper")),
            globals,
            ..Default::default()
        };
        let mut transform = Remap::new_ast(config, &context).unwrap().0;

        let input = Event::from_json_value(
            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
            LogNamespace::Legacy,
        )
        .unwrap();
        let emitted = transform_one_fallible(&mut transform, input).unwrap();
        let log = emitted.as_log();

        let expected: Value = DateTime::<chrono::Utc>::from(
            DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap(),
        )
        .into();
        assert_eq!(log["timestamp"], expected);
    }
    /// The transform's own `timezone` (`America/Los_Angeles`) must win over the
    /// global `Etc/UTC` setting: the parsed timestamp is expected at `-08:00`.
    #[test]
    fn remap_timezone_override() {
        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
            "#}),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: true,
            timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
            ..Default::default()
        };
        let globals = GlobalOptions {
            timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
            ..Default::default()
        };
        let context = TransformContext {
            key: Some(ComponentKey::from("remapper")),
            globals,
            ..Default::default()
        };
        let mut transform = Remap::new_ast(config, &context).unwrap().0;

        let input = Event::from_json_value(
            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
            LogNamespace::Legacy,
        )
        .unwrap();
        let emitted = transform_one_fallible(&mut transform, input).unwrap();
        let log = emitted.as_log();

        let expected: Value = DateTime::<chrono::Utc>::from(
            DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap(),
        )
        .into();
        assert_eq!(log["timestamp"], expected);
    }
    /// Exercises the success, abort and error paths of one transform with
    /// `drop_on_error`, `drop_on_abort` and `reroute_dropped` all enabled, for both
    /// log and metric events.
    ///
    /// Successful events must come out of the primary output; aborted and errored
    /// events must come out of the `dropped` output (the `Err` side of
    /// `transform_one_fallible`) annotated with the drop reason and component
    /// identity.
    #[test]
    fn check_remap_branching() {
        // Log inputs: `hello` is "world" (passes), "goodbye" (aborts) or the
        // integer 42 (expected to fail the `string` call).
        let happy =
            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
                .unwrap();
        let abort = Event::from_json_value(
            serde_json::json!({"hello": "goodbye"}),
            LogNamespace::Legacy,
        )
        .unwrap();
        let error =
            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
        // Metric inputs mirror the log inputs through the `hello` tag; the error
        // case has no `hello` tag at all.
        let happy_metric = {
            let mut metric = Metric::new(
                "counter",
                MetricKind::Absolute,
                MetricValue::Counter { value: 1.0 },
            );
            metric.replace_tag("hello".into(), "world".into());
            Event::Metric(metric)
        };
        let abort_metric = {
            let mut metric = Metric::new(
                "counter",
                MetricKind::Absolute,
                MetricValue::Counter { value: 1.0 },
            );
            metric.replace_tag("hello".into(), "goodbye".into());
            Event::Metric(metric)
        };
        let error_metric = {
            let mut metric = Metric::new(
                "counter",
                MetricKind::Absolute,
                MetricValue::Counter { value: 1.0 },
            );
            metric.replace_tag("not_hello".into(), "oops".into());
            Event::Metric(metric)
        };
        // The program branches on `exists(.tags)` to tell metrics from logs.
        // NOTE(review): the logs branch calls `string(.hello)` without `!`, unlike the
        // metrics branch; presumably this only compiles because the merged schema
        // definition below declares `hello` as bytes — confirm before editing.
        // NOTE(review): the error message asserted further down contains `(160:174)`,
        // which looks like offsets into this program text, so changing the program
        // probably requires updating that expectation.
        let conf = RemapConfig {
            source: Some(formatdoc! {r#"
                if exists(.tags) {{
                    # metrics
                    .tags.foo = "bar"
                    if string!(.tags.hello) == "goodbye" {{
                      abort
                    }}
                }} else {{
                    # logs
                    .foo = "bar"
                    if string(.hello) == "goodbye" {{
                      abort
                    }}
                }}
            "#}),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: true,
            ..Default::default()
        };
        // One schema definition per output: the primary output (`None`) and the
        // `dropped` port.
        let schema_definitions = HashMap::from([
            (
                None,
                [("source".into(), test_default_schema_definition())].into(),
            ),
            (
                Some(DROPPED.to_owned()),
                [("source".into(), test_dropped_schema_definition())].into(),
            ),
        ]);
        let context = TransformContext {
            key: Some(ComponentKey::from("remapper")),
            schema_definitions,
            merged_schema_definition: schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            )
            .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
            ..Default::default()
        };
        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
        // Log, success: `foo` is added and no drop metadata is attached.
        let output = transform_one_fallible(&mut tform, happy).unwrap();
        let log = output.as_log();
        assert_eq!(log["hello"], "world".into());
        assert_eq!(log["foo"], "bar".into());
        assert!(!log.contains(event_path!("metadata")));
        // Log, abort: the dropped event has no `foo` and carries the abort metadata.
        let output = transform_one_fallible(&mut tform, abort).unwrap_err();
        let log = output.as_log();
        assert_eq!(log["hello"], "goodbye".into());
        assert!(!log.contains(event_path!("foo")));
        assert_eq!(
            log["metadata"],
            serde_json::json!({
                "dropped": {
                    "reason": "abort",
                    "message": "aborted",
                    "component_id": "remapper",
                    "component_type": "remap",
                    "component_kind": "transform",
                }
            })
            .try_into()
            .unwrap()
        );
        // Log, error: the dropped event has no `foo` and carries the runtime error
        // message.
        let output = transform_one_fallible(&mut tform, error).unwrap_err();
        let log = output.as_log();
        assert_eq!(log["hello"], 42.into());
        assert!(!log.contains(event_path!("foo")));
        assert_eq!(
            log["metadata"],
            serde_json::json!({
                "dropped": {
                    "reason": "error",
                    "message": "function call error for \"string\" at (160:174): expected string, got integer",
                    "component_id": "remapper",
                    "component_type": "remap",
                    "component_kind": "transform",
                }
            })
            .try_into()
            .unwrap()
        );
        // Metric, success: the `foo` tag is added next to the existing `hello` tag.
        // The expected metadata reuses the schema definition of the actual output, so
        // the comparison does not constrain that part.
        let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
        similar_asserts::assert_eq!(
            output,
            Event::Metric(
                Metric::new_with_metadata(
                    "counter",
                    MetricKind::Absolute,
                    MetricValue::Counter { value: 1.0 },
                    EventMetadata::default()
                        .with_schema_definition(output.metadata().schema_definition()),
                )
                .with_tags(Some(metric_tags! {
                    "hello" => "world",
                    "foo" => "bar",
                }))
            )
        );
        // Metric, abort: the drop details are expected as flat `metadata.dropped.*`
        // tags, with no `foo` tag and no message tag.
        let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
        similar_asserts::assert_eq!(
            output,
            Event::Metric(
                Metric::new_with_metadata(
                    "counter",
                    MetricKind::Absolute,
                    MetricValue::Counter { value: 1.0 },
                    EventMetadata::default()
                        .with_schema_definition(output.metadata().schema_definition()),
                )
                .with_tags(Some(metric_tags! {
                    "hello" => "goodbye",
                    "metadata.dropped.reason" => "abort",
                    "metadata.dropped.component_id" => "remapper",
                    "metadata.dropped.component_type" => "remap",
                    "metadata.dropped.component_kind" => "transform",
                }))
            )
        );
        // Metric, error: same tag layout as the abort case, with reason `error`.
        let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
        similar_asserts::assert_eq!(
            output,
            Event::Metric(
                Metric::new_with_metadata(
                    "counter",
                    MetricKind::Absolute,
                    MetricValue::Counter { value: 1.0 },
                    EventMetadata::default()
                        .with_schema_definition(output.metadata().schema_definition()),
                )
                .with_tags(Some(metric_tags! {
                    "not_hello" => "oops",
                    "metadata.dropped.reason" => "error",
                    "metadata.dropped.component_id" => "remapper",
                    "metadata.dropped.component_type" => "remap",
                    "metadata.dropped.component_kind" => "transform",
                }))
            )
        );
    }
    /// Failed `assert_eq!` calls must be rerouted with reason `error`. The dropped
    /// metadata carries the custom message when the assertion supplies one, and the
    /// generated assertion message otherwise.
    #[test]
    fn check_remap_branching_assert_with_message() {
        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                assert_eq!(.hello, 0, "custom message here")
                assert_eq!(.hello, 1)
            "#}),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: true,
            ..Default::default()
        };
        let context = TransformContext {
            key: Some(ComponentKey::from("remapper")),
            ..Default::default()
        };
        let mut transform = Remap::new_ast(config, &context).unwrap().0;

        // Both dropped events carry the same metadata apart from the message.
        let dropped_metadata = |message: &str| -> Value {
            serde_json::json!({
                "dropped": {
                    "reason": "error",
                    "message": message,
                    "component_id": "remapper",
                    "component_type": "remap",
                    "component_kind": "transform",
                }
            })
            .try_into()
            .unwrap()
        };

        // `hello` is 42, so the first assertion (the one with a custom message) fails.
        let fails_first =
            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
        let dropped = transform_one_fallible(&mut transform, fails_first).unwrap_err();
        let log = dropped.as_log();
        assert_eq!(log["hello"], 42.into());
        assert!(!log.contains(event_path!("foo")));
        assert_eq!(log["metadata"], dropped_metadata("custom message here"));

        // `hello` is 0, so the first assertion passes and the second one fails.
        let fails_second =
            Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
        let dropped = transform_one_fallible(&mut transform, fails_second).unwrap_err();
        let log = dropped.as_log();
        assert_eq!(log["hello"], 0.into());
        assert!(!log.contains(event_path!("foo")));
        assert_eq!(
            log["metadata"],
            dropped_metadata(
                "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1"
            )
        );
    }
    /// An `abort` with a message must reroute the event with reason `abort` and
    /// that message in the dropped metadata.
    #[test]
    fn check_remap_branching_abort_with_message() {
        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                abort "custom message here"
            "#}),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: true,
            ..Default::default()
        };
        let context = TransformContext {
            key: Some(ComponentKey::from("remapper")),
            ..Default::default()
        };
        let mut transform = Remap::new_ast(config, &context).unwrap().0;

        let input =
            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
        let dropped = transform_one_fallible(&mut transform, input).unwrap_err();
        let log = dropped.as_log();

        // The event body is untouched.
        assert_eq!(log["hello"], 42.into());
        assert!(!log.contains(event_path!("foo")));

        let expected_metadata: Value = serde_json::json!({
            "dropped": {
                "reason": "abort",
                "message": "custom message here",
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap();
        assert_eq!(log["metadata"], expected_metadata);
    }
    /// With `reroute_dropped` turned off, the config must declare a single output,
    /// and aborted or errored events must appear neither on the primary output nor
    /// on the `dropped` port.
    #[test]
    fn check_remap_branching_disabled() {
        let config = RemapConfig {
            source: Some(formatdoc! {r#"
                if exists(.tags) {{
                    # metrics
                    .tags.foo = "bar"
                    if string!(.tags.hello) == "goodbye" {{
                      abort
                    }}
                }} else {{
                    # logs
                    .foo = "bar"
                    if string!(.hello) == "goodbye" {{
                      abort
                    }}
                }}
            "#}),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: false,
            ..Default::default()
        };

        // Declared outputs: one output whose definition gains `foo` and `tags`.
        let expected_definition = schema::Definition::new_with_default_metadata(
            Kind::any_object(),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
        let declared_outputs = config.outputs(
            vector_lib::enrichment::TableRegistry::default(),
            &[(
                "test".into(),
                schema::Definition::new_with_default_metadata(
                    Kind::any_object(),
                    [LogNamespace::Legacy],
                ),
            )],
            LogNamespace::Legacy,
        );
        let expected_outputs = vec![TransformOutput::new(
            DataType::all_bits(),
            [("test".into(), expected_definition)].into(),
        )];
        assert_eq!(declared_outputs, expected_outputs);

        let context = TransformContext {
            key: Some(ComponentKey::from("remapper")),
            ..Default::default()
        };
        let mut transform = Remap::new_ast(config, &context).unwrap().0;

        // Success path: the event passes through without drop metadata.
        let passing =
            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
                .unwrap();
        let emitted = transform_one_fallible(&mut transform, passing).unwrap();
        let log = emitted.as_log();
        assert_eq!(log["hello"], "world".into());
        assert_eq!(log["foo"], "bar".into());
        assert!(!log.contains(event_path!("metadata")));

        // Abort path first, then error path: both events are discarded outright.
        let aborting = Event::from_json_value(
            serde_json::json!({"hello": "goodbye"}),
            LogNamespace::Legacy,
        )
        .unwrap();
        let failing =
            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
        for discarded in [aborting, failing] {
            let collected = collect_outputs(&mut transform, discarded);
            assert!(collected.primary.is_empty());
            assert!(collected.named[DROPPED].is_empty());
        }
    }
    /// Builds a config-defined unit test around a transform that always aborts and
    /// reroutes, runs it against the `foo.dropped` output, and then checks the
    /// multiple-outputs component test assertions for the `output` tag.
    #[tokio::test]
    async fn check_remap_branching_metrics_with_output() {
        init_test();

        let unit_test_toml = indoc! {r#"
            [transforms.foo]
            inputs = []
            type = "remap"
            drop_on_abort = true
            reroute_dropped = true
            source = "abort"
            [[tests]]
            name = "metric output"
            [tests.input]
                insert_at = "foo"
                value = "none"
            [[tests.outputs]]
                extract_from = "foo.dropped"
                [[tests.outputs.conditions]]
                type = "vrl"
                source = "true"
        "#};
        let builder: ConfigBuilder = toml::from_str(unit_test_toml).unwrap();

        let mut unit_tests = build_unit_tests(builder).await.unwrap();
        let report = unit_tests.remove(0).run().await;
        assert!(report.errors.is_empty());

        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
    }
    /// Everything a transform emitted for one input event, as gathered by
    /// `collect_outputs`.
    // NOTE(review): the name looks like a misspelling of `CollectedOutput`; renaming
    // it would also require touching every user of the type.
    struct CollectedOuput {
        /// Events taken from the primary output.
        primary: OutputBuffer,
        /// Events taken from the named outputs, keyed by output name.
        named: HashMap<String, OutputBuffer>,
    }
    /// Runs `ft` on a single `event` against a buffer that has a primary output and
    /// a `DROPPED` port, and returns whatever ended up in each.
    fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
        let ports = vec![
            TransformOutput::new(DataType::all_bits(), HashMap::new()),
            TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
        ];
        let mut sink = TransformOutputsBuf::new_with_capacity(ports, 1);

        ft.transform(event, &mut sink);

        let primary = sink.take_primary();
        let named = sink.take_all_named();
        CollectedOuput { primary, named }
    }
    fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
        let out = collect_outputs(ft, event);
        assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
        assert!(out.primary.len() <= 1);
        out.primary.into_events().next()
    }
    /// Runs `ft` on `event` and reports where the result landed: `Ok` for the
    /// primary output, `Err` for the `DROPPED` port.
    ///
    /// Panics if either output holds more than one event, or unless exactly one of
    /// the two outputs produced an event.
    fn transform_one_fallible(
        ft: &mut dyn SyncTransform,
        event: Event,
    ) -> std::result::Result<Event, Event> {
        let ports = vec![
            TransformOutput::new(DataType::all_bits(), HashMap::new()),
            TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
        ];
        let mut sink = TransformOutputsBuf::new_with_capacity(ports, 1);
        ft.transform(event, &mut sink);

        let mut emitted: Vec<_> = sink.drain().collect();
        let mut rerouted: Vec<_> = sink.drain_named(DROPPED).collect();
        assert!(emitted.len() < 2);
        assert!(rerouted.len() < 2);

        match (emitted.pop(), rerouted.pop()) {
            (Some(good), None) => Ok(good),
            (None, Some(bad)) => Err(bad),
            (a, b) => panic!("expected output xor error output, got {:?} and {:?}", a, b),
        }
    }
    /// Runs an always-aborting transform inside a topology, wrapped in the transform
    /// compliance assertion, and checks that nothing reaches the topology output.
    #[tokio::test]
    async fn emits_internal_events() {
        assert_transform_compliance(async move {
            let (sender, receiver) = mpsc::channel(1);
            let aborting = RemapConfig {
                source: Some("abort".to_owned()),
                drop_on_abort: true,
                ..Default::default()
            };
            let (topology, mut out) =
                create_topology(ReceiverStream::new(receiver), aborting).await;

            let input = LogEvent::from("hello world");
            sender.send(input.into()).await.unwrap();

            // Close the input side before stopping the topology.
            drop(sender);
            topology.stop().await;

            assert_eq!(out.recv().await, None);
        })
        .await
    }
    /// Chains two remap configs at the schema level: the definition produced by the
    /// first is fed into the second, and both declared outputs are checked.
    #[test]
    fn test_combined_transforms_simple() {
        let first = RemapConfig {
            source: Some(r#".thing = "potato""#.to_string()),
            ..Default::default()
        };
        let second = RemapConfig {
            source: Some(".thang = .thing".to_string()),
            ..Default::default()
        };
        let tables = vector_lib::enrichment::TableRegistry::default();

        // Stage one adds `thing` as bytes.
        let first_outputs = first.outputs(
            tables.clone(),
            &[("in".into(), schema::Definition::default_legacy_namespace())],
            LogNamespace::Legacy,
        );
        let with_thing = Definition::default_legacy_namespace().with_event_field(
            &owned_value_path!("thing"),
            Kind::bytes(),
            None,
        );
        assert_eq!(
            vec![TransformOutput::new(
                DataType::all_bits(),
                [("in".into(), with_thing.clone())].into()
            )],
            first_outputs
        );

        // Stage two receives stage one's definition and adds `thang` as bytes.
        let upstream = first_outputs[0].schema_definitions(true)[&"in".into()].clone();
        let second_outputs = second.outputs(
            tables,
            &[("in1".into(), upstream)],
            LogNamespace::Legacy,
        );
        let with_thang =
            with_thing.with_event_field(&owned_value_path!("thang"), Kind::bytes(), None);
        assert_eq!(
            vec![TransformOutput::new(
                DataType::all_bits(),
                [("in1".into(), with_thang)].into(),
            )],
            second_outputs
        );
    }
    /// Same two-stage schema check as the simple variant, but the first program
    /// unnests an array of objects, so `thing` is expected to be an object whose
    /// `cabbage` and `parsnips` fields are each an integer or undefined.
    #[test]
    fn test_combined_transforms_unnest() {
        let unnest_program = indoc! {r#"
            .thing = [{"cabbage": 32}, {"parsnips": 45}]
            . = unnest(.thing)
            "#};
        let first = RemapConfig {
            source: Some(unnest_program.to_string()),
            ..Default::default()
        };
        let second = RemapConfig {
            source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
            ..Default::default()
        };
        let tables = vector_lib::enrichment::TableRegistry::default();

        // Kind of `thing` expected after the unnest, shared by both stages.
        let thing_kind = Kind::object(Collection::from(BTreeMap::from([
            ("cabbage".into(), Kind::integer().or_undefined()),
            ("parsnips".into(), Kind::integer().or_undefined()),
        ])));

        let first_outputs = first.outputs(
            tables.clone(),
            &[(
                "in".into(),
                schema::Definition::new_with_default_metadata(
                    Kind::any_object(),
                    [LogNamespace::Legacy],
                ),
            )],
            LogNamespace::Legacy,
        );
        let first_expected =
            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
                .with_event_field(&owned_value_path!("thing"), thing_kind.clone(), None);
        assert_eq!(
            vec![TransformOutput::new(
                DataType::all_bits(),
                [("in".into(), first_expected)].into(),
            )],
            first_outputs
        );

        let upstream = first_outputs[0].schema_definitions(true)[&"in".into()].clone();
        let second_outputs = second.outputs(
            tables,
            &[("in1".into(), upstream)],
            LogNamespace::Legacy,
        );
        let second_expected = Definition::default_legacy_namespace()
            .with_event_field(&owned_value_path!("thing"), thing_kind, None)
            .with_event_field(
                &owned_value_path!("thang"),
                Kind::integer().or_null(),
                None,
            );
        assert_eq!(
            vec![TransformOutput::new(
                DataType::all_bits(),
                [("in1".into(), second_expected)].into(),
            )],
            second_outputs
        );
    }
    /// A program consisting only of `abort` must declare one output whose schema
    /// definition equals the input definition.
    #[test]
    fn test_transform_abort() {
        let aborting = RemapConfig {
            source: Some(r"abort".to_string()),
            ..Default::default()
        };
        let tables = vector_lib::enrichment::TableRegistry::default();

        let declared = aborting.outputs(
            tables,
            &[(
                "in".into(),
                schema::Definition::new_with_default_metadata(
                    Kind::any_object(),
                    [LogNamespace::Legacy],
                ),
            )],
            LogNamespace::Legacy,
        );

        let unchanged =
            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
        assert_eq!(
            vec![TransformOutput::new(
                DataType::all_bits(),
                [("in".into(), unchanged)].into(),
            )],
            declared
        );
    }
    /// With `reroute_dropped` enabled, `outputs` must report exactly two ports: the
    /// unnamed primary one and `dropped`.
    #[test]
    fn test_error_outputs() {
        let config = RemapConfig {
            source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
            reroute_dropped: true,
            ..Default::default()
        };
        let tables = vector_lib::enrichment::TableRegistry::default();

        let declared = config.outputs(
            tables,
            &[(
                "in".into(),
                schema::Definition::new_with_default_metadata(
                    Kind::any_object(),
                    [LogNamespace::Legacy],
                ),
            )],
            LogNamespace::Legacy,
        );

        let ports: HashSet<_> = declared.into_iter().map(|output| output.port).collect();
        assert_eq!(HashSet::from([None, Some("dropped".to_string())]), ports);
    }
    /// Assigning a string to the event root in the Legacy namespace is expected to
    /// yield a definition whose only known field is `message`, typed as bytes.
    #[test]
    fn test_non_object_events() {
        let config = RemapConfig {
            source: Some(r#". = "fish" "#.to_string()),
            ..Default::default()
        };
        let tables = vector_lib::enrichment::TableRegistry::default();

        let declared = config.outputs(
            tables,
            &[(
                "in".into(),
                schema::Definition::new_with_default_metadata(
                    Kind::any_object(),
                    [LogNamespace::Legacy],
                ),
            )],
            LogNamespace::Legacy,
        );

        // Unknown fields are undefined; `message` holds the assigned string.
        let closed_object = Kind::object(Collection::from_unknown(Kind::undefined()));
        let wanted =
            schema::Definition::new_with_default_metadata(closed_object, [LogNamespace::Legacy])
                .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);

        assert_eq!(
            HashMap::from([(OutputId::from("in"), wanted)]),
            declared[0].schema_definitions(true),
        );
    }
    /// A program that either unnests an array or assigns a string to the root must
    /// produce a definition covering both branches: `message` of any kind, and
    /// `thing` as an object with an integer `cabbage`, or undefined.
    #[test]
    fn test_array_and_non_object_events() {
        let program = indoc! {r#"
            if .lizard == true {
                .thing = [{"cabbage": 42}];
                . = unnest(.thing)
            } else {
              . = "fish"
            }
            "#};
        let config = RemapConfig {
            source: Some(program.to_string()),
            ..Default::default()
        };
        let tables = vector_lib::enrichment::TableRegistry::default();

        let declared = config.outputs(
            tables,
            &[(
                "in".into(),
                schema::Definition::new_with_default_metadata(
                    Kind::any_object(),
                    [LogNamespace::Legacy],
                ),
            )],
            LogNamespace::Legacy,
        );

        let thing_kind = Kind::object(Collection::from(BTreeMap::from([(
            "cabbage".into(),
            Kind::integer(),
        )])))
        .or_undefined();
        let wanted = schema::Definition::new_with_default_metadata(
            Kind::any_object(),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("message"), Kind::any(), None)
        .with_event_field(&owned_value_path!("thing"), thing_kind, None);

        assert_eq!(
            HashMap::from([(OutputId::from("in"), wanted)]),
            declared[0].schema_definitions(true),
        );
    }
    /// Assigns `[null]` to the event root. The emitted event's root is expected to
    /// be `null`, and the schema definition computed for the Vector namespace is
    /// expected to be `Kind::null()`.
    #[test]
    fn check_remap_array_vector_namespace() {
        // The trailing newline is part of the program text.
        let program = r". = [null]
"
        .to_string();
        let config = RemapConfig {
            source: Some(program),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };

        // NOTE(review): the `vector` metadata entry presumably marks the event as
        // using the Vector namespace — confirm.
        let mut log = LogEvent::from("input");
        log.metadata_mut()
            .value_mut()
            .insert("vector", BTreeMap::new());
        let input = Event::from(log);

        // Runtime behaviour.
        let mut transform = remap(config.clone()).unwrap();
        let emitted = transform_one(&mut transform, input).unwrap();
        assert_eq!(emitted.as_log().get("."), Some(&Value::Null));

        // Schema behaviour.
        let tables = vector_lib::enrichment::TableRegistry::default();
        let declared = config.outputs(
            tables,
            &[(
                "in".into(),
                schema::Definition::new_with_default_metadata(
                    Kind::any_object(),
                    [LogNamespace::Vector],
                ),
            )],
            LogNamespace::Vector,
        );
        let wanted =
            schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
        assert_eq!(
            HashMap::from([(OutputId::from("in"), wanted)]),
            declared[0].schema_definitions(true),
        );
    }
    /// Runs `source` on a `{"a": 42}` event with every drop option and rerouting
    /// enabled, expects the event on the dropped output with `a` intact, and then
    /// asserts that neither `component_discarded_events_total` nor
    /// `component_errors_total` appears among the captured metrics.
    fn assert_no_metrics(source: String) {
        vector_lib::metrics::init_test();

        let rerouting = RemapConfig {
            source: Some(source),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: true,
            ..Default::default()
        };
        let mut transform = remap(rerouting).unwrap();

        let input =
            Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
        let rerouted = transform_one_fallible(&mut transform, input).unwrap_err();
        assert_eq!(
            rerouted.as_log().get(event_path!("a")),
            Some(&Value::from(42))
        );

        // Index the captured metrics by name for lookup.
        let by_name: BTreeMap<String, Metric> = Controller::get()
            .expect("no controller")
            .capture_metrics()
            .into_iter()
            .map(|metric| (metric.name().to_string(), metric))
            .collect();
        assert_eq!(by_name.get("component_discarded_events_total"), None);
        assert_eq!(by_name.get("component_errors_total"), None);
    }
    /// Rerouting an aborted event must not record discard or error metrics.
    #[test]
    fn do_not_emit_metrics_when_dropped() {
        let program = String::from("abort");
        assert_no_metrics(program);
    }
    /// Rerouting an event whose program fails at runtime must not record discard or
    /// error metrics.
    #[test]
    fn do_not_emit_metrics_when_errored() {
        let program = String::from("parse_key_value!(.message)");
        assert_no_metrics(program);
    }
}