Skip to main content

vector/transforms/
remap.rs

1// Derivative's Debug impl generates `let _ = field.fmt(f)` which triggers this lint.
2#![allow(clippy::let_underscore_must_use)]
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    fs::File,
7    io::{self, Read},
8    path::PathBuf,
9    sync::Mutex,
10};
11
12use snafu::{ResultExt, Snafu};
13use vector_lib::{
14    TimeZone,
15    codecs::MetricTagValues,
16    compile_vrl,
17    config::LogNamespace,
18    configurable::configurable_component,
19    enrichment::TableRegistry,
20    lookup::{PathPrefix, metadata_path, owned_value_path},
21    schema::Definition,
22};
23use vector_vrl_functions::set_semantic_meaning::MeaningList;
24use vector_vrl_metrics::MetricsStorage;
25use vrl::{
26    compiler::{
27        CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
28        runtime::{Runtime, Terminate},
29        state::ExternalEnv,
30    },
31    diagnostic::{DiagnosticMessage, Note},
32    path,
33    path::ValuePath,
34    value::{Kind, Value},
35};
36
37use crate::{
38    Result,
39    config::{
40        ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
41        TransformOutput, log_schema,
42    },
43    event::{Event, TargetEvents, VrlTarget},
44    format_vrl_diagnostics,
45    internal_events::{RemapMappingAbort, RemapMappingError},
46    schema,
47    transforms::{SyncTransform, Transform, TransformOutputsBuf},
48};
49
/// Name of the named output port that receives rerouted ("dropped") events.
const DROPPED: &str = "dropped";
/// Key of a cached compilation: the enrichment table registry and the merged input schema
/// definition the program was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// A successful compilation: the compiled program, the formatted compiler warnings, and the
/// semantic meanings collected while compiling.
type CacheValue = (Program, String, MeaningList);
53
/// Configuration for the `remap` transform.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// Required if `file` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// File path to the [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// File paths to the [Vector Remap Language][vrl] (VRL) programs to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` or `file` are missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// When set to `single`, metric tag values are exposed as single strings, the
    /// same as they were before this config option. Tags with multiple values show the last assigned value, and null values
    /// are ignored.
    ///
    /// When set to `full`, all metric tags are exposed as arrays of either string or null
    /// values.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    // NOTE(review): the `global_timezone` link below contains a double slash
    // (`configuration//global-options`). It is left untouched here because these doc comments
    // presumably feed generated documentation; confirm and fix together with a docs regeneration.
    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
    /// time zone.
    ///
    /// This overrides the [global `timezone`][global_timezone] option. The time zone name may be
    /// any name in the [TZ database][tz_database], or `local` to indicate system local time.
    ///
    /// [global_timezone]: https://vector.dev/docs/reference/configuration//global-options#timezone
    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Drops any event that encounters an error during processing.
    ///
    /// Normally, if a VRL program encounters an error when processing an event, the original,
    /// unmodified event is sent downstream. In some cases, you may not want to send the event
    /// any further, such as if certain transformation or enrichment is strictly required. Setting
    /// `drop_on_error` to `true` allows you to ensure these events do not get processed any
    /// further.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially named output for
    /// further logging and analysis by setting `reroute_dropped`.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Drops any event that is manually aborted during processing.
    ///
    /// If a VRL program is manually aborted (using [`abort`][vrl_docs_abort]) when
    /// processing an event, this option controls whether the original, unmodified event is sent
    /// downstream without any modifications or if it is dropped.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially-named output for
    /// further logging and analysis by setting `reroute_dropped`.
    ///
    /// [vrl_docs_abort]: https://vector.dev/docs/reference/vrl/expressions/#abort
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Reroutes dropped events to a named output instead of halting processing on them.
    ///
    /// When using `drop_on_error` or `drop_on_abort`, events that are "dropped" are processed no
    /// further. In some cases, it may be desirable to keep the events around for further analysis,
    /// debugging, or retrying.
    ///
    /// In these cases, `reroute_dropped` can be set to `true` which forwards the original event
    /// to a specially-named output, `dropped`. The original event is annotated with additional
    /// fields describing why the event was dropped.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    // Selects the VRL runtime; `build` matches on this value. Marked `docs::hidden`.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    // Memoized compilation results, consulted and appended to by `compile_vrl_program`.
    // Skipped by serde, ignored by the derived `Debug`, and reset to empty by `Clone`.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    /// Cache can't be `BTreeMap` or `HashMap` because of `TableRegistry`, which doesn't allow us to inspect tables inside it.
    /// And even if we allowed the inspection, the tables can be huge, resulting in a long comparison or hash computation
    /// while using `Vec` allows us to use just a shallow comparison
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
168
169impl Clone for RemapConfig {
170    fn clone(&self) -> Self {
171        Self {
172            source: self.source.clone(),
173            file: self.file.clone(),
174            files: self.files.clone(),
175            metric_tag_values: self.metric_tag_values,
176            timezone: self.timezone,
177            drop_on_error: self.drop_on_error,
178            drop_on_abort: self.drop_on_abort,
179            reroute_dropped: self.reroute_dropped,
180            runtime: self.runtime,
181            cache: Mutex::new(Default::default()),
182        }
183    }
184}
185
186impl RemapConfig {
187    fn compile_vrl_program(
188        &self,
189        enrichment_tables: TableRegistry,
190        metrics_storage: MetricsStorage,
191        merged_schema_definition: schema::Definition,
192    ) -> Result<(Program, String, MeaningList)> {
193        if let Some((_, res)) = self
194            .cache
195            .lock()
196            .expect("Data poisoned")
197            .iter()
198            .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
199        {
200            return res.clone().map_err(Into::into);
201        }
202
203        let source = match (&self.source, &self.file, &self.files) {
204            (Some(source), None, None) => source.to_owned(),
205            (None, Some(path), None) => Self::read_file(path)?,
206            (None, None, Some(paths)) => {
207                let mut combined_source = String::new();
208                for path in paths {
209                    let content = Self::read_file(path)?;
210                    combined_source.push_str(&content);
211                    combined_source.push('\n');
212                }
213                combined_source
214            }
215            _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
216        };
217
218        let state = TypeState {
219            local: Default::default(),
220            external: ExternalEnv::new_with_kind(
221                merged_schema_definition.event_kind().clone(),
222                merged_schema_definition.metadata_kind().clone(),
223            ),
224        };
225        let mut config = CompileConfig::default();
226
227        config.set_custom(enrichment_tables.clone());
228        config.set_custom(metrics_storage);
229        config.set_custom(MeaningList::default());
230
231        let res = compile_vrl(&source, &vector_vrl_functions::all(), &state, config)
232            .map_err(|diagnostics| format_vrl_diagnostics(&source, diagnostics))
233            .map(|result| {
234                (
235                    result.program,
236                    format_vrl_diagnostics(&source, result.warnings),
237                    result.config.get_custom::<MeaningList>().unwrap().clone(),
238                )
239            });
240
241        self.cache
242            .lock()
243            .expect("Data poisoned")
244            .push(((enrichment_tables, merged_schema_definition), res.clone()));
245
246        res.map_err(Into::into)
247    }
248
249    fn read_file(path: &PathBuf) -> Result<String> {
250        let mut buffer = String::new();
251        File::open(path)
252            .with_context(|_| FileOpenFailedSnafu { path })?
253            .read_to_string(&mut buffer)
254            .with_context(|_| FileReadFailedSnafu { path })?;
255        Ok(buffer)
256    }
257}
258
// NOTE(review): macro defined outside this file; presumably derives config generation for
// `RemapConfig` from its `Default` value (exercised by the `generate_config` test) — confirm.
impl_generate_config_from_default!(RemapConfig);
260
261#[async_trait::async_trait]
262#[typetag::serde(name = "remap")]
263impl TransformConfig for RemapConfig {
264    async fn build(&self, context: &TransformContext) -> Result<Transform> {
265        let (transform, warnings) = match self.runtime {
266            VrlRuntime::Ast => {
267                let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
268                (Transform::synchronous(remap), warnings)
269            }
270        };
271
272        // TODO: We could improve on this by adding support for non-fatal error
273        // messages in the topology. This would make the topology responsible
274        // for printing warnings (including potentially emitting metrics),
275        // instead of individual transforms.
276        if !warnings.is_empty() {
277            warn!(message = "VRL compilation warning.", %warnings);
278        }
279
280        Ok(transform)
281    }
282
    /// Declares the accepted input for this transform: `Input::all()`.
    fn input(&self) -> Input {
        Input::all()
    }
286
287    fn outputs(
288        &self,
289        context: &TransformContext,
290        input_definitions: &[(OutputId, schema::Definition)],
291    ) -> Vec<TransformOutput> {
292        let merged_definition: Definition = input_definitions
293            .iter()
294            .map(|(_output, definition)| definition.clone())
295            .reduce(Definition::merge)
296            .unwrap_or_else(Definition::any);
297
298        // We need to compile the VRL program in order to know the schema definition output of this
299        // transform. We ignore any compilation errors, as those are caught by the transform build
300        // step.
301        let compiled = self
302            .compile_vrl_program(
303                context.enrichment_tables.clone(),
304                context.metrics_storage.clone(),
305                merged_definition,
306            )
307            .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
308            .map_err(|_| ());
309
310        let mut dropped_definitions = HashMap::new();
311        let mut default_definitions = HashMap::new();
312
313        for (output_id, input_definition) in input_definitions {
314            let default_definition = compiled
315                .clone()
316                .map(|(state, meaning)| {
317                    let mut new_type_def = Definition::new(
318                        state.external.target_kind().clone(),
319                        state.external.metadata_kind().clone(),
320                        input_definition.log_namespaces().clone(),
321                    );
322
323                    for (id, path) in input_definition.meanings() {
324                        // Attempt to copy over the meanings from the input definition.
325                        // The function will fail if the meaning that now points to a field that no longer exists,
326                        // this is fine since we will no longer want that meaning in the output definition.
327                        new_type_def.try_with_meaning(path.clone(), id).ok();
328                    }
329
330                    // Apply any semantic meanings set in the VRL program
331                    for (id, path) in meaning {
332                        // currently only event paths are supported
333                        new_type_def = new_type_def.with_meaning(path, &id);
334                    }
335                    new_type_def
336                })
337                .unwrap_or_else(|_| {
338                    Definition::new_with_default_metadata(
339                        // The program failed to compile, so it can "never" return a value
340                        Kind::never(),
341                        input_definition.log_namespaces().clone(),
342                    )
343                });
344
345            // When a message is dropped and re-routed, we keep the original event, but also annotate
346            // it with additional metadata.
347            let dropped_definition = Definition::combine_log_namespaces(
348                input_definition.log_namespaces(),
349                input_definition.clone().with_event_field(
350                    log_schema().metadata_key().expect("valid metadata key"),
351                    Kind::object(BTreeMap::from([
352                        ("reason".into(), Kind::bytes()),
353                        ("message".into(), Kind::bytes()),
354                        ("component_id".into(), Kind::bytes()),
355                        ("component_type".into(), Kind::bytes()),
356                        ("component_kind".into(), Kind::bytes()),
357                    ])),
358                    Some("metadata"),
359                ),
360                input_definition
361                    .clone()
362                    .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
363                    .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
364                    .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
365                    .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
366                    .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
367            );
368
369            default_definitions.insert(
370                output_id.clone(),
371                VrlTarget::modify_schema_definition_for_into_events(default_definition),
372            );
373            dropped_definitions.insert(
374                output_id.clone(),
375                VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
376            );
377        }
378
379        let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
380
381        if self.reroute_dropped {
382            vec![
383                default_output,
384                TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
385            ]
386        } else {
387            vec![default_output]
388        }
389    }
390
    /// Always returns `true`, opting this transform into concurrent execution.
    fn enable_concurrency(&self) -> bool {
        true
    }
394
395    fn files_to_watch(&self) -> Vec<&PathBuf> {
396        self.file
397            .iter()
398            .chain(self.files.iter().flatten())
399            .collect()
400    }
401}
402
/// The runnable `remap` transform: a compiled VRL program plus the options that decide what
/// happens to events whose program run errors or aborts.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of the owning component; written as `component_id` into dropped-event annotations.
    component_key: Option<ComponentKey>,
    /// Compiled VRL program executed for every event.
    program: Program,
    /// Time zone handed to the runner: the config override if set, otherwise the global value.
    timezone: TimeZone,
    /// Whether an event whose program run errored is removed from the default output.
    drop_on_error: bool,
    /// Whether an event whose program run aborted is removed from the default output.
    drop_on_abort: bool,
    /// Whether dropped events are annotated and sent to the `dropped` output.
    reroute_dropped: bool,
    /// Executes `program` against a target.
    runner: Runner,
    /// Selects the boolean flag passed to `VrlTarget::new` (`Full` => `true`, `Single` => `false`).
    metric_tag_values: MetricTagValues,
}
417
/// Strategy for executing a compiled VRL program against a target.
pub trait VrlRunner {
    /// Runs `program` against `target` using `timezone`.
    ///
    /// Returns the resolved value on success, or the `Terminate` reason (abort or error) when
    /// the program stops early.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
426
/// `VrlRunner` implementation backed by a VRL `Runtime`.
#[derive(Debug)]
pub struct AstRunner {
    /// Runtime used to resolve programs; cleared after every run.
    pub runtime: Runtime,
}
431
432impl Clone for AstRunner {
433    fn clone(&self) -> Self {
434        Self {
435            runtime: Runtime::default(),
436        }
437    }
438}
439
440impl VrlRunner for AstRunner {
441    fn run(
442        &mut self,
443        target: &mut VrlTarget,
444        program: &Program,
445        timezone: &TimeZone,
446    ) -> std::result::Result<Value, Terminate> {
447        let result = self.runtime.resolve(target, program, timezone);
448        self.runtime.clear();
449        result
450    }
451}
452
453impl Remap<AstRunner> {
454    pub fn new_ast(
455        config: RemapConfig,
456        context: &TransformContext,
457    ) -> crate::Result<(Self, String)> {
458        let (program, warnings, _) = config.compile_vrl_program(
459            context.enrichment_tables.clone(),
460            context.metrics_storage.clone(),
461            context.merged_schema_definition.clone(),
462        )?;
463
464        let runtime = Runtime::default();
465        let runner = AstRunner { runtime };
466
467        Self::new(config, context, program, runner).map(|remap| (remap, warnings))
468    }
469}
470
471impl<Runner> Remap<Runner>
472where
473    Runner: VrlRunner,
474{
475    fn new(
476        config: RemapConfig,
477        context: &TransformContext,
478        program: Program,
479        runner: Runner,
480    ) -> crate::Result<Self> {
481        Ok(Remap {
482            component_key: context.key.clone(),
483            program,
484            timezone: config
485                .timezone
486                .unwrap_or_else(|| context.globals.timezone()),
487            drop_on_error: config.drop_on_error,
488            drop_on_abort: config.drop_on_abort,
489            reroute_dropped: config.reroute_dropped,
490            runner,
491            metric_tag_values: config.metric_tag_values,
492        })
493    }
494
    /// Test-only accessor for the underlying runner.
    #[cfg(test)]
    const fn runner(&self) -> &Runner {
        &self.runner
    }
499
500    fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
501        let message = error
502            .notes()
503            .iter()
504            .rfind(|note| matches!(note, Note::UserErrorMessage(_)))
505            .map(|note| note.to_string())
506            .unwrap_or_else(|| error.to_string());
507        serde_json::json!({
508                "reason": reason,
509                "message": message,
510                "component_id": self.component_key,
511                "component_type": "remap",
512                "component_kind": "transform",
513        })
514    }
515
    /// Annotates `event` in place with the details of why it was dropped.
    ///
    /// * Logs, legacy namespace: inserts the `dropped_data` object at `<metadata_key>.dropped`
    ///   in the event, but only when `log_schema().metadata_key()` is `Some`.
    /// * Logs, Vector namespace: inserts the `dropped_data` object at the `vector.dropped`
    ///   metadata path.
    /// * Metrics: when `log_schema().metadata_key()` is `Some`, sets the tags
    ///   `<metadata_key>.dropped.{reason,component_id,component_type,component_kind}`. No
    ///   `message` tag is written, so `error` is unused in this branch.
    /// * Traces: passes `log_schema().metadata_key_target_path()` and the `dropped_data` value
    ///   to `maybe_insert`.
    fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
        match event {
            Event::Log(log) => match log.namespace() {
                LogNamespace::Legacy => {
                    if let Some(metadata_key) = log_schema().metadata_key() {
                        log.insert(
                            (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
                            self.dropped_data(reason, error),
                        );
                    }
                }
                LogNamespace::Vector => {
                    log.insert(
                        metadata_path!("vector", "dropped"),
                        self.dropped_data(reason, error),
                    );
                }
            },
            Event::Metric(metric) => {
                if let Some(metadata_key) = log_schema().metadata_key() {
                    metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
                    metric.replace_tag(
                        format!("{metadata_key}.dropped.component_id"),
                        // A missing component key becomes an empty string.
                        self.component_key
                            .as_ref()
                            .map(ToString::to_string)
                            .unwrap_or_default(),
                    );
                    metric.replace_tag(
                        format!("{metadata_key}.dropped.component_type"),
                        "remap".into(),
                    );
                    metric.replace_tag(
                        format!("{metadata_key}.dropped.component_kind"),
                        "transform".into(),
                    );
                }
            }
            Event::Trace(trace) => {
                trace.maybe_insert(log_schema().metadata_key_target_path(), || {
                    self.dropped_data(reason, error).into()
                });
            }
        }
    }
561
    /// Runs the compiled program against `target` through the configured runner, using this
    /// transform's time zone.
    fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
        self.runner.run(target, &self.program, &self.timezone)
    }
565}
566
567impl<Runner> SyncTransform for Remap<Runner>
568where
569    Runner: VrlRunner + Clone + Send + Sync,
570{
571    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
572        // If a program can fail or abort at runtime and we know that we will still need to forward
573        // the event in that case (either to the main output or `dropped`, depending on the
574        // config), we need to clone the original event and keep it around, to allow us to discard
575        // any mutations made to the event while the VRL program runs, before it failed or aborted.
576        //
577        // The `drop_on_{error, abort}` transform config allows operators to remove events from the
578        // main output if they're failed or aborted, in which case we can skip the cloning, since
579        // any mutations made by VRL will be ignored regardless. If they have configured
580        // `reroute_dropped`, however, we still need to do the clone to ensure that we can forward
581        // the event to the `dropped` output.
582        let forward_on_error = !self.drop_on_error || self.reroute_dropped;
583        let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
584        let original_event = if (self.program.info().fallible && forward_on_error)
585            || (self.program.info().abortable && forward_on_abort)
586        {
587            Some(event.clone())
588        } else {
589            None
590        };
591
592        let log_namespace = event
593            .maybe_as_log()
594            .map(|log| log.namespace())
595            .unwrap_or(LogNamespace::Legacy);
596
597        let mut target = VrlTarget::new(
598            event,
599            self.program.info(),
600            match self.metric_tag_values {
601                MetricTagValues::Single => false,
602                MetricTagValues::Full => true,
603            },
604        );
605        let result = self.run_vrl(&mut target);
606
607        match result {
608            Ok(_) => match target.into_events(log_namespace) {
609                TargetEvents::One(event) => push_default(event, output),
610                TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
611                TargetEvents::Traces(events) => {
612                    events.for_each(|event| push_default(event, output))
613                }
614            },
615            Err(reason) => {
616                let (reason, error, drop) = match reason {
617                    Terminate::Abort(error) => {
618                        if !self.reroute_dropped {
619                            emit!(RemapMappingAbort {
620                                event_dropped: self.drop_on_abort,
621                            });
622                        }
623                        ("abort", error, self.drop_on_abort)
624                    }
625                    Terminate::Error(error) => {
626                        if !self.reroute_dropped {
627                            emit!(RemapMappingError {
628                                error: error.to_string(),
629                                event_dropped: self.drop_on_error,
630                            });
631                        }
632                        ("error", error, self.drop_on_error)
633                    }
634                };
635
636                if !drop {
637                    let event = original_event.expect("event will be set");
638
639                    push_default(event, output);
640                } else if self.reroute_dropped {
641                    let mut event = original_event.expect("event will be set");
642
643                    self.annotate_dropped(&mut event, reason, error);
644                    push_dropped(event, output);
645                }
646            }
647        }
648    }
649}
650
/// Pushes `event` to the default (unnamed) output.
#[inline]
fn push_default(event: Event, output: &mut TransformOutputsBuf) {
    output.push(None, event)
}
655
/// Pushes `event` to the named `dropped` output.
#[inline]
fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
    output.push(Some(DROPPED), event);
}
660
/// Errors raised while resolving the VRL program source for the `remap` transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    // Returned when the `source` / `file` / `files` combination is not exactly one of the three.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    // The program file at `path` could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    // The program file at `path` was opened but its contents could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
671
672#[cfg(test)]
673mod tests {
674    use std::{
675        collections::{HashMap, HashSet},
676        sync::Arc,
677    };
678
679    use chrono::DateTime;
680    use indoc::{formatdoc, indoc};
681    use tokio::sync::mpsc;
682    use tokio_stream::wrappers::ReceiverStream;
683    use vector_lib::{config::GlobalOptions, event::EventMetadata, metric_tags};
684    use vrl::{btreemap, event_path, value::kind::Collection};
685
686    use super::*;
687    use crate::{
688        config::{ConfigBuilder, build_unit_tests},
689        event::{
690            LogEvent, Metric, Value,
691            metric::{MetricKind, MetricValue},
692        },
693        metrics::Controller,
694        schema,
695        test_util::components::{
696            COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
697        },
698        transforms::{OutputBuffer, test::create_topology},
699    };
700
    /// Schema definition used for the default output in tests: a legacy-namespace definition
    /// with one integer-or-bytes field carrying the `default` meaning.
    fn test_default_schema_definition() -> schema::Definition {
        schema::Definition::empty_legacy_namespace().with_event_field(
            &owned_value_path!("a default field"),
            Kind::integer().or_bytes(),
            Some("default"),
        )
    }
708
    /// Schema definition used for the `dropped` output in tests: a legacy-namespace definition
    /// with one boolean-or-null field carrying the `dropped` meaning.
    fn test_dropped_schema_definition() -> schema::Definition {
        schema::Definition::empty_legacy_namespace().with_event_field(
            &owned_value_path!("a dropped field"),
            Kind::boolean().or_null(),
            Some("dropped"),
        )
    }
716
717    fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
718        let schema_definitions = HashMap::from([
719            (
720                None,
721                [("source".into(), test_default_schema_definition())].into(),
722            ),
723            (
724                Some(DROPPED.to_owned()),
725                [("source".into(), test_dropped_schema_definition())].into(),
726            ),
727        ]);
728
729        Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
730            .map(|(remap, _)| remap)
731    }
732
    /// Runs the shared config-generation check for `RemapConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RemapConfig>();
    }
737
738    #[test]
739    fn config_missing_source_and_file() {
740        let config = RemapConfig {
741            source: None,
742            file: None,
743            ..Default::default()
744        };
745
746        let err = remap(config).unwrap_err().to_string();
747        assert_eq!(
748            &err,
749            "must provide exactly one of `source` or `file` or `files` configuration"
750        )
751    }
752
753    #[test]
754    fn config_both_source_and_file() {
755        let config = RemapConfig {
756            source: Some("".to_owned()),
757            file: Some("".into()),
758            ..Default::default()
759        };
760
761        let err = remap(config).unwrap_err().to_string();
762        assert_eq!(
763            &err,
764            "must provide exactly one of `source` or `file` or `files` configuration"
765        )
766    }
767
768    fn get_field_string(event: &Event, field: &str) -> String {
769        event
770            .as_log()
771            .get(field)
772            .unwrap()
773            .to_string_lossy()
774            .into_owned()
775    }
776
777    #[test]
778    fn check_remap_doesnt_share_state_between_events() {
779        let conf = RemapConfig {
780            source: Some(".foo = .sentinel".to_string()),
781            file: None,
782            drop_on_error: true,
783            drop_on_abort: false,
784            ..Default::default()
785        };
786        let mut tform = remap(conf).unwrap();
787        assert!(tform.runner().runtime.is_empty());
788
789        let event1 = {
790            let mut event1 = LogEvent::from("event1");
791            event1.insert("sentinel", "bar");
792            Event::from(event1)
793        };
794        let result1 = transform_one(&mut tform, event1).unwrap();
795        assert_eq!(get_field_string(&result1, "message"), "event1");
796        assert_eq!(get_field_string(&result1, "foo"), "bar");
797        assert!(tform.runner().runtime.is_empty());
798
799        let event2 = {
800            let event2 = LogEvent::from("event2");
801            Event::from(event2)
802        };
803        let result2 = transform_one(&mut tform, event2).unwrap();
804        assert_eq!(get_field_string(&result2, "message"), "event2");
805        assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
806        assert!(tform.runner().runtime.is_empty());
807    }
808
809    #[test]
810    fn remap_return_raw_string_vector_namespace() {
811        let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
812
813        let event = {
814            let mut metadata = EventMetadata::default()
815                .with_schema_definition(&Arc::new(initial_definition.clone()));
816            // the Vector metadata field is required for an event to correctly detect the namespace at runtime
817            metadata
818                .value_mut()
819                .insert(&owned_value_path!("vector"), BTreeMap::new());
820
821            let mut event = LogEvent::new_with_metadata(metadata);
822            event.insert("copy_from", "buz");
823            Event::from(event)
824        };
825
826        let conf = RemapConfig {
827            source: Some(r#"  . = "root string";"#.to_string()),
828            file: None,
829            drop_on_error: true,
830            drop_on_abort: false,
831            ..Default::default()
832        };
833        let mut tform = remap(conf.clone()).unwrap();
834        let result = transform_one(&mut tform, event).unwrap();
835        assert_eq!(get_field_string(&result, "."), "root string");
836
837        let mut outputs = conf.outputs(
838            &Default::default(),
839            &[(OutputId::dummy(), initial_definition)],
840        );
841
842        assert_eq!(outputs.len(), 1);
843        let output = outputs.pop().unwrap();
844        assert_eq!(output.port, None);
845        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
846        let expected_schema =
847            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
848        assert_eq!(actual_schema_def, expected_schema);
849    }
850
851    #[test]
852    fn check_remap_adds() {
853        let event = {
854            let mut event = LogEvent::from("augment me");
855            event.insert("copy_from", "buz");
856            Event::from(event)
857        };
858
859        let conf = RemapConfig {
860            source: Some(
861                r#"  .foo = "bar"
862  .bar = "baz"
863  .copy = .copy_from
864"#
865                .to_string(),
866            ),
867            file: None,
868            drop_on_error: true,
869            drop_on_abort: false,
870            ..Default::default()
871        };
872        let mut tform = remap(conf).unwrap();
873        let result = transform_one(&mut tform, event).unwrap();
874        assert_eq!(get_field_string(&result, "message"), "augment me");
875        assert_eq!(get_field_string(&result, "copy_from"), "buz");
876        assert_eq!(get_field_string(&result, "foo"), "bar");
877        assert_eq!(get_field_string(&result, "bar"), "baz");
878        assert_eq!(get_field_string(&result, "copy"), "buz");
879    }
880
881    #[test]
882    fn check_remap_emits_multiple() {
883        let event = {
884            let mut event = LogEvent::from("augment me");
885            event.insert(
886                "events",
887                vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
888            );
889            Event::from(event)
890        };
891
892        let conf = RemapConfig {
893            source: Some(
894                indoc! {r"
895                . = .events
896            "}
897                .to_owned(),
898            ),
899            file: None,
900            drop_on_error: true,
901            drop_on_abort: false,
902            ..Default::default()
903        };
904        let mut tform = remap(conf).unwrap();
905
906        let out = collect_outputs(&mut tform, event);
907        assert_eq!(2, out.primary.len());
908        let mut result = out.primary.into_events();
909
910        let r = result.next().unwrap();
911        assert_eq!(get_field_string(&r, "message"), "foo");
912        let r = result.next().unwrap();
913        assert_eq!(get_field_string(&r, "message"), "bar");
914    }
915
916    #[test]
917    fn check_remap_error() {
918        let event = {
919            let mut event = Event::Log(LogEvent::from("augment me"));
920            event.as_mut_log().insert("bar", "is a string");
921            event
922        };
923
924        let conf = RemapConfig {
925            source: Some(formatdoc! {r#"
926                .foo = "foo"
927                .not_an_int = int!(.bar)
928                .baz = 12
929            "#}),
930            file: None,
931            drop_on_error: false,
932            drop_on_abort: false,
933            ..Default::default()
934        };
935        let mut tform = remap(conf).unwrap();
936
937        let event = transform_one(&mut tform, event).unwrap();
938
939        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
940        assert!(event.as_log().get("foo").is_none());
941        assert!(event.as_log().get("baz").is_none());
942    }
943
944    #[test]
945    fn check_remap_error_drop() {
946        let event = {
947            let mut event = Event::Log(LogEvent::from("augment me"));
948            event.as_mut_log().insert("bar", "is a string");
949            event
950        };
951
952        let conf = RemapConfig {
953            source: Some(formatdoc! {r#"
954                .foo = "foo"
955                .not_an_int = int!(.bar)
956                .baz = 12
957            "#}),
958            file: None,
959            drop_on_error: true,
960            drop_on_abort: false,
961            ..Default::default()
962        };
963        let mut tform = remap(conf).unwrap();
964
965        assert!(transform_one(&mut tform, event).is_none())
966    }
967
968    #[test]
969    fn check_remap_error_infallible() {
970        let event = {
971            let mut event = Event::Log(LogEvent::from("augment me"));
972            event.as_mut_log().insert("bar", "is a string");
973            event
974        };
975
976        let conf = RemapConfig {
977            source: Some(formatdoc! {r#"
978                .foo = "foo"
979                .baz = 12
980            "#}),
981            file: None,
982            drop_on_error: false,
983            drop_on_abort: false,
984            ..Default::default()
985        };
986        let mut tform = remap(conf).unwrap();
987
988        let event = transform_one(&mut tform, event).unwrap();
989
990        assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
991        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
992        assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
993    }
994
995    #[test]
996    fn check_remap_abort() {
997        let event = {
998            let mut event = Event::Log(LogEvent::from("augment me"));
999            event.as_mut_log().insert("bar", "is a string");
1000            event
1001        };
1002
1003        let conf = RemapConfig {
1004            source: Some(formatdoc! {r#"
1005                .foo = "foo"
1006                abort
1007                .baz = 12
1008            "#}),
1009            file: None,
1010            drop_on_error: false,
1011            drop_on_abort: false,
1012            ..Default::default()
1013        };
1014        let mut tform = remap(conf).unwrap();
1015
1016        let event = transform_one(&mut tform, event).unwrap();
1017
1018        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1019        assert!(event.as_log().get("foo").is_none());
1020        assert!(event.as_log().get("baz").is_none());
1021    }
1022
1023    #[test]
1024    fn check_remap_abort_drop() {
1025        let event = {
1026            let mut event = Event::Log(LogEvent::from("augment me"));
1027            event.as_mut_log().insert("bar", "is a string");
1028            event
1029        };
1030
1031        let conf = RemapConfig {
1032            source: Some(formatdoc! {r#"
1033                .foo = "foo"
1034                abort
1035                .baz = 12
1036            "#}),
1037            file: None,
1038            drop_on_error: false,
1039            drop_on_abort: true,
1040            ..Default::default()
1041        };
1042        let mut tform = remap(conf).unwrap();
1043
1044        assert!(transform_one(&mut tform, event).is_none())
1045    }
1046
1047    #[test]
1048    fn check_remap_metric() {
1049        let metric = Event::Metric(Metric::new(
1050            "counter",
1051            MetricKind::Absolute,
1052            MetricValue::Counter { value: 1.0 },
1053        ));
1054        let metadata = metric.metadata().clone();
1055
1056        let conf = RemapConfig {
1057            source: Some(
1058                r#".tags.host = "zoobub"
1059                       .name = "zork"
1060                       .namespace = "zerk"
1061                       .kind = "incremental""#
1062                    .to_string(),
1063            ),
1064            file: None,
1065            drop_on_error: true,
1066            drop_on_abort: false,
1067            ..Default::default()
1068        };
1069        let mut tform = remap(conf).unwrap();
1070
1071        let result = transform_one(&mut tform, metric).unwrap();
1072        assert_eq!(
1073            result,
1074            Event::Metric(
1075                Metric::new_with_metadata(
1076                    "zork",
1077                    MetricKind::Incremental,
1078                    MetricValue::Counter { value: 1.0 },
1079                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1080                    // to the actual value to skip the assertion here
1081                    metadata
1082                )
1083                .with_namespace(Some("zerk"))
1084                .with_tags(Some(metric_tags! {
1085                    "host" => "zoobub",
1086                }))
1087            )
1088        );
1089    }
1090
1091    #[test]
1092    fn remap_timezone_fallback() {
1093        let error = Event::from_json_value(
1094            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1095            LogNamespace::Legacy,
1096        )
1097        .unwrap();
1098        let conf = RemapConfig {
1099            source: Some(formatdoc! {r#"
1100                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1101            "#}),
1102            drop_on_error: true,
1103            drop_on_abort: true,
1104            reroute_dropped: true,
1105            ..Default::default()
1106        };
1107        let context = TransformContext {
1108            key: Some(ComponentKey::from("remapper")),
1109            globals: GlobalOptions {
1110                timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1111                ..Default::default()
1112            },
1113            ..Default::default()
1114        };
1115        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1116
1117        let output = transform_one_fallible(&mut tform, error).unwrap();
1118        let log = output.as_log();
1119        assert_eq!(
1120            log["timestamp"],
1121            DateTime::<chrono::Utc>::from(
1122                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1123            )
1124            .into()
1125        );
1126    }
1127
1128    #[test]
1129    fn remap_timezone_override() {
1130        let error = Event::from_json_value(
1131            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1132            LogNamespace::Legacy,
1133        )
1134        .unwrap();
1135        let conf = RemapConfig {
1136            source: Some(formatdoc! {r#"
1137                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1138            "#}),
1139            drop_on_error: true,
1140            drop_on_abort: true,
1141            reroute_dropped: true,
1142            timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1143            ..Default::default()
1144        };
1145        let context = TransformContext {
1146            key: Some(ComponentKey::from("remapper")),
1147            globals: GlobalOptions {
1148                timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1149                ..Default::default()
1150            },
1151            ..Default::default()
1152        };
1153        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1154
1155        let output = transform_one_fallible(&mut tform, error).unwrap();
1156        let log = output.as_log();
1157        assert_eq!(
1158            log["timestamp"],
1159            DateTime::<chrono::Utc>::from(
1160                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1161            )
1162            .into()
1163        );
1164    }
1165
1166    #[test]
1167    fn check_remap_branching() {
1168        let happy =
1169            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1170                .unwrap();
1171        let abort = Event::from_json_value(
1172            serde_json::json!({"hello": "goodbye"}),
1173            LogNamespace::Legacy,
1174        )
1175        .unwrap();
1176        let error =
1177            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1178
1179        let happy_metric = {
1180            let mut metric = Metric::new(
1181                "counter",
1182                MetricKind::Absolute,
1183                MetricValue::Counter { value: 1.0 },
1184            );
1185            metric.replace_tag("hello".into(), "world".into());
1186            Event::Metric(metric)
1187        };
1188
1189        let abort_metric = {
1190            let mut metric = Metric::new(
1191                "counter",
1192                MetricKind::Absolute,
1193                MetricValue::Counter { value: 1.0 },
1194            );
1195            metric.replace_tag("hello".into(), "goodbye".into());
1196            Event::Metric(metric)
1197        };
1198
1199        let error_metric = {
1200            let mut metric = Metric::new(
1201                "counter",
1202                MetricKind::Absolute,
1203                MetricValue::Counter { value: 1.0 },
1204            );
1205            metric.replace_tag("not_hello".into(), "oops".into());
1206            Event::Metric(metric)
1207        };
1208
1209        let conf = RemapConfig {
1210            source: Some(formatdoc! {r#"
1211                if exists(.tags) {{
1212                    # metrics
1213                    .tags.foo = "bar"
1214                    if string!(.tags.hello) == "goodbye" {{
1215                      abort
1216                    }}
1217                }} else {{
1218                    # logs
1219                    .foo = "bar"
1220                    if string(.hello) == "goodbye" {{
1221                      abort
1222                    }}
1223                }}
1224            "#}),
1225            drop_on_error: true,
1226            drop_on_abort: true,
1227            reroute_dropped: true,
1228            ..Default::default()
1229        };
1230        let schema_definitions = HashMap::from([
1231            (
1232                None,
1233                [("source".into(), test_default_schema_definition())].into(),
1234            ),
1235            (
1236                Some(DROPPED.to_owned()),
1237                [("source".into(), test_dropped_schema_definition())].into(),
1238            ),
1239        ]);
1240        let context = TransformContext {
1241            key: Some(ComponentKey::from("remapper")),
1242            schema_definitions,
1243            merged_schema_definition: schema::Definition::new_with_default_metadata(
1244                Kind::any_object(),
1245                [LogNamespace::Legacy],
1246            )
1247            .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1248            ..Default::default()
1249        };
1250        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1251
1252        let output = transform_one_fallible(&mut tform, happy).unwrap();
1253        let log = output.as_log();
1254        assert_eq!(log["hello"], "world".into());
1255        assert_eq!(log["foo"], "bar".into());
1256        assert!(!log.contains(event_path!("metadata")));
1257
1258        let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1259        let log = output.as_log();
1260        assert_eq!(log["hello"], "goodbye".into());
1261        assert!(!log.contains(event_path!("foo")));
1262        assert_eq!(
1263            log["metadata"],
1264            serde_json::json!({
1265                "dropped": {
1266                    "reason": "abort",
1267                    "message": "aborted",
1268                    "component_id": "remapper",
1269                    "component_type": "remap",
1270                    "component_kind": "transform",
1271                }
1272            })
1273            .try_into()
1274            .unwrap()
1275        );
1276
1277        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1278        let log = output.as_log();
1279        assert_eq!(log["hello"], 42.into());
1280        assert!(!log.contains(event_path!("foo")));
1281        assert_eq!(
1282            log["metadata"],
1283            serde_json::json!({
1284                "dropped": {
1285                    "reason": "error",
1286                    "message": "function call error for \"string\" at (160:174): expected string, got integer",
1287                    "component_id": "remapper",
1288                    "component_type": "remap",
1289                    "component_kind": "transform",
1290                }
1291            })
1292            .try_into()
1293            .unwrap()
1294        );
1295
1296        let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1297        similar_asserts::assert_eq!(
1298            output,
1299            Event::Metric(
1300                Metric::new_with_metadata(
1301                    "counter",
1302                    MetricKind::Absolute,
1303                    MetricValue::Counter { value: 1.0 },
1304                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1305                    // to the actual value to skip the assertion here
1306                    EventMetadata::default()
1307                        .with_schema_definition(output.metadata().schema_definition()),
1308                )
1309                .with_tags(Some(metric_tags! {
1310                    "hello" => "world",
1311                    "foo" => "bar",
1312                }))
1313            )
1314        );
1315
1316        let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1317        similar_asserts::assert_eq!(
1318            output,
1319            Event::Metric(
1320                Metric::new_with_metadata(
1321                    "counter",
1322                    MetricKind::Absolute,
1323                    MetricValue::Counter { value: 1.0 },
1324                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1325                    // to the actual value to skip the assertion here
1326                    EventMetadata::default()
1327                        .with_schema_definition(output.metadata().schema_definition()),
1328                )
1329                .with_tags(Some(metric_tags! {
1330                    "hello" => "goodbye",
1331                    "metadata.dropped.reason" => "abort",
1332                    "metadata.dropped.component_id" => "remapper",
1333                    "metadata.dropped.component_type" => "remap",
1334                    "metadata.dropped.component_kind" => "transform",
1335                }))
1336            )
1337        );
1338
1339        let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1340        similar_asserts::assert_eq!(
1341            output,
1342            Event::Metric(
1343                Metric::new_with_metadata(
1344                    "counter",
1345                    MetricKind::Absolute,
1346                    MetricValue::Counter { value: 1.0 },
1347                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1348                    // to the actual value to skip the assertion here
1349                    EventMetadata::default()
1350                        .with_schema_definition(output.metadata().schema_definition()),
1351                )
1352                .with_tags(Some(metric_tags! {
1353                    "not_hello" => "oops",
1354                    "metadata.dropped.reason" => "error",
1355                    "metadata.dropped.component_id" => "remapper",
1356                    "metadata.dropped.component_type" => "remap",
1357                    "metadata.dropped.component_kind" => "transform",
1358                }))
1359            )
1360        );
1361    }
1362
1363    #[test]
1364    fn check_remap_branching_assert_with_message() {
1365        let error_trigger_assert_custom_message =
1366            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1367        let error_trigger_default_assert_message =
1368            Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
1369        let conf = RemapConfig {
1370            source: Some(formatdoc! {r#"
1371                assert_eq!(.hello, 0, "custom message here")
1372                assert_eq!(.hello, 1)
1373            "#}),
1374            drop_on_error: true,
1375            drop_on_abort: true,
1376            reroute_dropped: true,
1377            ..Default::default()
1378        };
1379        let context = TransformContext {
1380            key: Some(ComponentKey::from("remapper")),
1381            ..Default::default()
1382        };
1383        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1384
1385        let output =
1386            transform_one_fallible(&mut tform, error_trigger_assert_custom_message).unwrap_err();
1387        let log = output.as_log();
1388        assert_eq!(log["hello"], 42.into());
1389        assert!(!log.contains(event_path!("foo")));
1390        assert_eq!(
1391            log["metadata"],
1392            serde_json::json!({
1393                "dropped": {
1394                    "reason": "error",
1395                    "message": "custom message here",
1396                    "component_id": "remapper",
1397                    "component_type": "remap",
1398                    "component_kind": "transform",
1399                }
1400            })
1401            .try_into()
1402            .unwrap()
1403        );
1404
1405        let output =
1406            transform_one_fallible(&mut tform, error_trigger_default_assert_message).unwrap_err();
1407        let log = output.as_log();
1408        assert_eq!(log["hello"], 0.into());
1409        assert!(!log.contains(event_path!("foo")));
1410        assert_eq!(
1411            log["metadata"],
1412            serde_json::json!({
1413                "dropped": {
1414                    "reason": "error",
1415                    "message": "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
1416                    "component_id": "remapper",
1417                    "component_type": "remap",
1418                    "component_kind": "transform",
1419                }
1420            })
1421            .try_into()
1422            .unwrap()
1423        );
1424    }
1425
1426    #[test]
1427    fn check_remap_branching_abort_with_message() {
1428        let error =
1429            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1430        let conf = RemapConfig {
1431            source: Some(formatdoc! {r#"
1432                abort "custom message here"
1433            "#}),
1434            drop_on_error: true,
1435            drop_on_abort: true,
1436            reroute_dropped: true,
1437            ..Default::default()
1438        };
1439        let context = TransformContext {
1440            key: Some(ComponentKey::from("remapper")),
1441            ..Default::default()
1442        };
1443        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1444
1445        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1446        let log = output.as_log();
1447        assert_eq!(log["hello"], 42.into());
1448        assert!(!log.contains(event_path!("foo")));
1449        assert_eq!(
1450            log["metadata"],
1451            serde_json::json!({
1452                "dropped": {
1453                    "reason": "abort",
1454                    "message": "custom message here",
1455                    "component_id": "remapper",
1456                    "component_type": "remap",
1457                    "component_kind": "transform",
1458                }
1459            })
1460            .try_into()
1461            .unwrap()
1462        );
1463    }
1464
1465    #[test]
1466    fn check_remap_branching_disabled() {
1467        let happy =
1468            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1469                .unwrap();
1470        let abort = Event::from_json_value(
1471            serde_json::json!({"hello": "goodbye"}),
1472            LogNamespace::Legacy,
1473        )
1474        .unwrap();
1475        let error =
1476            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1477
1478        let conf = RemapConfig {
1479            source: Some(formatdoc! {r#"
1480                if exists(.tags) {{
1481                    # metrics
1482                    .tags.foo = "bar"
1483                    if string!(.tags.hello) == "goodbye" {{
1484                      abort
1485                    }}
1486                }} else {{
1487                    # logs
1488                    .foo = "bar"
1489                    if string!(.hello) == "goodbye" {{
1490                      abort
1491                    }}
1492                }}
1493            "#}),
1494            drop_on_error: true,
1495            drop_on_abort: true,
1496            reroute_dropped: false,
1497            ..Default::default()
1498        };
1499
1500        let schema_definition = schema::Definition::new_with_default_metadata(
1501            Kind::any_object(),
1502            [LogNamespace::Legacy],
1503        )
1504        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
1505        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
1506
1507        assert_eq!(
1508            conf.outputs(
1509                &Default::default(),
1510                &[(
1511                    "test".into(),
1512                    schema::Definition::new_with_default_metadata(
1513                        Kind::any_object(),
1514                        [LogNamespace::Legacy]
1515                    )
1516                )],
1517            ),
1518            vec![TransformOutput::new(
1519                DataType::all_bits(),
1520                [("test".into(), schema_definition)].into()
1521            )]
1522        );
1523
1524        let context = TransformContext {
1525            key: Some(ComponentKey::from("remapper")),
1526            ..Default::default()
1527        };
1528        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1529
1530        let output = transform_one_fallible(&mut tform, happy).unwrap();
1531        let log = output.as_log();
1532        assert_eq!(log["hello"], "world".into());
1533        assert_eq!(log["foo"], "bar".into());
1534        assert!(!log.contains(event_path!("metadata")));
1535
1536        let out = collect_outputs(&mut tform, abort);
1537        assert!(out.primary.is_empty());
1538        assert!(out.named[DROPPED].is_empty());
1539
1540        let out = collect_outputs(&mut tform, error);
1541        assert!(out.primary.is_empty());
1542        assert!(out.named[DROPPED].is_empty());
1543    }
1544
1545    #[tokio::test]
1546    async fn check_remap_branching_metrics_with_output() {
1547        init_test();
1548
1549        let config: ConfigBuilder = serde_yaml::from_str(indoc! {"
1550            transforms:
1551              foo:
1552                inputs: []
1553                type: remap
1554                drop_on_abort: true
1555                reroute_dropped: true
1556                source: abort
1557            tests:
1558              - name: metric output
1559                input:
1560                  insert_at: foo
1561                  value: none
1562                outputs:
1563                  - extract_from: foo.dropped
1564                    conditions:
1565                      - type: vrl
1566                        source: \"true\"
1567        "})
1568        .unwrap();
1569
1570        let mut tests = build_unit_tests(config).await.unwrap();
1571        assert!(tests.remove(0).run().await.errors.is_empty());
1572        // Check that metrics were emitted with output tag
1573        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1574    }
1575
1576    struct CollectedOutput {
1577        primary: OutputBuffer,
1578        named: HashMap<String, OutputBuffer>,
1579    }
1580
1581    fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOutput {
1582        let mut outputs = TransformOutputsBuf::new_with_capacity(
1583            vec![
1584                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1585                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1586            ],
1587            1,
1588        );
1589
1590        ft.transform(event, &mut outputs);
1591
1592        CollectedOutput {
1593            primary: outputs.take_primary(),
1594            named: outputs.take_all_named(),
1595        }
1596    }
1597
1598    fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1599        let out = collect_outputs(ft, event);
1600        assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1601        assert!(out.primary.len() <= 1);
1602        out.primary.into_events().next()
1603    }
1604
1605    fn transform_one_fallible(
1606        ft: &mut dyn SyncTransform,
1607        event: Event,
1608    ) -> std::result::Result<Event, Event> {
1609        let mut outputs = TransformOutputsBuf::new_with_capacity(
1610            vec![
1611                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1612                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1613            ],
1614            1,
1615        );
1616
1617        ft.transform(event, &mut outputs);
1618
1619        let mut buf = outputs.drain().collect::<Vec<_>>();
1620        let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1621
1622        assert!(buf.len() < 2);
1623        assert!(err_buf.len() < 2);
1624        match (buf.pop(), err_buf.pop()) {
1625            (Some(good), None) => Ok(good),
1626            (None, Some(bad)) => Err(bad),
1627            (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1628        }
1629    }
1630
1631    #[tokio::test]
1632    async fn emits_internal_events() {
1633        assert_transform_compliance(async move {
1634            let config = RemapConfig {
1635                source: Some("abort".to_owned()),
1636                drop_on_abort: true,
1637                ..Default::default()
1638            };
1639
1640            let (tx, rx) = mpsc::channel(1);
1641            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1642
1643            let log = LogEvent::from("hello world");
1644            tx.send(log.into()).await.unwrap();
1645
1646            drop(tx);
1647            topology.stop().await;
1648            assert_eq!(out.recv().await, None);
1649        })
1650        .await
1651    }
1652
1653    #[test]
1654    fn test_combined_transforms_simple() {
1655        // Make sure that when getting the definitions from one transform and
1656        // passing them to another the correct definition is still produced.
1657
1658        // Transform 1 sets a simple value.
1659        let transform1 = RemapConfig {
1660            source: Some(r#".thing = "potato""#.to_string()),
1661            ..Default::default()
1662        };
1663
1664        let transform2 = RemapConfig {
1665            source: Some(".thang = .thing".to_string()),
1666            ..Default::default()
1667        };
1668
1669        let outputs1 = transform1.outputs(
1670            &Default::default(),
1671            &[("in".into(), schema::Definition::default_legacy_namespace())],
1672        );
1673
1674        assert_eq!(
1675            vec![TransformOutput::new(
1676                DataType::all_bits(),
1677                // The `never` definition should have been passed on to the end.
1678                [(
1679                    "in".into(),
1680                    Definition::default_legacy_namespace().with_event_field(
1681                        &owned_value_path!("thing"),
1682                        Kind::bytes(),
1683                        None
1684                    ),
1685                )]
1686                .into()
1687            )],
1688            outputs1
1689        );
1690
1691        let outputs2 = transform2.outputs(
1692            &Default::default(),
1693            &[(
1694                "in1".into(),
1695                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1696            )],
1697        );
1698
1699        assert_eq!(
1700            vec![TransformOutput::new(
1701                DataType::all_bits(),
1702                [(
1703                    "in1".into(),
1704                    Definition::default_legacy_namespace()
1705                        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
1706                        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None),
1707                )]
1708                .into(),
1709            )],
1710            outputs2
1711        );
1712    }
1713
1714    #[test]
1715    fn test_combined_transforms_unnest() {
1716        // Make sure that when getting the definitions from one transform and
1717        // passing them to another the correct definition is still produced.
1718
1719        // Transform 1 sets a simple value.
1720        let transform1 = RemapConfig {
1721            source: Some(
1722                indoc! {
1723                r#"
1724                .thing = [{"cabbage": 32}, {"parsnips": 45}]
1725                . = unnest(.thing)
1726                "#
1727                }
1728                .to_string(),
1729            ),
1730            ..Default::default()
1731        };
1732
1733        let transform2 = RemapConfig {
1734            source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
1735            ..Default::default()
1736        };
1737
1738        let outputs1 = transform1.outputs(
1739            &Default::default(),
1740            &[(
1741                "in".into(),
1742                schema::Definition::new_with_default_metadata(
1743                    Kind::any_object(),
1744                    [LogNamespace::Legacy],
1745                ),
1746            )],
1747        );
1748
1749        assert_eq!(
1750            vec![TransformOutput::new(
1751                DataType::all_bits(),
1752                [(
1753                    "in".into(),
1754                    Definition::new_with_default_metadata(
1755                        Kind::any_object(),
1756                        [LogNamespace::Legacy]
1757                    )
1758                    .with_event_field(
1759                        &owned_value_path!("thing"),
1760                        Kind::object(Collection::from(BTreeMap::from([
1761                            ("cabbage".into(), Kind::integer().or_undefined(),),
1762                            ("parsnips".into(), Kind::integer().or_undefined(),)
1763                        ]))),
1764                        None
1765                    ),
1766                )]
1767                .into(),
1768            )],
1769            outputs1
1770        );
1771
1772        let outputs2 = transform2.outputs(
1773            &Default::default(),
1774            &[(
1775                "in1".into(),
1776                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1777            )],
1778        );
1779
1780        assert_eq!(
1781            vec![TransformOutput::new(
1782                DataType::all_bits(),
1783                [(
1784                    "in1".into(),
1785                    Definition::default_legacy_namespace()
1786                        .with_event_field(
1787                            &owned_value_path!("thing"),
1788                            Kind::object(Collection::from(BTreeMap::from([
1789                                ("cabbage".into(), Kind::integer().or_undefined(),),
1790                                ("parsnips".into(), Kind::integer().or_undefined(),)
1791                            ]))),
1792                            None
1793                        )
1794                        .with_event_field(
1795                            &owned_value_path!("thang"),
1796                            Kind::integer().or_null(),
1797                            None
1798                        ),
1799                )]
1800                .into(),
1801            )],
1802            outputs2
1803        );
1804    }
1805
1806    #[test]
1807    fn test_transform_abort() {
1808        // An abort should not change the typedef.
1809
1810        let transform1 = RemapConfig {
1811            source: Some(r"abort".to_string()),
1812            ..Default::default()
1813        };
1814
1815        let outputs1 = transform1.outputs(
1816            &Default::default(),
1817            &[(
1818                "in".into(),
1819                schema::Definition::new_with_default_metadata(
1820                    Kind::any_object(),
1821                    [LogNamespace::Legacy],
1822                ),
1823            )],
1824        );
1825
1826        assert_eq!(
1827            vec![TransformOutput::new(
1828                DataType::all_bits(),
1829                [(
1830                    "in".into(),
1831                    Definition::new_with_default_metadata(
1832                        Kind::any_object(),
1833                        [LogNamespace::Legacy]
1834                    ),
1835                )]
1836                .into(),
1837            )],
1838            outputs1
1839        );
1840    }
1841
1842    #[test]
1843    fn test_error_outputs() {
1844        // Even if we fail to compile the VRL it should still output
1845        // the correct ports. This may change if we separate the
1846        // `outputs` function into one returning outputs and a separate
1847        // returning schema definitions.
1848        let transform1 = RemapConfig {
1849            // This enrichment table does not exist.
1850            source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
1851            reroute_dropped: true,
1852            ..Default::default()
1853        };
1854
1855        let outputs1 = transform1.outputs(
1856            &Default::default(),
1857            &[(
1858                "in".into(),
1859                schema::Definition::new_with_default_metadata(
1860                    Kind::any_object(),
1861                    [LogNamespace::Legacy],
1862                ),
1863            )],
1864        );
1865
1866        assert_eq!(
1867            HashSet::from([None, Some("dropped".to_string())]),
1868            outputs1
1869                .into_iter()
1870                .map(|output| output.port)
1871                .collect::<HashSet<_>>()
1872        );
1873    }
1874
1875    #[test]
1876    fn test_non_object_events() {
1877        let transform1 = RemapConfig {
1878            // This enrichment table does not exist.
1879            source: Some(r#". = "fish" "#.to_string()),
1880            ..Default::default()
1881        };
1882
1883        let outputs1 = transform1.outputs(
1884            &Default::default(),
1885            &[(
1886                "in".into(),
1887                schema::Definition::new_with_default_metadata(
1888                    Kind::any_object(),
1889                    [LogNamespace::Legacy],
1890                ),
1891            )],
1892        );
1893
1894        let wanted = schema::Definition::new_with_default_metadata(
1895            Kind::object(Collection::from_unknown(Kind::undefined())),
1896            [LogNamespace::Legacy],
1897        )
1898        .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);
1899
1900        assert_eq!(
1901            HashMap::from([(OutputId::from("in"), wanted)]),
1902            outputs1[0].schema_definitions(true),
1903        );
1904    }
1905
1906    #[test]
1907    fn test_array_and_non_object_events() {
1908        let transform1 = RemapConfig {
1909            source: Some(
1910                indoc! {r#"
1911                    if .lizard == true {
1912                        .thing = [{"cabbage": 42}];
1913                        . = unnest(.thing)
1914                    } else {
1915                      . = "fish"
1916                    }
1917                    "#}
1918                .to_string(),
1919            ),
1920            ..Default::default()
1921        };
1922
1923        let outputs1 = transform1.outputs(
1924            &Default::default(),
1925            &[(
1926                "in".into(),
1927                schema::Definition::new_with_default_metadata(
1928                    Kind::any_object(),
1929                    [LogNamespace::Legacy],
1930                ),
1931            )],
1932        );
1933
1934        let wanted = schema::Definition::new_with_default_metadata(
1935            Kind::any_object(),
1936            [LogNamespace::Legacy],
1937        )
1938        .with_event_field(&owned_value_path!("message"), Kind::any(), None)
1939        .with_event_field(
1940            &owned_value_path!("thing"),
1941            Kind::object(Collection::from(BTreeMap::from([(
1942                "cabbage".into(),
1943                Kind::integer(),
1944            )])))
1945            .or_undefined(),
1946            None,
1947        );
1948
1949        assert_eq!(
1950            HashMap::from([(OutputId::from("in"), wanted)]),
1951            outputs1[0].schema_definitions(true),
1952        );
1953    }
1954
1955    #[test]
1956    fn check_remap_array_vector_namespace() {
1957        let event = {
1958            let mut event = LogEvent::from("input");
1959            // mark the event as a "Vector" namespaced log
1960            event
1961                .metadata_mut()
1962                .value_mut()
1963                .insert("vector", BTreeMap::new());
1964            Event::from(event)
1965        };
1966
1967        let conf = RemapConfig {
1968            source: Some(
1969                r". = [null]
1970"
1971                .to_string(),
1972            ),
1973            file: None,
1974            drop_on_error: true,
1975            drop_on_abort: false,
1976            ..Default::default()
1977        };
1978        let mut tform = remap(conf.clone()).unwrap();
1979        let result = transform_one(&mut tform, event).unwrap();
1980
1981        // Legacy namespace nests this under "message", Vector should set it as the root
1982        assert_eq!(result.as_log().get("."), Some(&Value::Null));
1983
1984        let outputs1 = conf.outputs(
1985            &Default::default(),
1986            &[(
1987                "in".into(),
1988                schema::Definition::new_with_default_metadata(
1989                    Kind::any_object(),
1990                    [LogNamespace::Vector],
1991                ),
1992            )],
1993        );
1994
1995        let wanted =
1996            schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
1997
1998        assert_eq!(
1999            HashMap::from([(OutputId::from("in"), wanted)]),
2000            outputs1[0].schema_definitions(true),
2001        );
2002    }
2003
2004    fn assert_no_metrics(source: String) {
2005        vector_lib::metrics::init_test();
2006
2007        let config = RemapConfig {
2008            source: Some(source),
2009            drop_on_error: true,
2010            drop_on_abort: true,
2011            reroute_dropped: true,
2012            ..Default::default()
2013        };
2014        let mut ast_runner = remap(config).unwrap();
2015        let input_event =
2016            Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2017        let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2018        let dropped_log = dropped_event.as_log();
2019        assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2020
2021        let controller = Controller::get().expect("no controller");
2022        let metrics = controller
2023            .capture_metrics()
2024            .into_iter()
2025            .map(|metric| (metric.name().to_string(), metric))
2026            .collect::<BTreeMap<String, Metric>>();
2027        assert_eq!(metrics.get("component_discarded_events_total"), None);
2028        assert_eq!(metrics.get("component_errors_total"), None);
2029    }
2030    #[test]
2031    fn do_not_emit_metrics_when_dropped() {
2032        assert_no_metrics("abort".to_string());
2033    }
2034
2035    #[test]
2036    fn do_not_emit_metrics_when_errored() {
2037        assert_no_metrics("parse_key_value!(.message)".to_string());
2038    }
2039}