Skip to main content

vector/
app.rs

1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7    num::{NonZeroU64, NonZeroUsize},
8    path::PathBuf,
9    process::ExitStatus,
10    sync::atomic::{AtomicUsize, Ordering},
11    time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17    runtime::{self, Handle, Runtime},
18    sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::api;
24#[cfg(feature = "api")]
25use crate::internal_events::ApiStarted;
26use crate::{
27    cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
28    config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
29    extra_context::ExtraContext,
30    heartbeat,
31    internal_events::{
32        VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped, VectorStopping,
33    },
34    signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
35    topology::{
36        ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
37        TopologyController,
38    },
39    trace,
40};
41
/// Worker-thread count recorded by [`build_runtime`]; stays `0` until a runtime has been built.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the worker-thread count recorded for the runtime, or `None` while the
/// stored count is still zero.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
47
/// The loaded configuration together with the topology that was started from it.
pub struct ApplicationConfig {
    /// Configuration paths the application was loaded from.
    pub config_paths: Vec<config::ConfigPath>,
    /// The main running topology.
    pub topology: RunningTopology,
    /// Receiver returned alongside the main topology when it was started.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Additional topologies registered through `add_internal_config`.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options taken from the loaded configuration.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context that was handed to the topology when it was started.
    pub extra_context: ExtraContext,
}
57
/// A prepared but not yet started application.
pub struct Application {
    /// Root command-line options the application was prepared with.
    pub root_opts: RootOpts,
    /// Loaded configuration and the topology started from it.
    pub config: ApplicationConfig,
    /// Signal handler/receiver pair created for the runtime.
    pub signals: SignalPair,
}
63
64impl ApplicationConfig {
65    pub async fn from_opts(
66        opts: &RootOpts,
67        signal_handler: &mut SignalHandler,
68        extra_context: ExtraContext,
69    ) -> Result<Self, ExitCode> {
70        let config_paths = opts.config_paths_with_formats();
71
72        let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
73            .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
74
75        let watcher_conf = if opts.watch_config {
76            Some(watcher_config(
77                opts.watch_config_method,
78                opts.watch_config_poll_interval_seconds,
79            ))
80        } else {
81            None
82        };
83
84        let config = load_configs(
85            &config_paths,
86            watcher_conf,
87            opts.require_healthy,
88            opts.allow_empty_config,
89            !opts.disable_env_var_interpolation,
90            graceful_shutdown_duration,
91            signal_handler,
92        )
93        .await?;
94
95        Self::from_config(config_paths, config, extra_context).await
96    }
97
98    pub async fn from_config(
99        config_paths: Vec<ConfigPath>,
100        config: Config,
101        extra_context: ExtraContext,
102    ) -> Result<Self, ExitCode> {
103        #[cfg(feature = "api")]
104        let api = config.api;
105
106        let (topology, graceful_crash_receiver) =
107            RunningTopology::start_init_validated(config, extra_context.clone())
108                .await
109                .ok_or(exitcode::CONFIG)?;
110
111        Ok(Self {
112            config_paths,
113            topology,
114            graceful_crash_receiver,
115            internal_topologies: Vec::new(),
116            #[cfg(feature = "api")]
117            api,
118            extra_context,
119        })
120    }
121
122    pub async fn add_internal_config(
123        &mut self,
124        config: Config,
125        extra_context: ExtraContext,
126    ) -> Result<(), ExitCode> {
127        let Some((topology, _)) =
128            RunningTopology::start_init_validated(config, extra_context).await
129        else {
130            return Err(exitcode::CONFIG);
131        };
132        self.internal_topologies.push(topology);
133        Ok(())
134    }
135
136    /// Configure the gRPC API server, if applicable
137    #[cfg(feature = "api")]
138    pub fn setup_api(&self, handle: &Handle) -> Option<api::GrpcServer> {
139        if self.api.enabled {
140            // Start gRPC server
141            let api_server = handle.block_on(api::GrpcServer::start(
142                self.topology.config(),
143                self.topology.watch(),
144            ));
145            match api_server {
146                Ok(server) => {
147                    emit!(ApiStarted {
148                        addr: server.addr()
149                    });
150                    Some(server)
151                }
152                Err(error) => {
153                    let error = error.to_string();
154                    error!(
155                        message = "An error occurred that Vector couldn't handle.",
156                        %error,
157                        internal_log_rate_limit = false
158                    );
159                    // Trigger shutdown because the API was explicitly enabled but failed to start
160                    // This ensures users don't run Vector thinking top/tap will work when they won't
161                    _ = self
162                        .topology
163                        .abort_tx
164                        .send(crate::signal::ShutdownError::ApiFailed { error });
165                    None
166                }
167            }
168        } else {
169            info!(
170                message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
171            );
172            None
173        }
174    }
175}
176
177impl Application {
178    pub fn run(extra_context: ExtraContext) -> ExitStatus {
179        let (runtime, app) =
180            Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
181
182        runtime.block_on(app.run())
183    }
184
185    pub fn prepare_start(
186        extra_context: ExtraContext,
187    ) -> Result<(Runtime, StartedApplication), ExitCode> {
188        Self::prepare(extra_context)
189            .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
190    }
191
192    pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
193        let opts = Opts::get_matches().map_err(|error| {
194            // Printing to stdout/err can itself fail; ignore it.
195            _ = error.print();
196            exitcode::USAGE
197        })?;
198
199        Self::prepare_from_opts(opts, extra_context)
200    }
201
202    pub fn prepare_from_opts(
203        opts: Opts,
204        extra_context: ExtraContext,
205    ) -> Result<(Runtime, Self), ExitCode> {
206        opts.root.init_global();
207
208        #[cfg(feature = "sources-utils-http-encoding")]
209        crate::sources::util::http::set_max_decompressed_size_bytes(
210            opts.root.max_decompressed_size_bytes,
211        );
212
213        let color = opts.root.color.use_color();
214
215        init_logging(
216            color,
217            opts.root.log_format,
218            opts.log_level(),
219            opts.root.internal_log_rate_limit,
220            opts.root.internal_logs_source_rate_limit,
221        );
222
223        #[cfg(unix)]
224        if opts.root.raise_fd_limit {
225            crate::cli::raise_file_descriptor_limit();
226        }
227
228        // Set global color preference for downstream modules
229        crate::set_global_color(color);
230
231        // Can only log this after initializing the logging subsystem
232        if opts.root.openssl_no_probe {
233            debug!(
234                message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
235            );
236        }
237
238        let runtime = build_runtime(
239            opts.root.threads,
240            opts.root.chunk_size_events,
241            "vector-worker",
242        )?;
243
244        // Signal handler for OS and provider messages.
245        let mut signals = SignalPair::new(&runtime);
246
247        if let Some(sub_command) = &opts.sub_command {
248            return Err(runtime.block_on(sub_command.execute(signals, color)));
249        }
250
251        let config = runtime.block_on(ApplicationConfig::from_opts(
252            &opts.root,
253            &mut signals.handler,
254            extra_context,
255        ))?;
256
257        Ok((
258            runtime,
259            Self {
260                root_opts: opts.root,
261                config,
262                signals,
263            },
264        ))
265    }
266
267    pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
268        // Any internal_logs sources will have grabbed a copy of the
269        // early buffer by this point and set up a subscriber.
270        crate::trace::stop_early_buffering();
271
272        emit!(VectorStarted);
273        handle.spawn(heartbeat::heartbeat());
274
275        let Self {
276            root_opts,
277            config,
278            signals,
279        } = self;
280
281        #[cfg(feature = "api")]
282        let api_server = config.setup_api(handle);
283
284        let topology_controller = SharedTopologyController::new(TopologyController {
285            #[cfg(feature = "api")]
286            api_server,
287            topology: config.topology,
288            config_paths: config.config_paths.clone(),
289            require_healthy: root_opts.require_healthy,
290            extra_context: config.extra_context,
291        });
292
293        Ok(StartedApplication {
294            config_paths: config.config_paths,
295            internal_topologies: config.internal_topologies,
296            graceful_crash_receiver: config.graceful_crash_receiver,
297            signals,
298            topology_controller,
299            allow_empty_config: root_opts.allow_empty_config,
300            interpolate_env: !root_opts.disable_env_var_interpolation,
301        })
302    }
303}
304
/// A started application, ready to run its signal loop.
pub struct StartedApplication {
    /// Configuration paths used when the configuration is reloaded.
    pub config_paths: Vec<ConfigPath>,
    /// Additional topologies that are stopped when the application shuts down.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receiver polled by the signal loop; any item it yields triggers shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler/receiver pair driving the signal loop.
    pub signals: SignalPair,
    /// Shared controller wrapping the main topology.
    pub topology_controller: SharedTopologyController,
    /// Passed through to the config loader on reload.
    pub allow_empty_config: bool,
    /// Passed through to the config loader on reload.
    pub interpolate_env: bool,
}
314
315impl StartedApplication {
316    pub async fn run(self) -> ExitStatus {
317        self.main().await.shutdown().await
318    }
319
320    pub async fn main(self) -> FinishedApplication {
321        let Self {
322            config_paths,
323            graceful_crash_receiver,
324            signals,
325            topology_controller,
326            internal_topologies,
327            allow_empty_config,
328            interpolate_env,
329        } = self;
330
331        let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
332
333        let mut signal_handler = signals.handler;
334        let mut signal_rx = signals.receiver;
335
336        let signal = loop {
337            let has_sources = !topology_controller.lock().await.topology.config.is_empty();
338            tokio::select! {
339                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
340                    signal,
341                    &topology_controller,
342                    &config_paths,
343                    &mut signal_handler,
344                    allow_empty_config,
345                    interpolate_env,
346                ).await {
347                    break signal;
348                },
349                // Trigger graceful shutdown if a component crashed, or all sources have ended.
350                error = graceful_crash.next() => break SignalTo::Shutdown(error),
351                _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
352                    info!("All sources have finished.");
353                    break SignalTo::Shutdown(None)
354                } ,
355                else => unreachable!("Signal streams never end"),
356            }
357        };
358
359        FinishedApplication {
360            signal,
361            signal_rx,
362            topology_controller,
363            internal_topologies,
364        }
365    }
366}
367
368async fn handle_signal(
369    signal: Result<SignalTo, RecvError>,
370    topology_controller: &SharedTopologyController,
371    config_paths: &[ConfigPath],
372    signal_handler: &mut SignalHandler,
373    allow_empty_config: bool,
374    interpolate_env: bool,
375) -> Option<SignalTo> {
376    match signal {
377        Ok(SignalTo::ReloadComponents(components_to_reload)) => {
378            let mut topology_controller = topology_controller.lock().await;
379            topology_controller
380                .topology
381                .extend_reload_set(components_to_reload);
382
383            // Reload paths
384            if let Some(paths) = config::process_paths(config_paths) {
385                topology_controller.config_paths = paths;
386            }
387
388            // Reload config
389            let new_config = config::load_from_paths_with_provider_and_secrets(
390                &topology_controller.config_paths,
391                signal_handler,
392                allow_empty_config,
393                interpolate_env,
394            )
395            .await;
396
397            reload_config_from_result(topology_controller, new_config).await
398        }
399        Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
400            let topology_controller = topology_controller.lock().await;
401            reload_config_from_result(topology_controller, config_builder.build()).await
402        }
403        Ok(SignalTo::ReloadFromDisk) => {
404            let mut topology_controller = topology_controller.lock().await;
405
406            // Reload paths
407            if let Some(paths) = config::process_paths(config_paths) {
408                topology_controller.config_paths = paths;
409            }
410
411            // Reload config
412            let new_config = config::load_from_paths_with_provider_and_secrets(
413                &topology_controller.config_paths,
414                signal_handler,
415                allow_empty_config,
416                interpolate_env,
417            )
418            .await;
419
420            if let Ok(ref config) = new_config {
421                // Find all transforms that have external files to watch
422                let transform_keys_to_reload = config.transform_keys_with_external_files();
423
424                // Add these transforms to reload set
425                if !transform_keys_to_reload.is_empty() {
426                    info!(
427                        message = "Reloading transforms with external files.",
428                        count = transform_keys_to_reload.len()
429                    );
430                    topology_controller
431                        .topology
432                        .extend_reload_set(transform_keys_to_reload);
433                }
434            }
435
436            reload_config_from_result(topology_controller, new_config).await
437        }
438        Ok(SignalTo::ReloadEnrichmentTables) => {
439            let topology_controller = topology_controller.lock().await;
440
441            topology_controller
442                .topology
443                .reload_enrichment_tables()
444                .await;
445            None
446        }
447        Err(RecvError::Lagged(amt)) => {
448            warn!("Overflow, dropped {} signals.", amt);
449            None
450        }
451        Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
452        Ok(signal) => Some(signal),
453    }
454}
455
456async fn reload_config_from_result(
457    mut topology_controller: MutexGuard<'_, TopologyController>,
458    config: Result<Config, Vec<String>>,
459) -> Option<SignalTo> {
460    match config {
461        Ok(new_config) => match topology_controller.reload(new_config).await {
462            ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
463            _ => None,
464        },
465        Err(errors) => {
466            handle_config_errors(errors);
467            emit!(VectorConfigLoadError);
468            None
469        }
470    }
471}
472
/// State left over once the signal loop has ended, ready to be shut down.
pub struct FinishedApplication {
    /// The signal that ended the signal loop.
    pub signal: SignalTo,
    /// Signal receiver, still watched while the topology is being stopped.
    pub signal_rx: SignalRx,
    /// Shared controller wrapping the main topology.
    pub topology_controller: SharedTopologyController,
    /// Additional topologies to stop during shutdown.
    pub internal_topologies: Vec<RunningTopology>,
}
479
480impl FinishedApplication {
481    pub async fn shutdown(self) -> ExitStatus {
482        let FinishedApplication {
483            signal,
484            signal_rx,
485            topology_controller,
486            internal_topologies,
487        } = self;
488
489        // At this point, we'll have the only reference to the shared topology controller and can
490        // safely remove it from the wrapper to shut down the topology.
491        let topology_controller = topology_controller
492            .try_into_inner()
493            .expect("fail to unwrap topology controller")
494            .into_inner();
495
496        let status = match signal {
497            SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
498            SignalTo::Quit => Self::quit(),
499            _ => unreachable!(),
500        };
501
502        for topology in internal_topologies {
503            topology.stop().await;
504        }
505
506        status
507    }
508
509    async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
510        emit!(VectorStopping);
511        tokio::select! {
512            _ = topology_controller.stop() => {
513                emit!(VectorStopped);
514                ExitStatus::from_raw({
515                    #[cfg(windows)]
516                    {
517                        exitcode::OK as u32
518                    }
519                    #[cfg(unix)]
520                    exitcode::OK
521                })
522            }, // Graceful shutdown finished
523            _ = signal_rx.recv() => Self::quit(),
524        }
525    }
526
527    fn quit() -> ExitStatus {
528        // It is highly unlikely that this event will exit from topology.
529        emit!(VectorQuit);
530        ExitStatus::from_raw({
531            #[cfg(windows)]
532            {
533                exitcode::UNAVAILABLE as u32
534            }
535            #[cfg(unix)]
536            exitcode::OK
537        })
538    }
539}
540
541fn get_log_levels(default: &str) -> String {
542    std::env::var("VECTOR_LOG")
543        .or_else(|_| {
544            std::env::var("LOG").inspect(|_log| {
545                warn!(
546                    message =
547                        "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
548                );
549            })
550        })
551        .unwrap_or_else(|_| default.into())
552}
553
554pub fn build_runtime(
555    threads: Option<usize>,
556    chunk_size_events: Option<NonZeroUsize>,
557    thread_name: &str,
558) -> Result<Runtime, ExitCode> {
559    let mut rt_builder = runtime::Builder::new_multi_thread();
560    rt_builder.max_blocking_threads(20_000);
561    rt_builder.enable_all().thread_name(thread_name);
562
563    let threads = threads.unwrap_or_else(crate::num_threads);
564    if threads == 0 {
565        error!("The `threads` argument must be greater or equal to 1.");
566        return Err(exitcode::CONFIG);
567    }
568    WORKER_THREADS
569        .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
570        .unwrap_or_else(|_| panic!("double thread initialization"));
571    rt_builder.worker_threads(threads);
572
573    let chunk_size_events = chunk_size_events
574        .map(NonZeroUsize::get)
575        .unwrap_or(vector_lib::source_sender::DEFAULT_CHUNK_SIZE_EVENTS);
576
577    let Some(source_sender_buffer_size) = threads.checked_mul(chunk_size_events) else {
578        error!(
579            "The `chunk_size_events` argument is too large for the configured number of threads."
580        );
581        return Err(exitcode::CONFIG);
582    };
583    let Some(ready_array_capacity) =
584        chunk_size_events.checked_mul(crate::topology::builder::READY_ARRAY_CAPACITY_CHUNKS)
585    else {
586        error!("The `chunk_size_events` argument is too large.");
587        return Err(exitcode::CONFIG);
588    };
589
590    vector_lib::source_sender::set_chunk_size_events(chunk_size_events);
591    crate::topology::builder::set_source_sender_buffer_size(source_sender_buffer_size);
592    crate::topology::builder::set_ready_array_capacity(ready_array_capacity);
593
594    debug!(
595        message = "Building runtime.",
596        worker_threads = threads,
597        chunk_size_events
598    );
599    Ok(rt_builder.build().expect("Unable to create async runtime"))
600}
601
602pub async fn load_configs(
603    config_paths: &[ConfigPath],
604    watcher_conf: Option<config::watcher::WatcherConfig>,
605    require_healthy: Option<bool>,
606    allow_empty_config: bool,
607    interpolate_env: bool,
608    graceful_shutdown_duration: Option<Duration>,
609    signal_handler: &mut SignalHandler,
610) -> Result<Config, ExitCode> {
611    let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
612
613    let watched_paths = config_paths
614        .iter()
615        .map(<&PathBuf>::from)
616        .collect::<Vec<_>>();
617
618    info!(
619        message = "Loading configs.",
620        paths = ?watched_paths
621    );
622
623    let mut config = config::load_from_paths_with_provider_and_secrets(
624        &config_paths,
625        signal_handler,
626        allow_empty_config,
627        interpolate_env,
628    )
629    .await
630    .map_err(handle_config_errors)?;
631
632    let mut watched_component_paths = Vec::new();
633
634    if let Some(watcher_conf) = watcher_conf {
635        for (name, transform) in config.transforms() {
636            let files = transform.inner.files_to_watch();
637            let component_config = ComponentConfig::new(
638                files.into_iter().cloned().collect(),
639                name.clone(),
640                ComponentType::Transform,
641            );
642            watched_component_paths.push(component_config);
643        }
644
645        for (name, sink) in config.sinks() {
646            let files = sink.inner.files_to_watch();
647            let component_config = ComponentConfig::new(
648                files.into_iter().cloned().collect(),
649                name.clone(),
650                ComponentType::Sink,
651            );
652            watched_component_paths.push(component_config);
653        }
654
655        for (name, table) in config.enrichment_tables() {
656            let files = table.inner.files_to_watch();
657            let component_config = ComponentConfig::new(
658                files.clone().into_iter().cloned().collect(),
659                name.clone(),
660                ComponentType::EnrichmentTable,
661            );
662            watched_component_paths.push(component_config);
663            if table.as_sink(name).is_some() {
664                let sink_component_config = ComponentConfig::new(
665                    files.into_iter().cloned().collect(),
666                    name.clone(),
667                    ComponentType::Sink,
668                );
669                watched_component_paths.push(sink_component_config);
670            }
671        }
672
673        info!(
674            message = "Starting watcher.",
675            paths = ?watched_paths
676        );
677        info!(
678            message = "Components to watch.",
679            paths = ?watched_component_paths
680        );
681
682        // Start listening for config changes.
683        config::watcher::spawn_thread(
684            watcher_conf,
685            signal_handler.clone_tx(),
686            watched_paths,
687            watched_component_paths,
688            None,
689        )
690        .map_err(|error| {
691            error!(message = "Unable to start config watcher.", %error);
692            exitcode::CONFIG
693        })?;
694    }
695
696    config::init_log_schema(config.global.log_schema.clone(), true);
697    config::init_telemetry(config.global.telemetry.clone(), true);
698
699    if !config.healthchecks.enabled {
700        info!("Health checks are disabled.");
701    }
702    config.healthchecks.set_require_healthy(require_healthy);
703    config.graceful_shutdown_duration = graceful_shutdown_duration;
704
705    Ok(config)
706}
707
708pub fn init_logging(
709    color: bool,
710    format: LogFormat,
711    log_level: &str,
712    internal_log_rate_limit_secs: u64,
713    internal_logs_source_rate_limit_secs: Option<NonZeroU64>,
714) {
715    let level = get_log_levels(log_level);
716    let json = match format {
717        LogFormat::Text => false,
718        LogFormat::Json => true,
719    };
720
721    trace::init(
722        color,
723        json,
724        &level,
725        internal_log_rate_limit_secs,
726        internal_logs_source_rate_limit_secs,
727    );
728    debug!(
729        message = "Internal log rate limit configured.",
730        internal_log_rate_limit_secs,
731        internal_logs_source_rate_limit_secs =
732            internal_logs_source_rate_limit_secs.map(NonZeroU64::get),
733    );
734    info!(message = "Log level is enabled.", ?level);
735}
736
737pub fn watcher_config(
738    method: WatchConfigMethod,
739    interval: NonZeroU64,
740) -> config::watcher::WatcherConfig {
741    match method {
742        WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
743        WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
744    }
745}