1use std::{
2 collections::{HashMap, hash_map::Entry},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10 configurable::configurable_component,
11 event::{
12 MetricValue,
13 metric::{Metric, MetricData, MetricKind, MetricSeries},
14 },
15};
16
17use crate::{
18 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
19 event::{Event, EventMetadata},
20 internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
21 schema,
22 transforms::{TaskTransform, Transform},
23};
24
25#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
27#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
28#[serde(deny_unknown_fields)]
29pub struct AggregateConfig {
30 #[serde(default = "default_interval_ms")]
34 #[configurable(metadata(docs::human_name = "Flush Interval"))]
35 pub interval_ms: u64,
36 #[serde(default = "default_mode")]
40 #[configurable(derived)]
41 pub mode: AggregationMode,
42}
43
44#[configurable_component]
45#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
46#[configurable(description = "The aggregation mode to use.")]
47pub enum AggregationMode {
48 #[default]
50 Auto,
51
52 Sum,
54
55 Latest,
57
58 Count,
60
61 Diff,
63
64 Max,
66
67 Min,
69
70 Mean,
72
73 Stdev,
75}
76
77#[derive(Clone, Debug, Default, PartialEq)]
78enum InnerMode {
79 #[default]
81 Auto,
82
83 Sum,
85
86 Latest,
88
89 Count,
91
92 Diff {
94 prev_map: HashMap<MetricSeries, MetricEntry>,
95 },
96
97 Max,
99
100 Min,
102
103 Mean {
105 multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
106 },
107
108 Stdev {
110 multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
111 },
112}
113
114impl From<AggregationMode> for InnerMode {
115 fn from(value: AggregationMode) -> Self {
116 match value {
117 AggregationMode::Auto => InnerMode::Auto,
118 AggregationMode::Sum => InnerMode::Sum,
119 AggregationMode::Latest => InnerMode::Latest,
120 AggregationMode::Count => InnerMode::Count,
121 AggregationMode::Diff => InnerMode::Diff {
122 prev_map: HashMap::default(),
123 },
124 AggregationMode::Max => InnerMode::Max,
125 AggregationMode::Min => InnerMode::Min,
126 AggregationMode::Mean => InnerMode::Mean {
127 multi_map: HashMap::default(),
128 },
129 AggregationMode::Stdev => InnerMode::Stdev {
130 multi_map: HashMap::default(),
131 },
132 }
133 }
134}
135
136const fn default_mode() -> AggregationMode {
137 AggregationMode::Auto
138}
139
140const fn default_interval_ms() -> u64 {
141 10 * 1000
142}
143
144impl_generate_config_from_default!(AggregateConfig);
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "aggregate")]
148impl TransformConfig for AggregateConfig {
149 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
150 Aggregate::new(self).map(Transform::event_task)
151 }
152
153 fn input(&self) -> Input {
154 Input::metric()
155 }
156
157 fn outputs(
158 &self,
159 _: &TransformContext,
160 _: &[(OutputId, schema::Definition)],
161 ) -> Vec<TransformOutput> {
162 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
163 }
164}
165
166type MetricEntry = (MetricData, EventMetadata);
167
168#[derive(Debug)]
169pub struct Aggregate {
170 interval: Duration,
171 map: HashMap<MetricSeries, MetricEntry>,
172 mode: InnerMode,
173}
174
175impl Aggregate {
176 pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
177 Ok(Self {
178 interval: Duration::from_millis(config.interval_ms),
179 map: Default::default(),
180 mode: config.mode.into(),
181 })
182 }
183
184 pub fn record(&mut self, event: Event) -> Option<Event> {
185 let (series, data, metadata) = event.into_metric().into_parts();
186
187 match (&mut self.mode, data.kind) {
188 (InnerMode::Sum, MetricKind::Absolute)
189 | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Incremental)
190 | (InnerMode::Max | InnerMode::Min, MetricKind::Incremental)
191 | (InnerMode::Mean { .. } | InnerMode::Stdev { .. }, MetricKind::Incremental) => {
192 return Some(Event::Metric(Metric::from_parts(series, data, metadata)));
193 }
194 (InnerMode::Auto | InnerMode::Sum, MetricKind::Incremental) => {
195 self.record_sum(series, data, metadata);
196 }
197 (InnerMode::Auto, MetricKind::Absolute)
198 | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Absolute) => {
199 self.map.insert(series, (data, metadata));
200 }
201 (InnerMode::Count, _) => {
202 self.record_count(series, data, metadata);
203 }
204 (InnerMode::Max | InnerMode::Min, MetricKind::Absolute) => {
205 self.record_comparison(series, data, metadata);
206 }
207 (
208 InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map },
209 MetricKind::Absolute,
210 ) => {
211 if matches!(data.value, MetricValue::Gauge { value: _ }) {
212 match multi_map.entry(series) {
213 Entry::Occupied(mut entry) => entry.get_mut().push((data, metadata)),
214 Entry::Vacant(entry) => {
215 entry.insert(vec![(data, metadata)]);
216 }
217 }
218 }
219 }
220 }
221 emit!(AggregateEventRecorded);
222 None
223 }
224
225 fn record_count(
226 &mut self,
227 series: MetricSeries,
228 mut data: MetricData,
229 metadata: EventMetadata,
230 ) {
231 let mut count_data = data.clone();
232 let existing = self.map.entry(series).or_insert_with(|| {
233 *data.value_mut() = MetricValue::Counter { value: 0f64 };
234 (data.clone(), metadata.clone())
235 });
236 *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
237 if existing.0.kind == data.kind && existing.0.update(&count_data) {
238 existing.1.merge(metadata);
239 } else {
240 emit!(AggregateUpdateFailed);
241 }
242 }
243
244 fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
245 match self.map.entry(series) {
246 Entry::Occupied(mut entry) => {
247 let existing = entry.get_mut();
248 if existing.0.kind == data.kind && existing.0.update(&data) {
250 existing.1.merge(metadata);
251 } else {
252 emit!(AggregateUpdateFailed);
253 *existing = (data, metadata);
254 }
255 }
256 Entry::Vacant(entry) => {
257 entry.insert((data, metadata));
258 }
259 }
260 }
261
262 fn record_comparison(
263 &mut self,
264 series: MetricSeries,
265 data: MetricData,
266 metadata: EventMetadata,
267 ) {
268 match self.map.entry(series) {
269 Entry::Occupied(mut entry) => {
270 let existing = entry.get_mut();
271 if existing.0.kind == data.kind {
273 if let MetricValue::Gauge {
274 value: existing_value,
275 } = existing.0.value()
276 && let MetricValue::Gauge { value: new_value } = data.value()
277 {
278 let should_update = match self.mode {
279 InnerMode::Max => new_value > existing_value,
280 InnerMode::Min => new_value < existing_value,
281 _ => false,
282 };
283 if should_update {
284 *existing = (data, metadata);
285 }
286 }
287 } else {
288 emit!(AggregateUpdateFailed);
289 *existing = (data, metadata);
290 }
291 }
292 Entry::Vacant(entry) => {
293 entry.insert((data, metadata));
294 }
295 }
296 }
297
298 pub fn flush_into(&mut self, output: &mut Vec<Event>) {
299 let map = std::mem::take(&mut self.map);
300 for (series, entry) in map.clone().into_iter() {
301 let mut metric = Metric::from_parts(series, entry.0, entry.1);
302 if let InnerMode::Diff { prev_map } = &self.mode
303 && let Some(prev_entry) = prev_map.get(metric.series())
304 && metric.data().kind == prev_entry.0.kind
305 && !metric.subtract(&prev_entry.0)
306 {
307 emit!(AggregateUpdateFailed);
308 }
309 output.push(Event::Metric(metric));
310 }
311
312 let multi_map = match &mut self.mode {
313 InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => {
314 std::mem::take(multi_map)
315 }
316 _ => HashMap::default(),
317 };
318
319 'outer: for (series, entries) in multi_map.into_iter() {
320 if entries.is_empty() {
321 continue;
322 }
323
324 let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
325 for (data, metadata) in entries.iter().skip(1) {
326 if !final_sum.update(data) {
327 emit!(AggregateUpdateFailed);
329 continue 'outer;
330 }
331 final_metadata.merge(metadata.clone());
332 }
333
334 let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
335 *value /= entries.len() as f64;
337 *value
338 } else {
339 0.0
340 };
341
342 let final_mean = final_sum.clone();
343 match self.mode {
344 InnerMode::Mean { .. } => {
345 let metric = Metric::from_parts(series, final_mean, final_metadata);
346 output.push(Event::Metric(metric));
347 }
348 InnerMode::Stdev { .. } => {
349 let variance = entries
350 .iter()
351 .filter_map(|(data, _)| {
352 if let MetricValue::Gauge { value } = data.value() {
353 let diff = final_mean_value - value;
354 Some(diff * diff)
355 } else {
356 None
357 }
358 })
359 .sum::<f64>()
360 / entries.len() as f64;
361 let mut final_stdev = final_mean;
362 if let MetricValue::Gauge { value } = final_stdev.value_mut() {
363 *value = variance.sqrt()
364 }
365 let metric = Metric::from_parts(series, final_stdev, final_metadata);
366 output.push(Event::Metric(metric));
367 }
368 _ => (),
369 }
370 }
371
372 if let InnerMode::Diff { prev_map } = &mut self.mode {
373 *prev_map = map;
374 }
375 emit!(AggregateFlushed);
376 }
377}
378
379impl TaskTransform<Event> for Aggregate {
380 fn transform(
381 mut self: Box<Self>,
382 mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
383 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
384 where
385 Self: 'static,
386 {
387 let mut flush_stream = tokio::time::interval(self.interval);
388
389 Box::pin(stream! {
390 let mut output = Vec::new();
391 let mut done = false;
392 while !done {
393 tokio::select! {
394 _ = flush_stream.tick() => {
395 self.flush_into(&mut output);
396 },
397 maybe_event = input_rx.next() => {
398 match maybe_event {
399 None => {
400 self.flush_into(&mut output);
401 done = true;
402 }
403 Some(event) => {
404 if let Some(passthrough) = self.record(event) {
405 output.push(passthrough);
406 }
407 }
408 }
409 }
410 };
411 for event in output.drain(..) {
412 yield event;
413 }
414 }
415 })
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use std::{collections::BTreeSet, sync::Arc, task::Poll};
422
423 use futures::stream;
424 use indoc::indoc;
425 use tokio::sync::mpsc;
426 use tokio_stream::wrappers::ReceiverStream;
427 use vector_lib::config::{ComponentKey, LogNamespace};
428 use vrl::value::Kind;
429
430 use super::*;
431 use crate::{
432 event::{
433 Event, Metric,
434 metric::{MetricKind, MetricValue},
435 },
436 schema::Definition,
437 test_util::components::assert_transform_compliance,
438 transforms::test::create_topology,
439 };
440
441 #[test]
442 fn generate_config() {
443 crate::test_util::test_generate_config::<AggregateConfig>();
444 }
445
446 fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
447 let mut event = Event::Metric(Metric::new(name, kind, value))
448 .with_source_id(Arc::new(ComponentKey::from("in")))
449 .with_upstream_id(Arc::new(OutputId::from("transform")));
450 event.metadata_mut().set_schema_definition(&Arc::new(
451 Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
452 ));
453
454 event.metadata_mut().set_source_type("unit_test_stream");
455
456 event
457 }
458
459 #[test]
460 fn incremental_auto() {
461 let mut agg = Aggregate::new(&AggregateConfig {
462 interval_ms: 1000_u64,
463 mode: AggregationMode::Auto,
464 })
465 .unwrap();
466
467 let counter_a_1 = make_metric(
468 "counter_a",
469 MetricKind::Incremental,
470 MetricValue::Counter { value: 42.0 },
471 );
472 let counter_a_2 = make_metric(
473 "counter_a",
474 MetricKind::Incremental,
475 MetricValue::Counter { value: 43.0 },
476 );
477 let counter_a_summed = make_metric(
478 "counter_a",
479 MetricKind::Incremental,
480 MetricValue::Counter { value: 85.0 },
481 );
482
483 assert_eq!(agg.record(counter_a_1.clone()), None);
485 let mut out = vec![];
486 agg.flush_into(&mut out);
488 assert_eq!(1, out.len());
489 assert_eq!(&counter_a_1, &out[0]);
490
491 out.clear();
493 agg.flush_into(&mut out);
494 assert_eq!(0, out.len());
495
496 out.clear();
498 agg.flush_into(&mut out);
499 assert_eq!(0, out.len());
500
501 assert_eq!(agg.record(counter_a_1.clone()), None);
503 assert_eq!(agg.record(counter_a_2), None);
504 out.clear();
505 agg.flush_into(&mut out);
506 assert_eq!(1, out.len());
507 assert_eq!(&counter_a_summed, &out[0]);
508
509 let counter_b_1 = make_metric(
510 "counter_b",
511 MetricKind::Incremental,
512 MetricValue::Counter { value: 44.0 },
513 );
514 assert_eq!(agg.record(counter_a_1.clone()), None);
516 assert_eq!(agg.record(counter_b_1.clone()), None);
517 out.clear();
518 agg.flush_into(&mut out);
519 assert_eq!(2, out.len());
520 for event in out {
522 match event.as_metric().series().name.name.as_str() {
523 "counter_a" => assert_eq!(counter_a_1, event),
524 "counter_b" => assert_eq!(counter_b_1, event),
525 _ => panic!("Unexpected metric name in aggregate output"),
526 }
527 }
528 }
529
530 #[test]
531 fn absolute_auto() {
532 let mut agg = Aggregate::new(&AggregateConfig {
533 interval_ms: 1000_u64,
534 mode: AggregationMode::Auto,
535 })
536 .unwrap();
537
538 let gauge_a_1 = make_metric(
539 "gauge_a",
540 MetricKind::Absolute,
541 MetricValue::Gauge { value: 42.0 },
542 );
543 let gauge_a_2 = make_metric(
544 "gauge_a",
545 MetricKind::Absolute,
546 MetricValue::Gauge { value: 43.0 },
547 );
548
549 assert_eq!(agg.record(gauge_a_1.clone()), None);
551 let mut out = vec![];
552 agg.flush_into(&mut out);
554 assert_eq!(1, out.len());
555 assert_eq!(&gauge_a_1, &out[0]);
556
557 out.clear();
559 agg.flush_into(&mut out);
560 assert_eq!(0, out.len());
561
562 out.clear();
564 agg.flush_into(&mut out);
565 assert_eq!(0, out.len());
566
567 assert_eq!(agg.record(gauge_a_1.clone()), None);
569 assert_eq!(agg.record(gauge_a_2.clone()), None);
570 out.clear();
571 agg.flush_into(&mut out);
572 assert_eq!(1, out.len());
573 assert_eq!(&gauge_a_2, &out[0]);
574
575 let gauge_b_1 = make_metric(
576 "gauge_b",
577 MetricKind::Absolute,
578 MetricValue::Gauge { value: 44.0 },
579 );
580 assert_eq!(agg.record(gauge_a_1.clone()), None);
582 assert_eq!(agg.record(gauge_b_1.clone()), None);
583 out.clear();
584 agg.flush_into(&mut out);
585 assert_eq!(2, out.len());
586 for event in out {
588 match event.as_metric().series().name.name.as_str() {
589 "gauge_a" => assert_eq!(gauge_a_1, event),
590 "gauge_b" => assert_eq!(gauge_b_1, event),
591 _ => panic!("Unexpected metric name in aggregate output"),
592 }
593 }
594 }
595
596 #[test]
597 fn count_agg() {
598 let mut agg = Aggregate::new(&AggregateConfig {
599 interval_ms: 1000_u64,
600 mode: AggregationMode::Count,
601 })
602 .unwrap();
603
604 let gauge_a_1 = make_metric(
605 "gauge_a",
606 MetricKind::Absolute,
607 MetricValue::Gauge { value: 42.0 },
608 );
609 let gauge_a_2 = make_metric(
610 "gauge_a",
611 MetricKind::Absolute,
612 MetricValue::Gauge { value: 43.0 },
613 );
614 let result_count = make_metric(
615 "gauge_a",
616 MetricKind::Absolute,
617 MetricValue::Counter { value: 1.0 },
618 );
619 let result_count_2 = make_metric(
620 "gauge_a",
621 MetricKind::Absolute,
622 MetricValue::Counter { value: 2.0 },
623 );
624
625 assert_eq!(agg.record(gauge_a_1.clone()), None);
627 let mut out = vec![];
628 agg.flush_into(&mut out);
630 assert_eq!(1, out.len());
631 assert_eq!(&result_count, &out[0]);
632
633 out.clear();
635 agg.flush_into(&mut out);
636 assert_eq!(0, out.len());
637
638 out.clear();
640 agg.flush_into(&mut out);
641 assert_eq!(0, out.len());
642
643 assert_eq!(agg.record(gauge_a_1.clone()), None);
645 assert_eq!(agg.record(gauge_a_2.clone()), None);
646 out.clear();
647 agg.flush_into(&mut out);
648 assert_eq!(1, out.len());
649 assert_eq!(&result_count_2, &out[0]);
650 }
651
652 #[test]
653 fn absolute_max() {
654 let mut agg = Aggregate::new(&AggregateConfig {
655 interval_ms: 1000_u64,
656 mode: AggregationMode::Max,
657 })
658 .unwrap();
659
660 let gauge_a_1 = make_metric(
661 "gauge_a",
662 MetricKind::Absolute,
663 MetricValue::Gauge { value: 112.0 },
664 );
665 let gauge_a_2 = make_metric(
666 "gauge_a",
667 MetricKind::Absolute,
668 MetricValue::Gauge { value: 89.0 },
669 );
670
671 assert_eq!(agg.record(gauge_a_2.clone()), None);
673 let mut out = vec![];
674 agg.flush_into(&mut out);
676 assert_eq!(1, out.len());
677 assert_eq!(&gauge_a_2, &out[0]);
678
679 out.clear();
681 agg.flush_into(&mut out);
682 assert_eq!(0, out.len());
683
684 out.clear();
686 agg.flush_into(&mut out);
687 assert_eq!(0, out.len());
688
689 assert_eq!(agg.record(gauge_a_1.clone()), None);
691 assert_eq!(agg.record(gauge_a_2.clone()), None);
692 out.clear();
693 agg.flush_into(&mut out);
694 assert_eq!(1, out.len());
695 assert_eq!(&gauge_a_1, &out[0]);
696 }
697
698 #[test]
699 fn absolute_min() {
700 let mut agg = Aggregate::new(&AggregateConfig {
701 interval_ms: 1000_u64,
702 mode: AggregationMode::Min,
703 })
704 .unwrap();
705
706 let gauge_a_1 = make_metric(
707 "gauge_a",
708 MetricKind::Absolute,
709 MetricValue::Gauge { value: 32.0 },
710 );
711 let gauge_a_2 = make_metric(
712 "gauge_a",
713 MetricKind::Absolute,
714 MetricValue::Gauge { value: 89.0 },
715 );
716
717 assert_eq!(agg.record(gauge_a_2.clone()), None);
719 let mut out = vec![];
720 agg.flush_into(&mut out);
722 assert_eq!(1, out.len());
723 assert_eq!(&gauge_a_2, &out[0]);
724
725 out.clear();
727 agg.flush_into(&mut out);
728 assert_eq!(0, out.len());
729
730 out.clear();
732 agg.flush_into(&mut out);
733 assert_eq!(0, out.len());
734
735 assert_eq!(agg.record(gauge_a_1.clone()), None);
737 assert_eq!(agg.record(gauge_a_2.clone()), None);
738 out.clear();
739 agg.flush_into(&mut out);
740 assert_eq!(1, out.len());
741 assert_eq!(&gauge_a_1, &out[0]);
742 }
743
744 #[test]
745 fn absolute_diff() {
746 let mut agg = Aggregate::new(&AggregateConfig {
747 interval_ms: 1000_u64,
748 mode: AggregationMode::Diff,
749 })
750 .unwrap();
751
752 let gauge_a_1 = make_metric(
753 "gauge_a",
754 MetricKind::Absolute,
755 MetricValue::Gauge { value: 32.0 },
756 );
757 let gauge_a_2 = make_metric(
758 "gauge_a",
759 MetricKind::Absolute,
760 MetricValue::Gauge { value: 82.0 },
761 );
762 let result = make_metric(
763 "gauge_a",
764 MetricKind::Absolute,
765 MetricValue::Gauge { value: 50.0 },
766 );
767
768 assert_eq!(agg.record(gauge_a_2.clone()), None);
770 let mut out = vec![];
771 agg.flush_into(&mut out);
773 assert_eq!(1, out.len());
774 assert_eq!(&gauge_a_2, &out[0]);
775
776 out.clear();
778 agg.flush_into(&mut out);
779 assert_eq!(0, out.len());
780
781 out.clear();
783 agg.flush_into(&mut out);
784 assert_eq!(0, out.len());
785
786 assert_eq!(agg.record(gauge_a_1.clone()), None);
788 out.clear();
789 agg.flush_into(&mut out);
790 assert_eq!(1, out.len());
791 assert_eq!(&gauge_a_1, &out[0]);
792
793 assert_eq!(agg.record(gauge_a_2.clone()), None);
794 out.clear();
795 agg.flush_into(&mut out);
796 assert_eq!(1, out.len());
797 assert_eq!(&result, &out[0]);
798 }
799
800 #[test]
801 fn absolute_diff_conflicting_type() {
802 let mut agg = Aggregate::new(&AggregateConfig {
803 interval_ms: 1000_u64,
804 mode: AggregationMode::Diff,
805 })
806 .unwrap();
807
808 let gauge_a_1 = make_metric(
809 "gauge_a",
810 MetricKind::Absolute,
811 MetricValue::Gauge { value: 32.0 },
812 );
813 let gauge_a_2 = make_metric(
814 "gauge_a",
815 MetricKind::Absolute,
816 MetricValue::Counter { value: 1.0 },
817 );
818
819 let mut out = vec![];
820 assert_eq!(agg.record(gauge_a_1.clone()), None);
822 out.clear();
823 agg.flush_into(&mut out);
824 assert_eq!(1, out.len());
825 assert_eq!(&gauge_a_1, &out[0]);
826
827 assert_eq!(agg.record(gauge_a_2.clone()), None);
828 out.clear();
829 agg.flush_into(&mut out);
830 assert_eq!(1, out.len());
831 assert_eq!(&gauge_a_2, &out[0]);
833 }
834
835 #[test]
836 fn absolute_mean() {
837 let mut agg = Aggregate::new(&AggregateConfig {
838 interval_ms: 1000_u64,
839 mode: AggregationMode::Mean,
840 })
841 .unwrap();
842
843 let gauge_a_1 = make_metric(
844 "gauge_a",
845 MetricKind::Absolute,
846 MetricValue::Gauge { value: 32.0 },
847 );
848 let gauge_a_2 = make_metric(
849 "gauge_a",
850 MetricKind::Absolute,
851 MetricValue::Gauge { value: 82.0 },
852 );
853 let gauge_a_3 = make_metric(
854 "gauge_a",
855 MetricKind::Absolute,
856 MetricValue::Gauge { value: 51.0 },
857 );
858 let mean_result = make_metric(
859 "gauge_a",
860 MetricKind::Absolute,
861 MetricValue::Gauge { value: 55.0 },
862 );
863
864 assert_eq!(agg.record(gauge_a_2.clone()), None);
866 let mut out = vec![];
867 agg.flush_into(&mut out);
869 assert_eq!(1, out.len());
870 assert_eq!(&gauge_a_2, &out[0]);
871
872 out.clear();
874 agg.flush_into(&mut out);
875 assert_eq!(0, out.len());
876
877 out.clear();
879 agg.flush_into(&mut out);
880 assert_eq!(0, out.len());
881
882 assert_eq!(agg.record(gauge_a_1.clone()), None);
884 assert_eq!(agg.record(gauge_a_2.clone()), None);
885 assert_eq!(agg.record(gauge_a_3.clone()), None);
886 out.clear();
887 agg.flush_into(&mut out);
888 assert_eq!(1, out.len());
889 assert_eq!(&mean_result, &out[0]);
890 }
891
892 #[test]
893 fn absolute_stdev() {
894 let mut agg = Aggregate::new(&AggregateConfig {
895 interval_ms: 1000_u64,
896 mode: AggregationMode::Stdev,
897 })
898 .unwrap();
899
900 let gauges = vec![
901 make_metric(
902 "gauge_a",
903 MetricKind::Absolute,
904 MetricValue::Gauge { value: 25.0 },
905 ),
906 make_metric(
907 "gauge_a",
908 MetricKind::Absolute,
909 MetricValue::Gauge { value: 30.0 },
910 ),
911 make_metric(
912 "gauge_a",
913 MetricKind::Absolute,
914 MetricValue::Gauge { value: 35.0 },
915 ),
916 make_metric(
917 "gauge_a",
918 MetricKind::Absolute,
919 MetricValue::Gauge { value: 40.0 },
920 ),
921 make_metric(
922 "gauge_a",
923 MetricKind::Absolute,
924 MetricValue::Gauge { value: 45.0 },
925 ),
926 make_metric(
927 "gauge_a",
928 MetricKind::Absolute,
929 MetricValue::Gauge { value: 50.0 },
930 ),
931 make_metric(
932 "gauge_a",
933 MetricKind::Absolute,
934 MetricValue::Gauge { value: 55.0 },
935 ),
936 ];
937 let stdev_result = make_metric(
938 "gauge_a",
939 MetricKind::Absolute,
940 MetricValue::Gauge { value: 10.0 },
941 );
942
943 for gauge in gauges {
944 assert_eq!(agg.record(gauge), None);
945 }
946 let mut out = vec![];
947 agg.flush_into(&mut out);
948 assert_eq!(1, out.len());
949 assert_eq!(&stdev_result, &out[0]);
950 }
951
952 #[test]
953 fn passes_through_ignored_kind() {
954 let mut agg = Aggregate::new(&AggregateConfig {
956 interval_ms: 1000_u64,
957 mode: AggregationMode::Sum,
958 })
959 .unwrap();
960
961 let counter_1 = make_metric(
962 "counter_a",
963 MetricKind::Incremental,
964 MetricValue::Counter { value: 10.0 },
965 );
966 let counter_2 = make_metric(
967 "counter_a",
968 MetricKind::Incremental,
969 MetricValue::Counter { value: 5.0 },
970 );
971 let counter_summed = make_metric(
972 "counter_a",
973 MetricKind::Incremental,
974 MetricValue::Counter { value: 15.0 },
975 );
976 let gauge_1 = make_metric(
977 "gauge_a",
978 MetricKind::Absolute,
979 MetricValue::Gauge { value: 42.0 },
980 );
981 let gauge_2 = make_metric(
982 "gauge_a",
983 MetricKind::Absolute,
984 MetricValue::Gauge { value: 99.0 },
985 );
986
987 assert_eq!(agg.record(gauge_1.clone()), Some(gauge_1));
989 assert_eq!(agg.record(gauge_2.clone()), Some(gauge_2));
990
991 assert_eq!(agg.record(counter_1), None);
993 assert_eq!(agg.record(counter_2), None);
994
995 let mut out = vec![];
996 agg.flush_into(&mut out);
997 assert_eq!(1, out.len());
999 assert_eq!(&counter_summed, &out[0]);
1000 }
1001
1002 #[test]
1003 fn conflicting_value_type() {
1004 let mut agg = Aggregate::new(&AggregateConfig {
1005 interval_ms: 1000_u64,
1006 mode: AggregationMode::Auto,
1007 })
1008 .unwrap();
1009
1010 let counter = make_metric(
1011 "the-thing",
1012 MetricKind::Incremental,
1013 MetricValue::Counter { value: 42.0 },
1014 );
1015 let mut values = BTreeSet::<String>::new();
1016 values.insert("a".into());
1017 values.insert("b".into());
1018 let set = make_metric(
1019 "the-thing",
1020 MetricKind::Incremental,
1021 MetricValue::Set { values },
1022 );
1023 let summed = make_metric(
1024 "the-thing",
1025 MetricKind::Incremental,
1026 MetricValue::Counter { value: 84.0 },
1027 );
1028
1029 assert_eq!(agg.record(counter.clone()), None);
1033 assert_eq!(agg.record(counter.clone()), None);
1035 assert_eq!(agg.record(set.clone()), None);
1037 assert_eq!(agg.record(set.clone()), None);
1039 let mut out = vec![];
1040 agg.flush_into(&mut out);
1042 assert_eq!(1, out.len());
1043 assert_eq!(&set, &out[0]);
1044
1045 assert_eq!(agg.record(set.clone()), None);
1047 assert_eq!(agg.record(set), None);
1049 assert_eq!(agg.record(counter.clone()), None);
1051 assert_eq!(agg.record(counter), None);
1053 let mut out = vec![];
1054 agg.flush_into(&mut out);
1056 assert_eq!(1, out.len());
1057 assert_eq!(&summed, &out[0]);
1058 }
1059
1060 #[test]
1061 fn conflicting_kinds() {
1062 let mut agg = Aggregate::new(&AggregateConfig {
1063 interval_ms: 1000_u64,
1064 mode: AggregationMode::Auto,
1065 })
1066 .unwrap();
1067
1068 let incremental = make_metric(
1069 "the-thing",
1070 MetricKind::Incremental,
1071 MetricValue::Counter { value: 42.0 },
1072 );
1073 let absolute = make_metric(
1074 "the-thing",
1075 MetricKind::Absolute,
1076 MetricValue::Counter { value: 43.0 },
1077 );
1078 let summed = make_metric(
1079 "the-thing",
1080 MetricKind::Incremental,
1081 MetricValue::Counter { value: 84.0 },
1082 );
1083
1084 assert_eq!(agg.record(incremental.clone()), None);
1088 assert_eq!(agg.record(incremental.clone()), None);
1090 assert_eq!(agg.record(absolute.clone()), None);
1092 assert_eq!(agg.record(absolute.clone()), None);
1094 let mut out = vec![];
1095 agg.flush_into(&mut out);
1097 assert_eq!(1, out.len());
1098 assert_eq!(&absolute, &out[0]);
1099
1100 assert_eq!(agg.record(absolute.clone()), None);
1102 assert_eq!(agg.record(absolute), None);
1104 assert_eq!(agg.record(incremental.clone()), None);
1106 assert_eq!(agg.record(incremental), None);
1108 let mut out = vec![];
1109 agg.flush_into(&mut out);
1111 assert_eq!(1, out.len());
1112 assert_eq!(&summed, &out[0]);
1113 }
1114
1115 #[tokio::test]
1116 async fn transform_shutdown() {
1117 let agg = serde_yaml::from_str::<AggregateConfig>(indoc! {"
1118 interval_ms: 999999
1119 "})
1120 .unwrap()
1121 .build(&TransformContext::default())
1122 .await
1123 .unwrap();
1124
1125 let agg = agg.into_task();
1126
1127 let counter_a_1 = make_metric(
1128 "counter_a",
1129 MetricKind::Incremental,
1130 MetricValue::Counter { value: 42.0 },
1131 );
1132 let counter_a_2 = make_metric(
1133 "counter_a",
1134 MetricKind::Incremental,
1135 MetricValue::Counter { value: 43.0 },
1136 );
1137 let counter_a_summed = make_metric(
1138 "counter_a",
1139 MetricKind::Incremental,
1140 MetricValue::Counter { value: 85.0 },
1141 );
1142 let gauge_a_1 = make_metric(
1143 "gauge_a",
1144 MetricKind::Absolute,
1145 MetricValue::Gauge { value: 42.0 },
1146 );
1147 let gauge_a_2 = make_metric(
1148 "gauge_a",
1149 MetricKind::Absolute,
1150 MetricValue::Gauge { value: 43.0 },
1151 );
1152 let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1153
1154 let in_stream = Box::pin(stream::iter(inputs));
1156 let mut out_stream = agg.transform_events(in_stream);
1158
1159 let mut count = 0_u8;
1163 while let Some(event) = out_stream.next().await {
1164 count += 1;
1165 match event.as_metric().series().name.name.as_str() {
1166 "counter_a" => assert_eq!(counter_a_summed, event),
1167 "gauge_a" => assert_eq!(gauge_a_2, event),
1168 _ => panic!("Unexpected metric name in aggregate output"),
1169 };
1170 }
1171 assert_eq!(2, count);
1173 }
1174
1175 #[tokio::test]
1176 async fn transform_interval() {
1177 let transform_config = serde_yaml::from_str::<AggregateConfig>("{}").unwrap();
1178
1179 let counter_a_1 = make_metric(
1180 "counter_a",
1181 MetricKind::Incremental,
1182 MetricValue::Counter { value: 42.0 },
1183 );
1184 let counter_a_2 = make_metric(
1185 "counter_a",
1186 MetricKind::Incremental,
1187 MetricValue::Counter { value: 43.0 },
1188 );
1189 let counter_a_summed = make_metric(
1190 "counter_a",
1191 MetricKind::Incremental,
1192 MetricValue::Counter { value: 85.0 },
1193 );
1194 let gauge_a_1 = make_metric(
1195 "gauge_a",
1196 MetricKind::Absolute,
1197 MetricValue::Gauge { value: 42.0 },
1198 );
1199 let gauge_a_2 = make_metric(
1200 "gauge_a",
1201 MetricKind::Absolute,
1202 MetricValue::Gauge { value: 43.0 },
1203 );
1204
1205 assert_transform_compliance(async {
1206 let (tx, rx) = mpsc::channel(10);
1207 let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1208 let mut out = ReceiverStream::new(out);
1209
1210 tokio::time::pause();
1211
1212 assert_eq!(Poll::Pending, futures::poll!(out.next()));
1215
1216 tx.send(counter_a_1).await.unwrap();
1218 tx.send(counter_a_2).await.unwrap();
1219 tx.send(gauge_a_1).await.unwrap();
1220 tx.send(gauge_a_2.clone()).await.unwrap();
1221 assert_eq!(Poll::Pending, futures::poll!(out.next()));
1223 tokio::time::advance(Duration::from_secs(11)).await;
1225 let mut count = 0_u8;
1228 while count < 2 {
1229 match out.next().await {
1230 Some(event) => {
1231 match event.as_metric().series().name.name.as_str() {
1232 "counter_a" => assert_eq!(counter_a_summed, event),
1233 "gauge_a" => assert_eq!(gauge_a_2, event),
1234 _ => panic!("Unexpected metric name in aggregate output"),
1235 };
1236 count += 1;
1237 }
1238 _ => {
1239 panic!("Unexpectedly received None in output stream");
1240 }
1241 }
1242 }
1243 assert_eq!(Poll::Pending, futures::poll!(out.next()));
1245
1246 drop(tx);
1247 topology.stop().await;
1248 assert_eq!(out.next().await, None);
1249 })
1250 .await;
1251 }
1252}