Skip to main content

vector_core/source_sender/
builder.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use vector_buffers::topology::channel::LimitedReceiver;
4use vector_common::histogram;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{
8    LAG_TIME_NAME, Output, OutputMetrics, PostProcessor, SEND_BATCH_LATENCY_NAME,
9    SEND_LATENCY_NAME, SourceSender, SourceSenderItem, chunk_size_events,
10};
11use crate::config::{ComponentKey, OutputId, SourceOutput};
12
13pub struct Builder {
14    buf_size: usize,
15    default_output: Option<Output>,
16    named_outputs: HashMap<String, Output>,
17    output_metrics: OutputMetrics,
18    timeout: Option<Duration>,
19    ewma_half_life_seconds: Option<f64>,
20    post_processor: Option<Arc<dyn PostProcessor>>,
21}
22
23impl Default for Builder {
24    fn default() -> Self {
25        Self {
26            buf_size: chunk_size_events(),
27            default_output: None,
28            named_outputs: Default::default(),
29            output_metrics: OutputMetrics::new(
30                Some(histogram!(LAG_TIME_NAME)),
31                Some(histogram!(SEND_LATENCY_NAME)),
32                Some(histogram!(SEND_BATCH_LATENCY_NAME)),
33            ),
34            timeout: None,
35            ewma_half_life_seconds: None,
36            post_processor: None,
37        }
38    }
39}
40
41impl Builder {
42    #[must_use]
43    pub fn with_buffer(mut self, n: usize) -> Self {
44        self.buf_size = n;
45        self
46    }
47
48    #[must_use]
49    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
50        self.timeout = timeout;
51        self
52    }
53
54    #[must_use]
55    pub fn with_ewma_half_life_seconds(mut self, half_life_seconds: Option<f64>) -> Self {
56        self.ewma_half_life_seconds = half_life_seconds;
57        self
58    }
59
60    /// Attach a post-processing step that will be applied to every event on **all** outputs
61    /// (default and named ports) produced by this builder.
62    ///
63    /// The processor runs before schema metadata is attached to each event, immediately
64    /// before the event is placed on the output channel. See [`PostProcessor`] for the trait
65    /// definition and its contract.
66    ///
67    #[must_use]
68    pub fn with_post_processor(mut self, post_processor: Arc<dyn PostProcessor>) -> Self {
69        // Retroactively apply to any outputs already created so that call order does not matter.
70        if let Some(output) = &mut self.default_output {
71            output.set_post_processor(Arc::clone(&post_processor));
72        }
73        for output in self.named_outputs.values_mut() {
74            output.set_post_processor(Arc::clone(&post_processor));
75        }
76        self.post_processor = Some(post_processor);
77        self
78    }
79
80    pub fn add_source_output(
81        &mut self,
82        output: SourceOutput,
83        component_key: ComponentKey,
84    ) -> LimitedReceiver<SourceSenderItem> {
85        let log_definition = output.schema_definition.clone();
86        let output_id = OutputId {
87            component: component_key,
88            port: output.port.clone(),
89        };
90        match output.port {
91            None => {
92                let (output, rx) = Output::new_with_buffer(
93                    self.buf_size,
94                    DEFAULT_OUTPUT.to_owned(),
95                    self.output_metrics.clone(),
96                    log_definition,
97                    output_id,
98                    self.timeout,
99                    self.ewma_half_life_seconds,
100                );
101                let output = match &self.post_processor {
102                    Some(pp) => output.with_post_processor(Arc::clone(pp)),
103                    None => output,
104                };
105                self.default_output = Some(output);
106                rx
107            }
108            Some(name) => {
109                let (output, rx) = Output::new_with_buffer(
110                    self.buf_size,
111                    name.clone(),
112                    self.output_metrics.clone(),
113                    log_definition,
114                    output_id,
115                    self.timeout,
116                    self.ewma_half_life_seconds,
117                );
118                let output = match &self.post_processor {
119                    Some(pp) => output.with_post_processor(Arc::clone(pp)),
120                    None => output,
121                };
122                self.named_outputs.insert(name, output);
123                rx
124            }
125        }
126    }
127
128    pub fn build(self) -> SourceSender {
129        SourceSender {
130            default_output: self.default_output,
131            named_outputs: self.named_outputs,
132        }
133    }
134}