vector_core/source_sender/
sender.rs1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8use vector_buffers::EventCount;
9#[cfg(any(test, feature = "test"))]
10use vector_buffers::topology::channel::LimitedReceiver;
11#[cfg(any(test, feature = "test"))]
12use vector_common::histogram;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18 byte_size_of::ByteSizeOf,
19 finalization::{AddBatchNotifier, BatchNotifier},
20 json_size::JsonSize,
21};
22
23use std::sync::Arc;
24
25use super::{Builder, Output, PostProcessor, SendError};
26#[cfg(any(test, feature = "test"))]
27use super::{
28 LAG_TIME_NAME, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, TEST_BUFFER_SIZE,
29};
30use crate::{
31 EstimatedJsonEncodedSizeOf,
32 event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
33};
34#[cfg(any(test, feature = "test"))]
35use crate::{
36 config::OutputId,
37 event::{EventStatus, into_event_stream},
38};
39
40#[derive(Debug)]
47pub struct SourceSenderItem {
48 pub events: EventArray,
50 pub send_reference: Instant,
52}
53
54impl AddBatchNotifier for SourceSenderItem {
55 fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
56 self.events.add_batch_notifier(notifier);
57 }
58}
59
60impl ByteSizeOf for SourceSenderItem {
61 fn allocated_bytes(&self) -> usize {
62 self.events.allocated_bytes()
63 }
64}
65
66impl EventCount for SourceSenderItem {
67 fn event_count(&self) -> usize {
68 self.events.event_count()
69 }
70}
71
72impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
73 fn estimated_json_encoded_size_of(&self) -> JsonSize {
74 self.events.estimated_json_encoded_size_of()
75 }
76}
77
78impl EventContainer for SourceSenderItem {
79 type IntoIter = EventArrayIntoIter;
80
81 fn len(&self) -> usize {
82 self.events.len()
83 }
84
85 fn into_events(self) -> Self::IntoIter {
86 self.events.into_events()
87 }
88}
89
90impl From<SourceSenderItem> for EventArray {
91 fn from(val: SourceSenderItem) -> Self {
92 val.events
93 }
94}
95
96#[derive(Debug, Clone)]
97pub struct SourceSender {
98 pub(super) default_output: Option<Output>,
101 pub(super) named_outputs: HashMap<String, Output>,
102}
103
104impl SourceSender {
105 pub fn builder() -> Builder {
106 Builder::default()
107 }
108
109 pub fn set_post_processor(&mut self, pp: &Arc<dyn PostProcessor>) {
112 if let Some(output) = &mut self.default_output {
113 output.set_post_processor(Arc::clone(pp));
114 }
115 for output in self.named_outputs.values_mut() {
116 output.set_post_processor(Arc::clone(pp));
117 }
118 }
119
120 #[cfg(any(test, feature = "test"))]
121 pub fn new_test_sender_with_options(
122 n: usize,
123 timeout: Option<Duration>,
124 ) -> (Self, LimitedReceiver<SourceSenderItem>) {
125 let lag_time = Some(histogram!(LAG_TIME_NAME));
126 let send_latency = Some(histogram!(SEND_LATENCY_NAME));
127 let send_batch_latency = Some(histogram!(SEND_BATCH_LATENCY_NAME));
128 let output_id = OutputId {
129 component: "test".to_string().into(),
130 port: None,
131 };
132 let (default_output, rx) = Output::new_with_buffer(
133 n,
134 DEFAULT_OUTPUT.to_owned(),
135 OutputMetrics::new(lag_time, send_latency, send_batch_latency),
136 None,
137 output_id,
138 timeout,
139 None,
140 );
141 (
142 Self {
143 default_output: Some(default_output),
144 named_outputs: Default::default(),
145 },
146 rx,
147 )
148 }
149
150 #[cfg(any(test, feature = "test"))]
151 pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
152 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
153 let recv = recv.into_stream().flat_map(into_event_stream);
154 (pipe, recv)
155 }
156
157 #[cfg(any(test, feature = "test"))]
158 pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
159 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
160 let recv = recv.into_stream().flat_map(move |mut item| {
164 item.events.iter_events_mut().for_each(|mut event| {
165 let metadata = event.metadata_mut();
166 metadata.update_status(status);
167 metadata.update_sources();
168 });
169 into_event_stream(item)
170 });
171 (pipe, recv)
172 }
173
174 #[cfg(any(test, feature = "test"))]
175 pub fn new_test_errors(
176 error_at: impl Fn(usize) -> bool,
177 ) -> (Self, impl Stream<Item = Event> + Unpin) {
178 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
179 let mut count: usize = 0;
183 let recv = recv.into_stream().flat_map(move |mut item| {
184 let status = if error_at(count) {
185 EventStatus::Errored
186 } else {
187 EventStatus::Delivered
188 };
189 count += 1;
190 item.events.iter_events_mut().for_each(|mut event| {
191 let metadata = event.metadata_mut();
192 metadata.update_status(status);
193 metadata.update_sources();
194 });
195 into_event_stream(item)
196 });
197 (pipe, recv)
198 }
199
200 #[cfg(any(test, feature = "test"))]
201 pub fn add_outputs(
202 &mut self,
203 status: EventStatus,
204 name: String,
205 ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
206 let output_id = OutputId {
209 component: "test".to_string().into(),
210 port: Some(name.clone()),
211 };
212 let (output, recv) = Output::new_with_buffer(
213 100,
214 name.clone(),
215 OutputMetrics::default(),
216 None,
217 output_id,
218 None,
219 None,
220 );
221 let recv = recv.into_stream().map(move |mut item| {
222 item.events.iter_events_mut().for_each(|mut event| {
223 let metadata = event.metadata_mut();
224 metadata.update_status(status);
225 metadata.update_sources();
226 });
227 item
228 });
229 self.named_outputs.insert(name, output);
230 recv
231 }
232
233 const fn default_output_mut(&mut self) -> &mut Output {
235 self.default_output.as_mut().expect("no default output")
236 }
237
238 pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
242 self.default_output_mut().send_event(event).await
243 }
244
245 pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
249 where
250 S: Stream<Item = E> + Unpin,
251 E: Into<Event> + ByteSizeOf,
252 {
253 self.default_output_mut().send_event_stream(events).await
254 }
255
256 pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
260 where
261 E: Into<Event> + ByteSizeOf,
262 I: IntoIterator<Item = E>,
263 <I as IntoIterator>::IntoIter: ExactSizeIterator,
264 {
265 self.default_output_mut().send_batch(events).await
266 }
267
268 pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
272 where
273 E: Into<Event> + ByteSizeOf,
274 I: IntoIterator<Item = E>,
275 <I as IntoIterator>::IntoIter: ExactSizeIterator,
276 {
277 self.named_outputs
278 .get_mut(name)
279 .expect("unknown output")
280 .send_batch(events)
281 .await
282 }
283}