Skip to main content

vector_core/source_sender/
sender.rs

1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8use vector_buffers::EventCount;
9#[cfg(any(test, feature = "test"))]
10use vector_buffers::topology::channel::LimitedReceiver;
11#[cfg(any(test, feature = "test"))]
12use vector_common::histogram;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18    byte_size_of::ByteSizeOf,
19    finalization::{AddBatchNotifier, BatchNotifier},
20    json_size::JsonSize,
21};
22
23use std::sync::Arc;
24
25use super::{Builder, Output, PostProcessor, SendError};
26#[cfg(any(test, feature = "test"))]
27use super::{
28    LAG_TIME_NAME, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, TEST_BUFFER_SIZE,
29};
30use crate::{
31    EstimatedJsonEncodedSizeOf,
32    event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
33};
34#[cfg(any(test, feature = "test"))]
35use crate::{
36    config::OutputId,
37    event::{EventStatus, into_event_stream},
38};
39
40/// SourceSenderItem is a thin wrapper around [EventArray] used to track the send duration of a batch.
41///
42/// This is needed because the send duration is calculated as the difference between when the batch
43/// is sent from the origin component to when the batch is enqueued on the receiving component's input buffer.
44/// For sources in particular, this requires the batch to be enqueued on two channels: the origin component's pump
45/// channel and then the receiving component's input buffer.
46#[derive(Debug)]
47pub struct SourceSenderItem {
48    /// The batch of events to send.
49    pub events: EventArray,
50    /// Reference instant used to calculate send duration.
51    pub send_reference: Instant,
52}
53
54impl AddBatchNotifier for SourceSenderItem {
55    fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
56        self.events.add_batch_notifier(notifier);
57    }
58}
59
60impl ByteSizeOf for SourceSenderItem {
61    fn allocated_bytes(&self) -> usize {
62        self.events.allocated_bytes()
63    }
64}
65
66impl EventCount for SourceSenderItem {
67    fn event_count(&self) -> usize {
68        self.events.event_count()
69    }
70}
71
72impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
73    fn estimated_json_encoded_size_of(&self) -> JsonSize {
74        self.events.estimated_json_encoded_size_of()
75    }
76}
77
78impl EventContainer for SourceSenderItem {
79    type IntoIter = EventArrayIntoIter;
80
81    fn len(&self) -> usize {
82        self.events.len()
83    }
84
85    fn into_events(self) -> Self::IntoIter {
86        self.events.into_events()
87    }
88}
89
90impl From<SourceSenderItem> for EventArray {
91    fn from(val: SourceSenderItem) -> Self {
92        val.events
93    }
94}
95
96#[derive(Debug, Clone)]
97pub struct SourceSender {
98    // The default output is optional because some sources, e.g. `datadog_agent`
99    // and `opentelemetry`, can be configured to only output to named outputs.
100    pub(super) default_output: Option<Output>,
101    pub(super) named_outputs: HashMap<String, Output>,
102}
103
104impl SourceSender {
105    pub fn builder() -> Builder {
106        Builder::default()
107    }
108
109    /// Attach a post-processing step to every output on this sender, replacing any previously set
110    /// one.
111    pub fn set_post_processor(&mut self, pp: &Arc<dyn PostProcessor>) {
112        if let Some(output) = &mut self.default_output {
113            output.set_post_processor(Arc::clone(pp));
114        }
115        for output in self.named_outputs.values_mut() {
116            output.set_post_processor(Arc::clone(pp));
117        }
118    }
119
120    #[cfg(any(test, feature = "test"))]
121    pub fn new_test_sender_with_options(
122        n: usize,
123        timeout: Option<Duration>,
124    ) -> (Self, LimitedReceiver<SourceSenderItem>) {
125        let lag_time = Some(histogram!(LAG_TIME_NAME));
126        let send_latency = Some(histogram!(SEND_LATENCY_NAME));
127        let send_batch_latency = Some(histogram!(SEND_BATCH_LATENCY_NAME));
128        let output_id = OutputId {
129            component: "test".to_string().into(),
130            port: None,
131        };
132        let (default_output, rx) = Output::new_with_buffer(
133            n,
134            DEFAULT_OUTPUT.to_owned(),
135            OutputMetrics::new(lag_time, send_latency, send_batch_latency),
136            None,
137            output_id,
138            timeout,
139            None,
140        );
141        (
142            Self {
143                default_output: Some(default_output),
144                named_outputs: Default::default(),
145            },
146            rx,
147        )
148    }
149
150    #[cfg(any(test, feature = "test"))]
151    pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
152        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
153        let recv = recv.into_stream().flat_map(into_event_stream);
154        (pipe, recv)
155    }
156
157    #[cfg(any(test, feature = "test"))]
158    pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
159        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
160        // In a source test pipeline, there is no sink to acknowledge
161        // events, so we have to add a map to the receiver to handle the
162        // finalization.
163        let recv = recv.into_stream().flat_map(move |mut item| {
164            item.events.iter_events_mut().for_each(|mut event| {
165                let metadata = event.metadata_mut();
166                metadata.update_status(status);
167                metadata.update_sources();
168            });
169            into_event_stream(item)
170        });
171        (pipe, recv)
172    }
173
174    #[cfg(any(test, feature = "test"))]
175    pub fn new_test_errors(
176        error_at: impl Fn(usize) -> bool,
177    ) -> (Self, impl Stream<Item = Event> + Unpin) {
178        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
179        // In a source test pipeline, there is no sink to acknowledge
180        // events, so we have to add a map to the receiver to handle the
181        // finalization.
182        let mut count: usize = 0;
183        let recv = recv.into_stream().flat_map(move |mut item| {
184            let status = if error_at(count) {
185                EventStatus::Errored
186            } else {
187                EventStatus::Delivered
188            };
189            count += 1;
190            item.events.iter_events_mut().for_each(|mut event| {
191                let metadata = event.metadata_mut();
192                metadata.update_status(status);
193                metadata.update_sources();
194            });
195            into_event_stream(item)
196        });
197        (pipe, recv)
198    }
199
200    #[cfg(any(test, feature = "test"))]
201    pub fn add_outputs(
202        &mut self,
203        status: EventStatus,
204        name: String,
205    ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
206        // The lag_time parameter here will need to be filled in if this function is ever used for
207        // non-test situations.
208        let output_id = OutputId {
209            component: "test".to_string().into(),
210            port: Some(name.clone()),
211        };
212        let (output, recv) = Output::new_with_buffer(
213            100,
214            name.clone(),
215            OutputMetrics::default(),
216            None,
217            output_id,
218            None,
219            None,
220        );
221        let recv = recv.into_stream().map(move |mut item| {
222            item.events.iter_events_mut().for_each(|mut event| {
223                let metadata = event.metadata_mut();
224                metadata.update_status(status);
225                metadata.update_sources();
226            });
227            item
228        });
229        self.named_outputs.insert(name, output);
230        recv
231    }
232
233    /// Get a mutable reference to the default output, panicking if none exists.
234    const fn default_output_mut(&mut self) -> &mut Output {
235        self.default_output.as_mut().expect("no default output")
236    }
237
238    /// Send an event to the default output.
239    ///
240    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
241    pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
242        self.default_output_mut().send_event(event).await
243    }
244
245    /// Send a stream of events to the default output.
246    ///
247    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
248    pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
249    where
250        S: Stream<Item = E> + Unpin,
251        E: Into<Event> + ByteSizeOf,
252    {
253        self.default_output_mut().send_event_stream(events).await
254    }
255
256    /// Send a batch of events to the default output.
257    ///
258    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
259    pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
260    where
261        E: Into<Event> + ByteSizeOf,
262        I: IntoIterator<Item = E>,
263        <I as IntoIterator>::IntoIter: ExactSizeIterator,
264    {
265        self.default_output_mut().send_batch(events).await
266    }
267
268    /// Send a batch of events event to a named output.
269    ///
270    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
271    pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
272    where
273        E: Into<Event> + ByteSizeOf,
274        I: IntoIterator<Item = E>,
275        <I as IntoIterator>::IntoIter: ExactSizeIterator,
276    {
277        self.named_outputs
278            .get_mut(name)
279            .expect("unknown output")
280            .send_batch(events)
281            .await
282    }
283}