Skip to main content

vector/transforms/reduce/
transform.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::{Duration, Instant},
5};
6
7use futures::Stream;
8use indexmap::IndexMap;
9use vector_lib::stream::expiration_map::{Emitter, map_with_expiration};
10use vector_vrl_metrics::MetricsStorage;
11use vrl::{
12    path::{OwnedTargetPath, parse_target_path},
13    prelude::KeyString,
14};
15
16use crate::{
17    conditions::Condition,
18    event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
19    internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
20    transforms::{
21        TaskTransform,
22        reduce::{
23            config::ReduceConfig,
24            merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
25        },
26    },
27};
28
29#[derive(Clone, Debug)]
30struct ReduceState {
31    events: usize,
32    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
33    stale_since: Instant,
34    creation: Instant,
35    metadata: EventMetadata,
36}
37
38fn is_covered_by_strategy(
39    path: &OwnedTargetPath,
40    strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
41) -> bool {
42    let mut current = OwnedTargetPath::event_root();
43    for component in &path.path.segments {
44        current = current.with_field_appended(&component.to_string());
45        if strategies.contains_key(&current) {
46            return true;
47        }
48    }
49    false
50}
51
52impl ReduceState {
53    fn new() -> Self {
54        Self {
55            events: 0,
56            stale_since: Instant::now(),
57            creation: Instant::now(),
58            fields: HashMap::new(),
59            metadata: EventMetadata::default(),
60        }
61    }
62
63    fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
64        self.metadata.merge(e.metadata().clone());
65
66        for (path, strategy) in strategies {
67            if let Some(value) = e.get(path) {
68                match self.fields.entry(path.clone()) {
69                    Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
70                        Ok(m) => {
71                            entry.insert(m);
72                        }
73                        Err(error) => {
74                            warn!(message = "Failed to create value merger.", %error, %path);
75                        }
76                    },
77                    Entry::Occupied(mut entry) => {
78                        if let Err(error) = entry.get_mut().add(value.clone()) {
79                            warn!(message = "Failed to merge value.", %error);
80                        }
81                    }
82                }
83            }
84        }
85
86        if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
87            for (path, value) in fields_iter {
88                // This should not return an error, unless there is a bug in the event fields iterator.
89                let parsed_path = match parse_target_path(&path) {
90                    Ok(path) => path,
91                    Err(error) => {
92                        emit!(ReduceAddEventError { error, path });
93                        continue;
94                    }
95                };
96                if is_covered_by_strategy(&parsed_path, strategies) {
97                    continue;
98                }
99
100                let maybe_strategy = strategies.get(&parsed_path);
101                match self.fields.entry(parsed_path) {
102                    Entry::Vacant(entry) => {
103                        if let Some(strategy) = maybe_strategy {
104                            match get_value_merger(value.clone(), strategy) {
105                                Ok(m) => {
106                                    entry.insert(m);
107                                }
108                                Err(error) => {
109                                    warn!(message = "Failed to merge value.", %error);
110                                }
111                            }
112                        } else {
113                            entry.insert(value.clone().into());
114                        }
115                    }
116                    Entry::Occupied(mut entry) => {
117                        if let Err(error) = entry.get_mut().add(value.clone()) {
118                            warn!(message = "Failed to merge value.", %error);
119                        }
120                    }
121                }
122            }
123        }
124        // else the event root is not an object (see https://github.com/vectordotdev/vector/issues/18219)
125
126        self.events += 1;
127        self.stale_since = Instant::now();
128    }
129
130    fn flush(mut self) -> LogEvent {
131        let mut event = LogEvent::new_with_metadata(self.metadata);
132        for (path, v) in self.fields.drain() {
133            if let Err(error) = v.insert_into(&path, &mut event) {
134                warn!(message = "Failed to merge values for field.", %error);
135            }
136        }
137        self.events = 0;
138        event
139    }
140}
141
142#[derive(Clone, Debug)]
143pub struct Reduce {
144    expire_after: Duration,
145    flush_period: Duration,
146    end_every_period: Option<Duration>,
147    group_by: Vec<String>,
148    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
149    reduce_merge_states: HashMap<Discriminant, ReduceState>,
150    ends_when: Option<Condition>,
151    starts_when: Option<Condition>,
152    max_events: Option<usize>,
153}
154
155fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
156    for (path, _) in &strategies {
157        let contains_index = parse_target_path(path)
158            .map_err(|_| format!("Could not parse path: `{path}`"))?
159            .path
160            .segments
161            .iter()
162            .any(|segment| segment.is_index());
163        if contains_index {
164            return Err(format!(
165                "Merge strategies with indexes are currently not supported. Path: `{path}`"
166            )
167            .into());
168        }
169    }
170
171    Ok(())
172}
173
174impl Reduce {
175    pub fn new(
176        config: &ReduceConfig,
177        enrichment_tables: &vector_lib::enrichment::TableRegistry,
178        metrics_storage: &MetricsStorage,
179    ) -> crate::Result<Self> {
180        if config.ends_when.is_some() && config.starts_when.is_some() {
181            return Err("only one of `ends_when` and `starts_when` can be provided".into());
182        }
183
184        let ends_when = config
185            .ends_when
186            .as_ref()
187            .map(|c| c.build(enrichment_tables, metrics_storage))
188            .transpose()?;
189        let starts_when = config
190            .starts_when
191            .as_ref()
192            .map(|c| c.build(enrichment_tables, metrics_storage))
193            .transpose()?;
194        let group_by = config.group_by.clone().into_iter().collect();
195        let max_events = config.max_events.map(|max| max.into());
196
197        validate_merge_strategies(config.merge_strategies.clone())?;
198
199        Ok(Reduce {
200            expire_after: config.expire_after_ms,
201            flush_period: config.flush_period_ms,
202            end_every_period: config.end_every_period_ms,
203            group_by,
204            merge_strategies: config
205                .merge_strategies
206                .iter()
207                .filter_map(|(path, strategy)| {
208                    // TODO Invalid paths are ignored to preserve backwards compatibility.
209                    //      Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`]
210                    //      which means an invalid path would result in an configuration error.
211                    let parsed_path = parse_target_path(path).ok();
212                    if parsed_path.is_none() {
213                        warn!(message = "Ignoring strategy with invalid path.", %path);
214                    }
215                    parsed_path.map(|path| (path, strategy.clone()))
216                })
217                .collect(),
218            reduce_merge_states: HashMap::new(),
219            ends_when,
220            starts_when,
221            max_events,
222        })
223    }
224
225    fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
226        let mut flush_discriminants = Vec::new();
227        let now = Instant::now();
228        for (k, t) in &self.reduce_merge_states {
229            if let Some(period) = self.end_every_period
230                && (now - t.creation) >= period
231            {
232                flush_discriminants.push(k.clone());
233            }
234
235            if (now - t.stale_since) >= self.expire_after {
236                flush_discriminants.push(k.clone());
237            }
238        }
239        for k in &flush_discriminants {
240            if let Some(t) = self.reduce_merge_states.remove(k) {
241                emit!(ReduceStaleEventFlushed);
242                emitter.emit(Event::from(t.flush()));
243            }
244        }
245    }
246
247    fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
248        self.reduce_merge_states
249            .drain()
250            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
251    }
252
253    fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
254        match self.reduce_merge_states.entry(discriminant) {
255            Entry::Vacant(entry) => {
256                let mut state = ReduceState::new();
257                state.add_event(event, &self.merge_strategies);
258                entry.insert(state);
259            }
260            Entry::Occupied(mut entry) => {
261                entry.get_mut().add_event(event, &self.merge_strategies);
262            }
263        };
264    }
265
266    pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
267        let (starts_here, event) = match &self.starts_when {
268            Some(condition) => condition.check(event),
269            None => (false, event),
270        };
271
272        let (mut ends_here, event) = match &self.ends_when {
273            Some(condition) => condition.check(event),
274            None => (false, event),
275        };
276
277        let event = event.into_log();
278        let discriminant = Discriminant::from_log_event(&event, &self.group_by);
279
280        if let Some(max_events) = self.max_events {
281            if max_events == 1 {
282                ends_here = true;
283            } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
284                // The current event will finish this set
285                if entry.events + 1 == max_events {
286                    ends_here = true;
287                }
288            }
289        }
290
291        if starts_here {
292            if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
293                emitter.emit(state.flush().into());
294            }
295
296            self.push_or_new_reduce_state(event, discriminant)
297        } else if ends_here {
298            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
299                Some(mut state) => {
300                    state.add_event(event, &self.merge_strategies);
301                    state.flush().into()
302                }
303                None => {
304                    let mut state = ReduceState::new();
305                    state.add_event(event, &self.merge_strategies);
306                    state.flush().into()
307                }
308            });
309        } else {
310            self.push_or_new_reduce_state(event, discriminant)
311        }
312    }
313}
314
315impl TaskTransform<Event> for Reduce {
316    fn transform(
317        self: Box<Self>,
318        input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
319    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
320    where
321        Self: 'static,
322    {
323        let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
324            me.transform_one(emitter, event);
325        };
326
327        construct_output_stream(self, input_rx, transform_fn)
328    }
329}
330
331pub fn construct_output_stream(
332    reduce: Box<Reduce>,
333    input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
334    mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
335) -> Pin<Box<dyn Stream<Item = Event> + Send>>
336where
337    Reduce: 'static,
338{
339    let flush_period = reduce.flush_period;
340    Box::pin(map_with_expiration(
341        reduce,
342        input_rx,
343        flush_period,
344        move |me, event, emitter| {
345            transform_fn(me, event, emitter);
346        },
347        |me, emitter| {
348            me.flush_into(emitter);
349        },
350        |me, emitter| {
351            me.flush_all_into(emitter);
352        },
353    ))
354}
355
356#[cfg(test)]
357mod test {
358    use std::sync::Arc;
359
360    use indoc::indoc;
361    use serde_json::json;
362    use tokio::sync::mpsc;
363    use tokio_stream::wrappers::ReceiverStream;
364    use vector_lib::{enrichment::TableRegistry, lookup::owned_value_path};
365    use vrl::value::Kind;
366
367    use super::*;
368    use crate::{
369        config::{OutputId, TransformConfig, schema, schema::Definition},
370        event::{LogEvent, Value},
371        test_util::components::assert_transform_compliance,
372        transforms::test::create_topology,
373    };
374
375    #[tokio::test]
376    async fn reduce_from_condition() {
377        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
378            group_by:
379              - request_id
380            ends_when:
381              type: vrl
382              source: exists(.test_end)
383        "})
384        .unwrap();
385
386        assert_transform_compliance(async move {
387            let input_definition = schema::Definition::default_legacy_namespace()
388                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
389                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
390                .with_event_field(
391                    &owned_value_path!("test_end"),
392                    Kind::bytes().or_undefined(),
393                    None,
394                )
395                .with_event_field(
396                    &owned_value_path!("extra_field"),
397                    Kind::bytes().or_undefined(),
398                    None,
399                );
400            let schema_definitions = reduce_config
401                .outputs(&Default::default(), &[("test".into(), input_definition)])
402                .first()
403                .unwrap()
404                .schema_definitions(true)
405                .clone();
406
407            let new_schema_definition = reduce_config.outputs(
408                &Default::default(),
409                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
410            )[0]
411            .clone()
412            .log_schema_definitions
413            .get(&OutputId::from("in"))
414            .unwrap()
415            .clone();
416
417            let (tx, rx) = mpsc::channel(1);
418            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
419
420            let mut e_1 = LogEvent::from("test message 1");
421            e_1.insert("counter", 1);
422            e_1.insert("request_id", "1");
423            let mut metadata_1 = e_1.metadata().clone();
424            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
425            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
426
427            let mut e_2 = LogEvent::from("test message 2");
428            e_2.insert("counter", 2);
429            e_2.insert("request_id", "2");
430            let mut metadata_2 = e_2.metadata().clone();
431            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
432            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));
433
434            let mut e_3 = LogEvent::from("test message 3");
435            e_3.insert("counter", 3);
436            e_3.insert("request_id", "1");
437
438            let mut e_4 = LogEvent::from("test message 4");
439            e_4.insert("counter", 4);
440            e_4.insert("request_id", "1");
441            e_4.insert("test_end", "yep");
442
443            let mut e_5 = LogEvent::from("test message 5");
444            e_5.insert("counter", 5);
445            e_5.insert("request_id", "2");
446            e_5.insert("extra_field", "value1");
447            e_5.insert("test_end", "yep");
448
449            for event in [e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
450                tx.send(event).await.unwrap();
451            }
452
453            let output_1 = out.recv().await.unwrap().into_log();
454            assert_eq!(output_1["message"], "test message 1".into());
455            assert_eq!(output_1["counter"], Value::from(8));
456            assert_eq!(output_1.metadata(), &metadata_1);
457            schema_definitions
458                .values()
459                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));
460
461            let output_2 = out.recv().await.unwrap().into_log();
462            assert_eq!(output_2["message"], "test message 2".into());
463            assert_eq!(output_2["extra_field"], "value1".into());
464            assert_eq!(output_2["counter"], Value::from(7));
465            assert_eq!(output_2.metadata(), &metadata_2);
466            schema_definitions
467                .values()
468                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));
469
470            drop(tx);
471            topology.stop().await;
472            assert_eq!(out.recv().await, None);
473        })
474        .await;
475    }
476
477    #[tokio::test]
478    async fn reduce_merge_strategies() {
479        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
480            group_by:
481              - request_id
482            merge_strategies:
483              foo: concat
484              bar: array
485              baz: max
486            ends_when:
487              type: vrl
488              source: exists(.test_end)
489        "})
490        .unwrap();
491
492        assert_transform_compliance(async move {
493            let (tx, rx) = mpsc::channel(1);
494
495            let new_schema_definition = reduce_config.outputs(
496                &Default::default(),
497                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
498            )[0]
499            .clone()
500            .log_schema_definitions
501            .get(&OutputId::from("in"))
502            .unwrap()
503            .clone();
504
505            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
506
507            let mut e_1 = LogEvent::from("test message 1");
508            e_1.insert("foo", "first foo");
509            e_1.insert("bar", "first bar");
510            e_1.insert("baz", 2);
511            e_1.insert("request_id", "1");
512            let mut metadata = e_1.metadata().clone();
513            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
514            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
515            tx.send(e_1.into()).await.unwrap();
516
517            let mut e_2 = LogEvent::from("test message 2");
518            e_2.insert("foo", "second foo");
519            e_2.insert("bar", 2);
520            e_2.insert("baz", "not number");
521            e_2.insert("request_id", "1");
522            tx.send(e_2.into()).await.unwrap();
523
524            let mut e_3 = LogEvent::from("test message 3");
525            e_3.insert("foo", 10);
526            e_3.insert("bar", "third bar");
527            e_3.insert("baz", 3);
528            e_3.insert("request_id", "1");
529            e_3.insert("test_end", "yep");
530            tx.send(e_3.into()).await.unwrap();
531
532            let output_1 = out.recv().await.unwrap().into_log();
533            assert_eq!(output_1["message"], "test message 1".into());
534            assert_eq!(output_1["foo"], "first foo second foo".into());
535            assert_eq!(
536                output_1["bar"],
537                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
538            );
539            assert_eq!(output_1["baz"], 3.into());
540            assert_eq!(output_1.metadata(), &metadata);
541
542            drop(tx);
543            topology.stop().await;
544            assert_eq!(out.recv().await, None);
545        })
546        .await;
547    }
548
549    #[tokio::test]
550    async fn missing_group_by() {
551        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
552            group_by:
553              - request_id
554            ends_when:
555              type: vrl
556              source: exists(.test_end)
557        "})
558        .unwrap();
559
560        assert_transform_compliance(async move {
561            let (tx, rx) = mpsc::channel(1);
562            let new_schema_definition = reduce_config.outputs(
563                &Default::default(),
564                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
565            )[0]
566            .clone()
567            .log_schema_definitions
568            .get(&OutputId::from("in"))
569            .unwrap()
570            .clone();
571
572            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
573
574            let mut e_1 = LogEvent::from("test message 1");
575            e_1.insert("counter", 1);
576            e_1.insert("request_id", "1");
577            let mut metadata_1 = e_1.metadata().clone();
578            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
579            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
580            tx.send(e_1.into()).await.unwrap();
581
582            let mut e_2 = LogEvent::from("test message 2");
583            e_2.insert("counter", 2);
584            let mut metadata_2 = e_2.metadata().clone();
585            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
586            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
587            tx.send(e_2.into()).await.unwrap();
588
589            let mut e_3 = LogEvent::from("test message 3");
590            e_3.insert("counter", 3);
591            e_3.insert("request_id", "1");
592            tx.send(e_3.into()).await.unwrap();
593
594            let mut e_4 = LogEvent::from("test message 4");
595            e_4.insert("counter", 4);
596            e_4.insert("request_id", "1");
597            e_4.insert("test_end", "yep");
598            tx.send(e_4.into()).await.unwrap();
599
600            let mut e_5 = LogEvent::from("test message 5");
601            e_5.insert("counter", 5);
602            e_5.insert("extra_field", "value1");
603            e_5.insert("test_end", "yep");
604            tx.send(e_5.into()).await.unwrap();
605
606            let output_1 = out.recv().await.unwrap().into_log();
607            assert_eq!(output_1["message"], "test message 1".into());
608            assert_eq!(output_1["counter"], Value::from(8));
609            assert_eq!(output_1.metadata(), &metadata_1);
610
611            let output_2 = out.recv().await.unwrap().into_log();
612            assert_eq!(output_2["message"], "test message 2".into());
613            assert_eq!(output_2["extra_field"], "value1".into());
614            assert_eq!(output_2["counter"], Value::from(7));
615            assert_eq!(output_2.metadata(), &metadata_2);
616
617            drop(tx);
618            topology.stop().await;
619            assert_eq!(out.recv().await, None);
620        })
621        .await;
622    }
623
624    #[tokio::test]
625    async fn max_events_0() {
626        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
627            group_by:
628              - id
629            merge_strategies:
630              id: retain
631              message: array
632            max_events: 0
633        "});
634
635        match reduce_config {
636            Ok(_conf) => unreachable!("max_events=0 should be rejected."),
637            Err(err) => assert!(
638                err.to_string()
639                    .contains("invalid value: integer `0`, expected a nonzero usize")
640            ),
641        }
642    }
643
644    #[tokio::test]
645    async fn max_events_1() {
646        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
647            group_by:
648              - id
649            merge_strategies:
650              id: retain
651              message: array
652            max_events: 1
653        "})
654        .unwrap();
655        assert_transform_compliance(async move {
656            let (tx, rx) = mpsc::channel(1);
657            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
658
659            let mut e_1 = LogEvent::from("test 1");
660            e_1.insert("id", "1");
661
662            let mut e_2 = LogEvent::from("test 2");
663            e_2.insert("id", "1");
664
665            let mut e_3 = LogEvent::from("test 3");
666            e_3.insert("id", "1");
667
668            for event in [e_1.into(), e_2.into(), e_3.into()] {
669                tx.send(event).await.unwrap();
670            }
671
672            let output_1 = out.recv().await.unwrap().into_log();
673            assert_eq!(output_1["message"], vec!["test 1"].into());
674            let output_2 = out.recv().await.unwrap().into_log();
675            assert_eq!(output_2["message"], vec!["test 2"].into());
676
677            let output_3 = out.recv().await.unwrap().into_log();
678            assert_eq!(output_3["message"], vec!["test 3"].into());
679
680            drop(tx);
681            topology.stop().await;
682            assert_eq!(out.recv().await, None);
683        })
684        .await;
685    }
686
687    #[tokio::test]
688    async fn max_events() {
689        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
690            group_by:
691              - id
692            merge_strategies:
693              id: retain
694              message: array
695            max_events: 3
696        "})
697        .unwrap();
698
699        assert_transform_compliance(async move {
700            let (tx, rx) = mpsc::channel(1);
701            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
702
703            let mut e_1 = LogEvent::from("test 1");
704            e_1.insert("id", "1");
705
706            let mut e_2 = LogEvent::from("test 2");
707            e_2.insert("id", "1");
708
709            let mut e_3 = LogEvent::from("test 3");
710            e_3.insert("id", "1");
711
712            let mut e_4 = LogEvent::from("test 4");
713            e_4.insert("id", "1");
714
715            let mut e_5 = LogEvent::from("test 5");
716            e_5.insert("id", "1");
717
718            let mut e_6 = LogEvent::from("test 6");
719            e_6.insert("id", "1");
720
721            for event in [
722                e_1.into(),
723                e_2.into(),
724                e_3.into(),
725                e_4.into(),
726                e_5.into(),
727                e_6.into(),
728            ] {
729                tx.send(event).await.unwrap();
730            }
731
732            let output_1 = out.recv().await.unwrap().into_log();
733            assert_eq!(
734                output_1["message"],
735                vec!["test 1", "test 2", "test 3"].into()
736            );
737
738            let output_2 = out.recv().await.unwrap().into_log();
739            assert_eq!(
740                output_2["message"],
741                vec!["test 4", "test 5", "test 6"].into()
742            );
743
744            drop(tx);
745            topology.stop().await;
746            assert_eq!(out.recv().await, None);
747        })
748        .await
749    }
750
751    #[tokio::test]
752    async fn arrays() {
753        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
754            group_by:
755              - request_id
756            merge_strategies:
757              foo: array
758              bar: concat
759            ends_when:
760              type: vrl
761              source: exists(.test_end)
762        "})
763        .unwrap();
764
765        assert_transform_compliance(async move {
766            let (tx, rx) = mpsc::channel(1);
767
768            let new_schema_definition = reduce_config.outputs(
769                &Default::default(),
770                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
771            )[0]
772            .clone()
773            .log_schema_definitions
774            .get(&OutputId::from("in"))
775            .unwrap()
776            .clone();
777
778            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
779
780            let mut e_1 = LogEvent::from("test message 1");
781            e_1.insert("foo", json!([1, 3]));
782            e_1.insert("bar", json!([1, 3]));
783            e_1.insert("request_id", "1");
784            let mut metadata_1 = e_1.metadata().clone();
785            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
786            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
787
788            tx.send(e_1.into()).await.unwrap();
789
790            let mut e_2 = LogEvent::from("test message 2");
791            e_2.insert("foo", json!([2, 4]));
792            e_2.insert("bar", json!([2, 4]));
793            e_2.insert("request_id", "2");
794            let mut metadata_2 = e_2.metadata().clone();
795            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
796            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
797            tx.send(e_2.into()).await.unwrap();
798
799            let mut e_3 = LogEvent::from("test message 3");
800            e_3.insert("foo", json!([5, 7]));
801            e_3.insert("bar", json!([5, 7]));
802            e_3.insert("request_id", "1");
803            tx.send(e_3.into()).await.unwrap();
804
805            let mut e_4 = LogEvent::from("test message 4");
806            e_4.insert("foo", json!("done"));
807            e_4.insert("bar", json!("done"));
808            e_4.insert("request_id", "1");
809            e_4.insert("test_end", "yep");
810            tx.send(e_4.into()).await.unwrap();
811
812            let mut e_5 = LogEvent::from("test message 5");
813            e_5.insert("foo", json!([6, 8]));
814            e_5.insert("bar", json!([6, 8]));
815            e_5.insert("request_id", "2");
816            tx.send(e_5.into()).await.unwrap();
817
818            let mut e_6 = LogEvent::from("test message 6");
819            e_6.insert("foo", json!("done"));
820            e_6.insert("bar", json!("done"));
821            e_6.insert("request_id", "2");
822            e_6.insert("test_end", "yep");
823            tx.send(e_6.into()).await.unwrap();
824
825            let output_1 = out.recv().await.unwrap().into_log();
826            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
827            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
828            assert_eq!(output_1.metadata(), &metadata_1);
829
830            let output_2 = out.recv().await.unwrap().into_log();
831            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
832            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
833            assert_eq!(output_2.metadata(), &metadata_2);
834
835            drop(tx);
836            topology.stop().await;
837            assert_eq!(out.recv().await, None);
838        })
839        .await;
840    }
841
842    #[tokio::test]
843    async fn strategy_path_with_nested_fields() {
844        let reduce_config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
845            group_by:
846              - id
847            merge_strategies:
848              id: discard
849              message.a.b: array
850            ends_when:
851              type: vrl
852              source: exists(.test_end)
853        "})
854        .unwrap();
855
856        assert_transform_compliance(async move {
857            let (tx, rx) = mpsc::channel(1);
858
859            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
860
861            let e_1 = LogEvent::from(Value::from(btreemap! {
862                "id" => 777,
863                "message" => btreemap! {
864                    "a" => btreemap! {
865                        "b" => vec![1,2],
866                        "num" => 1,
867                    },
868                },
869                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
870            }));
871            let mut metadata_1 = e_1.metadata().clone();
872            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));
873
874            tx.send(e_1.into()).await.unwrap();
875
876            let e_2 = LogEvent::from(Value::from(btreemap! {
877                "id" => 777,
878                "message" => btreemap! {
879                        "a" => btreemap! {
880                            "b" => vec![3,4],
881                            "num" => 2,
882                        },
883                },
884                 "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
885                "test_end" => "done",
886            }));
887            tx.send(e_2.into()).await.unwrap();
888
889            let mut output = out.recv().await.unwrap().into_log();
890
891            // Remove timestamp fields which were automatically added.
892            output.remove_timestamp();
893            output.remove("timestamp_end");
894
895            assert_eq!(
896                *output.value(),
897                btreemap! {
898                    "id" => 777,
899                    "message" => btreemap! {
900                        "a" => btreemap! {
901                            "b" => vec![vec![1, 2], vec![3,4]],
902                            "num" => 3,
903                        },
904                    },
905                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
906                    "test_end" => "done",
907                }
908                .into()
909            );
910
911            drop(tx);
912            topology.stop().await;
913            assert_eq!(out.recv().await, None);
914        })
915        .await;
916    }
917
918    #[test]
919    fn invalid_merge_strategies_containing_indexes() {
920        let config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
921            group_by:
922              - id
923            merge_strategies:
924              id: discard
925              'nested.msg[0]': array
926        "})
927        .unwrap();
928        let error = Reduce::new(
929            &config,
930            &TableRegistry::default(),
931            &MetricsStorage::default(),
932        )
933        .unwrap_err();
934        assert_eq!(
935            error.to_string(),
936            "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
937        );
938    }
939
940    #[tokio::test]
941    async fn merge_objects_in_array() {
942        let config = serde_yaml::from_str::<ReduceConfig>(indoc! {r#"
943            group_by:
944              - id
945            merge_strategies:
946              events: array
947              '"a-b"': retain
948              another: discard
949            ends_when:
950              type: vrl
951              source: exists(.test_end)
952        "#})
953        .unwrap();
954
955        assert_transform_compliance(async move {
956            let (tx, rx) = mpsc::channel(1);
957
958            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
959
960            let v_1 = Value::from(btreemap! {
961                "attrs" => btreemap! {
962                    "nested.msg" => "foo",
963                },
964                "sev" => 2,
965            });
966            let mut e_1 = LogEvent::from(Value::from(
967                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
968            ));
969            e_1.insert("events", v_1.clone());
970            e_1.insert("\"a-b\"", 2);
971            tx.send(e_1.into()).await.unwrap();
972
973            let v_2 = Value::from(btreemap! {
974                "attrs" => btreemap! {
975                    "nested.msg" => "bar",
976                },
977                "sev" => 3,
978            });
979            let mut e_2 = LogEvent::from(Value::from(
980                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
981            ));
982            e_2.insert("events", v_2.clone());
983            e_2.insert("\"a-b\"", 2);
984            tx.send(e_2.into()).await.unwrap();
985
986            let output = out.recv().await.unwrap().into_log();
987            let expected_value = Value::from(btreemap! {
988                "id" => 1554,
989                "events" => vec![v_1, v_2],
990                "another" => btreemap!{ "a" => 1},
991                "a-b" => 2,
992                "test_end" => "done"
993            });
994            assert_eq!(*output.value(), expected_value);
995
996            drop(tx);
997            topology.stop().await;
998            assert_eq!(out.recv().await, None);
999        })
1000        .await
1001    }
1002
1003    #[tokio::test]
1004    async fn merged_quoted_path() {
1005        let config = serde_yaml::from_str::<ReduceConfig>(indoc! {"
1006            ends_when:
1007              type: vrl
1008              source: exists(.test_end)
1009        "})
1010        .unwrap();
1011
1012        assert_transform_compliance(async move {
1013            let (tx, rx) = mpsc::channel(1);
1014
1015            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1016
1017            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
1018            tx.send(e_1.into()).await.unwrap();
1019
1020            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
1021            tx.send(e_2.into()).await.unwrap();
1022
1023            let output = out.recv().await.unwrap().into_log();
1024            let expected_value = Value::from(btreemap! {
1025                "a b" => 3,
1026                "test_end" => "done"
1027            });
1028            assert_eq!(*output.value(), expected_value);
1029
1030            drop(tx);
1031            topology.stop().await;
1032            assert_eq!(out.recv().await, None);
1033        })
1034        .await
1035    }
1036}