Skip to main content

vector/config/unit_test/
unit_test_components.rs

1// Derivative's Debug impl generates `let _ = field.fmt(f)` which triggers this lint.
2#![allow(clippy::let_underscore_must_use)]
3
4use std::sync::Arc;
5
6use futures::{Sink, Stream, stream};
7use futures_util::{FutureExt, StreamExt, future, stream::BoxStream};
8use tokio::sync::{Mutex, oneshot};
9use vector_lib::{
10    config::{DataType, Input, LogNamespace},
11    configurable::configurable_component,
12    event::Event,
13    schema,
14    sink::{StreamSink, VectorSink},
15};
16
17use crate::{
18    conditions::Condition,
19    config::{
20        AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21    },
22    sinks::Healthcheck,
23    sources,
24};
25
26/// Configuration for the `unit_test` source.
//
// The `SourceConfig::build` implementation for this type clones `events` and
// sends them to the source output as a single batch.
27#[configurable_component(source("unit_test", "Unit test."))]
28#[derive(Clone, Debug, Default)]
29pub struct UnitTestSourceConfig {
30    /// List of events sent from this source as part of the test.
    // `#[serde(skip)]` keeps this field out of (de)serialization, so it has to
    // be filled in by code rather than read from a configuration file.
31    #[serde(skip)]
32    pub events: Vec<Event>,
33}
34
// NOTE(review): presumably derives config generation from the `Default` impl;
// the macro is defined outside this file — confirm.
35impl_generate_config_from_default!(UnitTestSourceConfig);
36
37#[async_trait::async_trait]
38#[typetag::serde(name = "unit_test")]
39impl SourceConfig for UnitTestSourceConfig {
40    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
41        let events = self.events.clone().into_iter();
42
43        Ok(Box::pin(async move {
44            let mut out = cx.out;
45            let _shutdown = cx.shutdown;
46            out.send_batch(events).await.map_err(|_| ())?;
47            Ok(())
48        }))
49    }
50
51    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
52        vec![SourceOutput::new_maybe_logs(
53            DataType::all_bits(),
54            schema::Definition::default_legacy_namespace(),
55        )]
56    }
57
58    fn can_acknowledge(&self) -> bool {
59        false
60    }
61}
62
63/// Configuration for the `unit_test_stream` source.
64#[configurable_component(source("unit_test_stream", "Unit test stream."))]
65#[derive(Clone)]
66pub struct UnitTestStreamSourceConfig {
    // Stream of events this source will emit. The `Arc<Mutex<Option<..>>>`
    // wrapper means clones of the config share one slot; `build` takes the
    // stream out of the slot, leaving `None` behind.
67    #[serde(skip)]
68    stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
69}
70
// NOTE(review): presumably derives config generation from the `Default` impl;
// the macro is defined outside this file — confirm.
71impl_generate_config_from_default!(UnitTestStreamSourceConfig);
72
73impl UnitTestStreamSourceConfig {
74    pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
75        Self {
76            stream: Arc::new(Mutex::new(Some(stream.boxed()))),
77        }
78    }
79}
80
81impl Default for UnitTestStreamSourceConfig {
82    fn default() -> Self {
83        Self::new(stream::empty().boxed())
84    }
85}
86
87impl std::fmt::Debug for UnitTestStreamSourceConfig {
88    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        formatter
90            .debug_struct("UnitTestStreamSourceConfig")
91            .finish()
92    }
93}
94
95#[async_trait::async_trait]
96#[typetag::serde(name = "unit_test_stream")]
97impl SourceConfig for UnitTestStreamSourceConfig {
98    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
99        let stream = self.stream.lock().await.take().unwrap();
100        Ok(Box::pin(async move {
101            let mut out = cx.out;
102            let _shutdown = cx.shutdown;
103            out.send_event_stream(stream).await.map_err(|_| ())?;
104            Ok(())
105        }))
106    }
107
108    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
109        vec![SourceOutput::new_maybe_logs(
110            DataType::all_bits(),
111            schema::Definition::default_legacy_namespace(),
112        )]
113    }
114
115    fn can_acknowledge(&self) -> bool {
116        false
117    }
118}
119
/// Verification that a `UnitTestSink` performs once its input stream has ended.
120#[derive(Clone, Default)]
121pub enum UnitTestSinkCheck {
122    /// Check all events that are received against the list of conditions.
123    Checks {
        /// One inner list of conditions per check. As evaluated by
        /// `UnitTestSink::run`, a condition is satisfied when at least one
        /// received event passes it.
124        conditions: Vec<Vec<Condition>>,
        /// When `Some`, the exact number of events the sink must receive.
        /// `Some(0)` also suppresses the "no events received" error.
125        expected_event_count: Option<usize>,
126    },
127
128    /// Check that no events were received.
129    NoOutputs,
130
131    /// Do nothing.
132    #[default]
133    NoOp,
134}
135
/// Outcome that a `UnitTestSink` sends over its result channel after running its check.
136#[derive(Debug)]
137pub struct UnitTestSinkResult {
    /// Name of the test the reporting sink was built for.
138    pub test_name: String,
    /// Human-readable failure messages; empty when the check recorded no failure.
139    pub test_errors: Vec<String>,
140}
141
142/// Configuration for the `unit_test` sink.
143#[configurable_component(sink("unit_test", "Unit test."))]
144#[derive(Clone, Default, Derivative)]
145#[derivative(Debug)]
146pub struct UnitTestSinkConfig {
147    /// Name of the test that this sink is being used for.
148    pub test_name: String,
149
150    /// List of names of the transform/branch associated with this sink.
151    pub transform_ids: Vec<String>,
152
153    /// Sender side of the test result channel.
    // `build` moves the sender out of this slot with `take()`. Clones of the
    // config share the slot through the `Arc`, so only the first sink built
    // from any of them receives a sender.
154    #[serde(skip)]
155    pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,
156
157    /// Predicate applied to each event that reaches the sink.
    // `UnitTestSinkCheck` does not derive `Debug`, so the field is left out of
    // the derived `Debug` output.
158    #[serde(skip)]
159    #[derivative(Debug = "ignore")]
160    pub check: UnitTestSinkCheck,
161}
162
// NOTE(review): presumably derives config generation from the `Default` impl;
// the macro is defined outside this file — confirm.
163impl_generate_config_from_default!(UnitTestSinkConfig);
164
165#[async_trait::async_trait]
166#[typetag::serde(name = "unit_test")]
167impl SinkConfig for UnitTestSinkConfig {
168    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
169        let tx = self.result_tx.lock().await.take();
170        let sink = UnitTestSink {
171            test_name: self.test_name.clone(),
172            transform_ids: self.transform_ids.clone(),
173            result_tx: tx,
174            check: self.check.clone(),
175        };
176        let healthcheck = future::ok(()).boxed();
177
178        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
179    }
180
181    fn input(&self) -> Input {
182        Input::all()
183    }
184
185    fn acknowledgements(&self) -> &AcknowledgementsConfig {
186        &AcknowledgementsConfig::DEFAULT
187    }
188}
189
/// Sink that collects every event it receives, runs `check` over them once the input
/// ends, and reports the outcome through `result_tx`.
190pub struct UnitTestSink {
    /// Name of the test, copied into the reported `UnitTestSinkResult`.
191    pub test_name: String,
    /// Transform names quoted in the failure messages this sink produces.
192    pub transform_ids: Vec<String>,
    /// Channel on which the result is sent; when `None`, the result is dropped.
193    // None for NoOp test sinks
194    pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
    /// Verification to perform on the collected events.
195    pub check: UnitTestSinkCheck,
196}
197
198#[async_trait::async_trait]
199impl StreamSink<Event> for UnitTestSink {
200    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
201        let mut output_events = Vec::new();
202        let mut result = UnitTestSinkResult {
203            test_name: self.test_name,
204            test_errors: Vec::new(),
205        };
206
207        while let Some(event) = input.next().await {
208            output_events.push(event);
209        }
210
211        match self.check {
212            UnitTestSinkCheck::Checks {
213                conditions: checks,
214                expected_event_count,
215            } => {
216                if let Some(expected) = expected_event_count {
217                    let actual = output_events.len();
218                    if actual != expected {
219                        result.test_errors.push(format!(
220                            "expected {} events from transforms {:?}, but received {}",
221                            expected, self.transform_ids, actual
222                        ));
223                    }
224                }
225
226                if output_events.is_empty() && expected_event_count != Some(0) {
227                    result
228                        .test_errors
229                        .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
230                } else {
231                    for (i, check) in checks.iter().enumerate() {
232                        let mut check_errors = Vec::new();
233                        for (j, condition) in check.iter().enumerate() {
234                            let mut condition_errors = Vec::new();
235                            for event in output_events.iter() {
236                                match condition.check_with_context(event.clone()).0 {
237                                    Ok(_) => {
238                                        condition_errors.clear();
239                                        break;
240                                    }
241                                    Err(error) => {
242                                        condition_errors.push(format!("  condition[{j}]: {error}"));
243                                    }
244                                }
245                            }
246                            check_errors.extend(condition_errors);
247                        }
248                        // If there are errors, add a preamble to the output
249                        if !check_errors.is_empty() {
250                            check_errors.insert(
251                                0,
252                                format!(
253                                    "check[{}] for transforms {:?} failed conditions:",
254                                    i, self.transform_ids
255                                ),
256                            );
257                        }
258
259                        result.test_errors.extend(check_errors);
260                    }
261
262                    // If there are errors, add a summary of events received
263                    if !result.test_errors.is_empty() {
264                        result.test_errors.push(format!(
265                            "output payloads from {:?} (events encoded as JSON):\n  {}",
266                            self.transform_ids,
267                            events_to_string(&output_events)
268                        ));
269                    }
270                }
271            }
272            UnitTestSinkCheck::NoOutputs => {
273                if !output_events.is_empty() {
274                    result.test_errors.push(format!(
275                        "check for transforms {:?} failed: expected no outputs",
276                        self.transform_ids
277                    ));
278                }
279            }
280            UnitTestSinkCheck::NoOp => {}
281        }
282
283        if let Some(tx) = self.result_tx
284            && tx.send(result).is_err()
285        {
286            error!(message = "Sending unit test results failed in unit test sink.");
287        }
288        Ok(())
289    }
290}
291
292/// Configuration for the `unit_test_stream` sink.
293#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
294#[derive(Clone, Default)]
295pub struct UnitTestStreamSinkConfig {
296    /// Sink that receives the processed events.
    // Clones of the config share this slot through the `Arc`, and `build`
    // takes the sink out of it. The derived `Default` yields an empty slot
    // (`None`); only configs created with `new` carry a sink.
297    #[serde(skip)]
298    sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
299}
300
// NOTE(review): presumably derives config generation from the `Default` impl;
// the macro is defined outside this file — confirm.
301impl_generate_config_from_default!(UnitTestStreamSinkConfig);
302
303impl UnitTestStreamSinkConfig {
304    pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
305        Self {
306            sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
307        }
308    }
309}
310
311impl std::fmt::Debug for UnitTestStreamSinkConfig {
312    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        formatter.debug_struct("UnitTestStreamSinkConfig").finish()
314    }
315}
316
317#[async_trait::async_trait]
318#[typetag::serde(name = "unit_test_stream")]
319impl SinkConfig for UnitTestStreamSinkConfig {
320    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
321        let sink = self.sink.lock().await.take().unwrap();
322        let healthcheck = future::ok(()).boxed();
323
324        #[allow(deprecated)]
325        Ok((VectorSink::from_event_sink(sink), healthcheck))
326    }
327
328    fn input(&self) -> Input {
329        Input::all()
330    }
331
332    fn acknowledgements(&self) -> &AcknowledgementsConfig {
333        &AcknowledgementsConfig::DEFAULT
334    }
335}
336
337fn events_to_string(events: &[Event]) -> String {
338    events
339        .iter()
340        .map(|event| match event {
341            Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
342            Event::Metric(metric) => {
343                serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
344            }
345            Event::Trace(trace) => {
346                serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
347            }
348        })
349        .collect::<Vec<_>>()
350        .join("\n  ")
351}