vector_core/source_sender/
builder.rs1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use vector_buffers::topology::channel::LimitedReceiver;
4use vector_common::histogram;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{
8 LAG_TIME_NAME, Output, OutputMetrics, PostProcessor, SEND_BATCH_LATENCY_NAME,
9 SEND_LATENCY_NAME, SourceSender, SourceSenderItem, chunk_size_events,
10};
11use crate::config::{ComponentKey, OutputId, SourceOutput};
12
13pub struct Builder {
14 buf_size: usize,
15 default_output: Option<Output>,
16 named_outputs: HashMap<String, Output>,
17 output_metrics: OutputMetrics,
18 timeout: Option<Duration>,
19 ewma_half_life_seconds: Option<f64>,
20 post_processor: Option<Arc<dyn PostProcessor>>,
21}
22
23impl Default for Builder {
24 fn default() -> Self {
25 Self {
26 buf_size: chunk_size_events(),
27 default_output: None,
28 named_outputs: Default::default(),
29 output_metrics: OutputMetrics::new(
30 Some(histogram!(LAG_TIME_NAME)),
31 Some(histogram!(SEND_LATENCY_NAME)),
32 Some(histogram!(SEND_BATCH_LATENCY_NAME)),
33 ),
34 timeout: None,
35 ewma_half_life_seconds: None,
36 post_processor: None,
37 }
38 }
39}
40
41impl Builder {
42 #[must_use]
43 pub fn with_buffer(mut self, n: usize) -> Self {
44 self.buf_size = n;
45 self
46 }
47
48 #[must_use]
49 pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
50 self.timeout = timeout;
51 self
52 }
53
54 #[must_use]
55 pub fn with_ewma_half_life_seconds(mut self, half_life_seconds: Option<f64>) -> Self {
56 self.ewma_half_life_seconds = half_life_seconds;
57 self
58 }
59
60 #[must_use]
68 pub fn with_post_processor(mut self, post_processor: Arc<dyn PostProcessor>) -> Self {
69 if let Some(output) = &mut self.default_output {
71 output.set_post_processor(Arc::clone(&post_processor));
72 }
73 for output in self.named_outputs.values_mut() {
74 output.set_post_processor(Arc::clone(&post_processor));
75 }
76 self.post_processor = Some(post_processor);
77 self
78 }
79
80 pub fn add_source_output(
81 &mut self,
82 output: SourceOutput,
83 component_key: ComponentKey,
84 ) -> LimitedReceiver<SourceSenderItem> {
85 let log_definition = output.schema_definition.clone();
86 let output_id = OutputId {
87 component: component_key,
88 port: output.port.clone(),
89 };
90 match output.port {
91 None => {
92 let (output, rx) = Output::new_with_buffer(
93 self.buf_size,
94 DEFAULT_OUTPUT.to_owned(),
95 self.output_metrics.clone(),
96 log_definition,
97 output_id,
98 self.timeout,
99 self.ewma_half_life_seconds,
100 );
101 let output = match &self.post_processor {
102 Some(pp) => output.with_post_processor(Arc::clone(pp)),
103 None => output,
104 };
105 self.default_output = Some(output);
106 rx
107 }
108 Some(name) => {
109 let (output, rx) = Output::new_with_buffer(
110 self.buf_size,
111 name.clone(),
112 self.output_metrics.clone(),
113 log_definition,
114 output_id,
115 self.timeout,
116 self.ewma_half_life_seconds,
117 );
118 let output = match &self.post_processor {
119 Some(pp) => output.with_post_processor(Arc::clone(pp)),
120 None => output,
121 };
122 self.named_outputs.insert(name, output);
123 rx
124 }
125 }
126 }
127
128 pub fn build(self) -> SourceSender {
129 SourceSender {
130 default_output: self.default_output,
131 named_outputs: self.named_outputs,
132 }
133 }
134}