1use std::{
2 collections::{HashMap, HashSet},
3 sync::{Arc, Mutex},
4};
5
6use futures::{Future, FutureExt, future};
7use snafu::Snafu;
8use stream_cancel::Trigger;
9use tokio::{
10 sync::{mpsc, watch},
11 time::{Duration, Instant, interval, sleep_until},
12};
13use tracing::Instrument;
14use vector_lib::{
15 buffers::topology::channel::BufferSender,
16 shutdown::ShutdownSignal,
17 tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
18 trigger::DisabledTrigger,
19};
20
21use super::{
22 BuiltBuffer, TaskHandle,
23 builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
24 fanout::{ControlChannel, ControlMessage},
25 handle_errors, retain, take_healthchecks,
26 task::{Task, TaskOutput},
27};
28use crate::{
29 config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
30 event::EventArray,
31 extra_context::ExtraContext,
32 shutdown::SourceShutdownCoordinator,
33 signal::ShutdownError,
34 spawn_named,
35 utilization::UtilizationRegistry,
36};
37
/// Receiving half of the unbounded channel on which spawned components report that they
/// aborted (the sending half is `RunningTopology::abort_tx`).
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
39
/// Why [`RunningTopology::reload_config_and_respawn`] did not apply a new configuration.
#[derive(Debug, Snafu)]
pub enum ReloadError {
    /// The new config changes global options, which cannot be reloaded in place; the running
    /// topology is left untouched. `changed_fields` lists the differing global option fields.
    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
    GlobalOptionsChanged { changed_fields: Vec<String> },
    /// Global options differ, but computing which fields changed failed.
    #[snafu(display("failed to compute global diff: {}", source))]
    GlobalDiffFailed { source: serde_json::Error },
    /// The new topology could not be built or failed its required healthchecks; the previous
    /// configuration was restored successfully.
    #[snafu(display("topology build failed"))]
    TopologyBuildFailed,
    /// The new topology failed and restoring the previous configuration also failed.
    #[snafu(display("failed to restore previous config"))]
    FailedToRestore,
}
51
/// A live topology: the spawned component tasks plus the bookkeeping needed to reconfigure
/// them in place (`reload_config_and_respawn`) or shut them all down (`stop`).
// NOTE(review): the reason `dead_code` is allowed is not visible here — presumably some fields
// are only read under other cfgs/features; confirm and document.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Input buffer sender of each transform/sink, registered by `setup_inputs`.
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Each component's inputs, published to tap subscribers as `TapResource::inputs`.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Fanout control channel for every registered component output.
    outputs: HashMap<OutputId, ControlChannel>,
    /// `(component kind, component type)` for every component with tappable outputs.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Type name (typetag) per component, published as `TapResource::type_names`.
    component_type_names: HashMap<ComponentKey, String>,
    /// The second task `spawn_source` spawns for each source, kept apart from `tasks`.
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// The spawned task of every source, transform and sink.
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Signals sources to shut down, individually (reloads) or all together (`stop`).
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Detach triggers taken over from built pieces; `shutdown_diff` cancels them for changed
    /// sinks whose buffer is reused or disk-backed before awaiting those sinks.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// The configuration currently running.
    pub(crate) config: Config,
    /// Channel on which spawned components report that they aborted.
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Watch channel broadcasting the current `TapResource` to tap subscribers.
    watch: (WatchTx, WatchRx),
    /// Optional grace period `stop` allows before giving up on unfinished components.
    graceful_shutdown_duration: Option<Duration>,
    /// Registry passed to the pieces builder; removed transforms/sinks are unregistered from it.
    utilization_registry: Option<UtilizationRegistry>,
    /// Background utilization task, awaited by `stop`.
    utilization_task: Option<TaskHandle>,
    /// Trigger `stop` cancels, presumably to end `utilization_task`.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Background metrics task, awaited by `stop`.
    metrics_task: Option<TaskHandle>,
    /// Trigger `stop` cancels, presumably to end `metrics_task`.
    metrics_task_shutdown_trigger: Option<Trigger>,
    /// Components to force-reload on the next reload (passed to `ConfigDiff::new`).
    pending_reload: Option<HashSet<ComponentKey>>,
}
74
75impl RunningTopology {
76 pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
77 Self {
78 inputs: HashMap::new(),
79 inputs_tap_metadata: HashMap::new(),
80 outputs: HashMap::new(),
81 outputs_tap_metadata: HashMap::new(),
82 component_type_names: HashMap::new(),
83 shutdown_coordinator: SourceShutdownCoordinator::default(),
84 detach_triggers: HashMap::new(),
85 source_tasks: HashMap::new(),
86 tasks: HashMap::new(),
87 abort_tx,
88 watch: watch::channel(TapResource::default()),
89 graceful_shutdown_duration: config.graceful_shutdown_duration,
90 config,
91 utilization_registry: None,
92 utilization_task: None,
93 utilization_task_shutdown_trigger: None,
94 metrics_task: None,
95 metrics_task_shutdown_trigger: None,
96 pending_reload: None,
97 }
98 }
99
    /// Returns the configuration this topology is currently running (replaced only after a
    /// successful reload).
    pub const fn config(&self) -> &Config {
        &self.config
    }
104
105 pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
108 match &mut self.pending_reload {
109 None => self.pending_reload = Some(new_set.clone()),
110 Some(existing) => existing.extend(new_set),
111 }
112 }
113
114 pub fn watch(&self) -> watch::Receiver<TapResource> {
118 self.watch.1.clone()
119 }
120
    /// Returns a `'static` future from the source shutdown coordinator's tripwire; presumably
    /// it resolves once the coordinated sources have all shut down (confirm against
    /// `SourceShutdownCoordinator::shutdown_tripwire`).
    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
        self.shutdown_coordinator.shutdown_tripwire()
    }
131
    /// Shuts the whole topology down, consuming it.
    ///
    /// Tells the shutdown coordinator to shut all sources down (bounded by
    /// `graceful_shutdown_duration`, when set) and cancels the utilization and metrics task
    /// triggers immediately. The returned future completes once source shutdown has completed
    /// AND the first of the following happens:
    /// - every tracked task has finished;
    /// - the 5-second reporter sees that no component task is left, or that the deadline
    ///   passed;
    /// - the deadline elapses, which first logs the components still running.
    pub fn stop(self) -> impl Future<Output = ()> {
        // Every task future that must finish for shutdown to be complete.
        let mut wait_handles = Vec::new();
        // Per-component handles used to report what is still running. It is a `Vec` because a
        // source owns two tasks (one in `tasks`, one in `source_tasks`).
        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

        let map_closure = |_result| ();

        // `shared()` lets one task be awaited through `wait_handles` and, at the same time,
        // peeked at through `check_handles`.
        for (key, task) in self.tasks.into_iter().chain(self.source_tasks) {
            let task = task.map(map_closure).shared();

            wait_handles.push(task.clone());
            check_handles.entry(key).or_default().push(task);
        }

        if let Some(utilization_task) = self.utilization_task {
            wait_handles.push(utilization_task.map(map_closure).shared());
        }

        if let Some(metrics_task) = self.metrics_task {
            wait_handles.push(metrics_task.map(map_closure).shared());
        }

        let deadline = self
            .graceful_shutdown_duration
            .map(|grace_period| Instant::now() + grace_period);

        // With a deadline, this future fires at the deadline and logs which components have
        // not finished. Without one, it never resolves.
        let timeout = if let Some(deadline) = deadline {
            let mut check_handles2 = check_handles.clone();
            Box::pin(async move {
                sleep_until(deadline).await;
                // Keep only components that still have at least one unfinished task.
                check_handles2.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles2
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                error!(
                    components = ?remaining_components,
                    message = "Failed to gracefully shut down in time. Killing components.",
                    internal_log_rate_limit = false
                );
            }) as future::BoxFuture<'static, ()>
        } else {
            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
        };

        // Periodically logs progress. The first tick of a tokio `interval` completes
        // immediately.
        // NOTE(review): when this loop breaks it resolves `select_all` below even if the
        // utilization/metrics tasks in `wait_handles` are still running — confirm intended.
        let mut interval = interval(Duration::from_secs(5));
        let reporter = async move {
            loop {
                interval.tick().await;

                // Keep only components that still have at least one unfinished task.
                check_handles.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                let (deadline_passed, time_remaining) = match deadline {
                    Some(d) => match d.checked_duration_since(Instant::now()) {
                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                        None => (true, "overdue".to_string()),
                    },
                    None => (false, "no time limit".to_string()),
                };

                info!(
                    remaining_components = ?remaining_components,
                    time_remaining = ?time_remaining,
                    "Shutting down... Waiting on running components."
                );

                let all_done = check_handles.is_empty();

                if all_done {
                    info!("Shutdown reporter exiting: all components shut down.");
                    break;
                } else if deadline_passed {
                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                    break;
                }
            }
        };

        // Resolves once every tracked task has finished.
        let success = futures::future::join_all(wait_handles).map(|_| ());

        // Resolves as soon as any of the three futures above resolves.
        let shutdown_complete_future = future::select_all(vec![
            Box::pin(timeout) as future::BoxFuture<'static, ()>,
            Box::pin(reporter) as future::BoxFuture<'static, ()>,
            Box::pin(success) as future::BoxFuture<'static, ()>,
        ]);

        // Start shutting the sources down and stop the background tasks.
        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
        if let Some(trigger) = self.utilization_task_shutdown_trigger {
            trigger.cancel();
        }
        if let Some(trigger) = self.metrics_task_shutdown_trigger {
            trigger.cancel();
        }

        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
    }
269
    /// Reconfigures the running topology to match `new_config`, falling back to the current
    /// configuration if the new one cannot be brought up.
    ///
    /// # Errors
    ///
    /// - `GlobalOptionsChanged` / `GlobalDiffFailed`: global options differ. Nothing is changed.
    /// - `TopologyBuildFailed`: the new pieces failed to build or failed required healthchecks,
    ///   and the old configuration was restored.
    /// - `FailedToRestore`: the new config failed and restoring the old one failed too.
    pub async fn reload_config_and_respawn(
        &mut self,
        new_config: Config,
        extra_context: ExtraContext,
    ) -> Result<(), ReloadError> {
        info!("Reloading running topology with new configuration.");

        if self.config.global != new_config.global {
            return match self.config.global.diff(&new_config.global) {
                Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
                Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
            };
        }

        // Components explicitly queued via `extend_reload_set` are forced into the diff.
        // NOTE(review): `pending_reload` is never cleared here; if nothing else resets it, the
        // same components are force-reloaded on every later reload — confirm.
        let diff = if let Some(components) = &self.pending_reload {
            ConfigDiff::new(&self.config, &new_config, components.clone())
        } else {
            ConfigDiff::new(&self.config, &new_config, HashSet::new())
        };
        // Tear down changed/removed components first; reusable sink buffers come back here.
        let buffers = self.shutdown_diff(&diff, &new_config).await;

        if cfg!(windows) {
            // Presumably gives the OS time to release resources (e.g. ports) freed by the
            // components just shut down — confirm.
            tokio::time::sleep(Duration::from_millis(200)).await;
        }

        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
            .with_buffers(buffers.clone())
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
        {
            if self
                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
                .await
            {
                self.connect_diff(&diff, &mut new_pieces).await;
                self.spawn_diff(&diff, new_pieces);
                self.config = new_config;

                info!("New configuration loaded successfully.");

                return Ok(());
            }
        }

        warn!("Failed to completely load new configuration. Restoring old configuration.");

        // Rebuild what was torn down, against the old config, using the inverted diff and the
        // same carried-over buffers.
        let diff = diff.flip();
        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
            && self
                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
                .await
        {
            self.connect_diff(&diff, &mut new_pieces).await;
            self.spawn_diff(&diff, new_pieces);

            info!("Old configuration restored successfully.");

            return Err(ReloadError::TopologyBuildFailed);
        }

        error!(
            message = "Failed to restore old configuration.",
            internal_log_rate_limit = false
        );

        Err(ReloadError::FailedToRestore)
    }
373
    /// Reloads enrichment tables for the currently running configuration by delegating to
    /// `builder::reload_enrichment_tables`.
    pub(crate) async fn reload_enrichment_tables(&self) {
        reload_enrichment_tables(&self.config).await;
    }
378
379 pub(crate) async fn run_healthchecks(
380 &mut self,
381 diff: &ConfigDiff,
382 pieces: &mut TopologyPieces,
383 options: HealthcheckOptions,
384 ) -> bool {
385 if options.enabled {
386 let healthchecks = take_healthchecks(diff, pieces)
387 .into_iter()
388 .map(|(_, task)| task);
389 let healthchecks = future::try_join_all(healthchecks);
390
391 info!("Running healthchecks.");
392 if options.require_healthy {
393 let success = healthchecks.await;
394
395 if success.is_ok() {
396 info!("All healthchecks passed.");
397 true
398 } else {
399 error!(
400 message = "Sinks unhealthy.",
401 internal_log_rate_limit = false
402 );
403 false
404 }
405 } else {
406 tokio::spawn(healthchecks);
407 true
408 }
409 } else {
410 true
411 }
412 }
413
    /// Stops or detaches everything in the running topology that `diff` changes or removes,
    /// and returns the sink buffers that rebuilt sinks can take over.
    ///
    /// Order of operations:
    /// 1. Changed/removed sources and enrichment-table sources are told to shut down (30 s
    ///    deadline) and their outputs are unregistered. Afterwards the `source_tasks` of
    ///    changed/removed regular sources are awaited.
    /// 2. Removed transforms are detached and their inputs/outputs removed; changed transforms
    ///    have their inputs/outputs removed.
    /// 3. The sinks to wait for are determined. These are outgoing sinks whose resources
    ///    conflict with incoming sinks/sources, changed sinks whose buffer config is unchanged
    ///    (buffer reuse), and changed sinks with a disk-stage buffer.
    /// 4. Removed and changed sinks are cut off from their inputs. For reuse/disk-buffer sinks
    ///    the detach trigger is cancelled first, and for reuse sinks the old input sender is
    ///    saved.
    /// 5. The sinks to wait for are awaited. For reuse sinks, the returned receiver is paired
    ///    with the saved sender into the returned map. Other removed sinks are just dropped.
    ///
    /// # Panics
    ///
    /// Panics (via `unwrap()`) when expected bookkeeping is missing, e.g. no task for a removed
    /// component or no detach trigger for a reuse/disk-buffer sink. It also panics when an
    /// awaited task failed to join or returned an error.
    async fn shutdown_diff(
        &mut self,
        diff: &ConfigDiff,
        new_config: &Config,
    ) -> HashMap<ComponentKey, BuiltBuffer> {
        // Stop changed/removed sources first, so no new events enter the parts of the graph
        // that are about to be torn down.
        if diff.sources.any_changed_or_removed()
            || diff.enrichment_tables.sources.any_changed_or_removed()
        {
            let timeout = Duration::from_secs(30);
            let mut source_shutdown_handles = Vec::new();

            let deadline = Instant::now() + timeout;
            for key in diff
                .sources
                .to_remove
                .iter()
                .chain(diff.enrichment_tables.sources.to_remove.iter())
            {
                debug!(component_id = %key, "Removing source.");

                let previous = self.tasks.remove(key).unwrap();
                drop(previous); // Drop the handle without awaiting it.
                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            for key in diff
                .sources
                .to_change
                .iter()
                .chain(diff.enrichment_tables.sources.to_change.iter())
            {
                debug!(component_id = %key, "Changing source.");

                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            debug!(
                "Waiting for up to {} seconds for source(s) to finish shutting down.",
                timeout.as_secs()
            );
            futures::future::join_all(source_shutdown_handles).await;

            // With the shutdown signals resolved, reap the source tasks themselves.
            // NOTE(review): only regular sources are reaped; `source_tasks` entries of removed
            // enrichment-table sources are left in place — confirm intended.
            for key in diff.sources.removed_and_changed() {
                if let Some(task) = self.source_tasks.remove(key) {
                    task.await.unwrap().unwrap();
                }
            }
        }

        for key in &diff.transforms.to_remove {
            debug!(component_id = %key, "Removing transform.");

            let previous = self.tasks.remove(key).unwrap();
            drop(previous); // Drop the handle without awaiting it.
            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        for key in &diff.transforms.to_change {
            debug!(component_id = %key, "Changing transform.");

            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);
        }

        // Resources held by outgoing sinks (old config) versus resources claimed by incoming
        // sinks and sources (new config).
        let removed_table_sinks = diff
            .enrichment_tables
            .sinks
            .removed_and_changed()
            .map(|key| {
                (
                    key.clone(),
                    enrichment_table_sink_resources(&self.config, key),
                )
            })
            .collect::<Vec<_>>();
        let remove_sink = diff
            .sinks
            .removed_and_changed()
            .map(|key| {
                (
                    key,
                    self.config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let add_source = diff
            .sources
            .changed_and_added()
            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
        let added_table_sinks = diff
            .enrichment_tables
            .sinks
            .changed_and_added()
            .map(|key| {
                (
                    key.clone(),
                    enrichment_table_sink_resources(new_config, key),
                )
            })
            .collect::<Vec<_>>();
        let add_sink = diff
            .sinks
            .changed_and_added()
            .map(|key| {
                (
                    key,
                    new_config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        // Keys are tagged `true` for outgoing sinks and `false` for incoming components.
        let conflicts = Resource::conflicts(
            remove_sink.map(|(key, value)| ((true, key), value)).chain(
                add_sink
                    .chain(add_source)
                    .map(|(key, value)| ((false, key), value)),
            ),
        )
        .into_values()
        .flatten()
        .collect::<HashSet<_>>();
        // Only the outgoing side of a conflict has to be waited on.
        let conflicting_sinks = conflicts
            .into_iter()
            .filter(|&(existing_sink, _)| existing_sink)
            .map(|(_, key)| key.clone());

        // A changed sink can hand its buffer to its replacement when the buffer config is
        // identical and the sink was not explicitly queued for reload.
        let reuse_buffers = diff
            .sinks
            .to_change
            .iter()
            .chain(diff.enrichment_tables.sinks.to_change.iter())
            .filter(|&key| {
                if diff.components_to_reload.contains(key) {
                    return false;
                }
                self.config
                    .sink(key)
                    .map(|s| s.buffer.clone())
                    .or_else(|| enrichment_table_sink_buffer(&self.config, key))
                    == new_config
                        .sink(key)
                        .map(|s| s.buffer.clone())
                        .or_else(|| enrichment_table_sink_buffer(new_config, key))
            })
            .cloned()
            .collect::<HashSet<_>>();

        // Changed sinks that will not reuse their buffer but whose old buffer has a disk stage
        // are awaited too.
        // NOTE(review): only `diff.sinks.to_change` is scanned here, although the filter also
        // consults enrichment-table sink buffers — confirm enrichment-table sinks are meant to
        // be excluded.
        let changed_disk_buffer_sinks = diff
            .sinks
            .to_change
            .iter()
            .filter(|key| {
                !reuse_buffers.contains(*key)
                    && (self
                        .config
                        .sink(key)
                        .is_some_and(|s| s.buffer.has_disk_stage())
                        || enrichment_table_sink_buffer(&self.config, key)
                            .is_some_and(|buffer| buffer.has_disk_stage()))
            })
            .cloned()
            .collect::<HashSet<_>>();

        let wait_for_sinks = conflicting_sinks
            .chain(reuse_buffers.iter().cloned())
            .chain(changed_disk_buffer_sinks.iter().cloned())
            .collect::<HashSet<_>>();

        // Cut removed sinks off from their inputs first, so they stop receiving new events.
        let removed_sinks = diff
            .sinks
            .to_remove
            .iter()
            .chain(diff.enrichment_tables.sinks.to_remove.iter())
            .collect::<Vec<_>>();
        for key in &removed_sinks {
            debug!(component_id = %key, "Removing sink.");
            self.remove_inputs(key, diff, new_config).await;

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        // Input senders of reuse sinks, saved before `remove_inputs` drops them from `inputs`.
        let mut buffer_tx = HashMap::new();

        let sinks_to_change = diff
            .sinks
            .to_change
            .iter()
            .chain(diff.enrichment_tables.sinks.to_change.iter())
            .collect::<Vec<_>>();

        for key in &sinks_to_change {
            debug!(component_id = %key, "Changing sink.");
            if reuse_buffers.contains(key) || changed_disk_buffer_sinks.contains(key) {
                self.detach_triggers
                    .remove(key)
                    .unwrap()
                    .into_inner()
                    .cancel();

                if reuse_buffers.contains(key) {
                    buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
                }
            }
            self.remove_inputs(key, diff, new_config).await;
        }

        for key in &removed_sinks {
            let previous = self.tasks.remove(key).unwrap();
            if wait_for_sinks.contains(key) {
                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
                previous.await.unwrap().unwrap();
            } else {
                drop(previous); // Drop the handle without awaiting it.
            }
        }

        // Changed sinks that are not awaited keep their entry in `tasks`; `spawn_sink` later
        // replaces (and drops) that handle.
        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
        for key in &sinks_to_change {
            if wait_for_sinks.contains(key) {
                let previous = self.tasks.remove(key).unwrap();
                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
                let buffer = previous.await.unwrap().unwrap();

                if reuse_buffers.contains(key) {
                    let tx = buffer_tx.remove(key).unwrap();
                    // A finished sink task is expected to hand back its buffer receiver.
                    let rx = match buffer {
                        TaskOutput::Sink(rx) => rx.into_inner(),
                        _ => unreachable!(),
                    };

                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
                }
            }
        }

        buffers
    }
719
720 pub(crate) async fn connect_diff(
722 &mut self,
723 diff: &ConfigDiff,
724 new_pieces: &mut TopologyPieces,
725 ) {
726 debug!("Connecting changed/added component(s).");
727
728 if !self.watch.0.is_closed() {
730 for key in &diff.sources.to_remove {
731 self.outputs_tap_metadata.remove(key);
733 self.component_type_names.remove(key);
734 }
735
736 for key in &diff.transforms.to_remove {
737 self.outputs_tap_metadata.remove(key);
739 self.inputs_tap_metadata.remove(key);
740 self.component_type_names.remove(key);
741 }
742
743 for key in &diff.sinks.to_remove {
744 self.inputs_tap_metadata.remove(key);
746 self.component_type_names.remove(key);
747 }
748
749 for key in &diff.enrichment_tables.sinks.to_remove {
750 self.inputs_tap_metadata.remove(key);
752 self.component_type_names.remove(key);
753 }
754
755 for key in &diff.enrichment_tables.sources.to_remove {
756 self.outputs_tap_metadata.remove(key);
758 self.component_type_names.remove(key);
759 }
760
761 for key in diff.sources.changed_and_added() {
762 if let Some(task) = new_pieces.tasks.get(key) {
763 let typetag = task.typetag().to_string();
764 self.outputs_tap_metadata
765 .insert(key.clone(), ("source", typetag.clone()));
766 self.component_type_names.insert(key.clone(), typetag);
767 }
768 }
769
770 for key in diff.enrichment_tables.sources.changed_and_added() {
771 if let Some(task) = new_pieces.tasks.get(key) {
772 self.outputs_tap_metadata
773 .insert(key.clone(), ("source", task.typetag().to_string()));
774 self.component_type_names
775 .insert(key.clone(), task.typetag().to_string());
776 }
777 }
778
779 for key in diff.transforms.changed_and_added() {
780 if let Some(task) = new_pieces.tasks.get(key) {
781 let typetag = task.typetag().to_string();
782 self.outputs_tap_metadata
783 .insert(key.clone(), ("transform", typetag.clone()));
784 self.component_type_names.insert(key.clone(), typetag);
785 }
786 }
787
788 for key in diff.sinks.changed_and_added() {
789 if let Some(task) = new_pieces.tasks.get(key) {
790 self.component_type_names
791 .insert(key.clone(), task.typetag().to_string());
792 }
793 }
794
795 for key in diff.enrichment_tables.sinks.changed_and_added() {
796 if let Some(task) = new_pieces.tasks.get(key) {
797 self.component_type_names
798 .insert(key.clone(), task.typetag().to_string());
799 }
800 }
801
802 for (key, input) in &new_pieces.inputs {
803 self.inputs_tap_metadata
804 .insert(key.clone(), input.1.clone());
805 }
806 }
807
808 for key in diff.sources.changed_and_added() {
811 debug!(component_id = %key, "Configuring outputs for source.");
812 self.setup_outputs(key, new_pieces).await;
813 }
814
815 let added_changed_table_sources: Vec<ComponentKey> = diff
816 .enrichment_tables
817 .sources
818 .changed_and_added()
819 .cloned()
820 .collect();
821 for key in &added_changed_table_sources {
822 debug!(component_id = %key, "Connecting outputs for enrichment table source.");
823 self.setup_outputs(key, new_pieces).await;
824 }
825
826 for key in diff.transforms.changed_and_added() {
829 debug!(component_id = %key, "Configuring outputs for transform.");
830 self.setup_outputs(key, new_pieces).await;
831 }
832
833 for key in diff.transforms.changed_and_added() {
836 debug!(component_id = %key, "Connecting inputs for transform.");
837 self.setup_inputs(key, diff, new_pieces).await;
838 }
839
840 for key in diff.sinks.changed_and_added() {
842 debug!(component_id = %key, "Connecting inputs for sink.");
843 self.setup_inputs(key, diff, new_pieces).await;
844 }
845 let added_changed_tables: Vec<ComponentKey> = diff
846 .enrichment_tables
847 .sinks
848 .changed_and_added()
849 .cloned()
850 .collect();
851 for key in &added_changed_tables {
852 debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
853 self.setup_inputs(key, diff, new_pieces).await;
854 }
855
856 self.reattach_severed_inputs(diff);
867
868 if !self.watch.0.is_closed() {
870 let outputs = self
871 .outputs
872 .clone()
873 .into_iter()
874 .flat_map(|(output_id, control_tx)| {
875 self.outputs_tap_metadata.get(&output_id.component).map(
876 |(component_kind, component_type)| {
877 (
878 TapOutput {
879 output_id,
880 component_kind,
881 component_type: component_type.clone(),
882 },
883 control_tx,
884 )
885 },
886 )
887 })
888 .collect::<HashMap<_, _>>();
889
890 let mut removals = diff.sources.to_remove.clone();
891 removals.extend(diff.transforms.to_remove.iter().cloned());
892 self.watch
893 .0
894 .send(TapResource {
895 outputs,
896 inputs: self.inputs_tap_metadata.clone(),
897 source_keys: diff
898 .sources
899 .changed_and_added()
900 .map(|key| key.to_string())
901 .chain(
902 added_changed_table_sources
903 .iter()
904 .map(|key| key.to_string()),
905 )
906 .collect(),
907 sink_keys: diff
908 .sinks
909 .changed_and_added()
910 .map(|key| key.to_string())
911 .chain(added_changed_tables.iter().map(|key| key.to_string()))
912 .collect(),
913 removals,
916 type_names: self
917 .component_type_names
918 .iter()
919 .map(|(k, v)| (k.to_string(), v.clone()))
920 .collect(),
921 })
922 .expect("Couldn't broadcast config changes.");
923 }
924 }
925
926 async fn setup_outputs(
927 &mut self,
928 key: &ComponentKey,
929 new_pieces: &mut builder::TopologyPieces,
930 ) {
931 let outputs = new_pieces.outputs.remove(key).unwrap();
932 for (port, output) in outputs {
933 debug!(component_id = %key, output_id = ?port, "Configuring output for component.");
934
935 let id = OutputId {
936 component: key.clone(),
937 port,
938 };
939
940 self.outputs.insert(id, output);
941 }
942 }
943
944 async fn setup_inputs(
945 &mut self,
946 key: &ComponentKey,
947 diff: &ConfigDiff,
948 new_pieces: &mut builder::TopologyPieces,
949 ) {
950 let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
951
952 let old_inputs = self
953 .config
954 .inputs_for_node(key)
955 .into_iter()
956 .flatten()
957 .cloned()
958 .collect::<HashSet<_>>();
959
960 let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
961 let inputs_to_add = &new_inputs - &old_inputs;
962
963 for input in inputs {
964 let output = self.outputs.get_mut(&input).expect("unknown output");
965
966 if diff.contains(&input.component) || inputs_to_add.contains(&input) {
967 debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");
971
972 _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
973 } else {
974 debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");
979
980 _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
981 }
982 }
983
984 self.inputs.insert(key.clone(), tx);
985 new_pieces
986 .detach_triggers
987 .remove(key)
988 .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
989 }
990
991 fn remove_outputs(&mut self, key: &ComponentKey) {
992 self.outputs.retain(|id, _output| &id.component != key);
993 }
994
995 async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
996 self.inputs.remove(key);
997 self.detach_triggers.remove(key);
998
999 let old_inputs = self.config.inputs_for_node(key).expect("node exists");
1000 let new_inputs = new_config
1001 .inputs_for_node(key)
1002 .unwrap_or_default()
1003 .iter()
1004 .collect::<HashSet<_>>();
1005
1006 for input in old_inputs {
1007 if let Some(output) = self.outputs.get_mut(input) {
1008 if diff.contains(&input.component)
1009 || diff.is_removed(key)
1010 || !new_inputs.contains(input)
1011 {
1012 debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");
1023
1024 _ = output.send(ControlMessage::Remove(key.clone()));
1025 } else {
1026 debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");
1030
1031 _ = output.send(ControlMessage::Pause(key.clone()));
1032 }
1033 }
1034 }
1035 }
1036
    /// Re-adds unchanged transforms, sinks and enrichment-table sinks to upstream fanouts that
    /// were rebuilt during this reload.
    ///
    /// `shutdown_diff` unregisters the outputs of changed upstream components, and
    /// `setup_outputs` registers fresh control channels for them. Consumers that are not
    /// themselves in `diff` must therefore be attached to those new fanouts again.
    /// `get_changed_outputs` (defined elsewhere in this file) presumably yields the subset of a
    /// node's inputs whose upstream changed in `diff` — confirm.
    ///
    /// # Panics
    ///
    /// Panics if an unchanged consumer has no registered input sender, or if a changed output
    /// is not registered.
    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
        let unchanged_transforms = self
            .config
            .transforms()
            .filter(|(key, _)| !diff.transforms.contains(key));
        for (transform_key, transform) in unchanged_transforms {
            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(transform_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
            }
        }

        // Enrichment tables that act as sinks are treated like regular sinks here.
        let unchanged_table_sinks = self
            .config
            .enrichment_tables()
            .filter_map(|(key, table)| table.as_sink(key))
            .filter(|(key, _)| !diff.enrichment_tables.sinks.contains(key))
            .collect::<Vec<_>>();
        let unchanged_sinks = self
            .config
            .sinks()
            .filter(|(key, _)| !diff.sinks.contains(key));
        for (sink_key, sink) in
            unchanged_sinks.chain(unchanged_table_sinks.iter().map(|(k, v)| (k, v)))
        {
            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(sink_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
            }
        }
    }
1076
1077 pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
1079 for key in &diff.sources.to_change {
1080 debug!(message = "Spawning changed source.", component_id = %key);
1081 self.spawn_source(key, &mut new_pieces);
1082 }
1083
1084 for key in &diff.sources.to_add {
1085 debug!(message = "Spawning new source.", component_id = %key);
1086 self.spawn_source(key, &mut new_pieces);
1087 }
1088
1089 let changed_table_sources: Vec<&ComponentKey> = diff
1090 .enrichment_tables
1091 .sources
1092 .to_change
1093 .iter()
1094 .filter(|k| new_pieces.source_tasks.contains_key(k))
1095 .collect();
1096
1097 let added_table_sources: Vec<&ComponentKey> = diff
1098 .enrichment_tables
1099 .sources
1100 .to_add
1101 .iter()
1102 .filter(|k| new_pieces.source_tasks.contains_key(k))
1103 .collect();
1104
1105 for key in changed_table_sources {
1106 debug!(message = "Spawning changed enrichment table source.", component_id = %key);
1107 self.spawn_source(key, &mut new_pieces);
1108 }
1109
1110 for key in added_table_sources {
1111 debug!(message = "Spawning new enrichment table source.", component_id = %key);
1112 self.spawn_source(key, &mut new_pieces);
1113 }
1114
1115 for key in &diff.transforms.to_change {
1116 debug!(message = "Spawning changed transform.", component_id = %key);
1117 self.spawn_transform(key, &mut new_pieces);
1118 }
1119
1120 for key in &diff.transforms.to_add {
1121 debug!(message = "Spawning new transform.", component_id = %key);
1122 self.spawn_transform(key, &mut new_pieces);
1123 }
1124
1125 for key in &diff.sinks.to_change {
1126 debug!(message = "Spawning changed sink.", component_id = %key);
1127 self.spawn_sink(key, &mut new_pieces);
1128 }
1129
1130 for key in &diff.sinks.to_add {
1131 trace!(message = "Spawning new sink.", component_id = %key);
1132 self.spawn_sink(key, &mut new_pieces);
1133 }
1134
1135 let changed_tables: Vec<&ComponentKey> = diff
1136 .enrichment_tables
1137 .sinks
1138 .to_change
1139 .iter()
1140 .filter(|k| new_pieces.tasks.contains_key(k))
1141 .collect();
1142
1143 let added_tables: Vec<&ComponentKey> = diff
1144 .enrichment_tables
1145 .sinks
1146 .to_add
1147 .iter()
1148 .filter(|k| new_pieces.tasks.contains_key(k))
1149 .collect();
1150
1151 for key in changed_tables {
1152 debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
1153 self.spawn_sink(key, &mut new_pieces);
1154 }
1155
1156 for key in added_tables {
1157 debug!(message = "Spawning enrichment table new sink.", component_id = %key);
1158 self.spawn_sink(key, &mut new_pieces);
1159 }
1160 }
1161
1162 fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1163 let task = new_pieces.tasks.remove(key).unwrap();
1164 let span = error_span!(
1165 "sink",
1166 component_kind = "sink",
1167 component_id = %task.id(),
1168 component_type = %task.typetag(),
1169 );
1170
1171 let task_span = span.or_current();
1172 #[cfg(unix)]
1173 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1174 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1175 task.id().to_string(),
1176 "sink".to_string(),
1177 task.typetag().to_string(),
1178 );
1179 debug!(
1180 component_kind = "sink",
1181 component_type = task.typetag(),
1182 component_id = task.id(),
1183 group_id = group_id.as_raw().to_string(),
1184 "Registered new allocation group."
1185 );
1186 group_id.attach_to_span(&task_span);
1187 }
1188
1189 let task_name = format!(">> {} ({})", task.typetag(), task.id());
1190 let task = {
1191 let key = key.clone();
1192 handle_errors(task, self.abort_tx.clone(), |error| {
1193 ShutdownError::SinkAborted { key, error }
1194 })
1195 }
1196 .instrument(task_span);
1197 let spawned = spawn_named(task, task_name.as_ref());
1198 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1199 drop(previous); }
1201 }
1202
1203 fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1204 let task = new_pieces.tasks.remove(key).unwrap();
1205 let span = error_span!(
1206 "transform",
1207 component_kind = "transform",
1208 component_id = %task.id(),
1209 component_type = %task.typetag(),
1210 );
1211
1212 let task_span = span.or_current();
1213 #[cfg(unix)]
1214 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1215 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1216 task.id().to_string(),
1217 "transform".to_string(),
1218 task.typetag().to_string(),
1219 );
1220 debug!(
1221 component_kind = "transform",
1222 component_type = task.typetag(),
1223 component_id = task.id(),
1224 group_id = group_id.as_raw().to_string(),
1225 "Registered new allocation group."
1226 );
1227 group_id.attach_to_span(&task_span);
1228 }
1229
1230 let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1231 let task = {
1232 let key = key.clone();
1233 handle_errors(task, self.abort_tx.clone(), |error| {
1234 ShutdownError::TransformAborted { key, error }
1235 })
1236 }
1237 .instrument(task_span);
1238 let spawned = spawn_named(task, task_name.as_ref());
1239 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1240 drop(previous); }
1242 }
1243
1244 fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1245 let task = new_pieces.tasks.remove(key).unwrap();
1246 let span = error_span!(
1247 "source",
1248 component_kind = "source",
1249 component_id = %task.id(),
1250 component_type = %task.typetag(),
1251 );
1252
1253 let task_span = span.or_current();
1254 #[cfg(unix)]
1255 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1256 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1257 task.id().to_string(),
1258 "source".to_string(),
1259 task.typetag().to_string(),
1260 );
1261
1262 debug!(
1263 component_kind = "source",
1264 component_type = task.typetag(),
1265 component_id = task.id(),
1266 group_id = group_id.as_raw().to_string(),
1267 "Registered new allocation group."
1268 );
1269 group_id.attach_to_span(&task_span);
1270 }
1271
1272 let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1273 let task = {
1274 let key = key.clone();
1275 handle_errors(task, self.abort_tx.clone(), |error| {
1276 ShutdownError::SourceAborted { key, error }
1277 })
1278 }
1279 .instrument(task_span.clone());
1280 let spawned = spawn_named(task, task_name.as_ref());
1281 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1282 drop(previous); }
1284
1285 self.shutdown_coordinator
1286 .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1287
1288 let source_task = new_pieces.source_tasks.remove(key).unwrap();
1290 let source_task = {
1291 let key = key.clone();
1292 handle_errors(source_task, self.abort_tx.clone(), |error| {
1293 ShutdownError::SourceAborted { key, error }
1294 })
1295 }
1296 .instrument(task_span);
1297 self.source_tasks
1298 .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1299 }
1300
1301 pub async fn start_init_validated(
1302 config: Config,
1303 extra_context: ExtraContext,
1304 ) -> Option<(Self, ShutdownErrorReceiver)> {
1305 let diff = ConfigDiff::initial(&config);
1306 let pieces = TopologyPiecesBuilder::new(&config, &diff)
1307 .with_extra_context(extra_context)
1308 .build_or_log_errors()
1309 .await?;
1310 Self::start_validated(config, diff, pieces).await
1311 }
1312
1313 pub async fn start_validated(
1314 config: Config,
1315 diff: ConfigDiff,
1316 mut pieces: TopologyPieces,
1317 ) -> Option<(Self, ShutdownErrorReceiver)> {
1318 let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1319
1320 let expire_metrics = match (
1321 config.global.expire_metrics,
1322 config.global.expire_metrics_secs,
1323 ) {
1324 (Some(e), None) => {
1325 warn!(
1326 "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1327 );
1328 if e < Duration::from_secs(0) {
1329 None
1330 } else {
1331 Some(e.as_secs_f64())
1332 }
1333 }
1334 (Some(_), Some(_)) => {
1335 error!(
1336 message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
1337 internal_log_rate_limit = false
1338 );
1339 return None;
1340 }
1341 (None, Some(e)) => {
1342 if e < 0f64 {
1343 None
1344 } else {
1345 Some(e)
1346 }
1347 }
1348 (None, None) => Some(300f64),
1349 };
1350
1351 if let Err(error) = crate::metrics::Controller::get()
1352 .expect("Metrics must be initialized")
1353 .set_expiry(
1354 expire_metrics,
1355 config
1356 .global
1357 .expire_metrics_per_metric_set
1358 .clone()
1359 .unwrap_or_default(),
1360 )
1361 {
1362 error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
1363 return None;
1364 }
1365
1366 let (utilization_emitter, utilization_registry) = pieces
1367 .utilization
1368 .take()
1369 .expect("Topology is missing the utilization metric emitter!");
1370 let metrics_storage = pieces.metrics_storage.clone();
1371 let metrics_refresh_period = config
1372 .global
1373 .metrics_storage_refresh_period
1374 .map(Duration::from_secs_f64);
1375 let mut running_topology = Self::new(config, abort_tx);
1376
1377 if !running_topology
1378 .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1379 .await
1380 {
1381 return None;
1382 }
1383 running_topology.connect_diff(&diff, &mut pieces).await;
1384 running_topology.spawn_diff(&diff, pieces);
1385
1386 let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1387 ShutdownSignal::new_wired();
1388 running_topology.utilization_registry = Some(utilization_registry.clone());
1389 running_topology.utilization_task_shutdown_trigger =
1390 Some(utilization_task_shutdown_trigger);
1391 running_topology.utilization_task = Some(tokio::spawn(Task::new(
1392 "utilization_heartbeat".into(),
1393 "",
1394 async move {
1395 utilization_emitter
1396 .run_utilization(utilization_shutdown_signal)
1397 .await;
1398 Ok(TaskOutput::Healthcheck)
1399 },
1400 )));
1401 if let Some(metrics_refresh_period) = metrics_refresh_period {
1402 let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
1403 ShutdownSignal::new_wired();
1404 running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
1405 running_topology.metrics_task = Some(tokio::spawn(Task::new(
1406 "metrics_heartbeat".into(),
1407 "",
1408 async move {
1409 metrics_storage
1410 .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
1411 .await;
1412 Ok(TaskOutput::Healthcheck)
1413 },
1414 )));
1415 }
1416
1417 Some((running_topology, abort_rx))
1418 }
1419}
1420
1421fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1422 let mut changed_outputs = Vec::new();
1423
1424 for source_key in diff
1425 .sources
1426 .to_change
1427 .iter()
1428 .chain(diff.enrichment_tables.sources.to_change.iter())
1429 {
1430 changed_outputs.extend(
1431 output_ids
1432 .iter()
1433 .filter(|id| &id.component == source_key)
1434 .cloned(),
1435 );
1436 }
1437
1438 for transform_key in &diff.transforms.to_change {
1439 changed_outputs.extend(
1440 output_ids
1441 .iter()
1442 .filter(|id| &id.component == transform_key)
1443 .cloned(),
1444 );
1445 }
1446
1447 changed_outputs
1448}
1449
1450fn enrichment_table_sink_resources(config: &Config, sink_key: &ComponentKey) -> Vec<Resource> {
1451 config
1452 .enrichment_tables()
1453 .filter_map(|(table_key, table)| table.as_sink(table_key))
1454 .find(|(key, _)| key == sink_key)
1455 .map(|(key, sink)| sink.resources(&key))
1456 .unwrap_or_default()
1457}
1458
1459fn enrichment_table_sink_buffer(
1460 config: &Config,
1461 sink_key: &ComponentKey,
1462) -> Option<vector_lib::buffers::BufferConfig> {
1463 config
1464 .enrichment_tables()
1465 .filter_map(|(table_key, table)| table.as_sink(table_key))
1466 .find(|(key, _)| key == sink_key)
1467 .map(|(_, sink)| sink.buffer)
1468}