1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7 num::{NonZeroU64, NonZeroUsize},
8 path::PathBuf,
9 process::ExitStatus,
10 sync::atomic::{AtomicUsize, Ordering},
11 time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17 runtime::{self, Handle, Runtime},
18 sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::api;
24#[cfg(feature = "api")]
25use crate::internal_events::ApiStarted;
26use crate::{
27 cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
28 config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
29 extra_context::ExtraContext,
30 heartbeat,
31 internal_events::{
32 VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped, VectorStopping,
33 },
34 signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
35 topology::{
36 ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
37 TopologyController,
38 },
39 trace,
40};
41
/// Number of worker threads recorded when the runtime is built.
///
/// Starts at `0`, which `worker_threads` reports as `None`; `build_runtime`
/// stores the validated thread count here with a compare-exchange from `0`.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);
43
44pub fn worker_threads() -> Option<NonZeroUsize> {
45 NonZeroUsize::new(WORKER_THREADS.load(Ordering::Relaxed))
46}
47
/// The loaded configuration together with the topology started from it.
pub struct ApplicationConfig {
    /// Configuration paths this application was built with.
    pub config_paths: Vec<config::ConfigPath>,
    /// The main topology, as returned by `RunningTopology::start_init_validated`.
    pub topology: RunningTopology,
    /// Receiver returned alongside the main topology; an item arriving on it
    /// makes `StartedApplication::main` break out with `SignalTo::Shutdown`.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Additional topologies registered through `add_internal_config`.
    pub internal_topologies: Vec<RunningTopology>,
    /// The `api` options read from the loaded configuration.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Context passed to topology startup and later handed to the
    /// `TopologyController`.
    pub extra_context: ExtraContext,
}
57
/// A prepared application that has not been started yet.
pub struct Application {
    /// The `root` options taken from the parsed `Opts`.
    pub root_opts: RootOpts,
    /// Loaded configuration together with its running topology.
    pub config: ApplicationConfig,
    /// Signal pair created from the runtime in `prepare_from_opts`.
    pub signals: SignalPair,
}
63
64impl ApplicationConfig {
65 pub async fn from_opts(
66 opts: &RootOpts,
67 signal_handler: &mut SignalHandler,
68 extra_context: ExtraContext,
69 ) -> Result<Self, ExitCode> {
70 let config_paths = opts.config_paths_with_formats();
71
72 let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
73 .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
74
75 let watcher_conf = if opts.watch_config {
76 Some(watcher_config(
77 opts.watch_config_method,
78 opts.watch_config_poll_interval_seconds,
79 ))
80 } else {
81 None
82 };
83
84 let config = load_configs(
85 &config_paths,
86 watcher_conf,
87 opts.require_healthy,
88 opts.allow_empty_config,
89 !opts.disable_env_var_interpolation,
90 graceful_shutdown_duration,
91 signal_handler,
92 )
93 .await?;
94
95 Self::from_config(config_paths, config, extra_context).await
96 }
97
98 pub async fn from_config(
99 config_paths: Vec<ConfigPath>,
100 config: Config,
101 extra_context: ExtraContext,
102 ) -> Result<Self, ExitCode> {
103 #[cfg(feature = "api")]
104 let api = config.api;
105
106 let (topology, graceful_crash_receiver) =
107 RunningTopology::start_init_validated(config, extra_context.clone())
108 .await
109 .ok_or(exitcode::CONFIG)?;
110
111 Ok(Self {
112 config_paths,
113 topology,
114 graceful_crash_receiver,
115 internal_topologies: Vec::new(),
116 #[cfg(feature = "api")]
117 api,
118 extra_context,
119 })
120 }
121
122 pub async fn add_internal_config(
123 &mut self,
124 config: Config,
125 extra_context: ExtraContext,
126 ) -> Result<(), ExitCode> {
127 let Some((topology, _)) =
128 RunningTopology::start_init_validated(config, extra_context).await
129 else {
130 return Err(exitcode::CONFIG);
131 };
132 self.internal_topologies.push(topology);
133 Ok(())
134 }
135
136 #[cfg(feature = "api")]
138 pub fn setup_api(&self, handle: &Handle) -> Option<api::GrpcServer> {
139 if self.api.enabled {
140 let api_server = handle.block_on(api::GrpcServer::start(
142 self.topology.config(),
143 self.topology.watch(),
144 ));
145 match api_server {
146 Ok(server) => {
147 emit!(ApiStarted {
148 addr: server.addr()
149 });
150 Some(server)
151 }
152 Err(error) => {
153 let error = error.to_string();
154 error!(
155 message = "An error occurred that Vector couldn't handle.",
156 %error,
157 internal_log_rate_limit = false
158 );
159 _ = self
162 .topology
163 .abort_tx
164 .send(crate::signal::ShutdownError::ApiFailed { error });
165 None
166 }
167 }
168 } else {
169 info!(
170 message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
171 );
172 None
173 }
174 }
175}
176
177impl Application {
178 pub fn run(extra_context: ExtraContext) -> ExitStatus {
179 let (runtime, app) =
180 Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
181
182 runtime.block_on(app.run())
183 }
184
185 pub fn prepare_start(
186 extra_context: ExtraContext,
187 ) -> Result<(Runtime, StartedApplication), ExitCode> {
188 Self::prepare(extra_context)
189 .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
190 }
191
192 pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
193 let opts = Opts::get_matches().map_err(|error| {
194 _ = error.print();
196 exitcode::USAGE
197 })?;
198
199 Self::prepare_from_opts(opts, extra_context)
200 }
201
202 pub fn prepare_from_opts(
203 opts: Opts,
204 extra_context: ExtraContext,
205 ) -> Result<(Runtime, Self), ExitCode> {
206 opts.root.init_global();
207
208 #[cfg(feature = "sources-utils-http-encoding")]
209 crate::sources::util::http::set_max_decompressed_size_bytes(
210 opts.root.max_decompressed_size_bytes,
211 );
212
213 let color = opts.root.color.use_color();
214
215 init_logging(
216 color,
217 opts.root.log_format,
218 opts.log_level(),
219 opts.root.internal_log_rate_limit,
220 opts.root.internal_logs_source_rate_limit,
221 );
222
223 #[cfg(unix)]
224 if opts.root.raise_fd_limit {
225 crate::cli::raise_file_descriptor_limit();
226 }
227
228 crate::set_global_color(color);
230
231 if opts.root.openssl_no_probe {
233 debug!(
234 message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
235 );
236 }
237
238 let runtime = build_runtime(
239 opts.root.threads,
240 opts.root.chunk_size_events,
241 "vector-worker",
242 )?;
243
244 let mut signals = SignalPair::new(&runtime);
246
247 if let Some(sub_command) = &opts.sub_command {
248 return Err(runtime.block_on(sub_command.execute(signals, color)));
249 }
250
251 let config = runtime.block_on(ApplicationConfig::from_opts(
252 &opts.root,
253 &mut signals.handler,
254 extra_context,
255 ))?;
256
257 Ok((
258 runtime,
259 Self {
260 root_opts: opts.root,
261 config,
262 signals,
263 },
264 ))
265 }
266
267 pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
268 crate::trace::stop_early_buffering();
271
272 emit!(VectorStarted);
273 handle.spawn(heartbeat::heartbeat());
274
275 let Self {
276 root_opts,
277 config,
278 signals,
279 } = self;
280
281 #[cfg(feature = "api")]
282 let api_server = config.setup_api(handle);
283
284 let topology_controller = SharedTopologyController::new(TopologyController {
285 #[cfg(feature = "api")]
286 api_server,
287 topology: config.topology,
288 config_paths: config.config_paths.clone(),
289 require_healthy: root_opts.require_healthy,
290 extra_context: config.extra_context,
291 });
292
293 Ok(StartedApplication {
294 config_paths: config.config_paths,
295 internal_topologies: config.internal_topologies,
296 graceful_crash_receiver: config.graceful_crash_receiver,
297 signals,
298 topology_controller,
299 allow_empty_config: root_opts.allow_empty_config,
300 interpolate_env: !root_opts.disable_env_var_interpolation,
301 })
302 }
303}
304
/// A started application, ready to run its main loop.
pub struct StartedApplication {
    /// Configuration paths passed to `handle_signal` for reloads.
    pub config_paths: Vec<ConfigPath>,
    /// Additional topologies, carried through to `FinishedApplication`.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receiver polled in `main`; an item arriving on it ends the loop with
    /// `SignalTo::Shutdown`.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler and receiver pair.
    pub signals: SignalPair,
    /// Shared controller wrapping the main topology.
    pub topology_controller: SharedTopologyController,
    /// Forwarded to configuration loading on reload.
    pub allow_empty_config: bool,
    /// Forwarded to configuration loading on reload.
    pub interpolate_env: bool,
}
314
315impl StartedApplication {
316 pub async fn run(self) -> ExitStatus {
317 self.main().await.shutdown().await
318 }
319
320 pub async fn main(self) -> FinishedApplication {
321 let Self {
322 config_paths,
323 graceful_crash_receiver,
324 signals,
325 topology_controller,
326 internal_topologies,
327 allow_empty_config,
328 interpolate_env,
329 } = self;
330
331 let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
332
333 let mut signal_handler = signals.handler;
334 let mut signal_rx = signals.receiver;
335
336 let signal = loop {
337 let has_sources = !topology_controller.lock().await.topology.config.is_empty();
338 tokio::select! {
339 signal = signal_rx.recv() => if let Some(signal) = handle_signal(
340 signal,
341 &topology_controller,
342 &config_paths,
343 &mut signal_handler,
344 allow_empty_config,
345 interpolate_env,
346 ).await {
347 break signal;
348 },
349 error = graceful_crash.next() => break SignalTo::Shutdown(error),
351 _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
352 info!("All sources have finished.");
353 break SignalTo::Shutdown(None)
354 } ,
355 else => unreachable!("Signal streams never end"),
356 }
357 };
358
359 FinishedApplication {
360 signal,
361 signal_rx,
362 topology_controller,
363 internal_topologies,
364 }
365 }
366}
367
368async fn handle_signal(
369 signal: Result<SignalTo, RecvError>,
370 topology_controller: &SharedTopologyController,
371 config_paths: &[ConfigPath],
372 signal_handler: &mut SignalHandler,
373 allow_empty_config: bool,
374 interpolate_env: bool,
375) -> Option<SignalTo> {
376 match signal {
377 Ok(SignalTo::ReloadComponents(components_to_reload)) => {
378 let mut topology_controller = topology_controller.lock().await;
379 topology_controller
380 .topology
381 .extend_reload_set(components_to_reload);
382
383 if let Some(paths) = config::process_paths(config_paths) {
385 topology_controller.config_paths = paths;
386 }
387
388 let new_config = config::load_from_paths_with_provider_and_secrets(
390 &topology_controller.config_paths,
391 signal_handler,
392 allow_empty_config,
393 interpolate_env,
394 )
395 .await;
396
397 reload_config_from_result(topology_controller, new_config).await
398 }
399 Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
400 let topology_controller = topology_controller.lock().await;
401 reload_config_from_result(topology_controller, config_builder.build()).await
402 }
403 Ok(SignalTo::ReloadFromDisk) => {
404 let mut topology_controller = topology_controller.lock().await;
405
406 if let Some(paths) = config::process_paths(config_paths) {
408 topology_controller.config_paths = paths;
409 }
410
411 let new_config = config::load_from_paths_with_provider_and_secrets(
413 &topology_controller.config_paths,
414 signal_handler,
415 allow_empty_config,
416 interpolate_env,
417 )
418 .await;
419
420 if let Ok(ref config) = new_config {
421 let transform_keys_to_reload = config.transform_keys_with_external_files();
423
424 if !transform_keys_to_reload.is_empty() {
426 info!(
427 message = "Reloading transforms with external files.",
428 count = transform_keys_to_reload.len()
429 );
430 topology_controller
431 .topology
432 .extend_reload_set(transform_keys_to_reload);
433 }
434 }
435
436 reload_config_from_result(topology_controller, new_config).await
437 }
438 Ok(SignalTo::ReloadEnrichmentTables) => {
439 let topology_controller = topology_controller.lock().await;
440
441 topology_controller
442 .topology
443 .reload_enrichment_tables()
444 .await;
445 None
446 }
447 Err(RecvError::Lagged(amt)) => {
448 warn!("Overflow, dropped {} signals.", amt);
449 None
450 }
451 Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
452 Ok(signal) => Some(signal),
453 }
454}
455
456async fn reload_config_from_result(
457 mut topology_controller: MutexGuard<'_, TopologyController>,
458 config: Result<Config, Vec<String>>,
459) -> Option<SignalTo> {
460 match config {
461 Ok(new_config) => match topology_controller.reload(new_config).await {
462 ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
463 _ => None,
464 },
465 Err(errors) => {
466 handle_config_errors(errors);
467 emit!(VectorConfigLoadError);
468 None
469 }
470 }
471}
472
/// The state left after the main loop has ended, ready to be shut down.
pub struct FinishedApplication {
    /// The signal that ended the main loop.
    pub signal: SignalTo,
    /// Signal receiver; still listened to while the topology is stopping.
    pub signal_rx: SignalRx,
    /// Shared controller wrapping the main topology.
    pub topology_controller: SharedTopologyController,
    /// Additional topologies, stopped one by one during `shutdown`.
    pub internal_topologies: Vec<RunningTopology>,
}
479
480impl FinishedApplication {
481 pub async fn shutdown(self) -> ExitStatus {
482 let FinishedApplication {
483 signal,
484 signal_rx,
485 topology_controller,
486 internal_topologies,
487 } = self;
488
489 let topology_controller = topology_controller
492 .try_into_inner()
493 .expect("fail to unwrap topology controller")
494 .into_inner();
495
496 let status = match signal {
497 SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
498 SignalTo::Quit => Self::quit(),
499 _ => unreachable!(),
500 };
501
502 for topology in internal_topologies {
503 topology.stop().await;
504 }
505
506 status
507 }
508
509 async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
510 emit!(VectorStopping);
511 tokio::select! {
512 _ = topology_controller.stop() => {
513 emit!(VectorStopped);
514 ExitStatus::from_raw({
515 #[cfg(windows)]
516 {
517 exitcode::OK as u32
518 }
519 #[cfg(unix)]
520 exitcode::OK
521 })
522 }, _ = signal_rx.recv() => Self::quit(),
524 }
525 }
526
527 fn quit() -> ExitStatus {
528 emit!(VectorQuit);
530 ExitStatus::from_raw({
531 #[cfg(windows)]
532 {
533 exitcode::UNAVAILABLE as u32
534 }
535 #[cfg(unix)]
536 exitcode::OK
537 })
538 }
539}
540
541fn get_log_levels(default: &str) -> String {
542 std::env::var("VECTOR_LOG")
543 .or_else(|_| {
544 std::env::var("LOG").inspect(|_log| {
545 warn!(
546 message =
547 "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
548 );
549 })
550 })
551 .unwrap_or_else(|_| default.into())
552}
553
554pub fn build_runtime(
555 threads: Option<usize>,
556 chunk_size_events: Option<NonZeroUsize>,
557 thread_name: &str,
558) -> Result<Runtime, ExitCode> {
559 let mut rt_builder = runtime::Builder::new_multi_thread();
560 rt_builder.max_blocking_threads(20_000);
561 rt_builder.enable_all().thread_name(thread_name);
562
563 let threads = threads.unwrap_or_else(crate::num_threads);
564 if threads == 0 {
565 error!("The `threads` argument must be greater or equal to 1.");
566 return Err(exitcode::CONFIG);
567 }
568 WORKER_THREADS
569 .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
570 .unwrap_or_else(|_| panic!("double thread initialization"));
571 rt_builder.worker_threads(threads);
572
573 let chunk_size_events = chunk_size_events
574 .map(NonZeroUsize::get)
575 .unwrap_or(vector_lib::source_sender::DEFAULT_CHUNK_SIZE_EVENTS);
576
577 let Some(source_sender_buffer_size) = threads.checked_mul(chunk_size_events) else {
578 error!(
579 "The `chunk_size_events` argument is too large for the configured number of threads."
580 );
581 return Err(exitcode::CONFIG);
582 };
583 let Some(ready_array_capacity) =
584 chunk_size_events.checked_mul(crate::topology::builder::READY_ARRAY_CAPACITY_CHUNKS)
585 else {
586 error!("The `chunk_size_events` argument is too large.");
587 return Err(exitcode::CONFIG);
588 };
589
590 vector_lib::source_sender::set_chunk_size_events(chunk_size_events);
591 crate::topology::builder::set_source_sender_buffer_size(source_sender_buffer_size);
592 crate::topology::builder::set_ready_array_capacity(ready_array_capacity);
593
594 debug!(
595 message = "Building runtime.",
596 worker_threads = threads,
597 chunk_size_events
598 );
599 Ok(rt_builder.build().expect("Unable to create async runtime"))
600}
601
602pub async fn load_configs(
603 config_paths: &[ConfigPath],
604 watcher_conf: Option<config::watcher::WatcherConfig>,
605 require_healthy: Option<bool>,
606 allow_empty_config: bool,
607 interpolate_env: bool,
608 graceful_shutdown_duration: Option<Duration>,
609 signal_handler: &mut SignalHandler,
610) -> Result<Config, ExitCode> {
611 let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
612
613 let watched_paths = config_paths
614 .iter()
615 .map(<&PathBuf>::from)
616 .collect::<Vec<_>>();
617
618 info!(
619 message = "Loading configs.",
620 paths = ?watched_paths
621 );
622
623 let mut config = config::load_from_paths_with_provider_and_secrets(
624 &config_paths,
625 signal_handler,
626 allow_empty_config,
627 interpolate_env,
628 )
629 .await
630 .map_err(handle_config_errors)?;
631
632 let mut watched_component_paths = Vec::new();
633
634 if let Some(watcher_conf) = watcher_conf {
635 for (name, transform) in config.transforms() {
636 let files = transform.inner.files_to_watch();
637 let component_config = ComponentConfig::new(
638 files.into_iter().cloned().collect(),
639 name.clone(),
640 ComponentType::Transform,
641 );
642 watched_component_paths.push(component_config);
643 }
644
645 for (name, sink) in config.sinks() {
646 let files = sink.inner.files_to_watch();
647 let component_config = ComponentConfig::new(
648 files.into_iter().cloned().collect(),
649 name.clone(),
650 ComponentType::Sink,
651 );
652 watched_component_paths.push(component_config);
653 }
654
655 for (name, table) in config.enrichment_tables() {
656 let files = table.inner.files_to_watch();
657 let component_config = ComponentConfig::new(
658 files.clone().into_iter().cloned().collect(),
659 name.clone(),
660 ComponentType::EnrichmentTable,
661 );
662 watched_component_paths.push(component_config);
663 if table.as_sink(name).is_some() {
664 let sink_component_config = ComponentConfig::new(
665 files.into_iter().cloned().collect(),
666 name.clone(),
667 ComponentType::Sink,
668 );
669 watched_component_paths.push(sink_component_config);
670 }
671 }
672
673 info!(
674 message = "Starting watcher.",
675 paths = ?watched_paths
676 );
677 info!(
678 message = "Components to watch.",
679 paths = ?watched_component_paths
680 );
681
682 config::watcher::spawn_thread(
684 watcher_conf,
685 signal_handler.clone_tx(),
686 watched_paths,
687 watched_component_paths,
688 None,
689 )
690 .map_err(|error| {
691 error!(message = "Unable to start config watcher.", %error);
692 exitcode::CONFIG
693 })?;
694 }
695
696 config::init_log_schema(config.global.log_schema.clone(), true);
697 config::init_telemetry(config.global.telemetry.clone(), true);
698
699 if !config.healthchecks.enabled {
700 info!("Health checks are disabled.");
701 }
702 config.healthchecks.set_require_healthy(require_healthy);
703 config.graceful_shutdown_duration = graceful_shutdown_duration;
704
705 Ok(config)
706}
707
708pub fn init_logging(
709 color: bool,
710 format: LogFormat,
711 log_level: &str,
712 internal_log_rate_limit_secs: u64,
713 internal_logs_source_rate_limit_secs: Option<NonZeroU64>,
714) {
715 let level = get_log_levels(log_level);
716 let json = match format {
717 LogFormat::Text => false,
718 LogFormat::Json => true,
719 };
720
721 trace::init(
722 color,
723 json,
724 &level,
725 internal_log_rate_limit_secs,
726 internal_logs_source_rate_limit_secs,
727 );
728 debug!(
729 message = "Internal log rate limit configured.",
730 internal_log_rate_limit_secs,
731 internal_logs_source_rate_limit_secs =
732 internal_logs_source_rate_limit_secs.map(NonZeroU64::get),
733 );
734 info!(message = "Log level is enabled.", ?level);
735}
736
737pub fn watcher_config(
738 method: WatchConfigMethod,
739 interval: NonZeroU64,
740) -> config::watcher::WatcherConfig {
741 match method {
742 WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
743 WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
744 }
745}