Skip to main content

vector/sources/
journald.rs

1use std::{
2    collections::{HashMap, HashSet},
3    io::SeekFrom,
4    path::PathBuf,
5    process::Stdio,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
14use nix::{
15    sys::signal::{Signal, kill},
16    unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21    fs::{File, OpenOptions},
22    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23    process::{Child, Command},
24    sync::{Mutex, MutexGuard},
25    time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::{
29    EstimatedJsonEncodedSizeOf,
30    codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
31    config::{LegacyKey, LogNamespace},
32    configurable::configurable_component,
33    finalizer::OrderedFinalizer,
34    internal_event::{
35        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
36    },
37    lookup::{metadata_path, owned_value_path, path},
38    schema::Definition,
39};
40use vrl::{
41    event_path,
42    value::{Kind, Value, kind::Collection},
43};
44
45use crate::{
46    SourceSender,
47    config::{
48        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
49        log_schema,
50    },
51    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
52    internal_events::{
53        EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
54        JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
55        StreamClosedError,
56    },
57    serde::bool_or_struct,
58    shutdown::ShutdownSignal,
59};
60
/// Once a batch holds its first accepted event, further records are collected for at most
/// this long before the batch is flushed.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// File name, inside the source's data directory, that stores the checkpointed cursor.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";

// Journal field names consumed by this source.
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Pause before `journalctl` is started again after it exits or fails to launch.
const BACKOFF_DURATION: Duration = Duration::from_secs(1);

/// Executable used when `journalctl_path` is not configured; a bare program name, so it is
/// looked up through `PATH` when spawned.
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| PathBuf::from("journalctl"));
74
75#[derive(Debug, Snafu)]
76enum BuildError {
77    #[snafu(display("journalctl failed to execute: {}", source))]
78    JournalctlSpawn { source: io::Error },
79    #[snafu(display("failed to parse output of `journalctl --version`: {:?}", output))]
80    JournalctlParseVersion { output: String },
81    #[snafu(display(
82        "The unit {:?} is duplicated in both include_units and exclude_units",
83        unit
84    ))]
85    DuplicatedUnit { unit: String },
86    #[snafu(display(
87        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
88        field,
89        value,
90    ))]
91    DuplicatedMatches { field: String, value: String },
92    #[snafu(display(
93        "`current_boot_only: false` not supported for systemd versions 250 through 257 (got {}).",
94        systemd_version
95    ))]
96    AllBootsNotSupported { systemd_version: u32 },
97}
98
/// Journal field name mapped to the set of values matched for that field.
type Matches = HashMap<String, HashSet<String>>;
100
101/// Configuration for the `journald` source.
102#[configurable_component(source("journald", "Collect logs from JournalD."))]
103#[derive(Clone, Debug)]
104#[serde(deny_unknown_fields)]
105pub struct JournaldConfig {
106    /// Only include entries that appended to the journal after the entries have been read.
107    #[serde(default)]
108    pub since_now: bool,
109
110    /// Only include entries that occurred after the current boot of the system.
111    #[serde(default = "crate::serde::default_true")]
112    pub current_boot_only: bool,
113
114    /// A list of unit names to monitor.
115    ///
116    /// If empty or not present, all units are accepted.
117    ///
118    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit name.
119    ///
120    /// **Note:** This option matches only the `_SYSTEMD_UNIT` field, which is narrower than `journalctl --unit`.
121    /// Messages from systemd about unit lifecycle (start/stop) have `_SYSTEMD_UNIT=init.scope` and will not match.
122    /// To capture these, explicitly include `init.scope` or use `include_matches` for finer control.
123    #[serde(default)]
124    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
125    pub include_units: Vec<String>,
126
127    /// A list of unit names to exclude from monitoring.
128    ///
129    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit
130    /// name.
131    #[serde(default)]
132    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
133    pub exclude_units: Vec<String>,
134
135    /// A list of sets of field/value pairs to monitor.
136    ///
137    /// If empty or not present, all journal fields are accepted.
138    ///
139    /// If `include_units` is specified, it is merged into this list.
140    #[serde(default)]
141    #[configurable(metadata(
142        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
143    ))]
144    #[configurable(metadata(docs::examples = "matches_examples()"))]
145    pub include_matches: Matches,
146
147    /// A list of sets of field/value pairs that, if any are present in a journal entry,
148    /// excludes the entry from this source.
149    ///
150    /// If `exclude_units` is specified, it is merged into this list.
151    #[serde(default)]
152    #[configurable(metadata(
153        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
154    ))]
155    #[configurable(metadata(docs::examples = "matches_examples()"))]
156    pub exclude_matches: Matches,
157
158    /// The directory used to persist file checkpoint positions.
159    ///
160    /// By default, the [global `data_dir` option][global_data_dir] is used.
161    /// Make sure the running user has write permissions to this directory.
162    ///
163    /// If this directory is specified, then Vector will attempt to create it.
164    ///
165    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
166    #[serde(default)]
167    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
168    #[configurable(metadata(docs::human_name = "Data Directory"))]
169    pub data_dir: Option<PathBuf>,
170
171    /// A list of extra command line arguments to pass to `journalctl`.
172    ///
173    /// If specified, it is merged to the command line arguments as-is.
174    #[serde(default)]
175    #[configurable(metadata(docs::examples = "--merge"))]
176    pub extra_args: Vec<String>,
177
178    /// The systemd journal is read in batches, and a checkpoint is set at the end of each batch.
179    ///
180    /// This option limits the size of the batch.
181    #[serde(default = "default_batch_size")]
182    #[configurable(metadata(docs::type_unit = "events"))]
183    pub batch_size: usize,
184
185    /// The full path of the `journalctl` executable.
186    ///
187    /// If not set, a search is done for the `journalctl` path.
188    #[serde(default)]
189    pub journalctl_path: Option<PathBuf>,
190
191    /// The full path of the journal directory.
192    ///
193    /// If not set, `journalctl` uses the default system journal path.
194    #[serde(default)]
195    pub journal_directory: Option<PathBuf>,
196
197    /// The [journal namespace][journal-namespace].
198    ///
199    /// This value is passed to `journalctl` through the [`--namespace` option][journalctl-namespace-option].
200    /// If not set, `journalctl` uses the default namespace.
201    ///
202    /// [journal-namespace]: https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html#Journal%20Namespaces
203    /// [journalctl-namespace-option]: https://www.freedesktop.org/software/systemd/man/journalctl.html#--namespace=NAMESPACE
204    #[serde(default)]
205    pub journal_namespace: Option<String>,
206
207    #[configurable(derived)]
208    #[serde(default, deserialize_with = "bool_or_struct")]
209    acknowledgements: SourceAcknowledgementsConfig,
210
211    /// Enables remapping the `PRIORITY` field from an integer to string value.
212    ///
213    /// Has no effect unless the value of the field is already an integer.
214    #[serde(default)]
215    #[configurable(
216        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
217    )]
218    remap_priority: bool,
219
220    /// The namespace to use for logs. This overrides the global setting.
221    #[configurable(metadata(docs::hidden))]
222    #[serde(default)]
223    log_namespace: Option<bool>,
224
225    /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
226    ///
227    /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
228    /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
229    #[serde(default = "crate::serde::default_false")]
230    emit_cursor: bool,
231}
232
/// Default maximum number of events per batch (one checkpoint is written per batch).
const fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 16;
    DEFAULT_BATCH_SIZE
}
236
/// Example value for `include_matches` / `exclude_matches`, referenced by their
/// `docs::examples` metadata.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let mut examples = HashMap::new();
    examples.insert(
        "_SYSTEMD_UNIT".to_owned(),
        vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
    );
    examples.insert("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]);
    examples
}
246
247impl JournaldConfig {
248    fn merged_include_matches(&self) -> Matches {
249        Self::merge_units(&self.include_matches, &self.include_units)
250    }
251
252    fn merged_exclude_matches(&self) -> Matches {
253        Self::merge_units(&self.exclude_matches, &self.exclude_units)
254    }
255
256    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
257        let mut matches = matches.clone();
258        for unit in units {
259            let entry = matches.entry(String::from(SYSTEMD_UNIT));
260            entry.or_default().insert(fixup_unit(unit));
261        }
262        matches
263    }
264
265    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
266    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
267        let schema_definition = match log_namespace {
268            LogNamespace::Vector => Definition::new_with_default_metadata(
269                Kind::bytes().or_null(),
270                [LogNamespace::Vector],
271            ),
272            LogNamespace::Legacy => Definition::new_with_default_metadata(
273                Kind::object(Collection::empty()),
274                [LogNamespace::Legacy],
275            ),
276        };
277
278        let mut schema_definition = schema_definition
279            .with_standard_vector_source_metadata()
280            // for metadata that is added to the events dynamically through the Record
281            .with_source_metadata(
282                JournaldConfig::NAME,
283                None,
284                &owned_value_path!("metadata"),
285                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
286                None,
287            )
288            .with_source_metadata(
289                JournaldConfig::NAME,
290                None,
291                &owned_value_path!("timestamp"),
292                Kind::timestamp().or_undefined(),
293                Some("timestamp"),
294            )
295            .with_source_metadata(
296                JournaldConfig::NAME,
297                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
298                &owned_value_path!("host"),
299                Kind::bytes().or_undefined(),
300                Some("host"),
301            );
302
303        // for metadata that is added to the events dynamically through the Record
304        if log_namespace == LogNamespace::Legacy {
305            schema_definition = schema_definition.unknown_fields(Kind::bytes());
306        }
307
308        schema_definition
309    }
310}
311
312impl Default for JournaldConfig {
313    fn default() -> Self {
314        Self {
315            since_now: false,
316            current_boot_only: true,
317            include_units: vec![],
318            exclude_units: vec![],
319            include_matches: Default::default(),
320            exclude_matches: Default::default(),
321            data_dir: None,
322            batch_size: default_batch_size(),
323            journalctl_path: None,
324            journal_directory: None,
325            journal_namespace: None,
326            extra_args: vec![],
327            acknowledgements: Default::default(),
328            remap_priority: false,
329            log_namespace: None,
330            emit_cursor: false,
331        }
332    }
333}
334
335impl_generate_config_from_default!(JournaldConfig);
336
337type Record = HashMap<String, String>;
338
339#[async_trait::async_trait]
340#[typetag::serde(name = "journald")]
341impl SourceConfig for JournaldConfig {
342    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
343        if self.remap_priority {
344            warn!(
345                "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
346            );
347        }
348
349        let data_dir = cx
350            .globals
351            // source are only global, name can be used for subdir
352            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
353
354        if let Some(unit) = self
355            .include_units
356            .iter()
357            .find(|unit| self.exclude_units.contains(unit))
358        {
359            let unit = unit.into();
360            return Err(BuildError::DuplicatedUnit { unit }.into());
361        }
362
363        let include_matches = self.merged_include_matches();
364        let exclude_matches = self.merged_exclude_matches();
365
366        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
367            return Err(BuildError::DuplicatedMatches { field, value }.into());
368        }
369
370        let mut checkpoint_path = data_dir;
371        checkpoint_path.push(CHECKPOINT_FILENAME);
372
373        let journalctl_path = self
374            .journalctl_path
375            .clone()
376            .unwrap_or_else(|| JOURNALCTL.clone());
377
378        let systemd_version = get_systemd_version_from_journalctl(&journalctl_path).await?;
379
380        if !self.current_boot_only && (250..=257).contains(&systemd_version) {
381            // https://github.com/vectordotdev/vector/issues/18068
382            return Err(BuildError::AllBootsNotSupported { systemd_version }.into());
383        }
384
385        let starter = StartJournalctl::new(
386            journalctl_path,
387            systemd_version,
388            self.journal_directory.clone(),
389            self.journal_namespace.clone(),
390            self.current_boot_only,
391            self.since_now,
392            self.extra_args.clone(),
393        );
394
395        let batch_size = self.batch_size;
396        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
397        let log_namespace = cx.log_namespace(self.log_namespace);
398
399        Ok(Box::pin(
400            JournaldSource {
401                include_matches,
402                exclude_matches,
403                checkpoint_path,
404                batch_size,
405                remap_priority: self.remap_priority,
406                out: cx.out,
407                acknowledgements,
408                starter,
409                log_namespace,
410                emit_cursor: self.emit_cursor,
411            }
412            .run_shutdown(cx.shutdown),
413        ))
414    }
415
416    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
417        let schema_definition =
418            self.schema_definition(global_log_namespace.merge(self.log_namespace));
419
420        vec![SourceOutput::new_maybe_logs(
421            DataType::Log,
422            schema_definition,
423        )]
424    }
425
426    fn can_acknowledge(&self) -> bool {
427        true
428    }
429}
430
431struct JournaldSource {
432    include_matches: Matches,
433    exclude_matches: Matches,
434    checkpoint_path: PathBuf,
435    batch_size: usize,
436    remap_priority: bool,
437    out: SourceSender,
438    acknowledgements: bool,
439    starter: StartJournalctl,
440    log_namespace: LogNamespace,
441    emit_cursor: bool,
442}
443
impl JournaldSource {
    /// Opens the checkpoint file, sets up checkpoint finalization, and runs the source until it
    /// stops.
    ///
    /// Returns `Err(())`, after emitting `JournaldCheckpointFileOpenError`, only when the
    /// checkpoint file cannot be opened; otherwise returns `Ok(())` once `run` finishes.
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        // The finalizer gets its own handle on the checkpointer plus the shutdown signal;
        // completed batches hand it their cursor (see `Batch::finish`).
        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    /// Supervises `journalctl`: starts it after the last checkpointed cursor, streams its
    /// output, and starts it again after `BACKOFF_DURATION` whenever it stops or fails to start.
    ///
    /// Returns when shutdown is signalled or when `run_stream` reports that `journalctl`
    /// must not be restarted.
    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            // Non-blocking shutdown check before (re)starting journalctl.
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            // Resume right after the last checkpointed entry, if there is one.
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stdout_stream, stderr_stream, running)) => {
                    if !self
                        .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
                        .await
                    {
                        return;
                    }
                    // Explicit drop to ensure it isn't dropped earlier.
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            // journalctl process should never stop,
            // so it is an error if we reach here.
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

    /// Processes `journalctl` output batch by batch.
    ///
    /// Returns `true` if `journalctl` should be restarted (its stdout ended or could not be
    /// read) and `false` if the source should stop (shutdown was signalled, or sending to the
    /// downstream channel failed).
    async fn run_stream<'a>(
        &'a mut self,
        mut stdout_stream: JournalStream,
        stderr_stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        // Spawn stderr handler task; it is aborted on every exit path below.
        let stderr_handler = crate::spawn_in_current_span(Self::handle_stderr(stderr_stream));

        let batch_size = self.batch_size;
        let result = loop {
            let mut batch = Batch::new(self);

            // Start the timeout counter only once we have received a
            // valid and non-filtered event.
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => {
                        stderr_handler.abort();
                        return false;
                    },
                    item = stdout_stream.next() => item,
                };
                // The batch is still empty here, so nothing is lost by restarting.
                if !batch.handle_next(item) {
                    stderr_handler.abort();
                    return true;
                }
            }

            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            // Read up to `batch_size - 1` further records (filtered-out records count too),
            // stopping early when the timeout fires or the stream stops yielding records.
            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stdout_stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            // `Some(_)` ends processing: `true` = restart journalctl, `false` = stop.
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        };

        stderr_handler.abort();
        result
    }

    /// Handle stderr stream from journalctl process: logs each non-empty line as a warning
    /// until the stream ends or a read error occurs.
    async fn handle_stderr(mut stderr_stream: JournalStream) {
        while let Some(result) = stderr_stream.next().await {
            match result {
                Ok(line) => {
                    let line_str = String::from_utf8_lossy(&line);
                    let trimmed = line_str.trim();
                    if !trimmed.is_empty() {
                        warn!("Warning journalctl stderr: {trimmed}");
                    }
                }
                Err(err) => {
                    error!("Error reading journalctl stderr: {err}");
                    break;
                }
            }
        }
    }
}
586
587struct Batch<'a> {
588    events: Vec<LogEvent>,
589    record_size: usize,
590    exiting: Option<bool>,
591    batch: Option<BatchNotifier>,
592    receiver: Option<BatchStatusReceiver>,
593    source: &'a mut JournaldSource,
594    cursor: Option<String>,
595}
596
597impl<'a> Batch<'a> {
598    fn new(source: &'a mut JournaldSource) -> Self {
599        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
600        Self {
601            events: Vec::new(),
602            record_size: 0,
603            exiting: None,
604            batch,
605            receiver,
606            source,
607            cursor: None,
608        }
609    }
610
611    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
612        match result {
613            None => {
614                warn!("Journalctl process stopped.");
615                self.exiting = Some(true);
616                false
617            }
618            Some(Err(error)) => {
619                emit!(JournaldReadError { error });
620                false
621            }
622            Some(Ok(bytes)) => {
623                match decode_record(&bytes, self.source.remap_priority) {
624                    Ok(mut record) => {
625                        if self.source.emit_cursor {
626                            if let Some(tmp) = record.get(CURSOR) {
627                                self.cursor = Some(tmp.clone());
628                            }
629                        } else if let Some(tmp) = record.remove(CURSOR) {
630                            self.cursor = Some(tmp);
631                        }
632
633                        if !filter_matches(
634                            &record,
635                            &self.source.include_matches,
636                            &self.source.exclude_matches,
637                        ) {
638                            self.record_size += bytes.len();
639
640                            let mut event = create_log_event_from_record(
641                                record,
642                                &self.batch,
643                                self.source.log_namespace,
644                            );
645
646                            enrich_log_event(&mut event, self.source.log_namespace);
647
648                            self.events.push(event);
649                        }
650                    }
651                    Err(error) => {
652                        emit!(JournaldInvalidRecordError {
653                            error,
654                            text: String::from_utf8_lossy(&bytes).into_owned()
655                        });
656                    }
657                }
658                true
659            }
660        }
661    }
662
663    async fn finish(
664        mut self,
665        finalizer: &Finalizer,
666        bytes_received: &'a Registered<BytesReceived>,
667        events_received: &'a Registered<EventsReceived>,
668    ) -> Option<bool> {
669        drop(self.batch);
670
671        if self.record_size > 0 {
672            bytes_received.emit(ByteSize(self.record_size));
673        }
674
675        if !self.events.is_empty() {
676            let count = self.events.len();
677            let byte_size = self.events.estimated_json_encoded_size_of();
678            events_received.emit(CountByteSize(count, byte_size));
679
680            match self.source.out.send_batch(self.events).await {
681                Ok(_) => {
682                    if let Some(cursor) = self.cursor {
683                        finalizer.finalize(cursor, self.receiver).await;
684                    }
685                }
686                Err(_) => {
687                    emit!(StreamClosedError { count });
688                    // `out` channel is closed, don't restart journalctl.
689                    self.exiting = Some(false);
690                }
691            }
692        }
693        self.exiting
694    }
695}
696
697type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
698
/// Everything needed to (re)launch `journalctl` with the configured options.
struct StartJournalctl {
    /// `journalctl` executable to run.
    path: PathBuf,
    /// systemd version reported by `journalctl --version`; selects the `--boot` handling.
    systemd_version: u32,
    /// Value for `--directory`, if any.
    journal_dir: Option<PathBuf>,
    /// Value for `--namespace`, if any.
    journal_namespace: Option<String>,
    /// Restrict output to the current boot.
    current_boot_only: bool,
    /// Start at `--since=now` when no checkpoint exists.
    since_now: bool,
    /// User-supplied arguments appended verbatim.
    extra_args: Vec<String>,
}
708
709impl StartJournalctl {
710    const fn new(
711        path: PathBuf,
712        systemd_version: u32,
713        journal_dir: Option<PathBuf>,
714        journal_namespace: Option<String>,
715        current_boot_only: bool,
716        since_now: bool,
717        extra_args: Vec<String>,
718    ) -> Self {
719        Self {
720            path,
721            systemd_version,
722            journal_dir,
723            journal_namespace,
724            current_boot_only,
725            since_now,
726            extra_args,
727        }
728    }
729
730    fn make_command(&self, checkpoint: Option<&str>) -> Command {
731        let mut command = Command::new(&self.path);
732        command.stdout(Stdio::piped());
733        command.stderr(Stdio::piped());
734        command.arg("--follow");
735        command.arg("--all");
736        command.arg("--show-cursor");
737        command.arg("--output=json");
738
739        if let Some(dir) = &self.journal_dir {
740            command.arg(format!("--directory={}", dir.display()));
741        }
742
743        if let Some(namespace) = &self.journal_namespace {
744            command.arg(format!("--namespace={namespace}"));
745        }
746
747        // By default entries from all boots are included
748        // systemd 242 introduces support for --boot=all
749        // systemd 250 lets --follow imply --boot (with no facility to override)
750        // systemd 258 allows to override --boot as implied by --follow
751        if self.current_boot_only {
752            if self.systemd_version < 250 {
753                command.arg("--boot");
754            }
755        } else if self.systemd_version >= 258 {
756            command.arg("--boot=all");
757        }
758
759        if let Some(cursor) = checkpoint {
760            command.arg(format!("--after-cursor={cursor}"));
761        } else if self.since_now {
762            command.arg("--since=now");
763        } else {
764            // journalctl --follow only outputs a few lines without a starting point
765            command.arg("--since=2000-01-01");
766        }
767
768        if !self.extra_args.is_empty() {
769            command.args(&self.extra_args);
770        }
771
772        command
773    }
774
775    fn start(
776        &mut self,
777        checkpoint: Option<&str>,
778    ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
779        let mut command = self.make_command(checkpoint);
780
781        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
782
783        let stdout_stream = FramedRead::new(
784            child.stdout.take().unwrap(),
785            CharacterDelimitedDecoder::new(b'\n'),
786        );
787
788        let stderr = child.stderr.take().unwrap();
789        let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));
790
791        Ok((
792            stdout_stream.boxed(),
793            stderr_stream.boxed(),
794            RunningJournalctl(child),
795        ))
796    }
797}
798
799struct RunningJournalctl(Child);
800
801impl Drop for RunningJournalctl {
802    fn drop(&mut self) {
803        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
804            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
805        }
806    }
807}
808
809async fn get_systemd_version_from_journalctl(journalctl_path: &PathBuf) -> crate::Result<u32> {
810    let stdout = Command::new(journalctl_path)
811        .arg("--version")
812        .output()
813        .await
814        .context(JournalctlSpawnSnafu)?
815        .stdout;
816
817    // output format: `systemd {version_number} ({full_version}){newline}{config ...}`
818    let stdout = String::from_utf8_lossy(&stdout);
819    Ok(stdout
820        .split_whitespace()
821        .nth(1)
822        .and_then(|s| s.parse::<u32>().ok())
823        .ok_or_else(|| BuildError::JournalctlParseVersion {
824            output: {
825                let cutoff = 40;
826                let length = stdout.chars().count();
827                format!(
828                    "{}{}",
829                    stdout.chars().take(cutoff).collect::<String>(),
830                    if length > cutoff {
831                        format!(" ..{} more char(s)", length - cutoff)
832                    } else {
833                        "".to_string()
834                    }
835                )
836            },
837        })?)
838}
839
840fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
841    match log_namespace {
842        LogNamespace::Vector => {
843            if let Some(host) = log
844                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
845                .and_then(|meta| meta.get(HOSTNAME))
846            {
847                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
848            }
849        }
850        LogNamespace::Legacy => {
851            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
852                log_namespace.insert_source_metadata(
853                    JournaldConfig::NAME,
854                    log,
855                    log_schema().host_key().map(LegacyKey::Overwrite),
856                    path!("host"),
857                    host,
858                );
859            }
860        }
861    }
862
863    // Create a Utc timestamp from an existing log field if present.
864    let timestamp_value = match log_namespace {
865        LogNamespace::Vector => log
866            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
867            .and_then(|meta| {
868                meta.get(SOURCE_TIMESTAMP)
869                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
870            }),
871        LogNamespace::Legacy => log
872            .get(event_path!(SOURCE_TIMESTAMP))
873            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
874    };
875
876    let timestamp = timestamp_value
877        .filter(|&ts| ts.is_bytes())
878        .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
879        .map(|ts| {
880            chrono::Utc
881                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
882                .single()
883                .expect("invalid timestamp")
884        });
885
886    // Add timestamp.
887    match log_namespace {
888        LogNamespace::Vector => {
889            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
890
891            if let Some(ts) = timestamp {
892                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
893            }
894        }
895        LogNamespace::Legacy => {
896            if let Some(ts) = timestamp {
897                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
898            }
899        }
900    }
901
902    // Add source type.
903    log_namespace.insert_vector_metadata(
904        log,
905        log_schema().source_type_key(),
906        path!("source_type"),
907        JournaldConfig::NAME,
908    );
909}
910
911fn create_log_event_from_record(
912    mut record: Record,
913    batch: &Option<BatchNotifier>,
914    log_namespace: LogNamespace,
915) -> LogEvent {
916    match log_namespace {
917        LogNamespace::Vector => {
918            let message_value = record
919                .remove(MESSAGE)
920                .map(|msg| Value::Bytes(Bytes::from(msg)))
921                .unwrap_or(Value::Null);
922
923            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
924
925            // Add the remaining fields from the Record to the log event into an object to avoid collisions.
926            record.iter().for_each(|(key, value)| {
927                log.metadata_mut()
928                    .value_mut()
929                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
930            });
931
932            log
933        }
934        LogNamespace::Legacy => {
935            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
936
937            if let Some(message) = log.remove(event_path!(MESSAGE)) {
938                log.maybe_insert(log_schema().message_key_target_path(), message);
939            }
940
941            log
942        }
943    }
944}
945
/// Normalizes a configured unit name into a full systemd unit name.
///
/// A name that already contains a `.` (e.g. `unit.service`, `sshd.socket`) is returned
/// unchanged. A bare name gets the `.service` suffix appended.
fn fixup_unit(unit: &str) -> String {
    if !unit.contains('.') {
        return format!("{unit}.service");
    }
    unit.to_owned()
}
955
956fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
957    let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
958    // journalctl will output non-ASCII values using an array
959    // of integers. Look for those values and re-parse them.
960    if let Some(record) = record.as_object_mut() {
961        for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
962            *value = decode_array(value.as_array().expect("already validated"));
963        }
964    }
965    if remap {
966        record.get_mut("PRIORITY").map(remap_priority);
967    }
968    serde_json::from_value(record)
969}
970
971fn decode_array(array: &[JsonValue]) -> JsonValue {
972    decode_array_as_bytes(array).unwrap_or_else(|| {
973        let ser = serde_json::to_string(array).expect("already deserialized");
974        JsonValue::String(ser)
975    })
976}
977
978fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
979    // From the array of values, turn all the numbers into bytes, and
980    // then the bytes into a string, but return None if any value in the
981    // array was not a valid byte.
982    array
983        .iter()
984        .map(|item| {
985            item.as_u64().and_then(|num| match num {
986                num if num <= u8::MAX as u64 => Some(num as u8),
987                _ => None,
988            })
989        })
990        .collect::<Option<Vec<u8>>>()
991        .map(|array| String::from_utf8_lossy(&array).into())
992}
993
994fn remap_priority(priority: &mut JsonValue) {
995    if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
996        let text = match num {
997            0 => "EMERG",
998            1 => "ALERT",
999            2 => "CRIT",
1000            3 => "ERR",
1001            4 => "WARNING",
1002            5 => "NOTICE",
1003            6 => "INFO",
1004            7 => "DEBUG",
1005            _ => "UNKNOWN",
1006        };
1007        *priority = JsonValue::String(text.into());
1008    }
1009}
1010
1011fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
1012    match (includes.is_empty(), excludes.is_empty()) {
1013        (true, true) => false,
1014        (false, true) => !contains_match(record, includes),
1015        (true, false) => contains_match(record, excludes),
1016        (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
1017    }
1018}
1019
1020fn contains_match(record: &Record, matches: &Matches) -> bool {
1021    let f = move |(field, value)| {
1022        matches
1023            .get(field)
1024            .map(|x| x.contains(value))
1025            .unwrap_or(false)
1026    };
1027    record.iter().any(f)
1028}
1029
1030fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
1031    for (a_key, a_values) in a_matches {
1032        if let Some(b_values) = b_matches.get(a_key.as_str()) {
1033            for (a, b) in a_values
1034                .iter()
1035                .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
1036            {
1037                if a == b {
1038                    return Some((a_key.into(), b.into()));
1039                }
1040            }
1041        }
1042    }
1043    None
1044}
1045
1046enum Finalizer {
1047    Sync(SharedCheckpointer),
1048    Async(OrderedFinalizer<String>),
1049}
1050
1051impl Finalizer {
1052    fn new(
1053        acknowledgements: bool,
1054        checkpointer: SharedCheckpointer,
1055        shutdown: ShutdownSignal,
1056    ) -> Self {
1057        if acknowledgements {
1058            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
1059            crate::spawn_in_current_span(async move {
1060                while let Some((status, cursor)) = ack_stream.next().await {
1061                    if status == BatchStatus::Delivered {
1062                        checkpointer.lock().await.set(cursor).await;
1063                    }
1064                }
1065            });
1066            Self::Async(finalizer)
1067        } else {
1068            Self::Sync(checkpointer)
1069        }
1070    }
1071
1072    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
1073        match (self, receiver) {
1074            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
1075            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
1076            _ => {
1077                unreachable!("Cannot have async finalization without a receiver in journald source")
1078            }
1079        }
1080    }
1081}
1082
1083struct Checkpointer {
1084    file: File,
1085    filename: PathBuf,
1086}
1087
1088impl Checkpointer {
1089    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1090        let file = OpenOptions::new()
1091            .read(true)
1092            .write(true)
1093            .create(true)
1094            .truncate(false)
1095            .open(&filename)
1096            .await?;
1097        Ok(Checkpointer { file, filename })
1098    }
1099
1100    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
1101        self.file.seek(SeekFrom::Start(0)).await?;
1102        self.file.write_all(format!("{token}\n").as_bytes()).await
1103    }
1104
1105    async fn get(&mut self) -> Result<Option<String>, io::Error> {
1106        let mut buf = Vec::<u8>::new();
1107        self.file.seek(SeekFrom::Start(0)).await?;
1108        self.file.read_to_end(&mut buf).await?;
1109        match buf.len() {
1110            0 => Ok(None),
1111            _ => {
1112                let text = String::from_utf8_lossy(&buf);
1113                Ok(text.split_once('\n').map(|(line, _)| line.to_string()))
1114            }
1115        }
1116    }
1117}
1118
1119struct StatefulCheckpointer {
1120    checkpointer: Checkpointer,
1121    cursor: Option<String>,
1122}
1123
1124impl StatefulCheckpointer {
1125    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1126        let mut checkpointer = Checkpointer::new(filename).await?;
1127        let cursor = checkpointer.get().await?;
1128        Ok(Self {
1129            checkpointer,
1130            cursor,
1131        })
1132    }
1133
1134    async fn set(&mut self, token: impl Into<String>) {
1135        let token = token.into();
1136        if let Err(error) = self.checkpointer.set(&token).await {
1137            emit!(JournaldCheckpointSetError {
1138                error,
1139                filename: self
1140                    .checkpointer
1141                    .filename
1142                    .to_str()
1143                    .unwrap_or("unknown")
1144                    .to_string(),
1145            });
1146        }
1147        self.cursor = Some(token);
1148    }
1149}
1150
1151#[derive(Clone)]
1152struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
1153
1154impl SharedCheckpointer {
1155    fn new(c: StatefulCheckpointer) -> Self {
1156        Self(Arc::new(Mutex::new(c)))
1157    }
1158
1159    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
1160        self.0.lock().await
1161    }
1162}
1163
#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    /// Stores `token`, then checks that it reads back and that the file starts with it.
    async fn set_and_verify(checkpointer: &mut Checkpointer, path: &std::path::Path, token: &str) {
        checkpointer
            .set(token)
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), token);

        let contents = read_to_string(path)
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {path:?}"));
        assert!(contents.starts_with(&format!("{token}\n")));
    }

    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let filename = tempdir.path().join(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        assert!(checkpointer.get().await.unwrap().is_none());

        set_and_verify(&mut checkpointer, &filename, "first test").await;
        // A shorter token overwrites the longer one; only the prefix is checked.
        set_and_verify(&mut checkpointer, &filename, "second").await;
    }
}
1208
1209#[cfg(test)]
1210mod tests {
1211    use std::{fs, path::Path};
1212
1213    use tempfile::tempdir;
1214    use tokio::time::{Duration, Instant, sleep, timeout};
1215    use vrl::value::{Value, kind::Collection};
1216
1217    use super::*;
1218    use crate::{
1219        config::ComponentKey,
1220        event::{Event, EventStatus},
1221        test_util::components::assert_source_compliance,
1222    };
1223
1224    const TEST_COMPONENT: &str = "journald-test";
1225    const TEST_JOURNALCTL: &str = "tests/data/journalctl";
1226
1227    async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
1228        let include_matches = create_unit_matches(iunits.to_vec());
1229        let exclude_matches = create_unit_matches(xunits.to_vec());
1230        run_journal(include_matches, exclude_matches, cursor, false).await
1231    }
1232
1233    async fn run_journal(
1234        include_matches: Matches,
1235        exclude_matches: Matches,
1236        checkpoint: Option<&str>,
1237        emit_cursor: bool,
1238    ) -> Vec<Event> {
1239        assert_source_compliance(&["protocol"], async move {
1240            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
1241
1242            let tempdir = tempdir().unwrap();
1243            let tempdir = tempdir.path().to_path_buf();
1244
1245            if let Some(cursor) = checkpoint {
1246                let mut checkpoint_path = tempdir.clone();
1247                checkpoint_path.push(TEST_COMPONENT);
1248                fs::create_dir(&checkpoint_path).unwrap();
1249                checkpoint_path.push(CHECKPOINT_FILENAME);
1250
1251                let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1252                    .await
1253                    .expect("Creating checkpointer failed!");
1254
1255                checkpointer
1256                    .set(cursor)
1257                    .await
1258                    .expect("Could not set checkpoint");
1259            }
1260
1261            let (cx, shutdown) =
1262                SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1263            let config = JournaldConfig {
1264                journalctl_path: Some(TEST_JOURNALCTL.into()),
1265                include_matches,
1266                exclude_matches,
1267                data_dir: Some(tempdir),
1268                remap_priority: true,
1269                acknowledgements: false.into(),
1270                emit_cursor,
1271                ..Default::default()
1272            };
1273            let source = config.build(cx).await.unwrap();
1274            tokio::spawn(async move { source.await.unwrap() });
1275
1276            // Hack: Sleep to ensure journalctl process starts and emits events before shutdown.
1277            sleep(Duration::from_secs(1)).await;
1278            shutdown
1279                .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
1280                .await;
1281
1282            timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
1283        })
1284        .await
1285    }
1286
1287    fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
1288        let units: HashSet<String> = units.into_iter().map(Into::into).collect();
1289        let mut map = HashMap::new();
1290        if !units.is_empty() {
1291            map.insert(String::from(SYSTEMD_UNIT), units);
1292        }
1293        map
1294    }
1295
1296    fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
1297        let mut matches: Matches = HashMap::new();
1298        for (field, value) in conditions {
1299            matches
1300                .entry(field.into())
1301                .or_default()
1302                .insert(value.into());
1303        }
1304        matches
1305    }
1306
1307    #[tokio::test]
1308    async fn reads_journal() {
1309        let received = run_with_units(&[], &[], None).await;
1310        assert_eq!(received.len(), 8);
1311        assert_eq!(
1312            message(&received[0]),
1313            Value::Bytes("System Initialization".into())
1314        );
1315        assert_eq!(
1316            received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1317            "journald".into()
1318        );
1319        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
1320        assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
1321        assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
1322        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
1323        assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
1324    }
1325
1326    #[tokio::test]
1327    async fn includes_units() {
1328        let received = run_with_units(&["unit.service"], &[], None).await;
1329        assert_eq!(received.len(), 1);
1330        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1331    }
1332
1333    #[tokio::test]
1334    async fn excludes_units() {
1335        let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
1336        assert_eq!(received.len(), 6);
1337        assert_eq!(
1338            message(&received[0]),
1339            Value::Bytes("System Initialization".into())
1340        );
1341        assert_eq!(
1342            message(&received[1]),
1343            Value::Bytes("Missing timestamp".into())
1344        );
1345        assert_eq!(
1346            message(&received[2]),
1347            Value::Bytes("Different timestamps".into())
1348        );
1349    }
1350
1351    #[tokio::test]
1352    async fn emits_cursor() {
1353        let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1354        assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1355        assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1356        assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1357    }
1358
1359    #[tokio::test]
1360    async fn includes_matches() {
1361        let matches = create_matches(vec![("PRIORITY", "ERR")]);
1362        let received = run_journal(matches, HashMap::new(), None, false).await;
1363        assert_eq!(received.len(), 2);
1364        assert_eq!(
1365            message(&received[0]),
1366            Value::Bytes("Different timestamps".into())
1367        );
1368        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
1369        assert_eq!(
1370            message(&received[1]),
1371            Value::Bytes("Non-ASCII in other field".into())
1372        );
1373        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1374    }
1375
1376    #[tokio::test]
1377    async fn includes_kernel() {
1378        let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1379        let received = run_journal(matches, HashMap::new(), None, false).await;
1380        assert_eq!(received.len(), 1);
1381        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
1382        assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
1383    }
1384
1385    #[tokio::test]
1386    async fn excludes_matches() {
1387        let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1388        let received = run_journal(HashMap::new(), matches, None, false).await;
1389        assert_eq!(received.len(), 5);
1390        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
1391        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
1392        assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
1393        assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
1394        assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
1395    }
1396
1397    #[tokio::test]
1398    async fn handles_checkpoint() {
1399        let received = run_with_units(&[], &[], Some("1")).await;
1400        assert_eq!(received.len(), 7);
1401        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1402        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
1403    }
1404
1405    #[tokio::test]
1406    async fn parses_array_messages() {
1407        let received = run_with_units(&["badunit.service"], &[], None).await;
1408        assert_eq!(received.len(), 1);
1409        assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
1410    }
1411
1412    #[tokio::test]
1413    async fn parses_array_fields() {
1414        let received = run_with_units(&["syslog.service"], &[], None).await;
1415        assert_eq!(received.len(), 1);
1416        assert_eq!(
1417            received[0].as_log()["SYSLOG_RAW"],
1418            Value::Bytes("¿World?".into())
1419        );
1420    }
1421
1422    #[tokio::test]
1423    async fn parses_string_sequences() {
1424        let received = run_with_units(&["NetworkManager.service"], &[], None).await;
1425        assert_eq!(received.len(), 1);
1426        assert_eq!(
1427            received[0].as_log()["SYSLOG_FACILITY"],
1428            Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
1429        );
1430    }
1431
1432    #[tokio::test]
1433    async fn handles_missing_timestamp() {
1434        let received = run_with_units(&["stdout"], &[], None).await;
1435        assert_eq!(received.len(), 2);
1436        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
1437        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1438    }
1439
1440    #[tokio::test]
1441    async fn handles_acknowledgements() {
1442        let (tx, mut rx) = SourceSender::new_test();
1443
1444        let tempdir = tempdir().unwrap();
1445        let tempdir = tempdir.path().to_path_buf();
1446        let mut checkpoint_path = tempdir.clone();
1447        checkpoint_path.push(TEST_COMPONENT);
1448        fs::create_dir(&checkpoint_path).unwrap();
1449        checkpoint_path.push(CHECKPOINT_FILENAME);
1450
1451        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1452            .await
1453            .expect("Creating checkpointer failed!");
1454
1455        let config = JournaldConfig {
1456            journalctl_path: Some(TEST_JOURNALCTL.into()),
1457            data_dir: Some(tempdir),
1458            remap_priority: true,
1459            acknowledgements: true.into(),
1460            ..Default::default()
1461        };
1462        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1463        let source = config.build(cx).await.unwrap();
1464        tokio::spawn(async move { source.await.unwrap() });
1465
1466        // Make sure the checkpointer cursor is empty
1467        assert_eq!(checkpointer.get().await.unwrap(), None);
1468
1469        // Hack: Sleep to ensure journalctl process starts and emits events.
1470        sleep(Duration::from_secs(1)).await;
1471
1472        // Acknowledge all the received events.
1473        let mut count = 0;
1474        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
1475            // The checkpointer shouldn't set the cursor until the end of the batch.
1476            assert_eq!(checkpointer.get().await.unwrap(), None);
1477            event.metadata().update_status(EventStatus::Delivered);
1478            count += 1;
1479        }
1480        assert_eq!(count, 8);
1481
1482        sleep(Duration::from_millis(100)).await;
1483        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
1484    }
1485
1486    #[test]
1487    fn filter_matches_works_correctly() {
1488        let empty: Matches = HashMap::new();
1489        let includes = create_unit_matches(vec!["one", "two"]);
1490        let excludes = create_unit_matches(vec!["foo", "bar"]);
1491
1492        let zero = HashMap::new();
1493        assert!(!filter_matches(&zero, &empty, &empty));
1494        assert!(filter_matches(&zero, &includes, &empty));
1495        assert!(!filter_matches(&zero, &empty, &excludes));
1496        assert!(filter_matches(&zero, &includes, &excludes));
1497        let mut one = HashMap::new();
1498        one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
1499        assert!(!filter_matches(&one, &empty, &empty));
1500        assert!(!filter_matches(&one, &includes, &empty));
1501        assert!(!filter_matches(&one, &empty, &excludes));
1502        assert!(!filter_matches(&one, &includes, &excludes));
1503        let mut two = HashMap::new();
1504        two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
1505        assert!(!filter_matches(&two, &empty, &empty));
1506        assert!(filter_matches(&two, &includes, &empty));
1507        assert!(filter_matches(&two, &empty, &excludes));
1508        assert!(filter_matches(&two, &includes, &excludes));
1509    }
1510
1511    #[test]
1512    fn merges_units_and_matches_option() {
1513        let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
1514        let include_matches = create_matches(vec![
1515            ("_SYSTEMD_UNIT", "three.service"),
1516            ("_TRANSPORT", "kernel"),
1517        ]);
1518
1519        let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
1520        let exclude_matches = create_matches(vec![
1521            ("_SYSTEMD_UNIT", "baz.service"),
1522            ("PRIORITY", "DEBUG"),
1523        ]);
1524
1525        let journald_config = JournaldConfig {
1526            include_units,
1527            include_matches,
1528            exclude_units,
1529            exclude_matches,
1530            ..Default::default()
1531        };
1532
1533        let hashset =
1534            |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };
1535
1536        let matches = journald_config.merged_include_matches();
1537        let units = matches.get("_SYSTEMD_UNIT").unwrap();
1538        assert_eq!(
1539            units,
1540            &hashset(&["one.service", "two.service", "three.service"])
1541        );
1542        let units = matches.get("_TRANSPORT").unwrap();
1543        assert_eq!(units, &hashset(&["kernel"]));
1544
1545        let matches = journald_config.merged_exclude_matches();
1546        let units = matches.get("_SYSTEMD_UNIT").unwrap();
1547        assert_eq!(
1548            units,
1549            &hashset(&["foo.service", "bar.service", "baz.service"])
1550        );
1551        let units = matches.get("PRIORITY").unwrap();
1552        assert_eq!(units, &hashset(&["DEBUG"]));
1553    }
1554
1555    #[test]
1556    fn find_duplicate_match_works_correctly() {
1557        let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1558        let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1559        let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
1560        assert_eq!(field, "_TRANSPORT");
1561        assert_eq!(value, "kernel");
1562
1563        let empty = HashMap::new();
1564        let actual = find_duplicate_match(&empty, &empty);
1565        assert!(actual.is_none());
1566
1567        let actual = find_duplicate_match(&include_matches, &empty);
1568        assert!(actual.is_none());
1569
1570        let actual = find_duplicate_match(&empty, &exclude_matches);
1571        assert!(actual.is_none());
1572    }
1573
1574    #[test]
1575    fn command_options() {
1576        let path = PathBuf::from("journalctl");
1577
1578        let systemd_version = 239;
1579        let journal_dir = None;
1580        let journal_namespace = None;
1581        let current_boot_only = false;
1582        let cursor = None;
1583        let since_now = false;
1584        let extra_args = vec![];
1585
1586        let command = create_command(
1587            &path,
1588            systemd_version,
1589            journal_dir,
1590            journal_namespace,
1591            current_boot_only,
1592            since_now,
1593            cursor,
1594            extra_args,
1595        );
1596        let cmd_line = format!("{command:?}");
1597        assert!(!cmd_line.contains("--directory="));
1598        assert!(!cmd_line.contains("--namespace="));
1599        assert!(!cmd_line.contains("--boot=all"));
1600        assert!(cmd_line.contains("--since=2000-01-01"));
1601
1602        let journal_dir = None;
1603        let journal_namespace = None;
1604        let since_now = true;
1605        let extra_args = vec![];
1606
1607        let command = create_command(
1608            &path,
1609            systemd_version,
1610            journal_dir,
1611            journal_namespace,
1612            current_boot_only,
1613            since_now,
1614            cursor,
1615            extra_args,
1616        );
1617        let cmd_line = format!("{command:?}");
1618        assert!(cmd_line.contains("--since=now"));
1619
1620        let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
1621        let journal_namespace = Some(String::from("my_namespace"));
1622        let current_boot_only = true;
1623        let cursor = Some("2021-01-01");
1624        let extra_args = vec!["--merge".to_string()];
1625
1626        let command = create_command(
1627            &path,
1628            systemd_version,
1629            journal_dir,
1630            journal_namespace,
1631            current_boot_only,
1632            since_now,
1633            cursor,
1634            extra_args,
1635        );
1636        let cmd_line = format!("{command:?}");
1637        assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
1638        assert!(cmd_line.contains("--namespace=my_namespace"));
1639        assert!(cmd_line.contains("--boot"));
1640        assert!(cmd_line.contains("--after-cursor="));
1641        assert!(cmd_line.contains("--merge"));
1642
1643        let systemd_version = 258;
1644        let journal_dir = None;
1645        let journal_namespace = None;
1646        let current_boot_only = false;
1647        let extra_args = vec![];
1648
1649        let command = create_command(
1650            &path,
1651            systemd_version,
1652            journal_dir,
1653            journal_namespace,
1654            current_boot_only,
1655            since_now,
1656            cursor,
1657            extra_args,
1658        );
1659        let cmd_line = format!("{command:?}");
1660        assert!(cmd_line.contains("--boot=all"));
1661    }
1662
1663    #[allow(clippy::too_many_arguments)]
1664    fn create_command(
1665        path: &Path,
1666        systemd_version: u32,
1667        journal_dir: Option<PathBuf>,
1668        journal_namespace: Option<String>,
1669        current_boot_only: bool,
1670        since_now: bool,
1671        cursor: Option<&str>,
1672        extra_args: Vec<String>,
1673    ) -> Command {
1674        StartJournalctl::new(
1675            path.into(),
1676            systemd_version,
1677            journal_dir,
1678            journal_namespace,
1679            current_boot_only,
1680            since_now,
1681            extra_args,
1682        )
1683        .make_command(cursor)
1684    }
1685
1686    fn message(event: &Event) -> Value {
1687        event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
1688    }
1689
1690    fn timestamp(event: &Event) -> Value {
1691        event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
1692    }
1693
1694    fn cursor(event: &Event) -> Value {
1695        event.as_log()[CURSOR].clone()
1696    }
1697
1698    fn value_ts(secs: i64, usecs: u32) -> Value {
1699        Value::Timestamp(
1700            chrono::Utc
1701                .timestamp_opt(secs, usecs)
1702                .single()
1703                .expect("invalid timestamp"),
1704        )
1705    }
1706
1707    fn priority(event: &Event) -> Value {
1708        event.as_log()["PRIORITY"].clone()
1709    }
1710
1711    #[test]
1712    fn output_schema_definition_vector_namespace() {
1713        let config = JournaldConfig {
1714            log_namespace: Some(true),
1715            ..Default::default()
1716        };
1717
1718        let definitions = config
1719            .outputs(LogNamespace::Vector)
1720            .remove(0)
1721            .schema_definition(true);
1722
1723        let expected_definition =
1724            Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
1725                .with_metadata_field(
1726                    &owned_value_path!("vector", "source_type"),
1727                    Kind::bytes(),
1728                    None,
1729                )
1730                .with_metadata_field(
1731                    &owned_value_path!("vector", "ingest_timestamp"),
1732                    Kind::timestamp(),
1733                    None,
1734                )
1735                .with_metadata_field(
1736                    &owned_value_path!(JournaldConfig::NAME, "metadata"),
1737                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1738                    None,
1739                )
1740                .with_metadata_field(
1741                    &owned_value_path!(JournaldConfig::NAME, "timestamp"),
1742                    Kind::timestamp().or_undefined(),
1743                    Some("timestamp"),
1744                )
1745                .with_metadata_field(
1746                    &owned_value_path!(JournaldConfig::NAME, "host"),
1747                    Kind::bytes().or_undefined(),
1748                    Some("host"),
1749                );
1750
1751        assert_eq!(definitions, Some(expected_definition))
1752    }
1753
1754    #[test]
1755    fn output_schema_definition_legacy_namespace() {
1756        let config = JournaldConfig::default();
1757
1758        let definitions = config
1759            .outputs(LogNamespace::Legacy)
1760            .remove(0)
1761            .schema_definition(true);
1762
1763        let expected_definition = Definition::new_with_default_metadata(
1764            Kind::object(Collection::empty()),
1765            [LogNamespace::Legacy],
1766        )
1767        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1768        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1769        .with_event_field(
1770            &owned_value_path!("host"),
1771            Kind::bytes().or_undefined(),
1772            Some("host"),
1773        )
1774        .unknown_fields(Kind::bytes());
1775
1776        assert_eq!(definitions, Some(expected_definition))
1777    }
1778
1779    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
1780        let record = r#"{
1781            "PRIORITY":"6",
1782            "SYSLOG_FACILITY":"3",
1783            "SYSLOG_IDENTIFIER":"ntpd",
1784            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
1785            "_CAP_EFFECTIVE":"3fffffffff",
1786            "_CMDLINE":"ntpd: [priv]",
1787            "_COMM":"ntpd",
1788            "_EXE":"/usr/sbin/ntpd",
1789            "_GID":"0",
1790            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
1791            "_PID":"2156",
1792            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
1793            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
1794            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
1795            "_SYSTEMD_SLICE":"system.slice",
1796            "_SYSTEMD_UNIT":"ntpd.service",
1797            "_TRANSPORT":"stdout",
1798            "_UID":"0",
1799            "__MONOTONIC_TIMESTAMP":"98694000446",
1800            "__REALTIME_TIMESTAMP":"1564173027000443",
1801            "host":"my-host.local",
1802            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
1803            "source_type":"journald"
1804        }"#;
1805
1806        let json: serde_json::Value = serde_json::from_str(record).unwrap();
1807        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
1808
1809        event.as_mut_log().insert("timestamp", chrono::Utc::now());
1810
1811        let definitions = config.outputs(namespace).remove(0).schema_definition(true);
1812
1813        definitions.unwrap().assert_valid_for_event(&event);
1814    }
1815
1816    #[test]
1817    fn matches_schema_legacy() {
1818        let config = JournaldConfig::default();
1819
1820        matches_schema(&config, LogNamespace::Legacy)
1821    }
1822}