Skip to main content

vector/sources/host_metrics/
mod.rs

1use std::{
2    path::{Path, PathBuf},
3    time::Duration,
4};
5
6use chrono::{DateTime, Utc};
7use futures::StreamExt;
8use glob::{Pattern, PatternError};
9#[cfg(not(windows))]
10use heim::units::ratio::ratio;
11use heim::units::time::second;
12use serde_with::serde_as;
13use sysinfo::{Components, System};
14use tokio::time;
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::{
17    EstimatedJsonEncodedSizeOf,
18    config::LogNamespace,
19    configurable::configurable_component,
20    internal_event::{
21        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
22    },
23};
24
25use crate::{
26    SourceSender,
27    config::{SourceConfig, SourceContext, SourceOutput},
28    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
29    internal_events::{EventsReceived, HostMetricsScrapeDetailError, StreamClosedError},
30    shutdown::ShutdownSignal,
31};
32
33#[cfg(target_os = "linux")]
34mod cgroups;
35mod cpu;
36mod disk;
37mod filesystem;
38mod memory;
39mod network;
40mod process;
41#[cfg(target_os = "linux")]
42mod tcp;
43mod temperature;
44
/// Collector types.
// Variants are (de)serialized in lowercase (`rename_all = "lowercase"`), so
// `CGroups` is written as `cgroups` and `TCP` as `tcp` in configuration.
#[serde_as]
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Collector {
    /// Metrics related to Linux control groups.
    ///
    /// Only available on Linux.
    CGroups,

    /// Metrics related to CPU utilization.
    Cpu,

    /// Metrics related to process utilization.
    Process,

    /// Metrics related to disk I/O utilization.
    Disk,

    /// Metrics related to filesystem space utilization.
    Filesystem,

    /// Metrics related to the system load average.
    Load,

    /// Metrics related to the host.
    Host,

    /// Metrics related to memory utilization.
    Memory,

    /// Metrics related to network utilization.
    Network,

    /// Metrics related to TCP connections.
    // Rejected at build time on non-Linux targets (see `SourceConfig::build`).
    TCP,

    /// Metrics related to component temperatures.
    Temperature,
}
86
/// Filtering configuration.
// Matching rules (implemented by `FilterList::contains`):
// - `includes == None` admits every value, including a missing one;
// - `includes == Some(..)` admits only present values matching at least one pattern;
// - `excludes` then rejects present values matching any of its patterns and never
//   rejects a missing value.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct FilterList {
    /// Any patterns which should be included.
    ///
    /// The patterns are matched using globbing.
    includes: Option<Vec<PatternWrapper>>,

    /// Any patterns which should be excluded.
    ///
    /// The patterns are matched using globbing.
    excludes: Option<Vec<PatternWrapper>>,
}
101
/// Configuration for the `host_metrics` source.
#[serde_as]
#[configurable_component(source("host_metrics", "Collect metric data from the local system."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct HostMetricsConfig {
    /// The interval between metric gathering, in seconds.
    // Deserialized from a whole number of seconds; falls back to
    // `default_scrape_interval()` when the key is absent.
    // NOTE(review): there is no `derivative(Default(value = ...))` on this field, so
    // `HostMetricsConfig::default()` presumably yields `Duration::default()` (zero)
    // rather than the serde default — confirm before relying on `Default` here.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_scrape_interval")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub scrape_interval_secs: Duration,

    /// The list of host metric collector services to use.
    ///
    /// Defaults to all collectors.
    // `None` is treated as "every collector enabled" by `has_collector`.
    // NOTE(review): `default_collectors()` does not list `Collector::Temperature`,
    // and only lists `CGroups`/`TCP` on Linux or during schema generation.
    #[configurable(metadata(docs::examples = "example_collectors()"))]
    #[derivative(Default(value = "default_collectors()"))]
    #[serde(default = "default_collectors")]
    pub collectors: Option<Vec<Collector>>,

    /// Overrides the default namespace for the metrics emitted by the source.
    // An empty string is converted to `None` when the source is built.
    #[derivative(Default(value = "default_namespace()"))]
    #[serde(default = "default_namespace")]
    pub namespace: Option<String>,

    // Options for the cgroups collector; setting this on a non-Linux target makes
    // `build` fail.
    #[configurable(derived)]
    #[derivative(Default(value = "default_cgroups_config()"))]
    #[serde(default = "default_cgroups_config")]
    pub cgroups: Option<CGroupsConfig>,

    // Per-collector options; each falls back to its type's `Default` when omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub disk: disk::DiskConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub filesystem: filesystem::FilesystemConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub network: network::NetworkConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub process: process::ProcessConfig,
}
149
/// Options for the cgroups (control groups) metrics collector.
///
/// This collector is only available on Linux systems, and only supports either version 2 or hybrid cgroups.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default)]
pub struct CGroupsConfig {
    /// The number of levels of the cgroups hierarchy for which to report metrics.
    ///
    /// A value of `1` means the root or named cgroup.
    // Defaults to `default_levels()` (100) for both `Default` and deserialization.
    #[derivative(Default(value = "default_levels()"))]
    #[serde(default = "default_levels")]
    #[configurable(metadata(docs::examples = 1))]
    #[configurable(metadata(docs::examples = 3))]
    levels: usize,

    /// The base cgroup name to provide metrics for.
    #[configurable(metadata(docs::examples = "/"))]
    #[configurable(metadata(docs::examples = "system.slice/snapd.service"))]
    pub(super) base: Option<PathBuf>,

    /// Lists of cgroup name patterns to include or exclude in gathering
    /// usage metrics.
    // When the key is absent during deserialization, `default_all_devices()` is used,
    // i.e. an include list of `*` with no excludes.
    #[configurable(metadata(docs::examples = "example_cgroups()"))]
    #[serde(default = "default_all_devices")]
    groups: FilterList,

    /// Base cgroup directory, for testing use only
    // Never serialized and hidden from the generated documentation.
    #[serde(skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    #[configurable(metadata(docs::human_name = "Base Directory"))]
    base_dir: Option<PathBuf>,
}
184
/// Returns the scrape interval used when the configuration omits one: 15 seconds.
const fn default_scrape_interval() -> Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
188
/// Returns the namespace applied to emitted metrics when none is configured: `host`.
pub fn default_namespace() -> Option<String> {
    Some("host".to_owned())
}
192
/// Returns the collector names shown as examples in the generated documentation.
const fn example_collectors() -> [&'static str; 10] {
    const EXAMPLES: [&str; 10] = [
        "cgroups",
        "cpu",
        "disk",
        "filesystem",
        "load",
        "host",
        "memory",
        "network",
        "tcp",
        "temperature",
    ];
    EXAMPLES
}
207
208fn default_collectors() -> Option<Vec<Collector>> {
209    let mut collectors = vec![
210        Collector::Cpu,
211        Collector::Disk,
212        Collector::Filesystem,
213        Collector::Load,
214        Collector::Host,
215        Collector::Memory,
216        Collector::Network,
217        Collector::Process,
218    ];
219
220    #[cfg(target_os = "linux")]
221    {
222        collectors.push(Collector::CGroups);
223        collectors.push(Collector::TCP);
224    }
225    #[cfg(not(target_os = "linux"))]
226    if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
227        collectors.push(Collector::CGroups);
228        collectors.push(Collector::TCP);
229    }
230
231    Some(collectors)
232}
233
234fn example_devices() -> FilterList {
235    FilterList {
236        includes: Some(vec!["sda".try_into().unwrap()]),
237        excludes: Some(vec!["dm-*".try_into().unwrap()]),
238    }
239}
240
241fn default_all_devices() -> FilterList {
242    FilterList {
243        includes: Some(vec!["*".try_into().unwrap()]),
244        excludes: None,
245    }
246}
247
248fn example_processes() -> FilterList {
249    FilterList {
250        includes: Some(vec!["docker".try_into().unwrap()]),
251        excludes: None,
252    }
253}
254
255fn default_all_processes() -> FilterList {
256    FilterList {
257        includes: Some(vec!["*".try_into().unwrap()]),
258        excludes: None,
259    }
260}
261
/// Returns the default number of cgroup hierarchy levels to report: 100.
const fn default_levels() -> usize {
    const DEFAULT_LEVELS: usize = 100;
    DEFAULT_LEVELS
}
265
266fn example_cgroups() -> FilterList {
267    FilterList {
268        includes: Some(vec!["user.slice/*".try_into().unwrap()]),
269        excludes: Some(vec!["*.service".try_into().unwrap()]),
270    }
271}
272
273fn default_cgroups_config() -> Option<CGroupsConfig> {
274    // Check env variable to allow generating docs on non-linux systems.
275    if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
276        return Some(CGroupsConfig::default());
277    }
278
279    #[cfg(not(target_os = "linux"))]
280    {
281        None
282    }
283
284    #[cfg(target_os = "linux")]
285    {
286        Some(CGroupsConfig::default())
287    }
288}
289
// NOTE(review): the macro is defined elsewhere; judging by its name it derives the
// generated example configuration from `HostMetricsConfig::default()` — confirm.
impl_generate_config_from_default!(HostMetricsConfig);
291
292#[async_trait::async_trait]
293#[typetag::serde(name = "host_metrics")]
294impl SourceConfig for HostMetricsConfig {
295    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
296        init_roots();
297
298        #[cfg(not(target_os = "linux"))]
299        {
300            if self.cgroups.is_some() || self.has_collector(Collector::CGroups) {
301                return Err("CGroups collector is only available on Linux systems".into());
302            }
303            if self.has_collector(Collector::TCP) {
304                return Err("TCP collector is only available on Linux systems".into());
305            }
306        }
307
308        let mut config = self.clone();
309        config.namespace = config.namespace.filter(|namespace| !namespace.is_empty());
310
311        Ok(Box::pin(config.run(cx.out, cx.shutdown)))
312    }
313
314    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
315        vec![SourceOutput::new_metrics()]
316    }
317
318    fn can_acknowledge(&self) -> bool {
319        false
320    }
321}
322
323impl HostMetricsConfig {
324    /// Set the interval to collect internal metrics.
325    pub fn scrape_interval_secs(&mut self, value: f64) {
326        self.scrape_interval_secs = Duration::from_secs_f64(value);
327    }
328
329    async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
330        let duration = self.scrape_interval_secs;
331        let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
332
333        let mut generator = HostMetrics::new(self);
334
335        let bytes_received = register!(BytesReceived::from(Protocol::NONE));
336
337        while interval.next().await.is_some() {
338            bytes_received.emit(ByteSize(0));
339            let metrics = generator.capture_metrics().await;
340            let count = metrics.len();
341            if (out.send_batch(metrics).await).is_err() {
342                emit!(StreamClosedError { count });
343                return Err(());
344            }
345        }
346
347        Ok(())
348    }
349
350    fn has_collector(&self, collector: Collector) -> bool {
351        match &self.collectors {
352            None => true,
353            Some(collectors) => collectors.contains(&collector),
354        }
355    }
356}
357
/// Collector state for the `host_metrics` source, reused across scrapes.
pub struct HostMetrics {
    /// Source configuration; consulted to decide which collectors run and which
    /// namespace the metrics get.
    config: HostMetricsConfig,
    /// `sysinfo` system handle, created once in `new`.
    system: System,
    // Kept across scrapes so that sysinfo-derived values such as
    // `Component::max()` retain their refresh history instead of resetting on
    // every collection (see `temperature_metrics`).
    components: Components,
    /// Result of `cgroups::CGroupRoot::new` for the configured (or default) cgroups
    /// options. Linux only.
    #[cfg(target_os = "linux")]
    root_cgroup: Option<cgroups::CGroupRoot>,
    /// Handle used to report the count and estimated JSON size of each captured batch.
    events_received: Registered<EventsReceived>,
}
369
370impl HostMetrics {
371    #[cfg(not(target_os = "linux"))]
372    pub fn new(config: HostMetricsConfig) -> Self {
373        Self {
374            config,
375            system: System::new(),
376            components: Components::new_with_refreshed_list(),
377            events_received: register!(EventsReceived),
378        }
379    }
380
381    #[cfg(target_os = "linux")]
382    pub fn new(config: HostMetricsConfig) -> Self {
383        let cgroups = config.cgroups.clone().unwrap_or_default();
384        let root_cgroup = cgroups::CGroupRoot::new(&cgroups);
385        Self {
386            config,
387            system: System::new(),
388            components: Components::new_with_refreshed_list(),
389            root_cgroup,
390            events_received: register!(EventsReceived),
391        }
392    }
393
394    pub fn buffer(&self) -> MetricsBuffer {
395        MetricsBuffer::new(self.config.namespace.clone())
396    }
397
398    async fn capture_metrics(&mut self) -> Vec<Metric> {
399        let mut buffer = self.buffer();
400
401        #[cfg(target_os = "linux")]
402        if self.config.has_collector(Collector::CGroups) {
403            self.cgroups_metrics(&mut buffer).await;
404        }
405        if self.config.has_collector(Collector::Cpu) {
406            self.cpu_metrics(&mut buffer).await;
407        }
408        if self.config.has_collector(Collector::Process) {
409            self.process_metrics(&mut buffer).await;
410        }
411        if self.config.has_collector(Collector::Disk) {
412            self.disk_metrics(&mut buffer).await;
413        }
414        if self.config.has_collector(Collector::Filesystem) {
415            self.filesystem_metrics(&mut buffer).await;
416        }
417        if self.config.has_collector(Collector::Load) {
418            self.loadavg_metrics(&mut buffer).await;
419        }
420        if self.config.has_collector(Collector::Host) {
421            self.host_metrics(&mut buffer).await;
422        }
423        if self.config.has_collector(Collector::Memory) {
424            self.memory_metrics(&mut buffer).await;
425            self.swap_metrics(&mut buffer).await;
426        }
427        if self.config.has_collector(Collector::Network) {
428            self.network_metrics(&mut buffer).await;
429        }
430        #[cfg(target_os = "linux")]
431        if self.config.has_collector(Collector::TCP) {
432            self.tcp_metrics(&mut buffer).await;
433        }
434        if self.config.has_collector(Collector::Temperature) {
435            self.temperature_metrics(&mut buffer).await;
436        }
437
438        let metrics = buffer.metrics;
439        self.events_received.emit(CountByteSize(
440            metrics.len(),
441            metrics.estimated_json_encoded_size_of(),
442        ));
443        metrics
444    }
445
446    pub async fn loadavg_metrics(&self, output: &mut MetricsBuffer) {
447        output.name = "load";
448        #[cfg(unix)]
449        match heim::cpu::os::unix::loadavg().await {
450            Ok(loadavg) => {
451                output.gauge(
452                    "load1",
453                    loadavg.0.get::<ratio>() as f64,
454                    MetricTags::default(),
455                );
456                output.gauge(
457                    "load5",
458                    loadavg.1.get::<ratio>() as f64,
459                    MetricTags::default(),
460                );
461                output.gauge(
462                    "load15",
463                    loadavg.2.get::<ratio>() as f64,
464                    MetricTags::default(),
465                );
466            }
467            Err(error) => {
468                emit!(HostMetricsScrapeDetailError {
469                    message: "Failed to load average info",
470                    error,
471                });
472            }
473        }
474    }
475
476    pub async fn host_metrics(&self, output: &mut MetricsBuffer) {
477        output.name = "host";
478        match heim::host::uptime().await {
479            Ok(time) => output.gauge("uptime", time.get::<second>(), MetricTags::default()),
480            Err(error) => {
481                emit!(HostMetricsScrapeDetailError {
482                    message: "Failed to load host uptime info",
483                    error,
484                });
485            }
486        }
487
488        match heim::host::boot_time().await {
489            Ok(time) => output.gauge("boot_time", time.get::<second>(), MetricTags::default()),
490            Err(error) => {
491                emit!(HostMetricsScrapeDetailError {
492                    message: "Failed to load host boot time info",
493                    error,
494                });
495            }
496        }
497    }
498}
499
/// Accumulates the metrics produced during one scrape, together with the metadata
/// stamped onto each of them.
#[derive(Default)]
pub struct MetricsBuffer {
    /// Metrics collected so far, in insertion order.
    pub metrics: Vec<Metric>,
    /// Name of the collector currently writing; becomes the `collector` tag.
    name: &'static str,
    /// Local hostname, if it could be determined; becomes the `host` tag.
    host: Option<String>,
    /// Timestamp taken when the buffer was created and applied to every metric.
    timestamp: DateTime<Utc>,
    /// Namespace applied to every metric.
    namespace: Option<String>,
}
508
509impl MetricsBuffer {
510    fn new(namespace: Option<String>) -> Self {
511        Self {
512            metrics: Vec::new(),
513            name: "",
514            host: crate::get_hostname().ok(),
515            timestamp: Utc::now(),
516            namespace,
517        }
518    }
519
520    fn tags(&self, mut tags: MetricTags) -> MetricTags {
521        tags.replace("collector".into(), self.name.to_string());
522        if let Some(host) = &self.host {
523            tags.replace("host".into(), host.clone());
524        }
525        tags
526    }
527
528    fn counter(&mut self, name: &str, value: f64, tags: MetricTags) {
529        self.metrics.push(
530            Metric::new(name, MetricKind::Absolute, MetricValue::Counter { value })
531                .with_namespace(self.namespace.clone())
532                .with_tags(Some(self.tags(tags)))
533                .with_timestamp(Some(self.timestamp)),
534        )
535    }
536
537    fn gauge(&mut self, name: &str, value: f64, tags: MetricTags) {
538        self.metrics.push(
539            Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
540                .with_namespace(self.namespace.clone())
541                .with_tags(Some(self.tags(tags)))
542                .with_timestamp(Some(self.timestamp)),
543        )
544    }
545}
546
547fn filter_result_sync<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
548where
549    E: std::error::Error,
550{
551    result
552        .map_err(|error| emit!(HostMetricsScrapeDetailError { message, error }))
553        .ok()
554}
555
/// Async wrapper around `filter_result_sync`: yields `Some(value)` for `Ok`, and
/// emits a scrape error and yields `None` for `Err`.
// The body contains no `.await`; presumably this is `async` so it can be passed
// where a future-returning function is expected — confirm against callers.
async fn filter_result<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
where
    E: std::error::Error,
{
    filter_result_sync(result, message)
}
562
563#[allow(clippy::missing_const_for_fn)]
564fn init_roots() {
565    #[cfg(target_os = "linux")]
566    {
567        use std::sync::Once;
568
569        static INIT: Once = Once::new();
570
571        INIT.call_once(|| {
572            match std::env::var_os("PROCFS_ROOT") {
573                Some(procfs_root) => {
574                    info!(
575                        message = "PROCFS_ROOT is set in envvars. Using custom for procfs.",
576                        custom = ?procfs_root
577                    );
578                    heim::os::linux::set_procfs_root(std::path::PathBuf::from(&procfs_root));
579                }
580                None => info!("PROCFS_ROOT is unset. Using default '/proc' for procfs root."),
581            };
582
583            match std::env::var_os("SYSFS_ROOT") {
584                Some(sysfs_root) => {
585                    info!(
586                        message = "SYSFS_ROOT is set in envvars. Using custom for sysfs.",
587                        custom = ?sysfs_root
588                    );
589                    heim::os::linux::set_sysfs_root(std::path::PathBuf::from(&sysfs_root));
590                }
591                None => info!("SYSFS_ROOT is unset. Using default '/sys' for sysfs root."),
592            }
593        });
594    };
595}
596
597impl FilterList {
598    fn contains<T, M>(&self, value: &Option<T>, matches: M) -> bool
599    where
600        M: Fn(&PatternWrapper, &T) -> bool,
601    {
602        (match (&self.includes, value) {
603            // No includes list includes everything
604            (None, _) => true,
605            // Includes list matched against empty value returns false
606            (Some(_), None) => false,
607            // Otherwise find the given value
608            (Some(includes), Some(value)) => includes.iter().any(|pattern| matches(pattern, value)),
609        }) && match (&self.excludes, value) {
610            // No excludes, list excludes nothing
611            (None, _) => true,
612            // No value, never excluded
613            (Some(_), None) => true,
614            // Otherwise find the given value
615            (Some(excludes), Some(value)) => {
616                !excludes.iter().any(|pattern| matches(pattern, value))
617            }
618        }
619    }
620
621    fn contains_str(&self, value: Option<&str>) -> bool {
622        self.contains(&value, |pattern, s| pattern.matches_str(s))
623    }
624
625    fn contains_path(&self, value: Option<&Path>) -> bool {
626        self.contains(&value, |pattern, path| pattern.matches_path(path))
627    }
628
629    #[cfg(test)]
630    fn contains_test(&self, value: Option<&str>) -> bool {
631        let result = self.contains_str(value);
632        assert_eq!(result, self.contains_path(value.map(std::path::Path::new)));
633        result
634    }
635}
636
/// A compiled Unix shell-style pattern.
///
/// - `?` matches any single character.
/// - `*` matches any (possibly empty) sequence of characters.
/// - `**` matches the current directory and arbitrary subdirectories. This sequence must form a single path component,
///   so both `**a` and `b**` are invalid and will result in an error. A sequence of more than two consecutive `*`
///   characters is also invalid.
/// - `[...]` matches any character inside the brackets. Character sequences can also specify ranges of characters, as
///   ordered by Unicode, so e.g. `[0-9]` specifies any character between 0 and 9 inclusive. An unclosed bracket is
///   invalid.
/// - `[!...]` is the negation of `[...]`, i.e. it matches any characters not in the brackets.
///
/// The metacharacters `?`, `*`, `[`, `]` can be matched by using brackets (e.g. `[?]`). When a `]` occurs immediately
/// following `[` or `[!` then it is interpreted as being part of, rather than ending, the character set, so `]` and NOT
/// `]` can be matched by `[]]` and `[!]]` respectively. The `-` character can be specified inside a character sequence
/// pattern by placing it at the start or the end, e.g. `[abc-]`.
// (De)serialized as a plain string: parsed with `TryFrom<String>` on the way in
// and written back out through `From<PatternWrapper> for String`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(try_from = "String", into = "String")]
struct PatternWrapper(Pattern);
657
658impl PatternWrapper {
659    fn matches_str(&self, s: &str) -> bool {
660        self.0.matches(s)
661    }
662
663    fn matches_path(&self, p: &Path) -> bool {
664        self.0.matches_path(p)
665    }
666}
667
668impl TryFrom<String> for PatternWrapper {
669    type Error = PatternError;
670
671    fn try_from(value: String) -> Result<Self, Self::Error> {
672        Pattern::new(value.as_ref()).map(PatternWrapper)
673    }
674}
675
676impl TryFrom<&str> for PatternWrapper {
677    type Error = PatternError;
678
679    fn try_from(value: &str) -> Result<Self, Self::Error> {
680        value.to_string().try_into()
681    }
682}
683
684impl From<PatternWrapper> for String {
685    fn from(pattern: PatternWrapper) -> Self {
686        pattern.0.to_string()
687    }
688}
689
690#[cfg(test)]
691mod tests {
692    use std::{collections::HashSet, future::Future, time::Duration};
693
694    use super::*;
695    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};
696
697    #[test]
698    fn filterlist_default_includes_everything() {
699        let filters = FilterList::default();
700        assert!(filters.contains_test(Some("anything")));
701        assert!(filters.contains_test(Some("should")));
702        assert!(filters.contains_test(Some("work")));
703        assert!(filters.contains_test(None));
704    }
705
706    #[test]
707    fn filterlist_includes_works() {
708        let filters = FilterList {
709            includes: Some(vec![
710                PatternWrapper::try_from("sda".to_string()).unwrap(),
711                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
712            ]),
713            excludes: None,
714        };
715        assert!(!filters.contains_test(Some("sd")));
716        assert!(filters.contains_test(Some("sda")));
717        assert!(!filters.contains_test(Some("sda1")));
718        assert!(filters.contains_test(Some("dm-")));
719        assert!(filters.contains_test(Some("dm-5")));
720        assert!(!filters.contains_test(Some("xda")));
721        assert!(!filters.contains_test(None));
722    }
723
724    #[test]
725    fn filterlist_excludes_works() {
726        let filters = FilterList {
727            includes: None,
728            excludes: Some(vec![
729                PatternWrapper::try_from("sda".to_string()).unwrap(),
730                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
731            ]),
732        };
733        assert!(filters.contains_test(Some("sd")));
734        assert!(!filters.contains_test(Some("sda")));
735        assert!(filters.contains_test(Some("sda1")));
736        assert!(!filters.contains_test(Some("dm-")));
737        assert!(!filters.contains_test(Some("dm-5")));
738        assert!(filters.contains_test(Some("xda")));
739        assert!(filters.contains_test(None));
740    }
741
742    #[test]
743    fn filterlist_includes_and_excludes_works() {
744        let filters = FilterList {
745            includes: Some(vec![
746                PatternWrapper::try_from("sda".to_string()).unwrap(),
747                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
748            ]),
749            excludes: Some(vec![PatternWrapper::try_from("dm-5".to_string()).unwrap()]),
750        };
751        assert!(!filters.contains_test(Some("sd")));
752        assert!(filters.contains_test(Some("sda")));
753        assert!(!filters.contains_test(Some("sda1")));
754        assert!(filters.contains_test(Some("dm-")));
755        assert!(filters.contains_test(Some("dm-1")));
756        assert!(!filters.contains_test(Some("dm-5")));
757        assert!(!filters.contains_test(Some("xda")));
758        assert!(!filters.contains_test(None));
759    }
760
761    #[tokio::test]
762    async fn filters_on_collectors() {
763        let all_metrics_count = HostMetrics::new(HostMetricsConfig::default())
764            .capture_metrics()
765            .await
766            .len();
767
768        for collector in &[
769            #[cfg(target_os = "linux")]
770            Collector::CGroups,
771            Collector::Cpu,
772            Collector::Process,
773            Collector::Disk,
774            Collector::Filesystem,
775            Collector::Load,
776            Collector::Host,
777            Collector::Memory,
778            Collector::Network,
779        ] {
780            let some_metrics = HostMetrics::new(HostMetricsConfig {
781                collectors: Some(vec![*collector]),
782                ..Default::default()
783            })
784            .capture_metrics()
785            .await;
786
787            assert!(
788                all_metrics_count > some_metrics.len(),
789                "collector={collector:?}"
790            );
791        }
792    }
793
794    #[tokio::test]
795    async fn are_tagged_with_hostname() {
796        let metrics = HostMetrics::new(HostMetricsConfig::default())
797            .capture_metrics()
798            .await;
799        let hostname = crate::get_hostname().expect("Broken hostname");
800        assert!(!metrics.into_iter().any(|event| {
801            event
802                .tags()
803                .expect("Missing tags")
804                .get("host")
805                .expect("Missing \"host\" tag")
806                != hostname
807        }));
808    }
809
810    #[tokio::test]
811    async fn uses_custom_namespace() {
812        let metrics = HostMetrics::new(HostMetricsConfig {
813            namespace: Some("other".into()),
814            ..Default::default()
815        })
816        .capture_metrics()
817        .await;
818
819        assert!(
820            metrics
821                .into_iter()
822                .all(|event| event.namespace() == Some("other"))
823        );
824    }
825
826    #[tokio::test]
827    async fn uses_default_namespace() {
828        let metrics = HostMetrics::new(HostMetricsConfig::default())
829            .capture_metrics()
830            .await;
831
832        assert!(
833            metrics
834                .iter()
835                .all(|event| event.namespace() == Some("host"))
836        );
837    }
838
839    // Windows does not produce load average metrics.
840    #[cfg(not(windows))]
841    #[tokio::test]
842    async fn generates_loadavg_metrics() {
843        let mut buffer = MetricsBuffer::new(None);
844        HostMetrics::new(HostMetricsConfig::default())
845            .loadavg_metrics(&mut buffer)
846            .await;
847        let metrics = buffer.metrics;
848        assert_eq!(metrics.len(), 3);
849        assert!(all_gauges(&metrics));
850
851        // All metrics are named load*
852        assert!(
853            !metrics
854                .iter()
855                .any(|metric| !metric.name().starts_with("load"))
856        );
857    }
858
859    #[tokio::test]
860    async fn generates_host_metrics() {
861        let mut buffer = MetricsBuffer::new(None);
862        HostMetrics::new(HostMetricsConfig::default())
863            .host_metrics(&mut buffer)
864            .await;
865        let metrics = buffer.metrics;
866        assert_eq!(metrics.len(), 2);
867        assert!(all_gauges(&metrics));
868    }
869
870    pub(super) fn all_counters(metrics: &[Metric]) -> bool {
871        !metrics
872            .iter()
873            .any(|metric| !matches!(metric.value(), &MetricValue::Counter { .. }))
874    }
875
876    pub(super) fn all_gauges(metrics: &[Metric]) -> bool {
877        !metrics
878            .iter()
879            .any(|metric| !matches!(metric.value(), &MetricValue::Gauge { .. }))
880    }
881
882    fn all_tags_match(metrics: &[Metric], tag: &str, matches: impl Fn(&str) -> bool) -> bool {
883        !metrics.iter().any(|metric| {
884            metric
885                .tags()
886                .unwrap()
887                .get(tag)
888                .map(|value| !matches(value))
889                .unwrap_or(false)
890        })
891    }
892
893    pub(super) fn count_name(metrics: &[Metric], name: &str) -> usize {
894        metrics
895            .iter()
896            .filter(|metric| metric.name() == name)
897            .count()
898    }
899
900    pub(super) fn count_tag(metrics: &[Metric], tag: &str) -> usize {
901        metrics
902            .iter()
903            .filter(|metric| {
904                metric
905                    .tags()
906                    .expect("Metric is missing tags")
907                    .contains_key(tag)
908            })
909            .count()
910    }
911
912    fn collect_tag_values(metrics: &[Metric], tag: &str) -> HashSet<String> {
913        metrics
914            .iter()
915            .filter_map(|metric| metric.tags().unwrap().get(tag).map(ToOwned::to_owned))
916            .collect::<HashSet<_>>()
917    }
918
919    // Run a series of tests using filters to ensure they are obeyed
920    pub(super) async fn assert_filtered_metrics<Get, Fut>(tag: &str, get_metrics: Get)
921    where
922        Get: Fn(FilterList) -> Fut,
923        Fut: Future<Output = Vec<Metric>>,
924    {
925        let all_metrics = get_metrics(FilterList::default()).await;
926
927        let keys = collect_tag_values(&all_metrics, tag);
928        // Pick an arbitrary key value
929        if let Some(key) = keys.into_iter().next() {
930            #[expect(
931                clippy::string_slice,
932                reason = "index from char_indices, always a char boundary"
933            )]
934            let key_prefix =
935                &key[..key.char_indices().next_back().map_or(0, |(i, _)| i)].to_string();
936            let key_prefix_pattern = PatternWrapper::try_from(format!("{key_prefix}*")).unwrap();
937            let key_pattern = PatternWrapper::try_from(key.clone()).unwrap();
938
939            let filter = FilterList {
940                includes: Some(vec![key_pattern.clone()]),
941                excludes: None,
942            };
943            let filtered_metrics_with = get_metrics(filter).await;
944
945            assert!(filtered_metrics_with.len() <= all_metrics.len());
946            assert!(!filtered_metrics_with.is_empty());
947            assert!(all_tags_match(&filtered_metrics_with, tag, |s| s == key));
948
949            let filter = FilterList {
950                includes: Some(vec![key_prefix_pattern.clone()]),
951                excludes: None,
952            };
953            let filtered_metrics_with_match = get_metrics(filter).await;
954
955            assert!(filtered_metrics_with_match.len() >= filtered_metrics_with.len());
956            assert!(all_tags_match(&filtered_metrics_with_match, tag, |s| {
957                s.starts_with(key_prefix)
958            }));
959
960            let filter = FilterList {
961                includes: None,
962                excludes: Some(vec![key_pattern]),
963            };
964            let filtered_metrics_without = get_metrics(filter).await;
965
966            assert!(filtered_metrics_without.len() <= all_metrics.len());
967            assert!(all_tags_match(&filtered_metrics_without, tag, |s| s != key));
968
969            let filter = FilterList {
970                includes: None,
971                excludes: Some(vec![key_prefix_pattern]),
972            };
973            let filtered_metrics_without_match = get_metrics(filter).await;
974
975            assert!(filtered_metrics_without_match.len() <= filtered_metrics_without.len());
976            assert!(all_tags_match(&filtered_metrics_without_match, tag, |s| {
977                !s.starts_with(key_prefix)
978            }));
979
980            assert!(
981                filtered_metrics_with.len() + filtered_metrics_without.len() <= all_metrics.len()
982            );
983        }
984    }
985
986    #[tokio::test]
987    async fn source_compliance() {
988        let config = HostMetricsConfig {
989            scrape_interval_secs: Duration::from_secs(1),
990            ..Default::default()
991        };
992
993        let events =
994            run_and_assert_source_compliance(config, Duration::from_secs(2), &SOURCE_TAGS).await;
995
996        assert!(!events.is_empty());
997    }
998}