vector_core/source_sender/
mod.rs1#![allow(
2 missing_docs,
3 clippy::missing_errors_doc,
4 clippy::doc_markdown,
5 clippy::missing_panics_doc
6)]
7
8mod builder;
9mod errors;
10mod output;
11mod sender;
12#[cfg(test)]
13mod tests;
14
15pub use builder::Builder;
16pub use errors::SendError;
17use output::{Output, OutputMetrics};
18pub use sender::{SourceSender, SourceSenderItem};
19
20use std::sync::atomic::{AtomicUsize, Ordering};
21
/// Chunk size, in events, reported by [`chunk_size_events`] while no override has been
/// installed through [`set_chunk_size_events`].
pub const DEFAULT_CHUNK_SIZE_EVENTS: usize = 1000;

/// Process-wide chunk size override.
///
/// `0` is the "not configured" sentinel: while it is stored, [`chunk_size_events`] falls
/// back to [`DEFAULT_CHUNK_SIZE_EVENTS`].
static CHUNK_SIZE_EVENTS: AtomicUsize = AtomicUsize::new(0);

/// Returns the chunk size, in events, currently in effect.
///
/// This is the value installed by [`set_chunk_size_events`], or
/// [`DEFAULT_CHUNK_SIZE_EVENTS`] while the stored value is still `0`.
#[must_use]
pub fn chunk_size_events() -> usize {
    match CHUNK_SIZE_EVENTS.load(Ordering::Relaxed) {
        // Sentinel: nothing has been configured yet.
        0 => DEFAULT_CHUNK_SIZE_EVENTS,
        configured => configured,
    }
}

/// Installs the process-wide chunk size, in events.
///
/// The value is stored only while the current value is still the `0` sentinel, so the
/// first non-zero value wins. Passing `0` stores the sentinel again, which leaves the
/// setting unconfigured and lets a later call succeed.
///
/// # Panics
///
/// Panics with `double chunk_size_events initialization` if a non-zero chunk size has
/// already been stored.
pub fn set_chunk_size_events(size: usize) {
    // Atomically swap the sentinel for `size`; the swap fails exactly when some earlier
    // call already stored a non-zero value.
    let swapped = CHUNK_SIZE_EVENTS.compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed);
    assert!(swapped.is_ok(), "double chunk_size_events initialization");
}
45
/// Buffer size constant compiled only for test builds (`cfg(test)`) or when the `test`
/// feature is enabled.
///
/// NOTE(review): the code that consumes this constant is outside this view; confirm what
/// the size is measured in before relying on it.
#[cfg(any(test, feature = "test"))]
const TEST_BUFFER_SIZE: usize = 100;
48
49use vector_common::internal_event::HistogramName;
50
51const LAG_TIME_NAME: HistogramName = HistogramName::SourceLagTimeSeconds;
52const SEND_LATENCY_NAME: HistogramName = HistogramName::SourceSendLatencySeconds;
53const SEND_BATCH_LATENCY_NAME: HistogramName = HistogramName::SourceSendBatchLatencySeconds;
54
55pub trait PostProcessor: Send + Sync {
64 fn process_log(&self, _event: &mut crate::event::LogEvent) {}
66 fn process_metric(&self, _event: &mut crate::event::Metric) {}
68 fn process_trace(&self, _event: &mut crate::event::TraceEvent) {}
70
71 fn process(&self, event: &mut crate::event::EventMutRef<'_>) {
77 match event {
78 crate::event::EventMutRef::Log(log) => self.process_log(log),
79 crate::event::EventMutRef::Metric(metric) => self.process_metric(metric),
80 crate::event::EventMutRef::Trace(trace) => self.process_trace(trace),
81 }
82 }
83}