1use std::{
2 path::{Path, PathBuf},
3 time::Duration,
4};
5
6use chrono::{DateTime, Utc};
7use futures::StreamExt;
8use glob::{Pattern, PatternError};
9#[cfg(not(windows))]
10use heim::units::ratio::ratio;
11use heim::units::time::second;
12use serde_with::serde_as;
13use sysinfo::{Components, System};
14use tokio::time;
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::{
17 EstimatedJsonEncodedSizeOf,
18 config::LogNamespace,
19 configurable::configurable_component,
20 internal_event::{
21 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
22 },
23};
24
25use crate::{
26 SourceSender,
27 config::{SourceConfig, SourceContext, SourceOutput},
28 event::metric::{Metric, MetricKind, MetricTags, MetricValue},
29 internal_events::{EventsReceived, HostMetricsScrapeDetailError, StreamClosedError},
30 shutdown::ShutdownSignal,
31};
32
33#[cfg(target_os = "linux")]
34mod cgroups;
35mod cpu;
36mod disk;
37mod filesystem;
38mod memory;
39mod network;
40mod process;
41#[cfg(target_os = "linux")]
42mod tcp;
43mod temperature;
44
/// Collector types that the `host_metrics` source can run.
///
/// Variant names are (de)serialized in lowercase, e.g. `cgroups`, `cpu`, `tcp`.
#[serde_as]
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Collector {
    /// Metrics related to Linux control groups.
    ///
    /// Only available on Linux.
    CGroups,

    /// Metrics related to CPU utilization.
    Cpu,

    /// Metrics related to processes.
    Process,

    /// Metrics related to disks.
    Disk,

    /// Metrics related to filesystems.
    Filesystem,

    /// Metrics related to the system load average (`load1`, `load5`, `load15`).
    Load,

    /// Metrics related to the host itself (`uptime`, `boot_time`).
    Host,

    /// Metrics related to memory and swap.
    Memory,

    /// Metrics related to the network.
    Network,

    /// Metrics related to TCP.
    ///
    /// Only available on Linux.
    TCP,

    /// Metrics related to temperature.
    Temperature,
}
86
/// Filtering configuration: a pair of optional include/exclude glob pattern lists.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct FilterList {
    /// Any patterns which should be included.
    ///
    /// When unset, every value is included. When set, a value must be present
    /// and match at least one of these patterns to be included.
    includes: Option<Vec<PatternWrapper>>,

    /// Any patterns which should be excluded.
    ///
    /// A present value matching any of these patterns is rejected, even if it
    /// also matches an include pattern.
    excludes: Option<Vec<PatternWrapper>>,
}
101
102#[serde_as]
104#[configurable_component(source("host_metrics", "Collect metric data from the local system."))]
105#[derive(Clone, Debug, Derivative)]
106#[derivative(Default)]
107#[serde(deny_unknown_fields)]
108pub struct HostMetricsConfig {
109 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
111 #[serde(default = "default_scrape_interval")]
112 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
113 pub scrape_interval_secs: Duration,
114
115 #[configurable(metadata(docs::examples = "example_collectors()"))]
119 #[derivative(Default(value = "default_collectors()"))]
120 #[serde(default = "default_collectors")]
121 pub collectors: Option<Vec<Collector>>,
122
123 #[derivative(Default(value = "default_namespace()"))]
125 #[serde(default = "default_namespace")]
126 pub namespace: Option<String>,
127
128 #[configurable(derived)]
129 #[derivative(Default(value = "default_cgroups_config()"))]
130 #[serde(default = "default_cgroups_config")]
131 pub cgroups: Option<CGroupsConfig>,
132
133 #[configurable(derived)]
134 #[serde(default)]
135 pub disk: disk::DiskConfig,
136
137 #[configurable(derived)]
138 #[serde(default)]
139 pub filesystem: filesystem::FilesystemConfig,
140
141 #[configurable(derived)]
142 #[serde(default)]
143 pub network: network::NetworkConfig,
144
145 #[configurable(derived)]
146 #[serde(default)]
147 pub process: process::ProcessConfig,
148}
149
/// Options for the cgroups (controller groups) metrics collector.
///
/// This collector is only available on Linux.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default)]
pub struct CGroupsConfig {
    /// The levels setting for the cgroups collector (defaults to 100).
    // NOTE(review): presumably the depth of the cgroup hierarchy to report on;
    // the consumer lives in the `cgroups` module — confirm there.
    #[derivative(Default(value = "default_levels()"))]
    #[serde(default = "default_levels")]
    #[configurable(metadata(docs::examples = 1))]
    #[configurable(metadata(docs::examples = 3))]
    levels: usize,

    /// The base cgroup name to use for the collector.
    // NOTE(review): presumably the root of the subtree that is scanned — confirm
    // against the `cgroups` module.
    #[configurable(metadata(docs::examples = "/"))]
    #[configurable(metadata(docs::examples = "system.slice/snapd.service"))]
    pub(super) base: Option<PathBuf>,

    /// Lists of cgroup name patterns to include or exclude.
    // NOTE(review): the serde default is the include-everything (`*`) list from
    // `default_all_devices`, while the derived `Default` leaves both lists unset.
    #[configurable(metadata(docs::examples = "example_cgroups()"))]
    #[serde(default = "default_all_devices")]
    groups: FilterList,

    /// Base directory setting; hidden from the generated docs and never serialized.
    // NOTE(review): presumably the cgroups filesystem location — confirm against
    // the `cgroups` module.
    #[serde(skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    #[configurable(metadata(docs::human_name = "Base Directory"))]
    base_dir: Option<PathBuf>,
}
184
/// Returns the scrape interval used when the configuration does not set one (15 seconds).
const fn default_scrape_interval() -> Duration {
    const DEFAULT_SCRAPE_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SCRAPE_INTERVAL_SECS)
}
188
/// Returns the namespace used when the configuration does not set one: `"host"`.
pub fn default_namespace() -> Option<String> {
    let namespace = "host".to_owned();
    Some(namespace)
}
192
/// Returns the collector names shown as examples in the generated documentation.
const fn example_collectors() -> [&'static str; 10] {
    const EXAMPLES: [&str; 10] = [
        "cgroups",
        "cpu",
        "disk",
        "filesystem",
        "load",
        "host",
        "memory",
        "network",
        "tcp",
        "temperature",
    ];
    EXAMPLES
}
207
208fn default_collectors() -> Option<Vec<Collector>> {
209 let mut collectors = vec![
210 Collector::Cpu,
211 Collector::Disk,
212 Collector::Filesystem,
213 Collector::Load,
214 Collector::Host,
215 Collector::Memory,
216 Collector::Network,
217 Collector::Process,
218 ];
219
220 #[cfg(target_os = "linux")]
221 {
222 collectors.push(Collector::CGroups);
223 collectors.push(Collector::TCP);
224 }
225 #[cfg(not(target_os = "linux"))]
226 if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
227 collectors.push(Collector::CGroups);
228 collectors.push(Collector::TCP);
229 }
230
231 Some(collectors)
232}
233
234fn example_devices() -> FilterList {
235 FilterList {
236 includes: Some(vec!["sda".try_into().unwrap()]),
237 excludes: Some(vec!["dm-*".try_into().unwrap()]),
238 }
239}
240
241fn default_all_devices() -> FilterList {
242 FilterList {
243 includes: Some(vec!["*".try_into().unwrap()]),
244 excludes: None,
245 }
246}
247
248fn example_processes() -> FilterList {
249 FilterList {
250 includes: Some(vec!["docker".try_into().unwrap()]),
251 excludes: None,
252 }
253}
254
255fn default_all_processes() -> FilterList {
256 FilterList {
257 includes: Some(vec!["*".try_into().unwrap()]),
258 excludes: None,
259 }
260}
261
/// Returns the `levels` value used by `CGroupsConfig` when none is configured (100).
const fn default_levels() -> usize {
    const DEFAULT_LEVELS: usize = 100;
    DEFAULT_LEVELS
}
265
266fn example_cgroups() -> FilterList {
267 FilterList {
268 includes: Some(vec!["user.slice/*".try_into().unwrap()]),
269 excludes: Some(vec!["*.service".try_into().unwrap()]),
270 }
271}
272
273fn default_cgroups_config() -> Option<CGroupsConfig> {
274 if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
276 return Some(CGroupsConfig::default());
277 }
278
279 #[cfg(not(target_os = "linux"))]
280 {
281 None
282 }
283
284 #[cfg(target_os = "linux")]
285 {
286 Some(CGroupsConfig::default())
287 }
288}
289
// NOTE(review): going by the macro name, this presumably derives the generated
// example configuration from `HostMetricsConfig::default()`; the macro is
// defined elsewhere — confirm there.
impl_generate_config_from_default!(HostMetricsConfig);
291
292#[async_trait::async_trait]
293#[typetag::serde(name = "host_metrics")]
294impl SourceConfig for HostMetricsConfig {
295 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
296 init_roots();
297
298 #[cfg(not(target_os = "linux"))]
299 {
300 if self.cgroups.is_some() || self.has_collector(Collector::CGroups) {
301 return Err("CGroups collector is only available on Linux systems".into());
302 }
303 if self.has_collector(Collector::TCP) {
304 return Err("TCP collector is only available on Linux systems".into());
305 }
306 }
307
308 let mut config = self.clone();
309 config.namespace = config.namespace.filter(|namespace| !namespace.is_empty());
310
311 Ok(Box::pin(config.run(cx.out, cx.shutdown)))
312 }
313
314 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
315 vec![SourceOutput::new_metrics()]
316 }
317
318 fn can_acknowledge(&self) -> bool {
319 false
320 }
321}
322
323impl HostMetricsConfig {
324 pub fn scrape_interval_secs(&mut self, value: f64) {
326 self.scrape_interval_secs = Duration::from_secs_f64(value);
327 }
328
329 async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
330 let duration = self.scrape_interval_secs;
331 let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
332
333 let mut generator = HostMetrics::new(self);
334
335 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
336
337 while interval.next().await.is_some() {
338 bytes_received.emit(ByteSize(0));
339 let metrics = generator.capture_metrics().await;
340 let count = metrics.len();
341 if (out.send_batch(metrics).await).is_err() {
342 emit!(StreamClosedError { count });
343 return Err(());
344 }
345 }
346
347 Ok(())
348 }
349
350 fn has_collector(&self, collector: Collector) -> bool {
351 match &self.collectors {
352 None => true,
353 Some(collectors) => collectors.contains(&collector),
354 }
355 }
356}
357
/// Runtime state of the `host_metrics` source: the configuration plus the
/// handles that are created once and kept between scrapes.
pub struct HostMetrics {
    /// The configuration this instance was built from.
    config: HostMetricsConfig,
    /// `sysinfo` system handle, created once in `new`.
    // NOTE(review): presumably read by the collectors in the submodules — confirm.
    system: System,
    /// `sysinfo` component list, refreshed once in `new`.
    // NOTE(review): presumably read by the temperature collector — confirm.
    components: Components,
    /// Root cgroup built from the cgroups configuration (Linux only).
    #[cfg(target_os = "linux")]
    root_cgroup: Option<cgroups::CGroupRoot>,
    /// Registered handle for the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
}
369
370impl HostMetrics {
371 #[cfg(not(target_os = "linux"))]
372 pub fn new(config: HostMetricsConfig) -> Self {
373 Self {
374 config,
375 system: System::new(),
376 components: Components::new_with_refreshed_list(),
377 events_received: register!(EventsReceived),
378 }
379 }
380
381 #[cfg(target_os = "linux")]
382 pub fn new(config: HostMetricsConfig) -> Self {
383 let cgroups = config.cgroups.clone().unwrap_or_default();
384 let root_cgroup = cgroups::CGroupRoot::new(&cgroups);
385 Self {
386 config,
387 system: System::new(),
388 components: Components::new_with_refreshed_list(),
389 root_cgroup,
390 events_received: register!(EventsReceived),
391 }
392 }
393
394 pub fn buffer(&self) -> MetricsBuffer {
395 MetricsBuffer::new(self.config.namespace.clone())
396 }
397
398 async fn capture_metrics(&mut self) -> Vec<Metric> {
399 let mut buffer = self.buffer();
400
401 #[cfg(target_os = "linux")]
402 if self.config.has_collector(Collector::CGroups) {
403 self.cgroups_metrics(&mut buffer).await;
404 }
405 if self.config.has_collector(Collector::Cpu) {
406 self.cpu_metrics(&mut buffer).await;
407 }
408 if self.config.has_collector(Collector::Process) {
409 self.process_metrics(&mut buffer).await;
410 }
411 if self.config.has_collector(Collector::Disk) {
412 self.disk_metrics(&mut buffer).await;
413 }
414 if self.config.has_collector(Collector::Filesystem) {
415 self.filesystem_metrics(&mut buffer).await;
416 }
417 if self.config.has_collector(Collector::Load) {
418 self.loadavg_metrics(&mut buffer).await;
419 }
420 if self.config.has_collector(Collector::Host) {
421 self.host_metrics(&mut buffer).await;
422 }
423 if self.config.has_collector(Collector::Memory) {
424 self.memory_metrics(&mut buffer).await;
425 self.swap_metrics(&mut buffer).await;
426 }
427 if self.config.has_collector(Collector::Network) {
428 self.network_metrics(&mut buffer).await;
429 }
430 #[cfg(target_os = "linux")]
431 if self.config.has_collector(Collector::TCP) {
432 self.tcp_metrics(&mut buffer).await;
433 }
434 if self.config.has_collector(Collector::Temperature) {
435 self.temperature_metrics(&mut buffer).await;
436 }
437
438 let metrics = buffer.metrics;
439 self.events_received.emit(CountByteSize(
440 metrics.len(),
441 metrics.estimated_json_encoded_size_of(),
442 ));
443 metrics
444 }
445
446 pub async fn loadavg_metrics(&self, output: &mut MetricsBuffer) {
447 output.name = "load";
448 #[cfg(unix)]
449 match heim::cpu::os::unix::loadavg().await {
450 Ok(loadavg) => {
451 output.gauge(
452 "load1",
453 loadavg.0.get::<ratio>() as f64,
454 MetricTags::default(),
455 );
456 output.gauge(
457 "load5",
458 loadavg.1.get::<ratio>() as f64,
459 MetricTags::default(),
460 );
461 output.gauge(
462 "load15",
463 loadavg.2.get::<ratio>() as f64,
464 MetricTags::default(),
465 );
466 }
467 Err(error) => {
468 emit!(HostMetricsScrapeDetailError {
469 message: "Failed to load average info",
470 error,
471 });
472 }
473 }
474 }
475
476 pub async fn host_metrics(&self, output: &mut MetricsBuffer) {
477 output.name = "host";
478 match heim::host::uptime().await {
479 Ok(time) => output.gauge("uptime", time.get::<second>(), MetricTags::default()),
480 Err(error) => {
481 emit!(HostMetricsScrapeDetailError {
482 message: "Failed to load host uptime info",
483 error,
484 });
485 }
486 }
487
488 match heim::host::boot_time().await {
489 Ok(time) => output.gauge("boot_time", time.get::<second>(), MetricTags::default()),
490 Err(error) => {
491 emit!(HostMetricsScrapeDetailError {
492 message: "Failed to load host boot time info",
493 error,
494 });
495 }
496 }
497 }
498}
499
/// Accumulates the metrics gathered during one scrape, together with the
/// metadata stamped onto each of them.
#[derive(Default)]
pub struct MetricsBuffer {
    /// The metrics recorded so far.
    pub metrics: Vec<Metric>,
    /// Name of the collector currently writing to the buffer; emitted as the
    /// `collector` tag.
    name: &'static str,
    /// Local hostname, if it could be determined; emitted as the `host` tag.
    host: Option<String>,
    /// Timestamp taken when the buffer was created; applied to every metric.
    timestamp: DateTime<Utc>,
    /// Namespace applied to every metric.
    namespace: Option<String>,
}
508
509impl MetricsBuffer {
510 fn new(namespace: Option<String>) -> Self {
511 Self {
512 metrics: Vec::new(),
513 name: "",
514 host: crate::get_hostname().ok(),
515 timestamp: Utc::now(),
516 namespace,
517 }
518 }
519
520 fn tags(&self, mut tags: MetricTags) -> MetricTags {
521 tags.replace("collector".into(), self.name.to_string());
522 if let Some(host) = &self.host {
523 tags.replace("host".into(), host.clone());
524 }
525 tags
526 }
527
528 fn counter(&mut self, name: &str, value: f64, tags: MetricTags) {
529 self.metrics.push(
530 Metric::new(name, MetricKind::Absolute, MetricValue::Counter { value })
531 .with_namespace(self.namespace.clone())
532 .with_tags(Some(self.tags(tags)))
533 .with_timestamp(Some(self.timestamp)),
534 )
535 }
536
537 fn gauge(&mut self, name: &str, value: f64, tags: MetricTags) {
538 self.metrics.push(
539 Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
540 .with_namespace(self.namespace.clone())
541 .with_tags(Some(self.tags(tags)))
542 .with_timestamp(Some(self.timestamp)),
543 )
544 }
545}
546
547fn filter_result_sync<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
548where
549 E: std::error::Error,
550{
551 result
552 .map_err(|error| emit!(HostMetricsScrapeDetailError { message, error }))
553 .ok()
554}
555
556async fn filter_result<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
557where
558 E: std::error::Error,
559{
560 filter_result_sync(result, message)
561}
562
563#[allow(clippy::missing_const_for_fn)]
564fn init_roots() {
565 #[cfg(target_os = "linux")]
566 {
567 use std::sync::Once;
568
569 static INIT: Once = Once::new();
570
571 INIT.call_once(|| {
572 match std::env::var_os("PROCFS_ROOT") {
573 Some(procfs_root) => {
574 info!(
575 message = "PROCFS_ROOT is set in envvars. Using custom for procfs.",
576 custom = ?procfs_root
577 );
578 heim::os::linux::set_procfs_root(std::path::PathBuf::from(&procfs_root));
579 }
580 None => info!("PROCFS_ROOT is unset. Using default '/proc' for procfs root."),
581 };
582
583 match std::env::var_os("SYSFS_ROOT") {
584 Some(sysfs_root) => {
585 info!(
586 message = "SYSFS_ROOT is set in envvars. Using custom for sysfs.",
587 custom = ?sysfs_root
588 );
589 heim::os::linux::set_sysfs_root(std::path::PathBuf::from(&sysfs_root));
590 }
591 None => info!("SYSFS_ROOT is unset. Using default '/sys' for sysfs root."),
592 }
593 });
594 };
595}
596
597impl FilterList {
598 fn contains<T, M>(&self, value: &Option<T>, matches: M) -> bool
599 where
600 M: Fn(&PatternWrapper, &T) -> bool,
601 {
602 (match (&self.includes, value) {
603 (None, _) => true,
605 (Some(_), None) => false,
607 (Some(includes), Some(value)) => includes.iter().any(|pattern| matches(pattern, value)),
609 }) && match (&self.excludes, value) {
610 (None, _) => true,
612 (Some(_), None) => true,
614 (Some(excludes), Some(value)) => {
616 !excludes.iter().any(|pattern| matches(pattern, value))
617 }
618 }
619 }
620
621 fn contains_str(&self, value: Option<&str>) -> bool {
622 self.contains(&value, |pattern, s| pattern.matches_str(s))
623 }
624
625 fn contains_path(&self, value: Option<&Path>) -> bool {
626 self.contains(&value, |pattern, path| pattern.matches_path(path))
627 }
628
629 #[cfg(test)]
630 fn contains_test(&self, value: Option<&str>) -> bool {
631 let result = self.contains_str(value);
632 assert_eq!(result, self.contains_path(value.map(std::path::Path::new)));
633 result
634 }
635}
636
/// A compiled glob pattern.
///
/// It is deserialized from, and serialized to, a plain string; a string that
/// is not a valid glob pattern is rejected during deserialization.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(try_from = "String", into = "String")]
struct PatternWrapper(Pattern);
657
658impl PatternWrapper {
659 fn matches_str(&self, s: &str) -> bool {
660 self.0.matches(s)
661 }
662
663 fn matches_path(&self, p: &Path) -> bool {
664 self.0.matches_path(p)
665 }
666}
667
668impl TryFrom<String> for PatternWrapper {
669 type Error = PatternError;
670
671 fn try_from(value: String) -> Result<Self, Self::Error> {
672 Pattern::new(value.as_ref()).map(PatternWrapper)
673 }
674}
675
676impl TryFrom<&str> for PatternWrapper {
677 type Error = PatternError;
678
679 fn try_from(value: &str) -> Result<Self, Self::Error> {
680 value.to_string().try_into()
681 }
682}
683
684impl From<PatternWrapper> for String {
685 fn from(pattern: PatternWrapper) -> Self {
686 pattern.0.to_string()
687 }
688}
689
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, future::Future, time::Duration};

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// Compiles each glob into a `PatternWrapper`, wrapped the way
    /// `FilterList` stores its pattern lists.
    fn patterns(globs: &[&str]) -> Option<Vec<PatternWrapper>> {
        let compiled = globs
            .iter()
            .map(|glob| PatternWrapper::try_from((*glob).to_string()).unwrap())
            .collect();
        Some(compiled)
    }

    /// Asserts that `filters` gives the expected answer for every case.
    fn assert_filter_cases(filters: &FilterList, cases: &[(Option<&str>, bool)]) {
        for &(value, expected) in cases {
            assert_eq!(filters.contains_test(value), expected, "value={value:?}");
        }
    }

    #[test]
    fn filterlist_default_includes_everything() {
        let filters = FilterList::default();
        assert_filter_cases(
            &filters,
            &[
                (Some("anything"), true),
                (Some("should"), true),
                (Some("work"), true),
                (None, true),
            ],
        );
    }

    #[test]
    fn filterlist_includes_works() {
        let filters = FilterList {
            includes: patterns(&["sda", "dm-*"]),
            excludes: None,
        };
        assert_filter_cases(
            &filters,
            &[
                (Some("sd"), false),
                (Some("sda"), true),
                (Some("sda1"), false),
                (Some("dm-"), true),
                (Some("dm-5"), true),
                (Some("xda"), false),
                (None, false),
            ],
        );
    }

    #[test]
    fn filterlist_excludes_works() {
        let filters = FilterList {
            includes: None,
            excludes: patterns(&["sda", "dm-*"]),
        };
        assert_filter_cases(
            &filters,
            &[
                (Some("sd"), true),
                (Some("sda"), false),
                (Some("sda1"), true),
                (Some("dm-"), false),
                (Some("dm-5"), false),
                (Some("xda"), true),
                (None, true),
            ],
        );
    }

    #[test]
    fn filterlist_includes_and_excludes_works() {
        let filters = FilterList {
            includes: patterns(&["sda", "dm-*"]),
            excludes: patterns(&["dm-5"]),
        };
        assert_filter_cases(
            &filters,
            &[
                (Some("sd"), false),
                (Some("sda"), true),
                (Some("sda1"), false),
                (Some("dm-"), true),
                (Some("dm-1"), true),
                (Some("dm-5"), false),
                (Some("xda"), false),
                (None, false),
            ],
        );
    }

    /// Each single collector must produce fewer metrics than the default set.
    #[tokio::test]
    async fn filters_on_collectors() {
        let all_metrics_count = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await
            .len();

        let collectors = [
            #[cfg(target_os = "linux")]
            Collector::CGroups,
            Collector::Cpu,
            Collector::Process,
            Collector::Disk,
            Collector::Filesystem,
            Collector::Load,
            Collector::Host,
            Collector::Memory,
            Collector::Network,
        ];

        for collector in collectors {
            let config = HostMetricsConfig {
                collectors: Some(vec![collector]),
                ..Default::default()
            };
            let subset = HostMetrics::new(config).capture_metrics().await;

            assert!(all_metrics_count > subset.len(), "collector={collector:?}");
        }
    }

    #[tokio::test]
    async fn are_tagged_with_hostname() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;
        let hostname = crate::get_hostname().expect("Broken hostname");

        let all_tagged = metrics.iter().all(|metric| {
            let host = metric
                .tags()
                .expect("Missing tags")
                .get("host")
                .expect("Missing \"host\" tag");
            host == hostname
        });
        assert!(all_tagged);
    }

    #[tokio::test]
    async fn uses_custom_namespace() {
        let config = HostMetricsConfig {
            namespace: Some("other".into()),
            ..Default::default()
        };
        let metrics = HostMetrics::new(config).capture_metrics().await;

        for metric in &metrics {
            assert!(metric.namespace() == Some("other"));
        }
    }

    #[tokio::test]
    async fn uses_default_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;

        for metric in &metrics {
            assert!(metric.namespace() == Some("host"));
        }
    }

    // `loadavg_metrics` only gathers values on Unix targets, so this test is
    // skipped on Windows.
    #[cfg(not(windows))]
    #[tokio::test]
    async fn generates_loadavg_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .loadavg_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 3);
        assert!(all_gauges(&metrics));

        // Every load average metric name carries the `load` prefix.
        assert!(
            metrics
                .iter()
                .all(|metric| metric.name().starts_with("load"))
        );
    }

    #[tokio::test]
    async fn generates_host_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .host_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 2);
        assert!(all_gauges(&metrics));
    }

    /// Returns whether every metric is a counter.
    pub(super) fn all_counters(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), &MetricValue::Counter { .. }))
    }

    /// Returns whether every metric is a gauge.
    pub(super) fn all_gauges(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), &MetricValue::Gauge { .. }))
    }

    /// Returns whether every metric that carries `tag` has a value accepted by
    /// `matches`; metrics without the tag are ignored.
    fn all_tags_match(metrics: &[Metric], tag: &str, matches: impl Fn(&str) -> bool) -> bool {
        metrics
            .iter()
            .all(|metric| match metric.tags().unwrap().get(tag) {
                Some(value) => matches(value),
                None => true,
            })
    }

    /// Counts the metrics named `name`.
    pub(super) fn count_name(metrics: &[Metric], name: &str) -> usize {
        let mut total = 0;
        for metric in metrics {
            if metric.name() == name {
                total += 1;
            }
        }
        total
    }

    /// Counts the metrics that carry `tag`.
    pub(super) fn count_tag(metrics: &[Metric], tag: &str) -> usize {
        let mut total = 0;
        for metric in metrics {
            let tags = metric.tags().expect("Metric is missing tags");
            if tags.contains_key(tag) {
                total += 1;
            }
        }
        total
    }

    /// Collects the distinct values of `tag` across `metrics`.
    fn collect_tag_values(metrics: &[Metric], tag: &str) -> HashSet<String> {
        let mut values = HashSet::new();
        for metric in metrics {
            if let Some(value) = metric.tags().unwrap().get(tag) {
                values.insert(value.to_owned());
            }
        }
        values
    }

    /// Runs `get_metrics` with include/exclude filters derived from one
    /// observed value of `tag` and checks that each filter narrows the result
    /// as expected.
    ///
    /// Nothing is checked when no metric carries `tag`.
    pub(super) async fn assert_filtered_metrics<Get, Fut>(tag: &str, get_metrics: Get)
    where
        Get: Fn(FilterList) -> Fut,
        Fut: Future<Output = Vec<Metric>>,
    {
        let all_metrics = get_metrics(FilterList::default()).await;

        let Some(key) = collect_tag_values(&all_metrics, tag).into_iter().next() else {
            return;
        };

        // The prefix is the key minus its last character.
        let last_char_start = key.char_indices().next_back().map_or(0, |(i, _)| i);
        #[expect(
            clippy::string_slice,
            reason = "index from char_indices, always a char boundary"
        )]
        let key_prefix = &key[..last_char_start];
        let key_prefix_pattern = PatternWrapper::try_from(format!("{key_prefix}*")).unwrap();
        let key_pattern = PatternWrapper::try_from(key.clone()).unwrap();

        let only_including = |pattern: PatternWrapper| FilterList {
            includes: Some(vec![pattern]),
            excludes: None,
        };
        let only_excluding = |pattern: PatternWrapper| FilterList {
            includes: None,
            excludes: Some(vec![pattern]),
        };

        // Including the exact key keeps only that key.
        let filtered_metrics_with = get_metrics(only_including(key_pattern.clone())).await;

        assert!(filtered_metrics_with.len() <= all_metrics.len());
        assert!(!filtered_metrics_with.is_empty());
        assert!(all_tags_match(&filtered_metrics_with, tag, |s| s == key));

        // Including the prefix glob keeps at least as much as the exact key.
        let filtered_metrics_with_match =
            get_metrics(only_including(key_prefix_pattern.clone())).await;

        assert!(filtered_metrics_with_match.len() >= filtered_metrics_with.len());
        assert!(all_tags_match(&filtered_metrics_with_match, tag, |s| {
            s.starts_with(key_prefix)
        }));

        // Excluding the exact key removes it.
        let filtered_metrics_without = get_metrics(only_excluding(key_pattern)).await;

        assert!(filtered_metrics_without.len() <= all_metrics.len());
        assert!(all_tags_match(&filtered_metrics_without, tag, |s| s != key));

        // Excluding the prefix glob removes at least as much as the exact key.
        let filtered_metrics_without_match =
            get_metrics(only_excluding(key_prefix_pattern)).await;

        assert!(filtered_metrics_without_match.len() <= filtered_metrics_without.len());
        assert!(all_tags_match(&filtered_metrics_without_match, tag, |s| {
            !s.starts_with(key_prefix)
        }));

        assert!(filtered_metrics_with.len() + filtered_metrics_without.len() <= all_metrics.len());
    }

    #[tokio::test]
    async fn source_compliance() {
        let config = HostMetricsConfig {
            scrape_interval_secs: Duration::from_secs(1),
            ..Default::default()
        };

        let run_for = Duration::from_secs(2);
        let events = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;

        assert!(!events.is_empty());
    }
}