1#![allow(missing_docs)]
2
3use std::{
4 num::{NonZeroU64, NonZeroUsize},
5 path::PathBuf,
6};
7
8use clap::{ArgAction, CommandFactory, FromArgMatches, Parser};
9
10#[cfg(windows)]
11use crate::service;
12#[cfg(feature = "api-client")]
13use crate::tap;
14#[cfg(feature = "top")]
15use crate::top;
16
17use crate::{
18 completion, config, convert_config, generate, generate_schema, get_version, graph, list,
19 signal, unit_test, validate,
20};
21
22#[derive(Parser, Debug)]
23#[command(rename_all = "kebab-case")]
24pub struct Opts {
25 #[command(flatten)]
26 pub root: RootOpts,
27
28 #[command(subcommand)]
29 pub sub_command: Option<SubCommand>,
30}
31
32impl Opts {
33 pub fn get_matches() -> Result<Self, clap::Error> {
34 let version = get_version();
35 let app = Opts::command().version(version);
36 Opts::from_arg_matches(&app.get_matches())
37 }
38
39 pub const fn log_level(&self) -> &'static str {
40 let (quiet_level, verbose_level) = match self.sub_command {
41 Some(SubCommand::Validate(_))
42 | Some(SubCommand::Graph(_))
43 | Some(SubCommand::Generate(_))
44 | Some(SubCommand::ConvertConfig(_))
45 | Some(SubCommand::List(_))
46 | Some(SubCommand::Test(_)) => {
47 if self.root.verbose == 0 {
48 (self.root.quiet + 1, self.root.verbose)
49 } else {
50 (self.root.quiet, self.root.verbose - 1)
51 }
52 }
53 _ => (self.root.quiet, self.root.verbose),
54 };
55 match quiet_level {
56 0 => match verbose_level {
57 0 => "info",
58 1 => "debug",
59 2..=255 => "trace",
60 },
61 1 => "warn",
62 2 => "error",
63 3..=255 => "off",
64 }
65 }
66}
67
68#[derive(Parser, Debug)]
69#[command(rename_all = "kebab-case")]
70pub struct RootOpts {
71 #[arg(
76 id = "config",
77 short,
78 long,
79 env = "VECTOR_CONFIG",
80 value_delimiter(',')
81 )]
82 pub config_paths: Vec<PathBuf>,
83
84 #[arg(
89 id = "config-dir",
90 short = 'C',
91 long,
92 env = "VECTOR_CONFIG_DIR",
93 value_delimiter(',')
94 )]
95 pub config_dirs: Vec<PathBuf>,
96
97 #[arg(
100 id = "config-toml",
101 long,
102 env = "VECTOR_CONFIG_TOML",
103 value_delimiter(',')
104 )]
105 pub config_paths_toml: Vec<PathBuf>,
106
107 #[arg(
110 id = "config-json",
111 long,
112 env = "VECTOR_CONFIG_JSON",
113 value_delimiter(',')
114 )]
115 pub config_paths_json: Vec<PathBuf>,
116
117 #[arg(
120 id = "config-yaml",
121 long,
122 env = "VECTOR_CONFIG_YAML",
123 value_delimiter(',')
124 )]
125 pub config_paths_yaml: Vec<PathBuf>,
126
127 #[arg(short, long, env = "VECTOR_REQUIRE_HEALTHY")]
129 pub require_healthy: Option<bool>,
130
131 #[arg(short, long, env = "VECTOR_THREADS")]
133 pub threads: Option<usize>,
134
135 #[arg(long, env = "VECTOR_CHUNK_SIZE_EVENTS")]
138 pub chunk_size_events: Option<NonZeroUsize>,
139
140 #[arg(short, long, action = ArgAction::Count)]
142 pub verbose: u8,
143
144 #[arg(short, long, action = ArgAction::Count)]
146 pub quiet: u8,
147
148 #[arg(
150 long,
151 env = "VECTOR_DISABLE_ENV_VAR_INTERPOLATION",
152 default_value = "false"
153 )]
154 pub disable_env_var_interpolation: bool,
155
156 #[arg(long, default_value = "text", env = "VECTOR_LOG_FORMAT")]
158 pub log_format: LogFormat,
159
160 #[arg(long, default_value = "auto", env = "VECTOR_COLOR")]
168 pub color: Color,
169
170 #[arg(short, long, env = "VECTOR_WATCH_CONFIG")]
172 pub watch_config: bool,
173
174 #[arg(
183 long,
184 default_value = "recommended",
185 env = "VECTOR_WATCH_CONFIG_METHOD"
186 )]
187 pub watch_config_method: WatchConfigMethod,
188
189 #[arg(
193 long,
194 env = "VECTOR_WATCH_CONFIG_POLL_INTERVAL_SECONDS",
195 default_value = "30"
196 )]
197 pub watch_config_poll_interval_seconds: NonZeroU64,
198
199 #[arg(
215 short,
216 long,
217 env = "VECTOR_INTERNAL_LOG_RATE_LIMIT",
218 default_value = "10"
219 )]
220 pub internal_log_rate_limit: u64,
221
222 #[arg(long, env = "VECTOR_INTERNAL_LOGS_SOURCE_RATE_LIMIT")]
230 pub internal_logs_source_rate_limit: Option<NonZeroU64>,
231
232 #[arg(
236 long,
237 default_value = "60",
238 env = "VECTOR_GRACEFUL_SHUTDOWN_LIMIT_SECS",
239 group = "graceful-shutdown-limit"
240 )]
241 pub graceful_shutdown_limit_secs: NonZeroU64,
242
243 #[arg(
247 long,
248 default_value = "false",
249 env = "VECTOR_NO_GRACEFUL_SHUTDOWN_LIMIT",
250 group = "graceful-shutdown-limit"
251 )]
252 pub no_graceful_shutdown_limit: bool,
253
254 #[cfg(all(unix, feature = "tikv-jemallocator"))]
256 #[arg(long, env = "ALLOCATION_TRACING", default_value = "false")]
257 pub allocation_tracing: bool,
258
259 #[cfg(all(unix, feature = "tikv-jemallocator"))]
261 #[arg(
262 long,
263 env = "ALLOCATION_TRACING_REPORTING_INTERVAL_MS",
264 default_value = "5000"
265 )]
266 pub allocation_tracing_reporting_interval_ms: u64,
267
268 #[arg(long, env = "VECTOR_OPENSSL_NO_PROBE", default_value = "false")]
274 pub openssl_no_probe: bool,
275
276 #[arg(long, env = "VECTOR_ALLOW_EMPTY_CONFIG", default_value = "false")]
281 pub allow_empty_config: bool,
282
283 #[arg(
292 long,
293 env = "VECTOR_MAX_DECOMPRESSED_SIZE_BYTES",
294 default_value = "104857600"
295 )]
296 pub max_decompressed_size_bytes: usize,
297
298 #[cfg(unix)]
304 #[arg(long, env = "VECTOR_RAISE_FD_LIMIT", default_value = "false")]
305 pub raise_fd_limit: bool,
306}
307
308impl RootOpts {
309 pub fn config_paths_with_formats(&self) -> Vec<config::ConfigPath> {
311 config::merge_path_lists(vec![
312 (&self.config_paths, None),
313 (&self.config_paths_toml, Some(config::Format::Toml)),
314 (&self.config_paths_json, Some(config::Format::Json)),
315 (&self.config_paths_yaml, Some(config::Format::Yaml)),
316 ])
317 .map(|(path, hint)| config::ConfigPath::File(path, hint))
318 .chain(
319 self.config_dirs
320 .iter()
321 .map(|dir| config::ConfigPath::Dir(dir.to_path_buf())),
322 )
323 .collect()
324 }
325
326 pub fn init_global(&self) {
327 if !self.openssl_no_probe {
328 unsafe {
329 openssl_probe::init_openssl_env_vars();
330 }
331 }
332
333 crate::metrics::init_global().expect("metrics initialization failed");
334 }
335}
336
337#[cfg(unix)]
347pub(crate) fn raise_file_descriptor_limit() {
348 use nix::sys::resource::{Resource, getrlimit, setrlimit};
349 use tracing::{info, warn};
350
351 let (soft, hard) = match getrlimit(Resource::RLIMIT_NOFILE) {
352 Ok(limits) => limits,
353 Err(err) => {
354 warn!(message = "Failed to get file descriptor limit.", %err);
355 return;
356 }
357 };
358
359 if soft >= hard {
360 return; }
362
363 if setrlimit(Resource::RLIMIT_NOFILE, hard, hard).is_ok() {
365 info!(
366 message = "Raised file descriptor limit.",
367 from = soft,
368 to = hard,
369 );
370 return;
371 }
372
373 #[cfg(target_os = "macos")]
376 {
377 if let Some(maxfiles) = macos_maxfilesperproc()
378 && maxfiles > soft
379 && setrlimit(Resource::RLIMIT_NOFILE, maxfiles, hard).is_ok()
380 {
381 info!(
382 message = "Raised file descriptor limit.",
383 from = soft,
384 to = maxfiles,
385 );
386 return;
387 }
388 }
389
390 warn!(
391 message = "Failed to raise file descriptor limit.",
392 current = soft,
393 attempted = hard,
394 );
395}
396
397#[cfg(target_os = "macos")]
399fn macos_maxfilesperproc() -> Option<libc::rlim_t> {
400 let mut maxfiles: libc::c_int = 0;
401 let mut len = std::mem::size_of::<libc::c_int>() as libc::size_t;
402 let ret = unsafe {
405 libc::sysctlbyname(
406 c"kern.maxfilesperproc".as_ptr(),
407 &mut maxfiles as *mut libc::c_int as *mut libc::c_void,
408 &mut len,
409 std::ptr::null_mut(),
410 0,
411 )
412 };
413 if ret == 0 && maxfiles > 0 {
414 Some(maxfiles as libc::rlim_t)
415 } else {
416 None
417 }
418}
419
420#[derive(Parser, Debug)]
421#[command(rename_all = "kebab-case")]
422pub enum SubCommand {
423 Validate(validate::Opts),
425
426 ConvertConfig(convert_config::Opts),
433
434 Generate(generate::Opts),
436
437 GenerateSchema(generate_schema::Opts),
446
447 #[command(hide = true)]
449 Completion(completion::Opts),
450
451 #[command(hide = true)]
453 Config(config::Opts),
454
455 List(list::Opts),
457
458 Test(unit_test::Opts),
461
462 Graph(graph::Opts),
464
465 #[cfg(feature = "top")]
467 Top(top::Opts),
468
469 #[cfg(feature = "api-client")]
471 Tap(tap::Opts),
472
473 #[cfg(windows)]
475 Service(service::Opts),
476
477 Vrl(vrl::cli::Opts),
479}
480
481impl SubCommand {
482 pub async fn execute(
483 &self,
484 mut signals: signal::SignalPair,
485 color: bool,
486 ) -> exitcode::ExitCode {
487 match self {
488 Self::Completion(s) => completion::cmd(s),
489 Self::Config(c) => config::cmd(c),
490 Self::ConvertConfig(opts) => convert_config::cmd(opts),
491 Self::Generate(g) => generate::cmd(g),
492 Self::GenerateSchema(opts) => generate_schema::cmd(opts),
493 Self::Graph(g) => graph::cmd(g),
494 Self::List(l) => list::cmd(l),
495 #[cfg(windows)]
496 Self::Service(s) => service::cmd(s),
497 #[cfg(feature = "api-client")]
498 Self::Tap(t) => tap::cmd(t, signals.receiver).await,
499 Self::Test(t) => unit_test::cmd(t, &mut signals.handler).await,
500 #[cfg(feature = "top")]
501 Self::Top(t) => top::cmd(t).await,
502 Self::Validate(v) => validate::validate(v, color).await,
503 Self::Vrl(s) => vrl::cli::cmd::cmd(s, vector_vrl_functions::all()),
504 }
505 }
506}
507
508#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
509pub enum Color {
510 Auto,
511 Always,
512 Never,
513}
514
515impl Color {
516 pub fn use_color(&self) -> bool {
517 match self {
518 #[cfg(unix)]
519 Color::Auto => {
520 use std::io::IsTerminal;
521 std::io::stdout().is_terminal()
522 }
523 #[cfg(windows)]
524 Color::Auto => false, Color::Always => true,
526 Color::Never => false,
527 }
528 }
529}
530
531#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
532pub enum LogFormat {
533 Text,
534 Json,
535}
536
537#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
538pub enum WatchConfigMethod {
539 Recommended,
541 Poll,
544}
545
546pub fn handle_config_errors(errors: Vec<String>) -> exitcode::ExitCode {
547 for error in errors {
548 error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
549 }
550
551 exitcode::CONFIG
552}
553
554#[cfg(test)]
555mod tests {
556 #[cfg(unix)]
557 fn run_in_subprocess(test_name: &str) {
558 let exe = std::env::current_exe().unwrap();
559 let output = std::process::Command::new(exe)
560 .env("__VECTOR_SUBPROCESS_TEST", "1")
561 .args(["--exact", test_name, "--nocapture"])
562 .output()
563 .unwrap();
564 assert!(
565 output.status.success(),
566 "subprocess test failed:\nstdout: {}\nstderr: {}",
567 String::from_utf8_lossy(&output.stdout),
568 String::from_utf8_lossy(&output.stderr),
569 );
570 }
571
572 #[test]
573 #[cfg(unix)]
574 fn test_raise_file_descriptor_limit() {
575 if std::env::var("__VECTOR_SUBPROCESS_TEST").is_err() {
576 run_in_subprocess("cli::tests::test_raise_file_descriptor_limit");
577 return;
578 }
579
580 use nix::sys::resource::{Resource, getrlimit, setrlimit};
581
582 let (original_soft, hard) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
583 let lowered = std::cmp::min(original_soft, 256);
584 if lowered < hard {
585 setrlimit(Resource::RLIMIT_NOFILE, lowered, hard).unwrap();
586
587 let (soft_before, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
588 assert_eq!(soft_before, lowered);
589
590 super::raise_file_descriptor_limit();
591
592 let (soft_after, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
593 assert!(
594 soft_after > lowered,
595 "Expected soft limit to be raised above {lowered}, got {soft_after}"
596 );
597 }
598 }
599
600 #[test]
601 #[cfg(unix)]
602 fn test_raise_file_descriptor_limit_already_at_max() {
603 if std::env::var("__VECTOR_SUBPROCESS_TEST").is_err() {
604 run_in_subprocess("cli::tests::test_raise_file_descriptor_limit_already_at_max");
605 return;
606 }
607
608 use nix::sys::resource::{Resource, getrlimit, setrlimit};
609
610 let (_, hard) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
611
612 if setrlimit(Resource::RLIMIT_NOFILE, hard, hard).is_err() {
613 #[cfg(target_os = "macos")]
614 if let Some(maxfiles) = super::macos_maxfilesperproc() {
615 setrlimit(Resource::RLIMIT_NOFILE, maxfiles, hard).ok();
616 }
617 }
618
619 let (soft_before, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
620
621 super::raise_file_descriptor_limit();
622
623 let (soft_after, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
624 assert_eq!(soft_before, soft_after);
625 }
626
627 #[test]
628 #[cfg(target_os = "macos")]
629 fn test_macos_maxfilesperproc_returns_positive() {
630 let result = super::macos_maxfilesperproc();
631 assert!(
632 result.is_some(),
633 "macos_maxfilesperproc() should return Some on macOS"
634 );
635 assert!(
636 result.unwrap() > 0,
637 "kern.maxfilesperproc should be positive"
638 );
639 }
640}