Skip to main content

vector/sources/
demo_logs.rs

1use std::task::Poll;
2
3use chrono::Utc;
4use fakedata::logs::*;
5use futures::StreamExt;
6use rand::prelude::IndexedRandom;
7use serde_with::serde_as;
8use snafu::Snafu;
9use tokio::time::{self, Duration};
10use vector_lib::{
11    EstimatedJsonEncodedSizeOf,
12    codecs::{
13        DecoderFramedRead, StreamDecodingError,
14        decoding::{DeserializerConfig, FramingConfig},
15    },
16    config::{DataType, LegacyKey, LogNamespace},
17    configurable::configurable_component,
18    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
19    lookup::{owned_value_path, path},
20};
21use vrl::value::Kind;
22
23use crate::{
24    SourceSender,
25    codecs::{Decoder, DecodingConfig},
26    config::{SourceConfig, SourceContext, SourceOutput},
27    internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
28    serde::{default_decoding, default_framing_message_based},
29    shutdown::ShutdownSignal,
30};
31
/// Configuration for the `demo_logs` source.
// NOTE(review): the `///` comments on this type and its fields are presumably consumed by
// `configurable_component` for generated documentation — confirm before rewording them.
#[serde_as]
#[configurable_component(source(
    "demo_logs",
    "Generate fake log events, which can be useful for testing and demos."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct DemoLogsConfig {
    /// The amount of time, in seconds, to pause between each batch of output lines.
    ///
    /// The default is one batch per second. To remove the delay and output batches as quickly as possible, set
    /// `interval` to `0.0`.
    // Also accepted under the `batch_interval` key. The value is (de)serialized as fractional
    // seconds via `serde_with::DurationSecondsWithFrac<f64>`.
    #[serde(alias = "batch_interval")]
    #[derivative(Default(value = "default_interval()"))]
    #[serde(default = "default_interval")]
    #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    pub interval: Duration,

    /// The total number of lines to output.
    ///
    /// By default, the source continuously prints logs (infinitely).
    // "Infinitely" is approximated: the default comes from `default_count()`, a very large
    // finite bound rather than a dedicated "unbounded" mode.
    #[derivative(Default(value = "default_count()"))]
    #[serde(default = "default_count")]
    pub count: usize,

    // Flattened: the `format` tag and any variant-specific keys (such as `lines` and `sequence`
    // for the shuffle format) sit at the same level as the other options of this source.
    #[serde(flatten)]
    #[configurable(metadata(
        docs::enum_tag_description = "The format of the randomly generated output."
    ))]
    pub format: OutputFormat,

    // Framing used when building the decoder that every generated line is passed through.
    #[configurable(derived)]
    #[derivative(Default(value = "default_framing_message_based()"))]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    // Deserializer used when building the decoder that every generated line is passed through.
    #[configurable(derived)]
    #[derivative(Default(value = "default_decoding()"))]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
80
/// Default pause between generated lines: exactly one second.
const fn default_interval() -> Duration {
    const ONE_SECOND: Duration = Duration::from_secs(1);
    ONE_SECOND
}
84
/// Default number of lines to generate: `isize::MAX`, expressed as a `usize`.
///
/// This is a very large finite bound that stands in for "run forever".
const fn default_count() -> usize {
    // `isize::MAX` is non-negative, so its absolute value is the same number as a `usize`.
    isize::MAX.unsigned_abs()
}
88
/// Errors reported when validating a `demo_logs` configuration.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum DemoLogsConfigError {
    /// The shuffle format was selected but its `lines` list is empty.
    #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
    ShuffleDemoLogsItemsEmpty,
}
94
/// Output format configuration.
// Internally tagged on the `format` key, with snake_case variant names (for example
// `apache_common`, `bsd_syslog`).
// NOTE(review): the `///` comments below are presumably surfaced in generated documentation by
// `configurable_component` — confirm before rewording them.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(tag = "format", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The format of the randomly generated output."
))]
pub enum OutputFormat {
    /// Lines are chosen at random from the list specified using `lines`.
    Shuffle {
        /// If `true`, each output line starts with an increasing sequence number, beginning with 0.
        #[serde(default)]
        sequence: bool,
        /// The list of lines to output.
        // Must be non-empty: `OutputFormat::validate` rejects an empty list.
        #[configurable(metadata(docs::examples = "lines_example()"))]
        lines: Vec<String>,
    },

    /// Randomly generated logs in [Apache common][apache_common] format.
    ///
    /// [apache_common]: https://httpd.apache.org/docs/current/logs.html#common
    // Lines come from `apache_common_log_line()`.
    ApacheCommon,

    /// Randomly generated logs in [Apache error][apache_error] format.
    ///
    /// [apache_error]: https://httpd.apache.org/docs/current/logs.html#errorlog
    // Lines come from `apache_error_log_line()`.
    ApacheError,

    /// Randomly generated logs in Syslog format ([RFC 5424][syslog_5424]).
    ///
    /// [syslog_5424]: https://tools.ietf.org/html/rfc5424
    // Lines come from `syslog_5424_log_line()`; `rfc5424` is accepted as an alternative name.
    #[serde(alias = "rfc5424")]
    Syslog,

    /// Randomly generated logs in Syslog format ([RFC 3164][syslog_3164]).
    ///
    /// [syslog_3164]: https://tools.ietf.org/html/rfc3164
    // Lines come from `syslog_3164_log_line()`; `rfc3164` is accepted as an alternative name.
    #[serde(alias = "rfc3164")]
    BsdSyslog,

    /// Randomly generated HTTP server logs in [JSON][json] format.
    ///
    /// [json]: https://en.wikipedia.org/wiki/JSON
    // Lines come from `json_log_line()`. This is the variant produced by `Default`.
    #[default]
    Json,
}
141
/// Example value for the shuffle format's `lines` option, referenced by the
/// `docs::examples` metadata on that field.
const fn lines_example() -> [&'static str; 2] {
    const EXAMPLE_LINES: [&str; 2] = ["line1", "line2"];
    EXAMPLE_LINES
}
145
146impl OutputFormat {
147    fn generate_line(&self, n: usize) -> String {
148        emit!(DemoLogsEventProcessed);
149
150        match self {
151            Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
152            Self::ApacheCommon => apache_common_log_line(),
153            Self::ApacheError => apache_error_log_line(),
154            Self::Syslog => syslog_5424_log_line(),
155            Self::BsdSyslog => syslog_3164_log_line(),
156            Self::Json => json_log_line(),
157        }
158    }
159
160    fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
161        // unwrap can be called here because `lines` can't be empty
162        let line = lines.choose(&mut rand::rng()).unwrap();
163
164        if sequence {
165            format!("{n} {line}")
166        } else {
167            line.into()
168        }
169    }
170
171    // Ensures that the `lines` list is non-empty if `Shuffle` is chosen
172    pub(self) const fn validate(&self) -> Result<(), DemoLogsConfigError> {
173        match self {
174            Self::Shuffle { lines, .. } => {
175                if lines.is_empty() {
176                    Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
177                } else {
178                    Ok(())
179                }
180            }
181            _ => Ok(()),
182        }
183    }
184}
185
186impl DemoLogsConfig {
187    #[cfg(test)]
188    pub fn repeat(
189        lines: Vec<String>,
190        count: usize,
191        interval: Duration,
192        log_namespace: Option<bool>,
193    ) -> Self {
194        Self {
195            count,
196            interval,
197            format: OutputFormat::Shuffle {
198                lines,
199                sequence: false,
200            },
201            framing: default_framing_message_based(),
202            decoding: default_decoding(),
203            log_namespace,
204        }
205    }
206}
207
208async fn demo_logs_source(
209    interval: Duration,
210    count: usize,
211    format: OutputFormat,
212    decoder: Decoder,
213    mut shutdown: ShutdownSignal,
214    mut out: SourceSender,
215    log_namespace: LogNamespace,
216) -> Result<(), ()> {
217    let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
218    let mut interval = interval.map(time::interval);
219
220    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
221    let events_received = register!(EventsReceived);
222
223    for n in 0..count {
224        if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
225            break;
226        }
227
228        if let Some(interval) = &mut interval {
229            interval.tick().await;
230        }
231        bytes_received.emit(ByteSize(0));
232
233        let line = format.generate_line(n);
234
235        let mut stream = DecoderFramedRead::new(line.as_bytes(), decoder.clone());
236        while let Some(next) = stream.next().await {
237            match next {
238                Ok((events, _byte_size)) => {
239                    let count = events.len();
240                    let byte_size = events.estimated_json_encoded_size_of();
241                    events_received.emit(CountByteSize(count, byte_size));
242                    let now = Utc::now();
243
244                    let events = events.into_iter().map(|mut event| {
245                        let log = event.as_mut_log();
246                        log_namespace.insert_standard_vector_source_metadata(
247                            log,
248                            DemoLogsConfig::NAME,
249                            now,
250                        );
251                        log_namespace.insert_source_metadata(
252                            DemoLogsConfig::NAME,
253                            log,
254                            Some(LegacyKey::InsertIfEmpty(path!("service"))),
255                            path!("service"),
256                            "vector",
257                        );
258                        log_namespace.insert_source_metadata(
259                            DemoLogsConfig::NAME,
260                            log,
261                            Some(LegacyKey::InsertIfEmpty(path!("host"))),
262                            path!("host"),
263                            "localhost",
264                        );
265
266                        event
267                    });
268                    out.send_batch(events).await.map_err(|_| {
269                        emit!(StreamClosedError { count });
270                    })?;
271                }
272                Err(error) => {
273                    // Error is logged by `vector_lib::codecs::Decoder`, no further
274                    // handling is needed here.
275                    if !error.can_continue() {
276                        break;
277                    }
278                }
279            }
280        }
281    }
282
283    Ok(())
284}
285
// Derives config generation for `DemoLogsConfig` from its `Default` value (presumably an
// `impl GenerateConfig`; see the macro definition to confirm). Exercised by the
// `generate_config` test in this file.
impl_generate_config_from_default!(DemoLogsConfig);
287
288#[async_trait::async_trait]
289#[typetag::serde(name = "demo_logs")]
290impl SourceConfig for DemoLogsConfig {
291    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
292        let log_namespace = cx.log_namespace(self.log_namespace);
293
294        self.format.validate()?;
295        let decoder =
296            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
297                .build()?;
298        Ok(Box::pin(demo_logs_source(
299            self.interval,
300            self.count,
301            self.format.clone(),
302            decoder,
303            cx.shutdown,
304            cx.out,
305            log_namespace,
306        )))
307    }
308
309    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
310        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
311        // and is merged here.
312        let log_namespace = global_log_namespace.merge(self.log_namespace);
313
314        let schema_definition = self
315            .decoding
316            .schema_definition(log_namespace)
317            .with_standard_vector_source_metadata()
318            .with_source_metadata(
319                DemoLogsConfig::NAME,
320                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
321                &owned_value_path!("service"),
322                Kind::bytes(),
323                Some("service"),
324            );
325
326        vec![SourceOutput::new_maybe_logs(
327            DataType::Log,
328            schema_definition,
329        )]
330    }
331
332    fn can_acknowledge(&self) -> bool {
333        false
334    }
335}
336
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{Stream, StreamExt, poll};
    use indoc::indoc;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{SOURCE_TAGS, assert_source_compliance},
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    /// Parses `config` as YAML, runs `demo_logs_source` to completion with a no-op
    /// shutdown signal, and returns the receiving side of the output channel.
    ///
    /// The decoder is always the default message-based framing and default decoding in
    /// the legacy namespace; any `framing`/`decoding` in `config` is not used here.
    /// Because the source has already finished when this returns, every event is
    /// buffered and the stream ends right after the last one.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let config: DemoLogsConfig = serde_yaml::from_str(config).unwrap();
            let decoder = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            )
            .build()
            .unwrap();
            demo_logs_source(
                config.interval,
                config.count,
                config.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await
            .unwrap();

            rx
        })
        .await
    }

    /// Takes the next event without waiting, failing the test if the stream is pending
    /// or has already ended.
    ///
    /// This is deliberately stricter than `poll!(..).is_ready()`, which also accepts
    /// `Poll::Ready(None)` and would therefore hide a source that produced too few events.
    async fn next_event<S>(rx: &mut S) -> Event
    where
        S: Stream<Item = Event> + Unpin,
    {
        match poll!(rx.next()) {
            Poll::Ready(Some(event)) => event,
            Poll::Ready(None) => panic!("stream ended before the expected event was produced"),
            Poll::Pending => panic!("expected a buffered event, but the stream was pending"),
        }
    }

    /// Asserts that the stream has ended, i.e. no further events were produced.
    async fn assert_exhausted<S>(rx: &mut S)
    where
        S: Stream<Item = Event> + Unpin,
    {
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[test]
    fn config_shuffle_lines_not_empty() {
        let empty_lines: Vec<String> = Vec::new();

        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: empty_lines,
            },
            ..DemoLogsConfig::default()
        };

        assert_eq!(
            errant_config.format.validate(),
            Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
        );
    }

    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
              - three
              - four
            count: 5
        "#})
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = next_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
            count: 5
        "#})
        .await;

        for _ in 0..5 {
            next_event(&mut rx).await;
        }
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
            sequence: true
            count: 5
        "#})
        .await;

        for n in 0..5 {
            let event = next_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(message.starts_with(&n.to_string()));
        }

        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(indoc! {r#"
            format: shuffle
            lines:
              - one
              - two
            count: 3
            interval: 1.0
        "#})
        .await;

        for _ in 0..3 {
            next_event(&mut rx).await;
        }
        assert_exhausted(&mut rx).await;

        // The first tick fires immediately, so three lines span at least two intervals.
        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: syslog
            count: 5
        "#})
        .await;

        let event = next_event(&mut rx).await;
        let log = event.as_log();
        let host = log[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    #[tokio::test]
    async fn apache_common_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: apache_common
            count: 5
        "#})
        .await;

        for _ in 0..5 {
            next_event(&mut rx).await;
        }
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: apache_error
            count: 5
        "#})
        .await;

        for _ in 0..5 {
            next_event(&mut rx).await;
        }
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: syslog
            count: 5
        "#})
        .await;

        for _ in 0..5 {
            next_event(&mut rx).await;
        }
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        let mut rx = runit(indoc! {r#"
            format: bsd_syslog
            count: 5
        "#})
        .await;

        for _ in 0..5 {
            next_event(&mut rx).await;
        }
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(indoc! {r#"
            format: json
            count: 5
        "#})
        .await;

        for _ in 0..5 {
            let event = next_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_exhausted(&mut rx).await;
    }
}