1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2use tokio::time::interval;
3use tokio_stream::{wrappers::IntervalStream, StreamExt};
4use vector_common::shutdown::ShutdownSignal;
5use vrl::{
6 diagnostic::Label,
7 prelude::{expression::Expr, *},
8 value,
9};
10
11use arc_swap::ArcSwap;
12use vector_core::{event::Metric, metrics::Controller};
13
/// Failures that the VRL metrics functions can report.
#[derive(Debug)]
pub(crate) enum Error {
    /// The metrics storage was not available when it was needed.
    MetricsStorageNotLoaded,
}

impl fmt::Display for Error {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::MetricsStorageNotLoaded => "metrics storage not loaded",
        };
        f.write_str(description)
    }
}

impl std::error::Error for Error {}
28
29impl DiagnosticMessage for Error {
30 fn code(&self) -> usize {
31 112
32 }
33
34 fn labels(&self) -> Vec<Label> {
35 match self {
36 Error::MetricsStorageNotLoaded => {
37 vec![Label::primary(
38 "VRL metrics error: metrics storage not loaded".to_string(),
39 Span::default(),
40 )]
41 }
42 }
43 }
44}
45
46#[derive(Debug, Default, Clone)]
47pub struct MetricsStorage {
48 #[doc(hidden)]
50 pub cache: Arc<ArcSwap<Vec<Metric>>>,
51}
52
53impl MetricsStorage {
54 pub(crate) fn get_metric(
55 &self,
56 metric: &str,
57 tags: BTreeMap<String, String>,
58 ) -> Option<Metric> {
59 self.cache
60 .load()
61 .iter()
62 .find(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
63 .cloned()
64 }
65
66 pub(crate) fn find_metrics(&self, metric: &str, tags: BTreeMap<String, String>) -> Vec<Metric> {
67 self.cache
68 .load()
69 .iter()
70 .filter(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
71 .cloned()
72 .collect()
73 }
74
75 pub fn refresh_metrics(&self) {
76 let new_metrics = Controller::get()
77 .expect("metrics not initialized")
78 .capture_metrics();
79 self.cache.store(new_metrics.into());
80 }
81
82 pub async fn run_periodic_refresh(
83 &self,
84 refresh_interval: Duration,
85 mut shutdown: ShutdownSignal,
86 ) {
87 let mut intervals = IntervalStream::new(interval(refresh_interval));
88 loop {
89 tokio::select! {
90 Some(_) = intervals.next() => {
91 self.refresh_metrics();
92 }
93 _ = &mut shutdown => {
94 break;
95 }
96 }
97 }
98 }
99}
100
101fn tag_matches(metric: &Metric, (tag_key, tag_value): (&String, &String)) -> bool {
103 if let Some((prefix, suffix)) = tag_value.split_once('*') {
104 let Some(metric_tag_value) = metric.tag_value(tag_key) else {
105 return false;
106 };
107
108 metric_tag_value.starts_with(prefix) && metric_tag_value.ends_with(suffix)
109 } else {
110 metric.tag_matches(tag_key, tag_value)
111 }
112}
113
114pub(crate) fn metrics_vrl_typedef() -> BTreeMap<Field, Kind> {
115 BTreeMap::from([
116 (Field::from("name"), Kind::bytes()),
117 (Field::from("tags"), Kind::any_object()),
118 (Field::from("type"), Kind::bytes()),
119 (Field::from("kind"), Kind::bytes()),
120 (Field::from("value"), Kind::float() | Kind::null()),
121 ])
122}
123
124pub(crate) fn metric_into_vrl(value: &Metric) -> Value {
125 value!({
126 name: { value.name() },
127 tags: {
128 BTreeMap::from_iter(
129 value
130 .tags()
131 .map(|t| {
132 t.iter_sets()
133 .map(|(k, v)| {
134 (
135 k.into(),
136 Value::Array(
137 v.iter()
138 .filter_map(|v| {
139 v.map(ToString::to_string).map(Into::into).map(Value::Bytes)
140 })
141 .collect(),
142 ),
143 )
144 })
145 .collect::<Vec<_>>()
146 })
147 .unwrap_or_default(),
148 )
149 },
150 "type": { value.value().as_name() },
151 kind: {
152 match value.kind() {
153 vector_core::event::MetricKind::Incremental => "incremental",
154 vector_core::event::MetricKind::Absolute => "absolute",
155 }
156 },
157 value: {
158 match value.value() {
159 vector_core::event::MetricValue::Counter { value }
160 | vector_core::event::MetricValue::Gauge { value } => NotNan::new(*value).ok(),
161 _ => None,
162 }
163 }
164 })
165}
166
167pub(crate) fn validate_tags(
168 state: &TypeState,
169 tags: &BTreeMap<KeyString, Expr>,
170) -> Result<(), Box<dyn DiagnosticMessage>> {
171 for v in tags.values() {
172 if *v.type_def(state).kind() != Kind::bytes() {
173 return Err(Box::new(vrl::compiler::function::Error::InvalidArgument {
174 keyword: "tags.value",
175 value: v.resolve_constant(state).unwrap_or(Value::Null),
176 error: "Tag values must be strings",
177 }));
178 }
179 }
180 Ok(())
181}
182
183pub(crate) fn resolve_tags(
184 ctx: &mut Context,
185 tags: &BTreeMap<KeyString, Expr>,
186) -> Result<BTreeMap<String, String>, ExpressionError> {
187 tags.iter()
188 .map(|(k, v)| {
189 v.resolve(ctx).and_then(|v| {
190 Ok((
191 k.clone().into(),
192 v.as_str().ok_or("Tag must be a string")?.into_owned(),
193 ))
194 })
195 })
196 .collect::<Result<_, _>>()
197}
198
#[cfg(test)]
mod tests {
    use vector_core::{
        compile_vrl,
        event::{Event, LogEvent, MetricKind, MetricTags, VrlTarget},
    };
    use vrl::{
        compiler::{
            runtime::{Runtime, Terminate},
            CompilationResult, CompileConfig,
        },
        diagnostic::DiagnosticList,
    };

    use super::*;

    /// Compiles `vrl_source` with the VRL stdlib plus this crate's functions, registering
    /// `storage` as custom compile-time data and marking the configuration read-only.
    fn compile(
        storage: MetricsStorage,
        vrl_source: &str,
    ) -> Result<CompilationResult, DiagnosticList> {
        // NOTE(review): `vrl::stdlib::all` is presumably on the workspace's clippy
        // disallowed-methods list; tests call it directly to build the full function set.
        #[allow(clippy::disallowed_methods)]
        let functions = vrl::stdlib::all().into_iter();

        let functions = functions.chain(crate::all()).collect::<Vec<_>>();

        let state = TypeState::default();

        let mut config = CompileConfig::default();
        config.set_custom(storage);
        config.set_read_only();

        compile_vrl(vrl_source, &functions, &state, config)
    }

    /// Compiles `vrl_source` and runs it against an empty log event.
    ///
    /// Panics when compilation fails; runtime failures are returned to the caller.
    fn compile_and_run(storage: MetricsStorage, vrl_source: &str) -> Result<Value, Terminate> {
        let CompilationResult {
            program,
            warnings: _,
            config: _,
        } = compile(storage, vrl_source).expect("compilation failed");

        let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
        Runtime::default().resolve(&mut target, &program, &TimeZone::default())
    }

    /// Asserts that a VRL metric object has the given `name` and `value`, and, when `tags`
    /// is provided, that the first value of each listed tag equals the expected string.
    fn assert_metric_matches(
        metric: &BTreeMap<KeyString, Value>,
        name: &str,
        value: f64,
        tags: Option<Vec<(&str, &str)>>,
    ) {
        assert_eq!(metric.get("name").unwrap().as_str().unwrap(), name);
        assert_eq!(
            metric.get("value").unwrap().as_float().unwrap(),
            NotNan::new(value).unwrap()
        );

        if let Some(tags) = tags {
            let metric_tags = metric.get("tags").unwrap().as_object().unwrap();
            for (key, value) in tags {
                assert_eq!(
                    metric_tags
                        .get(key)
                        .unwrap()
                        .as_array_unwrap()
                        .first()
                        .unwrap()
                        .as_str()
                        .unwrap(),
                    value
                );
            }
        }
    }

    /// Builds an absolute gauge named `test`, tagged with `component_id` when one is given.
    fn gauge(value: f64, component_id: Option<&str>) -> Metric {
        let metric = Metric::new(
            "test",
            MetricKind::Absolute,
            vector_core::event::MetricValue::Gauge { value },
        );

        match component_id {
            Some(id) => metric.with_tags(Some(MetricTags::from_iter([(
                "component_id".to_string(),
                id.to_string(),
            )]))),
            None => metric,
        }
    }

    /// Creates a storage whose cache holds exactly `metrics`.
    fn storage_with(metrics: Vec<Metric>) -> MetricsStorage {
        let storage = MetricsStorage::default();
        storage.cache.store(metrics.into());
        storage
    }

    /// Asserts that `result` is an array of `test` gauges with value 1.0 whose
    /// `component_id` tags equal `expected`, in order.
    fn assert_component_ids(result: &Value, expected: &[&str]) {
        let found = result.as_array_unwrap();
        assert_eq!(found.len(), expected.len());

        for (metric, component_id) in found.iter().zip(expected.iter().copied()) {
            assert_metric_matches(
                metric.as_object().unwrap(),
                "test",
                1.0,
                Some(vec![("component_id", component_id)]),
            );
        }
    }

    /// Runs `aggregate_vector_metrics` with the given function over a fixed fixture.
    ///
    /// The fixture holds gauges 6.0 and 3.0 tagged `start.a.end` and `start.c.end`, plus a
    /// 1.0 gauge tagged `something_else` that the `start.*.end` filter must exclude.
    fn run_aggregate(function: &str) -> f64 {
        let storage = storage_with(vec![
            gauge(6.0, Some("start.a.end")),
            gauge(1.0, Some("something_else")),
            gauge(3.0, Some("start.c.end")),
        ]);

        let source = format!(
            r#"aggregate_vector_metrics("{function}", "test", tags: {{ "component_id": "start.*.end" }})"#
        );

        compile_and_run(storage, &source)
            .expect("vrl failed")
            .as_float()
            .unwrap()
            .into_inner()
    }

    #[test]
    fn test_get_vector_metric() {
        let storage = storage_with(vec![gauge(1.0, None)]);

        let result = compile_and_run(storage, r#"get_vector_metric("test")"#).expect("vrl failed");

        assert_metric_matches(result.as_object().unwrap(), "test", 1.0, None);
    }

    #[test]
    fn test_find_vector_metrics() {
        let storage = storage_with(vec![gauge(1.0, Some("a")), gauge(1.0, Some("b"))]);

        let result =
            compile_and_run(storage, r#"find_vector_metrics("test")"#).expect("vrl failed");

        assert_component_ids(&result, &["a", "b"]);
    }

    #[test]
    fn test_get_vector_metric_by_tag() {
        let storage = storage_with(vec![gauge(1.0, Some("a")), gauge(1.0, Some("b"))]);

        let result = compile_and_run(
            storage,
            r#"get_vector_metric("test", tags: { "component_id": "b" })"#,
        )
        .expect("vrl failed");

        assert_metric_matches(
            result.as_object().unwrap(),
            "test",
            1.0,
            Some(vec![("component_id", "b")]),
        );
    }

    #[test]
    fn test_find_vector_metrics_wildcard() {
        // The untagged gauge must not match a bare `*` filter on `component_id`.
        let storage = storage_with(vec![
            gauge(1.0, Some("a")),
            gauge(1.0, Some("b")),
            gauge(1.0, None),
        ]);

        let result = compile_and_run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "*" })"#,
        )
        .expect("vrl failed");

        assert_component_ids(&result, &["a", "b"]);
    }

    #[test]
    fn test_find_vector_metrics_wildcard_start() {
        let storage = storage_with(vec![
            gauge(1.0, Some("prefix.a")),
            gauge(1.0, Some("something_else")),
            gauge(1.0, Some("prefix.c")),
        ]);

        let result = compile_and_run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "prefix.*" })"#,
        )
        .expect("vrl failed");

        assert_component_ids(&result, &["prefix.a", "prefix.c"]);
    }

    #[test]
    fn test_find_vector_metrics_wildcard_end() {
        let storage = storage_with(vec![
            gauge(1.0, Some("a.suffix")),
            gauge(1.0, Some("something_else")),
            gauge(1.0, Some("c.suffix")),
        ]);

        let result = compile_and_run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "*.suffix" })"#,
        )
        .expect("vrl failed");

        assert_component_ids(&result, &["a.suffix", "c.suffix"]);
    }

    #[test]
    fn test_find_vector_metrics_wildcard_middle() {
        let storage = storage_with(vec![
            gauge(1.0, Some("start.a.end")),
            gauge(1.0, Some("something_else")),
            gauge(1.0, Some("start.c.end")),
        ]);

        let result = compile_and_run(
            storage,
            r#"find_vector_metrics("test", tags: { "component_id": "start.*.end" })"#,
        )
        .expect("vrl failed");

        assert_component_ids(&result, &["start.a.end", "start.c.end"]);
    }

    #[test]
    fn test_aggregate_vector_metrics_sum() {
        assert_eq!(run_aggregate("sum"), 9.0);
    }

    #[test]
    fn test_aggregate_vector_metrics_avg() {
        assert_eq!(run_aggregate("avg"), 4.5);
    }

    #[test]
    fn test_aggregate_vector_metrics_max() {
        assert_eq!(run_aggregate("max"), 6.0);
    }

    #[test]
    fn test_aggregate_vector_metrics_min() {
        assert_eq!(run_aggregate("min"), 3.0);
    }
}