1#![allow(clippy::let_underscore_must_use)]
3
4use std::sync::Arc;
5
6use futures::{Sink, Stream, stream};
7use futures_util::{FutureExt, StreamExt, future, stream::BoxStream};
8use tokio::sync::{Mutex, oneshot};
9use vector_lib::{
10 config::{DataType, Input, LogNamespace},
11 configurable::configurable_component,
12 event::Event,
13 schema,
14 sink::{StreamSink, VectorSink},
15};
16
17use crate::{
18 conditions::Condition,
19 config::{
20 AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21 },
22 sinks::Healthcheck,
23 sources,
24};
25
26#[configurable_component(source("unit_test", "Unit test."))]
28#[derive(Clone, Debug, Default)]
29pub struct UnitTestSourceConfig {
30 #[serde(skip)]
32 pub events: Vec<Event>,
33}
34
35impl_generate_config_from_default!(UnitTestSourceConfig);
36
37#[async_trait::async_trait]
38#[typetag::serde(name = "unit_test")]
39impl SourceConfig for UnitTestSourceConfig {
40 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
41 let events = self.events.clone().into_iter();
42
43 Ok(Box::pin(async move {
44 let mut out = cx.out;
45 let _shutdown = cx.shutdown;
46 out.send_batch(events).await.map_err(|_| ())?;
47 Ok(())
48 }))
49 }
50
51 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
52 vec![SourceOutput::new_maybe_logs(
53 DataType::all_bits(),
54 schema::Definition::default_legacy_namespace(),
55 )]
56 }
57
58 fn can_acknowledge(&self) -> bool {
59 false
60 }
61}
62
63#[configurable_component(source("unit_test_stream", "Unit test stream."))]
65#[derive(Clone)]
66pub struct UnitTestStreamSourceConfig {
67 #[serde(skip)]
68 stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
69}
70
71impl_generate_config_from_default!(UnitTestStreamSourceConfig);
72
73impl UnitTestStreamSourceConfig {
74 pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
75 Self {
76 stream: Arc::new(Mutex::new(Some(stream.boxed()))),
77 }
78 }
79}
80
81impl Default for UnitTestStreamSourceConfig {
82 fn default() -> Self {
83 Self::new(stream::empty().boxed())
84 }
85}
86
87impl std::fmt::Debug for UnitTestStreamSourceConfig {
88 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 formatter
90 .debug_struct("UnitTestStreamSourceConfig")
91 .finish()
92 }
93}
94
95#[async_trait::async_trait]
96#[typetag::serde(name = "unit_test_stream")]
97impl SourceConfig for UnitTestStreamSourceConfig {
98 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
99 let stream = self.stream.lock().await.take().unwrap();
100 Ok(Box::pin(async move {
101 let mut out = cx.out;
102 let _shutdown = cx.shutdown;
103 out.send_event_stream(stream).await.map_err(|_| ())?;
104 Ok(())
105 }))
106 }
107
108 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
109 vec![SourceOutput::new_maybe_logs(
110 DataType::all_bits(),
111 schema::Definition::default_legacy_namespace(),
112 )]
113 }
114
115 fn can_acknowledge(&self) -> bool {
116 false
117 }
118}
119
120#[derive(Clone, Default)]
121pub enum UnitTestSinkCheck {
122 Checks {
124 conditions: Vec<Vec<Condition>>,
125 expected_event_count: Option<usize>,
126 },
127
128 NoOutputs,
130
131 #[default]
133 NoOp,
134}
135
/// Outcome reported by a [`UnitTestSink`] once its input stream has ended.
#[derive(Debug)]
pub struct UnitTestSinkResult {
    /// Name of the test the sink was running for.
    pub test_name: String,
    /// Error messages collected while checking; empty when nothing failed.
    pub test_errors: Vec<String>,
}
141
142#[configurable_component(sink("unit_test", "Unit test."))]
144#[derive(Clone, Default, Derivative)]
145#[derivative(Debug)]
146pub struct UnitTestSinkConfig {
147 pub test_name: String,
149
150 pub transform_ids: Vec<String>,
152
153 #[serde(skip)]
155 pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,
156
157 #[serde(skip)]
159 #[derivative(Debug = "ignore")]
160 pub check: UnitTestSinkCheck,
161}
162
163impl_generate_config_from_default!(UnitTestSinkConfig);
164
165#[async_trait::async_trait]
166#[typetag::serde(name = "unit_test")]
167impl SinkConfig for UnitTestSinkConfig {
168 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
169 let tx = self.result_tx.lock().await.take();
170 let sink = UnitTestSink {
171 test_name: self.test_name.clone(),
172 transform_ids: self.transform_ids.clone(),
173 result_tx: tx,
174 check: self.check.clone(),
175 };
176 let healthcheck = future::ok(()).boxed();
177
178 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
179 }
180
181 fn input(&self) -> Input {
182 Input::all()
183 }
184
185 fn acknowledgements(&self) -> &AcknowledgementsConfig {
186 &AcknowledgementsConfig::DEFAULT
187 }
188}
189
190pub struct UnitTestSink {
191 pub test_name: String,
192 pub transform_ids: Vec<String>,
193 pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
195 pub check: UnitTestSinkCheck,
196}
197
198#[async_trait::async_trait]
199impl StreamSink<Event> for UnitTestSink {
200 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
201 let mut output_events = Vec::new();
202 let mut result = UnitTestSinkResult {
203 test_name: self.test_name,
204 test_errors: Vec::new(),
205 };
206
207 while let Some(event) = input.next().await {
208 output_events.push(event);
209 }
210
211 match self.check {
212 UnitTestSinkCheck::Checks {
213 conditions: checks,
214 expected_event_count,
215 } => {
216 if let Some(expected) = expected_event_count {
217 let actual = output_events.len();
218 if actual != expected {
219 result.test_errors.push(format!(
220 "expected {} events from transforms {:?}, but received {}",
221 expected, self.transform_ids, actual
222 ));
223 }
224 }
225
226 if output_events.is_empty() && expected_event_count != Some(0) {
227 result
228 .test_errors
229 .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
230 } else {
231 for (i, check) in checks.iter().enumerate() {
232 let mut check_errors = Vec::new();
233 for (j, condition) in check.iter().enumerate() {
234 let mut condition_errors = Vec::new();
235 for event in output_events.iter() {
236 match condition.check_with_context(event.clone()).0 {
237 Ok(_) => {
238 condition_errors.clear();
239 break;
240 }
241 Err(error) => {
242 condition_errors.push(format!(" condition[{j}]: {error}"));
243 }
244 }
245 }
246 check_errors.extend(condition_errors);
247 }
248 if !check_errors.is_empty() {
250 check_errors.insert(
251 0,
252 format!(
253 "check[{}] for transforms {:?} failed conditions:",
254 i, self.transform_ids
255 ),
256 );
257 }
258
259 result.test_errors.extend(check_errors);
260 }
261
262 if !result.test_errors.is_empty() {
264 result.test_errors.push(format!(
265 "output payloads from {:?} (events encoded as JSON):\n {}",
266 self.transform_ids,
267 events_to_string(&output_events)
268 ));
269 }
270 }
271 }
272 UnitTestSinkCheck::NoOutputs => {
273 if !output_events.is_empty() {
274 result.test_errors.push(format!(
275 "check for transforms {:?} failed: expected no outputs",
276 self.transform_ids
277 ));
278 }
279 }
280 UnitTestSinkCheck::NoOp => {}
281 }
282
283 if let Some(tx) = self.result_tx
284 && tx.send(result).is_err()
285 {
286 error!(message = "Sending unit test results failed in unit test sink.");
287 }
288 Ok(())
289 }
290}
291
292#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
294#[derive(Clone, Default)]
295pub struct UnitTestStreamSinkConfig {
296 #[serde(skip)]
298 sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
299}
300
301impl_generate_config_from_default!(UnitTestStreamSinkConfig);
302
303impl UnitTestStreamSinkConfig {
304 pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
305 Self {
306 sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
307 }
308 }
309}
310
311impl std::fmt::Debug for UnitTestStreamSinkConfig {
312 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 formatter.debug_struct("UnitTestStreamSinkConfig").finish()
314 }
315}
316
317#[async_trait::async_trait]
318#[typetag::serde(name = "unit_test_stream")]
319impl SinkConfig for UnitTestStreamSinkConfig {
320 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
321 let sink = self.sink.lock().await.take().unwrap();
322 let healthcheck = future::ok(()).boxed();
323
324 #[allow(deprecated)]
325 Ok((VectorSink::from_event_sink(sink), healthcheck))
326 }
327
328 fn input(&self) -> Input {
329 Input::all()
330 }
331
332 fn acknowledgements(&self) -> &AcknowledgementsConfig {
333 &AcknowledgementsConfig::DEFAULT
334 }
335}
336
337fn events_to_string(events: &[Event]) -> String {
338 events
339 .iter()
340 .map(|event| match event {
341 Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
342 Event::Metric(metric) => {
343 serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
344 }
345 Event::Trace(trace) => {
346 serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
347 }
348 })
349 .collect::<Vec<_>>()
350 .join("\n ")
351}