1use std::{collections::HashMap, future::ready, pin::Pin, time::Duration};
2
3use futures::{Stream, StreamExt};
4use vector_lib::configurable::configurable_component;
5
6use crate::{
7 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
8 event::Event,
9 schema,
10 sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings},
11 transforms::{TaskTransform, Transform},
12};
13
14#[configurable_component(transform(
16 "incremental_to_absolute",
17 "Convert incremental metrics to absolute."
18))]
19#[derive(Clone, Debug, Default)]
20#[serde(deny_unknown_fields)]
21pub struct IncrementalToAbsoluteConfig {
22 #[configurable(derived)]
28 #[serde(default)]
29 pub cache: NormalizerConfig<IncrementalToAbsoluteDefaultNormalizerSettings>,
30}
31
32#[derive(Clone, Copy, Debug, Default)]
33pub struct IncrementalToAbsoluteDefaultNormalizerSettings;
34
35impl NormalizerSettings for IncrementalToAbsoluteDefaultNormalizerSettings {
36 const MAX_EVENTS: Option<usize> = None;
37 const MAX_BYTES: Option<usize> = None;
38 const TIME_TO_LIVE: Option<u64> = Some(300);
39}
40
41pub const fn default_expire_metrics_secs() -> Duration {
42 Duration::from_secs(120)
43}
44
45impl_generate_config_from_default!(IncrementalToAbsoluteConfig);
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "incremental_to_absolute")]
49impl TransformConfig for IncrementalToAbsoluteConfig {
50 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
51 IncrementalToAbsolute::new(self).map(Transform::event_task)
52 }
53
54 fn input(&self) -> Input {
55 Input::metric()
56 }
57
58 fn outputs(
59 &self,
60 _: &TransformContext,
61 _: &[(OutputId, schema::Definition)],
62 ) -> Vec<TransformOutput> {
63 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
64 }
65}
66#[derive(Debug)]
67pub struct IncrementalToAbsolute {
68 data: MetricSet,
69}
70
71impl IncrementalToAbsolute {
72 pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result<Self> {
73 Ok(Self {
75 data: MetricSet::new(config.cache.validate()?.into_settings()),
76 })
77 }
78 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
79 self.data
80 .make_absolute(event.as_metric().clone())
81 .map(Event::Metric)
82 }
83}
84
85impl TaskTransform<Event> for IncrementalToAbsolute {
86 fn transform(
87 self: Box<Self>,
88 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
89 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
90 where
91 Self: 'static,
92 {
93 let mut inner = self;
94 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
95 }
96}
97
98#[cfg(test)]
99mod tests {
100 use std::sync::Arc;
101
102 use futures_util::SinkExt;
103 use indoc::indoc;
104 use similar_asserts::assert_eq;
105 use vector_lib::config::ComponentKey;
106
107 use super::*;
108 use crate::event::{
109 Metric,
110 metric::{MetricKind, MetricValue},
111 };
112
113 fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
114 let mut event = Event::Metric(Metric::new(name, kind, value))
115 .with_source_id(Arc::new(ComponentKey::from("in")))
116 .with_upstream_id(Arc::new(OutputId::from("transform")));
117
118 event.metadata_mut().set_source_type("unit_test_stream");
119
120 event
121 }
122
123 async fn assert_metric_eq(
124 tx: &mut futures::channel::mpsc::Sender<Event>,
125 mut out_stream: impl Stream<Item = Event> + Unpin,
126 metric: Event,
127 expected_metric: Event,
128 ) {
129 tx.send(metric).await.unwrap();
130 if let Some(out_event) = out_stream.next().await {
131 let result = out_event;
132 assert_eq!(result, expected_metric);
133 } else {
134 panic!("Unexpectedly received None in output stream");
135 }
136 }
137
138 #[tokio::test]
139 async fn test_incremental_to_absolute() {
140 let config = serde_yaml::from_str::<IncrementalToAbsoluteConfig>(indoc! {"
141 cache:
142 max_events: 100
143 "})
144 .unwrap();
145 let incremental_to_absolute = IncrementalToAbsolute::new(&config)
146 .map(Transform::event_task)
147 .unwrap();
148 let incremental_to_absolute = incremental_to_absolute.into_task();
149 let (mut tx, rx) = futures::channel::mpsc::channel(10);
150 let mut out_stream = incremental_to_absolute.transform_events(Box::pin(rx));
151
152 let inc_counter_1 = make_metric(
153 "incremental_counter",
154 MetricKind::Incremental,
155 MetricValue::Counter { value: 10.0 },
156 );
157 let expected_inc_counter_1 = make_metric(
158 "incremental_counter",
159 MetricKind::Absolute,
160 MetricValue::Counter { value: 10.0 },
161 );
162 assert_metric_eq(
163 &mut tx,
164 &mut out_stream,
165 inc_counter_1,
166 expected_inc_counter_1,
167 )
168 .await;
169
170 let inc_counter_2 = make_metric(
171 "incremental_counter",
172 MetricKind::Incremental,
173 MetricValue::Counter { value: 10.0 },
174 );
175 let expected_inc_counter_2 = make_metric(
176 "incremental_counter",
177 MetricKind::Absolute,
178 MetricValue::Counter { value: 20.0 },
179 );
180 assert_metric_eq(
181 &mut tx,
182 &mut out_stream,
183 inc_counter_2,
184 expected_inc_counter_2,
185 )
186 .await;
187
188 let inc_counter_3 = make_metric(
189 "incremental_counter",
190 MetricKind::Incremental,
191 MetricValue::Counter { value: 10.0 },
192 );
193 let expected_inc_counter_3 = make_metric(
194 "incremental_counter",
195 MetricKind::Absolute,
196 MetricValue::Counter { value: 30.0 },
197 );
198 assert_metric_eq(
199 &mut tx,
200 &mut out_stream,
201 inc_counter_3,
202 expected_inc_counter_3,
203 )
204 .await;
205
206 let gauge = make_metric(
208 "gauge",
209 MetricKind::Absolute,
210 MetricValue::Gauge { value: 42.0 },
211 );
212 let expected_gauge = gauge.clone();
213 assert_metric_eq(&mut tx, &mut out_stream, gauge, expected_gauge).await;
214
215 let absolute_counter = make_metric(
216 "absolute_counter",
217 MetricKind::Absolute,
218 MetricValue::Counter { value: 42.0 },
219 );
220 let absolute_counter_expected = absolute_counter.clone();
221 assert_metric_eq(
222 &mut tx,
223 &mut out_stream,
224 absolute_counter,
225 absolute_counter_expected,
226 )
227 .await;
228 }
229}