1use std::{
2 collections::HashMap,
3 convert::TryFrom,
4 future::ready,
5 pin::Pin,
6 sync::{Arc, LazyLock},
7 time::Duration,
8};
9
10use bollard::{
11 Docker,
12 container::LogOutput,
13 errors::Error as DockerError,
14 query_parameters::{
15 EventsOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
16 LogsOptionsBuilder,
17 },
18 service::{ContainerInspectResponse, EventMessage},
19};
20use bytes::{Buf, Bytes};
21use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
22use futures::{Stream, StreamExt};
23use serde_with::serde_as;
24use tokio::sync::mpsc;
25use vector_lib::{
26 codecs::{BytesDeserializer, BytesDeserializerConfig},
27 config::{LegacyKey, LogNamespace},
28 configurable::configurable_component,
29 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered},
30 lookup::{
31 OwnedValuePath, PathPrefix, lookup_v2::OptionalValuePath, metadata_path, owned_value_path,
32 path,
33 },
34};
35use vrl::{
36 event_path,
37 value::{Kind, kind::Collection},
38};
39
40use super::util::MultilineConfig;
41use crate::{
42 SourceSender,
43 common::backoff::ExponentialBackoff,
44 config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
45 docker::{DockerTlsConfig, docker},
46 event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
47 internal_events::{
48 DockerLogsCommunicationError, DockerLogsContainerEventReceived,
49 DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
50 DockerLogsContainerWatch, DockerLogsEventsReceived,
51 DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
52 },
53 line_agg::{self, LineAgg},
54 shutdown::ShutdownSignal,
55};
56
57#[cfg(test)]
58mod tests;
59
// Names of the metadata fields this source attaches to every log event.
const IMAGE: &str = "image";
const CREATED_AT: &str = "container_created_at";
const NAME: &str = "container_name";
const STREAM: &str = "stream";
const CONTAINER: &str = "container_id";
// Minimum length the local hostname must have before it is compared against
// container ids to detect (and skip) the container this process runs in.
const MIN_HOSTNAME_LENGTH: usize = 6;

// Pre-built values for the `stream` metadata field, one per Docker output kind.
static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());
71
/// Configuration for the `docker_logs` source.
#[serde_as]
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the global `log_schema.host_key` is used instead.
    host_key: Option<OptionalValuePath>,

    /// Docker host to connect to.
    ///
    /// The value is handed to the Docker client constructor as-is; see the examples for the
    /// accepted forms (HTTP/HTTPS URLs, Unix sockets, and Windows named pipes).
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// A list of container IDs or names of containers to exclude from log collection.
    ///
    /// Matching is prefix-based: a container is excluded when its ID or any of its names
    /// starts with one of the listed values. Exclusion takes precedence over
    /// `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>,

    /// A list of container IDs or names of containers to include in log collection.
    ///
    /// Matching is prefix-based: a container is included when its ID or any of its names
    /// starts with one of the listed values. When unset, all containers are included.
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>,

    /// A list of container object labels to match against when filtering running containers.
    ///
    /// The values are passed to Docker as the `label` filter.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// A list of image names to match against.
    ///
    /// The values are passed to Docker as the `image` filter for container events and as the
    /// `ancestor` filter when listing already running containers.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    /// Overrides the name of the log field used to mark an event as partial.
    ///
    /// The marker is only added when `auto_partial_merge` is disabled. An empty string is
    /// treated the same as leaving the option unset, which disables the marker.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Enables automatic merging of partial events.
    auto_partial_merge: bool,

    /// The amount of time to wait before retrying after an error.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    /// TLS options used when connecting to the Docker daemon.
    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
179
180impl Default for DockerLogsConfig {
181 fn default() -> Self {
182 Self {
183 host_key: None,
184 docker_host: None,
185 tls: None,
186 exclude_containers: None,
187 include_containers: None,
188 include_labels: None,
189 include_images: None,
190 partial_event_marker_field: default_partial_event_marker_field(),
191 auto_partial_merge: true,
192 multiline: None,
193 retry_backoff_secs: default_retry_backoff_secs(),
194 log_namespace: None,
195 }
196 }
197}
198
199fn default_partial_event_marker_field() -> Option<String> {
200 Some(event::PARTIAL.to_string())
201}
202
/// Default for `retry_backoff_secs`: two seconds.
const fn default_retry_backoff_secs() -> Duration {
    const DEFAULT_BACKOFF_SECS: u64 = 2;
    Duration::new(DEFAULT_BACKOFF_SECS, 0)
}
206
207impl DockerLogsConfig {
208 fn container_name_or_id_included<'a>(
209 &self,
210 id: &str,
211 names: impl IntoIterator<Item = &'a str>,
212 ) -> bool {
213 let containers: Vec<String> = names.into_iter().map(Into::into).collect();
214
215 self.include_containers
216 .as_ref()
217 .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
218 .unwrap_or(true)
219 && !(self
220 .exclude_containers
221 .as_ref()
222 .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
223 .unwrap_or(false))
224 }
225
226 fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
227 items.iter().any(|flag| id.starts_with(flag))
228 || names
229 .iter()
230 .any(|name| items.iter().any(|item| name.starts_with(item)))
231 }
232
233 fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
234 if let Some(val) = &self.partial_event_marker_field
235 && val.is_empty()
236 {
237 self.partial_event_marker_field = None;
238 }
239 self
240 }
241}
242
// NOTE(review): presumably derives the example configuration for this source from
// `DockerLogsConfig::default()`; the macro is defined elsewhere — confirm.
impl_generate_config_from_default!(DockerLogsConfig);
244
245#[async_trait::async_trait]
246#[typetag::serde(name = "docker_logs")]
247impl SourceConfig for DockerLogsConfig {
248 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
249 let log_namespace = cx.log_namespace(self.log_namespace);
250 let source = DockerLogsSource::new(
251 self.clone().with_empty_partial_event_marker_field_as_none(),
252 cx.out,
253 cx.shutdown.clone(),
254 log_namespace,
255 )?;
256
257 let fut = async move {
259 match source.handle_running_containers().await {
260 Ok(source) => source.run().await,
261 Err(error) => {
262 error!(
263 message = "Listing currently running containers failed.",
264 ?error
265 );
266 }
267 }
268 };
269
270 let shutdown = cx.shutdown;
271 Ok(Box::pin(async move {
273 Ok(tokio::select! {
274 _ = fut => {}
275 _ = shutdown => {}
276 })
277 }))
278 }
279
280 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
281 let host_key = self
282 .host_key
283 .clone()
284 .unwrap_or(log_schema().host_key().cloned().into())
285 .path
286 .map(LegacyKey::Overwrite);
287
288 let schema_definition = BytesDeserializerConfig
289 .schema_definition(global_log_namespace.merge(self.log_namespace))
290 .with_source_metadata(
291 Self::NAME,
292 host_key,
293 &owned_value_path!("host"),
294 Kind::bytes().or_undefined(),
295 Some("host"),
296 )
297 .with_source_metadata(
298 Self::NAME,
299 Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
300 &owned_value_path!(CONTAINER),
301 Kind::bytes(),
302 None,
303 )
304 .with_source_metadata(
305 Self::NAME,
306 Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
307 &owned_value_path!(IMAGE),
308 Kind::bytes(),
309 None,
310 )
311 .with_source_metadata(
312 Self::NAME,
313 Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
314 &owned_value_path!(NAME),
315 Kind::bytes(),
316 None,
317 )
318 .with_source_metadata(
319 Self::NAME,
320 Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
321 &owned_value_path!(CREATED_AT),
322 Kind::timestamp(),
323 None,
324 )
325 .with_source_metadata(
326 Self::NAME,
327 Some(LegacyKey::Overwrite(owned_value_path!("label"))),
328 &owned_value_path!("labels"),
329 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
330 None,
331 )
332 .with_source_metadata(
333 Self::NAME,
334 Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
335 &owned_value_path!(STREAM),
336 Kind::bytes(),
337 None,
338 )
339 .with_source_metadata(
340 Self::NAME,
341 log_schema()
342 .timestamp_key()
343 .cloned()
344 .map(LegacyKey::Overwrite),
345 &owned_value_path!("timestamp"),
346 Kind::timestamp(),
347 Some("timestamp"),
348 )
349 .with_vector_metadata(
350 log_schema().source_type_key(),
351 &owned_value_path!("source_type"),
352 Kind::bytes(),
353 None,
354 )
355 .with_vector_metadata(
356 None,
357 &owned_value_path!("ingest_timestamp"),
358 Kind::timestamp(),
359 None,
360 );
361
362 vec![SourceOutput::new_maybe_logs(
363 DataType::Log,
364 schema_definition,
365 )]
366 }
367
368 fn can_acknowledge(&self) -> bool {
369 false
370 }
371}
372
/// State shared (behind an `Arc`) between the main source task and every
/// per-container log task.
struct DockerLogsSourceCore {
    /// The source configuration.
    config: DockerLogsConfig,
    /// Multiline aggregation settings derived from `config.multiline`, if configured.
    line_agg_config: Option<line_agg::Config>,
    /// Client used for all Docker API calls.
    docker: Docker,
    /// Time at which the source was created; events and logs are requested from
    /// this point on.
    now_timestamp: DateTime<Utc>,
}
380
381impl DockerLogsSourceCore {
382 fn new(config: DockerLogsConfig) -> crate::Result<Self> {
383 let docker = docker(config.docker_host.clone(), config.tls.clone())?;
386
387 let now = Local::now();
389 info!(
390 message = "Capturing logs from now on.",
391 now = %now.to_rfc3339()
392 );
393
394 let line_agg_config = if let Some(ref multiline_config) = config.multiline {
395 Some(line_agg::Config::try_from(multiline_config)?)
396 } else {
397 None
398 };
399
400 Ok(DockerLogsSourceCore {
401 config,
402 line_agg_config,
403 docker,
404 now_timestamp: now.into(),
405 })
406 }
407
408 fn docker_logs_event_stream(
410 &self,
411 ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
412 let mut filters = HashMap::new();
413
414 filters.insert(
421 "event".to_owned(),
422 vec![
423 "start".to_owned(),
424 "unpause".to_owned(),
425 "die".to_owned(),
426 "pause".to_owned(),
427 ],
428 );
429 filters.insert("type".to_owned(), vec!["container".to_owned()]);
430
431 if let Some(include_labels) = &self.config.include_labels {
433 filters.insert("label".to_owned(), include_labels.clone());
434 }
435
436 if let Some(include_images) = &self.config.include_images {
437 filters.insert("image".to_owned(), include_images.clone());
438 }
439
440 self.docker.events(Some(
441 EventsOptionsBuilder::new()
442 .since(&self.now_timestamp.timestamp().to_string())
443 .filters(&filters)
444 .build(),
445 ))
446 }
447}
448
/// Main task of the source: tracks watched containers and reacts to Docker
/// events and to results reported by the per-container log tasks.
struct DockerLogsSource {
    /// Used to start and restart per-container log tasks.
    esb: EventStreamBuilder,
    /// Stream of Docker container events.
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// State of every watched container, keyed by container id.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives the outcome of a log task when it finishes: the container's log
    /// info on success, or the container id and the error persistence on failure.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Local hostname, used to avoid collecting logs from our own container.
    hostname: Option<String>,
    /// Delay applied before restarting a log task after a transient error.
    backoff_duration: Duration,
    /// Backoff used when the Docker events stream has to be re-created.
    events_backoff: ExponentialBackoff,
}
474
475impl DockerLogsSource {
476 fn new(
477 config: DockerLogsConfig,
478 out: SourceSender,
479 shutdown: ShutdownSignal,
480 log_namespace: LogNamespace,
481 ) -> crate::Result<DockerLogsSource> {
482 let backoff_secs = config.retry_backoff_secs;
483
484 let host_key = config
485 .host_key
486 .clone()
487 .unwrap_or(log_schema().host_key().cloned().into());
488 let hostname = crate::get_hostname().ok();
489
490 let core = DockerLogsSourceCore::new(config)?;
492
493 let events = core.docker_logs_event_stream();
495 info!(message = "Listening to docker log events.");
496
497 let (main_send, main_recv) =
499 mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();
500
501 let esb = EventStreamBuilder {
510 host_key,
511 hostname: hostname.clone(),
512 core: Arc::new(core),
513 out,
514 main_send,
515 shutdown,
516 log_namespace,
517 };
518
519 Ok(DockerLogsSource {
520 esb,
521 events: Box::pin(events),
522 containers: HashMap::new(),
523 main_recv,
524 hostname,
525 backoff_duration: backoff_secs,
526 events_backoff: ExponentialBackoff::default(),
527 })
528 }
529
530 async fn handle_running_containers(mut self) -> crate::Result<Self> {
532 let mut filters = HashMap::new();
533
534 if let Some(include_labels) = &self.esb.core.config.include_labels {
536 filters.insert("label".to_owned(), include_labels.clone());
537 }
538
539 if let Some(include_images) = &self.esb.core.config.include_images {
540 filters.insert("ancestor".to_owned(), include_images.clone());
541 }
542
543 self.esb
544 .core
545 .docker
546 .list_containers(Some(
547 ListContainersOptionsBuilder::new()
548 .all(false)
549 .filters(&filters)
550 .build(),
551 ))
552 .await?
553 .into_iter()
554 .for_each(|container| {
555 let id = container.id.unwrap();
556 let names = container.names.unwrap();
557
558 trace!(message = "Found already running container.", id = %id, names = ?names);
559
560 if self.exclude_self(id.as_str()) {
561 info!(message = "Excluded self container.", id = %id);
562 return;
563 }
564
565 if !self.esb.core.config.container_name_or_id_included(
566 id.as_str(),
567 names.iter().map(|s| {
568 let s = s.as_str();
570 if s.starts_with('/') {
571 s.split_at('/'.len_utf8()).1
572 } else {
573 s
574 }
575 }),
576 ) {
577 info!(message = "Excluded container.", id = %id);
578 return;
579 }
580
581 let id = ContainerId::new(id);
582 self.containers.insert(id.clone(), self.esb.start(id, None));
583 });
584
585 Ok(self)
586 }
587
588 async fn run(mut self) {
589 loop {
590 tokio::select! {
591 value = self.main_recv.recv() => {
592 match value {
593 Some(Ok(info)) => {
594 let state = self
595 .containers
596 .get_mut(&info.id)
597 .expect("Every ContainerLogInfo has it's ContainerState");
598 if state.return_info(info) {
599 self.esb.restart(state);
600 }
601 },
602 Some(Err((id,persistence))) => {
603 let state = self
604 .containers
605 .remove(&id)
606 .expect("Every started ContainerId has it's ContainerState");
607 match persistence{
608 ErrorPersistence::Transient => if state.is_running() {
609 let backoff= Some(self.backoff_duration);
610 self.containers.insert(id.clone(), self.esb.start(id, backoff));
611 }
612 ErrorPersistence::Permanent => (),
614 }
615 }
616 None => {
617 error!(message = "The docker_logs source main stream has ended unexpectedly.", internal_log_rate_limit = false);
618 info!(message = "Shutting down docker_logs source.");
619 return;
620 }
621 };
622 }
623 value = self.events.next() => {
624 match value {
625 Some(Ok(mut event)) => {
626 self.events_backoff.reset();
628
629 let action = event.action.unwrap();
630 let actor = event.actor.take().unwrap();
631 let id = actor.id.unwrap();
632 let attributes = actor.attributes.unwrap();
633
634 emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });
635
636 let id = ContainerId::new(id.to_owned());
637
638 match action.as_str() {
640 "die" | "pause" => {
641 if let Some(state) = self.containers.get_mut(&id) {
642 state.stopped();
643 }
644 }
645 "start" | "unpause" => {
646 if let Some(state) = self.containers.get_mut(&id) {
647 state.running();
648 self.esb.restart(state);
649 } else {
650 let include_name =
651 self.esb.core.config.container_name_or_id_included(
652 id.as_str(),
653 attributes.get("name").map(|s| s.as_str()),
654 );
655
656 let exclude_self = self.exclude_self(id.as_str());
657
658 if include_name && !exclude_self {
659 self.containers.insert(id.clone(), self.esb.start(id, None));
660 }
661 }
662 }
663 _ => {},
664 };
665 }
666 Some(Err(error)) => {
667 emit!(DockerLogsCommunicationError {
668 error,
669 container_id: None,
670 });
671 if !self.retry_events_stream_with_backoff("Docker events stream failed").await {
673 error!("Docker events stream failed and retry exhausted, shutting down.");
674 return;
675 }
676 },
677 None => {
678 if !self.retry_events_stream_with_backoff("Docker events stream ended").await {
680 error!("Docker events stream ended and retry exhausted, shutting down.");
681 return;
682 }
683 }
684 };
685 }
686 };
687 }
688 }
689
690 async fn retry_events_stream_with_backoff(&mut self, reason: &str) -> bool {
693 if let Some(delay) = self.events_backoff.next() {
694 warn!(
695 message = reason,
696 action = "retrying with backoff",
697 delay_ms = delay.as_millis()
698 );
699 tokio::select! {
700 _ = tokio::time::sleep(delay) => {
701 self.events = Box::pin(self.esb.core.docker_logs_event_stream());
702 true
703 }
704 _ = self.esb.shutdown.clone() => {
705 info!("Shutdown signal received during retry backoff.");
706 false
707 }
708 }
709 } else {
710 error!(message = "Events stream retry exhausted.", reason = reason);
711 false
712 }
713 }
714
715 fn exclude_self(&self, id: &str) -> bool {
716 self.hostname
717 .as_ref()
718 .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
719 .unwrap_or(false)
720 }
721}
722
/// Spawns and drives per-container log tasks. Cloned into every task it spawns.
#[derive(Clone)]
struct EventStreamBuilder {
    /// Legacy-namespace location of the host field.
    host_key: OptionalValuePath,
    /// Local hostname added to every event, when known.
    hostname: Option<String>,
    /// Shared configuration and Docker client.
    core: Arc<DockerLogsSourceCore>,
    /// Destination for produced events.
    out: SourceSender,
    /// Reports the outcome of a finished log task back to the main task.
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Signal that ends every log stream when the source shuts down.
    shutdown: ShutdownSignal,
    /// Log namespace the events are produced in.
    log_namespace: LogNamespace,
}
737
738impl EventStreamBuilder {
739 fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
741 let this = self.clone();
742 crate::spawn_in_current_span(async move {
743 if let Some(duration) = backoff {
744 tokio::time::sleep(duration).await;
745 }
746
747 match this
748 .core
749 .docker
750 .inspect_container(id.as_str(), None::<InspectContainerOptions>)
751 .await
752 {
753 Ok(details) => match ContainerMetadata::from_details(details) {
754 Ok(metadata) => {
755 let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
756 this.run_event_stream(info).await;
757 return;
758 }
759 Err(error) => emit!(DockerLogsTimestampParseError {
760 error,
761 container_id: id.as_str()
762 }),
763 },
764 Err(error) => emit!(DockerLogsContainerMetadataFetchError {
765 error,
766 container_id: id.as_str()
767 }),
768 }
769
770 this.finish(Err((id, ErrorPersistence::Transient)));
771 });
772
773 ContainerState::new_running()
774 }
775
776 fn restart(&self, container: &mut ContainerState) {
778 if let Some(info) = container.take_info() {
779 let this = self.clone();
780 crate::spawn_in_current_span(this.run_event_stream(info));
781 }
782 }
783
784 async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
785 let options = Some(
787 LogsOptionsBuilder::new()
788 .follow(true)
789 .stdout(true)
790 .stderr(true)
791 .since(info.log_since() as i32) .timestamps(true)
793 .build(),
794 );
795
796 let stream = self.core.docker.logs(info.id.as_str(), options);
797 emit!(DockerLogsContainerWatch {
798 container_id: info.id.as_str()
799 });
800
801 let mut partial_event_merge_state = None;
803
804 let core = Arc::clone(&self.core);
805
806 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
807
808 let mut error = None;
809 let events_stream = stream
810 .map(|value| {
811 match value {
812 Ok(message) => Ok(info.new_event(
813 message,
814 core.config.partial_event_marker_field.clone(),
815 core.config.auto_partial_merge,
816 &mut partial_event_merge_state,
817 &bytes_received,
818 self.log_namespace,
819 )),
820 Err(error) => {
821 match &error {
823 DockerError::DockerResponseServerError { status_code, .. }
824 if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
825 {
826 emit!(DockerLogsLoggingDriverUnsupportedError {
827 error,
828 container_id: info.id.as_str(),
829 });
830 Err(ErrorPersistence::Permanent)
831 }
832 _ => {
833 emit!(DockerLogsCommunicationError {
834 error,
835 container_id: Some(info.id.as_str())
836 });
837 Err(ErrorPersistence::Transient)
838 }
839 }
840 }
841 }
842 })
843 .take_while(|v| {
844 error = v.as_ref().err().cloned();
845 ready(v.is_ok())
846 })
847 .filter_map(|v| ready(v.ok().flatten()))
848 .take_until(self.shutdown.clone());
849
850 let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
851 if let Some(ref line_agg_config) = core.line_agg_config {
852 Box::new(line_agg_adapter(
853 events_stream,
854 line_agg::Logic::new(line_agg_config.clone()),
855 self.log_namespace,
856 ))
857 } else {
858 Box::new(events_stream)
859 };
860
861 let host_key = self.host_key.clone().path;
862 let hostname = self.hostname.clone();
863 let result = {
864 let mut stream = events_stream
865 .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
866 self.out.send_event_stream(&mut stream).await.map_err(|_| {
867 let (count, _) = stream.size_hint();
868 emit!(StreamClosedError { count });
869 })
870 };
871
872 emit!(DockerLogsContainerUnwatch {
874 container_id: info.id.as_str()
875 });
876
877 let result = match (result, error) {
878 (Ok(()), None) => Ok(info),
879 (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
880 (_, Some(occurrence)) => Err((info.id, occurrence)),
881 };
882
883 self.finish(result);
884 }
885
886 fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
887 _ = self.main_send.send(result);
890 }
891}
892
893fn add_hostname(
894 mut log: LogEvent,
895 host_key: &Option<OwnedValuePath>,
896 hostname: &Option<String>,
897 log_namespace: LogNamespace,
898) -> LogEvent {
899 if let Some(hostname) = hostname {
900 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
901
902 log_namespace.insert_source_metadata(
903 DockerLogsConfig::NAME,
904 &mut log,
905 legacy_host_key,
906 path!("host"),
907 hostname.clone(),
908 );
909 }
910
911 log
912}
913
/// How a failed per-container log task should be treated by the main task.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum ErrorPersistence {
    /// The log task is started again (after a backoff) if the container is
    /// still running.
    Transient,
    /// The container is dropped from the watched set without a retry.
    Permanent,
}
919
/// Container identifier, stored as `Bytes` so it can be cloned into every
/// event without building a new string each time.
#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct ContainerId(Bytes);
924
925impl ContainerId {
926 fn new(id: String) -> Self {
927 ContainerId(id.into())
928 }
929
930 fn as_str(&self) -> &str {
931 std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
932 }
933}
934
/// Bookkeeping the main task keeps for each watched container.
struct ContainerState {
    /// Log info handed back by a finished log task; `None` while a task holds it.
    info: Option<ContainerLogInfo>,
    /// Whether the container is currently considered running.
    running: bool,
    /// Incremented every time the container is marked as running again.
    generation: u64,
}
944
945impl ContainerState {
946 const fn new_running() -> Self {
948 ContainerState {
949 info: None,
950 running: true,
951 generation: 0,
952 }
953 }
954
955 const fn running(&mut self) {
956 self.running = true;
957 self.generation += 1;
958 }
959
960 const fn stopped(&mut self) {
961 self.running = false;
962 }
963
964 const fn is_running(&self) -> bool {
965 self.running
966 }
967
968 #[must_use]
970 fn return_info(&mut self, info: ContainerLogInfo) -> bool {
971 debug_assert!(self.info.is_none());
972 let restart = self.running || info.generation < self.generation;
975 self.info = Some(info);
976 restart
977 }
978
979 fn take_info(&mut self) -> Option<ContainerLogInfo> {
980 self.info.take().map(|mut info| {
981 info.generation = self.generation;
983 info
984 })
985 }
986}
987
/// Everything a log task needs to stream one container's logs and turn them
/// into events. Handed back and forth between the main task and log tasks.
struct ContainerLogInfo {
    /// Id of the container.
    id: ContainerId,
    /// Until a first log has been accepted, logs not newer than this are dropped.
    created: DateTime<Utc>,
    /// Timestamp of the last accepted log, with the generation it was seen in.
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// Generation of the container state this info was last taken from.
    generation: u64,
    /// Container metadata attached to every event.
    metadata: ContainerMetadata,
}
1000
1001impl ContainerLogInfo {
1002 const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
1005 ContainerLogInfo {
1006 id,
1007 created,
1008 last_log: None,
1009 generation: 0,
1010 metadata,
1011 }
1012 }
1013
1014 fn log_since(&self) -> i64 {
1016 self.last_log
1017 .as_ref()
1018 .map(|(d, _)| d.timestamp())
1019 .unwrap_or_else(|| self.created.timestamp())
1020 - 1
1021 }
1022
    /// Converts one Docker log message into a log event.
    ///
    /// Returns `None` when the message produces no event:
    /// - it comes from stdin,
    /// - its timestamp is not newer than what has already been processed (or
    ///   not newer than `created` when nothing was processed yet), or
    /// - it is a partial message that was buffered for automatic merging.
    ///
    /// `partial_event_merge_state` carries buffered partial messages between
    /// calls when `auto_partial_merge` is enabled. When it is disabled, partial
    /// messages are emitted individually and flagged using
    /// `partial_event_marker_field`, if set.
    fn new_event(
        &mut self,
        log_output: LogOutput,
        partial_event_marker_field: Option<String>,
        auto_partial_merge: bool,
        partial_event_merge_state: &mut Option<LogEventMergeState>,
        bytes_received: &Registered<BytesReceived>,
        log_namespace: LogNamespace,
    ) -> Option<LogEvent> {
        let (stream, mut bytes_message) = match log_output {
            LogOutput::StdErr { message } => (STDERR.clone(), message),
            LogOutput::StdOut { message } => (STDOUT.clone(), message),
            LogOutput::Console { message } => (CONSOLE.clone(), message),
            LogOutput::StdIn { message: _ } => return None,
        };

        bytes_received.emit(ByteSize(bytes_message.len()));

        // The message starts with a timestamp, followed by whitespace and the
        // actual log line (timestamps are requested when the log stream is opened).
        let message = String::from_utf8_lossy(&bytes_message);
        let mut splitter = message.splitn(2, char::is_whitespace);
        let timestamp_str = splitter.next()?;
        let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
            Ok(timestamp) => {
                // Drop logs that have already been processed.
                match self.last_log.as_ref() {
                    Some(&(last, generation)) => {
                        if last < timestamp || (last == timestamp && generation == self.generation)
                        {
                            // Log received in order; accept it.
                        } else {
                            // Not newer than the last accepted log, or an equal timestamp
                            // recorded under a different generation.
                            trace!(
                                message = "Received log from previous container generation.",
                                log_timestamp = %timestamp_str,
                                last_log_timestamp = %last,
                            );
                            return None;
                        }
                    }
                    None => {
                        if self.created < timestamp.with_timezone(&Utc) {
                            // First log, newer than the cut-off; accept it.
                        } else {
                            // Logs are requested from slightly before the cut-off
                            // (see `log_since`), so older ones can show up here.
                            trace!(
                                message = "Received log from before created timestamp.",
                                log_timestamp = %timestamp_str,
                                created_timestamp = %self.created
                            );
                            return None;
                        }
                    }
                }

                self.last_log = Some((timestamp, self.generation));

                // Strip the timestamp and the separator from the raw bytes.
                let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
                let remove_len = message.len() - log_len;
                bytes_message.advance(remove_len);

                Some(timestamp.with_timezone(&Utc))
            }
            Err(error) => {
                // Unparsable timestamp: the message is kept as-is and the event
                // gets no timestamp from the log line.
                emit!(DockerLogsTimestampParseError {
                    error,
                    container_id: self.id.as_str()
                });
                None
            }
        };

        // A message that does not end with a newline is treated as partial.
        // For complete messages the trailing `\n` (and a preceding `\r`) is removed.
        let is_partial = if bytes_message
            .last()
            .map(|&b| b as char == '\n')
            .unwrap_or(false)
        {
            bytes_message.truncate(bytes_message.len() - 1);
            if bytes_message
                .last()
                .map(|&b| b as char == '\r')
                .unwrap_or(false)
            {
                bytes_message.truncate(bytes_message.len() - 1);
            }
            false
        } else {
            true
        };

        // Build the event from the remaining bytes.
        let deserializer = BytesDeserializer;
        let mut log = deserializer.parse_single(bytes_message, log_namespace);

        // Container id.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CONTAINER))),
            path!(CONTAINER),
            self.id.0.clone(),
        );
        // Container image.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(IMAGE))),
            path!(IMAGE),
            self.metadata.image.clone(),
        );
        // Container name.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(NAME))),
            path!(NAME),
            self.metadata.name.clone(),
        );
        // Container creation time.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CREATED_AT))),
            path!(CREATED_AT),
            self.metadata.created_at,
        );
        // Container labels, one entry per label.
        if !self.metadata.labels.is_empty() {
            for (key, value) in self.metadata.labels.iter() {
                log_namespace.insert_source_metadata(
                    DockerLogsConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(path!("label", key))),
                    path!("labels", key),
                    value.clone(),
                )
            }
        }
        // Output stream the message was written to.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(STREAM))),
            path!(STREAM),
            stream,
        );

        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
        );

        // Timestamps: the Vector namespace records the log timestamp (when parsed)
        // plus the ingest time as metadata; the legacy namespace only sets the
        // global timestamp field, and only if it is not already present.
        match log_namespace {
            LogNamespace::Vector => {
                if let Some(timestamp) = timestamp {
                    log.insert(
                        metadata_path!(DockerLogsConfig::NAME, "timestamp"),
                        timestamp,
                    );
                }

                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                if let Some(timestamp) = timestamp
                    && let Some(timestamp_key) = log_schema().timestamp_key()
                {
                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
                }
            }
        };

        // Partial event handling.
        let log = if auto_partial_merge {
            if is_partial {
                // Buffer the partial message: merge it into the pending state, or
                // start a new state with it. Nothing is emitted yet.
                if let Some(partial_event_merge_state) = partial_event_merge_state {
                    match log_namespace {
                        // In the Vector namespace the message is the event root.
                        LogNamespace::Vector => {
                            partial_event_merge_state.merge_in_next_event(log, &["."]);
                        }
                        LogNamespace::Legacy => {
                            partial_event_merge_state.merge_in_next_event(
                                log,
                                &[log_schema()
                                    .message_key()
                                    .expect("global log_schema.message_key to be valid path")
                                    .to_string()],
                            );
                        }
                    }
                } else {
                    *partial_event_merge_state = Some(LogEventMergeState::new(log));
                };
                return None;
            };

            // Complete message: if partials are pending, this is their final part;
            // otherwise the event passes through unchanged.
            match partial_event_merge_state.take() {
                Some(partial_event_merge_state) => match log_namespace {
                    LogNamespace::Vector => {
                        partial_event_merge_state.merge_in_final_event(log, &["."])
                    }
                    LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
                        log,
                        &[log_schema()
                            .message_key()
                            .expect("global log_schema.message_key to be valid path")
                            .to_string()],
                    ),
                },
                None => log,
            }
        } else {
            // Merging is disabled: emit partial messages as they are, flagged with
            // the marker field when one is configured.
            if is_partial {
                if let Some(partial_event_marker_field) = partial_event_marker_field {
                    log_namespace.insert_source_metadata(
                        DockerLogsConfig::NAME,
                        &mut log,
                        Some(LegacyKey::Overwrite(path!(
                            partial_event_marker_field.as_str()
                        ))),
                        path!(event::PARTIAL),
                        true,
                    );
                }
            }
            log
        };

        // Only events that are actually produced are counted.
        emit!(DockerLogsEventsReceived {
            byte_size: log.estimated_json_encoded_size_of(),
            container_id: self.id.as_str(),
            container_name: &self.metadata.name_str
        });

        Some(log)
    }
1299}
1300
/// Container metadata attached to every event of a container.
struct ContainerMetadata {
    /// Container labels (empty when the container has none).
    labels: HashMap<String, String>,
    /// Container name without the leading `/`, as an event value.
    name: Value,
    /// Container name exactly as reported by the inspect call.
    name_str: String,
    /// Image name from the container's configuration, as an event value.
    image: Value,
    /// Time the container was created.
    created_at: DateTime<Utc>,
}
1313
1314impl ContainerMetadata {
1315 fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
1316 let config = details.config.unwrap();
1317 let name = details.name.unwrap();
1318 let created = details.created.unwrap();
1319
1320 let labels = config.labels.unwrap_or_default();
1321
1322 Ok(ContainerMetadata {
1323 labels,
1324 name: name.as_str().trim_start_matches('/').to_owned().into(),
1325 name_str: name,
1326 image: config.image.unwrap().into(),
1327 created_at: created.with_timezone(&Utc),
1328 })
1329 }
1330}
1331
1332fn line_agg_adapter(
1333 inner: impl Stream<Item = LogEvent> + Unpin,
1334 logic: line_agg::Logic<Bytes, LogEvent>,
1335 log_namespace: LogNamespace,
1336) -> impl Stream<Item = LogEvent> {
1337 let line_agg_in = inner.map(move |mut log| {
1338 let message_value = match log_namespace {
1339 LogNamespace::Vector => log
1340 .remove(event_path!())
1341 .expect("`.` must exist in the event"),
1342 LogNamespace::Legacy => log
1343 .remove(
1344 log_schema()
1345 .message_key_target_path()
1346 .expect("global log_schema.message_key to be valid path"),
1347 )
1348 .expect("`message` must exist in the event"),
1349 };
1350 let stream_value = match log_namespace {
1351 LogNamespace::Vector => log
1352 .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
1353 .expect("`docker_logs.stream` must exist in the metadata"),
1354 LogNamespace::Legacy => log
1355 .get(event_path!(STREAM))
1356 .expect("stream must exist in the event"),
1357 };
1358
1359 let stream = stream_value.coerce_to_bytes();
1360 let message = message_value.coerce_to_bytes();
1361 (stream, message, log)
1362 });
1363 let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
1364 line_agg_out.map(move |(_, message, mut log, _)| {
1365 match log_namespace {
1366 LogNamespace::Vector => log.insert(event_path!(), message),
1367 LogNamespace::Legacy => log.insert(
1368 log_schema()
1369 .message_key_target_path()
1370 .expect("global log_schema.message_key to be valid path"),
1371 message,
1372 ),
1373 };
1374 log
1375 })
1376}