1#![allow(clippy::let_underscore_must_use)]
3
4use std::{
5 collections::{BTreeMap, HashMap},
6 fs::File,
7 io::{self, Read},
8 path::PathBuf,
9 sync::Mutex,
10};
11
12use snafu::{ResultExt, Snafu};
13use vector_lib::{
14 TimeZone,
15 codecs::MetricTagValues,
16 compile_vrl,
17 config::LogNamespace,
18 configurable::configurable_component,
19 enrichment::TableRegistry,
20 lookup::{PathPrefix, metadata_path, owned_value_path},
21 schema::Definition,
22};
23use vector_vrl_functions::set_semantic_meaning::MeaningList;
24use vector_vrl_metrics::MetricsStorage;
25use vrl::{
26 compiler::{
27 CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
28 runtime::{Runtime, Terminate},
29 state::ExternalEnv,
30 },
31 diagnostic::{DiagnosticMessage, Note},
32 path,
33 path::ValuePath,
34 value::{Kind, Value},
35};
36
37use crate::{
38 Result,
39 config::{
40 ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
41 TransformOutput, log_schema,
42 },
43 event::{Event, TargetEvents, VrlTarget},
44 format_vrl_diagnostics,
45 internal_events::{RemapMappingAbort, RemapMappingError},
46 schema,
47 transforms::{SyncTransform, Transform, TransformOutputsBuf},
48};
49
50const DROPPED: &str = "dropped";
51type CacheKey = (TableRegistry, schema::Definition);
52type CacheValue = (Program, String, MeaningList);
53
54#[configurable_component(transform(
56 "remap",
57 "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
58))]
59#[derive(Derivative)]
60#[serde(deny_unknown_fields)]
61#[derivative(Default, Debug)]
62pub struct RemapConfig {
63 #[configurable(metadata(
69 docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
70 docs::syntax_override = "remap_program"
71 ))]
72 pub source: Option<String>,
73
74 #[configurable(metadata(docs::examples = "./my/program.vrl"))]
82 pub file: Option<PathBuf>,
83
84 #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
92 pub files: Option<Vec<PathBuf>>,
93
94 #[serde(default)]
101 pub metric_tag_values: MetricTagValues,
102
103 #[serde(default)]
112 #[configurable(metadata(docs::advanced))]
113 pub timezone: Option<TimeZone>,
114
115 #[serde(default = "crate::serde::default_false")]
126 #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
127 pub drop_on_error: bool,
128
129 #[serde(default = "crate::serde::default_true")]
140 #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
141 pub drop_on_abort: bool,
142
143 #[serde(default = "crate::serde::default_false")]
153 #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
154 pub reroute_dropped: bool,
155
156 #[configurable(derived, metadata(docs::hidden))]
157 #[serde(default)]
158 pub runtime: VrlRuntime,
159
160 #[configurable(derived, metadata(docs::hidden))]
161 #[serde(skip)]
162 #[derivative(Debug = "ignore")]
163 pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
167}
168
169impl Clone for RemapConfig {
170 fn clone(&self) -> Self {
171 Self {
172 source: self.source.clone(),
173 file: self.file.clone(),
174 files: self.files.clone(),
175 metric_tag_values: self.metric_tag_values,
176 timezone: self.timezone,
177 drop_on_error: self.drop_on_error,
178 drop_on_abort: self.drop_on_abort,
179 reroute_dropped: self.reroute_dropped,
180 runtime: self.runtime,
181 cache: Mutex::new(Default::default()),
182 }
183 }
184}
185
186impl RemapConfig {
187 fn compile_vrl_program(
188 &self,
189 enrichment_tables: TableRegistry,
190 metrics_storage: MetricsStorage,
191 merged_schema_definition: schema::Definition,
192 ) -> Result<(Program, String, MeaningList)> {
193 if let Some((_, res)) = self
194 .cache
195 .lock()
196 .expect("Data poisoned")
197 .iter()
198 .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
199 {
200 return res.clone().map_err(Into::into);
201 }
202
203 let source = match (&self.source, &self.file, &self.files) {
204 (Some(source), None, None) => source.to_owned(),
205 (None, Some(path), None) => Self::read_file(path)?,
206 (None, None, Some(paths)) => {
207 let mut combined_source = String::new();
208 for path in paths {
209 let content = Self::read_file(path)?;
210 combined_source.push_str(&content);
211 combined_source.push('\n');
212 }
213 combined_source
214 }
215 _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
216 };
217
218 let state = TypeState {
219 local: Default::default(),
220 external: ExternalEnv::new_with_kind(
221 merged_schema_definition.event_kind().clone(),
222 merged_schema_definition.metadata_kind().clone(),
223 ),
224 };
225 let mut config = CompileConfig::default();
226
227 config.set_custom(enrichment_tables.clone());
228 config.set_custom(metrics_storage);
229 config.set_custom(MeaningList::default());
230
231 let res = compile_vrl(&source, &vector_vrl_functions::all(), &state, config)
232 .map_err(|diagnostics| format_vrl_diagnostics(&source, diagnostics))
233 .map(|result| {
234 (
235 result.program,
236 format_vrl_diagnostics(&source, result.warnings),
237 result.config.get_custom::<MeaningList>().unwrap().clone(),
238 )
239 });
240
241 self.cache
242 .lock()
243 .expect("Data poisoned")
244 .push(((enrichment_tables, merged_schema_definition), res.clone()));
245
246 res.map_err(Into::into)
247 }
248
249 fn read_file(path: &PathBuf) -> Result<String> {
250 let mut buffer = String::new();
251 File::open(path)
252 .with_context(|_| FileOpenFailedSnafu { path })?
253 .read_to_string(&mut buffer)
254 .with_context(|_| FileReadFailedSnafu { path })?;
255 Ok(buffer)
256 }
257}
258
259impl_generate_config_from_default!(RemapConfig);
260
261#[async_trait::async_trait]
262#[typetag::serde(name = "remap")]
263impl TransformConfig for RemapConfig {
264 async fn build(&self, context: &TransformContext) -> Result<Transform> {
265 let (transform, warnings) = match self.runtime {
266 VrlRuntime::Ast => {
267 let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
268 (Transform::synchronous(remap), warnings)
269 }
270 };
271
272 if !warnings.is_empty() {
277 warn!(message = "VRL compilation warning.", %warnings);
278 }
279
280 Ok(transform)
281 }
282
283 fn input(&self) -> Input {
284 Input::all()
285 }
286
287 fn outputs(
288 &self,
289 context: &TransformContext,
290 input_definitions: &[(OutputId, schema::Definition)],
291 ) -> Vec<TransformOutput> {
292 let merged_definition: Definition = input_definitions
293 .iter()
294 .map(|(_output, definition)| definition.clone())
295 .reduce(Definition::merge)
296 .unwrap_or_else(Definition::any);
297
298 let compiled = self
302 .compile_vrl_program(
303 context.enrichment_tables.clone(),
304 context.metrics_storage.clone(),
305 merged_definition,
306 )
307 .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
308 .map_err(|_| ());
309
310 let mut dropped_definitions = HashMap::new();
311 let mut default_definitions = HashMap::new();
312
313 for (output_id, input_definition) in input_definitions {
314 let default_definition = compiled
315 .clone()
316 .map(|(state, meaning)| {
317 let mut new_type_def = Definition::new(
318 state.external.target_kind().clone(),
319 state.external.metadata_kind().clone(),
320 input_definition.log_namespaces().clone(),
321 );
322
323 for (id, path) in input_definition.meanings() {
324 new_type_def.try_with_meaning(path.clone(), id).ok();
328 }
329
330 for (id, path) in meaning {
332 new_type_def = new_type_def.with_meaning(path, &id);
334 }
335 new_type_def
336 })
337 .unwrap_or_else(|_| {
338 Definition::new_with_default_metadata(
339 Kind::never(),
341 input_definition.log_namespaces().clone(),
342 )
343 });
344
345 let dropped_definition = Definition::combine_log_namespaces(
348 input_definition.log_namespaces(),
349 input_definition.clone().with_event_field(
350 log_schema().metadata_key().expect("valid metadata key"),
351 Kind::object(BTreeMap::from([
352 ("reason".into(), Kind::bytes()),
353 ("message".into(), Kind::bytes()),
354 ("component_id".into(), Kind::bytes()),
355 ("component_type".into(), Kind::bytes()),
356 ("component_kind".into(), Kind::bytes()),
357 ])),
358 Some("metadata"),
359 ),
360 input_definition
361 .clone()
362 .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
363 .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
364 .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
365 .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
366 .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
367 );
368
369 default_definitions.insert(
370 output_id.clone(),
371 VrlTarget::modify_schema_definition_for_into_events(default_definition),
372 );
373 dropped_definitions.insert(
374 output_id.clone(),
375 VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
376 );
377 }
378
379 let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
380
381 if self.reroute_dropped {
382 vec![
383 default_output,
384 TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
385 ]
386 } else {
387 vec![default_output]
388 }
389 }
390
391 fn enable_concurrency(&self) -> bool {
392 true
393 }
394
395 fn files_to_watch(&self) -> Vec<&PathBuf> {
396 self.file
397 .iter()
398 .chain(self.files.iter().flatten())
399 .collect()
400 }
401}
402
403#[derive(Debug, Clone)]
404pub struct Remap<Runner>
405where
406 Runner: VrlRunner,
407{
408 component_key: Option<ComponentKey>,
409 program: Program,
410 timezone: TimeZone,
411 drop_on_error: bool,
412 drop_on_abort: bool,
413 reroute_dropped: bool,
414 runner: Runner,
415 metric_tag_values: MetricTagValues,
416}
417
418pub trait VrlRunner {
419 fn run(
420 &mut self,
421 target: &mut VrlTarget,
422 program: &Program,
423 timezone: &TimeZone,
424 ) -> std::result::Result<Value, Terminate>;
425}
426
427#[derive(Debug)]
428pub struct AstRunner {
429 pub runtime: Runtime,
430}
431
432impl Clone for AstRunner {
433 fn clone(&self) -> Self {
434 Self {
435 runtime: Runtime::default(),
436 }
437 }
438}
439
440impl VrlRunner for AstRunner {
441 fn run(
442 &mut self,
443 target: &mut VrlTarget,
444 program: &Program,
445 timezone: &TimeZone,
446 ) -> std::result::Result<Value, Terminate> {
447 let result = self.runtime.resolve(target, program, timezone);
448 self.runtime.clear();
449 result
450 }
451}
452
453impl Remap<AstRunner> {
454 pub fn new_ast(
455 config: RemapConfig,
456 context: &TransformContext,
457 ) -> crate::Result<(Self, String)> {
458 let (program, warnings, _) = config.compile_vrl_program(
459 context.enrichment_tables.clone(),
460 context.metrics_storage.clone(),
461 context.merged_schema_definition.clone(),
462 )?;
463
464 let runtime = Runtime::default();
465 let runner = AstRunner { runtime };
466
467 Self::new(config, context, program, runner).map(|remap| (remap, warnings))
468 }
469}
470
471impl<Runner> Remap<Runner>
472where
473 Runner: VrlRunner,
474{
475 fn new(
476 config: RemapConfig,
477 context: &TransformContext,
478 program: Program,
479 runner: Runner,
480 ) -> crate::Result<Self> {
481 Ok(Remap {
482 component_key: context.key.clone(),
483 program,
484 timezone: config
485 .timezone
486 .unwrap_or_else(|| context.globals.timezone()),
487 drop_on_error: config.drop_on_error,
488 drop_on_abort: config.drop_on_abort,
489 reroute_dropped: config.reroute_dropped,
490 runner,
491 metric_tag_values: config.metric_tag_values,
492 })
493 }
494
495 #[cfg(test)]
496 const fn runner(&self) -> &Runner {
497 &self.runner
498 }
499
500 fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
501 let message = error
502 .notes()
503 .iter()
504 .rfind(|note| matches!(note, Note::UserErrorMessage(_)))
505 .map(|note| note.to_string())
506 .unwrap_or_else(|| error.to_string());
507 serde_json::json!({
508 "reason": reason,
509 "message": message,
510 "component_id": self.component_key,
511 "component_type": "remap",
512 "component_kind": "transform",
513 })
514 }
515
516 fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
517 match event {
518 Event::Log(log) => match log.namespace() {
519 LogNamespace::Legacy => {
520 if let Some(metadata_key) = log_schema().metadata_key() {
521 log.insert(
522 (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
523 self.dropped_data(reason, error),
524 );
525 }
526 }
527 LogNamespace::Vector => {
528 log.insert(
529 metadata_path!("vector", "dropped"),
530 self.dropped_data(reason, error),
531 );
532 }
533 },
534 Event::Metric(metric) => {
535 if let Some(metadata_key) = log_schema().metadata_key() {
536 metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
537 metric.replace_tag(
538 format!("{metadata_key}.dropped.component_id"),
539 self.component_key
540 .as_ref()
541 .map(ToString::to_string)
542 .unwrap_or_default(),
543 );
544 metric.replace_tag(
545 format!("{metadata_key}.dropped.component_type"),
546 "remap".into(),
547 );
548 metric.replace_tag(
549 format!("{metadata_key}.dropped.component_kind"),
550 "transform".into(),
551 );
552 }
553 }
554 Event::Trace(trace) => {
555 trace.maybe_insert(log_schema().metadata_key_target_path(), || {
556 self.dropped_data(reason, error).into()
557 });
558 }
559 }
560 }
561
562 fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
563 self.runner.run(target, &self.program, &self.timezone)
564 }
565}
566
567impl<Runner> SyncTransform for Remap<Runner>
568where
569 Runner: VrlRunner + Clone + Send + Sync,
570{
571 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
572 let forward_on_error = !self.drop_on_error || self.reroute_dropped;
583 let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
584 let original_event = if (self.program.info().fallible && forward_on_error)
585 || (self.program.info().abortable && forward_on_abort)
586 {
587 Some(event.clone())
588 } else {
589 None
590 };
591
592 let log_namespace = event
593 .maybe_as_log()
594 .map(|log| log.namespace())
595 .unwrap_or(LogNamespace::Legacy);
596
597 let mut target = VrlTarget::new(
598 event,
599 self.program.info(),
600 match self.metric_tag_values {
601 MetricTagValues::Single => false,
602 MetricTagValues::Full => true,
603 },
604 );
605 let result = self.run_vrl(&mut target);
606
607 match result {
608 Ok(_) => match target.into_events(log_namespace) {
609 TargetEvents::One(event) => push_default(event, output),
610 TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
611 TargetEvents::Traces(events) => {
612 events.for_each(|event| push_default(event, output))
613 }
614 },
615 Err(reason) => {
616 let (reason, error, drop) = match reason {
617 Terminate::Abort(error) => {
618 if !self.reroute_dropped {
619 emit!(RemapMappingAbort {
620 event_dropped: self.drop_on_abort,
621 });
622 }
623 ("abort", error, self.drop_on_abort)
624 }
625 Terminate::Error(error) => {
626 if !self.reroute_dropped {
627 emit!(RemapMappingError {
628 error: error.to_string(),
629 event_dropped: self.drop_on_error,
630 });
631 }
632 ("error", error, self.drop_on_error)
633 }
634 };
635
636 if !drop {
637 let event = original_event.expect("event will be set");
638
639 push_default(event, output);
640 } else if self.reroute_dropped {
641 let mut event = original_event.expect("event will be set");
642
643 self.annotate_dropped(&mut event, reason, error);
644 push_dropped(event, output);
645 }
646 }
647 }
648 }
649}
650
651#[inline]
652fn push_default(event: Event, output: &mut TransformOutputsBuf) {
653 output.push(None, event)
654}
655
656#[inline]
657fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
658 output.push(Some(DROPPED), event);
659}
660
661#[derive(Debug, Snafu)]
662pub enum BuildError {
663 #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
664 SourceAndOrFileOrFiles,
665
666 #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
667 FileOpenFailed { path: PathBuf, source: io::Error },
668 #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
669 FileReadFailed { path: PathBuf, source: io::Error },
670}
671
672#[cfg(test)]
673mod tests {
674 use std::{
675 collections::{HashMap, HashSet},
676 sync::Arc,
677 };
678
679 use chrono::DateTime;
680 use indoc::{formatdoc, indoc};
681 use tokio::sync::mpsc;
682 use tokio_stream::wrappers::ReceiverStream;
683 use vector_lib::{config::GlobalOptions, event::EventMetadata, metric_tags};
684 use vrl::{btreemap, event_path, value::kind::Collection};
685
686 use super::*;
687 use crate::{
688 config::{ConfigBuilder, build_unit_tests},
689 event::{
690 LogEvent, Metric, Value,
691 metric::{MetricKind, MetricValue},
692 },
693 metrics::Controller,
694 schema,
695 test_util::components::{
696 COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
697 },
698 transforms::{OutputBuffer, test::create_topology},
699 };
700
701 fn test_default_schema_definition() -> schema::Definition {
702 schema::Definition::empty_legacy_namespace().with_event_field(
703 &owned_value_path!("a default field"),
704 Kind::integer().or_bytes(),
705 Some("default"),
706 )
707 }
708
709 fn test_dropped_schema_definition() -> schema::Definition {
710 schema::Definition::empty_legacy_namespace().with_event_field(
711 &owned_value_path!("a dropped field"),
712 Kind::boolean().or_null(),
713 Some("dropped"),
714 )
715 }
716
717 fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
718 let schema_definitions = HashMap::from([
719 (
720 None,
721 [("source".into(), test_default_schema_definition())].into(),
722 ),
723 (
724 Some(DROPPED.to_owned()),
725 [("source".into(), test_dropped_schema_definition())].into(),
726 ),
727 ]);
728
729 Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
730 .map(|(remap, _)| remap)
731 }
732
733 #[test]
734 fn generate_config() {
735 crate::test_util::test_generate_config::<RemapConfig>();
736 }
737
738 #[test]
739 fn config_missing_source_and_file() {
740 let config = RemapConfig {
741 source: None,
742 file: None,
743 ..Default::default()
744 };
745
746 let err = remap(config).unwrap_err().to_string();
747 assert_eq!(
748 &err,
749 "must provide exactly one of `source` or `file` or `files` configuration"
750 )
751 }
752
753 #[test]
754 fn config_both_source_and_file() {
755 let config = RemapConfig {
756 source: Some("".to_owned()),
757 file: Some("".into()),
758 ..Default::default()
759 };
760
761 let err = remap(config).unwrap_err().to_string();
762 assert_eq!(
763 &err,
764 "must provide exactly one of `source` or `file` or `files` configuration"
765 )
766 }
767
768 fn get_field_string(event: &Event, field: &str) -> String {
769 event
770 .as_log()
771 .get(field)
772 .unwrap()
773 .to_string_lossy()
774 .into_owned()
775 }
776
777 #[test]
778 fn check_remap_doesnt_share_state_between_events() {
779 let conf = RemapConfig {
780 source: Some(".foo = .sentinel".to_string()),
781 file: None,
782 drop_on_error: true,
783 drop_on_abort: false,
784 ..Default::default()
785 };
786 let mut tform = remap(conf).unwrap();
787 assert!(tform.runner().runtime.is_empty());
788
789 let event1 = {
790 let mut event1 = LogEvent::from("event1");
791 event1.insert("sentinel", "bar");
792 Event::from(event1)
793 };
794 let result1 = transform_one(&mut tform, event1).unwrap();
795 assert_eq!(get_field_string(&result1, "message"), "event1");
796 assert_eq!(get_field_string(&result1, "foo"), "bar");
797 assert!(tform.runner().runtime.is_empty());
798
799 let event2 = {
800 let event2 = LogEvent::from("event2");
801 Event::from(event2)
802 };
803 let result2 = transform_one(&mut tform, event2).unwrap();
804 assert_eq!(get_field_string(&result2, "message"), "event2");
805 assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
806 assert!(tform.runner().runtime.is_empty());
807 }
808
809 #[test]
810 fn remap_return_raw_string_vector_namespace() {
811 let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
812
813 let event = {
814 let mut metadata = EventMetadata::default()
815 .with_schema_definition(&Arc::new(initial_definition.clone()));
816 metadata
818 .value_mut()
819 .insert(&owned_value_path!("vector"), BTreeMap::new());
820
821 let mut event = LogEvent::new_with_metadata(metadata);
822 event.insert("copy_from", "buz");
823 Event::from(event)
824 };
825
826 let conf = RemapConfig {
827 source: Some(r#" . = "root string";"#.to_string()),
828 file: None,
829 drop_on_error: true,
830 drop_on_abort: false,
831 ..Default::default()
832 };
833 let mut tform = remap(conf.clone()).unwrap();
834 let result = transform_one(&mut tform, event).unwrap();
835 assert_eq!(get_field_string(&result, "."), "root string");
836
837 let mut outputs = conf.outputs(
838 &Default::default(),
839 &[(OutputId::dummy(), initial_definition)],
840 );
841
842 assert_eq!(outputs.len(), 1);
843 let output = outputs.pop().unwrap();
844 assert_eq!(output.port, None);
845 let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
846 let expected_schema =
847 Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
848 assert_eq!(actual_schema_def, expected_schema);
849 }
850
851 #[test]
852 fn check_remap_adds() {
853 let event = {
854 let mut event = LogEvent::from("augment me");
855 event.insert("copy_from", "buz");
856 Event::from(event)
857 };
858
859 let conf = RemapConfig {
860 source: Some(
861 r#" .foo = "bar"
862 .bar = "baz"
863 .copy = .copy_from
864"#
865 .to_string(),
866 ),
867 file: None,
868 drop_on_error: true,
869 drop_on_abort: false,
870 ..Default::default()
871 };
872 let mut tform = remap(conf).unwrap();
873 let result = transform_one(&mut tform, event).unwrap();
874 assert_eq!(get_field_string(&result, "message"), "augment me");
875 assert_eq!(get_field_string(&result, "copy_from"), "buz");
876 assert_eq!(get_field_string(&result, "foo"), "bar");
877 assert_eq!(get_field_string(&result, "bar"), "baz");
878 assert_eq!(get_field_string(&result, "copy"), "buz");
879 }
880
881 #[test]
882 fn check_remap_emits_multiple() {
883 let event = {
884 let mut event = LogEvent::from("augment me");
885 event.insert(
886 "events",
887 vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
888 );
889 Event::from(event)
890 };
891
892 let conf = RemapConfig {
893 source: Some(
894 indoc! {r"
895 . = .events
896 "}
897 .to_owned(),
898 ),
899 file: None,
900 drop_on_error: true,
901 drop_on_abort: false,
902 ..Default::default()
903 };
904 let mut tform = remap(conf).unwrap();
905
906 let out = collect_outputs(&mut tform, event);
907 assert_eq!(2, out.primary.len());
908 let mut result = out.primary.into_events();
909
910 let r = result.next().unwrap();
911 assert_eq!(get_field_string(&r, "message"), "foo");
912 let r = result.next().unwrap();
913 assert_eq!(get_field_string(&r, "message"), "bar");
914 }
915
916 #[test]
917 fn check_remap_error() {
918 let event = {
919 let mut event = Event::Log(LogEvent::from("augment me"));
920 event.as_mut_log().insert("bar", "is a string");
921 event
922 };
923
924 let conf = RemapConfig {
925 source: Some(formatdoc! {r#"
926 .foo = "foo"
927 .not_an_int = int!(.bar)
928 .baz = 12
929 "#}),
930 file: None,
931 drop_on_error: false,
932 drop_on_abort: false,
933 ..Default::default()
934 };
935 let mut tform = remap(conf).unwrap();
936
937 let event = transform_one(&mut tform, event).unwrap();
938
939 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
940 assert!(event.as_log().get("foo").is_none());
941 assert!(event.as_log().get("baz").is_none());
942 }
943
944 #[test]
945 fn check_remap_error_drop() {
946 let event = {
947 let mut event = Event::Log(LogEvent::from("augment me"));
948 event.as_mut_log().insert("bar", "is a string");
949 event
950 };
951
952 let conf = RemapConfig {
953 source: Some(formatdoc! {r#"
954 .foo = "foo"
955 .not_an_int = int!(.bar)
956 .baz = 12
957 "#}),
958 file: None,
959 drop_on_error: true,
960 drop_on_abort: false,
961 ..Default::default()
962 };
963 let mut tform = remap(conf).unwrap();
964
965 assert!(transform_one(&mut tform, event).is_none())
966 }
967
968 #[test]
969 fn check_remap_error_infallible() {
970 let event = {
971 let mut event = Event::Log(LogEvent::from("augment me"));
972 event.as_mut_log().insert("bar", "is a string");
973 event
974 };
975
976 let conf = RemapConfig {
977 source: Some(formatdoc! {r#"
978 .foo = "foo"
979 .baz = 12
980 "#}),
981 file: None,
982 drop_on_error: false,
983 drop_on_abort: false,
984 ..Default::default()
985 };
986 let mut tform = remap(conf).unwrap();
987
988 let event = transform_one(&mut tform, event).unwrap();
989
990 assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
991 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
992 assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
993 }
994
995 #[test]
996 fn check_remap_abort() {
997 let event = {
998 let mut event = Event::Log(LogEvent::from("augment me"));
999 event.as_mut_log().insert("bar", "is a string");
1000 event
1001 };
1002
1003 let conf = RemapConfig {
1004 source: Some(formatdoc! {r#"
1005 .foo = "foo"
1006 abort
1007 .baz = 12
1008 "#}),
1009 file: None,
1010 drop_on_error: false,
1011 drop_on_abort: false,
1012 ..Default::default()
1013 };
1014 let mut tform = remap(conf).unwrap();
1015
1016 let event = transform_one(&mut tform, event).unwrap();
1017
1018 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1019 assert!(event.as_log().get("foo").is_none());
1020 assert!(event.as_log().get("baz").is_none());
1021 }
1022
1023 #[test]
1024 fn check_remap_abort_drop() {
1025 let event = {
1026 let mut event = Event::Log(LogEvent::from("augment me"));
1027 event.as_mut_log().insert("bar", "is a string");
1028 event
1029 };
1030
1031 let conf = RemapConfig {
1032 source: Some(formatdoc! {r#"
1033 .foo = "foo"
1034 abort
1035 .baz = 12
1036 "#}),
1037 file: None,
1038 drop_on_error: false,
1039 drop_on_abort: true,
1040 ..Default::default()
1041 };
1042 let mut tform = remap(conf).unwrap();
1043
1044 assert!(transform_one(&mut tform, event).is_none())
1045 }
1046
1047 #[test]
1048 fn check_remap_metric() {
1049 let metric = Event::Metric(Metric::new(
1050 "counter",
1051 MetricKind::Absolute,
1052 MetricValue::Counter { value: 1.0 },
1053 ));
1054 let metadata = metric.metadata().clone();
1055
1056 let conf = RemapConfig {
1057 source: Some(
1058 r#".tags.host = "zoobub"
1059 .name = "zork"
1060 .namespace = "zerk"
1061 .kind = "incremental""#
1062 .to_string(),
1063 ),
1064 file: None,
1065 drop_on_error: true,
1066 drop_on_abort: false,
1067 ..Default::default()
1068 };
1069 let mut tform = remap(conf).unwrap();
1070
1071 let result = transform_one(&mut tform, metric).unwrap();
1072 assert_eq!(
1073 result,
1074 Event::Metric(
1075 Metric::new_with_metadata(
1076 "zork",
1077 MetricKind::Incremental,
1078 MetricValue::Counter { value: 1.0 },
1079 metadata
1082 )
1083 .with_namespace(Some("zerk"))
1084 .with_tags(Some(metric_tags! {
1085 "host" => "zoobub",
1086 }))
1087 )
1088 );
1089 }
1090
1091 #[test]
1092 fn remap_timezone_fallback() {
1093 let error = Event::from_json_value(
1094 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1095 LogNamespace::Legacy,
1096 )
1097 .unwrap();
1098 let conf = RemapConfig {
1099 source: Some(formatdoc! {r#"
1100 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1101 "#}),
1102 drop_on_error: true,
1103 drop_on_abort: true,
1104 reroute_dropped: true,
1105 ..Default::default()
1106 };
1107 let context = TransformContext {
1108 key: Some(ComponentKey::from("remapper")),
1109 globals: GlobalOptions {
1110 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1111 ..Default::default()
1112 },
1113 ..Default::default()
1114 };
1115 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1116
1117 let output = transform_one_fallible(&mut tform, error).unwrap();
1118 let log = output.as_log();
1119 assert_eq!(
1120 log["timestamp"],
1121 DateTime::<chrono::Utc>::from(
1122 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1123 )
1124 .into()
1125 );
1126 }
1127
1128 #[test]
1129 fn remap_timezone_override() {
1130 let error = Event::from_json_value(
1131 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1132 LogNamespace::Legacy,
1133 )
1134 .unwrap();
1135 let conf = RemapConfig {
1136 source: Some(formatdoc! {r#"
1137 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1138 "#}),
1139 drop_on_error: true,
1140 drop_on_abort: true,
1141 reroute_dropped: true,
1142 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1143 ..Default::default()
1144 };
1145 let context = TransformContext {
1146 key: Some(ComponentKey::from("remapper")),
1147 globals: GlobalOptions {
1148 timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1149 ..Default::default()
1150 },
1151 ..Default::default()
1152 };
1153 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1154
1155 let output = transform_one_fallible(&mut tform, error).unwrap();
1156 let log = output.as_log();
1157 assert_eq!(
1158 log["timestamp"],
1159 DateTime::<chrono::Utc>::from(
1160 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1161 )
1162 .into()
1163 );
1164 }
1165
1166 #[test]
1167 fn check_remap_branching() {
1168 let happy =
1169 Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1170 .unwrap();
1171 let abort = Event::from_json_value(
1172 serde_json::json!({"hello": "goodbye"}),
1173 LogNamespace::Legacy,
1174 )
1175 .unwrap();
1176 let error =
1177 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1178
1179 let happy_metric = {
1180 let mut metric = Metric::new(
1181 "counter",
1182 MetricKind::Absolute,
1183 MetricValue::Counter { value: 1.0 },
1184 );
1185 metric.replace_tag("hello".into(), "world".into());
1186 Event::Metric(metric)
1187 };
1188
1189 let abort_metric = {
1190 let mut metric = Metric::new(
1191 "counter",
1192 MetricKind::Absolute,
1193 MetricValue::Counter { value: 1.0 },
1194 );
1195 metric.replace_tag("hello".into(), "goodbye".into());
1196 Event::Metric(metric)
1197 };
1198
1199 let error_metric = {
1200 let mut metric = Metric::new(
1201 "counter",
1202 MetricKind::Absolute,
1203 MetricValue::Counter { value: 1.0 },
1204 );
1205 metric.replace_tag("not_hello".into(), "oops".into());
1206 Event::Metric(metric)
1207 };
1208
1209 let conf = RemapConfig {
1210 source: Some(formatdoc! {r#"
1211 if exists(.tags) {{
1212 # metrics
1213 .tags.foo = "bar"
1214 if string!(.tags.hello) == "goodbye" {{
1215 abort
1216 }}
1217 }} else {{
1218 # logs
1219 .foo = "bar"
1220 if string(.hello) == "goodbye" {{
1221 abort
1222 }}
1223 }}
1224 "#}),
1225 drop_on_error: true,
1226 drop_on_abort: true,
1227 reroute_dropped: true,
1228 ..Default::default()
1229 };
1230 let schema_definitions = HashMap::from([
1231 (
1232 None,
1233 [("source".into(), test_default_schema_definition())].into(),
1234 ),
1235 (
1236 Some(DROPPED.to_owned()),
1237 [("source".into(), test_dropped_schema_definition())].into(),
1238 ),
1239 ]);
1240 let context = TransformContext {
1241 key: Some(ComponentKey::from("remapper")),
1242 schema_definitions,
1243 merged_schema_definition: schema::Definition::new_with_default_metadata(
1244 Kind::any_object(),
1245 [LogNamespace::Legacy],
1246 )
1247 .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1248 ..Default::default()
1249 };
1250 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1251
1252 let output = transform_one_fallible(&mut tform, happy).unwrap();
1253 let log = output.as_log();
1254 assert_eq!(log["hello"], "world".into());
1255 assert_eq!(log["foo"], "bar".into());
1256 assert!(!log.contains(event_path!("metadata")));
1257
1258 let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1259 let log = output.as_log();
1260 assert_eq!(log["hello"], "goodbye".into());
1261 assert!(!log.contains(event_path!("foo")));
1262 assert_eq!(
1263 log["metadata"],
1264 serde_json::json!({
1265 "dropped": {
1266 "reason": "abort",
1267 "message": "aborted",
1268 "component_id": "remapper",
1269 "component_type": "remap",
1270 "component_kind": "transform",
1271 }
1272 })
1273 .try_into()
1274 .unwrap()
1275 );
1276
1277 let output = transform_one_fallible(&mut tform, error).unwrap_err();
1278 let log = output.as_log();
1279 assert_eq!(log["hello"], 42.into());
1280 assert!(!log.contains(event_path!("foo")));
1281 assert_eq!(
1282 log["metadata"],
1283 serde_json::json!({
1284 "dropped": {
1285 "reason": "error",
1286 "message": "function call error for \"string\" at (160:174): expected string, got integer",
1287 "component_id": "remapper",
1288 "component_type": "remap",
1289 "component_kind": "transform",
1290 }
1291 })
1292 .try_into()
1293 .unwrap()
1294 );
1295
1296 let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1297 similar_asserts::assert_eq!(
1298 output,
1299 Event::Metric(
1300 Metric::new_with_metadata(
1301 "counter",
1302 MetricKind::Absolute,
1303 MetricValue::Counter { value: 1.0 },
1304 EventMetadata::default()
1307 .with_schema_definition(output.metadata().schema_definition()),
1308 )
1309 .with_tags(Some(metric_tags! {
1310 "hello" => "world",
1311 "foo" => "bar",
1312 }))
1313 )
1314 );
1315
1316 let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1317 similar_asserts::assert_eq!(
1318 output,
1319 Event::Metric(
1320 Metric::new_with_metadata(
1321 "counter",
1322 MetricKind::Absolute,
1323 MetricValue::Counter { value: 1.0 },
1324 EventMetadata::default()
1327 .with_schema_definition(output.metadata().schema_definition()),
1328 )
1329 .with_tags(Some(metric_tags! {
1330 "hello" => "goodbye",
1331 "metadata.dropped.reason" => "abort",
1332 "metadata.dropped.component_id" => "remapper",
1333 "metadata.dropped.component_type" => "remap",
1334 "metadata.dropped.component_kind" => "transform",
1335 }))
1336 )
1337 );
1338
1339 let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1340 similar_asserts::assert_eq!(
1341 output,
1342 Event::Metric(
1343 Metric::new_with_metadata(
1344 "counter",
1345 MetricKind::Absolute,
1346 MetricValue::Counter { value: 1.0 },
1347 EventMetadata::default()
1350 .with_schema_definition(output.metadata().schema_definition()),
1351 )
1352 .with_tags(Some(metric_tags! {
1353 "not_hello" => "oops",
1354 "metadata.dropped.reason" => "error",
1355 "metadata.dropped.component_id" => "remapper",
1356 "metadata.dropped.component_type" => "remap",
1357 "metadata.dropped.component_kind" => "transform",
1358 }))
1359 )
1360 );
1361 }
1362
1363 #[test]
1364 fn check_remap_branching_assert_with_message() {
1365 let error_trigger_assert_custom_message =
1366 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1367 let error_trigger_default_assert_message =
1368 Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
1369 let conf = RemapConfig {
1370 source: Some(formatdoc! {r#"
1371 assert_eq!(.hello, 0, "custom message here")
1372 assert_eq!(.hello, 1)
1373 "#}),
1374 drop_on_error: true,
1375 drop_on_abort: true,
1376 reroute_dropped: true,
1377 ..Default::default()
1378 };
1379 let context = TransformContext {
1380 key: Some(ComponentKey::from("remapper")),
1381 ..Default::default()
1382 };
1383 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1384
1385 let output =
1386 transform_one_fallible(&mut tform, error_trigger_assert_custom_message).unwrap_err();
1387 let log = output.as_log();
1388 assert_eq!(log["hello"], 42.into());
1389 assert!(!log.contains(event_path!("foo")));
1390 assert_eq!(
1391 log["metadata"],
1392 serde_json::json!({
1393 "dropped": {
1394 "reason": "error",
1395 "message": "custom message here",
1396 "component_id": "remapper",
1397 "component_type": "remap",
1398 "component_kind": "transform",
1399 }
1400 })
1401 .try_into()
1402 .unwrap()
1403 );
1404
1405 let output =
1406 transform_one_fallible(&mut tform, error_trigger_default_assert_message).unwrap_err();
1407 let log = output.as_log();
1408 assert_eq!(log["hello"], 0.into());
1409 assert!(!log.contains(event_path!("foo")));
1410 assert_eq!(
1411 log["metadata"],
1412 serde_json::json!({
1413 "dropped": {
1414 "reason": "error",
1415 "message": "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
1416 "component_id": "remapper",
1417 "component_type": "remap",
1418 "component_kind": "transform",
1419 }
1420 })
1421 .try_into()
1422 .unwrap()
1423 );
1424 }
1425
1426 #[test]
1427 fn check_remap_branching_abort_with_message() {
1428 let error =
1429 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1430 let conf = RemapConfig {
1431 source: Some(formatdoc! {r#"
1432 abort "custom message here"
1433 "#}),
1434 drop_on_error: true,
1435 drop_on_abort: true,
1436 reroute_dropped: true,
1437 ..Default::default()
1438 };
1439 let context = TransformContext {
1440 key: Some(ComponentKey::from("remapper")),
1441 ..Default::default()
1442 };
1443 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1444
1445 let output = transform_one_fallible(&mut tform, error).unwrap_err();
1446 let log = output.as_log();
1447 assert_eq!(log["hello"], 42.into());
1448 assert!(!log.contains(event_path!("foo")));
1449 assert_eq!(
1450 log["metadata"],
1451 serde_json::json!({
1452 "dropped": {
1453 "reason": "abort",
1454 "message": "custom message here",
1455 "component_id": "remapper",
1456 "component_type": "remap",
1457 "component_kind": "transform",
1458 }
1459 })
1460 .try_into()
1461 .unwrap()
1462 );
1463 }
1464
1465 #[test]
1466 fn check_remap_branching_disabled() {
1467 let happy =
1468 Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1469 .unwrap();
1470 let abort = Event::from_json_value(
1471 serde_json::json!({"hello": "goodbye"}),
1472 LogNamespace::Legacy,
1473 )
1474 .unwrap();
1475 let error =
1476 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1477
1478 let conf = RemapConfig {
1479 source: Some(formatdoc! {r#"
1480 if exists(.tags) {{
1481 # metrics
1482 .tags.foo = "bar"
1483 if string!(.tags.hello) == "goodbye" {{
1484 abort
1485 }}
1486 }} else {{
1487 # logs
1488 .foo = "bar"
1489 if string!(.hello) == "goodbye" {{
1490 abort
1491 }}
1492 }}
1493 "#}),
1494 drop_on_error: true,
1495 drop_on_abort: true,
1496 reroute_dropped: false,
1497 ..Default::default()
1498 };
1499
1500 let schema_definition = schema::Definition::new_with_default_metadata(
1501 Kind::any_object(),
1502 [LogNamespace::Legacy],
1503 )
1504 .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
1505 .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
1506
1507 assert_eq!(
1508 conf.outputs(
1509 &Default::default(),
1510 &[(
1511 "test".into(),
1512 schema::Definition::new_with_default_metadata(
1513 Kind::any_object(),
1514 [LogNamespace::Legacy]
1515 )
1516 )],
1517 ),
1518 vec![TransformOutput::new(
1519 DataType::all_bits(),
1520 [("test".into(), schema_definition)].into()
1521 )]
1522 );
1523
1524 let context = TransformContext {
1525 key: Some(ComponentKey::from("remapper")),
1526 ..Default::default()
1527 };
1528 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1529
1530 let output = transform_one_fallible(&mut tform, happy).unwrap();
1531 let log = output.as_log();
1532 assert_eq!(log["hello"], "world".into());
1533 assert_eq!(log["foo"], "bar".into());
1534 assert!(!log.contains(event_path!("metadata")));
1535
1536 let out = collect_outputs(&mut tform, abort);
1537 assert!(out.primary.is_empty());
1538 assert!(out.named[DROPPED].is_empty());
1539
1540 let out = collect_outputs(&mut tform, error);
1541 assert!(out.primary.is_empty());
1542 assert!(out.named[DROPPED].is_empty());
1543 }
1544
1545 #[tokio::test]
1546 async fn check_remap_branching_metrics_with_output() {
1547 init_test();
1548
1549 let config: ConfigBuilder = serde_yaml::from_str(indoc! {"
1550 transforms:
1551 foo:
1552 inputs: []
1553 type: remap
1554 drop_on_abort: true
1555 reroute_dropped: true
1556 source: abort
1557 tests:
1558 - name: metric output
1559 input:
1560 insert_at: foo
1561 value: none
1562 outputs:
1563 - extract_from: foo.dropped
1564 conditions:
1565 - type: vrl
1566 source: \"true\"
1567 "})
1568 .unwrap();
1569
1570 let mut tests = build_unit_tests(config).await.unwrap();
1571 assert!(tests.remove(0).run().await.errors.is_empty());
1572 COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1574 }
1575
1576 struct CollectedOutput {
1577 primary: OutputBuffer,
1578 named: HashMap<String, OutputBuffer>,
1579 }
1580
1581 fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOutput {
1582 let mut outputs = TransformOutputsBuf::new_with_capacity(
1583 vec![
1584 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1585 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1586 ],
1587 1,
1588 );
1589
1590 ft.transform(event, &mut outputs);
1591
1592 CollectedOutput {
1593 primary: outputs.take_primary(),
1594 named: outputs.take_all_named(),
1595 }
1596 }
1597
1598 fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1599 let out = collect_outputs(ft, event);
1600 assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1601 assert!(out.primary.len() <= 1);
1602 out.primary.into_events().next()
1603 }
1604
1605 fn transform_one_fallible(
1606 ft: &mut dyn SyncTransform,
1607 event: Event,
1608 ) -> std::result::Result<Event, Event> {
1609 let mut outputs = TransformOutputsBuf::new_with_capacity(
1610 vec![
1611 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1612 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1613 ],
1614 1,
1615 );
1616
1617 ft.transform(event, &mut outputs);
1618
1619 let mut buf = outputs.drain().collect::<Vec<_>>();
1620 let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1621
1622 assert!(buf.len() < 2);
1623 assert!(err_buf.len() < 2);
1624 match (buf.pop(), err_buf.pop()) {
1625 (Some(good), None) => Ok(good),
1626 (None, Some(bad)) => Err(bad),
1627 (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1628 }
1629 }
1630
1631 #[tokio::test]
1632 async fn emits_internal_events() {
1633 assert_transform_compliance(async move {
1634 let config = RemapConfig {
1635 source: Some("abort".to_owned()),
1636 drop_on_abort: true,
1637 ..Default::default()
1638 };
1639
1640 let (tx, rx) = mpsc::channel(1);
1641 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1642
1643 let log = LogEvent::from("hello world");
1644 tx.send(log.into()).await.unwrap();
1645
1646 drop(tx);
1647 topology.stop().await;
1648 assert_eq!(out.recv().await, None);
1649 })
1650 .await
1651 }
1652
1653 #[test]
1654 fn test_combined_transforms_simple() {
1655 let transform1 = RemapConfig {
1660 source: Some(r#".thing = "potato""#.to_string()),
1661 ..Default::default()
1662 };
1663
1664 let transform2 = RemapConfig {
1665 source: Some(".thang = .thing".to_string()),
1666 ..Default::default()
1667 };
1668
1669 let outputs1 = transform1.outputs(
1670 &Default::default(),
1671 &[("in".into(), schema::Definition::default_legacy_namespace())],
1672 );
1673
1674 assert_eq!(
1675 vec![TransformOutput::new(
1676 DataType::all_bits(),
1677 [(
1679 "in".into(),
1680 Definition::default_legacy_namespace().with_event_field(
1681 &owned_value_path!("thing"),
1682 Kind::bytes(),
1683 None
1684 ),
1685 )]
1686 .into()
1687 )],
1688 outputs1
1689 );
1690
1691 let outputs2 = transform2.outputs(
1692 &Default::default(),
1693 &[(
1694 "in1".into(),
1695 outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1696 )],
1697 );
1698
1699 assert_eq!(
1700 vec![TransformOutput::new(
1701 DataType::all_bits(),
1702 [(
1703 "in1".into(),
1704 Definition::default_legacy_namespace()
1705 .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
1706 .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None),
1707 )]
1708 .into(),
1709 )],
1710 outputs2
1711 );
1712 }
1713
1714 #[test]
1715 fn test_combined_transforms_unnest() {
1716 let transform1 = RemapConfig {
1721 source: Some(
1722 indoc! {
1723 r#"
1724 .thing = [{"cabbage": 32}, {"parsnips": 45}]
1725 . = unnest(.thing)
1726 "#
1727 }
1728 .to_string(),
1729 ),
1730 ..Default::default()
1731 };
1732
1733 let transform2 = RemapConfig {
1734 source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
1735 ..Default::default()
1736 };
1737
1738 let outputs1 = transform1.outputs(
1739 &Default::default(),
1740 &[(
1741 "in".into(),
1742 schema::Definition::new_with_default_metadata(
1743 Kind::any_object(),
1744 [LogNamespace::Legacy],
1745 ),
1746 )],
1747 );
1748
1749 assert_eq!(
1750 vec![TransformOutput::new(
1751 DataType::all_bits(),
1752 [(
1753 "in".into(),
1754 Definition::new_with_default_metadata(
1755 Kind::any_object(),
1756 [LogNamespace::Legacy]
1757 )
1758 .with_event_field(
1759 &owned_value_path!("thing"),
1760 Kind::object(Collection::from(BTreeMap::from([
1761 ("cabbage".into(), Kind::integer().or_undefined(),),
1762 ("parsnips".into(), Kind::integer().or_undefined(),)
1763 ]))),
1764 None
1765 ),
1766 )]
1767 .into(),
1768 )],
1769 outputs1
1770 );
1771
1772 let outputs2 = transform2.outputs(
1773 &Default::default(),
1774 &[(
1775 "in1".into(),
1776 outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1777 )],
1778 );
1779
1780 assert_eq!(
1781 vec![TransformOutput::new(
1782 DataType::all_bits(),
1783 [(
1784 "in1".into(),
1785 Definition::default_legacy_namespace()
1786 .with_event_field(
1787 &owned_value_path!("thing"),
1788 Kind::object(Collection::from(BTreeMap::from([
1789 ("cabbage".into(), Kind::integer().or_undefined(),),
1790 ("parsnips".into(), Kind::integer().or_undefined(),)
1791 ]))),
1792 None
1793 )
1794 .with_event_field(
1795 &owned_value_path!("thang"),
1796 Kind::integer().or_null(),
1797 None
1798 ),
1799 )]
1800 .into(),
1801 )],
1802 outputs2
1803 );
1804 }
1805
1806 #[test]
1807 fn test_transform_abort() {
1808 let transform1 = RemapConfig {
1811 source: Some(r"abort".to_string()),
1812 ..Default::default()
1813 };
1814
1815 let outputs1 = transform1.outputs(
1816 &Default::default(),
1817 &[(
1818 "in".into(),
1819 schema::Definition::new_with_default_metadata(
1820 Kind::any_object(),
1821 [LogNamespace::Legacy],
1822 ),
1823 )],
1824 );
1825
1826 assert_eq!(
1827 vec![TransformOutput::new(
1828 DataType::all_bits(),
1829 [(
1830 "in".into(),
1831 Definition::new_with_default_metadata(
1832 Kind::any_object(),
1833 [LogNamespace::Legacy]
1834 ),
1835 )]
1836 .into(),
1837 )],
1838 outputs1
1839 );
1840 }
1841
1842 #[test]
1843 fn test_error_outputs() {
1844 let transform1 = RemapConfig {
1849 source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
1851 reroute_dropped: true,
1852 ..Default::default()
1853 };
1854
1855 let outputs1 = transform1.outputs(
1856 &Default::default(),
1857 &[(
1858 "in".into(),
1859 schema::Definition::new_with_default_metadata(
1860 Kind::any_object(),
1861 [LogNamespace::Legacy],
1862 ),
1863 )],
1864 );
1865
1866 assert_eq!(
1867 HashSet::from([None, Some("dropped".to_string())]),
1868 outputs1
1869 .into_iter()
1870 .map(|output| output.port)
1871 .collect::<HashSet<_>>()
1872 );
1873 }
1874
1875 #[test]
1876 fn test_non_object_events() {
1877 let transform1 = RemapConfig {
1878 source: Some(r#". = "fish" "#.to_string()),
1880 ..Default::default()
1881 };
1882
1883 let outputs1 = transform1.outputs(
1884 &Default::default(),
1885 &[(
1886 "in".into(),
1887 schema::Definition::new_with_default_metadata(
1888 Kind::any_object(),
1889 [LogNamespace::Legacy],
1890 ),
1891 )],
1892 );
1893
1894 let wanted = schema::Definition::new_with_default_metadata(
1895 Kind::object(Collection::from_unknown(Kind::undefined())),
1896 [LogNamespace::Legacy],
1897 )
1898 .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);
1899
1900 assert_eq!(
1901 HashMap::from([(OutputId::from("in"), wanted)]),
1902 outputs1[0].schema_definitions(true),
1903 );
1904 }
1905
1906 #[test]
1907 fn test_array_and_non_object_events() {
1908 let transform1 = RemapConfig {
1909 source: Some(
1910 indoc! {r#"
1911 if .lizard == true {
1912 .thing = [{"cabbage": 42}];
1913 . = unnest(.thing)
1914 } else {
1915 . = "fish"
1916 }
1917 "#}
1918 .to_string(),
1919 ),
1920 ..Default::default()
1921 };
1922
1923 let outputs1 = transform1.outputs(
1924 &Default::default(),
1925 &[(
1926 "in".into(),
1927 schema::Definition::new_with_default_metadata(
1928 Kind::any_object(),
1929 [LogNamespace::Legacy],
1930 ),
1931 )],
1932 );
1933
1934 let wanted = schema::Definition::new_with_default_metadata(
1935 Kind::any_object(),
1936 [LogNamespace::Legacy],
1937 )
1938 .with_event_field(&owned_value_path!("message"), Kind::any(), None)
1939 .with_event_field(
1940 &owned_value_path!("thing"),
1941 Kind::object(Collection::from(BTreeMap::from([(
1942 "cabbage".into(),
1943 Kind::integer(),
1944 )])))
1945 .or_undefined(),
1946 None,
1947 );
1948
1949 assert_eq!(
1950 HashMap::from([(OutputId::from("in"), wanted)]),
1951 outputs1[0].schema_definitions(true),
1952 );
1953 }
1954
1955 #[test]
1956 fn check_remap_array_vector_namespace() {
1957 let event = {
1958 let mut event = LogEvent::from("input");
1959 event
1961 .metadata_mut()
1962 .value_mut()
1963 .insert("vector", BTreeMap::new());
1964 Event::from(event)
1965 };
1966
1967 let conf = RemapConfig {
1968 source: Some(
1969 r". = [null]
1970"
1971 .to_string(),
1972 ),
1973 file: None,
1974 drop_on_error: true,
1975 drop_on_abort: false,
1976 ..Default::default()
1977 };
1978 let mut tform = remap(conf.clone()).unwrap();
1979 let result = transform_one(&mut tform, event).unwrap();
1980
1981 assert_eq!(result.as_log().get("."), Some(&Value::Null));
1983
1984 let outputs1 = conf.outputs(
1985 &Default::default(),
1986 &[(
1987 "in".into(),
1988 schema::Definition::new_with_default_metadata(
1989 Kind::any_object(),
1990 [LogNamespace::Vector],
1991 ),
1992 )],
1993 );
1994
1995 let wanted =
1996 schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
1997
1998 assert_eq!(
1999 HashMap::from([(OutputId::from("in"), wanted)]),
2000 outputs1[0].schema_definitions(true),
2001 );
2002 }
2003
2004 fn assert_no_metrics(source: String) {
2005 vector_lib::metrics::init_test();
2006
2007 let config = RemapConfig {
2008 source: Some(source),
2009 drop_on_error: true,
2010 drop_on_abort: true,
2011 reroute_dropped: true,
2012 ..Default::default()
2013 };
2014 let mut ast_runner = remap(config).unwrap();
2015 let input_event =
2016 Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2017 let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2018 let dropped_log = dropped_event.as_log();
2019 assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2020
2021 let controller = Controller::get().expect("no controller");
2022 let metrics = controller
2023 .capture_metrics()
2024 .into_iter()
2025 .map(|metric| (metric.name().to_string(), metric))
2026 .collect::<BTreeMap<String, Metric>>();
2027 assert_eq!(metrics.get("component_discarded_events_total"), None);
2028 assert_eq!(metrics.get("component_errors_total"), None);
2029 }
2030 #[test]
2031 fn do_not_emit_metrics_when_dropped() {
2032 assert_no_metrics("abort".to_string());
2033 }
2034
2035 #[test]
2036 fn do_not_emit_metrics_when_errored() {
2037 assert_no_metrics("parse_key_value!(.message)".to_string());
2038 }
2039}