1#![deny(missing_docs)]
7use std::{cmp::min, path::PathBuf, time::Duration};
8
9use bytes::Bytes;
10use chrono::Utc;
11use futures::{future::FutureExt, stream::StreamExt};
12use futures_util::Stream;
13use http_1::{HeaderName, HeaderValue};
14use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
15use k8s_paths_provider::K8sPathsProvider;
16use kube::{
17 Client, Config as ClientConfig,
18 api::Api,
19 config::{self, KubeConfigOptions},
20 runtime::{WatchStreamExt, reflector, watcher},
21};
22use lifecycle::Lifecycle;
23use serde_with::serde_as;
24use vector_lib::{
25 EstimatedJsonEncodedSizeOf, TimeZone,
26 codecs::{BytesDeserializer, BytesDeserializerConfig},
27 config::{LegacyKey, LogNamespace},
28 configurable::configurable_component,
29 file_source::file_server::{
30 FileServer, Line, Shutdown as FileServerShutdown, calculate_ignore_before,
31 },
32 file_source_common::{
33 Checkpointer, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
34 },
35 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
36 lookup::{OwnedTargetPath, lookup_v2::OptionalTargetPath, owned_value_path, path},
37};
38use vrl::value::{Kind, kind::Collection};
39
40use crate::{
41 SourceSender,
42 built_info::{PKG_NAME, PKG_VERSION},
43 config::{
44 ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig, SourceContext,
45 SourceOutput, log_schema,
46 },
47 event::Event,
48 internal_events::{
49 FileInternalMetricsConfig, FileSourceInternalEventsEmitter, KubernetesLifecycleError,
50 KubernetesLogsEventAnnotationError, KubernetesLogsEventNamespaceAnnotationError,
51 KubernetesLogsEventNodeAnnotationError, KubernetesLogsEventsReceived,
52 KubernetesLogsPodInfo, StreamClosedError,
53 },
54 kubernetes::{custom_reflector, meta_cache::MetaCache},
55 shutdown::ShutdownSignal,
56 sources,
57 sources::kubernetes_logs::partial_events_merger::merge_partial_events,
58 transforms::{FunctionTransform, OutputBuffer},
59};
60
61mod k8s_paths_provider;
62mod lifecycle;
63mod namespace_metadata_annotator;
64mod node_metadata_annotator;
65mod parser;
66mod partial_events_merger;
67mod path_helpers;
68mod pod_metadata_annotator;
69mod transform_utils;
70mod util;
71
72use self::{
73 namespace_metadata_annotator::NamespaceMetadataAnnotator,
74 node_metadata_annotator::NodeMetadataAnnotator, parser::Parser,
75 pod_metadata_annotator::PodMetadataAnnotator,
76};
77
/// Name of the environment variable consulted for the current node name when
/// `self_node_name` is empty or still holds the unexpanded `${...}` template.
const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";
80
#[serde_as]
/// Configuration for the `kubernetes_logs` source.
#[configurable_component(source("kubernetes_logs", "Collect Pod logs from Kubernetes Nodes."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// Extra label selector used when watching Pods.
    ///
    /// It is appended, comma-separated, to the built-in `vector.dev/exclude!=true` selector.
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_label_selector: String,

    /// Extra label selector used when watching Namespaces.
    ///
    /// It is appended, comma-separated, to the built-in `vector.dev/exclude!=true` selector.
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_namespace_label_selector: String,

    /// Whether Namespaces are watched and events are annotated with Namespace metadata.
    ///
    /// When disabled, no Namespace watcher is started. Enabled by default.
    #[serde(default = "default_insert_namespace_fields")]
    insert_namespace_fields: bool,

    /// The name of the Kubernetes Node this instance collects logs for.
    ///
    /// Used to build the `spec.nodeName=<name>` Pod field selector and the
    /// `metadata.name=<name>` Node field selector. When empty, or left as the
    /// `${VECTOR_SELF_NODE_NAME}` template, the value is read from the
    /// `VECTOR_SELF_NODE_NAME` environment variable.
    self_node_name: String,

    /// Extra field selector used when watching Pods.
    ///
    /// It is appended, comma-separated, to the built-in `spec.nodeName=<name>` selector.
    #[configurable(metadata(docs::examples = "metadata.name!=pod-name-to-exclude"))]
    #[configurable(metadata(
        docs::examples = "metadata.name!=pod-name-to-exclude,metadata.name=mypod"
    ))]
    extra_field_selector: String,

    /// Whether events are passed through the partial events merger before being emitted.
    ///
    /// Enabled by default.
    auto_partial_merge: bool,

    /// The directory used to persist file checkpoints.
    ///
    /// The effective directory is resolved, and created, through the global options together
    /// with the component key.
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    data_dir: Option<PathBuf>,

    #[configurable(derived)]
    #[serde(alias = "annotation_fields")]
    pod_annotation_fields: pod_metadata_annotator::FieldsSpec,

    #[configurable(derived)]
    namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec,

    #[configurable(derived)]
    node_annotation_fields: node_metadata_annotator::FieldsSpec,

    /// Glob patterns of log file paths to include.
    ///
    /// Defaults to `**/*`.
    #[configurable(metadata(docs::examples = "**/include/**"))]
    include_paths_glob_patterns: Vec<PathBuf>,

    /// Glob patterns of log file paths to exclude.
    ///
    /// Defaults to `**/*.gz` and `**/*.tmp`.
    #[configurable(metadata(docs::examples = "**/exclude/**"))]
    exclude_paths_glob_patterns: Vec<PathBuf>,

    #[configurable(derived)]
    #[serde(default = "default_read_from")]
    read_from: ReadFromConfig,

    /// Age threshold, in seconds, used to compute the cut-off before which files are ignored.
    #[serde(default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Files Older Than"))]
    ignore_older_secs: Option<u64>,

    /// Read budget, in bytes, handed to the file server.
    ///
    /// Defaults to 2048.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_read_bytes: usize,

    /// Whether the file server is told to process the oldest files first.
    ///
    /// Enabled by default.
    #[serde(default = "default_oldest_first")]
    pub oldest_first: bool,

    /// The maximum number of bytes a single line may contain.
    ///
    /// Defaults to 32 KiB. When `auto_partial_merge` is enabled and `max_merged_line_bytes`
    /// is set, the smaller of the two values is used as the per-line limit.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_line_bytes: usize,

    /// The size limit, in bytes, handed to the partial events merger.
    ///
    /// Only takes effect when `auto_partial_merge` is enabled.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_merged_line_bytes: Option<usize>,

    /// The number of lines used by the first-lines-checksum file fingerprint.
    ///
    /// Defaults to 1.
    #[configurable(metadata(docs::type_unit = "lines"))]
    fingerprint_lines: usize,

    /// The glob minimum cooldown handed to the file server, in milliseconds.
    ///
    /// Defaults to 60000.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    glob_minimum_cooldown_ms: Duration,

    /// The field that receives the time at which the event was created by this source.
    ///
    /// Only used with the legacy log namespace; nothing is inserted when unset.
    #[configurable(metadata(docs::examples = ".ingest_timestamp", docs::examples = "ingest_ts"))]
    ingestion_timestamp_field: Option<OptionalTargetPath>,

    /// The time zone setting for this source.
    // NOTE(review): this field is not read anywhere in the visible part of this file;
    // confirm where it is consumed.
    timezone: Option<TimeZone>,

    /// Optional path to a kubeconfig file.
    ///
    /// When set, the Kubernetes client is configured from this file; otherwise the client
    /// configuration is inferred from the environment.
    #[configurable(metadata(docs::examples = "/path/to/.kube/config"))]
    kube_config_file: Option<PathBuf>,

    /// Whether list requests may be served with `Any` list semantics instead of `MostRecent`.
    ///
    /// When enabled, no page size is set on the watchers. Disabled by default.
    use_apiserver_cache: bool,

    /// The deletion delay, in milliseconds, handed to the metadata reflectors.
    ///
    /// Defaults to 60000.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Delay Deletion"))]
    delay_deletion_ms: Duration,

    /// Per-source override of the log namespace setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// The rotate wait duration handed to the file server, in seconds.
    ///
    /// Configured through the `rotate_wait_secs` key.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    rotate_wait: Duration,
}
282
283const fn default_read_from() -> ReadFromConfig {
284 ReadFromConfig::Beginning
285}
286
287impl GenerateConfig for Config {
288 fn generate_config() -> toml::Value {
289 toml::Value::try_from(Self {
290 self_node_name: default_self_node_name_env_template(),
291 auto_partial_merge: true,
292 ..Default::default()
293 })
294 .unwrap()
295 }
296}
297
298impl Default for Config {
299 fn default() -> Self {
300 Self {
301 extra_label_selector: "".to_string(),
302 extra_namespace_label_selector: "".to_string(),
303 insert_namespace_fields: true,
304 self_node_name: default_self_node_name_env_template(),
305 extra_field_selector: "".to_string(),
306 auto_partial_merge: true,
307 data_dir: None,
308 pod_annotation_fields: pod_metadata_annotator::FieldsSpec::default(),
309 namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec::default(),
310 node_annotation_fields: node_metadata_annotator::FieldsSpec::default(),
311 include_paths_glob_patterns: default_path_inclusion(),
312 exclude_paths_glob_patterns: default_path_exclusion(),
313 read_from: default_read_from(),
314 ignore_older_secs: None,
315 max_read_bytes: default_max_read_bytes(),
316 oldest_first: default_oldest_first(),
317 max_line_bytes: default_max_line_bytes(),
318 max_merged_line_bytes: None,
319 fingerprint_lines: default_fingerprint_lines(),
320 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
321 ingestion_timestamp_field: None,
322 timezone: None,
323 kube_config_file: None,
324 use_apiserver_cache: false,
325 delay_deletion_ms: default_delay_deletion_ms(),
326 log_namespace: None,
327 internal_metrics: Default::default(),
328 rotate_wait: default_rotate_wait(),
329 }
330 }
331}
332
#[async_trait::async_trait]
#[typetag::serde(name = "kubernetes_logs")]
impl SourceConfig for Config {
    /// Builds the runnable source from this configuration.
    ///
    /// Construction errors from `Source::new` are returned to the caller. An error produced
    /// later by the running source is logged and mapped to `()`.
    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let source = Source::new(self, &cx.globals, &cx.key).await?;

        Ok(Box::pin(
            source
                .run(cx.out, cx.shutdown, log_namespace)
                .map(|result| {
                    result.map_err(|error| {
                        error!(message = "Source future failed.", %error);
                    })
                }),
        ))
    }

    /// Declares the single log output of this source together with its schema definition.
    ///
    /// The definition starts from the bytes deserializer schema and registers each metadata
    /// field this source may attach. The legacy key of an annotation field comes from the
    /// corresponding configured path; when no path is configured, no legacy key is registered.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let schema_definition = BytesDeserializerConfig
            .schema_definition(log_namespace)
            // The path of the file the line was read from.
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("file"))),
                &owned_value_path!("file"),
                Kind::bytes(),
                None,
            )
            // Container metadata.
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .container_id
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("container_id"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .container_image
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("container_image"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .container_name
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("container_name"),
                Kind::bytes().or_undefined(),
                None,
            )
            // Namespace and Node labels.
            .with_source_metadata(
                Self::NAME,
                self.namespace_annotation_fields
                    .namespace_labels
                    .path
                    .clone()
                    .map(|x| LegacyKey::Overwrite(x.path)),
                &owned_value_path!("namespace_labels"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.node_annotation_fields
                    .node_labels
                    .path
                    .clone()
                    .map(|x| LegacyKey::Overwrite(x.path)),
                &owned_value_path!("node_labels"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            // Pod metadata.
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_annotations
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_annotations"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_ip
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_ip"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_ips
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_ips"),
                Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_labels
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_labels"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_name
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_name"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_namespace
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_namespace"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_node_name
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_node_name"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_owner
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_owner"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                self.pod_annotation_fields
                    .pod_uid
                    .path
                    .clone()
                    .map(|k| k.path)
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("pod_uid"),
                Kind::bytes().or_undefined(),
                None,
            )
            // Fields registered with fixed keys and non-optional kinds.
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("stream"))),
                &owned_value_path!("stream"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                log_schema()
                    .timestamp_key()
                    .cloned()
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_standard_vector_source_metadata();

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    /// This source reports that it cannot acknowledge events.
    fn can_acknowledge(&self) -> bool {
        false
    }
}
557
/// Runtime state of the `kubernetes_logs` source, built from [`Config`] by `Source::new`
/// and consumed by `Source::run`.
#[derive(Clone)]
struct Source {
    // Kubernetes API client shared by the Pod, Namespace and Node watchers.
    client: Client,
    // Resolved data directory; used for checkpoints and handed to the file server.
    data_dir: PathBuf,
    auto_partial_merge: bool,
    pod_fields_spec: pod_metadata_annotator::FieldsSpec,
    namespace_fields_spec: namespace_metadata_annotator::FieldsSpec,
    node_field_spec: node_metadata_annotator::FieldsSpec,
    // Fully prepared selectors: built-in parts already combined with the configured extras.
    field_selector: String,
    label_selector: String,
    namespace_label_selector: String,
    insert_namespace_fields: bool,
    node_selector: String,
    // Resolved node name, from the configuration or the environment.
    self_node_name: String,
    // Compiled include/exclude glob patterns.
    include_paths: Vec<glob::Pattern>,
    exclude_paths: Vec<glob::Pattern>,
    read_from: ReadFrom,
    ignore_older_secs: Option<u64>,
    max_read_bytes: usize,
    oldest_first: bool,
    max_line_bytes: usize,
    max_merged_line_bytes: Option<usize>,
    fingerprint_lines: usize,
    glob_minimum_cooldown: Duration,
    use_apiserver_cache: bool,
    ingestion_timestamp_field: Option<OwnedTargetPath>,
    delay_deletion: Duration,
    include_file_metric_tag: bool,
    rotate_wait: Duration,
}
588
impl Source {
    /// Builds the runtime state from the configuration.
    ///
    /// Resolves the node name, prepares the selectors and glob patterns, creates the
    /// Kubernetes client and resolves the data directory.
    ///
    /// # Errors
    ///
    /// Fails when the node name cannot be determined, when the Kubernetes client
    /// configuration cannot be loaded, when the data directory cannot be resolved, or when
    /// a glob pattern is invalid.
    async fn new(
        config: &Config,
        globals: &GlobalOptions,
        key: &ComponentKey,
    ) -> crate::Result<Self> {
        // An empty value, or the still unexpanded `${VECTOR_SELF_NODE_NAME}` template, means
        // the node name has to come from the environment variable.
        let self_node_name = if config.self_node_name.is_empty()
            || config.self_node_name == default_self_node_name_env_template()
        {
            std::env::var(SELF_NODE_NAME_ENV_KEY).map_err(|_| {
                format!(
                    "self_node_name config value or {SELF_NODE_NAME_ENV_KEY} env var is not set"
                )
            })?
        } else {
            config.self_node_name.clone()
        };

        let field_selector = prepare_field_selector(config, self_node_name.as_str())?;
        let label_selector = prepare_label_selector(config.extra_label_selector.as_ref());
        let namespace_label_selector =
            prepare_label_selector(config.extra_namespace_label_selector.as_ref());
        let node_selector = prepare_node_selector(self_node_name.as_str())?;

        // Use the explicitly configured kubeconfig file when present, otherwise infer the
        // client configuration.
        let mut client_config = match &config.kube_config_file {
            Some(kc) => {
                ClientConfig::from_custom_kubeconfig(
                    config::Kubeconfig::read_from(kc)?,
                    &KubeConfigOptions::default(),
                )
                .await?
            }
            None => ClientConfig::infer().await?,
        };
        // Identify this client with a `<package name>/<package version>` user agent; skipped
        // if that string is not a valid header value.
        if let Ok(user_agent) = HeaderValue::from_str(&format!("{PKG_NAME}/{PKG_VERSION}")) {
            client_config
                .headers
                .push((HeaderName::from_static("user-agent"), user_agent));
        }
        let client = Client::try_from(client_config)?;

        let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;

        let include_paths = prepare_include_paths(config)?;

        let exclude_paths = prepare_exclude_paths(config)?;

        let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;

        let delay_deletion = config.delay_deletion_ms;

        let ingestion_timestamp_field = config
            .ingestion_timestamp_field
            .clone()
            .and_then(|k| k.path);

        Ok(Self {
            client,
            data_dir,
            auto_partial_merge: config.auto_partial_merge,
            pod_fields_spec: config.pod_annotation_fields.clone(),
            namespace_fields_spec: config.namespace_annotation_fields.clone(),
            node_field_spec: config.node_annotation_fields.clone(),
            field_selector,
            label_selector,
            namespace_label_selector,
            insert_namespace_fields: config.insert_namespace_fields,
            node_selector,
            self_node_name,
            include_paths,
            exclude_paths,
            read_from: ReadFrom::from(config.read_from),
            ignore_older_secs: config.ignore_older_secs,
            max_read_bytes: config.max_read_bytes,
            oldest_first: config.oldest_first,
            max_line_bytes: config.max_line_bytes,
            max_merged_line_bytes: config.max_merged_line_bytes,
            fingerprint_lines: config.fingerprint_lines,
            glob_minimum_cooldown,
            use_apiserver_cache: config.use_apiserver_cache,
            ingestion_timestamp_field,
            delay_deletion,
            include_file_metric_tag: config.internal_metrics.include_file_tag,
            rotate_wait: config.rotate_wait,
        })
    }

    /// Runs the source until the lifecycle completes.
    ///
    /// Spawns the metadata reflectors (Pods, optionally Namespaces, and Nodes), starts the
    /// file server, turns every line it reads into an annotated event and forwards the
    /// resulting stream to `out`. The reflector tasks are aborted once the lifecycle has
    /// finished. As visible here, the function always returns `Ok(())`; failures of the
    /// file server and of the event processing loop are reported through internal events.
    async fn run(
        self,
        mut out: SourceSender,
        global_shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
    ) -> crate::Result<()> {
        // Destructure so that every piece can be moved independently into the tasks and
        // closures below.
        let Self {
            client,
            data_dir,
            auto_partial_merge,
            pod_fields_spec,
            namespace_fields_spec,
            node_field_spec,
            field_selector,
            label_selector,
            namespace_label_selector,
            insert_namespace_fields,
            node_selector,
            self_node_name,
            include_paths,
            exclude_paths,
            read_from,
            ignore_older_secs,
            max_read_bytes,
            oldest_first,
            max_line_bytes,
            max_merged_line_bytes,
            fingerprint_lines,
            glob_minimum_cooldown,
            use_apiserver_cache,
            ingestion_timestamp_field,
            delay_deletion,
            include_file_metric_tag,
            rotate_wait,
        } = self;

        // Handles of the spawned reflector tasks, aborted at the end of this function.
        let mut reflectors = Vec::new();

        let pods = Api::<Pod>::all(client.clone());

        let list_semantic = if use_apiserver_cache {
            watcher::ListSemantic::Any
        } else {
            watcher::ListSemantic::MostRecent
        };

        // Pod watcher, restricted by the prepared field and label selectors.
        let pod_watcher = watcher(
            pods,
            watcher::Config {
                field_selector: Some(field_selector),
                label_selector: Some(label_selector),
                list_semantic: list_semantic.clone(),
                page_size: get_page_size(use_apiserver_cache),
                ..Default::default()
            },
        )
        .backoff(watcher::DefaultBackoff::default());

        let pod_store_w = reflector::store::Writer::default();
        let pod_state = pod_store_w.as_reader();
        let pod_cacher = MetaCache::new();

        reflectors.push(crate::spawn_in_current_span(custom_reflector(
            pod_store_w,
            pod_cacher,
            pod_watcher,
            delay_deletion,
        )));

        // The Namespace store reader is created unconditionally because later stages take
        // it as an argument; the watcher feeding the store is only spawned when Namespace
        // fields are enabled.
        let ns_store_w = reflector::store::Writer::default();
        let ns_state = ns_store_w.as_reader();
        if insert_namespace_fields {
            let namespaces = Api::<Namespace>::all(client.clone());
            let ns_watcher = watcher(
                namespaces,
                watcher::Config {
                    label_selector: Some(namespace_label_selector),
                    list_semantic: list_semantic.clone(),
                    page_size: get_page_size(use_apiserver_cache),
                    ..Default::default()
                },
            )
            .backoff(watcher::DefaultBackoff::default());

            reflectors.push(crate::spawn_in_current_span(custom_reflector(
                ns_store_w,
                MetaCache::new(),
                ns_watcher,
                delay_deletion,
            )));
        }

        // Node watcher, restricted to this node through the node field selector.
        let nodes = Api::<Node>::all(client);
        let node_watcher = watcher(
            nodes,
            watcher::Config {
                field_selector: Some(node_selector),
                list_semantic,
                page_size: get_page_size(use_apiserver_cache),
                ..Default::default()
            },
        )
        .backoff(watcher::DefaultBackoff::default());
        let node_store_w = reflector::store::Writer::default();
        let node_state = node_store_w.as_reader();
        let node_cacher = MetaCache::new();

        reflectors.push(crate::spawn_in_current_span(custom_reflector(
            node_store_w,
            node_cacher,
            node_watcher,
            delay_deletion,
        )));

        let paths_provider = K8sPathsProvider::new(
            pod_state.clone(),
            ns_state.clone(),
            include_paths,
            exclude_paths,
            insert_namespace_fields,
        );
        let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace);
        let ns_annotator =
            NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace);
        let node_annotator = NodeMetadataAnnotator::new(node_state, node_field_spec, log_namespace);

        let ignore_before = calculate_ignore_before(ignore_older_secs);

        // With partial merging enabled, the per-line limit is the smaller of
        // `max_line_bytes` and `max_merged_line_bytes` (when the latter is set).
        let mut resolved_max_line_bytes = max_line_bytes;
        if auto_partial_merge {
            resolved_max_line_bytes = min(
                max_line_bytes,
                max_merged_line_bytes.unwrap_or(max_line_bytes),
            );
        }

        let checkpointer = Checkpointer::new(&data_dir);
        let file_server = FileServer {
            paths_provider,
            max_read_bytes,
            ignore_checkpoints: false,
            read_from,
            ignore_before,
            max_line_bytes: resolved_max_line_bytes,
            line_delimiter: Bytes::from("\n"),
            data_dir,
            glob_minimum_cooldown,
            fingerprinter: Fingerprinter::new(
                FingerprintStrategy::FirstLinesChecksum {
                    ignored_header_bytes: 0,
                    lines: fingerprint_lines,
                },
                resolved_max_line_bytes,
                true,
            ),
            oldest_first,
            remove_after: None,
            emitter: FileSourceInternalEventsEmitter {
                include_file_metric_tag,
            },
            rotate_wait,
        };

        // The file server sends batches of lines over this bounded channel.
        let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);

        let checkpoints = checkpointer.view();
        // Flatten the batches into a stream of individual lines.
        let events = file_source_rx.flat_map(futures::stream::iter);
        // NOTE(review): the bytes are read from files but registered under `Protocol::HTTP`;
        // confirm this is the intended protocol label.
        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
        let events = events.map(move |line| {
            let byte_size = line.text.len();
            bytes_received.emit(ByteSize(byte_size));

            let mut event = create_event(
                line.text,
                &line.filename,
                ingestion_timestamp_field.as_ref(),
                log_namespace,
            );

            let file_info = annotator.annotate(&mut event, &line.filename);

            emit!(KubernetesLogsEventsReceived {
                file: &line.filename,
                byte_size: event.estimated_json_encoded_size_of(),
                pod_info: file_info.as_ref().map(|info| KubernetesLogsPodInfo {
                    name: info.pod_name.to_owned(),
                    namespace: info.pod_namespace.to_owned(),
                }),
            });

            // Namespace and Node annotation are only attempted when the Pod annotation
            // succeeded.
            if file_info.is_none() {
                emit!(KubernetesLogsEventAnnotationError { event: &event });
            } else {
                let namespace = file_info.as_ref().map(|info| info.pod_namespace);

                if insert_namespace_fields
                    && let Some(name) = namespace
                    && ns_annotator.annotate(&mut event, name).is_none()
                {
                    emit!(KubernetesLogsEventNamespaceAnnotationError { event: &event });
                }

                let node_info = node_annotator.annotate(&mut event, self_node_name.as_str());

                if node_info.is_none() {
                    emit!(KubernetesLogsEventNodeAnnotationError { event: &event });
                }
            }

            // Record the read position of this line, whether or not annotation succeeded.
            checkpoints.update(line.file_id, line.end_offset);
            event
        });

        // Run every event through the parser; the events it outputs are flattened back
        // into the stream.
        let mut parser = Parser::new(log_namespace);
        let events = events.flat_map(move |event| {
            let mut buf = OutputBuffer::with_capacity(1);
            parser.transform(&mut buf, event);
            futures::stream::iter(buf.into_events())
        });

        // Lower bound of the stream's size hint, used as the `count` of the error events
        // emitted below.
        let (events_count, _) = events.size_hint();

        let mut stream = if auto_partial_merge {
            merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream()
        } else {
            events.right_stream()
        };

        let event_processing_loop = out.send_event_stream(&mut stream);

        // Two futures are bound to the lifecycle: the file server and the event
        // processing loop.
        let mut lifecycle = Lifecycle::new();
        {
            let (slot, shutdown) = lifecycle.add();
            let fut = util::run_file_server(file_server, file_source_tx, shutdown, checkpointer)
                .map(|result| match result {
                    Ok(FileServerShutdown) => info!(message = "File server completed gracefully."),
                    Err(error) => emit!(KubernetesLifecycleError {
                        message: "File server exited with an error.",
                        error,
                        count: events_count,
                    }),
                });
            slot.bind(Box::pin(fut));
        }
        {
            let (slot, shutdown) = lifecycle.add();
            // The helper is handed a 30 second deadline. Presumably that is the time the
            // loop is given to finish after the shutdown signal; confirm against
            // `util::complete_with_deadline_on_signal`.
            let fut = util::complete_with_deadline_on_signal(
                event_processing_loop,
                shutdown,
                Duration::from_secs(30),
            )
            .map(|result| {
                match result {
                    Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."),
                    Ok(Err(_)) => emit!(StreamClosedError {
                        count: events_count
                    }),
                    Err(error) => emit!(KubernetesLifecycleError {
                        error,
                        message: "Event processing loop timed out during the shutdown.",
                        count: events_count,
                    }),
                };
            });
            slot.bind(Box::pin(fut));
        }

        lifecycle.run(global_shutdown).await;
        // Abort the spawned reflector tasks now that the lifecycle has finished.
        for reflector in reflectors {
            reflector.abort();
        }
        info!(message = "Done.");
        Ok(())
    }
}
988
989fn get_page_size(use_apiserver_cache: bool) -> Option<u32> {
991 if use_apiserver_cache {
992 None
993 } else {
994 watcher::Config::default().page_size
995 }
996}
997
998fn create_event(
999 line: Bytes,
1000 file: &str,
1001 ingestion_timestamp_field: Option<&OwnedTargetPath>,
1002 log_namespace: LogNamespace,
1003) -> Event {
1004 let deserializer = BytesDeserializer;
1005 let mut log = deserializer.parse_single(line, log_namespace);
1006
1007 log_namespace.insert_source_metadata(
1008 Config::NAME,
1009 &mut log,
1010 Some(LegacyKey::Overwrite(path!("file"))),
1011 path!("file"),
1012 file,
1013 );
1014
1015 log_namespace.insert_vector_metadata(
1016 &mut log,
1017 log_schema().source_type_key(),
1018 path!("source_type"),
1019 Bytes::from(Config::NAME),
1020 );
1021 match (log_namespace, ingestion_timestamp_field) {
1022 (LogNamespace::Vector, _) => {
1024 log.metadata_mut()
1025 .value_mut()
1026 .insert(path!("vector", "ingest_timestamp"), Utc::now());
1027 }
1028 (LogNamespace::Legacy, Some(ingestion_timestamp_field)) => {
1030 log.try_insert(ingestion_timestamp_field, Utc::now())
1031 }
1032 (LogNamespace::Legacy, None) => (),
1034 };
1035
1036 log.into()
1037}
1038
1039fn default_self_node_name_env_template() -> String {
1042 format!("${{{}}}", SELF_NODE_NAME_ENV_KEY.to_owned())
1043}
1044
/// Default include patterns: a single `**/*` glob.
fn default_path_inclusion() -> Vec<PathBuf> {
    vec!["**/*".into()]
}
1048
/// Default exclude patterns: the `**/*.gz` and `**/*.tmp` globs, in that order.
fn default_path_exclusion() -> Vec<PathBuf> {
    vec!["**/*.gz".into(), "**/*.tmp".into()]
}
1052
/// Default for `max_read_bytes`: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
1056
/// Default for `oldest_first`: enabled.
const fn default_oldest_first() -> bool {
    true
}
1062
/// Default for `insert_namespace_fields`: enabled.
const fn default_insert_namespace_fields() -> bool {
    true
}
1067
/// Default for `max_line_bytes`: 32 KiB.
const fn default_max_line_bytes() -> usize {
    const KIB: usize = 1024;
    32 * KIB
}
1080
/// Default for `glob_minimum_cooldown_ms`: one minute.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(60)
}
1084
/// Default for `fingerprint_lines`: a single line.
const fn default_fingerprint_lines() -> usize {
    1
}
1088
/// Default for `delay_deletion_ms`: one minute.
const fn default_delay_deletion_ms() -> Duration {
    Duration::from_secs(60)
}
1092
/// Default for `rotate_wait`: half of `u64::MAX` seconds.
const fn default_rotate_wait() -> Duration {
    const HALF_OF_MAX_SECS: u64 = u64::MAX / 2;
    Duration::from_secs(HALF_OF_MAX_SECS)
}
1096
1097fn prepare_include_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1100 prepare_glob_patterns(&config.include_paths_glob_patterns, "Including")
1101}
1102
1103fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1106 prepare_glob_patterns(&config.exclude_paths_glob_patterns, "Excluding")
1107}
1108
1109fn prepare_glob_patterns(paths: &[PathBuf], op: &str) -> crate::Result<Vec<glob::Pattern>> {
1112 let ret = paths
1113 .iter()
1114 .map(|pattern| {
1115 let pattern = pattern
1116 .to_str()
1117 .ok_or("glob pattern is not a valid UTF-8 string")?;
1118 Ok(glob::Pattern::new(pattern)?)
1119 })
1120 .collect::<crate::Result<Vec<_>>>()?;
1121
1122 info!(
1123 message = format!("{op} matching files."),
1124 ret = ?ret
1125 .iter()
1126 .map(glob::Pattern::as_str)
1127 .collect::<Vec<_>>()
1128 );
1129
1130 Ok(ret)
1131}
1132
1133fn prepare_field_selector(config: &Config, self_node_name: &str) -> crate::Result<String> {
1136 info!(
1137 message = "Obtained Kubernetes Node name to collect logs for (self).",
1138 ?self_node_name
1139 );
1140
1141 let field_selector = format!("spec.nodeName={self_node_name}");
1142
1143 if config.extra_field_selector.is_empty() {
1144 return Ok(field_selector);
1145 }
1146
1147 Ok(format!(
1148 "{},{}",
1149 field_selector, config.extra_field_selector
1150 ))
1151}
1152
1153fn prepare_node_selector(self_node_name: &str) -> crate::Result<String> {
1155 Ok(format!("metadata.name={self_node_name}"))
1156}
1157
/// Builds a label selector from the built-in exclusion rule and an optional extra selector.
///
/// An empty `selector` yields just the built-in `vector.dev/exclude!=true` rule; otherwise
/// `selector` is appended to it after a comma.
fn prepare_label_selector(selector: &str) -> String {
    const BUILT_IN: &str = "vector.dev/exclude!=true";

    match selector {
        "" => BUILT_IN.to_owned(),
        extra => [BUILT_IN, extra].join(","),
    }
}
1169
1170#[cfg(test)]
1171mod tests {
1172 use indoc::indoc;
1173 use similar_asserts::assert_eq;
1174 use vector_lib::{
1175 config::LogNamespace,
1176 lookup::{OwnedTargetPath, owned_value_path},
1177 schema::Definition,
1178 };
1179 use vrl::value::{Kind, kind::Collection};
1180
1181 use super::Config;
1182 use crate::config::SourceConfig;
1183
1184 #[test]
1185 fn generate_config() {
1186 crate::test_util::test_generate_config::<Config>();
1187 }
1188
1189 #[test]
1190 fn test_default_config_insert_namespace_fields() {
1191 let config = Config::default();
1192 assert_eq!(config.insert_namespace_fields, true);
1193 }
1194
1195 #[test]
1196 fn test_config_insert_namespace_fields_disabled() {
1197 let config = Config {
1198 insert_namespace_fields: false,
1199 ..Default::default()
1200 };
1201 assert_eq!(config.insert_namespace_fields, false);
1202 }
1203
1204 #[test]
1205 fn test_config_serialization_insert_namespace_fields() {
1206 let yaml_config = indoc! {r#"
1208 insert_namespace_fields: false
1209 "#};
1210 let config: Config = serde_yaml::from_str(yaml_config).unwrap();
1211 assert_eq!(config.insert_namespace_fields, false);
1212
1213 let default_yaml = "";
1214 let default_config: Config = serde_yaml::from_str(default_yaml).unwrap();
1215 assert_eq!(default_config.insert_namespace_fields, true);
1216 }
1217
1218 #[test]
1219 fn test_insert_namespace_fields_affects_behavior() {
1220 let enabled_config = Config {
1223 insert_namespace_fields: true,
1224 ..Default::default()
1225 };
1226 let disabled_config = Config {
1227 insert_namespace_fields: false,
1228 ..Default::default()
1229 };
1230
1231 assert!(should_watch_namespaces(&enabled_config));
1234 assert!(!should_watch_namespaces(&disabled_config));
1235 }
1236
1237 fn should_watch_namespaces(config: &Config) -> bool {
1239 config.insert_namespace_fields
1240 }
1241
1242 #[test]
1243 fn prepare_exclude_paths() {
1244 let cases = vec![
1245 (
1246 Config::default(),
1247 vec![
1248 glob::Pattern::new("**/*.gz").unwrap(),
1249 glob::Pattern::new("**/*.tmp").unwrap(),
1250 ],
1251 ),
1252 (
1253 Config {
1254 exclude_paths_glob_patterns: vec![std::path::PathBuf::from("**/*.tmp")],
1255 ..Default::default()
1256 },
1257 vec![glob::Pattern::new("**/*.tmp").unwrap()],
1258 ),
1259 (
1260 Config {
1261 exclude_paths_glob_patterns: vec![
1262 std::path::PathBuf::from("**/kube-system_*/**"),
1263 std::path::PathBuf::from("**/*.gz"),
1264 std::path::PathBuf::from("**/*.tmp"),
1265 ],
1266 ..Default::default()
1267 },
1268 vec![
1269 glob::Pattern::new("**/kube-system_*/**").unwrap(),
1270 glob::Pattern::new("**/*.gz").unwrap(),
1271 glob::Pattern::new("**/*.tmp").unwrap(),
1272 ],
1273 ),
1274 ];
1275
1276 for (input, mut expected) in cases {
1277 let mut output = super::prepare_exclude_paths(&input).unwrap();
1278 expected.sort();
1279 output.sort();
1280 assert_eq!(expected, output, "expected left, actual right");
1281 }
1282 }
1283
1284 #[test]
1285 fn prepare_field_selector() {
1286 let cases = vec![
1287 (
1290 Config {
1291 self_node_name: "qwe".to_owned(),
1292 ..Default::default()
1293 },
1294 "spec.nodeName=qwe",
1295 ),
1296 (
1297 Config {
1298 self_node_name: "qwe".to_owned(),
1299 extra_field_selector: "".to_owned(),
1300 ..Default::default()
1301 },
1302 "spec.nodeName=qwe",
1303 ),
1304 (
1305 Config {
1306 self_node_name: "qwe".to_owned(),
1307 extra_field_selector: "foo=bar".to_owned(),
1308 ..Default::default()
1309 },
1310 "spec.nodeName=qwe,foo=bar",
1311 ),
1312 ];
1313
1314 for (input, expected) in cases {
1315 let output = super::prepare_field_selector(&input, "qwe").unwrap();
1316 assert_eq!(expected, output, "expected left, actual right");
1317 }
1318 }
1319
1320 #[test]
1321 fn prepare_label_selector() {
1322 let cases = vec![
1323 (
1324 Config::default().extra_label_selector,
1325 "vector.dev/exclude!=true",
1326 ),
1327 (
1328 Config::default().extra_namespace_label_selector,
1329 "vector.dev/exclude!=true",
1330 ),
1331 (
1332 Config {
1333 extra_label_selector: "".to_owned(),
1334 ..Default::default()
1335 }
1336 .extra_label_selector,
1337 "vector.dev/exclude!=true",
1338 ),
1339 (
1340 Config {
1341 extra_namespace_label_selector: "".to_owned(),
1342 ..Default::default()
1343 }
1344 .extra_namespace_label_selector,
1345 "vector.dev/exclude!=true",
1346 ),
1347 (
1348 Config {
1349 extra_label_selector: "qwe".to_owned(),
1350 ..Default::default()
1351 }
1352 .extra_label_selector,
1353 "vector.dev/exclude!=true,qwe",
1354 ),
1355 (
1356 Config {
1357 extra_namespace_label_selector: "qwe".to_owned(),
1358 ..Default::default()
1359 }
1360 .extra_namespace_label_selector,
1361 "vector.dev/exclude!=true,qwe",
1362 ),
1363 ];
1364
1365 for (input, expected) in cases {
1366 let output = super::prepare_label_selector(&input);
1367 assert_eq!(expected, output, "expected left, actual right");
1368 }
1369 }
1370
1371 #[test]
1372 fn test_output_schema_definition_vector_namespace() {
1373 let definitions = serde_yaml::from_str::<Config>("")
1374 .unwrap()
1375 .outputs(LogNamespace::Vector)
1376 .remove(0)
1377 .schema_definition(true);
1378
1379 assert_eq!(
1380 definitions,
1381 Some(
1382 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1383 .with_metadata_field(
1384 &owned_value_path!("kubernetes_logs", "file"),
1385 Kind::bytes(),
1386 None
1387 )
1388 .with_metadata_field(
1389 &owned_value_path!("kubernetes_logs", "container_id"),
1390 Kind::bytes().or_undefined(),
1391 None
1392 )
1393 .with_metadata_field(
1394 &owned_value_path!("kubernetes_logs", "container_image"),
1395 Kind::bytes().or_undefined(),
1396 None
1397 )
1398 .with_metadata_field(
1399 &owned_value_path!("kubernetes_logs", "container_name"),
1400 Kind::bytes().or_undefined(),
1401 None
1402 )
1403 .with_metadata_field(
1404 &owned_value_path!("kubernetes_logs", "namespace_labels"),
1405 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1406 .or_undefined(),
1407 None
1408 )
1409 .with_metadata_field(
1410 &owned_value_path!("kubernetes_logs", "node_labels"),
1411 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1412 .or_undefined(),
1413 None
1414 )
1415 .with_metadata_field(
1416 &owned_value_path!("kubernetes_logs", "pod_annotations"),
1417 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1418 .or_undefined(),
1419 None
1420 )
1421 .with_metadata_field(
1422 &owned_value_path!("kubernetes_logs", "pod_ip"),
1423 Kind::bytes().or_undefined(),
1424 None
1425 )
1426 .with_metadata_field(
1427 &owned_value_path!("kubernetes_logs", "pod_ips"),
1428 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1429 None
1430 )
1431 .with_metadata_field(
1432 &owned_value_path!("kubernetes_logs", "pod_labels"),
1433 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1434 .or_undefined(),
1435 None
1436 )
1437 .with_metadata_field(
1438 &owned_value_path!("kubernetes_logs", "pod_name"),
1439 Kind::bytes().or_undefined(),
1440 None
1441 )
1442 .with_metadata_field(
1443 &owned_value_path!("kubernetes_logs", "pod_namespace"),
1444 Kind::bytes().or_undefined(),
1445 None
1446 )
1447 .with_metadata_field(
1448 &owned_value_path!("kubernetes_logs", "pod_node_name"),
1449 Kind::bytes().or_undefined(),
1450 None
1451 )
1452 .with_metadata_field(
1453 &owned_value_path!("kubernetes_logs", "pod_owner"),
1454 Kind::bytes().or_undefined(),
1455 None
1456 )
1457 .with_metadata_field(
1458 &owned_value_path!("kubernetes_logs", "pod_uid"),
1459 Kind::bytes().or_undefined(),
1460 None
1461 )
1462 .with_metadata_field(
1463 &owned_value_path!("kubernetes_logs", "stream"),
1464 Kind::bytes(),
1465 None
1466 )
1467 .with_metadata_field(
1468 &owned_value_path!("kubernetes_logs", "timestamp"),
1469 Kind::timestamp(),
1470 Some("timestamp")
1471 )
1472 .with_metadata_field(
1473 &owned_value_path!("vector", "source_type"),
1474 Kind::bytes(),
1475 None
1476 )
1477 .with_metadata_field(
1478 &owned_value_path!("vector", "ingest_timestamp"),
1479 Kind::timestamp(),
1480 None
1481 )
1482 .with_meaning(OwnedTargetPath::event_root(), "message")
1483 )
1484 )
1485 }
1486
1487 #[test]
1488 fn test_output_schema_definition_legacy_namespace() {
1489 let definitions = serde_yaml::from_str::<Config>("")
1490 .unwrap()
1491 .outputs(LogNamespace::Legacy)
1492 .remove(0)
1493 .schema_definition(true);
1494
1495 assert_eq!(
1496 definitions,
1497 Some(
1498 Definition::new_with_default_metadata(
1499 Kind::object(Collection::empty()),
1500 [LogNamespace::Legacy]
1501 )
1502 .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1503 .with_event_field(
1504 &owned_value_path!("message"),
1505 Kind::bytes(),
1506 Some("message")
1507 )
1508 .with_event_field(
1509 &owned_value_path!("kubernetes", "container_id"),
1510 Kind::bytes().or_undefined(),
1511 None
1512 )
1513 .with_event_field(
1514 &owned_value_path!("kubernetes", "container_image"),
1515 Kind::bytes().or_undefined(),
1516 None
1517 )
1518 .with_event_field(
1519 &owned_value_path!("kubernetes", "container_name"),
1520 Kind::bytes().or_undefined(),
1521 None
1522 )
1523 .with_event_field(
1524 &owned_value_path!("kubernetes", "namespace_labels"),
1525 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1526 None
1527 )
1528 .with_event_field(
1529 &owned_value_path!("kubernetes", "node_labels"),
1530 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1531 None
1532 )
1533 .with_event_field(
1534 &owned_value_path!("kubernetes", "pod_annotations"),
1535 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1536 None
1537 )
1538 .with_event_field(
1539 &owned_value_path!("kubernetes", "pod_ip"),
1540 Kind::bytes().or_undefined(),
1541 None
1542 )
1543 .with_event_field(
1544 &owned_value_path!("kubernetes", "pod_ips"),
1545 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1546 None
1547 )
1548 .with_event_field(
1549 &owned_value_path!("kubernetes", "pod_labels"),
1550 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1551 None
1552 )
1553 .with_event_field(
1554 &owned_value_path!("kubernetes", "pod_name"),
1555 Kind::bytes().or_undefined(),
1556 None
1557 )
1558 .with_event_field(
1559 &owned_value_path!("kubernetes", "pod_namespace"),
1560 Kind::bytes().or_undefined(),
1561 None
1562 )
1563 .with_event_field(
1564 &owned_value_path!("kubernetes", "pod_node_name"),
1565 Kind::bytes().or_undefined(),
1566 None
1567 )
1568 .with_event_field(
1569 &owned_value_path!("kubernetes", "pod_owner"),
1570 Kind::bytes().or_undefined(),
1571 None
1572 )
1573 .with_event_field(
1574 &owned_value_path!("kubernetes", "pod_uid"),
1575 Kind::bytes().or_undefined(),
1576 None
1577 )
1578 .with_event_field(&owned_value_path!("stream"), Kind::bytes(), None)
1579 .with_event_field(
1580 &owned_value_path!("timestamp"),
1581 Kind::timestamp(),
1582 Some("timestamp")
1583 )
1584 .with_event_field(
1585 &owned_value_path!("source_type"),
1586 Kind::bytes(),
1587 None
1588 )
1589 )
1590 )
1591 }
1592}