1use std::{
2 collections::{HashMap, HashSet},
3 io::SeekFrom,
4 path::PathBuf,
5 process::Stdio,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
14use nix::{
15 sys::signal::{Signal, kill},
16 unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21 fs::{File, OpenOptions},
22 io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23 process::{Child, Command},
24 sync::{Mutex, MutexGuard},
25 time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::{
29 EstimatedJsonEncodedSizeOf,
30 codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
31 config::{LegacyKey, LogNamespace},
32 configurable::configurable_component,
33 finalizer::OrderedFinalizer,
34 internal_event::{
35 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
36 },
37 lookup::{metadata_path, owned_value_path, path},
38 schema::Definition,
39};
40use vrl::{
41 event_path,
42 value::{Kind, Value, kind::Collection},
43};
44
45use crate::{
46 SourceSender,
47 config::{
48 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
49 log_schema,
50 },
51 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
52 internal_events::{
53 EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
54 JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
55 StreamClosedError,
56 },
57 serde::bool_or_struct,
58 shutdown::ShutdownSignal,
59};
60
/// How long a batch keeps collecting further records once it holds its first event.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// Pause between two attempts to (re)start `journalctl`.
const BACKOFF_DURATION: Duration = Duration::from_secs(1);

/// Name of the checkpoint file created inside the source's data directory.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";

// Journal field names this source looks at explicitly.
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Executable used when `journalctl_path` is not configured.
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| PathBuf::from("journalctl"));
74
75#[derive(Debug, Snafu)]
76enum BuildError {
77 #[snafu(display("journalctl failed to execute: {}", source))]
78 JournalctlSpawn { source: io::Error },
79 #[snafu(display("failed to parse output of `journalctl --version`: {:?}", output))]
80 JournalctlParseVersion { output: String },
81 #[snafu(display(
82 "The unit {:?} is duplicated in both include_units and exclude_units",
83 unit
84 ))]
85 DuplicatedUnit { unit: String },
86 #[snafu(display(
87 "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
88 field,
89 value,
90 ))]
91 DuplicatedMatches { field: String, value: String },
92 #[snafu(display(
93 "`current_boot_only: false` not supported for systemd versions 250 through 257 (got {}).",
94 systemd_version
95 ))]
96 AllBootsNotSupported { systemd_version: u32 },
97}
98
/// Journal field name mapped to the set of values that count as a match for that field.
type Matches = HashMap<String, HashSet<String>>;
100
101#[configurable_component(source("journald", "Collect logs from JournalD."))]
103#[derive(Clone, Debug)]
104#[serde(deny_unknown_fields)]
105pub struct JournaldConfig {
106 #[serde(default)]
108 pub since_now: bool,
109
110 #[serde(default = "crate::serde::default_true")]
112 pub current_boot_only: bool,
113
114 #[serde(default)]
124 #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
125 pub include_units: Vec<String>,
126
127 #[serde(default)]
132 #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
133 pub exclude_units: Vec<String>,
134
135 #[serde(default)]
141 #[configurable(metadata(
142 docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
143 ))]
144 #[configurable(metadata(docs::examples = "matches_examples()"))]
145 pub include_matches: Matches,
146
147 #[serde(default)]
152 #[configurable(metadata(
153 docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
154 ))]
155 #[configurable(metadata(docs::examples = "matches_examples()"))]
156 pub exclude_matches: Matches,
157
158 #[serde(default)]
167 #[configurable(metadata(docs::examples = "/var/lib/vector"))]
168 #[configurable(metadata(docs::human_name = "Data Directory"))]
169 pub data_dir: Option<PathBuf>,
170
171 #[serde(default)]
175 #[configurable(metadata(docs::examples = "--merge"))]
176 pub extra_args: Vec<String>,
177
178 #[serde(default = "default_batch_size")]
182 #[configurable(metadata(docs::type_unit = "events"))]
183 pub batch_size: usize,
184
185 #[serde(default)]
189 pub journalctl_path: Option<PathBuf>,
190
191 #[serde(default)]
195 pub journal_directory: Option<PathBuf>,
196
197 #[serde(default)]
205 pub journal_namespace: Option<String>,
206
207 #[configurable(derived)]
208 #[serde(default, deserialize_with = "bool_or_struct")]
209 acknowledgements: SourceAcknowledgementsConfig,
210
211 #[serde(default)]
215 #[configurable(
216 deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
217 )]
218 remap_priority: bool,
219
220 #[configurable(metadata(docs::hidden))]
222 #[serde(default)]
223 log_namespace: Option<bool>,
224
225 #[serde(default = "crate::serde::default_false")]
230 emit_cursor: bool,
231}
232
/// Serde default for `batch_size`.
const fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 16;
    DEFAULT_BATCH_SIZE
}
236
/// Builds the example field/value map referenced by the `include_matches` and
/// `exclude_matches` documentation metadata.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let mut examples = HashMap::new();
    examples.insert(
        "_SYSTEMD_UNIT".to_owned(),
        vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
    );
    examples.insert("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]);
    examples
}
246
247impl JournaldConfig {
248 fn merged_include_matches(&self) -> Matches {
249 Self::merge_units(&self.include_matches, &self.include_units)
250 }
251
252 fn merged_exclude_matches(&self) -> Matches {
253 Self::merge_units(&self.exclude_matches, &self.exclude_units)
254 }
255
256 fn merge_units(matches: &Matches, units: &[String]) -> Matches {
257 let mut matches = matches.clone();
258 for unit in units {
259 let entry = matches.entry(String::from(SYSTEMD_UNIT));
260 entry.or_default().insert(fixup_unit(unit));
261 }
262 matches
263 }
264
265 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
267 let schema_definition = match log_namespace {
268 LogNamespace::Vector => Definition::new_with_default_metadata(
269 Kind::bytes().or_null(),
270 [LogNamespace::Vector],
271 ),
272 LogNamespace::Legacy => Definition::new_with_default_metadata(
273 Kind::object(Collection::empty()),
274 [LogNamespace::Legacy],
275 ),
276 };
277
278 let mut schema_definition = schema_definition
279 .with_standard_vector_source_metadata()
280 .with_source_metadata(
282 JournaldConfig::NAME,
283 None,
284 &owned_value_path!("metadata"),
285 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
286 None,
287 )
288 .with_source_metadata(
289 JournaldConfig::NAME,
290 None,
291 &owned_value_path!("timestamp"),
292 Kind::timestamp().or_undefined(),
293 Some("timestamp"),
294 )
295 .with_source_metadata(
296 JournaldConfig::NAME,
297 log_schema().host_key().cloned().map(LegacyKey::Overwrite),
298 &owned_value_path!("host"),
299 Kind::bytes().or_undefined(),
300 Some("host"),
301 );
302
303 if log_namespace == LogNamespace::Legacy {
305 schema_definition = schema_definition.unknown_fields(Kind::bytes());
306 }
307
308 schema_definition
309 }
310}
311
312impl Default for JournaldConfig {
313 fn default() -> Self {
314 Self {
315 since_now: false,
316 current_boot_only: true,
317 include_units: vec![],
318 exclude_units: vec![],
319 include_matches: Default::default(),
320 exclude_matches: Default::default(),
321 data_dir: None,
322 batch_size: default_batch_size(),
323 journalctl_path: None,
324 journal_directory: None,
325 journal_namespace: None,
326 extra_args: vec![],
327 acknowledgements: Default::default(),
328 remap_priority: false,
329 log_namespace: None,
330 emit_cursor: false,
331 }
332 }
333}
334
335impl_generate_config_from_default!(JournaldConfig);
336
337type Record = HashMap<String, String>;
338
339#[async_trait::async_trait]
340#[typetag::serde(name = "journald")]
341impl SourceConfig for JournaldConfig {
342 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
343 if self.remap_priority {
344 warn!(
345 "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
346 );
347 }
348
349 let data_dir = cx
350 .globals
351 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
353
354 if let Some(unit) = self
355 .include_units
356 .iter()
357 .find(|unit| self.exclude_units.contains(unit))
358 {
359 let unit = unit.into();
360 return Err(BuildError::DuplicatedUnit { unit }.into());
361 }
362
363 let include_matches = self.merged_include_matches();
364 let exclude_matches = self.merged_exclude_matches();
365
366 if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
367 return Err(BuildError::DuplicatedMatches { field, value }.into());
368 }
369
370 let mut checkpoint_path = data_dir;
371 checkpoint_path.push(CHECKPOINT_FILENAME);
372
373 let journalctl_path = self
374 .journalctl_path
375 .clone()
376 .unwrap_or_else(|| JOURNALCTL.clone());
377
378 let systemd_version = get_systemd_version_from_journalctl(&journalctl_path).await?;
379
380 if !self.current_boot_only && (250..=257).contains(&systemd_version) {
381 return Err(BuildError::AllBootsNotSupported { systemd_version }.into());
383 }
384
385 let starter = StartJournalctl::new(
386 journalctl_path,
387 systemd_version,
388 self.journal_directory.clone(),
389 self.journal_namespace.clone(),
390 self.current_boot_only,
391 self.since_now,
392 self.extra_args.clone(),
393 );
394
395 let batch_size = self.batch_size;
396 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
397 let log_namespace = cx.log_namespace(self.log_namespace);
398
399 Ok(Box::pin(
400 JournaldSource {
401 include_matches,
402 exclude_matches,
403 checkpoint_path,
404 batch_size,
405 remap_priority: self.remap_priority,
406 out: cx.out,
407 acknowledgements,
408 starter,
409 log_namespace,
410 emit_cursor: self.emit_cursor,
411 }
412 .run_shutdown(cx.shutdown),
413 ))
414 }
415
416 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
417 let schema_definition =
418 self.schema_definition(global_log_namespace.merge(self.log_namespace));
419
420 vec![SourceOutput::new_maybe_logs(
421 DataType::Log,
422 schema_definition,
423 )]
424 }
425
426 fn can_acknowledge(&self) -> bool {
427 true
428 }
429}
430
431struct JournaldSource {
432 include_matches: Matches,
433 exclude_matches: Matches,
434 checkpoint_path: PathBuf,
435 batch_size: usize,
436 remap_priority: bool,
437 out: SourceSender,
438 acknowledgements: bool,
439 starter: StartJournalctl,
440 log_namespace: LogNamespace,
441 emit_cursor: bool,
442}
443
444impl JournaldSource {
445 async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
446 let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
447 .await
448 .map_err(|error| {
449 emit!(JournaldCheckpointFileOpenError {
450 error,
451 path: self
452 .checkpoint_path
453 .to_str()
454 .unwrap_or("unknown")
455 .to_string(),
456 });
457 })?;
458
459 let checkpointer = SharedCheckpointer::new(checkpointer);
460 let finalizer = Finalizer::new(
461 self.acknowledgements,
462 checkpointer.clone(),
463 shutdown.clone(),
464 );
465
466 self.run(checkpointer, finalizer, shutdown).await;
467
468 Ok(())
469 }
470
471 async fn run(
472 mut self,
473 checkpointer: SharedCheckpointer,
474 finalizer: Finalizer,
475 mut shutdown: ShutdownSignal,
476 ) {
477 loop {
478 if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
479 break;
480 }
481
482 info!("Starting journalctl.");
483 let cursor = checkpointer.lock().await.cursor.clone();
484 match self.starter.start(cursor.as_deref()) {
485 Ok((stdout_stream, stderr_stream, running)) => {
486 if !self
487 .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
488 .await
489 {
490 return;
491 }
492 drop(running);
494 }
495 Err(error) => {
496 emit!(JournaldStartJournalctlError { error });
497 }
498 }
499
500 tokio::select! {
503 _ = &mut shutdown => break,
504 _ = sleep(BACKOFF_DURATION) => (),
505 }
506 }
507 }
508
509 async fn run_stream<'a>(
512 &'a mut self,
513 mut stdout_stream: JournalStream,
514 stderr_stream: JournalStream,
515 finalizer: &'a Finalizer,
516 mut shutdown: ShutdownSignal,
517 ) -> bool {
518 let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
519 let events_received = register!(EventsReceived);
520
521 let stderr_handler = crate::spawn_in_current_span(Self::handle_stderr(stderr_stream));
523
524 let batch_size = self.batch_size;
525 let result = loop {
526 let mut batch = Batch::new(self);
527
528 while batch.events.is_empty() {
531 let item = tokio::select! {
532 _ = &mut shutdown => {
533 stderr_handler.abort();
534 return false;
535 },
536 item = stdout_stream.next() => item,
537 };
538 if !batch.handle_next(item) {
539 stderr_handler.abort();
540 return true;
541 }
542 }
543
544 let timeout = tokio::time::sleep(BATCH_TIMEOUT);
545 tokio::pin!(timeout);
546
547 for _ in 1..batch_size {
548 tokio::select! {
549 _ = &mut timeout => break,
550 result = stdout_stream.next() => if !batch.handle_next(result) {
551 break;
552 }
553 }
554 }
555 if let Some(x) = batch
556 .finish(finalizer, &bytes_received, &events_received)
557 .await
558 {
559 break x;
560 }
561 };
562
563 stderr_handler.abort();
564 result
565 }
566
567 async fn handle_stderr(mut stderr_stream: JournalStream) {
569 while let Some(result) = stderr_stream.next().await {
570 match result {
571 Ok(line) => {
572 let line_str = String::from_utf8_lossy(&line);
573 let trimmed = line_str.trim();
574 if !trimmed.is_empty() {
575 warn!("Warning journalctl stderr: {trimmed}");
576 }
577 }
578 Err(err) => {
579 error!("Error reading journalctl stderr: {err}");
580 break;
581 }
582 }
583 }
584 }
585}
586
587struct Batch<'a> {
588 events: Vec<LogEvent>,
589 record_size: usize,
590 exiting: Option<bool>,
591 batch: Option<BatchNotifier>,
592 receiver: Option<BatchStatusReceiver>,
593 source: &'a mut JournaldSource,
594 cursor: Option<String>,
595}
596
597impl<'a> Batch<'a> {
598 fn new(source: &'a mut JournaldSource) -> Self {
599 let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
600 Self {
601 events: Vec::new(),
602 record_size: 0,
603 exiting: None,
604 batch,
605 receiver,
606 source,
607 cursor: None,
608 }
609 }
610
611 fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
612 match result {
613 None => {
614 warn!("Journalctl process stopped.");
615 self.exiting = Some(true);
616 false
617 }
618 Some(Err(error)) => {
619 emit!(JournaldReadError { error });
620 false
621 }
622 Some(Ok(bytes)) => {
623 match decode_record(&bytes, self.source.remap_priority) {
624 Ok(mut record) => {
625 if self.source.emit_cursor {
626 if let Some(tmp) = record.get(CURSOR) {
627 self.cursor = Some(tmp.clone());
628 }
629 } else if let Some(tmp) = record.remove(CURSOR) {
630 self.cursor = Some(tmp);
631 }
632
633 if !filter_matches(
634 &record,
635 &self.source.include_matches,
636 &self.source.exclude_matches,
637 ) {
638 self.record_size += bytes.len();
639
640 let mut event = create_log_event_from_record(
641 record,
642 &self.batch,
643 self.source.log_namespace,
644 );
645
646 enrich_log_event(&mut event, self.source.log_namespace);
647
648 self.events.push(event);
649 }
650 }
651 Err(error) => {
652 emit!(JournaldInvalidRecordError {
653 error,
654 text: String::from_utf8_lossy(&bytes).into_owned()
655 });
656 }
657 }
658 true
659 }
660 }
661 }
662
663 async fn finish(
664 mut self,
665 finalizer: &Finalizer,
666 bytes_received: &'a Registered<BytesReceived>,
667 events_received: &'a Registered<EventsReceived>,
668 ) -> Option<bool> {
669 drop(self.batch);
670
671 if self.record_size > 0 {
672 bytes_received.emit(ByteSize(self.record_size));
673 }
674
675 if !self.events.is_empty() {
676 let count = self.events.len();
677 let byte_size = self.events.estimated_json_encoded_size_of();
678 events_received.emit(CountByteSize(count, byte_size));
679
680 match self.source.out.send_batch(self.events).await {
681 Ok(_) => {
682 if let Some(cursor) = self.cursor {
683 finalizer.finalize(cursor, self.receiver).await;
684 }
685 }
686 Err(_) => {
687 emit!(StreamClosedError { count });
688 self.exiting = Some(false);
690 }
691 }
692 }
693 self.exiting
694 }
695}
696
697type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
698
/// Everything needed to assemble and spawn the `journalctl` command line.
struct StartJournalctl {
    /// Executable to run.
    path: PathBuf,
    /// systemd version; decides which `--boot` argument, if any, is passed.
    systemd_version: u32,
    /// Value for `--directory`, when set.
    journal_dir: Option<PathBuf>,
    /// Value for `--namespace`, when set.
    journal_namespace: Option<String>,
    /// Whether reading is limited to the current boot.
    current_boot_only: bool,
    /// Whether to start at the current end of the journal when there is no checkpoint.
    since_now: bool,
    /// Arguments appended verbatim at the end of the command line.
    extra_args: Vec<String>,
}
708
709impl StartJournalctl {
710 const fn new(
711 path: PathBuf,
712 systemd_version: u32,
713 journal_dir: Option<PathBuf>,
714 journal_namespace: Option<String>,
715 current_boot_only: bool,
716 since_now: bool,
717 extra_args: Vec<String>,
718 ) -> Self {
719 Self {
720 path,
721 systemd_version,
722 journal_dir,
723 journal_namespace,
724 current_boot_only,
725 since_now,
726 extra_args,
727 }
728 }
729
730 fn make_command(&self, checkpoint: Option<&str>) -> Command {
731 let mut command = Command::new(&self.path);
732 command.stdout(Stdio::piped());
733 command.stderr(Stdio::piped());
734 command.arg("--follow");
735 command.arg("--all");
736 command.arg("--show-cursor");
737 command.arg("--output=json");
738
739 if let Some(dir) = &self.journal_dir {
740 command.arg(format!("--directory={}", dir.display()));
741 }
742
743 if let Some(namespace) = &self.journal_namespace {
744 command.arg(format!("--namespace={namespace}"));
745 }
746
747 if self.current_boot_only {
752 if self.systemd_version < 250 {
753 command.arg("--boot");
754 }
755 } else if self.systemd_version >= 258 {
756 command.arg("--boot=all");
757 }
758
759 if let Some(cursor) = checkpoint {
760 command.arg(format!("--after-cursor={cursor}"));
761 } else if self.since_now {
762 command.arg("--since=now");
763 } else {
764 command.arg("--since=2000-01-01");
766 }
767
768 if !self.extra_args.is_empty() {
769 command.args(&self.extra_args);
770 }
771
772 command
773 }
774
775 fn start(
776 &mut self,
777 checkpoint: Option<&str>,
778 ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
779 let mut command = self.make_command(checkpoint);
780
781 let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
782
783 let stdout_stream = FramedRead::new(
784 child.stdout.take().unwrap(),
785 CharacterDelimitedDecoder::new(b'\n'),
786 );
787
788 let stderr = child.stderr.take().unwrap();
789 let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));
790
791 Ok((
792 stdout_stream.boxed(),
793 stderr_stream.boxed(),
794 RunningJournalctl(child),
795 ))
796 }
797}
798
799struct RunningJournalctl(Child);
800
801impl Drop for RunningJournalctl {
802 fn drop(&mut self) {
803 if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
804 _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
805 }
806 }
807}
808
809async fn get_systemd_version_from_journalctl(journalctl_path: &PathBuf) -> crate::Result<u32> {
810 let stdout = Command::new(journalctl_path)
811 .arg("--version")
812 .output()
813 .await
814 .context(JournalctlSpawnSnafu)?
815 .stdout;
816
817 let stdout = String::from_utf8_lossy(&stdout);
819 Ok(stdout
820 .split_whitespace()
821 .nth(1)
822 .and_then(|s| s.parse::<u32>().ok())
823 .ok_or_else(|| BuildError::JournalctlParseVersion {
824 output: {
825 let cutoff = 40;
826 let length = stdout.chars().count();
827 format!(
828 "{}{}",
829 stdout.chars().take(cutoff).collect::<String>(),
830 if length > cutoff {
831 format!(" ..{} more char(s)", length - cutoff)
832 } else {
833 "".to_string()
834 }
835 )
836 },
837 })?)
838}
839
840fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
841 match log_namespace {
842 LogNamespace::Vector => {
843 if let Some(host) = log
844 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
845 .and_then(|meta| meta.get(HOSTNAME))
846 {
847 log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
848 }
849 }
850 LogNamespace::Legacy => {
851 if let Some(host) = log.remove(event_path!(HOSTNAME)) {
852 log_namespace.insert_source_metadata(
853 JournaldConfig::NAME,
854 log,
855 log_schema().host_key().map(LegacyKey::Overwrite),
856 path!("host"),
857 host,
858 );
859 }
860 }
861 }
862
863 let timestamp_value = match log_namespace {
865 LogNamespace::Vector => log
866 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
867 .and_then(|meta| {
868 meta.get(SOURCE_TIMESTAMP)
869 .or_else(|| meta.get(RECEIVED_TIMESTAMP))
870 }),
871 LogNamespace::Legacy => log
872 .get(event_path!(SOURCE_TIMESTAMP))
873 .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
874 };
875
876 let timestamp = timestamp_value
877 .filter(|&ts| ts.is_bytes())
878 .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
879 .map(|ts| {
880 chrono::Utc
881 .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
882 .single()
883 .expect("invalid timestamp")
884 });
885
886 match log_namespace {
888 LogNamespace::Vector => {
889 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
890
891 if let Some(ts) = timestamp {
892 log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
893 }
894 }
895 LogNamespace::Legacy => {
896 if let Some(ts) = timestamp {
897 log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
898 }
899 }
900 }
901
902 log_namespace.insert_vector_metadata(
904 log,
905 log_schema().source_type_key(),
906 path!("source_type"),
907 JournaldConfig::NAME,
908 );
909}
910
911fn create_log_event_from_record(
912 mut record: Record,
913 batch: &Option<BatchNotifier>,
914 log_namespace: LogNamespace,
915) -> LogEvent {
916 match log_namespace {
917 LogNamespace::Vector => {
918 let message_value = record
919 .remove(MESSAGE)
920 .map(|msg| Value::Bytes(Bytes::from(msg)))
921 .unwrap_or(Value::Null);
922
923 let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
924
925 record.iter().for_each(|(key, value)| {
927 log.metadata_mut()
928 .value_mut()
929 .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
930 });
931
932 log
933 }
934 LogNamespace::Legacy => {
935 let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
936
937 if let Some(message) = log.remove(event_path!(MESSAGE)) {
938 log.maybe_insert(log_schema().message_key_target_path(), message);
939 }
940
941 log
942 }
943 }
944}
945
/// Returns `unit` unchanged when it contains a `.`, and with `.service` appended otherwise.
fn fixup_unit(unit: &str) -> String {
    let mut fixed = unit.to_owned();
    if !unit.contains('.') {
        fixed.push_str(".service");
    }
    fixed
}
955
956fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
957 let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
958 if let Some(record) = record.as_object_mut() {
961 for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
962 *value = decode_array(value.as_array().expect("already validated"));
963 }
964 }
965 if remap {
966 record.get_mut("PRIORITY").map(remap_priority);
967 }
968 serde_json::from_value(record)
969}
970
971fn decode_array(array: &[JsonValue]) -> JsonValue {
972 decode_array_as_bytes(array).unwrap_or_else(|| {
973 let ser = serde_json::to_string(array).expect("already deserialized");
974 JsonValue::String(ser)
975 })
976}
977
978fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
979 array
983 .iter()
984 .map(|item| {
985 item.as_u64().and_then(|num| match num {
986 num if num <= u8::MAX as u64 => Some(num as u8),
987 _ => None,
988 })
989 })
990 .collect::<Option<Vec<u8>>>()
991 .map(|array| String::from_utf8_lossy(&array).into())
992}
993
994fn remap_priority(priority: &mut JsonValue) {
995 if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
996 let text = match num {
997 0 => "EMERG",
998 1 => "ALERT",
999 2 => "CRIT",
1000 3 => "ERR",
1001 4 => "WARNING",
1002 5 => "NOTICE",
1003 6 => "INFO",
1004 7 => "DEBUG",
1005 _ => "UNKNOWN",
1006 };
1007 *priority = JsonValue::String(text.into());
1008 }
1009}
1010
1011fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
1012 match (includes.is_empty(), excludes.is_empty()) {
1013 (true, true) => false,
1014 (false, true) => !contains_match(record, includes),
1015 (true, false) => contains_match(record, excludes),
1016 (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
1017 }
1018}
1019
1020fn contains_match(record: &Record, matches: &Matches) -> bool {
1021 let f = move |(field, value)| {
1022 matches
1023 .get(field)
1024 .map(|x| x.contains(value))
1025 .unwrap_or(false)
1026 };
1027 record.iter().any(f)
1028}
1029
1030fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
1031 for (a_key, a_values) in a_matches {
1032 if let Some(b_values) = b_matches.get(a_key.as_str()) {
1033 for (a, b) in a_values
1034 .iter()
1035 .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
1036 {
1037 if a == b {
1038 return Some((a_key.into(), b.into()));
1039 }
1040 }
1041 }
1042 }
1043 None
1044}
1045
1046enum Finalizer {
1047 Sync(SharedCheckpointer),
1048 Async(OrderedFinalizer<String>),
1049}
1050
1051impl Finalizer {
1052 fn new(
1053 acknowledgements: bool,
1054 checkpointer: SharedCheckpointer,
1055 shutdown: ShutdownSignal,
1056 ) -> Self {
1057 if acknowledgements {
1058 let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
1059 crate::spawn_in_current_span(async move {
1060 while let Some((status, cursor)) = ack_stream.next().await {
1061 if status == BatchStatus::Delivered {
1062 checkpointer.lock().await.set(cursor).await;
1063 }
1064 }
1065 });
1066 Self::Async(finalizer)
1067 } else {
1068 Self::Sync(checkpointer)
1069 }
1070 }
1071
1072 async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
1073 match (self, receiver) {
1074 (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
1075 (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
1076 _ => {
1077 unreachable!("Cannot have async finalization without a receiver in journald source")
1078 }
1079 }
1080 }
1081}
1082
1083struct Checkpointer {
1084 file: File,
1085 filename: PathBuf,
1086}
1087
1088impl Checkpointer {
1089 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1090 let file = OpenOptions::new()
1091 .read(true)
1092 .write(true)
1093 .create(true)
1094 .truncate(false)
1095 .open(&filename)
1096 .await?;
1097 Ok(Checkpointer { file, filename })
1098 }
1099
1100 async fn set(&mut self, token: &str) -> Result<(), io::Error> {
1101 self.file.seek(SeekFrom::Start(0)).await?;
1102 self.file.write_all(format!("{token}\n").as_bytes()).await
1103 }
1104
1105 async fn get(&mut self) -> Result<Option<String>, io::Error> {
1106 let mut buf = Vec::<u8>::new();
1107 self.file.seek(SeekFrom::Start(0)).await?;
1108 self.file.read_to_end(&mut buf).await?;
1109 match buf.len() {
1110 0 => Ok(None),
1111 _ => {
1112 let text = String::from_utf8_lossy(&buf);
1113 Ok(text.split_once('\n').map(|(line, _)| line.to_string()))
1114 }
1115 }
1116 }
1117}
1118
1119struct StatefulCheckpointer {
1120 checkpointer: Checkpointer,
1121 cursor: Option<String>,
1122}
1123
1124impl StatefulCheckpointer {
1125 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1126 let mut checkpointer = Checkpointer::new(filename).await?;
1127 let cursor = checkpointer.get().await?;
1128 Ok(Self {
1129 checkpointer,
1130 cursor,
1131 })
1132 }
1133
1134 async fn set(&mut self, token: impl Into<String>) {
1135 let token = token.into();
1136 if let Err(error) = self.checkpointer.set(&token).await {
1137 emit!(JournaldCheckpointSetError {
1138 error,
1139 filename: self
1140 .checkpointer
1141 .filename
1142 .to_str()
1143 .unwrap_or("unknown")
1144 .to_string(),
1145 });
1146 }
1147 self.cursor = Some(token);
1148 }
1149}
1150
1151#[derive(Clone)]
1152struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
1153
1154impl SharedCheckpointer {
1155 fn new(c: StatefulCheckpointer) -> Self {
1156 Self(Arc::new(Mutex::new(c)))
1157 }
1158
1159 async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
1160 self.0.lock().await
1161 }
1162}
1163
#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    /// Stores `token`, then checks both the value read back through the checkpointer and
    /// the raw contents of the file.
    async fn set_and_verify(
        checkpointer: &mut Checkpointer,
        filename: &std::path::Path,
        token: &str,
    ) {
        checkpointer
            .set(token)
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), token);

        let contents = read_to_string(filename)
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with(&format!("{token}\n")));
    }

    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let filename = tempdir.path().join(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        // A freshly created checkpoint file holds no cursor.
        assert!(checkpointer.get().await.unwrap().is_none());

        // The second token is shorter than the first, so this also covers overwriting
        // without truncation.
        set_and_verify(&mut checkpointer, &filename, "first test").await;
        set_and_verify(&mut checkpointer, &filename, "second").await;
    }
}
1208
1209#[cfg(test)]
1210mod tests {
1211 use std::{fs, path::Path};
1212
1213 use tempfile::tempdir;
1214 use tokio::time::{Duration, Instant, sleep, timeout};
1215 use vrl::value::{Value, kind::Collection};
1216
1217 use super::*;
1218 use crate::{
1219 config::ComponentKey,
1220 event::{Event, EventStatus},
1221 test_util::components::assert_source_compliance,
1222 };
1223
1224 const TEST_COMPONENT: &str = "journald-test";
1225 const TEST_JOURNALCTL: &str = "tests/data/journalctl";
1226
1227 async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
1228 let include_matches = create_unit_matches(iunits.to_vec());
1229 let exclude_matches = create_unit_matches(xunits.to_vec());
1230 run_journal(include_matches, exclude_matches, cursor, false).await
1231 }
1232
1233 async fn run_journal(
1234 include_matches: Matches,
1235 exclude_matches: Matches,
1236 checkpoint: Option<&str>,
1237 emit_cursor: bool,
1238 ) -> Vec<Event> {
1239 assert_source_compliance(&["protocol"], async move {
1240 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
1241
1242 let tempdir = tempdir().unwrap();
1243 let tempdir = tempdir.path().to_path_buf();
1244
1245 if let Some(cursor) = checkpoint {
1246 let mut checkpoint_path = tempdir.clone();
1247 checkpoint_path.push(TEST_COMPONENT);
1248 fs::create_dir(&checkpoint_path).unwrap();
1249 checkpoint_path.push(CHECKPOINT_FILENAME);
1250
1251 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1252 .await
1253 .expect("Creating checkpointer failed!");
1254
1255 checkpointer
1256 .set(cursor)
1257 .await
1258 .expect("Could not set checkpoint");
1259 }
1260
1261 let (cx, shutdown) =
1262 SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1263 let config = JournaldConfig {
1264 journalctl_path: Some(TEST_JOURNALCTL.into()),
1265 include_matches,
1266 exclude_matches,
1267 data_dir: Some(tempdir),
1268 remap_priority: true,
1269 acknowledgements: false.into(),
1270 emit_cursor,
1271 ..Default::default()
1272 };
1273 let source = config.build(cx).await.unwrap();
1274 tokio::spawn(async move { source.await.unwrap() });
1275
1276 sleep(Duration::from_secs(1)).await;
1278 shutdown
1279 .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
1280 .await;
1281
1282 timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
1283 })
1284 .await
1285 }
1286
1287 fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
1288 let units: HashSet<String> = units.into_iter().map(Into::into).collect();
1289 let mut map = HashMap::new();
1290 if !units.is_empty() {
1291 map.insert(String::from(SYSTEMD_UNIT), units);
1292 }
1293 map
1294 }
1295
1296 fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
1297 let mut matches: Matches = HashMap::new();
1298 for (field, value) in conditions {
1299 matches
1300 .entry(field.into())
1301 .or_default()
1302 .insert(value.into());
1303 }
1304 matches
1305 }
1306
1307 #[tokio::test]
1308 async fn reads_journal() {
1309 let received = run_with_units(&[], &[], None).await;
1310 assert_eq!(received.len(), 8);
1311 assert_eq!(
1312 message(&received[0]),
1313 Value::Bytes("System Initialization".into())
1314 );
1315 assert_eq!(
1316 received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1317 "journald".into()
1318 );
1319 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
1320 assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
1321 assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
1322 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
1323 assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
1324 }
1325
1326 #[tokio::test]
1327 async fn includes_units() {
1328 let received = run_with_units(&["unit.service"], &[], None).await;
1329 assert_eq!(received.len(), 1);
1330 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1331 }
1332
1333 #[tokio::test]
1334 async fn excludes_units() {
1335 let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
1336 assert_eq!(received.len(), 6);
1337 assert_eq!(
1338 message(&received[0]),
1339 Value::Bytes("System Initialization".into())
1340 );
1341 assert_eq!(
1342 message(&received[1]),
1343 Value::Bytes("Missing timestamp".into())
1344 );
1345 assert_eq!(
1346 message(&received[2]),
1347 Value::Bytes("Different timestamps".into())
1348 );
1349 }
1350
1351 #[tokio::test]
1352 async fn emits_cursor() {
1353 let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1354 assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1355 assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1356 assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1357 }
1358
1359 #[tokio::test]
1360 async fn includes_matches() {
1361 let matches = create_matches(vec![("PRIORITY", "ERR")]);
1362 let received = run_journal(matches, HashMap::new(), None, false).await;
1363 assert_eq!(received.len(), 2);
1364 assert_eq!(
1365 message(&received[0]),
1366 Value::Bytes("Different timestamps".into())
1367 );
1368 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
1369 assert_eq!(
1370 message(&received[1]),
1371 Value::Bytes("Non-ASCII in other field".into())
1372 );
1373 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1374 }
1375
1376 #[tokio::test]
1377 async fn includes_kernel() {
1378 let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1379 let received = run_journal(matches, HashMap::new(), None, false).await;
1380 assert_eq!(received.len(), 1);
1381 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
1382 assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
1383 }
1384
1385 #[tokio::test]
1386 async fn excludes_matches() {
1387 let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1388 let received = run_journal(HashMap::new(), matches, None, false).await;
1389 assert_eq!(received.len(), 5);
1390 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
1391 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
1392 assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
1393 assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
1394 assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
1395 }
1396
1397 #[tokio::test]
1398 async fn handles_checkpoint() {
1399 let received = run_with_units(&[], &[], Some("1")).await;
1400 assert_eq!(received.len(), 7);
1401 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1402 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
1403 }
1404
1405 #[tokio::test]
1406 async fn parses_array_messages() {
1407 let received = run_with_units(&["badunit.service"], &[], None).await;
1408 assert_eq!(received.len(), 1);
1409 assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
1410 }
1411
1412 #[tokio::test]
1413 async fn parses_array_fields() {
1414 let received = run_with_units(&["syslog.service"], &[], None).await;
1415 assert_eq!(received.len(), 1);
1416 assert_eq!(
1417 received[0].as_log()["SYSLOG_RAW"],
1418 Value::Bytes("¿World?".into())
1419 );
1420 }
1421
1422 #[tokio::test]
1423 async fn parses_string_sequences() {
1424 let received = run_with_units(&["NetworkManager.service"], &[], None).await;
1425 assert_eq!(received.len(), 1);
1426 assert_eq!(
1427 received[0].as_log()["SYSLOG_FACILITY"],
1428 Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
1429 );
1430 }
1431
1432 #[tokio::test]
1433 async fn handles_missing_timestamp() {
1434 let received = run_with_units(&["stdout"], &[], None).await;
1435 assert_eq!(received.len(), 2);
1436 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
1437 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1438 }
1439
1440 #[tokio::test]
1441 async fn handles_acknowledgements() {
1442 let (tx, mut rx) = SourceSender::new_test();
1443
1444 let tempdir = tempdir().unwrap();
1445 let tempdir = tempdir.path().to_path_buf();
1446 let mut checkpoint_path = tempdir.clone();
1447 checkpoint_path.push(TEST_COMPONENT);
1448 fs::create_dir(&checkpoint_path).unwrap();
1449 checkpoint_path.push(CHECKPOINT_FILENAME);
1450
1451 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1452 .await
1453 .expect("Creating checkpointer failed!");
1454
1455 let config = JournaldConfig {
1456 journalctl_path: Some(TEST_JOURNALCTL.into()),
1457 data_dir: Some(tempdir),
1458 remap_priority: true,
1459 acknowledgements: true.into(),
1460 ..Default::default()
1461 };
1462 let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1463 let source = config.build(cx).await.unwrap();
1464 tokio::spawn(async move { source.await.unwrap() });
1465
1466 assert_eq!(checkpointer.get().await.unwrap(), None);
1468
1469 sleep(Duration::from_secs(1)).await;
1471
1472 let mut count = 0;
1474 while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
1475 assert_eq!(checkpointer.get().await.unwrap(), None);
1477 event.metadata().update_status(EventStatus::Delivered);
1478 count += 1;
1479 }
1480 assert_eq!(count, 8);
1481
1482 sleep(Duration::from_millis(100)).await;
1483 assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
1484 }
1485
/// Exercises `filter_matches` for three records against every combination of
/// empty/non-empty include and exclude rules.
#[test]
fn filter_matches_works_correctly() {
    let no_rules: Matches = HashMap::new();
    let includes = create_unit_matches(vec!["one", "two"]);
    let excludes = create_unit_matches(vec!["foo", "bar"]);

    // Results for one record, in the order: (no rules), (includes only),
    // (excludes only), (includes and excludes).
    let outcomes = |record: &HashMap<String, String>| {
        [
            filter_matches(record, &no_rules, &no_rules),
            filter_matches(record, &includes, &no_rules),
            filter_matches(record, &no_rules, &excludes),
            filter_matches(record, &includes, &excludes),
        ]
    };

    // A record without a unit field.
    let without_unit = HashMap::new();
    assert_eq!(outcomes(&without_unit), [false, true, false, true]);

    // A record whose unit is in the include set.
    let mut included_unit = HashMap::new();
    included_unit.insert(String::from(SYSTEMD_UNIT), String::from("one"));
    assert_eq!(outcomes(&included_unit), [false, false, false, false]);

    // A record whose unit is in the exclude set.
    let mut excluded_unit = HashMap::new();
    excluded_unit.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
    assert_eq!(outcomes(&excluded_unit), [false, true, true, true]);
}
1510
/// `merged_include_matches` / `merged_exclude_matches` combine the unit lists
/// with the explicit matches; the unit names show up with a `.service`
/// suffix in the merged `_SYSTEMD_UNIT` set.
#[test]
fn merges_units_and_matches_option() {
    /// Builds the expected set from string literals.
    fn string_set(values: &[&str]) -> HashSet<String> {
        values.iter().map(|value| String::from(*value)).collect()
    }

    let journald_config = JournaldConfig {
        include_units: ["one", "two"].into_iter().map(String::from).collect(),
        include_matches: create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]),
        exclude_units: ["foo", "bar"].into_iter().map(String::from).collect(),
        exclude_matches: create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]),
        ..Default::default()
    };

    let merged_includes = journald_config.merged_include_matches();
    assert_eq!(
        merged_includes.get("_SYSTEMD_UNIT").unwrap(),
        &string_set(&["one.service", "two.service", "three.service"])
    );
    assert_eq!(
        merged_includes.get("_TRANSPORT").unwrap(),
        &string_set(&["kernel"])
    );

    let merged_excludes = journald_config.merged_exclude_matches();
    assert_eq!(
        merged_excludes.get("_SYSTEMD_UNIT").unwrap(),
        &string_set(&["foo.service", "bar.service", "baz.service"])
    );
    assert_eq!(
        merged_excludes.get("PRIORITY").unwrap(),
        &string_set(&["DEBUG"])
    );
}
1554
/// `find_duplicate_match` reports a field/value pair present in both maps and
/// returns `None` when either side is empty.
#[test]
fn find_duplicate_match_works_correctly() {
    let includes = create_matches(vec![("_TRANSPORT", "kernel")]);
    let excludes = create_matches(vec![("_TRANSPORT", "kernel")]);

    let (field, value) = find_duplicate_match(&includes, &excludes).unwrap();
    assert_eq!(field, "_TRANSPORT");
    assert_eq!(value, "kernel");

    let no_matches = HashMap::new();
    let without_overlap = [
        (&no_matches, &no_matches),
        (&includes, &no_matches),
        (&no_matches, &excludes),
    ];
    for (left, right) in without_overlap {
        assert!(find_duplicate_match(left, right).is_none());
    }
}
1573
/// Checks the `journalctl` command line produced for four option
/// combinations by inspecting the `Debug` rendering of the built command.
#[test]
fn command_options() {
    let journalctl = PathBuf::from("journalctl");

    // Case 1: systemd 239, nothing optional set.
    let cmd_line = format!(
        "{:?}",
        create_command(
            &journalctl,
            239,    // systemd_version
            None,   // journal_dir
            None,   // journal_namespace
            false,  // current_boot_only
            false,  // since_now
            None,   // cursor
            vec![], // extra_args
        )
    );
    assert!(!cmd_line.contains("--directory="));
    assert!(!cmd_line.contains("--namespace="));
    assert!(!cmd_line.contains("--boot=all"));
    assert!(cmd_line.contains("--since=2000-01-01"));

    // Case 2: same as case 1 but starting from "now".
    let cmd_line = format!(
        "{:?}",
        create_command(
            &journalctl,
            239,    // systemd_version
            None,   // journal_dir
            None,   // journal_namespace
            false,  // current_boot_only
            true,   // since_now
            None,   // cursor
            vec![], // extra_args
        )
    );
    assert!(cmd_line.contains("--since=now"));

    // Case 3: directory, namespace, current boot, cursor and an extra argument.
    let cmd_line = format!(
        "{:?}",
        create_command(
            &journalctl,
            239,                                        // systemd_version
            Some(PathBuf::from("/tmp/journal-dir")),    // journal_dir
            Some(String::from("my_namespace")),         // journal_namespace
            true,                                       // current_boot_only
            true,                                       // since_now
            Some("2021-01-01"),                         // cursor
            vec!["--merge".to_string()],                // extra_args
        )
    );
    assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
    assert!(cmd_line.contains("--namespace=my_namespace"));
    assert!(cmd_line.contains("--boot"));
    assert!(cmd_line.contains("--after-cursor="));
    assert!(cmd_line.contains("--merge"));

    // Case 4: systemd 258 without the current-boot restriction.
    let cmd_line = format!(
        "{:?}",
        create_command(
            &journalctl,
            258,                // systemd_version
            None,               // journal_dir
            None,               // journal_namespace
            false,              // current_boot_only
            true,               // since_now
            Some("2021-01-01"), // cursor
            vec![],             // extra_args
        )
    );
    assert!(cmd_line.contains("--boot=all"));
}
1662
1663 #[allow(clippy::too_many_arguments)]
1664 fn create_command(
1665 path: &Path,
1666 systemd_version: u32,
1667 journal_dir: Option<PathBuf>,
1668 journal_namespace: Option<String>,
1669 current_boot_only: bool,
1670 since_now: bool,
1671 cursor: Option<&str>,
1672 extra_args: Vec<String>,
1673 ) -> Command {
1674 StartJournalctl::new(
1675 path.into(),
1676 systemd_version,
1677 journal_dir,
1678 journal_namespace,
1679 current_boot_only,
1680 since_now,
1681 extra_args,
1682 )
1683 .make_command(cursor)
1684 }
1685
1686 fn message(event: &Event) -> Value {
1687 event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
1688 }
1689
1690 fn timestamp(event: &Event) -> Value {
1691 event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
1692 }
1693
1694 fn cursor(event: &Event) -> Value {
1695 event.as_log()[CURSOR].clone()
1696 }
1697
1698 fn value_ts(secs: i64, usecs: u32) -> Value {
1699 Value::Timestamp(
1700 chrono::Utc
1701 .timestamp_opt(secs, usecs)
1702 .single()
1703 .expect("invalid timestamp"),
1704 )
1705 }
1706
1707 fn priority(event: &Event) -> Value {
1708 event.as_log()["PRIORITY"].clone()
1709 }
1710
/// The first output's schema definition under the Vector namespace must equal
/// the definition assembled here from the standard `vector` metadata fields
/// plus the source's own `metadata`, `timestamp` and `host` metadata fields.
#[test]
fn output_schema_definition_vector_namespace() {
    let config = JournaldConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let mut outputs = config.outputs(LogNamespace::Vector);
    let actual = outputs.remove(0).schema_definition(true);

    let base =
        Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector]);
    let expected = base
        .with_metadata_field(
            &owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!(JournaldConfig::NAME, "metadata"),
            Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!(JournaldConfig::NAME, "timestamp"),
            Kind::timestamp().or_undefined(),
            Some("timestamp"),
        )
        .with_metadata_field(
            &owned_value_path!(JournaldConfig::NAME, "host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

    assert_eq!(actual, Some(expected));
}
1753
/// The first output's schema definition under the Legacy namespace must equal
/// the definition assembled here: `source_type`, `timestamp` and `host` event
/// fields, with all unknown fields typed as bytes.
#[test]
fn output_schema_definition_legacy_namespace() {
    let config = JournaldConfig::default();

    let mut outputs = config.outputs(LogNamespace::Legacy);
    let actual = outputs.remove(0).schema_definition(true);

    let base = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    let expected = base
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .unknown_fields(Kind::bytes());

    assert_eq!(actual, Some(expected));
}
1778
1779 fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
1780 let record = r#"{
1781 "PRIORITY":"6",
1782 "SYSLOG_FACILITY":"3",
1783 "SYSLOG_IDENTIFIER":"ntpd",
1784 "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
1785 "_CAP_EFFECTIVE":"3fffffffff",
1786 "_CMDLINE":"ntpd: [priv]",
1787 "_COMM":"ntpd",
1788 "_EXE":"/usr/sbin/ntpd",
1789 "_GID":"0",
1790 "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
1791 "_PID":"2156",
1792 "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
1793 "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
1794 "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
1795 "_SYSTEMD_SLICE":"system.slice",
1796 "_SYSTEMD_UNIT":"ntpd.service",
1797 "_TRANSPORT":"stdout",
1798 "_UID":"0",
1799 "__MONOTONIC_TIMESTAMP":"98694000446",
1800 "__REALTIME_TIMESTAMP":"1564173027000443",
1801 "host":"my-host.local",
1802 "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
1803 "source_type":"journald"
1804 }"#;
1805
1806 let json: serde_json::Value = serde_json::from_str(record).unwrap();
1807 let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
1808
1809 event.as_mut_log().insert("timestamp", chrono::Utc::now());
1810
1811 let definitions = config.outputs(namespace).remove(0).schema_definition(true);
1812
1813 definitions.unwrap().assert_valid_for_event(&event);
1814 }
1815
/// The default configuration's Legacy-namespace schema must accept the
/// representative record checked by `matches_schema`.
#[test]
fn matches_schema_legacy() {
    let default_config = JournaldConfig::default();
    matches_schema(&default_config, LogNamespace::Legacy);
}
1822}