Skip to main content

vector_core/
fanout.rs

1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{
11    config::ComponentKey,
12    event::{EventArray, EventContainer},
13};
14
/// Control operations that can be applied to a [`Fanout`] to change its set of sinks.
///
/// Messages are delivered to the fanout over its [`ControlChannel`].
pub enum ControlMessage {
    /// Adds a new sink to the fanout.
    Add(ComponentKey, BufferSender<EventArray>),

    /// Removes a sink from the fanout.
    Remove(ComponentKey),

    /// Pauses a sink in the fanout.
    ///
    /// If a fanout has any paused sinks, subsequent sends cannot proceed until all paused sinks
    /// have been replaced.
    Pause(ComponentKey),

    /// Replaces a paused sink with its new sender.
    Replace(ComponentKey, BufferSender<EventArray>),
}
31
32impl fmt::Debug for ControlMessage {
33    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34        write!(f, "ControlMessage::")?;
35        match self {
36            Self::Add(id, _) => write!(f, "Add({id:?})"),
37            Self::Remove(id) => write!(f, "Remove({id:?})"),
38            Self::Pause(id) => write!(f, "Pause({id:?})"),
39            Self::Replace(id, _) => write!(f, "Replace({id:?})"),
40        }
41    }
42}
43
// TODO: We should really wrap this in a custom type that has dedicated methods for each operation
// so that higher-level components don't need to do the raw channel sends, etc.
/// Sending half of the channel used to deliver [`ControlMessage`]s to a [`Fanout`].
pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
47
/// Distributes batches of events to a dynamic set of downstream sinks.
///
/// Sinks can be added, removed, paused, and replaced at runtime via [`ControlMessage`]s received
/// over the control channel.
pub struct Fanout {
    // Downstream senders keyed by component. A `None` entry means the sender is currently absent:
    // either the sink has been paused, or the sender has been handed to an in-flight `SendGroup`.
    senders: IndexMap<ComponentKey, Option<Sender>>,
    // Receiving half of the control channel created in `Fanout::new`.
    control_channel: mpsc::UnboundedReceiver<ControlMessage>,
    // Component that owns this fanout; reported in diagnostics about empty event batches.
    upstream_component: ComponentKey,
}
53
54impl Fanout {
55    pub fn new(upstream_component: ComponentKey) -> (Self, ControlChannel) {
56        let (control_tx, control_rx) = mpsc::unbounded_channel();
57
58        let fanout = Self {
59            senders: Default::default(),
60            control_channel: control_rx,
61            upstream_component,
62        };
63
64        (fanout, control_tx)
65    }
66
67    /// Add a new sink as an output.
68    ///
69    /// # Panics
70    ///
71    /// Function will panic if a sink with the same ID is already present.
72    pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
73        assert!(
74            !self.senders.contains_key(&id),
75            "Adding duplicate output id to fanout: {id}"
76        );
77        self.senders.insert(id, Some(Sender::new(sink)));
78    }
79
80    fn remove(&mut self, id: &ComponentKey) {
81        assert!(
82            self.senders.shift_remove(id).is_some(),
83            "Removing nonexistent sink from fanout: {id}"
84        );
85    }
86
87    fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
88        match self.senders.get_mut(id) {
89            Some(sender) => {
90                // While a sink must be _known_ to be replaced, it must also be empty (previously
91                // paused or consumed when the `SendGroup` was created), otherwise an invalid
92                // sequence of control operations has been applied.
93                assert!(
94                    sender.replace(Sender::new(sink)).is_none(),
95                    "Replacing existing sink is not valid: {id}"
96                );
97            }
98            None => panic!("Replacing unknown sink from fanout: {id}"),
99        }
100    }
101
102    fn pause(&mut self, id: &ComponentKey) {
103        match self.senders.get_mut(id) {
104            Some(sender) => {
105                // A sink must be known and present to be replaced, otherwise an invalid sequence of
106                // control operations has been applied.
107                assert!(
108                    sender.take().is_some(),
109                    "Pausing nonexistent sink is not valid: {id}"
110                );
111            }
112            None => panic!("Pausing unknown sink from fanout: {id}"),
113        }
114    }
115
116    /// Waits for the next control message and applies it.
117    ///
118    /// Returns `true` if a message was processed, `false` if the control
119    /// channel was closed.
120    pub async fn recv_control_message(&mut self) -> bool {
121        match self.control_channel.recv().await {
122            Some(msg) => {
123                self.apply_control_message(msg);
124                true
125            }
126            None => false,
127        }
128    }
129
130    /// Apply a control message directly against this instance.
131    ///
132    /// This method should not be used if there is an active `SendGroup` being processed.
133    fn apply_control_message(&mut self, message: ControlMessage) {
134        trace!("Processing control message outside of send: {:?}", message);
135
136        match message {
137            ControlMessage::Add(id, sink) => self.add(id, sink),
138            ControlMessage::Remove(id) => self.remove(&id),
139            ControlMessage::Pause(id) => self.pause(&id),
140            ControlMessage::Replace(id, sink) => self.replace(&id, sink),
141        }
142    }
143
144    /// Waits for all paused sinks to be replaced.
145    ///
146    /// Control messages are processed until all senders have been replaced, so it is guaranteed
147    /// that when this method returns, all senders are ready for the next send to be triggered.
148    async fn wait_for_replacements(&mut self) {
149        while self.senders.values().any(Option::is_none) {
150            if let Some(msg) = self.control_channel.recv().await {
151                self.apply_control_message(msg);
152            } else {
153                // If the control channel is closed, there's nothing else we can do.
154
155                // TODO: It _seems_ like we should probably panic here, or at least return.
156                //
157                // Essentially, we should only land here if the control channel is closed but we
158                // haven't yet replaced all of the paused sinks... and we shouldn't have any paused
159                // sinks if Vector is stopping normally/gracefully, so like... we'd only get
160                // here during a configuration reload where we panicked in another thread due to
161                // an error of some sort, and the control channel got dropped, closed itself, and
162                // we're never going to be able to recover.
163                //
164                // The flipside is that by leaving it as-is, in the above hypothesized scenario,
165                // we'd avoid emitting additional panics/error logging when the root cause error was
166                // already doing so, like there's little value in knowing the fanout also hit an
167                // unrecoverable state if the whole process is about to come crashing down
168                // anyways... but it still does feel weird to have that encoded here by virtue of
169                // only a comment, and not an actual terminating expression. *shrug*
170            }
171        }
172    }
173
174    /// Send a stream of events to all connected sinks.
175    ///
176    /// This function will send events until the provided stream finishes. It will also block on the
177    /// resolution of any pending reload before proceeding with a send operation, similar to `send`.
178    ///
179    /// # Panics
180    ///
181    /// This method can panic if the fanout receives a control message that violates some invariant
182    /// about its current state (e.g. remove a nonexistent sink, etc.). This would imply a bug in
183    /// Vector's config reloading logic.
184    ///
185    /// # Errors
186    ///
187    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
188    /// returned detailing the cause.
189    pub async fn send_stream(
190        &mut self,
191        events: impl Stream<Item = (EventArray, Instant)>,
192    ) -> crate::Result<()> {
193        tokio::pin!(events);
194        while let Some((event_array, send_reference)) = events.next().await {
195            self.send(event_array, Some(send_reference)).await?;
196        }
197        Ok(())
198    }
199
    /// Send a batch of events to all connected sinks.
    ///
    /// This will block on the resolution of any pending reload before proceeding with the send
    /// operation.
    ///
    /// An empty batch is never forwarded: it trips a `debug_assert!` when debug assertions are
    /// enabled, and is otherwise dropped with a warning.
    ///
    /// # Panics
    ///
    /// This method can panic if the fanout receives a control message that violates some invariant
    /// about its current state (e.g. remove a nonexistent sink, etc). This would imply a bug in
    /// Vector's config reloading logic.
    ///
    /// # Errors
    ///
    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
    /// returned detailing the cause.
    pub async fn send(
        &mut self,
        events: EventArray,
        send_reference: Option<Instant>,
    ) -> crate::Result<()> {
        // First, process any available control messages in a non-blocking fashion.
        while let Ok(message) = self.control_channel.try_recv() {
            self.apply_control_message(message);
        }

        // Wait for any senders that are paused to be replaced first before continuing with the send.
        self.wait_for_replacements().await;

        // Drop empty event batches before they reach any downstream buffer, this is technically
        // programmer error. In debug/test builds the `debug_assert!` makes the underlying bug fail
        // loudly.
        debug_assert!(
            !events.is_empty(),
            "Fanout received empty event batch from upstream component '{}'",
            self.upstream_component,
        );
        // TODO: Wrap the conditional below with `std::hint::unlikely` once it stabilizes. This is an
        // applicable situation to use it in since the following conditional should never evaluate to
        // true.
        #[cfg(not(debug_assertions))]
        if events.is_empty() {
            warn!(
                message = "Dropping empty event batch emitted by upstream component. This is likely a bug in that component.",
                component_id = %self.upstream_component,
                downstream_count = self.senders.len(),
            );
            return Ok(());
        }

        // Nothing to send if we have no sender.
        if self.senders.is_empty() {
            trace!("No senders present.");
            return Ok(());
        }

        // Keep track of whether the control channel has returned `Ready(None)`, and stop polling
        // it once it has. If we don't do this check, it will continue to return `Ready(None)` any
        // time it is polled, which can lead to a busy loop below.
        //
        // In real life this is likely a non-issue, but it can lead to strange behavior in tests if
        // left unhandled.
        let mut control_channel_open = true;

        // Create our send group which arms all senders to send the given events, and handles
        // adding/removing/replacing senders while the send is in-flight.
        let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);

        loop {
            tokio::select! {
                // Semantically, it's not hugely important that this select is biased. It does,
                // however, make testing simpler when you can count on control messages being
                // processed first.
                biased;

                maybe_msg = self.control_channel.recv(), if control_channel_open => {
                    trace!("Processing control message inside of send: {:?}", maybe_msg);

                    // During a send operation, control messages must be applied via the
                    // `SendGroup`, since it has exclusive access to the senders.
                    match maybe_msg {
                        Some(ControlMessage::Add(id, sink)) => {
                            send_group.add(id, sink);
                        },
                        Some(ControlMessage::Remove(id)) => {
                            send_group.remove(&id);
                        },
                        Some(ControlMessage::Pause(id)) => {
                            send_group.pause(&id);
                        },
                        Some(ControlMessage::Replace(id, sink)) => {
                            send_group.replace(&id, Sender::new(sink));
                        },
                        None => {
                            // Control channel is closed, which means Vector is shutting down.
                            control_channel_open = false;
                        }
                    }
                }

                // Drive the in-flight sends. This branch resolves once every send still tracked
                // by the group has finished, or as soon as one of them fails.
                result = send_group.send() => match result {
                    Ok(()) => {
                        trace!("Sent item to fanout.");
                        break;
                    },
                    Err(e) => return Err(e),
                }
            }
        }

        Ok(())
    }
311}
312
/// Tracks a single in-flight send of one batch of events to every sender of a fanout.
///
/// While a `SendGroup` exists it holds the exclusive borrow of the fanout's senders, so control
/// operations issued during the send must be applied through it.
struct SendGroup<'a> {
    // The fanout's sender slots. A slot is `None` while its sender is owned by a send future in
    // `sends`, or while the sink is paused.
    senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
    // One future per in-flight send; each resolves to its `Sender` so that it can be put back
    // into `senders` once the send completes.
    sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
}
317
318impl<'a> SendGroup<'a> {
319    fn new(
320        senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
321        events: EventArray,
322        send_reference: Option<Instant>,
323    ) -> Self {
324        // If we don't have a valid `Sender` for all sinks, then something went wrong in our logic
325        // to ensure we were starting with all valid/idle senders prior to initiating the send.
326        debug_assert!(senders.values().all(Option::is_some));
327
328        let last_sender_idx = senders.len().saturating_sub(1);
329        let mut events = Some(events);
330
331        // We generate a send future for each sender we have, which arms them with the events to
332        // send but also takes ownership of the sender itself, which we give back when the sender completes.
333        let mut sends = HashMap::new();
334        for (i, (key, sender)) in senders.iter_mut().enumerate() {
335            let mut sender = sender
336                .take()
337                .expect("sender must be present to initialize SendGroup");
338
339            // First, arm each sender with the item to actually send.
340            if i == last_sender_idx {
341                sender.input = events.take();
342            } else {
343                sender.input.clone_from(&events);
344            }
345            sender.send_reference = send_reference;
346
347            // Now generate a send for that sender which we'll drive to completion.
348            let send = async move {
349                sender.flush().await?;
350                Ok(sender)
351            };
352
353            sends.insert(key.clone(), ReusableBoxFuture::new(send));
354        }
355
356        Self { senders, sends }
357    }
358
359    fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
360        if let Some(send) = self.sends.remove(id) {
361            // Deliberately not instrumented with the current span: this drains a send to a sink
362            // that has just been detached from the topology, so it is unrelated to the upstream
363            // component that owns this fanout. Attaching the current span would mis-tag this
364            // task's logs with the upstream component's identity rather than the detached sink's.
365            tokio::spawn(async move {
366                if let Err(e) = send.await {
367                    warn!(
368                        cause = %e,
369                        message = "Encountered error writing to component after detaching from topology.",
370                    );
371                }
372            });
373            true
374        } else {
375            false
376        }
377    }
378
379    #[allow(clippy::needless_pass_by_value)]
380    fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
381        // When we're in the middle of a send, we can only keep track of the new sink, but can't
382        // actually send to it, as we don't have the item to send... so only add it to `senders`.
383        assert!(
384            self.senders
385                .insert(id.clone(), Some(Sender::new(sink)))
386                .is_none(),
387            "Adding duplicate output id to fanout: {id}"
388        );
389    }
390
391    fn remove(&mut self, id: &ComponentKey) {
392        // We may or may not be removing a sender that we're try to drive a send against, so we have
393        // to also detach the send future for the sender if it exists, otherwise we'd be hanging
394        // around still trying to send to it.
395        assert!(
396            self.senders.shift_remove(id).is_some(),
397            "Removing nonexistent sink from fanout: {id}"
398        );
399
400        // Now try and detach the in-flight send, if it exists.
401        //
402        // We don't ensure that a send was or wasn't detached because this could be called either
403        // during an in-flight send _or_ after the send has completed.
404        self.try_detach_send(id);
405    }
406
407    fn replace(&mut self, id: &ComponentKey, sink: Sender) {
408        match self.senders.get_mut(id) {
409            Some(sender) => {
410                // While a sink must be _known_ to be replaced, it must also be empty (previously
411                // paused or consumed when the `SendGroup` was created), otherwise an invalid
412                // sequence of control operations has been applied.
413                assert!(
414                    sender.replace(sink).is_none(),
415                    "Replacing existing sink is not valid: {id}"
416                );
417            }
418            None => panic!("Replacing unknown sink from fanout: {id}"),
419        }
420    }
421
422    fn pause(&mut self, id: &ComponentKey) {
423        match self.senders.get_mut(id) {
424            Some(sender) => {
425                // If we don't currently own the `Sender` for the given component, that implies
426                // there is an in-flight send: a `SendGroup` cannot be created without all
427                // participating components having a send operation triggered.
428                //
429                // As such, `try_detach_send` should always succeed here, as pausing only occurs
430                // when a component is being _replaced_, and should not be called multiple times.
431                if sender.take().is_none() {
432                    assert!(
433                        self.try_detach_send(id),
434                        "Pausing already-paused sink is invalid: {id}"
435                    );
436                }
437            }
438            None => panic!("Pausing unknown sink from fanout: {id}"),
439        }
440    }
441
442    async fn send(&mut self) -> crate::Result<()> {
443        // Right now, we do a linear scan of all sends, polling each send once in order to avoid
444        // waiting forever, such that we can let our control messages get picked up while sends are
445        // waiting.
446        loop {
447            if self.sends.is_empty() {
448                break;
449            }
450
451            let mut done = Vec::new();
452            for (key, send) in &mut self.sends {
453                if let Poll::Ready(result) = poll!(send.get_pin()) {
454                    let sender = result?;
455
456                    // The send completed, so we restore the sender and mark ourselves so that this
457                    // future gets dropped.
458                    done.push((key.clone(), sender));
459                }
460            }
461
462            for (key, sender) in done {
463                self.sends.remove(&key);
464                self.replace(&key, sender);
465            }
466
467            if !self.sends.is_empty() {
468                // We manually yield ourselves because we've polled all of the sends at this point,
469                // so if any are left, then we're scheduled for a wake-up... this is a really poor
470                // approximation of what `FuturesUnordered` is doing.
471                pending!();
472            }
473        }
474
475        Ok(())
476    }
477}
478
/// A buffer sender paired with the batch of events it has been armed to send.
struct Sender {
    // Underlying buffer sender that armed batches are written to.
    inner: BufferSender<EventArray>,
    // Batch to deliver on the next `flush`; `None` when nothing is armed.
    input: Option<EventArray>,
    // Optional timestamp handed to `inner` together with the batch on the next `flush`.
    send_reference: Option<Instant>,
}
484
485impl Sender {
486    fn new(inner: BufferSender<EventArray>) -> Self {
487        Self {
488            inner,
489            input: None,
490            send_reference: None,
491        }
492    }
493
494    async fn flush(&mut self) -> crate::Result<()> {
495        let send_reference = self.send_reference.take();
496        if let Some(input) = self.input.take() {
497            self.inner.send(input, send_reference).await?;
498            self.inner.flush().await?;
499        }
500
501        Ok(())
502    }
503}
504
505#[cfg(test)]
506mod tests {
507    use std::{mem, num::NonZeroUsize};
508
509    use futures::poll;
510    use tokio::sync::mpsc::UnboundedSender;
511    use tokio_test::{assert_pending, assert_ready, task::spawn};
512    use tracing::Span;
513    use vector_buffers::{
514        WhenFull,
515        topology::{
516            builder::TopologyBuilder,
517            channel::{BufferReceiver, BufferSender},
518        },
519    };
520    use vrl::value::Value;
521
522    use super::{ControlMessage, Fanout};
523    use crate::{
524        config::ComponentKey,
525        event::{Event, EventArray, EventContainer, LogEvent},
526        test_util::{collect_ready, collect_ready_events},
527    };
528
529    fn build_sender_pair(
530        capacity: usize,
531    ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
532        TopologyBuilder::standalone_memory(
533            NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
534            WhenFull::Block,
535            &Span::current(),
536            None,
537            None,
538        )
539    }
540
541    fn build_sender_pairs(
542        capacities: &[usize],
543    ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
544        let mut pairs = Vec::new();
545        for capacity in capacities {
546            pairs.push(build_sender_pair(*capacity));
547        }
548        pairs
549    }
550
551    fn fanout_from_senders(
552        capacities: &[usize],
553    ) -> (
554        Fanout,
555        UnboundedSender<ControlMessage>,
556        Vec<BufferReceiver<EventArray>>,
557    ) {
558        let (mut fanout, control) = Fanout::new(ComponentKey::from("test_upstream"));
559        let pairs = build_sender_pairs(capacities);
560
561        let mut receivers = Vec::new();
562        for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
563            fanout.add(ComponentKey::from(i.to_string()), sender);
564            receivers.push(receiver);
565        }
566
567        (fanout, control, receivers)
568    }
569
570    fn add_sender_to_fanout(
571        fanout: &mut Fanout,
572        receivers: &mut Vec<BufferReceiver<EventArray>>,
573        sender_id: usize,
574        capacity: usize,
575    ) {
576        let (sender, receiver) = build_sender_pair(capacity);
577        receivers.push(receiver);
578
579        fanout.add(ComponentKey::from(sender_id.to_string()), sender);
580    }
581
582    fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
583        control
584            .send(ControlMessage::Remove(ComponentKey::from(
585                sender_id.to_string(),
586            )))
587            .expect("sending control message should not fail");
588    }
589
590    fn replace_sender_in_fanout(
591        control: &UnboundedSender<ControlMessage>,
592        receivers: &mut [BufferReceiver<EventArray>],
593        sender_id: usize,
594        capacity: usize,
595    ) -> BufferReceiver<EventArray> {
596        let (sender, receiver) = build_sender_pair(capacity);
597        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
598
599        control
600            .send(ControlMessage::Pause(ComponentKey::from(
601                sender_id.to_string(),
602            )))
603            .expect("sending control message should not fail");
604
605        control
606            .send(ControlMessage::Replace(
607                ComponentKey::from(sender_id.to_string()),
608                sender,
609            ))
610            .expect("sending control message should not fail");
611
612        old_receiver
613    }
614
615    fn start_sender_replace(
616        control: &UnboundedSender<ControlMessage>,
617        receivers: &mut [BufferReceiver<EventArray>],
618        sender_id: usize,
619        capacity: usize,
620    ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
621        let (sender, receiver) = build_sender_pair(capacity);
622        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
623
624        control
625            .send(ControlMessage::Pause(ComponentKey::from(
626                sender_id.to_string(),
627            )))
628            .expect("sending control message should not fail");
629
630        (old_receiver, sender)
631    }
632
633    fn finish_sender_resume(
634        control: &UnboundedSender<ControlMessage>,
635        sender_id: usize,
636        sender: BufferSender<EventArray>,
637    ) {
638        control
639            .send(ControlMessage::Replace(
640                ComponentKey::from(sender_id.to_string()),
641                sender,
642            ))
643            .expect("sending control message should not fail");
644    }
645
646    fn unwrap_log_event_message<E>(event: E) -> String
647    where
648        E: EventContainer,
649    {
650        let event = event
651            .into_events()
652            .next()
653            .expect("must have at least one event");
654        let event = event.into_log();
655        event
656            .get("message")
657            .and_then(Value::as_bytes)
658            .and_then(|b| String::from_utf8(b.to_vec()).ok())
659            .expect("must be valid log event with `message` field")
660    }
661
662    #[tokio::test]
663    async fn fanout_writes_to_all() {
664        let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]);
665        let events = make_event_array(2);
666
667        let clones = events.clone();
668        fanout.send(clones, None).await.expect("should not fail");
669
670        for receiver in receivers {
671            assert_eq!(
672                collect_ready(receiver.into_stream()),
673                std::slice::from_ref(&events)
674            );
675        }
676    }
677
    // A send must stay pending while any sink is full, and complete once that sink has capacity
    // again. The middle sink (capacity 1) is the one that fills up after the first send.
    #[tokio::test]
    async fn fanout_notready() {
        let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]);
        let events = make_events(2);

        // First send should immediately complete because all senders have capacity:
        let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
        assert_ready!(first_send.poll()).expect("should not fail");
        drop(first_send);

        // Second send should return pending because sender B is now full:
        let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
        assert_pending!(second_send.poll());

        // Now read an item from each receiver to free up capacity for the second sender:
        for receiver in &mut receivers {
            assert_eq!(Some(events[0].clone().into()), receiver.next().await);
        }

        // Now our second send should actually be able to complete:
        assert_ready!(second_send.poll()).expect("should not fail");
        drop(second_send);

        // And make sure the second item comes through:
        for receiver in &mut receivers {
            assert_eq!(Some(events[1].clone().into()), receiver.next().await);
        }
    }
706
707    #[tokio::test]
708    async fn fanout_grow() {
709        let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]);
710        let events = make_events(3);
711
712        // Send in the first two events to our initial two senders:
713        fanout
714            .send(events[0].clone().into(), None)
715            .await
716            .expect("should not fail");
717        fanout
718            .send(events[1].clone().into(), None)
719            .await
720            .expect("should not fail");
721
722        // Now add a third sender:
723        add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4);
724
725        // Send in the last event which all three senders will now get:
726        fanout
727            .send(events[2].clone().into(), None)
728            .await
729            .expect("should not fail");
730
731        // Make sure the first two senders got all three events, but the third sender only got the
732        // last event:
733        let expected_events = [&events, &events, &events[2..]];
734        for (i, receiver) in receivers.into_iter().enumerate() {
735            assert_eq!(
736                collect_ready_events(receiver.into_stream()),
737                expected_events[i]
738            );
739        }
740    }
741
742    #[tokio::test]
743    async fn fanout_shrink() {
744        let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]);
745        let events = make_events(3);
746
747        // Send in the first two events to our initial two senders:
748        fanout
749            .send(events[0].clone().into(), None)
750            .await
751            .expect("should not fail");
752        fanout
753            .send(events[1].clone().into(), None)
754            .await
755            .expect("should not fail");
756
757        // Now remove the second sender:
758        remove_sender_from_fanout(&control, 1);
759
760        // Send in the last event which only the first sender will get:
761        fanout
762            .send(events[2].clone().into(), None)
763            .await
764            .expect("should not fail");
765
766        // Make sure the first sender got all three events, but the second sender only got the first two:
767        let expected_events = [&events, &events[..2]];
768        for (i, receiver) in receivers.into_iter().enumerate() {
769            assert_eq!(
770                collect_ready_events(receiver.into_stream()),
771                expected_events[i]
772            );
773        }
774    }
775
776    #[tokio::test]
777    async fn fanout_shrink_when_notready() {
778        // This test exercises that when we're waiting for a send to complete, we can correctly
779        // remove a sink whether or not it is the one that the send operation is still waiting on.
780        //
781        // This means that if we remove a sink that a current send is blocked on, we should be able
782        // to immediately proceed.
783        let events = make_events(2);
784        let expected_first_event = unwrap_log_event_message(events[0].clone());
785        let expected_second_event = unwrap_log_event_message(events[1].clone());
786
787        let cases = [
788            // Sender ID to drop, whether the second send should succeed after dropping, and the
789            // final "last event" a receiver should see after the second send:
790            (
791                0,
792                false,
793                [
794                    expected_second_event.clone(),
795                    expected_first_event.clone(),
796                    expected_second_event.clone(),
797                ],
798            ),
799            (
800                1,
801                true,
802                [
803                    expected_second_event.clone(),
804                    expected_second_event.clone(),
805                    expected_second_event.clone(),
806                ],
807            ),
808            (
809                2,
810                false,
811                [
812                    expected_second_event.clone(),
813                    expected_first_event.clone(),
814                    expected_second_event.clone(),
815                ],
816            ),
817        ];
818
819        for (sender_id, should_complete, expected_last_seen) in cases {
820            let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]);
821
822            // First send should immediately complete because all senders have capacity:
823            let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
824            assert_ready!(first_send.poll()).expect("should not fail");
825            drop(first_send);
826
827            // Second send should return pending because sender B is now full:
828            let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
829            assert_pending!(second_send.poll());
830
831            // Now drop our chosen sender and assert that polling the second send behaves as expected:
832            remove_sender_from_fanout(&control, sender_id);
833
834            if should_complete {
835                assert_ready!(second_send.poll()).expect("should not fail");
836            } else {
837                assert_pending!(second_send.poll());
838            }
839            drop(second_send);
840
841            // Now grab the last value available to each receiver and assert it's the second event.
842            drop(fanout);
843
844            let mut last_seen = Vec::new();
845            for receiver in &mut receivers {
846                let mut events = Vec::new();
847                while let Some(event) = receiver.next().await {
848                    events.insert(0, event);
849                }
850
851                last_seen.push(unwrap_log_event_message(events.remove(0)));
852            }
853
854            assert_eq!(&expected_last_seen[..], &last_seen);
855        }
856    }
857
858    #[tokio::test]
859    async fn fanout_no_sinks() {
860        let (mut fanout, _) = Fanout::new(ComponentKey::from("test_upstream"));
861        let events = make_events(2);
862
863        fanout
864            .send(events[0].clone().into(), None)
865            .await
866            .expect("should not fail");
867        fanout
868            .send(events[1].clone().into(), None)
869            .await
870            .expect("should not fail");
871    }
872
873    #[cfg(debug_assertions)]
874    #[tokio::test]
875    #[should_panic(expected = "Fanout received empty event batch from upstream component")]
876    async fn fanout_panics_on_empty_event_array_in_debug_builds() {
877        let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]);
878        let empty: EventArray = Vec::<LogEvent>::new().into();
879
880        _ = fanout.send(empty, None).await;
881    }
882
883    #[tokio::test]
884    async fn fanout_replace() {
885        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
886        let events = make_events(3);
887
888        // First two sends should immediately complete because all senders have capacity:
889        fanout
890            .send(events[0].clone().into(), None)
891            .await
892            .expect("should not fail");
893        fanout
894            .send(events[1].clone().into(), None)
895            .await
896            .expect("should not fail");
897
898        // Replace the first sender with a brand new one before polling again:
899        let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4);
900
901        // And do the third send which should also complete since all senders still have capacity:
902        fanout
903            .send(events[2].clone().into(), None)
904            .await
905            .expect("should not fail");
906
907        // Now make sure that the new "first" sender only got the third event, but that the second and
908        // third sender got all three events:
909        let expected_events = [&events[2..], &events, &events];
910        for (i, receiver) in receivers.into_iter().enumerate() {
911            assert_eq!(
912                collect_ready_events(receiver.into_stream()),
913                expected_events[i]
914            );
915        }
916
917        // And make sure our original "first" sender got the first two events:
918        assert_eq!(
919            collect_ready_events(old_first_receiver.into_stream()),
920            &events[..2]
921        );
922    }
923
924    #[tokio::test]
925    async fn fanout_wait() {
926        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]);
927        let events = make_events(3);
928
929        // First two sends should immediately complete because all senders have capacity:
930        let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
931        assert_ready!(poll!(send1)).expect("should not fail");
932        let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
933        assert_ready!(poll!(send2)).expect("should not fail");
934
935        // Now do an empty replace on the second sender, which we'll test to make sure that `Fanout`
936        // doesn't let any writes through until we replace it properly.  We get back the receiver
937        // we've replaced, but also the sender that we want to eventually install:
938        let (old_first_receiver, new_first_sender) =
939            start_sender_replace(&control, &mut receivers, 0, 4);
940
941        // Third send should return pending because now we have an in-flight replacement:
942        let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
943        assert_pending!(third_send.poll());
944
945        // Finish our sender replacement, which should wake up the third send and allow it to
946        // actually complete:
947        finish_sender_resume(&control, 0, new_first_sender);
948        assert!(third_send.is_woken());
949        assert_ready!(third_send.poll()).expect("should not fail");
950
951        // Make sure the original first sender got the first two events, the new first sender got
952        // the last event, and the second sender got all three:
953        assert_eq!(
954            collect_ready_events(old_first_receiver.into_stream()),
955            &events[0..2]
956        );
957
958        let expected_events = [&events[2..], &events];
959        for (i, receiver) in receivers.into_iter().enumerate() {
960            assert_eq!(
961                collect_ready_events(receiver.into_stream()),
962                expected_events[i]
963            );
964        }
965    }
966
967    fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
968        (0..count).map(|i| LogEvent::from(format!("line {i}")))
969    }
970
971    fn make_events(count: usize) -> Vec<Event> {
972        make_events_inner(count).map(Into::into).collect()
973    }
974
975    fn make_event_array(count: usize) -> EventArray {
976        make_events_inner(count).collect::<Vec<_>>().into()
977    }
978}