1use std::{fs::DirBuilder, path::PathBuf, time::Duration};
2
3use snafu::{ResultExt, Snafu};
4use vector_common::TimeZone;
5use vector_config::{configurable_component, impl_generate_config_from_default};
6
7use super::{
8 super::default_data_dir, AcknowledgementsConfig, LogSchema, Telemetry,
9 metrics_expiration::PerMetricSetExpiration, proxy::ProxyConfig,
10};
11use crate::serde::bool_or_struct;
12
13#[expect(
14 clippy::ref_option,
15 reason = "we have to follow the serde calling convention"
16)]
17fn is_default_buffer_utilization_ewma_half_life_seconds(value: &Option<f64>) -> bool {
18 value.is_none_or(|seconds| {
19 seconds == vector_buffers::topology::channel::DEFAULT_EWMA_HALF_LIFE_SECONDS
20 })
21}
22
23#[derive(Debug, Snafu)]
24pub(crate) enum DataDirError {
25 #[snafu(display("data_dir option required, but not given here or globally"))]
26 MissingDataDir,
27 #[snafu(display("data_dir {:?} does not exist", data_dir))]
28 DoesNotExist { data_dir: PathBuf },
29 #[snafu(display("data_dir {:?} is not writable", data_dir))]
30 NotWritable { data_dir: PathBuf },
31 #[snafu(display(
32 "Could not create subdirectory {:?} inside of data dir {:?}: {}",
33 subdir,
34 data_dir,
35 source
36 ))]
37 CouldNotCreate {
38 subdir: PathBuf,
39 data_dir: PathBuf,
40 source: std::io::Error,
41 },
42}
43
/// Wildcard matching mode, serialized in lowercase (`strict` or `relaxed`).
// NOTE(review): what each mode actually changes is decided by the code consuming this value,
// which is not visible here — expand these descriptions once confirmed.
#[configurable_component]
#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum WildcardMatching {
    /// Strict matching. This is the default variant.
    #[default]
    Strict,

    /// Relaxed matching.
    Relaxed,
}
56
/// Global configuration options.
//
// When adding a field, also update `GlobalOptions::merge`; `GlobalOptions::diff` compares the
// serialized form, so fields skipped during serialization when default are handled there too.
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GlobalOptions {
    /// The directory in which data subdirectories are created.
    ///
    /// Used whenever a more specific data directory is not given. The directory must already
    /// exist and must be writable.
    #[serde(default = "crate::default_data_dir")]
    #[configurable(metadata(docs::common = false))]
    pub data_dir: Option<PathBuf>,

    /// The wildcard matching mode, either `strict` or `relaxed`.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub wildcard_matching: Option<WildcardMatching>,

    /// Global log schema settings.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(
        docs::common = false,
        docs::required = false,
        docs::warnings = "These settings are ignored when `schema.log_namespace` is set to `true`."
    ))]
    pub log_schema: LogSchema,

    /// Telemetry settings.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub telemetry: Telemetry,

    /// The time zone to use, either `local` or a named time zone such as `UTC`.
    ///
    /// When unset, it defaults to the local time zone.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false))]
    pub timezone: Option<TimeZone>,

    // Proxy settings; the description is taken from `ProxyConfig` (`configurable(derived)`).
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub proxy: ProxyConfig,

    /// Acknowledgement settings.
    ///
    /// Accepts either a boolean or a full acknowledgements configuration.
    #[serde(
        default,
        deserialize_with = "bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(metadata(docs::common = true, docs::required = false))]
    pub acknowledgements: AcknowledgementsConfig,

    /// Deprecated metric expiration setting, expressed as a duration.
    // NOTE(review): presumably superseded by `expire_metrics_secs` — confirm before saying so
    // in user-facing docs.
    #[configurable(deprecated)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::hidden))]
    pub expire_metrics: Option<Duration>,

    /// Metric expiration time, in seconds.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub expire_metrics_secs: Option<f64>,

    /// Per-metric-set expiration settings.
    ///
    /// When configurations are merged, their lists are concatenated.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,

    /// The half-life, in seconds, of the exponentially weighted moving average (EWMA) of
    /// buffer utilization.
    ///
    /// Must be non-negative. When unset, a built-in default applies.
    #[serde(skip_serializing_if = "is_default_buffer_utilization_ewma_half_life_seconds")]
    #[configurable(validation(range(min = 0.0)))]
    #[configurable(metadata(docs::advanced))]
    pub buffer_utilization_ewma_half_life_seconds: Option<f64>,

    /// The smoothing factor (alpha) of the exponentially weighted moving average (EWMA) of
    /// latency.
    ///
    /// Must be between 0.0 and 1.0.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[configurable(metadata(docs::advanced))]
    pub latency_ewma_alpha: Option<f64>,

    /// The refresh period for metrics storage.
    // NOTE(review): the unit is not visible here (presumably seconds) — confirm and document.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub metrics_storage_refresh_period: Option<f64>,
}

// Lets `GlobalOptions::default()` serve as the generated example configuration.
impl_generate_config_from_default!(GlobalOptions);
200
201impl GlobalOptions {
202 pub fn resolve_and_validate_data_dir(
209 &self,
210 local_data_dir: Option<&PathBuf>,
211 ) -> crate::Result<PathBuf> {
212 let data_dir = local_data_dir
213 .or(self.data_dir.as_ref())
214 .ok_or(DataDirError::MissingDataDir)
215 .map_err(Box::new)?
216 .clone();
217 if !data_dir.exists() {
218 return Err(DataDirError::DoesNotExist { data_dir }.into());
219 }
220 let readonly =
221 std::fs::metadata(&data_dir).map_or(true, |meta| meta.permissions().readonly());
222 if readonly {
223 return Err(DataDirError::NotWritable { data_dir }.into());
224 }
225 Ok(data_dir)
226 }
227
228 pub fn resolve_and_make_data_subdir(
235 &self,
236 local: Option<&PathBuf>,
237 subdir: &str,
238 ) -> crate::Result<PathBuf> {
239 let data_dir = self.resolve_and_validate_data_dir(local)?;
240
241 let mut data_subdir = data_dir.clone();
242 data_subdir.push(subdir);
243
244 DirBuilder::new()
245 .recursive(true)
246 .create(&data_subdir)
247 .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
248 Ok(data_subdir)
249 }
250
251 pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
258 let mut errors = Vec::new();
259
260 if conflicts(
261 self.wildcard_matching.as_ref(),
262 with.wildcard_matching.as_ref(),
263 ) {
264 errors.push("conflicting values for 'wildcard_matching' found".to_owned());
265 }
266
267 if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
268 errors.push("conflicting values for 'proxy.http' found".to_owned());
269 }
270
271 if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
272 errors.push("conflicting values for 'proxy.https' found".to_owned());
273 }
274
275 if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
276 errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
277 }
278
279 if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
280 errors.push("conflicting values for 'timezone' found".to_owned());
281 }
282
283 if conflicts(
284 self.acknowledgements.enabled.as_ref(),
285 with.acknowledgements.enabled.as_ref(),
286 ) {
287 errors.push("conflicting values for 'acknowledgements' found".to_owned());
288 }
289
290 if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
291 errors.push("conflicting values for 'expire_metrics' found".to_owned());
292 }
293
294 if conflicts(
295 self.expire_metrics_secs.as_ref(),
296 with.expire_metrics_secs.as_ref(),
297 ) {
298 errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
299 }
300
301 let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
302 with.data_dir
303 } else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
304 errors.push("conflicting values for 'data_dir' found".to_owned());
307 None
308 } else {
309 self.data_dir.clone()
310 };
311
312 let mut log_schema = self.log_schema.clone();
315 if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
316 errors.extend(merge_errors);
317 }
318
319 let mut telemetry = self.telemetry.clone();
320 telemetry.merge(&with.telemetry);
321
322 let merged_expire_metrics_per_metric_set = match (
323 &self.expire_metrics_per_metric_set,
324 &with.expire_metrics_per_metric_set,
325 ) {
326 (Some(a), Some(b)) => Some(a.iter().chain(b).cloned().collect()),
327 (Some(a), None) => Some(a.clone()),
328 (None, Some(b)) => Some(b.clone()),
329 (None, None) => None,
330 };
331
332 if errors.is_empty() {
333 Ok(Self {
334 data_dir,
335 wildcard_matching: self.wildcard_matching.or(with.wildcard_matching),
336 log_schema,
337 telemetry,
338 acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
339 timezone: self.timezone.or(with.timezone),
340 proxy: self.proxy.merge(&with.proxy),
341 expire_metrics: self.expire_metrics.or(with.expire_metrics),
342 expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
343 expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
344 buffer_utilization_ewma_half_life_seconds: self
345 .buffer_utilization_ewma_half_life_seconds
346 .or(with.buffer_utilization_ewma_half_life_seconds),
347 latency_ewma_alpha: self.latency_ewma_alpha.or(with.latency_ewma_alpha),
348 metrics_storage_refresh_period: self
349 .metrics_storage_refresh_period
350 .or(with.metrics_storage_refresh_period),
351 })
352 } else {
353 Err(errors)
354 }
355 }
356
357 pub fn timezone(&self) -> TimeZone {
359 self.timezone.unwrap_or(TimeZone::Local)
360 }
361
362 pub fn diff(&self, other: &Self) -> Result<Vec<String>, serde_json::Error> {
376 let old_value = serde_json::to_value(self)?;
377 let new_value = serde_json::to_value(other)?;
378
379 let serde_json::Value::Object(old_map) = old_value else {
380 return Ok(vec![]);
381 };
382 let serde_json::Value::Object(new_map) = new_value else {
383 return Ok(vec![]);
384 };
385
386 Ok(old_map
387 .iter()
388 .filter_map(|(k, v_old)| match new_map.get(k) {
389 Some(v_new) if v_new != v_old => Some(k.clone()),
390 _ => None,
391 })
392 .collect())
393 }
394}
395
/// Reports whether two optional settings disagree.
///
/// Returns `true` only when both sides are set and hold different values; an unset side
/// never conflicts with anything.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    this.zip(that).is_some_and(|(left, right)| left != right)
}
399
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use chrono_tz::Tz;

    use super::*;

    // `data_dir` merging: a side that does not set it keeps the serde default, an explicit
    // value wins over the default, and two different explicit values conflict.
    #[test]
    fn merges_data_dir() {
        let merge = |a, b| merge("data_dir", a, b, |result| result.data_dir);

        assert_eq!(merge(None, None), Ok(default_data_dir()));
        assert_eq!(merge(Some("/test1"), None), Ok(Some("/test1".into())));
        assert_eq!(merge(None, Some("/test2")), Ok(Some("/test2".into())));
        assert_eq!(
            merge(Some("/test3"), Some("/test3")),
            Ok(Some("/test3".into()))
        );
        assert_eq!(
            merge(Some("/test4"), Some("/test5")),
            Err(vec!["conflicting values for 'data_dir' found".into()])
        );
    }

    // Time zones are read back through `GlobalOptions::timezone`, so an unset value shows up
    // as `TimeZone::Local`.
    #[test]
    fn merges_timezones() {
        let merge = |a, b| merge("timezone", a, b, |result| result.timezone());

        assert_eq!(merge(None, None), Ok(TimeZone::Local));
        assert_eq!(merge(Some("local"), None), Ok(TimeZone::Local));
        assert_eq!(merge(None, Some("local")), Ok(TimeZone::Local));
        assert_eq!(merge(Some("local"), Some("local")), Ok(TimeZone::Local),);
        assert_eq!(merge(Some("UTC"), None), Ok(TimeZone::Named(Tz::UTC)));
        assert_eq!(
            merge(None, Some("EST5EDT")),
            Ok(TimeZone::Named(Tz::EST5EDT))
        );
        assert_eq!(
            merge(Some("UTC"), Some("UTC")),
            Ok(TimeZone::Named(Tz::UTC))
        );
        assert_eq!(
            merge(Some("CST6CDT"), Some("GMT")),
            Err(vec!["conflicting values for 'timezone' found".into()])
        );
    }

    // Only `proxy.http` is exercised here; `make_config` writes the dotted key `proxy.http`.
    #[test]
    fn merges_proxy() {
        let merge = |a, b| merge("proxy.http", a, b, |result| result.proxy.http);

        assert_eq!(merge(None, None), Ok(None));
        assert_eq!(merge(Some("test1"), None), Ok(Some("test1".into())));
        assert_eq!(merge(None, Some("test2")), Ok(Some("test2".into())));
        assert_eq!(
            merge(Some("test3"), Some("test3")),
            Ok(Some("test3".into()))
        );
        assert_eq!(
            merge(Some("test4"), Some("test5")),
            Err(vec!["conflicting values for 'proxy.http' found".into()])
        );
    }

    // `acknowledgements` is given as a bare boolean, which `bool_or_struct` accepts.
    #[test]
    fn merges_acknowledgements() {
        let merge = |a, b| merge("acknowledgements", a, b, |result| result.acknowledgements);

        assert_eq!(merge(None, None), Ok(None.into()));
        assert_eq!(merge(Some(false), None), Ok(false.into()));
        assert_eq!(merge(Some(true), None), Ok(true.into()));
        assert_eq!(merge(None, Some(false)), Ok(false.into()));
        assert_eq!(merge(None, Some(true)), Ok(true.into()));
        assert_eq!(merge(Some(false), Some(false)), Ok(false.into()));
        assert_eq!(merge(Some(true), Some(true)), Ok(true.into()));
        assert_eq!(
            merge(Some(false), Some(true)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".into()
            ])
        );
        assert_eq!(
            merge(Some(true), Some(false)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".into()
            ])
        );
    }

    #[test]
    fn merges_expire_metrics() {
        let merge = |a, b| {
            merge("expire_metrics_secs", a, b, |result| {
                result.expire_metrics_secs
            })
        };

        assert_eq!(merge(None, None), Ok(None));
        assert_eq!(merge(Some(1.0), None), Ok(Some(1.0)));
        assert_eq!(merge(None, Some(2.0)), Ok(Some(2.0)));
        assert_eq!(merge(Some(3.0), Some(3.0)), Ok(Some(3.0)));
        assert_eq!(
            merge(Some(4.0), Some(5.0)),
            Err(vec![
                "conflicting values for 'expire_metrics_secs' found".into()
            ])
        );
    }

    // `data_dir` is always serialized (it has no `skip_serializing_if`), so changing it must
    // be reported; every other field is default on both sides.
    #[test]
    fn diff_detects_changed_keys() {
        let old = GlobalOptions {
            data_dir: Some(std::path::PathBuf::from("/path1")),
            ..Default::default()
        };
        let new = GlobalOptions {
            data_dir: Some(std::path::PathBuf::from("/path2")),
            ..Default::default()
        };
        assert_eq!(
            old.diff(&new).expect("diff failed"),
            vec!["data_dir".to_string()]
        );
    }

    // Builds two configs that each set only `name` (when given), merges the second into the
    // first, and projects the merged result through `result`.
    fn merge<P: Debug, T>(
        name: &str,
        dd1: Option<P>,
        dd2: Option<P>,
        result: impl Fn(GlobalOptions) -> T,
    ) -> Result<T, Vec<String>> {
        make_config(name, dd1)
            .merge(make_config(name, dd2))
            .map(result)
    }

    // Deserializes a one-line TOML config `name = <value:?>`; `None` yields an empty document,
    // so every field takes its serde default. The `Debug` form doubles as a TOML literal for
    // the strings, floats and booleans used above.
    fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
        toml::from_str(&value.map_or(String::new(), |value| format!(r"{name} = {value:?}")))
            .unwrap()
    }
}