1use std::{
2 collections::{HashMap, hash_map::Entry},
3 pin::Pin,
4 time::{Duration, Instant},
5};
6
7use futures::Stream;
8use indexmap::IndexMap;
9use vector_lib::stream::expiration_map::{Emitter, map_with_expiration};
10use vector_vrl_metrics::MetricsStorage;
11use vrl::{
12 path::{OwnedTargetPath, parse_target_path},
13 prelude::KeyString,
14};
15
16use crate::{
17 conditions::Condition,
18 event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
19 internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
20 transforms::{
21 TaskTransform,
22 reduce::{
23 config::ReduceConfig,
24 merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
25 },
26 },
27};
28
/// Accumulated, not-yet-flushed state for one group of events (one discriminant).
#[derive(Clone, Debug)]
struct ReduceState {
    /// Number of events folded into this state so far.
    events: usize,
    /// Per-field mergers, keyed by the event path each one writes to when flushed.
    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
    /// When the most recent event was added; compared against `expire_after` to flush idle groups.
    stale_since: Instant,
    /// When this state was created; compared against `end_every_period`.
    creation: Instant,
    /// Metadata merged from every event added to this state; carried by the flushed event.
    metadata: EventMetadata,
}
37
38fn is_covered_by_strategy(
39 path: &OwnedTargetPath,
40 strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
41) -> bool {
42 let mut current = OwnedTargetPath::event_root();
43 for component in &path.path.segments {
44 current = current.with_field_appended(&component.to_string());
45 if strategies.contains_key(¤t) {
46 return true;
47 }
48 }
49 false
50}
51
52impl ReduceState {
53 fn new() -> Self {
54 Self {
55 events: 0,
56 stale_since: Instant::now(),
57 creation: Instant::now(),
58 fields: HashMap::new(),
59 metadata: EventMetadata::default(),
60 }
61 }
62
63 fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
64 self.metadata.merge(e.metadata().clone());
65
66 for (path, strategy) in strategies {
67 if let Some(value) = e.get(path) {
68 match self.fields.entry(path.clone()) {
69 Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
70 Ok(m) => {
71 entry.insert(m);
72 }
73 Err(error) => {
74 warn!(message = "Failed to create value merger.", %error, %path);
75 }
76 },
77 Entry::Occupied(mut entry) => {
78 if let Err(error) = entry.get_mut().add(value.clone()) {
79 warn!(message = "Failed to merge value.", %error);
80 }
81 }
82 }
83 }
84 }
85
86 if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
87 for (path, value) in fields_iter {
88 let parsed_path = match parse_target_path(&path) {
90 Ok(path) => path,
91 Err(error) => {
92 emit!(ReduceAddEventError { error, path });
93 continue;
94 }
95 };
96 if is_covered_by_strategy(&parsed_path, strategies) {
97 continue;
98 }
99
100 let maybe_strategy = strategies.get(&parsed_path);
101 match self.fields.entry(parsed_path) {
102 Entry::Vacant(entry) => {
103 if let Some(strategy) = maybe_strategy {
104 match get_value_merger(value.clone(), strategy) {
105 Ok(m) => {
106 entry.insert(m);
107 }
108 Err(error) => {
109 warn!(message = "Failed to merge value.", %error);
110 }
111 }
112 } else {
113 entry.insert(value.clone().into());
114 }
115 }
116 Entry::Occupied(mut entry) => {
117 if let Err(error) = entry.get_mut().add(value.clone()) {
118 warn!(message = "Failed to merge value.", %error);
119 }
120 }
121 }
122 }
123 }
124 self.events += 1;
127 self.stale_since = Instant::now();
128 }
129
130 fn flush(mut self) -> LogEvent {
131 let mut event = LogEvent::new_with_metadata(self.metadata);
132 for (path, v) in self.fields.drain() {
133 if let Err(error) = v.insert_into(&path, &mut event) {
134 warn!(message = "Failed to merge values for field.", %error);
135 }
136 }
137 self.events = 0;
138 event
139 }
140}
141
/// The `reduce` transform: groups log events by the `group_by` fields and merges each group
/// into a single output event.
#[derive(Clone, Debug)]
pub struct Reduce {
    /// A group is flushed once no event has been added to it for at least this long.
    expire_after: Duration,
    /// Interval handed to `map_with_expiration` together with the `flush_into` callback.
    flush_period: Duration,
    /// When set, a group is flushed once it has existed this long, regardless of activity.
    end_every_period: Option<Duration>,
    /// Field names used to compute each event's group discriminant.
    group_by: Vec<String>,
    /// Per-path merge strategies; other fields use a merger derived from their value.
    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
    /// In-progress groups, keyed by discriminant.
    reduce_merge_states: HashMap<Discriminant, ReduceState>,
    /// Condition marking the last event of a group (mutually exclusive with `starts_when`).
    ends_when: Option<Condition>,
    /// Condition marking the first event of a group (mutually exclusive with `ends_when`).
    starts_when: Option<Condition>,
    /// Maximum number of events merged into one output event.
    max_events: Option<usize>,
}
154
155fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
156 for (path, _) in &strategies {
157 let contains_index = parse_target_path(path)
158 .map_err(|_| format!("Could not parse path: `{path}`"))?
159 .path
160 .segments
161 .iter()
162 .any(|segment| segment.is_index());
163 if contains_index {
164 return Err(format!(
165 "Merge strategies with indexes are currently not supported. Path: `{path}`"
166 )
167 .into());
168 }
169 }
170
171 Ok(())
172}
173
174impl Reduce {
175 pub fn new(
176 config: &ReduceConfig,
177 enrichment_tables: &vector_lib::enrichment::TableRegistry,
178 metrics_storage: &MetricsStorage,
179 ) -> crate::Result<Self> {
180 if config.ends_when.is_some() && config.starts_when.is_some() {
181 return Err("only one of `ends_when` and `starts_when` can be provided".into());
182 }
183
184 let ends_when = config
185 .ends_when
186 .as_ref()
187 .map(|c| c.build(enrichment_tables, metrics_storage))
188 .transpose()?;
189 let starts_when = config
190 .starts_when
191 .as_ref()
192 .map(|c| c.build(enrichment_tables, metrics_storage))
193 .transpose()?;
194 let group_by = config.group_by.clone().into_iter().collect();
195 let max_events = config.max_events.map(|max| max.into());
196
197 validate_merge_strategies(config.merge_strategies.clone())?;
198
199 Ok(Reduce {
200 expire_after: config.expire_after_ms,
201 flush_period: config.flush_period_ms,
202 end_every_period: config.end_every_period_ms,
203 group_by,
204 merge_strategies: config
205 .merge_strategies
206 .iter()
207 .filter_map(|(path, strategy)| {
208 let parsed_path = parse_target_path(path).ok();
212 if parsed_path.is_none() {
213 warn!(message = "Ignoring strategy with invalid path.", %path);
214 }
215 parsed_path.map(|path| (path, strategy.clone()))
216 })
217 .collect(),
218 reduce_merge_states: HashMap::new(),
219 ends_when,
220 starts_when,
221 max_events,
222 })
223 }
224
225 fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
226 let mut flush_discriminants = Vec::new();
227 let now = Instant::now();
228 for (k, t) in &self.reduce_merge_states {
229 if let Some(period) = self.end_every_period
230 && (now - t.creation) >= period
231 {
232 flush_discriminants.push(k.clone());
233 }
234
235 if (now - t.stale_since) >= self.expire_after {
236 flush_discriminants.push(k.clone());
237 }
238 }
239 for k in &flush_discriminants {
240 if let Some(t) = self.reduce_merge_states.remove(k) {
241 emit!(ReduceStaleEventFlushed);
242 emitter.emit(Event::from(t.flush()));
243 }
244 }
245 }
246
247 fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
248 self.reduce_merge_states
249 .drain()
250 .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
251 }
252
253 fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
254 match self.reduce_merge_states.entry(discriminant) {
255 Entry::Vacant(entry) => {
256 let mut state = ReduceState::new();
257 state.add_event(event, &self.merge_strategies);
258 entry.insert(state);
259 }
260 Entry::Occupied(mut entry) => {
261 entry.get_mut().add_event(event, &self.merge_strategies);
262 }
263 };
264 }
265
266 pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
267 let (starts_here, event) = match &self.starts_when {
268 Some(condition) => condition.check(event),
269 None => (false, event),
270 };
271
272 let (mut ends_here, event) = match &self.ends_when {
273 Some(condition) => condition.check(event),
274 None => (false, event),
275 };
276
277 let event = event.into_log();
278 let discriminant = Discriminant::from_log_event(&event, &self.group_by);
279
280 if let Some(max_events) = self.max_events {
281 if max_events == 1 {
282 ends_here = true;
283 } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
284 if entry.events + 1 == max_events {
286 ends_here = true;
287 }
288 }
289 }
290
291 if starts_here {
292 if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
293 emitter.emit(state.flush().into());
294 }
295
296 self.push_or_new_reduce_state(event, discriminant)
297 } else if ends_here {
298 emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
299 Some(mut state) => {
300 state.add_event(event, &self.merge_strategies);
301 state.flush().into()
302 }
303 None => {
304 let mut state = ReduceState::new();
305 state.add_event(event, &self.merge_strategies);
306 state.flush().into()
307 }
308 });
309 } else {
310 self.push_or_new_reduce_state(event, discriminant)
311 }
312 }
313}
314
315impl TaskTransform<Event> for Reduce {
316 fn transform(
317 self: Box<Self>,
318 input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
319 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
320 where
321 Self: 'static,
322 {
323 let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
324 me.transform_one(emitter, event);
325 };
326
327 construct_output_stream(self, input_rx, transform_fn)
328 }
329}
330
331pub fn construct_output_stream(
332 reduce: Box<Reduce>,
333 input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
334 mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
335) -> Pin<Box<dyn Stream<Item = Event> + Send>>
336where
337 Reduce: 'static,
338{
339 let flush_period = reduce.flush_period;
340 Box::pin(map_with_expiration(
341 reduce,
342 input_rx,
343 flush_period,
344 move |me, event, emitter| {
345 transform_fn(me, event, emitter);
346 },
347 |me, emitter| {
348 me.flush_into(emitter);
349 },
350 |me, emitter| {
351 me.flush_all_into(emitter);
352 },
353 ))
354}
355
// Tests for the `reduce` transform. Most tests push events through a topology built with
// `create_topology` and assert on the merged events that come out of it.
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use indoc::indoc;
    use serde_json::json;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{enrichment::TableRegistry, lookup::owned_value_path};
    use vrl::value::Kind;

    use super::*;
    use crate::{
        config::{OutputId, TransformConfig, schema, schema::Definition},
        event::{LogEvent, Value},
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    // Two interleaved groups (`request_id` 1 and 2) are each closed by an event matching
    // `ends_when`. The output keeps the first `message`, sums `counter`, and must satisfy
    // the transform's output schema.
    #[tokio::test]
    async fn reduce_from_condition() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - request_id
            ends_when:
              type: vrl
              source: exists(.test_end)
            "})
        .unwrap();

        assert_transform_compliance(async move {
            let input_definition = schema::Definition::default_legacy_namespace()
                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
                .with_event_field(
                    &owned_value_path!("test_end"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_event_field(
                    &owned_value_path!("extra_field"),
                    Kind::bytes().or_undefined(),
                    None,
                );
            let schema_definitions = reduce_config
                .outputs(&Default::default(), &[("test".into(), input_definition)])
                .first()
                .unwrap()
                .schema_definitions(true)
                .clone();

            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            // Expected metadata of the first output: the first event's metadata as rewritten
            // by the topology.
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("request_id", "2");
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");

            for event in [e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
                tx.send(event).await.unwrap();
            }

            // Group 1 = e_1 + e_3 + e_4 (counter 1 + 3 + 4).
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));

            // Group 2 = e_2 + e_5 (counter 2 + 5).
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // Explicit `concat`, `array` and `max` strategies. The assertions expect the integer
    // `foo` to be left out of the concatenation and the string `baz` not to affect the
    // maximum.
    #[tokio::test]
    async fn reduce_merge_strategies() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - request_id
            merge_strategies:
              foo: concat
              bar: array
              baz: max
            ends_when:
              type: vrl
              source: exists(.test_end)
            "})
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", "first foo");
            e_1.insert("bar", "first bar");
            e_1.insert("baz", 2);
            e_1.insert("request_id", "1");
            let mut metadata = e_1.metadata().clone();
            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", "second foo");
            e_2.insert("bar", 2);
            e_2.insert("baz", "not number");
            e_2.insert("request_id", "1");
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", 10);
            e_3.insert("bar", "third bar");
            e_3.insert("baz", 3);
            e_3.insert("request_id", "1");
            e_3.insert("test_end", "yep");
            tx.send(e_3.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["foo"], "first foo second foo".into());
            assert_eq!(
                output_1["bar"],
                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
            );
            assert_eq!(output_1["baz"], 3.into());
            assert_eq!(output_1.metadata(), &metadata);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // Events lacking the `group_by` field (e_2, e_5) end up in one group of their own,
    // separate from the `request_id: 1` group.
    #[tokio::test]
    async fn missing_group_by() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - request_id
            ends_when:
              type: vrl
              source: exists(.test_end)
            "})
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");
            tx.send(e_5.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // `max_events: 0` must be rejected when the configuration is deserialized.
    #[tokio::test]
    async fn max_events_0() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - id
            merge_strategies:
              id: retain
              message: array
            max_events: 0
            "});

        match reduce_config {
            Ok(_conf) => unreachable!("max_events=0 should be rejected."),
            Err(err) => assert!(
                err.to_string()
                    .contains("invalid value: integer `0`, expected a nonzero usize")
            ),
        }
    }

    // With `max_events: 1` every event is flushed on its own.
    #[tokio::test]
    async fn max_events_1() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - id
            merge_strategies:
              id: retain
              message: array
            max_events: 1
            "})
        .unwrap();
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            for event in [e_1.into(), e_2.into(), e_3.into()] {
                tx.send(event).await.unwrap();
            }

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], vec!["test 1"].into());
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], vec!["test 2"].into());

            let output_3 = out.recv().await.unwrap().into_log();
            assert_eq!(output_3["message"], vec!["test 3"].into());

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // With `max_events: 3`, six events in one group produce two outputs of three events each.
    #[tokio::test]
    async fn max_events() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - id
            merge_strategies:
              id: retain
              message: array
            max_events: 3
            "})
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            let mut e_4 = LogEvent::from("test 4");
            e_4.insert("id", "1");

            let mut e_5 = LogEvent::from("test 5");
            e_5.insert("id", "1");

            let mut e_6 = LogEvent::from("test 6");
            e_6.insert("id", "1");

            for event in [
                e_1.into(),
                e_2.into(),
                e_3.into(),
                e_4.into(),
                e_5.into(),
                e_6.into(),
            ] {
                tx.send(event).await.unwrap();
            }

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_1["message"],
                vec!["test 1", "test 2", "test 3"].into()
            );

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_2["message"],
                vec!["test 4", "test 5", "test 6"].into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }

    // Array-valued fields: `array` keeps each incoming array as a nested element, while
    // `concat` flattens array values into one array.
    #[tokio::test]
    async fn arrays() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - request_id
            merge_strategies:
              foo: array
              bar: concat
            ends_when:
              type: vrl
              source: exists(.test_end)
            "})
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", json!([1, 3]));
            e_1.insert("bar", json!([1, 3]));
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", json!([2, 4]));
            e_2.insert("bar", json!([2, 4]));
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", json!([5, 7]));
            e_3.insert("bar", json!([5, 7]));
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("foo", json!("done"));
            e_4.insert("bar", json!("done"));
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("foo", json!([6, 8]));
            e_5.insert("bar", json!([6, 8]));
            e_5.insert("request_id", "2");
            tx.send(e_5.into()).await.unwrap();

            let mut e_6 = LogEvent::from("test message 6");
            e_6.insert("foo", json!("done"));
            e_6.insert("bar", json!("done"));
            e_6.insert("request_id", "2");
            e_6.insert("test_end", "yep");
            tx.send(e_6.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
            assert_eq!(output_1.metadata(), &metadata_1);

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // A strategy on a nested path (`message.a.b`) applies only to that subtree. Sibling
    // fields such as `message.a.num` are still merged by default (summed here), and the
    // array `arr` keeps the first event's value.
    #[tokio::test]
    async fn strategy_path_with_nested_fields() {
        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - id
            merge_strategies:
              id: discard
              message.a.b: array
            ends_when:
              type: vrl
              source: exists(.test_end)
            "})
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![1,2],
                        "num" => 1,
                    },
                },
                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
            }));
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));

            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![3,4],
                        "num" => 2,
                    },
                },
                "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
                "test_end" => "done",
            }));
            tx.send(e_2.into()).await.unwrap();

            let mut output = out.recv().await.unwrap().into_log();

            // Drop timestamp fields so the comparison below covers only the fields under test.
            output.remove_timestamp();
            output.remove("timestamp_end");

            assert_eq!(
                *output.value(),
                btreemap! {
                    "id" => 777,
                    "message" => btreemap! {
                        "a" => btreemap! {
                            "b" => vec![vec![1, 2], vec![3,4]],
                            "num" => 3,
                        },
                    },
                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
                    "test_end" => "done",
                }
                .into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // `Reduce::new` rejects merge strategy paths that contain an array index.
    #[test]
    fn invalid_merge_strategies_containing_indexes() {
        let config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            group_by:
              - id
            merge_strategies:
              id: discard
              'nested.msg[0]': array
            "})
        .unwrap();
        let error = Reduce::new(
            &config,
            &TableRegistry::default(),
            &MetricsStorage::default(),
        )
        .unwrap_err();
        assert_eq!(
            error.to_string(),
            "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
        );
    }

    // Object values under an `array` strategy are collected whole. A quoted strategy key
    // (`"a-b"`) matches the quoted event field, and `discard` keeps the first object.
    // `id` has no strategy and is summed (777 + 777).
    #[tokio::test]
    async fn merge_objects_in_array() {
        let config = serde_yaml::from_str::<ReduceConfig>(indoc! {r#"
            group_by:
              - id
            merge_strategies:
              events: array
              '"a-b"': retain
              another: discard
            ends_when:
              type: vrl
              source: exists(.test_end)
            "#})
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let v_1 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "foo",
                },
                "sev" => 2,
            });
            let mut e_1 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
            ));
            e_1.insert("events", v_1.clone());
            e_1.insert("\"a-b\"", 2);
            tx.send(e_1.into()).await.unwrap();

            let v_2 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "bar",
                },
                "sev" => 3,
            });
            let mut e_2 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
            ));
            e_2.insert("events", v_2.clone());
            e_2.insert("\"a-b\"", 2);
            tx.send(e_2.into()).await.unwrap();

            let output = out.recv().await.unwrap().into_log();
            let expected_value = Value::from(btreemap! {
                "id" => 1554,
                "events" => vec![v_1, v_2],
                "another" => btreemap!{ "a" => 1},
                "a-b" => 2,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }

    // A top-level field whose name needs quoting (`a b`) is merged by default (summed).
    #[tokio::test]
    async fn merged_quoted_path() {
        let config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
            ends_when:
              type: vrl
              source: exists(.test_end)
            "})
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
            tx.send(e_2.into()).await.unwrap();

            let output = out.recv().await.unwrap().into_log();
            let expected_value = Value::from(btreemap! {
                "a b" => 3,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
}