Skip to main content

vector/
trace.rs

1#![allow(missing_docs)]
2use std::{
3    collections::{HashMap, HashSet},
4    marker::PhantomData,
5    str::FromStr,
6    sync::{
7        LazyLock, Mutex, MutexGuard, OnceLock,
8        atomic::{AtomicBool, Ordering},
9    },
10};
11
12use futures_util::{Stream, StreamExt, future::ready};
13use metrics_tracing_context::MetricsLayer;
14use tokio::sync::{
15    broadcast::{self, Receiver, Sender},
16    oneshot,
17};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{Event, Subscriber};
20use tracing_limit::RateLimitedLayer;
21use tracing_subscriber::{
22    Layer,
23    filter::LevelFilter,
24    layer::{Context, SubscriberExt},
25    registry::LookupSpan,
26    util::SubscriberInitExt,
27};
28pub use tracing_tower::{InstrumentableService, InstrumentedService};
29use vector_lib::SpanField;
30use vector_lib::lookup::event_path;
31use vrl::value::Value;
32
33use crate::event::LogEvent;
34
/// BUFFER contains all of the internal log events generated by Vector between the initialization of `tracing` and early
/// buffering being stopped, which occurs once the topology reports as having successfully started.
///
/// This means that callers must subscribe during the configuration phase of their components, and not in the core loop
/// of the component, as the topology can only report when a component has been spawned, but not necessarily always
/// when it has started doing work or waiting for input.
///
/// The inner `Option` starts out as `Some(vec)` and becomes `None` once `consume_early_buffer` takes the buffer.
static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));

/// SHOULD_BUFFER controls whether or not internal log events should be buffered or sent directly to the trace broadcast
/// channel.
///
/// It starts out `true` and is flipped to `false` exactly once by `stop_early_buffering`.
static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);

/// SUBSCRIBERS contains a list of callers interested in internal log events who will be notified when early buffering
/// is disabled, by receiving a copy of all buffered internal log events.
///
/// The inner `Option` becomes `None` once `stop_early_buffering` has taken the list to notify the waiting callers.
static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
    Mutex::new(Some(Vec::new()));

/// SENDER holds the broadcast sender used to fan out internal log events that are not captured by the early buffer,
/// i.e. those emitted *after* the topology has been initialized.
///
/// It is created lazily by `get_trace_sender`; until then `maybe_get_trace_sender` returns `None`.
static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();
55
/// Reports whether the metrics tracing layer should be installed.
///
/// The layer is enabled unless the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment variable is set to
/// exactly `true`. An unset or unreadable variable, or any other value, leaves it enabled.
fn metrics_layer_enabled() -> bool {
    let explicitly_disabled = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .is_ok_and(|value| value == "true");

    !explicitly_disabled
}
59
/// Builds the `tracing` subscriber stack and attempts to install it as the global default.
///
/// The stack is assembled on top of `tracing_subscriber::registry()` in this order: the optional metrics layer, the
/// broadcast layer (rate limited or not), the optional `tokio-console` layer, the allocation layer (unix only), and
/// finally the rate-limited formatting layer.
///
/// # Arguments
///
/// * `color` - passed to `with_ansi` on the text formatter; not used when `json` is `true`.
/// * `json` - when `true`, a JSON formatter with flattened events is used; otherwise a text formatter writing to
///   stderr is used.
/// * `levels` - filter directives parsed as `tracing_subscriber::filter::Targets`; applied to both the broadcast
///   layer and the formatting layer.
/// * `internal_log_rate_limit` - default limit handed to the `RateLimitedLayer` wrapping the formatter
///   (NOTE(review): presumably a window in seconds - confirm against `tracing_limit`).
/// * `broadcast_rate_limit` - when `Some(rate)`, the broadcast layer is wrapped in a `RateLimitedLayer` with that
///   default limit; when `None`, the broadcast layer is not rate limited.
///
/// # Panics
///
/// Panics if `levels` cannot be parsed as a `Targets` filter.
pub fn init(
    color: bool,
    json: bool,
    levels: &str,
    internal_log_rate_limit: u64,
    broadcast_rate_limit: Option<std::num::NonZeroU64>,
) {
    let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
        "logging filter targets were not formatted correctly or did not specify a valid level",
    );

    // `None` when the metrics integration has been disabled via the environment.
    let metrics_layer =
        metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));

    // The broadcast layer feeds the internal_logs source. By default it is NOT rate limited so
    // that users can capture ALL internal Vector logs for debugging/monitoring. Rate limiting can
    // be opted into by passing `Some(rate)` for `broadcast_rate_limit`.
    //
    // Two separate `Option<Layer>` values are used rather than a single branch because
    // `RateLimitedLayer<BroadcastLayer<S>>` and `BroadcastLayer<S>` are distinct types.
    // `tracing_subscriber` implements `Layer` for `Option<L>`, which allows the unused layer to be
    // represented as a no-op `None`.
    let rate_limited_broadcast = broadcast_rate_limit.map(|rate| {
        RateLimitedLayer::new(BroadcastLayer::new())
            .with_default_limit(rate.get())
            .with_filter(fmt_filter.clone())
    });
    let unlimited_broadcast = broadcast_rate_limit
        .is_none()
        .then(|| BroadcastLayer::new().with_filter(fmt_filter.clone()));
    // Exactly one of the two broadcast layers must be active.
    debug_assert_ne!(
        rate_limited_broadcast.is_some(),
        unlimited_broadcast.is_some()
    );

    let subscriber = tracing_subscriber::registry()
        .with(metrics_layer)
        .with(rate_limited_broadcast)
        .with(unlimited_broadcast);

    #[cfg(feature = "tokio-console")]
    let subscriber = {
        let console_layer = console_subscriber::ConsoleLayer::builder()
            .with_default_env()
            .spawn();

        subscriber.with(console_layer)
    };

    #[cfg(unix)]
    let subscriber = {
        let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
            .with_filter(LevelFilter::ERROR);

        subscriber.with(allocation_layer)
    };

    // The two branches are kept separate because the JSON and text formatters are distinct types.
    if json {
        let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);

        #[cfg(test)]
        let formatter = formatter.with_test_writer();

        let rate_limited =
            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));

        // The result of `try_init` is deliberately discarded, so a failed installation is silent.
        _ = subscriber.try_init();
    } else {
        let formatter = tracing_subscriber::fmt::layer()
            .with_ansi(color)
            .with_writer(std::io::stderr);

        #[cfg(test)]
        let formatter = formatter.with_test_writer();

        let rate_limited =
            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));

        // The result of `try_init` is deliberately discarded, so a failed installation is silent.
        _ = subscriber.try_init();
    }
}
143
/// Swaps a fresh, empty early buffer in and hands back whatever was stored before.
///
/// Returns `None` if the early buffer had already been consumed. Only compiled for tests.
#[cfg(test)]
pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
    let mut early_buffer = get_early_buffer();
    early_buffer.replace(Vec::new())
}
148
149/// Gets a  mutable reference to the early buffer.
150fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
151    BUFFER
152        .lock()
153        .expect("Couldn't acquire lock on internal logs buffer")
154}
155
156/// Determines whether tracing events should be processed (e.g. converted to log
157/// events) to avoid unnecessary performance overhead.
158///
159/// Checks if [`BUFFER`] is set or if a trace sender exists
160fn should_process_tracing_event() -> bool {
161    get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
162}
163
164/// Attempts to buffer an event into the early buffer.
165fn try_buffer_event(log: &LogEvent) -> bool {
166    if SHOULD_BUFFER.load(Ordering::Acquire)
167        && let Some(buffer) = get_early_buffer().as_mut()
168    {
169        buffer.push(log.clone());
170        return true;
171    }
172
173    false
174}
175
176/// Attempts to broadcast an event to subscribers.
177///
178/// If no subscribers are connected, this does nothing.
179fn try_broadcast_event(log: LogEvent) {
180    if let Some(sender) = maybe_get_trace_sender() {
181        _ = sender.send(log);
182    }
183}
184
185/// Consumes the early buffered events.
186///
187/// # Panics
188///
189/// If the early buffered events have already been consumed, this function will panic.
190fn consume_early_buffer() -> Vec<LogEvent> {
191    get_early_buffer()
192        .take()
193        .expect("early buffer was already consumed")
194}
195
196/// Gets or creates a trace sender for sending internal log events.
197fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
198    SENDER.get_or_init(|| broadcast::channel(99).0)
199}
200
201/// Attempts to get the trace sender for sending internal log events.
202///
203/// If the trace sender has not yet been created, `None` is returned.
204fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
205    SENDER.get()
206}
207
208/// Creates a trace receiver that receives internal log events.
209///
210/// This will create a trace sender if one did not already exist.
211fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
212    get_trace_sender().subscribe()
213}
214
215/// Gets a mutable reference to the list of waiting subscribers, if it exists.
216fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
217    SUBSCRIBERS.lock().expect("poisoned locks are dumb")
218}
219
220/// Attempts to register for early buffered events.
221///
222/// If early buffering has not yet been stopped, `Some(receiver)` is returned. The given receiver will resolve to a
223/// vector of all early buffered events once early buffering has been stopped. Otherwise, if early buffering is already
224/// stopped, `None` is returned.
225fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
226    if SHOULD_BUFFER.load(Ordering::Acquire) {
227        // We're still in early buffering mode. Attempt to subscribe by adding a oneshot sender
228        // to SUBSCRIBERS. If it's already been consumed, then we've gotten beaten out by a
229        // caller that is disabling early buffering, so we just go with the flow either way.
230        get_trace_subscriber_list().as_mut().map(|subscribers| {
231            let (tx, rx) = oneshot::channel();
232            subscribers.push(tx);
233            rx
234        })
235    } else {
236        // Early buffering is being or has been disabled, so we can no longer register.
237        None
238    }
239}
240
241/// Stops early buffering.
242///
243/// This flushes any buffered log events to waiting subscribers and redirects log events from the buffer to the
244/// broadcast stream.
245pub fn stop_early_buffering() {
246    // Try and disable early buffering.
247    //
248    // If it was already disabled, or we lost the race to disable it, just return.
249    if SHOULD_BUFFER
250        .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
251        .is_err()
252    {
253        return;
254    }
255
256    // We won the right to capture all buffered events and forward them to any waiting subscribers,
257    // so let's grab the subscriber list and see if there's actually anyone waiting.
258    let subscribers = get_trace_subscriber_list().take();
259    if let Some(subscribers_tx) = subscribers {
260        // Consume the early buffer, and send a copy of it to every waiting subscriber.
261        let buffered_events = consume_early_buffer();
262        for subscriber_tx in subscribers_tx {
263            // Ignore any errors sending since the caller may have dropped or something else.
264            _ = subscriber_tx.send(buffered_events.clone());
265        }
266    }
267}
268
269/// A subscription to the log events flowing in via `tracing`, in the Vector native format.
270///
271/// Used to capture tracing events from internal log telemetry, via `tracing`, and convert them to native Vector events,
272/// specifically `LogEvent`, such that they can be shuttled around and treated as normal events.  Currently only powers
273/// the `internal_logs` source, but could be used for other purposes if need be.
274pub struct TraceSubscription {
275    buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
276    trace_rx: Receiver<LogEvent>,
277}
278
279impl TraceSubscription {
280    /// Registers a subscription to the internal log event stream.
281    pub fn subscribe() -> TraceSubscription {
282        let buffered_events_rx = try_register_for_early_events();
283        let trace_rx = get_trace_receiver();
284
285        Self {
286            buffered_events_rx,
287            trace_rx,
288        }
289    }
290
291    /// Gets any early buffered log events.
292    ///
293    /// If this subscription was registered after early buffering was turned off, `None` will be returned immediately.
294    /// Otherwise, waits for early buffering to be stopped and returns `Some(events)` where `events` contains all events
295    /// seen from the moment `tracing` was initialized to the moment early buffering was stopped.
296    pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
297        // If we have a receiver for buffered events, and it returns them successfully, then pass
298        // them back.  We don't care if the sender drops in the meantime, so just swallow that error.
299        match self.buffered_events_rx.take() {
300            Some(rx) => rx.await.ok(),
301            None => None,
302        }
303    }
304
305    /// Converts this subscription into a raw stream of log events.
306    pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
307        // We ignore errors because the only error we get is when the broadcast receiver lags, and there's nothing we
308        // can actually do about that so there's no reason to force callers to even deal with it.
309        BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
310    }
311}
312
/// A `tracing` layer that turns events into `LogEvent`s and either buffers or broadcasts them.
struct BroadcastLayer<S> {
    /// Ties the layer to the subscriber type `S` without storing a value of it.
    _subscriber: PhantomData<S>,
}

impl<S> BroadcastLayer<S> {
    /// Creates a new broadcast layer.
    const fn new() -> Self {
        Self {
            _subscriber: PhantomData,
        }
    }
}
324
325impl<S> Layer<S> for BroadcastLayer<S>
326where
327    S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
328{
329    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
330        if should_process_tracing_event() {
331            let mut log = LogEvent::from(event);
332            // Add span fields if available
333            if let Some(parent_span) = ctx.event_span(event) {
334                for span in parent_span.scope().from_root() {
335                    if let Some(fields) = span.extensions().get::<SpanFields>() {
336                        for (k, v) in &fields.0 {
337                            log.insert(event_path!("vector", *k), v.clone());
338                        }
339                    }
340                }
341            }
342            // Try buffering the event, and if we're not buffering anymore, try to
343            // send it along via the trace sender if it's been established.
344            if !try_buffer_event(&log) {
345                try_broadcast_event(log);
346            }
347        }
348    }
349
350    fn on_new_span(
351        &self,
352        attrs: &tracing_core::span::Attributes<'_>,
353        id: &tracing_core::span::Id,
354        ctx: Context<'_, S>,
355    ) {
356        let span = ctx.span(id).expect("span must already exist!");
357        let mut fields = SpanFields::default();
358        attrs.values().record(&mut fields);
359        span.extensions_mut().insert(fields);
360    }
361}
362
/// The allowlisted fields recorded for a single span, keyed by field name.
#[derive(Default, Debug)]
struct SpanFields(HashMap<&'static str, Value>);

// Span fields that are always part of the allowlist.
inventory::submit!(SpanField("component_id"));
inventory::submit!(SpanField("component_type"));
inventory::submit!(SpanField("component_kind"));

/// Snapshot of every registered [`SpanField`],
/// materialized once on first access. `inventory` populates submissions before `main`, so the
/// snapshot is guaranteed to capture every entry; the read path on every traced span event is
/// then a single set lookup against this static.
static SPAN_FIELDS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| inventory::iter::<SpanField>().map(|f| f.0).collect());
376
377impl SpanFields {
378    fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
379        let name = field.name();
380        // Filter for span fields such as component_id, component_type, etc.
381        //
382        // This captures all the basic component information provided in the
383        // span that each component is spawned with. We don't capture all fields
384        // to avoid adding unintentional noise and to prevent accidental
385        // security/privacy issues (e.g. leaking sensitive data). Downstream
386        // crates can extend the allowlist by name through
387        // `register_extra_span_field!` (see `vector_lib::SpanField`).
388        if SPAN_FIELDS.contains(name) {
389            self.0.insert(name, value.into());
390        }
391    }
392}
393
394impl tracing::field::Visit for SpanFields {
395    fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
396        self.record(field, value);
397    }
398
399    fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
400        self.record(field, value);
401    }
402
403    fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
404        self.record(field, value);
405    }
406
407    fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
408        self.record(field, value);
409    }
410
411    fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
412        self.record(field, format!("{value:?}"));
413    }
414}
415
/// Tests for the broadcast layer when it is wrapped in a rate limiter.
#[cfg(test)]
mod tests {
    use std::{str::FromStr, time::Duration};

    use serial_test::serial;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;

    use futures::StreamExt as _;

    use super::*;

    /// Verifies that `RateLimitedLayer<BroadcastLayer>` — the configuration produced by
    /// `init` when `broadcast_rate_limit` is `Some` — suppresses repeated identical log
    /// events within the rate-limit window and emits a summary once the window expires.
    ///
    /// All `info!` calls share the same call site (they are inside the loop), so the
    /// `RateLimitedLayer` treats them as one event for suppression purposes.
    ///
    /// The test uses `tracing::subscriber::with_default` rather than `init` to avoid
    /// touching the process-global subscriber, and `spawn_blocking` so that the
    /// synchronous `std::thread::sleep` — needed because `tracing-limit` uses
    /// `std::time::Instant` when compiled as a library dependency — does not block the
    /// async executor.
    #[tokio::test]
    #[serial]
    async fn broadcast_rate_limits_repeated_messages() {
        // Subscribe before emitting anything so the broadcast receiver exists when events are sent.
        let trace_sub = TraceSubscription::subscribe();
        // Disable early buffering so events flow directly to the broadcast channel
        // rather than being held in the startup buffer.
        stop_early_buffering();

        let filter =
            tracing_subscriber::filter::Targets::from_str("info").expect("valid filter string");
        let subscriber = tracing_subscriber::registry().with(
            RateLimitedLayer::new(BroadcastLayer::new())
                .with_default_limit(1) // 1-second window
                .with_filter(filter),
        );

        tokio::task::spawn_blocking(move || {
            tracing::subscriber::with_default(subscriber, || {
                for i in 0..4u32 {
                    if i == 3 {
                        // Wait for the 1-second window to expire so that the 4th call
                        // triggers a summary message before being emitted normally.
                        std::thread::sleep(Duration::from_millis(1100));
                    }
                    info!("Rate limited broadcast message.");
                }
            });
        })
        .await
        .expect("blocking task panicked");

        // All sends happened synchronously inside spawn_blocking above, so items are
        // already in the broadcast ring buffer. We drain the stream until we have the
        // 4 expected matching messages, with a generous timeout to guard against
        // unexpected delays on heavily loaded machines.
        //
        // Limitation: the "suppressed N times" summary fires on the *next* arriving
        // event after the window expires, not at window expiry itself.
        const EXPECTED: usize = 4;
        let mut stream = trace_sub.into_stream();
        let messages: Vec<String> = tokio::time::timeout(Duration::from_secs(5), async {
            let mut collected = Vec::with_capacity(EXPECTED);
            loop {
                let event = stream
                    .next()
                    .await
                    .expect("broadcast stream ended unexpectedly");
                // Events from other sources may share the channel; keep only those that mention
                // this test's message.
                if let Some(msg) = event.get("message") {
                    let msg = msg.to_string_lossy().into_owned();
                    if msg.contains("Rate limited broadcast message") {
                        collected.push(msg);
                        if collected.len() == EXPECTED {
                            break;
                        }
                    }
                }
            }
            collected
        })
        .await
        .expect("timed out waiting for rate-limited broadcast messages");

        // Expected sequence:
        //   [0] First occurrence  → emitted normally.
        //   [1] Second occurrence → suppression warning emitted, original dropped.
        //   (Third occurrence is silently dropped; nothing appears in broadcast.)
        //   [2] Fourth occurrence (after window) → summary "suppressed 2 times" emitted.
        //   [3] Fourth occurrence → emitted normally after the window resets.
        assert_eq!(messages[0], "Rate limited broadcast message.");
        assert!(
            messages[1].contains("is being suppressed to avoid flooding."),
            "expected suppression warning, got: {}",
            messages[1]
        );
        assert!(
            messages[2].contains("has been suppressed 2 times."),
            "expected summary, got: {}",
            messages[2]
        );
        assert_eq!(messages[3], "Rate limited broadcast message.");
    }
}