1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{
11 config::ComponentKey,
12 event::{EventArray, EventContainer},
13};
14
15pub enum ControlMessage {
16 Add(ComponentKey, BufferSender<EventArray>),
18
19 Remove(ComponentKey),
21
22 Pause(ComponentKey),
27
28 Replace(ComponentKey, BufferSender<EventArray>),
30}
31
32impl fmt::Debug for ControlMessage {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 write!(f, "ControlMessage::")?;
35 match self {
36 Self::Add(id, _) => write!(f, "Add({id:?})"),
37 Self::Remove(id) => write!(f, "Remove({id:?})"),
38 Self::Pause(id) => write!(f, "Pause({id:?})"),
39 Self::Replace(id, _) => write!(f, "Replace({id:?})"),
40 }
41 }
42}
43
44pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
47
48pub struct Fanout {
49 senders: IndexMap<ComponentKey, Option<Sender>>,
50 control_channel: mpsc::UnboundedReceiver<ControlMessage>,
51 upstream_component: ComponentKey,
52}
53
54impl Fanout {
55 pub fn new(upstream_component: ComponentKey) -> (Self, ControlChannel) {
56 let (control_tx, control_rx) = mpsc::unbounded_channel();
57
58 let fanout = Self {
59 senders: Default::default(),
60 control_channel: control_rx,
61 upstream_component,
62 };
63
64 (fanout, control_tx)
65 }
66
67 pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
73 assert!(
74 !self.senders.contains_key(&id),
75 "Adding duplicate output id to fanout: {id}"
76 );
77 self.senders.insert(id, Some(Sender::new(sink)));
78 }
79
80 fn remove(&mut self, id: &ComponentKey) {
81 assert!(
82 self.senders.shift_remove(id).is_some(),
83 "Removing nonexistent sink from fanout: {id}"
84 );
85 }
86
87 fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
88 match self.senders.get_mut(id) {
89 Some(sender) => {
90 assert!(
94 sender.replace(Sender::new(sink)).is_none(),
95 "Replacing existing sink is not valid: {id}"
96 );
97 }
98 None => panic!("Replacing unknown sink from fanout: {id}"),
99 }
100 }
101
102 fn pause(&mut self, id: &ComponentKey) {
103 match self.senders.get_mut(id) {
104 Some(sender) => {
105 assert!(
108 sender.take().is_some(),
109 "Pausing nonexistent sink is not valid: {id}"
110 );
111 }
112 None => panic!("Pausing unknown sink from fanout: {id}"),
113 }
114 }
115
116 pub async fn recv_control_message(&mut self) -> bool {
121 match self.control_channel.recv().await {
122 Some(msg) => {
123 self.apply_control_message(msg);
124 true
125 }
126 None => false,
127 }
128 }
129
130 fn apply_control_message(&mut self, message: ControlMessage) {
134 trace!("Processing control message outside of send: {:?}", message);
135
136 match message {
137 ControlMessage::Add(id, sink) => self.add(id, sink),
138 ControlMessage::Remove(id) => self.remove(&id),
139 ControlMessage::Pause(id) => self.pause(&id),
140 ControlMessage::Replace(id, sink) => self.replace(&id, sink),
141 }
142 }
143
144 async fn wait_for_replacements(&mut self) {
149 while self.senders.values().any(Option::is_none) {
150 if let Some(msg) = self.control_channel.recv().await {
151 self.apply_control_message(msg);
152 } else {
153 }
171 }
172 }
173
174 pub async fn send_stream(
190 &mut self,
191 events: impl Stream<Item = (EventArray, Instant)>,
192 ) -> crate::Result<()> {
193 tokio::pin!(events);
194 while let Some((event_array, send_reference)) = events.next().await {
195 self.send(event_array, Some(send_reference)).await?;
196 }
197 Ok(())
198 }
199
200 pub async fn send(
216 &mut self,
217 events: EventArray,
218 send_reference: Option<Instant>,
219 ) -> crate::Result<()> {
220 while let Ok(message) = self.control_channel.try_recv() {
222 self.apply_control_message(message);
223 }
224
225 self.wait_for_replacements().await;
227
228 debug_assert!(
232 !events.is_empty(),
233 "Fanout received empty event batch from upstream component '{}'",
234 self.upstream_component,
235 );
236 #[cfg(not(debug_assertions))]
240 if events.is_empty() {
241 warn!(
242 message = "Dropping empty event batch emitted by upstream component. This is likely a bug in that component.",
243 component_id = %self.upstream_component,
244 downstream_count = self.senders.len(),
245 );
246 return Ok(());
247 }
248
249 if self.senders.is_empty() {
251 trace!("No senders present.");
252 return Ok(());
253 }
254
255 let mut control_channel_open = true;
262
263 let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
266
267 loop {
268 tokio::select! {
269 biased;
273
274 maybe_msg = self.control_channel.recv(), if control_channel_open => {
275 trace!("Processing control message inside of send: {:?}", maybe_msg);
276
277 match maybe_msg {
280 Some(ControlMessage::Add(id, sink)) => {
281 send_group.add(id, sink);
282 },
283 Some(ControlMessage::Remove(id)) => {
284 send_group.remove(&id);
285 },
286 Some(ControlMessage::Pause(id)) => {
287 send_group.pause(&id);
288 },
289 Some(ControlMessage::Replace(id, sink)) => {
290 send_group.replace(&id, Sender::new(sink));
291 },
292 None => {
293 control_channel_open = false;
295 }
296 }
297 }
298
299 result = send_group.send() => match result {
300 Ok(()) => {
301 trace!("Sent item to fanout.");
302 break;
303 },
304 Err(e) => return Err(e),
305 }
306 }
307 }
308
309 Ok(())
310 }
311}
312
313struct SendGroup<'a> {
314 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
315 sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
316}
317
318impl<'a> SendGroup<'a> {
319 fn new(
320 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
321 events: EventArray,
322 send_reference: Option<Instant>,
323 ) -> Self {
324 debug_assert!(senders.values().all(Option::is_some));
327
328 let last_sender_idx = senders.len().saturating_sub(1);
329 let mut events = Some(events);
330
331 let mut sends = HashMap::new();
334 for (i, (key, sender)) in senders.iter_mut().enumerate() {
335 let mut sender = sender
336 .take()
337 .expect("sender must be present to initialize SendGroup");
338
339 if i == last_sender_idx {
341 sender.input = events.take();
342 } else {
343 sender.input.clone_from(&events);
344 }
345 sender.send_reference = send_reference;
346
347 let send = async move {
349 sender.flush().await?;
350 Ok(sender)
351 };
352
353 sends.insert(key.clone(), ReusableBoxFuture::new(send));
354 }
355
356 Self { senders, sends }
357 }
358
359 fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
360 if let Some(send) = self.sends.remove(id) {
361 tokio::spawn(async move {
366 if let Err(e) = send.await {
367 warn!(
368 cause = %e,
369 message = "Encountered error writing to component after detaching from topology.",
370 );
371 }
372 });
373 true
374 } else {
375 false
376 }
377 }
378
379 #[allow(clippy::needless_pass_by_value)]
380 fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
381 assert!(
384 self.senders
385 .insert(id.clone(), Some(Sender::new(sink)))
386 .is_none(),
387 "Adding duplicate output id to fanout: {id}"
388 );
389 }
390
391 fn remove(&mut self, id: &ComponentKey) {
392 assert!(
396 self.senders.shift_remove(id).is_some(),
397 "Removing nonexistent sink from fanout: {id}"
398 );
399
400 self.try_detach_send(id);
405 }
406
407 fn replace(&mut self, id: &ComponentKey, sink: Sender) {
408 match self.senders.get_mut(id) {
409 Some(sender) => {
410 assert!(
414 sender.replace(sink).is_none(),
415 "Replacing existing sink is not valid: {id}"
416 );
417 }
418 None => panic!("Replacing unknown sink from fanout: {id}"),
419 }
420 }
421
422 fn pause(&mut self, id: &ComponentKey) {
423 match self.senders.get_mut(id) {
424 Some(sender) => {
425 if sender.take().is_none() {
432 assert!(
433 self.try_detach_send(id),
434 "Pausing already-paused sink is invalid: {id}"
435 );
436 }
437 }
438 None => panic!("Pausing unknown sink from fanout: {id}"),
439 }
440 }
441
442 async fn send(&mut self) -> crate::Result<()> {
443 loop {
447 if self.sends.is_empty() {
448 break;
449 }
450
451 let mut done = Vec::new();
452 for (key, send) in &mut self.sends {
453 if let Poll::Ready(result) = poll!(send.get_pin()) {
454 let sender = result?;
455
456 done.push((key.clone(), sender));
459 }
460 }
461
462 for (key, sender) in done {
463 self.sends.remove(&key);
464 self.replace(&key, sender);
465 }
466
467 if !self.sends.is_empty() {
468 pending!();
472 }
473 }
474
475 Ok(())
476 }
477}
478
479struct Sender {
480 inner: BufferSender<EventArray>,
481 input: Option<EventArray>,
482 send_reference: Option<Instant>,
483}
484
485impl Sender {
486 fn new(inner: BufferSender<EventArray>) -> Self {
487 Self {
488 inner,
489 input: None,
490 send_reference: None,
491 }
492 }
493
494 async fn flush(&mut self) -> crate::Result<()> {
495 let send_reference = self.send_reference.take();
496 if let Some(input) = self.input.take() {
497 self.inner.send(input, send_reference).await?;
498 self.inner.flush().await?;
499 }
500
501 Ok(())
502 }
503}
504
505#[cfg(test)]
506mod tests {
507 use std::{mem, num::NonZeroUsize};
508
509 use futures::poll;
510 use tokio::sync::mpsc::UnboundedSender;
511 use tokio_test::{assert_pending, assert_ready, task::spawn};
512 use tracing::Span;
513 use vector_buffers::{
514 WhenFull,
515 topology::{
516 builder::TopologyBuilder,
517 channel::{BufferReceiver, BufferSender},
518 },
519 };
520 use vrl::value::Value;
521
522 use super::{ControlMessage, Fanout};
523 use crate::{
524 config::ComponentKey,
525 event::{Event, EventArray, EventContainer, LogEvent},
526 test_util::{collect_ready, collect_ready_events},
527 };
528
529 fn build_sender_pair(
530 capacity: usize,
531 ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
532 TopologyBuilder::standalone_memory(
533 NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
534 WhenFull::Block,
535 &Span::current(),
536 None,
537 None,
538 )
539 }
540
541 fn build_sender_pairs(
542 capacities: &[usize],
543 ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
544 let mut pairs = Vec::new();
545 for capacity in capacities {
546 pairs.push(build_sender_pair(*capacity));
547 }
548 pairs
549 }
550
551 fn fanout_from_senders(
552 capacities: &[usize],
553 ) -> (
554 Fanout,
555 UnboundedSender<ControlMessage>,
556 Vec<BufferReceiver<EventArray>>,
557 ) {
558 let (mut fanout, control) = Fanout::new(ComponentKey::from("test_upstream"));
559 let pairs = build_sender_pairs(capacities);
560
561 let mut receivers = Vec::new();
562 for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
563 fanout.add(ComponentKey::from(i.to_string()), sender);
564 receivers.push(receiver);
565 }
566
567 (fanout, control, receivers)
568 }
569
570 fn add_sender_to_fanout(
571 fanout: &mut Fanout,
572 receivers: &mut Vec<BufferReceiver<EventArray>>,
573 sender_id: usize,
574 capacity: usize,
575 ) {
576 let (sender, receiver) = build_sender_pair(capacity);
577 receivers.push(receiver);
578
579 fanout.add(ComponentKey::from(sender_id.to_string()), sender);
580 }
581
582 fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
583 control
584 .send(ControlMessage::Remove(ComponentKey::from(
585 sender_id.to_string(),
586 )))
587 .expect("sending control message should not fail");
588 }
589
590 fn replace_sender_in_fanout(
591 control: &UnboundedSender<ControlMessage>,
592 receivers: &mut [BufferReceiver<EventArray>],
593 sender_id: usize,
594 capacity: usize,
595 ) -> BufferReceiver<EventArray> {
596 let (sender, receiver) = build_sender_pair(capacity);
597 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
598
599 control
600 .send(ControlMessage::Pause(ComponentKey::from(
601 sender_id.to_string(),
602 )))
603 .expect("sending control message should not fail");
604
605 control
606 .send(ControlMessage::Replace(
607 ComponentKey::from(sender_id.to_string()),
608 sender,
609 ))
610 .expect("sending control message should not fail");
611
612 old_receiver
613 }
614
615 fn start_sender_replace(
616 control: &UnboundedSender<ControlMessage>,
617 receivers: &mut [BufferReceiver<EventArray>],
618 sender_id: usize,
619 capacity: usize,
620 ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
621 let (sender, receiver) = build_sender_pair(capacity);
622 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
623
624 control
625 .send(ControlMessage::Pause(ComponentKey::from(
626 sender_id.to_string(),
627 )))
628 .expect("sending control message should not fail");
629
630 (old_receiver, sender)
631 }
632
633 fn finish_sender_resume(
634 control: &UnboundedSender<ControlMessage>,
635 sender_id: usize,
636 sender: BufferSender<EventArray>,
637 ) {
638 control
639 .send(ControlMessage::Replace(
640 ComponentKey::from(sender_id.to_string()),
641 sender,
642 ))
643 .expect("sending control message should not fail");
644 }
645
646 fn unwrap_log_event_message<E>(event: E) -> String
647 where
648 E: EventContainer,
649 {
650 let event = event
651 .into_events()
652 .next()
653 .expect("must have at least one event");
654 let event = event.into_log();
655 event
656 .get("message")
657 .and_then(Value::as_bytes)
658 .and_then(|b| String::from_utf8(b.to_vec()).ok())
659 .expect("must be valid log event with `message` field")
660 }
661
662 #[tokio::test]
663 async fn fanout_writes_to_all() {
664 let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]);
665 let events = make_event_array(2);
666
667 let clones = events.clone();
668 fanout.send(clones, None).await.expect("should not fail");
669
670 for receiver in receivers {
671 assert_eq!(
672 collect_ready(receiver.into_stream()),
673 std::slice::from_ref(&events)
674 );
675 }
676 }
677
678 #[tokio::test]
679 async fn fanout_notready() {
680 let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]);
681 let events = make_events(2);
682
683 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
685 assert_ready!(first_send.poll()).expect("should not fail");
686 drop(first_send);
687
688 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
690 assert_pending!(second_send.poll());
691
692 for receiver in &mut receivers {
694 assert_eq!(Some(events[0].clone().into()), receiver.next().await);
695 }
696
697 assert_ready!(second_send.poll()).expect("should not fail");
699 drop(second_send);
700
701 for receiver in &mut receivers {
703 assert_eq!(Some(events[1].clone().into()), receiver.next().await);
704 }
705 }
706
707 #[tokio::test]
708 async fn fanout_grow() {
709 let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]);
710 let events = make_events(3);
711
712 fanout
714 .send(events[0].clone().into(), None)
715 .await
716 .expect("should not fail");
717 fanout
718 .send(events[1].clone().into(), None)
719 .await
720 .expect("should not fail");
721
722 add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4);
724
725 fanout
727 .send(events[2].clone().into(), None)
728 .await
729 .expect("should not fail");
730
731 let expected_events = [&events, &events, &events[2..]];
734 for (i, receiver) in receivers.into_iter().enumerate() {
735 assert_eq!(
736 collect_ready_events(receiver.into_stream()),
737 expected_events[i]
738 );
739 }
740 }
741
742 #[tokio::test]
743 async fn fanout_shrink() {
744 let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]);
745 let events = make_events(3);
746
747 fanout
749 .send(events[0].clone().into(), None)
750 .await
751 .expect("should not fail");
752 fanout
753 .send(events[1].clone().into(), None)
754 .await
755 .expect("should not fail");
756
757 remove_sender_from_fanout(&control, 1);
759
760 fanout
762 .send(events[2].clone().into(), None)
763 .await
764 .expect("should not fail");
765
766 let expected_events = [&events, &events[..2]];
768 for (i, receiver) in receivers.into_iter().enumerate() {
769 assert_eq!(
770 collect_ready_events(receiver.into_stream()),
771 expected_events[i]
772 );
773 }
774 }
775
776 #[tokio::test]
777 async fn fanout_shrink_when_notready() {
778 let events = make_events(2);
784 let expected_first_event = unwrap_log_event_message(events[0].clone());
785 let expected_second_event = unwrap_log_event_message(events[1].clone());
786
787 let cases = [
788 (
791 0,
792 false,
793 [
794 expected_second_event.clone(),
795 expected_first_event.clone(),
796 expected_second_event.clone(),
797 ],
798 ),
799 (
800 1,
801 true,
802 [
803 expected_second_event.clone(),
804 expected_second_event.clone(),
805 expected_second_event.clone(),
806 ],
807 ),
808 (
809 2,
810 false,
811 [
812 expected_second_event.clone(),
813 expected_first_event.clone(),
814 expected_second_event.clone(),
815 ],
816 ),
817 ];
818
819 for (sender_id, should_complete, expected_last_seen) in cases {
820 let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]);
821
822 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
824 assert_ready!(first_send.poll()).expect("should not fail");
825 drop(first_send);
826
827 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
829 assert_pending!(second_send.poll());
830
831 remove_sender_from_fanout(&control, sender_id);
833
834 if should_complete {
835 assert_ready!(second_send.poll()).expect("should not fail");
836 } else {
837 assert_pending!(second_send.poll());
838 }
839 drop(second_send);
840
841 drop(fanout);
843
844 let mut last_seen = Vec::new();
845 for receiver in &mut receivers {
846 let mut events = Vec::new();
847 while let Some(event) = receiver.next().await {
848 events.insert(0, event);
849 }
850
851 last_seen.push(unwrap_log_event_message(events.remove(0)));
852 }
853
854 assert_eq!(&expected_last_seen[..], &last_seen);
855 }
856 }
857
858 #[tokio::test]
859 async fn fanout_no_sinks() {
860 let (mut fanout, _) = Fanout::new(ComponentKey::from("test_upstream"));
861 let events = make_events(2);
862
863 fanout
864 .send(events[0].clone().into(), None)
865 .await
866 .expect("should not fail");
867 fanout
868 .send(events[1].clone().into(), None)
869 .await
870 .expect("should not fail");
871 }
872
873 #[cfg(debug_assertions)]
874 #[tokio::test]
875 #[should_panic(expected = "Fanout received empty event batch from upstream component")]
876 async fn fanout_panics_on_empty_event_array_in_debug_builds() {
877 let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]);
878 let empty: EventArray = Vec::<LogEvent>::new().into();
879
880 _ = fanout.send(empty, None).await;
881 }
882
883 #[tokio::test]
884 async fn fanout_replace() {
885 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
886 let events = make_events(3);
887
888 fanout
890 .send(events[0].clone().into(), None)
891 .await
892 .expect("should not fail");
893 fanout
894 .send(events[1].clone().into(), None)
895 .await
896 .expect("should not fail");
897
898 let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4);
900
901 fanout
903 .send(events[2].clone().into(), None)
904 .await
905 .expect("should not fail");
906
907 let expected_events = [&events[2..], &events, &events];
910 for (i, receiver) in receivers.into_iter().enumerate() {
911 assert_eq!(
912 collect_ready_events(receiver.into_stream()),
913 expected_events[i]
914 );
915 }
916
917 assert_eq!(
919 collect_ready_events(old_first_receiver.into_stream()),
920 &events[..2]
921 );
922 }
923
924 #[tokio::test]
925 async fn fanout_wait() {
926 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]);
927 let events = make_events(3);
928
929 let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
931 assert_ready!(poll!(send1)).expect("should not fail");
932 let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
933 assert_ready!(poll!(send2)).expect("should not fail");
934
935 let (old_first_receiver, new_first_sender) =
939 start_sender_replace(&control, &mut receivers, 0, 4);
940
941 let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
943 assert_pending!(third_send.poll());
944
945 finish_sender_resume(&control, 0, new_first_sender);
948 assert!(third_send.is_woken());
949 assert_ready!(third_send.poll()).expect("should not fail");
950
951 assert_eq!(
954 collect_ready_events(old_first_receiver.into_stream()),
955 &events[0..2]
956 );
957
958 let expected_events = [&events[2..], &events];
959 for (i, receiver) in receivers.into_iter().enumerate() {
960 assert_eq!(
961 collect_ready_events(receiver.into_stream()),
962 expected_events[i]
963 );
964 }
965 }
966
967 fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
968 (0..count).map(|i| LogEvent::from(format!("line {i}")))
969 }
970
971 fn make_events(count: usize) -> Vec<Event> {
972 make_events_inner(count).map(Into::into).collect()
973 }
974
975 fn make_event_array(count: usize) -> EventArray {
976 make_events_inner(count).collect::<Vec<_>>().into()
977 }
978}