Skip to main content

vector/sources/
file.rs

1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{BytesDeserializer, BytesDeserializerConfig},
14    config::{LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    file_source::{
17        file_server::{FileServer, Line, calculate_ignore_before},
18        paths_provider::{Glob, MatchOptions},
19    },
20    file_source_common::{
21        Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22    },
23    finalizer::OrderedFinalizer,
24    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30    SourceSender,
31    config::{
32        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33        log_schema,
34    },
35    encoding_transcode::{Decoder, Encoder},
36    event::{BatchNotifier, BatchStatus, LogEvent},
37    internal_events::{
38        FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39        FileSourceInternalEventsEmitter, StreamClosedError,
40    },
41    line_agg::{self, LineAgg},
42    serde::bool_or_struct,
43    shutdown::ShutdownSignal,
44};
45
46#[derive(Debug, Snafu)]
47enum BuildError {
48    #[snafu(display(
49        "message_start_indicator {:?} is not a valid regex: {}",
50        indicator,
51        source
52    ))]
53    InvalidMessageStartIndicator {
54        indicator: String,
55        source: regex::Error,
56    },
57}
58
59/// Configuration for the `file` source.
60#[serde_as]
61#[configurable_component(source("file", "Collect logs from files."))]
62#[derive(Clone, Debug, PartialEq, Eq)]
63#[serde(deny_unknown_fields)]
64pub struct FileConfig {
65    /// Array of file patterns to include. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
66    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
67    pub include: Vec<PathBuf>,
68
69    /// Array of file patterns to exclude. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
70    ///
71    /// Takes precedence over the `include` option. Note: The `exclude` patterns are applied _after_ the attempt to glob everything
72    /// in `include`. This means that all files are first matched by `include` and then filtered by the `exclude`
73    /// patterns. This can be impactful if `include` contains directories with contents that are not accessible.
74    #[serde(default)]
75    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
76    pub exclude: Vec<PathBuf>,
77
78    /// Overrides the name of the log field used to add the file path to each event.
79    ///
80    /// The value is the full path to the file where the event was read message.
81    ///
82    /// Set to `""` to suppress this key.
83    #[serde(default = "default_file_key")]
84    #[configurable(metadata(docs::examples = "path"))]
85    pub file_key: OptionalValuePath,
86
87    /// Whether or not to start reading from the beginning of a new file.
88    #[configurable(
89        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
90    )]
91    #[configurable(metadata(docs::hidden))]
92    #[serde(default)]
93    pub start_at_beginning: Option<bool>,
94
95    /// Whether or not to ignore existing checkpoints when determining where to start reading a file.
96    ///
97    /// Checkpoints are still written normally.
98    #[serde(default)]
99    pub ignore_checkpoints: Option<bool>,
100
101    #[serde(default = "default_read_from")]
102    #[configurable(derived)]
103    pub read_from: ReadFromConfig,
104
105    /// Ignore files with a data modification date older than the specified number of seconds.
106    #[serde(alias = "ignore_older", default)]
107    #[configurable(metadata(docs::type_unit = "seconds"))]
108    #[configurable(metadata(docs::examples = 600))]
109    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
110    pub ignore_older_secs: Option<u64>,
111
112    /// The maximum size of a line before it is discarded.
113    ///
114    /// This protects against malformed lines or tailing incorrect files.
115    #[serde(default = "default_max_line_bytes")]
116    #[configurable(metadata(docs::type_unit = "bytes"))]
117    pub max_line_bytes: usize,
118
119    /// Overrides the name of the log field used to add the current hostname to each event.
120    ///
121    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
122    ///
123    /// Set to `""` to suppress this key.
124    ///
125    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
126    #[configurable(metadata(docs::examples = "hostname"))]
127    pub host_key: Option<OptionalValuePath>,
128
129    /// The directory used to persist file checkpoint positions.
130    ///
131    /// By default, the [global `data_dir` option][global_data_dir] is used.
132    /// Make sure the running user has write permissions to this directory.
133    ///
134    /// If this directory is specified, then Vector will attempt to create it.
135    ///
136    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
137    #[serde(default)]
138    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
139    #[configurable(metadata(docs::human_name = "Data Directory"))]
140    pub data_dir: Option<PathBuf>,
141
142    /// Enables adding the file offset to each event and sets the name of the log field used.
143    ///
144    /// The value is the byte offset of the start of the line within the file.
145    ///
146    /// Off by default, the offset is only added to the event if this is set.
147    #[serde(default)]
148    #[configurable(metadata(docs::examples = "offset"))]
149    pub offset_key: Option<OptionalValuePath>,
150
151    /// The delay between file discovery calls.
152    ///
153    /// This controls the interval at which files are searched. A higher value results in greater
154    /// chances of some short-lived files being missed between searches, but a lower value increases
155    /// the performance impact of file discovery.
156    #[serde(
157        alias = "glob_minimum_cooldown",
158        default = "default_glob_minimum_cooldown_ms"
159    )]
160    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161    #[configurable(metadata(docs::type_unit = "milliseconds"))]
162    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
163    pub glob_minimum_cooldown_ms: Duration,
164
165    #[configurable(derived)]
166    #[serde(alias = "fingerprinting", default)]
167    fingerprint: FingerprintConfig,
168
169    /// Ignore missing files when fingerprinting.
170    ///
171    /// This may be useful when used with source directories containing dangling symlinks.
172    #[serde(default)]
173    pub ignore_not_found: bool,
174
175    /// String value used to identify the start of a multi-line message.
176    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
177    #[configurable(metadata(docs::hidden))]
178    #[serde(default)]
179    pub message_start_indicator: Option<String>,
180
181    /// How long to wait for more data when aggregating a multi-line message, in milliseconds.
182    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
183    #[configurable(metadata(docs::hidden))]
184    #[serde(default = "default_multi_line_timeout")]
185    pub multi_line_timeout: u64,
186
187    /// Multiline aggregation configuration.
188    ///
189    /// If not specified, multiline aggregation is disabled.
190    #[configurable(derived)]
191    #[serde(default)]
192    pub multiline: Option<MultilineConfig>,
193
194    /// Max amount of bytes to read from a single file before switching over to the next file.
195    /// **Note:** This does not apply when `oldest_first` is `true`.
196    ///
197    /// This allows distributing the reads more or less evenly across
198    /// the files.
199    #[serde(default = "default_max_read_bytes")]
200    #[configurable(metadata(docs::type_unit = "bytes"))]
201    pub max_read_bytes: usize,
202
203    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
204    #[serde(default)]
205    pub oldest_first: bool,
206
207    /// After reaching EOF, the number of seconds to wait before removing the file, unless new data is written.
208    ///
209    /// If not specified, files are not removed.
210    #[serde(alias = "remove_after", default)]
211    #[configurable(metadata(docs::type_unit = "seconds"))]
212    #[configurable(metadata(docs::examples = 0))]
213    #[configurable(metadata(docs::examples = 5))]
214    #[configurable(metadata(docs::examples = 60))]
215    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
216    pub remove_after_secs: Option<u64>,
217
218    /// String sequence used to separate one file line from another.
219    #[serde(default = "default_line_delimiter")]
220    #[configurable(metadata(docs::examples = "\r\n"))]
221    pub line_delimiter: String,
222
223    #[configurable(derived)]
224    #[serde(default)]
225    pub encoding: Option<EncodingConfig>,
226
227    #[configurable(derived)]
228    #[serde(default, deserialize_with = "bool_or_struct")]
229    acknowledgements: SourceAcknowledgementsConfig,
230
231    /// The namespace to use for logs. This overrides the global setting.
232    #[configurable(metadata(docs::hidden))]
233    #[serde(default)]
234    log_namespace: Option<bool>,
235
236    #[configurable(derived)]
237    #[serde(default)]
238    internal_metrics: FileInternalMetricsConfig,
239
240    /// How long to keep an open handle to a rotated log file.
241    /// The default value represents "no limit"
242    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
243    #[configurable(metadata(docs::type_unit = "seconds"))]
244    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
245    pub rotate_wait: Duration,
246}
247
248fn default_max_line_bytes() -> usize {
249    bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253    OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257    ReadFromConfig::Beginning
258}
259
/// Default pause between file discovery scans: one second.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}

/// Default for the deprecated `multi_line_timeout` option, in milliseconds.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Default per-file read budget before moving on to the next file: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}

/// Default line delimiter: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}

/// Default `rotate_wait`: half of `u64::MAX` seconds, which effectively means "no limit".
const fn default_rotate_wait() -> Duration {
    Duration::from_secs(u64::MAX >> 1)
}
279
280/// Configuration for how files should be identified.
281///
282/// This is important for `checkpointing` when file rotation is used.
283#[configurable_component]
284#[derive(Clone, Debug, PartialEq, Eq)]
285#[serde(tag = "strategy", rename_all = "snake_case")]
286#[configurable(metadata(
287    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
288))]
289pub enum FingerprintConfig {
290    /// Read lines from the beginning of the file and compute a checksum over them.
291    Checksum {
292        /// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
293        /// If the file is compressed, the number of bytes refer to the header in the uncompressed content. Only
294        /// gzip is supported at this time.
295        ///
296        /// This can be helpful if all files share a common header that should be skipped.
297        #[serde(default = "default_ignored_header_bytes")]
298        #[configurable(metadata(docs::type_unit = "bytes"))]
299        ignored_header_bytes: usize,
300
301        /// The number of lines to read for generating the checksum.
302        ///
303        /// The number of lines are determined from the uncompressed content if the file is compressed. Only
304        /// gzip is supported at this time.
305        ///
306        /// If the file has less than this amount of lines, it won’t be read at all.
307        #[serde(default = "default_lines")]
308        #[configurable(metadata(docs::type_unit = "lines"))]
309        lines: usize,
310    },
311
312    /// Use the [device and inode][inode] as the identifier.
313    ///
314    /// [inode]: https://en.wikipedia.org/wiki/Inode
315    #[serde(rename = "device_and_inode")]
316    DevInode,
317}
318
319impl Default for FingerprintConfig {
320    fn default() -> Self {
321        Self::Checksum {
322            ignored_header_bytes: 0,
323            lines: default_lines(),
324        }
325    }
326}
327
/// Default for `ignored_header_bytes`: no header bytes are skipped.
const fn default_ignored_header_bytes() -> usize {
    0_usize
}

/// Default for `lines`: only the first line is used for the checksum.
const fn default_lines() -> usize {
    1_usize
}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337    fn from(config: FingerprintConfig) -> FingerprintStrategy {
338        match config {
339            FingerprintConfig::Checksum {
340                ignored_header_bytes,
341                lines,
342            } => FingerprintStrategy::FirstLinesChecksum {
343                ignored_header_bytes,
344                lines,
345            },
346            FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347        }
348    }
349}
350
351#[derive(Debug)]
352pub(crate) struct FinalizerEntry {
353    pub(crate) file_id: FileFingerprint,
354    pub(crate) offset: u64,
355}
356
357impl Default for FileConfig {
358    fn default() -> Self {
359        Self {
360            include: vec![PathBuf::from("/var/log/**/*.log")],
361            exclude: vec![],
362            file_key: default_file_key(),
363            start_at_beginning: None,
364            ignore_checkpoints: None,
365            read_from: default_read_from(),
366            ignore_older_secs: None,
367            max_line_bytes: default_max_line_bytes(),
368            fingerprint: FingerprintConfig::default(),
369            ignore_not_found: false,
370            host_key: None,
371            offset_key: None,
372            data_dir: None,
373            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374            message_start_indicator: None,
375            multi_line_timeout: default_multi_line_timeout(), // millis
376            multiline: None,
377            max_read_bytes: default_max_read_bytes(),
378            oldest_first: false,
379            remove_after_secs: None,
380            line_delimiter: default_line_delimiter(),
381            encoding: None,
382            acknowledgements: Default::default(),
383            log_namespace: None,
384            internal_metrics: Default::default(),
385            rotate_wait: default_rotate_wait(),
386        }
387    }
388}
389
390impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396        // add the source name as a subdir, so that multiple sources can
397        // operate within the same given data_dir (e.g. the global one)
398        // without the file servers' checkpointers interfering with each
399        // other
400        let data_dir = cx
401            .globals
402            // source are only global, name can be used for subdir
403            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405        // Clippy rule, because async_trait?
406        #[allow(clippy::suspicious_else_formatting)]
407        {
408            if let Some(ref config) = self.multiline {
409                let _: line_agg::Config = config.try_into()?;
410            }
411
412            if let Some(ref indicator) = self.message_start_indicator {
413                Regex::new(indicator)
414                    .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415            }
416        }
417
418        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420        let log_namespace = cx.log_namespace(self.log_namespace);
421
422        Ok(file_source(
423            self,
424            data_dir,
425            cx.shutdown,
426            cx.out,
427            acknowledgements,
428            log_namespace,
429        ))
430    }
431
432    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433        let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434        let host_key = self
435            .host_key
436            .clone()
437            .unwrap_or(log_schema().host_key().cloned().into())
438            .path
439            .map(LegacyKey::Overwrite);
440
441        let offset_key = self
442            .offset_key
443            .clone()
444            .and_then(|k| k.path)
445            .map(LegacyKey::Overwrite);
446
447        let schema_definition = BytesDeserializerConfig
448            .schema_definition(global_log_namespace.merge(self.log_namespace))
449            .with_standard_vector_source_metadata()
450            .with_source_metadata(
451                Self::NAME,
452                host_key,
453                &owned_value_path!("host"),
454                Kind::bytes().or_undefined(),
455                Some("host"),
456            )
457            .with_source_metadata(
458                Self::NAME,
459                offset_key,
460                &owned_value_path!("offset"),
461                Kind::integer(),
462                None,
463            )
464            .with_source_metadata(
465                Self::NAME,
466                file_key,
467                &owned_value_path!("path"),
468                Kind::bytes(),
469                None,
470            );
471
472        vec![SourceOutput::new_maybe_logs(
473            DataType::Log,
474            schema_definition,
475        )]
476    }
477
478    fn can_acknowledge(&self) -> bool {
479        true
480    }
481}
482
483pub fn file_source(
484    config: &FileConfig,
485    data_dir: PathBuf,
486    shutdown: ShutdownSignal,
487    mut out: SourceSender,
488    acknowledgements: bool,
489    log_namespace: LogNamespace,
490) -> super::Source {
491    // the include option must be specified but also must contain at least one entry.
492    if config.include.is_empty() {
493        error!(
494            message = "`include` configuration option must contain at least one file pattern.",
495            internal_log_rate_limit = false
496        );
497        return Box::pin(future::ready(Err(())));
498    }
499
500    let exclude_patterns = config
501        .exclude
502        .iter()
503        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
504        .collect::<Vec<PathBuf>>();
505    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
506    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
507    let (ignore_checkpoints, read_from) = reconcile_position_options(
508        config.start_at_beginning,
509        config.ignore_checkpoints,
510        Some(config.read_from),
511    );
512
513    let emitter = FileSourceInternalEventsEmitter {
514        include_file_metric_tag: config.internal_metrics.include_file_tag,
515    };
516
517    let paths_provider = Glob::new(
518        &config.include,
519        &exclude_patterns,
520        MatchOptions::default(),
521        emitter.clone(),
522    )
523    .expect("invalid glob patterns");
524
525    let encoding_charset = config.encoding.clone().map(|e| e.charset);
526
527    // if file encoding is specified, need to convert the line delimiter (present as utf8)
528    // to the specified encoding, so that delimiter-based line splitting can work properly
529    let line_delimiter_as_bytes = match encoding_charset {
530        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
531        None => Bytes::from(config.line_delimiter.clone()),
532    };
533
534    let checkpointer = Checkpointer::new(&data_dir);
535    let strategy = config.fingerprint.clone().into();
536
537    let file_server = FileServer {
538        paths_provider,
539        max_read_bytes: config.max_read_bytes,
540        ignore_checkpoints,
541        read_from,
542        ignore_before,
543        max_line_bytes: config.max_line_bytes,
544        line_delimiter: line_delimiter_as_bytes,
545        data_dir,
546        glob_minimum_cooldown,
547        fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
548        oldest_first: config.oldest_first,
549        remove_after: config.remove_after_secs.map(Duration::from_secs),
550        emitter,
551        rotate_wait: config.rotate_wait,
552    };
553
554    let event_metadata = EventMetadata {
555        host_key: config
556            .host_key
557            .clone()
558            .unwrap_or(log_schema().host_key().cloned().into())
559            .path,
560        hostname: crate::get_hostname().ok(),
561        file_key: config.file_key.clone().path,
562        offset_key: config.offset_key.clone().and_then(|k| k.path),
563    };
564
565    let include = config.include.clone();
566    let exclude = config.exclude.clone();
567    let multiline_config = config.multiline.clone();
568    let message_start_indicator = config.message_start_indicator.clone();
569    let multi_line_timeout = config.multi_line_timeout;
570
571    let (finalizer, shutdown_checkpointer) = if acknowledgements {
572        // The shutdown sent in to the finalizer is the global
573        // shutdown handle used to tell it to stop accepting new batch
574        // statuses and just wait for the remaining acks to come in.
575        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
576
577        // We set up a separate shutdown signal to tie together the
578        // finalizer and the checkpoint writer task in the file
579        // server, to make it continue to write out updated
580        // checkpoints until all the acks have come in.
581        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
582        let checkpoints = checkpointer.view();
583        crate::spawn_in_current_span(async move {
584            while let Some((status, entry)) = ack_stream.next().await {
585                if status == BatchStatus::Delivered {
586                    checkpoints.update(entry.file_id, entry.offset);
587                }
588            }
589            send_shutdown.send(())
590        });
591        (Some(finalizer), shutdown2.map(|_| ()).boxed())
592    } else {
593        // When not dealing with end-to-end acknowledgements, just
594        // clone the global shutdown to stop the checkpoint writer.
595        (None, shutdown.clone().map(|_| ()).boxed())
596    };
597
598    let checkpoints = checkpointer.view();
599    let include_file_metric_tag = config.internal_metrics.include_file_tag;
600    Box::pin(async move {
601        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
602
603        let mut encoding_decoder = encoding_charset.map(Decoder::new);
604
605        // sizing here is just a guess
606        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
607        let rx = rx
608            .map(futures::stream::iter)
609            .flatten()
610            .map(move |mut line| {
611                emit!(FileBytesReceived {
612                    byte_size: line.text.len(),
613                    file: &line.filename,
614                    include_file_metric_tag,
615                });
616                // transcode each line from the file's encoding charset to utf8
617                line.text = match encoding_decoder.as_mut() {
618                    Some(d) => d.decode_to_utf8(line.text),
619                    None => line.text,
620                };
621                line
622            });
623
624        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
625            if let Some(ref multiline_config) = multiline_config {
626                wrap_with_line_agg(
627                    rx,
628                    multiline_config.try_into().unwrap(), // validated in build
629                )
630            } else if let Some(msi) = message_start_indicator {
631                wrap_with_line_agg(
632                    rx,
633                    line_agg::Config::for_legacy(
634                        Regex::new(&msi).unwrap(), // validated in build
635                        multi_line_timeout,
636                    ),
637                )
638            } else {
639                Box::new(rx)
640            };
641
642        // Once file server ends this will run until it has finished processing remaining
643        // logs in the queue.
644        let span = Span::current();
645        let mut messages = messages.map(move |line| {
646            let mut event = create_event(
647                line.text,
648                line.start_offset,
649                &line.filename,
650                &event_metadata,
651                log_namespace,
652                include_file_metric_tag,
653            );
654
655            if let Some(finalizer) = &finalizer {
656                let (batch, receiver) = BatchNotifier::new_with_receiver();
657                event = event.with_batch_notifier(&batch);
658                let entry = FinalizerEntry {
659                    file_id: line.file_id,
660                    offset: line.end_offset,
661                };
662                // checkpoints.update will be called from ack_stream's thread
663                finalizer.add(entry, receiver);
664            } else {
665                checkpoints.update(line.file_id, line.end_offset);
666            }
667            event
668        });
669        tokio::spawn(async move {
670            match out
671                .send_event_stream(&mut messages)
672                .instrument(span.or_current())
673                .await
674            {
675                Ok(()) => {
676                    debug!("Finished sending.");
677                }
678                Err(_) => {
679                    let (count, _) = messages.size_hint();
680                    emit!(StreamClosedError { count });
681                }
682            }
683        });
684
685        let span = info_span!("file_server");
686        tokio::task::spawn_blocking(move || {
687            let _enter = span.enter();
688            let rt = tokio::runtime::Handle::current();
689            let result =
690                rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
691            emit!(FileOpen { count: 0 });
692            // Panic if we encounter any error originating from the file server.
693            // We're at the `spawn_blocking` call, the panic will be caught and
694            // passed to the `JoinHandle` error, similar to the usual threads.
695            result.expect("file server exited with an error");
696        })
697        .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
698        .await
699    })
700}
701
702/// Emit deprecation warning if the old option is used, and take it into account when determining
703/// defaults. Any of the newer options will override it when set directly.
704fn reconcile_position_options(
705    start_at_beginning: Option<bool>,
706    ignore_checkpoints: Option<bool>,
707    read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709    if start_at_beginning.is_some() {
710        warn!(
711            message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712        )
713    }
714
715    match start_at_beginning {
716        Some(true) => (
717            ignore_checkpoints.unwrap_or(true),
718            read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719        ),
720        _ => (
721            ignore_checkpoints.unwrap_or(false),
722            read_from.map(Into::into).unwrap_or_default(),
723        ),
724    }
725}
726
727fn wrap_with_line_agg(
728    rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729    config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731    let logic = line_agg::Logic::new(config);
732    Box::new(
733        LineAgg::new(
734            rx.map(|line| {
735                (
736                    line.filename,
737                    line.text,
738                    (line.file_id, line.start_offset, line.end_offset),
739                )
740            }),
741            logic,
742        )
743        .map(
744            |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745                text,
746                filename,
747                file_id,
748                start_offset,
749                end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750                    lastline_end_offset
751                }),
752            },
753        ),
754    )
755}
756
757struct EventMetadata {
758    host_key: Option<OwnedValuePath>,
759    hostname: Option<String>,
760    file_key: Option<OwnedValuePath>,
761    offset_key: Option<OwnedValuePath>,
762}
763
764fn create_event(
765    line: Bytes,
766    offset: u64,
767    file: &str,
768    meta: &EventMetadata,
769    log_namespace: LogNamespace,
770    include_file_metric_tag: bool,
771) -> LogEvent {
772    let deserializer = BytesDeserializer;
773    let mut event = deserializer.parse_single(line, log_namespace);
774
775    log_namespace.insert_vector_metadata(
776        &mut event,
777        log_schema().source_type_key(),
778        path!("source_type"),
779        Bytes::from_static(FileConfig::NAME.as_bytes()),
780    );
781    log_namespace.insert_vector_metadata(
782        &mut event,
783        log_schema().timestamp_key(),
784        path!("ingest_timestamp"),
785        Utc::now(),
786    );
787
788    let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789    // `meta.host_key` is already `unwrap_or_else`ed so we can just pass it in.
790    if let Some(hostname) = &meta.hostname {
791        log_namespace.insert_source_metadata(
792            FileConfig::NAME,
793            &mut event,
794            legacy_host_key,
795            path!("host"),
796            hostname.clone(),
797        );
798    }
799
800    let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801    log_namespace.insert_source_metadata(
802        FileConfig::NAME,
803        &mut event,
804        legacy_offset_key,
805        path!("offset"),
806        offset,
807    );
808
809    let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810    log_namespace.insert_source_metadata(
811        FileConfig::NAME,
812        &mut event,
813        legacy_file_key,
814        path!("path"),
815        file,
816    );
817
818    emit!(FileEventsReceived {
819        count: 1,
820        file,
821        byte_size: event.estimated_json_encoded_size_of(),
822        include_file_metric_tag,
823    });
824
825    event
826}
827
828#[cfg(test)]
829mod tests {
830    use std::{
831        collections::HashSet,
832        fs::{self, File},
833        future::Future,
834        io::{Seek, Write},
835        sync::{
836            Arc,
837            atomic::{AtomicUsize, Ordering},
838        },
839    };
840
841    use encoding_rs::UTF_16LE;
842    use indoc::indoc;
843    use similar_asserts::assert_eq;
844    use tempfile::tempdir;
845    use tokio::time::{Duration, sleep, timeout};
846    use vector_lib::schema::Definition;
847    use vrl::{value, value::kind::Collection};
848
849    use super::*;
850    use crate::{
851        config::Config,
852        event::{Event, EventStatus, Value},
853        shutdown::ShutdownSignal,
854        sources::file,
855        test_util::{
856            components::{FILE_SOURCE_TAGS, assert_source_compliance},
857            wait_for_atomic_usize_timeout_ms,
858        },
859    };
860
861    #[test]
862    fn generate_config() {
863        crate::test_util::test_generate_config::<FileConfig>();
864    }
865
866    fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
867        // Store checkpoints in a subdirectory so they don't appear in the
868        // glob-watched directory (which covers dir.path()/*).
869        let data_dir = dir.path().join(".data");
870        fs::create_dir_all(&data_dir).unwrap();
871        file::FileConfig {
872            fingerprint: FingerprintConfig::Checksum {
873                ignored_header_bytes: 0,
874                lines: 1,
875            },
876            data_dir: Some(data_dir),
877            glob_minimum_cooldown_ms: Duration::from_millis(100),
878            internal_metrics: FileInternalMetricsConfig {
879                include_file_tag: true,
880            },
881            ..Default::default()
882        }
883    }
884
885    async fn sleep_500_millis() {
886        sleep(Duration::from_millis(500)).await;
887    }
888
889    #[test]
890    fn parse_config() {
891        let config: FileConfig = serde_yaml::from_str(indoc! {
892            r#"
893            include:
894              - /var/log/**/*.log
895            file_key: file
896            glob_minimum_cooldown_ms: 1000
897            multi_line_timeout: 1000
898            max_read_bytes: 2048
899            line_delimiter: "\n"
900            "#,
901        })
902        .unwrap();
903        assert_eq!(config, FileConfig::default());
904        assert_eq!(
905            config.fingerprint,
906            FingerprintConfig::Checksum {
907                ignored_header_bytes: 0,
908                lines: 1
909            }
910        );
911
912        let config: FileConfig = serde_yaml::from_str(indoc! {
913            r#"
914            include:
915              - /var/log/**/*.log
916            fingerprint:
917              strategy: device_and_inode
918            "#,
919        })
920        .unwrap();
921        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
922
923        let config: FileConfig = serde_yaml::from_str(indoc! {
924            r#"
925            include:
926              - /var/log/**/*.log
927            fingerprint:
928              strategy: checksum
929              bytes: 128
930              ignored_header_bytes: 512
931            "#,
932        })
933        .unwrap();
934        assert_eq!(
935            config.fingerprint,
936            FingerprintConfig::Checksum {
937                ignored_header_bytes: 512,
938                lines: 1
939            }
940        );
941
942        let config: FileConfig = serde_yaml::from_str(indoc! {
943            r#"
944            include:
945              - /var/log/**/*.log
946            encoding:
947              charset: utf-16le
948            "#,
949        })
950        .unwrap();
951        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
952
953        let config: FileConfig = serde_yaml::from_str(indoc! {
954            r#"
955            include:
956              - /var/log/**/*.log
957            read_from: beginning
958            "#,
959        })
960        .unwrap();
961        assert_eq!(config.read_from, ReadFromConfig::Beginning);
962
963        let config: FileConfig = serde_yaml::from_str(indoc! {
964            r#"
965            include:
966              - /var/log/**/*.log
967            read_from: end
968            "#,
969        })
970        .unwrap();
971        assert_eq!(config.read_from, ReadFromConfig::End);
972    }
973
974    #[test]
975    fn resolve_data_dir() {
976        let global_dir = tempdir().unwrap();
977        let local_dir = tempdir().unwrap();
978
979        let mut config = Config::default();
980        config.global.data_dir = global_dir.keep().into();
981
982        // local path given -- local should win
983        let local_data_dir = Some(local_dir.path().to_path_buf());
984        let res = config
985            .global
986            .resolve_and_validate_data_dir(local_data_dir.as_ref())
987            .unwrap();
988        assert_eq!(res, local_dir.path());
989
990        // no local path given -- global fallback should be in effect
991        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
992        assert_eq!(res, config.global.data_dir.unwrap());
993    }
994
995    #[test]
996    fn output_schema_definition_vector_namespace() {
997        let definitions = FileConfig::default()
998            .outputs(LogNamespace::Vector)
999            .remove(0)
1000            .schema_definition(true);
1001
1002        assert_eq!(
1003            definitions,
1004            Some(
1005                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1006                    .with_meaning(OwnedTargetPath::event_root(), "message")
1007                    .with_metadata_field(
1008                        &owned_value_path!("vector", "source_type"),
1009                        Kind::bytes(),
1010                        None
1011                    )
1012                    .with_metadata_field(
1013                        &owned_value_path!("vector", "ingest_timestamp"),
1014                        Kind::timestamp(),
1015                        None
1016                    )
1017                    .with_metadata_field(
1018                        &owned_value_path!("file", "host"),
1019                        Kind::bytes().or_undefined(),
1020                        Some("host")
1021                    )
1022                    .with_metadata_field(
1023                        &owned_value_path!("file", "offset"),
1024                        Kind::integer(),
1025                        None
1026                    )
1027                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1028            )
1029        )
1030    }
1031
1032    #[test]
1033    fn output_schema_definition_legacy_namespace() {
1034        let definitions = FileConfig::default()
1035            .outputs(LogNamespace::Legacy)
1036            .remove(0)
1037            .schema_definition(true);
1038
1039        assert_eq!(
1040            definitions,
1041            Some(
1042                Definition::new_with_default_metadata(
1043                    Kind::object(Collection::empty()),
1044                    [LogNamespace::Legacy]
1045                )
1046                .with_event_field(
1047                    &owned_value_path!("message"),
1048                    Kind::bytes(),
1049                    Some("message")
1050                )
1051                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1052                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1053                .with_event_field(
1054                    &owned_value_path!("host"),
1055                    Kind::bytes().or_undefined(),
1056                    Some("host")
1057                )
1058                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1059                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1060            )
1061        )
1062    }
1063
1064    #[test]
1065    fn create_event_legacy_namespace() {
1066        let line = Bytes::from("hello world");
1067        let file = "some_file.rs";
1068        let offset: u64 = 0;
1069
1070        let meta = EventMetadata {
1071            host_key: Some(owned_value_path!("host")),
1072            hostname: Some("Some.Machine".to_string()),
1073            file_key: Some(owned_value_path!("file")),
1074            offset_key: Some(owned_value_path!("offset")),
1075        };
1076        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1077
1078        assert_eq!(log["file"], "some_file.rs".into());
1079        assert_eq!(log["host"], "Some.Machine".into());
1080        assert_eq!(log["offset"], 0.into());
1081        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1082        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1083        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1084    }
1085
1086    #[test]
1087    fn create_event_custom_fields_legacy_namespace() {
1088        let line = Bytes::from("hello world");
1089        let file = "some_file.rs";
1090        let offset: u64 = 0;
1091
1092        let meta = EventMetadata {
1093            host_key: Some(owned_value_path!("hostname")),
1094            hostname: Some("Some.Machine".to_string()),
1095            file_key: Some(owned_value_path!("file_path")),
1096            offset_key: Some(owned_value_path!("off")),
1097        };
1098        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1099
1100        assert_eq!(log["file_path"], "some_file.rs".into());
1101        assert_eq!(log["hostname"], "Some.Machine".into());
1102        assert_eq!(log["off"], 0.into());
1103        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1104        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1105        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1106    }
1107
1108    #[test]
1109    fn create_event_vector_namespace() {
1110        let line = Bytes::from("hello world");
1111        let file = "some_file.rs";
1112        let offset: u64 = 0;
1113
1114        let meta = EventMetadata {
1115            host_key: Some(owned_value_path!("ignored")),
1116            hostname: Some("Some.Machine".to_string()),
1117            file_key: Some(owned_value_path!("ignored")),
1118            offset_key: Some(owned_value_path!("ignored")),
1119        };
1120        let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1121
1122        assert_eq!(log.value(), &value!("hello world"));
1123
1124        assert_eq!(
1125            log.metadata()
1126                .value()
1127                .get(path!("vector", "source_type"))
1128                .unwrap(),
1129            &value!("file")
1130        );
1131        assert!(
1132            log.metadata()
1133                .value()
1134                .get(path!("vector", "ingest_timestamp"))
1135                .unwrap()
1136                .is_timestamp()
1137        );
1138
1139        assert_eq!(
1140            log.metadata()
1141                .value()
1142                .get(path!(FileConfig::NAME, "host"))
1143                .unwrap(),
1144            &value!("Some.Machine")
1145        );
1146        assert_eq!(
1147            log.metadata()
1148                .value()
1149                .get(path!(FileConfig::NAME, "offset"))
1150                .unwrap(),
1151            &value!(0)
1152        );
1153        assert_eq!(
1154            log.metadata()
1155                .value()
1156                .get(path!(FileConfig::NAME, "path"))
1157                .unwrap(),
1158            &value!("some_file.rs")
1159        );
1160    }
1161
1162    #[tokio::test]
1163    async fn file_happy_path() {
1164        let n = 5;
1165
1166        let dir = tempdir().unwrap();
1167        let config = file::FileConfig {
1168            include: vec![dir.path().join("*")],
1169            ..test_default_file_config(&dir)
1170        };
1171
1172        let path1 = dir.path().join("file1");
1173        let path2 = dir.path().join("file2");
1174
1175        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1176            let mut file1 = File::create(&path1).unwrap();
1177            let mut file2 = File::create(&path2).unwrap();
1178
1179            for i in 0..n {
1180                writeln!(&mut file1, "hello {i}").unwrap();
1181                writeln!(&mut file2, "goodbye {i}").unwrap();
1182            }
1183
1184            file1.flush().unwrap();
1185            file2.flush().unwrap();
1186
1187            sleep_500_millis().await;
1188        })
1189        .await;
1190
1191        let mut hello_i = 0;
1192        let mut goodbye_i = 0;
1193
1194        for event in received {
1195            let line =
1196                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1197            if line.starts_with("hello") {
1198                assert_eq!(line, format!("hello {}", hello_i));
1199                assert_eq!(
1200                    event.as_log()["file"].to_string_lossy(),
1201                    path1.to_str().unwrap()
1202                );
1203                hello_i += 1;
1204            } else {
1205                assert_eq!(line, format!("goodbye {}", goodbye_i));
1206                assert_eq!(
1207                    event.as_log()["file"].to_string_lossy(),
1208                    path2.to_str().unwrap()
1209                );
1210                goodbye_i += 1;
1211            }
1212        }
1213        assert_eq!(hello_i, n);
1214        assert_eq!(goodbye_i, n);
1215    }
1216
1217    // https://github.com/vectordotdev/vector/issues/8363
1218    #[tokio::test]
1219    async fn file_read_empty_lines() {
1220        let n = 5;
1221
1222        let dir = tempdir().unwrap();
1223        let config = file::FileConfig {
1224            include: vec![dir.path().join("*")],
1225            ..test_default_file_config(&dir)
1226        };
1227
1228        let path = dir.path().join("file");
1229
1230        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1231            let mut file = File::create(&path).unwrap();
1232
1233            writeln!(&mut file, "line for checkpointing").unwrap();
1234            for _i in 0..n {
1235                writeln!(&mut file).unwrap();
1236            }
1237            file.flush().unwrap();
1238
1239            sleep_500_millis().await;
1240        })
1241        .await;
1242
1243        assert_eq!(received.len(), n + 1);
1244    }
1245
1246    #[tokio::test]
1247    async fn file_truncate() {
1248        let n = 5;
1249
1250        let dir = tempdir().unwrap();
1251        let config = file::FileConfig {
1252            include: vec![dir.path().join("*")],
1253            ..test_default_file_config(&dir)
1254        };
1255        let path = dir.path().join("file");
1256        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1257            let mut file = File::create(&path).unwrap();
1258
1259            for i in 0..n {
1260                writeln!(&mut file, "pretrunc {i}").unwrap();
1261            }
1262
1263            file.flush().unwrap();
1264            sleep_500_millis().await; // The writes must be observed before truncating
1265
1266            file.set_len(0).unwrap();
1267            file.seek(std::io::SeekFrom::Start(0)).unwrap();
1268
1269            file.sync_all().unwrap();
1270            sleep_500_millis().await; // The truncate must be observed before writing again
1271
1272            for i in 0..n {
1273                writeln!(&mut file, "posttrunc {i}").unwrap();
1274            }
1275
1276            file.flush().unwrap();
1277            sleep_500_millis().await;
1278        })
1279        .await;
1280
1281        let mut i = 0;
1282        let mut pre_trunc = true;
1283
1284        for event in received {
1285            assert_eq!(
1286                event.as_log()["file"].to_string_lossy(),
1287                path.to_str().unwrap()
1288            );
1289
1290            let line =
1291                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1292
1293            if pre_trunc {
1294                assert_eq!(line, format!("pretrunc {}", i));
1295            } else {
1296                assert_eq!(line, format!("posttrunc {}", i));
1297            }
1298
1299            i += 1;
1300            if i == n {
1301                i = 0;
1302                pre_trunc = false;
1303            }
1304        }
1305    }
1306
1307    #[tokio::test]
1308    async fn file_rotate() {
1309        let n = 5;
1310
1311        let dir = tempdir().unwrap();
1312        let config = file::FileConfig {
1313            include: vec![dir.path().join("*")],
1314            ..test_default_file_config(&dir)
1315        };
1316
1317        let path = dir.path().join("file");
1318        let archive_path = dir.path().join("file");
1319        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1320            let mut file = File::create(&path).unwrap();
1321
1322            for i in 0..n {
1323                writeln!(&mut file, "prerot {i}").unwrap();
1324            }
1325
1326            file.flush().unwrap();
1327            sleep_500_millis().await; // The writes must be observed before rotating
1328
1329            fs::rename(&path, archive_path).expect("could not rename");
1330            file.sync_all().unwrap();
1331
1332            let mut file = File::create(&path).unwrap();
1333
1334            file.sync_all().unwrap();
1335            sleep_500_millis().await; // The rotation must be observed before writing again
1336
1337            for i in 0..n {
1338                writeln!(&mut file, "postrot {i}").unwrap();
1339            }
1340
1341            file.flush().unwrap();
1342            sleep_500_millis().await;
1343        })
1344        .await;
1345
1346        let mut i = 0;
1347        let mut pre_rot = true;
1348
1349        for event in received {
1350            assert_eq!(
1351                event.as_log()["file"].to_string_lossy(),
1352                path.to_str().unwrap()
1353            );
1354
1355            let line =
1356                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1357
1358            if pre_rot {
1359                assert_eq!(line, format!("prerot {}", i));
1360            } else {
1361                assert_eq!(line, format!("postrot {}", i));
1362            }
1363
1364            i += 1;
1365            if i == n {
1366                i = 0;
1367                pre_rot = false;
1368            }
1369        }
1370    }
1371
1372    #[tokio::test]
1373    async fn file_multiple_paths() {
1374        let n = 5;
1375
1376        let dir = tempdir().unwrap();
1377        let config = file::FileConfig {
1378            include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1379            exclude: vec![dir.path().join("a.*.txt")],
1380            ..test_default_file_config(&dir)
1381        };
1382
1383        let path1 = dir.path().join("a.txt");
1384        let path2 = dir.path().join("b.txt");
1385        let path3 = dir.path().join("a.log");
1386        let path4 = dir.path().join("a.ignore.txt");
1387        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1388            let mut file1 = File::create(&path1).unwrap();
1389            let mut file2 = File::create(&path2).unwrap();
1390            let mut file3 = File::create(&path3).unwrap();
1391            let mut file4 = File::create(&path4).unwrap();
1392
1393            for i in 0..n {
1394                writeln!(&mut file1, "1 {i}").unwrap();
1395                writeln!(&mut file2, "2 {i}").unwrap();
1396                writeln!(&mut file3, "3 {i}").unwrap();
1397                writeln!(&mut file4, "4 {i}").unwrap();
1398            }
1399            file1.flush().unwrap();
1400            file2.flush().unwrap();
1401            file3.flush().unwrap();
1402            file4.flush().unwrap();
1403
1404            sleep_500_millis().await;
1405        })
1406        .await;
1407
1408        let mut is = [0; 3];
1409
1410        for event in received {
1411            let line =
1412                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1413            let mut split = line.split(' ');
1414            let file = split.next().unwrap().parse::<usize>().unwrap();
1415            assert_ne!(file, 4);
1416            let i = split.next().unwrap().parse::<usize>().unwrap();
1417
1418            assert_eq!(is[file - 1], i);
1419            is[file - 1] += 1;
1420        }
1421
1422        assert_eq!(is, [n as usize; 3]);
1423    }
1424
1425    #[tokio::test]
1426    async fn file_exclude_paths() {
1427        let n = 5;
1428
1429        let dir = tempdir().unwrap();
1430        let config = file::FileConfig {
1431            include: vec![dir.path().join("a//b/*.log.*")],
1432            exclude: vec![dir.path().join("a//b/test.log.*")],
1433            ..test_default_file_config(&dir)
1434        };
1435
1436        let path1 = dir.path().join("a//b/a.log.1");
1437        let path2 = dir.path().join("a//b/test.log.1");
1438        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1439            std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1440            let mut file1 = File::create(&path1).unwrap();
1441            let mut file2 = File::create(&path2).unwrap();
1442
1443            for i in 0..n {
1444                writeln!(&mut file1, "1 {i}").unwrap();
1445                writeln!(&mut file2, "2 {i}").unwrap();
1446            }
1447
1448            file1.flush().unwrap();
1449            file2.flush().unwrap();
1450            sleep_500_millis().await;
1451        })
1452        .await;
1453
1454        let mut is = [0; 1];
1455
1456        for event in received {
1457            let line =
1458                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1459            let mut split = line.split(' ');
1460            let file = split.next().unwrap().parse::<usize>().unwrap();
1461            assert_ne!(file, 4);
1462            let i = split.next().unwrap().parse::<usize>().unwrap();
1463
1464            assert_eq!(is[file - 1], i);
1465            is[file - 1] += 1;
1466        }
1467
1468        assert_eq!(is, [n as usize; 1]);
1469    }
1470
1471    #[tokio::test]
1472    async fn file_key_acknowledged() {
1473        file_key(Acks).await
1474    }
1475
1476    #[tokio::test]
1477    async fn file_key_no_acknowledge() {
1478        file_key(NoAcks).await
1479    }
1480
    /// Shared body for the `file_key_*` tests. With the given acking mode, it checks
    /// where the originating file path is written on legacy-namespace events.
    async fn file_key(acks: AckingMode) {
        // Default: the path is written under the `file` key.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received =
                run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
                    let mut file = File::create(&path).unwrap();

                    writeln!(&mut file, "hello there").unwrap();
                    file.flush().unwrap();

                    sleep_500_millis().await;
                })
                .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );
        }

        // Custom: setting `file_key` moves the path to the `source` key.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                file_key: OptionalValuePath::from(owned_value_path!("source")),
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received =
                run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
                    let mut file = File::create(&path).unwrap();

                    writeln!(&mut file, "hello there").unwrap();
                    file.flush().unwrap();

                    sleep_500_millis().await;
                })
                .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log()["source"].to_string_lossy(),
                path.to_str().unwrap()
            );
        }

        // Hidden: the event must carry exactly the file, host, message, timestamp and
        // source_type keys, and nothing else.
        // NOTE(review): this config is identical to the "Default" case. "Hidden"
        // presumably refers to metadata (e.g. the offset) that is not written as an
        // event field — confirm intent.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received =
                run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
                    let mut file = File::create(&path).unwrap();

                    writeln!(&mut file, "hello there").unwrap();

                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
                vec![
                    default_file_key()
                        .path
                        .expect("file key to exist")
                        .to_string()
                        .into(),
                    log_schema().host_key().unwrap().to_string().into(),
                    log_schema().message_key().unwrap().to_string().into(),
                    log_schema().timestamp_key().unwrap().to_string().into(),
                    log_schema().source_type_key().unwrap().to_string().into()
                ]
                .into_iter()
                .collect::<HashSet<_>>()
            );
        }
    }
1576
1577    #[tokio::test]
1578    async fn file_start_position_server_restart_acknowledged() {
1579        file_start_position_server_restart(Acks).await
1580    }
1581
1582    #[tokio::test]
1583    async fn file_start_position_server_restart_no_acknowledge() {
1584        file_start_position_server_restart(NoAcks).await
1585    }
1586
    /// Checks checkpoint-based resumption across three consecutive source runs over
    /// the same file, using the given acking mode. One line is appended during each
    /// run.
    async fn file_start_position_server_restart(acking: AckingMode) {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        // Written and flushed before the first run starts, so it is already on disk
        // when the source first discovers the file.
        let mut file = File::create(&path).unwrap();
        writeln!(&mut file, "zeroth line").unwrap();
        file.flush().unwrap();

        // First time server runs it picks up existing lines.
        {
            let received =
                run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
                    sleep_500_millis().await;
                    writeln!(&mut file, "first line").unwrap();
                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["zeroth line", "first line"]);
        }
        // Restart server, read file from checkpoint: only the line appended during
        // this run is expected.
        {
            let received =
                run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
                    sleep_500_millis().await;
                    writeln!(&mut file, "second line").unwrap();
                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["second line"]);
        }
        // Restart server, read files from beginning: `ignore_checkpoints` combined with
        // `read_from: Beginning` must re-read every line, including the new one.
        {
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ignore_checkpoints: Some(true),
                read_from: ReadFromConfig::Beginning,
                ..test_default_file_config(&dir)
            };
            let received =
                run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
                    sleep_500_millis().await;
                    writeln!(&mut file, "third line").unwrap();
                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            let lines = extract_messages_string(received);
            assert_eq!(
                lines,
                vec!["zeroth line", "first line", "second line", "third line"]
            );
        }
    }
1651
1652    #[tokio::test]
1653    async fn file_start_position_server_restart_unfinalized() {
1654        let dir = tempdir().unwrap();
1655        let config = file::FileConfig {
1656            include: vec![dir.path().join("*")],
1657            ..test_default_file_config(&dir)
1658        };
1659
1660        let path = dir.path().join("file");
1661        let mut file = File::create(&path).unwrap();
1662        writeln!(&mut file, "the line").unwrap();
1663        file.flush().unwrap();
1664
1665        // First time server runs it picks up existing lines.
1666        let received = run_file_source(
1667            &config,
1668            false,
1669            Unfinalized,
1670            LogNamespace::Legacy,
1671            None,
1672            sleep(Duration::from_secs(5)),
1673        )
1674        .await;
1675        let lines = extract_messages_string(received);
1676        assert_eq!(lines, vec!["the line"]);
1677
1678        // Restart server, it re-reads file since the events were not acknowledged before shutdown
1679        let received = run_file_source(
1680            &config,
1681            false,
1682            Unfinalized,
1683            LogNamespace::Legacy,
1684            None,
1685            sleep(Duration::from_secs(5)),
1686        )
1687        .await;
1688        let lines = extract_messages_string(received);
1689        assert_eq!(lines, vec!["the line"]);
1690    }
1691
    /// Regression test: restarting the source while acknowledgements are still pending
    /// must not cause lines that were already delivered to be read again.
    #[tokio::test]
    async fn file_duplicate_processing_after_restart() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();

        // Enough lines that a 500ms first run should not get through all of them.
        let line_count = 4000;
        for i in 0..line_count {
            writeln!(&mut file, "Here's a line for you: {i}").unwrap();
        }
        file.flush().unwrap();

        // First time server runs it should pick up a bunch of lines
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            None,
            // shutdown signal is sent after this duration
            sleep_500_millis(),
        )
        .await;
        let lines = extract_messages_string(received);

        // ...but not all the lines; if the first run processed the entire file, we may not hit the
        // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks
        assert!(lines.len() < line_count);

        // Restart the server, and it should read the rest without duplicating any.
        // Use the event counter to drain rx continuously (removing backpressure so
        // the file server can read all remaining lines without being stalled), then
        // trigger shutdown once all expected events have been received.
        let remaining = line_count - lines.len();
        let event_count = Arc::new(AtomicUsize::new(0));
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            Some(Arc::clone(&event_count)),
            async {
                wait_for_atomic_usize_timeout_ms(
                    Arc::clone(&event_count),
                    |n| n >= remaining,
                    5_000,
                )
                .await;
            },
        )
        .await;
        let lines2 = extract_messages_string(received);

        // Between both runs, we should have the expected number of lines
        assert_eq!(lines.len() + lines2.len(), line_count);
    }
1753
1754    #[tokio::test]
1755    async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1756        file_start_position_server_restart_with_file_rotation(Acks).await
1757    }
1758
1759    #[tokio::test]
1760    async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1761        file_start_position_server_restart_with_file_rotation(NoAcks).await
1762    }
1763
1764    async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1765        let dir = tempdir().unwrap();
1766        let config = file::FileConfig {
1767            include: vec![dir.path().join("*")],
1768            ..test_default_file_config(&dir)
1769        };
1770
1771        let path = dir.path().join("file");
1772        let path_for_old_file = dir.path().join("file.old");
1773        // Run server first time, collect some lines.
1774        {
1775            let received =
1776                run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1777                    let mut file = File::create(&path).unwrap();
1778                    writeln!(&mut file, "first line").unwrap();
1779                    file.flush().unwrap();
1780                    sleep_500_millis().await;
1781                })
1782                .await;
1783
1784            let lines = extract_messages_string(received);
1785            assert_eq!(lines, vec!["first line"]);
1786        }
1787        // Perform 'file rotation' to archive old lines.
1788        fs::rename(&path, &path_for_old_file).expect("could not rename");
1789        // Restart the server and make sure it does not re-read the old file
1790        // even though it has a new name.
1791        {
1792            let received =
1793                run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
1794                    let mut file = File::create(&path).unwrap();
1795                    writeln!(&mut file, "second line").unwrap();
1796                    file.flush().unwrap();
1797                    sleep_500_millis().await;
1798                })
1799                .await;
1800
1801            let lines = extract_messages_string(received);
1802            assert_eq!(lines, vec!["second line"]);
1803        }
1804    }
1805
1806    #[cfg(unix)] // this test uses unix-specific function `futimes` during test time
1807    #[tokio::test]
1808    async fn file_start_position_ignore_old_files() {
1809        use std::{
1810            os::unix::io::AsRawFd,
1811            time::{Duration, SystemTime},
1812        };
1813
1814        let dir = tempdir().unwrap();
1815        let config = file::FileConfig {
1816            include: vec![dir.path().join("*")],
1817            ignore_older_secs: Some(5),
1818            ..test_default_file_config(&dir)
1819        };
1820
1821        let before_path = dir.path().join("before");
1822        let mut before_file = File::create(&before_path).unwrap();
1823        let after_path = dir.path().join("after");
1824        let mut after_file = File::create(&after_path).unwrap();
1825
1826        writeln!(&mut before_file, "first line").unwrap(); // first few bytes make up unique file fingerprint
1827        writeln!(&mut after_file, "_first line").unwrap(); //   and therefore need to be non-identical
1828
1829        {
1830            // Set the modified times
1831            let before = SystemTime::now() - Duration::from_secs(8);
1832            let after = SystemTime::now() - Duration::from_secs(2);
1833
1834            let before_time = libc::timeval {
1835                tv_sec: before
1836                    .duration_since(SystemTime::UNIX_EPOCH)
1837                    .unwrap()
1838                    .as_secs() as _,
1839                tv_usec: 0,
1840            };
1841            let before_times = [before_time, before_time];
1842
1843            let after_time = libc::timeval {
1844                tv_sec: after
1845                    .duration_since(SystemTime::UNIX_EPOCH)
1846                    .unwrap()
1847                    .as_secs() as _,
1848                tv_usec: 0,
1849            };
1850            let after_times = [after_time, after_time];
1851
1852            unsafe {
1853                libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1854                libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1855            }
1856        }
1857
1858        before_file.sync_all().unwrap();
1859        after_file.sync_all().unwrap();
1860
1861        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1862            sleep_500_millis().await;
1863            writeln!(&mut before_file, "second line").unwrap();
1864            writeln!(&mut after_file, "_second line").unwrap();
1865
1866            before_file.flush().unwrap();
1867            after_file.flush().unwrap();
1868            sleep_500_millis().await;
1869        })
1870        .await;
1871
1872        let before_lines = received
1873            .iter()
1874            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1875            .map(|event| {
1876                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1877            })
1878            .collect::<Vec<_>>();
1879        let after_lines = received
1880            .iter()
1881            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1882            .map(|event| {
1883                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1884            })
1885            .collect::<Vec<_>>();
1886        assert_eq!(before_lines, vec!["second line"]);
1887        assert_eq!(after_lines, vec!["_first line", "_second line"]);
1888    }
1889
1890    #[tokio::test]
1891    async fn file_max_line_bytes() {
1892        let dir = tempdir().unwrap();
1893        let config = file::FileConfig {
1894            include: vec![dir.path().join("*")],
1895            max_line_bytes: 10,
1896            ..test_default_file_config(&dir)
1897        };
1898
1899        let path = dir.path().join("file");
1900        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1901            let mut file = File::create(&path).unwrap();
1902
1903            writeln!(&mut file, "short").unwrap();
1904            writeln!(&mut file, "this is too long").unwrap();
1905            writeln!(&mut file, "11 eleven11").unwrap();
1906            let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1907            writeln!(&mut file, "{super_long}").unwrap();
1908            writeln!(&mut file, "exactly 10").unwrap();
1909            writeln!(&mut file, "it can end on a line that's too long").unwrap();
1910
1911            file.flush().unwrap();
1912            sleep_500_millis().await;
1913            sleep_500_millis().await;
1914
1915            writeln!(&mut file, "and then continue").unwrap();
1916            writeln!(&mut file, "last short").unwrap();
1917            file.flush().unwrap();
1918
1919            sleep_500_millis().await;
1920            sleep_500_millis().await;
1921        }).await;
1922
1923        let received = extract_messages_value(received);
1924
1925        assert_eq!(
1926            received,
1927            vec!["short".into(), "exactly 10".into(), "last short".into()]
1928        );
1929    }
1930
1931    #[tokio::test]
1932    async fn test_multi_line_aggregation_legacy() {
1933        let dir = tempdir().unwrap();
1934        let config = file::FileConfig {
1935            include: vec![dir.path().join("*")],
1936            message_start_indicator: Some("INFO".into()),
1937            multi_line_timeout: 25,
1938            ..test_default_file_config(&dir)
1939        };
1940
1941        let path = dir.path().join("file");
1942        let event_count = Arc::new(AtomicUsize::new(0));
1943        let received = run_file_source(
1944            &config,
1945            false,
1946            NoAcks,
1947            LogNamespace::Legacy,
1948            Some(Arc::clone(&event_count)),
1949            async {
1950                let mut file = File::create(&path).unwrap();
1951
1952                // Write all lines through the second "INFO hello". Events 1-4
1953                // are emitted immediately by EndExclude; event 5 ("INFO hello"
1954                // standalone) requires the 25ms timeout to fire.
1955                writeln!(&mut file, "leftover foo").unwrap();
1956                writeln!(&mut file, "INFO hello").unwrap();
1957                writeln!(&mut file, "INFO goodbye").unwrap();
1958                writeln!(&mut file, "part of goodbye").unwrap();
1959                writeln!(&mut file, "INFO hi again").unwrap();
1960                writeln!(&mut file, "and some more").unwrap();
1961                writeln!(&mut file, "INFO hello").unwrap();
1962                file.flush().unwrap();
1963
1964                // Block until event 5 is observed: the timeout fired and
1965                // "INFO hello" was emitted before we write "too slow".
1966                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
1967
1968                writeln!(&mut file, "too slow").unwrap();
1969                writeln!(&mut file, "INFO doesn't have").unwrap();
1970                writeln!(&mut file, "to be INFO in").unwrap();
1971                writeln!(&mut file, "the middle").unwrap();
1972                file.flush().unwrap();
1973
1974                // Wait for events 6 ("too slow") and 7 ("INFO doesn't have")
1975                // before triggering shutdown.
1976                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
1977            },
1978        )
1979        .await;
1980
1981        let received = extract_messages_value(received);
1982
1983        assert_eq!(
1984            received,
1985            vec![
1986                "leftover foo".into(),
1987                "INFO hello".into(),
1988                "INFO goodbye\npart of goodbye".into(),
1989                "INFO hi again\nand some more".into(),
1990                "INFO hello".into(),
1991                "too slow".into(),
1992                "INFO doesn't have".into(),
1993                "to be INFO in\nthe middle".into(),
1994            ]
1995        );
1996    }
1997
1998    #[tokio::test]
1999    async fn test_multi_line_aggregation() {
2000        let dir = tempdir().unwrap();
2001        let config = file::FileConfig {
2002            include: vec![dir.path().join("*")],
2003            multiline: Some(MultilineConfig {
2004                start_pattern: "INFO".to_owned(),
2005                condition_pattern: "INFO".to_owned(),
2006                mode: line_agg::Mode::HaltBefore,
2007                timeout_ms: Duration::from_millis(25),
2008            }),
2009            ..test_default_file_config(&dir)
2010        };
2011
2012        let path = dir.path().join("file");
2013        let event_count = Arc::new(AtomicUsize::new(0));
2014        let received = run_file_source(
2015            &config,
2016            false,
2017            NoAcks,
2018            LogNamespace::Legacy,
2019            Some(Arc::clone(&event_count)),
2020            async {
2021                let mut file = File::create(&path).unwrap();
2022
2023                // Write all lines through the second "INFO hello". Events 1-4
2024                // are emitted immediately by EndExclude; event 5 ("INFO hello"
2025                // standalone) requires the 25ms timeout to fire.
2026                writeln!(&mut file, "leftover foo").unwrap();
2027                writeln!(&mut file, "INFO hello").unwrap();
2028                writeln!(&mut file, "INFO goodbye").unwrap();
2029                writeln!(&mut file, "part of goodbye").unwrap();
2030                writeln!(&mut file, "INFO hi again").unwrap();
2031                writeln!(&mut file, "and some more").unwrap();
2032                writeln!(&mut file, "INFO hello").unwrap();
2033                file.flush().unwrap();
2034
2035                // Block until event 5 is observed: the timeout fired and
2036                // "INFO hello" was emitted before we write "too slow".
2037                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
2038
2039                writeln!(&mut file, "too slow").unwrap();
2040                writeln!(&mut file, "INFO doesn't have").unwrap();
2041                writeln!(&mut file, "to be INFO in").unwrap();
2042                writeln!(&mut file, "the middle").unwrap();
2043                file.flush().unwrap();
2044
2045                // Wait for events 6 ("too slow") and 7 ("INFO doesn't have")
2046                // before triggering shutdown.
2047                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
2048            },
2049        )
2050        .await;
2051
2052        let received = extract_messages_value(received);
2053
2054        assert_eq!(
2055            received,
2056            vec![
2057                "leftover foo".into(),
2058                "INFO hello".into(),
2059                "INFO goodbye\npart of goodbye".into(),
2060                "INFO hi again\nand some more".into(),
2061                "INFO hello".into(),
2062                "too slow".into(),
2063                "INFO doesn't have".into(),
2064                "to be INFO in\nthe middle".into(),
2065            ]
2066        );
2067    }
2068
2069    #[tokio::test]
2070    async fn test_multi_line_checkpointing() {
2071        let dir = tempdir().unwrap();
2072        let config = file::FileConfig {
2073            include: vec![dir.path().join("*")],
2074            offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2075            multiline: Some(MultilineConfig {
2076                start_pattern: "INFO".to_owned(),
2077                condition_pattern: "INFO".to_owned(),
2078                mode: line_agg::Mode::HaltBefore,
2079                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
2080            }),
2081            ..test_default_file_config(&dir)
2082        };
2083
2084        let path = dir.path().join("file");
2085        let mut file = File::create(&path).unwrap();
2086
2087        writeln!(&mut file, "INFO hello").unwrap();
2088        writeln!(&mut file, "part of hello").unwrap();
2089
2090        file.sync_all().unwrap();
2091
2092        // Read and aggregate existing lines. wait_shutdown=true ensures the
2093        // checkpoint is fully written to disk before the second run reads it.
2094        let received = run_file_source(
2095            &config,
2096            true,
2097            Acks,
2098            LogNamespace::Legacy,
2099            None,
2100            sleep_500_millis(),
2101        )
2102        .await;
2103
2104        assert_eq!(received[0].as_log()["offset"], 0.into());
2105
2106        let lines = extract_messages_string(received);
2107        assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2108
2109        // After restart, we should not see any part of the previously aggregated lines
2110        let received_after_restart =
2111            run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2112                writeln!(&mut file, "INFO goodbye").unwrap();
2113                file.flush().unwrap();
2114                sleep_500_millis().await;
2115            })
2116            .await;
2117        assert_eq!(
2118            received_after_restart[0].as_log()["offset"],
2119            (lines[0].len() + 1).into()
2120        );
2121        let lines = extract_messages_string(received_after_restart);
2122        assert_eq!(lines, vec!["INFO goodbye"]);
2123    }
2124
2125    #[tokio::test]
2126    async fn test_fair_reads() {
2127        let dir = tempdir().unwrap();
2128        let config = file::FileConfig {
2129            include: vec![dir.path().join("*")],
2130            max_read_bytes: 1,
2131            oldest_first: false,
2132            ..test_default_file_config(&dir)
2133        };
2134
2135        let older_path = dir.path().join("z_older_file");
2136        let mut older = File::create(&older_path).unwrap();
2137
2138        writeln!(&mut older, "hello i am the old file").unwrap();
2139        writeln!(&mut older, "i have been around a while").unwrap();
2140        writeln!(&mut older, "you can read newer files at the same time").unwrap();
2141        older.sync_all().unwrap();
2142
2143        let newer_path = dir.path().join("a_newer_file");
2144        let mut newer = File::create(&newer_path).unwrap();
2145
2146        writeln!(&mut newer, "and i am the new file").unwrap();
2147        writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2148        writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2149        newer.sync_all().unwrap();
2150
2151        let received = run_file_source(
2152            &config,
2153            false,
2154            NoAcks,
2155            LogNamespace::Legacy,
2156            None,
2157            sleep_500_millis(),
2158        )
2159        .await;
2160
2161        let received = extract_messages_value(received);
2162
2163        let old_first = vec![
2164            "hello i am the old file".into(),
2165            "and i am the new file".into(),
2166            "i have been around a while".into(),
2167            "this should be interleaved with the old one".into(),
2168            "you can read newer files at the same time".into(),
2169            "which is fine because we want fairness".into(),
2170        ];
2171        let new_first: Vec<_> = old_first
2172            .chunks(2)
2173            .flat_map(|chunk| chunk.iter().rev().cloned().collect::<Vec<_>>())
2174            .collect();
2175
2176        if received[0] == old_first[0] {
2177            assert_eq!(received, old_first);
2178        } else {
2179            assert_eq!(received, new_first);
2180        }
2181    }
2182
2183    #[tokio::test]
2184    async fn test_oldest_first() {
2185        let dir = tempdir().unwrap();
2186        let config = file::FileConfig {
2187            include: vec![dir.path().join("*")],
2188            max_read_bytes: 1,
2189            oldest_first: true,
2190            ..test_default_file_config(&dir)
2191        };
2192
2193        let older_path = dir.path().join("z_older_file");
2194        let mut older = File::create(&older_path).unwrap();
2195        older.sync_all().unwrap();
2196
2197        // Sleep to ensure the creation timestamps are different
2198        sleep_500_millis().await;
2199
2200        let newer_path = dir.path().join("a_newer_file");
2201        let mut newer = File::create(&newer_path).unwrap();
2202        newer.sync_all().unwrap();
2203
2204        writeln!(&mut older, "hello i am the old file").unwrap();
2205        writeln!(&mut older, "i have been around a while").unwrap();
2206        writeln!(&mut older, "you should definitely read all of me first").unwrap();
2207        older.flush().unwrap();
2208
2209        writeln!(&mut newer, "i'm new").unwrap();
2210        writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2211        writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2212        newer.flush().unwrap();
2213
2214        let received = run_file_source(
2215            &config,
2216            false,
2217            NoAcks,
2218            LogNamespace::Legacy,
2219            None,
2220            sleep_500_millis(),
2221        )
2222        .await;
2223
2224        let received = extract_messages_value(received);
2225
2226        assert_eq!(
2227            received,
2228            vec![
2229                "hello i am the old file".into(),
2230                "i have been around a while".into(),
2231                "you should definitely read all of me first".into(),
2232                "i'm new".into(),
2233                "hopefully you read all the old stuff first".into(),
2234                "because otherwise i'm not going to make sense".into(),
2235            ]
2236        );
2237    }
2238
2239    #[tokio::test]
2240    async fn test_split_reads() {
2241        let dir = tempdir().unwrap();
2242        let config = file::FileConfig {
2243            include: vec![dir.path().join("*")],
2244            max_read_bytes: 1,
2245            ..test_default_file_config(&dir)
2246        };
2247
2248        let path = dir.path().join("file");
2249        let mut file = File::create(&path).unwrap();
2250
2251        writeln!(&mut file, "hello i am a normal line").unwrap();
2252        file.sync_all().unwrap();
2253
2254        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2255            sleep_500_millis().await;
2256
2257            write!(&mut file, "i am not a full line").unwrap();
2258
2259            file.flush().unwrap();
2260            // Longer than the EOF timeout
2261            sleep_500_millis().await;
2262
2263            writeln!(&mut file, " until now").unwrap();
2264
2265            file.flush().unwrap();
2266            sleep_500_millis().await;
2267        })
2268        .await;
2269
2270        let received = extract_messages_value(received);
2271
2272        assert_eq!(
2273            received,
2274            vec![
2275                "hello i am a normal line".into(),
2276                "i am not a full line until now".into(),
2277            ]
2278        );
2279    }
2280
2281    #[tokio::test]
2282    async fn test_gzipped_file() {
2283        let dir = tempdir().unwrap();
2284        let config = file::FileConfig {
2285            include: vec![PathBuf::from("tests/data/gzipped.log")],
2286            // TODO: remove this once files are fingerprinted after decompression
2287            //
2288            // Currently, this needs to be smaller than the total size of the compressed file
2289            // because the fingerprinter tries to read until a newline, which it's not going to see
2290            // in the compressed data, or this number of bytes. If it hits EOF before that, it
2291            // can't return a fingerprint because the value would change once more data is written.
2292            max_line_bytes: 100,
2293            ..test_default_file_config(&dir)
2294        };
2295
2296        let received = run_file_source(
2297            &config,
2298            false,
2299            NoAcks,
2300            LogNamespace::Legacy,
2301            None,
2302            sleep_500_millis(),
2303        )
2304        .await;
2305
2306        let received = extract_messages_value(received);
2307
2308        assert_eq!(
2309            received,
2310            vec![
2311                "this is a simple file".into(),
2312                "i have been compressed".into(),
2313                "in order to make me smaller".into(),
2314                "but you can still read me".into(),
2315                "hooray".into(),
2316            ]
2317        );
2318    }
2319
2320    #[tokio::test]
2321    async fn test_non_utf8_encoded_file() {
2322        let dir = tempdir().unwrap();
2323        let config = file::FileConfig {
2324            include: vec![PathBuf::from("tests/data/utf-16le.log")],
2325            encoding: Some(EncodingConfig { charset: UTF_16LE }),
2326            ..test_default_file_config(&dir)
2327        };
2328
2329        let received = run_file_source(
2330            &config,
2331            false,
2332            NoAcks,
2333            LogNamespace::Legacy,
2334            None,
2335            sleep_500_millis(),
2336        )
2337        .await;
2338
2339        let received = extract_messages_value(received);
2340
2341        assert_eq!(
2342            received,
2343            vec![
2344                "hello i am a file".into(),
2345                "i can unicode".into(),
2346                "but i do so in 16 bits".into(),
2347                "and when i byte".into(),
2348                "i become little-endian".into(),
2349            ]
2350        );
2351    }
2352
2353    #[tokio::test]
2354    async fn test_non_default_line_delimiter() {
2355        let dir = tempdir().unwrap();
2356        let config = file::FileConfig {
2357            include: vec![dir.path().join("*")],
2358            line_delimiter: "\r\n".to_string(),
2359            ..test_default_file_config(&dir)
2360        };
2361
2362        let path = dir.path().join("file");
2363        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2364            let mut file = File::create(&path).unwrap();
2365
2366            write!(&mut file, "hello i am a line\r\n").unwrap();
2367            write!(&mut file, "and i am too\r\n").unwrap();
2368            write!(&mut file, "CRLF is how we end\r\n").unwrap();
2369            write!(&mut file, "please treat us well\r\n").unwrap();
2370
2371            file.flush().unwrap();
2372            sleep_500_millis().await;
2373        })
2374        .await;
2375
2376        let received = extract_messages_value(received);
2377
2378        assert_eq!(
2379            received,
2380            vec![
2381                "hello i am a line".into(),
2382                "and i am too".into(),
2383                "CRLF is how we end".into(),
2384                "please treat us well".into()
2385            ]
2386        );
2387    }
2388
2389    // Regression test for https://github.com/vectordotdev/vector/issues/24027
2390    // Tests that multi-character delimiters (like \r\n) are correctly handled when
2391    // split across buffer boundaries. Without the fix, events would be merged together.
2392    #[tokio::test]
2393    async fn test_multi_char_delimiter_split_across_buffer_boundary() {
2394        let dir = tempdir().unwrap();
2395        let config = file::FileConfig {
2396            include: vec![dir.path().join("*")],
2397            line_delimiter: "\r\n".to_string(),
2398            ..test_default_file_config(&dir)
2399        };
2400
2401        let path = dir.path().join("file");
2402        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2403            let mut file = File::create(&path).unwrap();
2404
2405            sleep_500_millis().await;
2406
2407            // Create data where \r\n is split at 8KB buffer boundary
2408            // This reproduces the exact scenario that caused data corruption:
2409            // - Event 1 ends with \r at byte 8191
2410            // - The \n appears at byte 8192 (right at the buffer boundary)
2411            // - Without the fix, Event 1 and Event 2 would be merged
2412
2413            let buffer_size = 8192;
2414
2415            // Event 1: Position \r\n to split at first boundary
2416            let event1_prefix = "Event 1: ";
2417            let padding1_len = buffer_size - event1_prefix.len() - 1; // -1 for the \r
2418            write!(&mut file, "{}", event1_prefix).unwrap();
2419            file.write_all(&vec![b'X'; padding1_len]).unwrap();
2420            write!(&mut file, "\r\n").unwrap(); // \r at byte 8191, \n at byte 8192
2421
2422            // Event 2: Position \r\n to split at second boundary
2423            let event2_prefix = "Event 2: ";
2424            let padding2_len = buffer_size - event2_prefix.len() - 1;
2425            write!(&mut file, "{}", event2_prefix).unwrap();
2426            file.write_all(&vec![b'Y'; padding2_len]).unwrap();
2427            write!(&mut file, "\r\n").unwrap(); // \r at byte 16383, \n at byte 16384
2428
2429            // Event 3: Normal line without boundary split
2430            write!(&mut file, "Event 3: Final\r\n").unwrap();
2431
2432            sleep_500_millis().await;
2433        })
2434        .await;
2435
2436        let messages = extract_messages_value(received);
2437
2438        // The bug would cause Events 1 and 2 to be merged into a single message
2439        assert_eq!(
2440            messages.len(),
2441            3,
2442            "Should receive exactly 3 separate events (bug would merge them)"
2443        );
2444
2445        // Verify each event is correctly separated and starts with expected prefix
2446        let msg0 = messages[0].to_string_lossy();
2447        let msg1 = messages[1].to_string_lossy();
2448        let msg2 = messages[2].to_string_lossy();
2449
2450        assert!(
2451            msg0.starts_with("Event 1: "),
2452            "First event should start with 'Event 1: ', got: {}",
2453            msg0
2454        );
2455        assert!(
2456            msg1.starts_with("Event 2: "),
2457            "Second event should start with 'Event 2: ', got: {}",
2458            msg1
2459        );
2460        assert_eq!(msg2, "Event 3: Final");
2461
2462        // Ensure no event contains embedded CR/LF (sign of incorrect merging)
2463        for (i, msg) in messages.iter().enumerate() {
2464            let msg_str = msg.to_string_lossy();
2465            assert!(
2466                !msg_str.contains('\r'),
2467                "Event {} should not contain embedded \\r",
2468                i
2469            );
2470            assert!(
2471                !msg_str.contains('\n'),
2472                "Event {} should not contain embedded \\n",
2473                i
2474            );
2475        }
2476    }
2477
2478    #[tokio::test]
2479    async fn remove_file() {
2480        let n = 5;
2481        let remove_after_secs = 1;
2482
2483        let dir = tempdir().unwrap();
2484        let config = file::FileConfig {
2485            include: vec![dir.path().join("*")],
2486            remove_after_secs: Some(remove_after_secs),
2487            ..test_default_file_config(&dir)
2488        };
2489
2490        let path = dir.path().join("file");
2491        let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2492            let mut file = File::create(&path).unwrap();
2493
2494            for i in 0..n {
2495                writeln!(&mut file, "{i}").unwrap();
2496            }
2497            file.flush().unwrap();
2498            drop(file);
2499
2500            for _ in 0..10 {
2501                // Wait for remove grace period to end.
2502                sleep(Duration::from_secs(remove_after_secs + 1)).await;
2503
2504                if File::open(&path).is_err() {
2505                    break;
2506                }
2507            }
2508        })
2509        .await;
2510
2511        assert_eq!(received.len(), n);
2512
2513        match File::open(&path) {
2514            Ok(_) => panic!("File wasn't removed"),
2515            Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2516        }
2517    }
2518
2519    #[derive(Clone, Copy, Eq, PartialEq)]
2520    enum AckingMode {
2521        NoAcks,      // No acknowledgement handling and no finalization
2522        Unfinalized, // Acknowledgement handling but no finalization
2523        Acks,        // Full acknowledgements and proper finalization
2524    }
2525    use AckingMode::*;
2526    use vector_lib::lookup::OwnedTargetPath;
2527
2528    async fn run_file_source(
2529        config: &FileConfig,
2530        wait_shutdown: bool,
2531        acking_mode: AckingMode,
2532        log_namespace: LogNamespace,
2533        // When `Some`, events are relayed through an unbounded channel and the
2534        // counter is incremented for each event received.  The inner future can
2535        // call `wait_for_atomic_usize` on this counter to gate writes on
2536        // observed events instead of relying on wall-clock sleeps.
2537        event_counter: Option<Arc<AtomicUsize>>,
2538        inner: impl Future<Output = ()>,
2539    ) -> Vec<Event> {
2540        assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2541            let (tx, rx) = match acking_mode {
2542                Acks => {
2543                    let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2544                    (tx, rx.boxed())
2545                }
2546                Unfinalized => {
2547                    // Use Rejected so that events are finalized but checkpoints
2548                    // are NOT updated (only Delivered triggers checkpoint updates).
2549                    // This avoids a race where the default Delivered status on drop
2550                    // could leak checkpoint writes into the next run.
2551                    let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected);
2552                    (tx, rx.boxed())
2553                }
2554                NoAcks => {
2555                    let (tx, rx) = SourceSender::new_test();
2556                    (tx, rx.boxed())
2557                }
2558            };
2559
2560            let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2561            let data_dir = config.data_dir.clone().unwrap();
2562            let acks = !matches!(acking_mode, NoAcks);
2563
2564            tokio::spawn(file::file_source(
2565                config,
2566                data_dir,
2567                shutdown,
2568                tx,
2569                acks,
2570                log_namespace,
2571            ));
2572
2573            let result = if let Some(counter) = event_counter {
2574                // Relay mode: a background task forwards events and increments
2575                // the counter so `inner` can observe them without arbitrary sleeps.
2576                let (relay_tx, mut relay_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
2577                tokio::spawn(async move {
2578                    let mut rx = rx;
2579                    while let Some(event) = rx.next().await {
2580                        counter.fetch_add(1, Ordering::SeqCst);
2581                        relay_tx.send(event).ok(); // receiver gone means pipeline is shutting down
2582                    }
2583                });
2584
2585                inner.await;
2586                drop(trigger_shutdown);
2587
2588                timeout(Duration::from_secs(5), async move {
2589                    let mut events = Vec::new();
2590                    while let Some(event) = relay_rx.recv().await {
2591                        events.push(event);
2592                    }
2593                    events
2594                })
2595                .await
2596                .expect("Unclosed channel: may indicate file-server could not shutdown gracefully.")
2597            } else {
2598                inner.await;
2599                drop(trigger_shutdown);
2600
2601                if acking_mode == Unfinalized {
2602                    rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2603                        .collect::<Vec<_>>()
2604                        .await
2605                } else {
2606                    timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2607                        .await
2608                        .expect(
2609                            "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2610                        )
2611                }
2612            };
2613
2614            if wait_shutdown {
2615                shutdown_done.await;
2616            }
2617
2618            result
2619        })
2620        .await
2621    }
2622
2623    fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2624        received
2625            .into_iter()
2626            .map(Event::into_log)
2627            .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2628            .collect()
2629    }
2630
2631    fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2632        received
2633            .into_iter()
2634            .map(Event::into_log)
2635            .map(|log| log.get_message().unwrap().clone())
2636            .collect()
2637    }
2638}