// Source file: vector/sources/kafka.rs

1use std::{
2    collections::{HashMap, HashSet},
3    io::Cursor,
4    pin::Pin,
5    sync::{
6        Arc, OnceLock, Weak,
7        mpsc::{SyncSender, sync_channel},
8    },
9    time::Duration,
10};
11
12use async_stream::stream;
13use bytes::Bytes;
14use chrono::{DateTime, TimeZone, Utc};
15use futures::{Stream, StreamExt};
16use futures_util::future::OptionFuture;
17use rdkafka::{
18    ClientConfig, ClientContext, Statistics, TopicPartitionList,
19    consumer::{
20        BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer,
21        stream_consumer::StreamPartitionQueue,
22    },
23    error::KafkaError,
24    message::{BorrowedMessage, Headers as _, Message},
25    types::RDKafkaErrorCode,
26};
27use serde_with::serde_as;
28use snafu::{ResultExt, Snafu};
29use tokio::{
30    runtime::Handle,
31    sync::{
32        mpsc::{self, UnboundedReceiver, UnboundedSender},
33        oneshot,
34    },
35    task::JoinSet,
36    time::Sleep,
37};
38use tracing::{Instrument, Span};
39use vector_lib::{
40    EstimatedJsonEncodedSizeOf,
41    codecs::{
42        DecoderFramedRead, StreamDecodingError,
43        decoding::{DeserializerConfig, FramingConfig},
44    },
45    config::{LegacyKey, LogNamespace},
46    configurable::configurable_component,
47    finalizer::OrderedFinalizer,
48    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
49};
50use vrl::value::{Kind, ObjectMap, kind::Collection};
51
52use crate::{
53    SourceSender,
54    codecs::{Decoder, DecodingConfig},
55    config::{
56        LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
57        log_schema,
58    },
59    event::{BatchNotifier, BatchStatus, Event, Value},
60    internal_events::{
61        KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
62        StreamClosedError,
63    },
64    kafka,
65    serde::{bool_or_struct, default_decoding, default_framing_message_based},
66    shutdown::ShutdownSignal,
67};
68
/// Errors that can occur while building the `kafka` source.
#[derive(Debug, Snafu)]
enum BuildError {
    /// `drain_timeout_ms` failed validation against `session_timeout_ms`.
    #[snafu(display("The drain_timeout_ms ({}) must be less than session_timeout_ms ({})", value, session_timeout_ms.as_millis()))]
    InvalidDrainTimeout {
        /// The configured drain timeout, in milliseconds.
        value: u64,
        /// The configured session timeout (reported in milliseconds).
        session_timeout_ms: Duration,
    },
    /// The underlying rdkafka consumer could not be created.
    #[snafu(display("Could not create Kafka consumer: {}", source))]
    CreateError { source: rdkafka::error::KafkaError },
    /// Subscribing the consumer to the configured topics failed.
    #[snafu(display("Could not subscribe to Kafka topics: {}", source))]
    SubscribeError { source: rdkafka::error::KafkaError },
}
81
// NOTE(review): `configurable_component` presumably renders the `///` text below into the
// generated configuration reference, so it is kept verbatim; edit it with that in mind.
// All options default to off: `Default` is derived and `bool::default()` is `false`.
/// Metrics (beta) configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct Metrics {
    /// Expose topic lag metrics for all topics and partitions. Metric names are `kafka_consumer_lag`.
    pub topic_lag_metric: bool,
}
89
/// Configuration for the `kafka` source.
//
// NOTE(review): `configurable_component` presumably turns the `///` comments on this struct
// and its fields into the generated configuration reference, so they are left verbatim here;
// reviewer notes in this block use plain `//` comments instead.
#[serde_as]
#[configurable_component(source("kafka", "Collect logs from Apache Kafka."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct KafkaSourceConfig {
    /// A comma-separated list of Kafka bootstrap servers.
    ///
    /// These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster,
    /// allowing discovery of all the other hosts in the cluster.
    ///
    /// Must be in the form of `host:port`, and comma-separated.
    // NOTE(review): `10.14.23.332` in the example below is not a valid IPv4 address
    // (332 > 255); consider replacing it with a valid address.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    bootstrap_servers: String,

    /// The Kafka topics names to read events from.
    ///
    /// Regular expression syntax is supported if the topic begins with `^`.
    #[configurable(metadata(
        docs::examples = "^(prefix1|prefix2)-.+",
        docs::examples = "topic-1",
        docs::examples = "topic-2"
    ))]
    topics: Vec<String>,

    /// The consumer group name to be used to consume events from Kafka.
    #[configurable(metadata(docs::examples = "consumer-group-name"))]
    group_id: String,

    /// If offsets for consumer group do not exist, set them using this strategy.
    ///
    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for the `auto.offset.reset` option for further clarification.
    // Defaults to `default_auto_offset_reset()` when omitted.
    #[serde(default = "default_auto_offset_reset")]
    #[configurable(metadata(docs::examples = "example_auto_offset_reset_values()"))]
    auto_offset_reset: String,

    /// The Kafka session timeout.
    // (De)serialized as an integer number of milliseconds (`DurationMilliSeconds<u64>`).
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_session_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Session Timeout"))]
    session_timeout_ms: Duration,

    /// Timeout to drain pending acknowledgements during shutdown or a Kafka
    /// consumer group rebalance.
    ///
    /// When Vector shuts down or the Kafka consumer group revokes partitions from this
    /// consumer, wait a maximum of `drain_timeout_ms` for the source to
    /// process pending acknowledgements. Must be less than `session_timeout_ms`
    /// to ensure the consumer is not excluded from the group during a rebalance.
    ///
    /// Default value is half of `session_timeout_ms`.
    // Unlike the other timeouts this is a plain `u64` of milliseconds. An explicit value is
    // checked against `session_timeout_ms` in `SourceConfig::build`; `None` is resolved to
    // `session_timeout_ms / 2` in `kafka_source`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Drain Timeout"))]
    drain_timeout_ms: Option<u64>,

    /// Timeout for network requests.
    // (De)serialized as an integer number of milliseconds.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    socket_timeout_ms: Duration,

    /// Maximum time the broker may wait to fill the response.
    // (De)serialized as an integer number of milliseconds.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 50, docs::examples = 100))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_fetch_wait_max_ms")]
    #[configurable(metadata(docs::human_name = "Max Fetch Wait Time"))]
    fetch_wait_max_ms: Duration,

    /// The frequency that the consumer offsets are committed (written) to offset storage.
    // (De)serialized as an integer number of milliseconds.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_commit_interval_ms")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::human_name = "Commit Interval"))]
    commit_interval_ms: Duration,

    /// Overrides the name of the log field used to add the message key to each event.
    ///
    /// The value is the message key of the Kafka message itself.
    ///
    /// By default, `"message_key"` is used.
    #[serde(default = "default_key_field")]
    #[configurable(metadata(docs::examples = "message_key"))]
    key_field: OptionalValuePath,

    /// Overrides the name of the log field used to add the topic to each event.
    ///
    /// The value is the topic from which the Kafka message was consumed from.
    ///
    /// By default, `"topic"` is used.
    #[serde(default = "default_topic_key")]
    #[configurable(metadata(docs::examples = "topic"))]
    topic_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the partition to each event.
    ///
    /// The value is the partition from which the Kafka message was consumed from.
    ///
    /// By default, `"partition"` is used.
    #[serde(default = "default_partition_key")]
    #[configurable(metadata(docs::examples = "partition"))]
    partition_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the offset to each event.
    ///
    /// The value is the offset of the Kafka message itself.
    ///
    /// By default, `"offset"` is used.
    #[serde(default = "default_offset_key")]
    #[configurable(metadata(docs::examples = "offset"))]
    offset_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the headers to each event.
    ///
    /// The value is the headers of the Kafka message itself.
    ///
    /// By default, `"headers"` is used.
    #[serde(default = "default_headers_key")]
    #[configurable(metadata(docs::examples = "headers"))]
    headers_key: OptionalValuePath,

    /// Advanced options set directly on the underlying `librdkafka` client.
    ///
    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for details.
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    librdkafka_options: Option<HashMap<String, String>>,

    // Flattened: the authentication settings are accepted as top-level keys of this
    // source's configuration rather than under a nested table.
    #[serde(flatten)]
    auth: kafka::KafkaAuthConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    // `None` defers to the global setting (merged via `cx.log_namespace` in `build` and
    // `merge` in `outputs`).
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    metrics: Metrics,
}
255
256impl KafkaSourceConfig {
257    fn keys(&self) -> Keys {
258        Keys::from(log_schema(), self)
259    }
260}
261
const fn default_session_timeout_ms() -> Duration {
    // Ten seconds, documented here as librdkafka's `session.timeout.ms` default.
    // NOTE(review): newer librdkafka releases may document a different (larger) default;
    // confirm which version this value is meant to mirror.
    Duration::from_secs(10)
}
265
const fn default_socket_timeout_ms() -> Duration {
    // One minute: librdkafka's `socket.timeout.ms` default.
    Duration::from_secs(60)
}
269
const fn default_fetch_wait_max_ms() -> Duration {
    // Documented here as librdkafka's `fetch.wait.max.ms` default.
    // NOTE(review): current librdkafka documentation may list a different default;
    // confirm before relying on this comment.
    const MILLIS: u64 = 100;
    Duration::from_millis(MILLIS)
}
273
const fn default_commit_interval_ms() -> Duration {
    // Commit consumer offsets every five seconds unless configured otherwise.
    Duration::from_secs(5)
}
277
fn default_auto_offset_reset() -> String {
    // librdkafka's own `auto.offset.reset` default: reset to the largest (newest) offset.
    String::from("largest")
}
281
282fn default_key_field() -> OptionalValuePath {
283    OptionalValuePath::from(owned_value_path!("message_key"))
284}
285
286fn default_topic_key() -> OptionalValuePath {
287    OptionalValuePath::from(owned_value_path!("topic"))
288}
289
290fn default_partition_key() -> OptionalValuePath {
291    OptionalValuePath::from(owned_value_path!("partition"))
292}
293
294fn default_offset_key() -> OptionalValuePath {
295    OptionalValuePath::from(owned_value_path!("offset"))
296}
297
298fn default_headers_key() -> OptionalValuePath {
299    OptionalValuePath::from(owned_value_path!("headers"))
300}
301
const fn example_auto_offset_reset_values() -> [&'static str; 7] {
    // Spellings accepted by librdkafka's `auto.offset.reset`: three aliases for the
    // smallest offset, three for the largest, and `error`.
    const VALUES: [&str; 7] = [
        "smallest",
        "earliest",
        "beginning",
        "largest",
        "latest",
        "end",
        "error",
    ];
    VALUES
}
313
fn example_librdkafka_options() -> HashMap<String, String> {
    // Sample `librdkafka_options` entries used as documentation examples.
    [
        ("client.id", "${ENV_VAR}"),
        ("fetch.error.backoff.ms", "1000"),
        ("socket.send.buffer.bytes", "100"),
    ]
    .into_iter()
    .map(|(option, value)| (option.to_owned(), value.to_owned()))
    .collect()
}
321
// NOTE(review): macro defined elsewhere in the crate; judging by its name it derives this
// source's generated/example configuration from `KafkaSourceConfig::default()` (the
// `derivative`-based `Default` above) — confirm against the macro definition.
impl_generate_config_from_default!(KafkaSourceConfig);
323
324#[async_trait::async_trait]
325#[typetag::serde(name = "kafka")]
326impl SourceConfig for KafkaSourceConfig {
327    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
328        let log_namespace = cx.log_namespace(self.log_namespace);
329
330        let decoder =
331            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
332                .build()?;
333        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
334
335        if let Some(d) = self.drain_timeout_ms {
336            snafu::ensure!(
337                Duration::from_millis(d) <= self.session_timeout_ms,
338                InvalidDrainTimeoutSnafu {
339                    value: d,
340                    session_timeout_ms: self.session_timeout_ms
341                }
342            );
343        }
344
345        let (consumer, callback_rx) = create_consumer(self, acknowledgements)?;
346
347        Ok(Box::pin(kafka_source(
348            self.clone(),
349            consumer,
350            callback_rx,
351            decoder,
352            cx.out,
353            cx.shutdown,
354            false,
355            log_namespace,
356        )))
357    }
358
359    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
360        let log_namespace = global_log_namespace.merge(self.log_namespace);
361        let keys = self.keys();
362
363        let schema_definition = self
364            .decoding
365            .schema_definition(log_namespace)
366            .with_standard_vector_source_metadata()
367            .with_source_metadata(
368                Self::NAME,
369                keys.timestamp.map(LegacyKey::Overwrite),
370                &owned_value_path!("timestamp"),
371                Kind::timestamp(),
372                Some("timestamp"),
373            )
374            .with_source_metadata(
375                Self::NAME,
376                keys.topic.clone().map(LegacyKey::Overwrite),
377                &owned_value_path!("topic"),
378                Kind::bytes(),
379                None,
380            )
381            .with_source_metadata(
382                Self::NAME,
383                keys.partition.clone().map(LegacyKey::Overwrite),
384                &owned_value_path!("partition"),
385                Kind::bytes(),
386                None,
387            )
388            .with_source_metadata(
389                Self::NAME,
390                keys.offset.clone().map(LegacyKey::Overwrite),
391                &owned_value_path!("offset"),
392                Kind::bytes(),
393                None,
394            )
395            .with_source_metadata(
396                Self::NAME,
397                keys.headers.clone().map(LegacyKey::Overwrite),
398                &owned_value_path!("headers"),
399                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
400                None,
401            )
402            .with_source_metadata(
403                Self::NAME,
404                keys.key_field.clone().map(LegacyKey::Overwrite),
405                &owned_value_path!("message_key"),
406                Kind::bytes(),
407                None,
408            );
409
410        vec![SourceOutput::new_maybe_logs(
411            self.decoding.output_type(),
412            schema_definition,
413        )]
414    }
415
416    fn can_acknowledge(&self) -> bool {
417        true
418    }
419}
420
/// Runs the Kafka source until both of its worker tasks have finished.
///
/// Subscribes `consumer` to `config.topics`, then spawns:
/// - a coordination task (`coordinate_kafka_callbacks`) that receives the callbacks arriving
///   on `callback_rx` and owns the per-partition consumer state, and
/// - a blocking client task (`drive_kafka_consumer`) that is given `shutdown` and, when `eof`
///   is set, the receiving end of the EOF signal sent by the coordination task.
///
/// After both tasks end, `commit_consumer_state` is called on the consumer context.
///
/// # Errors
///
/// Returns `Err(())`, after logging the error, if subscribing to the topics fails.
///
/// # Panics
///
/// Panics (via `expect`) if storing the weak consumer handle in the context fails —
/// presumably because it had already been set.
#[allow(clippy::too_many_arguments)]
async fn kafka_source(
    config: KafkaSourceConfig,
    consumer: StreamConsumer<KafkaSourceContext>,
    callback_rx: UnboundedReceiver<KafkaCallback>,
    decoder: Decoder,
    out: SourceSender,
    shutdown: ShutdownSignal,
    eof: bool,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    let span = info_span!("kafka_source");
    // Shared by the coordination task, the client task, and the final commit below.
    let consumer = Arc::new(consumer);

    // Give the context a weak (non-owning) handle back to its consumer. The consumer owns the
    // context, so a strong `Arc` here would form a reference cycle.
    consumer
        .context()
        .consumer
        .set(Arc::downgrade(&consumer))
        .expect("Error setting up consumer context.");

    // EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF
    // (both halves are `None` unless `eof` is set).
    let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();

    let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
    if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
        error!("{}", e);
        return Err(());
    }

    let coordination_task = {
        let span = span.clone();
        let consumer = Arc::clone(&consumer);
        // When not configured, allow half the session timeout for draining.
        let drain_timeout_ms = config
            .drain_timeout_ms
            .map_or(config.session_timeout_ms / 2, Duration::from_millis);
        let consumer_state =
            ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
        crate::spawn_in_current_span(async move {
            coordinate_kafka_callbacks(
                consumer,
                callback_rx,
                consumer_state,
                drain_timeout_ms,
                eof_tx,
            )
            .await;
        })
    };

    // `drive_kafka_consumer` is synchronous, so it runs on a blocking thread with the
    // source span entered manually.
    let client_task = {
        let consumer = Arc::clone(&consumer);
        tokio::task::spawn_blocking(move || {
            let _enter = span.enter();
            drive_kafka_consumer(consumer, shutdown, eof_rx);
        })
    };

    // Wait for both tasks; join errors (e.g. a panicked task) are discarded.
    _ = tokio::join!(client_task, coordination_task);
    consumer.context().commit_consumer_state();

    Ok(())
}
483
/// ConsumerStateInner implements a small struct/enum-based state machine.
///
/// With a ConsumerStateInner<Consuming>, the client is able to spawn new tasks
/// when partitions are assigned. When a shutdown signal is received, or
/// partitions are being revoked, the Consuming state is traded for a Draining
/// state (and associated drain deadline future) via the `begin_drain` method.
///
/// A ConsumerStateInner<Draining> keeps track of partitions that are expected
/// to complete, and also owns the signal that, when dropped, indicates to the
/// client driver task that it is safe to proceed with the rebalance or shutdown.
/// When draining is complete, or the deadline is reached, Draining is traded in for
/// either a Consuming (after a revoke) or Complete (in the case of shutdown) state,
/// via the `finish_drain` method.
///
/// `ConsumerState::Complete` is the final state, reached after a shutdown
/// signal is received. It carries no `ConsumerStateInner`, can not be traded
/// for another state, and the coordination task should exit when it is reached.
struct ConsumerStateInner<S> {
    /// Source configuration; used to derive the event metadata keys for each partition task.
    config: KafkaSourceConfig,
    /// Decoder, cloned into every partition consumer task.
    decoder: Decoder,
    /// Output sender, cloned into every partition consumer task.
    out: SourceSender,
    /// Log namespace passed to `parse_message` for every consumed message.
    log_namespace: LogNamespace,
    /// Data specific to the current state (`Consuming` or `Draining`).
    consumer_state: S,
}
/// State data for `ConsumerStateInner<Consuming>`: partitions are being actively consumed.
struct Consuming {
    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
    span: Span,
}
/// State data for `ConsumerStateInner<Draining>`: partitions being revoked, or shut down,
/// are finishing their pending acknowledgements before the client task may proceed.
struct Draining {
    /// The rendezvous channel sender from the revoke or shutdown callback. Sending on this channel
    /// indicates to the kafka client task that one or more partitions have been drained, while
    /// closing this channel indicates that all expected partitions have drained, or the drain
    /// timeout has been reached.
    signal: SyncSender<()>,

    /// The set of topic-partition tasks that are required to complete during
    /// the draining phase, populated at the beginning of a rebalance or shutdown.
    /// Partitions that are being revoked, but not being actively consumed
    /// (e.g. due to the consumer task exiting early) should not be included.
    /// The draining phase is considered complete when this set is empty.
    expect_drain: HashSet<TopicPartition>,

    /// Whether the client is shutting down after draining. If set to true,
    /// the `finish_drain` method will return a Complete state, otherwise
    /// a Consuming state.
    shutdown: bool,

    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
    span: Span,
}
/// Optional drain deadline: `Some(sleep)` once `begin_drain` is called, and reset to `None`
/// by `finish_drain` (back to consuming) or `complete`.
type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;
/// The coordinator's current state; see `ConsumerStateInner` for the allowed transitions.
enum ConsumerState {
    /// Partitions are consumed normally; new assignments spawn partition consumer tasks.
    Consuming(ConsumerStateInner<Consuming>),
    /// Waiting for revoked (or shutting-down) partitions to drain their acknowledgements.
    Draining(ConsumerStateInner<Draining>),
    /// Terminal state after a shutdown drain; the coordination loop stops when reached.
    Complete,
}
540impl Draining {
541    fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
542        Self {
543            signal,
544            shutdown,
545            expect_drain: HashSet::new(),
546            span,
547        }
548    }
549
550    fn is_complete(&self) -> bool {
551        self.expect_drain.is_empty()
552    }
553}
554
555impl<C> ConsumerStateInner<C> {
556    fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
557        (None.into(), ConsumerState::Complete)
558    }
559}
560
impl ConsumerStateInner<Consuming> {
    /// Creates the initial `Consuming` state for the coordination task.
    const fn new(
        config: KafkaSourceConfig,
        decoder: Decoder,
        out: SourceSender,
        log_namespace: LogNamespace,
        span: Span,
    ) -> Self {
        Self {
            config,
            decoder,
            out,
            log_namespace,
            consumer_state: Consuming { span },
        }
    }

    /// Spawn a task on the provided JoinSet to consume the kafka StreamPartitionQueue, and handle
    /// acknowledgements for the messages consumed. Returns a channel sender that can be used to
    /// signal that the consumer should stop and drain pending acknowledgements, and an AbortHandle
    /// that can be used to forcefully end the task.
    ///
    /// The task resolves to `(tp, status)`, where `status` is `PartitionEOF` if it stopped
    /// because the partition reached EOF while `exit_eof` was set, and `NormalExit` otherwise.
    fn consume_partition(
        &self,
        join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
        tp: TopicPartition,
        consumer: Arc<StreamConsumer<KafkaSourceContext>>,
        p: StreamPartitionQueue<KafkaSourceContext>,
        acknowledgements: bool,
        exit_eof: bool,
    ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) {
        // Per-task copies of everything the spawned future needs (it must own its data).
        let keys = self.config.keys();
        let decoder = self.decoder.clone();
        let log_namespace = self.log_namespace;
        let mut out = self.out.clone();

        let (end_tx, mut end_signal) = oneshot::channel::<()>();

        let handle = join_set.spawn(async move {
            let mut messages = p.stream();
            let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);

            // finalizer is the entry point for new pending acknowledgements;
            // when it is dropped, no new messages will be consumed, and the
            // task will end when it reaches the end of ack_stream
            let mut finalizer = Some(finalizer);

            let mut status = PartitionConsumerStatus::NormalExit;

            loop {
                tokio::select!(
                    // Make sure to handle the acknowledgement stream before new messages to prevent
                    // unbounded memory growth caused by those acks being handled slower than
                    // incoming messages when the load is high.
                    biased;

                    // is_some() checks prevent polling end_signal after it completes
                    // (it also completes, with an error, if `end_tx` is dropped).
                    _ = &mut end_signal, if finalizer.is_some() => {
                        finalizer.take();
                    },

                    // Only `Delivered` batches advance the stored offset; other statuses
                    // leave it unchanged.
                    ack = ack_stream.next() => match ack {
                        Some((status, entry)) => {
                            if status == BatchStatus::Delivered
                                && let Err(error) =  consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
                                    emit!(KafkaOffsetUpdateError { error });
                                }
                        }
                        None if finalizer.is_none() => {
                            debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
                            break
                        }
                        None => {
                            debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
                        }
                    },

                    // New messages are only read while the finalizer is still alive.
                    message = messages.next(), if finalizer.is_some() => match message {
                        None => unreachable!("MessageStream never calls Ready(None)"),
                        Some(Err(error)) => match error {
                            rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
                                debug!("EOF for partition {}.", partition);
                                status = PartitionConsumerStatus::PartitionEOF;
                                finalizer.take();
                            },
                            _ => emit!(KafkaReadError { error }),
                        },
                        Some(Ok(msg)) => {
                            emit!(KafkaBytesReceived {
                                byte_size: msg.payload_len(),
                                protocol: "tcp",
                                topic: msg.topic(),
                                partition: msg.partition(),
                            });
                            parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
                        }
                    },
                )
            }
            (tp, status)
        }.instrument(self.consumer_state.span.clone()));
        (end_tx, handle)
    }

    /// Consume self, and return a `ConsumerStateInner<Draining>`, along with a Future
    /// representing a drain deadline, based on max_drain_ms.
    ///
    /// The deadline timer starts when this method is called. `sig` becomes the drain signal
    /// sender, and `shutdown` decides whether `finish_drain` later yields `Complete` or
    /// `Consuming`.
    fn begin_drain(
        self,
        max_drain_ms: Duration,
        sig: SyncSender<()>,
        shutdown: bool,
    ) -> (OptionDeadline, ConsumerStateInner<Draining>) {
        let deadline = Box::pin(tokio::time::sleep(max_drain_ms));

        let draining = ConsumerStateInner {
            config: self.config,
            decoder: self.decoder,
            out: self.out,
            log_namespace: self.log_namespace,
            consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
        };

        (Some(deadline).into(), draining)
    }

    /// Stays in the `Consuming` state, returning `deadline` unchanged.
    pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (deadline, ConsumerState::Consuming(self))
    }
}
689
690impl ConsumerStateInner<Draining> {
691    /// Mark the given TopicPartition as being revoked, adding it to the set of
692    /// partitions expected to drain
693    fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
694        // Note that if this send() returns Err, it means the task has already
695        // ended, but the completion has not been processed yet (otherwise we wouldn't have access to the end_signal),
696        // so we should still add it to the "expect to drain" set
697        _ = end_signal.send(());
698        self.consumer_state.expect_drain.insert(tp);
699    }
700
701    /// Add the given TopicPartition to the set of known "drained" partitions,
702    /// i.e. the consumer has drained the acknowledgement channel. A signal is
703    /// sent on the signal channel, indicating to the client that offsets may be committed
704    fn partition_drained(&mut self, tp: TopicPartition) {
705        // This send() will only return Err if the receiver has already been disconnected (i.e. the
706        // kafka client task is no longer running)
707        _ = self.consumer_state.signal.send(());
708        self.consumer_state.expect_drain.remove(&tp);
709    }
710
711    /// Return true if all expected partitions have drained
712    fn is_drain_complete(&self) -> bool {
713        self.consumer_state.is_complete()
714    }
715
716    /// Finish partition drain mode. Consumes self and the drain deadline
717    /// future, and returns a "Consuming" or "Complete" ConsumerState
718    fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
719        if self.consumer_state.shutdown {
720            self.complete(deadline)
721        } else {
722            (
723                None.into(),
724                ConsumerState::Consuming(ConsumerStateInner {
725                    config: self.config,
726                    decoder: self.decoder,
727                    out: self.out,
728                    log_namespace: self.log_namespace,
729                    consumer_state: Consuming {
730                        span: self.consumer_state.span,
731                    },
732                }),
733            )
734        }
735    }
736
737    pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
738        (deadline, ConsumerState::Draining(self))
739    }
740}
741
742async fn coordinate_kafka_callbacks(
743    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
744    mut callbacks: UnboundedReceiver<KafkaCallback>,
745    consumer_state: ConsumerStateInner<Consuming>,
746    max_drain_ms: Duration,
747    mut eof: Option<oneshot::Sender<()>>,
748) {
749    let mut drain_deadline: OptionFuture<_> = None.into();
750    let mut consumer_state = ConsumerState::Consuming(consumer_state);
751
752    // A oneshot channel is used for each consumed partition, so that we can
753    // signal to that task to stop consuming, drain pending acks, and exit
754    let mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>> = HashMap::new();
755
756    // The set of consumer tasks, each consuming a specific partition. The task
757    // is both consuming the messages (passing them to the output stream) _and_
758    // processing the corresponding acknowledgement stream. A consumer task
759    // should completely drain its acknowledgement stream after receiving an end signal
760    let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> =
761        Default::default();
762
763    // Handles that will let us end any consumer task that exceeds a drain deadline
764    let mut abort_handles: HashMap<TopicPartition, tokio::task::AbortHandle> = HashMap::new();
765
766    let exit_eof = eof.is_some();
767
768    while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state {
769        tokio::select! {
770            Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => {
771                debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1);
772                // If this task ended on its own, the end_signal for it will still be in here.
773                end_signals.remove(&finished_partition);
774                abort_handles.remove(&finished_partition);
775
776                (drain_deadline, consumer_state) = match consumer_state {
777                    ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
778                    ConsumerState::Draining(mut state) => {
779                        state.partition_drained(finished_partition);
780
781                        if state.is_drain_complete() {
782                            debug!("All expected partitions have drained.");
783                            state.finish_drain(drain_deadline)
784                        } else {
785                            state.keep_draining(drain_deadline)
786                        }
787                    },
788                    ConsumerState::Consuming(state) => {
789                        // If we are here, it is likely because the consumer
790                        // tasks are set up to exit upon reaching the end of the
791                        // partition.
792                        if !exit_eof {
793                            debug!("Partition consumer task finished, while not in draining mode.");
794                        }
795                        state.keep_consuming(drain_deadline)
796                    },
797                };
798
799                // PartitionConsumerStatus differentiates between a task that exited after
800                // being signaled to end, and one that reached the end of its partition and
801                // was configured to exit. After the last such task ends, we signal the kafka
802                // driver task to shut down the main consumer too. Note this is only used in tests.
803                if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() {
804                    debug!("All partitions have exited or reached EOF.");
805                    let _ = eof.take().map(|e| e.send(()));
806                }
807            },
808            Some(callback) = callbacks.recv() => match callback {
809                KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
810                    ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
811                    ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
812                    ConsumerState::Consuming(ref consumer_state) => {
813                        let acks = consumer.context().acknowledgements;
814                        for tp in assigned_partitions.drain(0..) {
815                            let topic = tp.0.as_str();
816                            let partition = tp.1;
817                            match consumer.split_partition_queue(topic, partition) { Some(pq) => {
818                                debug!("Consuming partition {}:{}.", &tp.0, tp.1);
819                                let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof);
820                                abort_handles.insert(tp.clone(), handle);
821                                end_signals.insert(tp, end_tx);
822                            } _ => {
823                                warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1);
824                            }}
825                        }
826                        // ensure this is retained until all individual queues are set up
827                        drop(done);
828                    }
829                },
830                KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
831                    ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
832                    ConsumerState::Draining(d) => {
833                        // NB: This would only happen if the task driving the kafka client (i.e. rebalance handlers)
834                        // is not handling shutdown signals, and a revoke happens during a shutdown drain; otherwise
835                        // this is unreachable code.
836                        warn!("Kafka client is already draining revoked partitions.");
837                        d.keep_draining(drain_deadline)
838                    },
839                    ConsumerState::Consuming(state) => {
840                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false);
841
842                        for tp in revoked_partitions.drain(0..) {
843                            match end_signals.remove(&tp) { Some(end) => {
844                                debug!("Revoking partition {}:{}", &tp.0, tp.1);
845                                state.revoke_partition(tp, end);
846                            } _ => {
847                                debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
848                            }}
849                        }
850
851                        state.keep_draining(deadline)
852                    }
853                },
854                KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
855                    ConsumerState::Complete => unreachable!("Shutdown received after completion."),
856                    // Shutting down is just like a full assignment revoke, but we also close the
857                    // callback channels, since we don't expect additional assignments or rebalances
858                    ConsumerState::Draining(state) => {
859                        // NB: This would only happen if the task driving the kafka client is
860                        // not handling shutdown signals; otherwise this is unreachable code
861                        error!("Kafka client handled a shutdown signal while a rebalance was in progress.");
862                        callbacks.close();
863                        state.keep_draining(drain_deadline)
864                    },
865                    ConsumerState::Consuming(state) => {
866                        callbacks.close();
867                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true);
868                        if let Ok(tpl) = consumer.assignment() {
869                            // TODO  workaround for https://github.com/fede1024/rust-rdkafka/issues/681
870                            if tpl.capacity() == 0 {
871                                return;
872                            }
873                            tpl.elements()
874                                .iter()
875                                .for_each(|el| {
876
877                                let tp: TopicPartition = (el.topic().into(), el.partition());
878                                match end_signals.remove(&tp) { Some(end) => {
879                                    debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1);
880                                    state.revoke_partition(tp, end);
881                                } _ => {
882                                    debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
883                                }}
884                            });
885                        }
886                        // If shutdown was initiated by partition EOF mode, the drain phase
887                        // will already be complete and would time out if not accounted for here
888                        if state.is_drain_complete() {
889                            state.finish_drain(deadline)
890                        } else {
891                            state.keep_draining(deadline)
892                        }
893                    }
894                },
895            },
896
897            Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
898                ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
899                ConsumerState::Consuming(state) => {
900                    warn!("A drain deadline fired outside of draining mode.");
901                    state.keep_consuming(None.into())
902                },
903                ConsumerState::Draining(mut draining) => {
904                    debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions.");
905                    for tp in draining.consumer_state.expect_drain.drain() {
906                        if let Some(handle) = abort_handles.remove(&tp) {
907                            handle.abort();
908                        }
909                    }
910                    draining.finish_drain(drain_deadline)
911                }
912            },
913        }
914    }
915}
916
917fn drive_kafka_consumer(
918    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
919    mut shutdown: ShutdownSignal,
920    eof: Option<oneshot::Receiver<()>>,
921) {
922    Handle::current().block_on(async move {
923        let mut eof: OptionFuture<_> = eof.into();
924        let mut stream = consumer.stream();
925        loop {
926            tokio::select! {
927                _ = &mut shutdown => {
928                    consumer.context().shutdown();
929                    break
930                },
931
932                Some(_) = &mut eof => {
933                    consumer.context().shutdown();
934                    break
935                },
936
937                // NB: messages are not received on this thread, however we poll
938                // the consumer to serve client callbacks, such as rebalance notifications
939                message = stream.next() => match message {
940                    None => unreachable!("MessageStream never returns Ready(None)"),
941                    Some(Err(error)) => emit!(KafkaReadError { error }),
942                    Some(Ok(_msg)) => {
943                        unreachable!("Messages are consumed in dedicated tasks for each partition.")
944                    }
945                },
946            }
947        }
948    });
949}
950
951async fn parse_message(
952    msg: BorrowedMessage<'_>,
953    decoder: Decoder,
954    keys: &'_ Keys,
955    out: &mut SourceSender,
956    acknowledgements: bool,
957    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
958    log_namespace: LogNamespace,
959) {
960    if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
961        let (batch, receiver) = BatchNotifier::new_with_receiver();
962        let mut stream = stream.map(|event| {
963            // All acknowledgements flow through the normal Finalizer stream so
964            // that they can be handled in one place, but are only tied to the
965            // batch when acknowledgements are enabled
966            if acknowledgements {
967                event.with_batch_notifier(&batch)
968            } else {
969                event
970            }
971        });
972        match out.send_event_stream(&mut stream).await {
973            Err(_) => {
974                emit!(StreamClosedError { count });
975            }
976            Ok(_) => {
977                // Drop stream to avoid borrowing `msg`: "[...] borrow might be used
978                // here, when `stream` is dropped and runs the destructor [...]".
979                drop(stream);
980                if let Some(f) = finalizer.as_ref() {
981                    f.add(msg.into(), receiver)
982                }
983            }
984        }
985    }
986}
987
988// Turn the received message into a stream of parsed events.
989fn parse_stream<'a>(
990    msg: &BorrowedMessage<'a>,
991    decoder: Decoder,
992    keys: &'a Keys,
993    log_namespace: LogNamespace,
994) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
995    let payload = msg.payload()?; // skip messages with empty payload
996
997    let rmsg = ReceivedMessage::from(msg);
998
999    let payload = Cursor::new(Bytes::copy_from_slice(payload));
1000
1001    let mut stream = DecoderFramedRead::with_capacity(payload, decoder, msg.payload_len());
1002    let (count, _) = stream.size_hint();
1003    let stream = stream! {
1004        while let Some(result) = stream.next().await {
1005            match result {
1006                Ok((events, _byte_size)) => {
1007                    emit!(KafkaEventsReceived {
1008                        count: events.len(),
1009                        byte_size: events.estimated_json_encoded_size_of(),
1010                        topic: &rmsg.topic,
1011                        partition: rmsg.partition,
1012                    });
1013                    for mut event in events {
1014                        rmsg.apply(keys, &mut event, log_namespace);
1015                        yield event;
1016                    }
1017                },
1018                Err(error) => {
1019                    // Error is logged by `codecs::Decoder`, no further handling
1020                    // is needed here.
1021                    if !error.can_continue() {
1022                        break;
1023                    }
1024                }
1025            }
1026        }
1027    }
1028    .boxed();
1029    Some((count, stream))
1030}
1031
1032#[derive(Clone, Debug)]
1033struct Keys {
1034    timestamp: Option<OwnedValuePath>,
1035    key_field: Option<OwnedValuePath>,
1036    topic: Option<OwnedValuePath>,
1037    partition: Option<OwnedValuePath>,
1038    offset: Option<OwnedValuePath>,
1039    headers: Option<OwnedValuePath>,
1040}
1041
1042impl Keys {
1043    fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self {
1044        Self {
1045            timestamp: schema.timestamp_key().cloned(),
1046            key_field: config.key_field.path.clone(),
1047            topic: config.topic_key.path.clone(),
1048            partition: config.partition_key.path.clone(),
1049            offset: config.offset_key.path.clone(),
1050            headers: config.headers_key.path.clone(),
1051        }
1052    }
1053}
1054
1055struct ReceivedMessage {
1056    timestamp: Option<DateTime<Utc>>,
1057    key: Value,
1058    headers: ObjectMap,
1059    topic: String,
1060    partition: i32,
1061    offset: i64,
1062}
1063
1064impl ReceivedMessage {
1065    fn from(msg: &BorrowedMessage<'_>) -> Self {
1066        // Extract timestamp from kafka message
1067        let timestamp = msg
1068            .timestamp()
1069            .to_millis()
1070            .and_then(|millis| Utc.timestamp_millis_opt(millis).latest());
1071
1072        let key = msg
1073            .key()
1074            .map(|key| Value::from(Bytes::from(key.to_owned())))
1075            .unwrap_or(Value::Null);
1076
1077        let mut headers_map = ObjectMap::new();
1078        if let Some(headers) = msg.headers() {
1079            for header in headers.iter() {
1080                if let Some(value) = header.value {
1081                    headers_map.insert(
1082                        header.key.into(),
1083                        Value::from(Bytes::from(value.to_owned())),
1084                    );
1085                }
1086            }
1087        }
1088
1089        Self {
1090            timestamp,
1091            key,
1092            headers: headers_map,
1093            topic: msg.topic().to_string(),
1094            partition: msg.partition(),
1095            offset: msg.offset(),
1096        }
1097    }
1098
1099    fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) {
1100        if let Event::Log(log) = event {
1101            match log_namespace {
1102                LogNamespace::Vector => {
1103                    // We'll only use this function in Vector namespaces because we don't want
1104                    // "timestamp" to be set automatically in legacy namespaces. In legacy
1105                    // namespaces, the "timestamp" field corresponds to the Kafka message, not the
1106                    // timestamp when the event was processed.
1107                    log_namespace.insert_standard_vector_source_metadata(
1108                        log,
1109                        KafkaSourceConfig::NAME,
1110                        Utc::now(),
1111                    );
1112                }
1113                LogNamespace::Legacy => {
1114                    if let Some(source_type_key) = log_schema().source_type_key_target_path() {
1115                        log.insert(source_type_key, KafkaSourceConfig::NAME);
1116                    }
1117                }
1118            }
1119
1120            log_namespace.insert_source_metadata(
1121                KafkaSourceConfig::NAME,
1122                log,
1123                keys.key_field.as_ref().map(LegacyKey::Overwrite),
1124                path!("message_key"),
1125                self.key.clone(),
1126            );
1127
1128            log_namespace.insert_source_metadata(
1129                KafkaSourceConfig::NAME,
1130                log,
1131                keys.timestamp.as_ref().map(LegacyKey::Overwrite),
1132                path!("timestamp"),
1133                self.timestamp,
1134            );
1135
1136            log_namespace.insert_source_metadata(
1137                KafkaSourceConfig::NAME,
1138                log,
1139                keys.topic.as_ref().map(LegacyKey::Overwrite),
1140                path!("topic"),
1141                self.topic.clone(),
1142            );
1143
1144            log_namespace.insert_source_metadata(
1145                KafkaSourceConfig::NAME,
1146                log,
1147                keys.partition.as_ref().map(LegacyKey::Overwrite),
1148                path!("partition"),
1149                self.partition,
1150            );
1151
1152            log_namespace.insert_source_metadata(
1153                KafkaSourceConfig::NAME,
1154                log,
1155                keys.offset.as_ref().map(LegacyKey::Overwrite),
1156                path!("offset"),
1157                self.offset,
1158            );
1159
1160            log_namespace.insert_source_metadata(
1161                KafkaSourceConfig::NAME,
1162                log,
1163                keys.headers.as_ref().map(LegacyKey::Overwrite),
1164                path!("headers"),
1165                self.headers.clone(),
1166            );
1167        }
1168    }
1169}
1170
1171#[derive(Debug, Eq, PartialEq, Hash)]
1172struct FinalizerEntry {
1173    topic: String,
1174    partition: i32,
1175    offset: i64,
1176}
1177
1178impl<'a> From<BorrowedMessage<'a>> for FinalizerEntry {
1179    fn from(msg: BorrowedMessage<'a>) -> Self {
1180        Self {
1181            topic: msg.topic().into(),
1182            partition: msg.partition(),
1183            offset: msg.offset(),
1184        }
1185    }
1186}
1187
1188fn create_consumer(
1189    config: &KafkaSourceConfig,
1190    acknowledgements: bool,
1191) -> crate::Result<(
1192    StreamConsumer<KafkaSourceContext>,
1193    UnboundedReceiver<KafkaCallback>,
1194)> {
1195    let mut client_config = ClientConfig::new();
1196    client_config
1197        .set("group.id", &config.group_id)
1198        .set("bootstrap.servers", &config.bootstrap_servers)
1199        .set("auto.offset.reset", &config.auto_offset_reset)
1200        .set(
1201            "session.timeout.ms",
1202            config.session_timeout_ms.as_millis().to_string(),
1203        )
1204        .set(
1205            "socket.timeout.ms",
1206            config.socket_timeout_ms.as_millis().to_string(),
1207        )
1208        .set(
1209            "fetch.wait.max.ms",
1210            config.fetch_wait_max_ms.as_millis().to_string(),
1211        )
1212        .set("enable.partition.eof", "false")
1213        .set("enable.auto.commit", "true")
1214        .set(
1215            "auto.commit.interval.ms",
1216            config.commit_interval_ms.as_millis().to_string(),
1217        )
1218        .set("enable.auto.offset.store", "false")
1219        .set("statistics.interval.ms", "1000")
1220        .set("client.id", "vector");
1221
1222    config.auth.apply(&mut client_config)?;
1223
1224    if let Some(librdkafka_options) = &config.librdkafka_options {
1225        for (key, value) in librdkafka_options {
1226            client_config.set(key.as_str(), value.as_str());
1227        }
1228    }
1229
1230    let (callbacks, callback_rx) = mpsc::unbounded_channel();
1231    let consumer = client_config
1232        .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new(
1233            config.metrics.topic_lag_metric,
1234            acknowledgements,
1235            callbacks,
1236            Span::current(),
1237        ))
1238        .context(CreateSnafu)?;
1239
1240    Ok((consumer, callback_rx))
1241}
1242
/// A Kafka topic name paired with a partition number.
type TopicPartition = (String, i32);

/// Status returned by partition consumer tasks, allowing the coordination task
/// to differentiate between a consumer exiting normally (after receiving an end
/// signal) and exiting when it reaches the end of a partition
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PartitionConsumerStatus {
    /// The task exited after being signaled to end.
    NormalExit,
    /// The task reached the end of its partition and was configured to exit there.
    PartitionEOF,
}

/// Notifications sent from the consumer context's rebalance and shutdown hooks
/// to the coordination task.
///
/// Each variant carries the sending half of a rendezvous channel. The hook that
/// sent it blocks until every sender for that channel has been dropped. For
/// revocation and shutdown, it commits offsets each time the coordination task
/// sends on the channel.
#[derive(Debug)]
enum KafkaCallback {
    /// Partitions newly assigned to this consumer.
    PartitionsAssigned(Vec<TopicPartition>, SyncSender<()>),
    /// Partitions revoked from this consumer by a rebalance.
    PartitionsRevoked(Vec<TopicPartition>, SyncSender<()>),
    /// The source is shutting down and its whole assignment should be drained.
    ShuttingDown(SyncSender<()>),
}
1259
1260struct KafkaSourceContext {
1261    acknowledgements: bool,
1262    stats: kafka::KafkaStatisticsContext,
1263
1264    /// A callback channel used to coordinate between the main consumer task and the acknowledgement task
1265    callbacks: UnboundedSender<KafkaCallback>,
1266
1267    /// A weak reference to the consumer, so that we can commit offsets during a rebalance operation
1268    consumer: OnceLock<Weak<StreamConsumer<KafkaSourceContext>>>,
1269}
1270
1271impl KafkaSourceContext {
1272    fn new(
1273        expose_lag_metrics: bool,
1274        acknowledgements: bool,
1275        callbacks: UnboundedSender<KafkaCallback>,
1276        span: Span,
1277    ) -> Self {
1278        Self {
1279            stats: kafka::KafkaStatisticsContext {
1280                expose_lag_metrics,
1281                span,
1282            },
1283            acknowledgements,
1284            consumer: OnceLock::default(),
1285            callbacks,
1286        }
1287    }
1288
1289    fn shutdown(&self) {
1290        let (send, rendezvous) = sync_channel(0);
1291        if self
1292            .callbacks
1293            .send(KafkaCallback::ShuttingDown(send))
1294            .is_ok()
1295        {
1296            while rendezvous.recv().is_ok() {
1297                self.commit_consumer_state();
1298            }
1299        }
1300    }
1301
1302    /// Emit a PartitionsAssigned callback with the topic-partitions to be consumed,
1303    /// and block until confirmation is received that a stream and consumer for
1304    /// each topic-partition has been set up. This function blocks until the
1305    /// rendezvous channel sender is dropped by the callback handler.
1306    fn consume_partitions(&self, tpl: &TopicPartitionList) {
1307        // TODO  workaround for https://github.com/fede1024/rust-rdkafka/issues/681
1308        if tpl.capacity() == 0 {
1309            return;
1310        }
1311        let (send, rendezvous) = sync_channel(0);
1312        self.callbacks
1313            .send(KafkaCallback::PartitionsAssigned(
1314                tpl.elements()
1315                    .iter()
1316                    .map(|tp| (tp.topic().into(), tp.partition()))
1317                    .collect(),
1318                send,
1319            ))
1320            .ok();
1321
1322        while rendezvous.recv().is_ok() {
1323            // no-op: wait for partition assignment handler to complete
1324        }
1325    }
1326
1327    /// Emit a PartitionsRevoked callback and block until confirmation is
1328    /// received that acknowledgements have been processed for each of them.
1329    /// The rendezvous channel used in the callback can send multiple times to
1330    /// signal individual partitions completing. This function blocks until the
1331    /// sender is dropped by the callback handler.
1332    fn revoke_partitions(&self, tpl: &TopicPartitionList) {
1333        let (send, rendezvous) = sync_channel(0);
1334        self.callbacks
1335            .send(KafkaCallback::PartitionsRevoked(
1336                tpl.elements()
1337                    .iter()
1338                    .map(|tp| (tp.topic().into(), tp.partition()))
1339                    .collect(),
1340                send,
1341            ))
1342            .ok();
1343
1344        while rendezvous.recv().is_ok() {
1345            self.commit_consumer_state();
1346        }
1347    }
1348
1349    fn commit_consumer_state(&self) {
1350        if let Some(consumer) = self
1351            .consumer
1352            .get()
1353            .expect("Consumer reference was not initialized.")
1354            .upgrade()
1355        {
1356            match consumer.commit_consumer_state(CommitMode::Sync) {
1357                Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
1358                    /* Success, or nothing to do - yay \0/ */
1359                }
1360                Err(error) => emit!(KafkaOffsetUpdateError { error }),
1361            }
1362        }
1363    }
1364}
1365
1366impl ClientContext for KafkaSourceContext {
1367    fn stats(&self, statistics: Statistics) {
1368        self.stats.stats(statistics)
1369    }
1370}
1371
1372impl ConsumerContext for KafkaSourceContext {
1373    fn pre_rebalance(&self, _base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
1374        match rebalance {
1375            Rebalance::Assign(tpl) => self.consume_partitions(tpl),
1376
1377            Rebalance::Revoke(tpl) => {
1378                self.revoke_partitions(tpl);
1379                self.commit_consumer_state();
1380            }
1381
1382            Rebalance::Error(message) => {
1383                error!("Error during Kafka consumer group rebalance: {}.", message);
1384            }
1385        }
1386    }
1387}
1388
#[cfg(test)]
mod test {
    use vector_lib::{lookup::OwnedTargetPath, schema::Definition};

    use super::*;

    /// Kafka broker host for tests; `KAFKA_HOST` overrides the `localhost` default.
    pub fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| String::from("localhost"))
    }

    /// Kafka broker port for tests; `KAFKA_PORT` overrides the 9091 default.
    ///
    /// Panics if `KAFKA_PORT` is set but is not a valid port number.
    pub fn kafka_port() -> u16 {
        match std::env::var("KAFKA_PORT") {
            Ok(port) => port.parse().expect("Invalid port number"),
            Err(_) => 9091,
        }
    }

    /// `host:port` address of the test Kafka broker.
    pub fn kafka_address() -> String {
        let (host, port) = (kafka_host(), kafka_port());
        format!("{host}:{port}")
    }

    /// Schema kind of the `headers` metadata object: arbitrary keys with byte values.
    fn headers_kind() -> Kind {
        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<KafkaSourceConfig>();
    }

    /// Source configuration pointed at the test broker, reading `topic` as
    /// consumer group `group` from the beginning of the topic.
    pub(super) fn make_config(
        topic: &str,
        group: &str,
        log_namespace: LogNamespace,
        librdkafka_options: Option<HashMap<String, String>>,
    ) -> KafkaSourceConfig {
        let use_vector_namespace = log_namespace == LogNamespace::Vector;
        KafkaSourceConfig {
            bootstrap_servers: kafka_address(),
            topics: vec![topic.into()],
            group_id: group.into(),
            auto_offset_reset: "beginning".into(),
            session_timeout_ms: Duration::from_millis(6000),
            commit_interval_ms: Duration::from_millis(1),
            librdkafka_options,
            key_field: default_key_field(),
            topic_key: default_topic_key(),
            partition_key: default_partition_key(),
            offset_key: default_offset_key(),
            headers_key: default_headers_key(),
            socket_timeout_ms: Duration::from_millis(60000),
            fetch_wait_max_ms: Duration::from_millis(100),
            log_namespace: Some(use_vector_namespace),
            ..Default::default()
        }
    }

    #[test]
    fn test_output_schema_definition_vector_namespace() {
        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("kafka", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("kafka", "message_key"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(&owned_value_path!("kafka", "topic"), Kind::bytes(), None)
                .with_metadata_field(
                    &owned_value_path!("kafka", "partition"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(&owned_value_path!("kafka", "offset"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("kafka", "headers"), headers_kind(), None)
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                );

        let definitions = make_config("topic", "group", LogNamespace::Vector, None)
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        assert_eq!(definitions, Some(expected))
    }

    #[test]
    fn test_output_schema_definition_legacy_namespace() {
        let expected = Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Legacy])
            .unknown_fields(Kind::undefined())
            .with_event_field(&owned_value_path!("message"), Kind::bytes(), Some("message"))
            .with_event_field(
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_event_field(&owned_value_path!("message_key"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("topic"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("partition"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("offset"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("headers"), headers_kind(), None)
            .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None);

        let definitions = make_config("topic", "group", LogNamespace::Legacy, None)
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        assert_eq!(definitions, Some(expected))
    }

    #[tokio::test]
    async fn consumer_create_ok() {
        let config = make_config("topic", "group", LogNamespace::Legacy, None);
        let created = create_consumer(&config, true).is_ok();
        assert!(created);
    }

    #[tokio::test]
    async fn consumer_create_incorrect_auto_offset_reset() {
        let config = KafkaSourceConfig {
            auto_offset_reset: "incorrect-auto-offset-reset".to_string(),
            ..make_config("topic", "group", LogNamespace::Legacy, None)
        };
        let rejected = create_consumer(&config, true).is_err();
        assert!(rejected);
    }
}
1537
1538#[cfg(feature = "kafka-integration-tests")]
1539#[cfg(test)]
1540mod integration_test {
1541    use std::time::Duration;
1542
1543    use chrono::{DateTime, SubsecRound, Utc};
1544    use futures::Stream;
1545    use futures_util::stream::FuturesUnordered;
1546    use rdkafka::{
1547        Offset, TopicPartitionList,
1548        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
1549        client::DefaultClientContext,
1550        config::{ClientConfig, FromClientConfig},
1551        consumer::BaseConsumer,
1552        message::{Header, OwnedHeaders},
1553        producer::{FutureProducer, FutureRecord},
1554        util::Timeout,
1555    };
1556    use stream_cancel::{Trigger, Tripwire};
1557    use tokio::time::sleep;
1558    use vector_lib::event::EventStatus;
1559    use vrl::{event_path, value};
1560
1561    use super::{test::*, *};
1562    use crate::{
1563        SourceSender,
1564        event::{EventArray, EventContainer},
1565        shutdown::ShutdownSignal,
1566        test_util::{collect_n, components::assert_source_compliance, random_string},
1567    };
1568
1569    const KEY: &str = "my key";
1570    const TEXT: &str = "my message";
1571    const HEADER_KEY: &str = "my header";
1572    const HEADER_VALUE: &str = "my header value";
1573
1574    fn kafka_test_topic() -> String {
1575        std::env::var("KAFKA_TEST_TOPIC")
1576            .unwrap_or_else(|_| format!("test-topic-{}", random_string(10)))
1577    }
1578    fn kafka_max_bytes() -> String {
1579        std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into())
1580    }
1581
1582    fn client_config<T: FromClientConfig>(group: Option<&str>) -> T {
1583        let mut client = ClientConfig::new();
1584        client.set("bootstrap.servers", kafka_address());
1585        client.set("produce.offset.report", "true");
1586        client.set("message.timeout.ms", "5000");
1587        client.set("auto.commit.interval.ms", "1");
1588        if let Some(group) = group {
1589            client.set("group.id", group);
1590        }
1591        client.create().expect("Producer creation error")
1592    }
1593
1594    async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime<Utc> {
1595        let now = Utc::now();
1596        let timestamp = now.timestamp_millis();
1597
1598        let producer: &FutureProducer = &client_config(None);
1599        let topic_name = topic.as_ref();
1600
1601        create_topic(topic_name, partitions).await;
1602
1603        (0..count)
1604            .map(|i| async move {
1605                let text = format!("{TEXT} {i:03}");
1606                let key = format!("{KEY} {i}");
1607                let record = FutureRecord::to(topic_name)
1608                    .payload(&text)
1609                    .key(&key)
1610                    .timestamp(timestamp)
1611                    .headers(OwnedHeaders::new().insert(Header {
1612                        key: HEADER_KEY,
1613                        value: Some(HEADER_VALUE),
1614                    }));
1615                if let Err(error) = producer.send(record, Timeout::Never).await {
1616                    panic!("Cannot send event to Kafka: {error:?}");
1617                }
1618            })
1619            .collect::<FuturesUnordered<_>>()
1620            .collect::<Vec<_>>()
1621            .await;
1622
1623        now
1624    }
1625
1626    async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime<Utc>) {
1627        let topic = kafka_test_topic();
1628        let group_id = format!("test-group-{}", random_string(10));
1629
1630        let sent_at = send_events(topic.clone(), partitions, count).await;
1631
1632        (topic, group_id, sent_at)
1633    }
1634
1635    #[tokio::test]
1636    async fn consumes_event_with_acknowledgements() {
1637        send_receive(true, |_| false, 10, LogNamespace::Legacy).await;
1638    }
1639
1640    #[tokio::test]
1641    async fn consumes_event_with_acknowledgements_vector_namespace() {
1642        send_receive(true, |_| false, 10, LogNamespace::Vector).await;
1643    }
1644
1645    #[tokio::test]
1646    async fn consumes_event_without_acknowledgements() {
1647        send_receive(false, |_| false, 10, LogNamespace::Legacy).await;
1648    }
1649
1650    #[tokio::test]
1651    async fn consumes_event_without_acknowledgements_vector_namespace() {
1652        send_receive(false, |_| false, 10, LogNamespace::Vector).await;
1653    }
1654
1655    #[tokio::test]
1656    async fn handles_one_negative_acknowledgement() {
1657        send_receive(true, |n| n == 2, 10, LogNamespace::Legacy).await;
1658    }
1659
1660    #[tokio::test]
1661    async fn handles_one_negative_acknowledgement_vector_namespace() {
1662        send_receive(true, |n| n == 2, 10, LogNamespace::Vector).await;
1663    }
1664
1665    #[tokio::test]
1666    async fn handles_permanent_negative_acknowledgement() {
1667        send_receive(true, |n| n >= 2, 2, LogNamespace::Legacy).await;
1668    }
1669
1670    #[tokio::test]
1671    async fn handles_permanent_negative_acknowledgement_vector_namespace() {
1672        send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
1673    }
1674
1675    async fn send_receive(
1676        acknowledgements: bool,
1677        error_at: impl Fn(usize) -> bool,
1678        receive_count: usize,
1679        log_namespace: LogNamespace,
1680    ) {
1681        const SEND_COUNT: usize = 10;
1682
1683        let topic = format!("test-topic-{}", random_string(10));
1684        let group_id = format!("test-group-{}", random_string(10));
1685        let config = make_config(&topic, &group_id, log_namespace, None);
1686
1687        let now = send_events(topic.clone(), 1, 10).await;
1688
1689        let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
1690            let (tx, rx) = SourceSender::new_test_errors(error_at);
1691            let (trigger_shutdown, shutdown_done) =
1692                spawn_kafka(tx, config, acknowledgements, false, log_namespace);
1693            let events = collect_n(rx, SEND_COUNT).await;
1694            // Yield to the finalization task to let it collect the
1695            // batch status receivers before signalling the shutdown.
1696            tokio::task::yield_now().await;
1697            drop(trigger_shutdown);
1698            shutdown_done.await;
1699
1700            events
1701        })
1702        .await;
1703
1704        let offset = fetch_tpl_offset(&group_id, &topic, 0);
1705        assert_eq!(offset, Offset::from_raw(receive_count as i64));
1706
1707        assert_eq!(events.len(), SEND_COUNT);
1708        for (i, event) in events.into_iter().enumerate() {
1709            if let LogNamespace::Legacy = log_namespace {
1710                assert_eq!(
1711                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1712                    format!("{TEXT} {i:03}").into()
1713                );
1714                assert_eq!(event.as_log()["message_key"], format!("{KEY} {i}").into());
1715                assert_eq!(
1716                    event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1717                    "kafka".into()
1718                );
1719                assert_eq!(
1720                    event.as_log()[log_schema().timestamp_key().unwrap().to_string()],
1721                    now.trunc_subsecs(3).into()
1722                );
1723                assert_eq!(event.as_log()["topic"], topic.clone().into());
1724                assert!(event.as_log().contains("partition"));
1725                assert!(event.as_log().contains("offset"));
1726                let mut expected_headers = ObjectMap::new();
1727                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1728                assert_eq!(event.as_log()["headers"], Value::from(expected_headers));
1729            } else {
1730                let meta = event.as_log().metadata().value();
1731
1732                assert_eq!(
1733                    meta.get(path!("vector", "source_type")).unwrap(),
1734                    &value!(KafkaSourceConfig::NAME)
1735                );
1736                assert!(
1737                    meta.get(path!("vector", "ingest_timestamp"))
1738                        .unwrap()
1739                        .is_timestamp()
1740                );
1741
1742                assert_eq!(
1743                    event.as_log().value(),
1744                    &value!(format!("{} {:03}", TEXT, i))
1745                );
1746                assert_eq!(
1747                    meta.get(path!("kafka", "message_key")).unwrap(),
1748                    &value!(format!("{} {}", KEY, i))
1749                );
1750
1751                assert_eq!(
1752                    meta.get(path!("kafka", "timestamp")).unwrap(),
1753                    &value!(now.trunc_subsecs(3))
1754                );
1755                assert_eq!(
1756                    meta.get(path!("kafka", "topic")).unwrap(),
1757                    &value!(topic.clone())
1758                );
1759                assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer(),);
1760                assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer(),);
1761
1762                let mut expected_headers = ObjectMap::new();
1763                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1764                assert_eq!(
1765                    meta.get(path!("kafka", "headers")).unwrap(),
1766                    &Value::from(expected_headers)
1767                );
1768            }
1769        }
1770    }
1771
1772    fn make_rand_config() -> (String, String, KafkaSourceConfig) {
1773        let topic = format!("test-topic-{}", random_string(10));
1774        let group_id = format!("test-group-{}", random_string(10));
1775        let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
1776        (topic, group_id, config)
1777    }
1778
1779    fn delay_pipeline(
1780        id: usize,
1781        delay: Duration,
1782        status: EventStatus,
1783    ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) {
1784        let (pipe, recv) = SourceSender::new_test_sender_with_options(100, None);
1785        let recv = recv.into_stream();
1786        let recv = recv.then(move |item| async move {
1787            let mut events = item.events;
1788            events.iter_logs_mut().for_each(|log| {
1789                log.insert(event_path!("pipeline_id"), id.to_string());
1790            });
1791            sleep(delay).await;
1792            events.iter_events_mut().for_each(|mut event| {
1793                let metadata = event.metadata_mut();
1794                metadata.update_status(status);
1795                metadata.update_sources();
1796            });
1797            events
1798        });
1799        (pipe, Box::pin(recv))
1800    }
1801
1802    fn spawn_kafka(
1803        out: SourceSender,
1804        config: KafkaSourceConfig,
1805        acknowledgements: bool,
1806        eof: bool,
1807        log_namespace: LogNamespace,
1808    ) -> (Trigger, Tripwire) {
1809        let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
1810
1811        let decoder = DecodingConfig::new(
1812            config.framing.clone(),
1813            config.decoding.clone(),
1814            log_namespace,
1815        )
1816        .build()
1817        .unwrap();
1818
1819        let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap();
1820
1821        tokio::spawn(kafka_source(
1822            config,
1823            consumer,
1824            callback_rx,
1825            decoder,
1826            out,
1827            shutdown,
1828            eof,
1829            log_namespace,
1830        ));
1831        (trigger_shutdown, shutdown_done)
1832    }
1833
1834    fn fetch_tpl_offset(group_id: &str, topic: &str, partition: i32) -> Offset {
1835        let client: BaseConsumer = client_config(Some(group_id));
1836        client.subscribe(&[topic]).expect("Subscribing failed");
1837
1838        let mut tpl = TopicPartitionList::new();
1839        tpl.add_partition(topic, partition);
1840        client
1841            .committed_offsets(tpl, Duration::from_secs(1))
1842            .expect("Getting committed offsets failed")
1843            .find_partition(topic, partition)
1844            .expect("Missing topic/partition")
1845            .offset()
1846    }
1847
1848    async fn create_topic(topic: &str, partitions: i32) {
1849        let client: AdminClient<DefaultClientContext> = client_config(None);
1850        let topic_results = client
1851            .create_topics(
1852                [&NewTopic {
1853                    name: topic,
1854                    num_partitions: partitions,
1855                    replication: TopicReplication::Fixed(1),
1856                    config: vec![],
1857                }],
1858                &AdminOptions::default(),
1859            )
1860            .await
1861            .expect("create_topics failed");
1862
1863        for result in topic_results {
1864            if let Err((topic, err)) = result
1865                && err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists
1866            {
1867                panic!("Creating a topic failed: {:?}", (topic, err))
1868            }
1869        }
1870    }
1871
1872    // Failure timeline:
1873    // - Topic exists on multiple partitions
1874    // - Consumer A connects to topic, is assigned both partitions
1875    // - Consumer A receives some messages
1876    // - Consumer B connects to topic
1877    // - Consumer A has one partition revoked (rebalance)
1878    // - Consumer B is assigned a partition
1879    // - Consumer A stores an order on the revoked partition
1880    // - Consumer B skips receiving messages?
1881    #[ignore]
1882    #[tokio::test]
1883    async fn handles_rebalance() {
1884        // The test plan here is to:
1885        // - Set up one source instance, feeding into a pipeline that delays acks.
1886        // - Wait a bit, and set up a second source instance. This should cause a rebalance.
1887        // - Wait further until all events will have been pulled down.
1888        // - Verify that all events are captured by the two sources, and that offsets are set right, etc.
1889
1890        // However this test, as written, does not actually cause the
1891        // conditions required to test this. We have had external
1892        // validation that the sink behaves properly on rebalance
1893        // events.  This test also requires the insertion of a small
1894        // delay into the source to guarantee the timing, which is not
1895        // suitable for production code.
1896
1897        const NEVENTS: usize = 200;
1898        const DELAY: u64 = 100;
1899
1900        let (topic, group_id, config) = make_rand_config();
1901        create_topic(&topic, 2).await;
1902
1903        let _send_start = send_events(topic.clone(), 1, NEVENTS).await;
1904
1905        let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered);
1906        let (trigger_shutdown1, shutdown_done1) =
1907            spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy);
1908        let events1 = tokio::spawn(collect_n(rx1, NEVENTS));
1909
1910        sleep(Duration::from_secs(1)).await;
1911
1912        let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered);
1913        let (trigger_shutdown2, shutdown_done2) =
1914            spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1915        let events2 = tokio::spawn(collect_n(rx2, NEVENTS));
1916
1917        sleep(Duration::from_secs(5)).await;
1918
1919        drop(trigger_shutdown1);
1920        let events1 = events1.await.unwrap();
1921        shutdown_done1.await;
1922
1923        sleep(Duration::from_secs(5)).await;
1924
1925        drop(trigger_shutdown2);
1926        let events2 = events2.await.unwrap();
1927        shutdown_done2.await;
1928
1929        sleep(Duration::from_secs(1)).await;
1930
1931        assert!(!events1.is_empty());
1932        assert!(!events2.is_empty());
1933
1934        match fetch_tpl_offset(&group_id, &topic, 0) {
1935            Offset::Offset(offset) => {
1936                assert!((offset as isize - events1.len() as isize).abs() <= 1)
1937            }
1938            o => panic!("Invalid offset for partition 0 {o:?}"),
1939        }
1940
1941        match fetch_tpl_offset(&group_id, &topic, 1) {
1942            Offset::Offset(offset) => {
1943                assert!((offset as isize - events2.len() as isize).abs() <= 1)
1944            }
1945            o => panic!("Invalid offset for partition 0 {o:?}"),
1946        }
1947
1948        let mut all_events = events1
1949            .into_iter()
1950            .chain(events2.into_iter())
1951            .flat_map(map_logs)
1952            .collect::<Vec<String>>();
1953        all_events.sort();
1954
1955        // Assert they are all in sequential order and no dupes, TODO
1956    }
1957
1958    #[tokio::test]
1959    async fn drains_acknowledgements_at_shutdown() {
1960        // 1. Send N events (if running against a pre-populated kafka topic, use send_count=0 and expect_count=expected number of messages; otherwise just set send_count)
1961        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
1962            .unwrap_or_else(|_| "125000".into())
1963            .parse()
1964            .expect("Number of messages to send to kafka.");
1965        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
1966            .unwrap_or_else(|_| format!("{send_count}"))
1967            .parse()
1968            .expect("Number of messages to expect consumers to process.");
1969        let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY")
1970            .unwrap_or_else(|_| "2000".into())
1971            .parse()
1972            .expect("Number of milliseconds before shutting down first consumer.");
1973
1974        let (topic, group_id, _) = send_to_test_topic(1, send_count).await;
1975
1976        // 2. Run the kafka source to read some of the events
1977        // 3. Send a shutdown signal (at some point before all events are read)
1978        let mut opts = HashMap::new();
1979        // Set options to get partition EOF notifications, and fetch data in small/configurable size chunks
1980        opts.insert("enable.partition.eof".into(), "true".into());
1981        opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
1982        let events1 = {
1983            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone()));
1984            let (tx, rx) = SourceSender::new_test_errors(|_| false);
1985            let (trigger_shutdown, shutdown_done) =
1986                spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1987            let (events, _) = tokio::join!(rx.collect::<Vec<Event>>(), async move {
1988                sleep(Duration::from_millis(delay_ms)).await;
1989                drop(trigger_shutdown);
1990            });
1991            shutdown_done.await;
1992            events
1993        };
1994
1995        debug!("Consumer group.id: {}", &group_id);
1996        debug!(
1997            "First consumer read {} of {} messages.",
1998            events1.len(),
1999            expect_count
2000        );
2001
2002        // 4. Run the kafka source again to finish reading the events
2003        let events2 = {
2004            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts));
2005            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2006            let (trigger_shutdown, shutdown_done) =
2007                spawn_kafka(tx, config, true, true, LogNamespace::Legacy);
2008            let events = rx.collect::<Vec<Event>>().await;
2009            drop(trigger_shutdown);
2010            shutdown_done.await;
2011            events
2012        };
2013
2014        debug!(
2015            "Second consumer read {} of {} messages.",
2016            events2.len(),
2017            expect_count
2018        );
2019
2020        // 5. Total number of events processed should equal the number sent
2021        let total = events1.len() + events2.len();
2022        assert_ne!(
2023            events1.len(),
2024            0,
2025            "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)"
2026        );
2027        assert_ne!(
2028            events2.len(),
2029            0,
2030            "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?) "
2031        );
2032        assert_eq!(total, expect_count);
2033    }
2034
2035    async fn consume_with_rebalance(rebalance_strategy: String) {
2036        // 1. Send N events (if running against a pre-populated kafka topic, use send_count=0 and expect_count=expected number of messages; otherwise just set send_count)
2037        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
2038            .unwrap_or_else(|_| "125000".into())
2039            .parse()
2040            .expect("Number of messages to send to kafka.");
2041        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
2042            .unwrap_or_else(|_| format!("{send_count}"))
2043            .parse()
2044            .expect("Number of messages to expect consumers to process.");
2045        let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY")
2046            .unwrap_or_else(|_| "2000".into())
2047            .parse()
2048            .expect("Number of milliseconds before shutting down first consumer.");
2049
2050        let (topic, group_id, _) = send_to_test_topic(6, send_count).await;
2051        debug!("Topic: {}", &topic);
2052        debug!("Consumer group.id: {}", &group_id);
2053
2054        // 2. Run the kafka source to read some of the events
2055        // 3. Start 2nd & 3rd consumers using the same group.id, triggering rebalance events
2056        let mut kafka_options = HashMap::new();
2057        kafka_options.insert("enable.partition.eof".into(), "true".into());
2058        kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
2059        kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy);
2060        let config1 = make_config(
2061            &topic,
2062            &group_id,
2063            LogNamespace::Legacy,
2064            Some(kafka_options.clone()),
2065        );
2066        let config2 = config1.clone();
2067        let config3 = config1.clone();
2068        let config4 = config1.clone();
2069
2070        let (events1, events2, events3) = tokio::join!(
2071            async move {
2072                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2073                let (_trigger_shutdown, _shutdown_done) =
2074                    spawn_kafka(tx, config1, true, true, LogNamespace::Legacy);
2075
2076                rx.collect::<Vec<Event>>().await
2077            },
2078            async move {
2079                sleep(Duration::from_millis(delay_ms)).await;
2080                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2081                let (_trigger_shutdown, _shutdown_done) =
2082                    spawn_kafka(tx, config2, true, true, LogNamespace::Legacy);
2083
2084                rx.collect::<Vec<Event>>().await
2085            },
2086            async move {
2087                sleep(Duration::from_millis(delay_ms * 2)).await;
2088                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2089                let (_trigger_shutdown, _shutdown_done) =
2090                    spawn_kafka(tx, config3, true, true, LogNamespace::Legacy);
2091
2092                rx.collect::<Vec<Event>>().await
2093            }
2094        );
2095
2096        let unconsumed = async move {
2097            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2098            let (_trigger_shutdown, _shutdown_done) =
2099                spawn_kafka(tx, config4, true, true, LogNamespace::Legacy);
2100
2101            rx.collect::<Vec<Event>>().await
2102        }
2103        .await;
2104
2105        debug!(
2106            "First consumer read {} of {} messages.",
2107            events1.len(),
2108            expect_count
2109        );
2110
2111        debug!(
2112            "Second consumer read {} of {} messages.",
2113            events2.len(),
2114            expect_count
2115        );
2116        debug!(
2117            "Third consumer read {} of {} messages.",
2118            events3.len(),
2119            expect_count
2120        );
2121
2122        // 5. Total number of events processed should equal the number sent
2123        let total = events1.len() + events2.len() + events3.len();
2124        assert_ne!(
2125            events1.len(),
2126            0,
2127            "First batch of events should be non-zero (increase delay?)"
2128        );
2129        assert_ne!(
2130            events2.len(),
2131            0,
2132            "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2133        );
2134        assert_ne!(
2135            events3.len(),
2136            0,
2137            "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2138        );
2139        assert_eq!(
2140            unconsumed.len(),
2141            0,
2142            "The first set of consumers should consume and ack all messages."
2143        );
2144        assert_eq!(total, expect_count);
2145    }
2146
2147    #[tokio::test]
2148    async fn drains_acknowledgements_during_rebalance_default_assignments() {
2149        // the default, eager rebalance strategies generally result in more revocations
2150        consume_with_rebalance("range,roundrobin".into()).await;
2151    }
2152    #[tokio::test]
2153    async fn drains_acknowledgements_during_rebalance_sticky_assignments() {
2154        // Cooperative rebalance strategies generally result in fewer revokes,
2155        // as only reassigned partitions are revoked
2156        consume_with_rebalance("cooperative-sticky".into()).await;
2157    }
2158
2159    fn map_logs(events: EventArray) -> impl Iterator<Item = String> {
2160        events.into_events().map(|event| {
2161            let log = event.into_log();
2162            format!(
2163                "{} {} {} {}",
2164                log["message"].to_string_lossy(),
2165                log["topic"].to_string_lossy(),
2166                log["partition"].to_string_lossy(),
2167                log["offset"].to_string_lossy(),
2168            )
2169        })
2170    }
2171}