1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{BytesDeserializer, BytesDeserializerConfig},
14 config::{LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 file_source::{
17 file_server::{FileServer, Line, calculate_ignore_before},
18 paths_provider::{Glob, MatchOptions},
19 },
20 file_source_common::{
21 Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22 },
23 finalizer::OrderedFinalizer,
24 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30 SourceSender,
31 config::{
32 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33 log_schema,
34 },
35 encoding_transcode::{Decoder, Encoder},
36 event::{BatchNotifier, BatchStatus, LogEvent},
37 internal_events::{
38 FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39 FileSourceInternalEventsEmitter, StreamClosedError,
40 },
41 line_agg::{self, LineAgg},
42 serde::bool_or_struct,
43 shutdown::ShutdownSignal,
44};
45
46#[derive(Debug, Snafu)]
47enum BuildError {
48 #[snafu(display(
49 "message_start_indicator {:?} is not a valid regex: {}",
50 indicator,
51 source
52 ))]
53 InvalidMessageStartIndicator {
54 indicator: String,
55 source: regex::Error,
56 },
57}
58
59#[serde_as]
61#[configurable_component(source("file", "Collect logs from files."))]
62#[derive(Clone, Debug, PartialEq, Eq)]
63#[serde(deny_unknown_fields)]
64pub struct FileConfig {
65 #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
67 pub include: Vec<PathBuf>,
68
69 #[serde(default)]
75 #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
76 pub exclude: Vec<PathBuf>,
77
78 #[serde(default = "default_file_key")]
84 #[configurable(metadata(docs::examples = "path"))]
85 pub file_key: OptionalValuePath,
86
87 #[configurable(
89 deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
90 )]
91 #[configurable(metadata(docs::hidden))]
92 #[serde(default)]
93 pub start_at_beginning: Option<bool>,
94
95 #[serde(default)]
99 pub ignore_checkpoints: Option<bool>,
100
101 #[serde(default = "default_read_from")]
102 #[configurable(derived)]
103 pub read_from: ReadFromConfig,
104
105 #[serde(alias = "ignore_older", default)]
107 #[configurable(metadata(docs::type_unit = "seconds"))]
108 #[configurable(metadata(docs::examples = 600))]
109 #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
110 pub ignore_older_secs: Option<u64>,
111
112 #[serde(default = "default_max_line_bytes")]
116 #[configurable(metadata(docs::type_unit = "bytes"))]
117 pub max_line_bytes: usize,
118
119 #[configurable(metadata(docs::examples = "hostname"))]
127 pub host_key: Option<OptionalValuePath>,
128
129 #[serde(default)]
138 #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
139 #[configurable(metadata(docs::human_name = "Data Directory"))]
140 pub data_dir: Option<PathBuf>,
141
142 #[serde(default)]
148 #[configurable(metadata(docs::examples = "offset"))]
149 pub offset_key: Option<OptionalValuePath>,
150
151 #[serde(
157 alias = "glob_minimum_cooldown",
158 default = "default_glob_minimum_cooldown_ms"
159 )]
160 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161 #[configurable(metadata(docs::type_unit = "milliseconds"))]
162 #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
163 pub glob_minimum_cooldown_ms: Duration,
164
165 #[configurable(derived)]
166 #[serde(alias = "fingerprinting", default)]
167 fingerprint: FingerprintConfig,
168
169 #[serde(default)]
173 pub ignore_not_found: bool,
174
175 #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
177 #[configurable(metadata(docs::hidden))]
178 #[serde(default)]
179 pub message_start_indicator: Option<String>,
180
181 #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
183 #[configurable(metadata(docs::hidden))]
184 #[serde(default = "default_multi_line_timeout")]
185 pub multi_line_timeout: u64,
186
187 #[configurable(derived)]
191 #[serde(default)]
192 pub multiline: Option<MultilineConfig>,
193
194 #[serde(default = "default_max_read_bytes")]
200 #[configurable(metadata(docs::type_unit = "bytes"))]
201 pub max_read_bytes: usize,
202
203 #[serde(default)]
205 pub oldest_first: bool,
206
207 #[serde(alias = "remove_after", default)]
211 #[configurable(metadata(docs::type_unit = "seconds"))]
212 #[configurable(metadata(docs::examples = 0))]
213 #[configurable(metadata(docs::examples = 5))]
214 #[configurable(metadata(docs::examples = 60))]
215 #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
216 pub remove_after_secs: Option<u64>,
217
218 #[serde(default = "default_line_delimiter")]
220 #[configurable(metadata(docs::examples = "\r\n"))]
221 pub line_delimiter: String,
222
223 #[configurable(derived)]
224 #[serde(default)]
225 pub encoding: Option<EncodingConfig>,
226
227 #[configurable(derived)]
228 #[serde(default, deserialize_with = "bool_or_struct")]
229 acknowledgements: SourceAcknowledgementsConfig,
230
231 #[configurable(metadata(docs::hidden))]
233 #[serde(default)]
234 log_namespace: Option<bool>,
235
236 #[configurable(derived)]
237 #[serde(default)]
238 internal_metrics: FileInternalMetricsConfig,
239
240 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
243 #[configurable(metadata(docs::type_unit = "seconds"))]
244 #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
245 pub rotate_wait: Duration,
246}
247
248fn default_max_line_bytes() -> usize {
249 bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253 OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257 ReadFromConfig::Beginning
258}
259
/// Default for `glob_minimum_cooldown_ms`: 1000 milliseconds.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_millis(1_000)
}
263
/// Default for the deprecated `multi_line_timeout` option.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Default for `max_read_bytes`: 2048 bytes.
const fn default_max_read_bytes() -> usize {
    2_048
}
271
/// Default for `line_delimiter`: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}
275
/// Default for `rotate_wait`: half of `u64::MAX` seconds.
const fn default_rotate_wait() -> Duration {
    const HALF_OF_MAX_SECS: u64 = u64::MAX / 2;
    Duration::from_secs(HALF_OF_MAX_SECS)
}
279
280#[configurable_component]
284#[derive(Clone, Debug, PartialEq, Eq)]
285#[serde(tag = "strategy", rename_all = "snake_case")]
286#[configurable(metadata(
287 docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
288))]
289pub enum FingerprintConfig {
290 Checksum {
292 #[serde(default = "default_ignored_header_bytes")]
298 #[configurable(metadata(docs::type_unit = "bytes"))]
299 ignored_header_bytes: usize,
300
301 #[serde(default = "default_lines")]
308 #[configurable(metadata(docs::type_unit = "lines"))]
309 lines: usize,
310 },
311
312 #[serde(rename = "device_and_inode")]
316 DevInode,
317}
318
319impl Default for FingerprintConfig {
320 fn default() -> Self {
321 Self::Checksum {
322 ignored_header_bytes: 0,
323 lines: default_lines(),
324 }
325 }
326}
327
/// Default for `ignored_header_bytes`: no header bytes are ignored.
const fn default_ignored_header_bytes() -> usize {
    0_usize
}
331
/// Default for `lines`: a single line feeds the checksum.
const fn default_lines() -> usize {
    1_usize
}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337 fn from(config: FingerprintConfig) -> FingerprintStrategy {
338 match config {
339 FingerprintConfig::Checksum {
340 ignored_header_bytes,
341 lines,
342 } => FingerprintStrategy::FirstLinesChecksum {
343 ignored_header_bytes,
344 lines,
345 },
346 FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347 }
348 }
349}
350
351#[derive(Debug)]
352pub(crate) struct FinalizerEntry {
353 pub(crate) file_id: FileFingerprint,
354 pub(crate) offset: u64,
355}
356
357impl Default for FileConfig {
358 fn default() -> Self {
359 Self {
360 include: vec![PathBuf::from("/var/log/**/*.log")],
361 exclude: vec![],
362 file_key: default_file_key(),
363 start_at_beginning: None,
364 ignore_checkpoints: None,
365 read_from: default_read_from(),
366 ignore_older_secs: None,
367 max_line_bytes: default_max_line_bytes(),
368 fingerprint: FingerprintConfig::default(),
369 ignore_not_found: false,
370 host_key: None,
371 offset_key: None,
372 data_dir: None,
373 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374 message_start_indicator: None,
375 multi_line_timeout: default_multi_line_timeout(), multiline: None,
377 max_read_bytes: default_max_read_bytes(),
378 oldest_first: false,
379 remove_after_secs: None,
380 line_delimiter: default_line_delimiter(),
381 encoding: None,
382 acknowledgements: Default::default(),
383 log_namespace: None,
384 internal_metrics: Default::default(),
385 rotate_wait: default_rotate_wait(),
386 }
387 }
388}
389
390impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396 let data_dir = cx
401 .globals
402 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405 #[allow(clippy::suspicious_else_formatting)]
407 {
408 if let Some(ref config) = self.multiline {
409 let _: line_agg::Config = config.try_into()?;
410 }
411
412 if let Some(ref indicator) = self.message_start_indicator {
413 Regex::new(indicator)
414 .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415 }
416 }
417
418 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420 let log_namespace = cx.log_namespace(self.log_namespace);
421
422 Ok(file_source(
423 self,
424 data_dir,
425 cx.shutdown,
426 cx.out,
427 acknowledgements,
428 log_namespace,
429 ))
430 }
431
432 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433 let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434 let host_key = self
435 .host_key
436 .clone()
437 .unwrap_or(log_schema().host_key().cloned().into())
438 .path
439 .map(LegacyKey::Overwrite);
440
441 let offset_key = self
442 .offset_key
443 .clone()
444 .and_then(|k| k.path)
445 .map(LegacyKey::Overwrite);
446
447 let schema_definition = BytesDeserializerConfig
448 .schema_definition(global_log_namespace.merge(self.log_namespace))
449 .with_standard_vector_source_metadata()
450 .with_source_metadata(
451 Self::NAME,
452 host_key,
453 &owned_value_path!("host"),
454 Kind::bytes().or_undefined(),
455 Some("host"),
456 )
457 .with_source_metadata(
458 Self::NAME,
459 offset_key,
460 &owned_value_path!("offset"),
461 Kind::integer(),
462 None,
463 )
464 .with_source_metadata(
465 Self::NAME,
466 file_key,
467 &owned_value_path!("path"),
468 Kind::bytes(),
469 None,
470 );
471
472 vec![SourceOutput::new_maybe_logs(
473 DataType::Log,
474 schema_definition,
475 )]
476 }
477
478 fn can_acknowledge(&self) -> bool {
479 true
480 }
481}
482
483pub fn file_source(
484 config: &FileConfig,
485 data_dir: PathBuf,
486 shutdown: ShutdownSignal,
487 mut out: SourceSender,
488 acknowledgements: bool,
489 log_namespace: LogNamespace,
490) -> super::Source {
491 if config.include.is_empty() {
493 error!(
494 message = "`include` configuration option must contain at least one file pattern.",
495 internal_log_rate_limit = false
496 );
497 return Box::pin(future::ready(Err(())));
498 }
499
500 let exclude_patterns = config
501 .exclude
502 .iter()
503 .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
504 .collect::<Vec<PathBuf>>();
505 let ignore_before = calculate_ignore_before(config.ignore_older_secs);
506 let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
507 let (ignore_checkpoints, read_from) = reconcile_position_options(
508 config.start_at_beginning,
509 config.ignore_checkpoints,
510 Some(config.read_from),
511 );
512
513 let emitter = FileSourceInternalEventsEmitter {
514 include_file_metric_tag: config.internal_metrics.include_file_tag,
515 };
516
517 let paths_provider = Glob::new(
518 &config.include,
519 &exclude_patterns,
520 MatchOptions::default(),
521 emitter.clone(),
522 )
523 .expect("invalid glob patterns");
524
525 let encoding_charset = config.encoding.clone().map(|e| e.charset);
526
527 let line_delimiter_as_bytes = match encoding_charset {
530 Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
531 None => Bytes::from(config.line_delimiter.clone()),
532 };
533
534 let checkpointer = Checkpointer::new(&data_dir);
535 let strategy = config.fingerprint.clone().into();
536
537 let file_server = FileServer {
538 paths_provider,
539 max_read_bytes: config.max_read_bytes,
540 ignore_checkpoints,
541 read_from,
542 ignore_before,
543 max_line_bytes: config.max_line_bytes,
544 line_delimiter: line_delimiter_as_bytes,
545 data_dir,
546 glob_minimum_cooldown,
547 fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
548 oldest_first: config.oldest_first,
549 remove_after: config.remove_after_secs.map(Duration::from_secs),
550 emitter,
551 rotate_wait: config.rotate_wait,
552 };
553
554 let event_metadata = EventMetadata {
555 host_key: config
556 .host_key
557 .clone()
558 .unwrap_or(log_schema().host_key().cloned().into())
559 .path,
560 hostname: crate::get_hostname().ok(),
561 file_key: config.file_key.clone().path,
562 offset_key: config.offset_key.clone().and_then(|k| k.path),
563 };
564
565 let include = config.include.clone();
566 let exclude = config.exclude.clone();
567 let multiline_config = config.multiline.clone();
568 let message_start_indicator = config.message_start_indicator.clone();
569 let multi_line_timeout = config.multi_line_timeout;
570
571 let (finalizer, shutdown_checkpointer) = if acknowledgements {
572 let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
576
577 let (send_shutdown, shutdown2) = oneshot::channel::<()>();
582 let checkpoints = checkpointer.view();
583 crate::spawn_in_current_span(async move {
584 while let Some((status, entry)) = ack_stream.next().await {
585 if status == BatchStatus::Delivered {
586 checkpoints.update(entry.file_id, entry.offset);
587 }
588 }
589 send_shutdown.send(())
590 });
591 (Some(finalizer), shutdown2.map(|_| ()).boxed())
592 } else {
593 (None, shutdown.clone().map(|_| ()).boxed())
596 };
597
598 let checkpoints = checkpointer.view();
599 let include_file_metric_tag = config.internal_metrics.include_file_tag;
600 Box::pin(async move {
601 info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
602
603 let mut encoding_decoder = encoding_charset.map(Decoder::new);
604
605 let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
607 let rx = rx
608 .map(futures::stream::iter)
609 .flatten()
610 .map(move |mut line| {
611 emit!(FileBytesReceived {
612 byte_size: line.text.len(),
613 file: &line.filename,
614 include_file_metric_tag,
615 });
616 line.text = match encoding_decoder.as_mut() {
618 Some(d) => d.decode_to_utf8(line.text),
619 None => line.text,
620 };
621 line
622 });
623
624 let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
625 if let Some(ref multiline_config) = multiline_config {
626 wrap_with_line_agg(
627 rx,
628 multiline_config.try_into().unwrap(), )
630 } else if let Some(msi) = message_start_indicator {
631 wrap_with_line_agg(
632 rx,
633 line_agg::Config::for_legacy(
634 Regex::new(&msi).unwrap(), multi_line_timeout,
636 ),
637 )
638 } else {
639 Box::new(rx)
640 };
641
642 let span = Span::current();
645 let mut messages = messages.map(move |line| {
646 let mut event = create_event(
647 line.text,
648 line.start_offset,
649 &line.filename,
650 &event_metadata,
651 log_namespace,
652 include_file_metric_tag,
653 );
654
655 if let Some(finalizer) = &finalizer {
656 let (batch, receiver) = BatchNotifier::new_with_receiver();
657 event = event.with_batch_notifier(&batch);
658 let entry = FinalizerEntry {
659 file_id: line.file_id,
660 offset: line.end_offset,
661 };
662 finalizer.add(entry, receiver);
664 } else {
665 checkpoints.update(line.file_id, line.end_offset);
666 }
667 event
668 });
669 tokio::spawn(async move {
670 match out
671 .send_event_stream(&mut messages)
672 .instrument(span.or_current())
673 .await
674 {
675 Ok(()) => {
676 debug!("Finished sending.");
677 }
678 Err(_) => {
679 let (count, _) = messages.size_hint();
680 emit!(StreamClosedError { count });
681 }
682 }
683 });
684
685 let span = info_span!("file_server");
686 tokio::task::spawn_blocking(move || {
687 let _enter = span.enter();
688 let rt = tokio::runtime::Handle::current();
689 let result =
690 rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
691 emit!(FileOpen { count: 0 });
692 result.expect("file server exited with an error");
696 })
697 .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
698 .await
699 })
700}
701
702fn reconcile_position_options(
705 start_at_beginning: Option<bool>,
706 ignore_checkpoints: Option<bool>,
707 read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709 if start_at_beginning.is_some() {
710 warn!(
711 message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712 )
713 }
714
715 match start_at_beginning {
716 Some(true) => (
717 ignore_checkpoints.unwrap_or(true),
718 read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719 ),
720 _ => (
721 ignore_checkpoints.unwrap_or(false),
722 read_from.map(Into::into).unwrap_or_default(),
723 ),
724 }
725}
726
727fn wrap_with_line_agg(
728 rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729 config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731 let logic = line_agg::Logic::new(config);
732 Box::new(
733 LineAgg::new(
734 rx.map(|line| {
735 (
736 line.filename,
737 line.text,
738 (line.file_id, line.start_offset, line.end_offset),
739 )
740 }),
741 logic,
742 )
743 .map(
744 |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745 text,
746 filename,
747 file_id,
748 start_offset,
749 end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750 lastline_end_offset
751 }),
752 },
753 ),
754 )
755}
756
757struct EventMetadata {
758 host_key: Option<OwnedValuePath>,
759 hostname: Option<String>,
760 file_key: Option<OwnedValuePath>,
761 offset_key: Option<OwnedValuePath>,
762}
763
764fn create_event(
765 line: Bytes,
766 offset: u64,
767 file: &str,
768 meta: &EventMetadata,
769 log_namespace: LogNamespace,
770 include_file_metric_tag: bool,
771) -> LogEvent {
772 let deserializer = BytesDeserializer;
773 let mut event = deserializer.parse_single(line, log_namespace);
774
775 log_namespace.insert_vector_metadata(
776 &mut event,
777 log_schema().source_type_key(),
778 path!("source_type"),
779 Bytes::from_static(FileConfig::NAME.as_bytes()),
780 );
781 log_namespace.insert_vector_metadata(
782 &mut event,
783 log_schema().timestamp_key(),
784 path!("ingest_timestamp"),
785 Utc::now(),
786 );
787
788 let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789 if let Some(hostname) = &meta.hostname {
791 log_namespace.insert_source_metadata(
792 FileConfig::NAME,
793 &mut event,
794 legacy_host_key,
795 path!("host"),
796 hostname.clone(),
797 );
798 }
799
800 let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801 log_namespace.insert_source_metadata(
802 FileConfig::NAME,
803 &mut event,
804 legacy_offset_key,
805 path!("offset"),
806 offset,
807 );
808
809 let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810 log_namespace.insert_source_metadata(
811 FileConfig::NAME,
812 &mut event,
813 legacy_file_key,
814 path!("path"),
815 file,
816 );
817
818 emit!(FileEventsReceived {
819 count: 1,
820 file,
821 byte_size: event.estimated_json_encoded_size_of(),
822 include_file_metric_tag,
823 });
824
825 event
826}
827
828#[cfg(test)]
829mod tests {
830 use std::{
831 collections::HashSet,
832 fs::{self, File},
833 future::Future,
834 io::{Seek, Write},
835 sync::{
836 Arc,
837 atomic::{AtomicUsize, Ordering},
838 },
839 };
840
841 use encoding_rs::UTF_16LE;
842 use indoc::indoc;
843 use similar_asserts::assert_eq;
844 use tempfile::tempdir;
845 use tokio::time::{Duration, sleep, timeout};
846 use vector_lib::schema::Definition;
847 use vrl::{value, value::kind::Collection};
848
849 use super::*;
850 use crate::{
851 config::Config,
852 event::{Event, EventStatus, Value},
853 shutdown::ShutdownSignal,
854 sources::file,
855 test_util::{
856 components::{FILE_SOURCE_TAGS, assert_source_compliance},
857 wait_for_atomic_usize_timeout_ms,
858 },
859 };
860
861 #[test]
862 fn generate_config() {
863 crate::test_util::test_generate_config::<FileConfig>();
864 }
865
866 fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
867 let data_dir = dir.path().join(".data");
870 fs::create_dir_all(&data_dir).unwrap();
871 file::FileConfig {
872 fingerprint: FingerprintConfig::Checksum {
873 ignored_header_bytes: 0,
874 lines: 1,
875 },
876 data_dir: Some(data_dir),
877 glob_minimum_cooldown_ms: Duration::from_millis(100),
878 internal_metrics: FileInternalMetricsConfig {
879 include_file_tag: true,
880 },
881 ..Default::default()
882 }
883 }
884
885 async fn sleep_500_millis() {
886 sleep(Duration::from_millis(500)).await;
887 }
888
889 #[test]
890 fn parse_config() {
891 let config: FileConfig = serde_yaml::from_str(indoc! {
892 r#"
893 include:
894 - /var/log/**/*.log
895 file_key: file
896 glob_minimum_cooldown_ms: 1000
897 multi_line_timeout: 1000
898 max_read_bytes: 2048
899 line_delimiter: "\n"
900 "#,
901 })
902 .unwrap();
903 assert_eq!(config, FileConfig::default());
904 assert_eq!(
905 config.fingerprint,
906 FingerprintConfig::Checksum {
907 ignored_header_bytes: 0,
908 lines: 1
909 }
910 );
911
912 let config: FileConfig = serde_yaml::from_str(indoc! {
913 r#"
914 include:
915 - /var/log/**/*.log
916 fingerprint:
917 strategy: device_and_inode
918 "#,
919 })
920 .unwrap();
921 assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
922
923 let config: FileConfig = serde_yaml::from_str(indoc! {
924 r#"
925 include:
926 - /var/log/**/*.log
927 fingerprint:
928 strategy: checksum
929 bytes: 128
930 ignored_header_bytes: 512
931 "#,
932 })
933 .unwrap();
934 assert_eq!(
935 config.fingerprint,
936 FingerprintConfig::Checksum {
937 ignored_header_bytes: 512,
938 lines: 1
939 }
940 );
941
942 let config: FileConfig = serde_yaml::from_str(indoc! {
943 r#"
944 include:
945 - /var/log/**/*.log
946 encoding:
947 charset: utf-16le
948 "#,
949 })
950 .unwrap();
951 assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
952
953 let config: FileConfig = serde_yaml::from_str(indoc! {
954 r#"
955 include:
956 - /var/log/**/*.log
957 read_from: beginning
958 "#,
959 })
960 .unwrap();
961 assert_eq!(config.read_from, ReadFromConfig::Beginning);
962
963 let config: FileConfig = serde_yaml::from_str(indoc! {
964 r#"
965 include:
966 - /var/log/**/*.log
967 read_from: end
968 "#,
969 })
970 .unwrap();
971 assert_eq!(config.read_from, ReadFromConfig::End);
972 }
973
974 #[test]
975 fn resolve_data_dir() {
976 let global_dir = tempdir().unwrap();
977 let local_dir = tempdir().unwrap();
978
979 let mut config = Config::default();
980 config.global.data_dir = global_dir.keep().into();
981
982 let local_data_dir = Some(local_dir.path().to_path_buf());
984 let res = config
985 .global
986 .resolve_and_validate_data_dir(local_data_dir.as_ref())
987 .unwrap();
988 assert_eq!(res, local_dir.path());
989
990 let res = config.global.resolve_and_validate_data_dir(None).unwrap();
992 assert_eq!(res, config.global.data_dir.unwrap());
993 }
994
995 #[test]
996 fn output_schema_definition_vector_namespace() {
997 let definitions = FileConfig::default()
998 .outputs(LogNamespace::Vector)
999 .remove(0)
1000 .schema_definition(true);
1001
1002 assert_eq!(
1003 definitions,
1004 Some(
1005 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1006 .with_meaning(OwnedTargetPath::event_root(), "message")
1007 .with_metadata_field(
1008 &owned_value_path!("vector", "source_type"),
1009 Kind::bytes(),
1010 None
1011 )
1012 .with_metadata_field(
1013 &owned_value_path!("vector", "ingest_timestamp"),
1014 Kind::timestamp(),
1015 None
1016 )
1017 .with_metadata_field(
1018 &owned_value_path!("file", "host"),
1019 Kind::bytes().or_undefined(),
1020 Some("host")
1021 )
1022 .with_metadata_field(
1023 &owned_value_path!("file", "offset"),
1024 Kind::integer(),
1025 None
1026 )
1027 .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1028 )
1029 )
1030 }
1031
1032 #[test]
1033 fn output_schema_definition_legacy_namespace() {
1034 let definitions = FileConfig::default()
1035 .outputs(LogNamespace::Legacy)
1036 .remove(0)
1037 .schema_definition(true);
1038
1039 assert_eq!(
1040 definitions,
1041 Some(
1042 Definition::new_with_default_metadata(
1043 Kind::object(Collection::empty()),
1044 [LogNamespace::Legacy]
1045 )
1046 .with_event_field(
1047 &owned_value_path!("message"),
1048 Kind::bytes(),
1049 Some("message")
1050 )
1051 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1052 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1053 .with_event_field(
1054 &owned_value_path!("host"),
1055 Kind::bytes().or_undefined(),
1056 Some("host")
1057 )
1058 .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1059 .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1060 )
1061 )
1062 }
1063
1064 #[test]
1065 fn create_event_legacy_namespace() {
1066 let line = Bytes::from("hello world");
1067 let file = "some_file.rs";
1068 let offset: u64 = 0;
1069
1070 let meta = EventMetadata {
1071 host_key: Some(owned_value_path!("host")),
1072 hostname: Some("Some.Machine".to_string()),
1073 file_key: Some(owned_value_path!("file")),
1074 offset_key: Some(owned_value_path!("offset")),
1075 };
1076 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1077
1078 assert_eq!(log["file"], "some_file.rs".into());
1079 assert_eq!(log["host"], "Some.Machine".into());
1080 assert_eq!(log["offset"], 0.into());
1081 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1082 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1083 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1084 }
1085
1086 #[test]
1087 fn create_event_custom_fields_legacy_namespace() {
1088 let line = Bytes::from("hello world");
1089 let file = "some_file.rs";
1090 let offset: u64 = 0;
1091
1092 let meta = EventMetadata {
1093 host_key: Some(owned_value_path!("hostname")),
1094 hostname: Some("Some.Machine".to_string()),
1095 file_key: Some(owned_value_path!("file_path")),
1096 offset_key: Some(owned_value_path!("off")),
1097 };
1098 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1099
1100 assert_eq!(log["file_path"], "some_file.rs".into());
1101 assert_eq!(log["hostname"], "Some.Machine".into());
1102 assert_eq!(log["off"], 0.into());
1103 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1104 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1105 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1106 }
1107
1108 #[test]
1109 fn create_event_vector_namespace() {
1110 let line = Bytes::from("hello world");
1111 let file = "some_file.rs";
1112 let offset: u64 = 0;
1113
1114 let meta = EventMetadata {
1115 host_key: Some(owned_value_path!("ignored")),
1116 hostname: Some("Some.Machine".to_string()),
1117 file_key: Some(owned_value_path!("ignored")),
1118 offset_key: Some(owned_value_path!("ignored")),
1119 };
1120 let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1121
1122 assert_eq!(log.value(), &value!("hello world"));
1123
1124 assert_eq!(
1125 log.metadata()
1126 .value()
1127 .get(path!("vector", "source_type"))
1128 .unwrap(),
1129 &value!("file")
1130 );
1131 assert!(
1132 log.metadata()
1133 .value()
1134 .get(path!("vector", "ingest_timestamp"))
1135 .unwrap()
1136 .is_timestamp()
1137 );
1138
1139 assert_eq!(
1140 log.metadata()
1141 .value()
1142 .get(path!(FileConfig::NAME, "host"))
1143 .unwrap(),
1144 &value!("Some.Machine")
1145 );
1146 assert_eq!(
1147 log.metadata()
1148 .value()
1149 .get(path!(FileConfig::NAME, "offset"))
1150 .unwrap(),
1151 &value!(0)
1152 );
1153 assert_eq!(
1154 log.metadata()
1155 .value()
1156 .get(path!(FileConfig::NAME, "path"))
1157 .unwrap(),
1158 &value!("some_file.rs")
1159 );
1160 }
1161
1162 #[tokio::test]
1163 async fn file_happy_path() {
1164 let n = 5;
1165
1166 let dir = tempdir().unwrap();
1167 let config = file::FileConfig {
1168 include: vec![dir.path().join("*")],
1169 ..test_default_file_config(&dir)
1170 };
1171
1172 let path1 = dir.path().join("file1");
1173 let path2 = dir.path().join("file2");
1174
1175 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1176 let mut file1 = File::create(&path1).unwrap();
1177 let mut file2 = File::create(&path2).unwrap();
1178
1179 for i in 0..n {
1180 writeln!(&mut file1, "hello {i}").unwrap();
1181 writeln!(&mut file2, "goodbye {i}").unwrap();
1182 }
1183
1184 file1.flush().unwrap();
1185 file2.flush().unwrap();
1186
1187 sleep_500_millis().await;
1188 })
1189 .await;
1190
1191 let mut hello_i = 0;
1192 let mut goodbye_i = 0;
1193
1194 for event in received {
1195 let line =
1196 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1197 if line.starts_with("hello") {
1198 assert_eq!(line, format!("hello {}", hello_i));
1199 assert_eq!(
1200 event.as_log()["file"].to_string_lossy(),
1201 path1.to_str().unwrap()
1202 );
1203 hello_i += 1;
1204 } else {
1205 assert_eq!(line, format!("goodbye {}", goodbye_i));
1206 assert_eq!(
1207 event.as_log()["file"].to_string_lossy(),
1208 path2.to_str().unwrap()
1209 );
1210 goodbye_i += 1;
1211 }
1212 }
1213 assert_eq!(hello_i, n);
1214 assert_eq!(goodbye_i, n);
1215 }
1216
1217 #[tokio::test]
1219 async fn file_read_empty_lines() {
1220 let n = 5;
1221
1222 let dir = tempdir().unwrap();
1223 let config = file::FileConfig {
1224 include: vec![dir.path().join("*")],
1225 ..test_default_file_config(&dir)
1226 };
1227
1228 let path = dir.path().join("file");
1229
1230 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1231 let mut file = File::create(&path).unwrap();
1232
1233 writeln!(&mut file, "line for checkpointing").unwrap();
1234 for _i in 0..n {
1235 writeln!(&mut file).unwrap();
1236 }
1237 file.flush().unwrap();
1238
1239 sleep_500_millis().await;
1240 })
1241 .await;
1242
1243 assert_eq!(received.len(), n + 1);
1244 }
1245
1246 #[tokio::test]
1247 async fn file_truncate() {
1248 let n = 5;
1249
1250 let dir = tempdir().unwrap();
1251 let config = file::FileConfig {
1252 include: vec![dir.path().join("*")],
1253 ..test_default_file_config(&dir)
1254 };
1255 let path = dir.path().join("file");
1256 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1257 let mut file = File::create(&path).unwrap();
1258
1259 for i in 0..n {
1260 writeln!(&mut file, "pretrunc {i}").unwrap();
1261 }
1262
1263 file.flush().unwrap();
1264 sleep_500_millis().await; file.set_len(0).unwrap();
1267 file.seek(std::io::SeekFrom::Start(0)).unwrap();
1268
1269 file.sync_all().unwrap();
1270 sleep_500_millis().await; for i in 0..n {
1273 writeln!(&mut file, "posttrunc {i}").unwrap();
1274 }
1275
1276 file.flush().unwrap();
1277 sleep_500_millis().await;
1278 })
1279 .await;
1280
1281 let mut i = 0;
1282 let mut pre_trunc = true;
1283
1284 for event in received {
1285 assert_eq!(
1286 event.as_log()["file"].to_string_lossy(),
1287 path.to_str().unwrap()
1288 );
1289
1290 let line =
1291 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1292
1293 if pre_trunc {
1294 assert_eq!(line, format!("pretrunc {}", i));
1295 } else {
1296 assert_eq!(line, format!("posttrunc {}", i));
1297 }
1298
1299 i += 1;
1300 if i == n {
1301 i = 0;
1302 pre_trunc = false;
1303 }
1304 }
1305 }
1306
1307 #[tokio::test]
1308 async fn file_rotate() {
1309 let n = 5;
1310
1311 let dir = tempdir().unwrap();
1312 let config = file::FileConfig {
1313 include: vec![dir.path().join("*")],
1314 ..test_default_file_config(&dir)
1315 };
1316
1317 let path = dir.path().join("file");
1318 let archive_path = dir.path().join("file");
1319 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1320 let mut file = File::create(&path).unwrap();
1321
1322 for i in 0..n {
1323 writeln!(&mut file, "prerot {i}").unwrap();
1324 }
1325
1326 file.flush().unwrap();
1327 sleep_500_millis().await; fs::rename(&path, archive_path).expect("could not rename");
1330 file.sync_all().unwrap();
1331
1332 let mut file = File::create(&path).unwrap();
1333
1334 file.sync_all().unwrap();
1335 sleep_500_millis().await; for i in 0..n {
1338 writeln!(&mut file, "postrot {i}").unwrap();
1339 }
1340
1341 file.flush().unwrap();
1342 sleep_500_millis().await;
1343 })
1344 .await;
1345
1346 let mut i = 0;
1347 let mut pre_rot = true;
1348
1349 for event in received {
1350 assert_eq!(
1351 event.as_log()["file"].to_string_lossy(),
1352 path.to_str().unwrap()
1353 );
1354
1355 let line =
1356 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1357
1358 if pre_rot {
1359 assert_eq!(line, format!("prerot {}", i));
1360 } else {
1361 assert_eq!(line, format!("postrot {}", i));
1362 }
1363
1364 i += 1;
1365 if i == n {
1366 i = 0;
1367 pre_rot = false;
1368 }
1369 }
1370 }
1371
1372 #[tokio::test]
1373 async fn file_multiple_paths() {
1374 let n = 5;
1375
1376 let dir = tempdir().unwrap();
1377 let config = file::FileConfig {
1378 include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1379 exclude: vec![dir.path().join("a.*.txt")],
1380 ..test_default_file_config(&dir)
1381 };
1382
1383 let path1 = dir.path().join("a.txt");
1384 let path2 = dir.path().join("b.txt");
1385 let path3 = dir.path().join("a.log");
1386 let path4 = dir.path().join("a.ignore.txt");
1387 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1388 let mut file1 = File::create(&path1).unwrap();
1389 let mut file2 = File::create(&path2).unwrap();
1390 let mut file3 = File::create(&path3).unwrap();
1391 let mut file4 = File::create(&path4).unwrap();
1392
1393 for i in 0..n {
1394 writeln!(&mut file1, "1 {i}").unwrap();
1395 writeln!(&mut file2, "2 {i}").unwrap();
1396 writeln!(&mut file3, "3 {i}").unwrap();
1397 writeln!(&mut file4, "4 {i}").unwrap();
1398 }
1399 file1.flush().unwrap();
1400 file2.flush().unwrap();
1401 file3.flush().unwrap();
1402 file4.flush().unwrap();
1403
1404 sleep_500_millis().await;
1405 })
1406 .await;
1407
1408 let mut is = [0; 3];
1409
1410 for event in received {
1411 let line =
1412 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1413 let mut split = line.split(' ');
1414 let file = split.next().unwrap().parse::<usize>().unwrap();
1415 assert_ne!(file, 4);
1416 let i = split.next().unwrap().parse::<usize>().unwrap();
1417
1418 assert_eq!(is[file - 1], i);
1419 is[file - 1] += 1;
1420 }
1421
1422 assert_eq!(is, [n as usize; 3]);
1423 }
1424
1425 #[tokio::test]
1426 async fn file_exclude_paths() {
1427 let n = 5;
1428
1429 let dir = tempdir().unwrap();
1430 let config = file::FileConfig {
1431 include: vec![dir.path().join("a//b/*.log.*")],
1432 exclude: vec![dir.path().join("a//b/test.log.*")],
1433 ..test_default_file_config(&dir)
1434 };
1435
1436 let path1 = dir.path().join("a//b/a.log.1");
1437 let path2 = dir.path().join("a//b/test.log.1");
1438 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1439 std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1440 let mut file1 = File::create(&path1).unwrap();
1441 let mut file2 = File::create(&path2).unwrap();
1442
1443 for i in 0..n {
1444 writeln!(&mut file1, "1 {i}").unwrap();
1445 writeln!(&mut file2, "2 {i}").unwrap();
1446 }
1447
1448 file1.flush().unwrap();
1449 file2.flush().unwrap();
1450 sleep_500_millis().await;
1451 })
1452 .await;
1453
1454 let mut is = [0; 1];
1455
1456 for event in received {
1457 let line =
1458 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1459 let mut split = line.split(' ');
1460 let file = split.next().unwrap().parse::<usize>().unwrap();
1461 assert_ne!(file, 4);
1462 let i = split.next().unwrap().parse::<usize>().unwrap();
1463
1464 assert_eq!(is[file - 1], i);
1465 is[file - 1] += 1;
1466 }
1467
1468 assert_eq!(is, [n as usize; 1]);
1469 }
1470
1471 #[tokio::test]
1472 async fn file_key_acknowledged() {
1473 file_key(Acks).await
1474 }
1475
1476 #[tokio::test]
1477 async fn file_key_no_acknowledge() {
1478 file_key(NoAcks).await
1479 }
1480
1481 async fn file_key(acks: AckingMode) {
1482 {
1484 let dir = tempdir().unwrap();
1485 let config = file::FileConfig {
1486 include: vec![dir.path().join("*")],
1487 ..test_default_file_config(&dir)
1488 };
1489
1490 let path = dir.path().join("file");
1491 let received =
1492 run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
1493 let mut file = File::create(&path).unwrap();
1494
1495 writeln!(&mut file, "hello there").unwrap();
1496 file.flush().unwrap();
1497
1498 sleep_500_millis().await;
1499 })
1500 .await;
1501
1502 assert_eq!(received.len(), 1);
1503 assert_eq!(
1504 received[0].as_log()["file"].to_string_lossy(),
1505 path.to_str().unwrap()
1506 );
1507 }
1508
1509 {
1511 let dir = tempdir().unwrap();
1512 let config = file::FileConfig {
1513 include: vec![dir.path().join("*")],
1514 file_key: OptionalValuePath::from(owned_value_path!("source")),
1515 ..test_default_file_config(&dir)
1516 };
1517
1518 let path = dir.path().join("file");
1519 let received =
1520 run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
1521 let mut file = File::create(&path).unwrap();
1522
1523 writeln!(&mut file, "hello there").unwrap();
1524 file.flush().unwrap();
1525
1526 sleep_500_millis().await;
1527 })
1528 .await;
1529
1530 assert_eq!(received.len(), 1);
1531 assert_eq!(
1532 received[0].as_log()["source"].to_string_lossy(),
1533 path.to_str().unwrap()
1534 );
1535 }
1536
1537 {
1539 let dir = tempdir().unwrap();
1540 let config = file::FileConfig {
1541 include: vec![dir.path().join("*")],
1542 ..test_default_file_config(&dir)
1543 };
1544
1545 let path = dir.path().join("file");
1546 let received =
1547 run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
1548 let mut file = File::create(&path).unwrap();
1549
1550 writeln!(&mut file, "hello there").unwrap();
1551
1552 file.flush().unwrap();
1553 sleep_500_millis().await;
1554 })
1555 .await;
1556
1557 assert_eq!(received.len(), 1);
1558 assert_eq!(
1559 received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1560 vec![
1561 default_file_key()
1562 .path
1563 .expect("file key to exist")
1564 .to_string()
1565 .into(),
1566 log_schema().host_key().unwrap().to_string().into(),
1567 log_schema().message_key().unwrap().to_string().into(),
1568 log_schema().timestamp_key().unwrap().to_string().into(),
1569 log_schema().source_type_key().unwrap().to_string().into()
1570 ]
1571 .into_iter()
1572 .collect::<HashSet<_>>()
1573 );
1574 }
1575 }
1576
1577 #[tokio::test]
1578 async fn file_start_position_server_restart_acknowledged() {
1579 file_start_position_server_restart(Acks).await
1580 }
1581
1582 #[tokio::test]
1583 async fn file_start_position_server_restart_no_acknowledge() {
1584 file_start_position_server_restart(NoAcks).await
1585 }
1586
1587 async fn file_start_position_server_restart(acking: AckingMode) {
1588 let dir = tempdir().unwrap();
1589 let config = file::FileConfig {
1590 include: vec![dir.path().join("*")],
1591 ..test_default_file_config(&dir)
1592 };
1593
1594 let path = dir.path().join("file");
1595 let mut file = File::create(&path).unwrap();
1596 writeln!(&mut file, "zeroth line").unwrap();
1597 file.flush().unwrap();
1598
1599 {
1601 let received =
1602 run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1603 sleep_500_millis().await;
1604 writeln!(&mut file, "first line").unwrap();
1605 file.flush().unwrap();
1606 sleep_500_millis().await;
1607 })
1608 .await;
1609
1610 let lines = extract_messages_string(received);
1611 assert_eq!(lines, vec!["zeroth line", "first line"]);
1612 }
1613 {
1615 let received =
1616 run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1617 sleep_500_millis().await;
1618 writeln!(&mut file, "second line").unwrap();
1619 file.flush().unwrap();
1620 sleep_500_millis().await;
1621 })
1622 .await;
1623
1624 let lines = extract_messages_string(received);
1625 assert_eq!(lines, vec!["second line"]);
1626 }
1627 {
1629 let config = file::FileConfig {
1630 include: vec![dir.path().join("*")],
1631 ignore_checkpoints: Some(true),
1632 read_from: ReadFromConfig::Beginning,
1633 ..test_default_file_config(&dir)
1634 };
1635 let received =
1636 run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
1637 sleep_500_millis().await;
1638 writeln!(&mut file, "third line").unwrap();
1639 file.flush().unwrap();
1640 sleep_500_millis().await;
1641 })
1642 .await;
1643
1644 let lines = extract_messages_string(received);
1645 assert_eq!(
1646 lines,
1647 vec!["zeroth line", "first line", "second line", "third line"]
1648 );
1649 }
1650 }
1651
1652 #[tokio::test]
1653 async fn file_start_position_server_restart_unfinalized() {
1654 let dir = tempdir().unwrap();
1655 let config = file::FileConfig {
1656 include: vec![dir.path().join("*")],
1657 ..test_default_file_config(&dir)
1658 };
1659
1660 let path = dir.path().join("file");
1661 let mut file = File::create(&path).unwrap();
1662 writeln!(&mut file, "the line").unwrap();
1663 file.flush().unwrap();
1664
1665 let received = run_file_source(
1667 &config,
1668 false,
1669 Unfinalized,
1670 LogNamespace::Legacy,
1671 None,
1672 sleep(Duration::from_secs(5)),
1673 )
1674 .await;
1675 let lines = extract_messages_string(received);
1676 assert_eq!(lines, vec!["the line"]);
1677
1678 let received = run_file_source(
1680 &config,
1681 false,
1682 Unfinalized,
1683 LogNamespace::Legacy,
1684 None,
1685 sleep(Duration::from_secs(5)),
1686 )
1687 .await;
1688 let lines = extract_messages_string(received);
1689 assert_eq!(lines, vec!["the line"]);
1690 }
1691
1692 #[tokio::test]
1693 async fn file_duplicate_processing_after_restart() {
1694 let dir = tempdir().unwrap();
1695 let config = file::FileConfig {
1696 include: vec![dir.path().join("*")],
1697 ..test_default_file_config(&dir)
1698 };
1699
1700 let path = dir.path().join("file");
1701 let mut file = File::create(&path).unwrap();
1702
1703 let line_count = 4000;
1704 for i in 0..line_count {
1705 writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1706 }
1707 file.flush().unwrap();
1708
1709 let received = run_file_source(
1711 &config,
1712 true,
1713 Acks,
1714 LogNamespace::Legacy,
1715 None,
1716 sleep_500_millis(),
1718 )
1719 .await;
1720 let lines = extract_messages_string(received);
1721
1722 assert!(lines.len() < line_count);
1725
1726 let remaining = line_count - lines.len();
1731 let event_count = Arc::new(AtomicUsize::new(0));
1732 let received = run_file_source(
1733 &config,
1734 true,
1735 Acks,
1736 LogNamespace::Legacy,
1737 Some(Arc::clone(&event_count)),
1738 async {
1739 wait_for_atomic_usize_timeout_ms(
1740 Arc::clone(&event_count),
1741 |n| n >= remaining,
1742 5_000,
1743 )
1744 .await;
1745 },
1746 )
1747 .await;
1748 let lines2 = extract_messages_string(received);
1749
1750 assert_eq!(lines.len() + lines2.len(), line_count);
1752 }
1753
1754 #[tokio::test]
1755 async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1756 file_start_position_server_restart_with_file_rotation(Acks).await
1757 }
1758
1759 #[tokio::test]
1760 async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1761 file_start_position_server_restart_with_file_rotation(NoAcks).await
1762 }
1763
1764 async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1765 let dir = tempdir().unwrap();
1766 let config = file::FileConfig {
1767 include: vec![dir.path().join("*")],
1768 ..test_default_file_config(&dir)
1769 };
1770
1771 let path = dir.path().join("file");
1772 let path_for_old_file = dir.path().join("file.old");
1773 {
1775 let received =
1776 run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1777 let mut file = File::create(&path).unwrap();
1778 writeln!(&mut file, "first line").unwrap();
1779 file.flush().unwrap();
1780 sleep_500_millis().await;
1781 })
1782 .await;
1783
1784 let lines = extract_messages_string(received);
1785 assert_eq!(lines, vec!["first line"]);
1786 }
1787 fs::rename(&path, &path_for_old_file).expect("could not rename");
1789 {
1792 let received =
1793 run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
1794 let mut file = File::create(&path).unwrap();
1795 writeln!(&mut file, "second line").unwrap();
1796 file.flush().unwrap();
1797 sleep_500_millis().await;
1798 })
1799 .await;
1800
1801 let lines = extract_messages_string(received);
1802 assert_eq!(lines, vec!["second line"]);
1803 }
1804 }
1805
1806 #[cfg(unix)] #[tokio::test]
1808 async fn file_start_position_ignore_old_files() {
1809 use std::{
1810 os::unix::io::AsRawFd,
1811 time::{Duration, SystemTime},
1812 };
1813
1814 let dir = tempdir().unwrap();
1815 let config = file::FileConfig {
1816 include: vec![dir.path().join("*")],
1817 ignore_older_secs: Some(5),
1818 ..test_default_file_config(&dir)
1819 };
1820
1821 let before_path = dir.path().join("before");
1822 let mut before_file = File::create(&before_path).unwrap();
1823 let after_path = dir.path().join("after");
1824 let mut after_file = File::create(&after_path).unwrap();
1825
1826 writeln!(&mut before_file, "first line").unwrap(); writeln!(&mut after_file, "_first line").unwrap(); {
1830 let before = SystemTime::now() - Duration::from_secs(8);
1832 let after = SystemTime::now() - Duration::from_secs(2);
1833
1834 let before_time = libc::timeval {
1835 tv_sec: before
1836 .duration_since(SystemTime::UNIX_EPOCH)
1837 .unwrap()
1838 .as_secs() as _,
1839 tv_usec: 0,
1840 };
1841 let before_times = [before_time, before_time];
1842
1843 let after_time = libc::timeval {
1844 tv_sec: after
1845 .duration_since(SystemTime::UNIX_EPOCH)
1846 .unwrap()
1847 .as_secs() as _,
1848 tv_usec: 0,
1849 };
1850 let after_times = [after_time, after_time];
1851
1852 unsafe {
1853 libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1854 libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1855 }
1856 }
1857
1858 before_file.sync_all().unwrap();
1859 after_file.sync_all().unwrap();
1860
1861 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1862 sleep_500_millis().await;
1863 writeln!(&mut before_file, "second line").unwrap();
1864 writeln!(&mut after_file, "_second line").unwrap();
1865
1866 before_file.flush().unwrap();
1867 after_file.flush().unwrap();
1868 sleep_500_millis().await;
1869 })
1870 .await;
1871
1872 let before_lines = received
1873 .iter()
1874 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1875 .map(|event| {
1876 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1877 })
1878 .collect::<Vec<_>>();
1879 let after_lines = received
1880 .iter()
1881 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1882 .map(|event| {
1883 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1884 })
1885 .collect::<Vec<_>>();
1886 assert_eq!(before_lines, vec!["second line"]);
1887 assert_eq!(after_lines, vec!["_first line", "_second line"]);
1888 }
1889
1890 #[tokio::test]
1891 async fn file_max_line_bytes() {
1892 let dir = tempdir().unwrap();
1893 let config = file::FileConfig {
1894 include: vec![dir.path().join("*")],
1895 max_line_bytes: 10,
1896 ..test_default_file_config(&dir)
1897 };
1898
1899 let path = dir.path().join("file");
1900 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1901 let mut file = File::create(&path).unwrap();
1902
1903 writeln!(&mut file, "short").unwrap();
1904 writeln!(&mut file, "this is too long").unwrap();
1905 writeln!(&mut file, "11 eleven11").unwrap();
1906 let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1907 writeln!(&mut file, "{super_long}").unwrap();
1908 writeln!(&mut file, "exactly 10").unwrap();
1909 writeln!(&mut file, "it can end on a line that's too long").unwrap();
1910
1911 file.flush().unwrap();
1912 sleep_500_millis().await;
1913 sleep_500_millis().await;
1914
1915 writeln!(&mut file, "and then continue").unwrap();
1916 writeln!(&mut file, "last short").unwrap();
1917 file.flush().unwrap();
1918
1919 sleep_500_millis().await;
1920 sleep_500_millis().await;
1921 }).await;
1922
1923 let received = extract_messages_value(received);
1924
1925 assert_eq!(
1926 received,
1927 vec!["short".into(), "exactly 10".into(), "last short".into()]
1928 );
1929 }
1930
1931 #[tokio::test]
1932 async fn test_multi_line_aggregation_legacy() {
1933 let dir = tempdir().unwrap();
1934 let config = file::FileConfig {
1935 include: vec![dir.path().join("*")],
1936 message_start_indicator: Some("INFO".into()),
1937 multi_line_timeout: 25,
1938 ..test_default_file_config(&dir)
1939 };
1940
1941 let path = dir.path().join("file");
1942 let event_count = Arc::new(AtomicUsize::new(0));
1943 let received = run_file_source(
1944 &config,
1945 false,
1946 NoAcks,
1947 LogNamespace::Legacy,
1948 Some(Arc::clone(&event_count)),
1949 async {
1950 let mut file = File::create(&path).unwrap();
1951
1952 writeln!(&mut file, "leftover foo").unwrap();
1956 writeln!(&mut file, "INFO hello").unwrap();
1957 writeln!(&mut file, "INFO goodbye").unwrap();
1958 writeln!(&mut file, "part of goodbye").unwrap();
1959 writeln!(&mut file, "INFO hi again").unwrap();
1960 writeln!(&mut file, "and some more").unwrap();
1961 writeln!(&mut file, "INFO hello").unwrap();
1962 file.flush().unwrap();
1963
1964 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
1967
1968 writeln!(&mut file, "too slow").unwrap();
1969 writeln!(&mut file, "INFO doesn't have").unwrap();
1970 writeln!(&mut file, "to be INFO in").unwrap();
1971 writeln!(&mut file, "the middle").unwrap();
1972 file.flush().unwrap();
1973
1974 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
1977 },
1978 )
1979 .await;
1980
1981 let received = extract_messages_value(received);
1982
1983 assert_eq!(
1984 received,
1985 vec![
1986 "leftover foo".into(),
1987 "INFO hello".into(),
1988 "INFO goodbye\npart of goodbye".into(),
1989 "INFO hi again\nand some more".into(),
1990 "INFO hello".into(),
1991 "too slow".into(),
1992 "INFO doesn't have".into(),
1993 "to be INFO in\nthe middle".into(),
1994 ]
1995 );
1996 }
1997
1998 #[tokio::test]
1999 async fn test_multi_line_aggregation() {
2000 let dir = tempdir().unwrap();
2001 let config = file::FileConfig {
2002 include: vec![dir.path().join("*")],
2003 multiline: Some(MultilineConfig {
2004 start_pattern: "INFO".to_owned(),
2005 condition_pattern: "INFO".to_owned(),
2006 mode: line_agg::Mode::HaltBefore,
2007 timeout_ms: Duration::from_millis(25),
2008 }),
2009 ..test_default_file_config(&dir)
2010 };
2011
2012 let path = dir.path().join("file");
2013 let event_count = Arc::new(AtomicUsize::new(0));
2014 let received = run_file_source(
2015 &config,
2016 false,
2017 NoAcks,
2018 LogNamespace::Legacy,
2019 Some(Arc::clone(&event_count)),
2020 async {
2021 let mut file = File::create(&path).unwrap();
2022
2023 writeln!(&mut file, "leftover foo").unwrap();
2027 writeln!(&mut file, "INFO hello").unwrap();
2028 writeln!(&mut file, "INFO goodbye").unwrap();
2029 writeln!(&mut file, "part of goodbye").unwrap();
2030 writeln!(&mut file, "INFO hi again").unwrap();
2031 writeln!(&mut file, "and some more").unwrap();
2032 writeln!(&mut file, "INFO hello").unwrap();
2033 file.flush().unwrap();
2034
2035 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
2038
2039 writeln!(&mut file, "too slow").unwrap();
2040 writeln!(&mut file, "INFO doesn't have").unwrap();
2041 writeln!(&mut file, "to be INFO in").unwrap();
2042 writeln!(&mut file, "the middle").unwrap();
2043 file.flush().unwrap();
2044
2045 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
2048 },
2049 )
2050 .await;
2051
2052 let received = extract_messages_value(received);
2053
2054 assert_eq!(
2055 received,
2056 vec![
2057 "leftover foo".into(),
2058 "INFO hello".into(),
2059 "INFO goodbye\npart of goodbye".into(),
2060 "INFO hi again\nand some more".into(),
2061 "INFO hello".into(),
2062 "too slow".into(),
2063 "INFO doesn't have".into(),
2064 "to be INFO in\nthe middle".into(),
2065 ]
2066 );
2067 }
2068
2069 #[tokio::test]
2070 async fn test_multi_line_checkpointing() {
2071 let dir = tempdir().unwrap();
2072 let config = file::FileConfig {
2073 include: vec![dir.path().join("*")],
2074 offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2075 multiline: Some(MultilineConfig {
2076 start_pattern: "INFO".to_owned(),
2077 condition_pattern: "INFO".to_owned(),
2078 mode: line_agg::Mode::HaltBefore,
2079 timeout_ms: Duration::from_millis(25), }),
2081 ..test_default_file_config(&dir)
2082 };
2083
2084 let path = dir.path().join("file");
2085 let mut file = File::create(&path).unwrap();
2086
2087 writeln!(&mut file, "INFO hello").unwrap();
2088 writeln!(&mut file, "part of hello").unwrap();
2089
2090 file.sync_all().unwrap();
2091
2092 let received = run_file_source(
2095 &config,
2096 true,
2097 Acks,
2098 LogNamespace::Legacy,
2099 None,
2100 sleep_500_millis(),
2101 )
2102 .await;
2103
2104 assert_eq!(received[0].as_log()["offset"], 0.into());
2105
2106 let lines = extract_messages_string(received);
2107 assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2108
2109 let received_after_restart =
2111 run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2112 writeln!(&mut file, "INFO goodbye").unwrap();
2113 file.flush().unwrap();
2114 sleep_500_millis().await;
2115 })
2116 .await;
2117 assert_eq!(
2118 received_after_restart[0].as_log()["offset"],
2119 (lines[0].len() + 1).into()
2120 );
2121 let lines = extract_messages_string(received_after_restart);
2122 assert_eq!(lines, vec!["INFO goodbye"]);
2123 }
2124
2125 #[tokio::test]
2126 async fn test_fair_reads() {
2127 let dir = tempdir().unwrap();
2128 let config = file::FileConfig {
2129 include: vec![dir.path().join("*")],
2130 max_read_bytes: 1,
2131 oldest_first: false,
2132 ..test_default_file_config(&dir)
2133 };
2134
2135 let older_path = dir.path().join("z_older_file");
2136 let mut older = File::create(&older_path).unwrap();
2137
2138 writeln!(&mut older, "hello i am the old file").unwrap();
2139 writeln!(&mut older, "i have been around a while").unwrap();
2140 writeln!(&mut older, "you can read newer files at the same time").unwrap();
2141 older.sync_all().unwrap();
2142
2143 let newer_path = dir.path().join("a_newer_file");
2144 let mut newer = File::create(&newer_path).unwrap();
2145
2146 writeln!(&mut newer, "and i am the new file").unwrap();
2147 writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2148 writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2149 newer.sync_all().unwrap();
2150
2151 let received = run_file_source(
2152 &config,
2153 false,
2154 NoAcks,
2155 LogNamespace::Legacy,
2156 None,
2157 sleep_500_millis(),
2158 )
2159 .await;
2160
2161 let received = extract_messages_value(received);
2162
2163 let old_first = vec![
2164 "hello i am the old file".into(),
2165 "and i am the new file".into(),
2166 "i have been around a while".into(),
2167 "this should be interleaved with the old one".into(),
2168 "you can read newer files at the same time".into(),
2169 "which is fine because we want fairness".into(),
2170 ];
2171 let new_first: Vec<_> = old_first
2172 .chunks(2)
2173 .flat_map(|chunk| chunk.iter().rev().cloned().collect::<Vec<_>>())
2174 .collect();
2175
2176 if received[0] == old_first[0] {
2177 assert_eq!(received, old_first);
2178 } else {
2179 assert_eq!(received, new_first);
2180 }
2181 }
2182
2183 #[tokio::test]
2184 async fn test_oldest_first() {
2185 let dir = tempdir().unwrap();
2186 let config = file::FileConfig {
2187 include: vec![dir.path().join("*")],
2188 max_read_bytes: 1,
2189 oldest_first: true,
2190 ..test_default_file_config(&dir)
2191 };
2192
2193 let older_path = dir.path().join("z_older_file");
2194 let mut older = File::create(&older_path).unwrap();
2195 older.sync_all().unwrap();
2196
2197 sleep_500_millis().await;
2199
2200 let newer_path = dir.path().join("a_newer_file");
2201 let mut newer = File::create(&newer_path).unwrap();
2202 newer.sync_all().unwrap();
2203
2204 writeln!(&mut older, "hello i am the old file").unwrap();
2205 writeln!(&mut older, "i have been around a while").unwrap();
2206 writeln!(&mut older, "you should definitely read all of me first").unwrap();
2207 older.flush().unwrap();
2208
2209 writeln!(&mut newer, "i'm new").unwrap();
2210 writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2211 writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2212 newer.flush().unwrap();
2213
2214 let received = run_file_source(
2215 &config,
2216 false,
2217 NoAcks,
2218 LogNamespace::Legacy,
2219 None,
2220 sleep_500_millis(),
2221 )
2222 .await;
2223
2224 let received = extract_messages_value(received);
2225
2226 assert_eq!(
2227 received,
2228 vec![
2229 "hello i am the old file".into(),
2230 "i have been around a while".into(),
2231 "you should definitely read all of me first".into(),
2232 "i'm new".into(),
2233 "hopefully you read all the old stuff first".into(),
2234 "because otherwise i'm not going to make sense".into(),
2235 ]
2236 );
2237 }
2238
2239 #[tokio::test]
2240 async fn test_split_reads() {
2241 let dir = tempdir().unwrap();
2242 let config = file::FileConfig {
2243 include: vec![dir.path().join("*")],
2244 max_read_bytes: 1,
2245 ..test_default_file_config(&dir)
2246 };
2247
2248 let path = dir.path().join("file");
2249 let mut file = File::create(&path).unwrap();
2250
2251 writeln!(&mut file, "hello i am a normal line").unwrap();
2252 file.sync_all().unwrap();
2253
2254 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2255 sleep_500_millis().await;
2256
2257 write!(&mut file, "i am not a full line").unwrap();
2258
2259 file.flush().unwrap();
2260 sleep_500_millis().await;
2262
2263 writeln!(&mut file, " until now").unwrap();
2264
2265 file.flush().unwrap();
2266 sleep_500_millis().await;
2267 })
2268 .await;
2269
2270 let received = extract_messages_value(received);
2271
2272 assert_eq!(
2273 received,
2274 vec![
2275 "hello i am a normal line".into(),
2276 "i am not a full line until now".into(),
2277 ]
2278 );
2279 }
2280
2281 #[tokio::test]
2282 async fn test_gzipped_file() {
2283 let dir = tempdir().unwrap();
2284 let config = file::FileConfig {
2285 include: vec![PathBuf::from("tests/data/gzipped.log")],
2286 max_line_bytes: 100,
2293 ..test_default_file_config(&dir)
2294 };
2295
2296 let received = run_file_source(
2297 &config,
2298 false,
2299 NoAcks,
2300 LogNamespace::Legacy,
2301 None,
2302 sleep_500_millis(),
2303 )
2304 .await;
2305
2306 let received = extract_messages_value(received);
2307
2308 assert_eq!(
2309 received,
2310 vec![
2311 "this is a simple file".into(),
2312 "i have been compressed".into(),
2313 "in order to make me smaller".into(),
2314 "but you can still read me".into(),
2315 "hooray".into(),
2316 ]
2317 );
2318 }
2319
2320 #[tokio::test]
2321 async fn test_non_utf8_encoded_file() {
2322 let dir = tempdir().unwrap();
2323 let config = file::FileConfig {
2324 include: vec![PathBuf::from("tests/data/utf-16le.log")],
2325 encoding: Some(EncodingConfig { charset: UTF_16LE }),
2326 ..test_default_file_config(&dir)
2327 };
2328
2329 let received = run_file_source(
2330 &config,
2331 false,
2332 NoAcks,
2333 LogNamespace::Legacy,
2334 None,
2335 sleep_500_millis(),
2336 )
2337 .await;
2338
2339 let received = extract_messages_value(received);
2340
2341 assert_eq!(
2342 received,
2343 vec![
2344 "hello i am a file".into(),
2345 "i can unicode".into(),
2346 "but i do so in 16 bits".into(),
2347 "and when i byte".into(),
2348 "i become little-endian".into(),
2349 ]
2350 );
2351 }
2352
2353 #[tokio::test]
2354 async fn test_non_default_line_delimiter() {
2355 let dir = tempdir().unwrap();
2356 let config = file::FileConfig {
2357 include: vec![dir.path().join("*")],
2358 line_delimiter: "\r\n".to_string(),
2359 ..test_default_file_config(&dir)
2360 };
2361
2362 let path = dir.path().join("file");
2363 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2364 let mut file = File::create(&path).unwrap();
2365
2366 write!(&mut file, "hello i am a line\r\n").unwrap();
2367 write!(&mut file, "and i am too\r\n").unwrap();
2368 write!(&mut file, "CRLF is how we end\r\n").unwrap();
2369 write!(&mut file, "please treat us well\r\n").unwrap();
2370
2371 file.flush().unwrap();
2372 sleep_500_millis().await;
2373 })
2374 .await;
2375
2376 let received = extract_messages_value(received);
2377
2378 assert_eq!(
2379 received,
2380 vec![
2381 "hello i am a line".into(),
2382 "and i am too".into(),
2383 "CRLF is how we end".into(),
2384 "please treat us well".into()
2385 ]
2386 );
2387 }
2388
2389 #[tokio::test]
2393 async fn test_multi_char_delimiter_split_across_buffer_boundary() {
2394 let dir = tempdir().unwrap();
2395 let config = file::FileConfig {
2396 include: vec![dir.path().join("*")],
2397 line_delimiter: "\r\n".to_string(),
2398 ..test_default_file_config(&dir)
2399 };
2400
2401 let path = dir.path().join("file");
2402 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2403 let mut file = File::create(&path).unwrap();
2404
2405 sleep_500_millis().await;
2406
2407 let buffer_size = 8192;
2414
2415 let event1_prefix = "Event 1: ";
2417 let padding1_len = buffer_size - event1_prefix.len() - 1; write!(&mut file, "{}", event1_prefix).unwrap();
2419 file.write_all(&vec![b'X'; padding1_len]).unwrap();
2420 write!(&mut file, "\r\n").unwrap(); let event2_prefix = "Event 2: ";
2424 let padding2_len = buffer_size - event2_prefix.len() - 1;
2425 write!(&mut file, "{}", event2_prefix).unwrap();
2426 file.write_all(&vec![b'Y'; padding2_len]).unwrap();
2427 write!(&mut file, "\r\n").unwrap(); write!(&mut file, "Event 3: Final\r\n").unwrap();
2431
2432 sleep_500_millis().await;
2433 })
2434 .await;
2435
2436 let messages = extract_messages_value(received);
2437
2438 assert_eq!(
2440 messages.len(),
2441 3,
2442 "Should receive exactly 3 separate events (bug would merge them)"
2443 );
2444
2445 let msg0 = messages[0].to_string_lossy();
2447 let msg1 = messages[1].to_string_lossy();
2448 let msg2 = messages[2].to_string_lossy();
2449
2450 assert!(
2451 msg0.starts_with("Event 1: "),
2452 "First event should start with 'Event 1: ', got: {}",
2453 msg0
2454 );
2455 assert!(
2456 msg1.starts_with("Event 2: "),
2457 "Second event should start with 'Event 2: ', got: {}",
2458 msg1
2459 );
2460 assert_eq!(msg2, "Event 3: Final");
2461
2462 for (i, msg) in messages.iter().enumerate() {
2464 let msg_str = msg.to_string_lossy();
2465 assert!(
2466 !msg_str.contains('\r'),
2467 "Event {} should not contain embedded \\r",
2468 i
2469 );
2470 assert!(
2471 !msg_str.contains('\n'),
2472 "Event {} should not contain embedded \\n",
2473 i
2474 );
2475 }
2476 }
2477
2478 #[tokio::test]
2479 async fn remove_file() {
2480 let n = 5;
2481 let remove_after_secs = 1;
2482
2483 let dir = tempdir().unwrap();
2484 let config = file::FileConfig {
2485 include: vec![dir.path().join("*")],
2486 remove_after_secs: Some(remove_after_secs),
2487 ..test_default_file_config(&dir)
2488 };
2489
2490 let path = dir.path().join("file");
2491 let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2492 let mut file = File::create(&path).unwrap();
2493
2494 for i in 0..n {
2495 writeln!(&mut file, "{i}").unwrap();
2496 }
2497 file.flush().unwrap();
2498 drop(file);
2499
2500 for _ in 0..10 {
2501 sleep(Duration::from_secs(remove_after_secs + 1)).await;
2503
2504 if File::open(&path).is_err() {
2505 break;
2506 }
2507 }
2508 })
2509 .await;
2510
2511 assert_eq!(received.len(), n);
2512
2513 match File::open(&path) {
2514 Ok(_) => panic!("File wasn't removed"),
2515 Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2516 }
2517 }
2518
/// Selects how `run_file_source` wires up the test `SourceSender` channel
/// and whether the source is started with acknowledgements enabled.
#[derive(Clone, Copy, PartialEq, Eq)]
enum AckingMode {
    /// Acknowledgements disabled; the channel comes from `SourceSender::new_test()`.
    NoAcks,
    /// Acknowledgements enabled; the channel comes from
    /// `SourceSender::new_test_finalize(EventStatus::Rejected)`.
    Unfinalized,
    /// Acknowledgements enabled; the channel comes from
    /// `SourceSender::new_test_finalize(EventStatus::Delivered)`.
    Acks,
}
2525 use AckingMode::*;
2526 use vector_lib::lookup::OwnedTargetPath;
2527
2528 async fn run_file_source(
2529 config: &FileConfig,
2530 wait_shutdown: bool,
2531 acking_mode: AckingMode,
2532 log_namespace: LogNamespace,
2533 event_counter: Option<Arc<AtomicUsize>>,
2538 inner: impl Future<Output = ()>,
2539 ) -> Vec<Event> {
2540 assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2541 let (tx, rx) = match acking_mode {
2542 Acks => {
2543 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2544 (tx, rx.boxed())
2545 }
2546 Unfinalized => {
2547 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected);
2552 (tx, rx.boxed())
2553 }
2554 NoAcks => {
2555 let (tx, rx) = SourceSender::new_test();
2556 (tx, rx.boxed())
2557 }
2558 };
2559
2560 let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2561 let data_dir = config.data_dir.clone().unwrap();
2562 let acks = !matches!(acking_mode, NoAcks);
2563
2564 tokio::spawn(file::file_source(
2565 config,
2566 data_dir,
2567 shutdown,
2568 tx,
2569 acks,
2570 log_namespace,
2571 ));
2572
2573 let result = if let Some(counter) = event_counter {
2574 let (relay_tx, mut relay_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
2577 tokio::spawn(async move {
2578 let mut rx = rx;
2579 while let Some(event) = rx.next().await {
2580 counter.fetch_add(1, Ordering::SeqCst);
2581 relay_tx.send(event).ok(); }
2583 });
2584
2585 inner.await;
2586 drop(trigger_shutdown);
2587
2588 timeout(Duration::from_secs(5), async move {
2589 let mut events = Vec::new();
2590 while let Some(event) = relay_rx.recv().await {
2591 events.push(event);
2592 }
2593 events
2594 })
2595 .await
2596 .expect("Unclosed channel: may indicate file-server could not shutdown gracefully.")
2597 } else {
2598 inner.await;
2599 drop(trigger_shutdown);
2600
2601 if acking_mode == Unfinalized {
2602 rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2603 .collect::<Vec<_>>()
2604 .await
2605 } else {
2606 timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2607 .await
2608 .expect(
2609 "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2610 )
2611 }
2612 };
2613
2614 if wait_shutdown {
2615 shutdown_done.await;
2616 }
2617
2618 result
2619 })
2620 .await
2621 }
2622
2623 fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2624 received
2625 .into_iter()
2626 .map(Event::into_log)
2627 .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2628 .collect()
2629 }
2630
2631 fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2632 received
2633 .into_iter()
2634 .map(Event::into_log)
2635 .map(|log| log.get_message().unwrap().clone())
2636 .collect()
2637 }
2638}