1use std::task::Poll;
2
3use chrono::Utc;
4use fakedata::logs::*;
5use futures::StreamExt;
6use rand::prelude::IndexedRandom;
7use serde_with::serde_as;
8use snafu::Snafu;
9use tokio::time::{self, Duration};
10use vector_lib::{
11 EstimatedJsonEncodedSizeOf,
12 codecs::{
13 DecoderFramedRead, StreamDecodingError,
14 decoding::{DeserializerConfig, FramingConfig},
15 },
16 config::{DataType, LegacyKey, LogNamespace},
17 configurable::configurable_component,
18 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
19 lookup::{owned_value_path, path},
20};
21use vrl::value::Kind;
22
23use crate::{
24 SourceSender,
25 codecs::{Decoder, DecodingConfig},
26 config::{SourceConfig, SourceContext, SourceOutput},
27 internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
28 serde::{default_decoding, default_framing_message_based},
29 shutdown::ShutdownSignal,
30};
31
/// Configuration for the `demo_logs` source.
#[serde_as]
#[configurable_component(source(
    "demo_logs",
    "Generate fake log events, which can be useful for testing and demos."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct DemoLogsConfig {
    /// The amount of time, in seconds, to wait between generated lines.
    ///
    /// Fractional values are accepted. Defaults to one second. Set to `0.0` to remove the
    /// wait and output lines as quickly as possible.
    #[serde(alias = "batch_interval")]
    #[derivative(Default(value = "default_interval()"))]
    #[serde(default = "default_interval")]
    #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    pub interval: Duration,

    /// The total number of lines to output.
    ///
    /// Defaults to `isize::MAX`, so by default the source effectively never stops on its own.
    #[derivative(Default(value = "default_count()"))]
    #[serde(default = "default_count")]
    pub count: usize,

    // The enum's `format` tag and its variant fields (e.g. `lines`, `sequence`) are flattened
    // into this struct, so they appear at the top level of the source configuration.
    #[serde(flatten)]
    #[configurable(metadata(
        docs::enum_tag_description = "The format of the randomly generated output."
    ))]
    pub format: OutputFormat,

    // Framing applied to each generated line before decoding; defaults to message-based framing.
    #[configurable(derived)]
    #[derivative(Default(value = "default_framing_message_based()"))]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    // Deserializer that turns each framed line into events; defaults to `default_decoding()`.
    #[configurable(derived)]
    #[derivative(Default(value = "default_decoding()"))]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
80
/// Default wait between generated lines: one second (1000 ms).
const fn default_interval() -> Duration {
    Duration::from_millis(1_000)
}
84
/// Default number of lines to emit: `isize::MAX` expressed as a `usize`, which is large
/// enough that the source effectively runs until shut down.
const fn default_count() -> usize {
    isize::MAX.unsigned_abs()
}
88
/// Errors reported when validating the output format of a `DemoLogsConfig`.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum DemoLogsConfigError {
    /// The `shuffle` format was configured with an empty `lines` list.
    #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
    ShuffleDemoLogsItemsEmpty,
}
94
/// The format of the randomly generated output.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(tag = "format", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The format of the randomly generated output."
))]
pub enum OutputFormat {
    /// Lines are chosen at random from the list given in `lines`.
    Shuffle {
        /// If `true`, each output line is prefixed with its sequence number (starting at 0)
        /// followed by a space.
        #[serde(default)]
        sequence: bool,
        /// The list of lines to choose from. Must not be empty.
        #[configurable(metadata(docs::examples = "lines_example()"))]
        lines: Vec<String>,
    },

    /// Randomly generated logs in Apache common log format.
    ApacheCommon,

    /// Randomly generated logs in Apache error log format.
    ApacheError,

    /// Randomly generated logs in Syslog format (RFC 5424).
    ///
    /// `rfc5424` is accepted as an alias.
    #[serde(alias = "rfc5424")]
    Syslog,

    /// Randomly generated logs in BSD Syslog format (RFC 3164).
    ///
    /// `rfc3164` is accepted as an alias.
    #[serde(alias = "rfc3164")]
    BsdSyslog,

    /// Randomly generated logs in JSON format. This is the default format.
    #[default]
    Json,
}
141
/// Example value for the shuffle format's `lines` option, referenced from the
/// `docs::examples` metadata on that field.
const fn lines_example() -> [&'static str; 2] {
    const EXAMPLE_LINES: [&str; 2] = ["line1", "line2"];
    EXAMPLE_LINES
}
145
146impl OutputFormat {
147 fn generate_line(&self, n: usize) -> String {
148 emit!(DemoLogsEventProcessed);
149
150 match self {
151 Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
152 Self::ApacheCommon => apache_common_log_line(),
153 Self::ApacheError => apache_error_log_line(),
154 Self::Syslog => syslog_5424_log_line(),
155 Self::BsdSyslog => syslog_3164_log_line(),
156 Self::Json => json_log_line(),
157 }
158 }
159
160 fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
161 let line = lines.choose(&mut rand::rng()).unwrap();
163
164 if sequence {
165 format!("{n} {line}")
166 } else {
167 line.into()
168 }
169 }
170
171 pub(self) const fn validate(&self) -> Result<(), DemoLogsConfigError> {
173 match self {
174 Self::Shuffle { lines, .. } => {
175 if lines.is_empty() {
176 Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
177 } else {
178 Ok(())
179 }
180 }
181 _ => Ok(()),
182 }
183 }
184}
185
186impl DemoLogsConfig {
187 #[cfg(test)]
188 pub fn repeat(
189 lines: Vec<String>,
190 count: usize,
191 interval: Duration,
192 log_namespace: Option<bool>,
193 ) -> Self {
194 Self {
195 count,
196 interval,
197 format: OutputFormat::Shuffle {
198 lines,
199 sequence: false,
200 },
201 framing: default_framing_message_based(),
202 decoding: default_decoding(),
203 log_namespace,
204 }
205 }
206}
207
208async fn demo_logs_source(
209 interval: Duration,
210 count: usize,
211 format: OutputFormat,
212 decoder: Decoder,
213 mut shutdown: ShutdownSignal,
214 mut out: SourceSender,
215 log_namespace: LogNamespace,
216) -> Result<(), ()> {
217 let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
218 let mut interval = interval.map(time::interval);
219
220 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
221 let events_received = register!(EventsReceived);
222
223 for n in 0..count {
224 if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
225 break;
226 }
227
228 if let Some(interval) = &mut interval {
229 interval.tick().await;
230 }
231 bytes_received.emit(ByteSize(0));
232
233 let line = format.generate_line(n);
234
235 let mut stream = DecoderFramedRead::new(line.as_bytes(), decoder.clone());
236 while let Some(next) = stream.next().await {
237 match next {
238 Ok((events, _byte_size)) => {
239 let count = events.len();
240 let byte_size = events.estimated_json_encoded_size_of();
241 events_received.emit(CountByteSize(count, byte_size));
242 let now = Utc::now();
243
244 let events = events.into_iter().map(|mut event| {
245 let log = event.as_mut_log();
246 log_namespace.insert_standard_vector_source_metadata(
247 log,
248 DemoLogsConfig::NAME,
249 now,
250 );
251 log_namespace.insert_source_metadata(
252 DemoLogsConfig::NAME,
253 log,
254 Some(LegacyKey::InsertIfEmpty(path!("service"))),
255 path!("service"),
256 "vector",
257 );
258 log_namespace.insert_source_metadata(
259 DemoLogsConfig::NAME,
260 log,
261 Some(LegacyKey::InsertIfEmpty(path!("host"))),
262 path!("host"),
263 "localhost",
264 );
265
266 event
267 });
268 out.send_batch(events).await.map_err(|_| {
269 emit!(StreamClosedError { count });
270 })?;
271 }
272 Err(error) => {
273 if !error.can_continue() {
276 break;
277 }
278 }
279 }
280 }
281 }
282
283 Ok(())
284}
285
// Presumably provides the example-config generation from `DemoLogsConfig::default()` (per the
// macro's name); exercised by the `generate_config` test in this file.
impl_generate_config_from_default!(DemoLogsConfig);
287
288#[async_trait::async_trait]
289#[typetag::serde(name = "demo_logs")]
290impl SourceConfig for DemoLogsConfig {
291 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
292 let log_namespace = cx.log_namespace(self.log_namespace);
293
294 self.format.validate()?;
295 let decoder =
296 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
297 .build()?;
298 Ok(Box::pin(demo_logs_source(
299 self.interval,
300 self.count,
301 self.format.clone(),
302 decoder,
303 cx.shutdown,
304 cx.out,
305 log_namespace,
306 )))
307 }
308
309 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
310 let log_namespace = global_log_namespace.merge(self.log_namespace);
313
314 let schema_definition = self
315 .decoding
316 .schema_definition(log_namespace)
317 .with_standard_vector_source_metadata()
318 .with_source_metadata(
319 DemoLogsConfig::NAME,
320 Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
321 &owned_value_path!("service"),
322 Kind::bytes(),
323 Some("service"),
324 );
325
326 vec![SourceOutput::new_maybe_logs(
327 DataType::Log,
328 schema_definition,
329 )]
330 }
331
332 fn can_acknowledge(&self) -> bool {
333 false
334 }
335}
336
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{Stream, StreamExt, poll};
    use indoc::indoc;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{SOURCE_TAGS, assert_source_compliance},
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    /// Parses `config` as YAML, runs the source to completion against a test sender (checking
    /// source telemetry compliance), and returns the receiver holding every produced event.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let config: DemoLogsConfig = serde_yaml::from_str(config).unwrap();
            let decoder = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            )
            .build()
            .unwrap();
            demo_logs_source(
                config.interval,
                config.count,
                config.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await
            .unwrap();

            rx
        })
        .await
    }

    /// Returns the next buffered event, failing the test if the stream is pending or finished.
    ///
    /// `runit` drives the source to completion before returning, so every produced event is
    /// already buffered and must be ready immediately. Unlike `poll!(..).is_ready()`, this
    /// rejects `Ready(None)`, so a short stream cannot pass a count check.
    async fn next_event(rx: &mut (impl Stream<Item = Event> + Unpin)) -> Event {
        match poll!(rx.next()) {
            Poll::Ready(Some(event)) => event,
            Poll::Ready(None) => panic!("event stream ended early"),
            Poll::Pending => panic!("no event was ready"),
        }
    }

    /// Asserts the stream yields exactly `expected` events and then ends.
    async fn assert_exact_count(rx: &mut (impl Stream<Item = Event> + Unpin), expected: usize) {
        for _ in 0..expected {
            next_event(&mut *rx).await;
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[test]
    fn config_shuffle_lines_not_empty() {
        let empty_lines: Vec<String> = Vec::new();

        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: empty_lines,
            },
            ..DemoLogsConfig::default()
        };

        assert_eq!(
            errant_config.format.validate(),
            Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
        );
    }

    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
              - three
              - four
            count: 5
        "#})
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = next_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
            count: 5
        "#})
        .await;

        assert_exact_count(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
            sequence: true
            count: 5
        "#})
        .await;

        for n in 0..5 {
            let event = next_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            // The generator formats sequenced lines as "{n} {line}".
            assert!(message.starts_with(&format!("{n} ")));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
            count: 3
            interval: 1.0
        "#})
        .await;

        assert_exact_count(&mut rx, 3).await;

        // The first tick fires immediately, so three lines span two full intervals.
        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: syslog
            count: 5
        "#})
        .await;

        let event = next_event(&mut rx).await;
        let log = event.as_log();
        let host = log[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    #[tokio::test]
    async fn apache_common_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: apache_common
            count: 5
        "#})
        .await;

        assert_exact_count(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: apache_error
            count: 5
        "#})
        .await;

        assert_exact_count(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: syslog
            count: 5
        "#})
        .await;

        assert_exact_count(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: bsd_syslog
            count: 5
        "#})
        .await;

        assert_exact_count(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: json
            count: 5
        "#})
        .await;

        for _ in 0..5 {
            let event = next_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }
}