Skip to main content

vector/top/
cmd.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use chrono::Local;
5use futures_util::future::join_all;
6use http::Uri;
7use regex::Regex;
8use tokio::sync::{mpsc, oneshot};
9use vector_lib::api_client::{Client, RECONNECT_DELAY_MS};
10
11use vector_lib::top::{
12    dashboard::{init_dashboard, is_tty},
13    metrics,
14    state::{self, ConnectionStatus, EventType, State},
15};
16
17/// CLI command func for displaying Vector components, and communicating with a local/remote
18/// Vector API server via gRPC
19#[allow(clippy::print_stderr)]
20pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
21    // Exit early if the terminal is not a teletype
22    if !is_tty() {
23        eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
24        return exitcode::IOERR;
25    }
26
27    let url = opts.url();
28
29    // Create a new API client for connecting to the local/remote Vector instance.
30    let Ok(uri) = url.as_str().parse::<Uri>() else {
31        eprintln!("Invalid API URL: {url}");
32        return exitcode::USAGE;
33    };
34    let mut client = Client::new(uri.clone());
35
36    if client.connect().await.is_err() || client.health().await.is_err() {
37        eprintln!(
38            indoc::indoc! {"
39            Vector API server isn't reachable ({}).
40
41            Have you enabled the API?
42
43            To enable the API, add the following to your Vector config file:
44
45            [api]
46                enabled = true"},
47            url
48        );
49        return exitcode::UNAVAILABLE;
50    }
51
52    top(opts, uri, "Vector").await
53}
54
55/// General monitoring
56pub async fn top(opts: &super::Opts, uri: Uri, dashboard_title: &str) -> exitcode::ExitCode {
57    // Channel for updating state via event messages
58    let (tx, rx) = tokio::sync::mpsc::channel(20);
59    let (ui_tx, ui_rx) = tokio::sync::mpsc::channel(20);
60    let mut starting_state = State::new(BTreeMap::new());
61    starting_state.sort_state.column = opts.sort_field;
62    starting_state.sort_state.reverse = opts.sort_desc;
63    starting_state.filter_state.column = opts.filter_field;
64    starting_state.filter_state.pattern = opts
65        .filter_value
66        .as_deref()
67        .map(Regex::new)
68        .and_then(Result::ok);
69    let state_rx = state::updater(rx, ui_rx, starting_state).await;
70    // Channel for shutdown signal
71    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
72
73    let connection = tokio::spawn(subscription(opts.clone(), uri, tx.clone(), shutdown_tx));
74
75    // Initialize the dashboard
76    match init_dashboard(
77        dashboard_title,
78        opts.url().as_str(),
79        opts.interval,
80        opts.human_metrics,
81        ui_tx,
82        state_rx,
83        shutdown_rx,
84    )
85    .await
86    {
87        Ok(_) => {
88            connection.abort();
89            exitcode::OK
90        }
91        Err(err) => {
92            #[allow(clippy::print_stderr)]
93            {
94                eprintln!("[top] Encountered shutdown error: {err}");
95            }
96            connection.abort();
97            exitcode::IOERR
98        }
99    }
100}
101
102// This task handles reconnecting the gRPC client and all
103// subscriptions in the case of a connection failure
104async fn subscription(
105    opts: super::Opts,
106    uri: Uri,
107    tx: mpsc::Sender<EventType>,
108    shutdown_tx: oneshot::Sender<()>,
109) {
110    loop {
111        // Initialize state. On future reconnects, we re-initialize state in
112        // order to accurately capture added, removed, and edited
113        // components.
114        let state = match metrics::init_components(uri.clone(), &opts.components).await {
115            Ok(state) => state,
116            Err(_) => {
117                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
118                continue;
119            }
120        };
121        let initial_components = state
122            .components
123            .keys()
124            .map(|k| k.id().to_string())
125            .collect();
126        _ = tx.send(EventType::InitializeState(state)).await;
127
128        // Subscribe to updated metrics via gRPC streaming
129        let handles = match metrics::subscribe(
130            uri.clone(),
131            tx.clone(),
132            opts.interval as i64,
133            opts.components.clone(),
134            initial_components,
135        )
136        .await
137        {
138            Ok(handles) => handles,
139            Err(_) => {
140                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
141                continue;
142            }
143        };
144
145        _ = tx
146            .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
147                Local::now(),
148            )))
149            .await;
150
151        // Wait for metric stream tasks to finish. poll_components is intentionally
152        // excluded: it runs indefinitely while get_components succeeds, so joining
153        // it here would prevent reconnection when metric streams fail first.
154        _ = join_all(handles.metric_handles).await;
155        handles.poll_handle.abort();
156
157        _ = tx
158            .send(EventType::ConnectionUpdated(
159                ConnectionStatus::Disconnected(RECONNECT_DELAY_MS),
160            ))
161            .await;
162
163        if opts.no_reconnect {
164            _ = shutdown_tx.send(());
165            break;
166        }
167    }
168}