Skip to main content

vector/sources/docker_logs/
mod.rs

1use std::{
2    collections::HashMap,
3    convert::TryFrom,
4    future::ready,
5    pin::Pin,
6    sync::{Arc, LazyLock},
7    time::Duration,
8};
9
10use bollard::{
11    Docker,
12    container::LogOutput,
13    errors::Error as DockerError,
14    query_parameters::{
15        EventsOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
16        LogsOptionsBuilder,
17    },
18    service::{ContainerInspectResponse, EventMessage},
19};
20use bytes::{Buf, Bytes};
21use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
22use futures::{Stream, StreamExt};
23use serde_with::serde_as;
24use tokio::sync::mpsc;
25use vector_lib::{
26    codecs::{BytesDeserializer, BytesDeserializerConfig},
27    config::{LegacyKey, LogNamespace},
28    configurable::configurable_component,
29    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered},
30    lookup::{
31        OwnedValuePath, PathPrefix, lookup_v2::OptionalValuePath, metadata_path, owned_value_path,
32        path,
33    },
34};
35use vrl::{
36    event_path,
37    value::{Kind, kind::Collection},
38};
39
40use super::util::MultilineConfig;
41use crate::{
42    SourceSender,
43    common::backoff::ExponentialBackoff,
44    config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
45    docker::{DockerTlsConfig, docker},
46    event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
47    internal_events::{
48        DockerLogsCommunicationError, DockerLogsContainerEventReceived,
49        DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
50        DockerLogsContainerWatch, DockerLogsEventsReceived,
51        DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
52    },
53    line_agg::{self, LineAgg},
54    shutdown::ShutdownSignal,
55};
56
57#[cfg(test)]
58mod tests;
59
60const IMAGE: &str = "image";
61const CREATED_AT: &str = "container_created_at";
62const NAME: &str = "container_name";
63const STREAM: &str = "stream";
64const CONTAINER: &str = "container_id";
65// Prevent short hostname from being wrongly recognized as a container's short ID.
66const MIN_HOSTNAME_LENGTH: usize = 6;
67
68static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
69static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
70static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());
71
/// Configuration for the `docker_logs` source.
// NOTE: the `///` comments on this struct and its fields are the user-facing descriptions in the
// generated configuration schema/docs. Field order also determines their order there.
#[serde_as]
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    host_key: Option<OptionalValuePath>,

    /// Docker host to connect to.
    ///
    /// Use an HTTPS URL to enable TLS encryption.
    ///
    /// If absent, the `DOCKER_HOST` environment variable is used. If `DOCKER_HOST` is also absent,
    /// the default Docker local socket (`/var/run/docker.sock` on Unix platforms,
    /// `//./pipe/docker_engine` on Windows) is used.
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// A list of container IDs or names of containers to exclude from log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name started with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `exclude_containers` is configured, any
    /// container that matches a configured exclusion is excluded even if it is also included with
    /// `include_containers`, so care should be taken when using prefix matches as they cannot be overridden by a
    /// corresponding entry in `include_containers`, for example, excluding `foo` by attempting to include `foo-specific-id`.
    ///
    /// This can be used in conjunction with `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>, // Entries are prefixes (`starts_with`), not exact matches.

    /// A list of container IDs or names of containers to include in log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name started with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `include_containers` is configured, only
    /// containers that match a configured inclusion and are also not excluded get matched.
    ///
    /// This can be used in conjunction with `exclude_containers`.
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>, // Entries are prefixes (`starts_with`), not exact matches.

    /// A list of container object labels to match against when filtering running containers.
    ///
    /// Labels should follow the syntax described in the [Docker object labels](https://docs.docker.com/config/labels-custom-metadata/) documentation.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// A list of image names to match against.
    ///
    /// If not provided, all images are included.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    /// Overrides the name of the log field used to mark an event as partial.
    ///
    /// If `auto_partial_merge` is disabled, partial events are emitted with a log field, set by this
    /// configuration value, indicating that the event is not complete.
    // An empty string is normalized to `None` when the source is built.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Enables automatic merging of partial events.
    auto_partial_merge: bool,

    /// The amount of time to wait before retrying after an error.
    // Serialized as a whole number of seconds.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    // NOTE(review): this field-level `default` duplicates the struct-level `#[serde(default)]`
    // (both yield `None`); kept as-is.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
179
180impl Default for DockerLogsConfig {
181    fn default() -> Self {
182        Self {
183            host_key: None,
184            docker_host: None,
185            tls: None,
186            exclude_containers: None,
187            include_containers: None,
188            include_labels: None,
189            include_images: None,
190            partial_event_marker_field: default_partial_event_marker_field(),
191            auto_partial_merge: true,
192            multiline: None,
193            retry_backoff_secs: default_retry_backoff_secs(),
194            log_namespace: None,
195        }
196    }
197}
198
199fn default_partial_event_marker_field() -> Option<String> {
200    Some(event::PARTIAL.to_string())
201}
202
/// Default for `retry_backoff_secs`: wait two seconds before retrying.
const fn default_retry_backoff_secs() -> Duration {
    Duration::from_millis(2_000)
}
206
207impl DockerLogsConfig {
208    fn container_name_or_id_included<'a>(
209        &self,
210        id: &str,
211        names: impl IntoIterator<Item = &'a str>,
212    ) -> bool {
213        let containers: Vec<String> = names.into_iter().map(Into::into).collect();
214
215        self.include_containers
216            .as_ref()
217            .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
218            .unwrap_or(true)
219            && !(self
220                .exclude_containers
221                .as_ref()
222                .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
223                .unwrap_or(false))
224    }
225
226    fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
227        items.iter().any(|flag| id.starts_with(flag))
228            || names
229                .iter()
230                .any(|name| items.iter().any(|item| name.starts_with(item)))
231    }
232
233    fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
234        if let Some(val) = &self.partial_event_marker_field
235            && val.is_empty()
236        {
237            self.partial_event_marker_field = None;
238        }
239        self
240    }
241}
242
// Registers the generated example configuration for this source. As the macro name indicates,
// it is presumably derived from `DockerLogsConfig::default()`.
impl_generate_config_from_default!(DockerLogsConfig);
244
245#[async_trait::async_trait]
246#[typetag::serde(name = "docker_logs")]
247impl SourceConfig for DockerLogsConfig {
248    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
249        let log_namespace = cx.log_namespace(self.log_namespace);
250        let source = DockerLogsSource::new(
251            self.clone().with_empty_partial_event_marker_field_as_none(),
252            cx.out,
253            cx.shutdown.clone(),
254            log_namespace,
255        )?;
256
257        // Capture currently running containers, and do main future(run)
258        let fut = async move {
259            match source.handle_running_containers().await {
260                Ok(source) => source.run().await,
261                Err(error) => {
262                    error!(
263                        message = "Listing currently running containers failed.",
264                        ?error
265                    );
266                }
267            }
268        };
269
270        let shutdown = cx.shutdown;
271        // Once this ShutdownSignal resolves it will drop DockerLogsSource and by extension it's ShutdownSignal.
272        Ok(Box::pin(async move {
273            Ok(tokio::select! {
274                _ = fut => {}
275                _ = shutdown => {}
276            })
277        }))
278    }
279
280    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
281        let host_key = self
282            .host_key
283            .clone()
284            .unwrap_or(log_schema().host_key().cloned().into())
285            .path
286            .map(LegacyKey::Overwrite);
287
288        let schema_definition = BytesDeserializerConfig
289            .schema_definition(global_log_namespace.merge(self.log_namespace))
290            .with_source_metadata(
291                Self::NAME,
292                host_key,
293                &owned_value_path!("host"),
294                Kind::bytes().or_undefined(),
295                Some("host"),
296            )
297            .with_source_metadata(
298                Self::NAME,
299                Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
300                &owned_value_path!(CONTAINER),
301                Kind::bytes(),
302                None,
303            )
304            .with_source_metadata(
305                Self::NAME,
306                Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
307                &owned_value_path!(IMAGE),
308                Kind::bytes(),
309                None,
310            )
311            .with_source_metadata(
312                Self::NAME,
313                Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
314                &owned_value_path!(NAME),
315                Kind::bytes(),
316                None,
317            )
318            .with_source_metadata(
319                Self::NAME,
320                Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
321                &owned_value_path!(CREATED_AT),
322                Kind::timestamp(),
323                None,
324            )
325            .with_source_metadata(
326                Self::NAME,
327                Some(LegacyKey::Overwrite(owned_value_path!("label"))),
328                &owned_value_path!("labels"),
329                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
330                None,
331            )
332            .with_source_metadata(
333                Self::NAME,
334                Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
335                &owned_value_path!(STREAM),
336                Kind::bytes(),
337                None,
338            )
339            .with_source_metadata(
340                Self::NAME,
341                log_schema()
342                    .timestamp_key()
343                    .cloned()
344                    .map(LegacyKey::Overwrite),
345                &owned_value_path!("timestamp"),
346                Kind::timestamp(),
347                Some("timestamp"),
348            )
349            .with_vector_metadata(
350                log_schema().source_type_key(),
351                &owned_value_path!("source_type"),
352                Kind::bytes(),
353                None,
354            )
355            .with_vector_metadata(
356                None,
357                &owned_value_path!("ingest_timestamp"),
358                Kind::timestamp(),
359                None,
360            );
361
362        vec![SourceOutput::new_maybe_logs(
363            DataType::Log,
364            schema_definition,
365        )]
366    }
367
368    fn can_acknowledge(&self) -> bool {
369        false
370    }
371}
372
/// State shared, behind an `Arc`, by the main source future and every per-container log stream.
struct DockerLogsSourceCore {
    /// Source configuration. An empty `partial_event_marker_field` has already been normalized
    /// to `None` by `build`.
    config: DockerLogsConfig,
    /// Multiline aggregation settings converted from `config.multiline`, if configured.
    line_agg_config: Option<line_agg::Config>,
    /// Docker API client built from `docker_host` and `tls`.
    docker: Docker,
    /// Only logs created at, or after this moment are logged.
    now_timestamp: DateTime<Utc>,
}
380
381impl DockerLogsSourceCore {
382    fn new(config: DockerLogsConfig) -> crate::Result<Self> {
383        // ?NOTE: Constructs a new Docker instance for a docker host listening at url specified by an env var DOCKER_HOST.
384        // ?      Otherwise connects to unix socket which requires sudo privileges, or docker group membership.
385        let docker = docker(config.docker_host.clone(), config.tls.clone())?;
386
387        // Only log events created at-or-after this moment are logged.
388        let now = Local::now();
389        info!(
390            message = "Capturing logs from now on.",
391            now = %now.to_rfc3339()
392        );
393
394        let line_agg_config = if let Some(ref multiline_config) = config.multiline {
395            Some(line_agg::Config::try_from(multiline_config)?)
396        } else {
397            None
398        };
399
400        Ok(DockerLogsSourceCore {
401            config,
402            line_agg_config,
403            docker,
404            now_timestamp: now.into(),
405        })
406    }
407
408    /// Returns event stream coming from docker.
409    fn docker_logs_event_stream(
410        &self,
411    ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
412        let mut filters = HashMap::new();
413
414        // event  | emitted on commands
415        // -------+-------------------
416        // start  | docker start, docker run, restart policy, docker restart
417        // unpause | docker unpause
418        // die    | docker restart, docker stop, docker kill, process exited, oom
419        // pause  | docker pause
420        filters.insert(
421            "event".to_owned(),
422            vec![
423                "start".to_owned(),
424                "unpause".to_owned(),
425                "die".to_owned(),
426                "pause".to_owned(),
427            ],
428        );
429        filters.insert("type".to_owned(), vec!["container".to_owned()]);
430
431        // Apply include filters.
432        if let Some(include_labels) = &self.config.include_labels {
433            filters.insert("label".to_owned(), include_labels.clone());
434        }
435
436        if let Some(include_images) = &self.config.include_images {
437            filters.insert("image".to_owned(), include_images.clone());
438        }
439
440        self.docker.events(Some(
441            EventsOptionsBuilder::new()
442                .since(&self.now_timestamp.timestamp().to_string())
443                .filters(&filters)
444                .build(),
445        ))
446    }
447}
448
/// Main future. It listens for events coming from Docker and maintains a fan of event_stream
/// futures, one per running container, each identified by its ContainerLogInfo.
/// While running, an event_stream streams Events to the out channel.
/// Once a log stream has ended, it sends its ContainerLogInfo back to main.
///
/// Future  channel     Future      channel
///           |<---- event_stream ---->out
/// main <----|<---- event_stream ---->out
///           | ...                 ...out
///
struct DockerLogsSource {
    /// Spawns per-container log streams. Holds the shared core, the output sender and the
    /// shutdown signal.
    esb: EventStreamBuilder,
    /// Event stream from Docker. It is replaced with a fresh subscription when retried after an
    /// error or end of stream.
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// Mappings of seen container IDs to their state.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives the ContainerLogInfo, or a (container ID, error persistence) pair, coming from
    /// event stream futures.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// It may contain shortened container id.
    hostname: Option<String>,
    /// Delay before restarting a container's stream after a transient error (`retry_backoff_secs`).
    backoff_duration: Duration,
    /// Backoff strategy for events stream retries
    events_backoff: ExponentialBackoff,
}
474
475impl DockerLogsSource {
476    fn new(
477        config: DockerLogsConfig,
478        out: SourceSender,
479        shutdown: ShutdownSignal,
480        log_namespace: LogNamespace,
481    ) -> crate::Result<DockerLogsSource> {
482        let backoff_secs = config.retry_backoff_secs;
483
484        let host_key = config
485            .host_key
486            .clone()
487            .unwrap_or(log_schema().host_key().cloned().into());
488        let hostname = crate::get_hostname().ok();
489
490        // Only logs created at, or after this moment are logged.
491        let core = DockerLogsSourceCore::new(config)?;
492
493        // main event stream, with whom only newly started/restarted containers will be logged.
494        let events = core.docker_logs_event_stream();
495        info!(message = "Listening to docker log events.");
496
497        // Channel of communication between main future and event_stream futures
498        let (main_send, main_recv) =
499            mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();
500
501        // Starting with logs from now.
502        // TODO: Is this exception acceptable?
503        // Only somewhat exception to this is case where:
504        // t0 -- outside: container running
505        // t1 -- now_timestamp
506        // t2 -- outside: container stopped
507        // t3 -- list_containers
508        // In that case, logs between [t1,t2] will be pulled to vector only on next start/unpause of that container.
509        let esb = EventStreamBuilder {
510            host_key,
511            hostname: hostname.clone(),
512            core: Arc::new(core),
513            out,
514            main_send,
515            shutdown,
516            log_namespace,
517        };
518
519        Ok(DockerLogsSource {
520            esb,
521            events: Box::pin(events),
522            containers: HashMap::new(),
523            main_recv,
524            hostname,
525            backoff_duration: backoff_secs,
526            events_backoff: ExponentialBackoff::default(),
527        })
528    }
529
530    /// Future that captures currently running containers, and starts event streams for them.
531    async fn handle_running_containers(mut self) -> crate::Result<Self> {
532        let mut filters = HashMap::new();
533
534        // Apply include filters
535        if let Some(include_labels) = &self.esb.core.config.include_labels {
536            filters.insert("label".to_owned(), include_labels.clone());
537        }
538
539        if let Some(include_images) = &self.esb.core.config.include_images {
540            filters.insert("ancestor".to_owned(), include_images.clone());
541        }
542
543        self.esb
544            .core
545            .docker
546            .list_containers(Some(
547                ListContainersOptionsBuilder::new()
548                    .all(false)
549                    .filters(&filters)
550                    .build(),
551            ))
552            .await?
553            .into_iter()
554            .for_each(|container| {
555                let id = container.id.unwrap();
556                let names = container.names.unwrap();
557
558                trace!(message = "Found already running container.", id = %id, names = ?names);
559
560                if self.exclude_self(id.as_str()) {
561                    info!(message = "Excluded self container.", id = %id);
562                    return;
563                }
564
565                if !self.esb.core.config.container_name_or_id_included(
566                    id.as_str(),
567                    names.iter().map(|s| {
568                        // In this case bollard / shiplift gives names with starting '/' so it needs to be removed.
569                        let s = s.as_str();
570                        if s.starts_with('/') {
571                            s.split_at('/'.len_utf8()).1
572                        } else {
573                            s
574                        }
575                    }),
576                ) {
577                    info!(message = "Excluded container.", id = %id);
578                    return;
579                }
580
581                let id = ContainerId::new(id);
582                self.containers.insert(id.clone(), self.esb.start(id, None));
583            });
584
585        Ok(self)
586    }
587
588    async fn run(mut self) {
589        loop {
590            tokio::select! {
591                value = self.main_recv.recv() => {
592                    match value {
593                        Some(Ok(info)) => {
594                            let state = self
595                                .containers
596                                .get_mut(&info.id)
597                                .expect("Every ContainerLogInfo has it's ContainerState");
598                            if state.return_info(info) {
599                                self.esb.restart(state);
600                            }
601                        },
602                        Some(Err((id,persistence))) => {
603                            let state = self
604                                .containers
605                                .remove(&id)
606                                .expect("Every started ContainerId has it's ContainerState");
607                            match persistence{
608                                ErrorPersistence::Transient => if state.is_running() {
609                                    let backoff= Some(self.backoff_duration);
610                                    self.containers.insert(id.clone(), self.esb.start(id, backoff));
611                                }
612                                // Forget the container since the error is permanent.
613                                ErrorPersistence::Permanent => (),
614                            }
615                        }
616                        None => {
617                            error!(message = "The docker_logs source main stream has ended unexpectedly.", internal_log_rate_limit = false);
618                            info!(message = "Shutting down docker_logs source.");
619                            return;
620                        }
621                    };
622                }
623                value = self.events.next() => {
624                    match value {
625                        Some(Ok(mut event)) => {
626                            // Reset backoff on successful event
627                            self.events_backoff.reset();
628
629                            let action = event.action.unwrap();
630                            let actor = event.actor.take().unwrap();
631                            let id = actor.id.unwrap();
632                            let attributes = actor.attributes.unwrap();
633
634                            emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });
635
636                            let id = ContainerId::new(id.to_owned());
637
638                            // Update container status
639                            match action.as_str() {
640                                "die" | "pause" => {
641                                    if let Some(state) = self.containers.get_mut(&id) {
642                                        state.stopped();
643                                    }
644                                }
645                                "start" | "unpause" => {
646                                    if let Some(state) = self.containers.get_mut(&id) {
647                                        state.running();
648                                        self.esb.restart(state);
649                                    } else {
650                                        let include_name =
651                                            self.esb.core.config.container_name_or_id_included(
652                                                id.as_str(),
653                                                attributes.get("name").map(|s| s.as_str()),
654                                            );
655
656                                        let exclude_self = self.exclude_self(id.as_str());
657
658                                        if include_name && !exclude_self {
659                                            self.containers.insert(id.clone(), self.esb.start(id, None));
660                                        }
661                                    }
662                                }
663                                _ => {},
664                            };
665                        }
666                        Some(Err(error)) => {
667                            emit!(DockerLogsCommunicationError {
668                                error,
669                                container_id: None,
670                            });
671                            // Retry events stream with exponential backoff
672                            if !self.retry_events_stream_with_backoff("Docker events stream failed").await {
673                                error!("Docker events stream failed and retry exhausted, shutting down.");
674                                return;
675                            }
676                        },
677                        None => {
678                            // Retry events stream with exponential backoff
679                            if !self.retry_events_stream_with_backoff("Docker events stream ended").await {
680                                error!("Docker events stream ended and retry exhausted, shutting down.");
681                                return;
682                            }
683                        }
684                    };
685                }
686            };
687        }
688    }
689
690    /// Retry events stream with exponential backoff
691    /// Returns true if retry was attempted, false if exhausted or shutdown
692    async fn retry_events_stream_with_backoff(&mut self, reason: &str) -> bool {
693        if let Some(delay) = self.events_backoff.next() {
694            warn!(
695                message = reason,
696                action = "retrying with backoff",
697                delay_ms = delay.as_millis()
698            );
699            tokio::select! {
700                _ = tokio::time::sleep(delay) => {
701                    self.events = Box::pin(self.esb.core.docker_logs_event_stream());
702                    true
703                }
704                _ = self.esb.shutdown.clone() => {
705                    info!("Shutdown signal received during retry backoff.");
706                    false
707                }
708            }
709        } else {
710            error!(message = "Events stream retry exhausted.", reason = reason);
711            false
712        }
713    }
714
715    fn exclude_self(&self, id: &str) -> bool {
716        self.hostname
717            .as_ref()
718            .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
719            .unwrap_or(false)
720    }
721}
722
/// Used to construct and start event stream futures.
///
/// A clone of the builder is moved into every spawned per-container task (see `start` and
/// `restart`).
#[derive(Clone)]
struct EventStreamBuilder {
    /// Event field that receives the hostname. Taken from the config's `host_key`, falling back
    /// to the global `log_schema.host_key`.
    host_key: OptionalValuePath,
    /// Hostname of the machine running Vector, if `get_hostname` succeeded.
    hostname: Option<String>,
    /// Configuration, Docker client and start timestamp shared with the main future.
    core: Arc<DockerLogsSourceCore>,
    /// Event stream futures send events through this
    out: SourceSender,
    /// End through which event stream futures send ContainerLogInfo to main future
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Self and event streams will end on this.
    shutdown: ShutdownSignal,
    /// Log namespace the emitted events are shaped for.
    log_namespace: LogNamespace,
}
737
738impl EventStreamBuilder {
739    /// Spawn a task to runs event stream until shutdown.
740    fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
741        let this = self.clone();
742        crate::spawn_in_current_span(async move {
743            if let Some(duration) = backoff {
744                tokio::time::sleep(duration).await;
745            }
746
747            match this
748                .core
749                .docker
750                .inspect_container(id.as_str(), None::<InspectContainerOptions>)
751                .await
752            {
753                Ok(details) => match ContainerMetadata::from_details(details) {
754                    Ok(metadata) => {
755                        let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
756                        this.run_event_stream(info).await;
757                        return;
758                    }
759                    Err(error) => emit!(DockerLogsTimestampParseError {
760                        error,
761                        container_id: id.as_str()
762                    }),
763                },
764                Err(error) => emit!(DockerLogsContainerMetadataFetchError {
765                    error,
766                    container_id: id.as_str()
767                }),
768            }
769
770            this.finish(Err((id, ErrorPersistence::Transient)));
771        });
772
773        ContainerState::new_running()
774    }
775
776    /// If info is present, restarts event stream which will run until shutdown.
777    fn restart(&self, container: &mut ContainerState) {
778        if let Some(info) = container.take_info() {
779            let this = self.clone();
780            crate::spawn_in_current_span(this.run_event_stream(info));
781        }
782    }
783
784    async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
785        // Establish connection
786        let options = Some(
787            LogsOptionsBuilder::new()
788                .follow(true)
789                .stdout(true)
790                .stderr(true)
791                .since(info.log_since() as i32) // 2038 bug (I think)
792                .timestamps(true)
793                .build(),
794        );
795
796        let stream = self.core.docker.logs(info.id.as_str(), options);
797        emit!(DockerLogsContainerWatch {
798            container_id: info.id.as_str()
799        });
800
801        // Create event streamer
802        let mut partial_event_merge_state = None;
803
804        let core = Arc::clone(&self.core);
805
806        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
807
808        let mut error = None;
809        let events_stream = stream
810            .map(|value| {
811                match value {
812                    Ok(message) => Ok(info.new_event(
813                        message,
814                        core.config.partial_event_marker_field.clone(),
815                        core.config.auto_partial_merge,
816                        &mut partial_event_merge_state,
817                        &bytes_received,
818                        self.log_namespace,
819                    )),
820                    Err(error) => {
821                        // On any error, restart connection
822                        match &error {
823                            DockerError::DockerResponseServerError { status_code, .. }
824                                if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
825                            {
826                                emit!(DockerLogsLoggingDriverUnsupportedError {
827                                    error,
828                                    container_id: info.id.as_str(),
829                                });
830                                Err(ErrorPersistence::Permanent)
831                            }
832                            _ => {
833                                emit!(DockerLogsCommunicationError {
834                                    error,
835                                    container_id: Some(info.id.as_str())
836                                });
837                                Err(ErrorPersistence::Transient)
838                            }
839                        }
840                    }
841                }
842            })
843            .take_while(|v| {
844                error = v.as_ref().err().cloned();
845                ready(v.is_ok())
846            })
847            .filter_map(|v| ready(v.ok().flatten()))
848            .take_until(self.shutdown.clone());
849
850        let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
851            if let Some(ref line_agg_config) = core.line_agg_config {
852                Box::new(line_agg_adapter(
853                    events_stream,
854                    line_agg::Logic::new(line_agg_config.clone()),
855                    self.log_namespace,
856                ))
857            } else {
858                Box::new(events_stream)
859            };
860
861        let host_key = self.host_key.clone().path;
862        let hostname = self.hostname.clone();
863        let result = {
864            let mut stream = events_stream
865                .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
866            self.out.send_event_stream(&mut stream).await.map_err(|_| {
867                let (count, _) = stream.size_hint();
868                emit!(StreamClosedError { count });
869            })
870        };
871
872        // End of stream
873        emit!(DockerLogsContainerUnwatch {
874            container_id: info.id.as_str()
875        });
876
877        let result = match (result, error) {
878            (Ok(()), None) => Ok(info),
879            (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
880            (_, Some(occurrence)) => Err((info.id, occurrence)),
881        };
882
883        self.finish(result);
884    }
885
886    fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
887        // This can legally fail when shutting down, and any other
888        // reason should have been logged in the main future.
889        _ = self.main_send.send(result);
890    }
891}
892
893fn add_hostname(
894    mut log: LogEvent,
895    host_key: &Option<OwnedValuePath>,
896    hostname: &Option<String>,
897    log_namespace: LogNamespace,
898) -> LogEvent {
899    if let Some(hostname) = hostname {
900        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
901
902        log_namespace.insert_source_metadata(
903            DockerLogsConfig::NAME,
904            &mut log,
905            legacy_host_key,
906            path!("host"),
907            hostname.clone(),
908        );
909    }
910
911    log
912}
913
/// Classifies why an event stream ended in failure when it reports back to
/// the main future.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ErrorPersistence {
    /// Failure that may clear up, e.g. a Docker communication error or a
    /// failed container inspect.
    Transient,
    /// Failure that will not clear up, e.g. an unsupported logging driver or
    /// a closed output.
    Permanent,
}
919
920/// Container ID as assigned by Docker.
921/// Is actually a string.
922#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
923struct ContainerId(Bytes);
924
925impl ContainerId {
926    fn new(id: String) -> Self {
927        ContainerId(id.into())
928    }
929
930    fn as_str(&self) -> &str {
931        std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
932    }
933}
934
935/// Kept by main to keep track of container state
936struct ContainerState {
937    /// None if there is a event_stream of this container.
938    info: Option<ContainerLogInfo>,
939    /// True if Container is currently running
940    running: bool,
941    /// Of running
942    generation: u64,
943}
944
945impl ContainerState {
946    /// It's ContainerLogInfo pair must be created exactly once.
947    const fn new_running() -> Self {
948        ContainerState {
949            info: None,
950            running: true,
951            generation: 0,
952        }
953    }
954
955    const fn running(&mut self) {
956        self.running = true;
957        self.generation += 1;
958    }
959
960    const fn stopped(&mut self) {
961        self.running = false;
962    }
963
964    const fn is_running(&self) -> bool {
965        self.running
966    }
967
968    /// True if it needs to be restarted.
969    #[must_use]
970    fn return_info(&mut self, info: ContainerLogInfo) -> bool {
971        debug_assert!(self.info.is_none());
972        // Generation is the only one strictly necessary,
973        // but with v.running, restarting event_stream is automatically done.
974        let restart = self.running || info.generation < self.generation;
975        self.info = Some(info);
976        restart
977    }
978
979    fn take_info(&mut self) -> Option<ContainerLogInfo> {
980        self.info.take().map(|mut info| {
981            // Update info
982            info.generation = self.generation;
983            info
984        })
985    }
986}
987
/// Exchanged between main future and event_stream futures
///
/// Besides the container's identity and metadata, it carries the read position
/// (`created`, `last_log`, `generation`). `new_event` uses that position to skip
/// logs that were already processed when an event stream is restarted.
struct ContainerLogInfo {
    /// Container docker ID
    id: ContainerId,
    /// Timestamp of event which created this struct. Until a first log has been
    /// accepted, logs timestamped at or before this instant are skipped.
    created: DateTime<Utc>,
    /// Timestamp of last log message with its generation
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// generation of ContainerState at event_stream creation
    generation: u64,
    /// Container details attached to every event built by `new_event`.
    metadata: ContainerMetadata,
}

impl ContainerLogInfo {
    /// Creates info for container `id` with no log seen yet and generation 0.
    ///
    /// `created` is the timestamp of the event which created this struct.
    /// `metadata` is attached to every event produced by `new_event`.
    const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
        ContainerLogInfo {
            id,
            created,
            last_log: None,
            generation: 0,
            metadata,
        }
    }

    /// Only logs after or equal to this point need to be fetched
    ///
    /// Returns a Unix timestamp in whole seconds, minus one second. The base is
    /// the last accepted log, or `created` if no log has been accepted yet.
    /// NOTE(review): the extra second presumably guards against whole-second
    /// truncation. `new_event` drops re-fetched logs that are older than
    /// `last_log` (or equal to it but from another generation).
    fn log_since(&self) -> i64 {
        self.last_log
            .as_ref()
            .map(|(d, _)| d.timestamp())
            .unwrap_or_else(|| self.created.timestamp())
            - 1
    }

    /// Expects timestamp at the beginning of message.
    /// Expects messages to be ordered by timestamps.
    ///
    /// Turns one chunk of Docker log output into a `LogEvent`:
    ///
    /// 1. Parses and strips the leading RFC 3339 timestamp. Logs older than the
    ///    last accepted one (or equal to it but from another generation) are
    ///    dropped. Before any log has been accepted, logs not newer than
    ///    `created` are dropped. If the timestamp cannot be parsed, an error is
    ///    emitted and the message is kept whole, without a timestamp.
    /// 2. Strips one trailing `\n` (and a `\r` before it). A chunk without a
    ///    trailing newline is treated as partial.
    /// 3. Adds container id, image, name, creation time, labels, stream and
    ///    source type, plus the parsed timestamp when there is one. In the
    ///    Vector namespace it also adds the ingest timestamp.
    /// 4. Partial chunks are either merged through `partial_event_merge_state`
    ///    (when `auto_partial_merge` is set) or marked with
    ///    `partial_event_marker_field` (when that field is configured).
    ///
    /// `bytes_received` is emitted with the raw chunk length before any
    /// processing.
    ///
    /// Returns `None` for stdin output, for dropped (already seen) logs, and for
    /// partial chunks that were stashed for merging.
    fn new_event(
        &mut self,
        log_output: LogOutput,
        partial_event_marker_field: Option<String>,
        auto_partial_merge: bool,
        partial_event_merge_state: &mut Option<LogEventMergeState>,
        bytes_received: &Registered<BytesReceived>,
        log_namespace: LogNamespace,
    ) -> Option<LogEvent> {
        let (stream, mut bytes_message) = match log_output {
            LogOutput::StdErr { message } => (STDERR.clone(), message),
            LogOutput::StdOut { message } => (STDOUT.clone(), message),
            LogOutput::Console { message } => (CONSOLE.clone(), message),
            LogOutput::StdIn { message: _ } => return None,
        };

        bytes_received.emit(ByteSize(bytes_message.len()));

        // Lossy decoding is used only to find and parse the timestamp prefix.
        // The event body is built from `bytes_message` itself further down.
        let message = String::from_utf8_lossy(&bytes_message);
        let mut splitter = message.splitn(2, char::is_whitespace);
        // `splitn` always yields at least one item, so this never returns early.
        let timestamp_str = splitter.next()?;
        let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
            Ok(timestamp) => {
                // Timestamp check. This is included to avoid processing the same log multiple times, which can
                // occur when a container changes generations, and to avoid processing logs with timestamps before
                // the created timestamp.
                match self.last_log.as_ref() {
                    Some(&(last, generation)) => {
                        if last < timestamp || (last == timestamp && generation == self.generation)
                        {
                            // Noop - log received in order.
                        } else {
                            // Docker returns logs in order.
                            // If we reach this state, this log is from a previous generation of the container.
                            // It was already processed, so we can safely skip it.
                            trace!(
                                message = "Received log from previous container generation.",
                                log_timestamp = %timestamp_str,
                                last_log_timestamp = %last,
                            );
                            return None;
                        }
                    }
                    None => {
                        if self.created < timestamp.with_timezone(&Utc) {
                            // Noop - first log to process.
                        } else {
                            // Received a log with a timestamp before that provided to the Docker API.
                            // This should not happen, but if it does, we can just ignore these logs.
                            trace!(
                                message = "Received log from before created timestamp.",
                                log_timestamp = %timestamp_str,
                                created_timestamp = %self.created
                            );
                            return None;
                        }
                    }
                }

                self.last_log = Some((timestamp, self.generation));

                // The prefix (timestamp plus one whitespace char) parsed successfully,
                // so it is valid UTF-8. Its length in the lossily decoded `message`
                // therefore equals its length in `bytes_message`, and advancing by
                // `remove_len` strips exactly that prefix.
                let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
                let remove_len = message.len() - log_len;
                bytes_message.advance(remove_len);

                // Provide the timestamp.
                Some(timestamp.with_timezone(&Utc))
            }
            Err(error) => {
                // Received bad timestamp, if any at all.
                emit!(DockerLogsTimestampParseError {
                    error,
                    container_id: self.id.as_str()
                });
                // So continue normally but without a timestamp.
                None
            }
        };

        // Message is actually one line from stderr or stdout, and they are
        // delimited with newline, so that newline needs to be removed.
        // If there's no newline, the event is considered partial, and will
        // either be merged within the docker source, or marked accordingly
        // before sending out, depending on the configuration.
        let is_partial = if bytes_message
            .last()
            .map(|&b| b as char == '\n')
            .unwrap_or(false)
        {
            bytes_message.truncate(bytes_message.len() - 1);
            if bytes_message
                .last()
                .map(|&b| b as char == '\r')
                .unwrap_or(false)
            {
                bytes_message.truncate(bytes_message.len() - 1);
            }
            false
        } else {
            true
        };

        // Build the log.
        let deserializer = BytesDeserializer;
        let mut log = deserializer.parse_single(bytes_message, log_namespace);

        // Container ID
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CONTAINER))),
            path!(CONTAINER),
            self.id.0.clone(),
        );
        // Container image
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(IMAGE))),
            path!(IMAGE),
            self.metadata.image.clone(),
        );
        // Container name
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(NAME))),
            path!(NAME),
            self.metadata.name.clone(),
        );
        // Created at timestamp
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CREATED_AT))),
            path!(CREATED_AT),
            self.metadata.created_at,
        );
        // Labels
        if !self.metadata.labels.is_empty() {
            for (key, value) in self.metadata.labels.iter() {
                log_namespace.insert_source_metadata(
                    DockerLogsConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(path!("label", key))),
                    path!("labels", key),
                    value.clone(),
                )
            }
        }
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(STREAM))),
            path!(STREAM),
            stream,
        );

        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
        );

        // This handles the transition from the original timestamp logic. Originally the
        // `timestamp_key` was only populated when a timestamp was parsed from the event.
        match log_namespace {
            LogNamespace::Vector => {
                if let Some(timestamp) = timestamp {
                    log.insert(
                        metadata_path!(DockerLogsConfig::NAME, "timestamp"),
                        timestamp,
                    );
                }

                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                if let Some(timestamp) = timestamp
                    && let Some(timestamp_key) = log_schema().timestamp_key()
                {
                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
                }
            }
        };

        // If automatic partial event merging is requested - perform the
        // merging.
        // Otherwise mark partial events and return all the events with no
        // merging.
        let log = if auto_partial_merge {
            // Partial event events merging logic.

            // If event is partial, stash it and return `None`.
            if is_partial {
                // If we already have a partial event merge state, the current
                // message has to be merged into that existing state.
                // Otherwise, create a new partial event merge state with the
                // current message being the initial one.
                if let Some(partial_event_merge_state) = partial_event_merge_state {
                    // Depending on the log namespace the actual contents of the log "message" will be
                    // found in either the root of the event ("."), or at the globally configured "message_key".
                    match log_namespace {
                        LogNamespace::Vector => {
                            partial_event_merge_state.merge_in_next_event(log, &["."]);
                        }
                        LogNamespace::Legacy => {
                            partial_event_merge_state.merge_in_next_event(
                                log,
                                &[log_schema()
                                    .message_key()
                                    .expect("global log_schema.message_key to be valid path")
                                    .to_string()],
                            );
                        }
                    }
                } else {
                    *partial_event_merge_state = Some(LogEventMergeState::new(log));
                };
                return None;
            };

            // This is not a partial event. If we have a partial event merge
            // state from before, the current event must be a final event, that
            // would give us a merged event we can return.
            // Otherwise it's just a regular event that we return as-is.
            match partial_event_merge_state.take() {
                // Depending on the log namespace the actual contents of the log "message" will be
                // found in either the root of the event ("."), or at the globally configured "message_key".
                Some(partial_event_merge_state) => match log_namespace {
                    LogNamespace::Vector => {
                        partial_event_merge_state.merge_in_final_event(log, &["."])
                    }
                    LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
                        log,
                        &[log_schema()
                            .message_key()
                            .expect("global log_schema.message_key to be valid path")
                            .to_string()],
                    ),
                },
                None => log,
            }
        } else {
            // If the event is partial, just set the partial event marker field.
            if is_partial {
                // Only add partial event marker field if it's requested.
                if let Some(partial_event_marker_field) = partial_event_marker_field {
                    log_namespace.insert_source_metadata(
                        DockerLogsConfig::NAME,
                        &mut log,
                        Some(LegacyKey::Overwrite(path!(
                            partial_event_marker_field.as_str()
                        ))),
                        path!(event::PARTIAL),
                        true,
                    );
                }
            }
            // Return the log event as is, partial or not. No merging here.
            log
        };

        // Partial or not partial - we return the event we got here, because all
        // other cases were handled earlier.
        emit!(DockerLogsEventsReceived {
            byte_size: log.estimated_json_encoded_size_of(),
            container_id: self.id.as_str(),
            container_name: &self.metadata.name_str
        });

        Some(log)
    }
}
1300
1301struct ContainerMetadata {
1302    /// label.key -> String
1303    labels: HashMap<String, String>,
1304    /// name -> String
1305    name: Value,
1306    /// name
1307    name_str: String,
1308    /// image -> String
1309    image: Value,
1310    /// created_at
1311    created_at: DateTime<Utc>,
1312}
1313
1314impl ContainerMetadata {
1315    fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
1316        let config = details.config.unwrap();
1317        let name = details.name.unwrap();
1318        let created = details.created.unwrap();
1319
1320        let labels = config.labels.unwrap_or_default();
1321
1322        Ok(ContainerMetadata {
1323            labels,
1324            name: name.as_str().trim_start_matches('/').to_owned().into(),
1325            name_str: name,
1326            image: config.image.unwrap().into(),
1327            created_at: created.with_timezone(&Utc),
1328        })
1329    }
1330}
1331
1332fn line_agg_adapter(
1333    inner: impl Stream<Item = LogEvent> + Unpin,
1334    logic: line_agg::Logic<Bytes, LogEvent>,
1335    log_namespace: LogNamespace,
1336) -> impl Stream<Item = LogEvent> {
1337    let line_agg_in = inner.map(move |mut log| {
1338        let message_value = match log_namespace {
1339            LogNamespace::Vector => log
1340                .remove(event_path!())
1341                .expect("`.` must exist in the event"),
1342            LogNamespace::Legacy => log
1343                .remove(
1344                    log_schema()
1345                        .message_key_target_path()
1346                        .expect("global log_schema.message_key to be valid path"),
1347                )
1348                .expect("`message` must exist in the event"),
1349        };
1350        let stream_value = match log_namespace {
1351            LogNamespace::Vector => log
1352                .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
1353                .expect("`docker_logs.stream` must exist in the metadata"),
1354            LogNamespace::Legacy => log
1355                .get(event_path!(STREAM))
1356                .expect("stream must exist in the event"),
1357        };
1358
1359        let stream = stream_value.coerce_to_bytes();
1360        let message = message_value.coerce_to_bytes();
1361        (stream, message, log)
1362    });
1363    let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
1364    line_agg_out.map(move |(_, message, mut log, _)| {
1365        match log_namespace {
1366            LogNamespace::Vector => log.insert(event_path!(), message),
1367            LogNamespace::Legacy => log.insert(
1368                log_schema()
1369                    .message_key_target_path()
1370                    .expect("global log_schema.message_key to be valid path"),
1371                message,
1372            ),
1373        };
1374        log
1375    })
1376}