Skip to main content

vector_core/source_sender/
mod.rs

1#![allow(
2    missing_docs,
3    clippy::missing_errors_doc,
4    clippy::doc_markdown,
5    clippy::missing_panics_doc
6)]
7
8mod builder;
9mod errors;
10mod output;
11mod sender;
12#[cfg(test)]
13mod tests;
14
15pub use builder::Builder;
16pub use errors::SendError;
17use output::{Output, OutputMetrics};
18pub use sender::{SourceSender, SourceSenderItem};
19
20use std::sync::atomic::{AtomicUsize, Ordering};
21
/// Fallback chunk size, in events, used when no chunk size was configured at startup.
///
/// This is the default number of events batched per source send, and it also serves as the base
/// for source output buffer sizing.
pub const DEFAULT_CHUNK_SIZE_EVENTS: usize = 1_000;

/// Process-wide chunk size override, in events. A value of `0` means "not configured", in which
/// case readers fall back to [`DEFAULT_CHUNK_SIZE_EVENTS`].
static CHUNK_SIZE_EVENTS: AtomicUsize = AtomicUsize::new(0);
27
28/// Returns the configured source sender chunk size in events, or [`DEFAULT_CHUNK_SIZE_EVENTS`] if
29/// unset.
30#[must_use]
31pub fn chunk_size_events() -> usize {
32    match CHUNK_SIZE_EVENTS.load(Ordering::Relaxed) {
33        0 => DEFAULT_CHUNK_SIZE_EVENTS,
34        size => size,
35    }
36}
37
38/// Sets the process-wide source sender chunk size in events. Must be called at most once, before
39/// the topology is built. Panics if called more than once.
40pub fn set_chunk_size_events(size: usize) {
41    CHUNK_SIZE_EVENTS
42        .compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed)
43        .unwrap_or_else(|_| panic!("double chunk_size_events initialization"));
44}
45
/// Buffer size constant that is only compiled for test builds or when the `test` feature is
/// enabled.
#[cfg(any(test, feature = "test"))]
const TEST_BUFFER_SIZE: usize = 100;
48
49use vector_common::internal_event::HistogramName;
50
51const LAG_TIME_NAME: HistogramName = HistogramName::SourceLagTimeSeconds;
52const SEND_LATENCY_NAME: HistogramName = HistogramName::SourceSendLatencySeconds;
53const SEND_BATCH_LATENCY_NAME: HistogramName = HistogramName::SourceSendBatchLatencySeconds;
54
55/// A post-processing step applied to every event that flows through a [`SourceSender`].
56///
57/// Implement this trait to mutate events just before they are placed on the output channel.
58/// Because each method receives a typed reference, it is impossible at the type level to
59/// accidentally change an event's variant.
60///
61/// It is applied *globally* — to all outputs (default and named ports) produced by the same
62/// [`Builder`].
63pub trait PostProcessor: Send + Sync {
64    /// Called once for every [`crate::event::LogEvent`] in a batch.
65    fn process_log(&self, _event: &mut crate::event::LogEvent) {}
66    /// Called once for every [`crate::event::Metric`] in a batch.
67    fn process_metric(&self, _event: &mut crate::event::Metric) {}
68    /// Called once for every [`crate::event::TraceEvent`] in a batch.
69    fn process_trace(&self, _event: &mut crate::event::TraceEvent) {}
70
71    /// Dispatches a single event to the appropriate typed method.
72    ///
73    /// Override [`process_log`](Self::process_log), [`process_metric`](Self::process_metric), or
74    /// [`process_trace`](Self::process_trace) rather than this method unless you need to handle
75    /// all variants uniformly.
76    fn process(&self, event: &mut crate::event::EventMutRef<'_>) {
77        match event {
78            crate::event::EventMutRef::Log(log) => self.process_log(log),
79            crate::event::EventMutRef::Metric(metric) => self.process_metric(metric),
80            crate::event::EventMutRef::Trace(trace) => self.process_trace(trace),
81        }
82    }
83}