Skip to main content

vector_core/config/
global_options.rs

1use std::{fs::DirBuilder, path::PathBuf, time::Duration};
2
3use snafu::{ResultExt, Snafu};
4use vector_common::TimeZone;
5use vector_config::{configurable_component, impl_generate_config_from_default};
6
7use super::{
8    super::default_data_dir, AcknowledgementsConfig, LogSchema, Telemetry,
9    metrics_expiration::PerMetricSetExpiration, proxy::ProxyConfig,
10};
11use crate::serde::bool_or_struct;
12
13#[expect(
14    clippy::ref_option,
15    reason = "we have to follow the serde calling convention"
16)]
17fn is_default_buffer_utilization_ewma_half_life_seconds(value: &Option<f64>) -> bool {
18    value.is_none_or(|seconds| {
19        seconds == vector_buffers::topology::channel::DEFAULT_EWMA_HALF_LIFE_SECONDS
20    })
21}
22
23#[derive(Debug, Snafu)]
24pub(crate) enum DataDirError {
25    #[snafu(display("data_dir option required, but not given here or globally"))]
26    MissingDataDir,
27    #[snafu(display("data_dir {:?} does not exist", data_dir))]
28    DoesNotExist { data_dir: PathBuf },
29    #[snafu(display("data_dir {:?} is not writable", data_dir))]
30    NotWritable { data_dir: PathBuf },
31    #[snafu(display(
32        "Could not create subdirectory {:?} inside of data dir {:?}: {}",
33        subdir,
34        data_dir,
35        source
36    ))]
37    CouldNotCreate {
38        subdir: PathBuf,
39        data_dir: PathBuf,
40        source: std::io::Error,
41    },
42}
43
44/// Specifies the wildcard matching mode, relaxed allows configurations where wildcard doesn not match any existing inputs
45#[configurable_component]
46#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
47#[serde(rename_all = "lowercase")]
48pub enum WildcardMatching {
49    /// Strict matching (must match at least one existing input)
50    #[default]
51    Strict,
52
53    /// Relaxed matching (must match 0 or more inputs)
54    Relaxed,
55}
56
/// Global configuration options.
//
// If this is modified, make sure those changes are reflected in the `ConfigBuilder::append`
// function! `GlobalOptions::merge` also constructs every field explicitly and decides which
// fields are conflict-checked, so it must be updated as well.
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GlobalOptions {
    // Unlike most fields below, this has no `skip_serializing_if`, so it is always serialized.
    /// The directory used for persisting Vector state data.
    ///
    /// This is the directory where Vector will store any state data, such as disk buffers, file
    /// checkpoints, and more.
    ///
    /// Vector must have write permissions to this directory.
    #[serde(default = "crate::default_data_dir")]
    #[configurable(metadata(docs::common = false))]
    pub data_dir: Option<PathBuf>,

    /// Set wildcard matching mode for inputs
    ///
    /// Setting this to "relaxed" allows configurations with wildcards that do not match any inputs
    /// to be accepted without causing an error.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub wildcard_matching: Option<WildcardMatching>,

    /// Default log schema for all events.
    ///
    /// This is used if a component does not have its own specific log schema. All events use a log
    /// schema, whether or not the default is used, to assign event fields on incoming events.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(
        docs::common = false,
        docs::required = false,
        docs::warnings = "These settings are ignored when `schema.log_namespace` is set to `true`."
    ))]
    pub log_schema: LogSchema,

    /// Telemetry options.
    ///
    /// Determines whether `source` and `service` tags should be emitted with the
    /// `component_sent_*` and `component_received_*` events.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub telemetry: Telemetry,

    /// The name of the time zone to apply to timestamp conversions that do not contain an explicit time zone.
    ///
    /// The time zone name may be any name in the [TZ database][tzdb] or `local` to indicate system
    /// local time.
    ///
    /// Note that in Vector/VRL all timestamps are represented in UTC.
    ///
    /// [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false))]
    pub timezone: Option<TimeZone>,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub proxy: ProxyConfig,

    /// Controls how acknowledgements are handled for all sinks by default.
    ///
    /// See [End-to-end Acknowledgements][e2e_acks] for more information on how Vector handles event
    /// acknowledgement.
    ///
    /// [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/
    #[serde(
        default,
        deserialize_with = "bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(metadata(docs::common = true, docs::required = false))]
    pub acknowledgements: AcknowledgementsConfig,

    // Deprecated predecessor of `expire_metrics_secs`; still accepted but hidden from the
    // generated docs.
    /// The amount of time, in seconds, that internal metrics will persist after having not been
    /// updated before they expire and are removed.
    ///
    /// Deprecated: use `expire_metrics_secs` instead
    #[configurable(deprecated)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::hidden))]
    pub expire_metrics: Option<Duration>,

    /// The amount of time, in seconds, that internal metrics will persist after having not been
    /// updated before they expire and are removed.
    ///
    /// Set this to a value larger than your `internal_metrics` scrape interval (default 5 minutes)
    /// so metrics live long enough to be emitted and captured.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub expire_metrics_secs: Option<f64>,

    /// This allows configuring different expiration intervals for different metric sets.
    /// By default this is empty and any metric not matched by one of these sets will use
    /// the global default value, defined using `expire_metrics_secs`.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,

    // Serialization is skipped when unset or equal to the buffer channel default (see
    // `is_default_buffer_utilization_ewma_half_life_seconds`).
    // NOTE(review): the schema range is inclusive (`min = 0.0`) while the doc text requires a value
    // greater than 0 — confirm where a value of exactly 0 is rejected.
    /// The half-life, in seconds, for the exponential weighted moving average (EWMA) of source
    /// and transform buffer utilization metrics.
    ///
    /// This controls how quickly the `*_buffer_utilization_mean` gauges respond to new
    /// observations. Longer half-lives retain more of the previous value, leading to slower
    /// adjustments.
    ///
    /// - Lower values (< 1): Metrics update quickly but may be volatile
    /// - Default (5): Balanced between responsiveness and stability
    /// - Higher values (> 5): Smooth, stable metrics that update slowly
    ///
    /// Adjust based on whether you need fast detection of buffer issues (lower)
    /// or want to see sustained trends without noise (higher).
    ///
    /// Must be greater than 0.
    #[serde(skip_serializing_if = "is_default_buffer_utilization_ewma_half_life_seconds")]
    #[configurable(validation(range(min = 0.0)))]
    #[configurable(metadata(docs::advanced))]
    pub buffer_utilization_ewma_half_life_seconds: Option<f64>,

    // NOTE(review): the schema range includes both 0.0 and 1.0, while the doc text requires
    // 0 < alpha < 1 — confirm where the endpoints are rejected.
    /// The alpha value for the exponential weighted moving average (EWMA) of transform latency
    /// metrics.
    ///
    /// This controls how quickly the `component_latency_mean_seconds` gauge responds to new
    /// observations. Values closer to 1.0 retain more of the previous value, leading to slower
    /// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
    ///
    /// Must be between 0 and 1 exclusively (0 < alpha < 1).
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[configurable(metadata(docs::advanced))]
    pub latency_ewma_alpha: Option<f64>,

    /// The interval, in seconds, at which the internal metrics cache for VRL is refreshed.
    /// This must be set to be able to access metrics in VRL functions.
    ///
    /// Higher values lead to stale metric values from `get_vector_metric`,
    /// `find_vector_metrics`, and `aggregate_vector_metrics` functions.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub metrics_storage_refresh_period: Option<f64>,
}

// Presumably implements config generation from `GlobalOptions::default()` (per the macro's
// name) — NOTE(review): confirm against `vector_config`.
impl_generate_config_from_default!(GlobalOptions);
200
201impl GlobalOptions {
202    /// Resolve the `data_dir` option in either the global or local config, and
203    /// validate that it exists and is writable.
204    ///
205    /// # Errors
206    ///
207    /// Function will error if it is unable to make data directory.
208    pub fn resolve_and_validate_data_dir(
209        &self,
210        local_data_dir: Option<&PathBuf>,
211    ) -> crate::Result<PathBuf> {
212        let data_dir = local_data_dir
213            .or(self.data_dir.as_ref())
214            .ok_or(DataDirError::MissingDataDir)
215            .map_err(Box::new)?
216            .clone();
217        if !data_dir.exists() {
218            return Err(DataDirError::DoesNotExist { data_dir }.into());
219        }
220        let readonly =
221            std::fs::metadata(&data_dir).map_or(true, |meta| meta.permissions().readonly());
222        if readonly {
223            return Err(DataDirError::NotWritable { data_dir }.into());
224        }
225        Ok(data_dir)
226    }
227
228    /// Resolve the `data_dir` option using `resolve_and_validate_data_dir` and
229    /// then ensure a named subdirectory exists.
230    ///
231    /// # Errors
232    ///
233    /// Function will error if it is unable to make data subdirectory.
234    pub fn resolve_and_make_data_subdir(
235        &self,
236        local: Option<&PathBuf>,
237        subdir: &str,
238    ) -> crate::Result<PathBuf> {
239        let data_dir = self.resolve_and_validate_data_dir(local)?;
240
241        let mut data_subdir = data_dir.clone();
242        data_subdir.push(subdir);
243
244        DirBuilder::new()
245            .recursive(true)
246            .create(&data_subdir)
247            .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
248        Ok(data_subdir)
249    }
250
251    /// Merge a second global configuration into self, and return the new merged data.
252    ///
253    /// # Errors
254    ///
255    /// Returns a list of textual errors if there is a merge conflict between the two global
256    /// configs.
257    pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
258        let mut errors = Vec::new();
259
260        if conflicts(
261            self.wildcard_matching.as_ref(),
262            with.wildcard_matching.as_ref(),
263        ) {
264            errors.push("conflicting values for 'wildcard_matching' found".to_owned());
265        }
266
267        if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
268            errors.push("conflicting values for 'proxy.http' found".to_owned());
269        }
270
271        if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
272            errors.push("conflicting values for 'proxy.https' found".to_owned());
273        }
274
275        if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
276            errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
277        }
278
279        if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
280            errors.push("conflicting values for 'timezone' found".to_owned());
281        }
282
283        if conflicts(
284            self.acknowledgements.enabled.as_ref(),
285            with.acknowledgements.enabled.as_ref(),
286        ) {
287            errors.push("conflicting values for 'acknowledgements' found".to_owned());
288        }
289
290        if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
291            errors.push("conflicting values for 'expire_metrics' found".to_owned());
292        }
293
294        if conflicts(
295            self.expire_metrics_secs.as_ref(),
296            with.expire_metrics_secs.as_ref(),
297        ) {
298            errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
299        }
300
301        let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
302            with.data_dir
303        } else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
304            // If two configs both set 'data_dir' and have conflicting values
305            // we consider this an error.
306            errors.push("conflicting values for 'data_dir' found".to_owned());
307            None
308        } else {
309            self.data_dir.clone()
310        };
311
312        // If the user has multiple config files, we must *merge* log schemas
313        // until we meet a conflict, then we are allowed to error.
314        let mut log_schema = self.log_schema.clone();
315        if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
316            errors.extend(merge_errors);
317        }
318
319        let mut telemetry = self.telemetry.clone();
320        telemetry.merge(&with.telemetry);
321
322        let merged_expire_metrics_per_metric_set = match (
323            &self.expire_metrics_per_metric_set,
324            &with.expire_metrics_per_metric_set,
325        ) {
326            (Some(a), Some(b)) => Some(a.iter().chain(b).cloned().collect()),
327            (Some(a), None) => Some(a.clone()),
328            (None, Some(b)) => Some(b.clone()),
329            (None, None) => None,
330        };
331
332        if errors.is_empty() {
333            Ok(Self {
334                data_dir,
335                wildcard_matching: self.wildcard_matching.or(with.wildcard_matching),
336                log_schema,
337                telemetry,
338                acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
339                timezone: self.timezone.or(with.timezone),
340                proxy: self.proxy.merge(&with.proxy),
341                expire_metrics: self.expire_metrics.or(with.expire_metrics),
342                expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
343                expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
344                buffer_utilization_ewma_half_life_seconds: self
345                    .buffer_utilization_ewma_half_life_seconds
346                    .or(with.buffer_utilization_ewma_half_life_seconds),
347                latency_ewma_alpha: self.latency_ewma_alpha.or(with.latency_ewma_alpha),
348                metrics_storage_refresh_period: self
349                    .metrics_storage_refresh_period
350                    .or(with.metrics_storage_refresh_period),
351            })
352        } else {
353            Err(errors)
354        }
355    }
356
357    /// Get the configured time zone, using "local" time if none is set.
358    pub fn timezone(&self) -> TimeZone {
359        self.timezone.unwrap_or(TimeZone::Local)
360    }
361
362    /// Returns a list of top-level field names that differ between two [`GlobalOptions`] values.
363    ///
364    /// This function performs a shallow comparison by serializing both configs to JSON
365    /// and comparing their top-level keys.
366    ///
367    /// Useful for logging which global fields changed during config reload attempts.
368    ///
369    /// # Errors
370    ///
371    /// Returns a [`serde_json::Error`] if either of the [`GlobalOptions`] values
372    /// cannot be serialized into a JSON object. This is unlikely under normal usage,
373    /// but may occur if serialization fails due to unexpected data structures or changes
374    /// in the type definition.
375    pub fn diff(&self, other: &Self) -> Result<Vec<String>, serde_json::Error> {
376        let old_value = serde_json::to_value(self)?;
377        let new_value = serde_json::to_value(other)?;
378
379        let serde_json::Value::Object(old_map) = old_value else {
380            return Ok(vec![]);
381        };
382        let serde_json::Value::Object(new_map) = new_value else {
383            return Ok(vec![]);
384        };
385
386        Ok(old_map
387            .iter()
388            .filter_map(|(k, v_old)| match new_map.get(k) {
389                Some(v_new) if v_new != v_old => Some(k.clone()),
390                _ => None,
391            })
392            .collect())
393    }
394}
395
/// Reports whether two optional settings are both present and disagree.
///
/// A value missing on either side never conflicts; the side that has a value simply wins.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    this.zip(that).is_some_and(|(left, right)| left != right)
}
399
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use chrono_tz::Tz;

    use super::*;

    /// The error a merge reports when both sides set `field` to different values.
    fn conflict_on<T>(field: &str) -> Result<T, Vec<String>> {
        Err(vec![format!("conflicting values for '{field}' found")])
    }

    #[test]
    fn merges_data_dir() {
        let merged = |first, second| merge("data_dir", first, second, |opts| opts.data_dir);

        assert_eq!(merged(None, None), Ok(default_data_dir()));
        for (first, second, expected) in [
            (Some("/test1"), None, "/test1"),
            (None, Some("/test2"), "/test2"),
            (Some("/test3"), Some("/test3"), "/test3"),
        ] {
            assert_eq!(merged(first, second), Ok(Some(PathBuf::from(expected))));
        }
        assert_eq!(
            merged(Some("/test4"), Some("/test5")),
            conflict_on("data_dir")
        );
    }

    #[test]
    fn merges_timezones() {
        let merged = |first, second| merge("timezone", first, second, |opts| opts.timezone());

        let agreeing = [
            (None, None, TimeZone::Local),
            (Some("local"), None, TimeZone::Local),
            (None, Some("local"), TimeZone::Local),
            (Some("local"), Some("local"), TimeZone::Local),
            (Some("UTC"), None, TimeZone::Named(Tz::UTC)),
            (None, Some("EST5EDT"), TimeZone::Named(Tz::EST5EDT)),
            (Some("UTC"), Some("UTC"), TimeZone::Named(Tz::UTC)),
        ];
        for (first, second, expected) in agreeing {
            assert_eq!(merged(first, second), Ok(expected));
        }
        assert_eq!(
            merged(Some("CST6CDT"), Some("GMT")),
            conflict_on("timezone")
        );
    }

    #[test]
    fn merges_proxy() {
        // `.http` is used as a representative of the other proxy settings.
        let merged = |first, second| merge("proxy.http", first, second, |opts| opts.proxy.http);

        assert_eq!(merged(None, None), Ok(None));
        for (first, second, expected) in [
            (Some("test1"), None, "test1"),
            (None, Some("test2"), "test2"),
            (Some("test3"), Some("test3"), "test3"),
        ] {
            assert_eq!(merged(first, second), Ok(Some(expected.to_owned())));
        }
        assert_eq!(
            merged(Some("test4"), Some("test5")),
            conflict_on("proxy.http")
        );
    }

    #[test]
    fn merges_acknowledgements() {
        let merged = |first, second| {
            merge("acknowledgements", first, second, |opts| {
                opts.acknowledgements
            })
        };

        assert_eq!(merged(None, None), Ok(None.into()));
        for (first, second, expected) in [
            (Some(false), None, false),
            (Some(true), None, true),
            (None, Some(false), false),
            (None, Some(true), true),
            (Some(false), Some(false), false),
            (Some(true), Some(true), true),
        ] {
            assert_eq!(merged(first, second), Ok(expected.into()));
        }
        for (first, second) in [(false, true), (true, false)] {
            assert_eq!(
                merged(Some(first), Some(second)),
                conflict_on("acknowledgements")
            );
        }
    }

    #[test]
    fn merges_expire_metrics() {
        let merged = |first, second| {
            merge("expire_metrics_secs", first, second, |opts| {
                opts.expire_metrics_secs
            })
        };

        assert_eq!(merged(None, None), Ok(None));
        for (first, second, expected) in [
            (Some(1.0), None, 1.0),
            (None, Some(2.0), 2.0),
            (Some(3.0), Some(3.0), 3.0),
        ] {
            assert_eq!(merged(first, second), Ok(Some(expected)));
        }
        assert_eq!(
            merged(Some(4.0), Some(5.0)),
            conflict_on("expire_metrics_secs")
        );
    }

    #[test]
    fn diff_detects_changed_keys() {
        let with_dir = |dir: &str| GlobalOptions {
            data_dir: Some(PathBuf::from(dir)),
            ..Default::default()
        };

        let changed = with_dir("/path1")
            .diff(&with_dir("/path2"))
            .expect("diff failed");
        assert_eq!(changed, vec!["data_dir".to_string()]);
    }

    /// Parses two single-key configs from TOML, merges them, and projects the merged result.
    fn merge<P: Debug, T>(
        name: &str,
        first: Option<P>,
        second: Option<P>,
        project: impl Fn(GlobalOptions) -> T,
    ) -> Result<T, Vec<String>> {
        // Going through TOML mirrors what a user would actually write in a config file.
        let left = make_config(name, first);
        let right = make_config(name, second);
        left.merge(right).map(project)
    }

    /// Builds a `GlobalOptions` from `name = value`, or from an empty document for `None`.
    fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
        let document = match value {
            Some(value) => format!(r"{name} = {value:?}"),
            None => String::new(),
        };
        toml::from_str(&document).unwrap()
    }
}