1#![allow(missing_docs)]
2use std::{
3 collections::{HashMap, HashSet},
4 marker::PhantomData,
5 str::FromStr,
6 sync::{
7 LazyLock, Mutex, MutexGuard, OnceLock,
8 atomic::{AtomicBool, Ordering},
9 },
10};
11
12use futures_util::{Stream, StreamExt, future::ready};
13use metrics_tracing_context::MetricsLayer;
14use tokio::sync::{
15 broadcast::{self, Receiver, Sender},
16 oneshot,
17};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{Event, Subscriber};
20use tracing_limit::RateLimitedLayer;
21use tracing_subscriber::{
22 Layer,
23 filter::LevelFilter,
24 layer::{Context, SubscriberExt},
25 registry::LookupSpan,
26 util::SubscriberInitExt,
27};
28pub use tracing_tower::{InstrumentableService, InstrumentedService};
29use vector_lib::SpanField;
30use vector_lib::lookup::event_path;
31use vrl::value::Value;
32
33use crate::event::LogEvent;
34
/// Internal log events captured before `stop_early_buffering` hands them off.
///
/// Holds `Some` while buffering is possible and becomes `None` once `consume_early_buffer`
/// takes the contents (tests may reinstall an empty buffer via `reset_early_buffer`).
static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));

/// Whether newly emitted events should still be pushed into `BUFFER`.
///
/// Starts `true`; `stop_early_buffering` flips it to `false` with a compare-exchange, so the
/// hand-off logic runs at most once.
static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);

/// One-shot senders for subscribers waiting for the early-buffered events.
///
/// Taken (left as `None`) by `stop_early_buffering`, after which no new registrations are
/// accepted.
static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
    Mutex::new(Some(Vec::new()));

/// Broadcast sender for live events; created lazily on the first `get_trace_sender` call.
static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();
55
/// Reports whether the metrics/tracing integration layer should be installed.
///
/// The layer is on by default. It is turned off only when the
/// `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment variable is set to exactly
/// `"true"`. Any other value, or an unset or non-UTF-8 variable, keeps it enabled.
fn metrics_layer_enabled() -> bool {
    let disabled = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .is_ok_and(|value| value == "true");
    !disabled
}
59
/// Installs the global `tracing` subscriber used for Vector's internal logs.
///
/// Layers are stacked on a `tracing_subscriber::registry()` in this order:
///
/// 1. A metrics layer (`MetricsLayer` filtered to `INFO`), unless the
///    `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment variable is `"true"`.
/// 2. Exactly one `BroadcastLayer`, filtered by `levels`, which feeds the early buffer and the
///    broadcast channel. When `broadcast_rate_limit` is `Some`, it is wrapped in a
///    `RateLimitedLayer` with that default limit. Otherwise it is unlimited.
/// 3. With the `tokio-console` feature: a `console_subscriber::ConsoleLayer`.
/// 4. On Unix: the allocation-tracking layer, filtered to `ERROR`.
/// 5. A formatting layer, filtered by `levels` and wrapped in a `RateLimitedLayer` with default
///    limit `internal_log_rate_limit`. It emits flattened JSON when `json` is true. Otherwise
///    it writes plain text to stderr, with ANSI colors controlled by `color`.
///
/// Under `cfg(test)` the formatter is switched to the test writer.
///
/// # Panics
///
/// Panics if `levels` cannot be parsed as a `tracing_subscriber::filter::Targets` directive.
///
/// The result of `try_init` is discarded. Installation failures (for example, a global
/// subscriber is already set) are therefore silently ignored.
pub fn init(
    color: bool,
    json: bool,
    levels: &str,
    internal_log_rate_limit: u64,
    broadcast_rate_limit: Option<std::num::NonZeroU64>,
) {
    // An unparsable filter is a startup configuration error, so fail loudly.
    let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
        "logging filter targets were not formatted correctly or did not specify a valid level",
    );

    // `None` when disabled; an `Option<Layer>` that is `None` contributes nothing.
    let metrics_layer =
        metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));

    // The rate-limited and unlimited broadcast layers have different concrete types. Each is
    // therefore kept in its own `Option`, and exactly one of them is `Some`.
    let rate_limited_broadcast = broadcast_rate_limit.map(|rate| {
        RateLimitedLayer::new(BroadcastLayer::new())
            .with_default_limit(rate.get())
            .with_filter(fmt_filter.clone())
    });
    let unlimited_broadcast = broadcast_rate_limit
        .is_none()
        .then(|| BroadcastLayer::new().with_filter(fmt_filter.clone()));
    debug_assert_ne!(
        rate_limited_broadcast.is_some(),
        unlimited_broadcast.is_some()
    );

    let subscriber = tracing_subscriber::registry()
        .with(metrics_layer)
        .with(rate_limited_broadcast)
        .with(unlimited_broadcast);

    #[cfg(feature = "tokio-console")]
    let subscriber = {
        let console_layer = console_subscriber::ConsoleLayer::builder()
            .with_default_env()
            .spawn();

        subscriber.with(console_layer)
    };

    #[cfg(unix)]
    let subscriber = {
        let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
            .with_filter(LevelFilter::ERROR);

        subscriber.with(allocation_layer)
    };

    // The two branches are duplicated because the JSON and text formatters are distinct types,
    // so the resulting subscriber types differ as well.
    if json {
        // NOTE(review): unlike the text branch, no `.with_writer(std::io::stderr)` is set here.
        // This formatter therefore uses `fmt::layer()`'s default writer (stdout, per
        // tracing-subscriber docs). Confirm this asymmetry is intended.
        let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);

        #[cfg(test)]
        let formatter = formatter.with_test_writer();

        let rate_limited =
            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));

        _ = subscriber.try_init();
    } else {
        let formatter = tracing_subscriber::fmt::layer()
            .with_ansi(color)
            .with_writer(std::io::stderr);

        #[cfg(test)]
        let formatter = formatter.with_test_writer();

        let rate_limited =
            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));

        _ = subscriber.try_init();
    }
}
143
/// Test helper that installs a fresh, empty early buffer and returns the previous contents.
///
/// The previous contents are `None` if the buffer had already been consumed. `SHOULD_BUFFER`
/// is left untouched.
#[cfg(test)]
pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
    let mut guard = get_early_buffer();
    std::mem::replace(&mut *guard, Some(Vec::new()))
}
148
149fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
151 BUFFER
152 .lock()
153 .expect("Couldn't acquire lock on internal logs buffer")
154}
155
156fn should_process_tracing_event() -> bool {
161 get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
162}
163
164fn try_buffer_event(log: &LogEvent) -> bool {
166 if SHOULD_BUFFER.load(Ordering::Acquire)
167 && let Some(buffer) = get_early_buffer().as_mut()
168 {
169 buffer.push(log.clone());
170 return true;
171 }
172
173 false
174}
175
176fn try_broadcast_event(log: LogEvent) {
180 if let Some(sender) = maybe_get_trace_sender() {
181 _ = sender.send(log);
182 }
183}
184
185fn consume_early_buffer() -> Vec<LogEvent> {
191 get_early_buffer()
192 .take()
193 .expect("early buffer was already consumed")
194}
195
196fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
198 SENDER.get_or_init(|| broadcast::channel(99).0)
199}
200
201fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
205 SENDER.get()
206}
207
208fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
212 get_trace_sender().subscribe()
213}
214
215fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
217 SUBSCRIBERS.lock().expect("poisoned locks are dumb")
218}
219
220fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
226 if SHOULD_BUFFER.load(Ordering::Acquire) {
227 get_trace_subscriber_list().as_mut().map(|subscribers| {
231 let (tx, rx) = oneshot::channel();
232 subscribers.push(tx);
233 rx
234 })
235 } else {
236 None
238 }
239}
240
241pub fn stop_early_buffering() {
246 if SHOULD_BUFFER
250 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
251 .is_err()
252 {
253 return;
254 }
255
256 let subscribers = get_trace_subscriber_list().take();
259 if let Some(subscribers_tx) = subscribers {
260 let buffered_events = consume_early_buffer();
262 for subscriber_tx in subscribers_tx {
263 _ = subscriber_tx.send(buffered_events.clone());
265 }
266 }
267}
268
/// A subscription to Vector's internal log events.
///
/// Pairs a one-shot receiver for the events collected during early buffering with a
/// broadcast receiver for events emitted after buffering stopped.
pub struct TraceSubscription {
    /// Receives the early-buffered events. `None` if registration happened after buffering
    /// stopped, or once `buffered_events` has taken it.
    buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
    /// Receives live events broadcast after early buffering stopped.
    trace_rx: Receiver<LogEvent>,
}
278
279impl TraceSubscription {
280 pub fn subscribe() -> TraceSubscription {
282 let buffered_events_rx = try_register_for_early_events();
283 let trace_rx = get_trace_receiver();
284
285 Self {
286 buffered_events_rx,
287 trace_rx,
288 }
289 }
290
291 pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
297 match self.buffered_events_rx.take() {
300 Some(rx) => rx.await.ok(),
301 None => None,
302 }
303 }
304
305 pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
307 BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
310 }
311}
312
/// A stateless layer that turns tracing events into `LogEvent`s for the early buffer or the
/// broadcast channel.
///
/// `S` is only a type-level marker for the subscriber the layer is attached to.
struct BroadcastLayer<S> {
    _subscriber: PhantomData<S>,
}

impl<S> BroadcastLayer<S> {
    /// Creates the layer. No state is needed, so this is usable in `const` contexts.
    const fn new() -> Self {
        Self {
            _subscriber: PhantomData,
        }
    }
}
324
325impl<S> Layer<S> for BroadcastLayer<S>
326where
327 S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
328{
329 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
330 if should_process_tracing_event() {
331 let mut log = LogEvent::from(event);
332 if let Some(parent_span) = ctx.event_span(event) {
334 for span in parent_span.scope().from_root() {
335 if let Some(fields) = span.extensions().get::<SpanFields>() {
336 for (k, v) in &fields.0 {
337 log.insert(event_path!("vector", *k), v.clone());
338 }
339 }
340 }
341 }
342 if !try_buffer_event(&log) {
345 try_broadcast_event(log);
346 }
347 }
348 }
349
350 fn on_new_span(
351 &self,
352 attrs: &tracing_core::span::Attributes<'_>,
353 id: &tracing_core::span::Id,
354 ctx: Context<'_, S>,
355 ) {
356 let span = ctx.span(id).expect("span must already exist!");
357 let mut fields = SpanFields::default();
358 attrs.values().record(&mut fields);
359 span.extensions_mut().insert(fields);
360 }
361}
362
/// Span field values captured in `BroadcastLayer::on_new_span`, keyed by field name.
///
/// Only names contained in `SPAN_FIELDS` are stored (see `SpanFields::record`).
#[derive(Default, Debug)]
struct SpanFields(HashMap<&'static str, Value>);
365
// Register the component span fields that should be copied onto internal log events.
// `SPAN_FIELDS` is built from every `SpanField` collected by `inventory`, so submissions made
// elsewhere in the linked binary are included as well.
inventory::submit!(SpanField("component_id"));
inventory::submit!(SpanField("component_type"));
inventory::submit!(SpanField("component_kind"));

/// Names of span fields that `SpanFields::record` keeps.
///
/// The set is built once, on first access, from all registered `SpanField`s.
static SPAN_FIELDS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| inventory::iter::<SpanField>().map(|f| f.0).collect());
376
377impl SpanFields {
378 fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
379 let name = field.name();
380 if SPAN_FIELDS.contains(name) {
389 self.0.insert(name, value.into());
390 }
391 }
392}
393
394impl tracing::field::Visit for SpanFields {
395 fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
396 self.record(field, value);
397 }
398
399 fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
400 self.record(field, value);
401 }
402
403 fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
404 self.record(field, value);
405 }
406
407 fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
408 self.record(field, value);
409 }
410
411 fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
412 self.record(field, format!("{value:?}"));
413 }
414}
415
#[cfg(test)]
mod tests {
    use std::{str::FromStr, time::Duration};

    use serial_test::serial;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;

    use futures::StreamExt as _;

    use super::*;

    /// Checks that a `BroadcastLayer` wrapped in `RateLimitedLayer` (default limit 1) broadcasts
    /// the following, in order:
    /// 1. the first message;
    /// 2. a suppression warning;
    /// 3. a summary of suppressed repeats once the window has elapsed;
    /// 4. the message itself again.
    #[tokio::test]
    #[serial]
    async fn broadcast_rate_limits_repeated_messages() {
        // Create the broadcast receiver, then stop early buffering so the events emitted
        // below are broadcast instead of being captured in the early buffer.
        let trace_sub = TraceSubscription::subscribe();
        stop_early_buffering();

        let filter =
            tracing_subscriber::filter::Targets::from_str("info").expect("valid filter string");
        let subscriber = tracing_subscriber::registry().with(
            RateLimitedLayer::new(BroadcastLayer::new())
                .with_default_limit(1)
                .with_filter(filter),
        );

        // Emit on a blocking thread because the loop uses `std::thread::sleep`.
        tokio::task::spawn_blocking(move || {
            tracing::subscriber::with_default(subscriber, || {
                for i in 0..4u32 {
                    if i == 3 {
                        // Wait past the rate-limit window (presumably 1s) so the last message
                        // is emitted again, preceded by the suppression summary.
                        std::thread::sleep(Duration::from_millis(1100));
                    }
                    info!("Rate limited broadcast message.");
                }
            });
        })
        .await
        .expect("blocking task panicked");

        // Other internal events may share the broadcast channel, so keep only messages that
        // mention the text emitted above.
        const EXPECTED: usize = 4;
        let mut stream = trace_sub.into_stream();
        let messages: Vec<String> = tokio::time::timeout(Duration::from_secs(5), async {
            let mut collected = Vec::with_capacity(EXPECTED);
            loop {
                let event = stream
                    .next()
                    .await
                    .expect("broadcast stream ended unexpectedly");
                if let Some(msg) = event.get("message") {
                    let msg = msg.to_string_lossy().into_owned();
                    if msg.contains("Rate limited broadcast message") {
                        collected.push(msg);
                        if collected.len() == EXPECTED {
                            break;
                        }
                    }
                }
            }
            collected
        })
        .await
        .expect("timed out waiting for rate-limited broadcast messages");

        // 1st occurrence passes through unchanged.
        assert_eq!(messages[0], "Rate limited broadcast message.");
        // 2nd occurrence is replaced by the rate limiter's suppression warning.
        assert!(
            messages[1].contains("is being suppressed to avoid flooding."),
            "expected suppression warning, got: {}",
            messages[1]
        );
        // After the window, the limiter reports how many occurrences were suppressed...
        assert!(
            messages[2].contains("has been suppressed 2 times."),
            "expected summary, got: {}",
            messages[2]
        );
        // ...and the 4th occurrence passes through again.
        assert_eq!(messages[3], "Rate limited broadcast message.");
    }
}