// vector_vrl_metrics/common.rs

1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2use tokio::time::interval;
3use tokio_stream::{wrappers::IntervalStream, StreamExt};
4use vector_common::shutdown::ShutdownSignal;
5use vrl::{
6    diagnostic::Label,
7    prelude::{expression::Expr, *},
8    value,
9};
10
11use arc_swap::ArcSwap;
12use vector_core::{event::Metric, metrics::Controller};
13
14#[derive(Debug)]
15pub(crate) enum Error {
16    MetricsStorageNotLoaded,
17}
18
19impl fmt::Display for Error {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        match self {
22            Error::MetricsStorageNotLoaded => write!(f, "metrics storage not loaded"),
23        }
24    }
25}
26
27impl std::error::Error for Error {}
28
29impl DiagnosticMessage for Error {
30    fn code(&self) -> usize {
31        112
32    }
33
34    fn labels(&self) -> Vec<Label> {
35        match self {
36            Error::MetricsStorageNotLoaded => {
37                vec![Label::primary(
38                    "VRL metrics error: metrics storage not loaded".to_string(),
39                    Span::default(),
40                )]
41            }
42        }
43    }
44}
45
46#[derive(Debug, Default, Clone)]
47pub struct MetricsStorage {
48    // Made pub only for vrl-test module
49    #[doc(hidden)]
50    pub cache: Arc<ArcSwap<Vec<Metric>>>,
51}
52
53impl MetricsStorage {
54    pub(crate) fn get_metric(
55        &self,
56        metric: &str,
57        tags: BTreeMap<String, String>,
58    ) -> Option<Metric> {
59        self.cache
60            .load()
61            .iter()
62            .find(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
63            .cloned()
64    }
65
66    pub(crate) fn find_metrics(&self, metric: &str, tags: BTreeMap<String, String>) -> Vec<Metric> {
67        self.cache
68            .load()
69            .iter()
70            .filter(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
71            .cloned()
72            .collect()
73    }
74
75    pub fn refresh_metrics(&self) {
76        let new_metrics = Controller::get()
77            .expect("metrics not initialized")
78            .capture_metrics();
79        self.cache.store(new_metrics.into());
80    }
81
82    pub async fn run_periodic_refresh(
83        &self,
84        refresh_interval: Duration,
85        mut shutdown: ShutdownSignal,
86    ) {
87        let mut intervals = IntervalStream::new(interval(refresh_interval));
88        loop {
89            tokio::select! {
90                Some(_) = intervals.next() => {
91                    self.refresh_metrics();
92                }
93                _ = &mut shutdown => {
94                    break;
95                }
96            }
97        }
98    }
99}
100
101/// Checks if the tag matches - also considers wildcards
102fn tag_matches(metric: &Metric, (tag_key, tag_value): (&String, &String)) -> bool {
103    if let Some((prefix, suffix)) = tag_value.split_once('*') {
104        let Some(metric_tag_value) = metric.tag_value(tag_key) else {
105            return false;
106        };
107
108        metric_tag_value.starts_with(prefix) && metric_tag_value.ends_with(suffix)
109    } else {
110        metric.tag_matches(tag_key, tag_value)
111    }
112}
113
114pub(crate) fn metrics_vrl_typedef() -> BTreeMap<Field, Kind> {
115    BTreeMap::from([
116        (Field::from("name"), Kind::bytes()),
117        (Field::from("tags"), Kind::any_object()),
118        (Field::from("type"), Kind::bytes()),
119        (Field::from("kind"), Kind::bytes()),
120        (Field::from("value"), Kind::float() | Kind::null()),
121    ])
122}
123
124pub(crate) fn metric_into_vrl(value: &Metric) -> Value {
125    value!({
126        name: { value.name() },
127        tags: {
128            BTreeMap::from_iter(
129                value
130                .tags()
131                .map(|t| {
132                    t.iter_sets()
133                        .map(|(k, v)| {
134                            (
135                                k.into(),
136                                Value::Array(
137                                    v.iter()
138                                    .filter_map(|v| {
139                                        v.map(ToString::to_string).map(Into::into).map(Value::Bytes)
140                                    })
141                                    .collect(),
142                                ),
143                            )
144                        })
145                    .collect::<Vec<_>>()
146                })
147                .unwrap_or_default(),
148            )
149        },
150        "type": { value.value().as_name() },
151        kind: {
152            match value.kind() {
153                vector_core::event::MetricKind::Incremental => "incremental",
154                vector_core::event::MetricKind::Absolute => "absolute",
155            }
156        },
157        value: {
158            match value.value() {
159                vector_core::event::MetricValue::Counter { value }
160                | vector_core::event::MetricValue::Gauge { value } => NotNan::new(*value).ok(),
161                _ => None,
162            }
163        }
164    })
165}
166
167pub(crate) fn validate_tags(
168    state: &TypeState,
169    tags: &BTreeMap<KeyString, Expr>,
170) -> Result<(), Box<dyn DiagnosticMessage>> {
171    for v in tags.values() {
172        if *v.type_def(state).kind() != Kind::bytes() {
173            return Err(Box::new(vrl::compiler::function::Error::InvalidArgument {
174                keyword: "tags.value",
175                value: v.resolve_constant(state).unwrap_or(Value::Null),
176                error: "Tag values must be strings",
177            }));
178        }
179    }
180    Ok(())
181}
182
183pub(crate) fn resolve_tags(
184    ctx: &mut Context,
185    tags: &BTreeMap<KeyString, Expr>,
186) -> Result<BTreeMap<String, String>, ExpressionError> {
187    tags.iter()
188        .map(|(k, v)| {
189            v.resolve(ctx).and_then(|v| {
190                Ok((
191                    k.clone().into(),
192                    v.as_str().ok_or("Tag must be a string")?.into_owned(),
193                ))
194            })
195        })
196        .collect::<Result<_, _>>()
197}
198
// Tests are defined here to simplify them - enabling access to `MetricsStorage`
#[cfg(test)]
mod tests {
    use vector_core::{
        compile_vrl,
        event::{Event, LogEvent, MetricKind, MetricTags, MetricValue, VrlTarget},
    };
    use vrl::{
        compiler::{
            runtime::{Runtime, Terminate},
            CompilationResult, CompileConfig,
        },
        diagnostic::DiagnosticList,
    };

    use super::*;

    /// Compiles `vrl_source` against the VRL stdlib plus this crate's functions, with `storage`
    /// registered as custom compile-time data and the program marked read-only.
    fn compile(
        storage: MetricsStorage,
        vrl_source: &str,
    ) -> Result<CompilationResult, DiagnosticList> {
        // vector_vrl_functions depends on this crate, so we can't use that here
        #[allow(clippy::disallowed_methods)]
        let functions = vrl::stdlib::all().into_iter();

        let functions = functions.chain(crate::all()).collect::<Vec<_>>();

        let state = TypeState::default();

        let mut config = CompileConfig::default();
        config.set_custom(storage);
        config.set_read_only();

        compile_vrl(vrl_source, &functions, &state, config)
    }

    /// Compiles `vrl_source` (panicking on compilation failure) and runs it against an empty
    /// log event.
    fn compile_and_run(storage: MetricsStorage, vrl_source: &str) -> Result<Value, Terminate> {
        let CompilationResult { program, .. } =
            compile(storage, vrl_source).expect("compilation failed");

        let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
        Runtime::default().resolve(&mut target, &program, &TimeZone::default())
    }

    /// Runs `vrl_source` and returns its result, panicking if the program terminates with an
    /// error.
    fn run(storage: MetricsStorage, vrl_source: &str) -> Value {
        compile_and_run(storage, vrl_source).expect("vrl failed")
    }

    /// Builds an absolute gauge named `test`, optionally tagged with a single `component_id`.
    fn gauge(value: f64, component_id: Option<&str>) -> Metric {
        let metric = Metric::new("test", MetricKind::Absolute, MetricValue::Gauge { value });
        match component_id {
            Some(id) => metric.with_tags(Some(MetricTags::from_iter([(
                "component_id".to_string(),
                id.to_string(),
            )]))),
            None => metric,
        }
    }

    /// Creates a storage whose cache holds exactly `metrics`.
    fn storage_with(metrics: Vec<Metric>) -> MetricsStorage {
        let storage = MetricsStorage::default();
        storage.cache.store(metrics.into());
        storage
    }

    /// Asserts the name and value of a metric object and, when `tags` is given, that the first
    /// value of each listed tag equals the expected one.
    fn assert_metric_matches(
        metric: &BTreeMap<KeyString, Value>,
        name: &str,
        value: f64,
        tags: Option<Vec<(&str, &str)>>,
    ) {
        assert_eq!(metric.get("name").unwrap().as_str().unwrap(), name);
        assert_eq!(
            metric.get("value").unwrap().as_float().unwrap(),
            NotNan::new(value).unwrap()
        );

        if let Some(tags) = tags {
            let metric_tags = metric.get("tags").unwrap().as_object().unwrap();
            for (key, value) in tags {
                assert_eq!(
                    metric_tags
                        .get(key)
                        .unwrap()
                        .as_array_unwrap()
                        .first()
                        .unwrap()
                        .as_str()
                        .unwrap(),
                    value
                );
            }
        }
    }

    /// Asserts that `result` is an array of `test` gauges with value 1.0 whose `component_id`
    /// tags are exactly `expected`, in order.
    fn assert_component_ids(result: &Value, expected: &[&str]) {
        let metrics = result.as_array_unwrap();
        assert_eq!(metrics.len(), expected.len());
        for (metric, component_id) in metrics.iter().zip(expected) {
            assert_metric_matches(
                metric.as_object().unwrap(),
                "test",
                1.0,
                Some(vec![("component_id", *component_id)]),
            );
        }
    }

    /// Runs an aggregate query over gauges 6.0 (`start.a.end`), 1.0 (`something_else`) and
    /// 3.0 (`start.c.end`) and asserts the resulting float.
    fn assert_aggregate(vrl_source: &str, expected: f64) {
        let storage = storage_with(vec![
            gauge(6.0, Some("start.a.end")),
            gauge(1.0, Some("something_else")),
            gauge(3.0, Some("start.c.end")),
        ]);

        let result = run(storage, vrl_source);

        assert_eq!(result.as_float().unwrap().into_inner(), expected);
    }

    #[test]
    fn test_get_vector_metric() {
        let storage = storage_with(vec![gauge(1.0, None)]);

        let result = run(storage, r#"get_vector_metric("test")"#);

        assert_metric_matches(result.as_object().unwrap(), "test", 1.0, None);
    }

    #[test]
    fn test_find_vector_metrics() {
        let storage = storage_with(vec![gauge(1.0, Some("a")), gauge(1.0, Some("b"))]);

        let result = run(storage, r#"find_vector_metrics("test")"#);

        assert_component_ids(&result, &["a", "b"]);
    }

    #[test]
    fn test_get_vector_metric_by_tag() {
        let storage = storage_with(vec![gauge(1.0, Some("a")), gauge(1.0, Some("b"))]);

        let result = run(
            storage,
            r#"get_vector_metric("test", tags: { "component_id": "b" })"#,
        );

        assert_metric_matches(
            result.as_object().unwrap(),
            "test",
            1.0,
            Some(vec![("component_id", "b")]),
        );
    }

    #[test]
    fn test_find_vector_metrics_wildcard() {
        let storage = storage_with(vec![
            gauge(1.0, Some("a")),
            gauge(1.0, Some("b")),
            gauge(1.0, None),
        ]);

        let result = run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "*" })"#,
        );

        // 2 metrics, because they have component_id, 3rd one doesn't
        assert_component_ids(&result, &["a", "b"]);
    }

    // The pattern fixes the start of the value; the wildcard covers the rest.
    #[test]
    fn test_find_vector_metrics_wildcard_start() {
        let storage = storage_with(vec![
            gauge(1.0, Some("prefix.a")),
            gauge(1.0, Some("something_else")),
            gauge(1.0, Some("prefix.c")),
        ]);

        let result = run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "prefix.*" })"#,
        );

        assert_component_ids(&result, &["prefix.a", "prefix.c"]);
    }

    // The pattern fixes the end of the value; the wildcard covers what precedes it.
    #[test]
    fn test_find_vector_metrics_wildcard_end() {
        let storage = storage_with(vec![
            gauge(1.0, Some("a.suffix")),
            gauge(1.0, Some("something_else")),
            gauge(1.0, Some("c.suffix")),
        ]);

        let result = run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "*.suffix" })"#,
        );

        assert_component_ids(&result, &["a.suffix", "c.suffix"]);
    }

    #[test]
    fn test_find_vector_metrics_wildcard_middle() {
        let storage = storage_with(vec![
            gauge(1.0, Some("start.a.end")),
            gauge(1.0, Some("something_else")),
            gauge(1.0, Some("start.c.end")),
        ]);

        let result = run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "start.*.end" })"#,
        );

        assert_component_ids(&result, &["start.a.end", "start.c.end"]);
    }

    #[test]
    fn test_aggregate_vector_metrics_sum() {
        assert_aggregate(
            r#"aggregate_vector_metrics("sum", "test", tags: { "component_id": "start.*.end" })"#,
            9.0,
        );
    }

    #[test]
    fn test_aggregate_vector_metrics_avg() {
        assert_aggregate(
            r#"aggregate_vector_metrics("avg", "test", tags: { "component_id": "start.*.end" })"#,
            4.5,
        );
    }

    #[test]
    fn test_aggregate_vector_metrics_max() {
        assert_aggregate(
            r#"aggregate_vector_metrics("max", "test", tags: { "component_id": "start.*.end" })"#,
            6.0,
        );
    }

    #[test]
    fn test_aggregate_vector_metrics_min() {
        assert_aggregate(
            r#"aggregate_vector_metrics("min", "test", tags: { "component_id": "start.*.end" })"#,
            3.0,
        );
    }
}