Skip to main content

vector/transforms/
aggregate.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10    configurable::configurable_component,
11    event::{
12        MetricValue,
13        metric::{Metric, MetricData, MetricKind, MetricSeries},
14    },
15};
16
17use crate::{
18    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
19    event::{Event, EventMetadata},
20    internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
21    schema,
22    transforms::{TaskTransform, Transform},
23};
24
25/// Configuration for the `aggregate` transform.
26#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
27#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
28#[serde(deny_unknown_fields)]
29pub struct AggregateConfig {
30    /// The interval between flushes, in milliseconds.
31    ///
32    /// During this time frame, metrics (beta) with the same series data (name, namespace, tags, and so on) are aggregated.
33    #[serde(default = "default_interval_ms")]
34    #[configurable(metadata(docs::human_name = "Flush Interval"))]
35    pub interval_ms: u64,
36    /// Function to use for aggregation.
37    ///
38    /// Some of the functions may only function on incremental and some only on absolute metrics.
39    #[serde(default = "default_mode")]
40    #[configurable(derived)]
41    pub mode: AggregationMode,
42}
43
44#[configurable_component]
45#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
46#[configurable(description = "The aggregation mode to use.")]
47pub enum AggregationMode {
48    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
49    #[default]
50    Auto,
51
52    /// Sums incremental metrics; absolute metrics pass through unchanged.
53    Sum,
54
55    /// Returns the latest value for absolute metrics; incremental metrics pass through unchanged.
56    Latest,
57
58    /// Counts metrics for incremental and absolute metrics
59    Count,
60
61    /// Returns difference between latest value for absolute; incremental metrics pass through unchanged.
62    Diff,
63
64    /// Max value of absolute metric; incremental metrics pass through unchanged.
65    Max,
66
67    /// Min value of absolute metric; incremental metrics pass through unchanged.
68    Min,
69
70    /// Mean value of absolute metric; incremental metrics pass through unchanged.
71    Mean,
72
73    /// Stdev value of absolute metric; incremental metrics pass through unchanged.
74    Stdev,
75}
76
77#[derive(Clone, Debug, Default, PartialEq)]
78enum InnerMode {
79    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
80    #[default]
81    Auto,
82
83    /// Sums incremental metrics; absolute metrics pass through unchanged.
84    Sum,
85
86    /// Returns the latest value for absolute metrics; incremental metrics pass through unchanged.
87    Latest,
88
89    /// Counts metrics for incremental and absolute metrics
90    Count,
91
92    /// Returns difference between latest value for absolute; incremental metrics pass through unchanged.
93    Diff {
94        prev_map: HashMap<MetricSeries, MetricEntry>,
95    },
96
97    /// Max value of absolute metric; incremental metrics pass through unchanged.
98    Max,
99
100    /// Min value of absolute metric; incremental metrics pass through unchanged.
101    Min,
102
103    /// Mean value of absolute metric; incremental metrics pass through unchanged.
104    Mean {
105        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
106    },
107
108    /// Stdev value of absolute metric; incremental metrics pass through unchanged.
109    Stdev {
110        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
111    },
112}
113
114impl From<AggregationMode> for InnerMode {
115    fn from(value: AggregationMode) -> Self {
116        match value {
117            AggregationMode::Auto => InnerMode::Auto,
118            AggregationMode::Sum => InnerMode::Sum,
119            AggregationMode::Latest => InnerMode::Latest,
120            AggregationMode::Count => InnerMode::Count,
121            AggregationMode::Diff => InnerMode::Diff {
122                prev_map: HashMap::default(),
123            },
124            AggregationMode::Max => InnerMode::Max,
125            AggregationMode::Min => InnerMode::Min,
126            AggregationMode::Mean => InnerMode::Mean {
127                multi_map: HashMap::default(),
128            },
129            AggregationMode::Stdev => InnerMode::Stdev {
130                multi_map: HashMap::default(),
131            },
132        }
133    }
134}
135
136const fn default_mode() -> AggregationMode {
137    AggregationMode::Auto
138}
139
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    const MILLIS_PER_SECOND: u64 = 1_000;
    const SECONDS: u64 = 10;
    SECONDS * MILLIS_PER_SECOND
}
143
144impl_generate_config_from_default!(AggregateConfig);
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "aggregate")]
148impl TransformConfig for AggregateConfig {
149    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
150        Aggregate::new(self).map(Transform::event_task)
151    }
152
153    fn input(&self) -> Input {
154        Input::metric()
155    }
156
157    fn outputs(
158        &self,
159        _: &TransformContext,
160        _: &[(OutputId, schema::Definition)],
161    ) -> Vec<TransformOutput> {
162        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
163    }
164}
165
166type MetricEntry = (MetricData, EventMetadata);
167
168#[derive(Debug)]
169pub struct Aggregate {
170    interval: Duration,
171    map: HashMap<MetricSeries, MetricEntry>,
172    mode: InnerMode,
173}
174
175impl Aggregate {
176    pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
177        Ok(Self {
178            interval: Duration::from_millis(config.interval_ms),
179            map: Default::default(),
180            mode: config.mode.into(),
181        })
182    }
183
184    pub fn record(&mut self, event: Event) -> Option<Event> {
185        let (series, data, metadata) = event.into_metric().into_parts();
186
187        match (&mut self.mode, data.kind) {
188            (InnerMode::Sum, MetricKind::Absolute)
189            | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Incremental)
190            | (InnerMode::Max | InnerMode::Min, MetricKind::Incremental)
191            | (InnerMode::Mean { .. } | InnerMode::Stdev { .. }, MetricKind::Incremental) => {
192                return Some(Event::Metric(Metric::from_parts(series, data, metadata)));
193            }
194            (InnerMode::Auto | InnerMode::Sum, MetricKind::Incremental) => {
195                self.record_sum(series, data, metadata);
196            }
197            (InnerMode::Auto, MetricKind::Absolute)
198            | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Absolute) => {
199                self.map.insert(series, (data, metadata));
200            }
201            (InnerMode::Count, _) => {
202                self.record_count(series, data, metadata);
203            }
204            (InnerMode::Max | InnerMode::Min, MetricKind::Absolute) => {
205                self.record_comparison(series, data, metadata);
206            }
207            (
208                InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map },
209                MetricKind::Absolute,
210            ) => {
211                if matches!(data.value, MetricValue::Gauge { value: _ }) {
212                    match multi_map.entry(series) {
213                        Entry::Occupied(mut entry) => entry.get_mut().push((data, metadata)),
214                        Entry::Vacant(entry) => {
215                            entry.insert(vec![(data, metadata)]);
216                        }
217                    }
218                }
219            }
220        }
221        emit!(AggregateEventRecorded);
222        None
223    }
224
225    fn record_count(
226        &mut self,
227        series: MetricSeries,
228        mut data: MetricData,
229        metadata: EventMetadata,
230    ) {
231        let mut count_data = data.clone();
232        let existing = self.map.entry(series).or_insert_with(|| {
233            *data.value_mut() = MetricValue::Counter { value: 0f64 };
234            (data.clone(), metadata.clone())
235        });
236        *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
237        if existing.0.kind == data.kind && existing.0.update(&count_data) {
238            existing.1.merge(metadata);
239        } else {
240            emit!(AggregateUpdateFailed);
241        }
242    }
243
244    fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
245        match self.map.entry(series) {
246            Entry::Occupied(mut entry) => {
247                let existing = entry.get_mut();
248                // In order to update (add) the new and old kind's must match
249                if existing.0.kind == data.kind && existing.0.update(&data) {
250                    existing.1.merge(metadata);
251                } else {
252                    emit!(AggregateUpdateFailed);
253                    *existing = (data, metadata);
254                }
255            }
256            Entry::Vacant(entry) => {
257                entry.insert((data, metadata));
258            }
259        }
260    }
261
262    fn record_comparison(
263        &mut self,
264        series: MetricSeries,
265        data: MetricData,
266        metadata: EventMetadata,
267    ) {
268        match self.map.entry(series) {
269            Entry::Occupied(mut entry) => {
270                let existing = entry.get_mut();
271                // In order to update (add) the new and old kind's must match
272                if existing.0.kind == data.kind {
273                    if let MetricValue::Gauge {
274                        value: existing_value,
275                    } = existing.0.value()
276                        && let MetricValue::Gauge { value: new_value } = data.value()
277                    {
278                        let should_update = match self.mode {
279                            InnerMode::Max => new_value > existing_value,
280                            InnerMode::Min => new_value < existing_value,
281                            _ => false,
282                        };
283                        if should_update {
284                            *existing = (data, metadata);
285                        }
286                    }
287                } else {
288                    emit!(AggregateUpdateFailed);
289                    *existing = (data, metadata);
290                }
291            }
292            Entry::Vacant(entry) => {
293                entry.insert((data, metadata));
294            }
295        }
296    }
297
298    pub fn flush_into(&mut self, output: &mut Vec<Event>) {
299        let map = std::mem::take(&mut self.map);
300        for (series, entry) in map.clone().into_iter() {
301            let mut metric = Metric::from_parts(series, entry.0, entry.1);
302            if let InnerMode::Diff { prev_map } = &self.mode
303                && let Some(prev_entry) = prev_map.get(metric.series())
304                && metric.data().kind == prev_entry.0.kind
305                && !metric.subtract(&prev_entry.0)
306            {
307                emit!(AggregateUpdateFailed);
308            }
309            output.push(Event::Metric(metric));
310        }
311
312        let multi_map = match &mut self.mode {
313            InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => {
314                std::mem::take(multi_map)
315            }
316            _ => HashMap::default(),
317        };
318
319        'outer: for (series, entries) in multi_map.into_iter() {
320            if entries.is_empty() {
321                continue;
322            }
323
324            let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
325            for (data, metadata) in entries.iter().skip(1) {
326                if !final_sum.update(data) {
327                    // Incompatible types, skip this metric
328                    emit!(AggregateUpdateFailed);
329                    continue 'outer;
330                }
331                final_metadata.merge(metadata.clone());
332            }
333
334            let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
335                // Entries are not empty so this is safe.
336                *value /= entries.len() as f64;
337                *value
338            } else {
339                0.0
340            };
341
342            let final_mean = final_sum.clone();
343            match self.mode {
344                InnerMode::Mean { .. } => {
345                    let metric = Metric::from_parts(series, final_mean, final_metadata);
346                    output.push(Event::Metric(metric));
347                }
348                InnerMode::Stdev { .. } => {
349                    let variance = entries
350                        .iter()
351                        .filter_map(|(data, _)| {
352                            if let MetricValue::Gauge { value } = data.value() {
353                                let diff = final_mean_value - value;
354                                Some(diff * diff)
355                            } else {
356                                None
357                            }
358                        })
359                        .sum::<f64>()
360                        / entries.len() as f64;
361                    let mut final_stdev = final_mean;
362                    if let MetricValue::Gauge { value } = final_stdev.value_mut() {
363                        *value = variance.sqrt()
364                    }
365                    let metric = Metric::from_parts(series, final_stdev, final_metadata);
366                    output.push(Event::Metric(metric));
367                }
368                _ => (),
369            }
370        }
371
372        if let InnerMode::Diff { prev_map } = &mut self.mode {
373            *prev_map = map;
374        }
375        emit!(AggregateFlushed);
376    }
377}
378
379impl TaskTransform<Event> for Aggregate {
380    fn transform(
381        mut self: Box<Self>,
382        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
383    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
384    where
385        Self: 'static,
386    {
387        let mut flush_stream = tokio::time::interval(self.interval);
388
389        Box::pin(stream! {
390            let mut output = Vec::new();
391            let mut done = false;
392            while !done {
393                tokio::select! {
394                    _ = flush_stream.tick() => {
395                        self.flush_into(&mut output);
396                    },
397                    maybe_event = input_rx.next() => {
398                        match maybe_event {
399                            None => {
400                                self.flush_into(&mut output);
401                                done = true;
402                            }
403                            Some(event) => {
404                                if let Some(passthrough) = self.record(event) {
405                                    output.push(passthrough);
406                                }
407                            }
408                        }
409                    }
410                };
411                for event in output.drain(..) {
412                    yield event;
413                }
414            }
415        })
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use std::{collections::BTreeSet, sync::Arc, task::Poll};
422
423    use futures::stream;
424    use indoc::indoc;
425    use tokio::sync::mpsc;
426    use tokio_stream::wrappers::ReceiverStream;
427    use vector_lib::config::{ComponentKey, LogNamespace};
428    use vrl::value::Kind;
429
430    use super::*;
431    use crate::{
432        event::{
433            Event, Metric,
434            metric::{MetricKind, MetricValue},
435        },
436        schema::Definition,
437        test_util::components::assert_transform_compliance,
438        transforms::test::create_topology,
439    };
440
441    #[test]
442    fn generate_config() {
443        crate::test_util::test_generate_config::<AggregateConfig>();
444    }
445
446    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
447        let mut event = Event::Metric(Metric::new(name, kind, value))
448            .with_source_id(Arc::new(ComponentKey::from("in")))
449            .with_upstream_id(Arc::new(OutputId::from("transform")));
450        event.metadata_mut().set_schema_definition(&Arc::new(
451            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
452        ));
453
454        event.metadata_mut().set_source_type("unit_test_stream");
455
456        event
457    }
458
459    #[test]
460    fn incremental_auto() {
461        let mut agg = Aggregate::new(&AggregateConfig {
462            interval_ms: 1000_u64,
463            mode: AggregationMode::Auto,
464        })
465        .unwrap();
466
467        let counter_a_1 = make_metric(
468            "counter_a",
469            MetricKind::Incremental,
470            MetricValue::Counter { value: 42.0 },
471        );
472        let counter_a_2 = make_metric(
473            "counter_a",
474            MetricKind::Incremental,
475            MetricValue::Counter { value: 43.0 },
476        );
477        let counter_a_summed = make_metric(
478            "counter_a",
479            MetricKind::Incremental,
480            MetricValue::Counter { value: 85.0 },
481        );
482
483        // Single item, just stored regardless of kind
484        assert_eq!(agg.record(counter_a_1.clone()), None);
485        let mut out = vec![];
486        // We should flush 1 item counter_a_1
487        agg.flush_into(&mut out);
488        assert_eq!(1, out.len());
489        assert_eq!(&counter_a_1, &out[0]);
490
491        // A subsequent flush doesn't send out anything
492        out.clear();
493        agg.flush_into(&mut out);
494        assert_eq!(0, out.len());
495
496        // One more just to make sure that we don't re-see from the other buffer
497        out.clear();
498        agg.flush_into(&mut out);
499        assert_eq!(0, out.len());
500
501        // Two increments with the same series, should sum into 1
502        assert_eq!(agg.record(counter_a_1.clone()), None);
503        assert_eq!(agg.record(counter_a_2), None);
504        out.clear();
505        agg.flush_into(&mut out);
506        assert_eq!(1, out.len());
507        assert_eq!(&counter_a_summed, &out[0]);
508
509        let counter_b_1 = make_metric(
510            "counter_b",
511            MetricKind::Incremental,
512            MetricValue::Counter { value: 44.0 },
513        );
514        // Two increments with the different series, should get each back as-is
515        assert_eq!(agg.record(counter_a_1.clone()), None);
516        assert_eq!(agg.record(counter_b_1.clone()), None);
517        out.clear();
518        agg.flush_into(&mut out);
519        assert_eq!(2, out.len());
520        // B/c we don't know the order they'll come back
521        for event in out {
522            match event.as_metric().series().name.name.as_str() {
523                "counter_a" => assert_eq!(counter_a_1, event),
524                "counter_b" => assert_eq!(counter_b_1, event),
525                _ => panic!("Unexpected metric name in aggregate output"),
526            }
527        }
528    }
529
530    #[test]
531    fn absolute_auto() {
532        let mut agg = Aggregate::new(&AggregateConfig {
533            interval_ms: 1000_u64,
534            mode: AggregationMode::Auto,
535        })
536        .unwrap();
537
538        let gauge_a_1 = make_metric(
539            "gauge_a",
540            MetricKind::Absolute,
541            MetricValue::Gauge { value: 42.0 },
542        );
543        let gauge_a_2 = make_metric(
544            "gauge_a",
545            MetricKind::Absolute,
546            MetricValue::Gauge { value: 43.0 },
547        );
548
549        // Single item, just stored regardless of kind
550        assert_eq!(agg.record(gauge_a_1.clone()), None);
551        let mut out = vec![];
552        // We should flush 1 item gauge_a_1
553        agg.flush_into(&mut out);
554        assert_eq!(1, out.len());
555        assert_eq!(&gauge_a_1, &out[0]);
556
557        // A subsequent flush doesn't send out anything
558        out.clear();
559        agg.flush_into(&mut out);
560        assert_eq!(0, out.len());
561
562        // One more just to make sure that we don't re-see from the other buffer
563        out.clear();
564        agg.flush_into(&mut out);
565        assert_eq!(0, out.len());
566
567        // Two absolutes with the same series, should get the 2nd (last) back.
568        assert_eq!(agg.record(gauge_a_1.clone()), None);
569        assert_eq!(agg.record(gauge_a_2.clone()), None);
570        out.clear();
571        agg.flush_into(&mut out);
572        assert_eq!(1, out.len());
573        assert_eq!(&gauge_a_2, &out[0]);
574
575        let gauge_b_1 = make_metric(
576            "gauge_b",
577            MetricKind::Absolute,
578            MetricValue::Gauge { value: 44.0 },
579        );
580        // Two increments with the different series, should get each back as-is
581        assert_eq!(agg.record(gauge_a_1.clone()), None);
582        assert_eq!(agg.record(gauge_b_1.clone()), None);
583        out.clear();
584        agg.flush_into(&mut out);
585        assert_eq!(2, out.len());
586        // B/c we don't know the order they'll come back
587        for event in out {
588            match event.as_metric().series().name.name.as_str() {
589                "gauge_a" => assert_eq!(gauge_a_1, event),
590                "gauge_b" => assert_eq!(gauge_b_1, event),
591                _ => panic!("Unexpected metric name in aggregate output"),
592            }
593        }
594    }
595
596    #[test]
597    fn count_agg() {
598        let mut agg = Aggregate::new(&AggregateConfig {
599            interval_ms: 1000_u64,
600            mode: AggregationMode::Count,
601        })
602        .unwrap();
603
604        let gauge_a_1 = make_metric(
605            "gauge_a",
606            MetricKind::Absolute,
607            MetricValue::Gauge { value: 42.0 },
608        );
609        let gauge_a_2 = make_metric(
610            "gauge_a",
611            MetricKind::Absolute,
612            MetricValue::Gauge { value: 43.0 },
613        );
614        let result_count = make_metric(
615            "gauge_a",
616            MetricKind::Absolute,
617            MetricValue::Counter { value: 1.0 },
618        );
619        let result_count_2 = make_metric(
620            "gauge_a",
621            MetricKind::Absolute,
622            MetricValue::Counter { value: 2.0 },
623        );
624
625        // Single item, counter should be 1
626        assert_eq!(agg.record(gauge_a_1.clone()), None);
627        let mut out = vec![];
628        // We should flush 1 item gauge_a_1
629        agg.flush_into(&mut out);
630        assert_eq!(1, out.len());
631        assert_eq!(&result_count, &out[0]);
632
633        // A subsequent flush doesn't send out anything
634        out.clear();
635        agg.flush_into(&mut out);
636        assert_eq!(0, out.len());
637
638        // One more just to make sure that we don't re-see from the other buffer
639        out.clear();
640        agg.flush_into(&mut out);
641        assert_eq!(0, out.len());
642
643        // Two absolutes with the same series, counter should be 2
644        assert_eq!(agg.record(gauge_a_1.clone()), None);
645        assert_eq!(agg.record(gauge_a_2.clone()), None);
646        out.clear();
647        agg.flush_into(&mut out);
648        assert_eq!(1, out.len());
649        assert_eq!(&result_count_2, &out[0]);
650    }
651
652    #[test]
653    fn absolute_max() {
654        let mut agg = Aggregate::new(&AggregateConfig {
655            interval_ms: 1000_u64,
656            mode: AggregationMode::Max,
657        })
658        .unwrap();
659
660        let gauge_a_1 = make_metric(
661            "gauge_a",
662            MetricKind::Absolute,
663            MetricValue::Gauge { value: 112.0 },
664        );
665        let gauge_a_2 = make_metric(
666            "gauge_a",
667            MetricKind::Absolute,
668            MetricValue::Gauge { value: 89.0 },
669        );
670
671        // Single item, it should be returned as is
672        assert_eq!(agg.record(gauge_a_2.clone()), None);
673        let mut out = vec![];
674        // We should flush 1 item gauge_a_2
675        agg.flush_into(&mut out);
676        assert_eq!(1, out.len());
677        assert_eq!(&gauge_a_2, &out[0]);
678
679        // A subsequent flush doesn't send out anything
680        out.clear();
681        agg.flush_into(&mut out);
682        assert_eq!(0, out.len());
683
684        // One more just to make sure that we don't re-see from the other buffer
685        out.clear();
686        agg.flush_into(&mut out);
687        assert_eq!(0, out.len());
688
689        // Two absolutes, result should be higher of the 2
690        assert_eq!(agg.record(gauge_a_1.clone()), None);
691        assert_eq!(agg.record(gauge_a_2.clone()), None);
692        out.clear();
693        agg.flush_into(&mut out);
694        assert_eq!(1, out.len());
695        assert_eq!(&gauge_a_1, &out[0]);
696    }
697
698    #[test]
699    fn absolute_min() {
700        let mut agg = Aggregate::new(&AggregateConfig {
701            interval_ms: 1000_u64,
702            mode: AggregationMode::Min,
703        })
704        .unwrap();
705
706        let gauge_a_1 = make_metric(
707            "gauge_a",
708            MetricKind::Absolute,
709            MetricValue::Gauge { value: 32.0 },
710        );
711        let gauge_a_2 = make_metric(
712            "gauge_a",
713            MetricKind::Absolute,
714            MetricValue::Gauge { value: 89.0 },
715        );
716
717        // Single item, it should be returned as is
718        assert_eq!(agg.record(gauge_a_2.clone()), None);
719        let mut out = vec![];
720        // We should flush 1 item gauge_a_2
721        agg.flush_into(&mut out);
722        assert_eq!(1, out.len());
723        assert_eq!(&gauge_a_2, &out[0]);
724
725        // A subsequent flush doesn't send out anything
726        out.clear();
727        agg.flush_into(&mut out);
728        assert_eq!(0, out.len());
729
730        // One more just to make sure that we don't re-see from the other buffer
731        out.clear();
732        agg.flush_into(&mut out);
733        assert_eq!(0, out.len());
734
735        // Two absolutes, result should be lower of the 2
736        assert_eq!(agg.record(gauge_a_1.clone()), None);
737        assert_eq!(agg.record(gauge_a_2.clone()), None);
738        out.clear();
739        agg.flush_into(&mut out);
740        assert_eq!(1, out.len());
741        assert_eq!(&gauge_a_1, &out[0]);
742    }
743
744    #[test]
745    fn absolute_diff() {
746        let mut agg = Aggregate::new(&AggregateConfig {
747            interval_ms: 1000_u64,
748            mode: AggregationMode::Diff,
749        })
750        .unwrap();
751
752        let gauge_a_1 = make_metric(
753            "gauge_a",
754            MetricKind::Absolute,
755            MetricValue::Gauge { value: 32.0 },
756        );
757        let gauge_a_2 = make_metric(
758            "gauge_a",
759            MetricKind::Absolute,
760            MetricValue::Gauge { value: 82.0 },
761        );
762        let result = make_metric(
763            "gauge_a",
764            MetricKind::Absolute,
765            MetricValue::Gauge { value: 50.0 },
766        );
767
768        // Single item, it should be returned as is
769        assert_eq!(agg.record(gauge_a_2.clone()), None);
770        let mut out = vec![];
771        // We should flush 1 item gauge_a_2
772        agg.flush_into(&mut out);
773        assert_eq!(1, out.len());
774        assert_eq!(&gauge_a_2, &out[0]);
775
776        // A subsequent flush doesn't send out anything
777        out.clear();
778        agg.flush_into(&mut out);
779        assert_eq!(0, out.len());
780
781        // One more just to make sure that we don't re-see from the other buffer
782        out.clear();
783        agg.flush_into(&mut out);
784        assert_eq!(0, out.len());
785
786        // Two absolutes in 2 separate flushes, result should be diff between the 2
787        assert_eq!(agg.record(gauge_a_1.clone()), None);
788        out.clear();
789        agg.flush_into(&mut out);
790        assert_eq!(1, out.len());
791        assert_eq!(&gauge_a_1, &out[0]);
792
793        assert_eq!(agg.record(gauge_a_2.clone()), None);
794        out.clear();
795        agg.flush_into(&mut out);
796        assert_eq!(1, out.len());
797        assert_eq!(&result, &out[0]);
798    }
799
800    #[test]
801    fn absolute_diff_conflicting_type() {
802        let mut agg = Aggregate::new(&AggregateConfig {
803            interval_ms: 1000_u64,
804            mode: AggregationMode::Diff,
805        })
806        .unwrap();
807
808        let gauge_a_1 = make_metric(
809            "gauge_a",
810            MetricKind::Absolute,
811            MetricValue::Gauge { value: 32.0 },
812        );
813        let gauge_a_2 = make_metric(
814            "gauge_a",
815            MetricKind::Absolute,
816            MetricValue::Counter { value: 1.0 },
817        );
818
819        let mut out = vec![];
820        // Two absolutes in 2 separate flushes, result should be second one due to different types
821        assert_eq!(agg.record(gauge_a_1.clone()), None);
822        out.clear();
823        agg.flush_into(&mut out);
824        assert_eq!(1, out.len());
825        assert_eq!(&gauge_a_1, &out[0]);
826
827        assert_eq!(agg.record(gauge_a_2.clone()), None);
828        out.clear();
829        agg.flush_into(&mut out);
830        assert_eq!(1, out.len());
831        // Due to incompatible results, the new value just overwrites the old one
832        assert_eq!(&gauge_a_2, &out[0]);
833    }
834
835    #[test]
836    fn absolute_mean() {
837        let mut agg = Aggregate::new(&AggregateConfig {
838            interval_ms: 1000_u64,
839            mode: AggregationMode::Mean,
840        })
841        .unwrap();
842
843        let gauge_a_1 = make_metric(
844            "gauge_a",
845            MetricKind::Absolute,
846            MetricValue::Gauge { value: 32.0 },
847        );
848        let gauge_a_2 = make_metric(
849            "gauge_a",
850            MetricKind::Absolute,
851            MetricValue::Gauge { value: 82.0 },
852        );
853        let gauge_a_3 = make_metric(
854            "gauge_a",
855            MetricKind::Absolute,
856            MetricValue::Gauge { value: 51.0 },
857        );
858        let mean_result = make_metric(
859            "gauge_a",
860            MetricKind::Absolute,
861            MetricValue::Gauge { value: 55.0 },
862        );
863
864        // Single item, it should be returned as is
865        assert_eq!(agg.record(gauge_a_2.clone()), None);
866        let mut out = vec![];
867        // We should flush 1 item gauge_a_2
868        agg.flush_into(&mut out);
869        assert_eq!(1, out.len());
870        assert_eq!(&gauge_a_2, &out[0]);
871
872        // A subsequent flush doesn't send out anything
873        out.clear();
874        agg.flush_into(&mut out);
875        assert_eq!(0, out.len());
876
877        // One more just to make sure that we don't re-see from the other buffer
878        out.clear();
879        agg.flush_into(&mut out);
880        assert_eq!(0, out.len());
881
882        // Three absolutes, result should be mean
883        assert_eq!(agg.record(gauge_a_1.clone()), None);
884        assert_eq!(agg.record(gauge_a_2.clone()), None);
885        assert_eq!(agg.record(gauge_a_3.clone()), None);
886        out.clear();
887        agg.flush_into(&mut out);
888        assert_eq!(1, out.len());
889        assert_eq!(&mean_result, &out[0]);
890    }
891
892    #[test]
893    fn absolute_stdev() {
894        let mut agg = Aggregate::new(&AggregateConfig {
895            interval_ms: 1000_u64,
896            mode: AggregationMode::Stdev,
897        })
898        .unwrap();
899
900        let gauges = vec![
901            make_metric(
902                "gauge_a",
903                MetricKind::Absolute,
904                MetricValue::Gauge { value: 25.0 },
905            ),
906            make_metric(
907                "gauge_a",
908                MetricKind::Absolute,
909                MetricValue::Gauge { value: 30.0 },
910            ),
911            make_metric(
912                "gauge_a",
913                MetricKind::Absolute,
914                MetricValue::Gauge { value: 35.0 },
915            ),
916            make_metric(
917                "gauge_a",
918                MetricKind::Absolute,
919                MetricValue::Gauge { value: 40.0 },
920            ),
921            make_metric(
922                "gauge_a",
923                MetricKind::Absolute,
924                MetricValue::Gauge { value: 45.0 },
925            ),
926            make_metric(
927                "gauge_a",
928                MetricKind::Absolute,
929                MetricValue::Gauge { value: 50.0 },
930            ),
931            make_metric(
932                "gauge_a",
933                MetricKind::Absolute,
934                MetricValue::Gauge { value: 55.0 },
935            ),
936        ];
937        let stdev_result = make_metric(
938            "gauge_a",
939            MetricKind::Absolute,
940            MetricValue::Gauge { value: 10.0 },
941        );
942
943        for gauge in gauges {
944            assert_eq!(agg.record(gauge), None);
945        }
946        let mut out = vec![];
947        agg.flush_into(&mut out);
948        assert_eq!(1, out.len());
949        assert_eq!(&stdev_result, &out[0]);
950    }
951
952    #[test]
953    fn passes_through_ignored_kind() {
954        // Sum mode aggregates incremental, passes through absolute without collapsing.
955        let mut agg = Aggregate::new(&AggregateConfig {
956            interval_ms: 1000_u64,
957            mode: AggregationMode::Sum,
958        })
959        .unwrap();
960
961        let counter_1 = make_metric(
962            "counter_a",
963            MetricKind::Incremental,
964            MetricValue::Counter { value: 10.0 },
965        );
966        let counter_2 = make_metric(
967            "counter_a",
968            MetricKind::Incremental,
969            MetricValue::Counter { value: 5.0 },
970        );
971        let counter_summed = make_metric(
972            "counter_a",
973            MetricKind::Incremental,
974            MetricValue::Counter { value: 15.0 },
975        );
976        let gauge_1 = make_metric(
977            "gauge_a",
978            MetricKind::Absolute,
979            MetricValue::Gauge { value: 42.0 },
980        );
981        let gauge_2 = make_metric(
982            "gauge_a",
983            MetricKind::Absolute,
984            MetricValue::Gauge { value: 99.0 },
985        );
986
987        // Absolute metrics pass through immediately (not held until flush).
988        assert_eq!(agg.record(gauge_1.clone()), Some(gauge_1));
989        assert_eq!(agg.record(gauge_2.clone()), Some(gauge_2));
990
991        // Each is returned individually — no collapsing to latest.
992        assert_eq!(agg.record(counter_1), None);
993        assert_eq!(agg.record(counter_2), None);
994
995        let mut out = vec![];
996        agg.flush_into(&mut out);
997        // Only the summed incremental counter appears at flush; the gauges already passed through.
998        assert_eq!(1, out.len());
999        assert_eq!(&counter_summed, &out[0]);
1000    }
1001
1002    #[test]
1003    fn conflicting_value_type() {
1004        let mut agg = Aggregate::new(&AggregateConfig {
1005            interval_ms: 1000_u64,
1006            mode: AggregationMode::Auto,
1007        })
1008        .unwrap();
1009
1010        let counter = make_metric(
1011            "the-thing",
1012            MetricKind::Incremental,
1013            MetricValue::Counter { value: 42.0 },
1014        );
1015        let mut values = BTreeSet::<String>::new();
1016        values.insert("a".into());
1017        values.insert("b".into());
1018        let set = make_metric(
1019            "the-thing",
1020            MetricKind::Incremental,
1021            MetricValue::Set { values },
1022        );
1023        let summed = make_metric(
1024            "the-thing",
1025            MetricKind::Incremental,
1026            MetricValue::Counter { value: 84.0 },
1027        );
1028
1029        // when types conflict the new values replaces whatever is there
1030
1031        // Start with an counter
1032        assert_eq!(agg.record(counter.clone()), None);
1033        // Another will "add" to it
1034        assert_eq!(agg.record(counter.clone()), None);
1035        // Then an set will replace it due to a failed update
1036        assert_eq!(agg.record(set.clone()), None);
1037        // Then a set union would be a noop
1038        assert_eq!(agg.record(set.clone()), None);
1039        let mut out = vec![];
1040        // We should flush 1 item counter
1041        agg.flush_into(&mut out);
1042        assert_eq!(1, out.len());
1043        assert_eq!(&set, &out[0]);
1044
1045        // Start out with an set
1046        assert_eq!(agg.record(set.clone()), None);
1047        // Union with itself, a noop
1048        assert_eq!(agg.record(set), None);
1049        // Send an counter with the same name, will replace due to a failed update
1050        assert_eq!(agg.record(counter.clone()), None);
1051        // Send another counter will "add"
1052        assert_eq!(agg.record(counter), None);
1053        let mut out = vec![];
1054        // We should flush 1 item counter
1055        agg.flush_into(&mut out);
1056        assert_eq!(1, out.len());
1057        assert_eq!(&summed, &out[0]);
1058    }
1059
1060    #[test]
1061    fn conflicting_kinds() {
1062        let mut agg = Aggregate::new(&AggregateConfig {
1063            interval_ms: 1000_u64,
1064            mode: AggregationMode::Auto,
1065        })
1066        .unwrap();
1067
1068        let incremental = make_metric(
1069            "the-thing",
1070            MetricKind::Incremental,
1071            MetricValue::Counter { value: 42.0 },
1072        );
1073        let absolute = make_metric(
1074            "the-thing",
1075            MetricKind::Absolute,
1076            MetricValue::Counter { value: 43.0 },
1077        );
1078        let summed = make_metric(
1079            "the-thing",
1080            MetricKind::Incremental,
1081            MetricValue::Counter { value: 84.0 },
1082        );
1083
1084        // when types conflict the new values replaces whatever is there
1085
1086        // Start with an incremental
1087        assert_eq!(agg.record(incremental.clone()), None);
1088        // Another will "add" to it
1089        assert_eq!(agg.record(incremental.clone()), None);
1090        // Then an absolute will replace it with a failed update
1091        assert_eq!(agg.record(absolute.clone()), None);
1092        // Then another absolute will replace it normally
1093        assert_eq!(agg.record(absolute.clone()), None);
1094        let mut out = vec![];
1095        // We should flush 1 item incremental
1096        agg.flush_into(&mut out);
1097        assert_eq!(1, out.len());
1098        assert_eq!(&absolute, &out[0]);
1099
1100        // Start out with an absolute
1101        assert_eq!(agg.record(absolute.clone()), None);
1102        // Replace it normally
1103        assert_eq!(agg.record(absolute), None);
1104        // Send an incremental with the same name, will replace due to a failed update
1105        assert_eq!(agg.record(incremental.clone()), None);
1106        // Send another incremental will "add"
1107        assert_eq!(agg.record(incremental), None);
1108        let mut out = vec![];
1109        // We should flush 1 item incremental
1110        agg.flush_into(&mut out);
1111        assert_eq!(1, out.len());
1112        assert_eq!(&summed, &out[0]);
1113    }
1114
1115    #[tokio::test]
1116    async fn transform_shutdown() {
1117        let agg = serde_yaml::from_str::<AggregateConfig>(indoc! {"
1118            interval_ms: 999999
1119        "})
1120        .unwrap()
1121        .build(&TransformContext::default())
1122        .await
1123        .unwrap();
1124
1125        let agg = agg.into_task();
1126
1127        let counter_a_1 = make_metric(
1128            "counter_a",
1129            MetricKind::Incremental,
1130            MetricValue::Counter { value: 42.0 },
1131        );
1132        let counter_a_2 = make_metric(
1133            "counter_a",
1134            MetricKind::Incremental,
1135            MetricValue::Counter { value: 43.0 },
1136        );
1137        let counter_a_summed = make_metric(
1138            "counter_a",
1139            MetricKind::Incremental,
1140            MetricValue::Counter { value: 85.0 },
1141        );
1142        let gauge_a_1 = make_metric(
1143            "gauge_a",
1144            MetricKind::Absolute,
1145            MetricValue::Gauge { value: 42.0 },
1146        );
1147        let gauge_a_2 = make_metric(
1148            "gauge_a",
1149            MetricKind::Absolute,
1150            MetricValue::Gauge { value: 43.0 },
1151        );
1152        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1153
1154        // Queue up some events to be consumed & recorded
1155        let in_stream = Box::pin(stream::iter(inputs));
1156        // Kick off the transform process which should consume & record them
1157        let mut out_stream = agg.transform_events(in_stream);
1158
1159        // B/c the input stream has ended we will have gone through the `input_rx.next() => None`
1160        // part of the loop and do the shutting down final flush immediately. We'll already be able
1161        // to read our expected bits on the output.
1162        let mut count = 0_u8;
1163        while let Some(event) = out_stream.next().await {
1164            count += 1;
1165            match event.as_metric().series().name.name.as_str() {
1166                "counter_a" => assert_eq!(counter_a_summed, event),
1167                "gauge_a" => assert_eq!(gauge_a_2, event),
1168                _ => panic!("Unexpected metric name in aggregate output"),
1169            };
1170        }
1171        // There were only 2
1172        assert_eq!(2, count);
1173    }
1174
1175    #[tokio::test]
1176    async fn transform_interval() {
1177        let transform_config = serde_yaml::from_str::<AggregateConfig>("{}").unwrap();
1178
1179        let counter_a_1 = make_metric(
1180            "counter_a",
1181            MetricKind::Incremental,
1182            MetricValue::Counter { value: 42.0 },
1183        );
1184        let counter_a_2 = make_metric(
1185            "counter_a",
1186            MetricKind::Incremental,
1187            MetricValue::Counter { value: 43.0 },
1188        );
1189        let counter_a_summed = make_metric(
1190            "counter_a",
1191            MetricKind::Incremental,
1192            MetricValue::Counter { value: 85.0 },
1193        );
1194        let gauge_a_1 = make_metric(
1195            "gauge_a",
1196            MetricKind::Absolute,
1197            MetricValue::Gauge { value: 42.0 },
1198        );
1199        let gauge_a_2 = make_metric(
1200            "gauge_a",
1201            MetricKind::Absolute,
1202            MetricValue::Gauge { value: 43.0 },
1203        );
1204
1205        assert_transform_compliance(async {
1206            let (tx, rx) = mpsc::channel(10);
1207            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1208            let mut out = ReceiverStream::new(out);
1209
1210            tokio::time::pause();
1211
1212            // tokio interval is always immediately ready, so we poll once to make sure
1213            // we trip it/set the interval in the future
1214            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1215
1216            // Now send our events
1217            tx.send(counter_a_1).await.unwrap();
1218            tx.send(counter_a_2).await.unwrap();
1219            tx.send(gauge_a_1).await.unwrap();
1220            tx.send(gauge_a_2.clone()).await.unwrap();
1221            // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs
1222            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1223            // Now fast forward time enough that our flush should trigger.
1224            tokio::time::advance(Duration::from_secs(11)).await;
1225            // We should have had an interval fire now and our output aggregate events should be
1226            // available.
1227            let mut count = 0_u8;
1228            while count < 2 {
1229                match out.next().await {
1230                    Some(event) => {
1231                        match event.as_metric().series().name.name.as_str() {
1232                            "counter_a" => assert_eq!(counter_a_summed, event),
1233                            "gauge_a" => assert_eq!(gauge_a_2, event),
1234                            _ => panic!("Unexpected metric name in aggregate output"),
1235                        };
1236                        count += 1;
1237                    }
1238                    _ => {
1239                        panic!("Unexpectedly received None in output stream");
1240                    }
1241                }
1242            }
1243            // We should be back to pending, having nothing waiting for us
1244            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1245
1246            drop(tx);
1247            topology.stop().await;
1248            assert_eq!(out.next().await, None);
1249        })
1250        .await;
1251    }
1252}