Skip to main content

vector/transforms/
incremental_to_absolute.rs

1use std::{collections::HashMap, future::ready, pin::Pin, time::Duration};
2
3use futures::{Stream, StreamExt};
4use vector_lib::configurable::configurable_component;
5
6use crate::{
7    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
8    event::Event,
9    schema,
10    sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings},
11    transforms::{TaskTransform, Transform},
12};
13
14/// Configuration for the `incremental_to_absolute` transform.
15#[configurable_component(transform(
16    "incremental_to_absolute",
17    "Convert incremental metrics to absolute."
18))]
19#[derive(Clone, Debug, Default)]
20#[serde(deny_unknown_fields)]
21pub struct IncrementalToAbsoluteConfig {
22    /// Configuration for the internal metrics cache used to normalize a stream of incremental
23    /// metrics into absolute metrics.
24    ///
25    /// By default, incremental metrics are evicted after 5 minutes of not being updated. The next
26    /// incremental value will be reset.
27    #[configurable(derived)]
28    #[serde(default)]
29    pub cache: NormalizerConfig<IncrementalToAbsoluteDefaultNormalizerSettings>,
30}
31
32#[derive(Clone, Copy, Debug, Default)]
33pub struct IncrementalToAbsoluteDefaultNormalizerSettings;
34
35impl NormalizerSettings for IncrementalToAbsoluteDefaultNormalizerSettings {
36    const MAX_EVENTS: Option<usize> = None;
37    const MAX_BYTES: Option<usize> = None;
38    const TIME_TO_LIVE: Option<u64> = Some(300);
39}
40
/// Returns a fixed expiry interval of 120 seconds as a [`Duration`].
///
/// NOTE(review): nothing in the code visible in this file calls this helper, and its value
/// differs from the 300 used for the cache `TIME_TO_LIVE` default. It is public, so it is kept
/// as-is; check for external users before removing or changing it.
pub const fn default_expire_metrics_secs() -> Duration {
    const EXPIRE_AFTER_SECS: u64 = 120;
    Duration::from_secs(EXPIRE_AFTER_SECS)
}
44
45impl_generate_config_from_default!(IncrementalToAbsoluteConfig);
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "incremental_to_absolute")]
49impl TransformConfig for IncrementalToAbsoluteConfig {
50    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
51        IncrementalToAbsolute::new(self).map(Transform::event_task)
52    }
53
54    fn input(&self) -> Input {
55        Input::metric()
56    }
57
58    fn outputs(
59        &self,
60        _: &TransformContext,
61        _: &[(OutputId, schema::Definition)],
62    ) -> Vec<TransformOutput> {
63        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
64    }
65}
66#[derive(Debug)]
67pub struct IncrementalToAbsolute {
68    data: MetricSet,
69}
70
71impl IncrementalToAbsolute {
72    pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result<Self> {
73        // Create a new MetricSet with the proper cache settings
74        Ok(Self {
75            data: MetricSet::new(config.cache.validate()?.into_settings()),
76        })
77    }
78    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
79        self.data
80            .make_absolute(event.as_metric().clone())
81            .map(Event::Metric)
82    }
83}
84
85impl TaskTransform<Event> for IncrementalToAbsolute {
86    fn transform(
87        self: Box<Self>,
88        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
89    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
90    where
91        Self: 'static,
92    {
93        let mut inner = self;
94        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use std::sync::Arc;
101
102    use futures_util::SinkExt;
103    use indoc::indoc;
104    use similar_asserts::assert_eq;
105    use vector_lib::config::ComponentKey;
106
107    use super::*;
108    use crate::event::{
109        Metric,
110        metric::{MetricKind, MetricValue},
111    };
112
113    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
114        let mut event = Event::Metric(Metric::new(name, kind, value))
115            .with_source_id(Arc::new(ComponentKey::from("in")))
116            .with_upstream_id(Arc::new(OutputId::from("transform")));
117
118        event.metadata_mut().set_source_type("unit_test_stream");
119
120        event
121    }
122
123    async fn assert_metric_eq(
124        tx: &mut futures::channel::mpsc::Sender<Event>,
125        mut out_stream: impl Stream<Item = Event> + Unpin,
126        metric: Event,
127        expected_metric: Event,
128    ) {
129        tx.send(metric).await.unwrap();
130        if let Some(out_event) = out_stream.next().await {
131            let result = out_event;
132            assert_eq!(result, expected_metric);
133        } else {
134            panic!("Unexpectedly received None in output stream");
135        }
136    }
137
138    #[tokio::test]
139    async fn test_incremental_to_absolute() {
140        let config = serde_yaml::from_str::<IncrementalToAbsoluteConfig>(indoc! {"
141            cache:
142              max_events: 100
143        "})
144        .unwrap();
145        let incremental_to_absolute = IncrementalToAbsolute::new(&config)
146            .map(Transform::event_task)
147            .unwrap();
148        let incremental_to_absolute = incremental_to_absolute.into_task();
149        let (mut tx, rx) = futures::channel::mpsc::channel(10);
150        let mut out_stream = incremental_to_absolute.transform_events(Box::pin(rx));
151
152        let inc_counter_1 = make_metric(
153            "incremental_counter",
154            MetricKind::Incremental,
155            MetricValue::Counter { value: 10.0 },
156        );
157        let expected_inc_counter_1 = make_metric(
158            "incremental_counter",
159            MetricKind::Absolute,
160            MetricValue::Counter { value: 10.0 },
161        );
162        assert_metric_eq(
163            &mut tx,
164            &mut out_stream,
165            inc_counter_1,
166            expected_inc_counter_1,
167        )
168        .await;
169
170        let inc_counter_2 = make_metric(
171            "incremental_counter",
172            MetricKind::Incremental,
173            MetricValue::Counter { value: 10.0 },
174        );
175        let expected_inc_counter_2 = make_metric(
176            "incremental_counter",
177            MetricKind::Absolute,
178            MetricValue::Counter { value: 20.0 },
179        );
180        assert_metric_eq(
181            &mut tx,
182            &mut out_stream,
183            inc_counter_2,
184            expected_inc_counter_2,
185        )
186        .await;
187
188        let inc_counter_3 = make_metric(
189            "incremental_counter",
190            MetricKind::Incremental,
191            MetricValue::Counter { value: 10.0 },
192        );
193        let expected_inc_counter_3 = make_metric(
194            "incremental_counter",
195            MetricKind::Absolute,
196            MetricValue::Counter { value: 30.0 },
197        );
198        assert_metric_eq(
199            &mut tx,
200            &mut out_stream,
201            inc_counter_3,
202            expected_inc_counter_3,
203        )
204        .await;
205
206        // Absolute counters and gauges are emitted unchanged
207        let gauge = make_metric(
208            "gauge",
209            MetricKind::Absolute,
210            MetricValue::Gauge { value: 42.0 },
211        );
212        let expected_gauge = gauge.clone();
213        assert_metric_eq(&mut tx, &mut out_stream, gauge, expected_gauge).await;
214
215        let absolute_counter = make_metric(
216            "absolute_counter",
217            MetricKind::Absolute,
218            MetricValue::Counter { value: 42.0 },
219        );
220        let absolute_counter_expected = absolute_counter.clone();
221        assert_metric_eq(
222            &mut tx,
223            &mut out_stream,
224            absolute_counter,
225            absolute_counter_expected,
226        )
227        .await;
228    }
229}