1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use chrono::Local;
5use futures_util::future::join_all;
6use http::Uri;
7use regex::Regex;
8use tokio::sync::{mpsc, oneshot};
9use vector_lib::api_client::{Client, RECONNECT_DELAY_MS};
10
11use vector_lib::top::{
12 dashboard::{init_dashboard, is_tty},
13 metrics,
14 state::{self, ConnectionStatus, EventType, State},
15};
16
17#[allow(clippy::print_stderr)]
20pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
21 if !is_tty() {
23 eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
24 return exitcode::IOERR;
25 }
26
27 let url = opts.url();
28
29 let Ok(uri) = url.as_str().parse::<Uri>() else {
31 eprintln!("Invalid API URL: {url}");
32 return exitcode::USAGE;
33 };
34 let mut client = Client::new(uri.clone());
35
36 if client.connect().await.is_err() || client.health().await.is_err() {
37 eprintln!(
38 indoc::indoc! {"
39 Vector API server isn't reachable ({}).
40
41 Have you enabled the API?
42
43 To enable the API, add the following to your Vector config file:
44
45 [api]
46 enabled = true"},
47 url
48 );
49 return exitcode::UNAVAILABLE;
50 }
51
52 top(opts, uri, "Vector").await
53}
54
55pub async fn top(opts: &super::Opts, uri: Uri, dashboard_title: &str) -> exitcode::ExitCode {
57 let (tx, rx) = tokio::sync::mpsc::channel(20);
59 let (ui_tx, ui_rx) = tokio::sync::mpsc::channel(20);
60 let mut starting_state = State::new(BTreeMap::new());
61 starting_state.sort_state.column = opts.sort_field;
62 starting_state.sort_state.reverse = opts.sort_desc;
63 starting_state.filter_state.column = opts.filter_field;
64 starting_state.filter_state.pattern = opts
65 .filter_value
66 .as_deref()
67 .map(Regex::new)
68 .and_then(Result::ok);
69 let state_rx = state::updater(rx, ui_rx, starting_state).await;
70 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
72
73 let connection = tokio::spawn(subscription(opts.clone(), uri, tx.clone(), shutdown_tx));
74
75 match init_dashboard(
77 dashboard_title,
78 opts.url().as_str(),
79 opts.interval,
80 opts.human_metrics,
81 ui_tx,
82 state_rx,
83 shutdown_rx,
84 )
85 .await
86 {
87 Ok(_) => {
88 connection.abort();
89 exitcode::OK
90 }
91 Err(err) => {
92 #[allow(clippy::print_stderr)]
93 {
94 eprintln!("[top] Encountered shutdown error: {err}");
95 }
96 connection.abort();
97 exitcode::IOERR
98 }
99 }
100}
101
102async fn subscription(
105 opts: super::Opts,
106 uri: Uri,
107 tx: mpsc::Sender<EventType>,
108 shutdown_tx: oneshot::Sender<()>,
109) {
110 loop {
111 let state = match metrics::init_components(uri.clone(), &opts.components).await {
115 Ok(state) => state,
116 Err(_) => {
117 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
118 continue;
119 }
120 };
121 let initial_components = state
122 .components
123 .keys()
124 .map(|k| k.id().to_string())
125 .collect();
126 _ = tx.send(EventType::InitializeState(state)).await;
127
128 let handles = match metrics::subscribe(
130 uri.clone(),
131 tx.clone(),
132 opts.interval as i64,
133 opts.components.clone(),
134 initial_components,
135 )
136 .await
137 {
138 Ok(handles) => handles,
139 Err(_) => {
140 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
141 continue;
142 }
143 };
144
145 _ = tx
146 .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
147 Local::now(),
148 )))
149 .await;
150
151 _ = join_all(handles.metric_handles).await;
155 handles.poll_handle.abort();
156
157 _ = tx
158 .send(EventType::ConnectionUpdated(
159 ConnectionStatus::Disconnected(RECONNECT_DELAY_MS),
160 ))
161 .await;
162
163 if opts.no_reconnect {
164 _ = shutdown_tx.send(());
165 break;
166 }
167 }
168}